Skip to content

异步与组合

异步 Pipeline

创建异步 Pipeline

加一个 Async 前缀,就这么简单:

typescript
import { createAsyncPipeline } from 'farrow-pipeline'

const pipeline = createAsyncPipeline<Input, Output>()

异步中间件

直接用 async/await,就像写普通异步代码一样:

typescript
pipeline.use(async (input, next) => {
  const data = await fetchFromDatabase(input)
  return next(data)
})

pipeline.use(async (input, next) => {
  await saveToCache(input)
  return next(input)
})

// 运行返回 Promise
const result = await pipeline.run(input)

💡 提示:同步和异步中间件可以混合使用!框架会自动处理 Promise 链。

混合同步和异步

typescript
const pipeline = createAsyncPipeline<number, string>()

// 同步中间件
pipeline.use((input, next) => {
  console.log('同步:', input)
  return next(input * 2)
})

// 异步中间件
pipeline.use(async (input, next) => {
  await delay(1000)
  console.log('异步:', input)
  return next(input)
})

// 同步中间件
pipeline.use((input) => {
  return `结果: ${input}`
})

// 运行
const result = await pipeline.run(5)

🚀 懒加载中间件

想象你有个图片处理库,体积巨大(10MB),但只有 1% 的请求会用到。怎么办?

传统做法(糟糕)

typescript
import { heavyImageLib } from './10mb-monster'  // 😭 启动慢 10 秒

pipeline.use((req, next) => {
  if (req.url.startsWith('/image')) {
    return next(heavyImageLib.process(req))
  }
  return next(req)
})

懒加载方式(完美)

typescript
pipeline.useLazy(async () => {
  // 🎉 只在第一次需要时加载!
  const { heavyImageLib } = await import('./10mb-monster')

  return (req, next) => {
    if (req.url.startsWith('/image')) {
      return next(heavyImageLib.process(req))
    }
    return next(req)
  }
})

工作原理

  1. 第一次调用:执行 thunk 函数,加载模块,缓存中间件
  2. 后续调用:直接使用缓存的中间件

条件懒加载

typescript
pipeline.useLazy(() => {
  if (process.env.NODE_ENV === 'development') {
    // 开发环境:加载调试中间件
    return (req, next) => {
      console.log('🐛 调试信息:', req)
      return next(req)
    }
  }
  // 生产环境:跳过
  return (req, next) => next(req)
})

懒加载异步模块

typescript
pipeline.useLazy(async () => {
  // 异步加载重型依赖
  const [imageLib, videoLib] = await Promise.all([
    import('./image-processor'),
    import('./video-processor')
  ])

  return (req, next) => {
    if (req.type === 'image') {
      return next(imageLib.process(req))
    }
    if (req.type === 'video') {
      return next(videoLib.process(req))
    }
    return next(req)
  }
})

🔌 启用异步追踪

farrow-pipeline 的 Context 系统依赖 Node.js 的 AsyncLocalStorage(async_hooks)来在异步操作中传递上下文。

什么时候需要手动启用?

typescript
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'

// ✅ 如果你直接使用 farrow-pipeline(不使用 farrow-http)
// 需要在应用启动时调用一次
asyncTracerImpl.enable()

// 现在异步操作中 Context 会正确传递
const pipeline = createAsyncPipeline()

pipeline.use(async (input, next) => {
  UserContext.set(user)

  await delay(1000)  // 异步等待

  const user = UserContext.get()  // ✅ 能拿到!
  return next(input)
})

什么时候不需要?

使用 farrow-http:框架会自动调用 enable(),无需手动处理

⚠️ 浏览器环境:完全不支持(会报错 "No Implementation")

如果忘记启用会怎样?

typescript
// ❌ 忘记调用 asyncTracerImpl.enable()

UserContext.set(user)
await someAsyncOperation()
const user = UserContext.get()  // ❌ undefined!Context 丢失了!

禁用异步追踪

typescript
// 应用关闭时
process.on('exit', () => {
  asyncTracerImpl.disable()
})

💡 小贴士:如果你不确定是否需要调用,那就调用一下!重复调用 enable() 是安全的。

Pipeline 组合

嵌套 Pipeline

Pipeline 本身可以作为中间件嵌套!但要注意类型必须完全一致

typescript
// ✅ 正确示例:类型完全一致的嵌套
const step1 = createPipeline<number, number>()
  .use((x) => x + 1)

const step2 = createPipeline<number, number>()
  .use((x) => x * 2)

const step3 = createPipeline<number, number>()
  .use((x) => x - 3)

// 组合:类型都是 <number, number>,可以直接嵌套
const mathPipeline = createPipeline<number, number>()
  .use(step1)  // 5 → 6
  .use(step2)  // 6 → 12
  .use(step3)  // 12 → 9

mathPipeline.run(5)  // 9

❌ 错误示例:类型不一致

typescript
const mathPipeline = createPipeline<number, number>()
  .use((x) => x * 2)

// ❌ 错误!mathPipeline 输出 number,但 mainPipeline 期望所有中间件输出 string
const mainPipeline = createPipeline<number, string>()
  .use(mathPipeline)      // 类型错误!
  .use((x) => `结果: ${x}`)

✅ 正确做法:使用普通中间件转换类型

typescript
const mainPipeline = createPipeline<number, string>()
  .use((x, next) => {
    // 在中间件内部处理数学运算
    const result = x * 2
    return next(result)  // 继续传递
  })
  .use((x) => `结果: ${x}`)  // 最终转换为 string

mainPipeline.run(5)  // "结果: 10"

⚠️ 重要:Pipeline 嵌套时,Pipeline<I, O> 的 I 与 O 必须一致!

usePipeline - 保持上下文传递

问题场景

typescript
// ❌ 错误:直接 run() 会创建新容器,Context 丢失
pipeline.use((input, next) => {
  const result = subPipeline.run(input)  // 新容器!UserContext.get() 会拿不到
  return next(result)
})

正确姿势

typescript
import { usePipeline } from 'farrow-pipeline'

// ✅ 正确:usePipeline 继承当前容器
pipeline.use((input, next) => {
  const runSubPipeline = usePipeline(subPipeline)
  const result = runSubPipeline(input)  // 继承容器,Context 正常传递
  return next(result)
})

完整例子

typescript
import { createContext, usePipeline } from 'farrow-pipeline'

const UserContext = createContext<User | null>(null)

// 认证管道
const authPipeline = createPipeline<Request, User>()
  .use((req) => {
    const user = authenticate(req)
    UserContext.set(user)  // 设置 Context
    return user
  })

// 业务管道
const businessPipeline = createPipeline<User, Response>()
  .use((user) => {
    const currentUser = UserContext.get()  // 获取 Context
    return { status: 200, user: currentUser }
  })

// 主管道
const mainPipeline = createPipeline<Request, Response>()
  .use((req, next) => {
    const runAuth = usePipeline(authPipeline)
    const runBusiness = usePipeline(businessPipeline)

    const user = runAuth(req)
    const response = runBusiness(user)
    return response  // ✅ Context 正确传递
  })

实战模式

模式 1: 异步错误处理

typescript
const pipeline = createAsyncPipeline<Request, Response>()

pipeline.use(async (req, next) => {
  try {
    return await next(req)
  } catch (error) {
    console.error('Pipeline 错误:', error)
    return { status: 500, error: 'Internal Server Error' }
  }
})

pipeline.use(async (req) => {
  const data = await riskyDatabaseOperation(req)
  return { status: 200, data }
})

模式 2: 超时控制

typescript
function timeout<I, O>(ms: number) {
  return (input: I, next: Next<I, O>): Promise<O> => {
    return Promise.race([
      next(input),
      new Promise<O>((_, reject) =>
        setTimeout(() => reject(new Error('Timeout')), ms)
      )
    ])
  }
}

// 使用
const pipeline = createAsyncPipeline<Request, Response>()
  .use(timeout(5000))  // 5 秒超时
  .use(async (req) => {
    const data = await slowOperation(req)
    return { status: 200, data }
  })

模式 3: 重试机制

typescript
function retry<I, O>(maxRetries: number) {
  return async (input: I, next: Next<I, O>): Promise<O> => {
    let lastError: Error | null = null

    for (let i = 0; i < maxRetries; i++) {
      try {
        return await next(input)
      } catch (error) {
        lastError = error as Error
        console.log(`重试 ${i + 1}/${maxRetries}`)
        await delay(1000 * (i + 1))  // 递增延迟
      }
    }

    throw lastError
  }
}

// 使用
const pipeline = createAsyncPipeline<Request, Response>()
  .use(retry(3))
  .use(async (req) => {
    return await unreliableOperation(req)
  })

这是一个第三方 Farrow 文档站 | 用 ❤️ 和 TypeScript 构建