Skip to content

farrow-pipeline 完整 API 参考

目录

  1. 核心概念
  2. 完整导出清单
  3. Pipeline 类型系统
  4. Context 上下文系统
  5. Container 容器系统
  6. 异步追踪 (AsyncTracer)
  7. 工具函数
  8. 类型工具
  9. 常见问题

核心概念

设计哲学

farrow-pipeline 是一个类型安全的中间件管道系统,核心设计理念:

  1. 函数式组合 - 中间件通过函数组合形成处理链
  2. 类型安全 - 全程 TypeScript 类型推导
  3. 上下文隔离 - 每次运行独立的上下文容器
  4. 异步友好 - 基于 AsyncLocalStorage 的异步上下文传递
  5. 可组合性 - Pipeline 本身可作为中间件嵌套使用

执行模型

输入 → 中间件1 → 中间件2 → ... → 中间件N → 输出
         ↓          ↓                ↓
       next()     next()          返回值

洋葱模型

typescript
// 执行顺序示例
pipeline.use((input, next) => {
  console.log('1: 进入')       // 1
  const result = next(input)
  console.log('4: 退出')       // 4
  return result
})

pipeline.use((input, next) => {
  console.log('2: 进入')       // 2
  const result = next(input)
  console.log('3: 退出')       // 3
  return result
})

// 输出顺序: 1 → 2 → 3 → 4

完整导出清单

主入口 (farrow-pipeline)

typescript
// Pipeline 创建函数
export { createPipeline, createAsyncPipeline }

// Context 系统
export { createContext, createContainer }

// Container 工具
export { useContainer, runWithContainer }

// 类型断言
export { assertContainer, assertContext }

// 类型判断
export { isContext, isContainer, isPipeline }

// 工具函数
export { usePipeline, getMiddleware }

// 类型导出
export type {
  // Pipeline 类型
  Pipeline,
  AsyncPipeline,
  PipelineOptions,
  RunPipelineOptions,
  PipelineInput,
  PipelineOutput,

  // 中间件类型
  Middleware,
  Middlewares,
  MiddlewareInput,
  MiddlewareType,
  ThunkMiddlewareInput,

  // Context 类型
  Context,
  ContextStorage,
  Container,

  // 工具类型
  Next,
  MaybeAsync,
}

AsyncTracer 入口 (farrow-pipeline/asyncTracerImpl.node)

typescript
// Node.js 环境
export { enable, disable }

AsyncTracer 入口 (farrow-pipeline/asyncTracerImpl.browser)

typescript
// 浏览器环境 - 抛出错误(不支持)
throw new Error(`No Implementation`)

Pipeline 类型系统

createPipeline - 创建同步 Pipeline

函数签名:

typescript
function createPipeline<I, O>(options?: PipelineOptions): Pipeline<I, O>

类型参数:

  • I - 输入类型
  • O - 输出类型

参数:

typescript
type PipelineOptions = {
  contexts?: ContextStorage  // 预设的上下文存储
}

type ContextStorage = {
  [key: string]: Context<any>
}

返回值:

typescript
type Pipeline<I = unknown, O = unknown> = {
  [PipelineSymbol]: true

  // 添加中间件,支持链式调用
  use: (...inputs: MiddlewareInput<I, O>[]) => Pipeline<I, O>

  // 运行 Pipeline
  run: (input: I, options?: RunPipelineOptions<I, O>) => O

  // 作为中间件使用
  middleware: Middleware<I, O>
}

type RunPipelineOptions<I = unknown, O = unknown> = {
  container?: Container      // 指定容器
  onLast?: (input: I) => O  // 最后中间件的回调
}

基础示例:

typescript
import { createPipeline } from 'farrow-pipeline'

// 创建简单的数值处理管道
const pipeline = createPipeline<number, string>()

pipeline.use((input, next) => {
  console.log('输入:', input)
  return next(input * 2)
})

pipeline.use((input) => {
  return `结果: ${input}`
})

const result = pipeline.run(5)  // "结果: 10"

预设上下文示例:

typescript
import { createContext, createPipeline } from 'farrow-pipeline'

// 创建上下文
const UserContext = createContext({ name: 'Guest' })

// 创建带预设上下文的 Pipeline
const pipeline = createPipeline<string, string>({
  contexts: {
    user: UserContext.create({ name: 'Admin' })
  }
})

pipeline.use((input, next) => {
  const user = UserContext.get()  // { name: 'Admin' }
  return next(`${user.name}: ${input}`)
})

const result = pipeline.run('Hello')  // "Admin: Hello"

链式调用示例:

typescript
const pipeline = createPipeline<number, number>()
  .use((x, next) => next(x + 1))
  .use((x, next) => next(x * 2))
  .use((x) => x - 3)

const result = pipeline.run(5)  // (5 + 1) * 2 - 3 = 9

createAsyncPipeline - 创建异步 Pipeline

函数签名:

typescript
function createAsyncPipeline<I, O>(options?: PipelineOptions): AsyncPipeline<I, O>

返回值:

typescript
type AsyncPipeline<I = unknown, O = unknown> = Pipeline<I, MaybeAsync<O>> & {
  // 懒加载中间件
  useLazy: (thunk: ThunkMiddlewareInput<I, O>) => AsyncPipeline<I, O>
}

type MaybeAsync<T> = T | Promise<T>

type ThunkMiddlewareInput<I, O> = () => MaybeAsync<MiddlewareInput<I, MaybeAsync<O>>>

说明:

  • 支持异步中间件(返回 Promise)
  • 支持懒加载中间件(按需加载)
  • 自动处理 Promise 链

异步中间件示例:

typescript
const pipeline = createAsyncPipeline<string, User>()

// 异步数据库查询
pipeline.use(async (userId, next) => {
  const user = await db.findUser(userId)
  if (!user) throw new Error('User not found')
  return next(user)
})

// 异步日志记录
pipeline.use(async (user, next) => {
  await logger.log(`User ${user.id} accessed`)
  return next(user)
})

// 最终处理
pipeline.use((user) => {
  return user
})

// 运行(返回 Promise)
const user = await pipeline.run('user123')

懒加载中间件示例:

typescript
const pipeline = createAsyncPipeline<Request, Response>()

// 懒加载重型依赖
pipeline.useLazy(async () => {
  // 只在第一次需要时加载
  const { imageProcessor } = await import('./heavy-image-lib')

  return (req, next) => {
    if (req.url.startsWith('/image')) {
      const processed = imageProcessor(req)
      return next(processed)
    }
    return next(req)
  }
})

// 条件懒加载
pipeline.useLazy(() => {
  if (process.env.NODE_ENV === 'development') {
    return async (req, next) => {
      console.log('Dev mode:', req)
      return next(req)
    }
  }
  // 生产环境跳过
  return (req, next) => next(req)
})

懒加载工作原理:

  1. 首次调用: 执行 thunk 函数获取中间件
  2. 缓存: 将加载的中间件缓存到变量
  3. 后续调用: 直接使用缓存的中间件
  4. Promise 处理: 自动处理同步和异步加载
typescript
// 内部实现概览
useLazy: (thunk) => {
  let middleware: Middleware | null = null
  let promise: Promise<void> | null = null

  pipeline.use((input, next) => {
    // 已加载,直接使用
    if (middleware) return next(input)

    // 首次加载
    if (!promise) {
      promise = Promise.resolve(thunk()).then((result) => {
        middleware = getMiddleware(result)
      })
    }

    // 等待加载完成
    return promise.then(() => next(input))
  })

  // 调用加载完成的中间件
  pipeline.use((input, next) => {
    if (!middleware) throw new Error(`Failed to load middleware`)
    return middleware(input, next)
  })

  return asyncPipeline
}

usePipeline - 继承容器运行 Pipeline

函数签名:

typescript
function usePipeline<I, O>(pipeline: Pipeline<I, O>):
  (input: I, options?: RunPipelineOptions<I, O>) => O

说明:

  • 在中间件中运行另一个 Pipeline
  • 自动继承当前容器,保持上下文传递
  • 避免直接调用 pipeline.run() 导致的上下文丢失

为什么需要 usePipeline?

typescript
// ❌ 错误:直接运行会创建新容器,上下文丢失
pipeline.use((input, next) => {
  const result = subPipeline.run(input)  // 新容器
  return next(result)
})

// ✅ 正确:继承当前容器
pipeline.use((input, next) => {
  const runSubPipeline = usePipeline(subPipeline)
  const result = runSubPipeline(input)  // 继承容器
  return next(result)
})

完整示例:

typescript
import { createContext, createPipeline, usePipeline } from 'farrow-pipeline'

// 创建上下文
const UserContext = createContext<User | null>(null)

// 认证管道
const authPipeline = createPipeline<Request, User>()
authPipeline.use((req) => {
  const user = authenticate(req)
  UserContext.set(user)  // 设置上下文
  return user
})

// 业务管道
const businessPipeline = createPipeline<User, Response>()
businessPipeline.use((user) => {
  const currentUser = UserContext.get()  // 获取上下文
  return { status: 200, user: currentUser }
})

// 主管道
const mainPipeline = createPipeline<Request, Response>()

mainPipeline.use((req, next) => {
  // 使用 usePipeline 保持上下文
  const runAuth = usePipeline(authPipeline)
  const runBusiness = usePipeline(businessPipeline)

  const user = runAuth(req)              // 继承容器
  const response = runBusiness(user)      // 继承容器
  return response                         // 上下文正确传递
})

// 运行
const response = mainPipeline.run(request)

错误处理组合:

typescript
mainPipeline.use((input, next) => {
  const runValidation = usePipeline(validationPipeline)
  const runProcessing = usePipeline(processingPipeline)

  try {
    const validated = runValidation(input)
    const result = runProcessing(validated)
    return { status: 200, result }
  } catch (error) {
    return { status: 400, error: error.message }
  }
})

中间件类型

Middleware - 中间件函数类型

类型定义:

typescript
type Middleware<I = unknown, O = unknown> = (input: I, next: Next<I, O>) => O

type Next<I = unknown, O = unknown> = (input?: I) => O

说明:

  • input - 当前中间件的输入
  • next - 调用下一个中间件的函数
  • 返回值 - 中间件的输出,必须与 Pipeline 的输出类型匹配

中间件编写规范:

typescript
// 1. 转换中间件 - 修改输入传递给下一个
pipeline.use((input, next) => {
  const transformed = transform(input)
  return next(transformed)
})

// 2. 增强中间件 - 处理结果
pipeline.use((input, next) => {
  const result = next(input)
  return enhance(result)
})

// 3. 拦截中间件 - 条件执行
pipeline.use((input, next) => {
  if (!validate(input)) {
    return errorResponse
  }
  return next(input)
})

// 4. 包装中间件 - 前后处理
pipeline.use((input, next) => {
  before(input)
  const result = next(input)
  after(result)
  return result
})

// 5. 终止中间件 - 不调用 next
pipeline.use((input) => {
  return finalResult
})

MiddlewareInput - 中间件输入类型

类型定义:

typescript
type MiddlewareInput<I = unknown, O = unknown> =
  | Middleware<I, O>
  | { middleware: Middleware<I, O> }

说明:

  • 支持直接传入中间件函数
  • 支持传入包含 middleware 属性的对象

使用示例:

typescript
// 方式 1: 直接传入函数
pipeline.use((input, next) => next(input))

// 方式 2: 传入对象
const middlewareObj = {
  middleware: (input, next) => next(input)
}
pipeline.use(middlewareObj)

// 方式 3: 传入 Pipeline(Pipeline 实现了 middleware 属性)
const subPipeline = createPipeline()
pipeline.use(subPipeline)  // 等同于 pipeline.use(subPipeline.middleware)

getMiddleware - 提取中间件函数

函数签名:

typescript
function getMiddleware<I, O>(input: MiddlewareInput<I, O>): Middleware<I, O>

说明:

  • MiddlewareInput 中提取实际的中间件函数
  • 处理函数和对象两种形式

使用示例:

typescript
const fn = (input, next) => next(input)
const obj = { middleware: fn }

getMiddleware(fn)   // 返回 fn
getMiddleware(obj)  // 返回 obj.middleware

Pipeline 方法详解

pipeline.use - 添加中间件

方法签名:

typescript
use(...inputs: MiddlewareInput<I, O>[]): Pipeline<I, O>

说明:

  • 接受任意数量的中间件
  • 支持链式调用
  • 中间件按添加顺序执行

示例:

typescript
// 单个中间件
pipeline.use((input, next) => next(input))

// 多个中间件
pipeline.use(
  middleware1,
  middleware2,
  middleware3
)

// 链式调用
pipeline
  .use(middleware1)
  .use(middleware2)
  .use(middleware3)

// 嵌套 Pipeline
const subPipeline = createPipeline()
pipeline.use(subPipeline)

// 条件中间件
if (enableAuth) {
  pipeline.use(authMiddleware)
}

pipeline.run - 运行 Pipeline

方法签名:

typescript
run(input: I, options?: RunPipelineOptions<I, O>): O

参数:

typescript
type RunPipelineOptions<I = unknown, O = unknown> = {
  container?: Container      // 指定运行容器
  onLast?: (input: I) => O  // 最后中间件回调
}

说明:

  • input - Pipeline 的输入
  • options.container - 指定容器(默认使用 Pipeline 创建时的容器)
  • options.onLast - 当所有中间件都调用 next() 时执行的回调

基础示例:

typescript
const pipeline = createPipeline<number, number>()
  .use((x, next) => next(x + 1))
  .use((x, next) => next(x * 2))

// 基础运行
const result = pipeline.run(5)  // (5 + 1) * 2 = 12

使用 onLast 示例:

typescript
const pipeline = createPipeline<string, string>()
  .use((input, next) => {
    console.log('中间件执行')
    return next(input)
  })

// 所有中间件都调用了 next,触发 onLast
const result = pipeline.run('test', {
  onLast: (input) => {
    console.log('到达末尾:', input)
    return `默认: ${input}`
  }
})
// 输出:
// 中间件执行
// 到达末尾: test
// 返回: "默认: test"

指定容器示例:

typescript
const UserContext = createContext<User | null>(null)

const pipeline = createPipeline<string, string>()
  .use((input) => {
    const user = UserContext.get()
    return `${user?.name}: ${input}`
  })

// 创建自定义容器
const container = createContainer({
  user: UserContext.create({ name: 'Alice' })
})

// 使用自定义容器运行
const result = pipeline.run('Hello', { container })
// "Alice: Hello"

pipeline.middleware - 作为中间件使用

属性签名:

typescript
middleware: Middleware<I, O>

说明:

  • 将当前 Pipeline 转换为中间件函数
  • 可以嵌套到其他 Pipeline 中
  • 自动继承父 Pipeline 的容器

示例:

typescript
const subPipeline = createPipeline<number, number>()
  .use((x, next) => next(x + 1))
  .use((x, next) => next(x * 2))

const mainPipeline = createPipeline<number, string>()
  .use(subPipeline.middleware)  // 嵌套 subPipeline
  .use((x) => `结果: ${x}`)

const result = mainPipeline.run(5)  // "结果: 12"

等价写法:

typescript
// 方式 1: 使用 .middleware 属性
mainPipeline.use(subPipeline.middleware)

// 方式 2: 直接传入 Pipeline(内部会调用 .middleware)
mainPipeline.use(subPipeline)

// 方式 3: 使用 usePipeline(推荐,明确表达意图)
mainPipeline.use((input, next) => {
  const runSubPipeline = usePipeline(subPipeline)
  const result = runSubPipeline(input)
  return next(result)
})

Pipeline 判断函数

isPipeline - 判断是否为 Pipeline

函数签名:

typescript
function isPipeline(obj: any): obj is Pipeline

说明:

  • 类型守卫函数
  • 检查对象是否具有 Pipeline 标识符

使用示例:

typescript
import { isPipeline, createPipeline } from 'farrow-pipeline'

const pipeline = createPipeline()
const fn = () => {}

if (isPipeline(pipeline)) {
  // TypeScript 知道 pipeline 是 Pipeline 类型
  pipeline.run(input)
}

// 动态处理
function handle(handler: Pipeline | Function, input: any) {
  if (isPipeline(handler)) {
    return handler.run(input)
  } else {
    return handler(input)
  }
}

类型提取工具

PipelineInput - 提取输入类型

类型定义:

typescript
type PipelineInput<T extends Pipeline> = T extends Pipeline<infer I> ? I : never

使用示例:

typescript
const pipeline = createPipeline<number, string>()

type Input = PipelineInput<typeof pipeline>  // number

PipelineOutput - 提取输出类型

类型定义:

typescript
type PipelineOutput<T extends Pipeline> = T extends Pipeline<any, infer O> ? O : never

使用示例:

typescript
const pipeline = createPipeline<number, string>()

type Output = PipelineOutput<typeof pipeline>  // string

Context 上下文系统

createContext - 创建上下文

函数签名:

typescript
function createContext<T>(defaultValue: T): Context<T>

类型定义:

typescript
type Context<T = any> = {
  id: symbol                              // 上下文唯一标识
  [ContextSymbol]: T                      // 默认值(内部使用)

  create: (value: T) => Context<T>        // 创建新实例
  get: () => T                            // 获取当前值
  set: (value: T) => void                 // 设置当前值
  assert: () => Exclude<T, undefined | null>  // 断言非空
}

说明:

  • 创建类型安全的上下文对象
  • 每个 Context 有唯一的 id
  • 提供 CRUD 操作方法

基础示例:

typescript
import { createContext, createPipeline } from 'farrow-pipeline'

// 创建上下文
const UserContext = createContext<{ name: string } | null>(null)

const pipeline = createPipeline<string, string>()

// 设置上下文
pipeline.use((input, next) => {
  UserContext.set({ name: 'Alice' })
  return next(input)
})

// 读取上下文
pipeline.use((input) => {
  const user = UserContext.get()
  return `${user?.name}: ${input}`
})

const result = pipeline.run('Hello')  // "Alice: Hello"

Context 方法详解:

get() - 获取上下文值

typescript
get(): T

说明:

  • 从当前容器中获取上下文值
  • 如果容器中不存在,返回默认值

示例:

typescript
const CountContext = createContext(0)

pipeline.use((input, next) => {
  const count = CountContext.get()  // 0(默认值)
  CountContext.set(count + 1)
  return next(input)
})

pipeline.use((input) => {
  const count = CountContext.get()  // 1
  return `计数: ${count}`
})

set(value) - 设置上下文值

typescript
set(value: T): void

说明:

  • 在当前容器中设置上下文值
  • 会覆盖之前的值

示例:

typescript
const UserContext = createContext<User | null>(null)

pipeline.use((input, next) => {
  const user = authenticate(input)
  UserContext.set(user)  // 设置用户
  return next(input)
})

assert() - 断言非空

typescript
assert(): Exclude<T, undefined | null>

说明:

  • 断言上下文值非空
  • 如果为 nullundefined,抛出错误
  • 返回非空类型

示例:

typescript
const UserContext = createContext<User | null>(null)

pipeline.use((input) => {
  try {
    const user = UserContext.assert()  // 断言非空
    // user 类型为 User(不包含 null)
    return user.id
  } catch (error) {
    return 'Unauthorized'
  }
})

create(value) - 创建新实例

typescript
create(value: T): Context<T>

说明:

  • 创建一个新的 Context 实例
  • 新实例具有相同的 id,但不同的默认值
  • 用于创建预设容器

示例:

typescript
const ConfigContext = createContext({ debug: false })

// 创建开发环境配置
const devConfig = ConfigContext.create({ debug: true })

// 创建生产环境配置
const prodConfig = ConfigContext.create({ debug: false })

// 创建不同环境的容器
const devContainer = createContainer({
  config: devConfig
})

const prodContainer = createContainer({
  config: prodConfig
})

上下文隔离示例:

typescript
const CounterContext = createContext(0)

const pipeline = createPipeline<string, string>()

pipeline.use((input, next) => {
  const count = CounterContext.get()
  CounterContext.set(count + 1)
  return next(`${input}:${CounterContext.get()}`)
})

// 并发执行,每个都有独立的计数器
const results = await Promise.all([
  pipeline.run('A'),  // "A:1"
  pipeline.run('B'),  // "B:1"
  pipeline.run('C')   // "C:1"
])
// 每次运行都从默认值 0 开始

多上下文示例:

typescript
const UserContext = createContext<User | null>(null)
const RequestIdContext = createContext('')
const LoggerContext = createContext<Logger>(consoleLogger)

const pipeline = createPipeline<Request, Response>()

// 设置多个上下文
pipeline.use((req, next) => {
  UserContext.set(authenticate(req))
  RequestIdContext.set(generateId())
  LoggerContext.set(createLogger())
  return next(req)
})

// 使用多个上下文
pipeline.use((req) => {
  const user = UserContext.get()
  const requestId = RequestIdContext.get()
  const logger = LoggerContext.get()

  logger.log(`User ${user?.id} - Request ${requestId}`)

  return { user, requestId }
})

Context 判断函数

isContext - 判断是否为 Context

函数签名:

typescript
function isContext(input: any): input is Context

使用示例:

typescript
import { isContext, createContext } from 'farrow-pipeline'

const ctx = createContext(0)

if (isContext(ctx)) {
  // TypeScript 知道 ctx 是 Context 类型
  ctx.get()
}

assertContext - 断言为 Context

函数签名:

typescript
function assertContext(input: any): asserts input is Context

说明:

  • TypeScript 断言函数
  • 如果不是 Context,抛出错误

使用示例:

typescript
function useContextValue(ctx: any) {
  assertContext(ctx)
  // 此后 TypeScript 知道 ctx 是 Context 类型
  return ctx.get()
}

Container 容器系统

createContainer - 创建容器

函数签名:

typescript
function createContainer(contextStorage?: ContextStorage): Container

类型定义:

typescript
type Container = {
  [ContainerSymbol]: true
  read: <V>(context: Context<V>) => V
  write: <V>(context: Context<V>, value: V) => void
}

type ContextStorage = {
  [key: string]: Context<any>
}

说明:

  • 容器是 Context 的存储容器
  • 每次 pipeline.run() 都会创建独立的容器
  • 容器内的上下文互不干扰

基础示例:

typescript
import { createContext, createContainer } from 'farrow-pipeline'

const UserContext = createContext<User | null>(null)

// 创建自定义容器
const container = createContainer({
  user: UserContext.create({ id: 1, name: 'Alice' })
})

// 从容器读取
const user = container.read(UserContext)  // { id: 1, name: 'Alice' }

// 写入容器
container.write(UserContext, { id: 2, name: 'Bob' })

测试场景示例:

typescript
const DatabaseContext = createContext<Database>(productionDB)
const LoggerContext = createContext<Logger>(consoleLogger)

// 创建测试容器
const testContainer = createContainer({
  db: DatabaseContext.create(mockDatabase),
  logger: LoggerContext.create(silentLogger)
})

// 在测试中使用
const result = pipeline.run(testInput, { container: testContainer })

多环境配置示例:

typescript
const ConfigContext = createContext({ env: 'development' })

const environments = {
  development: createContainer({
    config: ConfigContext.create({ env: 'development', debug: true })
  }),
  production: createContainer({
    config: ConfigContext.create({ env: 'production', debug: false })
  }),
  test: createContainer({
    config: ConfigContext.create({ env: 'test', debug: false })
  })
}

const currentContainer = environments[process.env.NODE_ENV || 'development']

const result = pipeline.run(input, { container: currentContainer })

Container 方法详解

container.read - 读取上下文

方法签名:

typescript
read<V>(context: Context<V>): V

说明:

  • 从容器中读取指定上下文的值
  • 如果容器中不存在,返回 Context 的默认值

示例:

typescript
const UserContext = createContext<User | null>(null)

const container = createContainer({
  user: UserContext.create({ id: 1, name: 'Alice' })
})

const user = container.read(UserContext)  // { id: 1, name: 'Alice' }

container.write - 写入上下文

方法签名:

typescript
write<V>(context: Context<V>, value: V): void

说明:

  • 向容器中写入指定上下文的值
  • 会覆盖之前的值

示例:

typescript
const CountContext = createContext(0)

const container = createContainer()

container.write(CountContext, 10)
const count = container.read(CountContext)  // 10

Container 工具函数

useContainer - 获取当前容器

函数签名:

typescript
function useContainer(): Container

说明:

  • 获取当前异步追踪上下文中的容器
  • 必须在 Pipeline 运行时调用
  • 用于在中间件中访问容器

使用示例:

typescript
import { useContainer } from 'farrow-pipeline'

pipeline.use((input, next) => {
  const container = useContainer()

  // 直接操作容器
  const user = container.read(UserContext)
  container.write(LogContext, logger)

  return next(input)
})

runWithContainer - 在指定容器中运行函数

函数签名:

typescript
function runWithContainer<F extends (...args: any) => any>(
  f: F,
  container: Container
): ReturnType<F>

说明:

  • 在指定容器的上下文中运行函数
  • 函数内部可以使用 useContainer() 获取该容器
  • 用于手动控制容器作用域

使用示例:

typescript
import { runWithContainer } from 'farrow-pipeline'

const UserContext = createContext<User | null>(null)

const container = createContainer({
  user: UserContext.create({ id: 1, name: 'Alice' })
})

// 在容器中运行函数
const result = runWithContainer(() => {
  const user = UserContext.get()  // { id: 1, name: 'Alice' }
  return `User: ${user?.name}`
}, container)

isContainer - 判断是否为 Container

函数签名:

typescript
function isContainer(input: any): input is Container

使用示例:

typescript
import { isContainer, createContainer } from 'farrow-pipeline'

const container = createContainer()

if (isContainer(container)) {
  // TypeScript 知道 container 是 Container 类型
  container.read(UserContext)
}

assertContainer - 断言为 Container

函数签名:

typescript
function assertContainer(input: any): asserts input is Container

使用示例:

typescript
function useContainerValue(container: any, context: Context) {
  assertContainer(container)
  // 此后 TypeScript 知道 container 是 Container 类型
  return container.read(context)
}

异步追踪 (AsyncTracer)

概述

AsyncTracer 是 farrow-pipeline 的核心机制,用于在异步操作中保持上下文传递。

核心作用:

  1. 异步上下文传递 - 在 async/await、Promise、setTimeout 等异步操作中保持上下文
  2. 请求级隔离 - 每个请求/运行都有独立的容器,避免并发污染
  3. 自动传播 - 无需手动传递容器,框架自动管理

实现原理:

  • Node.js: 基于 AsyncLocalStorage (async_hooks)
  • 浏览器: 不支持(会抛出错误)

Node.js 环境 (farrow-pipeline/asyncTracerImpl.node)

enable - 启用异步追踪

函数签名:

typescript
function enable(): void

说明:

  • 启用 Node.js 环境的异步上下文追踪
  • farrow-http 会自动调用,无需手动启用
  • 纯 farrow-pipeline 环境需手动调用

使用示例:

typescript
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'

// 应用启动时调用
asyncTracerImpl.enable()

// 现在可以在异步操作中使用 Context
const pipeline = createPipeline()
pipeline.use(async (input, next) => {
  UserContext.set(user)

  await delay(1000)  // 异步等待

  const user = UserContext.get()  // ✅ 正确获取
  return next(input)
})

disable - 禁用异步追踪

函数签名:

typescript
function disable(): void

说明:

  • 禁用异步追踪
  • 用于清理资源

使用示例:

typescript
// 应用关闭时
process.on('exit', () => {
  asyncTracerImpl.disable()
})

浏览器环境 (farrow-pipeline/asyncTracerImpl.browser)

说明:

  • 浏览器环境完全不支持 farrow-pipeline
  • 导入该模块会直接抛出错误: No Implementation
  • farrow-pipeline 仅适用于 Node.js 服务端环境

工具函数

Koa 兼容

compose - Koa 中间件组合器

函数签名:

typescript
function compose<T, U>(
  middlewares: KoaMiddleware<T, U>[]
): (context: T, next?: KoaMiddleware<T, U>) => Promise<U | void | undefined>

type KoaMiddleware<T, U = void> = Middleware<T, MiddlewareReturnType<U>>
type MiddlewareReturnType<T> = void | undefined | T | Promise<T | void | undefined>

说明:

  • 将 Koa 风格的中间件数组组合为一个函数
  • 与 Koa 的 compose 行为一致
  • 内部使用 farrow-pipeline 实现

使用示例:

typescript
import { compose } from 'farrow-pipeline'

const middleware1 = async (ctx, next) => {
  ctx.state.count = 1
  await next()
}

const middleware2 = async (ctx, next) => {
  ctx.state.count += 1
  await next()
}

const composed = compose([middleware1, middleware2])

const ctx = { state: {} }
await composed(ctx)
console.log(ctx.state.count)  // 2

类型工具

farrow-pipeline 导出了一些类型工具,用于从现有类型中提取信息:

PipelineInput<T> - 提取 Pipeline 输入类型

从 Pipeline 类型中提取输入类型:

typescript
const pipeline = createPipeline<number, string>()
type Input = PipelineInput<typeof pipeline>  // number

PipelineOutput<T> - 提取 Pipeline 输出类型

从 Pipeline 类型中提取输出类型:

typescript
const pipeline = createPipeline<number, string>()
type Output = PipelineOutput<typeof pipeline>  // string

MiddlewareType<T> - 提取中间件类型

从中间件输入中提取中间件类型:

typescript
type MyMiddlewareInput = Middleware<Request, Response> | { middleware: Middleware<Request, Response> }
type ExtractedMiddleware = MiddlewareType<MyMiddlewareInput>  // Middleware<Request, Response>

MaybeAsync<T> - 可能异步的类型

表示一个值可能是同步的,也可能是 Promise:

typescript
type MaybeAsync<T> = T | Promise<T>

// 使用示例
function getValue(): MaybeAsync<number> {
  // 可以返回同步值
  return 42
  // 或异步值
  return Promise.resolve(42)
}

常见问题

Q: Context 在异步操作中丢失?

A: 确保启用了 AsyncTracer:

typescript
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'

// Node.js 环境必需
asyncTracerImpl.enable()

// 现在异步操作中 Context 会自动传递
pipeline.use(async (input, next) => {
  UserContext.set(user)

  await delay(1000)  // 异步等待

  const user = UserContext.get()  // ✅ 正确获取
  return next(input)
})

Q: 何时使用 usePipeline?

A: 当你需要在中间件中运行子 Pipeline 并保持上下文时:

typescript
// ❌ 错误:上下文丢失
mainPipeline.use((input, next) => {
  const result = subPipeline.run(input)  // 新容器
  return next(result)
})

// ✅ 正确:继承上下文
mainPipeline.use((input, next) => {
  const runSubPipeline = usePipeline(subPipeline)
  const result = runSubPipeline(input)  // 继承容器
  return next(result)
})

Q: Pipeline 可以重复使用吗?

A: 可以,每次 run() 都是独立执行:

typescript
const pipeline = createPipeline<number, number>()
  .use(x => x + 1)

// 多次使用同一个 Pipeline
const result1 = pipeline.run(1)  // 2
const result2 = pipeline.run(5)  // 6

// 并发执行也是安全的
await Promise.all([
  pipeline.run(1),
  pipeline.run(2),
  pipeline.run(3)
])

Q: 如何调试 Pipeline?

A: 使用日志中间件:

typescript
const debugMiddleware = (input: any, next: Next<any>) => {
  console.log('输入:', input)
  const result = next(input)
  console.log('输出:', result)
  return result
}

pipeline.use(debugMiddleware)

Q: 浏览器环境支持吗?

A: 不支持。farrow-pipeline 依赖 Node.js 的 AsyncLocalStorage(async_hooks),浏览器环境无法使用异步上下文追踪功能,导入会抛出错误。farrow-pipeline 仅适用于 Node.js 服务端环境。


总结

farrow-pipeline 提供了强大而灵活的中间件管道系统,让你能够:

  • 类型安全 - 全程 TypeScript 类型推导
  • 易于组合 - Pipeline 本身可作为中间件嵌套
  • 上下文隔离 - 每次运行独立的容器
  • 异步友好 - 基于 AsyncLocalStorage 的异步上下文传递
  • 可测试 - 容器机制便于单元测试
  • 高性能 - 轻量级实现,性能优异

通过这些特性,farrow-pipeline 能够帮你构建类型安全、可维护、高性能的应用程序。

这是一个第三方 Farrow 文档站 | 用 ❤️ 和 TypeScript 构建