异步与组合
异步 Pipeline
创建异步 Pipeline
加一个 Async 前缀,就这么简单:
typescript
import { createAsyncPipeline } from 'farrow-pipeline'
const pipeline = createAsyncPipeline<Input, Output>()异步中间件
直接用 async/await,就像写普通异步代码一样:
typescript
pipeline.use(async (input, next) => {
const data = await fetchFromDatabase(input)
return next(data)
})
pipeline.use(async (input, next) => {
await saveToCache(input)
return next(input)
})
// 运行返回 Promise
const result = await pipeline.run(input)💡 提示:同步和异步中间件可以混合使用!框架会自动处理 Promise 链。
混合同步和异步
typescript
const pipeline = createAsyncPipeline<number, string>()
// 同步中间件
pipeline.use((input, next) => {
console.log('同步:', input)
return next(input * 2)
})
// 异步中间件
pipeline.use(async (input, next) => {
await delay(1000)
console.log('异步:', input)
return next(input)
})
// 同步中间件
pipeline.use((input) => {
return `结果: ${input}`
})
// 运行
const result = await pipeline.run(5)🚀 懒加载中间件
想象你有个图片处理库,体积巨大(10MB),但只有 1% 的请求会用到。怎么办?
传统做法(糟糕):
typescript
import { heavyImageLib } from './10mb-monster' // 😭 启动慢 10 秒
pipeline.use((req, next) => {
if (req.url.startsWith('/image')) {
return next(heavyImageLib.process(req))
}
return next(req)
})懒加载方式(完美):
typescript
pipeline.useLazy(async () => {
// 🎉 只在第一次需要时加载!
const { heavyImageLib } = await import('./10mb-monster')
return (req, next) => {
if (req.url.startsWith('/image')) {
return next(heavyImageLib.process(req))
}
return next(req)
}
})工作原理:
- 第一次调用:执行 thunk 函数,加载模块,缓存中间件
- 后续调用:直接使用缓存的中间件
条件懒加载
typescript
pipeline.useLazy(() => {
if (process.env.NODE_ENV === 'development') {
// 开发环境:加载调试中间件
return (req, next) => {
console.log('🐛 调试信息:', req)
return next(req)
}
}
// 生产环境:跳过
return (req, next) => next(req)
})懒加载异步模块
typescript
pipeline.useLazy(async () => {
// 异步加载重型依赖
const [imageLib, videoLib] = await Promise.all([
import('./image-processor'),
import('./video-processor')
])
return (req, next) => {
if (req.type === 'image') {
return next(imageLib.process(req))
}
if (req.type === 'video') {
return next(videoLib.process(req))
}
return next(req)
}
})🔌 启用异步追踪
farrow-pipeline 的 Context 系统依赖 Node.js 的 AsyncLocalStorage(async_hooks)来在异步操作中传递上下文。
什么时候需要手动启用?
typescript
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'
// ✅ 如果你直接使用 farrow-pipeline(不使用 farrow-http)
// 需要在应用启动时调用一次
asyncTracerImpl.enable()
// 现在异步操作中 Context 会正确传递
const pipeline = createAsyncPipeline()
pipeline.use(async (input, next) => {
UserContext.set(user)
await delay(1000) // 异步等待
const user = UserContext.get() // ✅ 能拿到!
return next(input)
})什么时候不需要?
✅ 使用 farrow-http:框架会自动调用
enable(),无需手动处理⚠️ 浏览器环境:完全不支持(会报错 "No Implementation")
如果忘记启用会怎样?
typescript
// ❌ 忘记调用 asyncTracerImpl.enable()
UserContext.set(user)
await someAsyncOperation()
const user = UserContext.get() // ❌ undefined!Context 丢失了!禁用异步追踪
typescript
// 应用关闭时
process.on('exit', () => {
asyncTracerImpl.disable()
})💡 小贴士:如果你不确定是否需要调用,那就调用一下!重复调用
enable()是安全的。
Pipeline 组合
嵌套 Pipeline
Pipeline 本身可以作为中间件嵌套!但要注意类型必须完全一致:
typescript
// ✅ 正确示例:类型完全一致的嵌套
const step1 = createPipeline<number, number>()
.use((x) => x + 1)
const step2 = createPipeline<number, number>()
.use((x) => x * 2)
const step3 = createPipeline<number, number>()
.use((x) => x - 3)
// 组合:类型都是 <number, number>,可以直接嵌套
const mathPipeline = createPipeline<number, number>()
.use(step1) // 5 → 6
.use(step2) // 6 → 12
.use(step3) // 12 → 9
mathPipeline.run(5) // 9❌ 错误示例:类型不一致
typescript
const mathPipeline = createPipeline<number, number>()
.use((x) => x * 2)
// ❌ 错误!mathPipeline 输出 number,但 mainPipeline 期望所有中间件输出 string
const mainPipeline = createPipeline<number, string>()
.use(mathPipeline) // 类型错误!
.use((x) => `结果: ${x}`)✅ 正确做法:使用普通中间件转换类型
typescript
const mainPipeline = createPipeline<number, string>()
.use((x, next) => {
// 在中间件内部处理数学运算
const result = x * 2
return next(result) // 继续传递
})
.use((x) => `结果: ${x}`) // 最终转换为 string
mainPipeline.run(5) // "结果: 10"⚠️ 重要:Pipeline 嵌套时,
Pipeline<I, O>的 I 与 O 必须一致!
usePipeline - 保持上下文传递
问题场景:
typescript
// ❌ 错误:直接 run() 会创建新容器,Context 丢失
pipeline.use((input, next) => {
const result = subPipeline.run(input) // 新容器!UserContext.get() 会拿不到
return next(result)
})正确姿势:
typescript
import { usePipeline } from 'farrow-pipeline'
// ✅ 正确:usePipeline 继承当前容器
pipeline.use((input, next) => {
const runSubPipeline = usePipeline(subPipeline)
const result = runSubPipeline(input) // 继承容器,Context 正常传递
return next(result)
})完整例子
typescript
import { createContext, usePipeline } from 'farrow-pipeline'
const UserContext = createContext<User | null>(null)
// 认证管道
const authPipeline = createPipeline<Request, User>()
.use((req) => {
const user = authenticate(req)
UserContext.set(user) // 设置 Context
return user
})
// 业务管道
const businessPipeline = createPipeline<User, Response>()
.use((user) => {
const currentUser = UserContext.get() // 获取 Context
return { status: 200, user: currentUser }
})
// 主管道
const mainPipeline = createPipeline<Request, Response>()
.use((req, next) => {
const runAuth = usePipeline(authPipeline)
const runBusiness = usePipeline(businessPipeline)
const user = runAuth(req)
const response = runBusiness(user)
return response // ✅ Context 正确传递
})实战模式
模式 1: 异步错误处理
typescript
const pipeline = createAsyncPipeline<Request, Response>()
pipeline.use(async (req, next) => {
try {
return await next(req)
} catch (error) {
console.error('Pipeline 错误:', error)
return { status: 500, error: 'Internal Server Error' }
}
})
pipeline.use(async (req) => {
const data = await riskyDatabaseOperation(req)
return { status: 200, data }
})模式 2: 超时控制
typescript
function timeout<I, O>(ms: number) {
return (input: I, next: Next<I, O>): Promise<O> => {
return Promise.race([
next(input),
new Promise<O>((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), ms)
)
])
}
}
// 使用
const pipeline = createAsyncPipeline<Request, Response>()
.use(timeout(5000)) // 5 秒超时
.use(async (req) => {
const data = await slowOperation(req)
return { status: 200, data }
})模式 3: 重试机制
typescript
function retry<I, O>(maxRetries: number) {
return async (input: I, next: Next<I, O>): Promise<O> => {
let lastError: Error | null = null
for (let i = 0; i < maxRetries; i++) {
try {
return await next(input)
} catch (error) {
lastError = error as Error
console.log(`重试 ${i + 1}/${maxRetries}`)
await delay(1000 * (i + 1)) // 递增延迟
}
}
throw lastError
}
}
// 使用
const pipeline = createAsyncPipeline<Request, Response>()
.use(retry(3))
.use(async (req) => {
return await unreliableOperation(req)
})