Data Flow

How emails are processed through the SPOT Platform.

Overview

SPOT uses asynchronous job processing with RabbitMQ for scalability and reliability.

Email Analysis Flow

1. Job Submission

Client -> API Gateway -> RabbitMQ -> Analyzer Orchestrator

Flow:

  1. Client sends POST /api/v1/analyze request
  2. API Gateway validates email format (spot-sdk Email model)
  3. API Gateway generates job ID
  4. API Gateway publishes to spot.analysis exchange with email.analyze routing key
  5. API Gateway returns job_id to client immediately
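The submission steps above can be sketched as a small helper that builds the message the gateway publishes. The exchange and routing key come from the flow; the envelope fields and function name are illustrative assumptions, not the gateway's actual implementation.

```python
import json
import uuid

def build_analysis_job(email: dict) -> tuple[str, str, str, bytes]:
    """Build the message published for a new analysis job.

    Returns (job_id, exchange, routing_key, body). The envelope
    shape here is an assumed sketch, not the real wire format.
    """
    job_id = str(uuid.uuid4())  # step 3: gateway generates the job ID
    body = json.dumps({"job_id": job_id, "email": email}).encode()
    return job_id, "spot.analysis", "email.analyze", body

# Steps 4-5: the gateway would publish (exchange, routing_key, body)
# to RabbitMQ and immediately return job_id to the client.
job_id, exchange, routing_key, body = build_analysis_job(
    {"headers": {"From": "a@example.com"}, "body_text": "hi"}
)
```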

2. Job Processing

Analyzer Orchestrator -> Workflow Engine -> Analyzers -> PostgreSQL

Flow:

  1. Orchestrator consumes from orchestrator.analysis.requests queue
  2. Loads the workflow definition
  3. Executes the parallel analysis stage:
     - Sends POST /internal/analyze to each analyzer
     - Waits for responses (with timeout and retries)
  4. Executes the decision stage:
     - Aggregates analyzer results
     - Calculates the threat level
     - Determines the recommended action
  5. Stores the result in PostgreSQL
  6. Publishes completion to the spot.results exchange
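The fan-out step of the flow above can be sketched with a thread pool standing in for the concurrent HTTP calls. The analyzer callables, timeout value, and function name are illustrative assumptions; only the min_success rule comes from the workflow definition.

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel_stage(analyzers: dict, email: dict, min_success: int) -> dict:
    """Run all analyzers concurrently and enforce the min_success threshold.

    `analyzers` maps analyzer name -> callable standing in for the
    HTTP POST /internal/analyze call (an illustrative stand-in).
    """
    results = {}
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(fn, email) for name, fn in analyzers.items()}
        for name, fut in futures.items():
            try:
                results[name] = fut.result(timeout=30)  # assumed per-analyzer timeout
            except Exception:
                pass  # a failed analyzer is simply absent from the results
    if len(results) < min_success:
        raise RuntimeError("too few analyzers succeeded; job fails")
    return results
```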

3. Status Query

Client -> API Gateway -> RabbitMQ RPC -> Analyzer Orchestrator

Flow:

  1. Client sends GET /api/v1/analyze/{job_id} request
  2. API Gateway publishes RPC request to spot.status exchange
  3. Orchestrator consumes from job.status.requests queue
  4. Orchestrator checks JobManager (in-memory cache)
  5. Falls back to PostgreSQL if not in memory
  6. Response sent via RPC reply queue
  7. API Gateway returns status to client
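Steps 4-5 of the status query (in-memory check, then database fallback) amount to a two-tier lookup. A minimal sketch, in which the dict cache and `db_lookup` callable are stand-ins for the JobManager and a PostgreSQL query:

```python
def get_job_status(job_id: str, cache: dict, db_lookup) -> dict:
    """Resolve a status query: in-memory JobManager first, PostgreSQL second.

    `cache` and `db_lookup` are illustrative stand-ins; the
    "not_found" fallback shape is an assumption.
    """
    status = cache.get(job_id)      # step 4: check the in-memory cache
    if status is None:
        status = db_lookup(job_id)  # step 5: fall back to the database
    return status or {"job_id": job_id, "state": "not_found"}
```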

Sequence Diagram

Client          API Gateway        RabbitMQ         Orchestrator     Analyzers        PostgreSQL
  |                 |                 |                 |                |                |
  |--Submit Email-->|                 |                 |                |                |
  |                 |--Publish------->|                 |                |                |
  |<--Return ID-----|                 |                 |                |                |
  |                 |                 |--Consume------->|                |                |
  |                 |                 |                 |--HTTP POST---->|                |
  |                 |                 |                 |<--Response-----|                |
  |                 |                 |                 |--Store Result----------------->|
  |                 |                 |<--Publish-------|                |                |
  |                 |                 |                 |                |                |
  |--Check Status-->|                 |                 |                |                |
  |                 |--RPC Request--->|                 |                |                |
  |                 |                 |--Consume------->|                |                |
  |                 |                 |<--RPC Reply-----|                |                |
  |<--Return Result-|                 |                 |                |                |

Workflow Execution

Default Workflow

id: default-workflow
name: Default Analysis
stages:
  - id: parallel-analysis
    type: parallel
    analyzers:
      - nlp-analyzer
      - llm-analyzer
      - misp-analyzer
    min_success: 2

  - id: decision
    type: decision
    aggregator: weighted-average

Execution:

  1. Parallel Analysis Stage
     - Runs all analyzers simultaneously via HTTP
     - Waits for at least min_success analyzers to succeed
     - Continues if the threshold is met
     - Fails the job if too few succeed
  2. Decision Stage
     - Aggregates results using a weighted average
     - Determines the overall threat level
     - Recommends an action based on confidence
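One plausible reading of the weighted-average decision stage: map each analyzer's threat level onto a 0-1 scale, average with per-analyzer weights, and map back. The weight values, the 0.5 action threshold, and the action names are assumptions for illustration; only "weighted average over threat levels" comes from the workflow.

```python
THREAT_LEVELS = ["none", "low", "medium", "high", "critical"]

def decide(results: dict, weights: dict) -> dict:
    """Aggregate analyzer results with a weighted average (sketch).

    `results` maps analyzer name -> response dict with a "threat_level"
    key; `weights` maps analyzer name -> weight. Thresholds are assumed.
    """
    total = sum(weights[name] for name in results)
    score = sum(
        weights[name] * THREAT_LEVELS.index(r["threat_level"]) / (len(THREAT_LEVELS) - 1)
        for name, r in results.items()
    ) / total
    level = THREAT_LEVELS[round(score * (len(THREAT_LEVELS) - 1))]
    action = "quarantine" if score >= 0.5 else "deliver"
    return {"threat_level": level, "recommended_action": action, "score": score}
```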

Analyzer Communication

Each analyzer receives:

{
  "email": {
    "headers": {...},
    "body_text": "...",
    "body_html": "..."
  }
}

Each analyzer returns:

{
  "is_phishing": boolean,
  "threat_level": "none|low|medium|high|critical",
  "confidence": 0.0-1.0,
  "indicators": [
    {
      "type": "domain_spoofing",
      "value": "microsft.com vs microsoft.com",
      "confidence": 0.95
    }
  ]
}
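The orchestrator presumably validates responses against this contract before aggregating. A sketch of such a check; the specific rules are an illustrative reading of the contract above, not a normative schema.

```python
ALLOWED_LEVELS = {"none", "low", "medium", "high", "critical"}

def validate_analyzer_response(resp: dict) -> dict:
    """Check an analyzer response against the documented contract.

    Raises ValueError on violation. Field names come from the
    contract; the validation logic itself is an assumption.
    """
    if not isinstance(resp.get("is_phishing"), bool):
        raise ValueError("is_phishing must be a boolean")
    if resp.get("threat_level") not in ALLOWED_LEVELS:
        raise ValueError("unknown threat_level")
    if not 0.0 <= resp.get("confidence", -1) <= 1.0:
        raise ValueError("confidence must be in [0.0, 1.0]")
    for ind in resp.get("indicators", []):
        if "type" not in ind or not 0.0 <= ind.get("confidence", -1) <= 1.0:
            raise ValueError("malformed indicator")
    return resp
```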

Error Handling

Analyzer Failures

If an analyzer fails:

  1. Orchestrator logs error
  2. Retries with exponential backoff (max 3 attempts)
  3. Continues with remaining analyzers
  4. Checks if min_success requirement met
  5. Fails job if threshold not met
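The retry behavior above (exponential backoff, max 3 attempts) can be sketched as a small wrapper around the analyzer call; the base delay value and function name are assumptions.

```python
import time

def call_with_retries(fn, max_attempts: int = 3, base_delay: float = 0.5):
    """Retry a failing analyzer call with exponential backoff.

    Gives up after max_attempts (3, per the policy above), re-raising
    so the orchestrator can continue with the remaining analyzers.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # final attempt failed; propagate to the caller
            time.sleep(base_delay * 2 ** attempt)  # e.g. 0.5s, then 1s
```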

RabbitMQ Failures

If RabbitMQ is unavailable:

  1. API Gateway returns 503 Service Unavailable
  2. Client should retry with exponential backoff

Database Failures

If PostgreSQL is unavailable:

  1. New jobs still accepted (queued in RabbitMQ)
  2. Status queries fail for completed jobs
  3. Jobs remain in queue until database recovers

Monitoring Points

Queue Metrics

  • orchestrator.analysis.requests queue depth
  • Consumer lag time
  • Message processing rate

Processing Metrics

  • Job completion time
  • Analyzer success rate
  • Analyzer response time

Database Metrics

  • Query execution time
  • Connection pool usage

Data Retention

Analysis results:

  • Kept for 90 days by default
  • Configurable via RETENTION_DAYS

Queue messages:

  • TTL: 24 hours
  • Dead letter queue for failed messages
  • Retry limit: 3 attempts
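In standard RabbitMQ terms, the message TTL and dead-lettering above map onto queue declaration arguments. A sketch of those arguments; the dead-letter exchange name spot.dlx is an assumption, not taken from the platform's configuration.

```python
# Queue arguments implementing the retention policy above, expressed as
# standard RabbitMQ x-arguments (dead-letter exchange name is assumed):
ANALYSIS_QUEUE_ARGS = {
    "x-message-ttl": 24 * 60 * 60 * 1000,  # 24-hour TTL, in milliseconds
    "x-dead-letter-exchange": "spot.dlx",  # where expired/rejected messages go
}
```

A client library such as pika would pass this dict as the `arguments` parameter when declaring the queue.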

Security Considerations

Data in Transit

  • API: HTTPS with TLS 1.2+
  • RabbitMQ: Internal network only
  • PostgreSQL: Internal network only

Data at Rest

  • Database: Encrypted volumes recommended
  • Queue: Persistent messages on disk

Data Privacy

  • Email content not logged
  • PII redacted from logs
  • Analysis results include only metadata