# Background Jobs — Architecture
Audience: Architects, tech leads, senior engineers.
## Context and Purpose
Swisper needs to perform work outside the interactive request-response cycle: fetching emails, syncing calendars, classifying messages, sending notifications, and maintaining system state. The Background Jobs system provides a standardized framework for these tasks.
The architectural goals are:
- Uniform lifecycle — Every job follows the same init → execute → cleanup pattern with automatic correlation tracing
- External scheduling — No internal scheduler; jobs are stateless executables triggered by external orchestration
- Observability — Every job run is traceable via a unique correlation ID that flows through all logs, LLM calls, and database operations
- Isolation — Each job type runs in its own process; failures in one job do not affect others
## Architecture Overview
```mermaid
graph TD
    subgraph Scheduler["External Scheduler (Cron / K8s CronJob)"]
        CRON["Schedule Trigger"]
    end
    subgraph CLI["CLI Runner (main.py)"]
        PARSE["Parse job name"]
        INIT["Initialize config + LLM providers"]
        EXEC["Execute job"]
    end
    subgraph Framework["Job Framework"]
        REG["Job Registry (JOB_MAP)"]
        BASE["BaseJob ABC"]
    end
    subgraph Jobs["Job Implementations"]
        IE["IngestEmailsJob"]
        IC["IngestCalendarEventsJob"]
        CE["ClassifyEmailsJob"]
        DB["DailyBriefingJob"]
        IEN["SendImportantEmailNotificationsJob"]
        PMP["PreMeetingPrepJob"]
        CR["CommitmentReminderJob"]
        AR["AwaitingResponseNotificationJob"]
        FD["FactDecayBatchJob"]
        RM["RedisExpirationMonitoringJob"]
        TP["ThreemaPollingJob"]
    end
    subgraph Infra["Infrastructure"]
        REDIS["Redis"]
        PG["PostgreSQL"]
        LLM["LLM Providers"]
        SIGNALS["SignalsService"]
    end
    CRON --> PARSE --> REG --> INIT --> EXEC
    EXEC --> BASE
    BASE --> Jobs
    Jobs --> REDIS & PG & LLM & SIGNALS
```
## Component Responsibilities
| Component | File | Responsibility |
|---|---|---|
| BaseJob | jobs/base_job.py | ABC with run() (generates correlation ID, logs start/end/error, measures duration) and abstract execute() |
| Job Registry | jobs/job_registry.py | JOB_MAP dict: maps string job names to async runner functions |
| CLI Runner | jobs/main.py | Entry point: parses the job name from sys.argv[1], calls initialize_configuration(), initializes LLM providers (Kvant, Azure, Vertex), and runs the job via asyncio.run() |
| Prompt Templates | jobs/prompts/ | Markdown prompt files for LLM-powered jobs (notification personalization, email classification) |
| Notification Personalization | jobs/notification_personalisation.py | Loads user preferences (tone, language, detail level) used to personalize notification content |
| Contact Utils | jobs/contact_utils.py | Contact deduplication logic shared across ingestion jobs |
## BaseJob Lifecycle
```python
from abc import ABC, abstractmethod
from typing import Any


class BaseJob(ABC):
    def __init__(self, job_name: str): ...

    @abstractmethod
    async def execute(self) -> dict[str, Any] | None: ...

    async def run(self) -> dict[str, Any] | None:
        # 1. Generate correlation_id = f"job-{uuid4()}"
        # 2. set_correlation_id(correlation_id)
        # 3. Log job start with job_name and correlation_id
        # 4. Record start time
        # 5. await self.execute()
        # 6. Log success with duration
        # 7. Return result
        # On error: log the failure
        # finally: clear_correlation_id()
        ...
```
All external callers use run(), never execute() directly. This guarantees that every job run has a correlation ID, timing, and error handling.
## Job Registry
| Job Name | Runner Function | Job Class |
|---|---|---|
| ingest_emails | run_ingest_emails | IngestEmailsJob |
| ingest_calendar_events | run_ingest_calendar_events | IngestCalendarEventsJob |
| classify_emails | run_classify_emails | ClassifyEmailsJob |
| send_notifications | run_send_notifications | SendImportantEmailNotificationsJob |
| fact_decay | run_fact_decay | FactDecayBatchJob |
| redis_monitoring | run_redis_monitoring | RedisExpirationMonitoringJob |
| threema_polling | run_threema_polling | ThreemaPollingJob |
| daily_briefing | run_daily_briefing | DailyBriefingJob |
| pre_meeting_prep | run_pre_meeting_prep | PreMeetingPrepJob |
| commitment_reminders | run_commitment_reminders | CommitmentReminderJob |
| awaiting_response_notifications | run_awaiting_response_notifications | AwaitingResponseNotificationJob |
## Per-Job Details
### Data Ingestion Jobs
| Job | Key Behavior | Dependencies |
|---|---|---|
| IngestEmailsJob | Iterates over all users with email tokens. Per user: fetches emails via delta sync (Office365Provider / GmailProvider), detects reply chains, creates embeddings, and indexes attachments. Uses a CorrelationContext per user for token tracking. | EmailProviderFactory, SwisperLLMAdapter (embeddings), AttachmentIndexingService |
| IngestCalendarEventsJob | Iterates over all users with email tokens. Per user: fetches calendar events, stores CalendarEvent records with their attendees, and creates embeddings. | EmailProviderFactory, SwisperLLMAdapter (embeddings) |
| ClassifyEmailsJob | Loads unclassified emails, builds a classification prompt covering summary, action items, importance, labels, deadline, and is_automated, calls the LLM, and updates the email records. | SwisperLLMAdapter, EmailClassificationPromptBuilder |
### Notification Jobs
| Job | Key Behavior | Dependencies |
|---|---|---|
| DailyBriefingJob | Finds users whose local time is between 7:00 and 8:59 AM. Generates a personalized morning briefing via the LLM using calendar, email, and fact context, and delivers it via SignalsService. | DailyBriefingService, SignalsService, SwisperLLMAdapter |
| SendImportantEmailNotificationsJob | Two modes: initial notifications for high-importance emails, and reminders for emails with approaching deadlines. Respects notification cooldowns. | SignalsService, notification_personalisation |
| PreMeetingPrepJob | Finds events starting within a configurable lookahead window (default 30 min). Generates meeting prep (attendee context, related emails, action items) via the LLM. | MeetingPrepService, SignalsService, SwisperLLMAdapter |
| CommitmentReminderJob | Queries UserToDo items with deadlines in the next 24 hours and generates personalized reminders via the LLM. | ProductivityEmailService, SignalsService, SwisperLLMAdapter |
| AwaitingResponseNotificationJob | Finds sent emails that have not received a reply within a configurable period (e.g., 3 days). Generates follow-up reminders via the LLM. | ProductivityEmailService, SignalsService, SwisperLLMAdapter |
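Most notification jobs first select candidates with a time-window check. The predicates below restate the windows from the table:

- 7:00 to 8:59 local time for the daily briefing
- a 30-minute meeting lookahead
- a 24-hour deadline horizon
- a 3-day reply wait, which the table gives only as an example

This is an illustrative sketch. The function names, the inclusive boundaries, and passing the user's time zone as a tzinfo are assumptions. In practice the time zone would likely be a ZoneInfo such as ZoneInfo("Europe/Zurich").

```python
from datetime import datetime, timedelta, timezone, tzinfo


def in_briefing_window(now_utc: datetime, user_tz: tzinfo) -> bool:
    """DailyBriefingJob: the user's local time is between 7:00 and 8:59."""
    local = now_utc.astimezone(user_tz)
    return 7 <= local.hour <= 8


def starts_within(event_start: datetime, now: datetime,
                  lookahead: timedelta = timedelta(minutes=30)) -> bool:
    """PreMeetingPrepJob: the event starts inside the lookahead window."""
    return now <= event_start <= now + lookahead


def deadline_within(deadline: datetime, now: datetime,
                    horizon: timedelta = timedelta(hours=24)) -> bool:
    """CommitmentReminderJob: the deadline falls within the next 24 hours."""
    return now <= deadline <= now + horizon


def awaiting_reply(sent_at: datetime, replied: bool, now: datetime,
                   wait: timedelta = timedelta(days=3)) -> bool:
    """AwaitingResponseNotificationJob: still unanswered after the waiting period."""
    return not replied and now - sent_at >= wait


# Usage: 05:30 UTC is 07:30 for a user at UTC+2.
now = datetime(2024, 6, 3, 5, 30, tzinfo=timezone.utc)
utc_plus_2 = timezone(timedelta(hours=2))
```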
### Maintenance Jobs
| Job | Key Behavior | Dependencies |
|---|---|---|
| FactDecayBatchJob | Updates computed_relevance for all facts based on age and importance, using the decay formula. | FactDecayBatchService |
| RedisExpirationMonitoringJob | Scans Redis keys nearing their TTL and backs up critical LangGraph checkpoint state to PostgreSQL before the keys expire. | StatePersistenceManager |
| ThreemaPollingJob | Polls the Threema Gateway for pending registration tokens. When a token is found, activates the corresponding UserIntegration record. | ThreemaPollingService |
## Correlation ID Tracing
Every job run generates a unique correlation ID of the form job-{uuid4()}. This ID is:
- Set in a ContextVar at the start of BaseJob.run()
- Automatically included in all log messages via get_correlated_logger()
- Passed to initialize_tracking() for token usage attribution
- Cleared in the finally block of BaseJob.run()
For multi-user jobs (email ingestion, calendar sync), a CorrelationContext is used to scope the correlation ID per user within the job run.
## Key Design Decisions
Decision: External scheduling, stateless executables
- Chosen: Jobs are CLI executables triggered by external cron/orchestration
- Rejected: Internal scheduler (APScheduler, Celery Beat), event-driven triggers
- Rationale: Stateless executables are simpler to deploy, scale, and monitor in Kubernetes. Each job run is a fresh process with no shared state. External scheduling gives operations full control over frequency without code changes.
Decision: BaseJob ABC with correlation tracing
- Chosen: Abstract base class that handles correlation ID, logging, and timing for all jobs
- Rejected: Decorator-based approach, no base class
- Rationale: The ABC ensures uniform observability across all jobs. Every job run is traceable via its correlation ID. Developers cannot forget to set up tracing because run() handles it automatically.
Decision: LLM-powered notification content
- Chosen: Notification jobs use LLMs to generate personalized message content
- Rejected: Template-based static messages
- Rationale: LLM-generated content adapts to context (calendar events, email subjects, user preferences) and can be personalized in tone and language. Prompt templates are stored as markdown files for easy iteration.
## Known Trade-offs and Debt
| Item | Impact | Remediation |
|---|---|---|
| No concurrency guard | Running two instances of the same job simultaneously could cause duplicate processing (e.g., duplicate email ingestion) | Add distributed locking (Redis lock per job name) before execute() |
| No retry mechanism | If a job fails, the external scheduler may re-trigger it, but there is no built-in retry with backoff or dead-letter queue | Add retry logic in BaseJob.run() or use a job queue (e.g., Celery) |
| No job status persistence | Job success/failure is logged but not stored in a queryable table. There is no dashboard for job run history | Add a job_runs table recording start time, end time, status, and error message |
| Single-process per job | Large jobs (e.g., email ingestion for many users) run sequentially within a single process | Consider partitioning users across parallel job instances |
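The retry remediation could take the form of a small wrapper that BaseJob.run() awaits around self.execute(). This sketch uses exponential backoff with full jitter. The function name, the defaults, and retrying on every Exception are assumptions. A retried execute() repeats any work that succeeded before the failure. It is therefore only safe for idempotent jobs, or alongside the Redis lock proposed in the first row.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Await operation(), retrying on exception with capped exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == attempts:
                raise  # Out of attempts: surface the error to the caller / scheduler.
            # Delay ceiling doubles each attempt; full jitter picks uniformly below it.
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, ceiling))
    raise RuntimeError("unreachable")


# Usage: an operation that fails twice with a transient error, then succeeds.
calls: list[int] = []


async def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return "ok"


outcome = asyncio.run(retry_with_backoff(flaky, attempts=3, base_delay=0.01))
```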