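Scaling Content Moderation
A moderation setup that works at 1,000 messages a day breaks down at 1 million. Rate limits start to bite, per-call latency adds up, and costs climb quickly. This guide shows how to wire Discuse moderation into a high-throughput pipeline: when to block, when to check in the background, what to cache, and how to handle peak load.
What breaks as moderation scales?
Each scaling problem has a specific cause and a matching fix:
| Challenge | Impact | Solution |
|---|---|---|
| Growing content volume | API rate limits, rising costs | Async processing, caching |
| Latency requirements | Poor user experience | Pre-moderation, queues |
| Cost management | Budget constraints | Smart routing, caching |
| Consistency | Uneven enforcement | Centralized configuration |
| Traffic spikes | Degraded service | Autoscaling, queues |
Which architecture pattern should I use?
Pattern 1: Synchronous pre-moderation
Best for: low-volume, high-risk content (payments, legal)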
User Input → API Call → Moderation → Decision → Response
                             ↓
                      (Blocking call)
// Simple synchronous flow
async function createPost(content) {
// Check moderation before saving
const result = await checkModeration(content);
if (result.has_violations) {
throw new ModerationError(result.message);
}
// Safe to publish
return await savePost(content);
}
Pros: simple, immediate feedback. Cons: adds latency, limits throughput.
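Because the blocking call sits directly on the request path, it helps to cap how long a user can wait. A minimal sketch, assuming a hypothetical checkModeration stub (it simulates API latency with a timer) and an illustrative 2-second budget: race the check against a timer and fail closed, holding the post for review, when the budget runs out or the call errors.

```javascript
// Resolve with the check result, or reject if it takes longer than `ms`.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`moderation timed out after ${ms}ms`)), ms);
  });
  // Clear the timer whichever promise settles first so it cannot keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical stand-in for the real API call: resolves after `delayMs`.
function checkModeration(content, delayMs = 10) {
  return new Promise(resolve =>
    setTimeout(() => resolve({ has_violations: content.includes('spam') }), delayMs));
}

// Decide before saving; on timeout or any error, fail closed by holding for review.
async function precheck(content, { budgetMs = 2000, delayMs } = {}) {
  try {
    const result = await withTimeout(checkModeration(content, delayMs), budgetMs);
    return result.has_violations ? 'reject' : 'publish';
  } catch (err) {
    return 'hold_for_review';
  }
}
```

Failing closed suits the high-risk content this pattern targets; for lower-risk content you might instead publish and queue a background re-check, as in Pattern 2.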
Pattern 2: Asynchronous post-moderation
Best for: high-volume, low-risk content (comments, messages)
User Input → Save (Pending) → Publish → Background Check → Action
                                               ↓
                                      ┌────────┴────────┐
                                      ↓                 ↓
                                    Safe            Violation
                                      ↓                 ↓
                                    Keep             Remove
// Async flow with queue
async function createPost(content) {
// Save immediately with pending status
const post = await savePost(content, { status: 'pending' });
// Queue for async moderation
await moderationQueue.add('check-content', {
postId: post.id,
content: content
});
return post;
}
// Background worker
moderationQueue.process('check-content', async (job) => {
const result = await checkModeration(job.data.content);
if (result.has_violations) {
await removePost(job.data.postId);
await notifyUser(job.data.postId, 'content_removed');
} else {
await updatePost(job.data.postId, { status: 'approved' });
}
});
Pros: non-blocking, handles high concurrency. Cons: harmful content is briefly visible.
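To see the status transitions without Redis or a queue library, here is a minimal in-memory sketch. The posts map, queue array, and checkModeration stub are hypothetical stand-ins for your datastore, your queue (Bull in the example above), and the API call; violating posts are marked removed rather than deleted.

```javascript
const posts = new Map(); // postId -> { content, status }
const queue = [];        // pending moderation jobs
let nextId = 1;

// Hypothetical stand-in for the moderation API.
async function checkModeration(content) {
  return { has_violations: /spam/i.test(content) };
}

// Save immediately as 'pending' (visible right away) and enqueue a background check.
function createPost(content) {
  const id = nextId++;
  posts.set(id, { content, status: 'pending' });
  queue.push({ postId: id, content });
  return id;
}

// Drain the queue: approve clean posts, mark violating ones as removed.
async function runWorker() {
  while (queue.length > 0) {
    const job = queue.shift();
    const result = await checkModeration(job.content);
    posts.get(job.postId).status = result.has_violations ? 'removed' : 'approved';
  }
}
```

The window between createPost returning and runWorker processing the job is exactly the "briefly visible" cost listed above.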
Pattern 3: Tiered moderation
Best for: balancing effectiveness against cost
User Input → Quick Check → High Confidence? ──Yes──► Auto-decision
                                  │
                                  No
                                  ↓
                            Full Analysis ──► Decision
async function tieredModeration(content) {
// Tier 1: Fast local checks (regex, blocklists)
const localResult = await quickLocalCheck(content);
if (localResult.definiteViolation) {
return { action: 'block', source: 'local' };
}
// Tier 2: Cached API results
const cacheKey = hashContent(content);
const cached = await cache.get(cacheKey);
if (cached) {
return { action: cached.action, source: 'cache' };
}
// Tier 3: Full API check; cache the decision so cache hits return the same shape
const apiResult = await checkModeration(content);
const action = determineAction(apiResult);
await cache.set(cacheKey, { action }, TTL);
return { action, source: 'api' };
}
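The tiered example leaves hashContent, cache, and TTL undefined. One way to fill them in, sketched with Node's built-in crypto module and an in-memory Map as a stand-in for Redis. It assumes content is a string, and the 1-hour TTL is an arbitrary illustrative value.

```javascript
const crypto = require('node:crypto');

const TTL = 60 * 60 * 1000; // 1 hour, in milliseconds (illustrative value)

// SHA-256 of the content gives a fixed-length key for identical inputs.
function hashContent(content) {
  return crypto.createHash('sha256').update(content).digest('hex');
}

// Minimal TTL cache: entries expire `ttlMs` after they are set.
class TtlCache {
  constructor(now = () => Date.now()) {
    this.now = now; // injectable clock, handy for tests
    this.entries = new Map();
  }
  async get(key) {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key); // lazily evict expired entries
      return null;
    }
    return entry.value;
  }
  async set(key, value, ttlMs) {
    this.entries.set(key, { value, expiresAt: this.now() + ttlMs });
  }
}

const cache = new TtlCache();
```

An in-process cache is only shared within one worker; with many workers, a shared store such as Redis (as in the caching section) lets every worker benefit from each other's results.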
Pattern 4: Fan-out and aggregate
Best for: multiple check types that must scale independently
                         ┌──► Text Check ───┐
                         │                  │
User Input ──► Router ───┼──► Image Check ──┼──► Aggregator ──► Decision
                         │                  │
                         └──► Link Check ───┘
async function parallelModeration(content) {
const checks = [];
if (content.text) {
checks.push(checkText(content.text));
}
if (content.images?.length) {
checks.push(checkImages(content.images));
}
if (content.links?.length) {
checks.push(checkLinks(content.links));
}
// Run all checks in parallel
const results = await Promise.all(checks);
// Aggregate results
return aggregateResults(results);
}
function aggregateResults(results) {
// Most severe result wins
const hasViolation = results.some(r => r.has_violations);
const maxScore = Math.max(...results.map(r => r.max_score || 0));
return {
has_violations: hasViolation,
max_score: maxScore,
details: results
};
}
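With Promise.all, a single failed check rejects the whole fan-out, and Math.max() over an empty list returns -Infinity. A variant sketch using Promise.allSettled that flags the aggregate for review when any branch fails. The function names are hypothetical, and the "failure means review" policy is an assumption rather than a Discuse rule.

```javascript
// Run all checks; a failed branch marks the aggregate for review instead of throwing.
async function parallelModerationSettled(checks) {
  const settled = await Promise.allSettled(checks);
  const results = settled.filter(s => s.status === 'fulfilled').map(s => s.value);
  const failed = settled.length - results.length;
  return aggregateSettled(results, failed);
}

function aggregateSettled(results, failed = 0) {
  const hasViolation = results.some(r => r.has_violations);
  // Start from 0 so an empty result list yields 0 rather than -Infinity.
  const maxScore = results.reduce((max, r) => Math.max(max, r.max_score || 0), 0);
  return {
    has_violations: hasViolation,
    max_score: maxScore,
    // A detected violation already decides the outcome; otherwise a gap needs a human.
    needs_review: failed > 0 && !hasViolation,
    failed_checks: failed,
    details: results
  };
}
```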
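Caching strategies
Content-based caching
Cache results for identical content:
const crypto = require('crypto');
const Redis = require('ioredis'); // assumes the ioredis client
const cache = new Redis();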
async function checkWithCache(content) {
const hash = crypto.createHash('sha256')
.update(content.text || '')
.update(JSON.stringify(content.images || []))
.digest('hex');
// Check cache first
const cached = await cache.get(`mod:${hash}`);
if (cached) {
return { ...JSON.parse(cached), cached: true };
}
// API call
const result = await callModerationAPI(content);
// Cache for 24 hours
await cache.setex(`mod:${hash}`, 86400, JSON.stringify(result));
return { ...result, cached: false };
}
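User-based caching
Cache violation history for repeat offenders:
async function checkUserHistory(userId) {
// Redis returns a string, or null when the key is absent; Number(null) is 0
const recentViolations = Number(await cache.get(`violations:${userId}`));
if (recentViolations > 5) {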
// Flag for enhanced scrutiny
return { enhancedModeration: true };
}
return { enhancedModeration: false };
}
async function recordViolation(userId) {
await cache.incr(`violations:${userId}`);
await cache.expire(`violations:${userId}`, 86400 * 7); // 7 days
}
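Because recordViolation calls expire on every increment, each new violation pushes the expiry back another 7 days, so the count resets only after a full week with no new violations. The in-memory sketch below mirrors that INCR + EXPIRE behavior so it can be tested without Redis; the ViolationCounter class and its injectable clock are hypothetical.

```javascript
const WEEK_MS = 7 * 24 * 60 * 60 * 1000;

// Mirrors the Redis INCR + EXPIRE pattern: each violation bumps the count
// and pushes the whole counter's expiry back by `ttlMs`.
class ViolationCounter {
  constructor(ttlMs = WEEK_MS, now = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
    this.counters = new Map(); // userId -> { count, expiresAt }
  }
  record(userId) {
    const current = this.count(userId);
    this.counters.set(userId, { count: current + 1, expiresAt: this.now() + this.ttlMs });
  }
  count(userId) {
    const entry = this.counters.get(userId);
    if (!entry || this.now() >= entry.expiresAt) return 0; // expired keys read as 0
    return entry.count;
  }
  needsEnhancedModeration(userId, threshold = 5) {
    return this.count(userId) > threshold;
  }
}
```

If you want "violations in the last 7 days" instead, you would need a true sliding window (for example, timestamped entries), which this pattern does not provide.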
Queue management
Priority queues
Process different content types at different priorities:
const queues = {
critical: new Queue('moderation-critical'), // Reports, appeals
high: new Queue('moderation-high'), // Public posts
normal: new Queue('moderation-normal'), // Comments, DMs
low: new Queue('moderation-low') // Profile updates
};
async function queueForModeration(content, priority = 'normal') {
const queue = queues[priority] || queues.normal;
return queue.add('check', {
contentId: content.id,
type: content.type,
data: content
}, {
priority: getPriorityNumber(priority),
timeout: getTimeout(priority)
});
}
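getPriorityNumber and getTimeout are not defined above. A hypothetical mapping, assuming Bull's convention that a job priority of 1 is the highest (larger numbers run later) and that the timeout option is in milliseconds; the timeout values are illustrative only.

```javascript
// Lower number = higher priority, matching Bull's job `priority` option.
const PRIORITY_NUMBERS = { critical: 1, high: 2, normal: 3, low: 4 };

// Per-priority job timeouts in milliseconds (illustrative values only).
const TIMEOUTS_MS = { critical: 5000, high: 10000, normal: 30000, low: 60000 };

function getPriorityNumber(priority) {
  return PRIORITY_NUMBERS[priority] ?? PRIORITY_NUMBERS.normal;
}

function getTimeout(priority) {
  return TIMEOUTS_MS[priority] ?? TIMEOUTS_MS.normal;
}
```

Since each priority already has its own queue, the per-job priority number only changes ordering if you later collapse the tiers into a single queue; separate queues let you give critical work dedicated workers instead.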
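Respecting rate limits
Each API key has a requests-per-minute (RPM) limit. Requests beyond it are rejected with a "rate limit exceeded" error. Throttle on your side so you stay under that limit, and avoid spending quota on duplicate calls. Set the limiter's per-minute cap to match your key's configured RPM: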
const Bottleneck = require('bottleneck');
const limiter = new Bottleneck({
maxConcurrent: 50, // Cap parallel requests
minTime: 20, // Min 20ms between requests
reservoir: YOUR_KEY_RPM, // Match your API key's per-minute limit
reservoirRefreshAmount: YOUR_KEY_RPM,
reservoirRefreshInterval: 60 * 1000 // Refill each minute
});
async function rateLimitedCheck(content) {
return limiter.schedule(() => checkModeration(content));
}
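Even with a limiter, several workers sharing one key can still trip the limit during a burst. A minimal retry sketch with exponential backoff and jitter. How a rate-limit rejection surfaces in your client is an assumption here (HTTP status 429 or the "rate limit exceeded" message), so adapt isRateLimitError to the errors you actually see.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Assumption: rate-limit rejections carry status 429 or the "rate limit exceeded" message.
function isRateLimitError(err) {
  return err?.status === 429 || /rate limit exceeded/i.test(err?.message ?? '');
}

// Retry only rate-limit errors, doubling the delay each attempt (with random jitter).
async function withBackoff(fn, { retries = 5, baseMs = 500, maxMs = 30000 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (!isRateLimitError(err) || attempt >= retries) throw err;
      const delay = Math.min(maxMs, baseMs * 2 ** attempt);
      await sleep(delay / 2 + Math.random() * (delay / 2)); // wait between delay/2 and delay
    }
  }
}
```

Usage: withBackoff(() => rateLimitedCheck(content)). Other errors are rethrown immediately so the queue's own retry and dead-letter handling can deal with them.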
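Quota is tracked separately from rate limits: every call counts against your plan's monthly API request allowance. The usage object in each response reports api_requests_used, api_requests_limit, and api_requests_remaining, so you can monitor how much headroom you have left.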
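Those usage fields can drive an alert or a fallback before the monthly allowance runs out. A sketch that classifies remaining quota; it assumes the three fields are plain numbers, and the 10% warning threshold and returned labels are illustrative choices.

```javascript
// Classify remaining monthly quota from a response's usage object.
function quotaStatus(usage, warnFraction = 0.1) {
  const {
    api_requests_used: used,
    api_requests_limit: limit,
    api_requests_remaining: remaining
  } = usage;
  const percentUsed = limit > 0 ? Math.round((used / limit) * 100) : 100;
  if (remaining <= 0) return { level: 'exhausted', percentUsed };
  return { level: remaining / limit < warnFraction ? 'low' : 'ok', percentUsed };
}
```

For example, you could export percentUsed as a gauge and, once the level turns low, route more low-risk content through local-only checks.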
Dead letter queue
Handle failed moderation attempts:
moderationQueue.process('check-content', async (job) => {
try {
const result = await checkModeration(job.data.content);
await applyDecision(job.data.contentId, result);
} catch (error) {
if (job.attemptsMade < 3) {
throw error; // Retry
}
// Move to dead letter queue after 3 attempts
await deadLetterQueue.add('failed-moderation', {
...job.data,
error: error.message,
attempts: job.attemptsMade
});
// Apply safe default (hold for review)
await holdForManualReview(job.data.contentId);
}
});
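The worker above relies on the queue library to re-run a job after it throws; in Bull that also requires setting the attempts option when the job is added, since the default is a single attempt. The same control flow, sketched without a queue library: processWithRetries, deadLetters, and held are hypothetical stand-ins, and the retries here run back to back without backoff.

```javascript
const deadLetters = []; // failed jobs kept for inspection and replay
const held = new Set(); // content IDs held for manual review

// Try the handler up to `maxAttempts` times, then dead-letter the job and hold its content.
async function processWithRetries(job, handler, maxAttempts = 3) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await handler(job);
    } catch (err) {
      lastError = err;
    }
  }
  deadLetters.push({ ...job, error: lastError.message, attempts: maxAttempts });
  held.add(job.contentId); // safe default: hold rather than publish
  return null;
}
```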
Cost optimization
Smart routing
Call the paid API only when needed:
async function smartModeration(content) {
// Step 1: Free local checks
const localResult = runLocalFilters(content);
if (localResult.isDefinitelySpam) {
return { action: 'block', cost: 0 };
}
// Step 2: Check cache (free)
const cached = await getCachedResult(content);
if (cached) {
return { action: cached.action, cost: 0 };
}
// Step 3: Risk-based API call
const riskScore = calculateRiskScore(content, localResult);
if (riskScore < 0.2) {
// Low risk: approve without API call
return { action: 'approve', cost: 0 };
}
// Step 4: API call for uncertain content
const apiResult = await checkModeration(content);
return {
action: determineAction(apiResult),
cost: calculateApiCost(content)
};
}
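determineAction appears in both the tiered and smart-routing examples without a definition. A hypothetical threshold mapping over the has_violations and max_score fields used elsewhere in this guide; the cut-offs are placeholders to tune against your own review data, not Discuse defaults.

```javascript
// Placeholder thresholds; tune these on labelled data from your own platform.
const BLOCK_AT = 0.9;
const REVIEW_AT = 0.5;

// Map an API result to an action: confident violations are blocked,
// borderline cases go to human review, everything else is approved.
function determineAction(result) {
  const score = result.max_score ?? 0;
  if (result.has_violations && score >= BLOCK_AT) return 'block';
  if (result.has_violations || score >= REVIEW_AT) return 'review';
  return 'approve';
}
```

Keeping this mapping in one shared function also serves the consistency row of the table at the top: every path (cache, API, worker) applies the same thresholds.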
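Concurrency control
The /api/v2/check endpoint scores one submission per call. To "batch" work, group items in your worker and dispatch them concurrently through a bounded pool; do not expect a single request to process several items. A check with multiple media URLs is still one call that covers all of that media, so one call with many images costs less than many single-image calls.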
// Bounded concurrency: many items, capped parallel calls.
async function processBatch(items, concurrency = 10) {
const results = [];
for (let i = 0; i < items.length; i += concurrency) {
const slice = items.slice(i, i + concurrency);
const settled = await Promise.allSettled(
slice.map(item => checkModeration(item.content))
);
results.push(...settled);
}
return results;
}
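The slice-based loop waits for the slowest call in each group before starting the next group, so one slow request stalls the other slots. A worker-pool variant keeps up to concurrency calls in flight at all times and returns results in input order, in the same shape as Promise.allSettled. The check function is passed in (a hypothetical parameter) so the sketch runs without the real API.

```javascript
// Keep up to `concurrency` calls in flight; each worker pulls the next item when it finishes.
async function processPool(items, check, concurrency = 10) {
  const results = new Array(items.length);
  let next = 0;

  async function worker() {
    while (next < items.length) {
      const index = next++; // safe: this runs synchronously between awaits
      try {
        results[index] = { status: 'fulfilled', value: await check(items[index]) };
      } catch (reason) {
        results[index] = { status: 'rejected', reason };
      }
    }
  }

  const workers = Array.from({ length: Math.min(concurrency, items.length) }, worker);
  await Promise.all(workers);
  return results;
}
```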
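A single request can carry up to 10 image URLs, 5 GIF URLs, 3 video URLs, 5 document URLs, and 20 links, plus up to 10,000 characters of text. Combine the media from one submission into a single call rather than splitting it across several.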
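Checking a submission against those per-request limits before sending lets you split or reject oversized posts locally instead of having the API refuse them. A sketch; the field names (images, gifs, videos, documents, links, text) are assumptions about how you model a submission, not the API's request schema, and counting text in Unicode code points is also an assumption.

```javascript
// Per-request limits as listed above.
const LIMITS = { images: 10, gifs: 5, videos: 3, documents: 5, links: 20 };
const MAX_TEXT_CHARS = 10000;

// Return a list of limit violations; an empty list means one call can cover the submission.
function checkRequestLimits(submission) {
  const problems = [];
  for (const [field, max] of Object.entries(LIMITS)) {
    const count = submission[field]?.length ?? 0;
    if (count > max) problems.push(`${field}: ${count} > ${max}`);
  }
  // Assumption: count code points rather than UTF-16 units.
  const textLength = [...(submission.text ?? '')].length;
  if (textLength > MAX_TEXT_CHARS) problems.push(`text: ${textLength} > ${MAX_TEXT_CHARS}`);
  return problems;
}
```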
Monitoring and alerting
Key metrics dashboard
const metrics = {
  // Volume metrics
  total_requests: new Counter('moderation_requests_total'),
  errors_total: new Counter('moderation_errors_total'), // used by the error-rate alert
  requests_per_second: new Gauge('moderation_rps'),
  // Latency: one histogram; compute p50/p99 at query time with histogram_quantile()
  latency_ms: new Histogram('moderation_latency_ms'),
  // Queue metrics
  queue_depth: new Gauge('moderation_queue_depth'),
  processing_time: new Histogram('moderation_processing_time'),
  // Cost metrics
  api_calls: new Counter('moderation_api_calls'),
  estimated_cost: new Counter('moderation_estimated_cost'),
  // Accuracy metrics
  violations_detected: new Counter('moderation_violations'),
  false_positives: new Counter('moderation_false_positives')
};
Alert rules
# Alert on queue backup
- alert: ModerationQueueBackup
  expr: moderation_queue_depth > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: Moderation queue backing up
# Alert on high latency (p99 over the last 5 minutes, in milliseconds)
- alert: ModerationLatencyHigh
  expr: histogram_quantile(0.99, sum by (le) (rate(moderation_latency_ms_bucket[5m]))) > 5000
  for: 2m
  labels:
    severity: critical
  annotations:
    summary: Moderation latency exceeding 5 seconds
# Alert on high error rate
- alert: ModerationErrorRate
  expr: rate(moderation_errors_total[5m]) / rate(moderation_requests_total[5m]) > 0.01
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: Moderation error rate above 1%
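The alert rules above only fire if every call is actually recorded. A sketch of a wrapper that counts requests, counts errors, and times each moderation call. The recorder object is a hypothetical interface you would back with your metrics client (for example, a Prometheus counter's inc and a histogram's observe).

```javascript
// Wrap an async call so its count, failures, and latency are always recorded.
async function instrumented(recorder, fn) {
  const start = process.hrtime.bigint();
  recorder.requests();
  try {
    return await fn();
  } catch (err) {
    recorder.errors();
    throw err; // record, then let the caller's retry/DLQ logic handle it
  } finally {
    const ms = Number(process.hrtime.bigint() - start) / 1e6;
    recorder.latencyMs(ms);
  }
}
```

Usage: instrumented(recorder, () => checkModeration(content)). Recording latency in finally ensures failed calls show up in the latency histogram too.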
Infrastructure scaling
Horizontal scaling
# Kubernetes HPA for moderation workers
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: moderation-worker
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: moderation-worker
  minReplicas: 3
  maxReplicas: 50
  metrics:
    - type: External
      external:
        metric:
          name: queue_depth
        target:
          type: AverageValue
          averageValue: 100
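For an External metric with an AverageValue target, the HPA aims for roughly ceil(total metric value / averageValue) replicas, clamped between minReplicas and maxReplicas. A sketch of that arithmetic for the settings above; it ignores the HPA's tolerance band and scale-down stabilization, and it assumes a metrics adapter is exposing queue_depth to the cluster.

```javascript
// Approximate HPA target for an External metric with an AverageValue target:
// desired = ceil(total / averageValue), clamped to [minReplicas, maxReplicas].
function desiredReplicas(queueDepth, { averageValue = 100, minReplicas = 3, maxReplicas = 50 } = {}) {
  const raw = Math.ceil(queueDepth / averageValue);
  return Math.min(maxReplicas, Math.max(minReplicas, raw));
}
```

With a 10,000-job backlog (the warning threshold in the alert rules), the formula asks for 100 workers but the HPA stops at 50, leaving about 200 queued jobs per worker. If backlogs that size are routine rather than exceptional, raise maxReplicas or per-worker throughput.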
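Reducing latency for global users
The Discuse API is served from a single base URL: https://api.discuse.com. To keep latency low for global traffic, run your moderation workers close to your users and have them call the API, rather than calling it synchronously from a distant region on the user-facing request path. Combine this with the asynchronous post-moderation and caching patterns above so that slow round trips never block users.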
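Best practices summary
- Cache aggressively: identical content returns cached results. The API also deduplicates identical content within the same project over a short time window and returns cached: true.
- Use queues: process asynchronously to handle high concurrency.
- Tier your checks: local → cache → API.
- Monitor everything: queue depth, latency, and usage against quota.
- Plan for failure: dead letter queues and safe fallbacks.
- Scale horizontally: add more workers rather than bigger machines.
- Optimize costs: smart routing, fewer redundant calls, and all of a submission's media merged into one request.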