Async and Composition
Async Pipeline
Creating Async Pipeline
Just add an Async prefix, it's that simple:
import { createAsyncPipeline } from 'farrow-pipeline'
const pipeline = createAsyncPipeline<Input, Output>()Async Middleware
Use async/await directly, just like writing normal async code:
pipeline.use(async (input, next) => {
const data = await fetchFromDatabase(input)
return next(data)
})
pipeline.use(async (input, next) => {
await saveToCache(input)
return next(input)
})
// Run returns Promise
const result = await pipeline.run(input)💡 Tip: Sync and async middleware can be mixed! The framework automatically handles Promise chains.
Mixing Sync and Async
const pipeline = createAsyncPipeline<number, string>()
// Sync middleware
pipeline.use((input, next) => {
console.log('Sync:', input)
return next(input * 2)
})
// Async middleware
pipeline.use(async (input, next) => {
await delay(1000)
console.log('Async:', input)
return next(input)
})
// Sync middleware
pipeline.use((input) => {
return `Result: ${input}`
})
// Run
const result = await pipeline.run(5)🚀 Lazy Loading Middleware
Imagine you have an image processing library, huge (10MB), but only 1% of requests use it. What to do?
Traditional approach (bad):
import { heavyImageLib } from './10mb-monster' // 😭 Startup takes 10 seconds
pipeline.use((req, next) => {
if (req.url.startsWith('/image')) {
return next(heavyImageLib.process(req))
}
return next(req)
})Lazy loading approach (perfect):
pipeline.useLazy(async () => {
// 🎉 Only loaded when first needed!
const { heavyImageLib } = await import('./10mb-monster')
return (req, next) => {
if (req.url.startsWith('/image')) {
return next(heavyImageLib.process(req))
}
return next(req)
}
})How it works:
- First call: Execute thunk function, load module, cache middleware
- Subsequent calls: Use cached middleware directly
Conditional Lazy Loading
pipeline.useLazy(() => {
if (process.env.NODE_ENV === 'development') {
// Development environment: load debug middleware
return (req, next) => {
console.log('🐛 Debug info:', req)
return next(req)
}
}
// Production environment: skip
return (req, next) => next(req)
})Lazy Loading Async Modules
pipeline.useLazy(async () => {
// Async load heavy dependencies
const [imageLib, videoLib] = await Promise.all([
import('./image-processor'),
import('./video-processor')
])
return (req, next) => {
if (req.type === 'image') {
return next(imageLib.process(req))
}
if (req.type === 'video') {
return next(videoLib.process(req))
}
return next(req)
}
})🔌 Enable Async Tracking
farrow-pipeline's Context system relies on Node.js's AsyncLocalStorage (async_hooks) to pass context in async operations.
When do you need to manually enable?
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'
// ✅ If you're using farrow-pipeline directly (not farrow-http)
// Need to call once at application startup
asyncTracerImpl.enable()
// Now Context will be correctly passed in async operations
const pipeline = createAsyncPipeline()
pipeline.use(async (input, next) => {
UserContext.set(user)
await delay(1000) // Async wait
const user = UserContext.get() // ✅ Can get it!
return next(input)
})When don't you need it?
✅ Using farrow-http: Framework automatically calls
enable(), no manual handling needed⚠️ Browser environment: Completely unsupported (will throw error "No Implementation")
What happens if you forget to enable?
// ❌ Forgot to call asyncTracerImpl.enable()
UserContext.set(user)
await someAsyncOperation()
const user = UserContext.get() // ❌ undefined! Context is lost!Disable Async Tracking
// When application shuts down
process.on('exit', () => {
asyncTracerImpl.disable()
})💡 Tip: If you're not sure whether you need to call it, just call it! Repeatedly calling
enable()is safe.
Pipeline Composition
Nested Pipeline
Pipeline itself can be nested as middleware! But note that types must be completely consistent:
// ✅ Correct example: types completely consistent nesting
const step1 = createPipeline<number, number>()
.use((x) => x + 1)
const step2 = createPipeline<number, number>()
.use((x) => x * 2)
const step3 = createPipeline<number, number>()
.use((x) => x - 3)
// Compose: all types are <number, number>, can nest directly
const mathPipeline = createPipeline<number, number>()
.use(step1) // 5 → 6
.use(step2) // 6 → 12
.use(step3) // 12 → 9
mathPipeline.run(5) // 9❌ Error Example: Inconsistent Types
const mathPipeline = createPipeline<number, number>()
.use((x) => x * 2)
// ❌ Error! mathPipeline outputs number, but mainPipeline expects all middleware to output string
const mainPipeline = createPipeline<number, string>()
.use(mathPipeline) // Type error!
.use((x) => `Result: ${x}`)✅ Correct Approach: Use Normal Middleware to Convert Types
const mainPipeline = createPipeline<number, string>()
.use((x, next) => {
// Handle math operations inside middleware
const result = x * 2
return next(result) // Continue passing
})
.use((x) => `Result: ${x}`) // Finally convert to string
mainPipeline.run(5) // "Result: 10"⚠️ Important: When nesting Pipeline,
Pipeline<I, O>'s I and O must be consistent!
usePipeline - Maintain Context Passing
Problem scenario:
// ❌ Error: Direct run() creates new container, Context is lost
pipeline.use((input, next) => {
const result = subPipeline.run(input) // New container! UserContext.get() won't get it
return next(result)
})Correct approach:
import { usePipeline } from 'farrow-pipeline'
// ✅ Correct: usePipeline inherits current container
pipeline.use((input, next) => {
const runSubPipeline = usePipeline(subPipeline)
const result = runSubPipeline(input) // Inherits container, Context passes correctly
return next(result)
})Complete Example
import { createContext, usePipeline } from 'farrow-pipeline'
const UserContext = createContext<User | null>(null)
// Auth pipeline
const authPipeline = createPipeline<Request, User>()
.use((req) => {
const user = authenticate(req)
UserContext.set(user) // Set Context
return user
})
// Business pipeline
const businessPipeline = createPipeline<User, Response>()
.use((user) => {
const currentUser = UserContext.get() // Get Context
return { status: 200, user: currentUser }
})
// Main pipeline
const mainPipeline = createPipeline<Request, Response>()
.use((req, next) => {
const runAuth = usePipeline(authPipeline)
const runBusiness = usePipeline(businessPipeline)
const user = runAuth(req)
const response = runBusiness(user)
return response // ✅ Context passes correctly
})Practical Patterns
Pattern 1: Async Error Handling
const pipeline = createAsyncPipeline<Request, Response>()
pipeline.use(async (req, next) => {
try {
return await next(req)
} catch (error) {
console.error('Pipeline error:', error)
return { status: 500, error: 'Internal Server Error' }
}
})
pipeline.use(async (req) => {
const data = await riskyDatabaseOperation(req)
return { status: 200, data }
})Pattern 2: Timeout Control
function timeout<I, O>(ms: number) {
return (input: I, next: Next<I, O>): Promise<O> => {
return Promise.race([
next(input),
new Promise<O>((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), ms)
)
])
}
}
// Usage
const pipeline = createAsyncPipeline<Request, Response>()
.use(timeout(5000)) // 5 second timeout
.use(async (req) => {
const data = await slowOperation(req)
return { status: 200, data }
})Pattern 3: Retry Mechanism
function retry<I, O>(maxRetries: number) {
return async (input: I, next: Next<I, O>): Promise<O> => {
let lastError: Error | null = null
for (let i = 0; i < maxRetries; i++) {
try {
return await next(input)
} catch (error) {
lastError = error as Error
console.log(`Retry ${i + 1}/${maxRetries}`)
await delay(1000 * (i + 1)) // Incremental delay
}
}
throw lastError
}
}
// Usage
const pipeline = createAsyncPipeline<Request, Response>()
.use(retry(3))
.use(async (req) => {
return await unreliableOperation(req)
})