Skip to main content
文档
学习中心

通过完整指南、教程和 API 文档,掌握内容审核

快捷链接

扩展内容审核规模

每天处理 1,000 条消息时可行的审核配置,到了 100 万条就会失效:速率限制开始造成影响,单次调用延迟不断叠加,成本也会迅速膨胀。本指南将介绍如何将 Discuse 审核接入高吞吐量流水线——何时拦截、何时在后台检查、缓存什么,以及如何绕开峰值负载。

当审核规模扩大时,会出现哪些问题?

每个扩展问题都有具体原因和对应的解决方案:

挑战 影响 解决方案
内容量增长 API 速率限制、成本增加 异步处理、缓存
延迟要求 用户体验不佳 预审核、队列
成本管理 预算受限 智能路由、缓存
一致性 执行标准不统一 集中化配置
峰值处理 服务质量下降 自动扩缩容、队列

我应该使用哪种架构模式?

模式 1:同步前置审核

最适合:低流量、高风险内容(支付、法律)

User Input → API Call → Moderation → Decision → Response
                ↓
           (Blocking call)
// Simple synchronous flow
async function createPost(content) {
  // Check moderation before saving
  const result = await checkModeration(content);

  if (result.has_violations) {
    throw new ModerationError(result.message);
  }

  // Safe to publish
  return await savePost(content);
}

优点:简单,可即时反馈 缺点:增加延迟,限制吞吐量

模式 2:异步后置审核

最适合:高流量、低风险内容(评论、消息)

User Input → Save (Pending) → Publish → Background Check → Action
                                              ↓
                                    ┌────────┴────────┐
                                    ↓                 ↓
                                  Safe            Violation
                                    ↓                 ↓
                                  Keep             Remove
// Async flow with queue
async function createPost(content) {
  // Save immediately with pending status
  const post = await savePost(content, { status: 'pending' });

  // Queue for async moderation
  await moderationQueue.add('check-content', {
    postId: post.id,
    content: content
  });

  return post;
}

// Background worker
moderationQueue.process('check-content', async (job) => {
  const result = await checkModeration(job.data.content);

  if (result.has_violations) {
    await removePost(job.data.postId);
    await notifyUser(job.data.postId, 'content_removed');
  } else {
    await updatePost(job.data.postId, { status: 'approved' });
  }
});

优点:非阻塞,可应对高并发 缺点:有害内容会短暂可见

模式 3:分层审核

最适合:兼顾效果与成本优化的方案

User Input → Quick Check → High Confidence? ──Yes──► Auto-decision
                              │
                              No
                              ↓
                         Full Analysis ──► Decision
async function tieredModeration(content) {
  // Tier 1: Fast local checks (regex, blocklists)
  const localResult = await quickLocalCheck(content);
  if (localResult.definiteViolation) {
    return { action: 'block', source: 'local' };
  }

  // Tier 2: Cached API results
  const cacheKey = hashContent(content);
  const cached = await cache.get(cacheKey);
  if (cached) {
    return { action: cached.action, source: 'cache' };
  }

  // Tier 3: Full API check
  const apiResult = await checkModeration(content);
  await cache.set(cacheKey, apiResult, TTL);

  return { action: determineAction(apiResult), source: 'api' };
}

模式 4:扇出聚合

最适合:多种检查类型且需要独立扩展的场景

                    ┌──► Text Check ──┐
                    │                  │
User Input ──► Router ──► Image Check ──► Aggregator ──► Decision
                    │                  │
                    └──► Link Check ──┘
async function parallelModeration(content) {
  const checks = [];

  if (content.text) {
    checks.push(checkText(content.text));
  }

  if (content.images?.length) {
    checks.push(checkImages(content.images));
  }

  if (content.links?.length) {
    checks.push(checkLinks(content.links));
  }

  // Run all checks in parallel
  const results = await Promise.all(checks);

  // Aggregate results
  return aggregateResults(results);
}

function aggregateResults(results) {
  // Most severe result wins
  const hasViolation = results.some(r => r.has_violations);
  const maxScore = Math.max(...results.map(r => r.max_score || 0));

  return {
    has_violations: hasViolation,
    max_score: maxScore,
    details: results
  };
}

缓存策略

基于内容的缓存

为相同内容缓存结果:

const cache = new Redis();

async function checkWithCache(content) {
  const hash = crypto.createHash('sha256')
    .update(content.text || '')
    .update(JSON.stringify(content.images || []))
    .digest('hex');

  // Check cache first
  const cached = await cache.get(`mod:${hash}`);
  if (cached) {
    return { ...JSON.parse(cached), cached: true };
  }

  // API call
  const result = await callModerationAPI(content);

  // Cache for 24 hours
  await cache.setex(`mod:${hash}`, 86400, JSON.stringify(result));

  return { ...result, cached: false };
}

基于用户的缓存

为屡次违规者缓存判定:

async function checkUserHistory(userId) {
  const recentViolations = await cache.get(`violations:${userId}`);

  if (recentViolations > 5) {
    // Flag for enhanced scrutiny
    return { enhancedModeration: true };
  }

  return { enhancedModeration: false };
}

async function recordViolation(userId) {
  await cache.incr(`violations:${userId}`);
  await cache.expire(`violations:${userId}`, 86400 * 7); // 7 days
}

队列管理

优先级队列

按不同优先级处理不同类型的内容:

const queues = {
  critical: new Queue('moderation-critical'),  // Reports, appeals
  high: new Queue('moderation-high'),          // Public posts
  normal: new Queue('moderation-normal'),      // Comments, DMs
  low: new Queue('moderation-low')             // Profile updates
};

async function queueForModeration(content, priority = 'normal') {
  const queue = queues[priority] || queues.normal;

  return queue.add('check', {
    contentId: content.id,
    type: content.type,
    data: content
  }, {
    priority: getPriorityNumber(priority),
    timeout: getTimeout(priority)
  });
}

遵守速率限制

每个 API key 都有每分钟请求数限制;超出限制后,API 会以“rate limit exceeded”错误拒绝调用。请在你这一侧进行限流,确保请求量低于该限制,并避免把配额浪费在重复调用上。将限流器的每分钟上限设置为与你的 key 配置的 RPM 一致:

const Bottleneck = require('bottleneck');

const limiter = new Bottleneck({
  maxConcurrent: 50,                   // Cap parallel requests
  minTime: 20,                         // Min 20ms between requests
  reservoir: YOUR_KEY_RPM,             // Match your API key's per-minute limit
  reservoirRefreshAmount: YOUR_KEY_RPM,
  reservoirRefreshInterval: 60 * 1000  // Refill each minute
});

async function rateLimitedCheck(content) {
  return limiter.schedule(() => checkModeration(content));
}

配额与速率限制是分开的:每次调用都会计入你方案的每月 API 请求额度,响应中的 usage 对象会报告 api_requests_usedapi_requests_limitapi_requests_remaining,方便你监控剩余额度。

死信队列

处理失败的审核尝试:

moderationQueue.process('check-content', async (job) => {
  try {
    const result = await checkModeration(job.data.content);
    await applyDecision(job.data.contentId, result);
  } catch (error) {
    if (job.attemptsMade < 3) {
      throw error; // Retry
    }

    // Move to dead letter queue after 3 attempts
    await deadLetterQueue.add('failed-moderation', {
      ...job.data,
      error: error.message,
      attempts: job.attemptsMade
    });

    // Apply safe default (hold for review)
    await holdForManualReview(job.data.contentId);
  }
});

成本优化

智能路由

仅在必要时调用付费 API:

async function smartModeration(content) {
  // Step 1: Free local checks
  const localResult = runLocalFilters(content);
  if (localResult.isDefinitelySpam) {
    return { action: 'block', cost: 0 };
  }

  // Step 2: Check cache (free)
  const cached = await getCachedResult(content);
  if (cached) {
    return { action: cached.action, cost: 0 };
  }

  // Step 3: Risk-based API call
  const riskScore = calculateRiskScore(content, localResult);

  if (riskScore < 0.2) {
    // Low risk: approve without API call
    return { action: 'approve', cost: 0 };
  }

  // Step 4: API call for uncertain content
  const apiResult = await checkModeration(content);
  return {
    action: determineAction(apiResult),
    cost: calculateApiCost(content)
  };
}

并发控制

/api/v2/check 端点每次调用会为一条提交内容评分。要“批量”处理任务,请在你的 worker 中对项目分组,并在有界池的限制下并发分发,而不是期待一次请求能处理多个项目。包含多个媒体 URL 的检查本身仍是一次调用,并会覆盖所有这些媒体——一次包含大量图片的调用比多次单图片调用更便宜。

// Bounded concurrency: many items, capped parallel calls.
async function processBatch(items, concurrency = 10) {
  const results = [];
  for (let i = 0; i < items.length; i += concurrency) {
    const slice = items.slice(i, i + concurrency);
    const settled = await Promise.allSettled(
      slice.map(item => checkModeration(item.content))
    );
    results.push(...settled);
  }
  return results;
}

单个请求最多可携带 10 个图片 URL、5 个 GIF URL、3 个视频 URL、5 个文档 URL 和 20 个链接,并可附带最多 10,000 个字符的文本——因此应将一次提交中的媒体合并到一次调用中,而不是拆分处理。

监控与告警

关键指标仪表盘

const metrics = {
  // Volume metrics
  total_requests: new Counter('moderation_requests_total'),
  requests_per_second: new Gauge('moderation_rps'),

  // Latency metrics
  latency_p50: new Histogram('moderation_latency_p50'),
  latency_p99: new Histogram('moderation_latency_p99'),

  // Queue metrics
  queue_depth: new Gauge('moderation_queue_depth'),
  processing_time: new Histogram('moderation_processing_time'),

  // Cost metrics
  api_calls: new Counter('moderation_api_calls'),
  estimated_cost: new Counter('moderation_estimated_cost'),

  // Accuracy metrics
  violations_detected: new Counter('moderation_violations'),
  false_positives: new Counter('moderation_false_positives')
};

告警规则

# Alert on queue backup
- alert: ModerationQueueBackup
  expr: moderation_queue_depth > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: Moderation queue backing up

# Alert on high latency
- alert: ModerationLatencyHigh
  expr: moderation_latency_p99 > 5000
  for: 2m
  labels:
    severity: critical
  annotations:
    summary: Moderation latency exceeding 5 seconds

# Alert on high error rate
- alert: ModerationErrorRate
  expr: rate(moderation_errors_total[5m]) / rate(moderation_requests_total[5m]) > 0.01
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: Moderation error rate above 1%

基础设施扩展

水平扩展

# Kubernetes HPA for moderation workers
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: moderation-worker
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: moderation-worker
  minReplicas: 3
  maxReplicas: 50
  metrics:
  - type: External
    external:
      metric:
        name: queue_depth
      target:
        type: AverageValue
        averageValue: 100

降低全球用户的延迟

Discuse API 通过单一基础 URL 访问:https://api.discuse.com。为了让全球流量保持低延迟,请将你的审核 worker 部署在靠近用户的位置,并由它们调用 API,而不是在远距离区域的面向用户请求路径中同步调用。将这一做法与上文的异步后审核和缓存模式结合起来,这样即使往返较慢,也不会阻塞用户。

最佳实践总结

  1. 积极缓存:相同内容会返回缓存结果。API 还会在短时间窗口内对同一项目中的相同内容进行去重,并返回 cached: true
  2. 使用队列:面向高并发采用异步处理。
  3. 实现分层:本地 → 缓存 → API。
  4. 全面监控:队列深度、延迟,以及相对于配额的使用量。
  5. 为失败做好准备:死信队列和安全的降级方案。
  6. 横向扩展:增加更多 worker,而不是使用更大的机器。
  7. 优化成本:智能路由、减少冗余调用,并在每次请求中合并媒体。

后续步骤

Discuse 团队 撰写 · 最后更新于 June 2026

相关文章

AI 内容审核指南

机器学习如何驱动现代内容审核系统

配置检测阈值

根据你的使用场景平衡误报和漏报