基础用法
掌握 Pipeline 的基本操作。
创建 Pipeline
基本创建
typescript
import { createPipeline } from 'farrow-pipeline'
// 指定输入输出类型
const pipeline = createPipeline<InputType, OutputType>()
// 例子:字符串转数字
const parser = createPipeline<string, number>()
// 例子:用户验证
interface User {
name: string
age: number
}
const userValidator = createPipeline<unknown, User | null>()类型推导
typescript
// TypeScript 会根据中间件推导类型
const pipeline = createPipeline<number, string>()
.use((x, next) => next(x + 1)) // x: number
.use((x) => `Result: ${x}`) // x: number, 返回 string添加中间件
方式 1: 单个添加
typescript
const pipeline = createPipeline<number, string>()
pipeline.use((input, next) => {
console.log('中间件 1')
return next(input)
})
pipeline.use((input, next) => {
console.log('中间件 2')
return next(input)
})
pipeline.use((input) => {
console.log('最后一个')
return `结果: ${input}`
})方式 2: 批量添加
typescript
const middleware1 = (input, next) => next(input + 1)
const middleware2 = (input, next) => next(input * 2)
const middleware3 = (input) => `值: ${input}`
pipeline.use(middleware1, middleware2, middleware3)方式 3: 链式调用
typescript
const pipeline = createPipeline<number, string>()
.use((x, next) => {
console.log('步骤 1:', x)
return next(x + 1)
})
.use((x, next) => {
console.log('步骤 2:', x)
return next(x * 2)
})
.use((x) => {
console.log('步骤 3:', x)
return `完成: ${x}`
})方式 4: 提取中间件函数
typescript
// 定义可复用的中间件
function validateInput(input: number, next) {
if (input < 0) {
throw new Error('输入必须为正数')
}
return next(input)
}
function doubleValue(input: number, next) {
return next(input * 2)
}
function formatResult(input: number) {
return `结果: ${input}`
}
// 组合使用
const pipeline = createPipeline<number, string>()
.use(validateInput)
.use(doubleValue)
.use(formatResult)运行 Pipeline
基础运行
typescript
const pipeline = createPipeline<number, string>()
.use((x, next) => next(x * 2))
.use((x) => `值: ${x}`)
const result = pipeline.run(5)
console.log(result) // "值: 10"带选项运行
typescript
const pipeline = createPipeline<number, string>()
.use((x, next) => next(x * 2))
// 注意:没有最终中间件
// 提供 onLast 作为默认处理
const result = pipeline.run(5, {
onLast: (input) => `默认: ${input}`
})
console.log(result) // "默认: 10"onLast 的作用
typescript
// 场景 1: 所有中间件都调用 next
const pipeline1 = createPipeline<number, string>()
.use((x, next) => next(x + 1))
.use((x, next) => next(x * 2))
// 没有终止中间件
pipeline1.run(5, {
onLast: (x) => `最终值: ${x}` // 会被调用
}) // "最终值: 12"
// 场景 2: 有终止中间件
const pipeline2 = createPipeline<number, string>()
.use((x, next) => next(x + 1))
.use((x) => `提前结束: ${x}`) // 终止中间件
pipeline2.run(5, {
onLast: (x) => `最终值: ${x}` // 不会被调用
}) // "提前结束: 6"中间件顺序
执行顺序很重要
typescript
// 顺序 A
const pipelineA = createPipeline<number, number>()
.use((x, next) => next(x + 10)) // 先加
.use((x, next) => next(x * 2)) // 后乘
console.log(pipelineA.run(5, { onLast: x => x })) // (5 + 10) * 2 = 30
// 顺序 B
const pipelineB = createPipeline<number, number>()
.use((x, next) => next(x * 2)) // 先乘
.use((x, next) => next(x + 10)) // 后加
console.log(pipelineB.run(5, { onLast: x => x })) // (5 * 2) + 10 = 20前置和后置处理
typescript
const pipeline = createPipeline<number, number>()
// 中间件 1: 前后处理
pipeline.use((input, next) => {
console.log('前置 1:', input) // 5
const result = next(input + 1) // 传递 6
console.log('后置 1:', result) // 14
return result + 1 // 返回 15
})
// 中间件 2: 前后处理
pipeline.use((input, next) => {
console.log('前置 2:', input) // 6
const result = next(input * 2) // 传递 12
console.log('后置 2:', result) // 12
return result + 2 // 返回 14
})
// 中间件 3: 终止
pipeline.use((input) => {
console.log('终止:', input) // 12
return input // 返回 12
})
pipeline.run(5)
// 输出:
// 前置 1: 5
// 前置 2: 6
// 终止: 12
// 后置 2: 12
// 后置 1: 14
// 最终结果: 15条件执行
根据输入决定是否继续
typescript
const pipeline = createPipeline<number, string>()
.use((input, next) => {
if (input < 0) {
return '错误:负数' // 不调用 next,提前返回
}
if (input === 0) {
return '零'
}
return next(input) // 正数才继续
})
.use((input, next) => {
if (input > 100) {
return '错误:太大'
}
return next(input)
})
.use((input) => {
return `正常值: ${input}`
})
console.log(pipeline.run(-5)) // "错误:负数"
console.log(pipeline.run(0)) // "零"
console.log(pipeline.run(50)) // "正常值: 50"
console.log(pipeline.run(200)) // "错误:太大"多路分支
typescript
type Action =
| { type: 'add', value: number }
| { type: 'multiply', value: number }
| { type: 'reset' }
const calculator = createPipeline<{ state: number, action: Action }, number>()
.use(({ state, action }, next) => {
switch (action.type) {
case 'add':
return next({ state: state + action.value, action })
case 'multiply':
return next({ state: state * action.value, action })
case 'reset':
return 0 // 直接返回,不继续
default:
return next({ state, action })
}
})
.use(({ state }) => state)
console.log(calculator.run({ state: 10, action: { type: 'add', value: 5 } })) // 15
console.log(calculator.run({ state: 10, action: { type: 'multiply', value: 3 } })) // 30
console.log(calculator.run({ state: 10, action: { type: 'reset' } })) // 0错误处理
try-catch 包装
typescript
const safePipeline = createPipeline<number, string>()
// 全局错误处理中间件
safePipeline.use((input, next) => {
try {
return next(input)
} catch (error) {
console.error('Pipeline 出错:', error)
return '执行失败'
}
})
// 可能抛出异常的中间件
safePipeline.use((input, next) => {
if (input < 0) {
throw new Error('不允许负数')
}
return next(input * 2)
})
safePipeline.use((input) => {
return `成功: ${input}`
})
console.log(safePipeline.run(5)) // "成功: 10"
console.log(safePipeline.run(-5)) // "执行失败" (捕获异常)错误传递
typescript
type Result<T> = { ok: true, value: T } | { ok: false, error: string }
const pipeline = createPipeline<number, Result<number>>()
.use((input, next) => {
if (input < 0) {
return { ok: false, error: '负数' }
}
return next(input)
})
.use((input, next) => {
if (input > 100) {
return { ok: false, error: '太大' }
}
return next(input)
})
.use((input) => {
return { ok: true, value: input * 2 }
})
const result = pipeline.run(50)
if (result.ok) {
console.log('结果:', result.value)
} else {
console.log('错误:', result.error)
}实用示例
示例 1: 数据转换管道
typescript
interface RawUser {
name: string
age: string
email: string
}
interface User {
name: string
age: number
email: string
isAdult: boolean
}
const userTransformer = createPipeline<RawUser, User>()
.use((input, next) => {
// 转换年龄
return next({
...input,
age: parseInt(input.age, 10)
} as any)
})
.use((input: any) => {
// 添加 isAdult 字段
return {
name: input.name,
age: input.age,
email: input.email,
isAdult: input.age >= 18
}
})
const raw = { name: 'Alice', age: '25', email: 'alice@example.com' }
const user = userTransformer.run(raw)
console.log(user) // { name: 'Alice', age: 25, email: 'alice@example.com', isAdult: true }示例 2: 请求处理管道
typescript
interface Request {
url: string
method: string
body?: any
}
interface Response {
status: number
body: any
}
const requestPipeline = createPipeline<Request, Response>()
// 日志
.use((req, next) => {
console.log(`${req.method} ${req.url}`)
return next(req)
})
// 认证
.use((req, next) => {
if (!req.headers?.authorization) {
return { status: 401, body: { error: 'Unauthorized' } }
}
return next(req)
})
// 路由
.use((req) => {
if (req.url === '/users') {
return { status: 200, body: { users: [] } }
}
return { status: 404, body: { error: 'Not found' } }
})