# Cloudflare Queues Architecture
This document describes the architecture of the Cloudflare Queues integration for asynchronous filter list compilation.
## Queue Flow Diagram
```mermaid
graph TB
subgraph "Client Layer"
CLIENT[Client/Browser]
end
subgraph "API Endpoints"
ASYNC_EP[POST /compile/async]
BATCH_EP[POST /compile/batch/async]
SYNC_EP[POST /compile]
end
subgraph "Queue Producer"
ENQUEUE[Queue Message Producer]
GEN_ID[Generate Request ID]
CREATE_MSG[Create Queue Message]
end
subgraph "Cloudflare Queue"
QUEUE[(adblock-compiler-worker-queue)]
QUEUE_BATCH["Message Batching<br/>max_batch_size: 10<br/>max_batch_timeout: 5s"]
end
subgraph "Queue Consumer"
CONSUMER[Queue Consumer Handler]
DISPATCHER[Message Type Dispatcher]
COMPILE_PROC[Process Compile Message]
BATCH_PROC[Process Batch Message]
CACHE_PROC[Process Cache Warm Message]
CHUNK_PROC["Chunk Processor<br/>Concurrency: 3"]
end
subgraph "Compilation Layer"
COMPILER[WorkerCompiler]
TRACING[Tracing Context]
DIAGNOSTICS[Diagnostics Collector]
end
subgraph "Storage Layer"
KV_CACHE[("KV: COMPILATION_CACHE<br/>TTL: 1 hour")]
KV_METRICS[("KV: METRICS<br/>Aggregation: 5 min")]
COMPRESS["Gzip Compression<br/>~70-80% reduction"]
end
subgraph "Monitoring Layer"
TAIL[Tail Worker]
CONSOLE[Console Logs]
ANALYTICS[Analytics Engine]
end
CLIENT -->|POST request| ASYNC_EP
CLIENT -->|POST request| BATCH_EP
CLIENT -->|POST request, returns cached result| SYNC_EP
ASYNC_EP -->|Single compilation| ENQUEUE
BATCH_EP -->|Batch compilations| ENQUEUE
ENQUEUE --> GEN_ID
GEN_ID --> CREATE_MSG
CREATE_MSG -->|send()| QUEUE
QUEUE --> QUEUE_BATCH
QUEUE_BATCH -->|Batched messages| CONSUMER
CONSUMER --> DISPATCHER
DISPATCHER -->|type: 'compile'| COMPILE_PROC
DISPATCHER -->|type: 'batch-compile'| BATCH_PROC
DISPATCHER -->|type: 'cache-warm'| CACHE_PROC
COMPILE_PROC --> COMPILER
BATCH_PROC --> CHUNK_PROC
CACHE_PROC --> CHUNK_PROC
CHUNK_PROC --> COMPILE_PROC
COMPILER --> TRACING
COMPILER --> DIAGNOSTICS
COMPILE_PROC --> COMPRESS
COMPRESS --> KV_CACHE
COMPILE_PROC -->|Metrics| KV_METRICS
COMPILE_PROC -->|Diagnostics| TAIL
COMPILE_PROC -->|Logs| CONSOLE
SYNC_EP -.->|Read cache| KV_CACHE
DIAGNOSTICS -.->|Events| TAIL
CONSOLE -.->|Stream| TAIL
TAIL -.->|Process logs| ANALYTICS
style QUEUE fill:#f9f,stroke:#333,stroke-width:4px
style CONSUMER fill:#bbf,stroke:#333,stroke-width:4px
style KV_CACHE fill:#bfb,stroke:#333,stroke-width:2px
style COMPILER fill:#fbb,stroke:#333,stroke-width:2px
```
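Three message types flow through the queue: `compile`, `batch-compile`, and `cache-warm`. A minimal sketch of their shapes in TypeScript; the `type` discriminants come from the diagram above, while the remaining field names are assumptions for illustration:

```typescript
// Assumed message shapes; only the `type` discriminants are taken from the
// diagrams, the other fields are illustrative.
interface FilterConfiguration {
    name: string;
    sources: Array<{ source: string }>;
}

interface CompileQueueMessage {
    type: 'compile';
    requestId: string;
    configuration: FilterConfiguration;
}

interface BatchCompileQueueMessage {
    type: 'batch-compile';
    requestId: string;
    requests: Array<{ id: string; configuration: FilterConfiguration }>;
}

interface CacheWarmQueueMessage {
    type: 'cache-warm';
    requestId: string;
    configurations: FilterConfiguration[];
}

type QueueMessageBody =
    | CompileQueueMessage
    | BatchCompileQueueMessage
    | CacheWarmQueueMessage;
```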
## Message Types Flow
```mermaid
sequenceDiagram
participant C as Client
participant API as API Endpoint
participant Q as Queue
participant QC as Queue Consumer
participant Comp as Compiler
participant Cache as KV Cache
Note over C,Cache: Single Compile Flow
C->>API: POST /compile/async
API->>API: Generate Request ID
API->>Q: Send CompileQueueMessage
API-->>C: 202 Accepted (requestId)
Q->>QC: Deliver message batch
QC->>QC: Dispatch by type
QC->>Comp: Execute compilation
Comp->>Comp: Apply transformations
Comp-->>QC: Compiled rules + metrics
QC->>Cache: Store compressed result
QC->>Q: ACK message
Note over C,Cache: Batch Compile Flow
C->>API: POST /compile/batch/async
API->>API: Generate Request ID
API->>Q: Send BatchCompileQueueMessage
API-->>C: 202 Accepted (requestId, batchSize)
Q->>QC: Deliver message batch
QC->>QC: Process in chunks (3)
loop Each chunk
QC->>Comp: Compile (parallel up to 3)
Comp-->>QC: Results
QC->>Cache: Store results
end
QC->>Q: ACK message
Note over C,Cache: Cache Result Retrieval
C->>API: POST /compile (with config)
API->>Cache: Check for cached result
Cache-->>API: Compressed result
API->>API: Decompress
API-->>C: 200 OK (rules, cached: true)
```
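The producer side of the first flow can be sketched as follows. This is illustrative only, assuming the `ADBLOCK_COMPILER_QUEUE` binding from the wrangler.toml in the Configuration section and the message shapes sketched above; the request ID format mirrors the usage examples:

```typescript
interface Env {
    ADBLOCK_COMPILER_QUEUE: Queue<QueueMessageBody>;
}

// Sketch of POST /compile/async: enqueue, then return 202 immediately.
async function handleCompileAsync(request: Request, env: Env): Promise<Response> {
    const { configuration } = (await request.json()) as { configuration: FilterConfiguration };

    // ID format mirrors the usage examples: compile-<timestamp>-<suffix>.
    const requestId = `compile-${Date.now()}-${crypto.randomUUID().slice(0, 6)}`;

    // send() only enqueues; compilation happens later in the consumer.
    await env.ADBLOCK_COMPILER_QUEUE.send({ type: 'compile', requestId, configuration });

    return Response.json(
        { success: true, message: 'Compilation job queued successfully', requestId },
        { status: 202 },
    );
}
```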
## Processing Architecture
```mermaid
flowchart TD
START[Queue Message Received] --> VALIDATE{"Validate<br/>Message Type"}
VALIDATE -->|compile| SINGLE[Single Compilation]
VALIDATE -->|batch-compile| BATCH[Batch Compilation]
VALIDATE -->|cache-warm| WARM[Cache Warming]
VALIDATE -->|unknown| UNKNOWN[Unknown Type]
SINGLE --> TRACE1[Create Tracing Context]
TRACE1 --> COMP1[Run Compilation]
COMP1 --> CACHE1{Cacheable?}
CACHE1 -->|Yes| COMPRESS1[Compress Result]
CACHE1 -->|No| ACK1[ACK Message]
COMPRESS1 --> STORE1[Store in KV]
STORE1 --> ACK1
BATCH --> CHUNK["Split into Chunks<br/>Size: 3"]
CHUNK --> PARALLEL[Process Chunks in Parallel]
PARALLEL --> COMP2[Compile Each Item]
COMP2 --> STATS{"All<br/>Successful?"}
STATS -->|Yes| ACK2[ACK Message]
STATS -->|No| RETRY2[RETRY Message]
WARM --> CHUNK2["Split into Chunks<br/>Size: 3"]
CHUNK2 --> PARALLEL2[Process Chunks in Parallel]
PARALLEL2 --> COMP3[Compile + Cache Each]
COMP3 --> STATS2{"All<br/>Successful?"}
STATS2 -->|Yes| ACK3[ACK Message]
STATS2 -->|No| RETRY3[RETRY Message]
UNKNOWN --> ACK_UNK["ACK to prevent<br/>infinite retries"]
ACK1 --> END[Processing Complete]
ACK2 --> END
ACK3 --> END
ACK_UNK --> END
RETRY2 --> RETRY_QUEUE[Back to Queue]
RETRY3 --> RETRY_QUEUE
RETRY_QUEUE -.->|Exponential backoff| START
style START fill:#e1f5e1
style END fill:#e1f5e1
style VALIDATE fill:#fff3cd
style RETRY_QUEUE fill:#f8d7da
```
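A sketch of the consumer's dispatch step, reusing `Env` and the message shapes from the sketches above; the per-type processors are hypothetical stand-ins for the real handlers. Note that unknown message types are acknowledged, matching the flowchart:

```typescript
// Hypothetical per-type processors; the real implementations live elsewhere.
declare function processCompile(msg: CompileQueueMessage, env: Env): Promise<void>;
declare function processBatch(msg: BatchCompileQueueMessage, env: Env): Promise<void>;
declare function processCacheWarm(msg: CacheWarmQueueMessage, env: Env): Promise<void>;

export default {
    async queue(batch: MessageBatch<QueueMessageBody>, env: Env): Promise<void> {
        for (const message of batch.messages) {
            try {
                switch (message.body.type) {
                    case 'compile':
                        await processCompile(message.body, env);
                        break;
                    case 'batch-compile':
                        await processBatch(message.body, env);
                        break;
                    case 'cache-warm':
                        await processCacheWarm(message.body, env);
                        break;
                    default:
                        // Unknown types fall through to ack() below so they
                        // are not retried forever.
                        console.warn('[QUEUE:HANDLER] Unknown message type');
                }
                message.ack();
            } catch (err) {
                console.error('[QUEUE:HANDLER] Processing failed', err);
                message.retry();
            }
        }
    },
};
```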
## Concurrency Control
```mermaid
graph LR
subgraph "Queue Consumer (Sequential)"
MSG1[Message 1] --> PROC1[Process]
PROC1 --> ACK1[ACK]
ACK1 --> MSG2[Message 2]
MSG2 --> PROC2[Process]
PROC2 --> ACK2[ACK]
end
subgraph "Batch Processing (Chunked)"
BATCH[10 Items]
BATCH --> CHUNK1["Chunk 1<br/>Items 1-3"]
BATCH --> CHUNK2["Chunk 2<br/>Items 4-6"]
BATCH --> CHUNK3["Chunk 3<br/>Items 7-9"]
BATCH --> CHUNK4["Chunk 4<br/>Item 10"]
CHUNK1 --> P1[Process in Parallel]
CHUNK2 --> P2[Process in Parallel]
CHUNK3 --> P3[Process in Parallel]
CHUNK4 --> P4[Process Sequentially]
end
style PROC1 fill:#bbf
style PROC2 fill:#bbf
style P1 fill:#bfb
style P2 fill:#bfb
style P3 fill:#bfb
style P4 fill:#bfb
```
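The right-hand subgraph, expressed as code: split the batch into fixed-size chunks and run each chunk's items in parallel, while the chunks themselves run sequentially. A sketch using the chunk size of 3 from the diagrams:

```typescript
const CHUNK_SIZE = 3;

// Split an array into groups of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
    const groups: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        groups.push(items.slice(i, i + size));
    }
    return groups;
}

// At most CHUNK_SIZE compilations run at once: items within a chunk run in
// parallel, and the next chunk starts only after the previous one settles.
async function processInChunks<T>(items: T[], work: (item: T) => Promise<void>): Promise<void> {
    for (const group of chunk(items, CHUNK_SIZE)) {
        await Promise.all(group.map(work));
    }
}
```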
## Error Handling and Retry Logic
```mermaid
stateDiagram-v2
[*] --> Queued: Message sent
Queued --> Processing: Consumer picks up
Processing --> Success: Compilation OK
Processing --> Error: Compilation fails
Success --> Cached: Store in KV
Cached --> Acknowledged: ACK message
Acknowledged --> [*]
Error --> RetryLogic: Evaluate error
RetryLogic --> Retry1: Attempt 1
Retry1 --> Processing
Retry1 --> Retry2: Still failing
Retry2 --> Processing
Retry2 --> Retry3: Still failing
Retry3 --> Processing
Retry3 --> DeadLetter: Max retries
DeadLetter --> [*]: Manual intervention
note right of Processing
Log all operations
Track metrics
Emit diagnostics
end note
note right of RetryLogic
Exponential backoff
Automatic by Cloudflare
end note
```
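Cloudflare retries a message automatically when the handler throws without acking it. If an explicit backoff is preferred, `retry()` accepts a delay; a sketch using the message's delivery counter (the 60-second cap is an assumption for illustration):

```typescript
// Explicit exponential backoff on failure; `attempts` starts at 1 on the
// first delivery, so delays grow 2s, 4s, 8s, ... capped at 60s (assumed cap).
function retryWithBackoff(message: Message<QueueMessageBody>): void {
    const delaySeconds = Math.min(2 ** message.attempts, 60);
    message.retry({ delaySeconds });
}
```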
## Data Flow and Caching Strategy
```mermaid
flowchart LR
subgraph "Request Flow"
REQ[Client Request] --> CHECK{"Cache<br/>Exists?"}
CHECK -->|Yes| RETURN[Return Cached]
CHECK -->|No| QUEUE_IT[Queue for Async]
end
subgraph "Queue Processing"
QUEUE_IT --> PROCESS[Compile]
PROCESS --> COMPRESS[Compress]
COMPRESS --> STORE[Store in Cache]
end
subgraph "Cache Layer"
STORE --> KV[KV Namespace]
KV -->|TTL: 1 hour| EXPIRE[Auto Expire]
KV -->|Hit| RETURN
end
subgraph "Metrics Layer"
PROCESS --> METRICS[Record Metrics]
METRICS --> WINDOW[5-min Windows]
WINDOW --> AGGREGATE[Aggregate Stats]
end
style CHECK fill:#fff3cd
style KV fill:#bfb
style COMPRESS fill:#bbf
```
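A sketch of the cache write path, assuming a `COMPILATION_CACHE` KV binding: the key is a SHA-256 hash of the serialized configuration, and the body is gzipped with the standard `CompressionStream` before the KV write with a one-hour TTL:

```typescript
// Derive a stable cache key from the configuration. Note: JSON.stringify is
// key-order-sensitive, so a canonical serialization is assumed here.
async function cacheKeyFor(configuration: FilterConfiguration): Promise<string> {
    const bytes = new TextEncoder().encode(JSON.stringify(configuration));
    const digest = await crypto.subtle.digest('SHA-256', bytes);
    return [...new Uint8Array(digest)].map((b) => b.toString(16).padStart(2, '0')).join('');
}

// Gzip the compiled rules and store them with a 1-hour TTL.
async function storeCompiled(cache: KVNamespace, key: string, rules: string): Promise<void> {
    const gzipped = new Response(rules).body!.pipeThrough(new CompressionStream('gzip'));
    const body = await new Response(gzipped).arrayBuffer();
    await cache.put(key, body, { expirationTtl: 3600 });
}
```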
## Monitoring and Observability
```mermaid
graph TB
subgraph "Logging Strategy"
QUEUE_LOG[Queue Handler Logs]
COMPILE_LOG[Compilation Logs]
CACHE_LOG[Cache Operation Logs]
ERROR_LOG[Error Logs]
end
subgraph "Log Prefixes"
PREFIX1["[QUEUE:HANDLER]"]
PREFIX2["[QUEUE:COMPILE]"]
PREFIX3["[QUEUE:BATCH]"]
PREFIX4["[QUEUE:CACHE-WARM]"]
PREFIX5["[QUEUE:CHUNKS]"]
PREFIX6["[API:ASYNC]"]
PREFIX7["[API:BATCH-ASYNC]"]
end
subgraph "Metrics Tracked"
M1[Processing Time]
M2[Success/Failure Ratio]
M3[Compression Ratio]
M4[Queue Depth]
M5[Cache Hit Rate]
M6[Batch Size Stats]
end
subgraph "Diagnostics"
D1[Source Download Events]
D2[Transformation Events]
D3[Validation Events]
D4[Performance Events]
end
QUEUE_LOG --> PREFIX1
COMPILE_LOG --> PREFIX2
CACHE_LOG --> PREFIX4
COMPILE_LOG --> M1
COMPILE_LOG --> M2
COMPILE_LOG --> M3
D1 --> TAIL[Tail Worker]
D2 --> TAIL
D3 --> TAIL
D4 --> TAIL
M1 --> ANALYTICS[Analytics Engine]
M2 --> ANALYTICS
M5 --> ANALYTICS
style TAIL fill:#f9f
style ANALYTICS fill:#bbf
```
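The prefix set lends itself to a small structured-logging helper; the prefixes come from the diagram, while the helper itself is hypothetical:

```typescript
type LogPrefix =
    | '[QUEUE:HANDLER]' | '[QUEUE:COMPILE]' | '[QUEUE:BATCH]'
    | '[QUEUE:CACHE-WARM]' | '[QUEUE:CHUNKS]'
    | '[API:ASYNC]' | '[API:BATCH-ASYNC]';

// Prefixed, structured console logging; the tail worker sees these lines
// and can parse the JSON payload.
function log(prefix: LogPrefix, message: string, fields: Record<string, unknown> = {}): void {
    console.log(`${prefix} ${message}`, JSON.stringify(fields));
}

// Example: log('[QUEUE:COMPILE]', 'compilation finished', { requestId: 'compile-1234567890-abc123', durationMs: 8200 });
```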
## Key Features
### 1. Asynchronous Processing
- Non-blocking API endpoints
- Immediate 202 Accepted response
- Background compilation via queue
### 2. Concurrency Control
- Sequential message processing to avoid overwhelming resources
- Chunked batch processing (max 3 parallel compilations per chunk)
- Configurable chunk size
### 3. Caching Strategy
- Gzip compression reduces storage by 70-80%
- 1-hour TTL for compiled results
- Cache key based on configuration hash
- No caching for pre-fetched content
### 4. Error Handling
- Automatic retry with exponential backoff
- Message acknowledgment on success
- Retry on failure
- Unknown message types acknowledged to prevent infinite loops
### 5. Monitoring and Diagnostics
- Structured console logging with prefixes
- Detailed metrics tracking
- Diagnostic events emitted to tail worker
- Performance timing at every stage
### 6. Scalability
- Queue buffers backlogs, absorbing traffic spikes without blocking clients
- No rate limiting on async endpoints
- Horizontal scaling via queue
- Batch processing for efficiency
## Configuration
### Queue Settings (wrangler.toml)
```toml
[[queues.producers]]
queue = "adblock-compiler-worker-queue"
binding = "ADBLOCK_COMPILER_QUEUE"

[[queues.consumers]]
queue = "adblock-compiler-worker-queue"
max_batch_size = 10                      # deliver up to 10 messages per invocation
max_batch_timeout = 5                    # seconds to wait before flushing a partial batch
dead_letter_queue = "dead-letter-queue"  # destination once retries are exhausted
```
### Resource Limits
- **Batch Size**: Up to 100 requests per batch (see the sketch below)
- **Chunk Size**: 3 concurrent compilations
- **Batch Timeout**: 5 seconds before a partial batch is delivered
- **Cache TTL**: 3600 seconds (1 hour)
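A hypothetical guard for the batch-size limit, sketched for the batch endpoint:

```typescript
// Assumed enforcement of the 100-request limit on /compile/batch/async.
const MAX_BATCH_REQUESTS = 100;

function validateBatchSize(requests: unknown[]): Response | null {
    if (requests.length > MAX_BATCH_REQUESTS) {
        return Response.json(
            { success: false, error: `Batch exceeds limit of ${MAX_BATCH_REQUESTS} requests` },
            { status: 400 },
        );
    }
    return null; // within limits
}
```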
### Performance Characteristics
- **Enqueue Time**: < 100ms
- **Processing Time**: 5-30 seconds per compilation (varies by filter list size)
- **Compression Ratio**: 70-80% reduction
- **Cache Hit Rate**: High for repeated configurations
## Usage Examples
### Single Async Compilation
```bash
curl -X POST https://worker.dev/compile/async \
-H "Content-Type: application/json" \
-d '{
"configuration": {
"name": "My Filter",
"sources": [{"source": "https://example.com/filters.txt"}]
}
}'
```
Response:
```json
{
"success": true,
"message": "Compilation job queued successfully",
"requestId": "compile-1234567890-abc123"
}
```
### Batch Async Compilation
```bash
curl -X POST https://worker.dev/compile/batch/async \
-H "Content-Type: application/json" \
-d '{
"requests": [
{
"id": "filter-1",
"configuration": {
"name": "Filter 1",
"sources": [{"source": "https://example.com/filter1.txt"}]
}
},
{
"id": "filter-2",
"configuration": {
"name": "Filter 2",
"sources": [{"source": "https://example.com/filter2.txt"}]
}
}
]
}'
```
Response:
```json
{
"success": true,
"message": "Batch of 2 compilation jobs queued successfully",
"requestId": "batch-1234567890-def456",
"batchSize": 2
}
```
## Troubleshooting
### Queue Not Processing
1. Check queue exists: `wrangler queues list`
2. Verify queue bindings in `wrangler.toml`
3. Check worker logs: `wrangler tail`
### Messages Failing
1. Check error logs for specific failure reasons
2. Verify source URLs are accessible
3. Check KV namespace bindings
### Performance Issues
1. Increase `max_batch_size` for higher throughput
2. Adjust chunk size for batch processing
3. Monitor queue depth and consumer lag
## Related Documentation
- [QUEUE_SUPPORT.md](./QUEUE_SUPPORT.md) - Detailed usage guide
- [IMPLEMENTATION_SUMMARY.md](./IMPLEMENTATION_SUMMARY.md) - Implementation details
- [Cloudflare Queues Documentation](https://developers.cloudflare.com/queues/)