
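Middleware System

farrow-http is built on farrow-pipeline and uses an onion-model middleware system.

The Onion Model

In the onion model, each middleware can run code both before and after the rest of the chain handles the request: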

typescript
app.use((req, next) => {
  console.log('1: enter')        // 1
  const result = next(req)       // call the next middleware
  console.log('4: exit')         // 4
  return result
})

app.use((req, next) => {
  console.log('2: enter')        // 2
  const result = next(req)
  console.log('3: exit')         // 3
  return result
})

// Execution order (with synchronous downstream handlers): 1 → 2 → 3 → 4

Core principles:

  • ✅ A middleware must return a Response
  • ✅ Call next(req) to continue to the next middleware
  • ✅ Not calling next() short-circuits the chain
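
The principles above can be illustrated without farrow itself. The following is a minimal sketch of an onion-style dispatcher, not farrow-pipeline's actual implementation; `runPipeline`, `Middleware`, and the string-based request and response are hypothetical stand-ins chosen to keep the example self-contained.

```typescript
// A simplified stand-in for a middleware pipeline (not farrow-pipeline's real code).
type Next<I, O> = (input: I) => O
type Middleware<I, O> = (input: I, next: Next<I, O>) => O

function runPipeline<I, O>(
  middlewares: Middleware<I, O>[],
  input: I,
  fallback: Next<I, O>,
): O {
  // dispatch(i) runs middleware i and hands it a `next` that runs middleware i + 1
  const dispatch = (index: number, current: I): O => {
    if (index >= middlewares.length) return fallback(current)
    return middlewares[index](current, (nextInput) => dispatch(index + 1, nextInput))
  }
  return dispatch(0, input)
}

const trace: string[] = []

const outer: Middleware<string, string> = (req, next) => {
  trace.push('1: enter')
  const result = next(req)
  trace.push('4: exit')
  return result
}

const inner: Middleware<string, string> = (req, next) => {
  trace.push('2: enter')
  const result = next(req)
  trace.push('3: exit')
  return result
}

// Both middlewares wrap the final handler, like layers of an onion.
const result = runPipeline([outer, inner], '/hello', (req) => `handled ${req}`)

// A middleware that never calls next() short-circuits the chain:
// `inner` and the final handler are not reached.
const blocked = runPipeline<string, string>(
  [(_req, _next) => 'blocked', inner],
  '/hello',
  (req) => `handled ${req}`,
)
```

After both runs, `trace` holds the four entries from the first run only, because the second run stopped at its first middleware.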

Basic Middleware

Logging Middleware

typescript
// Simple logging
app.use((req, next) => {
  console.log(`${req.method} ${req.pathname}`)
  return next(req)
})

// Detailed logging
app.use(async (req, next) => {
  const start = Date.now()
  console.log(`→ ${req.method} ${req.pathname}`)

  // next() may return a Promise when a downstream handler is async
  const response = await next(req)

  const duration = Date.now() - start
  const status = response.info.status?.code || 200
  console.log(`← ${status} (${duration}ms)`)

  return response
})

Colorful Logging Middleware

typescript
const logger = async (req, next) => {
  const start = Date.now()
  console.log(`🚀 ${req.method} ${req.pathname}`)

  const response = await next(req)

  const duration = Date.now() - start
  const status = response.info.status?.code || 200
  const emoji = status >= 400 ? '❌' : status >= 300 ? '↩️' : '✅'

  console.log(`${emoji} [${duration}ms] ${status}`)
  return response
}

app.use(logger)

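Authentication Middleware

Basic Token Authentication

typescript
import { Response } from 'farrow-http'
import { createContext } from 'farrow-pipeline'

// Create a context that holds the current user
// (User and validateToken are assumed to be defined elsewhere in your app)
const UserContext = createContext<User | null>(null)

// Authentication middleware
const authMiddleware = (req, next) => {
  const token = req.headers?.authorization?.replace('Bearer ', '')

  if (!token) {
    return Response.status(401).json({ error: 'Authorization required' })
  }

  const user = validateToken(token)
  if (!user) {
    return Response.status(401).json({ error: 'Invalid token' })
  }

  // Store the user in the context
  UserContext.set(user)

  return next(req)  // continue to the next middleware
}

// Register the authentication middleware
app.use(authMiddleware)

// Read the current user
app.get('/me').use(() => {
  const user = UserContext.get()
  return Response.json({ user })
})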

JWT Authentication

typescript
import jwt from 'jsonwebtoken'

// The fallback is for local development only; always set JWT_SECRET in production
const SECRET_KEY = process.env.JWT_SECRET || 'your-secret-key'

interface JWTPayload {
  userId: string
  email: string
}

const jwtAuth = (req, next) => {
  const authHeader = req.headers?.authorization

  if (!authHeader?.startsWith('Bearer ')) {
    return Response.status(401).json({
      error: 'Missing or invalid authorization header'
    })
  }

  const token = authHeader.substring(7)

  try {
    const payload = jwt.verify(token, SECRET_KEY) as JWTPayload
    UserContext.set({
      id: payload.userId,
      email: payload.email
    })
    return next(req)
  } catch (error) {
    return Response.status(401).json({
      error: 'Invalid or expired token'
    })
  }
}

// Usage
app.route('/api').use(jwtAuth)

Optional Authentication

typescript
// Authentication is optional, but a token is validated when one is provided
const optionalAuth = (req, next) => {
  const token = req.headers?.authorization?.replace('Bearer ', '')

  if (token) {
    const user = validateToken(token)
    if (user) {
      UserContext.set(user)
    }
  }

  return next(req)
}

// Usage
app.use(optionalAuth)

app.get('/posts').use(() => {
  const user = UserContext.get()

  if (user) {
    // Signed-in users see more content
    return Response.json({ posts: getPostsForUser(user) })
  } else {
    // Anonymous users see public content only
    return Response.json({ posts: getPublicPosts() })
  }
})

CORS Middleware

Basic CORS

typescript
const cors = async (req, next) => {
  // Await so this also works when a downstream handler is async
  const response = await next(req)

  return Response.merge(response, Response.headers({
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
    'Access-Control-Allow-Headers': 'Content-Type, Authorization'
  }))
}

app.use(cors)

Configurable CORS

typescript
interface CORSOptions {
  origin: string | string[]
  methods?: string[]
  allowedHeaders?: string[]
  credentials?: boolean
  maxAge?: number
}

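function createCORS(options: CORSOptions) {
  const allowedOrigins = Array.isArray(options.origin) ? options.origin : [options.origin]

  return async (req, next) => {
    const response = await next(req)

    // Access-Control-Allow-Origin takes a single origin, so echo the request's
    // Origin only when it is on the allow-list
    const requestOrigin = req.headers?.origin
    const origin = allowedOrigins.includes('*')
      ? '*'
      : allowedOrigins.find((allowed) => allowed === requestOrigin)

    if (!origin) {
      return response  // origin not allowed: send no CORS headers
    }

    const headers: Record<string, string> = {
      'Access-Control-Allow-Origin': origin,
      'Access-Control-Allow-Methods': options.methods?.join(', ') || 'GET, POST, PUT, DELETE',
      'Access-Control-Allow-Headers': options.allowedHeaders?.join(', ') || 'Content-Type',
      Vary: 'Origin'  // the response now depends on the request's Origin
    }

    if (options.credentials) {
      headers['Access-Control-Allow-Credentials'] = 'true'
    }

    if (options.maxAge) {
      headers['Access-Control-Max-Age'] = String(options.maxAge)
    }

    return Response.merge(response, Response.headers(headers))
  }
}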

// Usage
app.use(createCORS({
  origin: ['https://example.com', 'https://app.example.com'],
  methods: ['GET', 'POST', 'PUT', 'DELETE'],
  allowedHeaders: ['Content-Type', 'Authorization'],
  credentials: true,
  maxAge: 86400
}))

Handling OPTIONS Requests

typescript
app.options('/*').use(() => {
  return Response
    .status(204)
    .header('Access-Control-Allow-Origin', '*')
    .header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE')
    .header('Access-Control-Allow-Headers', 'Content-Type, Authorization')
    .header('Access-Control-Max-Age', '86400')
    .empty()
})

Error Handling Middleware

Global Error Handling

typescript
import { HttpError } from 'farrow-http'

// Custom error classes
class NotFoundError extends HttpError {
  constructor(resource: string) {
    super(`${resource} not found`, 404)
  }
}

class ValidationError extends HttpError {
  constructor(message: string) {
    super(message, 400)
  }
}

// Global error handler
app.use(async (req, next) => {
  try {
    return await next(req)
  } catch (error) {
    console.error('Request failed:', error)

    if (error instanceof HttpError) {
      return Response.status(error.statusCode).json({
        error: error.message
      })
    }

    if (error instanceof Error) {
      return Response.status(500).json({
        error: 'Internal server error',
        // Expose the message only in development
        message: process.env.NODE_ENV === 'development' ? error.message : undefined
      })
    }

    return Response.status(500).json({
      error: 'Unknown error'
    })
  }
})

// Throw errors from a route
app.get('/users/<id:int>').use((req) => {
  const user = getUserById(req.params.id)
  if (!user) {
    throw new NotFoundError('User')  // caught by the global error handler
  }
  return Response.json(user)
})

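Error Logging

typescript
// `logger` here is assumed to be your logging library instance,
// not the logging middleware defined earlier
app.use(async (req, next) => {
  try {
    return await next(req)
  } catch (error) {
    // A thrown value is not guaranteed to be an Error
    const err = error instanceof Error ? error : new Error(String(error))

    // Record the error
    logger.error({
      message: err.message,
      stack: err.stack,
      method: req.method,
      url: req.pathname,
      timestamp: new Date().toISOString()
    })

    // Return an error response
    return Response.status(500).json({
      error: 'Internal Server Error'
    })
  }
})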

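Rate Limiting Middleware

Simple Rate Limiting

typescript
function rateLimit(maxRequests: number, windowMs: number) {
  // Each limiter keeps its own counters, so separate limits do not share state
  const store = new Map<string, { count: number; resetAt: number }>()

  return (req, next) => {
    // x-forwarded-for may hold a comma-separated chain; the first entry is the client.
    // Only trust this header behind a proxy you control.
    const forwarded = String(req.headers?.['x-forwarded-for'] ?? '')
    const ip = forwarded.split(',')[0].trim() || req.ip || 'unknown'
    const now = Date.now()

    const record = store.get(ip)

    // First request from this client, or the previous window has expired
    if (!record || now > record.resetAt) {
      store.set(ip, {
        count: 1,
        resetAt: now + windowMs
      })
      return next(req)
    }

    if (record.count >= maxRequests) {
      return Response.status(429).json({
        error: 'Too many requests',
        retryAfter: Math.ceil((record.resetAt - now) / 1000)
      })
    }

    record.count++
    return next(req)
  }
}

// Usage: at most 100 requests per minute
app.use(rateLimit(100, 60000))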
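
The fixed-window counting used above is easier to check when it is separated from the HTTP layer. The sketch below is a self-contained illustration under that assumption; `createFixedWindowLimiter` and its injectable `now` clock are hypothetical helpers, not part of farrow-http.

```typescript
interface WindowRecord {
  count: number
  resetAt: number
}

// Hypothetical helper: returns a function that reports whether `key` may proceed.
// The clock is injectable so the window logic can be exercised without waiting.
function createFixedWindowLimiter(
  maxRequests: number,
  windowMs: number,
  now: () => number = Date.now,
) {
  const store = new Map<string, WindowRecord>()

  return (key: string): { allowed: boolean; retryAfterSeconds: number } => {
    const current = now()
    const record = store.get(key)

    // No record yet, or the previous window expired: start a new window
    if (!record || current > record.resetAt) {
      store.set(key, { count: 1, resetAt: current + windowMs })
      return { allowed: true, retryAfterSeconds: 0 }
    }

    // Window still open and quota used up: reject
    if (record.count >= maxRequests) {
      return {
        allowed: false,
        retryAfterSeconds: Math.ceil((record.resetAt - current) / 1000),
      }
    }

    record.count++
    return { allowed: true, retryAfterSeconds: 0 }
  }
}

// Simulated clock: 2 requests per 1000 ms window
let fakeTime = 0
const check = createFixedWindowLimiter(2, 1000, () => fakeTime)

const first = check('10.0.0.1')       // allowed, opens the window
const second = check('10.0.0.1')      // allowed, quota now used up
const third = check('10.0.0.1')       // rejected until the window resets
const otherClient = check('10.0.0.2') // separate key, separate counter

fakeTime = 1001
const afterReset = check('10.0.0.1')  // allowed again in a new window
```

A fixed window is simple but allows short bursts around window boundaries; sliding windows or token buckets are common alternatives when that matters.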

Tiered Rate Limiting

typescript
const publicRateLimit = rateLimit(10, 60000)   // public endpoints: 10 per minute
const authRateLimit = rateLimit(100, 60000)    // authenticated endpoints: 100 per minute

// Public routes
app.route('/public').use(publicRateLimit)

// Authenticated routes
app.route('/api')
  .use(authMiddleware)
  .use(authRateLimit)

Caching Middleware

In-Memory Cache

typescript
const cache = new Map<string, { data: any; expiresAt: number }>()

function cacheMiddleware(ttl: number) {
  return async (req, next) => {
    // Only cache GET requests
    if (req.method !== 'GET') {
      return next(req)
    }

    const key = req.pathname + JSON.stringify(req.query)
    const cached = cache.get(key)

    if (cached && Date.now() < cached.expiresAt) {
      console.log('Cache hit:', key)
      return Response.json(cached.data)
        .header('X-Cache', 'HIT')
    }

    const response = await next(req)

    // Only cache successful JSON responses
    if (response.info.status?.code === 200 && response.info.body?.type === 'json') {
      cache.set(key, {
        data: response.info.body.value,
        expiresAt: Date.now() + ttl
      })
    }

    return Response.merge(response, Response.header('X-Cache', 'MISS'))
  }
}

// Usage: cache for 5 minutes
app.route('/api/data').use(cacheMiddleware(5 * 60 * 1000))
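
One detail of the cache key above: `JSON.stringify` preserves property insertion order, so `?page=1&tag=ts` and `?tag=ts&page=1` can produce different keys for the same resource. The following sketch shows one possible way to normalize the key; `buildCacheKey` is a hypothetical helper, and it sorts only top-level query keys.

```typescript
type Query = Record<string, unknown>

// Hypothetical helper: builds a cache key that ignores the order of query parameters.
// Only top-level keys are sorted; nested objects keep their own key order.
function buildCacheKey(pathname: string, query: Query = {}): string {
  const sorted = Object.keys(query)
    .sort()
    .map((key) => [key, query[key]] as const)
  return `${pathname}?${JSON.stringify(sorted)}`
}

// Same parameters, different order
const keyA = buildCacheKey('/api/data', { page: '1', tag: 'ts' })
const keyB = buildCacheKey('/api/data', { tag: 'ts', page: '1' })

// The naive concatenation treats them as two different entries
const naiveA = '/api/data' + JSON.stringify({ page: '1', tag: 'ts' })
const naiveB = '/api/data' + JSON.stringify({ tag: 'ts', page: '1' })
```

Here `keyA` and `keyB` are identical, while `naiveA` and `naiveB` differ, which would store the same response twice.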

ETag Caching

typescript
import crypto from 'crypto'

function generateEtag(data: any): string {
  const hash = crypto.createHash('md5')
  hash.update(JSON.stringify(data))
  // HTTP entity tags are quoted strings
  return `"${hash.digest('hex')}"`
}

const etagMiddleware = async (req, next) => {
  const response = await next(req)

  if (response.info.body?.type === 'json') {
    const etag = generateEtag(response.info.body.value)
    const clientEtag = req.headers['if-none-match']

    if (clientEtag === etag) {
      return Response.status(304).empty()
    }

    return Response.merge(response, Response.header('ETag', etag))
  }

  return response
}

app.use(etagMiddleware)
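
The strict equality check above covers the common case of a single validator. Clients may also send `*`, several comma-separated tags, or weak tags prefixed with `W/` in `If-None-Match`. The sketch below shows one way to handle those forms; `etagMatches` is a hypothetical helper, and it assumes tags contain no commas, which holds for hex digests.

```typescript
// Hypothetical helper: weak comparison of an ETag against an If-None-Match header value.
// Assumes entity tags contain no commas (true for hex digests).
function etagMatches(ifNoneMatch: string | undefined, etag: string): boolean {
  if (!ifNoneMatch) return false

  // "*" matches any current representation
  if (ifNoneMatch.trim() === '*') return true

  // Weak comparison ignores the W/ prefix on either side
  const normalize = (tag: string) => tag.trim().replace(/^W\//, '')
  const target = normalize(etag)

  return ifNoneMatch.split(',').some((candidate) => normalize(candidate) === target)
}

const single = etagMatches('"abc"', '"abc"')            // one exact tag
const fromList = etagMatches('W/"abc", "def"', '"def"') // second entry of a list
const weak = etagMatches('W/"abc", "def"', '"abc"')     // weak tag, prefix ignored
const miss = etagMatches('"abc"', '"xyz"')              // no match
const absent = etagMatches(undefined, '"abc"')          // header not sent
const wildcard = etagMatches('*', '"abc"')              // wildcard
```

In a middleware, a `true` result would lead to the 304 response and a `false` result to the normal response with the `ETag` header attached.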

Compression Middleware

typescript
import zlib from 'zlib'
import { promisify } from 'util'

const gzip = promisify(zlib.gzip)

const compressionMiddleware = async (req, next) => {
  const response = await next(req)

  const acceptEncoding = req.headers['accept-encoding'] || ''

  if (!acceptEncoding.includes('gzip')) {
    return response
  }

  if (response.info.body?.type === 'json') {
    const data = JSON.stringify(response.info.body.value)

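    if (Buffer.byteLength(data) < 1024) {
      // Skip compression for payloads under 1 KB
      return response
    }

    const compressed = await gzip(data)

    // Note: this builds a fresh response, so headers set downstream are not carried over
    return Response
      .status(response.info.status?.code || 200)
      .headers({
        'Content-Type': 'application/json',
        'Content-Encoding': 'gzip',
        'Content-Length': String(compressed.length)
      })
      .buffer(compressed)  // assumes farrow-http's Response.buffer for sending a Buffer body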
  }

  return response
}

app.use(compressionMiddleware)

Security Middleware

Security Headers

typescript
const securityHeaders = async (req, next) => {
  const response = await next(req)

  return Response.merge(response, Response.headers({
    'X-Frame-Options': 'DENY',
    'X-Content-Type-Options': 'nosniff',
    'X-XSS-Protection': '1; mode=block',
    'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
    'Content-Security-Policy': "default-src 'self'"
  }))
}

app.use(securityHeaders)

XSS Protection

typescript
function sanitize(input: any): any {
  if (typeof input === 'string') {
    return input
      .replace(/</g, '&lt;')
      .replace(/>/g, '&gt;')
      .replace(/"/g, '&quot;')
      .replace(/'/g, '&#x27;')
      .replace(/\//g, '&#x2F;')
  }

  if (Array.isArray(input)) {
    return input.map(sanitize)
  }

  if (typeof input === 'object' && input !== null) {
    const sanitized: any = {}
    for (const [key, value] of Object.entries(input)) {
      sanitized[key] = sanitize(value)
    }
    return sanitized
  }

  return input
}

const xssProtection = (req, next) => {
  if (req.body) {
    req.body = sanitize(req.body)
  }
  return next(req)
}

app.use(xssProtection)

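Composing Middleware

Combining Multiple Middlewares

typescript
function compose(...middlewares) {
  return (req, next) => {
    function dispatch(i, currentReq) {
      // Past the last middleware: hand over to the outer chain
      if (i >= middlewares.length) {
        return next(currentReq)
      }

      const middleware = middlewares[i]
      // Forward whatever request the middleware passes to its next()
      return middleware(currentReq, (nextReq = currentReq) => dispatch(i + 1, nextReq))
    }

    return dispatch(0, req)
  }
}

// Usage
const apiMiddlewares = compose(
  logger,
  cors,
  authMiddleware,
  rateLimit(100, 60000)
)

app.route('/api').use(apiMiddlewares)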

Conditional Middleware

typescript
function conditional(
  condition: (req) => boolean,
  middleware
) {
  return (req, next) => {
    if (condition(req)) {
      return middleware(req, next)
    }
    return next(req)
  }
}

// Usage: enable compression only in production
app.use(
  conditional(
    () => process.env.NODE_ENV === 'production',
    compressionMiddleware
  )
)
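
The condition can also depend on the request itself rather than on the environment. The following is a typed, self-contained sketch of the same pattern; the `Middleware` type, `SketchRequest`, and the string responses are hypothetical simplifications, not farrow-http's real types.

```typescript
// Simplified middleware shape for illustration only
type Middleware<Req, Res> = (req: Req, next: (req: Req) => Res) => Res

function conditional<Req, Res>(
  condition: (req: Req) => boolean,
  middleware: Middleware<Req, Res>,
): Middleware<Req, Res> {
  // Run the wrapped middleware only when the predicate holds; otherwise pass through
  return (req, next) => (condition(req) ? middleware(req, next) : next(req))
}

interface SketchRequest {
  pathname: string
}

// Marks every response it wraps
const tagApi: Middleware<SketchRequest, string> = (req, next) => `[api] ${next(req)}`

// Apply tagApi only to paths under /api
const onlyApi = conditional<SketchRequest, string>(
  (req) => req.pathname.startsWith('/api'),
  tagApi,
)

const handler = (req: SketchRequest) => `handled ${req.pathname}`

const apiResult = onlyApi({ pathname: '/api/users' }, handler)
const pageResult = onlyApi({ pathname: '/about' }, handler)
```

Here `apiResult` is wrapped by `tagApi`, while `pageResult` goes straight to the handler.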

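Middleware Inheritance

Middleware registered on a parent route is inherited by its child routes:

typescript
// corsMiddleware, rateLimitMiddleware, and userValidationMiddleware are assumed to be defined elsewhere
const apiRouter = app.route('/api')
apiRouter.use(corsMiddleware)         // every /api/* route gets CORS
apiRouter.use(rateLimitMiddleware)    // every /api/* route is rate limited

const v1Router = apiRouter.route('/v1')
v1Router.use(authMiddleware)          // every /api/v1/* route requires authentication

const userRouter = v1Router.route('/users')
userRouter.use(userValidationMiddleware)  // every /api/v1/users/* route gets extra validation

// A request to /api/v1/users passes through, in order:
// 1. corsMiddleware
// 2. rateLimitMiddleware
// 3. authMiddleware
// 4. userValidationMiddleware
// 5. the final handler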

This is a third-party Farrow documentation site | Built with ❤️ and TypeScript