
Background Jobs — Architecture

Audience: Architects, tech leads, senior engineers.


Context and Purpose

Swisper needs to perform work outside the interactive request-response cycle: fetching emails, syncing calendars, classifying messages, sending notifications, and maintaining system state. The Background Jobs system provides a standardized framework for these tasks.

The architectural goals are:

  • Uniform lifecycle — Every job follows the same init → execute → cleanup pattern with automatic correlation tracing
  • External scheduling — No internal scheduler; jobs are stateless executables triggered by external orchestration
  • Observability — Every job run is traceable via a unique correlation ID that flows through all logs, LLM calls, and database operations
  • Isolation — Each job type runs in its own process; failures in one job do not affect others

Architecture Overview

graph TD
    subgraph Scheduler["External Scheduler (Cron / K8s CronJob)"]
        CRON["Schedule Trigger"]
    end

    subgraph CLI["CLI Runner (main.py)"]
        PARSE["Parse job name"]
        INIT["Initialize config + LLM providers"]
        EXEC["Execute job"]
    end

    subgraph Framework["Job Framework"]
        REG["Job Registry (JOB_MAP)"]
        BASE["BaseJob ABC"]
    end

    subgraph Jobs["Job Implementations"]
        IE["IngestEmailsJob"]
        IC["IngestCalendarEventsJob"]
        CE["ClassifyEmailsJob"]
        DB["DailyBriefingJob"]
        IEN["SendImportantEmailNotificationsJob"]
        PMP["PreMeetingPrepJob"]
        CR["CommitmentReminderJob"]
        AR["AwaitingResponseNotificationJob"]
        FD["FactDecayBatchJob"]
        RM["RedisExpirationMonitoringJob"]
        TP["ThreemaPollingJob"]
    end

    subgraph Infra["Infrastructure"]
        REDIS["Redis"]
        PG["PostgreSQL"]
        LLM["LLM Providers"]
        SIGNALS["SignalsService"]
    end

    CRON --> PARSE --> REG --> INIT --> EXEC
    EXEC --> BASE
    BASE --> Jobs
    Jobs --> REDIS & PG & LLM & SIGNALS

Component Responsibilities

| Component | File | Responsibility |
| --- | --- | --- |
| BaseJob | jobs/base_job.py | ABC with run() (generates correlation ID, logs start/end/error, measures duration) and abstract execute() |
| Job Registry | jobs/job_registry.py | JOB_MAP dict: maps string job names to async runner functions |
| CLI Runner | jobs/main.py | Entry point: parses sys.argv[1] job name, calls initialize_configuration(), initializes LLM providers (Kvant, Azure, Vertex), runs the job via asyncio.run() |
| Prompt Templates | jobs/prompts/ | Markdown prompt files for LLM-powered jobs (notification personalization, email classification) |
| Notification Personalization | jobs/notification_personalisation.py | Loads user preferences (tone, language, detail level) for personalizing notification content |
| Contact Utils | jobs/contact_utils.py | Contact deduplication logic shared across ingestion jobs |

BaseJob Lifecycle

from abc import ABC, abstractmethod
from typing import Any


class BaseJob(ABC):
    def __init__(self, job_name: str): ...

    @abstractmethod
    async def execute(self) -> dict[str, Any] | None: ...

    async def run(self) -> dict[str, Any] | None:
        # 1. Generate correlation_id = f"job-{uuid4()}"
        # 2. set_correlation_id(correlation_id)
        # 3. Log job start with job_name and correlation_id
        # 4. Record start time
        # 5. await self.execute()
        # 6. Log success with duration
        # 7. Return result
        # on error: log the failure
        # finally: clear_correlation_id()
        ...

All external callers use run(), never execute() directly. This guarantees that every job run has a correlation ID, timing, and error handling.


Job Registry

| Job Name | Runner Function | Job Class |
| --- | --- | --- |
| ingest_emails | run_ingest_emails | IngestEmailsJob |
| ingest_calendar_events | run_ingest_calendar_events | IngestCalendarEventsJob |
| classify_emails | run_classify_emails | ClassifyEmailsJob |
| send_notifications | run_send_notifications | SendImportantEmailNotificationsJob |
| fact_decay | run_fact_decay | FactDecayBatchJob |
| redis_monitoring | run_redis_monitoring | RedisExpirationMonitoringJob |
| threema_polling | run_threema_polling | ThreemaPollingJob |
| daily_briefing | run_daily_briefing | DailyBriefingJob |
| pre_meeting_prep | run_pre_meeting_prep | PreMeetingPrepJob |
| commitment_reminders | run_commitment_reminders | CommitmentReminderJob |
| awaiting_response_notifications | run_awaiting_response_notifications | AwaitingResponseNotificationJob |

Per-Job Details

Data Ingestion Jobs

| Job | Key Behavior | Dependencies |
| --- | --- | --- |
| IngestEmailsJob | Iterates all users with email tokens. Per user: fetches emails via delta sync (Office365Provider / GmailProvider), detects reply chains, creates embeddings, indexes attachments. Uses CorrelationContext per user for token tracking. | EmailProviderFactory, SwisperLLMAdapter (embeddings), AttachmentIndexingService |
| IngestCalendarEventsJob | Iterates all users with email tokens. Per user: fetches calendar events, stores CalendarEvent + attendees, creates embeddings. | EmailProviderFactory, SwisperLLMAdapter (embeddings) |
| ClassifyEmailsJob | Loads unclassified emails, builds classification prompt (summary, action items, importance, labels, deadline, is_automated), calls LLM, updates email records. | SwisperLLMAdapter, EmailClassificationPromptBuilder |

Notification Jobs

| Job | Key Behavior | Dependencies |
| --- | --- | --- |
| DailyBriefingJob | Finds users whose local time is 7:00–8:59 AM. Generates a personalized morning briefing using LLM with calendar, email, and fact context. Delivers via SignalsService. | DailyBriefingService, SignalsService, SwisperLLMAdapter |
| SendImportantEmailNotificationsJob | Two modes: initial notifications for high-importance emails, and reminders for emails with approaching deadlines. Respects notification cooldowns. | SignalsService, notification_personalisation |
| PreMeetingPrepJob | Finds events starting within a configurable lookahead window (default 30 min). Generates meeting prep (attendee context, related emails, action items) via LLM. | MeetingPrepService, SignalsService, SwisperLLMAdapter |
| CommitmentReminderJob | Queries UserToDo items with deadlines in the next 24 hours. Generates personalized reminders via LLM. | ProductivityEmailService, SignalsService, SwisperLLMAdapter |
| AwaitingResponseNotificationJob | Finds sent emails that have not received a reply within a configurable period (e.g., 3 days). Generates follow-up reminders via LLM. | ProductivityEmailService, SignalsService, SwisperLLMAdapter |
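
Several of these jobs select their work with a time-window check. The sketch below shows two such checks under stated assumptions: the function names and constants are hypothetical, the user's timezone is passed in as a `tzinfo` object (in practice probably a `zoneinfo.ZoneInfo`), and the 9:00 upper bound is treated as exclusive so that 8:59 is the last matching minute.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo

BRIEFING_START = time(7, 0)
BRIEFING_END = time(9, 0)  # exclusive upper bound


def in_briefing_window(now_utc: datetime, user_tz: tzinfo) -> bool:
    """True if the user's local wall-clock time is within 7:00-8:59."""
    local_time = now_utc.astimezone(user_tz).time()
    return BRIEFING_START <= local_time < BRIEFING_END


def starts_within_lookahead(
    now_utc: datetime,
    event_start: datetime,
    lookahead: timedelta = timedelta(minutes=30),
) -> bool:
    """True if the event starts between now and now + lookahead."""
    return now_utc <= event_start <= now_utc + lookahead


# Example: 06:30 UTC is 07:30 for a user at UTC+1.
now = datetime(2024, 1, 15, 6, 30, tzinfo=timezone.utc)
is_briefing_time = in_briefing_window(now, timezone(timedelta(hours=1)))
```

Because the scheduler triggers the job externally, a window check like this is only reliable if the job runs at least once inside every user's window. The source does not state the schedule, nor how duplicate briefings within the same window are avoided.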

Maintenance Jobs

| Job | Key Behavior | Dependencies |
| --- | --- | --- |
| FactDecayBatchJob | Updates computed_relevance for all facts based on age and importance using the decay formula. | FactDecayBatchService |
| RedisExpirationMonitoringJob | Scans Redis keys nearing their TTL. Backs up critical LangGraph checkpoint state to PostgreSQL before expiry. | StatePersistenceManager |
| ThreemaPollingJob | Polls the Threema Gateway for pending registration tokens. When a token is found, activates the corresponding UserIntegration record. | ThreemaPollingService |

Correlation ID Tracing

Every job run generates a unique correlation ID of the form job-{uuid4()}. This ID is:

  1. Set in a ContextVar at the start of BaseJob.run()
  2. Automatically included in all log messages via get_correlated_logger()
  3. Passed to initialize_tracking() for token usage attribution
  4. Cleared in the finally block of BaseJob.run()

For multi-user jobs (email ingestion, calendar sync), a CorrelationContext is used to scope the correlation ID per user within the job run.


Key Design Decisions

Decision: External scheduling, stateless executables

  • Chosen: Jobs are CLI executables triggered by external cron/orchestration
  • Rejected: Internal scheduler (APScheduler, Celery Beat), event-driven triggers
  • Rationale: Stateless executables are simpler to deploy, scale, and monitor in Kubernetes. Each job run is a fresh process with no shared state. External scheduling gives operations full control over frequency without code changes.

Decision: BaseJob ABC with correlation tracing

  • Chosen: Abstract base class that handles correlation ID, logging, and timing for all jobs
  • Rejected: Decorator-based approach, no base class
  • Rationale: The ABC ensures uniform observability across all jobs. Every job run is traceable via its correlation ID. Developers cannot forget to set up tracing because run() handles it automatically.

Decision: LLM-powered notification content

  • Chosen: Notification jobs use LLMs to generate personalized message content
  • Rejected: Template-based static messages
  • Rationale: LLM-generated content adapts to context (calendar events, email subjects, user preferences) and can be personalized in tone and language. Prompt templates are stored as markdown files for easy iteration.
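
The sketch below shows how user preferences might be folded into a prompt before the LLM call. It assumes a simple placeholder template, inlined here instead of being loaded from jobs/prompts/. `NotificationPreferences`, its defaults, the template wording, and `build_prompt()` are all hypothetical.

```python
from dataclasses import dataclass
from string import Template


@dataclass(frozen=True)
class NotificationPreferences:
    """Hypothetical container for the preferences named in this document."""

    tone: str = "neutral"
    language: str = "English"
    detail_level: str = "brief"


# In the real system the template text would come from a markdown file.
PROMPT_TEMPLATE = Template(
    "Write a notification in $language with a $tone tone.\n"
    "Detail level: $detail_level.\n\n"
    "Context:\n$context\n"
)


def build_prompt(prefs: NotificationPreferences, context: str) -> str:
    """Fill the template; substitute() raises KeyError if a placeholder is missing."""
    return PROMPT_TEMPLATE.substitute(
        language=prefs.language,
        tone=prefs.tone,
        detail_level=prefs.detail_level,
        context=context,
    )


prompt = build_prompt(
    NotificationPreferences(tone="friendly", language="German"),
    "Meeting with the finance team at 14:00.",
)
```

Keeping the template as data means the wording can be iterated on without touching job code, which is the benefit the rationale above attributes to markdown prompt files.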

Known Trade-offs and Debt

| Item | Impact | Remediation |
| --- | --- | --- |
| No concurrency guard | Running two instances of the same job simultaneously could cause duplicate processing (e.g., duplicate email ingestion) | Add distributed locking (Redis lock per job name) before execute() |
| No retry mechanism | If a job fails, the external scheduler may re-trigger it, but there is no built-in retry with backoff or dead-letter queue | Add retry logic in BaseJob.run() or use a job queue (e.g., Celery) |
| No job status persistence | Job success/failure is logged but not stored in a queryable table. There is no dashboard for job run history | Add a job_runs table recording start time, end time, status, and error message |
| Single-process per job | Large jobs (e.g., email ingestion for many users) run sequentially within a single process | Consider partitioning users across parallel job instances |
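
As one possible shape for the retry remediation, the sketch below wraps an async operation in retry with exponential backoff. It is not part of the current system. `run_with_retry()`, its parameters, and its defaults are hypothetical, and a production version would probably retry only on errors known to be transient.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def run_with_retry(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Await operation(), retrying on any exception with exponential backoff.

    Delays are base_delay, 2 * base_delay, 4 * base_delay, and so on.
    The last failure is re-raised once max_attempts is exhausted.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Inside BaseJob.run() this would wrap the call to execute(), for example `await run_with_retry(self.execute)`. Retrying a whole job is only safe if the job is idempotent, which is also why the concurrency guard in the first row matters.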