Queue System
ClearSight uses BullMQ for all async job processing, backed by Redis.
Queues
Defined in src/modules/queue/queues.ts:
| Queue name | Processor | Concurrency | Job type |
|---|---|---|---|
crawl-discovery | crawl.processor.ts | 1 | CrawlDiscoveryJob |
page-scan | page-scan.processor.ts | WORKER_CONCURRENCY (default 3) | PageScanJob |
ai-enrichment | ai-enrichment.processor.ts | AI_CONCURRENCY (default 2) | AiEnrichmentJob |
Job type definitions
Defined in src/modules/queue/types.ts:
// CrawlDiscoveryJob
{
crawlId: string
siteId: string
rootUrl: string
maxPages?: number
}
// PageScanJob
{
scanId: string
url: string
crawlId?: string // present for crawl-initiated scans
siteId?: string
}
// AiEnrichmentJob
{
scanId: string
crawlId?: string
}Worker entry point
src/worker/index.ts registers all processors and handles graceful shutdown:
// Simplified
const crawlWorker = new Worker('crawl-discovery', crawlProcessor, { connection, concurrency: 1 })
const scanWorker = new Worker('page-scan', pageScanProcessor, { connection, concurrency: WORKER_CONCURRENCY })
const aiWorker = new Worker('ai-enrichment', aiEnrichmentProcessor, { connection, concurrency: AI_CONCURRENCY })
// Graceful shutdown on SIGTERM/SIGINT
process.on('SIGTERM', async () => {
await Promise.all([crawlWorker.close(), scanWorker.close(), aiWorker.close()])
process.exit(0)
})Job retry behavior
Failed jobs are not automatically retried by default. The job processors catch errors, update the scan/crawl status to failed, and re-throw so BullMQ marks the job as failed.
To add retries, pass { attempts: 3, backoff: { type: 'exponential' } } to the job options in the enqueue call.
Monitoring
Use Bull Board at http://localhost:3001 (or BULL_BOARD_PORT) to inspect queue state, view failed jobs, and manually retry them.
Next steps
Last updated on