Workers
The worker layer bridges the message queue and the domain layer. Workers
are long-running Python processes (one per RabbitMQ queue) that consume
messages and delegate execution to registered operations. They are
transport-agnostic with respect to domain logic: operations are resolved
by name from the OperationRegistry and receive only a database session
and an emit callback.
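As a rough illustration of name-based resolution, the sketch below uses a hypothetical `MiniRegistry` and `Ping` operation. Only the `execute(session, payload, emit=emit)` call shape comes from this page; the registry methods and the emit signature are assumptions made for the example.

```python
from typing import Any, Callable, Protocol

# Assumed emit signature: (event_name, fields). The real callback may differ.
Emit = Callable[[str, dict[str, Any]], None]


class Operation(Protocol):
    name: str

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Emit) -> Any: ...


class MiniRegistry:
    """Hypothetical stand-in for OperationRegistry: maps names to operations."""

    def __init__(self) -> None:
        self._ops: dict[str, Operation] = {}

    def register(self, op: Operation) -> None:
        self._ops[op.name] = op

    def get(self, name: str) -> Operation:
        if name not in self._ops:
            raise KeyError(f"unknown operation: {name}")
        return self._ops[name]


class Ping:
    """Toy operation: sees only a session, a payload, and emit."""

    name = "ping"

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Emit) -> str:
        emit("ping.done", {"echo": payload})
        return "pong"
```

The operation never touches the queue or the worker, which is what keeps the domain layer independent of the transport.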
Base worker
BaseWorker is the core execution engine. It implements the two-session
pattern that decouples job claiming from job execution:
- Session 1. Claim.
  Loads the job, asserts it is in QUEUED status, transitions it to RUNNING, writes a job.started event, and commits. After this commit the job is visible as running to any monitoring tool or frontend. The session is then closed.
- Session 2. Execute.
  Opens a fresh session, resolves the operation from the registry, and calls operation.execute(session, payload, emit=emit). On success, transitions the job to SUCCEEDED (or marks it as deferred if the operation returns OperationResult(deferred=True)). On exception, transitions to FAILED, stores the error class name and message, and re-raises.
The two-session design ensures durability: a crash in the execute phase
leaves the claim committed (RUNNING is visible) while the result is
not, which is the correct observable state. No session is held open across
a long-running GPU inference call.
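The claim/execute split can be sketched with an in-memory stand-in for the database. `FakeSession` and the dict-based job rows are hypothetical. The real worker uses SQLAlchemy sessions, and this is not its implementation, only the shape of the pattern.

```python
import copy
from typing import Callable, Optional


class FakeSession:
    """Hypothetical session: writes are staged on a private copy and only
    become visible in ``store`` after commit()."""

    def __init__(self, store: dict) -> None:
        self._store = store
        self._work = copy.deepcopy(store)

    def job(self, job_id: str) -> Optional[dict]:
        return self._work.get(job_id)

    def commit(self) -> None:
        self._store.clear()
        self._store.update(copy.deepcopy(self._work))

    def rollback(self) -> None:
        self._work = copy.deepcopy(self._store)


def handle_job(store: dict, job_id: str, operation: Callable[[FakeSession], None]) -> None:
    # Session 1 (claim): QUEUED -> RUNNING, committed before any work starts.
    claim = FakeSession(store)
    job = claim.job(job_id)
    if job is None or job["status"] != "QUEUED":
        return
    job["status"] = "RUNNING"
    job["events"].append("job.started")
    claim.commit()

    # Session 2 (execute): a fresh session runs the operation.
    execute = FakeSession(store)
    try:
        operation(execute)
    except Exception as exc:
        # Discard partial work, record the error class and message, re-raise.
        execute.rollback()
        failed = execute.job(job_id)
        failed["status"] = "FAILED"
        failed["error"] = f"{type(exc).__name__}: {exc}"
        execute.commit()
        raise
    execute.job(job_id)["status"] = "SUCCEEDED"
    execute.commit()
```

If the process died inside `operation`, the store would still show RUNNING with the `job.started` event, because the claim was committed separately.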
Three exceptional flows are handled explicitly:
- RetryLaterError: the job is reset to QUEUED and the consumer re-publishes it after delay_seconds. Used by the embedding coordinator when the GPU is already occupied.
- Parent cancellation: if a child job's parent was cancelled between claim and execute, the child transitions to CANCELLED without running.
- Corrupt execute session: if the execute session fails to commit (e.g. the DB connection drops mid-operation), a fallback session marks the job FAILED so it is never permanently stuck in RUNNING.
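The first two flows can be sketched as follows. The `RetryLaterError` class is re-declared locally for the example, and `execute_claimed`, the dict-based jobs, and the `republish` callable are hypothetical names, not the project's API.

```python
from typing import Callable, Optional


class RetryLaterError(Exception):
    """Local re-declaration for this sketch; carries the requested delay."""

    def __init__(self, delay_seconds: float) -> None:
        super().__init__(f"retry after {delay_seconds}s")
        self.delay_seconds = delay_seconds


def execute_claimed(
    job: dict,
    parent: Optional[dict],
    operation: Callable[[], None],
    republish: Callable[[str, float], None],
) -> None:
    """Run an already-claimed (RUNNING) job, honouring the special flows."""
    # Parent cancellation: skip the work entirely.
    if parent is not None and parent["status"] == "CANCELLED":
        job["status"] = "CANCELLED"
        return
    try:
        operation()
    except RetryLaterError as exc:
        # Not a failure: hand the job back and ask for a delayed re-publish.
        job["status"] = "QUEUED"
        republish(job["id"], exc.delay_seconds)
        return
    job["status"] = "SUCCEEDED"
```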
- class protea.workers.base_worker.BaseWorker(session_factory: sessionmaker[Session], registry: OperationRegistry, config: WorkerConfig, *, amqp_url: str | None = None)
  Bases: object
  Executes queued jobs using a two-session pattern.
  Session 1 (claim): transitions the job from QUEUED → RUNNING and commits. Session 2 (execute): resolves the operation, runs it, and transitions to SUCCEEDED or FAILED. Every state change is recorded as a JobEvent row.
  This class is transport-agnostic: it receives a job_id and handles the rest. The caller (QueueConsumer) is responsible for acking/nacking.
  - extend_lease(job_id: UUID) → None
    Extend the leased_until timestamp for a running job.
    Called periodically by the consumer heartbeat loop while a job is in progress. Uses a fresh session so the heartbeat never interferes with the execute session's transaction. No-op when the job is no longer in RUNNING state (e.g. already succeeded or was externally cancelled).
  - handle_job(job_id: UUID) → None
    Claim and execute a single job identified by job_id.
    Silently returns if the job does not exist or is not in QUEUED status. Transient infrastructure failures (Postgres deadlocks, brief connection resets) are retried up to 3 times with exponential backoff before the job is marked FAILED. Re-raises any exception from the operation after recording FAILED status.
  - requeue_on_shutdown(job_id: UUID) → bool
    Transition a RUNNING job back to QUEUED so it can be re-dispatched.
    Called by the queue consumer when SIGTERM/SIGINT arrives mid-job: rather than force-failing the in-flight job (which loses work and requires manual re-dispatch), flip it back to QUEUED with a cleared leased_until and started_at so the next consumer can claim it. The accompanying delivery is republished by the caller, mirroring the spec's “NACK-requeue + UPDATE job back to PLANNED” semantics under the pre-ack consumer pattern (the original delivery has already been acked to avoid AMQP consumer_timeout).
    Returns True on a successful re-queue, False when the job is no longer RUNNING (already finished naturally) or the UPDATE could not commit. Idempotent on terminal states.
- exception protea.workers.base_worker.WorkerShutdown
  Bases: Exception
  Raised when a worker is asked to exit while a job is in flight.
  Used by the consumer SIGTERM handler to mark the in-flight job FAILED through BaseWorker._force_fail_job before the process exits, so deploy-keeper redeploys never leave jobs orphaned in RUNNING.
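The retry behaviour described for handle_job can be approximated by a small helper. This is a sketch under assumptions: `TransientError`, `with_retries`, the base delay, and the reading of "retried up to 3 times" as three retries after the first attempt are all illustrative choices, not taken from the source code.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable infrastructure failures."""


def with_retries(
    fn: Callable[[], T],
    *,
    max_retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying transient failures with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries:
                raise  # budget exhausted: the caller marks the job FAILED
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    raise AssertionError("unreachable")
```

Injecting `sleep` keeps the helper testable without real waiting.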
Worker entry points
Workers are started by scripts/worker.py via scripts/manage.sh.
Each process is bound to a single RabbitMQ queue and registers all
operations at startup, making every worker capable of executing any
operation routed to its queue.
# Start the full stack (all workers + API + frontend)
bash scripts/manage.sh start [N]
# Start a single worker manually (for debugging)
poetry run python scripts/worker.py --queue protea.jobs
# Start the stale job reaper (periodic cleanup process)
poetry run python scripts/worker.py --queue reaper
QueueConsumer vs OperationConsumer
Two consumer patterns exist in protea/infrastructure/queue/consumer.py,
selected by the queue configuration in scripts/worker.py:
- QueueConsumer
  Reads a job UUID from the queue and delegates to BaseWorker.handle_job(). Creates full Job rows with status transitions and a JobEvent audit log. Used for queues where observability and traceability matter:
  - protea.ping: smoke test
  - protea.jobs: ingestion, ontology / annotation loaders, and generate_evaluation_set
  - protea.embeddings: serialised embedding coordinator
  - protea.predictions: serialised prediction coordinator
  - protea.training: serialised dataset-export coordinator (export_research_dataset)
  - protea.evaluations: run_cafa_evaluation runner
- OperationConsumer
  Reads a raw serialised operation payload from the queue and executes it directly, without a Job row. Used for high-throughput batch queues where creating thousands of child rows per pipeline run would cause significant write contention and table bloat. Progress is tracked exclusively through atomic increments to the parent job's progress_current counter:
  - protea.embeddings.batch: GPU inference per batch
  - protea.embeddings.write: bulk pgvector insert
  - protea.predictions.batch: KNN search + GO transfer
  - protea.predictions.write: bulk GOPrediction insert
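An atomic counter increment of the kind mentioned for OperationConsumer can be demonstrated with the standard-library sqlite3 module. The table layout and the `bump_progress` helper are hypothetical; the project itself targets Postgres, and only the `progress_current` column name comes from this page.

```python
import sqlite3

# In-memory database standing in for the real job table (assumed schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id TEXT PRIMARY KEY, progress_current INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO job (id) VALUES ('parent-1')")
conn.commit()


def bump_progress(db: sqlite3.Connection, parent_id: str, n: int = 1) -> None:
    """Increment the parent's counter inside the database, not in Python.

    Letting the UPDATE compute progress_current + n avoids the lost-update
    race of a read-modify-write cycle across concurrent batch workers.
    """
    with db:  # commits on success, rolls back on error
        db.execute(
            "UPDATE job SET progress_current = progress_current + ? WHERE id = ?",
            (n, parent_id),
        )


for _ in range(4):  # e.g. four batch messages finishing
    bump_progress(conn, "parent-1")
```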
Graceful shutdown
protea.workers.shutdown provides the signal-handler setup that
enables graceful shutdown of long-running worker processes. It installs
handlers for SIGTERM and SIGINT that set a shared stop_event,
allowing the main worker loop to drain the current batch and exit cleanly
rather than being killed mid-operation.
SIGTERM/SIGINT grace-period helper for queue consumers.
When deploy-keeper redeploys the stack it sends SIGTERM to running workers. Without a grace period, a worker with a job in flight dies before the FAILED transition is committed and the DB row stays in RUNNING forever.
ShutdownGuard encapsulates the watchdog timer that turns this into
a graceful shutdown:
- The consumer reports the in-flight job id via track(job_id) / untrack() around its callback.
- The consumer calls arm(job_id) from its signal handler.
- The guard schedules a daemon threading.Timer; if the job has not finished within grace_seconds the timer fires, marks the job FAILED with error_code=WorkerShutdown through the supplied force_fail callable, and exits the process with os._exit(143) (128 + SIGTERM by convention) so init/systemd does not need to escalate to SIGKILL.
- If the callback finishes naturally before the timer fires, cancel() releases the watchdog.
Extracted from infrastructure/queue/consumer.py to keep that file
under the file-LOC smell threshold (>800 LOC).
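The watchdog steps listed above can be sketched with `threading.Timer`. `MiniGuard` is a simplified, hypothetical stand-in for ShutdownGuard: it omits the requeue-first policy and takes an injectable `exit_process` callable (an assumption made for testability) instead of calling `os._exit` directly.

```python
import threading
from typing import Callable, Optional


class WorkerShutdown(Exception):
    """Local re-declaration for this sketch."""


class MiniGuard:
    def __init__(
        self,
        force_fail: Callable[[str, Exception], None],
        *,
        grace_seconds: float,
        exit_process: Callable[[int], None],
    ) -> None:
        self._force_fail = force_fail
        self._grace = grace_seconds
        self._exit = exit_process
        self._timer: Optional[threading.Timer] = None

    def arm(self, job_id: str) -> None:
        """Schedule the watchdog; a second call while armed is a no-op."""
        if self._timer is not None:
            return
        self._timer = threading.Timer(self._grace, self._fire, args=(job_id,))
        self._timer.daemon = True
        self._timer.start()

    def cancel(self) -> None:
        """Release the watchdog if the job finished in time."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def _fire(self, job_id: str) -> None:
        # Grace period elapsed: fail the job, then exit with 128 + SIGTERM.
        self._force_fail(job_id, WorkerShutdown("grace period elapsed"))
        self._exit(143)
```

In a real process `exit_process` would be `os._exit`, which skips interpreter cleanup so a blocked callback cannot delay termination.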
- class protea.workers.shutdown.HeartbeatLoop(extend_lease: Callable[[UUID], None], *, interval_seconds: int)
  Bases: object
  Background thread that periodically extends a job's lease.
  Started by QueueConsumer around BaseWorker.handle_job so the leased_until column tracks process liveness rather than the initial claim wall-clock. StaleJobReaper uses the same column to decide when an abandoned job is safe to re-enqueue. If the heartbeat thread itself dies (unhandled exception during extend_lease) the job will simply expire at the next reaper cycle, which is the intended failure mode.
  Thread-safety: start/stop are called from the main pika IO thread; the worker callback (extend_lease) is invoked from the daemon worker thread but uses its own short-lived DB session, so there is no shared mutable state with the IO thread.
  - start(job_id: UUID) → None
    Begin extending the lease for job_id every interval seconds.
    Idempotent: a second start without an intervening stop is a no-op so consumers that re-enter the dispatch path (e.g. RetryLater) do not stack heartbeat threads.
  - stop() → None
    Signal the heartbeat thread to exit and wait briefly for it.
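A heartbeat loop with the start/stop semantics described for HeartbeatLoop might look like the sketch below. `MiniHeartbeat` is a hypothetical simplification; the join timeout and the use of `threading.Event` for pacing are assumptions.

```python
import threading
from typing import Callable, Optional


class MiniHeartbeat:
    def __init__(self, extend_lease: Callable[[str], None], *, interval_seconds: float) -> None:
        self._extend = extend_lease
        self._interval = interval_seconds
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self, job_id: str) -> None:
        """Begin heartbeating; a second start without stop is a no-op."""
        if self._thread is not None:
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, args=(job_id,), daemon=True)
        self._thread.start()

    def _run(self, job_id: str) -> None:
        # Event.wait returns False on timeout and True once stop() is called,
        # so the loop both paces the heartbeat and reacts promptly to stop().
        while not self._stop.wait(self._interval):
            self._extend(job_id)

    def stop(self) -> None:
        """Signal the thread to exit and wait briefly for it."""
        if self._thread is None:
            return
        self._stop.set()
        self._thread.join(timeout=1.0)
        self._thread = None
```

If `extend_lease` raises, the thread dies and the lease simply stops being extended, which matches the failure mode the class documentation describes.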
- class protea.workers.shutdown.ShutdownGuard(force_fail: Callable[[UUID, Exception], None], *, grace_seconds: int, requeue: Callable[[UUID], bool] | None = None, on_requeue: Callable[[UUID], None] | None = None)
  Bases: object
  Coordinate force-fail-on-shutdown for a queue consumer.
  Thread-safety: track/untrack run on the main (pika IO) thread; arm/cancel may run on the signal-handler frame (also main thread). _fire runs on the watchdog Timer thread. The Python signal-delivery model only switches between bytecodes so reads/writes of _current_job_id from the main thread do not race with their own signal-handler equivalents. The Timer thread only reads _current_job_id, so a stale read is safe (the worst case is a no-op force-fail).
  Re-queue-first policy (F-OPS-JOBS.1 SIGTERM handler): when an optional requeue callable is supplied, the watchdog tries it before falling back to force_fail. requeue returns True when it successfully flipped the row back to QUEUED, in which case the caller is expected to republish the AMQP delivery so a sibling worker can pick it up. force_fail remains the safety net for rows that no longer match (already terminal) or sessions that cannot commit.
  - arm(job_id: UUID) → None
    Schedule the force-fail watchdog if one is not already armed.
    Idempotent: a second SIGTERM during shutdown does not arm a second timer.
  - cancel() → None
    Cancel the pending watchdog (if any). Safe to call repeatedly.
  - property current_job_id: UUID | None
  - property timer: Timer | None
  - track(job_id: UUID) → None
    Mark job_id as the in-flight job.
  - untrack() → None
    Clear the in-flight tracker after the callback finishes.
Stale job reaper
The StaleJobReaper is a periodic background process that scans for jobs
stuck in RUNNING status beyond a configurable timeout. The usage example
further down passes 21 600 seconds (6 hours), while the constructor
signature defaults to 3600 seconds. Timed-out jobs are marked FAILED with
error code JobTimeout. The reaper runs every 60 seconds and is started via
scripts/worker.py --queue reaper.
Periodic reaper that marks long-running jobs as FAILED.
Workers are single-threaded and cannot be interrupted mid-operation without
risking data corruption. Instead, this lightweight reaper runs on a timer
and transitions any job that has been in RUNNING status for longer than
timeout_seconds to FAILED with error_code JobTimeout.
Progress-aware grace period
Deferred coordinator jobs (e.g. compute_embeddings, predict_go_terms)
may legitimately run longer than timeout_seconds because the actual work
is done by child batch messages on downstream queues. Killing these jobs
at the global wall-clock threshold would drop hours of successful work on
the floor.
To avoid that, the reaper applies a second check to every candidate: if the
job has produced any JobEvent within the last stall_seconds window,
it is considered alive and left in place. Only truly stalled jobs — no
events for stall_seconds — are marked FAILED. The hard timeout_seconds
still acts as the lower bound (a job under the timeout is never touched),
so this is strictly more permissive than the previous behaviour.
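The two checks combine into a single predicate. The sketch below is an illustration only: `is_stalled` and its arguments are hypothetical names, and the exact boundary comparisons (strictly greater than the timeout, inclusive stall window) are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stalled(
    started_at: datetime,
    last_event_at: Optional[datetime],
    now: datetime,
    *,
    timeout_seconds: int,
    stall_seconds: int,
) -> bool:
    """Return True only if the job should be reaped."""
    # Check 1: a job under the hard timeout is never touched.
    if now - started_at <= timedelta(seconds=timeout_seconds):
        return False
    # Check 2: a recent JobEvent means the job is still making progress.
    if last_event_at is not None and now - last_event_at <= timedelta(seconds=stall_seconds):
        return False
    return True


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
seven_hours_ago = now - timedelta(hours=7)
# A coordinator past the 6-hour timeout that emitted an event 5 minutes ago
# is left alone; the same job with a 2-hour-old last event is reaped.
alive = is_stalled(seven_hours_ago, now - timedelta(minutes=5), now,
                   timeout_seconds=21600, stall_seconds=1800)
dead = is_stalled(seven_hours_ago, now - timedelta(hours=2), now,
                  timeout_seconds=21600, stall_seconds=1800)
```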
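Usage:
from protea.workers.stale_job_reaper import StaleJobReaper
# session_factory: the application's configured sessionmaker[Session]
reaper = StaleJobReaper(session_factory, timeout_seconds=21600)
reaper.run(interval_seconds=60) # checks every minute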
- protea.workers.stale_job_reaper.LEASE_EXPIRED_FAILED_EVENT = 'job.lease_expired'
  Event written when the reaper gives up after exhausting requeue attempts.
- protea.workers.stale_job_reaper.LEASE_REQUEUE_EVENT = 'job.lease_expired_requeue'
  Event name written every time the reaper re-enqueues a lease-expired job. Counted by StaleJobReaper._lease_requeue_attempts() to honour the max_lease_requeues budget before falling back to lease_expired.
- class protea.workers.stale_job_reaper.StaleJobReaper(session_factory: sessionmaker[Session], timeout_seconds: int = 3600, *, stall_seconds: int = 1800, amqp_url: str | None = None, max_lease_requeues: int = 3)
  Bases: object
  - run(interval_seconds: int = 60) → None
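The requeue budget implied by the two event constants can be sketched as a pure function over a job's event names. `next_lease_event` is hypothetical, and the comparison used (requeue while fewer than max_lease_requeues requeue events exist) is an assumption about how the budget is counted.

```python
LEASE_REQUEUE_EVENT = "job.lease_expired_requeue"
LEASE_EXPIRED_FAILED_EVENT = "job.lease_expired"


def next_lease_event(event_names: list[str], max_lease_requeues: int = 3) -> str:
    """Decide what the reaper records for a job whose lease has expired."""
    attempts = event_names.count(LEASE_REQUEUE_EVENT)
    if attempts < max_lease_requeues:
        return LEASE_REQUEUE_EVENT  # budget left: re-enqueue the job
    return LEASE_EXPIRED_FAILED_EVENT  # budget exhausted: give up


history = ["job.started", LEASE_REQUEUE_EVENT, "job.started", LEASE_REQUEUE_EVENT]
decision = next_lease_event(history)
```

Deriving the attempt count from the event log means the budget needs no extra column on the job row.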
See also
- Job Lifecycle: the two-session lifecycle and parent-child coordinator pattern that BaseWorker implements.
- Operations: what these workers actually run.
- ADR-002: Two-session worker pattern: design rationale.
- ADR-003: Two types of consumer: when each consumer subclass applies.