中间件系统
farrow-http 的中间件系统基于 farrow-pipeline 构建,采用洋葱模型。
洋葱模型
farrow-http 使用洋葱模型,每个中间件可以在请求前后执行代码:
typescript
// Outer layer: logs first on the way in and last on the way out.
app.use((req, next) => {
  console.log('1: 进入') // 1
  const downstream = next(req) // hand control to the next middleware
  console.log('4: 退出') // 4
  return downstream
})

// Inner layer: entered after the outer layer and finished before it.
app.use((req, next) => {
  console.log('2: 进入') // 2
  const downstream = next(req)
  console.log('3: 退出') // 3
  return downstream
})
// 执行顺序: 1 → 2 → 3 → 4
核心原则:
- ✅ 中间件必须返回 Response
- ✅ 调用 `next(req)` 继续到下一个中间件
- ✅ 不调用 `next(req)` 会中断执行链
基础中间件
日志中间件
typescript
// Minimal log: print the method and path, then continue down the chain.
app.use((req, next) => {
  const { method, pathname } = req
  console.log(`${method} ${pathname}`)
  return next(req)
})
// Detailed log: request line on the way in, status and elapsed time on the way out.
app.use(async (req, next) => {
  const startedAt = Date.now()
  console.log(`→ ${req.method} ${req.pathname}`)
  // Downstream handlers may be asynchronous: await so that `info` is read from
  // the resolved response and the timer covers the whole request.
  const response = await next(req)
  const duration = Date.now() - startedAt
  // A response without an explicit status is reported as 200.
  const status = response.info.status?.code ?? 200
  console.log(`← ${status} (${duration}ms)`)
  return response
})
彩色日志中间件
typescript
/**
 * Logging middleware that prints an emoji-tagged line when a request starts
 * and another when it finishes.
 *
 * @param req  Incoming request; `method` and `pathname` are printed.
 * @param next Continues to the next middleware; its result may be a promise.
 * @returns The downstream response, unchanged.
 */
const logger = async (req, next) => {
  const startedAt = Date.now()
  console.log(`🚀 ${req.method} ${req.pathname}`)
  // Await so the status is read from the resolved response, not from a promise.
  const response = await next(req)
  const duration = Date.now() - startedAt
  // A response without an explicit status is reported as 200.
  const status = response.info.status?.code ?? 200
  const emoji = status >= 400 ? '❌' : status >= 300 ? '↩️' : '✅'
  console.log(`${emoji} [${duration}ms] ${status}`)
  return response
}
app.use(logger)
认证中间件
基础认证
typescript
import { createContext } from 'farrow-pipeline'
// Context holding the current user; created with null as its initial value.
const UserContext = createContext<User | null>(null)

/**
 * Authentication middleware.
 *
 * Strips the 'Bearer ' prefix from the Authorization header and validates the
 * remainder. Answers 401 when the token is missing or rejected; otherwise
 * stores the user in UserContext and continues.
 */
const authMiddleware = (req, next) => {
  const bearerToken = req.headers?.authorization?.replace('Bearer ', '')
  if (bearerToken) {
    const currentUser = validateToken(bearerToken)
    if (currentUser) {
      // Make the user available to later middleware and handlers.
      UserContext.set(currentUser)
      return next(req)
    }
    return Response.status(401).json({ error: '无效令牌' })
  }
  return Response.status(401).json({ error: '需要授权' })
}
// Register the authentication middleware
app.use(authMiddleware)

// Return the current user read from the context
app.get('/me').use(() => Response.json({ user: UserContext.get() }))
JWT 认证
typescript
import jwt from 'jsonwebtoken'
// Signing secret. There is deliberately no fallback value: a hard-coded default
// would let anyone who has read this code forge valid tokens.
const SECRET_KEY = process.env.JWT_SECRET
if (!SECRET_KEY) {
  throw new Error('JWT_SECRET environment variable must be set')
}

/** Claims this application expects inside a verified token. */
interface JWTPayload {
  userId: string
  email: string
}

/** Runtime check that a verified token actually carries the expected claims. */
function isJWTPayload(value: unknown): value is JWTPayload {
  if (typeof value !== 'object' || value === null) {
    return false
  }
  const claims = value as Record<string, unknown>
  return typeof claims.userId === 'string' && typeof claims.email === 'string'
}

/**
 * JWT authentication middleware.
 *
 * Requires an `Authorization: Bearer <token>` header, verifies the token with
 * SECRET_KEY and stores `{ id, email }` in UserContext before continuing.
 * Answers 401 when the header is missing or malformed, or the token is rejected.
 */
const jwtAuth = (req, next) => {
  const authHeader = req.headers?.authorization
  if (!authHeader?.startsWith('Bearer ')) {
    return Response.status(401).json({
      error: 'Missing or invalid authorization header'
    })
  }
  const token = authHeader.substring(7) // drop the 'Bearer ' prefix
  let payload: unknown
  try {
    payload = jwt.verify(token, SECRET_KEY)
  } catch {
    return Response.status(401).json({
      error: 'Invalid or expired token'
    })
  }
  if (!isJWTPayload(payload)) {
    return Response.status(401).json({
      error: 'Invalid or expired token'
    })
  }
  UserContext.set({
    id: payload.userId,
    email: payload.email
  })
  // Called outside the try block so errors thrown by later middleware are not
  // misreported as an invalid token.
  return next(req)
}
// 使用
app.route('/api').use(jwtAuth)
可选认证
typescript
/**
 * Optional authentication: a request without a token passes through, but a
 * supplied token is validated and, when valid, its user is stored in UserContext.
 */
const optionalAuth = (req, next) => {
  const bearerToken = req.headers?.authorization?.replace('Bearer ', '')
  const currentUser = bearerToken ? validateToken(bearerToken) : undefined
  if (currentUser) {
    UserContext.set(currentUser)
  }
  return next(req)
}
// Usage
app.use(optionalAuth)

app.get('/posts').use(() => {
  const viewer = UserContext.get()
  // Signed-in users get their personalised list; everyone else gets the public one.
  const posts = viewer ? getPostsForUser(viewer) : getPublicPosts()
  return Response.json({ posts })
})
CORS 中间件
基础 CORS
typescript
/**
 * Basic CORS middleware: adds permissive Access-Control-* headers to every
 * response produced downstream.
 */
const cors = async (req, next) => {
  // Await first: `next` may return a promise, and merging needs the resolved response.
  const response = await next(req)
  return Response.merge(response, Response.headers({
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
    'Access-Control-Allow-Headers': 'Content-Type, Authorization'
  }))
}
app.use(cors)
配置化 CORS
typescript
/** Settings accepted by createCORS. */
interface CORSOptions {
  /** One allowed origin (for example '*'), or an allow-list of origins. */
  origin: string | string[]
  /** Values for Access-Control-Allow-Methods; defaults to GET, POST, PUT, DELETE. */
  methods?: string[]
  /** Values for Access-Control-Allow-Headers; defaults to Content-Type. */
  allowedHeaders?: string[]
  /** When true, Access-Control-Allow-Credentials: true is sent. */
  credentials?: boolean
  /** Value for Access-Control-Max-Age, in seconds. */
  maxAge?: number
}

/**
 * Chooses the Access-Control-Allow-Origin value for one request.
 * A single configured origin is returned as is. For an allow-list, the
 * request's own Origin is echoed when it is on the list; otherwise the result
 * is undefined and no CORS headers should be sent.
 */
function resolveAllowedOrigin(
  configured: string | string[],
  requestOrigin: string | undefined
): string | undefined {
  if (!Array.isArray(configured)) {
    return configured
  }
  return requestOrigin !== undefined && configured.includes(requestOrigin)
    ? requestOrigin
    : undefined
}

/**
 * Builds a CORS middleware from the given options.
 *
 * @param options Allowed origin(s), methods, headers, credentials flag and max age.
 * @returns Middleware that adds the configured CORS headers to the downstream response.
 */
function createCORS(options: CORSOptions) {
  const allowMethods = options.methods?.join(', ') || 'GET, POST, PUT, DELETE'
  const allowHeaders = options.allowedHeaders?.join(', ') || 'Content-Type'

  return async (req, next) => {
    const response = await next(req)
    const allowOrigin = resolveAllowedOrigin(options.origin, req.headers?.origin)
    if (allowOrigin === undefined) {
      // Origin is not on the allow-list: return the response without CORS headers.
      return response
    }
    const headers: Record<string, string> = {
      'Access-Control-Allow-Origin': allowOrigin,
      'Access-Control-Allow-Methods': allowMethods,
      'Access-Control-Allow-Headers': allowHeaders
    }
    if (Array.isArray(options.origin)) {
      // The header value depends on the request, so caches must key on Origin.
      headers['Vary'] = 'Origin'
    }
    if (options.credentials) {
      headers['Access-Control-Allow-Credentials'] = 'true'
    }
    if (options.maxAge !== undefined) {
      // Explicit undefined check so that maxAge: 0 (disable preflight caching) is sent.
      headers['Access-Control-Max-Age'] = String(options.maxAge)
    }
    return Response.merge(response, Response.headers(headers))
  }
}
// Usage
const corsOptions: CORSOptions = {
  origin: ['https://example.com', 'https://app.example.com'],
  methods: ['GET', 'POST', 'PUT', 'DELETE'],
  allowedHeaders: ['Content-Type', 'Authorization'],
  credentials: true,
  maxAge: 86400
}
app.use(createCORS(corsOptions))
处理 OPTIONS 请求
typescript
// Answer OPTIONS requests with 204, the CORS headers below and an empty body.
app.options('/*').use(() => {
  const preflightHeaders = {
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE',
    'Access-Control-Allow-Headers': 'Content-Type, Authorization',
    'Access-Control-Max-Age': '86400'
  }
  return Response.status(204).headers(preflightHeaders).empty()
})
错误处理中间件
全局错误处理
typescript
import { HttpError } from 'farrow-http'
// Custom error classes
/**
 * Error for a resource that does not exist.
 * Builds the message from the resource name and passes 404 as the second
 * argument to HttpError (presumably the HTTP status code — confirm against HttpError).
 */
class NotFoundError extends HttpError {
constructor(resource: string) {
super(`${resource} 不存在`, 404)
}
}
/**
 * Error for invalid input.
 * Forwards the caller's message and passes 400 as the second argument to
 * HttpError (presumably the HTTP status code — confirm against HttpError).
 */
class ValidationError extends HttpError {
constructor(message: string) {
super(message, 400)
}
}
// Global error handling: converts anything thrown downstream into a JSON response.
app.use(async (req, next) => {
  try {
    return await next(req)
  } catch (error) {
    console.error('请求失败:', error)

    // HttpError carries its own status code and a message meant for the client.
    if (error instanceof HttpError) {
      return Response.status(error.statusCode).json({ error: error.message })
    }

    // Something that is not an Error object was thrown.
    if (!(error instanceof Error)) {
      return Response.status(500).json({ error: '未知错误' })
    }

    // Any other Error: expose its message only in development.
    const isDevelopment = process.env.NODE_ENV === 'development'
    return Response.status(500).json({
      error: '服务器内部错误',
      message: isDevelopment ? error.message : undefined
    })
  }
})
// Throwing from a route handler
app.get('/users/<id:int>').use((req) => {
  const user = getUserById(req.params.id)
  if (user) {
    return Response.json(user)
  }
  // Left for the global error-handling middleware to catch.
  throw new NotFoundError('用户')
})
错误日志
typescript
// Logs every error thrown downstream, then answers with a generic 500.
app.use(async (req, next) => {
  try {
    return await next(req)
  } catch (error) {
    // Anything can be thrown, so read message/stack only from real Error objects.
    // NOTE(review): `logger` must be a logging object exposing `error()`; it is
    // not defined in this snippet — confirm it does not resolve to a middleware
    // of the same name.
    logger.error({
      message: error instanceof Error ? error.message : String(error),
      stack: error instanceof Error ? error.stack : undefined,
      method: req.method,
      url: req.pathname,
      timestamp: new Date().toISOString()
    })
    // Do not leak error details to the client.
    return Response.status(500).json({
      error: 'Internal Server Error'
    })
  }
})
限流中间件
简单限流
typescript
// Shared counter storage. Keys are `<limiter scope><client key>`, so separate
// limiters never share a counter.
const rateLimitStore = new Map<string, { count: number; resetAt: number }>()
let rateLimiterCount = 0

/**
 * Derives the key identifying the client of a request: the first address in
 * `x-forwarded-for`, then `req.ip`, then the literal 'unknown'.
 * NOTE(review): `x-forwarded-for` is client-controlled unless a trusted proxy
 * overwrites it — confirm the deployment before relying on it.
 */
function rateLimitClientKey(req): string {
  const forwarded = req.headers?.['x-forwarded-for']
  const raw = Array.isArray(forwarded) ? forwarded[0] : forwarded
  const firstHop = typeof raw === 'string' ? raw.split(',')[0]?.trim() : undefined
  return firstHop || req.ip || 'unknown'
}

/**
 * Builds a fixed-window rate-limiting middleware.
 *
 * @param maxRequests Requests allowed per client within one window.
 * @param windowMs    Window length in milliseconds.
 * @returns Middleware that answers 429 once a client exceeds the limit.
 */
function rateLimit(maxRequests: number, windowMs: number) {
  // Each limiter gets its own key prefix so that, for example, a 10/min and a
  // 100/min limiter do not count against each other.
  const scope = `limiter-${++rateLimiterCount}:`
  let nextSweepAt = 0

  return (req, next) => {
    const now = Date.now()

    // At most once per window, drop this limiter's expired records so the
    // store does not grow without bound.
    if (now >= nextSweepAt) {
      for (const [key, entry] of rateLimitStore) {
        if (key.startsWith(scope) && now > entry.resetAt) {
          rateLimitStore.delete(key)
        }
      }
      nextSweepAt = now + windowMs
    }

    const key = scope + rateLimitClientKey(req)
    const record = rateLimitStore.get(key)

    // First request from this client, or the previous window has ended.
    if (!record || now > record.resetAt) {
      rateLimitStore.set(key, {
        count: 1,
        resetAt: now + windowMs
      })
      return next(req)
    }

    if (record.count >= maxRequests) {
      return Response.status(429).json({
        error: 'Too many requests',
        retryAfter: Math.ceil((record.resetAt - now) / 1000)
      })
    }

    record.count++
    return next(req)
  }
}
// 使用: 每分钟最多 100 个请求
app.use(rateLimit(100, 60000))
分级限流
typescript
const publicRateLimit = rateLimit(10, 60000) // public endpoints: 10 per minute
const authRateLimit = rateLimit(100, 60000) // authenticated endpoints: 100 per minute

// Public routes
app.route('/public').use(publicRateLimit)

// Authenticated routes: authenticate first, then apply the higher limit
const protectedApi = app.route('/api')
protectedApi.use(authMiddleware).use(authRateLimit)
缓存中间件
内存缓存
typescript
// In-memory store: cache key -> cached JSON value and its expiry timestamp (ms).
const cache = new Map<string, { data: any; expiresAt: number }>()

/**
 * Builds a middleware that caches successful JSON responses to GET requests.
 *
 * Entries are keyed by pathname plus serialized query only, so the same cached
 * body is served to every caller of a URL.
 *
 * @param ttl How long a cached entry stays valid, in milliseconds.
 */
function cacheMiddleware(ttl: number) {
  return async (req, next) => {
    // Only GET requests are cached.
    if (req.method !== 'GET') {
      return next(req)
    }

    const key = req.pathname + JSON.stringify(req.query)
    const cached = cache.get(key)
    if (cached) {
      if (Date.now() < cached.expiresAt) {
        console.log('缓存命中:', key)
        return Response.json(cached.data)
          .header('X-Cache', 'HIT')
      }
      // Stale entry: remove it instead of keeping it until it is overwritten.
      cache.delete(key)
    }

    // Await first: `next` may return a promise, and `info` exists only on the
    // resolved response.
    const response = await next(req)

    // Only successful JSON responses are stored.
    if (response.info.status?.code === 200 && response.info.body?.type === 'json') {
      cache.set(key, {
        data: response.info.body.value,
        expiresAt: Date.now() + ttl
      })
    }
    return Response.merge(response, Response.header('X-Cache', 'MISS'))
  }
}
// 使用: 缓存 5 分钟
app.route('/api/data').use(cacheMiddleware(5 * 60 * 1000))
ETag 缓存
typescript
import crypto from 'crypto'
/**
 * Computes a fingerprint of a value: the hex MD5 digest of its JSON serialization.
 *
 * @param data Value to fingerprint; serialized with JSON.stringify.
 * @returns The digest as a hexadecimal string.
 */
function generateEtag(data: any): string {
  const serialized = JSON.stringify(data)
  return crypto.createHash('md5').update(serialized).digest('hex')
}
/**
 * ETag middleware for JSON responses.
 *
 * Tags every JSON response with an ETag derived from its body. For GET and
 * HEAD requests whose If-None-Match matches that tag, answers 304 with no body.
 * Non-JSON responses pass through untouched.
 */
const etagMiddleware = async (req, next) => {
  // Await first: `next` may return a promise.
  const response = await next(req)
  if (response.info.body?.type !== 'json') {
    return response
  }

  // Entity tags are quoted strings on the wire.
  const etag = `"${generateEtag(response.info.body.value)}"`

  // If-None-Match may list several tags, use '*', or carry the weak prefix W/.
  const clientTags = String(req.headers?.['if-none-match'] ?? '')
    .split(',')
    .map((tag) => tag.trim().replace(/^W\//, ''))
    .filter(Boolean)
  const matches = clientTags.includes('*') || clientTags.includes(etag)

  // 304 is only meaningful for safe reads.
  const isReadRequest = req.method === 'GET' || req.method === 'HEAD'
  if (isReadRequest && matches) {
    return Response.status(304).header('ETag', etag).empty()
  }
  return Response.merge(response, Response.header('ETag', etag))
}
app.use(etagMiddleware)
压缩中间件
typescript
import zlib from 'zlib'
import { promisify } from 'util'
// Promise-returning wrapper around zlib.gzip.
const gzip = promisify(zlib.gzip)

/**
 * Gzip compression middleware for JSON responses.
 *
 * Compresses the JSON body when the client accepts gzip and the serialized
 * body is at least 1KB; every other response is returned unchanged.
 */
const compressionMiddleware = async (req, next) => {
  const response = await next(req)

  const acceptEncoding = req.headers['accept-encoding'] || ''
  if (!acceptEncoding.includes('gzip')) {
    return response
  }
  if (response.info.body?.type !== 'json') {
    return response
  }

  const data = JSON.stringify(response.info.body.value)
  // Measure bytes, not UTF-16 code units: multi-byte text is larger on the
  // wire than `data.length` suggests.
  if (Buffer.byteLength(data) < 1024) {
    // Below 1KB compression is not worth it.
    return response
  }

  const compressed = await gzip(data)
  // Merge onto the original response so its status and headers are kept; only
  // the body and the encoding-related headers are replaced.
  return Response.merge(
    response,
    Response
      .headers({
        'Content-Type': 'application/json',
        'Content-Encoding': 'gzip',
        'Content-Length': String(compressed.length),
        // The body now depends on the request's Accept-Encoding.
        'Vary': 'Accept-Encoding'
      })
      .buffer(compressed)
  )
}
app.use(compressionMiddleware)
安全中间件
安全 Headers
typescript
/**
 * Adds a fixed set of security-related headers to every downstream response.
 */
const securityHeaders = async (req, next) => {
  // Await first: `next` may return a promise, and merging needs the resolved response.
  const response = await next(req)
  return Response.merge(response, Response.headers({
    'X-Frame-Options': 'DENY',
    'X-Content-Type-Options': 'nosniff',
    'X-XSS-Protection': '1; mode=block',
    'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
    'Content-Security-Policy': "default-src 'self'"
  }))
}
app.use(securityHeaders)
XSS 防护
typescript
// Characters with special meaning in HTML and the entities that replace them.
const HTML_ESCAPES: Record<string, string> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#x27;',
  '/': '&#x2F;'
}

/**
 * Recursively HTML-escapes every string inside a value.
 *
 * Strings have & < > " ' / replaced by HTML entities; arrays and plain objects
 * are copied with each element or property value sanitized; every other value
 * (numbers, booleans, null, undefined) is returned unchanged.
 *
 * @param input Value to sanitize.
 * @returns A sanitized copy; the input itself is not modified.
 */
function sanitize(input: any): any {
  if (typeof input === 'string') {
    // A single pass over the string, so the '&' of an inserted entity is never
    // escaped a second time.
    return input.replace(/[&<>"'/]/g, (ch) => HTML_ESCAPES[ch] ?? ch)
  }
  if (Array.isArray(input)) {
    return input.map(sanitize)
  }
  if (typeof input === 'object' && input !== null) {
    // Object.fromEntries defines own properties, so a key such as "__proto__"
    // cannot replace the prototype of the result.
    return Object.fromEntries(
      Object.entries(input).map(([key, value]) => [key, sanitize(value)])
    )
  }
  return input
}
/**
 * Sanitizes the request body in place (when one is present) before passing
 * the request on.
 */
const xssProtection = (req, next) => {
  const hasBody = Boolean(req.body)
  if (hasBody) {
    req.body = sanitize(req.body)
  }
  return next(req)
}
app.use(xssProtection)
中间件组合
组合多个中间件
typescript
/**
 * Combines several middleware into one that runs them in order.
 *
 * Each middleware receives the request handed on by the one before it. Calling
 * `next` with no argument passes the current request along unchanged. After the
 * last middleware, the outer `next` is called with the final request.
 *
 * @param middlewares Middleware functions, outermost first.
 * @returns A single middleware equivalent to the whole chain.
 */
function compose(...middlewares) {
  return (req, next) => {
    function dispatch(i, currentReq) {
      if (i >= middlewares.length) {
        return next(currentReq)
      }
      const middleware = middlewares[i]
      // Forward whatever request the middleware hands to `next`, so changes
      // made by one middleware are visible to the ones after it.
      return middleware(currentReq, (nextReq = currentReq) => dispatch(i + 1, nextReq))
    }
    return dispatch(0, req)
  }
}
// Usage: one combined middleware for the /api routes.
const apiMiddlewares = compose(
  logger,
  cors,
  authMiddleware,
  // `rateLimit` is a factory: call it to obtain the middleware.
  rateLimit(100, 60000)
)
app.route('/api').use(apiMiddlewares)
条件中间件
typescript
/**
 * Wraps a middleware so that it runs only when a predicate holds for the request.
 *
 * @param condition  Predicate evaluated for every request.
 * @param middleware Middleware to run when the predicate returns true.
 * @returns Middleware that delegates to `middleware`, or skips straight to `next`.
 */
function conditional(
  condition: (req) => boolean,
  middleware
) {
  return (req, next) =>
    condition(req) ? middleware(req, next) : next(req)
}
// Usage: enable compression only in production
const isProduction = () => process.env.NODE_ENV === 'production'
app.use(conditional(isProduction, compressionMiddleware))
中间件继承
父路由的中间件会被子路由继承:
typescript
// Middleware registered on a parent router is inherited by its child routers.
const apiRouter = app.route('/api')
apiRouter
  .use(corsMiddleware) // every /api/* route gets CORS
  .use(rateLimitMiddleware) // every /api/* route is rate limited

const v1Router = apiRouter.route('/v1')
v1Router.use(authMiddleware) // every /api/v1/* route requires authentication

const userRouter = v1Router.route('/users')
userRouter.use(userValidationMiddleware) // every /api/v1/users/* route gets extra validation

// A request to /api/v1/users passes through, in order:
// 1. corsMiddleware
// 2. rateLimitMiddleware
// 3. authMiddleware
// 4. userValidationMiddleware
// 5. the final handler