
Async and Composition

Async Pipeline

Creating Async Pipeline

Just add the Async prefix to createPipeline:

typescript
import { createAsyncPipeline } from 'farrow-pipeline'

const pipeline = createAsyncPipeline<Input, Output>()

Async Middleware

Use async/await directly, just like writing normal async code:

typescript
pipeline.use(async (input, next) => {
  const data = await fetchFromDatabase(input)
  return next(data)
})

pipeline.use(async (input, next) => {
  await saveToCache(input)
  return next(input)
})

// run() returns a Promise
const result = await pipeline.run(input)

💡 Tip: Sync and async middleware can be mixed! The framework automatically handles Promise chains.

Mixing Sync and Async

typescript
const pipeline = createAsyncPipeline<number, string>()

// Sync middleware
pipeline.use((input, next) => {
  console.log('Sync:', input)
  return next(input * 2)
})

// Async middleware
pipeline.use(async (input, next) => {
  await delay(1000)
  console.log('Async:', input)
  return next(input)
})

// Sync middleware
pipeline.use((input) => {
  return `Result: ${input}`
})

// Run
const result = await pipeline.run(5)
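
To see why mixing works, here is a minimal standalone sketch of a dispatcher that treats every middleware result as a possible Promise. It is a simplified model under that assumption, not farrow-pipeline's actual implementation; `runAsync`, `Middleware`, `MaybePromise`, and `delay` are hypothetical names introduced for illustration.

```typescript
// Standalone sketch: a tiny dispatcher that accepts sync and async middleware.
// `Middleware` and `runAsync` are illustrative names, not farrow-pipeline APIs.
type MaybePromise<T> = T | Promise<T>
type Middleware<I, O> = (input: I, next: (input: I) => Promise<O>) => MaybePromise<O>

function runAsync<I, O>(middlewares: Middleware<I, O>[], input: I): Promise<O> {
  const dispatch = async (index: number, value: I): Promise<O> => {
    const middleware = middlewares[index]
    if (!middleware) {
      throw new Error('Every middleware called next(), so no value was returned')
    }
    // An async function resolves with plain values and Promises alike,
    // so sync middleware needs no special case
    return middleware(value, (nextValue) => dispatch(index + 1, nextValue))
  }
  return dispatch(0, input)
}

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

const mixed: Middleware<number, string>[] = [
  (input, next) => next(input * 2),   // sync
  async (input, next) => {            // async
    await delay(10)
    return next(input)
  },
  (input) => `Result: ${input}`,      // sync, ends the chain
]

const mixedResult = runAsync(mixed, 5)  // resolves to "Result: 10"
```

Because `dispatch` is declared `async`, an exception thrown by a sync middleware also surfaces as a rejected Promise, so callers only need one error path.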

🚀 Lazy Loading Middleware

Imagine you have a huge image-processing library (10MB) that only 1% of requests use. What do you do?

Traditional approach (bad):

typescript
import { heavyImageLib } from './10mb-monster'  // 😭 Startup takes 10 seconds

pipeline.use((req, next) => {
  if (req.url.startsWith('/image')) {
    return next(heavyImageLib.process(req))
  }
  return next(req)
})

Lazy loading approach (perfect):

typescript
pipeline.useLazy(async () => {
  // 🎉 Only loaded when first needed!
  const { heavyImageLib } = await import('./10mb-monster')

  return (req, next) => {
    if (req.url.startsWith('/image')) {
      return next(heavyImageLib.process(req))
    }
    return next(req)
  }
})

How it works:

  1. First call: runs the thunk, loads the module, and caches the returned middleware
  2. Subsequent calls: use the cached middleware directly
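
The two steps above can be modeled with a small standalone sketch. This is one plausible way to cache a thunk, not farrow-pipeline's real code; `lazy`, `Thunk`, `Middleware`, and `Next` are hypothetical names defined here for illustration.

```typescript
// Standalone sketch of thunk caching. `lazy` and `Middleware` are illustrative
// names, not the farrow-pipeline implementation.
type Next<I, O> = (input: I) => Promise<O>
type Middleware<I, O> = (input: I, next: Next<I, O>) => O | Promise<O>
type Thunk<I, O> = () => Middleware<I, O> | Promise<Middleware<I, O>>

function lazy<I, O>(thunk: Thunk<I, O>): Middleware<I, O> {
  // Cache the Promise rather than the resolved middleware, so concurrent
  // first calls share a single load
  let loading: Promise<Middleware<I, O>> | undefined
  const load = async (): Promise<Middleware<I, O>> => thunk()

  return async (input: I, next: Next<I, O>): Promise<O> => {
    if (!loading) {
      loading = load()
    }
    const middleware = await loading
    return middleware(input, next)
  }
}

let loadCount = 0
const double = lazy<number, number>(async () => {
  loadCount += 1  // stands in for an expensive `await import(...)`
  return (input, next) => next(input * 2)
})

const identity: Next<number, number> = async (value) => value

// Two concurrent calls: the thunk runs once, both calls get a result
const lazyResults = Promise.all([double(1, identity), double(2, identity)])
```

Caching the Promise instead of the loaded value is a design choice in this sketch: it avoids loading the module twice when several requests arrive before the first load finishes.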

Conditional Lazy Loading

typescript
pipeline.useLazy(() => {
  if (process.env.NODE_ENV === 'development') {
    // Development environment: load debug middleware
    return (req, next) => {
      console.log('🐛 Debug info:', req)
      return next(req)
    }
  }
  // Production environment: skip
  return (req, next) => next(req)
})

Lazy Loading Async Modules

typescript
pipeline.useLazy(async () => {
  // Async load heavy dependencies
  const [imageLib, videoLib] = await Promise.all([
    import('./image-processor'),
    import('./video-processor')
  ])

  return (req, next) => {
    if (req.type === 'image') {
      return next(imageLib.process(req))
    }
    if (req.type === 'video') {
      return next(videoLib.process(req))
    }
    return next(req)
  }
})

🔌 Enable Async Tracking

farrow-pipeline's Context system relies on Node.js's AsyncLocalStorage (async_hooks) to pass context in async operations.
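
For readers unfamiliar with the underlying Node.js primitive, the following sketch uses `AsyncLocalStorage` from `node:async_hooks` directly. It is independent of farrow-pipeline's internals; `userStorage`, `handle`, and `delay` are hypothetical names used only for this illustration.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks'

// Standalone Node.js sketch, independent of farrow-pipeline's internals.
// `userStorage` and `handle` are illustrative names.
const userStorage = new AsyncLocalStorage<{ name: string }>()

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function handle(): Promise<string | undefined> {
  await delay(10)  // crosses an async boundary
  // A store set by run() is still visible after the await
  return userStorage.getStore()?.name
}

// Everything started inside run() sees the same store, even across awaits
const insideRun = userStorage.run({ name: 'alice' }, () => handle())

// Outside run() there is no store at all
const outsideRun = handle()
```

Here `insideRun` resolves to `'alice'` and `outsideRun` resolves to `undefined`, which mirrors the difference between tracked and untracked async code.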

When do you need to enable it manually?

typescript
import * as asyncTracerImpl from 'farrow-pipeline/asyncTracerImpl.node'

// ✅ If you're using farrow-pipeline directly (not farrow-http)
// Need to call once at application startup
asyncTracerImpl.enable()

// Now Context will be correctly passed in async operations
const pipeline = createAsyncPipeline()

pipeline.use(async (input, next) => {
  UserContext.set(user)  // `user` is assumed to be defined earlier

  await delay(1000)  // Async wait

  const currentUser = UserContext.get()  // ✅ Can get it!
  return next(input)
})

When don't you need it?

Using farrow-http: the framework calls enable() automatically, so no manual setup is needed

⚠️ Browser environment: not supported at all (throws a "No Implementation" error)

What happens if you forget to enable?

typescript
// ❌ Forgot to call asyncTracerImpl.enable()

UserContext.set(user)
await someAsyncOperation()
const currentUser = UserContext.get()  // ❌ undefined! Context is lost!

Disable Async Tracking

typescript
// When application shuts down
process.on('exit', () => {
  asyncTracerImpl.disable()
})

💡 Tip: If you're not sure whether you need to call it, just call it! Repeatedly calling enable() is safe.

Pipeline Composition

Nested Pipeline

A Pipeline can itself be used as middleware! Note that the input and output types must match exactly:

typescript
// ✅ Correct example: types completely consistent nesting
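const step1 = createPipeline<number, number>()
  .use((x, next) => next(x + 1))  // Call next so the outer pipeline continues

const step2 = createPipeline<number, number>()
  .use((x, next) => next(x * 2))

const step3 = createPipeline<number, number>()
  .use((x) => x - 3)  // Last step: return the final value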

// Compose: all types are <number, number>, can nest directly
const mathPipeline = createPipeline<number, number>()
  .use(step1)  // 5 → 6
  .use(step2)  // 6 → 12
  .use(step3)  // 12 → 9

mathPipeline.run(5)  // 9

❌ Error Example: Inconsistent Types

typescript
const mathPipeline = createPipeline<number, number>()
  .use((x) => x * 2)

// ❌ Error! mathPipeline outputs number, but mainPipeline expects all middleware to output string
const mainPipeline = createPipeline<number, string>()
  .use(mathPipeline)      // Type error!
  .use((x) => `Result: ${x}`)

✅ Correct Approach: Use Normal Middleware to Convert Types

typescript
const mainPipeline = createPipeline<number, string>()
  .use((x, next) => {
    // Handle math operations inside middleware
    const result = x * 2
    return next(result)  // Continue passing
  })
  .use((x) => `Result: ${x}`)  // Finally convert to string

mainPipeline.run(5)  // "Result: 10"

⚠️ Important: When nesting a Pipeline, its I and O must match the I and O of the outer Pipeline<I, O>!

usePipeline - Maintain Context Passing

Problem scenario:

typescript
// ❌ Error: Direct run() creates new container, Context is lost
pipeline.use((input, next) => {
  const result = subPipeline.run(input)  // New container! UserContext.get() won't get it
  return next(result)
})

Correct approach:

typescript
import { usePipeline } from 'farrow-pipeline'

// ✅ Correct: usePipeline inherits current container
pipeline.use((input, next) => {
  const runSubPipeline = usePipeline(subPipeline)
  const result = runSubPipeline(input)  // Inherits container, Context passes correctly
  return next(result)
})
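
The difference between the two approaches can be pictured with a small standalone model. This is a deliberately simplified sketch, not how farrow-pipeline is implemented; `Container`, `Handler`, `run`, `useSubPipeline`, and `currentContainer` are hypothetical names introduced here.

```typescript
// Standalone model of container inheritance. `Container`, `run`,
// `useSubPipeline`, and `currentContainer` are illustrative names; this is not
// how farrow-pipeline is implemented.
type Container = Map<string, unknown>
type Handler<I, O> = (input: I) => O

let currentContainer: Container | undefined

// run() starts from a fresh container unless one is passed in
function run<I, O>(handler: Handler<I, O>, input: I, container: Container = new Map()): O {
  const previous = currentContainer
  currentContainer = container
  try {
    return handler(input)
  } finally {
    currentContainer = previous
  }
}

// Must be called while a handler is running; reuses that handler's container
function useSubPipeline<I, O>(handler: Handler<I, O>): Handler<I, O> {
  const container = currentContainer
  if (!container) throw new Error('useSubPipeline called outside of run()')
  return (input) => run(handler, input, container)
}

const readUser: Handler<void, unknown> = () => currentContainer?.get('user')

const viaRun = run(() => {
  currentContainer?.set('user', 'alice')
  return run(readUser, undefined)             // fresh container
}, undefined)

const viaUse = run(() => {
  currentContainer?.set('user', 'alice')
  return useSubPipeline(readUser)(undefined)  // inherited container
}, undefined)
```

In this model `viaRun` is `undefined` because the inner `run` sees an empty container, while `viaUse` is `'alice'` because the sub-handler shares the outer container.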

Complete Example

typescript
import { createContext, createPipeline, usePipeline } from 'farrow-pipeline'

const UserContext = createContext<User | null>(null)

// Auth pipeline
const authPipeline = createPipeline<Request, User>()
  .use((req) => {
    const user = authenticate(req)
    UserContext.set(user)  // Set Context
    return user
  })

// Business pipeline
const businessPipeline = createPipeline<User, Response>()
  .use((user) => {
    const currentUser = UserContext.get()  // Get Context
    return { status: 200, user: currentUser }
  })

// Main pipeline
const mainPipeline = createPipeline<Request, Response>()
  .use((req, next) => {
    const runAuth = usePipeline(authPipeline)
    const runBusiness = usePipeline(businessPipeline)

    const user = runAuth(req)
    const response = runBusiness(user)
    return response  // ✅ Context passes correctly
  })

Practical Patterns

Pattern 1: Async Error Handling

typescript
const pipeline = createAsyncPipeline<Request, Response>()

pipeline.use(async (req, next) => {
  try {
    return await next(req)
  } catch (error) {
    console.error('Pipeline error:', error)
    return { status: 500, error: 'Internal Server Error' }
  }
})

pipeline.use(async (req) => {
  const data = await riskyDatabaseOperation(req)
  return { status: 200, data }
})

Pattern 2: Timeout Control

typescript
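function timeout<I, O>(ms: number) {
  return (input: I, next: Next<I, O>): Promise<O> => {
    const pending = next(input)  // Start the work before creating the timer
    let timer: ReturnType<typeof setTimeout> | undefined
    const expired = new Promise<O>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout')), ms)
    })
    // Clear the timer once either side settles, so it doesn't linger
    return Promise.race([pending, expired]).finally(() => clearTimeout(timer))
  }
}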

// Usage
const pipeline = createAsyncPipeline<Request, Response>()
  .use(timeout(5000))  // 5 second timeout
  .use(async (req) => {
    const data = await slowOperation(req)
    return { status: 200, data }
  })
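
To observe the timeout behavior in isolation, here is a standalone sketch that applies the same race to plain Promises, written out by hand so the timer can be cleared. `withTimeout` and `delayed` are hypothetical helpers, not part of farrow-pipeline, and the durations are shortened for the demo.

```typescript
// Standalone sketch of the race between work and a timer. `withTimeout` is an
// illustrative helper, not part of farrow-pipeline.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error('Timeout')), ms)
    // Whichever side settles first wins; later resolve/reject calls are ignored
    work.then(
      (value) => {
        clearTimeout(timer)
        resolve(value)
      },
      (error) => {
        clearTimeout(timer)
        reject(error)
      },
    )
  })
}

// Resolves with `value` after `ms` milliseconds
const delayed = <T>(ms: number, value: T) =>
  new Promise<T>((resolve) => setTimeout(() => resolve(value), ms))

// Finishes well inside the limit: resolves to "done"
const fast = withTimeout(delayed(5, 'done'), 100)

// Exceeds the limit: rejects with Error('Timeout'), converted to its message here
const slow = withTimeout(delayed(200, 'done'), 20)
  .catch((error: Error) => error.message)
```

Note that racing against a timer does not cancel the slower operation; it only stops waiting for it. If the underlying work must be aborted, it needs its own cancellation mechanism.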

Pattern 3: Retry Mechanism

typescript
function retry<I, O>(maxRetries: number) {
  return async (input: I, next: Next<I, O>): Promise<O> => {
    let lastError: unknown = new Error('retry: maxRetries must be at least 1')

    for (let i = 0; i < maxRetries; i++) {
      try {
        return await next(input)
      } catch (error) {
        lastError = error
        console.log(`Attempt ${i + 1}/${maxRetries} failed`)
        if (i < maxRetries - 1) {
          await delay(1000 * (i + 1))  // Incremental delay, skipped after the final attempt
        }
      }
    }

    throw lastError
  }
}

// Usage
const pipeline = createAsyncPipeline<Request, Response>()
  .use(retry(3))
  .use(async (req) => {
    return await unreliableOperation(req)
  })
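
The retry loop can also be tried outside a pipeline. The sketch below applies the same idea to a plain async function; `retryAsync`, `flaky`, and `delay` are hypothetical names, and the delays are kept short so the demo finishes quickly.

```typescript
// Standalone sketch: the same retry loop applied to a plain async function.
// `retryAsync` and `flaky` are illustrative names; delays are kept short here.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function retryAsync<T>(
  operation: () => Promise<T>,
  maxAttempts: number,
  baseDelayMs: number,
): Promise<T> {
  let lastError: unknown = new Error('maxAttempts must be at least 1')

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      lastError = error
      if (attempt < maxAttempts) {
        await delay(baseDelayMs * attempt)  // wait longer after each failure
      }
    }
  }

  throw lastError
}

let calls = 0
const flaky = async (): Promise<string> => {
  calls += 1
  if (calls < 3) throw new Error(`failure ${calls}`)
  return 'ok'
}

const recovered = retryAsync(flaky, 3, 5)  // fails twice, then resolves to "ok"
```

Retrying is only safe when the operation is idempotent, since a failed attempt may already have had side effects before it threw.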

This is a third-party Farrow documentation site | Built with ❤️ and TypeScript