HTTP API

The PROTEA HTTP API is a FastAPI application that exposes a set of routers under protea/api/routers/ (the live OpenAPI is regenerated into docs/openapi.json and is authoritative for the exact endpoint list). All state mutations flow through this layer: it writes Job rows to PostgreSQL and publishes messages to RabbitMQ. The API is stateless between requests; the session factory and AMQP URL are injected via app.state at startup, keeping every router free of global state and infrastructure imports.

All endpoints return JSON. Error responses follow the RFC 7807 application/problem+json shape (T4.4 / D4): every error body includes type (relative URI under /problems/{slug}, e.g. /problems/not-found), title (short stable summary), status (mirror of the HTTP code), and an optional detail + instance (request URI). Validation errors carry an extra errors array with the offending field paths. Existing route code keeps raising HTTPException exactly as before; only the wire format changed. Timestamps are ISO 8601 UTC strings. UUID identifiers are lowercase hyphenated strings.
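
As an illustration of the error shape described above, the following sketch builds a problem body by hand. The helper name problem_body and the sample detail text are hypothetical; only the member names (type, title, status, detail, instance, errors) and the /problems/{slug} convention come from this page.

```python
import json
from typing import Any, Optional


def problem_body(
    status: int,
    slug: str,
    title: str,
    detail: Optional[str] = None,
    instance: Optional[str] = None,
    errors: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Build an RFC 7807-style body; optional members are omitted when unset."""
    body: dict[str, Any] = {
        "type": f"/problems/{slug}",  # relative URI under /problems/{slug}
        "title": title,               # short, stable summary
        "status": status,             # mirrors the HTTP status code
    }
    if detail is not None:
        body["detail"] = detail
    if instance is not None:
        body["instance"] = instance   # request URI
    if errors:
        body["errors"] = errors       # only present on validation errors
    return body


not_found = problem_body(
    404,
    "not-found",
    "Not Found",
    detail="Job does not exist.",  # illustrative wording, not the server's
    instance="/v1/jobs/00000000-0000-0000-0000-000000000000",
)
print(json.dumps(not_found, indent=2))
```

A client can therefore branch on status or type without parsing free-form text, and treat detail, instance, and errors as optional.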

Every client request body is strict (model_config = ConfigDict(extra="forbid"), PR #215): unknown keys raise a 422 instead of being silently dropped, so {"oepration": "ping"} on POST /jobs (typo for operation) fails fast against the schema rather than parsing as if operation were missing. The contract covers every documented request body (CreateJobRequest / CreateJobCommentRequest / ScoringConfigCreate / CreateExperimentRunRequest / UpdateExperimentRunRequest / CreateDatasetRequest / ImportDatasetByReferenceRequest / ImportRerankerByReferenceRequest / SupportCreate); response models are not constrained because they are server-built and never parse client input.

Versioning under the /v1/ prefix

Every router is mounted twice (T4.1, decision D4):

  • Canonical under the /v1/ prefix (the first major URL segment): surfaced in OpenAPI / Swagger and the only path schema exporters and codegen tools see. All new clients should target this form.

  • Legacy alias at the root path: the same handler reachable without a prefix, include_in_schema=False so OpenAPI does not advertise it. This exists for the deprecation window so existing frontend, CLI, and CI traffic keeps working without a coordinated cutover.

The endpoint paths in the per-router sections and the Endpoints summary below are listed without the prefix for terseness; both the bare and the prefixed paths resolve to the same handler today. Health endpoints (/health, /health/ready) stay at the root by convention. When the legacy aliases are retired the second include_router call in protea.api.app._register_routers will be removed; this page is the source of truth for that timing.
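
Since the paths on this page are written without the prefix, a client may want to map them to the canonical form in one place. The sketch below is one way to do that; canonical_path, API_PREFIX, and ROOT_ONLY are hypothetical client-side names, not part of PROTEA.

```python
API_PREFIX = "/v1"
ROOT_ONLY = ("/health",)  # health endpoints stay at the root by convention


def canonical_path(path: str) -> str:
    """Map a bare path such as /jobs to its canonical /v1 form."""
    if not path.startswith("/"):
        path = "/" + path
    # Already canonical: leave untouched.
    if path == API_PREFIX or path.startswith(API_PREFIX + "/"):
        return path
    # Health checks are never prefixed.
    if any(path == root or path.startswith(root + "/") for root in ROOT_ONLY):
        return path
    return API_PREFIX + path


for bare in ("/jobs", "/health/ready", "/v1/proteins"):
    print(bare, "->", canonical_path(bare))
```

Routing every request through a helper like this keeps a client working after the legacy root aliases are retired.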

Application factory

protea.api.app creates the FastAPI application, registers all routers, and wires the session factory and AMQP URL into app.state at startup. It also configures CORS and mounts any static middleware.

Application lifecycle and startup stages

protea.api.stages orchestrates the FastAPI lifespan: it opens the SQLAlchemy engine, publishes the session factory into app.state, and tears down the AMQP connection pool on shutdown.

Shared stage-classification helpers for the benchmark + showcase routers.

Both routers need to label an EvaluationResult with the pipeline stage that produced it ("reranker" or whichever ScoringConfig.name was applied). The logic was duplicated across both files until this module consolidated it — the inline copy in showcase.py carried a comment “Matches benchmark.py semantics without cross-importing”, which is exactly the dispensable-duplication smell this module fixes.

protea.api.stages.stage_kind(stage: str) Literal['scoring', 'reranker']

Return "reranker" for the reranker stage, "scoring" otherwise.

protea.api.stages.stage_of(result: EvaluationResult, scoring_name: str | None) str | None

Classify an EvaluationResult into a stage.

Reranker dominates scoring config. Evaluations without either a scoring config or a reranker are considered incomplete and excluded from the matrix (return None).
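
The classification rules above can be sketched as follows. This is not the module's source: EvaluationResultStub and its reranker_id attribute are assumptions standing in for the real EvaluationResult model, and only the documented behaviour (reranker wins, otherwise the scoring config name, otherwise None) is reproduced.

```python
from dataclasses import dataclass
from typing import Literal, Optional


@dataclass
class EvaluationResultStub:
    """Stand-in for EvaluationResult; `reranker_id` is an assumed attribute name."""

    reranker_id: Optional[str] = None


def stage_kind(stage: str) -> Literal["scoring", "reranker"]:
    """Return "reranker" for the reranker stage, "scoring" otherwise."""
    return "reranker" if stage == "reranker" else "scoring"


def stage_of(result: EvaluationResultStub, scoring_name: Optional[str]) -> Optional[str]:
    """Classify a result; the reranker dominates the scoring config."""
    if result.reranker_id is not None:
        return "reranker"
    if scoring_name:
        return scoring_name
    return None  # incomplete evaluation: excluded from the matrix


print(stage_of(EvaluationResultStub(reranker_id="r1"), "baseline"))
print(stage_of(EvaluationResultStub(), "baseline"))
print(stage_of(EvaluationResultStub(), None))
```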

Jobs router

The /jobs router is the primary interface for job lifecycle management. Jobs are created by POST /jobs with an operation name, a queue_name, and an optional JSON payload. The API creates a Job row in QUEUED status, commits, then publishes the UUID to RabbitMQ (in that order, so workers always find the row before they try to claim it).
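
The commit-then-publish ordering is the important part of that flow. The sketch below shows it with in-memory stand-ins; InMemoryJobStore and the publish callable are hypothetical substitutes for the SQLAlchemy session and the RabbitMQ publisher, and the row layout is simplified.

```python
import uuid
from typing import Any, Callable, Optional


class InMemoryJobStore:
    """Hypothetical stand-in for a DB session: rows become visible on commit()."""

    def __init__(self) -> None:
        self.pending: dict[str, dict[str, Any]] = {}
        self.committed: dict[str, dict[str, Any]] = {}

    def add(self, row: dict[str, Any]) -> None:
        self.pending[row["id"]] = row

    def commit(self) -> None:
        self.committed.update(self.pending)
        self.pending.clear()


def create_job(
    store: InMemoryJobStore,
    publish: Callable[[str, str], None],
    operation: str,
    queue_name: str,
    payload: Optional[dict[str, Any]] = None,
) -> str:
    job_id = str(uuid.uuid4())  # lowercase hyphenated UUID string
    store.add(
        {
            "id": job_id,
            "operation": operation,
            "queue_name": queue_name,
            "payload": payload or {},
            "status": "QUEUED",
        }
    )
    store.commit()               # 1. make the row visible to workers
    publish(queue_name, job_id)  # 2. only then announce the job ID
    return job_id
```

Reversing the two steps would open a window in which a worker receives a job ID that it cannot yet find in the database.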

Job status and the structured event timeline can be polled via GET /jobs/{id} and GET /jobs/{id}/events respectively. The frontend uses 2-second polling on the events endpoint to render a live progress timeline.

class protea.api.routers.jobs.CreateJobCommentRequest(*, body: Annotated[str, MinLen(min_length=1)], author: str | None = None)

Bases: BaseModel

Body for POST /jobs/{job_id}/comments.

Curator/operator note attached to a Job (D11 narrative thread). Distinct from machine-emitted JobEvent rows: comments carry an opinionated message and an optional author tag. Markdown is permitted in body; the UI renders the thread chronologically.

author: str | None
body: str
model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'author': 'frapercan', 'body': 'Re-running with k=10; k=5 hit the variance ceiling on PK.'}}}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

classmethod strip_body(v: str) str

class protea.api.routers.jobs.CreateJobRequest(*, operation: Annotated[str, MinLen(min_length=1)], queue_name: Annotated[str, MinLen(min_length=1)], payload: dict[str, Any] = <factory>, meta: dict[str, Any] = <factory>, description: str | None = None, tags: list[str] = <factory>)

Bases: BaseModel

Body for POST /jobs.

Tells PROTEA which registered operation to run (operation) and which RabbitMQ queue to publish the work onto (queue_name). The payload blob is op-specific; the operation registry validates it on dequeue. description / tags are the D11 narrative fields surfaced in the UI run detail.

description: str | None
meta: dict[str, Any]
model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'description': 'Recompute ESM-2 embeddings for the GOA 2024-04 set.', 'meta': {}, 'operation': 'compute_embeddings', 'payload': {'annotation_set_id': '00000000-0000-0000-0000-000000000002', 'batch_size': 1, 'embedding_config_id': '00000000-0000-0000-0000-000000000001'}, 'queue_name': 'protea.embedding', 'tags': ['ablation', 'benchmark-v1']}}}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

operation: str
payload: dict[str, Any]
queue_name: str
classmethod strip_and_require(v: str) str
tags: list[str]
class protea.api.routers.jobs.JobListFilters(status: str | None, operation: str | None, include_children: bool, parent_job_id: UUID | None, limit: int, after: datetime | None)

Bases: NamedTuple

Bundle of query-string filters consumed by GET /jobs.

Carries the user-visible knobs so the route handler signature stays under the §3 6-param ceiling. The FastAPI dep _job_list_filters_dep exposes each field as a discrete query parameter on the wire.

after is the cursor token for pagination (T4.2): when set, the list only returns rows strictly older than the given UTC timestamp. Clients page forward by reading the created_at of the last row and feeding it back as after. Microsecond resolution on Job.created_at keeps tie collisions astronomically rare.
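
A client-side loop over this cursor might look like the sketch below. The fetch_page callable is a hypothetical wrapper around GET /jobs (it takes after and limit and returns one page, newest first); the in-memory backend exists only to make the example runnable, and it uses datetime objects where the real API exchanges ISO 8601 strings.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Iterator, Optional

Page = list[dict[str, Any]]


def iter_jobs(
    fetch_page: Callable[[Optional[datetime], int], Page],
    limit: int = 50,
) -> Iterator[dict[str, Any]]:
    """Yield every job, newest first, following the `after` cursor."""
    after: Optional[datetime] = None
    while True:
        page = fetch_page(after, limit)
        if not page:
            return
        yield from page
        if len(page) < limit:  # short page: nothing older remains
            return
        after = page[-1]["created_at"]  # feed the last row back as the cursor


# In-memory stand-in for the endpoint: rows strictly older than `after`.
_BASE = datetime(2024, 1, 1, tzinfo=timezone.utc)
_ROWS = [{"id": i, "created_at": _BASE + timedelta(seconds=i)} for i in range(5)]


def fake_fetch(after: Optional[datetime], limit: int) -> Page:
    newest_first = sorted(_ROWS, key=lambda r: r["created_at"], reverse=True)
    older = [r for r in newest_first if after is None or r["created_at"] < after]
    return older[:limit]


print([job["id"] for job in iter_jobs(fake_fetch, limit=2)])
```

Because the comparison is strict, two rows sharing the exact same created_at could straddle a page boundary and one would be skipped, which is the collision case the microsecond resolution is meant to make negligible.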

after: datetime | None

Alias for field number 5

include_children: bool

Alias for field number 2

limit: int

Alias for field number 4

operation: str | None

Alias for field number 1

parent_job_id: UUID | None

Alias for field number 3

status: str | None

Alias for field number 0

protea.api.routers.jobs.cancel_job(job_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Mark a job (and any non-terminal child jobs) as CANCELLED.

Already-finished jobs (SUCCEEDED/FAILED) are returned as-is with no state change. Children in QUEUED are cancelled immediately. Children in RUNNING are also marked CANCELLED; the worker’s parent-check in BaseWorker.handle_job() will detect the cancelled parent on the next iteration and stop gracefully.

protea.api.routers.jobs.create_job(request: Request, response: Response, body: CreateJobRequest, deps: _CreateJobDeps = Depends(dependency=<function _create_job_deps>, use_cache=True, scope=None), principal: ApiKey | BearerPrincipal | None = Depends(dependency=<function require_role.<locals>._gate>, use_cache=True, scope=None)) dict[str, Any]

Create a Job row and publish its ID to the specified RabbitMQ queue.

Expensive operations (export_research_dataset, run_cafa_evaluation) are subject to per-user daily quota limits (FARM-AUTH.7). Admins are exempt. Duplicate POSTs (same operation + payload while the previous job is active) return 409 with the existing job_id (F-OPS-JOBS.1 dedup).

protea.api.routers.jobs.create_job_comment(job_id: UUID, body: CreateJobCommentRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Append a free-form comment to a Job.

Curators / operators use this thread to record observations, follow-ups, or post-mortems; the worker fleet keeps writing to JobEvent for machine-emitted progress.

protea.api.routers.jobs.delete_job(job_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Permanently delete a job and its event log. Running jobs cannot be deleted (409).

protea.api.routers.jobs.get_job(job_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), registry: OperationRegistry = Depends(dependency=<function get_operation_registry>, use_cache=True, scope=None)) dict[str, Any]

Retrieve full details for a single job including its payload, meta, and progress counters.

protea.api.routers.jobs.get_job_events(job_id: UUID, limit: int = Query(200), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

Return the structured event log for a job (newest first).

Events include progress milestones, warnings, HTTP retries, and errors. Useful for monitoring long-running operations such as compute_embeddings or predict_go_terms.

protea.api.routers.jobs.list_job_comments(job_id: UUID, limit: int = Query(200), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

Return JobComment rows for a Job, oldest first.

Use after to page forward (the comment thread grows oldest → newest, so cursor semantics flip vs. the newest-first lists). limit caps each page at 2000 to keep payloads bounded.

protea.api.routers.jobs.list_jobs(filters: JobListFilters = Depends(dependency=<function _job_list_filters_dep>, use_cache=True, scope=None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), registry: OperationRegistry = Depends(dependency=<function get_operation_registry>, use_cache=True, scope=None)) list[dict[str, Any]]

List jobs with optional filtering.

By default only top-level jobs (no parent) are returned. Set include_children=true or filter by parent_job_id to see batch sub-jobs from distributed pipelines. Filters travel as discrete query parameters on the wire; the dependency bundles them into JobListFilters for the handler.

Proteins router

The /proteins router provides read access to the protein and sequence catalogue. Proteins are not created directly through this router; they are inserted asynchronously by the insert_proteins operation. The router exposes list and detail endpoints with filtering by organism and review status.

protea.api.routers.proteins.get_protein(accession: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Full details for one protein: core fields, UniProt functional metadata, embedding count, GO annotation count, and accessions of known isoforms (if canonical).

protea.api.routers.proteins.get_protein_annotations(accession: str, annotation_set_id: str | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

Return all GO term annotations for a protein, joined with term details and annotation set source. Optionally filter to a specific annotation set by UUID.

protea.api.routers.proteins.get_protein_stats(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Return aggregate counts: total proteins, canonical vs isoforms, reviewed, and how many have metadata, embeddings, or GO annotations.

Cached for 5 minutes: the DISTINCT-over-JOIN counts scan 4M–80M rows and take 30+ seconds to run from scratch. Counts move slowly enough that a 5-min staleness is invisible to users.

Serves the last-known value when the recompute fails (DB blip, query timeout) so the page never blocks on a cold-cache 500. The startup hook in protea.api.app prewarms this key and a background task refreshes it before expiry so users never hit a cold path under normal operation.
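
The caching behaviour described here (TTL, last-known value on failure, forced prewarm) can be sketched with a small class. StaleOnErrorCache and its method names are hypothetical and are not the cache used by PROTEA; the injectable clock is there only so the behaviour can be exercised without waiting.

```python
import time
from typing import Any, Callable


class StaleOnErrorCache:
    """TTL cache that falls back to the last-known value when recompute fails."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._entries: dict[str, tuple[float, Any]] = {}  # key -> (expires_at, value)
        self._clock = clock

    def get(self, key: str, producer: Callable[[], Any], ttl: float) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # fresh hit, no recompute
        try:
            value = producer()
        except Exception:
            if entry is not None:
                return entry[1]  # expired, but better than an error page
            raise  # cold cache and the producer failed: nothing to serve
        self._entries[key] = (now + ttl, value)
        return value

    def refresh(self, key: str, producer: Callable[[], Any], ttl: float) -> Any:
        """Prewarm: always recompute, bypassing any existing entry."""
        value = producer()
        self._entries[key] = (self._clock() + ttl, value)
        return value
```

A startup hook and a periodic background task would call refresh() before the TTL runs out, so that get() almost always takes the fresh-hit branch.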

protea.api.routers.proteins.list_proteins(search: str | None = Query(None), reviewed: bool | None = Query(None), canonical_only: bool = Query(True), limit: int = Query(50), offset: int = Query(0), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Paginated protein listing with optional full-text search across accession, entry name, gene name, and organism.

protea.api.routers.proteins.prewarm_protein_stats(factory: sessionmaker[Session]) dict[str, Any]

Recompute and store proteins:stats; used by the app startup hook and the background refresh loop. Always bypasses the existing entry so the cache is refilled with fresh counts before the old TTL expires.

Annotations router

The /annotations router exposes the GO ontology and annotation set data. It provides:

  • Ontology snapshot listing and detail, including GO term counts per aspect.

  • Annotation set listing and detail.

  • A BFS ancestor subgraph endpoint (GET /annotations/snapshots/{id}/subgraph) that returns the ancestor closure for a given set of GO term IDs within a snapshot. Used by the frontend to render the GO hierarchy for a prediction result.
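
The ancestor closure mentioned in the last item is a plain breadth-first search over child-to-parent edges. The sketch below assumes the ontology is available as a dictionary mapping each term to its direct parents; ancestor_closure and the toy term IDs are illustrative, not the router's code or real GO identifiers.

```python
from collections import deque
from typing import Iterable, Mapping, Sequence


def ancestor_closure(
    term_ids: Iterable[str],
    parents: Mapping[str, Sequence[str]],
) -> tuple[set[str], set[tuple[str, str]]]:
    """Return (nodes, edges) of the subgraph reachable by walking up from term_ids."""
    start = list(dict.fromkeys(term_ids))  # drop duplicates, keep order
    nodes: set[str] = set(start)
    edges: set[tuple[str, str]] = set()
    queue = deque(start)
    while queue:
        child = queue.popleft()
        for parent in parents.get(child, ()):
            edges.add((child, parent))
            if parent not in nodes:  # enqueue each ancestor once
                nodes.add(parent)
                queue.append(parent)
    return nodes, edges


# Toy DAG: T4 has two parents that share the root T1; T5 is unrelated to T4.
toy_parents = {"T4": ["T2", "T3"], "T2": ["T1"], "T3": ["T1"], "T5": ["T1"]}
nodes, edges = ancestor_closure(["T4"], toy_parents)
print(sorted(nodes))
print(sorted(edges))
```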

The annotations router is split into four sub-modules, each handling one endpoint group.

Evaluation-result endpoints: list / metrics TSV / artifacts ZIP / delete.

protea.api.routers.annotations.evaluation_results.delete_evaluation_result(eval_id: UUID, result_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) None

Delete one evaluation result row plus its stored artefacts.

Two-step: the ORM cascade clears the DB row first (collecting the artifact keys); the artifact store delete() is then issued for each key outside the session so a network failure here does not leave the DB inconsistent. Returns 204 on success, 404 if the result is unknown.

protea.api.routers.annotations.evaluation_results.download_evaluation_artifacts(eval_id: UUID, result_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse

Stream all stored cafaeval artefacts for one evaluation result as a ZIP.

Bundles the per-result outputs (raw predictions, CAFA scoring TSVs, plot images) that the artifact store keeps under the result’s prefix. Returns 404 if the result is unknown or has no artefacts persisted.

protea.api.routers.annotations.evaluation_results.download_evaluation_metrics(eval_id: UUID, result_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse

Stream the per-aspect metrics table for one evaluation result.

Renders the cafaeval-style summary (Fmax, Smin, AUPRC) as TSV with one row per CAFA aspect (BPO/MFO/CCO). Returns 404 if the (eval_id, result_id) pair does not match.

protea.api.routers.annotations.evaluation_results.list_evaluation_results(eval_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

List every cafaeval result row attached to one evaluation set.

Each row carries the prediction-set / scoring-config / reranker triple used to produce it plus the cached metrics summary, so the UI benchmark matrix can render without per-row drilldowns. Returns 404 if the evaluation set itself is missing.

Embeddings router

The /embeddings router manages embedding configurations and prediction sets. Embedding configurations are immutable recipes: once created, they can be referenced by any number of embedding computation and prediction jobs. Creating a new configuration with different parameters produces a new UUID, preserving reproducibility.

Prediction sets are created by submitting a predict_go_terms job and are queryable once the job completes. The GET /embeddings/prediction-sets/{id}/predictions.tsv endpoint streams prediction results as a tab-separated file (32 columns including re-ranker features) using StreamingResponse with yield_per(1000), avoiding loading the full result set into memory.

protea.api.routers.embeddings.create_embedding_config(body: dict[str, Any], factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Create a new EmbeddingConfig that defines the model, layer selection, pooling strategy, and chunking.

This config is referenced by compute_embeddings jobs and predict_go_terms jobs to ensure query and reference embeddings were produced under identical settings.

protea.api.routers.embeddings.delete_embedding_config(config_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Delete an EmbeddingConfig and cascade-delete all linked embeddings, prediction sets, and predictions.

protea.api.routers.embeddings.delete_prediction_set(set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Delete a prediction set and all its GOPrediction rows.

protea.api.routers.embeddings.download_predictions_cafa(set_id: UUID, eval_id: UUID | None = Query(None), aspect: str | None = Query(None), max_distance: float | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse

Stream predictions in CAFA format: protein_accession\tgo_id\tscore.

Score is computed as max(0.0, 1.0 - distance) so that closer neighbours receive higher confidence scores in the [0, 1] range expected by the CAFA evaluator. One row per (protein, GO term) pair; duplicate GO terms for the same protein are deduplicated keeping the highest score (lowest distance).
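
The scoring and deduplication rule can be written out in a few lines. In this sketch the input is assumed to be an iterable of (accession, go_id, distance) tuples, and the four-decimal formatting, the function name cafa_rows, and the placeholder identifiers are illustrative choices rather than the endpoint's actual output format.

```python
from typing import Iterable


def cafa_rows(predictions: Iterable[tuple[str, str, float]]) -> list[str]:
    """Collapse neighbours to one CAFA line per (protein, GO term) pair."""
    best: dict[tuple[str, str], float] = {}
    for accession, go_id, distance in predictions:
        score = max(0.0, 1.0 - distance)  # closer neighbour -> higher score
        key = (accession, go_id)
        if key not in best or score > best[key]:
            best[key] = score  # keep the highest score (lowest distance)
    return [f"{acc}\t{go}\t{score:.4f}" for (acc, go), score in best.items()]


sample = [
    ("P1", "GO:1", 0.5),
    ("P1", "GO:1", 0.25),  # same pair, nearer neighbour wins
    ("P1", "GO:2", 1.5),   # distance above 1 is clamped to a score of 0
]
for line in cafa_rows(sample):
    print(line)
```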

Pass eval_id to restrict output to delta proteins only (NK + LK targets), which is required for a valid CAFA evaluation.

protea.api.routers.embeddings.download_predictions_tsv(set_id: UUID, accession: str | None = Query(None), aspect: str | None = Query(None), max_distance: float | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse

Stream all GO predictions for a prediction set as a tab-separated file.

Each row is one (protein, GO term, reference protein) triple. Columns include embedding distance, GO term metadata, annotation fields, and optional alignment and taxonomy features (columns are present but empty when not computed).

Optional filters: accession, aspect (F/P/C), max_distance.

The response streams rows directly from the database; suitable for large prediction sets without loading everything into memory.
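
The streaming pattern can be approximated with a generator that emits the header and then rows in bounded batches. This is a sketch under assumptions: tsv_stream is a hypothetical name, rows may be any iterable (for example a database cursor fetched in chunks), and in a FastAPI handler a generator like this would be handed to StreamingResponse.

```python
from typing import Any, Iterable, Iterator, Sequence


def tsv_stream(
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
    batch_size: int = 1000,
) -> Iterator[str]:
    """Yield a TSV document chunk by chunk without materialising all rows."""
    yield "\t".join(columns) + "\n"
    buffer: list[str] = []
    for row in rows:
        # Features that were not computed stay as empty cells.
        buffer.append("\t".join("" if v is None else str(v) for v in row))
        if len(buffer) >= batch_size:
            yield "\n".join(buffer) + "\n"
            buffer.clear()
    if buffer:  # flush the final partial batch
        yield "\n".join(buffer) + "\n"


for chunk in tsv_stream(["accession", "distance"], [("P1", 0.25), ("P2", None)]):
    print(chunk, end="")
```

Memory use is bounded by batch_size rather than by the size of the prediction set, which is the property the endpoint relies on.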

protea.api.routers.embeddings.get_embedding_config(config_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Retrieve a single EmbeddingConfig with its total stored embedding count.

protea.api.routers.embeddings.get_go_term_distribution(set_id: UUID, limit: int = Query(50), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Return the most frequently predicted GO terms grouped by aspect (F/P/C) and the total prediction counts per aspect.

protea.api.routers.embeddings.get_prediction_set(set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Retrieve a prediction set with total prediction count and per-protein GO term counts.

protea.api.routers.embeddings.get_protein_predictions(set_id: UUID, accession: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

Return all predicted GO terms for a protein in a prediction set, sorted by distance (nearest first). Includes GO term details plus optional alignment (NW/SW) and taxonomy fields when computed.

protea.api.routers.embeddings.list_embedding_configs(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

List all embedding configurations with their stored embedding counts, newest first.

The per-config GROUP BY over a 4M-row table is cached 5 minutes; new configs still appear immediately (they have 0 embeddings), only the counts are stale. Serves the last-known value when the recompute fails (DB blip, query timeout) so the page never blocks on a cold-cache 500.

protea.api.routers.embeddings.list_prediction_set_proteins(set_id: UUID, search: str | None = Query(None), limit: int = Query(50), offset: int = Query(0), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Paginated list of proteins in a prediction set with their predicted GO count, minimum distance, known annotation count, and how many predictions match known annotations (precision proxy).

protea.api.routers.embeddings.list_prediction_sets(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

List the 100 most recent prediction sets, cached 5 min.

The DISTINCT-over-JOIN against prediction_set + embedding_config + annotation_set + ontology_snapshot scans tens of millions of rows on cold cache (115s+ measured). The startup hook in protea.api.app prewarms this key and a background task refreshes it before expiry so users never hit a cold path under normal operation. Serves the last-known value on producer failure to prevent a DB blip from surfacing as a 500.

protea.api.routers.embeddings.predict_go_terms(body: dict[str, Any], factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]

Queue a predict_go_terms job that runs KNN-based GO term transfer.

The coordinator partitions query proteins into batches, each dispatched to protea.predictions.batch workers for KNN search (numpy or FAISS) + GO annotation transfer. Results are written to a new PredictionSet via protea.predictions.write workers.

Required body fields: embedding_config_id, annotation_set_id, ontology_snapshot_id. Optional: query_set_id (FASTA upload), limit_per_entry, distance_threshold, batch_size, search_backend. Feature-engineering flags default to True: compute_alignments, compute_taxonomy, compute_reranker_features all run unless explicitly set to false. aspect_separated_knn defaults to true (one KNN index per GO aspect to guarantee BPO/MFO/CCO coverage even when unified nearest neighbours carry only one aspect).

protea.api.routers.embeddings.prewarm_embedding_configs(factory: sessionmaker[Session]) list[dict[str, Any]]

Recompute and store embeddings:configs; used by the app startup hook and the background refresh loop. Always bypasses the existing entry so the cache is refilled with fresh counts before the old TTL expires.

protea.api.routers.embeddings.prewarm_prediction_sets(factory: sessionmaker[Session]) list[dict[str, Any]]

Recompute and store embeddings:prediction-sets; used by the app startup hook and the background refresh loop. Always bypasses the existing entry so the cache is refilled with fresh data before the old TTL expires.

Scoring router

The /scoring router exposes scoring configurations, the training-data export, and read-only endpoints for applying LightGBM re-ranker models. In-process re-ranker training was retired in F0/T0.6: boosters are now trained offline in protea-reranker-lab and registered through the Reranker models router (POST /reranker-models/import).

Key endpoints:

  • GET /scoring/prediction-sets/{id}/training-data.tsv: generates a 31-column TSV with binary labels from temporal ground truth, consumed by protea-reranker-lab to fit a booster.

  • GET /scoring/rerankers / GET /scoring/rerankers/{id} / DELETE /scoring/rerankers/{id}: read/delete operations for registered re-ranker models. Creation lives at POST /reranker-models/import.

  • GET /scoring/prediction-sets/{id}/rerank.tsv: applies a trained re-ranker to a prediction set, streaming re-scored predictions.

  • GET /scoring/prediction-sets/{id}/reranker-metrics: computes CAFA-style Fmax and AUC-PR using re-ranker probability scores.

Query sets router

The /query-sets router handles user-uploaded FASTA files. On POST /query-sets, the server parses the multipart upload, creates a QuerySet row, upserts one Sequence row per unique amino-acid string (deduplicating by MD5 hash), and creates QuerySetEntry rows preserving the original FASTA headers. The returned query set ID can then be referenced in compute_embeddings and predict_go_terms job payloads.

async protea.api.routers.query_sets.create_query_set(file: UploadFile, name: str = Form(PydanticUndefined), description: str | None = Form(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Upload a FASTA file and create a QuerySet.

Each sequence in the FASTA is stored (or reused if already present) in the sequence table. A query_set_entry row is created per sequence, preserving the original FASTA accession. Duplicate accessions within the same upload are rejected with 422.
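
The upload logic can be sketched without a database. The example below makes several assumptions: the accession is taken to be the first whitespace-delimited token of each header, the MD5 is computed over the sequence string exactly as written, and parse_fasta / plan_upload are hypothetical names. A ValueError stands in for the 422 response.

```python
import hashlib


def parse_fasta(text: str) -> list[tuple[str, str]]:
    """Return (accession, sequence) pairs in file order."""
    entries: list[tuple[str, str]] = []
    accession = None
    chunks: list[str] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        if line.startswith(">"):
            if accession is not None:
                entries.append((accession, "".join(chunks)))
            tokens = line[1:].split()
            accession, chunks = (tokens[0] if tokens else ""), []
        elif accession is not None:
            chunks.append(line)  # sequences may wrap over several lines
    if accession is not None:
        entries.append((accession, "".join(chunks)))
    return entries


def plan_upload(text: str) -> tuple[dict[str, str], list[tuple[str, str]]]:
    """Return (unique sequences keyed by MD5, entry rows as (accession, md5))."""
    entries = parse_fasta(text)
    seen: set[str] = set()
    for acc, _ in entries:
        if acc in seen:
            raise ValueError(f"duplicate accession in upload: {acc}")  # API: 422
        seen.add(acc)
    sequences: dict[str, str] = {}
    rows: list[tuple[str, str]] = []
    for acc, seq in entries:
        digest = hashlib.md5(seq.encode("utf-8"), usedforsecurity=False).hexdigest()
        sequences.setdefault(digest, seq)  # reuse the row if the sequence is known
        rows.append((acc, digest))
    return sequences, rows


sequences, rows = plan_upload(">a first\nMKT\nAA\n>b\nMKTAA\n>c\nGG\n")
print(len(sequences), "unique sequences for", len(rows), "entries")
```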

protea.api.routers.query_sets.delete_query_set(query_set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Delete a query set and all its entries. Sequences are not deleted (they may be shared).

protea.api.routers.query_sets.extract_uniprot_header_metadata(description: str) dict[str, Any]

Parse UniProt-style FASTA headers and extract taxonomy fields.

Matches the SwissProt/TrEMBL convention sp|ACC|NAME OS=<species> OX=<taxid> GN=<gene> PE=<level> SV=<version>. Returns {'taxonomy_id': int | None, 'species': str | None}. Silent no-op for headers that don’t follow the convention; fields simply come back as None.
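
A regular-expression sketch of this parsing is shown below. The patterns and the function name extract_header_metadata are assumptions about one reasonable way to implement the documented behaviour; the actual function may differ in its handling of edge cases.

```python
import re
from typing import Any

# OS= runs until the next two-letter KEY= tag (OX=, GN=, PE=, SV=) or end of line.
_SPECIES = re.compile(r"\bOS=(.+?)(?=\s+[A-Z]{2}=|$)")
_TAXID = re.compile(r"\bOX=(\d+)")


def extract_header_metadata(description: str) -> dict[str, Any]:
    """Return taxonomy_id and species, each None when the tag is absent."""
    species = _SPECIES.search(description)
    taxid = _TAXID.search(description)
    return {
        "taxonomy_id": int(taxid.group(1)) if taxid else None,
        "species": species.group(1).strip() if species else None,
    }


header = "sp|P69905|HBA_HUMAN Hemoglobin subunit alpha OS=Homo sapiens OX=9606 GN=HBA1 PE=1 SV=2"
print(extract_header_metadata(header))
print(extract_header_metadata("my_seq_1 uncharacterised protein"))
```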

protea.api.routers.query_sets.get_query_set(query_set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Retrieve a query set with its full entry list (accessions and sequence IDs).

protea.api.routers.query_sets.list_query_sets(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

List all uploaded FASTA query sets with their entry counts, newest first.

Annotate router

The /annotate router provides a one-click annotation endpoint. It accepts a FASTA file (or raw text), auto-selects the best available embedding config, annotation set, and ontology snapshot, creates a QuerySet, and queues a compute_embeddings job. Returns all the IDs the frontend needs to chain predict_go_terms once embeddings finish.

class protea.api.routers.annotate.AnnotateFormOptions(*, compute_reranker_features: bool = True)

Bases: BaseModel

User-controllable feature flags for the quick-annotation endpoint.

These fields map 1:1 to the predict_go_terms coordinator payload so the frontend can expose them directly without an intermediate translation.

compute_reranker_features: bool
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

async protea.api.routers.annotate.annotate(file: UploadFile | None = None, fasta_text: str | None = Form(None), name: str = Form('Quick annotation'), compute_reranker_features: bool = Form(True), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]

One-click annotation: upload FASTA, auto-select best method, run pipeline.

Accepts either an uploaded FASTA file or raw fasta_text. Creates a QuerySet, picks the best embedding config (or creates the default ESM-2 650M config), and queues a compute_embeddings job.

Returns the IDs the frontend needs to monitor progress and chain predict_go_terms once embeddings are ready.

compute_reranker_features controls whether the reranker feature families (lineage, anc2vec, anc2vec_query, emb_pca, annotation_meta) are included in the downstream predict_go_terms job. Default: True.

Maintenance router

The /maintenance router provides housekeeping endpoints for identifying and removing orphaned data. Two pairs of preview/execute endpoints handle orphan sequences (not referenced by any Protein or QuerySetEntry) and unindexed embeddings (for sequences not referenced by any Protein). Preview endpoints are read-only; execute endpoints perform the actual deletion.

protea.api.routers.maintenance.preview_orphan_sequences(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Count orphan sequences without running the delete.

A sequence is orphaned when it has no Protein rows pointing to it AND no QuerySetEntry rows pointing to it.
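
Expressed as a query, the orphan condition is a double anti-join. The sketch below runs it against an in-memory SQLite database purely for illustration: PROTEA itself uses PostgreSQL, and the sequence_id foreign-key column names are assumptions about the schema.

```python
import sqlite3

ORPHAN_COUNT_SQL = """
SELECT COUNT(*)
FROM sequence AS s
WHERE NOT EXISTS (SELECT 1 FROM protein AS p WHERE p.sequence_id = s.id)
  AND NOT EXISTS (SELECT 1 FROM query_set_entry AS q WHERE q.sequence_id = s.id)
"""


def count_orphan_sequences(conn: sqlite3.Connection) -> int:
    """Preview only: counts orphans without deleting anything."""
    return conn.execute(ORPHAN_COUNT_SQL).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE sequence (id INTEGER PRIMARY KEY);
    CREATE TABLE protein (accession TEXT, sequence_id INTEGER);
    CREATE TABLE query_set_entry (accession TEXT, sequence_id INTEGER);
    INSERT INTO sequence (id) VALUES (1), (2), (3);
    INSERT INTO protein VALUES ('P1', 1);          -- sequence 1 backs a protein
    INSERT INTO query_set_entry VALUES ('q1', 2);  -- sequence 2 backs an upload
    """
)
print(count_orphan_sequences(conn))
```

Only the sequence referenced by neither table is counted; the execute endpoint would delete the same set of rows that the preview counts.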

protea.api.routers.maintenance.preview_unindexed_embeddings(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Count embeddings for sequences not referenced by any Protein.

These are embeddings computed for query proteins (QuerySet uploads) or orphan sequences. They are safe to delete once predictions have been run.

protea.api.routers.maintenance.vacuum_embeddings(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Delete embeddings for sequences not referenced by any Protein.

Destructive: removes rows from sequence_embedding. Gated to admin so the embedding corpus (expensive to recompute on GPUs) cannot be wiped by an operator key. Safe to run once predictions have been generated; query-protein embeddings are only needed during the prediction job itself.

protea.api.routers.maintenance.vacuum_sequences(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Delete sequences not referenced by any Protein or QuerySetEntry.

Destructive: removes rows from sequence. Gated to admin so a compromised operator key cannot reduce the corpus. Orphan sequences have no embeddings reachable from any active protein or query set, but the deletion is permanent and feeds into downstream foreign keys, so it stays on the admin floor with the other DB-mutating housekeeping operations.

Admin router

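The /admin router exposes destructive administrative operations. It provides POST /admin/reset-db, which drops and recreates the public schema and re-applies all Alembic migrations, together with the dead-letter queue (DLQ) handlers documented below, which summarize, purge, and replay messages from protea.dead-letter. Protected by the admin role via require_role() (FARM-AUTH.4).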

class protea.api.routers.admin.DlqPurgeRequest(*, operation: str | None = None, first_death_queue: str | None = None, dry_run: bool = False, max_messages: int = 10000)

Bases: BaseModel

Filter for DLQ messages to permanently discard.

dry_run: bool
first_death_queue: str | None
max_messages: int
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

operation: str | None
class protea.api.routers.admin.DlqReplayRequest(*, operation: str | None = None, first_death_queue: str | None = None, target_queue: str | None = None, dry_run: bool = False, max_messages: int = 1000)

Bases: BaseModel

Filter for DLQ messages to re-enqueue back onto their source queue.

dry_run: bool
first_death_queue: str | None
max_messages: int
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

operation: str | None
target_queue: str | None
protea.api.routers.admin.get_dlq_summary(_principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)], amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None), max_peek: int = Query(500)) dict[str, Any]

Grouped count of dead-letter messages by operation, source queue, and age.

Peeks up to max_peek messages from protea.dead-letter without consuming them, groups them by {operation, first_death_queue, age_bucket}, and re-queues all peeked messages before returning. The DLQ depth is unchanged after this call.
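The grouping step can be pictured as a counter keyed on the three fields. This is a sketch under assumptions: the message dictionaries, the died_at field, the queue names, and the age bucket boundaries are all hypothetical, since the section does not document them, and no broker is involved.

```python
from collections import Counter

# Hypothetical bucket edges in seconds; the real age buckets are not documented here.
AGE_BUCKETS = [(3600, "<1h"), (86400, "<1d")]


def age_bucket(age_seconds):
    """Map a message age to a coarse label."""
    for limit, label in AGE_BUCKETS:
        if age_seconds < limit:
            return label
    return ">=1d"


def summarize(messages, now, max_peek=500):
    """Group at most max_peek messages by (operation, first_death_queue, age_bucket)."""
    counts = Counter()
    for msg in messages[:max_peek]:
        bucket = age_bucket(now - msg["died_at"])
        counts[(msg["operation"], msg["first_death_queue"], bucket)] += 1
    return [
        {"operation": op, "first_death_queue": queue, "age_bucket": bucket, "count": n}
        for (op, queue, bucket), n in sorted(counts.items())
    ]
```

The input list is only read, which mirrors the guarantee that the DLQ depth is unchanged after the call.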

protea.api.routers.admin.purge_dlq(body: ~protea.api.routers.admin.DlqPurgeRequest, _principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)], amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]

Discard DLQ messages matching the filter.

Matching messages are acked (permanently removed from the DLQ). Non-matching messages remain in the DLQ.

dry_run=True reports how many messages would be purged without removing them. Always prefer a dry-run first.

protea.api.routers.admin.replay_dlq(body: ~protea.api.routers.admin.DlqReplayRequest, _principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)], amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]

Re-enqueue DLQ messages matching the filter.

Matching messages are published back to their original source queue (or target_queue if specified) and acked from the DLQ. Non-matching messages remain in the DLQ.

dry_run=True reports how many messages would be replayed without actually moving them.
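The filter, routing, and dry-run rules can be sketched without a broker. Everything here is illustrative: plan_replay and the message dictionaries are hypothetical, and two behaviours are assumptions rather than documented facts: a filter left as None matches every message, and max_messages caps the number of matched messages.

```python
def _matches(msg, operation, first_death_queue):
    """A filter field left as None is treated as a wildcard (assumption)."""
    if operation is not None and msg["operation"] != operation:
        return False
    if first_death_queue is not None and msg["first_death_queue"] != first_death_queue:
        return False
    return True


def plan_replay(messages, *, operation=None, first_death_queue=None,
                target_queue=None, dry_run=False, max_messages=1000):
    """Return (routed, remaining): (queue, message) pairs and what stays in the DLQ."""
    routed, remaining = [], []
    for msg in messages:
        if len(routed) < max_messages and _matches(msg, operation, first_death_queue):
            # target_queue overrides the original source queue when given.
            routed.append((target_queue or msg["first_death_queue"], msg))
        else:
            remaining.append(msg)
    if dry_run:
        # Report only: nothing leaves the DLQ.
        return routed, list(messages)
    return routed, remaining
```

A dry run returns the same routing plan as a real run, so the reported count can be checked before anything is moved.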

protea.api.routers.admin.reset_db(request: ~starlette.requests.Request, _principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)]) dict

Drop and recreate the public schema, then re-apply all Alembic migrations.

Requires an authenticated admin principal (FARM-AUTH.4) plus the extra destructive-op guards in _authorize_reset_db().

Showcase router

The /showcase router aggregates platform statistics and the single best evaluation result for the landing page. It returns the pipeline stage counts, the platform counts, and one named "spotlight" winner (the embedding with the best mean Fmax across the 9 cells), all in a single JSON response. The full per-model matrix is served by /benchmark.

Showcase endpoint: aggregates platform stats and the single best evaluation result with full embedding attribution.

Unlike protea.api.routers.benchmark, which exposes the full per-model per-stage matrix, this module is deliberately minimal: it returns one “spotlight” result that the Home page can use for its hero card, plus the pipeline stage counts.

Background

The previous implementation collapsed every evaluation into three method buckets (knn_baseline / knn_scored / knn_reranker) and took the maximum Fmax across all embeddings in each bucket. That hid which concrete embedding won a given cell and silently dropped the losing embeddings from the UI entirely. With the introduction of the 8-model benchmark, that collapse is actively misleading, so this endpoint now returns a single named winner and a link to /benchmark for the full matrix.

protea.api.routers.showcase.get_showcase(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Aggregate pipeline stage counts and return the single best evaluation result (by mean Fmax across the 9 cells) along with the embedding that produced it.

Empty-state contract: best is None when no EvaluationResult exists; pipeline_stages always returns the same five entries with count = 0 for unpopulated stages; counts always returns the same keys.
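The selection rule and the empty-state behaviour for best can be sketched as follows. The input shape (embedding name mapped to per-cell Fmax values) and the output keys are hypothetical; only the "best mean Fmax, or None when nothing exists" rule comes from the description above.

```python
from statistics import fmean


def pick_spotlight(results):
    """Pick the embedding with the highest mean Fmax across its cells.

    results maps an embedding name to {(category, aspect): fmax}.
    Returns None when there is nothing to rank.
    """
    scored = {name: cells for name, cells in results.items() if cells}
    if not scored:
        return None
    name, cells = max(scored.items(), key=lambda item: fmean(item[1].values()))
    return {"embedding": name, "mean_fmax": fmean(cells.values())}
```

Ranking by the mean rather than the per-cell maximum means an embedding that wins one cell but loses the rest does not take the spotlight.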

Support router

The /support router handles community feedback. GET /support returns the total thumbs-up count and recent comments. POST /support submits a new thumbs-up with an optional comment (max 500 characters).

class protea.api.routers.support.SupportCreate(*, comment: str | None = None)

Bases: BaseModel

Body for POST /support.

A thumbs-up may carry an optional free-form comment. The text is capped at api.max_comment_length from the tuning config; longer submissions are rejected with 422 rather than silently truncated.

comment: str | None
classmethod comment_within_limit(v: str | None) str | None
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
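The contract of this body (no unknown keys, optional comment, length cap) can be illustrated with a standard-library stand-in. This is not the Pydantic model: validate_support_body is hypothetical, ValueError stands in for the 422 response, and the constant 500 is taken from the router overview as a placeholder for api.max_comment_length.

```python
MAX_COMMENT_LENGTH = 500  # placeholder for api.max_comment_length


def validate_support_body(body):
    """Reject unknown keys and over-long comments instead of truncating."""
    unknown = set(body) - {"comment"}
    if unknown:
        # Mirrors extra="forbid": a misspelled key fails fast.
        raise ValueError(f"unknown fields: {sorted(unknown)}")
    comment = body.get("comment")
    if comment is not None and not isinstance(comment, str):
        raise ValueError("comment must be a string or null")
    if comment is not None and len(comment) > MAX_COMMENT_LENGTH:
        raise ValueError(f"comment exceeds {MAX_COMMENT_LENGTH} characters")
    return {"comment": comment}
```

An empty body is valid and records a bare thumbs-up.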

protea.api.routers.support.get_support(all_comments: bool = Query(False), factory=Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Return total thumbs-up count and comments.

Pass all_comments=true to get all comments (up to the configured page limit) instead of the recent_limit most recent.

protea.api.routers.support.post_support(body: SupportCreate, factory=Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Submit a thumbs-up with an optional comment.

Benchmark router

The /benchmark router powers the per-PLM comparison grid in the UI. Where /showcase collapses every model into a few buckets and reports the maximum, this router preserves which embedding produced each number and which scoring config was used, exposing one stage per distinct ScoringConfig.name plus an implicit "reranker" stage for evaluations that used a re-ranker. Stage labels, GO categories, and the baseline tag are read from protea/config/benchmark.yaml rather than hardcoded.

Benchmark matrix endpoints.

Exposes a per-embedding, per-stage view of every EvaluationResult in the database so the UI can render the full PLM comparison grid for the thesis benchmark.

Where the /showcase endpoint collapses all models into a few method buckets and takes the maximum across every embedding, this module preserves which embedding produced each number and which scoring config was used: one stage per distinct scoring_config.name found in the DB, plus an implicit "reranker" stage for evaluations that used a reranker.

Zero domain constants are hardcoded here: stage labels, preferred default, baseline tag, GO categories and aspects all come from protea/config/benchmark.yaml via BenchmarkConfig. Model display metadata (display name, family, param count) comes from the dedicated columns on embedding_config; no HF-name regex heuristics.

Two endpoints are provided:

GET /benchmark/embeddings

One row per EmbeddingConfig with its persisted display metadata.

GET /benchmark/matrix

One row per (embedding_config, evaluation_set, stage, category, aspect) tuple, best-Fmax only. Response also includes:

  • stages: every stage observed in the data (with label/kind)

  • evaluation_sets: per-eval-set metadata (stats, source, obo version)

  • best_per_cell: cross-model winner per (category, aspect) cell within the active stage/K filter selection

  • best_per_cell_global: same shape as best_per_cell but ignores the user’s stage/K filters. Stable across filter changes; the per-cell champion across the entire dataset for the current evaluation set.

  • categories / aspects: from YAML config

protea.api.routers.benchmark.get_benchmark_matrix(evaluation_set_id: UUID | None = Query(None), stage: str | None = Query(None), k: int | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), cfg: BenchmarkConfig = Depends(dependency=<function get_benchmark_config>, use_cache=True, scope=None)) dict[str, Any]

Return a long-format table with one row per (embedding_config, evaluation_set, stage, category, aspect) tuple containing the best Fmax / precision / recall observed in the DB, plus per-eval-set metadata and a cross-model leaderboard.
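The difference between best_per_cell and best_per_cell_global is only whether the stage/K filters are applied before picking winners. A minimal sketch, assuming each row is a dictionary with hypothetical keys embedding, stage, k, category, aspect, and fmax:

```python
def best_per_cell(rows, stage=None, k=None):
    """Return the highest-Fmax row per (category, aspect) cell.

    With stage and k left as None, no filter is applied, which
    corresponds to the global variant.
    """
    winners = {}
    for row in rows:
        if stage is not None and row["stage"] != stage:
            continue
        if k is not None and row["k"] != k:
            continue
        cell = (row["category"], row["aspect"])
        if cell not in winners or row["fmax"] > winners[cell]["fmax"]:
            winners[cell] = row
    return winners
```

Because the filters run in Python over already materialised rows, both variants can be derived from the same query result.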

protea.api.routers.benchmark.list_benchmark_embeddings(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), cfg: BenchmarkConfig = Depends(dependency=<function get_benchmark_config>, use_cache=True, scope=None)) dict[str, Any]

Return every EmbeddingConfig with its persisted display metadata.

The metadata lives in the embedding_config.display_name, family, and param_count columns, which the seed scripts fill at creation time. No heuristic inference happens here. Configs listed under hidden_embeddings in benchmark.yaml are suppressed. The response is cached for 5 min: the benchmark page is the first router touched on a fresh deploy, and without the cache cold pg pages push this call past several seconds.

protea.api.routers.benchmark.prewarm_benchmark_embeddings(factory: sessionmaker[Session]) dict[str, Any]

Recompute and store benchmark:embeddings for the lifespan prewarm hook + background refresh loop.

protea.api.routers.benchmark.prewarm_benchmark_matrix(factory: sessionmaker[Session]) None

Pre-warm the benchmark matrix by running the full EvaluationResult scan with no filters. The underlying pg statement is filter-agnostic (stage/K filters are applied in Python on the materialised rows), so one warm pass populates the buffer cache for every filtered variant the UI requests next. The /matrix endpoint itself is not response-cached: keeping the pg pages hot is enough to hold the live response under 100 ms, and filter combinations multiply too quickly for an in-process cache to be useful.

Datasets router

The /datasets router is the registry for frozen re-ranker training datasets. POST /datasets enqueues an export_research_dataset job that runs the KNN + feature pipeline, publishes the train.parquet / eval.parquet / manifest.json triple to the configured ArtifactStore (local FS or MinIO), and inserts a Dataset row once the upload completes. GET /datasets and GET /datasets/{id_or_name} expose the registry to protea-reranker-lab’s pull_dataset.py and to UI consumers.

POST /datasets/import-by-reference (LB.1) is the lightweight registration path for datasets whose artefacts already reside in the artifact store. The caller supplies the name, storage backend, artifact URIs, content fingerprints (schema_sha, manifest_sha), and dump parameters verbatim from the lab’s manifest.json; PROTEA inserts a Dataset row pointing at those URIs without re-running the KNN pipeline or enqueueing a job. Typical use cases are: replay after a DB wipe while artefacts remain in MinIO, lab-side dumps produced before export_research_dataset existed, and the FARM-EXP.2a placeholder-digest backfill. Optional FK columns (embedding_config_id, ontology_snapshot_id) are silently set to NULL when the referenced row is absent in the local DB, matching the same defensive pattern used by POST /reranker-models/import-by-reference. The resulting Dataset row is content-identical to one produced by an in-PROTEA export; the only visible difference is meta.imported_by_reference = true.

Frozen re-ranker dataset registry.

POST /datasets enqueues an export_research_dataset job that runs KNN + feature generation, publishes train/eval/manifest artefacts to the configured artifact store (local FS or MinIO) and inserts a Dataset row once the upload completes. The row is the durable handle the lab uses to pull the exact dump by name or id.

POST /datasets/import-by-reference is the lightweight twin: it registers a Dataset row pointing at already-staged artefacts (lab side dump, salvage replay, or any out of band export) without running the KNN pipeline. The lab uses this for benches it produced locally before export_research_dataset existed, or for re-imports after a DB wipe.

GET /datasets and GET /datasets/{id_or_name} expose the registry for the lab’s pull_dataset.py and for UI consumers.

class protea.api.routers.datasets.CreateDatasetRequest(*, output_name: Annotated[str, MinLen(min_length=1), MaxLen(max_length=255)], embedding_config_id: Annotated[str, MinLen(min_length=1)], ontology_snapshot_id: Annotated[str, MinLen(min_length=1)], train_versions: Annotated[list[int], MinLen(min_length=2)], test_versions: Annotated[list[int], MinLen(min_length=1)], annotation_source: str = 'goa', k: Annotated[int, Gt(gt=0)] = 5, search_backend: str = 'faiss', compute_alignments: bool = False, compute_taxonomy: bool = False, expand_votes_to_ancestors: bool = False, use_embedding_pca: bool = False)

Bases: BaseModel

Body for POST /datasets.

Mirrors the export_research_dataset operation payload. The caller does not pick a queue: the dataset export always runs on the protea.training worker (serialized, GPU/RAM-intensive).

annotation_source: str
compute_alignments: bool
compute_taxonomy: bool
embedding_config_id: str
expand_votes_to_ancestors: bool
k: int
model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'annotation_source': 'goa', 'compute_alignments': True, 'compute_taxonomy': True, 'embedding_config_id': '00000000-0000-0000-0000-000000000001', 'expand_votes_to_ancestors': False, 'k': 5, 'ontology_snapshot_id': '00000000-0000-0000-0000-000000000002', 'output_name': 'bench-v1-K5', 'search_backend': 'faiss', 'test_versions': [230], 'train_versions': [160, 165, 170, 175], 'use_embedding_pca': False}}}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

ontology_snapshot_id: str
output_name: str
search_backend: str
test_versions: list[int]
train_versions: list[int]
use_embedding_pca: bool
class protea.api.routers.datasets.ImportDatasetByReferenceRequest(*, name: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=255)], storage_backend: str = 'local', key_prefix: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=512)], train_uri: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=1024)] = None, eval_uri: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=1024)] = None, manifest_uri: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=1024)], schema_sha: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=16)], manifest_sha: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=64)] = None, k: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)], annotation_source: str = 'goa', n_train_rows: ~typing.Annotated[int, ~annotated_types.Ge(ge=0)] = 0, n_eval_rows: ~typing.Annotated[int, ~annotated_types.Ge(ge=0)] = 0, embedding_config_id: str | None = None, ontology_snapshot_id: str | None = None, train_snapshot_pairs: list[str] = <factory>, eval_snapshot_pair: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=64)] = None, producer_version: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=64)] = None, producer_git_sha: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=40)] = None, external_source: str | None = None, meta: dict[str, ~typing.Any] = <factory>, force: bool = False)

Bases: BaseModel

Body for POST /datasets/import-by-reference.

The lab calls this when the train / eval parquets and the manifest already live in the artifact store (filesystem dump, MinIO upload from a prior environment, salvage replay, etc.). PROTEA registers a Dataset row pointing at those URIs verbatim. No job is enqueued; the artefacts are not re-read or copied.

The lab passes the fields it already has from its own manifest.json so the registry row is content-identical to what an in-PROTEA export_research_dataset run would have produced.

annotation_source: str
embedding_config_id: str | None
eval_snapshot_pair: str | None
eval_uri: str | None
external_source: str | None
force: bool
k: int
key_prefix: str
manifest_sha: str | None
manifest_uri: str
meta: dict[str, Any]
model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'annotation_source': 'goa', 'embedding_config_id': 'c0ae5b69-d6dc-41cf-a711-1739d3d2e170', 'eval_snapshot_pair': 'v226-v230', 'eval_uri': 'file:///home/frapercan/Thesis2/repositories/protea-reranker-lab/datasets/bench-v1-K5-v226-lineage-prostt5/eval.parquet', 'external_source': 'protea-reranker-lab@059db19', 'force': False, 'k': 5, 'key_prefix': 'datasets/bench-v1-K5-v226-lineage-prostt5/', 'manifest_sha': 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff', 'manifest_uri': 'file:///home/frapercan/Thesis2/repositories/protea-reranker-lab/datasets/bench-v1-K5-v226-lineage-prostt5/manifest.json', 'n_eval_rows': 1066859, 'n_train_rows': 24351779, 'name': 'bench-v1-K5-v226-lineage-prostt5', 'ontology_snapshot_id': '35c3ad67-3002-47db-8f71-eeed69d22ad6', 'producer_git_sha': '059db1907c5208a965238e8e6682184fb83537be', 'producer_version': '0.8.0', 'schema_sha': '6d97a624b8a7', 'storage_backend': 'local', 'train_snapshot_pairs': ['v220-v226'], 'train_uri': 'file:///home/frapercan/Thesis2/repositories/protea-reranker-lab/datasets/bench-v1-K5-v226-lineage-prostt5/train.parquet'}}}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

n_eval_rows: int
n_train_rows: int
name: str
ontology_snapshot_id: str | None
producer_git_sha: str | None
producer_version: str | None
schema_sha: str
storage_backend: str
train_snapshot_pairs: list[str]
train_uri: str | None
protea.api.routers.datasets.create_dataset(request: Request, response: Response, body: CreateDatasetRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]

Enqueue an export_research_dataset job.

Returns {job_id}. Poll GET /jobs/{job_id} for status; once the job is SUCCEEDED, GET /datasets/{name} returns the registered row with its artifact URIs.
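A client-side polling loop for this flow might look like the sketch below. It is deliberately transport-agnostic: get_status is a caller-supplied function (for example, one that issues GET /jobs/{job_id} and extracts the status), and the failure states listed are assumptions, since only SUCCEEDED is named above.

```python
import time

# Assumed terminal failure states; only SUCCEEDED is documented in this section.
TERMINAL_FAILURES = {"FAILED", "CANCELLED"}


def wait_for_job(get_status, job_id, *, interval=2.0, timeout=600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status(job_id) until the job succeeds, fails, or times out."""
    deadline = clock() + timeout
    while True:
        status = get_status(job_id)
        if status == "SUCCEEDED":
            return status
        if status in TERMINAL_FAILURES:
            raise RuntimeError(f"job {job_id} ended in state {status}")
        if clock() >= deadline:
            raise TimeoutError(f"job {job_id} still {status} after {timeout}s")
        sleep(interval)
```

Once the loop returns, the dataset row can be fetched by name with GET /datasets/{name}.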

protea.api.routers.datasets.download_dataset_artifact(dataset_id: str, artifact: str = Query(PydanticUndefined), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), settings: Settings = Depends(dependency=<function get_settings>, use_cache=True, scope=None)) Response

Mint a presigned download URL (MinIO) or stream the file (local).

For storage_backend=minio the endpoint 302-redirects to a 15-minute presigned GET URL. For storage_backend=local the artifact bytes are streamed inline. See datasets_detail.

protea.api.routers.datasets.get_dataset(id_or_name: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Resolve a dataset by UUID or by the name slug.

Tries the UUID path first; on ValueError (non-UUID input), falls back to the name column. Returns 404 if neither resolves. The lab uses the name path so dump callers can refer to bench-v1-K5 without juggling UUIDs.
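The lookup order can be sketched with two in-memory indexes. The by_id and by_name dictionaries are hypothetical stand-ins for the two column lookups, and returning None stands in for the 404.

```python
import uuid


def resolve_dataset(id_or_name, by_id, by_name):
    """Try the UUID path first; fall back to the name only for non-UUID input."""
    try:
        key = uuid.UUID(id_or_name)
    except ValueError:
        # Not a UUID: treat the value as a name slug.
        return by_name.get(id_or_name)
    return by_id.get(key)
```

In this sketch a well-formed UUID that matches no row does not fall through to the name lookup, following the "on ValueError" wording above.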

protea.api.routers.datasets.get_dataset_stats(dataset_id: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Return per-aspect protein / GO-term / annotation counts for a dataset.

Reads from dataset.meta['aspect_stats'] when present (populated by backfill or a previous call). On a cache miss the counts are computed live and written back. See datasets_detail for the implementation.
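The read-through behaviour described above amounts to the following pattern. This is a sketch only: meta is a plain dictionary standing in for dataset.meta, and compute is a hypothetical callable for the live count query.

```python
def get_aspect_stats(meta, compute):
    """Return cached aspect stats, computing and storing them on a miss."""
    cached = meta.get("aspect_stats")
    if cached is not None:
        return cached
    stats = compute()
    # Write back so later calls skip the live computation.
    meta["aspect_stats"] = stats
    return stats
```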

protea.api.routers.datasets.import_dataset_by_reference(body: ImportDatasetByReferenceRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Register a Dataset row whose artefacts are already staged.

The lab uploads (or simply leaves on disk) the parquets and manifest.json itself and posts the URIs + manifest fields here. PROTEA persists a Dataset row pointing at those URIs without re-reading the artefacts. Useful for benches the lab produced before export_research_dataset existed, for replays after a DB wipe, and for the bench-v1-K5-v226-lineage-prostt5 LB.1 bootstrap.

The optional embedding_config_id and ontology_snapshot_id are resolved against the local DB and NULL’d when missing, so the insert never fails on a stale FK. schema_sha_v2 is dual-written when the PROTEA_SCHEMA_SHA_V2_WRITE_ENABLED flag is on (T1.6).

Returns 201 with the row’s id + name. 409 on a duplicate name unless force=true was passed.
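The registration rules (stale foreign keys nulled, duplicate names rejected unless forced, the meta flag set) can be sketched against an in-memory registry. The helper and exception names are hypothetical, the exception stands in for the 409 response, and replacing the existing row when force is true is an assumption: the section does not say what force does beyond bypassing the conflict.

```python
class DuplicateDatasetName(Exception):
    """Stands in for the HTTP 409 response."""


def register_by_reference(registry, body, known_embedding_configs, known_snapshots):
    """Insert a row keyed by name, pointing at already-staged artefacts."""
    name = body["name"]
    if name in registry and not body.get("force", False):
        raise DuplicateDatasetName(name)
    row = {key: value for key, value in body.items() if key != "force"}
    # Optional FKs are nulled when the referenced row is absent locally.
    if row.get("embedding_config_id") not in known_embedding_configs:
        row["embedding_config_id"] = None
    if row.get("ontology_snapshot_id") not in known_snapshots:
        row["ontology_snapshot_id"] = None
    row["meta"] = {**body.get("meta", {}), "imported_by_reference": True}
    registry[name] = row  # assumption: force=true replaces the existing row
    return row
```

Nulling the foreign keys instead of failing keeps a replay after a DB wipe from being blocked by rows that no longer exist.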

protea.api.routers.datasets.list_datasets(name_like: str | None = Query(None), embedding_config_id: UUID | None = Query(None), limit: int = Query(50), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

Return registered frozen datasets newest-first.

The lab’s pull_dataset.py polls this endpoint to discover dump artefacts produced by export_research_dataset. Filters narrow by name substring or by source embedding_config_id. Pagination is cursor-based (after) plus a hard limit ceiling.

Registry router

The /backends, /sources, and /runners endpoints list the plugins discovered at runtime via importlib.metadata.entry_points for the three plugin groups: embedding backends, annotation sources, and experiment runners. The router is intentionally stateless: it re-scans entry points on every call rather than caching, so a worker that has just been restarted with a newly-installed extra surfaces in the next request without an API restart.

Plugin registry endpoints.

Three read-only endpoints listing the plugins discovered at runtime via importlib.metadata.entry_points:

  • GET /backends: embedding backend plugins (protea.backends)

  • GET /sources: annotation source plugins (protea.sources)

  • GET /runners: experiment runner plugins (protea.runners)

Each response is a flat list of PluginInfo records describing the entry-point name, class, module path, and any plugin-specific metadata exposed via attributes (e.g. AnnotationSource.version).

The endpoints are intentionally stateless: they re-scan entry_points on every call rather than caching, so a worker that’s just been restarted with a newly-installed extra surfaces in the next request without an API restart. The scan is cheap (sub-ms on the working set of ~10 plugins).

class protea.api.routers.registry.PluginInfo(*, name: str, cls: str, module: str, extra: dict[str, ~typing.Any]=<factory>)

Bases: BaseModel

Metadata for one discovered plugin.

cls: str
extra: dict[str, Any]
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

module: str
name: str
class protea.api.routers.registry.PluginListResponse(*, group: str, plugins: list[PluginInfo])

Bases: BaseModel

Response shape for the three registry endpoints.

group: str
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

plugins: list[PluginInfo]
protea.api.routers.registry.list_backends() PluginListResponse

List all installed embedding backend plugins.

The plugin set depends on which protea-backends[<extra>] extras are installed (esm, t5, ankh, esm3c). With the default install all four are discoverable; only the ones whose lazy imports succeed at stream_* time will actually run on GPU.

protea.api.routers.registry.list_runners() PluginListResponse

List all installed experiment runner plugins.

Today: baseline, knn, lightgbm. The latter two are contract-surface stubs until F2A.7 (lab → protea-runners.lightgbm migration) and F2C.1 (protea-method extraction) move the real implementations here.

protea.api.routers.registry.list_sources() PluginListResponse

List all installed annotation source plugins.

Today: goa, quickgo, uniprot (all real after F2A.6-real). The extra.version field surfaces the AnnotationSource.version declared on each plugin (e.g. "uniprot-goa", "quickgo-rest").

Reranker models router

The /reranker-models router accepts boosters trained offline in protea-reranker-lab (or any compatible trainer) and registers them in PROTEA. POST /reranker-models/import is the multipart flow: the lab sends model.txt + spec.yaml + run.json inline and the server uploads model.txt to the artifact store under rerankers/<run_id>/. POST /reranker-models/import-by-reference is the production flow: the lab pre-uploads model.txt to MinIO under its own key and posts JSON with artifact_uri, spec_yaml, and run (the contents of run.json). Both flows share _register_model so the resulting RerankerModel row is identical.

Re-ranker model registry.

POST /reranker-models/import accepts a trained booster from the protea-reranker-lab (or any offline trainer), uploads it to the configured artifact store, and inserts a RerankerModel row linked back to the Dataset it was trained on. This replaces the in-PROTEA LightGBM training path (see Phase 4 of the decoupling plan).

Both multipart and JSON-by-reference flows are supported:

  • multipart: lab sends model.txt + spec.yaml + run.json inline. Server uploads the booster to rerankers/<run_id>/model.txt. Simpler for dev.

  • by-reference: lab pre-uploads model.txt to MinIO under its own key and POSTs JSON with artifact_uri + run (the run.json contents) + spec_yaml text. Cleaner for prod.

Both flows share _register_model so the DB shape is identical.

class protea.api.routers.reranker_models.ImportRerankerByReferenceRequest(*, artifact_uri: Annotated[str, MinLen(min_length=1)], spec_yaml: str, run: dict[str, Any], name: str | None = None, dataset_id: str | None = None, external_source: str | None = None, prediction_set_id: str | None = None, evaluation_set_id: str | None = None, force: bool = False)

Bases: BaseModel

Body for POST /reranker-models/import-by-reference.

Use this when the lab has already uploaded model.txt to MinIO under its own key and just needs PROTEA to register the URI.

artifact_uri: str
dataset_id: str | None
evaluation_set_id: str | None
external_source: str | None
force: bool
model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'artifact_uri': 's3://protea-rerankers/runs/r1/model.txt', 'dataset_id': '00000000-0000-0000-0000-000000000003', 'evaluation_set_id': '00000000-0000-0000-0000-000000000005', 'external_source': 'protea-reranker-lab@cec8ccd', 'force': False, 'name': 'r1-k5-bench-v1', 'prediction_set_id': '00000000-0000-0000-0000-000000000004', 'run': {'feature_schema_sha': 'ab12cd34ef56', 'metrics': {'fmax': 0.5427}, 'run_id': 'r1'}, 'spec_yaml': '# ExperimentSpec contents\nname: r1\nfeature_families: [embedding, alignment]\n'}}}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str | None
prediction_set_id: str | None
run: dict[str, Any]
spec_yaml: str
protea.api.routers.reranker_models.import_reranker_model_by_reference(body: ImportRerankerByReferenceRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Register a RerankerModel whose booster is already in MinIO.

The lab uploads the booster directly (faster, no double-hop) and POSTs the URI + run.json + spec.yaml here. Server does not re-read the artifact; it trusts the URI.

async protea.api.routers.reranker_models.import_reranker_model_multipart(files: _RerankerImportFiles = Depends(dependency=<function _reranker_import_files_dep>, use_cache=True, scope=None), fields: _RerankerImportFields = Depends(dependency=<function _reranker_import_fields_dep>, use_cache=True, scope=None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Upload a trained booster and register a RerankerModel row.

The three files (model.txt, spec.yaml, run.json) mirror the artefacts produced by protea-reranker-lab under runs/<name>/. Wire format unchanged: the FastAPI deps expose every File/Form field as a discrete multipart part.

Stack router

The /stack router exposes metadata about the eight-repo PROTEA stack to the UI. GET /stack returns the registry from docs/source/_data/stack.yaml. GET /stack/pulls aggregates open pull requests across every repo in the stack via the GitHub REST API and caches the result in-process to stay under the unauthenticated rate limit of 60 req/h (set PROTEA_GITHUB_TOKEN to raise it to 5000 req/h).

PROTEA stack metadata + cross-repo PR listing.

Two read-only endpoints intended to power the /stack page in the UI:

  • GET /stack returns the eight-repo registry from docs/source/_data/stack.yaml.

  • GET /stack/pulls proxies GitHub’s /repos/{owner}/{repo}/pulls endpoint for every repo and aggregates the open PRs into a single list. Useful when bouncing between repositories during review.

The PR listing is cached in-process for _PULLS_TTL_SECONDS to keep the unauthenticated GitHub rate limit (60 req/h) from being a problem. Set PROTEA_GITHUB_TOKEN (or any token in GITHUB_TOKEN / GH_TOKEN) to lift the limit to 5000 req/h.
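An in-process cache of this kind can be as small as the sketch below. The class name, the injected clock, and the (value, cached) return shape are hypothetical; the real handler and the value of _PULLS_TTL_SECONDS are not shown in this section.

```python
import time

_MISSING = object()  # sentinel so a cached None is still a hit


class TTLCache:
    """Hold one value and refetch it once it is older than ttl_seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._value = _MISSING
        self._fetched_at = 0.0

    def get_or_fetch(self, fetch):
        """Return (value, cached); cached is True when no fetch was needed."""
        now = self._clock()
        if self._value is not _MISSING and now - self._fetched_at < self._ttl:
            return self._value, True
        self._value = fetch()
        self._fetched_at = now
        return self._value, False
```

With one upstream request per repo on each refresh, the TTL directly bounds how many GitHub calls the endpoint can make per hour, regardless of how often the UI polls it.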

class protea.api.routers.stack.PullRequest(*, repo: str, number: int, title: str, url: str, state: str, draft: bool, author: str | None, created_at: str, updated_at: str, labels: list[str])

Bases: BaseModel

One open PR in the stack as reported by GitHub’s REST API.

Used by the stack landing page’s PR widget; the payload is the intersection of fields the UI actually renders, not a full echo of GitHub’s response.

author: str | None
created_at: str
draft: bool
labels: list[str]
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

number: int
repo: str
state: str
title: str
updated_at: str
url: str
class protea.api.routers.stack.PullsResponse(*, fetched_at: float, cached: bool, repos_queried: int, pulls: list[~protea.api.routers.stack.PullRequest], rate_limit_remaining: int | None = None, errors: dict[str, str] = <factory>)

Bases: BaseModel

Aggregated open-PR snapshot across all stack repos.

The handler caches the GitHub query for a few minutes; cached flips to True when a response is served from the in-process cache, fetched_at records the original wall-clock time, and errors carries per-repo failures (e.g. rate-limited, 404).

cached: bool
errors: dict[str, str]
fetched_at: float
model_config = {}


pulls: list[PullRequest]
rate_limit_remaining: int | None
repos_queried: int
class protea.api.routers.stack.RepoEntry(*, name: str, slug: str, role: str, role_label: str, status: str, summary: str, github_url: str, docs_url: str | None = None, package_url: str | None = None, local_docs_path: str | None = None)

Bases: BaseModel

One repository row in the multi-repo stack landing page.

Each entry represents a sibling git repo (PROTEA itself or one of the plugin/lab packages). role is the architectural slot (core, contracts, plugin, lab); status is a coarse health signal sourced from the repo’s CI / release state.

docs_url: str | None
github_url: str
local_docs_path: str | None
model_config = {}


name: str
package_url: str | None
role: str
role_label: str
slug: str
status: str
summary: str
class protea.api.routers.stack.StackResponse(*, repos: list[RepoEntry], thesis_pdf_url: str | None = None)

Bases: BaseModel

Top-level payload for GET /stack.

Lists every repository in the PROTEA family plus the link to the canonical thesis PDF. Consumed by the frontend’s stack overview page and the docs portal sidebar.

model_config = {}


repos: list[RepoEntry]
thesis_pdf_url: str | None
protea.api.routers.stack.get_stack() StackResponse

Return the eight-repo PROTEA stack registry.

Single source of truth: docs/source/_data/stack.yaml in this repo. Edit that file (and run scripts/sync_stack.py) to refresh the README block and the Sphinx page in the same commit.

Per-repo local_docs_path and the top-level thesis_pdf_url are computed from the filesystem at request time: the field is populated whenever the corresponding artefact has been built into docs/build/<slug>/html/ or apps/web/public/thesis.pdf respectively, and is None otherwise.

protea.api.routers.stack.list_open_pulls() PullsResponse

Aggregate open pull requests across every repo in the stack.

Cached in-process for five minutes. Set the optional PROTEA_GITHUB_TOKEN environment variable to use authenticated requests (rate limit 5000/h instead of 60/h).

Experiment runs router

The /experiment-runs router exposes CRUD over the ExperimentRun ORM (T4.7-T4.9, decision D11). One row aggregates multiple Job / EvaluationResult / RerankerModel rows under a unique human name and carries the narrative trio (description / hypothesis / findings) plus JSONB config / provenance and Text[] tags. PATCH /experiment-runs/{run_id} accepts partial updates; status transitions stamp started_at (on planned → running) and finished_at (on running → done or abandoned) idempotently: re-entering a state never resets its timestamp.

ExperimentRun narrative endpoints (T4.7-T4.9 of master plan v3.2).

Surfaces the ORM created in T3.8 so the F8b Experiments page (T8b.5) and CLI tooling can manage research-run metadata. Schema mirrors the JSON shape exposed by the jobs router for consistency.

Endpoints

  • POST   /experiment-runs : create (status=planned).

  • GET    /experiment-runs : list, optional status filter.

  • GET    /experiment-runs/{id} : fetch one.

  • PATCH  /experiment-runs/{id} : update narrative + status + provenance overlay; transitions stamp started_at / finished_at automatically.

  • DELETE /experiment-runs/{id} : remove (rare; mostly drafts).

Linkage to Job / EvaluationResult / RerankerModel rows is intentionally out of scope here; the F-EXP campaign work (T-EXP.1-T-EXP.7) defines the join shape once it lands.

class protea.api.routers.experiment_runs.CreateExperimentRunRequest(*, name: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1)], description: str | None = None, hypothesis: str | None = None, config: dict[str, ~typing.Any] = <factory>, provenance: dict[str, ~typing.Any] = <factory>, tags: list[str] = <factory>)

Bases: BaseModel

Body for POST /experiment-runs.

Carries the narrative fields available at creation time (description / hypothesis; findings is added later via PATCH) plus structured config + provenance overlays that the F-EXP campaign tooling reads back. New rows always start in planned status; transitions happen via PATCH.

config: dict[str, Any]
description: str | None
hypothesis: str | None
model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'config': {'K_values': [3, 5, 10], 'embedding_backend': 'esm2'}, 'description': 'Sweep K in {3, 5, 10} on the bench-v1 dataset.', 'hypothesis': 'Larger K hurts PK but is neutral on NK/LK.', 'name': 'ablation-K-2026-05-09', 'provenance': {'campaign': 'bench-v1'}, 'tags': ['ablation', 'K-sweep']}}}


name: str
provenance: dict[str, Any]
classmethod strip_name(v: str) str
tags: list[str]
class protea.api.routers.experiment_runs.UpdateExperimentRunRequest(*, description: str | None = None, hypothesis: str | None = None, findings: str | None = None, status: ExperimentRunStatus | None = None, config: dict[str, Any] | None = None, provenance: dict[str, Any] | None = None, tags: list[str] | None = None)

Bases: BaseModel

All fields optional; absent ones leave the column untouched.

config: dict[str, Any] | None
description: str | None
findings: str | None
hypothesis: str | None
model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'findings': 'Fmax +0.03 on bench-v1-K5 vs K=3; PK regressed -0.005.', 'status': 'done', 'tags': ['ablation', 'K-sweep', 'results-in']}}}


provenance: dict[str, Any] | None
status: ExperimentRunStatus | None
tags: list[str] | None
protea.api.routers.experiment_runs.create_experiment_run(body: CreateExperimentRunRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Create a new ExperimentRun row in planned status.

protea.api.routers.experiment_runs.delete_experiment_run(run_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) None

Permanently delete a run. Mostly used for cleaning up draft rows.

protea.api.routers.experiment_runs.get_experiment_run(run_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Fetch a single run by id.

protea.api.routers.experiment_runs.list_experiment_runs(status: ExperimentRunStatus | None = Query(None), limit: int = Query(50), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]

Return runs newest-first, optionally filtered by status.

Pagination is cursor-based: pass after=<created_at> to get the next page. Microsecond resolution on created_at keeps tie collisions astronomically rare.
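
A client-side view of this cursor scheme, sketched against an in-memory list instead of the database. The helper names are hypothetical. The sketch assumes that after carries the created_at of the last row already seen and that the next page holds strictly older rows; the exact comparison the server uses is not stated here.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any, Iterator


def list_runs_page(
    rows: list[dict[str, Any]],
    limit: int = 50,
    after: datetime | None = None,
) -> list[dict[str, Any]]:
    """Return one newest-first page; `after` is the last created_at already seen."""
    ordered = sorted(rows, key=lambda r: r["created_at"], reverse=True)
    if after is not None:
        # Assumption: the cursor excludes the row it points at.
        ordered = [r for r in ordered if r["created_at"] < after]
    return ordered[:limit]


def iter_all_runs(rows: list[dict[str, Any]], limit: int = 50) -> Iterator[dict[str, Any]]:
    """Walk every page by feeding the last created_at back in as the cursor."""
    cursor: datetime | None = None
    while True:
        page = list_runs_page(rows, limit=limit, after=cursor)
        if not page:
            return
        yield from page
        cursor = page[-1]["created_at"]
```

Two rows sharing an identical created_at on a page boundary would make a strict comparison skip one of them, which is why the microsecond resolution matters.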

protea.api.routers.experiment_runs.update_experiment_run(run_id: UUID, body: UpdateExperimentRunRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]

Patch narrative fields, status, config, provenance, or tags.

Only fields explicitly present in the request body are updated; omitted fields leave the column untouched. Status transitions stamp started_at / finished_at per the rules in _stamp_status_transition().
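
The stamping rule can be illustrated with plain dicts. This is a hedged sketch, not the body of _stamp_status_transition(): the function names and the dict-based run are assumptions, while the statuses and the never-reset behaviour follow the router description.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def stamp_status_transition(
    run: dict[str, Any], new_status: str, now: datetime | None = None
) -> None:
    """Set started_at / finished_at on the matching transition, at most once."""
    now = now or datetime.now(timezone.utc)
    old_status = run["status"]
    if old_status == "planned" and new_status == "running":
        if run.get("started_at") is None:
            run["started_at"] = now
    if old_status == "running" and new_status in ("done", "abandoned"):
        if run.get("finished_at") is None:
            run["finished_at"] = now
    run["status"] = new_status


def apply_patch(
    run: dict[str, Any], body: dict[str, Any], now: datetime | None = None
) -> None:
    """Apply only the keys present in the body; omitted fields stay untouched."""
    for field, value in body.items():
        if field == "status":
            stamp_status_transition(run, value, now)
        else:
            run[field] = value
```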

Services layer

Each router delegates non-trivial business logic to a service module. Services are pure Python: they accept a SQLAlchemy session and return domain objects or raise domain exceptions. Routers map those exceptions to HTTP status codes. This separation allows the same logic to be exercised from CLI tools or batch scripts without importing FastAPI. Full symbol-level documentation lives in Services.
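
The split between domain exceptions and HTTP status codes might look like the sketch below. The two exception classes are simplified stand-ins for the service errors documented in this section, and STATUS_BY_ERROR, to_http_error, and get_snapshot are hypothetical names; the real routers may map errors with try/except blocks instead of a lookup table.

```python
from __future__ import annotations

from uuid import UUID


class EntityNotFoundError(Exception):
    """Stand-in for a service-level EntityNotFoundError(entity, entity_id)."""

    def __init__(self, entity: str, entity_id: UUID) -> None:
        super().__init__(f"{entity} not found")
        self.entity = entity
        self.entity_id = entity_id


class AnnotationSetReferencedError(Exception):
    """Stand-in for the conflict case: the set is still referenced."""


# Hypothetical lookup table from domain error to HTTP status code.
STATUS_BY_ERROR: dict[type[Exception], int] = {
    EntityNotFoundError: 404,
    AnnotationSetReferencedError: 409,
}


def to_http_error(exc: Exception) -> tuple[int, str]:
    """Translate a domain exception into (status_code, detail)."""
    for error_type, status in STATUS_BY_ERROR.items():
        if isinstance(exc, error_type):
            return status, str(exc)
    raise exc  # unknown errors propagate unchanged


def get_snapshot(snapshots: dict[UUID, dict], snapshot_id: UUID) -> dict:
    """Service-style lookup: no HTTP types, only a domain exception."""
    try:
        return snapshots[snapshot_id]
    except KeyError:
        raise EntityNotFoundError("OntologySnapshot", snapshot_id) from None
```

Because get_snapshot never touches an HTTP type, a CLI tool can call it and handle EntityNotFoundError in whatever way suits a terminal.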

Jobs service: shared helpers for the queue-dispatch pattern.

Multiple routers (annotations, embeddings, predictions) follow the same shape when an HTTP endpoint queues background work:

  1. Pydantic-validate the request body.

  2. Insert a Job row with the canonical operation name + queue.

  3. Insert a matching JobEvent (job.created) for the audit log.

  4. Publish to RabbitMQ via protea.infrastructure.queue.publisher.publish_job().

  5. Return {"id": ..., "status": "queued"}.

This module exposes enqueue_job() (steps 2–3) and the higher-level dispatch_validated_job() (steps 1–5) so routers collapse to a single try/except.

exception protea.services.jobs_service.InvalidJobPayloadError(errors: Any)

Bases: Exception

Pydantic validation failed for a queue-dispatch request body.

Carries the structured errors list produced by Pydantic so the router can pass it through verbatim as the HTTP 422 detail.

protea.services.jobs_service.compute_dedup_key(operation: str, payload: dict[str, Any]) str

Return a 16-hex-char deduplication key for (operation, payload).

The key is the first 16 hex digits of the SHA-256 of a canonical JSON serialisation of {"operation": ..., "payload": ...} (keys sorted, ASCII-safe). Truncated to 16 chars (64-bit prefix) — collision probability is negligible for the expected job volume.

The 16-char length fits comfortably in VARCHAR(64) and keeps the partial unique index compact.
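
A minimal sketch of such a key derivation. The compact separators are an assumption; the text above fixes only sorted keys, ASCII-safe output, SHA-256, and the 16-hex-digit prefix. The function name is hypothetical so it is not mistaken for the service function.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any


def sketch_dedup_key(operation: str, payload: dict[str, Any]) -> str:
    """Hash a canonical JSON form of (operation, payload); keep 16 hex chars."""
    canonical = json.dumps(
        {"operation": operation, "payload": payload},
        sort_keys=True,         # key order must not change the digest
        ensure_ascii=True,      # non-ASCII text is escaped, so bytes are stable
        separators=(",", ":"),  # assumption: no insignificant whitespace
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
```

Sorting keys is what makes two payloads that differ only in dict ordering collapse to the same key.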

protea.services.jobs_service.dispatch_validated_job(factory: sessionmaker[Session], amqp_url: str, body: dict[str, Any], payload_model: type[BaseModel], *, operation: str, queue_name: str) dict[str, Any]

End-to-end queue dispatch: validate, persist, publish, respond.

Pydantic-validates body against payload_model (raising InvalidJobPayloadError on failure for the router to map to a 422), inserts a Job + JobEvent pair inside a fresh session, then publishes to RabbitMQ. Returns the canonical {"id": <uuid>, "status": "queued"} response shape used by every dispatch endpoint.

protea.services.jobs_service.enqueue_job(session: Session, *, operation: str, queue_name: str, payload: dict[str, Any]) UUID

Insert Job + JobEvent rows for a background task.

Returns the new job’s UUID. The caller is responsible for publishing to the queue (via protea.infrastructure.queue.publisher.publish_job()) after committing this session, and for any payload validation that should happen before the row hits the database.

Both rows are flushed but not committed; the caller’s session_scope context manager owns the transaction.

Annotations service: pure-logic helpers extracted from protea.api.routers.annotations.

ORM ↔ dict serialisers and the read-side handlers (snapshot/IA-url operations) live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.

The router translates the domain exceptions raised here to HTTP responses:

  • EntityNotFoundError → 404 Not Found (e.g. an OntologySnapshot or AnnotationSet UUID does not resolve).

exception protea.services.annotations_service.AnnotationSetReferencedError

Bases: AnnotationsServiceError

An AnnotationSet cannot be deleted because PredictionSet rows still reference it; the FK CASCADE is intentionally absent. Maps to HTTP 409 at the router boundary.

exception protea.services.annotations_service.AnnotationsServiceError

Bases: Exception

Base class for annotations-service domain errors.

exception protea.services.annotations_service.EntityNotFoundError(entity: str, entity_id: UUID)

Bases: AnnotationsServiceError

Generic 404; a referenced entity does not exist.

Pickle-safe via __reduce__ so the structured entity / entity_id attrs survive a round-trip without tripping flake8-bugbear B042.

protea.services.annotations_service.annotation_set_to_dict(a: AnnotationSet, count: int) dict[str, Any]

Serialise an AnnotationSet to its API dict shape.

protea.services.annotations_service.delete_annotation_set_data(session: Session, set_id: UUID) dict[str, Any]

Delete an annotation set and all its annotations.

Returns the deletion summary dict.

Raises:

protea.services.annotations_service.delete_eval_result_collect_keys(session: Session, eval_id: UUID, result_id: UUID) list[str]

Delete the EvaluationResult and return the artifact keys to clean up.

Same split as delete_evaluation_set_collect_keys(): the DB delete happens here; the artifact-store deletion is the router’s responsibility (it owns the ArtifactStore factory).

protea.services.annotations_service.delete_evaluation_set_collect_keys(session: Session, eval_id: UUID) list[str]

Delete the EvaluationSet and return the artifact-store keys to clean.

The DB delete cascades to EvaluationResult rows; this helper walks the results before deleting and returns the union of all artifact keys those rows referenced (per-result cafaeval outputs) so the caller can wipe them from the store. The caller is also expected to delete the set’s ground-truth artifact via protea.core.evaluation.groundtruth_key_for(eval_id); that key is not included here because it is a fixed function of eval_id.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.evaluation_result_to_dict(r: EvaluationResult) dict[str, Any]

Serialise an EvaluationResult to its API dict shape.

protea.services.annotations_service.evaluation_set_to_dict(e: EvaluationSet) dict[str, Any]

Serialise an EvaluationSet to its API dict shape.

protea.services.annotations_service.get_annotation_set_data(session: Session, set_id: UUID) dict[str, Any]

Return a single annotation set with its annotation count.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.get_eval_result_with_keys(session: Session, eval_id: UUID, result_id: UUID) tuple[EvaluationResult, list[str]]

Fetch an EvaluationResult belonging to eval_id; return (row, artifact_keys).

Raises EntityNotFoundError (“EvaluationResult”) when the result does not exist or does not belong to eval_id.

protea.services.annotations_service.get_evaluation_set_data(session: Session, eval_id: UUID) dict[str, Any]

Return a single evaluation set.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.get_go_subgraph_data(session: Session, snapshot_id: UUID, go_ids: str, depth: int) dict[str, Any]

BFS the GO DAG upward from the requested seed terms.

Returns {"nodes": [...], "edges": [...]} ready for the API. Each node has id (DB id), go_id, name, aspect, is_query (True for the seed terms). Each edge has source (child id), target (parent id), relation_type.

Raises EntityNotFoundError when the snapshot does not resolve. The exception is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
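
The upward traversal can be sketched over an in-memory adjacency map. This is an illustration under stated assumptions: the parents mapping, the function name, and the use of term identifiers as node ids are hypothetical, and the real service reads edges from the database. Only the node / edge shape and the depth-limited upward BFS follow the description above.

```python
from __future__ import annotations

from collections import deque
from typing import Any


def sketch_go_subgraph(
    parents: dict[str, list[tuple[str, str]]],
    seeds: list[str],
    depth: int,
) -> dict[str, list[dict[str, Any]]]:
    """BFS from seeds towards the roots; `parents` maps child -> [(parent, relation)]."""
    seed_set = set(seeds)
    seen = set(seed_set)
    edges: list[dict[str, Any]] = []
    queue = deque((term, 0) for term in dict.fromkeys(seeds))  # dedupe, keep order
    while queue:
        term, level = queue.popleft()
        if level >= depth:
            continue  # do not expand beyond the requested depth
        for parent, relation in parents.get(term, []):
            edges.append({"source": term, "target": parent, "relation_type": relation})
            if parent not in seen:
                seen.add(parent)
                queue.append((parent, level + 1))
    nodes = [{"id": term, "is_query": term in seed_set} for term in sorted(seen)]
    return {"nodes": nodes, "edges": edges}
```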

protea.services.annotations_service.get_snapshot_data(session: Session, snapshot_id: UUID) dict[str, Any]

Return a single snapshot with its GO term count.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.iter_delta_proteins_fasta(session: Session, eval_id: UUID, category: str) list[str]

Return FASTA lines for delta proteins (nk / lk / pk / all).

Only proteins whose sequence is in the DB are emitted. Header is >ACCESSION entry_name OS=organism OX=taxon (NK|LK|PK); the sequence is wrapped at 60 chars per line.

An empty result is returned as an empty list. Raises EntityNotFoundError if the EvaluationSet does not resolve. The exception is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
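
The record layout described above, sketched for a single protein. The function and its parameters are hypothetical, and whether the service's lines carry a trailing newline is not stated, so this sketch omits it.

```python
from __future__ import annotations


def sketch_fasta_record(
    accession: str,
    entry_name: str,
    organism: str,
    taxon: int,
    category: str,
    sequence: str,
    width: int = 60,
) -> list[str]:
    """Build one FASTA record: a header line, then the sequence in 60-char rows."""
    header = f">{accession} {entry_name} OS={organism} OX={taxon} ({category.upper()})"
    body = [sequence[i : i + width] for i in range(0, len(sequence), width)]
    return [header, *body]
```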

protea.services.annotations_service.iter_groundtruth_tsv(session: Session, eval_id: UUID, category: str) list[str]

Return the rows for a CAFA ground_truth_<CATEGORY>.tsv download.

category is "nk", "lk", "pk" or "known". Each row is "<protein>\t<go_id>\n"; sorted by protein then GO id so the output is deterministic. The caller wraps the list in a StreamingResponse (the materialised list is small enough, a few thousand rows for typical CAFA splits, to fit in memory and keeps the streaming generator simple).

Raises EntityNotFoundError when the EvaluationSet does not resolve.

protea.services.annotations_service.list_annotation_sets_data(session: Session, source: str | None = None) list[dict[str, Any]]

List all annotation sets with their per-set annotation counts (newest first).

Optionally filter by source (e.g. goa or quickgo). Pure read; the caller caches at the API boundary.

protea.services.annotations_service.list_evaluation_results_data(session: Session, eval_id: UUID) list[dict[str, Any]]

List EvaluationResult rows for one EvaluationSet (newest first).

Raises EntityNotFoundError when the EvaluationSet does not resolve.

protea.services.annotations_service.list_evaluation_sets_data(session: Session) list[dict[str, Any]]

List all evaluation sets, newest first.

protea.services.annotations_service.list_snapshots_data(session: Session) list[dict[str, Any]]

Return all loaded snapshots with their GO term counts (newest first).

Pure read; the caller is responsible for caching at the API boundary if desired (the GROUP BY over the multi-million row go_term table is the slow part).

protea.services.annotations_service.render_evaluation_metrics_tsv(result: EvaluationResult, aspect_codes: tuple[str, ...]) Any

Yield TSV rows for the per-(setting, namespace) metrics summary.

The caller passes the aspect-codes tuple (ASPECT_CAFA_CODES) so the service stays free of the domain layer. Returns a generator suitable for StreamingResponse.

protea.services.annotations_service.set_snapshot_ia_url(session: Session, snapshot_id: UUID, ia_url: str | None) dict[str, Any]

Update the IA URL on a snapshot. Empty string is treated as None.

Returns a small confirmation dict whose shape is compatible with the legacy endpoint. Raises EntityNotFoundError for the 404 path. The caller (router) is responsible for validating request body shape (e.g. presence of the ia_url key) before calling.

protea.services.annotations_service.snapshot_to_dict(s: OntologySnapshot, term_count: int) dict[str, Any]

Serialise an OntologySnapshot to its API dict shape.

Embeddings service: pure-logic helpers extracted from protea.api.routers.embeddings.

Validation rules, ORM ↔ dict serialisers, and the predictions-TSV streaming generator live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.

The router translates the domain exceptions raised here to HTTP responses:

  • InvalidEmbeddingConfigError → 422 Unprocessable Entity (validation errors carry a list of human-readable messages in .errors).

  • EntityNotFoundError → 404 Not Found (e.g. a PredictionSet UUID does not resolve).

exception protea.services.embeddings_service.EmbeddingsServiceError

Bases: Exception

Base class for embeddings-service domain errors.

exception protea.services.embeddings_service.EntityNotFoundError(entity: str, entity_id: UUID)

Bases: EmbeddingsServiceError

Generic 404; a referenced entity does not exist.

Construct with the entity label (e.g. "PredictionSet") and the looked-up UUID; the message becomes "<entity> not found". Pickle-safe via __reduce__ so the structured entity / entity_id attributes survive a round-trip without tripping flake8-bugbear B042.

exception protea.services.embeddings_service.InvalidEmbeddingConfigError(errors: list[str])

Bases: EmbeddingsServiceError

Validation failure for an EmbeddingConfig request body.

errors carries a list of human-readable messages, one per failed rule, suitable for inclusion in the HTTP 422 response body.

exception protea.services.embeddings_service.InvalidUUIDFieldError(field: str)

Bases: EmbeddingsServiceError

Predict request body had a field that does not parse as UUID.

Carries the offending field name in field; the router translates this to 422 with detail "<field> must be a valid UUID".

protea.services.embeddings_service.assert_prediction_set_exists(session: Session, prediction_set_id: UUID) None

Raise EntityNotFoundError if the PredictionSet UUID is unknown.

protea.services.embeddings_service.config_to_dict(c: EmbeddingConfig, embedding_count: int | None = None) dict[str, Any]

Serialise an EmbeddingConfig ORM row to its API dict shape.

The embedding_count field is only included when the caller has a number to report (the bare GET /configs/{id} endpoint does not).

protea.services.embeddings_service.delete_embedding_config_cascade(session: Session, config_id: UUID) dict[str, Any]

Cascade-delete an EmbeddingConfig and all linked rows.

Raises EntityNotFoundError when config_id does not resolve. Body lives in _embeddings_admin_helpers.cascade_delete_embedding_config().

protea.services.embeddings_service.delete_prediction_set_cascade(session: Session, prediction_set_id: UUID) dict[str, Any]

Delete a PredictionSet and all its GOPrediction rows.

Returns {"deleted": <id>, "predictions_deleted": <count>}. Raises EntityNotFoundError when the UUID does not resolve so the router can translate to 404.

protea.services.embeddings_service.get_go_term_distribution_data(session: Session, *, prediction_set_id: UUID, limit: int = 50) dict[str, Any]

Return the most-frequent GO terms predicted in this set + per-aspect totals.

Raises EntityNotFoundError when the PredictionSet does not resolve. Body lives in _embeddings_admin_helpers.compute_go_term_distribution().

protea.services.embeddings_service.get_prediction_set_data(session: Session, prediction_set_id: UUID) dict[str, Any]

Retrieve a prediction set with total + per-protein GO term counts.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.embeddings_service.get_predictions_for_protein(session: Session, *, prediction_set_id: UUID, accession: str) list[dict[str, Any]]

Return all predicted GO terms for one protein, sorted by distance.

Raises EntityNotFoundError when the PredictionSet does not resolve. (No 404 for unknown accession; returns empty list, matching the legacy endpoint’s behaviour.)

protea.services.embeddings_service.iter_predictions_cafa_tsv(factory: sessionmaker[Session], *, prediction_set_id: UUID, aspect: str | None, max_distance: float | None, delta_proteins: set[str] | None) Iterator[str]

Stream the CAFA-format prediction TSV.

DB-level deduplication: a GROUP BY (protein_accession, go_term_id) + MIN(distance) subquery keeps the best row per pair so the Python side never needs an unbounded seen set; true streaming. Score is max(0.0, 1.0 - distance) clamped to [0, 1].
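
For intuition, the same reduction can be written in memory. The service does this in SQL precisely to avoid holding the pairs in Python, so the sketch below shows only the semantics (best row per pair, then the score formula), not the streaming implementation; the function name and tuple layout are hypothetical.

```python
from __future__ import annotations

from typing import Iterable


def sketch_best_scores(
    rows: Iterable[tuple[str, str, float]],
) -> dict[tuple[str, str], float]:
    """Keep MIN(distance) per (accession, go_id), then map it to a score."""
    best: dict[tuple[str, str], float] = {}
    for accession, go_id, distance in rows:
        key = (accession, go_id)
        if key not in best or distance < best[key]:
            best[key] = distance
    # Score formula from the docstring: distances above 1.0 floor at 0.0.
    return {key: max(0.0, 1.0 - distance) for key, distance in best.items()}
```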

protea.services.embeddings_service.iter_predictions_tsv(factory: Any, *, prediction_set_id: UUID, accession: str | None = None, aspect: str | None = None, max_distance: float | None = None) Iterator[str]

Yield TSV rows (as str) of every GOPrediction in a set.

Opens its own session inside the generator so the caller’s existence-check session can close cleanly. The first yielded chunk is the header line; one row per (GOPrediction, GOTerm) pair follows, ordered by (protein_accession, distance).

Optional filters: accession (single query protein), aspect (F / P / C), max_distance.

protea.services.embeddings_service.list_prediction_sets_data(session: Session) list[dict[str, Any]]

Top 100 most-recent PredictionSet rows joined with their context.

Returns a list of dicts each carrying the embedding-config name, annotation-set label, ontology version, plus the per-set prediction_count. The per-set count comes from a single GROUP BY over GOPrediction (one index-only scan) rather than a correlated subquery; for ~10⁷-row tables Postgres’ planner falls into a per-row index probe with the correlated form (~30s per outer row). The grouped form returns all 100 counts at once.

protea.services.embeddings_service.list_proteins_in_prediction_set(session: Session, *, prediction_set_id: UUID, search: str | None = None, limit: int = 50, offset: int = 0) dict[str, Any]

Paginated list of proteins in a prediction set with derived stats.

For each row returns go_count (number of predicted terms), min_distance (closest neighbour), annotation_count (known annotations against the same AnnotationSet) and match_count (predictions whose (protein, go_id) is in the known set; a precision proxy).

Decomposed into private helpers (_paginate_protein_rows, _load_protein_orm_map, _load_annotation_counts, _load_match_counts) so this orchestrator stays under the §3 method-LOC ceiling.

Raises EntityNotFoundError (imported lazily to avoid the circular dependency with embeddings_service) when prediction_set_id does not resolve.

protea.services.embeddings_service.prepare_cafa_export(session: Session, *, prediction_set_id: UUID, eval_id: UUID | None) set[str] | None

Preflight CAFA export: validate the PredictionSet exists and, if an EvaluationSet was supplied, compute the union of NK + LK delta proteins to restrict the export.

Returns the delta-protein accession set when eval_id is provided (the streaming generator filters on it), otherwise None.

Raises EntityNotFoundError for missing PredictionSet or EvaluationSet so the router can translate to 404.

protea.services.embeddings_service.validate_embedding_config_body(body: dict[str, Any]) dict[str, Any]

Validate a request body for POST /embeddings/configs.

Returns the canonicalised dict (defaults filled in) on success. Raises InvalidEmbeddingConfigError (imported lazily to avoid the circular dep with embeddings_service) with the full list of failures otherwise; the router translates that to a 422 with the same shape it produced before extraction.

Decomposed into per-field-group helpers so neither this orchestrator nor any helper breaches the 60-LOC method ceiling.

protea.services.embeddings_service.validate_predict_request(session: Session, body: dict[str, Any]) dict[str, UUID]

Parse + validate the three required UUID fields of a predict request.

Returns a dict mapping field name to its parsed uuid.UUID. Raises InvalidUUIDFieldError for parse failures (router → 422) or EntityNotFoundError if a referenced entity does not exist (router → 404). Field order is preserved so the first failure wins, matching the previous in-router behaviour.
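
The parse step with first-failure-wins ordering might look like this. The exception is a stand-in for the service class, and the field names in the usage note are hypothetical since the three fields are not named here. The existence checks against the database are left out.

```python
from __future__ import annotations

from typing import Any, Sequence
from uuid import UUID


class InvalidUUIDFieldError(Exception):
    """Stand-in carrying the offending field name, as the service error does."""

    def __init__(self, field: str) -> None:
        super().__init__(f"{field} must be a valid UUID")
        self.field = field


def sketch_parse_uuid_fields(
    body: dict[str, Any], fields: Sequence[str]
) -> dict[str, UUID]:
    """Parse fields in the given order; the first unparseable one raises."""
    parsed: dict[str, UUID] = {}
    for field in fields:
        try:
            # A missing key becomes the string "None", which also fails to parse.
            parsed[field] = UUID(str(body.get(field)))
        except ValueError as exc:
            raise InvalidUUIDFieldError(field) from exc
    return parsed
```

Called with, say, fields=("embedding_config_id", "annotation_set_id", "ontology_snapshot_id") (illustrative names), a body with two malformed values reports only the earlier field.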

Authentication helpers

protea.api.auth implements the credential-verification layer. It exposes require_api_key_or_bearer, a FastAPI dependency that accepts three header forms (Authorization: ApiKey, X-Api-Key, or Authorization: Bearer). The API-key path computes a SHA-256 hash of the raw key and compares it against the database; the Bearer path verifies an HS256 JWT. A missing or invalid credential returns 401 with a WWW-Authenticate challenge.

API key authentication primitives (T5.6a — first iteration).

This module owns:

  • the constant-time helper functions that hash and verify a raw API key,

  • the FastAPI dependency require_api_key(),

  • the small set of env knobs that gate the dependency in dev.

Header format

Two equivalent header shapes are accepted (mirroring the conventions used by most public APIs):

  • Authorization: ApiKey <key>

  • X-Api-Key: <key>

Both are checked; whichever arrives first wins. The dependency returns the matched ApiKey row so downstream handlers can audit the caller (currently unused, but the hook is in place).

Hashing

Keys are stored as sha256 hex digests. sha256 is fine here because the raw key already has 32 bytes of entropy (256 bits, encoded as URL-safe base64), well above what an offline brute-force can hope to crack. We use hmac.compare_digest() for the verification step to avoid timing side-channels on the hash comparison.

Env knobs

  • PROTEA_AUTHN_REQUIRED (default true) — when false, the dependency short-circuits and waves every request through. Useful for local development; production deployments must leave it set.

protea.api.auth.generate_raw_key() str

Generate a fresh random API key (43 url-safe chars, 256 bits).

The returned string is the value handed to the caller exactly once. Hash + prefix are derived from it via hash_key() / prefix_of().

protea.api.auth.hash_key(raw: str) str

Return the sha256 hex digest used for the key_hash column.

Wrapper exists so the algorithm can be swapped later (Argon2id, for instance, if we ever move to short user-chosen secrets) without grepping the codebase.

protea.api.auth.prefix_of(raw: str) str

Return the first PREFIX_LEN characters of raw.

Used as the display handle in API responses and as the indexed lookup column on the api_key table.
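
How the three helpers fit together, sketched with the standard library only. The names are deliberately different from the module's functions to mark them as hypothetical. The sketch assumes the key comes from secrets.token_urlsafe(32), which yields 43 URL-safe characters, and a PREFIX_LEN of 8, matching the 8-char prefix lookup described for require_api_key().

```python
from __future__ import annotations

import hashlib
import hmac
import secrets

PREFIX_LEN = 8  # assumption, based on the 8-char prefix lookup


def make_key() -> str:
    """Return a fresh raw key: 32 random bytes as URL-safe base64 (43 chars)."""
    return secrets.token_urlsafe(32)


def digest_key(raw: str) -> str:
    """Return the sha256 hex digest that would be stored instead of the raw key."""
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def key_prefix(raw: str) -> str:
    """Return the non-secret handle used for display and indexed lookup."""
    return raw[:PREFIX_LEN]


def key_matches(raw: str, stored_hash: str) -> bool:
    """Compare digests in constant time to avoid a timing side-channel."""
    return hmac.compare_digest(digest_key(raw), stored_hash)
```

Looking up by prefix first narrows the candidates to a handful of rows, so the constant-time comparison runs against very few stored hashes.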

protea.api.auth.require_api_key(request: Request, background_tasks: BackgroundTasks, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), authorization: str | None = Header(None), x_api_key: str | None = Header(None)) ApiKey | None

FastAPI dependency that validates an API key on the request.

Behaviour:

  1. If PROTEA_AUTHN_REQUIRED is falsy, return None — gate disabled (dev stack only).

  2. Read the raw key from Authorization: ApiKey <key> or X-Api-Key: <key>. Missing → 401.

  3. Look up by the 8-char prefix (indexed) and compare hashes in constant time. Mismatch or revoked → 401.

  4. Schedule a background last_used_at update so the request is not blocked on the write.

The matched ApiKey snapshot is returned to the route handler for downstream audit. Routes that wire this as a router-level dependency typically ignore the return value.

protea.api.auth.require_api_key_or_bearer(request: Request, background_tasks: BackgroundTasks, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), authorization: str | None = Header(None), x_api_key: str | None = Header(None)) ApiKey | BearerPrincipal | None

Accept either an API key or a Bearer JWT on the same route.

Resolution order:

  1. PROTEA_AUTHN_REQUIRED falsy → short-circuit, return None.

  2. Authorization: Bearer <jwt> → validate JWT, return BearerPrincipal.

  3. Otherwise fall back to the T5.6a ApiKey flow.

Bearer wins when both happen to be present so a misconfigured client cannot downgrade to the weaker scheme by sending both headers. Failure modes return 401 with the matching WWW-Authenticate header.
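
The resolution order reduces to a small pure function. This sketch classifies the headers only and performs no verification. The function name, the returned labels, the case-insensitive scheme match, and checking Authorization: ApiKey before X-Api-Key are all assumptions.

```python
from __future__ import annotations


def sketch_resolve_credential(
    authorization: str | None,
    x_api_key: str | None,
    authn_required: bool = True,
) -> tuple[str, str | None]:
    """Return (scheme, credential) in the documented priority order."""
    if not authn_required:
        return ("disabled", None)  # gate off: every request is waved through
    scheme, _, value = (authorization or "").partition(" ")
    value = value.strip()
    if scheme.lower() == "bearer" and value:
        return ("bearer", value)  # Bearer wins even if an API key is also sent
    if scheme.lower() == "apikey" and value:
        return ("api_key", value)
    if x_api_key:
        return ("api_key", x_api_key)
    return ("missing", None)  # the real dependency answers 401 here
```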

protea.api.bearer provides the HS256 JWT verification utilities used by auth.require_api_key_or_bearer. Minimum required claims are sub, iat, and exp.

Bearer JWT authentication (T5.6b — second auth iteration).

Adds an Authorization: Bearer <jwt> flow alongside the API-key dependency from T5.6a. Both are accepted via the combined dependency require_api_key_or_bearer().

Algorithm

  • HS256 with a shared secret from PROTEA_JWT_SECRET.

  • Minimum payload: {sub, exp, iat}. aud and iss are accepted if present; we do not validate them in this iteration.

  • On startup, when PROTEA_AUTHN_REQUIRED=true AND PROTEA_JWT_SECRET is missing the API process must fail loudly — see assert_bearer_config() invoked from create_app.

Why HS256 (not RS256)?

PROTEA does not issue tokens itself in this slice (T5.6b is consumer only). The thesis dev stack signs tokens out-of-band with a shared secret and the secret is rotated manually. RS256 / OIDC lands in T5.6c (post-defense) together with the oauth2-proxy fronting layer.

class protea.api.bearer.BearerPrincipal(sub: str, claims: dict[str, Any])

Bases: object

Subject + raw claims surfaced to handlers that want them.

Mirrors the ApiKey snapshot shape returned by require_api_key() so the combined dependency can hand back one or the other without callers having to discriminate at the type level.

claims: dict[str, Any]
sub: str
protea.api.bearer.assert_bearer_config() → None

Fail loudly on startup when auth is on and the secret is missing.

Call from create_app BEFORE the routers are mounted so the process exits with a clear error message rather than 500-ing every bearer request at runtime. When PROTEA_AUTHN_REQUIRED=false we skip the check (dev stacks are allowed to operate without a secret; the gate short-circuits anyway).
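A startup check of this kind might look like the sketch below. It is not the body of assert_bearer_config(): the function name check_bearer_config, the set of strings treated as "true", and the error message are all assumptions made for illustration.

```python
import os
from typing import Mapping

# Assumption: which strings count as "auth is on". The source only says "falsy".
_TRUTHY = {"1", "true", "yes", "on"}


def check_bearer_config(env: Mapping[str, str] = os.environ) -> None:
    """Raise at startup when auth is required but no JWT secret is configured."""
    required = env.get("PROTEA_AUTHN_REQUIRED", "").strip().lower() in _TRUTHY
    if not required:
        # Dev stacks may run without a secret; the auth gate short-circuits anyway.
        return
    if not env.get("PROTEA_JWT_SECRET"):
        raise RuntimeError(
            "PROTEA_AUTHN_REQUIRED is set but PROTEA_JWT_SECRET is missing"
        )
```

Passing the environment as a mapping keeps the check testable without mutating os.environ.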

protea.api.bearer.decode_bearer_token(token: str) → BearerPrincipal

Validate signature + exp and return the principal.

Raises HTTPException 401 on every failure mode (expired, bad signature, missing required claim, malformed). The same status code is used for every cause so the API does not leak which part of the token was rejected.
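To make the "one failure mode on the wire" idea concrete, here is a standard-library sketch of HS256 signing and verification. It does not reproduce decode_bearer_token(): sign_hs256, verify_hs256, and InvalidToken are hypothetical names, the real module may well use a JWT library, and it raises HTTPException 401 where this sketch raises InvalidToken.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional

REQUIRED_CLAIMS = ("sub", "iat", "exp")


class InvalidToken(Exception):
    """Single error type for every rejection cause, mirroring the uniform 401."""


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _mac(secret: str, signing_input: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(claims: dict[str, Any], secret: str) -> str:
    """Mint a token out-of-band, e.g. for a dev stack."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(claims).encode())
    return f"{header}.{body}.{_b64url_encode(_mac(secret, f'{header}.{body}'))}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict[str, Any]:
    try:
        header_b64, body_b64, sig_b64 = token.split(".")
        if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
            raise ValueError("unexpected alg")
        expected = _mac(secret, f"{header_b64}.{body_b64}")
        # Constant-time comparison of the signature bytes.
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            raise ValueError("bad signature")
        claims = json.loads(_b64url_decode(body_b64))
        if any(name not in claims for name in REQUIRED_CLAIMS):
            raise ValueError("missing claim")
        if claims["exp"] <= (time.time() if now is None else now):
            raise ValueError("expired")
    except Exception:
        # Collapse every cause into one opaque error so callers learn nothing more.
        raise InvalidToken("invalid bearer token") from None
    return claims
```

As in the documented behaviour, aud and iss are passed through untouched if present and are not validated.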

protea.api.bearer.extract_bearer_token(authorization: str | None) → str | None

Return the raw JWT from an Authorization: Bearer <jwt> header.

Any other scheme (ApiKey, Basic, …) returns None so the caller can fall through to the next auth mechanism without swallowing tokens that belong to another dependency.

protea.api.bearer.require_bearer(request: Request) → BearerPrincipal | None

Standalone bearer dep — used directly only for tests / dev token.

Production routes use require_api_key_or_bearer() so either scheme is accepted. We read the header off the request directly (instead of a Header() arg) so the dep stays interchangeable with the combined variant.

protea.api.routers.auth_api_keys is the router for managing API key creation and revocation.

/auth/api-keys — manage API keys (T5.6a first iteration).

Three endpoints:

  • POST /auth/api-keys — mint a new key. Returns the raw value exactly once. Subsequent reads only expose the prefix and the name.

  • GET  /auth/api-keys — list keys (prefix + name + state, no secret). Used by an operator dashboard.

  • DELETE /auth/api-keys/{id} — revoke a key (sets revoked_at). Revocation is irreversible; deleting the row outright is out of scope for this iteration (audit trail).

The endpoints themselves are intentionally not guarded by require_api_key in this first iteration: the bootstrap problem (how does the first operator get a key?) is left to a manual SQL insert or to a follow-up admin-token gate in T5.6b. Production deployments should front this router with oauth2-proxy (T5.6c) or a similar trusted layer.

class protea.api.routers.auth_api_keys.CreateApiKeyRequest(*, name: Annotated[str, MinLen(min_length=1), MaxLen(max_length=255)])

Bases: BaseModel

Body for POST /auth/api-keys.

The caller only chooses the human-readable label; entropy is generated server-side so we never trust a client-supplied value.

model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'name': 'lab-runner-2026-05'}}}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str
protea.api.routers.auth_api_keys.create_api_key(request: Request, response: Response, body: CreateApiKeyRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) → dict[str, Any]

Create a fresh API key and return its raw value once.

Response shape:

{
  "id": "<uuid>",
  "prefix": "abc12345",
  "name": "lab-runner-2026-05",
  "key": "<the only chance to copy this>",
  "created_at": "..."
}

The raw key field is the value the caller should store in their secret manager / CI. PROTEA stores only the sha256 hash + the 8-char prefix; we cannot recover the value if it is lost (just mint another and revoke the misplaced one).
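The "store only the hash and the prefix" scheme can be sketched as follows. The source states only that a sha256 hash and an 8-char prefix are kept; everything else here is an assumption for illustration, including the function names, the use of secrets.token_urlsafe for entropy, and the prefix being the first eight characters of the raw key.

```python
import hashlib
import hmac
import secrets

PREFIX_LEN = 8  # the source documents an 8-char prefix


def mint_api_key() -> tuple[str, str, str]:
    """Return (raw_key, prefix, sha256_hex). Only the last two would be stored."""
    raw_key = secrets.token_urlsafe(32)          # server-side entropy
    prefix = raw_key[:PREFIX_LEN]                # assumption: prefix = leading chars
    digest = hashlib.sha256(raw_key.encode("ascii")).hexdigest()
    return raw_key, prefix, digest


def key_matches(presented_key: str, stored_digest: str) -> bool:
    """Hash the presented key and compare against the stored digest."""
    candidate = hashlib.sha256(presented_key.encode("utf-8")).hexdigest()
    return hmac.compare_digest(candidate, stored_digest)
```

Because only the digest is persisted, a lost key cannot be recovered, which is why the response shows the raw value exactly once.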

protea.api.routers.auth_api_keys.list_api_keys(include_revoked: bool = Query(False), limit: int = Query(50), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) → list[dict[str, Any]]

Return registered API keys newest-first.

The response never includes the secret — only the prefix and the name. Use this endpoint to confirm a key was created or to look up the id of a key you want to revoke.

protea.api.routers.auth_api_keys.revoke_api_key(key_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) → dict[str, Any]

Mark an API key as revoked.

Sets revoked_at to the current UTC timestamp. Subsequent uses of the key are rejected by require_api_key() with a 401. The row is preserved (not deleted) so the audit trail of historical access stays intact.

Request caching and rate limiting

protea.api.cache provides in-process caching utilities for expensive read-only endpoints (showcase statistics, benchmark matrix). Results are stored with a configurable TTL, reducing redundant database queries on frequently-polled pages.

Tiny in-process TTL cache for aggregate API endpoints.

Built for stats/listing endpoints that run DISTINCT-over-JOIN queries on 10M+ row tables: queries that are structurally slow (tens of seconds) and whose results change slowly enough that a 5-minute TTL is not user-visible.

Process-local by design: resets on uvicorn restart, does not need Redis, does not leak across workers. Good enough for a single-instance deployment.

protea.api.cache.cached(key: str, ttl: float, producer: Callable[[], Any], *, serve_stale_on_error: bool = False) → Any

Return producer() result, cached under key for ttl seconds.

When serve_stale_on_error is true and the producer raises while a prior value is still in the store (even if expired), return the stale value instead of propagating; lets cold-cache hangs degrade to a slightly out-of-date payload rather than a 500.

protea.api.cache.get_last_known(key: str) → Any | None

Return the last cached value for key, ignoring TTL; None if absent.

protea.api.cache.invalidate(key: str | None = None) → None

Drop a single key, or the whole cache when key is None.
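The three functions above describe a small TTL cache with a stale-on-error escape hatch. The sketch below mirrors the documented signatures but is not the module's code: the storage layout, the use of time.monotonic, and the absence of locking are assumptions of the example.

```python
import time
from typing import Any, Callable, Optional

# key -> (expires_at, value); process-local, so it resets on restart.
_store: dict[str, tuple[float, Any]] = {}


def cached(
    key: str,
    ttl: float,
    producer: Callable[[], Any],
    *,
    serve_stale_on_error: bool = False,
) -> Any:
    now = time.monotonic()
    entry = _store.get(key)
    if entry is not None and entry[0] > now:
        return entry[1]                      # fresh hit
    try:
        value = producer()
    except Exception:
        # An expired entry is still in the store, so it can be served as a fallback.
        if serve_stale_on_error and entry is not None:
            return entry[1]
        raise
    _store[key] = (now + ttl, value)
    return value


def get_last_known(key: str) -> Optional[Any]:
    entry = _store.get(key)
    return None if entry is None else entry[1]


def invalidate(key: Optional[str] = None) -> None:
    if key is None:
        _store.clear()
    else:
        _store.pop(key, None)
```

Expired entries are overwritten rather than evicted, which is what lets serve_stale_on_error and get_last_known return an out-of-date value.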

protea.api.rate_limit configures the slowapi limiter and exposes the per-principal rate-limit rules applied to the five write routes protected by authentication (POST /jobs, POST /datasets, POST /datasets/import-by-reference, POST /reranker-models/import, POST /reranker-models/import-by-reference).

Per-endpoint rate limiting via slowapi (T5.6b).

Three POSTs are throttled out of the box:

  • POST /jobs — 10/min, env PROTEA_RATELIMIT_JOBS

  • POST /auth/api-keys — 5/hour, env PROTEA_RATELIMIT_API_KEYS

  • POST /datasets — 5/min, env PROTEA_RATELIMIT_DATASETS

Environment-aware rate limiting

In test/dev environments (PROTEA_ENVIRONMENT=test|dev), rate limits are effectively disabled (set to 9999/hour) to allow integration tests and local iteration without hitting quota walls. Production deployments should leave PROTEA_ENVIRONMENT unset or explicitly set it to “production”.
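One way such a limit could be resolved is sketched below. The helper resolve_limit is hypothetical, and two details are assumptions rather than documented behaviour: that the test/dev switch takes precedence over a per-route override, and that limits are written in the long form ("10/minute") for the limiter.

```python
import os
from typing import Mapping

DISABLED_LIMIT = "9999/hour"             # value quoted in the text above
RELAXED_ENVIRONMENTS = {"test", "dev"}


def resolve_limit(
    env_var: str,
    default: str,
    env: Mapping[str, str] = os.environ,
) -> str:
    """Return the limit string for one route.

    Assumption: PROTEA_ENVIRONMENT=test|dev wins over the per-route variable.
    """
    environment = env.get("PROTEA_ENVIRONMENT", "").strip().lower()
    if environment in RELAXED_ENVIRONMENTS:
        return DISABLED_LIMIT
    return env.get(env_var) or default


def jobs_limit(env: Mapping[str, str] = os.environ) -> str:
    # Default mirrors the documented 10/min for POST /jobs.
    return resolve_limit("PROTEA_RATELIMIT_JOBS", "10/minute", env)
```

Leaving PROTEA_ENVIRONMENT unset falls through to the per-route variable and then to the default, matching the production guidance above.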

Key function

Every request is bucketed by:

  1. The ApiKey.prefix if the caller authenticated with an API key (slowapi runs after the auth dependency has stored it on request.state).

  2. The sub of the Bearer JWT if authenticated that way.

  3. The remote IP otherwise (unauthenticated requests still get a bucket so a flood of 401s does not amplify into an unbounded workload).

This keeps the buckets attributable: one misbehaving CI job does not collide with another team’s quota.
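The three-step bucketing can be sketched as a small key function. The attribute names api_key_prefix and bearer_sub, and the "apikey:" / "bearer:" / "ip:" namespacing, are hypothetical; the source does not say what the auth dependency stores on request.state or how keys are formatted.

```python
from typing import Any


def bucket_key(state: Any, remote_ip: str) -> str:
    """Pick the rate-limit bucket for a request.

    `state` stands in for request.state; the attribute names are assumptions.
    """
    prefix = getattr(state, "api_key_prefix", None)
    if prefix:
        return f"apikey:{prefix}"        # 1. API-key callers
    sub = getattr(state, "bearer_sub", None)
    if sub:
        return f"bearer:{sub}"           # 2. Bearer JWT callers
    return f"ip:{remote_ip}"             # 3. everyone else, by remote IP
```

Namespacing the key by scheme keeps an API-key prefix from ever colliding with a JWT subject or an IP address that happens to share the same string.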

On 429

slowapi raises RateLimitExceeded which we map to a 429 problem response carrying Retry-After. The body inherits the same application/problem+json shape as the rest of the API.
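On the client side, a 429 can be handled by honouring Retry-After. The helper below is a hypothetical client utility, not part of PROTEA; it assumes the header carries a whole number of seconds and falls back to a caller-chosen default when it is absent or in another format.

```python
from typing import Mapping, Optional


def retry_delay_seconds(
    status: int,
    headers: Mapping[str, str],
    default: float = 1.0,
) -> Optional[float]:
    """Return how long to wait before retrying, or None if no retry is needed."""
    if status != 429:
        return None
    for name, value in headers.items():
        # Header names are case-insensitive on the wire.
        if name.lower() == "retry-after" and value.strip().isdigit():
            return float(value.strip())
    return default
```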

protea.api.rate_limit.api_keys_limit() → str
protea.api.rate_limit.datasets_limit() → str
protea.api.rate_limit.install_rate_limiter(app: FastAPI) → None

Wire the limiter + middleware + custom handler onto app.

protea.api.rate_limit.jobs_limit() → str

Shared dependencies and error handling

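protea.api.deps provides FastAPI Depends callables shared across multiple routers. Each takes the incoming Request and returns an application-level value: the session factory, the AMQP URL, the artifacts directory, the benchmark config, the operation registry, the application Settings, or the per-user daily quota map.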

Shared FastAPI dependency functions for all routers.

protea.api.deps.get_amqp_url(request: Request) → str
protea.api.deps.get_artifacts_dir(request: Request) → Path
protea.api.deps.get_benchmark_config(request: Request) → BenchmarkConfig
protea.api.deps.get_operation_registry(request: Request) → OperationRegistry
protea.api.deps.get_session_factory(request: Request) → sessionmaker[Session]
protea.api.deps.get_settings(request: Request) → Settings

Return the application-level Settings from app state.

protea.api.deps.get_user_quota_per_day(request: Request) → dict[str, int]

Return the per-user daily quota limit map from app state (FARM-AUTH.7).

protea.api.problem_details implements RFC 7807 application/problem+json error serialisation. Every exception handler in the application calls into this module to produce a consistent {"type", "title", "status", "detail", "instance"} body. Validation errors carry an additional errors array with the offending field paths.

RFC 7807 application/problem+json error responses (T4.4).

Installs FastAPI exception handlers that convert the framework’s default JSON error bodies ({"detail": ...}) into the canonical RFC 7807 shape used by every modern HTTP API. Existing route code keeps raising HTTPException exactly as before — this module only changes how the responses look on the wire.

RFC 7807 fields

  • type: URI reference identifying the problem class. We use relative paths under /problems/{slug} so the docs site can host human-readable descriptions per slug.

  • title: short, human-readable summary, stable across responses of the same type.

  • status: HTTP status code (mirrors the response status).

  • detail: long-form explanation specific to this occurrence.

  • instance: relative URI of the request that produced the problem.
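A body with the fields listed above can be assembled as in the following sketch. The helper problem_body is hypothetical, and deriving the title from the standard HTTP reason phrase is an assumption of the example; the source only requires that the title be stable for a given type.

```python
from http import HTTPStatus
from typing import Any, Optional


def problem_body(
    status: int,
    slug: str,
    *,
    detail: Optional[str] = None,
    instance: Optional[str] = None,
    title: Optional[str] = None,
) -> dict[str, Any]:
    """Build an RFC 7807 style body; optional members are omitted when unset."""
    body: dict[str, Any] = {
        "type": f"/problems/{slug}",                     # relative URI per slug
        "title": title or HTTPStatus(status).phrase,     # assumption: reason phrase
        "status": status,                                # mirrors the HTTP code
    }
    if detail is not None:
        body["detail"] = detail
    if instance is not None:
        body["instance"] = instance
    return body
```

A response built this way would be served with the application/problem+json media type, as described for the rest of the API.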

class protea.api.problem_details.ProblemDetail(*, type: str, title: str, status: Annotated[int, Ge(ge=100), Le(le=599)], detail: str | None = None, instance: str | None = None)

Bases: BaseModel

Pydantic model for an RFC 7807 problem-details payload.

detail: str | None
instance: str | None
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

status: int
title: str
type: str
protea.api.problem_details.install_problem_openapi_schema(app: FastAPI) → None

Document RFC 7807 as the default 4xx/5xx schema in the OpenAPI spec.

FastAPI’s stock OpenAPI generator points 4xx/5xx responses at application/json with HTTPValidationError (or nothing). This hook patches every operation so every 4xx/5xx response advertises application/problem+json referencing ProblemDetail, matching what the runtime handlers (installed by register_problem_handlers()) actually emit.

The hook lives behind app.openapi (FastAPI’s documented override point) so re-runs return the cached schema. Idempotent: an existing application/problem+json content entry on a given operation is left in place so endpoints can still document a more specific error shape later.

protea.api.problem_details.register_problem_handlers(app: FastAPI) → None

Install RFC 7807 handlers for the standard FastAPI error paths.

Three handlers cover every framework-emitted error:

  1. StarletteHTTPException — covers FastAPI’s HTTPException (which subclasses Starlette’s). Handles 4xx/5xx raised from route bodies + dependency callables.

  2. RequestValidationError — Pydantic body / query / path validation failures (HTTP 422). The original FastAPI body lives under errors so clients can still drill into the per-field details.

  3. Exception — catch-all so unhandled crashes still produce a structured 500 instead of an HTML traceback. detail is intentionally generic: the full traceback is logged via the framework’s normal error path; the wire surface stays opaque.

Middleware

protea.api.middleware.visitor_counter is the ASGI middleware (a BaseHTTPMiddleware subclass) that logs one VisitorEvent row per HTTP GET to a non-asset path. It extracts the client IP, combines it with a daily salt, and stores the first 16 hex characters of the resulting SHA-256 hash.

Anonymous visitor counting middleware.

Records one row per user-visible request into the visitor_event table so that Grafana (or any SQL client) can compute “unique visitors per day” and similar aggregate traffic metrics without storing IP addresses or using cookies.

Privacy design

The client IP is never persisted. Instead, we compute a short hash:

visitor_hash = sha256(daily_salt || client_ip)[:16]

where daily_salt is a 32-byte random value held only in process memory and rotated on every calendar day (UTC). When the day rolls over the old salt is discarded, so cross-day correlation becomes cryptographically infeasible — the same “rotating salt” approach used by Plausible and Fathom.
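The rotating-salt hash can be sketched with the standard library. The formula and the 32-byte, rotate-per-UTC-day salt come from the text above; the DailySalt class, its method names, and the injectable today parameter are assumptions added so the example is testable.

```python
import hashlib
import secrets
from datetime import date, datetime, timezone
from typing import Optional


class DailySalt:
    """Holds a random salt in memory and replaces it when the UTC date changes."""

    def __init__(self) -> None:
        self._day: Optional[date] = None
        self._salt: bytes = b""

    def current(self, today: Optional[date] = None) -> bytes:
        today = today or datetime.now(timezone.utc).date()
        if today != self._day:
            # The previous salt is dropped here and cannot be recovered.
            self._day = today
            self._salt = secrets.token_bytes(32)
        return self._salt


def visitor_hash(daily_salt: bytes, client_ip: str) -> str:
    """sha256(daily_salt || client_ip), truncated to the first 16 hex characters."""
    digest = hashlib.sha256(daily_salt + client_ip.encode("utf-8"))
    return digest.hexdigest()[:16]
```

Within one day the same IP maps to the same hash, so unique visitors can be counted; once the salt rotates, the same IP yields an unrelated hash.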

Noise filters

The middleware is deliberately narrow in scope: it only counts requests that represent actual user navigation. It skips assets, polling endpoints, health probes and metrics scrapes. See _should_record.

class protea.api.middleware.visitor_counter.VisitorCounterMiddleware(app: Callable[[MutableMapping[str, Any], Callable[[], Awaitable[MutableMapping[str, Any]]], Callable[[MutableMapping[str, Any]], Awaitable[None]]], Awaitable[None]])

Bases: BaseHTTPMiddleware

Writes one VisitorEvent row per recorded request.

The session factory is read from app.state.session_factory — set by create_app() at startup. If the factory isn’t present (e.g. during tests that instantiate a bare FastAPI app), the middleware degrades to a no-op so it never breaks the request.

async dispatch(request, call_next)

Metrics router

The /metrics router exposes Prometheus-compatible scrape metrics for the API process. Response time histograms, active-connection gauges, and job-state counters are surfaced at GET /metrics.

Prometheus scrape endpoint (T5.2).

Exposes GET /metrics returning the standard Prometheus text-based exposition format. The collector registry is built once in protea.api.app.create_app() and stashed on app.state.metrics so requests do not pay the registration cost on every scrape.

T5.2 scope is intentionally narrow: the endpoint is always served (so Prometheus can be wired up at deploy time without flipping any feature flag), and the five baseline metrics are registered up-front so they appear in the output even before any sample has been observed. Call sites that increment counters / observe histograms land in follow-up slices.

The protea_db_pool_in_use gauge is refreshed on each scrape by reading the SQLAlchemy pool’s checkedout() count from the session factory’s bound engine. This keeps the gauge accurate without needing event-listener wiring, at the cost of one cheap method call per scrape (typically every 15s).

protea.api.routers.metrics.get_metrics(request: Request) → Response

Render the live Prometheus exposition payload.

Returns 503 when the API was booted without the prometheus_client dependency (a minimal worker image, for example). This keeps the endpoint shape stable for Prometheus scrapers, which retry on 5xx, instead of leaking an import error.

Authentication and rate limits

Five POST routes require a credential (T5.6a + T5.6b):

  • POST /v1/jobs

  • POST /v1/datasets

  • POST /v1/datasets/import-by-reference

  • POST /v1/reranker-models/import

  • POST /v1/reranker-models/import-by-reference

Three header forms are accepted, any one of which satisfies the gate:

Authorization: ApiKey <raw_key>
X-Api-Key: <raw_key>
Authorization: Bearer <jwt>

The API key path uses protea.api.auth.require_api_key_or_bearer() (sha256 hash verification). The Bearer path uses HS256 with the PROTEA_JWT_SECRET env var; minimum token claims are sub, iat, and exp. A missing or invalid credential returns 401 with WWW-Authenticate: ApiKey, Bearer. Rate limits on these routes are enforced by slowapi per principal (API-key prefix or JWT sub); exceeding the limit returns 429 with a Retry-After header. See Authentication for the complete auth and rate-limit reference, and Configuration Reference for the PROTEA_AUTHN_REQUIRED, PROTEA_JWT_SECRET, and PROTEA_RATELIMIT_* knobs.

Endpoints summary

Method  Path  Description

Health

GET     /health        Liveness probe: returns 200 if the API process is up.
GET     /health/ready  Readiness probe: verifies database and RabbitMQ connections.

Jobs

POST    /jobs                Create a job and publish its UUID to RabbitMQ.
GET     /jobs                List jobs; filter by status and/or operation. Max 500 rows. Cursor pagination (T4.2): pass after=<created_at> to walk forward past the limit.
GET     /jobs/{id}           Retrieve a single job with full payload and meta.
GET     /jobs/{id}/events    Retrieve the event timeline for a job (up to 2 000 events). Cursor pagination (T4.2): pass after=<ts> to walk forward.
POST    /jobs/{id}/cancel    Transition a QUEUED or RUNNING job to CANCELLED.
DELETE  /jobs/{id}           Delete a job that is not in RUNNING status.
POST    /jobs/{id}/comments  Append a JobComment (T3.10 / D11). Body fields: body (required, non-empty), author (optional). Returns 201.
GET     /jobs/{id}/comments  List the JobComment thread chronologically (created_at ASC, id ASC tiebreaker). Cursor pagination (T4.2): pass after=<created_at> to walk forward past the limit.

Proteins

GET     /proteins/stats                    Aggregate protein statistics (total, canonical, reviewed, organisms).
GET     /proteins                          List proteins with pagination; filter by organism / reviewed.
GET     /proteins/{accession}              Retrieve a single protein with its UniProt metadata.
GET     /proteins/{accession}/annotations  List GO annotations for a protein across all annotation sets.

Annotations

GET     /annotations/snapshots                                         List ontology snapshots with GO term counts per aspect.
GET     /annotations/snapshots/{id}                                    Retrieve a snapshot with its full list of GO terms.
PATCH   /annotations/snapshots/{id}/ia-url                             Set the Information Accretion (IA) file URL on an ontology snapshot.
POST    /annotations/snapshots/load                                    Queue a load_ontology_snapshot job.
GET     /annotations/snapshots/{id}/subgraph                           BFS ancestor subgraph for a given set of GO term IDs.
GET     /annotations/sets                                              List annotation sets with protein GO annotation counts.
GET     /annotations/sets/{id}                                         Retrieve a single annotation set with summary statistics.
DELETE  /annotations/sets/{id}                                         Delete an annotation set and all its annotations.
POST    /annotations/sets/load-goa                                     Queue a load_goa_annotations job.
POST    /annotations/sets/load-quickgo                                 Queue a load_quickgo_annotations job.
POST    /annotations/evaluation-sets/generate                          Queue a generate_evaluation_set job.
GET     /annotations/evaluation-sets                                   List evaluation sets with summary statistics.
GET     /annotations/evaluation-sets/{id}                              Get evaluation set details.
DELETE  /annotations/evaluation-sets/{id}                              Delete an evaluation set.
GET     /annotations/evaluation-sets/{id}/ground-truth-NK.tsv          Download NK ground truth in CAFA format.
GET     /annotations/evaluation-sets/{id}/ground-truth-LK.tsv          Download LK ground truth in CAFA format.
GET     /annotations/evaluation-sets/{id}/ground-truth-PK.tsv          Download PK ground truth in CAFA format.
GET     /annotations/evaluation-sets/{id}/known-terms.tsv              Download known terms from old annotation set (for PK evaluation).
GET     /annotations/evaluation-sets/{id}/delta-proteins.fasta         Download delta proteins as FASTA.
POST    /annotations/evaluation-sets/{id}/run                          Queue a run_cafa_evaluation job.
GET     /annotations/evaluation-sets/{id}/results                      List evaluation results for an evaluation set.
GET     /annotations/evaluation-sets/{id}/results/{rid}/metrics.tsv    Download evaluation metrics as TSV.
GET     /annotations/evaluation-sets/{id}/results/{rid}/artifacts.zip  Download all cafaeval artifacts as a zip.
DELETE  /annotations/evaluation-sets/{id}/results/{rid}                Delete an evaluation result.

Embeddings

GET     /embeddings/configs                                    List all embedding configurations.
POST    /embeddings/configs                                    Create a new (immutable) embedding configuration.
GET     /embeddings/configs/{id}                               Retrieve an embedding configuration by UUID.
DELETE  /embeddings/configs/{id}                               Delete an embedding configuration.
POST    /embeddings/predict                                    Queue a predict_go_terms job.
GET     /embeddings/prediction-sets                            List prediction sets with entry counts.
GET     /embeddings/prediction-sets/{id}                       Retrieve a prediction set with summary statistics.
GET     /embeddings/prediction-sets/{id}/proteins              List proteins in a prediction set.
GET     /embeddings/prediction-sets/{id}/proteins/{accession}  Get predictions for one protein.
GET     /embeddings/prediction-sets/{id}/go-terms              GO term distribution in a prediction set.
GET     /embeddings/prediction-sets/{id}/predictions.tsv       Stream all predictions as TSV (filtered by accession / aspect / distance).
GET     /embeddings/prediction-sets/{id}/predictions-cafa.tsv  Download predictions in CAFA submission format.
DELETE  /embeddings/prediction-sets/{id}                       Delete a prediction set.

Scoring

GET     /scoring/configs                                 List scoring configurations.
POST    /scoring/configs                                 Create a scoring configuration.
POST    /scoring/configs/presets                         Create preset scoring configurations.
GET     /scoring/configs/{id}                            Retrieve a scoring configuration.
DELETE  /scoring/configs/{id}                            Delete a scoring configuration.
GET     /scoring/prediction-sets/{id}/score.tsv          Stream scored predictions as TSV.
GET     /scoring/prediction-sets/{id}/metrics            Compute CAFA-style metrics for scored predictions.
GET     /scoring/prediction-sets/{id}/training-data.tsv  Export labeled training data for the re-ranker.
GET     /scoring/rerankers                               List all trained re-ranker models.
GET     /scoring/rerankers/{id}                          Retrieve a re-ranker model’s metadata, metrics, and feature importance.
DELETE  /scoring/rerankers/{id}                          Delete a trained re-ranker model.
GET     /scoring/prediction-sets/{id}/rerank.tsv         Apply a re-ranker to a prediction set and stream re-scored TSV.
GET     /scoring/prediction-sets/{id}/reranker-metrics   Compute CAFA Fmax and AUC-PR using re-ranker scores.

Query Sets

POST    /query-sets       Upload a FASTA file and create a QuerySet.
GET     /query-sets       List all query sets with entry counts.
GET     /query-sets/{id}  Retrieve a query set with its full entry list.
DELETE  /query-sets/{id}  Delete a query set and all its entries.

Annotate

POST    /annotate  One-click annotation: upload FASTA, auto-run the full pipeline.

Maintenance

GET     /maintenance/vacuum-sequences/preview   Count orphan sequences (preview).
POST    /maintenance/vacuum-sequences           Delete orphan sequences.
GET     /maintenance/vacuum-embeddings/preview  Count unindexed embeddings (preview).
POST    /maintenance/vacuum-embeddings          Delete unindexed embeddings.

Admin

POST    /admin/reset-db  Drop and recreate the public schema (requires admin token).

Showcase

GET     /showcase  Platform statistics and best evaluation results.

Support

GET     /support  Total thumbs-up count and recent comments.
POST    /support  Submit a thumbs-up with optional comment.

Benchmark

GET     /benchmark/embeddings  List embedding configs with persisted display metadata.
GET     /benchmark/matrix      Per-embedding / per-stage Fmax matrix across all evaluation results.

Datasets

POST    /datasets                      Enqueue an export_research_dataset job.
POST    /datasets/import-by-reference  Register a Dataset row pointing at already-staged artefacts (no job, no KNN re-run). Requires auth (LB.1).
GET     /datasets                      List registered re-ranker datasets. Cursor pagination (T4.2): pass after=<created_at> to walk forward past the limit.
GET     /datasets/{id_or_name}         Get a dataset by id or name.

Plugin Registry

GET     /backends  List installed embedding-backend plugins.
GET     /sources   List installed annotation-source plugins.
GET     /runners   List installed experiment-runner plugins.

Reranker Models

POST    /reranker-models/import               Import a lab-trained booster (multipart).
POST    /reranker-models/import-by-reference  Import a booster already uploaded to the artifact store (JSON).

Stack

GET     /stack        Return the eight-repo PROTEA stack registry.
GET     /stack/pulls  Aggregate open pull requests across every repo in the stack.

Experiment Runs

POST    /experiment-runs           Create an ExperimentRun (T4.7). Body: name required + optional narrative trio + status + JSONB / tags.
GET     /experiment-runs           List experiment runs newest-first; filter by status (T4.8). Cursor pagination (T4.2): pass after=<created_at> from the previous page’s last row.
GET     /experiment-runs/{run_id}  Retrieve one experiment run.
PATCH   /experiment-runs/{run_id}  Partial update (T4.9). Status transitions stamp started_at / finished_at idempotently.
DELETE  /experiment-runs/{run_id}  Delete an experiment run (returns 204).
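Several list endpoints in the table use cursor pagination through the after parameter. A client-side walk might look like the sketch below. It is transport-agnostic: fetch_page is a hypothetical callable the caller supplies (for example a thin wrapper around an HTTP GET), and the loop assumes that an empty page marks the end of the listing and that the cursor is the created_at of the last row, as described for /experiment-runs.

```python
from typing import Any, Callable, Iterator, Optional

Row = dict[str, Any]
FetchPage = Callable[[Optional[str]], list[Row]]  # receives the `after` cursor


def walk_pages(fetch_page: FetchPage, cursor_field: str = "created_at") -> Iterator[Row]:
    """Yield every row, requesting the next page with the last row's cursor value."""
    after: Optional[str] = None
    while True:
        rows = fetch_page(after)
        if not rows:
            return                      # assumption: empty page means no more rows
        yield from rows
        after = rows[-1][cursor_field]
```

For /jobs/{id}/events the cursor field would be the event timestamp rather than created_at, so cursor_field is left configurable.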

Request body for POST /jobs

The operation and queue_name fields are required. payload is passed verbatim to the operation’s execute method after Pydantic validation; its schema depends on the operation. meta is stored on the Job row and never interpreted by the API. description and tags are optional D11 narrative fields surfaced on the GET /jobs and GET /jobs/{id} responses; they let any caller attach human intent and ad-hoc grouping tokens at submission time without round-tripping through a separate metadata endpoint.

{
  "operation": "insert_proteins",
  "queue_name": "protea.jobs",
  "payload": {
    "search_criteria": "reviewed:true AND organism_id:9606"
  },
  "meta": {},
  "description": "Backfill reviewed Swiss-Prot for benchmark_v1",
  "tags": ["ablation", "benchmark_v1"]
}

Common payload examples by operation:

{ "operation": "fetch_uniprot_metadata",  "queue_name": "protea.jobs",
  "payload": { "search_criteria": "reviewed:true AND organism_id:9606" } }
{ "operation": "compute_embeddings", "queue_name": "protea.embeddings",
  "payload": { "embedding_config_id": "<uuid>", "sequences_per_job": 64 } }
{ "operation": "predict_go_terms", "queue_name": "protea.predictions",
  "payload": {
    "embedding_config_id": "<uuid>",
    "annotation_set_id": "<uuid>",
    "ontology_snapshot_id": "<uuid>",
    "query_set_id": "<uuid>",
    "limit_per_entry": 5
  }
}
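A client can assemble such a request with the standard library alone. In this sketch the base URL, the helper name build_job_request, and the use of the X-Api-Key header are assumptions; the allowed key set contains only the fields named in this section and may be narrower than the real CreateJobRequest schema. The local check mirrors the server's extra="forbid" behaviour so a typo fails before the request is sent.

```python
import json
from typing import Any
from urllib.request import Request

# Assumption: only the fields named in this section.
ALLOWED_KEYS = {"operation", "queue_name", "payload", "meta", "description", "tags"}
REQUIRED_KEYS = {"operation", "queue_name"}


def build_job_request(base_url: str, body: dict[str, Any], api_key: str) -> Request:
    """Return an unsent POST /v1/jobs request carrying `body` as JSON."""
    unknown = set(body) - ALLOWED_KEYS
    if unknown:
        # The server would answer 422 for these; fail locally instead.
        raise ValueError(f"unknown keys: {sorted(unknown)}")
    missing = REQUIRED_KEYS - set(body)
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    return Request(
        url=f"{base_url.rstrip('/')}/v1/jobs",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Api-Key": api_key},
        method="POST",
    )
```

The returned object can be passed to urllib.request.urlopen; nothing is sent until then.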

See also

  • Operations: every operation referenced in a payload, with field-level documentation.

  • How-to Guides: concrete curl recipes that submit each endpoint end-to-end.

  • Job Lifecycle: how the API turns a request into a persistent Job row and a queue message.