Workers

The worker layer bridges the message queue and the domain layer. Workers are long-running Python processes (one per RabbitMQ queue) that consume messages and delegate execution to registered operations. The domain logic stays transport-agnostic: operations are resolved by name from the OperationRegistry and receive only a database session and an emit callback.
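Here is a minimal sketch of name-based dispatch, assuming a plain dict-backed registry. SimpleRegistry, EchoOperation and the emit(event_name, fields) signature are hypothetical and are not the actual OperationRegistry API. The sketch only shows that an operation receives a session, a payload and an emit callback, and nothing about the queue.

```python
from typing import Any, Callable, Protocol

# Hypothetical emit signature: an event name plus a dict of fields.
Emit = Callable[[str, dict[str, Any]], None]


class Operation(Protocol):
    name: str

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Emit) -> Any: ...


class SimpleRegistry:
    """Hypothetical stand-in for OperationRegistry: maps names to operations."""

    def __init__(self) -> None:
        self._ops: dict[str, Operation] = {}

    def register(self, op: Operation) -> None:
        self._ops[op.name] = op

    def get(self, name: str) -> Operation:
        try:
            return self._ops[name]
        except KeyError:
            raise KeyError(f"unknown operation: {name}") from None


class EchoOperation:
    name = "echo"

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Emit) -> Any:
        # The operation sees a session, the payload and emit; no AMQP objects.
        emit("echo.seen", {"keys": sorted(payload)})
        return payload


registry = SimpleRegistry()
registry.register(EchoOperation())
events: list[tuple[str, dict[str, Any]]] = []
result = registry.get("echo").execute(
    None, {"b": 2, "a": 1}, emit=lambda name, fields: events.append((name, fields))
)
```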

Base worker

BaseWorker is the core execution engine. It implements the two-session pattern that decouples job claiming from job execution:

Session 1. Claim.

Loads the job, asserts it is in QUEUED status, transitions it to RUNNING, writes a job.started event, and commits. After this commit the job is visible as running to any monitoring tool or frontend. The session is then closed.

Session 2. Execute.

Opens a fresh session, resolves the operation from the registry, and calls operation.execute(session, payload, emit=emit). On success, transitions the job to SUCCEEDED (or marks it as deferred if the operation returns OperationResult(deferred=True)). On exception, transitions to FAILED, stores the error class name and message, and re-raises.

The two-session design ensures durability: a crash in the execute phase leaves the claim committed (RUNNING is visible) while the result is not, which is the correct observable state. No session is held open across a long-running GPU inference call.
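The claim/execute split can be sketched with the standard-library sqlite3 module. This sketch assumes a throwaway SQLite file in place of Postgres and a hypothetical jobs(id, status, error) table. The claim is a single conditional UPDATE rather than the load-assert-transition sequence described above, and no JobEvent rows are written. It illustrates the two-session idea and is not the BaseWorker code.

```python
import os
import sqlite3
import tempfile
from typing import Callable

# Throwaway SQLite file standing in for Postgres; the schema is hypothetical.
DB_PATH = os.path.join(tempfile.mkdtemp(), "jobs.db")
_setup = sqlite3.connect(DB_PATH)
_setup.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, status TEXT, error TEXT)")
_setup.execute("INSERT INTO jobs VALUES ('j1', 'QUEUED', NULL), ('j2', 'QUEUED', NULL)")
_setup.commit()
_setup.close()


def claim(job_id: str) -> bool:
    """Session 1: QUEUED -> RUNNING, committed and closed before any work starts."""
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.execute(
            "UPDATE jobs SET status = 'RUNNING' WHERE id = ? AND status = 'QUEUED'",
            (job_id,),
        )
        conn.commit()
        return cur.rowcount == 1
    finally:
        conn.close()


def execute(job_id: str, work: Callable[[sqlite3.Connection], None]) -> None:
    """Session 2: a fresh connection runs the work and records the outcome."""
    conn = sqlite3.connect(DB_PATH)
    try:
        try:
            work(conn)
        except Exception as exc:
            conn.rollback()  # discard partial work before recording the failure
            conn.execute(
                "UPDATE jobs SET status = 'FAILED', error = ? WHERE id = ?",
                (f"{type(exc).__name__}: {exc}", job_id),
            )
            conn.commit()
            raise
        conn.execute("UPDATE jobs SET status = 'SUCCEEDED' WHERE id = ?", (job_id,))
        conn.commit()
    finally:
        conn.close()


def handle_job(job_id: str, work: Callable[[sqlite3.Connection], None]) -> None:
    if not claim(job_id):  # unknown or not QUEUED: return silently
        return
    execute(job_id, work)


def job_status(job_id: str) -> str:
    conn = sqlite3.connect(DB_PATH)
    try:
        return conn.execute("SELECT status FROM jobs WHERE id = ?", (job_id,)).fetchone()[0]
    finally:
        conn.close()
```

claim() commits and closes its connection before execute() opens a new one. A process that dies between the two calls therefore leaves the row visibly RUNNING, which is the state the stale job reaper later scans for.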

Three exceptional flows are handled explicitly:

  • RetryLaterError: the job is reset to QUEUED and the consumer re-publishes it after delay_seconds. Used by the embedding coordinator when the GPU is already occupied.

  • Parent cancellation: if a child job’s parent was cancelled between claim and execute, the child transitions to CANCELLED without running.

  • Corrupt execute session: if the execute session fails to commit (e.g. the DB connection drops mid-operation), a fallback session marks the job FAILED so it is never permanently stuck in RUNNING.
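The three flows can be modelled as a small state machine. This sketch makes three simplifying assumptions. Job is an in-memory dataclass standing in for the database row. RetryLaterError is a local stand-in with a delay_seconds attribute. commit is a callable that raises when the execute session cannot commit. The real worker records FAILED through a separate fallback session, while this sketch just sets the in-memory field.

```python
from dataclasses import dataclass
from typing import Callable, Optional


class RetryLaterError(Exception):
    """Local stand-in: the operation cannot run yet (e.g. the GPU is busy)."""

    def __init__(self, delay_seconds: float) -> None:
        super().__init__(f"retry in {delay_seconds}s")
        self.delay_seconds = delay_seconds


@dataclass
class Job:
    """In-memory stand-in for a claimed job row (status is already RUNNING)."""

    status: str = "RUNNING"
    parent_status: Optional[str] = None
    error: Optional[str] = None


def run_claimed(
    job: Job, work: Callable[[], None], commit: Callable[[], None]
) -> Optional[float]:
    """Execute phase; returns a republish delay in seconds, or None."""
    if job.parent_status == "CANCELLED":
        job.status = "CANCELLED"  # parent cancelled after the claim: do not run
        return None
    try:
        work()
        job.status = "SUCCEEDED"
        commit()  # may raise if the DB connection dropped mid-operation
    except RetryLaterError as exc:
        job.status = "QUEUED"  # the consumer re-publishes after the delay
        return exc.delay_seconds
    except Exception as exc:
        # Operation errors and failed commits both end here; the real worker
        # uses a fallback session for the latter so the row never stays RUNNING.
        job.status = "FAILED"
        job.error = type(exc).__name__
        raise
    return None
```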

class protea.workers.base_worker.BaseWorker(session_factory: sessionmaker[Session], registry: OperationRegistry, config: WorkerConfig, *, amqp_url: str | None = None)

Bases: object

Executes queued jobs using a two-session pattern.

Session 1 (claim): transitions the job from QUEUED → RUNNING and commits.

Session 2 (execute): resolves the operation, runs it, and transitions to SUCCEEDED or FAILED.

Every state change is recorded as a JobEvent row.

This class is transport-agnostic: it receives a job_id and handles the rest. The caller (QueueConsumer) is responsible for acking/nacking.

extend_lease(job_id: UUID) → None

Extend the leased_until timestamp for a running job.

Called periodically by the consumer heartbeat loop while a job is in progress. Uses a fresh session so the heartbeat never interferes with the execute session’s transaction. No-op when the job is no longer in RUNNING state (e.g. already succeeded or was externally cancelled).
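The lease extension can be sketched as a guarded UPDATE. The sketch assumes a SQLite jobs table with status and leased_until columns (leased_until stored as epoch seconds) and a hypothetical LEASE_SECONDS constant. The documented method returns None. This version returns whether a row was updated so that the no-op case is visible.

```python
import sqlite3
import time
from typing import Optional

LEASE_SECONDS = 300  # hypothetical lease length; the real value is configuration


def extend_lease(db_path: str, job_id: str, now: Optional[float] = None) -> bool:
    """Push leased_until forward using its own short-lived connection.

    The status guard turns this into a no-op once the job has left RUNNING.
    """
    now = time.time() if now is None else now
    conn = sqlite3.connect(db_path)  # never shares the execute session's transaction
    try:
        cur = conn.execute(
            "UPDATE jobs SET leased_until = ? WHERE id = ? AND status = 'RUNNING'",
            (now + LEASE_SECONDS, job_id),
        )
        conn.commit()
        return cur.rowcount == 1
    finally:
        conn.close()
```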

handle_job(job_id: UUID) → None

Claim and execute a single job identified by job_id.

Silently returns if the job does not exist or is not in QUEUED status. Transient infrastructure failures (Postgres deadlocks, brief connection resets) are retried up to 3 times with exponential backoff before the job is marked FAILED. Re-raises any exception from the operation after recording FAILED status.
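The retry policy for transient failures can be sketched as a small wrapper. TransientDBError, the 0.5-second base delay and the injectable sleep are assumptions made for illustration. The documentation states only that transient failures are retried up to 3 times with exponential backoff. It gives neither the exact delays nor which driver exceptions count as transient.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientDBError(Exception):
    """Hypothetical marker for deadlocks and brief connection resets."""


def with_retries(
    fn: Callable[[], T],
    *,
    retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying transient failures with exponential backoff.

    Delays grow as base_delay * 2**n (0.5 s, 1 s, 2 s). Other exceptions
    propagate immediately; the last transient error propagates once the
    retries are spent, at which point the caller would mark the job FAILED.
    """
    for attempt in range(retries + 1):
        try:
            return fn()
        except TransientDBError:
            if attempt == retries:
                raise
            sleep(base_delay * 2**attempt)
    raise AssertionError("unreachable: the loop always returns or raises")
```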

requeue_on_shutdown(job_id: UUID) → bool

Transition a RUNNING job back to QUEUED so it can be re-dispatched.

Called by the queue consumer when SIGTERM/SIGINT arrives mid-job. Rather than force-failing the in-flight job (which loses work and requires manual re-dispatch), it flips the job back to QUEUED with leased_until and started_at cleared so the next consumer can claim it. The caller republishes the accompanying delivery, mirroring the spec’s “NACK-requeue + UPDATE job back to PLANNED” semantics under the pre-ack consumer pattern (the original delivery has already been acked to avoid the AMQP consumer_timeout).

Returns True on a successful re-queue, False when the job is no longer RUNNING (already finished naturally) or the UPDATE could not commit. Idempotent on terminal states.
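The transition can be sketched against a SQLite stand-in. The sketch assumes a jobs table with status, leased_until and started_at columns. The WHERE status = 'RUNNING' guard makes repeated calls on a finished job return False, so a terminal job is never put back in the queue.

```python
import sqlite3


def requeue_on_shutdown(db_path: str, job_id: str) -> bool:
    """Flip RUNNING -> QUEUED and clear the lease so another worker can claim it.

    Returns False when the row is no longer RUNNING or the UPDATE cannot
    commit; calling it again on a terminal job changes nothing.
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.execute(
            "UPDATE jobs SET status = 'QUEUED', leased_until = NULL, started_at = NULL "
            "WHERE id = ? AND status = 'RUNNING'",
            (job_id,),
        )
        conn.commit()
        return cur.rowcount == 1
    except sqlite3.Error:
        conn.rollback()  # e.g. database locked: report failure instead of raising
        return False
    finally:
        conn.close()
```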

class protea.workers.base_worker.WorkerConfig(worker_name: str)

Bases: object

worker_name: str

exception protea.workers.base_worker.WorkerShutdown

Bases: Exception

Raised when a worker is asked to exit while a job is in flight.

Used by the consumer SIGTERM handler to mark the in-flight job FAILED through BaseWorker._force_fail_job before the process exits, so deploy-keeper redeploys never leave jobs orphaned in RUNNING.

Worker entry points

Workers are started by scripts/worker.py via scripts/manage.sh. Each process is bound to a single RabbitMQ queue and registers all operations at startup, making every worker capable of executing any operation routed to its queue.

# Start the full stack (all workers + API + frontend)
bash scripts/manage.sh start [N]

# Start a single worker manually (for debugging)
poetry run python scripts/worker.py --queue protea.jobs

# Start the stale job reaper (periodic cleanup process)
poetry run python scripts/worker.py --queue reaper

QueueConsumer vs OperationConsumer

Two consumer patterns exist in protea/infrastructure/queue/consumer.py, selected by the queue configuration in scripts/worker.py:

QueueConsumer

Reads a job UUID from the queue and delegates to BaseWorker.handle_job(). Creates full Job rows with status transitions and a JobEvent audit log. Used for queues where observability and traceability matter:

  • protea.ping: smoke test

  • protea.jobs: ingestion, ontology / annotation loaders, and generate_evaluation_set

  • protea.embeddings: serialised embedding coordinator

  • protea.predictions: serialised prediction coordinator

  • protea.training: serialised dataset-export coordinator (export_research_dataset)

  • protea.evaluations: run_cafa_evaluation runner

OperationConsumer

Reads a raw serialised operation payload from the queue and executes it directly, without a Job row. Used for high-throughput batch queues where creating thousands of child rows per pipeline run would cause significant write contention and table bloat. Progress is tracked exclusively through atomic increments to the parent job’s progress_current counter:

  • protea.embeddings.batch: GPU inference per batch

  • protea.embeddings.write: bulk pgvector insert

  • protea.predictions.batch: KNN search + GO transfer

  • protea.predictions.write: bulk GOPrediction insert
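The difference between the two consumers is clearest in the batch path. This sketch assumes a JSON message of the form {"operation", "parent_job_id", "payload"} and a handler that returns how many items it processed. Both the message shape and the handler contract are hypothetical. The sketch shows the documented idea: no child row is created, and progress is recorded with a single increment on the parent.

```python
import json
import sqlite3
from typing import Any, Callable

# Hypothetical handler contract: returns the number of items processed.
Handler = Callable[[sqlite3.Connection, dict[str, Any]], int]


def consume_operation_message(
    conn: sqlite3.Connection, body: bytes, handlers: dict[str, Handler]
) -> None:
    """Run one batch message without creating a child Job row."""
    msg = json.loads(body)
    processed = handlers[msg["operation"]](conn, msg["payload"])
    # Progress lives only on the parent. A single-statement increment is
    # applied atomically by the database, so concurrent batch workers cannot
    # overwrite each other the way a read-modify-write in Python could.
    conn.execute(
        "UPDATE jobs SET progress_current = progress_current + ? WHERE id = ?",
        (processed, msg["parent_job_id"]),
    )
    conn.commit()
```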

Graceful shutdown

protea.workers.shutdown provides the signal-handler setup that enables graceful shutdown of long-running worker processes. It installs handlers for SIGTERM and SIGINT that set a shared stop_event, allowing the main worker loop to drain the current batch and exit cleanly rather than being killed mid-operation.

SIGTERM/SIGINT grace-period helper for queue consumers.

When deploy-keeper redeploys the stack it sends SIGTERM to running workers. If a job is in flight, the worker process dies before the FAILED transition is committed and the DB row stays in RUNNING forever.

ShutdownGuard encapsulates the watchdog timer that turns this into a graceful shutdown:

  1. The consumer reports the in-flight job id via track(job_id) / untrack() around its callback.

  2. The consumer calls arm(job_id) from its signal handler.

  3. The guard schedules a daemon threading.Timer; if the job has not finished within grace_seconds the timer fires, marks the job FAILED with error_code=WorkerShutdown through the supplied force_fail callable, and exits the process with os._exit(143) (128 + SIGTERM by convention) so init/systemd does not need to escalate to SIGKILL.

  4. If the callback finishes naturally before the timer fires, cancel() releases the watchdog.

Extracted from infrastructure/queue/consumer.py to keep that file below the 800-LOC file-size smell threshold.
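The watchdog steps listed above map onto threading.Timer fairly directly. This is a simplified sketch and not the real ShutdownGuard. It omits the re-queue-first path and defines a local WorkerShutdown stand-in. Instead of calling os._exit(143) itself, it takes a hypothetical exit_process callable so that it can be exercised without killing the interpreter.

```python
import threading
from typing import Callable, Optional
from uuid import UUID


class WorkerShutdown(Exception):
    """Local stand-in for protea.workers.base_worker.WorkerShutdown."""


class SketchShutdownGuard:
    """Simplified grace-period watchdog; not the real ShutdownGuard."""

    def __init__(
        self,
        force_fail: Callable[[UUID, Exception], None],
        *,
        grace_seconds: float,
        exit_process: Callable[[], None],
    ) -> None:
        self._force_fail = force_fail
        self._grace_seconds = grace_seconds
        self._exit_process = exit_process  # production: lambda: os._exit(143)
        self._current_job_id: Optional[UUID] = None
        self._timer: Optional[threading.Timer] = None

    def track(self, job_id: UUID) -> None:
        self._current_job_id = job_id

    def untrack(self) -> None:
        self._current_job_id = None

    def arm(self, job_id: UUID) -> None:
        if self._timer is not None:  # a second SIGTERM must not stack timers
            return
        self._timer = threading.Timer(self._grace_seconds, self._fire, args=(job_id,))
        self._timer.daemon = True  # never keeps the interpreter alive on its own
        self._timer.start()

    def cancel(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def _fire(self, job_id: UUID) -> None:
        # Runs on the Timer thread once the grace period has elapsed.
        if self._current_job_id == job_id:
            self._force_fail(job_id, WorkerShutdown("grace period expired"))
        self._exit_process()
```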

class protea.workers.shutdown.HeartbeatLoop(extend_lease: Callable[[UUID], None], *, interval_seconds: int)

Bases: object

Background thread that periodically extends a job’s lease.

Started by QueueConsumer around BaseWorker.handle_job so the leased_until column tracks process liveness rather than the initial claim wall-clock. StaleJobReaper uses the same column to decide when an abandoned job is safe to re-enqueue. If the heartbeat thread itself dies (unhandled exception during extend_lease) the job will simply expire at the next reaper cycle, which is the intended failure mode.

Thread-safety: start/stop are called from the main pika IO thread; the worker callback (extend_lease) is invoked from the daemon worker thread but uses its own short-lived DB session, so there is no shared mutable state with the IO thread.

start(job_id: UUID) → None

Begin extending the lease for job_id every interval_seconds seconds.

Idempotent: a second start without an intervening stop is a no-op so consumers that re-enter the dispatch path (e.g. RetryLater) do not stack heartbeat threads.

stop() → None

Signal the heartbeat thread to exit and wait briefly for it.
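Here is a minimal sketch of the heartbeat pattern, assuming extend_lease is any callable that takes a job id. threading.Event.wait acts as an interruptible sleep, so stop() ends the loop promptly without waiting out a full interval. As with the documented class, an exception raised by extend_lease ends the thread and leaves the lease to expire.

```python
import threading
from typing import Callable, Optional
from uuid import UUID


class SketchHeartbeat:
    """Lease-extending heartbeat thread (simplified, not the real HeartbeatLoop)."""

    def __init__(self, extend_lease: Callable[[UUID], None], *, interval_seconds: float) -> None:
        self._extend_lease = extend_lease
        self._interval_seconds = interval_seconds
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self, job_id: UUID) -> None:
        if self._thread is not None:  # idempotent: never stack heartbeat threads
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, args=(job_id,), daemon=True)
        self._thread.start()

    def _run(self, job_id: UUID) -> None:
        # wait() returns True as soon as stop() sets the event, ending the loop.
        while not self._stop.wait(self._interval_seconds):
            self._extend_lease(job_id)

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout=1.0)  # wait briefly for the thread to exit
            self._thread = None
```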

class protea.workers.shutdown.ShutdownGuard(force_fail: Callable[[UUID, Exception], None], *, grace_seconds: int, requeue: Callable[[UUID], bool] | None = None, on_requeue: Callable[[UUID], None] | None = None)

Bases: object

Coordinate force-fail-on-shutdown for a queue consumer.

Thread-safety: track/untrack run on the main (pika IO) thread; arm/cancel may run in the signal-handler frame (also on the main thread). _fire runs on the watchdog Timer thread. Python runs signal handlers on the main thread, between bytecodes, so reads/writes of _current_job_id from the main thread do not race with their signal-handler equivalents. The Timer thread only reads _current_job_id, so a stale read is safe (the worst case is a no-op force-fail).

Re-queue-first policy (F-OPS-JOBS.1 SIGTERM handler): when an optional requeue callable is supplied, the watchdog tries it before falling back to force_fail. requeue returns True when it successfully flipped the row back to QUEUED, in which case the caller is expected to republish the AMQP delivery so a sibling worker can pick it up. force_fail remains the safety net for rows that no longer match (already terminal) or sessions that cannot commit.

arm(job_id: UUID) → None

Schedule the force-fail watchdog if one is not already armed.

Idempotent: a second SIGTERM during shutdown does not arm a second timer.

cancel() → None

Cancel the pending watchdog (if any). Safe to call repeatedly.

property current_job_id: UUID | None

property timer: Timer | None

track(job_id: UUID) → None

Mark job_id as the in-flight job.

untrack() → None

Clear the in-flight tracker after the callback finishes.
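The re-queue-first policy described for ShutdownGuard reduces to a short decision once the grace period expires. In this sketch, on_grace_expired and its string return values are hypothetical. on_requeue is assumed to be the hook where the caller republishes the AMQP delivery.

```python
from typing import Callable, Optional
from uuid import UUID


def on_grace_expired(
    job_id: UUID,
    *,
    force_fail: Callable[[UUID, Exception], None],
    requeue: Optional[Callable[[UUID], bool]] = None,
    on_requeue: Optional[Callable[[UUID], None]] = None,
) -> str:
    """Try to hand the job back to the queue; fall back to failing it."""
    if requeue is not None:
        try:
            if requeue(job_id):
                if on_requeue is not None:
                    on_requeue(job_id)  # assumed: republish the AMQP delivery here
                return "requeued"
        except Exception:
            pass  # a session that cannot commit falls through to the safety net
    force_fail(job_id, RuntimeError("worker shutdown"))
    return "failed"
```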

Stale job reaper

The StaleJobReaper is a periodic background process that scans for jobs stuck in RUNNING status beyond a configurable timeout (21 600 seconds = 6 hours in the usage example below; the constructor default is timeout_seconds=3600). It marks them as FAILED with error code JobTimeout. The reaper runs every 60 seconds and is started via scripts/worker.py --queue reaper.

Periodic reaper that marks long-running jobs as FAILED.

Workers are single-threaded and cannot be interrupted mid-operation without risking data corruption. Instead, this lightweight reaper runs on a timer and transitions any job that has been in RUNNING status for longer than timeout_seconds to FAILED with error_code JobTimeout.

Progress-aware grace period

Deferred coordinator jobs (e.g. compute_embeddings, predict_go_terms) may legitimately run longer than timeout_seconds because the actual work is done by child batch messages on downstream queues. Killing these jobs at the global wall-clock threshold would drop hours of successful work on the floor.

To avoid that, the reaper applies a second check to every candidate: if the job has produced any JobEvent within the last stall_seconds window, it is considered alive and left in place. Only truly stalled jobs — no events for stall_seconds — are marked FAILED. The hard timeout_seconds still acts as the lower bound (a job under the timeout is never touched), so this is strictly more permissive than the previous behaviour.
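The two checks can be written as a pure predicate. This sketch assumes that started_at and the timestamp of the job’s latest JobEvent are already loaded. The real reaper may evaluate the checks differently (for example in SQL), and the inclusive boundaries here are an assumption.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_reap(
    now: datetime,
    started_at: datetime,
    last_event_at: Optional[datetime],
    *,
    timeout_seconds: int,
    stall_seconds: int,
) -> bool:
    """Return True only for a RUNNING job that is both past the timeout and stalled."""
    # The hard timeout is the lower bound: younger jobs are never touched.
    if now - started_at <= timedelta(seconds=timeout_seconds):
        return False
    # Past the timeout, any JobEvent inside the stall window keeps the job alive.
    if last_event_at is not None and now - last_event_at <= timedelta(seconds=stall_seconds):
        return False
    return True
```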

Usage:

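from protea.workers.stale_job_reaper import StaleJobReaper

# session_factory: a SQLAlchemy sessionmaker bound to the application database
reaper = StaleJobReaper(session_factory, timeout_seconds=21600)
reaper.run(interval_seconds=60)  # checks every minute

protea.workers.stale_job_reaper.LEASE_EXPIRED_FAILED_EVENT = 'job.lease_expired'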

Event written when the reaper gives up after exhausting requeue attempts.

protea.workers.stale_job_reaper.LEASE_REQUEUE_EVENT = 'job.lease_expired_requeue'

Event name written every time the reaper re-enqueues a lease-expired job. Counted by StaleJobReaper._lease_requeue_attempts() to honour the max_lease_requeues budget before falling back to lease_expired.
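The requeue budget amounts to counting earlier LEASE_REQUEUE_EVENT rows. This sketch takes its event names from the constants above. lease_expiry_action and the strict less-than comparison against max_lease_requeues are assumptions about how the budget is applied.

```python
LEASE_REQUEUE_EVENT = "job.lease_expired_requeue"
LEASE_EXPIRED_FAILED_EVENT = "job.lease_expired"


def lease_expiry_action(event_names: list[str], max_lease_requeues: int = 3) -> str:
    """Pick the next event for a job whose leased_until has passed.

    Earlier requeue events are counted against the budget; once it is spent
    the job is given up on instead of being re-enqueued again.
    """
    attempts = sum(1 for name in event_names if name == LEASE_REQUEUE_EVENT)
    if attempts < max_lease_requeues:
        return LEASE_REQUEUE_EVENT  # re-enqueue and record another attempt
    return LEASE_EXPIRED_FAILED_EVENT  # budget exhausted: mark FAILED
```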

class protea.workers.stale_job_reaper.StaleJobReaper(session_factory: sessionmaker[Session], timeout_seconds: int = 3600, *, stall_seconds: int = 1800, amqp_url: str | None = None, max_lease_requeues: int = 3)

Bases: object

run(interval_seconds: int = 60) → None

See also