farrow-pipeline 完整 API 参考
目录
核心概念
设计哲学
farrow-pipeline 是一个类型安全的中间件管道系统,核心设计理念:
- 函数式组合 - 中间件通过函数组合形成处理链
- 类型安全 - 全程 TypeScript 类型推导
- 上下文隔离 - 每次运行独立的上下文容器
- 异步友好 - 基于 AsyncLocalStorage 的异步上下文传递
- 可组合性 - Pipeline 本身可作为中间件嵌套使用
执行模型
输入 → 中间件1 → 中间件2 → ... → 中间件N → 输出
↓ ↓ ↓
next() next() 返回值洋葱模型:
// 执行顺序示例
pipeline.use((input, next) => {
console.log('1: 进入') // 1
const result = next(input)
console.log('4: 退出') // 4
return result
})
pipeline.use((input, next) => {
console.log('2: 进入') // 2
const result = next(input)
console.log('3: 退出') // 3
return result
})
// 输出顺序: 1 → 2 → 3 → 4完整导出清单
主入口 (farrow-pipeline)
// Pipeline 创建函数
export { createPipeline, createAsyncPipeline }
// Context 系统
export { createContext, createContainer }
// Container 工具
export { useContainer, runWithContainer }
// 类型断言
export { assertContainer, assertContext }
// 类型判断
export { isContext, isContainer, isPipeline }
// 工具函数
export { usePipeline, getMiddleware }
// 类型导出
export type {
// Pipeline 类型
Pipeline,
AsyncPipeline,
PipelineOptions,
RunPipelineOptions,
PipelineInput,
PipelineOutput,
// 中间件类型
Middleware,
Middlewares,
MiddlewareInput,
MiddlewareType,
ThunkMiddlewareInput,
// Context 类型
Context,
ContextStorage,
Container,
// 工具类型
Next,
MaybeAsync,
}AsyncTracer 入口 (farrow-pipeline/asyncTracerImpl.node)
// Node.js 环境
export { enable, disable }AsyncTracer 入口 (farrow-pipeline/asyncTracerImpl.browser)
// 浏览器环境 - 抛出错误(不支持)
throw new Error(`No Implementation`)Pipeline 类型系统
createPipeline - 创建同步 Pipeline
函数签名:
function createPipeline<I, O>(options?: PipelineOptions): Pipeline<I, O>类型参数:
I- 输入类型O- 输出类型
参数:
type PipelineOptions = {
contexts?: ContextStorage // 预设的上下文存储
}
type ContextStorage = {
[key: string]: Context<any>
}返回值:
type Pipeline<I = unknown, O = unknown> = {
[PipelineSymbol]: true
// 添加中间件,支持链式调用
use: (...inputs: MiddlewareInput<I, O>[]) => Pipeline<I, O>
// 运行 Pipeline
run: (input: I, options?: RunPipelineOptions<I, O>) => O
// 作为中间件使用
middleware: Middleware<I, O>
}
type RunPipelineOptions<I = unknown, O = unknown> = {
container?: Container // 指定容器
onLast?: (input: I) => O // 最后中间件的回调
}基础示例:
import { createPipeline } from 'farrow-pipeline'
// 创建简单的数值处理管道
const pipeline = createPipeline<number, string>()
pipeline.use((input, next) => {
console.log('输入:', input)
return next(input * 2)
})
pipeline.use((input) => {
return `结果: ${input}`
})
const result = pipeline.run(5) // "结果: 10"预设上下文示例:
import { createContext, createPipeline } from 'farrow-pipeline'
// 创建上下文
const UserContext = createContext({ name: 'Guest' })
// 创建带预设上下文的 Pipeline
const pipeline = createPipeline<string, string>({
contexts: {
user: UserContext.create({ name: 'Admin' })
}
})
pipeline.use((input, next) => {
const user = UserContext.get() // { name: 'Admin' }
return next(`${user.name}: ${input}`)
})
const result = pipeline.run('Hello') // "Admin: Hello"链式调用示例:
const pipeline = createPipeline<number, number>()
.use((x, next) => next(x + 1))
.use((x, next) => next(x * 2))
.use((x) => x - 3)
const result = pipeline.run(5) // (5 + 1) * 2 - 3 = 9createAsyncPipeline - 创建异步 Pipeline
函数签名:
function createAsyncPipeline<I, O>(options?: PipelineOptions): AsyncPipeline<I, O>返回值:
type AsyncPipeline<I = unknown, O = unknown> = Pipeline<I, MaybeAsync<O>> & {
// 懒加载中间件
useLazy: (thunk: ThunkMiddlewareInput<I, O>) => AsyncPipeline<I, O>
}
type MaybeAsync<T> = T | Promise<T>
type ThunkMiddlewareInput<I, O> = () => MaybeAsync<MiddlewareInput<I, MaybeAsync<O>>>说明:
- 支持异步中间件(返回 Promise)
- 支持懒加载中间件(按需加载)
- 自动处理 Promise 链
异步中间件示例:
const pipeline = createAsyncPipeline<string, User>()
// 异步数据库查询
pipeline.use(async (userId, next) => {
const user = await db.findUser(userId)
if (!user) throw new Error('User not found')
return next(user)
})
// 异步日志记录
pipeline.use(async (user, next) => {
await logger.log(`User ${user.id} accessed`)
return next(user)
})
// 最终处理
pipeline.use((user) => {
return user
})
// 运行(返回 Promise)
const user = await pipeline.run('user123')懒加载中间件示例:
const pipeline = createAsyncPipeline<Request, Response>()
// 懒加载重型依赖
pipeline.useLazy(async () => {
// 只在第一次需要时加载
const { imageProcessor } = await import('./heavy-image-lib')
return (req, next) => {
if (req.url.startsWith('/image')) {
const processed = imageProcessor(req)
return next(processed)
}
return next(req)
}
})
// 条件懒加载
pipeline.useLazy(() => {
if (process.env.NODE_ENV === 'development') {
return async (req, next) => {
console.log('Dev mode:', req)
return next(req)
}
}
// 生产环境跳过
return (req, next) => next(req)
})懒加载工作原理:
- 首次调用: 执行 thunk 函数获取中间件
- 缓存: 将加载的中间件缓存到变量
- 后续调用: 直接使用缓存的中间件
- Promise 处理: 自动处理同步和异步加载
// 内部实现概览
useLazy: (thunk) => {
let middleware: Middleware | null = null
let promise: Promise<void> | null = null
pipeline.use((input, next) => {
// 已加载,直接使用
if (middleware) return next(input)
// 首次加载
if (!promise) {
promise = Promise.resolve(thunk()).then((result) => {
middleware = getMiddleware(result)
})
}
// 等待加载完成
return promise.then(() => next(input))
})
// 调用加载完成的中间件
pipeline.use((input, next) => {
if (!middleware) throw new Error(`Failed to load middleware`)
return middleware(input, next)
})
return asyncPipeline
}usePipeline - 继承容器运行 Pipeline
函数签名:
function usePipeline<I, O>(pipeline: Pipeline<I, O>):
(input: I, options?: RunPipelineOptions<I, O>) => O说明:
- 在中间件中运行另一个 Pipeline
- 自动继承当前容器,保持上下文传递
- 避免直接调用
pipeline.run()导致的上下文丢失
为什么需要 usePipeline?
// ❌ 错误:直接运行会创建新容器,上下文丢失
pipeline.use((input, next) => {
const result = subPipeline.run(input) // 新容器
return next(result)
})
// ✅ 正确:继承当前容器
pipeline.use((input, next) => {
const runSubPipeline = usePipeline(subPipeline)
const result = runSubPipeline(input) // 继承容器
return next(result)
})完整示例:
import { createContext, createPipeline, usePipeline } from 'farrow-pipeline'
// 创建上下文
const UserContext = createContext<User | null>(null)
// 认证管道
const authPipeline = createPipeline<Request, User>()
authPipeline.use((req) => {
const user = authenticate(req)
UserContext.set(user) // 设置上下文
return user
})
// 业务管道
const businessPipeline = createPipeline<User, Response>()
businessPipeline.use((user) => {
const currentUser = UserContext.get() // 获取上下文
return { status: 200, user: currentUser }
})
// 主管道
const mainPipeline = createPipeline<Request, Response>()
mainPipeline.use((req, next) => {
// 使用 usePipeline 保持上下文
const runAuth = usePipeline(authPipeline)
const runBusiness = usePipeline(businessPipeline)
const user = runAuth(req) // 继承容器
const response = runBusiness(user) // 继承容器
return response // 上下文正确传递
})
// 运行
const response = mainPipeline.run(request)错误处理组合:
mainPipeline.use((input, next) => {
const runValidation = usePipeline(validationPipeline)
const runProcessing = usePipeline(processingPipeline)
try {
const validated = runValidation(input)
const result = runProcessing(validated)
return { status: 200, result }
} catch (error) {
return { status: 400, error: error.message }
}
})中间件类型
Middleware - 中间件函数类型
类型定义:
type Middleware<I = unknown, O = unknown> = (input: I, next: Next<I, O>) => O
type Next<I = unknown, O = unknown> = (input?: I) => O说明:
input- 当前中间件的输入next- 调用下一个中间件的函数- 返回值 - 中间件的输出,必须与 Pipeline 的输出类型匹配
中间件编写规范:
// 1. 转换中间件 - 修改输入传递给下一个
pipeline.use((input, next) => {
const transformed = transform(input)
return next(transformed)
})
// 2. 增强中间件 - 处理结果
pipeline.use((input, next) => {
const result = next(input)
return enhance(result)
})
// 3. 拦截中间件 - 条件执行
pipeline.use((input, next) => {
if (!validate(input)) {
return errorResponse
}
return next(input)
})
// 4. 包装中间件 - 前后处理
pipeline.use((input, next) => {
before(input)
const result = next(input)
after(result)
return result
})
// 5. 终止中间件 - 不调用 next
pipeline.use((input) => {
return finalResult
})MiddlewareInput - 中间件输入类型
类型定义:
type MiddlewareInput<I = unknown, O = unknown> =
| Middleware<I, O>
| { middleware: Middleware<I, O> }说明:
- 支持直接传入中间件函数
- 支持传入包含
middleware属性的对象
使用示例:
// 方式 1: 直接传入函数
pipeline.use((input, next) => next(input))
// 方式 2: 传入对象
const middlewareObj = {
middleware: (input, next) => next(input)
}
pipeline.use(middlewareObj)
// 方式 3: 传入 Pipeline(Pipeline 实现了 middleware 属性)
const subPipeline = createPipeline()
pipeline.use(subPipeline) // 等同于 pipeline.use(subPipeline.middleware)getMiddleware - 提取中间件函数
函数签名:
function getMiddleware<I, O>(input: MiddlewareInput<I, O>): Middleware<I, O>说明:
- 从
MiddlewareInput中提取实际的中间件函数 - 处理函数和对象两种形式
使用示例:
const fn = (input, next) => next(input)
const obj = { middleware: fn }
getMiddleware(fn) // 返回 fn
getMiddleware(obj) // 返回 obj.middlewarePipeline 方法详解
pipeline.use - 添加中间件
方法签名:
use(...inputs: MiddlewareInput<I, O>[]): Pipeline<I, O>说明:
- 接受任意数量的中间件
- 支持链式调用
- 中间件按添加顺序执行
示例:
// 单个中间件
pipeline.use((input, next) => next(input))
// 多个中间件
pipeline.use(
middleware1,
middleware2,
middleware3
)
// 链式调用
pipeline
.use(middleware1)
.use(middleware2)
.use(middleware3)
// 嵌套 Pipeline
const subPipeline = createPipeline()
pipeline.use(subPipeline)
// 条件中间件
if (enableAuth) {
pipeline.use(authMiddleware)
}pipeline.run - 运行 Pipeline
方法签名:
run(input: I, options?: RunPipelineOptions<I, O>): O参数:
type RunPipelineOptions<I = unknown, O = unknown> = {
container?: Container // 指定运行容器
onLast?: (input: I) => O // 最后中间件回调
}说明:
input- Pipeline 的输入options.container- 指定容器(默认使用 Pipeline 创建时的容器)options.onLast- 当所有中间件都调用next()时执行的回调
基础示例:
const pipeline = createPipeline<number, number>()
.use((x, next) => next(x + 1))
.use((x, next) => next(x * 2))
// 基础运行
const result = pipeline.run(5) // (5 + 1) * 2 = 12使用 onLast 示例:
const pipeline = createPipeline<string, string>()
.use((input, next) => {
console.log('中间件执行')
return next(input)
})
// 所有中间件都调用了 next,触发 onLast
const result = pipeline.run('test', {
onLast: (input) => {
console.log('到达末尾:', input)
return `默认: ${input}`
}
})
// 输出:
// 中间件执行
// 到达末尾: test
// 返回: "默认: test"指定容器示例:
const UserContext = createContext<User | null>(null)
const pipeline = createPipeline<string, string>()
.use((input) => {
const user = UserContext.get()
return `${user?.name}: ${input}`
})
// 创建自定义容器
const container = createContainer({
user: UserContext.create({ name: 'Alice' })
})
// 使用自定义容器运行
const result = pipeline.run('Hello', { container })
// "Alice: Hello"pipeline.middleware - 作为中间件使用
属性签名:
middleware: Middleware<I, O>说明:
- 将当前 Pipeline 转换为中间件函数
- 可以嵌套到其他 Pipeline 中
- 自动继承父 Pipeline 的容器
示例:
const subPipeline = createPipeline<number, number>()
.use((x, next) => next(x + 1))
.use((x, next) => next(x * 2))
const mainPipeline = createPipeline<number, string>()
.use(subPipeline.middleware) // 嵌套 subPipeline
.use((x) => `结果: ${x}`)
const result = mainPipeline.run(5) // "结果: 12"等价写法:
// 方式 1: 使用 .middleware 属性
mainPipeline.use(subPipeline.middleware)
// 方式 2: 直接传入 Pipeline(内部会调用 .middleware)
mainPipeline.use(subPipeline)
// 方式 3: 使用 usePipeline(推荐,明确表达意图)
mainPipeline.use((input, next) => {
const runSubPipeline = usePipeline(subPipeline)
const result = runSubPipeline(input)
return next(result)
})Pipeline 判断函数
isPipeline - 判断是否为 Pipeline
函数签名:
function isPipeline(obj: any): obj is Pipeline说明:
- 类型守卫函数
- 检查对象是否具有 Pipeline 标识符
使用示例:
import { isPipeline, createPipeline } from 'farrow-pipeline'
const pipeline = createPipeline()
const fn = () => {}
if (isPipeline(pipeline)) {
// TypeScript 知道 pipeline 是 Pipeline 类型
pipeline.run(input)
}
// 动态处理
function handle(handler: Pipeline | Function, input: any) {
if (isPipeline(handler)) {
return handler.run(input)
} else {
return handler(input)
}
}类型提取工具
PipelineInput - 提取输入类型
类型定义:
type PipelineInput<T extends Pipeline> = T extends Pipeline<infer I> ? I : never使用示例:
const pipeline = createPipeline<number, string>()
type Input = PipelineInput<typeof pipeline> // numberPipelineOutput - 提取输出类型
类型定义:
type PipelineOutput<T extends Pipeline> = T extends Pipeline<any, infer O> ? O : never使用示例:
const pipeline = createPipeline<number, string>()
type Output = PipelineOutput<typeof pipeline> // stringContext 上下文系统
createContext - 创建上下文
函数签名:
function createContext<T>(defaultValue: T): Context<T>类型定义:
type Context<T = any> = {
id: symbol // 上下文唯一标识
[ContextSymbol]: T // 默认值(内部使用)
create: (value: T) => Context<T> // 创建新实例
get: () => T // 获取当前值
set: (value: T) => void // 设置当前值
assert: () => Exclude<T, undefined | null> // 断言非空
}说明:
- 创建类型安全的上下文对象
- 每个 Context 有唯一的
id - 提供 CRUD 操作方法
基础示例:
import { createContext, createPipeline } from 'farrow-pipeline'
// 创建上下文
const UserContext = createContext<{ name: string } | null>(null)
const pipeline = createPipeline<string, string>()
// 设置上下文
pipeline.use((input, next) => {
UserContext.set({ name: 'Alice' })
return next(input)
})
// 读取上下文
pipeline.use((input) => {
const user = UserContext.get()
return `${user?.name}: ${input}`
})
const result = pipeline.run('Hello') // "Alice: Hello"Context 方法详解:
get() - 获取上下文值
get(): T说明:
- 从当前容器中获取上下文值
- 如果容器中不存在,返回默认值
示例:
const CountContext = createContext(0)
pipeline.use((input, next) => {
const count = CountContext.get() // 0(默认值)
CountContext.set(count + 1)
return next(input)
})
pipeline.use((input) => {
const count = CountContext.get() // 1
return `计数: ${count}`
})set(value) - 设置上下文值
set(value: T): void说明:
- 在当前容器中设置上下文值
- 会覆盖之前的值
示例:
const UserContext = createContext<User | null>(null)
pipeline.use((input, next) => {
const user = authenticate(input)
UserContext.set(user) // 设置用户
return next(input)
})assert() - 断言非空
assert(): Exclude<T, undefined | null>说明:
- 断言上下文值非空
- 如果为
null或undefined,抛出错误 - 返回非空类型
示例:
const UserContext = createContext<User | null>(null)
pipeline.use((input) => {
try {
const user = UserContext.assert() // 断言非空
// user 类型为 User(不包含 null)
return user.id
} catch (error) {
return 'Unauthorized'
}
})create(value) - 创建新实例
create(value: T): Context<T>说明:
- 创建一个新的 Context 实例
- 新实例具有相同的
id,但不同的默认值 - 用于创建预设容器
示例:
const ConfigContext = createContext({ debug: false })
// 创建开发环境配置
const devConfig = ConfigContext.create({ debug: true })
// 创建生产环境配置
const prodConfig = ConfigContext.create({ debug: false })
// 创建不同环境的容器
const devContainer = createContainer({
config: devConfig
})
const prodContainer = createContainer({
config: prodConfig
})上下文隔离示例:
const CounterContext = createContext(0)
const pipeline = createPipeline<string, string>()
pipeline.use((input, next) => {
const count = CounterContext.get()
CounterContext.set(count + 1)
return next(`${input}:${CounterContext.get()}`)
})
// 并发执行,每个都有独立的计数器
const results = await Promise.all([
pipeline.run('A'), // "A:1"
pipeline.run('B'), // "B:1"
pipeline.run('C') // "C:1"
])
// 每次运行都从默认值 0 开始多上下文示例:
const UserContext = createContext<User | null>(null)
const RequestIdContext = createContext('')
const LoggerContext = createContext<Logger>(consoleLogger)
const pipeline = createPipeline<Request, Response>()
// 设置多个上下文
pipeline.use((req, next) => {
UserContext.set(authenticate(req))
RequestIdContext.set(generateId())
LoggerContext.set(createLogger())
return next(req)
})
// 使用多个上下文
pipeline.use((req) => {
const user = UserContext.get()
const requestId = RequestIdContext.get()
const logger = LoggerContext.get()
logger.log(`User ${user?.id} - Request ${requestId}`)
return { user, requestId }
})Context 判断函数
isContext - 判断是否为 Context
函数签名:
function isContext(input: any): input is Context使用示例:
import { isContext, createContext } from 'farrow-pipeline'
const ctx = createContext(0)
if (isContext(ctx)) {
// TypeScript 知道 ctx 是 Context 类型
ctx.get()
}assertContext - 断言为 Context
函数签名:
function assertContext(input: any): asserts input is Context说明:
- TypeScript 断言函数
- 如果不是 Context,抛出错误
使用示例:
function useContextValue(ctx: any) {
assertContext(ctx)
// 此后 TypeScript 知道 ctx 是 Context 类型
return ctx.get()
}Container 容器系统
createContainer - 创建容器
函数签名:
function createContainer(contextStorage?: ContextStorage): Container类型定义:
type Container = {
[ContainerSymbol]: true
read: <V>(context: Context<V>) => V
write: <V>(context: Context<V>, value: V) => void
}
type ContextStorage = {
[key: string]: Context<any>
}说明:
- 容器是 Context 的存储容器
- 每次
pipeline.run()都会创建独立的容器 - 容器内的上下文互不干扰
基础示例:
import { createContext, createContainer } from 'farrow-pipeline'
const UserContext = createContext<User | null>(null)
// 创建自定义容器
const container = createContainer({
user: UserContext.create({ id: 1, name: 'Alice' })
})
// 从容器读取
const user = container.read(UserContext) // { id: 1, name: 'Alice' }
// 写入容器
container.write(UserContext, { id: 2, name: 'Bob' })测试场景示例:
const DatabaseContext = createContext<Database>(productionDB)
const LoggerContext = createContext<Logger>(consoleLogger)
// 创建测试容器
const testContainer = createContainer({
db: DatabaseContext.create(mockDatabase),
logger: LoggerContext.create(silentLogger)
})
// 在测试中使用
const result = pipeline.run(testInput, { container: testContainer })多环境配置示例:
const ConfigContext = createContext({ env: 'development' })
const environments = {
development: createContainer({
config: ConfigContext.create({ env: 'development', debug: true })
}),
production: createContainer({
config: ConfigContext.create({ env: 'production', debug: false })
}),
test: createContainer({
config: ConfigContext.create({ env: 'test', debug: false })
})
}
const currentContainer = environments[process.env.NODE_ENV || 'development']
const result = pipeline.run(input, { container: currentContainer })Container 方法详解
container.read - 读取上下文
方法签名:
read<V>(context: Context<V>): V说明:
- 从容器中读取指定上下文的值
- 如果容器中不存在,返回 Context 的默认值
示例:
const UserContext = createContext<User | null>(null)
const container = createContainer({
user: UserContext.create({ id: 1, name: 'Alice' })
})
const user = container.read(UserContext) // { id: 1, name: 'Alice' }container.write - 写入上下文
方法签名:
write<V>(context: Context<V>, value: V): void说明:
- 向容器中写入指定上下文的值
- 会覆盖之前的值
示例:
const CountContext = createContext(0)
const container = createContainer()
container.write(CountContext, 10)
const count = container.read(CountContext) // 10Container 工具函数
useContainer - 获取当前容器
函数签名:
function useContainer(): Container说明:
- 获取当前异步追踪上下文中的容器
- 必须在 Pipeline 运行时调用
- 用于在中间件中访问容器
使用示例:
import { useContainer } from 'farrow-pipeline'
pipeline.use((input, next) => {
const container = useContainer()
// 直接操作容器
const user = container.read(UserContext)
container.write(LogContext, logger)
return next(input)
})runWithContainer - 在指定容器中运行函数
函数签名:
function runWithContainer<F extends (...args: any) => any>(
f: F,
container: Container
): ReturnType<F>说明:
- 在指定容器的上下文中运行函数
- 函数内部可以使用
useContainer()获取该容器 - 用于手动控制容器作用域
使用示例:
import { runWithContainer } from 'farrow-pipeline'
const UserContext = createContext<User | null>(null)
const container = createContainer({
user: UserContext.create({ id: 1, name: 'Alice' })
})
// 在容器中运行函数
const result = runWithContainer(() => {
const user = UserContext.get() // { id: 1, name: 'Alice' }
return `User: ${user?.name}`
}, container)isContainer - 判断是否为 Container
函数签名:
function isContainer(input: any): input is Container使用示例:
import { isContainer, createContainer } from 'farrow-pipeline'
const container = createContainer()
if (isContainer(container)) {
// TypeScript 知道 container 是 Container 类型
container.read(UserContext)
}assertContainer - 断言为 Container
函数签名:
function assertContainer(input: any): asserts input is Container使用示例:
function useContainerValue(container: any, context: Context) {
assertContainer(container)
// 此后 TypeScript 知道 container 是 Container 类型
return container.read(context)
}异步追踪 (AsyncTracer)
概述
AsyncTracer 是 farrow-pipeline 的核心机制,用于在异步操作中保持上下文传递。
核心作用:
- 异步上下文传递 - 在 async/await、Promise、setTimeout 等异步操作中保持上下文
- 请求级隔离 - 每个请求/运行都有独立的容器,避免并发污染
- 自动传播 - 无需手动传递容器,框架自动管理
实现原理:
- Node.js: 基于
AsyncLocalStorage(async_hooks) - 浏览器: 不支持(会抛出错误)
Node.js 环境 (farrow-pipeline/asyncTracerImpl.node)
enable - 启用异步追踪
函数签名:
function enable(): void说明:
- 启用 Node.js 环境的异步上下文追踪
- farrow-http 会自动调用,无需手动启用
- 纯 farrow-pipeline 环境需手动调用
使用示例:
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'
// 应用启动时调用
asyncTracerImpl.enable()
// 现在可以在异步操作中使用 Context
const pipeline = createPipeline()
pipeline.use(async (input, next) => {
UserContext.set(user)
await delay(1000) // 异步等待
const user = UserContext.get() // ✅ 正确获取
return next(input)
})disable - 禁用异步追踪
函数签名:
function disable(): void说明:
- 禁用异步追踪
- 用于清理资源
使用示例:
// 应用关闭时
process.on('exit', () => {
asyncTracerImpl.disable()
})浏览器环境 (farrow-pipeline/asyncTracerImpl.browser)
说明:
- 浏览器环境完全不支持 farrow-pipeline
- 导入该模块会直接抛出错误:
No Implementation - farrow-pipeline 仅适用于 Node.js 服务端环境
工具函数
Koa 兼容
compose - Koa 中间件组合器
函数签名:
function compose<T, U>(
middlewares: KoaMiddleware<T, U>[]
): (context: T, next?: KoaMiddleware<T, U>) => Promise<U | void | undefined>
type KoaMiddleware<T, U = void> = Middleware<T, MiddlewareReturnType<U>>
type MiddlewareReturnType<T> = void | undefined | T | Promise<T | void | undefined>说明:
- 将 Koa 风格的中间件数组组合为一个函数
- 与 Koa 的
compose行为一致 - 内部使用 farrow-pipeline 实现
使用示例:
import { compose } from 'farrow-pipeline'
const middleware1 = async (ctx, next) => {
ctx.state.count = 1
await next()
}
const middleware2 = async (ctx, next) => {
ctx.state.count += 1
await next()
}
const composed = compose([middleware1, middleware2])
const ctx = { state: {} }
await composed(ctx)
console.log(ctx.state.count) // 2类型工具
farrow-pipeline 导出了一些类型工具,用于从现有类型中提取信息:
PipelineInput<T> - 提取 Pipeline 输入类型
从 Pipeline 类型中提取输入类型:
const pipeline = createPipeline<number, string>()
type Input = PipelineInput<typeof pipeline> // numberPipelineOutput<T> - 提取 Pipeline 输出类型
从 Pipeline 类型中提取输出类型:
const pipeline = createPipeline<number, string>()
type Output = PipelineOutput<typeof pipeline> // stringMiddlewareType<T> - 提取中间件类型
从中间件输入中提取中间件类型:
type MyMiddlewareInput = Middleware<Request, Response> | { middleware: Middleware<Request, Response> }
type ExtractedMiddleware = MiddlewareType<MyMiddlewareInput> // Middleware<Request, Response>MaybeAsync<T> - 可能异步的类型
表示一个值可能是同步的,也可能是 Promise:
type MaybeAsync<T> = T | Promise<T>
// 使用示例
function getValue(): MaybeAsync<number> {
// 可以返回同步值
return 42
// 或异步值
return Promise.resolve(42)
}常见问题
Q: Context 在异步操作中丢失?
A: 确保启用了 AsyncTracer:
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'
// Node.js 环境必需
asyncTracerImpl.enable()
// 现在异步操作中 Context 会自动传递
pipeline.use(async (input, next) => {
UserContext.set(user)
await delay(1000) // 异步等待
const user = UserContext.get() // ✅ 正确获取
return next(input)
})Q: 何时使用 usePipeline?
A: 当你需要在中间件中运行子 Pipeline 并保持上下文时:
// ❌ 错误:上下文丢失
mainPipeline.use((input, next) => {
const result = subPipeline.run(input) // 新容器
return next(result)
})
// ✅ 正确:继承上下文
mainPipeline.use((input, next) => {
const runSubPipeline = usePipeline(subPipeline)
const result = runSubPipeline(input) // 继承容器
return next(result)
})Q: Pipeline 可以重复使用吗?
A: 可以,每次 run() 都是独立执行:
const pipeline = createPipeline<number, number>()
.use(x => x + 1)
// 多次使用同一个 Pipeline
const result1 = pipeline.run(1) // 2
const result2 = pipeline.run(5) // 6
// 并发执行也是安全的
await Promise.all([
pipeline.run(1),
pipeline.run(2),
pipeline.run(3)
])Q: 如何调试 Pipeline?
A: 使用日志中间件:
const debugMiddleware = (input: any, next: Next<any>) => {
console.log('输入:', input)
const result = next(input)
console.log('输出:', result)
return result
}
pipeline.use(debugMiddleware)Q: 浏览器环境支持吗?
A: 不支持。farrow-pipeline 依赖 Node.js 的 AsyncLocalStorage(async_hooks),浏览器环境无法使用异步上下文追踪功能,导入会抛出错误。farrow-pipeline 仅适用于 Node.js 服务端环境。
总结
farrow-pipeline 提供了强大而灵活的中间件管道系统,让你能够:
- 类型安全 - 全程 TypeScript 类型推导
- 易于组合 - Pipeline 本身可作为中间件嵌套
- 上下文隔离 - 每次运行独立的容器
- 异步友好 - 基于 AsyncLocalStorage 的异步上下文传递
- 可测试 - 容器机制便于单元测试
- 高性能 - 轻量级实现,性能优异
通过这些特性,farrow-pipeline 能够帮你构建类型安全、可维护、高性能的应用程序。
