HTTP API¶
The PROTEA HTTP API is a FastAPI application that exposes a set of routers
under protea/api/routers/ (the live OpenAPI is regenerated into
docs/openapi.json and is authoritative for the exact endpoint list).
All state mutations flow through this layer: it writes Job rows to
PostgreSQL and publishes messages to RabbitMQ. The API is stateless between
requests; the session factory and AMQP URL are injected via app.state
at startup, keeping every router free of global state and infrastructure
imports.
All endpoints return JSON. Error responses follow the RFC 7807
application/problem+json shape (T4.4 / D4): every error body
includes type (relative URI under /problems/{slug},
e.g. /problems/not-found), title (short stable summary),
status (mirror of the HTTP code), plus optional detail and
instance (request URI) fields. Validation errors carry an extra
errors array with the offending field paths. Existing route code
keeps raising HTTPException exactly as before; only the wire
format changed. Timestamps are ISO 8601 UTC strings. UUID identifiers
are lowercase hyphenated strings.
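As a rough illustration of the error shape described above, the following sketch assembles a problem body from the listed members. The helper name problem_body, the title and detail wording, and the example instance URI are assumptions made for this example, not PROTEA code.

```python
import json
from typing import Any, Optional


def problem_body(
    slug: str,
    title: str,
    status: int,
    detail: Optional[str] = None,
    instance: Optional[str] = None,
    errors: Optional[list] = None,
) -> dict:
    """Build an RFC 7807-style body (hypothetical helper, not PROTEA code)."""
    body: dict = {
        "type": f"/problems/{slug}",  # relative URI under /problems/{slug}
        "title": title,               # short stable summary
        "status": status,             # mirrors the HTTP status code
    }
    # Optional members are emitted only when a value is supplied.
    if detail is not None:
        body["detail"] = detail
    if instance is not None:
        body["instance"] = instance
    if errors is not None:
        body["errors"] = errors       # used for validation errors only
    return body


# Illustrative values; the wording and the URI are placeholders.
not_found = problem_body(
    "not-found",
    "Not Found",
    404,
    detail="Job does not exist.",
    instance="/v1/jobs/00000000-0000-0000-0000-000000000000",
)
print(json.dumps(not_found, indent=2))
```

A client can branch on the stable type or status members and treat detail as human-readable text only.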
Every client request body is strict (model_config =
ConfigDict(extra="forbid"), PR #215): unknown keys raise a 422
instead of being silently dropped, so {"oepration": "ping"} on
POST /jobs (typo for operation) fails fast against the
schema rather than parsing as if operation were missing. The
contract covers every documented request body
(CreateJobRequest / CreateJobCommentRequest /
ScoringConfigCreate / CreateExperimentRunRequest /
UpdateExperimentRunRequest / CreateDatasetRequest /
ImportDatasetByReferenceRequest /
ImportRerankerByReferenceRequest / SupportCreate); response
models are not constrained because they are server-built and never
parse client input.
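The server enforces strictness through Pydantic, so the sketch below is only a stdlib approximation that a client could run before sending a POST /jobs body. ALLOWED_JOB_KEYS mirrors the CreateJobRequest fields listed on this page; the helper name unknown_keys is hypothetical.

```python
# Field names taken from the CreateJobRequest signature on this page.
ALLOWED_JOB_KEYS = {"operation", "queue_name", "payload", "meta", "description", "tags"}


def unknown_keys(body: dict) -> list:
    """Return the keys a strict (extra="forbid") schema would reject."""
    return sorted(set(body) - ALLOWED_JOB_KEYS)


# The typo from the text above: "oepration" instead of "operation".
typo_body = {"oepration": "ping", "queue_name": "protea.embedding"}
rejected = unknown_keys(typo_body)
if rejected:
    print(f"would fail with 422, unknown keys: {rejected}")
```

This only approximates the unknown-key check; required-field and type validation still happen on the server.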
Versioning under the /v1/ prefix¶
Every router is mounted twice (T4.1, decision D4):
Canonical under the /v1/ prefix (the first major URL segment): surfaced in OpenAPI / Swagger and the only path schema exporters and codegen tools see. All new clients should target this form.
Legacy alias at the root path: the same handler reachable without a prefix, include_in_schema=False so OpenAPI does not advertise it. This exists for the deprecation window so existing frontend, CLI, and CI traffic keeps working without a coordinated cutover.
The endpoint paths in the per-router sections and the Endpoints
summary below are listed without the prefix for terseness; both the
bare and the prefixed paths resolve to the same handler today. Health
endpoints (/health, /health/ready) stay at the root by
convention. When the legacy aliases are retired the second
include_router call in
protea.api.app._register_routers will be removed; this page is
the source of truth for that timing.
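For clients migrating off the legacy aliases, the path rule above can be expressed as a small helper. This is a client-side sketch under the assumption that only the two health paths named above stay unprefixed; canonical_path is a hypothetical name.

```python
# Health endpoints stay at the root by convention (per the text above).
HEALTH_PATHS = {"/health", "/health/ready"}


def canonical_path(path: str) -> str:
    """Map a bare (legacy) path to its /v1/ form; leave other paths alone."""
    if path in HEALTH_PATHS:
        return path
    if path == "/v1" or path.startswith("/v1/"):
        return path  # already canonical
    return "/v1" + path


for legacy in ("/jobs", "/health/ready", "/v1/jobs"):
    print(legacy, "->", canonical_path(legacy))
```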
Application factory¶
protea.api.app creates the FastAPI application, registers all routers,
and wires the session factory and AMQP URL into app.state at startup.
It also configures CORS and mounts any static middleware.
Application lifecycle and startup stages
protea.api.stages orchestrates the FastAPI lifespan: it opens the
SQLAlchemy engine, publishes the session factory into app.state, and
tears down the AMQP connection pool on shutdown.
Shared stage-classification helpers for the benchmark + showcase routers.
Both routers need to label an EvaluationResult with the
pipeline stage that produced it ("reranker" or whichever
ScoringConfig.name was applied). The logic was duplicated across
both files until this module consolidated it — the inline copy in
showcase.py carried a comment “Matches benchmark.py semantics
without cross-importing”, which is exactly the dispensable-duplication
smell this module fixes.
- protea.api.stages.stage_kind(stage: str) Literal['scoring', 'reranker']¶
Return "reranker" for the reranker stage, "scoring" otherwise.
- protea.api.stages.stage_of(result: EvaluationResult, scoring_name: str | None) str | None¶
Classify an EvaluationResult into a stage.
Reranker dominates scoring config. Evaluations without either a scoring config or a reranker are considered incomplete and excluded from the matrix (return None).
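The classification rules above can be sketched as follows. This is not the module's source: EvaluationResultStub and its reranker_model_id field are assumptions standing in for the real EvaluationResult, and the _sketch suffix marks the functions as illustrative re-implementations.

```python
from dataclasses import dataclass
from typing import Optional

RERANKER_STAGE = "reranker"


@dataclass
class EvaluationResultStub:
    """Stand-in for EvaluationResult; the field name is an assumption."""
    reranker_model_id: Optional[str] = None


def stage_kind_sketch(stage: str) -> str:
    """Return "reranker" for the reranker stage, "scoring" otherwise."""
    return "reranker" if stage == RERANKER_STAGE else "scoring"


def stage_of_sketch(result: EvaluationResultStub, scoring_name: Optional[str]) -> Optional[str]:
    """Reranker dominates scoring config; neither present yields None."""
    if result.reranker_model_id is not None:
        return RERANKER_STAGE
    return scoring_name  # None when no scoring config was applied


print(stage_of_sketch(EvaluationResultStub("model-1"), "baseline"))
print(stage_of_sketch(EvaluationResultStub(), "baseline"))
print(stage_of_sketch(EvaluationResultStub(), None))
```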
Jobs router¶
The /jobs router is the primary interface for job lifecycle management.
Jobs are created by POST /jobs with an operation name, a
queue_name, and an optional JSON payload. The API creates a Job
row in QUEUED status, commits, then publishes the UUID to RabbitMQ
(in that order, so workers always find the row before they try to claim it).
Job status and the structured event timeline can be polled via
GET /jobs/{id} and GET /jobs/{id}/events respectively. The frontend
uses 2-second polling on the events endpoint to render a live progress
timeline.
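The commit-then-publish ordering described above can be illustrated with in-memory stand-ins. InMemoryBackend, create_job_sketch, and worker_claim_sketch are hypothetical names; a dict plays the role of the PostgreSQL table and a list plays the role of the RabbitMQ queue.

```python
import uuid


class InMemoryBackend:
    """Toy stand-in: `rows` for the Job table, `queue` for the broker."""

    def __init__(self) -> None:
        self.rows = {}
        self.queue = []


def create_job_sketch(backend: InMemoryBackend, operation: str, queue_name: str) -> str:
    job_id = str(uuid.uuid4())
    # Step 1: write (and, in the real system, commit) the row in QUEUED status.
    backend.rows[job_id] = {"operation": operation, "status": "QUEUED"}
    # Step 2: only then publish the UUID, so a worker always finds the row.
    backend.queue.append((queue_name, job_id))
    return job_id


def worker_claim_sketch(backend: InMemoryBackend) -> str:
    _queue_name, job_id = backend.queue.pop(0)
    row = backend.rows[job_id]  # present because the row was written first
    row["status"] = "RUNNING"
    return job_id


backend = InMemoryBackend()
created = create_job_sketch(backend, "ping", "protea.embedding")
claimed = worker_claim_sketch(backend)
print(created == claimed, backend.rows[claimed]["status"])
```

Reversing the two steps would let a fast worker dequeue a UUID whose row is not yet visible, which is the failure the documented ordering avoids.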
- class protea.api.routers.jobs.CreateJobCommentRequest(*, body: Annotated[str, MinLen(min_length=1)], author: str | None = None)¶
Bases: BaseModel
Body for POST /jobs/{job_id}/comments.
Curator/operator note attached to a Job (D11 narrative thread). Distinct from machine-emitted JobEvent rows: comments carry an opinionated message and an optional author tag. Markdown is permitted in body; the UI renders the thread chronologically.
- author: str | None¶
- body: str¶
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'author': 'frapercan', 'body': 'Re-running with k=10; k=5 hit the variance ceiling on PK.'}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod strip_body(v: str) str¶
- class protea.api.routers.jobs.CreateJobRequest(*, operation: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1)], queue_name: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1)], payload: dict[str, ~typing.Any] = <factory>, meta: dict[str, ~typing.Any] = <factory>, description: str | None = None, tags: list[str] = <factory>)¶
Bases: BaseModel
Body for POST /jobs.
Tells PROTEA which registered operation to run (operation) and which RabbitMQ queue to publish the work onto (queue_name). The payload blob is op-specific; the operation registry validates it on dequeue. description / tags are the D11 narrative fields surfaced in the UI run detail.
- description: str | None¶
- meta: dict[str, Any]¶
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'description': 'Recompute ESM-2 embeddings for the GOA 2024-04 set.', 'meta': {}, 'operation': 'compute_embeddings', 'payload': {'annotation_set_id': '00000000-0000-0000-0000-000000000002', 'batch_size': 1, 'embedding_config_id': '00000000-0000-0000-0000-000000000001'}, 'queue_name': 'protea.embedding', 'tags': ['ablation', 'benchmark-v1']}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- operation: str¶
- payload: dict[str, Any]¶
- queue_name: str¶
- classmethod strip_and_require(v: str) str¶
- tags: list[str]¶
- class protea.api.routers.jobs.JobListFilters(status: str | None, operation: str | None, include_children: bool, parent_job_id: UUID | None, limit: int, after: datetime | None)¶
Bases: NamedTuple
Bundle of query-string filters consumed by GET /jobs.
Carries the user-visible knobs so the route handler signature stays under the §3 6-param ceiling. The FastAPI dep _job_list_filters_dep exposes each field as a discrete query parameter on the wire.
after is the cursor token for pagination (T4.2): when set, the list only returns rows strictly older than the given UTC timestamp. Clients page forward by reading the created_at of the last row and feeding it back as after. Microsecond resolution on Job.created_at keeps tie collisions astronomically rare.
- after: datetime | None¶
Alias for field number 5
- include_children: bool¶
Alias for field number 2
- limit: int¶
Alias for field number 4
- operation: str | None¶
Alias for field number 1
- parent_job_id: UUID | None¶
Alias for field number 3
- status: str | None¶
Alias for field number 0
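The after cursor described for JobListFilters can be exercised against an in-memory list. This is a sketch of the cursor semantics only (newest first, strictly older than the cursor); list_page and iter_all_jobs are hypothetical helpers and the rows are toy data.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional


def list_page(rows: list, limit: int, after: Optional[datetime] = None) -> list:
    """Return up to `limit` rows, newest first, strictly older than `after`."""
    ordered = sorted(rows, key=lambda r: r["created_at"], reverse=True)
    if after is not None:
        ordered = [r for r in ordered if r["created_at"] < after]
    return ordered[:limit]


def iter_all_jobs(rows: list, limit: int) -> Iterator[dict]:
    """Page forward by feeding the last row's created_at back as `after`."""
    after = None
    while True:
        page = list_page(rows, limit, after)
        if not page:
            return
        yield from page
        after = page[-1]["created_at"]


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
jobs = [{"id": i, "created_at": base + timedelta(seconds=i)} for i in range(5)]
ids = [row["id"] for row in iter_all_jobs(jobs, limit=2)]
print(ids)
```

Because the comparison is strict, two rows sharing an identical created_at could be split across a page boundary and one of them skipped, which is why the microsecond resolution matters.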
- protea.api.routers.jobs.cancel_job(job_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Mark a job (and any non-terminal child jobs) as CANCELLED.
Already-finished jobs (SUCCEEDED/FAILED) are returned as-is with no state change. Children in QUEUED are cancelled immediately. Children in RUNNING are also marked CANCELLED; the worker’s parent-check in BaseWorker.handle_job() will detect the cancelled parent on the next iteration and stop gracefully.
- protea.api.routers.jobs.create_job(request: Request, response: Response, body: CreateJobRequest, deps: _CreateJobDeps = Depends(dependency=<function _create_job_deps>, use_cache=True, scope=None), principal: ApiKey | BearerPrincipal | None = Depends(dependency=<function require_role.<locals>._gate>, use_cache=True, scope=None)) dict[str, Any]¶
Create a Job row and publish its ID to the specified RabbitMQ queue.
Expensive operations (export_research_dataset, run_cafa_evaluation) are subject to per-user daily quota limits (FARM-AUTH.7). Admins are exempt. Duplicate POSTs (same operation + payload while the previous job is active) return 409 with the existing job_id (F-OPS-JOBS.1 dedup).
- protea.api.routers.jobs.create_job_comment(job_id: UUID, body: CreateJobCommentRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Append a free-form comment to a Job.
Curators / operators use this thread to record observations, follow-ups, or post-mortems; the worker fleet keeps writing to JobEvent for machine-emitted progress.
- protea.api.routers.jobs.delete_job(job_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Permanently delete a job and its event log. Running jobs cannot be deleted (409).
- protea.api.routers.jobs.get_job(job_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), registry: OperationRegistry = Depends(dependency=<function get_operation_registry>, use_cache=True, scope=None)) dict[str, Any]¶
Retrieve full details for a single job including its payload, meta, and progress counters.
- protea.api.routers.jobs.get_job_events(job_id: UUID, limit: int = Query(200), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
Return the structured event log for a job (newest first).
Events include progress milestones, warnings, HTTP retries, and errors. Useful for monitoring long-running operations such as compute_embeddings or predict_go_terms.
- protea.api.routers.jobs.list_job_comments(job_id: UUID, limit: int = Query(200), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
Return JobComment rows for a Job, oldest first.
Use after to page forward (the comment thread grows oldest → newest, so cursor semantics flip vs. the newest-first lists). limit caps each page at 2000 to keep payloads bounded.
- protea.api.routers.jobs.list_jobs(filters: JobListFilters = Depends(dependency=<function _job_list_filters_dep>, use_cache=True, scope=None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), registry: OperationRegistry = Depends(dependency=<function get_operation_registry>, use_cache=True, scope=None)) list[dict[str, Any]]¶
List jobs with optional filtering.
By default only top-level jobs (no parent) are returned. Set include_children=true or filter by parent_job_id to see batch sub-jobs from distributed pipelines. Filters travel as discrete query parameters on the wire; the dependency bundles them into JobListFilters for the handler.
Proteins router¶
The /proteins router provides read access to the protein and sequence
catalogue. Proteins are not created directly through this router; they are
inserted asynchronously by the insert_proteins operation. The router
exposes list and detail endpoints with filtering by organism and review
status.
- protea.api.routers.proteins.get_protein(accession: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Full details for one protein: core fields, UniProt functional metadata, embedding count, GO annotation count, and accessions of known isoforms (if canonical).
- protea.api.routers.proteins.get_protein_annotations(accession: str, annotation_set_id: str | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
Return all GO term annotations for a protein, joined with term details and annotation set source. Optionally filter to a specific annotation set by UUID.
- protea.api.routers.proteins.get_protein_stats(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Return aggregate counts: total proteins, canonical vs isoforms, reviewed, and how many have metadata, embeddings, or GO annotations.
Cached for 5 minutes: the DISTINCT-over-JOIN counts scan 4M–80M rows and take 30+ seconds to run from scratch. Counts move slowly enough that a 5-min staleness is invisible to users.
Serves the last-known value when the recompute fails (DB blip, query timeout) so the page never blocks on a cold-cache 500. The startup hook in protea.api.app prewarms this key and a background task refreshes it before expiry so users never hit a cold path under normal operation.
- protea.api.routers.proteins.list_proteins(search: str | None = Query(None), reviewed: bool | None = Query(None), canonical_only: bool = Query(True), limit: int = Query(50), offset: int = Query(0), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Paginated protein listing with optional full-text search across accession, entry name, gene name, and organism.
- protea.api.routers.proteins.prewarm_protein_stats(factory: sessionmaker[Session]) dict[str, Any]¶
Recompute and store proteins:stats; used by the app startup hook and the background refresh loop. Always bypasses the existing entry so the cache is refilled with fresh counts before the old TTL expires.
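The caching behaviour described for the stats endpoint (TTL, serve the last-known value when the recompute fails, forced refresh for prewarming) can be sketched with a small in-process cache. StaleOnErrorCache is a hypothetical class written for this example; the actual cache implementation is not shown on this page.

```python
import time
from typing import Any, Callable


class StaleOnErrorCache:
    """TTL cache that falls back to the last-known value on producer failure."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (stored_at, value)

    def get(self, key: str, producer: Callable[[], Any], force: bool = False) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        # Fresh hit: skip the producer unless a refresh is forced (prewarm).
        if entry is not None and not force and now - entry[0] < self._ttl:
            return entry[1]
        try:
            value = producer()
        except Exception:
            if entry is not None:
                return entry[1]  # serve the stale value instead of failing
            raise  # nothing cached yet: the error surfaces
        self._entries[key] = (now, value)
        return value


cache = StaleOnErrorCache(ttl_seconds=300)  # 5 minutes, as in the text
stats = cache.get("proteins:stats", lambda: {"total": 0})  # placeholder producer
print(stats)
```

Calling get(..., force=True) from a startup hook or a periodic task corresponds to the prewarm path: it bypasses a still-fresh entry and refills it.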
Annotations router¶
The /annotations router exposes the GO ontology and annotation set data.
It provides:
Ontology snapshot listing and detail, including GO term counts per aspect.
Annotation set listing and detail.
A BFS ancestor subgraph endpoint (GET /annotations/snapshots/{id}/subgraph) that returns the ancestor closure for a given set of GO term IDs within a snapshot. Used by the frontend to render the GO hierarchy for a prediction result.
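The ancestor closure computed by the subgraph endpoint can be sketched as a breadth-first traversal over a child-to-parents mapping. The mapping format, the function name ancestor_closure, and the toy term IDs are assumptions; the endpoint's actual storage and response shape are not shown here.

```python
from collections import deque
from typing import Iterable


def ancestor_closure(parents: dict, seeds: Iterable[str]) -> set:
    """Return the seeds plus every term reachable by following parent edges."""
    seen = set(seeds)
    queue = deque(seen)
    while queue:
        term = queue.popleft()
        for parent in parents.get(term, ()):
            if parent not in seen:  # visit each term once, even in a DAG
                seen.add(parent)
                queue.append(parent)
    return seen


# Toy DAG with placeholder IDs: T3 -> T2 -> T1, and T4 -> T2.
toy_parents = {"T3": ["T2"], "T2": ["T1"], "T4": ["T2"]}
closure = ancestor_closure(toy_parents, ["T3"])
print(sorted(closure))
```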
The annotations router is split into four sub-modules, each handling one endpoint group.
Evaluation-result endpoints: list / metrics TSV / artifacts ZIP / delete.
- protea.api.routers.annotations.evaluation_results.delete_evaluation_result(eval_id: UUID, result_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) None¶
Delete one evaluation result row plus its stored artefacts.
Two-step: the ORM cascade clears the DB row first (collecting the artifact keys); the artifact store delete() is then issued for each key outside the session so a network failure here does not leave the DB inconsistent. Returns 204 on success, 404 if the result is unknown.
- protea.api.routers.annotations.evaluation_results.download_evaluation_artifacts(eval_id: UUID, result_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse¶
Stream all stored cafaeval artefacts for one evaluation result as a ZIP.
Bundles the per-result outputs (raw predictions, CAFA scoring TSVs, plot images) that the artifact store keeps under the result’s prefix. Returns 404 if the result is unknown or has no artefacts persisted.
- protea.api.routers.annotations.evaluation_results.download_evaluation_metrics(eval_id: UUID, result_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse¶
Stream the per-aspect metrics table for one evaluation result.
Renders the cafaeval-style summary (Fmax, Smin, AUPRC) as TSV with one row per CAFA aspect (BPO/MFO/CCO). Returns 404 if the (eval_id, result_id) pair does not match.
- protea.api.routers.annotations.evaluation_results.list_evaluation_results(eval_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
List every cafaeval result row attached to one evaluation set.
Each row carries the prediction-set / scoring-config / reranker triple used to produce it plus the cached metrics summary, so the UI benchmark matrix can render without per-row drilldowns. Returns 404 if the evaluation set itself is missing.
Embeddings router¶
The /embeddings router manages embedding configurations and prediction
sets. Embedding configurations are immutable recipes: once created, they
can be referenced by any number of embedding computation and prediction
jobs. Creating a new configuration with different parameters produces a
new UUID, preserving reproducibility.
Prediction sets are created by submitting a predict_go_terms job and
are queryable once the job completes. The
GET /embeddings/prediction-sets/{id}/predictions.tsv endpoint streams
prediction results as a tab-separated file (32 columns including re-ranker
features) using StreamingResponse with yield_per(1000), avoiding
loading the full result set into memory.
- protea.api.routers.embeddings.create_embedding_config(body: dict[str, ~typing.Any], factory: ~sqlalchemy.orm.session.sessionmaker[~sqlalchemy.orm.session.Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Create a new EmbeddingConfig that defines the model, layer selection, pooling strategy, and chunking.
This config is referenced by compute_embeddings jobs and predict_go_terms jobs to ensure query and reference embeddings were produced under identical settings.
- protea.api.routers.embeddings.delete_embedding_config(config_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Delete an EmbeddingConfig and cascade-delete all linked embeddings, prediction sets, and predictions.
- protea.api.routers.embeddings.delete_prediction_set(set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Delete a prediction set and all its GOPrediction rows.
- protea.api.routers.embeddings.download_predictions_cafa(set_id: UUID, eval_id: UUID | None = Query(None), aspect: str | None = Query(None), max_distance: float | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse¶
Stream predictions in CAFA format: protein_accession\tgo_id\tscore.
Score is computed as max(0.0, 1.0 - distance) so that closer neighbours receive higher confidence scores in the [0, 1] range expected by the CAFA evaluator. One row per (protein, GO term) pair; duplicate GO terms for the same protein are deduplicated keeping the highest score (lowest distance).
Pass eval_id to restrict output to delta proteins only (NK + LK targets), which is required for a valid CAFA evaluation.
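The score conversion and deduplication rule above can be sketched over plain tuples. The function name cafa_rows, the three-decimal formatting, and the sample accession and distances are assumptions made for illustration; only the formula max(0.0, 1.0 - distance) and the keep-highest-score rule come from the text.

```python
def cafa_rows(neighbours: list) -> list:
    """Turn (accession, go_id, distance) hits into deduplicated CAFA lines."""
    best = {}
    for accession, go_id, distance in neighbours:
        score = max(0.0, 1.0 - distance)  # closer neighbour, higher score
        key = (accession, go_id)
        # Keep the highest score (lowest distance) per (protein, GO term).
        if score > best.get(key, -1.0):
            best[key] = score
    return [f"{acc}\t{go}\t{score:.3f}" for (acc, go), score in sorted(best.items())]


hits = [
    ("P1", "GO:0000001", 0.2),
    ("P1", "GO:0000001", 0.5),  # duplicate term, worse distance: dropped
    ("P1", "GO:0000002", 1.4),  # distance above 1 clamps to a score of 0
]
rows = cafa_rows(hits)
print("\n".join(rows))
```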
- protea.api.routers.embeddings.download_predictions_tsv(set_id: UUID, accession: str | None = Query(None), aspect: str | None = Query(None), max_distance: float | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) StreamingResponse¶
Stream all GO predictions for a prediction set as a tab-separated file.
Each row is one (protein, GO term, reference protein) triple. Columns include embedding distance, GO term metadata, annotation fields, and optional alignment and taxonomy features (columns are present but empty when not computed).
Optional filters: accession, aspect (F/P/C), max_distance.
The response streams rows directly from the database; suitable for large prediction sets without loading everything into memory.
- protea.api.routers.embeddings.get_embedding_config(config_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Retrieve a single EmbeddingConfig with its total stored embedding count.
- protea.api.routers.embeddings.get_go_term_distribution(set_id: UUID, limit: int = Query(50), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Return the most frequently predicted GO terms grouped by aspect (F/P/C) and the total prediction counts per aspect.
- protea.api.routers.embeddings.get_prediction_set(set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Retrieve a prediction set with total prediction count and per-protein GO term counts.
- protea.api.routers.embeddings.get_protein_predictions(set_id: UUID, accession: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
Return all predicted GO terms for a protein in a prediction set, sorted by distance (nearest first). Includes GO term details plus optional alignment (NW/SW) and taxonomy fields when computed.
- protea.api.routers.embeddings.list_embedding_configs(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
List all embedding configurations with their stored embedding counts, newest first.
The per-config GROUP BY over a 4M-row table is cached 5 minutes; new configs still appear immediately (they have 0 embeddings), only the counts are stale. Serves the last-known value when the recompute fails (DB blip, query timeout) so the page never blocks on a cold-cache 500.
- protea.api.routers.embeddings.list_prediction_set_proteins(set_id: UUID, search: str | None = Query(None), limit: int = Query(50), offset: int = Query(0), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Paginated list of proteins in a prediction set with their predicted GO count, minimum distance, known annotation count, and how many predictions match known annotations (precision proxy).
- protea.api.routers.embeddings.list_prediction_sets(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
List the 100 most recent prediction sets, cached 5 min.
The DISTINCT-over-JOIN against prediction_set + embedding_config + annotation_set + ontology_snapshot scans tens of millions of rows on cold cache (115s+ measured). The startup hook in protea.api.app prewarms this key and a background task refreshes it before expiry so users never hit a cold path under normal operation. Serves the last-known value on producer failure to prevent a DB blip from surfacing as a 500.
- protea.api.routers.embeddings.predict_go_terms(body: dict[str, ~typing.Any], factory: ~sqlalchemy.orm.session.sessionmaker[~sqlalchemy.orm.session.Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]¶
Queue a predict_go_terms job that runs KNN-based GO term transfer.
The coordinator partitions query proteins into batches, each dispatched to protea.predictions.batch workers for KNN search (numpy or FAISS) + GO annotation transfer. Results are written to a new PredictionSet via protea.predictions.write workers.
Required body fields: embedding_config_id, annotation_set_id, ontology_snapshot_id. Optional: query_set_id (FASTA upload), limit_per_entry, distance_threshold, batch_size, search_backend. Feature-engineering flags default to True: compute_alignments, compute_taxonomy, compute_reranker_features all run unless explicitly set to false. aspect_separated_knn defaults to true (one KNN index per GO aspect to guarantee BPO/MFO/CCO coverage even when unified nearest neighbours carry only one aspect).
- protea.api.routers.embeddings.prewarm_embedding_configs(factory: sessionmaker[Session]) list[dict[str, Any]]¶
Recompute and store embeddings:configs; used by the app startup hook and the background refresh loop. Always bypasses the existing entry so the cache is refilled with fresh counts before the old TTL expires.
- protea.api.routers.embeddings.prewarm_prediction_sets(factory: sessionmaker[Session]) list[dict[str, Any]]¶
Recompute and store embeddings:prediction-sets; used by the app startup hook and the background refresh loop. Always bypasses the existing entry so the cache is refilled with fresh data before the old TTL expires.
Scoring router¶
The /scoring router exposes scoring configurations, the training-data
export, and read-only endpoints for applying LightGBM re-ranker models.
In-process re-ranker training was retired in F0/T0.6: boosters are now
trained offline in protea-reranker-lab and registered through the
Reranker models router (POST /reranker-models/import).
Key endpoints:
GET /scoring/prediction-sets/{id}/training-data.tsv: generates a 31-column TSV with binary labels from temporal ground truth, consumed by protea-reranker-lab to fit a booster.
GET /scoring/rerankers / GET /scoring/rerankers/{id} / DELETE /scoring/rerankers/{id}: read/delete operations for registered re-ranker models. Creation lives at POST /reranker-models/import.
GET /scoring/prediction-sets/{id}/rerank.tsv: applies a trained re-ranker to a prediction set, streaming re-scored predictions.
GET /scoring/prediction-sets/{id}/reranker-metrics: computes CAFA-style Fmax and AUC-PR using re-ranker probability scores.
Query sets router¶
The /query-sets router handles user-uploaded FASTA files. On
POST /query-sets, the server parses the multipart upload, creates a
QuerySet row, upserts one Sequence row per unique amino-acid string
(deduplicating by MD5 hash), and creates QuerySetEntry rows preserving
the original FASTA headers. The returned query set ID can then be referenced
in compute_embeddings and predict_go_terms job payloads.
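The upload flow above (parse FASTA, deduplicate sequences by MD5, keep one entry per header) can be sketched with the standard library. parse_fasta and dedupe_sequences are hypothetical helpers, and the in-memory dict and list merely stand in for the Sequence and QuerySetEntry tables.

```python
import hashlib


def parse_fasta(text: str) -> list:
    """Return (header, sequence) pairs; sequence lines are concatenated."""
    entries, header, chunks = [], None, []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        if line.startswith(">"):
            if header is not None:
                entries.append((header, "".join(chunks)))
            header, chunks = line[1:], []
        elif header is not None:
            chunks.append(line)
    if header is not None:
        entries.append((header, "".join(chunks)))
    return entries


def dedupe_sequences(entries: list) -> tuple:
    """One stored sequence per unique MD5; one link per original header."""
    sequences = {}  # md5 hex digest -> amino-acid string
    links = []      # (original header, md5 hex digest)
    for header, seq in entries:
        digest = hashlib.md5(seq.encode("ascii")).hexdigest()
        sequences.setdefault(digest, seq)  # reuse if already present
        links.append((header, digest))
    return sequences, links


fasta = ">a\nMKT\nAA\n>b\nMKTAA\n>c\nGG\n"
sequences, links = dedupe_sequences(parse_fasta(fasta))
print(len(sequences), "unique sequences for", len(links), "entries")
```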
- async protea.api.routers.query_sets.create_query_set(file: UploadFile, name: str = Form(PydanticUndefined), description: str | None = Form(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Upload a FASTA file and create a QuerySet.
Each sequence in the FASTA is stored (or reused if already present) in the sequence table. A query_set_entry row is created per sequence, preserving the original FASTA accession. Duplicate accessions within the same upload are rejected with 422.
- protea.api.routers.query_sets.delete_query_set(query_set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Delete a query set and all its entries. Sequences are not deleted (they may be shared).
- protea.api.routers.query_sets.extract_uniprot_header_metadata(description: str) dict[str, Any]¶
Parse UniProt-style FASTA headers and extract taxonomy fields.
Matches the SwissProt/TrEMBL convention sp|ACC|NAME OS=<species> OX=<taxid> GN=<gene> PE=<level> SV=<version>. Returns {'taxonomy_id': int | None, 'species': str | None}. Silent no-op for headers that don’t follow the convention; fields simply come back as None.
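One way to extract the OS= and OX= fields from such a header is with two regular expressions, as sketched below. This is an independent illustration rather than the function's source: parse_uniprot_header, the patterns, and the sample header are assumptions, while the return shape follows the description above.

```python
import re

# Species runs until the next two-letter KEY= token or the end of the line.
_SPECIES = re.compile(r"\bOS=(.+?)(?=\s+[A-Z]{2}=|$)")
_TAXID = re.compile(r"\bOX=(\d+)")


def parse_uniprot_header(description: str) -> dict:
    """Return taxonomy_id and species, or None for fields that are absent."""
    species = _SPECIES.search(description)
    taxid = _TAXID.search(description)
    return {
        "taxonomy_id": int(taxid.group(1)) if taxid else None,
        "species": species.group(1).strip() if species else None,
    }


# Placeholder header following the convention; accession and name are made up.
header = "sp|P00000|DEMO_HUMAN Demo protein OS=Homo sapiens OX=9606 GN=DEMO PE=1 SV=2"
parsed = parse_uniprot_header(header)
print(parsed)
print(parse_uniprot_header("my custom header"))
```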
- protea.api.routers.query_sets.get_query_set(query_set_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Retrieve a query set with its full entry list (accessions and sequence IDs).
- protea.api.routers.query_sets.list_query_sets(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
List all uploaded FASTA query sets with their entry counts, newest first.
Annotate router¶
The /annotate router provides a one-click annotation endpoint. It accepts
a FASTA file (or raw text), auto-selects the best available embedding config,
annotation set, and ontology snapshot, creates a QuerySet, and queues a
compute_embeddings job. Returns all the IDs the frontend needs to chain
predict_go_terms once embeddings finish.
- class protea.api.routers.annotate.AnnotateFormOptions(*, compute_reranker_features: bool = True)¶
Bases: BaseModel
User-controllable feature flags for the quick-annotation endpoint.
These fields map 1:1 to the predict_go_terms coordinator payload so the frontend can expose them directly without an intermediate translation.
- compute_reranker_features: bool¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- async protea.api.routers.annotate.annotate(file: UploadFile | None = None, fasta_text: str | None = Form(None), name: str = Form(Quick annotation), compute_reranker_features: bool = Form(True), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]¶
One-click annotation: upload FASTA, auto-select best method, run pipeline.
Accepts either an uploaded FASTA file or raw fasta_text. Creates a QuerySet, picks the best embedding config (or creates the default ESM-2 650M config), and queues a compute_embeddings job.
Returns the IDs the frontend needs to monitor progress and chain predict_go_terms once embeddings are ready.
compute_reranker_features controls whether the reranker feature families (lineage, anc2vec, anc2vec_query, emb_pca, annotation_meta) are included in the downstream predict_go_terms job. Default: True.
Maintenance router¶
The /maintenance router provides housekeeping endpoints for identifying
and removing orphaned data. Two pairs of preview/execute endpoints handle
orphan sequences (not referenced by any Protein or QuerySetEntry) and
unindexed embeddings (for sequences not referenced by any Protein).
Preview endpoints are read-only; execute endpoints perform the actual deletion.
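The two orphan definitions above reduce to set differences, sketched here over plain ID collections. The function names and the integer IDs are illustrative assumptions; the real endpoints run equivalent queries against the database.

```python
def orphan_sequence_ids(sequence_ids, protein_seq_ids, query_entry_seq_ids) -> set:
    """Sequences referenced by neither a Protein nor a QuerySetEntry."""
    return set(sequence_ids) - set(protein_seq_ids) - set(query_entry_seq_ids)


def unindexed_embedding_ids(embedding_seq_ids: dict, protein_seq_ids) -> set:
    """Embeddings whose sequence is not referenced by any Protein.

    `embedding_seq_ids` maps embedding id -> sequence id.
    """
    referenced = set(protein_seq_ids)
    return {emb for emb, seq in embedding_seq_ids.items() if seq not in referenced}


# Toy data: sequences 1-4; proteins use 1 and 2; a query set entry uses 3.
orphans = orphan_sequence_ids([1, 2, 3, 4], [1, 2], [3])
unindexed = unindexed_embedding_ids({"e1": 1, "e3": 3, "e4": 4}, [1, 2])
print(orphans, sorted(unindexed))
```

Note the asymmetry the sketch makes visible: a query-set sequence is not an orphan sequence, yet its embedding still counts as unindexed.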
- protea.api.routers.maintenance.preview_orphan_sequences(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Count orphan sequences without running the delete.
A sequence is orphaned when it has no Protein rows pointing to it AND no QuerySetEntry rows pointing to it.
- protea.api.routers.maintenance.preview_unindexed_embeddings(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Count embeddings for sequences not referenced by any Protein.
These are embeddings computed for query proteins (QuerySet uploads) or orphan sequences. They are safe to delete once predictions have been run.
- protea.api.routers.maintenance.vacuum_embeddings(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Delete embeddings for sequences not referenced by any Protein.
Destructive: removes rows from sequence_embedding. Gated to admin so the embedding corpus (expensive to recompute on GPUs) cannot be wiped by an operator key. Safe to run once predictions have been generated; query-protein embeddings are only needed during the prediction job itself.
- protea.api.routers.maintenance.vacuum_sequences(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Delete sequences not referenced by any Protein or QuerySetEntry.
Destructive: removes rows from sequence. Gated to admin so a compromised operator key cannot reduce the corpus. Orphan sequences have no embeddings reachable from any active protein or query set, but the deletion is permanent and feeds into downstream foreign keys, so it stays on the admin floor with the other DB-mutating housekeeping operations.
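The orphan rule and the preview/execute split described above can be illustrated with a small in-memory sketch. The function names and the plain lists below are hypothetical stand-ins for the sequence, Protein, and QuerySetEntry tables; the real router runs SQL through the injected session factory.

```python
def find_orphan_sequences(sequence_ids, protein_refs, query_set_entry_refs):
    """Return sequence IDs referenced by neither a Protein nor a QuerySetEntry."""
    referenced = set(protein_refs) | set(query_set_entry_refs)
    return sorted(set(sequence_ids) - referenced)


def preview_orphans(sequence_ids, protein_refs, query_set_entry_refs):
    """Read-only step: report how many rows a vacuum would delete."""
    orphans = find_orphan_sequences(sequence_ids, protein_refs, query_set_entry_refs)
    return {"orphan_count": len(orphans)}  # hypothetical response key


def vacuum_orphans(sequence_ids, protein_refs, query_set_entry_refs):
    """Destructive step: return the surviving IDs and the number deleted."""
    orphans = set(find_orphan_sequences(sequence_ids, protein_refs, query_set_entry_refs))
    kept = [s for s in sequence_ids if s not in orphans]
    return kept, len(orphans)


# Illustrative data: sequences 3 and 5 are referenced by nothing.
sequences = [1, 2, 3, 4, 5]
protein_refs = [1, 2]
query_refs = [2, 4]

preview = preview_orphans(sequences, protein_refs, query_refs)
kept, deleted = vacuum_orphans(sequences, protein_refs, query_refs)
```

Running the preview first gives the operator the deletion count before any row is removed, which is the point of pairing each execute endpoint with a read-only twin.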
Admin router¶
The /admin router exposes destructive administrative operations.
It provides POST /admin/reset-db, which drops and recreates
the public schema and re-applies all Alembic migrations. Protected by the admin role via require_role() (FARM-AUTH.4).
The router also carries the dead-letter queue (DLQ) summary, purge, and replay handlers documented below.
- class protea.api.routers.admin.DlqPurgeRequest(*, operation: str | None = None, first_death_queue: str | None = None, dry_run: bool = False, max_messages: int = 10000)¶
Bases: BaseModel
Filter for DLQ messages to permanently discard.
- dry_run: bool¶
- first_death_queue: str | None¶
- max_messages: int¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- operation: str | None¶
- class protea.api.routers.admin.DlqReplayRequest(*, operation: str | None = None, first_death_queue: str | None = None, target_queue: str | None = None, dry_run: bool = False, max_messages: int = 1000)¶
Bases: BaseModel
Filter for DLQ messages to re-enqueue back onto their source queue.
- dry_run: bool¶
- first_death_queue: str | None¶
- max_messages: int¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- operation: str | None¶
- target_queue: str | None¶
- protea.api.routers.admin.get_dlq_summary(_principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)], amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None), max_peek: int = Query(500)) dict[str, Any]¶
Grouped count of dead-letter messages by operation, source queue, and age.
Peeks up to max_peek messages from protea.dead-letter without consuming them, groups them by {operation, first_death_queue, age_bucket}, and re-queues all peeked messages before returning. The DLQ depth is unchanged after this call.
- protea.api.routers.admin.purge_dlq(body: ~protea.api.routers.admin.DlqPurgeRequest, _principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)], amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]¶
Discard DLQ messages matching the filter.
Matching messages are acked (permanently removed from the DLQ). Non-matching messages remain in the DLQ.
dry_run=True reports how many messages would be purged without removing them. Always prefer a dry-run first.
- protea.api.routers.admin.replay_dlq(body: ~protea.api.routers.admin.DlqReplayRequest, _principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)], amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]¶
Re-enqueue DLQ messages matching the filter.
Matching messages are published back to their original source queue (or target_queue if specified) and acked from the DLQ. Non-matching messages remain in the DLQ.
dry_run=True reports how many messages would be replayed without actually moving them.
- protea.api.routers.admin.reset_db(request: ~starlette.requests.Request, _principal: ~typing.Annotated[~protea.infrastructure.orm.models.api_key.ApiKey | ~protea.api.bearer.BearerPrincipal | None, ~fastapi.params.Depends(dependency=~protea.api.roles.require_role.<locals>._gate, use_cache=True, scope=None)]) dict¶
Drop and recreate the public schema, then re-apply all Alembic migrations.
Requires an authenticated admin principal (FARM-AUTH.4) plus the extra destructive-op guards in _authorize_reset_db().
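The filter and dry-run semantics of the DLQ purge can be sketched in memory as follows. This is an illustration under assumptions, not the router's code: messages are modelled as plain dicts, a filter field left as None is assumed to act as a wildcard, max_messages is assumed to cap the number of matched messages, and the report keys and queue names are hypothetical.

```python
def matches(message, operation=None, first_death_queue=None):
    """A filter field left as None is treated as a wildcard (assumption)."""
    if operation is not None and message["operation"] != operation:
        return False
    if first_death_queue is not None and message["first_death_queue"] != first_death_queue:
        return False
    return True


def purge(dlq, operation=None, first_death_queue=None, dry_run=False, max_messages=10000):
    """Return (report, remaining_messages) without mutating the input list."""
    matched = [m for m in dlq if matches(m, operation, first_death_queue)][:max_messages]
    if dry_run:
        # Dry run: count only, every message stays in the queue.
        return {"matched": len(matched), "purged": 0}, list(dlq)
    matched_ids = {id(m) for m in matched}
    remaining = [m for m in dlq if id(m) not in matched_ids]
    return {"matched": len(matched), "purged": len(matched)}, remaining


# Hypothetical queue names, for illustration only.
dlq = [
    {"operation": "compute_embeddings", "first_death_queue": "queue-a"},
    {"operation": "predict_go_terms", "first_death_queue": "queue-b"},
    {"operation": "compute_embeddings", "first_death_queue": "queue-a"},
]

dry_report, after_dry = purge(dlq, operation="compute_embeddings", dry_run=True)
real_report, after_real = purge(dlq, operation="compute_embeddings")
```

A replay would follow the same shape, except that matched messages are republished to their source queue (or target_queue) before being acked.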
Showcase router¶
The /showcase router aggregates platform statistics and best evaluation
results for the landing page. Returns protein counts, embedding counts,
prediction counts, best Fmax per aspect per evaluation category (NK/LK/PK),
and a method comparison table, all in a single JSON response.
Showcase endpoint: aggregates platform stats and the single best evaluation result with full embedding attribution.
Unlike protea.api.routers.benchmark, which exposes the full per-model
per-stage matrix, this module is deliberately minimal: it returns one
“spotlight” result that the Home page can use for its hero card, plus the
pipeline stage counts.
Background¶
The previous implementation collapsed every evaluation into three method
buckets (knn_baseline / knn_scored / knn_reranker) and took the
maximum Fmax across all embeddings in each bucket. That hid which concrete
embedding won a given cell, and silently dropped losing embeddings from the
UI entirely. With the introduction of the 8-model benchmark, that collapse
is actively misleading, so this endpoint now returns a single named winner
and a link to /benchmark for the full matrix.
- protea.api.routers.showcase.get_showcase(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Aggregate pipeline stage counts and return the single best evaluation result (by mean Fmax across the 9 cells) along with the embedding that produced it.
Empty-state contract: best is None when no EvaluationResult exists; pipeline_stages always returns the same five entries with count = 0 for unpopulated stages; counts always returns the same keys.
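As a rough sketch of the selection rule, the winner can be picked by averaging Fmax over the 9 (category, aspect) cells and returning None when there is nothing to rank. The aspect labels, the input shape, the model names, and the scores below are all assumptions made for illustration; only the NK/LK/PK categories and the "mean Fmax across the 9 cells" rule come from the text above.

```python
from statistics import mean

CATEGORIES = ("NK", "LK", "PK")
ASPECTS = ("BPO", "MFO", "CCO")  # assumed aspect labels


def mean_fmax(cells):
    """Average Fmax over the 3 x 3 grid of (category, aspect) cells."""
    return mean(cells[(c, a)] for c in CATEGORIES for a in ASPECTS)


def pick_spotlight(results):
    """Return the result with the highest mean Fmax, or None when empty."""
    if not results:
        return None  # mirrors the empty-state contract: best is None
    winner = max(results, key=lambda r: mean_fmax(r["cells"]))
    return {"embedding": winner["embedding"], "mean_fmax": mean_fmax(winner["cells"])}


# Made-up scores for two hypothetical embeddings.
cells_a = {(c, a): 0.5 for c in CATEGORIES for a in ASPECTS}
cells_b = {(c, a): 0.6 for c in CATEGORIES for a in ASPECTS}
cells_b[("NK", "BPO")] = 0.3  # one weak cell does not cost model_b the lead

spotlight = pick_spotlight([
    {"embedding": "model_a", "cells": cells_a},
    {"embedding": "model_b", "cells": cells_b},
])
empty = pick_spotlight([])
```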
Support router¶
The /support router handles community feedback. GET /support returns
the total thumbs-up count and recent comments. POST /support submits a
new thumbs-up with an optional comment (max 500 characters).
- class protea.api.routers.support.SupportCreate(*, comment: str | None = None)¶
Bases: BaseModel
Body for POST /support.
A thumbs-up may carry an optional free-form comment. The text is capped at api.max_comment_length from the tuning config; longer submissions are rejected with 422 rather than silently truncated.
- comment: str | None¶
- classmethod comment_within_limit(v: str | None) str | None¶
- model_config = {'extra': 'forbid'}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- protea.api.routers.support.get_support(all_comments: bool = Query(False), factory=Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Return total thumbs-up count and comments.
Pass all_comments=true to get all comments (up to the configured page limit) instead of the recent_limit most recent.
- protea.api.routers.support.post_support(body: SupportCreate, factory=Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Submit a thumbs-up with an optional comment.
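The two rejection rules for the POST /support body (unknown keys and over-long comments) can be mimicked with a standard-library sketch. This is a hand-rolled stand-in, not the Pydantic model itself; the 500-character cap is assumed from the router description, whereas the real value is read from api.max_comment_length.

```python
MAX_COMMENT_LENGTH = 500  # assumed; the real cap comes from api.max_comment_length


def validate_support_body(body):
    """Emulate extra="forbid" plus the comment length check."""
    unknown = set(body) - {"comment"}
    if unknown:
        raise ValueError(f"unknown fields: {sorted(unknown)}")
    comment = body.get("comment")
    if comment is not None and len(comment) > MAX_COMMENT_LENGTH:
        raise ValueError("comment exceeds the maximum length")
    return {"comment": comment}


def is_rejected(body):
    """True when the body would be refused (a 422 in the real API)."""
    try:
        validate_support_body(body)
    except ValueError:
        return True
    return False


accepted = validate_support_body({"comment": "Great tool"})
bare_thumbs_up = validate_support_body({})
too_long = is_rejected({"comment": "x" * (MAX_COMMENT_LENGTH + 1)})
typo_key = is_rejected({"coment": "oops"})
```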
Benchmark router¶
The /benchmark router powers the per-PLM comparison grid in the UI.
Where /showcase collapses every model into a few buckets and reports
the maximum, this router preserves which embedding produced each number
and which scoring config was used, exposing one stage per distinct
ScoringConfig.name plus an implicit "reranker" stage for evaluations
that used a re-ranker. Stage labels, GO categories, and the baseline tag are
read from protea/config/benchmark.yaml; no hardcoded constants.
Benchmark matrix endpoints.
Exposes a per-embedding, per-stage view of every EvaluationResult in the
database so the UI can render the full PLM comparison grid for the thesis
benchmark.
Where the /showcase endpoint collapses all models into a few method
buckets and takes the maximum across every embedding, this module preserves
which embedding produced each number and which scoring config was
used: one stage per distinct scoring_config.name found in the DB, plus
an implicit "reranker" stage for evaluations that used a reranker.
Zero domain constants are hardcoded here: stage labels, preferred default,
baseline tag, GO categories and aspects all come from
protea/config/benchmark.yaml via BenchmarkConfig. Model display
metadata (display name, family, param count) comes from the dedicated columns
on embedding_config; no HF-name regex heuristics.
Two endpoints are provided:
GET /benchmark/embeddings
One row per EmbeddingConfig with its persisted display metadata.
GET /benchmark/matrix
One row per (embedding_config, evaluation_set, stage, category, aspect) tuple, best-Fmax only. Response also includes:
- stages: every stage observed in the data (with label/kind)
- evaluation_sets: per-eval-set metadata (stats, source, obo version)
- best_per_cell: cross-model winner per (category, aspect) cell within the active stage/K filter selection
- best_per_cell_global: same shape as best_per_cell but ignores the user’s stage/K filters. Stable across filter changes; the per-cell champion across the entire dataset for the current evaluation set.
- categories / aspects: from YAML config
- protea.api.routers.benchmark.get_benchmark_matrix(evaluation_set_id: UUID | None = Query(None), stage: str | None = Query(None), k: int | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), cfg: BenchmarkConfig = Depends(dependency=<function get_benchmark_config>, use_cache=True, scope=None)) dict[str, Any]¶
Return a long-format table with one row per (embedding_config, evaluation_set, stage, category, aspect) tuple containing the best Fmax / precision / recall observed in the DB, plus per-eval-set metadata and a cross-model leaderboard.
- protea.api.routers.benchmark.list_benchmark_embeddings(factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), cfg: BenchmarkConfig = Depends(dependency=<function get_benchmark_config>, use_cache=True, scope=None)) dict[str, Any]¶
Return every EmbeddingConfig with its persisted display metadata.
The metadata lives in embedding_config.display_name / family / param_count: filled at creation time by the seed scripts. No heuristic inference happens here. Configs listed in benchmark.yaml: hidden_embeddings are suppressed. Cached for 5 min; the benchmark page is the first router touch on a fresh deploy so cold pg pages push this past several seconds without the cache.
- protea.api.routers.benchmark.prewarm_benchmark_embeddings(factory: sessionmaker[Session]) dict[str, Any]¶
Recompute and store benchmark:embeddings for the lifespan prewarm hook + background refresh loop.
- protea.api.routers.benchmark.prewarm_benchmark_matrix(factory: sessionmaker[Session]) None¶
Pre-warm the benchmark matrix by running the full EvaluationResult scan with no filters. The underlying pg statement is filter-agnostic (stage/K filters are applied in Python on the materialised rows), so one warm pass populates the buffer cache for every filtered variant the UI requests next. The /matrix endpoint itself is not response-cached: pg-pages-hot is enough to keep the live response sub-100ms, and filter combos multiply too much for a useful in-process cache.
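The difference between best_per_cell and best_per_cell_global can be sketched over long-format rows as below. The row keys, the "baseline" stage name, the embedding names, and the scores are illustrative assumptions; the K filter is omitted for brevity, and the filter is applied in Python on already-materialised rows, as the prewarm note describes.

```python
def best_per_cell(rows, stage=None):
    """Keep the highest-Fmax row per (category, aspect) cell.

    With stage=None no filter is applied, which corresponds to the
    filter-independent best_per_cell_global view.
    """
    best = {}
    for row in rows:
        if stage is not None and row["stage"] != stage:
            continue
        cell = (row["category"], row["aspect"])
        if cell not in best or row["fmax"] > best[cell]["fmax"]:
            best[cell] = row
    return best


# Made-up rows for one cell, two hypothetical embeddings, two stages.
rows = [
    {"embedding": "emb_a", "stage": "baseline", "category": "NK", "aspect": "BPO", "fmax": 0.40},
    {"embedding": "emb_b", "stage": "baseline", "category": "NK", "aspect": "BPO", "fmax": 0.45},
    {"embedding": "emb_a", "stage": "reranker", "category": "NK", "aspect": "BPO", "fmax": 0.52},
    {"embedding": "emb_b", "stage": "reranker", "category": "NK", "aspect": "BPO", "fmax": 0.50},
]

filtered_winner = best_per_cell(rows, stage="baseline")[("NK", "BPO")]
global_winner = best_per_cell(rows)[("NK", "BPO")]
```

Here the filtered view names emb_b as the winner of the cell while the global view names emb_a, which is why the global champion stays stable when the user changes filters.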
Datasets router¶
The /datasets router is the registry for frozen re-ranker training
datasets. POST /datasets enqueues an export_research_dataset job
that runs the KNN + feature pipeline, publishes the
train.parquet / eval.parquet / manifest.json triple to the
configured ArtifactStore (local FS or MinIO), and inserts a
Dataset row once the upload completes. GET /datasets and
GET /datasets/{id_or_name} expose the registry to
protea-reranker-lab’s pull_dataset.py and to UI consumers.
POST /datasets/import-by-reference (LB.1) is the lightweight
registration path for datasets whose artefacts already reside in the
artifact store. The caller supplies the name, storage backend, artifact
URIs, content fingerprints (schema_sha, manifest_sha), and dump
parameters verbatim from the lab’s manifest.json; PROTEA inserts a
Dataset row pointing at those URIs without re-running the KNN
pipeline or enqueueing a job. Typical use cases are: replay after a DB
wipe while artefacts remain in MinIO, lab-side dumps produced before
export_research_dataset existed, and the FARM-EXP.2a
placeholder-digest backfill. Optional FK columns
(embedding_config_id, ontology_snapshot_id) are silently set to
NULL when the referenced row is absent in the local DB, matching the
same defensive pattern used by
POST /reranker-models/import-by-reference. The resulting Dataset
row is content-identical to one produced by an in-PROTEA export; the
only visible difference is meta.imported_by_reference = true.
Frozen re-ranker dataset registry.
POST /datasets enqueues an export_research_dataset job that runs
KNN + feature generation, publishes train/eval/manifest artefacts to the
configured artifact store (local FS or MinIO) and inserts a Dataset
row once the upload completes. The row is the durable handle the lab
uses to pull the exact dump by name or id.
POST /datasets/import-by-reference is the lightweight twin: it
registers a Dataset row pointing at already-staged artefacts (lab
side dump, salvage replay, or any out of band export) without running
the KNN pipeline. The lab uses this for benches it produced locally
before export_research_dataset existed, or for re-imports after a
DB wipe.
GET /datasets and GET /datasets/{id_or_name} expose the registry
for the lab’s pull_dataset.py and for UI consumers.
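A client-side sketch of preparing the POST /datasets call is shown below. It only builds the request object and does not send it; the base URL is hypothetical, authentication headers are not shown, and the payload values are copied from the documented example for CreateDatasetRequest.

```python
import json
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # hypothetical deployment address


def build_create_dataset_request(payload):
    """Build (but do not send) a JSON POST to the canonical /v1/datasets path."""
    return Request(
        url=f"{BASE_URL}/v1/datasets",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Required fields plus k; unknown keys would be rejected with 422.
payload = {
    "output_name": "bench-v1-K5",
    "embedding_config_id": "00000000-0000-0000-0000-000000000001",
    "ontology_snapshot_id": "00000000-0000-0000-0000-000000000002",
    "train_versions": [160, 165, 170, 175],
    "test_versions": [230],
    "k": 5,
}

req = build_create_dataset_request(payload)
decoded = json.loads(req.data)
```

Passing req to urllib.request.urlopen would submit the job; the response then carries the job_id to poll.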
- class protea.api.routers.datasets.CreateDatasetRequest(*, output_name: Annotated[str, MinLen(min_length=1), MaxLen(max_length=255)], embedding_config_id: Annotated[str, MinLen(min_length=1)], ontology_snapshot_id: Annotated[str, MinLen(min_length=1)], train_versions: Annotated[list[int], MinLen(min_length=2)], test_versions: Annotated[list[int], MinLen(min_length=1)], annotation_source: str = 'goa', k: Annotated[int, Gt(gt=0)] = 5, search_backend: str = 'faiss', compute_alignments: bool = False, compute_taxonomy: bool = False, expand_votes_to_ancestors: bool = False, use_embedding_pca: bool = False)¶
Bases: BaseModel
Body for POST /datasets.
Mirrors the export_research_dataset operation payload. The caller does not pick a queue: the dataset export always runs on the protea.training worker (serialized, GPU/RAM-intensive).
- annotation_source: str¶
- compute_alignments: bool¶
- compute_taxonomy: bool¶
- embedding_config_id: str¶
- expand_votes_to_ancestors: bool¶
- k: int¶
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'annotation_source': 'goa', 'compute_alignments': True, 'compute_taxonomy': True, 'embedding_config_id': '00000000-0000-0000-0000-000000000001', 'expand_votes_to_ancestors': False, 'k': 5, 'ontology_snapshot_id': '00000000-0000-0000-0000-000000000002', 'output_name': 'bench-v1-K5', 'search_backend': 'faiss', 'test_versions': [230], 'train_versions': [160, 165, 170, 175], 'use_embedding_pca': False}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- ontology_snapshot_id: str¶
- output_name: str¶
- search_backend: str¶
- test_versions: list[int]¶
- train_versions: list[int]¶
- use_embedding_pca: bool¶
- class protea.api.routers.datasets.ImportDatasetByReferenceRequest(*, name: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=255)], storage_backend: str = 'local', key_prefix: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=512)], train_uri: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=1024)] = None, eval_uri: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=1024)] = None, manifest_uri: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=1024)], schema_sha: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=16)], manifest_sha: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=64)] = None, k: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)], annotation_source: str = 'goa', n_train_rows: ~typing.Annotated[int, ~annotated_types.Ge(ge=0)] = 0, n_eval_rows: ~typing.Annotated[int, ~annotated_types.Ge(ge=0)] = 0, embedding_config_id: str | None = None, ontology_snapshot_id: str | None = None, train_snapshot_pairs: list[str] = <factory>, eval_snapshot_pair: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=64)] = None, producer_version: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=64)] = None, producer_git_sha: ~typing.Annotated[str | None, ~annotated_types.MaxLen(max_length=40)] = None, external_source: str | None = None, meta: dict[str, ~typing.Any] = <factory>, force: bool = False)¶
Bases: BaseModel
Body for POST /datasets/import-by-reference.
The lab calls this when the train / eval parquets and the manifest already live in the artifact store (filesystem dump, MinIO upload from a prior environment, salvage replay, etc.). PROTEA registers a Dataset row pointing at those URIs verbatim. No job is enqueued; the artefacts are not re-read or copied.
The lab passes the fields it already has from its own manifest.json so the registry row is content-identical to what an in-PROTEA export_research_dataset run would have produced.
- annotation_source: str¶
- embedding_config_id: str | None¶
- eval_snapshot_pair: str | None¶
- eval_uri: str | None¶
- external_source: str | None¶
- force: bool¶
- k: int¶
- key_prefix: str¶
- manifest_sha: str | None¶
- manifest_uri: str¶
- meta: dict[str, Any]¶
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'annotation_source': 'goa', 'embedding_config_id': 'c0ae5b69-d6dc-41cf-a711-1739d3d2e170', 'eval_snapshot_pair': 'v226-v230', 'eval_uri': 'file:///home/frapercan/Thesis2/repositories/protea-reranker-lab/datasets/bench-v1-K5-v226-lineage-prostt5/eval.parquet', 'external_source': 'protea-reranker-lab@059db19', 'force': False, 'k': 5, 'key_prefix': 'datasets/bench-v1-K5-v226-lineage-prostt5/', 'manifest_sha': 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff', 'manifest_uri': 'file:///home/frapercan/Thesis2/repositories/protea-reranker-lab/datasets/bench-v1-K5-v226-lineage-prostt5/manifest.json', 'n_eval_rows': 1066859, 'n_train_rows': 24351779, 'name': 'bench-v1-K5-v226-lineage-prostt5', 'ontology_snapshot_id': '35c3ad67-3002-47db-8f71-eeed69d22ad6', 'producer_git_sha': '059db1907c5208a965238e8e6682184fb83537be', 'producer_version': '0.8.0', 'schema_sha': '6d97a624b8a7', 'storage_backend': 'local', 'train_snapshot_pairs': ['v220-v226'], 'train_uri': 'file:///home/frapercan/Thesis2/repositories/protea-reranker-lab/datasets/bench-v1-K5-v226-lineage-prostt5/train.parquet'}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- n_eval_rows: int¶
- n_train_rows: int¶
- name: str¶
- ontology_snapshot_id: str | None¶
- producer_git_sha: str | None¶
- producer_version: str | None¶
- schema_sha: str¶
- storage_backend: str¶
- train_snapshot_pairs: list[str]¶
- train_uri: str | None¶
- protea.api.routers.datasets.create_dataset(request: Request, response: Response, body: CreateDatasetRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), amqp_url: str = Depends(dependency=<function get_amqp_url>, use_cache=True, scope=None)) dict[str, Any]¶
Enqueue an export_research_dataset job.
Returns {job_id}. Poll GET /jobs/{job_id} for status; once the job is SUCCEEDED, GET /datasets/{name} returns the registered row with its artifact URIs.
- protea.api.routers.datasets.download_dataset_artifact(dataset_id: str, artifact: str = Query(PydanticUndefined), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), settings: Settings = Depends(dependency=<function get_settings>, use_cache=True, scope=None)) Response¶
Mint a presigned download URL (MinIO) or stream the file (local).
For storage_backend=minio the endpoint 302-redirects to a 15-minute presigned GET URL. For storage_backend=local the artifact bytes are streamed inline. See datasets_detail.
- protea.api.routers.datasets.get_dataset(id_or_name: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Resolve a dataset by UUID or by the name slug.
Tries the UUID path first; on ValueError (non-UUID input), falls back to the name column. Returns 404 if neither resolves. The lab uses the name path so dump callers can refer to bench-v1-K5 without juggling UUIDs.
- protea.api.routers.datasets.get_dataset_stats(dataset_id: str, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Return per-aspect protein / GO-term / annotation counts for a dataset.
Reads from dataset.meta['aspect_stats'] when present (populated by backfill or a previous call). On a cache miss the counts are computed live and written back. See datasets_detail for the implementation.
- protea.api.routers.datasets.import_dataset_by_reference(body: ImportDatasetByReferenceRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Register a Dataset row whose artefacts are already staged.
The lab uploads (or simply leaves on disk) the parquets and manifest.json itself and posts the URIs + manifest fields here. PROTEA persists a Dataset row pointing at those URIs without re-reading the artefacts. Useful for benches the lab produced before export_research_dataset existed, for replays after a DB wipe, and for the bench-v1-K5-v226-lineage-prostt5 LB.1 bootstrap.
The optional embedding_config_id and ontology_snapshot_id are resolved against the local DB and NULL’d when missing, so the insert never fails on a stale FK. schema_sha_v2 is dual-written when the PROTEA_SCHEMA_SHA_V2_WRITE_ENABLED flag is on (T1.6).
Returns 201 with the row’s id + name. 409 on a duplicate name unless force=true was passed.
- protea.api.routers.datasets.list_datasets(name_like: str | None = Query(None), embedding_config_id: UUID | None = Query(None), limit: int = Query(50), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
Return registered frozen datasets newest-first.
The lab’s pull_dataset.py polls this endpoint to discover dump artefacts produced by export_research_dataset. Filters narrow by name substring or by source embedding_config_id. Pagination is cursor-based (after) plus a hard limit ceiling.
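The UUID-first, name-fallback lookup described for GET /datasets/{id_or_name} can be sketched with two dictionaries standing in for the Dataset table. The function name and the in-memory indexes are hypothetical; a None result corresponds to the 404 case.

```python
import uuid


def resolve_dataset(id_or_name, by_id, by_name):
    """Try the UUID path first; on ValueError fall back to the name slug."""
    try:
        key = uuid.UUID(id_or_name)
    except ValueError:
        # Not a UUID, so treat the input as a dataset name.
        return by_name.get(id_or_name)
    return by_id.get(key)


# One illustrative row, indexed both ways.
row = {"id": uuid.UUID("00000000-0000-0000-0000-000000000001"), "name": "bench-v1-K5"}
by_id = {row["id"]: row}
by_name = {row["name"]: row}

found_by_uuid = resolve_dataset("00000000-0000-0000-0000-000000000001", by_id, by_name)
found_by_name = resolve_dataset("bench-v1-K5", by_id, by_name)
missing = resolve_dataset("no-such-dataset", by_id, by_name)
```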
Registry router¶
The /backends, /sources, and /runners endpoints list the plugins
discovered at runtime via importlib.metadata.entry_points for the three
plugin groups: embedding backends, annotation sources, and experiment
runners. The router is intentionally stateless: it re-scans entry points
on every call rather than caching, so a worker that has just been restarted
with a newly-installed extra surfaces in the next request without an API
restart.
Plugin registry endpoints.
Three read-only endpoints listing the plugins discovered at runtime
via importlib.metadata.entry_points:
GET /backends: embedding backend plugins (protea.backends)
GET /sources: annotation source plugins (protea.sources)
GET /runners: experiment runner plugins (protea.runners)
Each response is a flat list of PluginInfo records describing
the entry-point name, class, module path, and any plugin-specific
metadata exposed via attributes (e.g. AnnotationSource.version).
The endpoints are intentionally stateless: they re-scan
entry_points on every call rather than caching, so a worker
that’s just been restarted with a newly-installed extra surfaces in
the next request without an API restart. The scan is cheap (sub-ms
on the working set of ~10 plugins).
- class protea.api.routers.registry.PluginInfo(*, name: str, cls: str, module: str, extra: dict[str, ~typing.Any]=<factory>)¶
Bases: BaseModel
Metadata for one discovered plugin.
- cls: str¶
- extra: dict[str, Any]¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- module: str¶
- name: str¶
- class protea.api.routers.registry.PluginListResponse(*, group: str, plugins: list[PluginInfo])¶
Bases: BaseModel
Response shape for the three registry endpoints.
- group: str¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- plugins: list[PluginInfo]¶
- protea.api.routers.registry.list_backends() PluginListResponse¶
List all installed embedding backend plugins.
The plugin set depends on which protea-backends[<extra>] extras are installed (esm, t5, ankh, esm3c). With the default install all four are discoverable; only the ones whose lazy imports succeed at stream_* time will actually run on GPU.
- protea.api.routers.registry.list_runners() PluginListResponse¶
List all installed experiment runner plugins.
Today: baseline, knn, lightgbm. The latter two are contract-surface stubs until F2A.7 (lab → protea-runners .lightgbm migration) and F2C.1 (protea-method extraction) move the real implementations here.
- protea.api.routers.registry.list_sources() PluginListResponse¶
List all installed annotation source plugins.
Today: goa, quickgo, uniprot (all real after F2A.6-real). The extra.version field surfaces the AnnotationSource.version declared on each plugin (e.g. "uniprot-goa", "quickgo-rest").
Reranker models router¶
The /reranker-models router accepts boosters trained offline in
protea-reranker-lab (or any compatible trainer) and registers them
in PROTEA. POST /reranker-models/import is the multipart flow:
the lab sends model.txt + spec.yaml + run.json inline and
the server uploads model.txt to the artifact store under
rerankers/<run_id>/. POST /reranker-models/import-by-reference
is the production flow: the lab pre-uploads model.txt to MinIO under
its own key and posts JSON with artifact_uri + run (the parsed run.json) +
spec_yaml. Both flows share _register_model so the resulting
RerankerModel row is identical.
Re-ranker model registry.
POST /reranker-models/import accepts a trained booster from the
protea-reranker-lab (or any offline trainer), uploads it to the
configured artifact store, and inserts a RerankerModel row linked
back to the Dataset it was trained on. This replaces the in-PROTEA
LightGBM training path (see Phase 4 of the decoupling plan).
Both multipart and JSON-by-reference flows are supported:
- multipart: lab sends model.txt + spec.yaml + run.json inline. Server uploads the booster to rerankers/<run_id>/model.txt. Simpler for dev.
- by-reference: lab pre-uploads model.txt to MinIO under its own key and POSTs JSON with artifact_uri + run (the run.json contents) + spec_yaml text. Cleaner for prod.
Both flows share _register_model so the DB shape is identical.
- class protea.api.routers.reranker_models.ImportRerankerByReferenceRequest(*, artifact_uri: Annotated[str, MinLen(min_length=1)], spec_yaml: str, run: dict[str, Any], name: str | None = None, dataset_id: str | None = None, external_source: str | None = None, prediction_set_id: str | None = None, evaluation_set_id: str | None = None, force: bool = False)¶
Bases: BaseModel
Body for POST /reranker-models/import-by-reference.
Use this when the lab has already uploaded model.txt to MinIO under its own key and just needs PROTEA to register the URI.
- artifact_uri: str¶
- dataset_id: str | None¶
- evaluation_set_id: str | None¶
- external_source: str | None¶
- force: bool¶
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'artifact_uri': 's3://protea-rerankers/runs/r1/model.txt', 'dataset_id': '00000000-0000-0000-0000-000000000003', 'evaluation_set_id': '00000000-0000-0000-0000-000000000005', 'external_source': 'protea-reranker-lab@cec8ccd', 'force': False, 'name': 'r1-k5-bench-v1', 'prediction_set_id': '00000000-0000-0000-0000-000000000004', 'run': {'feature_schema_sha': 'ab12cd34ef56', 'metrics': {'fmax': 0.5427}, 'run_id': 'r1'}, 'spec_yaml': '# ExperimentSpec contents\nname: r1\nfeature_families: [embedding, alignment]\n'}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str | None¶
- prediction_set_id: str | None¶
- run: dict[str, Any]¶
- spec_yaml: str¶
- protea.api.routers.reranker_models.import_reranker_model_by_reference(body: ImportRerankerByReferenceRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Register a RerankerModel whose booster is already in MinIO.
The lab uploads the booster directly (faster, no double-hop) and POSTs the URI + run.json + spec.yaml here. Server does not re-read the artifact; it trusts the URI.
- async protea.api.routers.reranker_models.import_reranker_model_multipart(files: _RerankerImportFiles = Depends(dependency=<function _reranker_import_files_dep>, use_cache=True, scope=None), fields: _RerankerImportFields = Depends(dependency=<function _reranker_import_fields_dep>, use_cache=True, scope=None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Upload a trained booster and register a RerankerModel row.
The three files (model.txt, spec.yaml, run.json) mirror the artefacts produced by protea-reranker-lab under runs/<name>/. Wire format unchanged: the FastAPI deps expose every File/Form field as a discrete multipart part.
Stack router¶
The /stack router exposes metadata about the eight-repo PROTEA stack
to the UI. GET /stack returns the registry from
docs/source/_data/stack.yaml. GET /stack/pulls aggregates open
pull requests across every repo in the stack via the GitHub REST API and
caches the result in-process to stay under the unauthenticated 60 req/h
rate limit (set PROTEA_GITHUB_TOKEN to lift to 5000 req/h).
PROTEA stack metadata + cross-repo PR listing.
Two read-only endpoints intended to power the /stack page in the UI:
GET /stack returns the eight-repo registry from docs/source/_data/stack.yaml.
GET /stack/pulls proxies GitHub’s /repos/{owner}/{repo}/pulls endpoint for every repo and aggregates the open PRs into a single list. Useful when bouncing between repositories during review.
The PR listing is cached in-process for _PULLS_TTL_SECONDS to keep
the unauthenticated GitHub rate limit (60 req/h) from being a problem.
Set PROTEA_GITHUB_TOKEN (or any token in GITHUB_TOKEN /
GH_TOKEN) to lift the limit to 5000 req/h.
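The in-process caching idea can be sketched as a small time-to-live wrapper that also reports whether a value came from the cache, similar in spirit to the cached flag on the response. The class name, the 300-second TTL, and the injected clock are assumptions for illustration; the actual value of _PULLS_TTL_SECONDS is not stated here.

```python
import time

_MISSING = object()  # sentinel so that a cached None would still count as a hit


class TTLCache:
    """Hold one value for ttl_seconds, then refetch on the next request."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._value = _MISSING
        self._stored_at = 0.0

    def get_or_fetch(self, fetch):
        """Return (value, cached); call fetch only when the entry is stale."""
        now = self._clock()
        if self._value is not _MISSING and now - self._stored_at < self._ttl:
            return self._value, True
        self._value = fetch()
        self._stored_at = now
        return self._value, False


# A fake clock keeps the demonstration deterministic.
fake_now = [0.0]
fetch_calls = []


def fetch_pulls():
    fetch_calls.append(1)  # stands in for one round of GitHub API calls
    return ["pr-1"]


cache = TTLCache(ttl_seconds=300, clock=lambda: fake_now[0])
first = cache.get_or_fetch(fetch_pulls)
fake_now[0] = 100.0
second = cache.get_or_fetch(fetch_pulls)
fake_now[0] = 400.0
third = cache.get_or_fetch(fetch_pulls)
```

Only the first and third calls reach the upstream fetch, which is how a short TTL keeps repeated page loads from consuming the unauthenticated rate limit.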
- class protea.api.routers.stack.PullRequest(*, repo: str, number: int, title: str, url: str, state: str, draft: bool, author: str | None, created_at: str, updated_at: str, labels: list[str])¶
Bases: BaseModel
One open PR in the stack as reported by GitHub’s REST API.
Used by the stack landing page’s PR widget; the payload is the intersection of fields the UI actually renders, not a full echo of GitHub’s response.
- author: str | None¶
- created_at: str¶
- draft: bool¶
- labels: list[str]¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- number: int¶
- repo: str¶
- state: str¶
- title: str¶
- updated_at: str¶
- url: str¶
- class protea.api.routers.stack.PullsResponse(*, fetched_at: float, cached: bool, repos_queried: int, pulls: list[~protea.api.routers.stack.PullRequest], rate_limit_remaining: int | None = None, errors: dict[str, str] = <factory>)¶
Bases: BaseModel
Aggregated open-PR snapshot across all stack repos.
The handler caches the GitHub query for a few minutes; cached flips to True when a response is served from the in-process cache, fetched_at records the original wall-clock time, and errors carries per-repo failures (e.g. rate-limited, 404).
- cached: bool¶
- errors: dict[str, str]¶
- fetched_at: float¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- pulls: list[PullRequest]¶
- rate_limit_remaining: int | None¶
- repos_queried: int¶
- class protea.api.routers.stack.RepoEntry(*, name: str, slug: str, role: str, role_label: str, status: str, summary: str, github_url: str, docs_url: str | None = None, package_url: str | None = None, local_docs_path: str | None = None)¶
Bases: BaseModel
One repository row in the multi-repo stack landing page.
Each entry represents a sibling git repo (PROTEA itself or one of the plugin/lab packages). role is the architectural slot (core, contracts, plugin, lab); status is a coarse health signal sourced from the repo’s CI / release state.
- docs_url: str | None¶
- github_url: str¶
- local_docs_path: str | None¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str¶
- package_url: str | None¶
- role: str¶
- role_label: str¶
- slug: str¶
- status: str¶
- summary: str¶
- class protea.api.routers.stack.StackResponse(*, repos: list[RepoEntry], thesis_pdf_url: str | None = None)¶
Bases: BaseModel
Top-level payload for GET /stack.
Lists every repository in the PROTEA family plus the link to the canonical thesis PDF. Consumed by the frontend’s stack overview page and the docs portal sidebar.
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- thesis_pdf_url: str | None¶
- protea.api.routers.stack.get_stack() StackResponse¶
Return the eight-repo PROTEA stack registry.
Single source of truth: docs/source/_data/stack.yaml in this repo. Edit that file (and run scripts/sync_stack.py) to refresh the README block and the Sphinx page in the same commit.
Per-repo local_docs_path and the top-level thesis_pdf_url are computed from the filesystem at request time: the field is populated whenever the corresponding artefact has been built into docs/build/<slug>/html/ or apps/web/public/thesis.pdf respectively, and is None otherwise.
- protea.api.routers.stack.list_open_pulls() PullsResponse¶
Aggregate open pull requests across every repo in the stack.
Cached in-process for five minutes. Set the optional PROTEA_GITHUB_TOKEN env var to use authenticated requests (rate limit 5000/h instead of 60/h).
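The in-process caching described above can be pictured with a small time-to-live cache. This is a minimal sketch, not the router's actual code: the class name TTLCache, its get() method, and the injectable clock are hypothetical, and the 300-second value is only taken from the "five minutes" wording.

```python
from __future__ import annotations

import time
from typing import Any, Callable

PULLS_TTL_SECONDS = 300.0  # assumption: "five minutes" from the docstring


class TTLCache:
    """Hold a single cached value and refetch it once ttl seconds have passed."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl
        self._clock = clock  # injectable so the expiry logic can be tested
        self._value: Any = None
        self._stored_at: float | None = None

    def get(self, fetch: Callable[[], Any]) -> tuple[Any, bool]:
        """Return (value, cached); cached is True when no fetch happened."""
        now = self._clock()
        if self._stored_at is not None and now - self._stored_at < self._ttl:
            return self._value, True
        # Cache is empty or stale: call the expensive fetcher exactly once.
        self._value = fetch()
        self._stored_at = now
        return self._value, False
```

The boolean in the returned pair plays the role of the cached flag on PullsResponse: it tells the caller whether the upstream API was actually hit.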
Experiment runs router¶
The /experiment-runs router exposes CRUD over the
ExperimentRun ORM (T4.7-T4.9, decision D11). One row aggregates
multiple Job / EvaluationResult / RerankerModel rows
under a unique human name and carries the narrative trio
(description / hypothesis / findings) plus JSONB
config / provenance and Text[] tags.
PATCH /experiment-runs/{run_id} accepts partial updates; status
transitions stamp started_at (on planned → running) and
finished_at (on running → done or → abandoned)
idempotently: re-entering a state never resets its timestamp.
ExperimentRun narrative endpoints (T4.7-T4.9 of master plan v3.2).
Surfaces the ORM created in T3.8 so the F8b Experiments page (T8b.5) and CLI tooling can manage research-run metadata. Schema mirrors the JSON shape exposed by the jobs router for consistency.
Endpoints¶
POST /experiment-runs: create (status=planned).
GET /experiment-runs: list, optional status filter.
GET /experiment-runs/{id}: fetch one.
PATCH /experiment-runs/{id}: update narrative + status + provenance overlay; transitions stamp started_at / finished_at automatically.
DELETE /experiment-runs/{id}: remove (rare; mostly drafts).
Linkage to Job / EvaluationResult / RerankerModel rows is intentionally out of scope here; the F-EXP campaign work (T-EXP.1-T-EXP.7) defines the join shape once it lands.
- class protea.api.routers.experiment_runs.CreateExperimentRunRequest(*, name: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1)], description: str | None = None, hypothesis: str | None = None, config: dict[str, ~typing.Any] = <factory>, provenance: dict[str, ~typing.Any] = <factory>, tags: list[str] = <factory>)¶
Bases: BaseModel
Body for POST /experiment-runs.
Carries the narrative fields accepted at creation (description / hypothesis; findings is added later via PATCH) plus structured config + provenance overlays that the F-EXP campaign tooling reads back. New rows always start in planned status; transitions happen via PATCH.
- config: dict[str, Any]¶
- description: str | None¶
- hypothesis: str | None¶
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'config': {'K_values': [3, 5, 10], 'embedding_backend': 'esm2'}, 'description': 'Sweep K in {3, 5, 10} on the bench-v1 dataset.', 'hypothesis': 'Larger K hurts PK but is neutral on NK/LK.', 'name': 'ablation-K-2026-05-09', 'provenance': {'campaign': 'bench-v1'}, 'tags': ['ablation', 'K-sweep']}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str¶
- provenance: dict[str, Any]¶
- classmethod strip_name(v: str) str¶
- tags: list[str]¶
- class protea.api.routers.experiment_runs.UpdateExperimentRunRequest(*, description: str | None = None, hypothesis: str | None = None, findings: str | None = None, status: ExperimentRunStatus | None = None, config: dict[str, Any] | None = None, provenance: dict[str, Any] | None = None, tags: list[str] | None = None)¶
Bases: BaseModel
All fields optional; absent ones leave the column untouched.
- config: dict[str, Any] | None¶
- description: str | None¶
- findings: str | None¶
- hypothesis: str | None¶
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'findings': 'Fmax +0.03 on bench-v1-K5 vs K=3; PK regressed -0.005.', 'status': 'done', 'tags': ['ablation', 'K-sweep', 'results-in']}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- provenance: dict[str, Any] | None¶
- status: ExperimentRunStatus | None¶
- tags: list[str] | None¶
- protea.api.routers.experiment_runs.create_experiment_run(body: CreateExperimentRunRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Create a new ExperimentRun row in planned status.
- protea.api.routers.experiment_runs.delete_experiment_run(run_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) None¶
Permanently delete a run. Mostly used for cleaning up draft rows.
- protea.api.routers.experiment_runs.get_experiment_run(run_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Fetch a single run by id.
- protea.api.routers.experiment_runs.list_experiment_runs(status: ExperimentRunStatus | None = Query(None), limit: int = Query(50), after: datetime | None = Query(None), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
Return runs newest-first, optionally filtered by status.
Pagination is cursor-based: pass after=<created_at> to get the next page. Microsecond resolution on created_at keeps tie collisions astronomically rare.
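As an illustration of the cursor scheme, the sketch below pages through an in-memory list. It assumes that, because results are newest-first, the after cursor selects rows strictly older than the given created_at; the function name list_page and the dict row shape are hypothetical, and the real handler queries PostgreSQL instead.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any


def list_page(
    rows: list[dict[str, Any]],
    *,
    limit: int = 50,
    after: datetime | None = None,
) -> list[dict[str, Any]]:
    """Return one page of rows, newest first, starting below the cursor."""
    ordered = sorted(rows, key=lambda r: r["created_at"], reverse=True)
    if after is not None:
        # Assumption: the cursor is exclusive, so the boundary row is not repeated.
        ordered = [r for r in ordered if r["created_at"] < after]
    return ordered[:limit]
```

A client would pass the created_at of the last row it received as the after value of the next request.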
- protea.api.routers.experiment_runs.update_experiment_run(run_id: UUID, body: UpdateExperimentRunRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Patch narrative fields, status, config, provenance, or tags.
Only fields explicitly present in the request body are updated; omitted fields leave the column untouched. Status transitions stamp started_at / finished_at per the rules in _stamp_status_transition().
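The idempotent stamping rule can be sketched as follows. This is an illustration under assumptions, not the body of _stamp_status_transition(): the Run dataclass stands in for the ORM row, and the sketch stamps on entry to a state regardless of the previous state, which may be looser than the real transition checks.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Run:
    """Hypothetical stand-in for the ExperimentRun row."""

    status: str = "planned"
    started_at: datetime | None = None
    finished_at: datetime | None = None


def stamp_status_transition(run: Run, new_status: str, now: datetime | None = None) -> None:
    """Apply a status change, stamping each timestamp at most once."""
    now = now or datetime.now(timezone.utc)
    if new_status == "running" and run.started_at is None:
        run.started_at = now
    if new_status in {"done", "abandoned"} and run.finished_at is None:
        run.finished_at = now
    # Re-entering a state falls through both guards, so timestamps are kept.
    run.status = new_status
```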
Services layer¶
Each router delegates non-trivial business logic to a service module. Services are pure Python: they accept a SQLAlchemy session and return domain objects or raise domain exceptions. Routers map those exceptions to HTTP status codes. This separation allows the same logic to be exercised from CLI tools or batch scripts without importing FastAPI. Full symbol-level documentation lives in Services.
Jobs service: shared helpers for the queue-dispatch pattern.
Multiple routers (annotations, embeddings, predictions) follow the same shape when an HTTP endpoint queues background work:
1. Pydantic-validate the request body.
2. Insert a Job row with the canonical operation name + queue.
3. Insert a matching JobEvent (job.created) for the audit log.
4. Publish to RabbitMQ via protea.infrastructure.queue.publisher.publish_job().
5. Return {"id": ..., "status": "queued"}.
This module exposes enqueue_job() (steps 2–3) and the higher-level
dispatch_validated_job() (steps 1–5) so routers collapse
to a single try/except.
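The five steps can be sketched end to end with in-memory stand-ins. Everything below is hypothetical scaffolding: plain lists replace the Job and JobEvent tables, a callable replaces publish_job(), and a validator that raises ValueError replaces the Pydantic model.

```python
import uuid
from typing import Any, Callable


class InvalidPayloadSketch(Exception):
    """Stand-in for InvalidJobPayloadError."""


def dispatch_sketch(
    body: dict[str, Any],
    validate: Callable[[dict[str, Any]], dict[str, Any]],
    *,
    operation: str,
    queue_name: str,
    jobs: list[dict[str, Any]],
    events: list[dict[str, Any]],
    publish: Callable[[str, uuid.UUID], None],
) -> dict[str, str]:
    try:
        payload = validate(body)  # step 1: validate before touching storage
    except ValueError as exc:
        raise InvalidPayloadSketch(str(exc)) from exc
    job_id = uuid.uuid4()
    jobs.append(  # step 2: the Job row
        {"id": job_id, "operation": operation, "queue": queue_name, "payload": payload}
    )
    events.append({"job_id": job_id, "event": "job.created"})  # step 3: audit event
    publish(queue_name, job_id)  # step 4: publish only after the rows exist
    return {"id": str(job_id), "status": "queued"}  # step 5: canonical response
```

Validating first means a rejected body leaves no rows behind, which is why the router only needs one try/except around the call.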
- exception protea.services.jobs_service.InvalidJobPayloadError(errors: Any)
Bases: Exception
Pydantic validation failed for a queue-dispatch request body.
Carries the structured errors list produced by Pydantic so the router can pass it through verbatim as the HTTP 422 detail.
- protea.services.jobs_service.compute_dedup_key(operation: str, payload: dict[str, Any]) str
Return a 16-hex-char deduplication key for (operation, payload).
The key is the first 16 hex digits of the SHA-256 of a canonical JSON serialisation of {"operation": ..., "payload": ...} (keys sorted, ASCII-safe). Truncated to 16 chars (64-bit prefix); collision probability is negligible for the expected job volume.
The 16-char length fits comfortably in VARCHAR(64) and keeps the partial unique index compact.
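A minimal sketch of such a key, assuming json.dumps with sort_keys=True and ensure_ascii=True as the canonical form. The exact serialisation options used by compute_dedup_key() (separators in particular) are not documented here, so keys from this sketch may differ from the real ones; the name dedup_key_sketch is hypothetical.

```python
import hashlib
import json
from typing import Any


def dedup_key_sketch(operation: str, payload: dict[str, Any]) -> str:
    """Return the first 16 hex chars of SHA-256 over a canonical JSON form."""
    canonical = json.dumps(
        {"operation": operation, "payload": payload},
        sort_keys=True,     # key order in the input must not change the key
        ensure_ascii=True,  # non-ASCII text is escaped, so the bytes are stable
    )
    return hashlib.sha256(canonical.encode("ascii")).hexdigest()[:16]
```

Sorting keys is what makes two payloads with the same content but different key order collapse onto the same key.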
- protea.services.jobs_service.dispatch_validated_job(factory: sessionmaker[Session], amqp_url: str, body: dict[str, Any], payload_model: type[BaseModel], *, operation: str, queue_name: str) dict[str, Any]
End-to-end queue dispatch: validate, persist, publish, respond.
Pydantic-validates body against payload_model (raising InvalidJobPayloadError on failure for the router to map to a 422), inserts a Job + JobEvent pair inside a fresh session, then publishes to RabbitMQ. Returns the canonical {"id": <uuid>, "status": "queued"} response shape used by every dispatch endpoint.
- protea.services.jobs_service.enqueue_job(session: Session, *, operation: str, queue_name: str, payload: dict[str, Any]) UUID
Insert Job + JobEvent rows for a background task.
Returns the new job’s UUID. The caller is responsible for publishing to the queue (via protea.infrastructure.queue.publisher.publish_job()) after committing this session, and for any payload validation that should happen before the row hits the database.
Both rows are flushed but not committed; the caller’s session_scope context manager owns the transaction.
Annotations service: pure-logic helpers extracted from
protea.api.routers.annotations.
ORM ↔ dict serialisers and the read-side handlers (snapshot/IA-url operations) live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.
The router translates the domain exceptions raised here to HTTP responses:
EntityNotFoundError → 404 Not Found (e.g. an OntologySnapshot or AnnotationSet UUID does not resolve).
- exception protea.services.annotations_service.AnnotationSetReferencedError
Bases: AnnotationsServiceError
An AnnotationSet cannot be deleted because PredictionSet rows still reference it; the FK CASCADE is intentionally absent. Maps to HTTP 409 at the router boundary.
- exception protea.services.annotations_service.AnnotationsServiceError
Bases: Exception
Base class for annotations-service domain errors.
- exception protea.services.annotations_service.EntityNotFoundError(entity: str, entity_id: UUID)
Bases: AnnotationsServiceError
Generic 404; a referenced entity does not exist.
Pickle-safe via __reduce__ so the structured entity / entity_id attrs survive a round-trip without tripping flake8-bugbear B042.
- protea.services.annotations_service.annotation_set_to_dict(a: AnnotationSet, count: int) dict[str, Any]
Serialise an AnnotationSet to its API dict shape.
- protea.services.annotations_service.delete_annotation_set_data(session: Session, set_id: UUID) dict[str, Any]
Delete an annotation set and all its annotations.
Returns the deletion summary dict.
Raises:
EntityNotFoundError if the UUID does not resolve.
AnnotationSetReferencedError if a PredictionSet references this set (router maps to 409).
- protea.services.annotations_service.delete_eval_result_collect_keys(session: Session, eval_id: UUID, result_id: UUID) list[str]
Delete the EvaluationResult and return the artifact keys to clean up.
Same split as delete_evaluation_set_collect_keys(): the DB delete happens here; the artifact-store deletion is the router’s responsibility (it owns the ArtifactStore factory).
- protea.services.annotations_service.delete_evaluation_set_collect_keys(session: Session, eval_id: UUID) list[str]
Delete the EvaluationSet and return the artifact-store keys to clean.
The DB delete cascades to EvaluationResult rows; this helper walks the results before deleting and returns the union of all artifact keys those rows referenced (per-result cafaeval outputs) so the caller can wipe them from the store. The caller is also expected to delete the set’s ground-truth artifact via protea.core.evaluation.groundtruth_key_for(eval_id); that key is not included here because it is a fixed function of eval_id.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.evaluation_result_to_dict(r: EvaluationResult) dict[str, Any]
Serialise an EvaluationResult to its API dict shape.
- protea.services.annotations_service.evaluation_set_to_dict(e: EvaluationSet) dict[str, Any]
Serialise an EvaluationSet to its API dict shape.
- protea.services.annotations_service.get_annotation_set_data(session: Session, set_id: UUID) dict[str, Any]
Return a single annotation set with its annotation count.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.get_eval_result_with_keys(session: Session, eval_id: UUID, result_id: UUID) tuple[EvaluationResult, list[str]]
Fetch an EvaluationResult belonging to eval_id; return (row, artifact_keys).
Raises EntityNotFoundError (“EvaluationResult”) when the result does not exist or does not belong to eval_id.
- protea.services.annotations_service.get_evaluation_set_data(session: Session, eval_id: UUID) dict[str, Any]
Return a single evaluation set.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.get_go_subgraph_data(session: Session, snapshot_id: UUID, go_ids: str, depth: int) dict[str, Any]
BFS the GO DAG upward from the requested seed terms.
Returns {"nodes": [...], "edges": [...]} ready for the API. Each node has id (DB id), go_id, name, aspect, is_query (True for the seed terms). Each edge has source (child id), target (parent id), relation_type.
Raises EntityNotFoundError when the snapshot does not resolve. The exception is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
- protea.services.annotations_service.get_snapshot_data(session: Session, snapshot_id: UUID) dict[str, Any]
Return a single snapshot with its GO term count.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.iter_delta_proteins_fasta(session: Session, eval_id: UUID, category: str) list[str]
Return FASTA lines for delta proteins (nk / lk / pk / all).
Only proteins whose sequence is in the DB are emitted. Header is >ACCESSION entry_name OS=organism OX=taxon (NK|LK|PK); the sequence is wrapped at 60 chars per line.
An empty result returns an empty list. Raises EntityNotFoundError if the EvaluationSet does not resolve. The exception is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
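The header layout and 60-column wrapping can be sketched as below. The function name fasta_record and its parameters are hypothetical, and whether the real helper appends newline characters to each line is not stated, so this sketch returns bare lines.

```python
def fasta_record(
    accession: str,
    entry_name: str,
    organism: str,
    taxon: str,
    category: str,
    sequence: str,
    width: int = 60,
) -> list[str]:
    """Return the header line followed by the sequence wrapped at width chars."""
    header = f">{accession} {entry_name} OS={organism} OX={taxon} ({category.upper()})"
    # Slice the sequence into fixed-width chunks; the last one may be shorter.
    body = [sequence[i : i + width] for i in range(0, len(sequence), width)]
    return [header, *body]
```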
- protea.services.annotations_service.iter_groundtruth_tsv(session: Session, eval_id: UUID, category: str) list[str]
Return the rows for a CAFA ground_truth_<CATEGORY>.tsv download.
category is "nk", "lk", "pk" or "known". Each row is "<protein>\t<go_id>\n"; sorted by protein then GO id so the output is deterministic. The caller wraps the list in a StreamingResponse (the materialised list is small enough, a few thousand rows for typical CAFA splits, to fit in memory and keeps the streaming generator simple).
Raises EntityNotFoundError when the EvaluationSet does not resolve.
- protea.services.annotations_service.list_annotation_sets_data(session: Session, source: str | None = None) list[dict[str, Any]]
List all annotation sets with their per-set annotation counts (newest first).
Optionally filter by source (e.g. goa or quickgo). Pure read; the caller caches at the API boundary.
- protea.services.annotations_service.list_evaluation_results_data(session: Session, eval_id: UUID) list[dict[str, Any]]
List EvaluationResult rows for one EvaluationSet (newest first).
Raises EntityNotFoundError when the EvaluationSet does not resolve.
- protea.services.annotations_service.list_evaluation_sets_data(session: Session) list[dict[str, Any]]
List all evaluation sets, newest first.
- protea.services.annotations_service.list_snapshots_data(session: Session) list[dict[str, Any]]
Return all loaded snapshots with their GO term counts (newest first).
Pure read; the caller is responsible for caching at the API boundary if desired (the GROUP BY over the multi-million row go_term table is the slow part).
- protea.services.annotations_service.render_evaluation_metrics_tsv(result: EvaluationResult, aspect_codes: tuple[str, ...]) Any
Yield TSV rows for the per-(setting, namespace) metrics summary.
The caller passes the aspect-codes tuple (ASPECT_CAFA_CODES) so the service stays free of the domain layer. Returns a generator suitable for StreamingResponse.
- protea.services.annotations_service.set_snapshot_ia_url(session: Session, snapshot_id: UUID, ia_url: str | None) dict[str, Any]
Update the IA URL on a snapshot. Empty string is treated as None.
Returns a small confirmation dict shape compatible with the legacy endpoint. Raises EntityNotFoundError for the 404 path. The caller (router) is responsible for validating request body shape (e.g. presence of the ia_url key) before calling.
- protea.services.annotations_service.snapshot_to_dict(s: OntologySnapshot, term_count: int) dict[str, Any]
Serialise an OntologySnapshot to its API dict shape.
Embeddings service: pure-logic helpers extracted from
protea.api.routers.embeddings.
Validation rules, ORM ↔ dict serialisers, and the predictions-TSV streaming generator live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.
The router translates the domain exceptions raised here to HTTP responses:
InvalidEmbeddingConfigError → 422 Unprocessable Entity (validation errors carry a list of human-readable messages in .errors).
EntityNotFoundError → 404 Not Found (e.g. a PredictionSet UUID does not resolve).
- exception protea.services.embeddings_service.EmbeddingsServiceError
Bases: Exception
Base class for embeddings-service domain errors.
- exception protea.services.embeddings_service.EntityNotFoundError(entity: str, entity_id: UUID)
Bases: EmbeddingsServiceError
Generic 404; a referenced entity does not exist.
Construct with the entity label (e.g. "PredictionSet") and the looked-up UUID; the message becomes "<entity> not found". Pickle-safe via __reduce__ so the structured entity / entity_id attributes survive a round-trip without tripping flake8-bugbear B042.
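Why __reduce__ matters here: an exception's default pickling re-calls the constructor with self.args, which in this design holds only the formatted message, so a two-argument constructor would fail on unpickling. The sketch below shows the pattern; the class name EntityNotFoundSketch is hypothetical and the real class may differ in detail.

```python
from uuid import UUID


class EntityNotFoundSketch(Exception):
    """Exception with structured attributes that survives copy and pickle."""

    def __init__(self, entity: str, entity_id: UUID) -> None:
        super().__init__(f"{entity} not found")  # args holds only the message
        self.entity = entity
        self.entity_id = entity_id

    def __reduce__(self):
        # Rebuild from the original constructor arguments, not from self.args.
        return (type(self), (self.entity, self.entity_id))
```

With this in place, pickle.loads(pickle.dumps(err)) yields an equivalent exception whose entity and entity_id are intact, which matters when errors cross process boundaries.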
- exception protea.services.embeddings_service.InvalidEmbeddingConfigError(errors: list[str])
Bases: EmbeddingsServiceError
Validation failure for an EmbeddingConfig request body.
errors carries a list of human-readable messages, one per failed rule, suitable for inclusion in the HTTP 422 response body.
- exception protea.services.embeddings_service.InvalidUUIDFieldError(field: str)
Bases: EmbeddingsServiceError
Predict request body had a field that does not parse as UUID.
Carries the offending field name in field; the router translates this to 422 with detail "<field> must be a valid UUID".
- protea.services.embeddings_service.assert_prediction_set_exists(session: Session, prediction_set_id: UUID) None
Raise EntityNotFoundError if the PredictionSet UUID is unknown.
- protea.services.embeddings_service.config_to_dict(c: EmbeddingConfig, embedding_count: int | None = None) dict[str, Any]
Serialise an EmbeddingConfig ORM row to its API dict shape.
The embedding_count field is only included when the caller has a number to report (the bare GET /configs/{id} endpoint does not).
- protea.services.embeddings_service.delete_embedding_config_cascade(session: Session, config_id: UUID) dict[str, Any]
Cascade-delete an EmbeddingConfig and all linked rows.
Raises EntityNotFoundError when config_id does not resolve. Body lives in _embeddings_admin_helpers.cascade_delete_embedding_config().
- protea.services.embeddings_service.delete_prediction_set_cascade(session: Session, prediction_set_id: UUID) dict[str, Any]
Delete a PredictionSet and all its GOPrediction rows.
Returns {"deleted": <id>, "predictions_deleted": <count>}. Raises EntityNotFoundError when the UUID does not resolve so the router can translate to 404.
- protea.services.embeddings_service.get_go_term_distribution_data(session: Session, *, prediction_set_id: UUID, limit: int = 50) dict[str, Any]
Return the most-frequent GO terms predicted in this set + per-aspect totals.
Raises EntityNotFoundError when the PredictionSet does not resolve. Body lives in _embeddings_admin_helpers.compute_go_term_distribution().
- protea.services.embeddings_service.get_prediction_set_data(session: Session, prediction_set_id: UUID) dict[str, Any]
Retrieve a prediction set with total + per-protein GO term counts.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.embeddings_service.get_predictions_for_protein(session: Session, *, prediction_set_id: UUID, accession: str) list[dict[str, Any]]
Return all predicted GO terms for one protein, sorted by distance.
Raises EntityNotFoundError when the PredictionSet does not resolve. (No 404 for unknown accession; returns empty list, matching the legacy endpoint’s behaviour.)
- protea.services.embeddings_service.iter_predictions_cafa_tsv(factory: sessionmaker[Session], *, prediction_set_id: UUID, aspect: str | None, max_distance: float | None, delta_proteins: set[str] | None) Iterator[str]
Stream the CAFA-format prediction TSV.
DB-level deduplication: a GROUP BY (protein_accession, go_term_id) + MIN(distance) subquery keeps the best row per pair, so the Python side never needs an unbounded seen set and the export stays truly streaming. Score is max(0.0, 1.0 - distance), clamped to [0, 1].
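To make the semantics concrete, the sketch below computes in memory what the GROUP BY + MIN(distance) subquery and the score formula produce. It is an illustration only: the real export does this step in the database precisely to avoid holding a dict like this one, and the function name and tuple row shape are hypothetical.

```python
from typing import Iterable


def best_scores(rows: Iterable[tuple[str, str, float]]) -> dict[tuple[str, str], float]:
    """Map (accession, go_id) to a score derived from the smallest distance."""
    best: dict[tuple[str, str], float] = {}
    for accession, go_id, distance in rows:
        key = (accession, go_id)
        # Keep the minimum distance per pair, like MIN(distance) under GROUP BY.
        if key not in best or distance < best[key]:
            best[key] = distance
    # Score is 1 - distance, clamped into [0, 1].
    return {key: min(1.0, max(0.0, 1.0 - d)) for key, d in best.items()}
```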
- protea.services.embeddings_service.iter_predictions_tsv(factory: Any, *, prediction_set_id: UUID, accession: str | None = None, aspect: str | None = None, max_distance: float | None = None) Iterator[str]
Yield TSV rows (as str) of every GOPrediction in a set.
Opens its own session inside the generator so the caller’s existence-check session can close cleanly. The first yielded chunk is the header line; one row per (GOPrediction, GOTerm) pair follows, ordered by (protein_accession, distance).
Optional filters: accession (single query protein), aspect (F/P/C), max_distance.
- protea.services.embeddings_service.list_prediction_sets_data(session: Session) list[dict[str, Any]]
Top 100 most-recent PredictionSet rows joined with their context.
Returns a list of dicts each carrying the embedding-config name, annotation-set label, ontology version, plus the per-set prediction_count. The per-set count comes from a single GROUP BY over GOPrediction (one index-only scan) rather than a correlated subquery; for ~10⁷-row tables Postgres’ planner falls into a per-row index probe with the correlated form (~30s per outer row). The grouped form returns all 100 counts at once.
- protea.services.embeddings_service.list_proteins_in_prediction_set(session: Session, *, prediction_set_id: UUID, search: str | None = None, limit: int = 50, offset: int = 0) dict[str, Any]
Paginated list of proteins in a prediction set with derived stats.
For each row returns go_count (number of predicted terms), min_distance (closest neighbour), annotation_count (known annotations against the same AnnotationSet) and match_count (predictions whose (protein, go_id) is in the known set; a precision proxy).
Decomposed into private helpers (_paginate_protein_rows, _load_protein_orm_map, _load_annotation_counts, _load_match_counts) so this orchestrator stays under the §3 method-LOC ceiling.
Raises EntityNotFoundError (imported lazily to avoid the circular dependency with embeddings_service) when prediction_set_id does not resolve.
- protea.services.embeddings_service.prepare_cafa_export(session: Session, *, prediction_set_id: UUID, eval_id: UUID | None) set[str] | None
Preflight CAFA export: validate the PredictionSet exists and, if an EvaluationSet was supplied, compute the union of NK + LK delta proteins to restrict the export.
Returns the delta-protein accession set when eval_id is provided (the streaming generator filters on it), otherwise None.
Raises EntityNotFoundError for missing PredictionSet or EvaluationSet so the router can translate to 404.
- protea.services.embeddings_service.validate_embedding_config_body(body: dict[str, Any]) dict[str, Any]
Validate a request body for POST /embeddings/configs.
Returns the canonicalised dict (defaults filled in) on success. Raises InvalidEmbeddingConfigError (imported lazily to avoid the circular dep with embeddings_service) with the full list of failures otherwise; the router translates that to a 422 with the same shape it produced before extraction.
Decomposed into per-field-group helpers so neither this orchestrator nor any helper breaches the 60-LOC method ceiling.
- protea.services.embeddings_service.validate_predict_request(session: Session, body: dict[str, Any]) dict[str, UUID]
Parse + validate the three required UUID fields of a predict request.
Returns a dict mapping field name to its parsed uuid.UUID. Raises InvalidUUIDFieldError for parse failures (router → 422) or EntityNotFoundError if a referenced entity does not exist (router → 404). Field order is preserved so the first failure wins, matching the previous in-router behaviour.
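The "first failure wins" parsing can be sketched as follows. Only the UUID-parsing half is shown; the existence checks against the database are omitted. The names InvalidUUIDFieldSketch and parse_uuid_fields are hypothetical, and the three field names the real function expects are not listed in this section, so callers pass them in.

```python
from typing import Any
from uuid import UUID


class InvalidUUIDFieldSketch(Exception):
    """Stand-in for InvalidUUIDFieldError; carries the offending field name."""

    def __init__(self, field: str) -> None:
        super().__init__(f"{field} must be a valid UUID")
        self.field = field


def parse_uuid_fields(body: dict[str, Any], fields: list[str]) -> dict[str, UUID]:
    """Parse each named field as a UUID, stopping at the first bad one."""
    parsed: dict[str, UUID] = {}
    for name in fields:  # iteration order decides which failure is reported
        try:
            parsed[name] = UUID(str(body.get(name)))
        except ValueError:
            raise InvalidUUIDFieldSketch(name) from None
    return parsed
```

A missing field becomes the string "None", which fails UUID parsing and is therefore reported the same way as a malformed value.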
Authentication helpers
protea.api.auth implements the credential-verification layer. It
exposes require_api_key_or_bearer, a FastAPI dependency that accepts
three header forms (Authorization: ApiKey, X-Api-Key, or
Authorization: Bearer). The API-key path computes a SHA-256 hash of
the raw key and compares it against the database; the Bearer path verifies
an HS256 JWT. A missing or invalid credential returns 401 with a
WWW-Authenticate challenge.
API key authentication primitives (T5.6a — first iteration).
This module owns:
the constant-time helper functions that hash and verify a raw API key,
the FastAPI dependency require_api_key(),
the small set of env knobs that gate the dependency in dev.
Header format¶
Two equivalent header shapes are accepted (mirroring the conventions used by most public APIs):
Authorization: ApiKey <key>
X-Api-Key: <key>
Both are checked; whichever arrives first wins. The dependency returns
the matched ApiKey row so downstream handlers can audit the
caller (currently unused, but the hook is in place).
Hashing¶
Keys are stored as sha256 hex digests. sha256 is fine here because the
raw key has 32 bytes of entropy already (256 bits, encoded as 43 URL-safe
base64 characters, well above what an offline brute-force can hope to
crack). We use hmac.compare_digest() for the verification step to avoid
timing side-channels on the hash comparison.
Env knobs¶
PROTEA_AUTHN_REQUIRED (default true): when false, the dependency short-circuits and waves every request through. Useful for local development; production deployments must leave it enabled.
- protea.api.auth.generate_raw_key() str¶
Generate a fresh random API key (43 url-safe chars, 256 bits).
The returned string is the value handed to the caller exactly once. Hash + prefix are derived from it via hash_key() / prefix_of().
- protea.api.auth.hash_key(raw: str) str¶
Return the sha256 hex digest used for the key_hash column.
Wrapper exists so the algorithm can be swapped later (Argon2id, for instance, if we ever move to short user-chosen secrets) without grepping the codebase.
- protea.api.auth.prefix_of(raw: str) str¶
Return the first PREFIX_LEN characters of raw.
Used as the display handle in API responses and as the indexed lookup column on the api_key table.
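The three primitives above, plus the constant-time comparison, fit in a few lines of standard library code. This is a sketch under assumptions: the _sketch function names are hypothetical, the use of secrets.token_urlsafe(32) is a guess consistent with the 43-character key length (32 random bytes encode to 43 URL-safe characters), and PREFIX_LEN = 8 is taken from the "8-char prefix" mentioned for require_api_key().

```python
import hashlib
import hmac
import secrets

PREFIX_LEN = 8  # assumption: matches the "8-char prefix" lookup column


def generate_raw_key_sketch() -> str:
    """Return a random URL-safe key built from 32 random bytes."""
    return secrets.token_urlsafe(32)


def hash_key_sketch(raw: str) -> str:
    """Return the SHA-256 hex digest that would be stored instead of the key."""
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def prefix_of_sketch(raw: str) -> str:
    """Return the non-secret handle used for display and indexed lookup."""
    return raw[:PREFIX_LEN]


def verify_key_sketch(raw: str, stored_hash: str) -> bool:
    """Compare digests in constant time to avoid a timing side-channel."""
    return hmac.compare_digest(hash_key_sketch(raw), stored_hash)
```

Looking up by prefix first narrows the search to a single candidate row, so the constant-time comparison runs once per request rather than once per stored key.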
- protea.api.auth.require_api_key(request: Request, background_tasks: BackgroundTasks, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), authorization: str | None = Header(None), x_api_key: str | None = Header(None)) ApiKey | None¶
FastAPI dependency that validates an API key on the request.
Behaviour:
1. If PROTEA_AUTHN_REQUIRED is falsy, return None: gate disabled (dev stack only).
2. Read the raw key from Authorization: ApiKey <key> or X-Api-Key: <key>. Missing → 401.
3. Look up by the 8-char prefix (indexed) and compare hashes in constant time. Mismatch or revoked → 401.
4. Schedule a background last_used_at update so the request is not blocked on the write.
The matched ApiKey snapshot is returned to the route handler for downstream audit. Routes that wire this as a router-level dependency typically ignore the return value.
- protea.api.auth.require_api_key_or_bearer(request: Request, background_tasks: BackgroundTasks, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None), authorization: str | None = Header(None), x_api_key: str | None = Header(None)) ApiKey | BearerPrincipal | None¶
Accept either an API key or a Bearer JWT on the same route.
Resolution order:
1. PROTEA_AUTHN_REQUIRED falsy → short-circuit, return None.
2. Authorization: Bearer <jwt> → validate JWT, return BearerPrincipal.
3. Otherwise fall back to the T5.6a ApiKey flow.
Bearer wins when both happen to be present so a misconfigured client cannot downgrade to the weaker scheme by sending both headers. Failure modes return 401 with the matching WWW-Authenticate header.
protea.api.bearer provides the HS256 JWT verification utilities used
by auth.require_api_key_or_bearer. Minimum required claims are
sub, iat, and exp.
Bearer JWT authentication (T5.6b — second auth iteration).
Adds an Authorization: Bearer <jwt> flow alongside the API-key
dependency from T5.6a. Both are accepted via the combined dependency
require_api_key_or_bearer().
Algorithm¶
HS256 with a shared secret from PROTEA_JWT_SECRET.
Minimum payload: {sub, exp, iat}. aud and iss are accepted if present; we do not validate them in this iteration.
On startup, when PROTEA_AUTHN_REQUIRED=true and PROTEA_JWT_SECRET is missing, the API process must fail loudly; see assert_bearer_config(), invoked from create_app.
Why HS256 (not RS256)?¶
PROTEA does not issue tokens itself in this slice (T5.6b is consumer only). The thesis dev stack signs tokens out-of-band with a shared secret and the secret is rotated manually. RS256 / OIDC lands in T5.6c (post-defensa) together with the oauth2-proxy fronting layer.
- class protea.api.bearer.BearerPrincipal(sub: str, claims: dict[str, Any])¶
Bases: object
Subject + raw claims surfaced to handlers that want them.
Mirrors the ApiKey snapshot shape returned by require_api_key() so the combined dependency can hand back one or the other without callers having to discriminate at the type level.
- claims: dict[str, Any]¶
- sub: str¶
- protea.api.bearer.assert_bearer_config() None¶
Fail loudly on startup when auth is on and the secret is missing.
Call from create_app BEFORE the routers are mounted so the process exits with a clear error message rather than 500-ing every bearer request at runtime. When PROTEA_AUTHN_REQUIRED=false we skip the check (dev stacks are allowed to operate without a secret; the gate short-circuits anyway).
- protea.api.bearer.decode_bearer_token(token: str) BearerPrincipal¶
Validate signature + exp and return the principal.
Raises HTTPException 401 on every failure mode (expired, bad signature, missing required claim, malformed). The same status code is used for every cause so the API does not leak which part of the token was rejected.
- protea.api.bearer.extract_bearer_token(authorization: str | None) str | None¶
Return the raw JWT from an Authorization: Bearer <jwt> header.
Any other scheme (ApiKey, Basic, …) returns None so the caller can fall through to the next auth mechanism without swallowing tokens that belong to another dependency.
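The fall-through behaviour described for extract_bearer_token can be illustrated as follows. This is a sketch under assumptions, not the PROTEA source: in particular, matching the scheme case-insensitively and treating an empty token as absent are choices made here for illustration.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the JWT from 'Bearer <jwt>', or None for anything else."""
    if not authorization:
        return None
    scheme, _, value = authorization.strip().partition(" ")
    # Assumption: scheme names are compared case-insensitively.
    if scheme.lower() != "bearer":
        return None  # ApiKey, Basic, ... belong to other dependencies
    token = value.strip()
    return token or None
```

Returning None instead of raising is what lets a combined dependency try the API-key path next.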
- protea.api.bearer.require_bearer(request: Request) BearerPrincipal | None¶
Standalone bearer dep — used directly only for tests / dev token.
Production routes use require_api_key_or_bearer() so either scheme is accepted. We read the header off the request directly (instead of a Header() arg) so the dep stays interchangeable with the combined variant.
protea.api.auth_api_keys is the router for managing API key creation
and revocation.
/auth/api-keys — manage API keys (T5.6a first iteration).
Three endpoints:
POST /auth/api-keys: mint a new key. Returns the raw value exactly once. Subsequent reads only expose the prefix and the name.
GET /auth/api-keys: list keys (prefix + name + state, no secret). Used by an operator dashboard.
DELETE /auth/api-keys/{id}: revoke a key (sets revoked_at). Revocation is irreversible; deleting the row outright is out of scope for this iteration (audit trail).
The endpoints themselves are intentionally not guarded by
require_api_key in this first iteration: the bootstrap problem
(how does the first operator get a key?) is left to a manual SQL
insert or to a follow-up admin-token gate in T5.6b. Production
deployments should front this router with oauth2-proxy (T5.6c) or a
similar trusted layer.
- class protea.api.routers.auth_api_keys.CreateApiKeyRequest(*, name: Annotated[str, MinLen(min_length=1), MaxLen(max_length=255)])¶
Bases: BaseModel
Body for POST /auth/api-keys.
The caller only chooses the human-readable label; entropy is generated server-side so we never trust a client-supplied value.
- model_config = {'extra': 'forbid', 'json_schema_extra': {'example': {'name': 'lab-runner-2026-05'}}}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str¶
- protea.api.routers.auth_api_keys.create_api_key(request: Request, response: Response, body: CreateApiKeyRequest, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Create a fresh API key and return its raw value once.
Response shape:
{
"id": "<uuid>",
"prefix": "abc12345",
"name": "lab-runner-2026-05",
"key": "<the only chance to copy this>",
"created_at": "..."
}
The raw key field is the value the caller should store in their secret manager / CI. PROTEA stores only the sha256 hash + the 8-char prefix; we cannot recover the value if it is lost (just mint another and revoke the misplaced one).
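The storage rule above (keep only a sha256 hash plus an 8-character prefix) can be sketched like this. The function names, the 32-byte entropy size, and taking the prefix from the start of the raw key are assumptions for illustration; the source only states that entropy is server-generated and that the hash and prefix are what get stored.

```python
import hashlib
import hmac
import secrets
from typing import Tuple

PREFIX_LEN = 8  # the "8-char prefix" from the text


def mint_api_key() -> Tuple[str, str, str]:
    """Return (raw_key, prefix, sha256_hex); only the last two are stored."""
    raw = secrets.token_urlsafe(32)  # assumed entropy size
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
    return raw, raw[:PREFIX_LEN], digest  # assumption: prefix = first 8 chars


def verify_api_key(presented: str, stored_hash: str) -> bool:
    """Hash the presented key and compare it to the stored digest."""
    digest = hashlib.sha256(presented.encode("utf-8")).hexdigest()
    return hmac.compare_digest(digest, stored_hash)
```

Because only the digest is kept, a lost key cannot be recovered, which is why the response shows the raw value exactly once.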
- protea.api.routers.auth_api_keys.list_api_keys(include_revoked: bool = Query(False), limit: int = Query(50), factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) list[dict[str, Any]]¶
Return registered API keys newest-first.
The response never includes the secret — only the prefix and the name. Use this endpoint to confirm a key was created or to look up the id of a key you want to revoke.
- protea.api.routers.auth_api_keys.revoke_api_key(key_id: UUID, factory: sessionmaker[Session] = Depends(dependency=<function get_session_factory>, use_cache=True, scope=None)) dict[str, Any]¶
Mark an API key as revoked.
Sets revoked_at to the current UTC timestamp. Subsequent uses of the key are rejected by require_api_key() with a 401. The row is preserved (not deleted) so the audit trail of historical access stays intact.
Request caching and rate limiting
protea.api.cache provides in-process caching utilities for expensive
read-only endpoints (showcase statistics, benchmark matrix). Results are
stored with a configurable TTL, reducing redundant database queries on
frequently-polled pages.
Tiny in-process TTL cache for aggregate API endpoints.
Built for stats/listing endpoints that run DISTINCT-over-JOIN queries on 10M+ row tables: queries that are structurally slow (tens of seconds) and whose results change slowly enough that a 5-minute TTL is not user-visible.
Process-local by design: resets on uvicorn restart, does not need Redis, does not leak across workers. Good enough for a single-instance deployment.
- protea.api.cache.cached(key: str, ttl: float, producer: Callable[[], Any], *, serve_stale_on_error: bool = False) Any¶
Return producer() result, cached under key for ttl seconds.
When serve_stale_on_error is true and the producer raises while a prior value is still in the store (even if expired), return the stale value instead of propagating; lets cold-cache hangs degrade to a slightly out-of-date payload rather than a 500.
- protea.api.cache.get_last_known(key: str) Any | None¶
Return the last cached value for key, ignoring TTL; None if absent.
- protea.api.cache.invalidate(key: str | None = None) None¶
Drop a single key, or the whole cache when key is None.
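The three functions documented above can be approximated in a few lines. The following is a hypothetical re-implementation that follows the documented signatures and semantics; the internal store layout, the lock, and the use of time.monotonic are assumptions, not details taken from protea.api.cache.

```python
import threading
import time
from typing import Any, Callable, Dict, Optional, Tuple

_store: Dict[str, Tuple[float, Any]] = {}  # key -> (expires_at, value)
_lock = threading.Lock()


def cached(key: str, ttl: float, producer: Callable[[], Any], *,
           serve_stale_on_error: bool = False) -> Any:
    """Return producer() result, cached under key for ttl seconds."""
    with _lock:
        entry = _store.get(key)
    if entry is not None and time.monotonic() < entry[0]:
        return entry[1]  # fresh hit
    try:
        value = producer()  # run outside the lock: it may take seconds
    except Exception:
        if serve_stale_on_error and entry is not None:
            return entry[1]  # expired but better than a 500
        raise
    with _lock:
        _store[key] = (time.monotonic() + ttl, value)
    return value


def get_last_known(key: str) -> Optional[Any]:
    """Return the last stored value for key, ignoring TTL."""
    with _lock:
        entry = _store.get(key)
    return None if entry is None else entry[1]


def invalidate(key: Optional[str] = None) -> None:
    """Drop one key, or everything when key is None."""
    with _lock:
        if key is None:
            _store.clear()
        else:
            _store.pop(key, None)
```

Expired entries are deliberately left in the store until overwritten; that is what makes the stale fallback and get_last_known possible.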
protea.api.rate_limit configures the slowapi limiter and exposes
the per-principal rate-limit rules applied to the five write routes
protected by authentication (POST /jobs, POST /datasets,
POST /datasets/import-by-reference, POST /reranker-models/import,
POST /reranker-models/import-by-reference).
Per-endpoint rate limiting via slowapi (T5.6b).
Three POSTs are throttled out of the box:
POST /jobs: 10/min, env PROTEA_RATELIMIT_JOBS
POST /auth/api-keys: 5/hour, env PROTEA_RATELIMIT_API_KEYS
POST /datasets: 5/min, env PROTEA_RATELIMIT_DATASETS
Environment aware rate limiting¶
In test/dev environments (PROTEA_ENVIRONMENT=test|dev), rate limits are
effectively disabled (set to 9999/hour) to allow integration tests and local
iteration without hitting quota walls. Production deployments should leave
PROTEA_ENVIRONMENT unset or explicitly set it to “production”.
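One way the environment-aware resolution could look is sketched below. It is an assumption-laden illustration: the helper resolve_limit is hypothetical, the default is written in the "10/minute" string form that slowapi accepts, and letting the test/dev relaxation take precedence over an explicit PROTEA_RATELIMIT_JOBS override is a guess rather than documented behaviour.

```python
import os
from typing import Mapping, Optional

RELAXED_LIMIT = "9999/hour"            # effectively disabled, per the text
RELAXED_ENVIRONMENTS = {"test", "dev"}


def resolve_limit(env_var: str, default: str,
                  environ: Optional[Mapping[str, str]] = None) -> str:
    """Pick the limit string for one route (hypothetical helper)."""
    env = os.environ if environ is None else environ
    environment = env.get("PROTEA_ENVIRONMENT", "").strip().lower()
    if environment in RELAXED_ENVIRONMENTS:
        return RELAXED_LIMIT  # assumption: relaxation wins over overrides
    return env.get(env_var) or default


def jobs_limit(environ: Optional[Mapping[str, str]] = None) -> str:
    return resolve_limit("PROTEA_RATELIMIT_JOBS", "10/minute", environ)
```

Passing the environment as a mapping keeps the function testable without mutating os.environ.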
Key function¶
Every request is bucketed by:
The ApiKey.prefix if the caller authenticated with an API key (slowapi runs after the dep would store it on request.state).
The sub of the Bearer JWT if authenticated that way.
The remote IP otherwise (unauthenticated requests still get a bucket so a flood of 401s does not amplify into an unbounded workload).
This keeps the buckets attributable: one misbehaving CI job does not collide with another team’s quota.
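The bucketing order above could be expressed as a key function along these lines. The attribute names request.state.api_key and request.state.bearer_principal are hypothetical (the source only says the dependency stores the principal on request.state); request.client.host is the usual Starlette way to read the remote address.

```python
from typing import Any


def rate_limit_key(request: Any) -> str:
    """Bucket by API-key prefix, then JWT sub, then remote IP."""
    state = getattr(request, "state", None)
    api_key = getattr(state, "api_key", None)            # hypothetical name
    if api_key is not None:
        return f"apikey:{api_key.prefix}"
    principal = getattr(state, "bearer_principal", None)  # hypothetical name
    if principal is not None:
        return f"sub:{principal.sub}"
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return f"ip:{host or 'unknown'}"
```

Prefixing each bucket with its kind avoids collisions between, say, a key prefix and a subject that happen to share the same string.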
On 429¶
slowapi raises RateLimitExceeded which we map to a 429 problem
response carrying Retry-After. The body inherits the same
application/problem+json shape as the rest of the API.
- protea.api.rate_limit.api_keys_limit() str¶
- protea.api.rate_limit.datasets_limit() str¶
- protea.api.rate_limit.install_rate_limiter(app: FastAPI) None¶
Wire the limiter + middleware + custom handler onto app.
- protea.api.rate_limit.jobs_limit() str¶
Shared dependencies and error handling
protea.api.deps provides FastAPI Depends callables shared across
multiple routers: database session injection, current-user extraction,
and pagination helpers.
Shared FastAPI dependency functions for all routers.
- protea.api.deps.get_amqp_url(request: Request) str¶
- protea.api.deps.get_artifacts_dir(request: Request) Path¶
- protea.api.deps.get_benchmark_config(request: Request) BenchmarkConfig¶
- protea.api.deps.get_operation_registry(request: Request) OperationRegistry¶
- protea.api.deps.get_session_factory(request: Request) sessionmaker[Session]¶
- protea.api.deps.get_settings(request: Request) Settings¶
Return the application-level Settings from app state.
- protea.api.deps.get_user_quota_per_day(request: Request) dict[str, int]¶
Return the per-user daily quota limit map from app state (FARM-AUTH.7).
protea.api.problem_details implements RFC 7807
application/problem+json error serialisation. Every exception handler
in the application calls into this module to produce a consistent
{"type", "title", "status", "detail", "instance"} body. Validation
errors carry an additional errors array with the offending field paths.
RFC 7807 application/problem+json error responses (T4.4).
Installs FastAPI exception handlers that convert the framework’s default
JSON error bodies ({"detail": ...}) into the canonical RFC 7807 shape
used by every modern HTTP API. Existing route code keeps raising
HTTPException exactly as before — this module only changes how the
responses look on the wire.
RFC 7807 fields¶
type: URI reference identifying the problem class. We use relative paths under /problems/{slug} so the docs site can host human-readable descriptions per slug.
title: short, human-readable summary, stable across responses of the same type.
status: HTTP status code (mirrors the response status).
detail: long-form explanation specific to this occurrence.
instance: relative URI of the request that produced the problem.
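To make the field list concrete, here is a small sketch that assembles such a body as a plain dictionary. Deriving the slug and title from the standard HTTP reason phrase is an assumption that happens to reproduce the documented /problems/not-found example; the real module may map status codes to slugs differently, and problem_body is a hypothetical name.

```python
from http import HTTPStatus
from typing import Any, Dict, List, Optional


def problem_body(status: int, detail: Optional[str] = None,
                 instance: Optional[str] = None,
                 errors: Optional[List[Any]] = None) -> Dict[str, Any]:
    """Build an RFC 7807 style payload (hypothetical helper)."""
    phrase = HTTPStatus(status).phrase          # e.g. "Not Found"
    body: Dict[str, Any] = {
        "type": "/problems/" + phrase.lower().replace(" ", "-"),
        "title": phrase,
        "status": status,
    }
    if detail is not None:
        body["detail"] = detail
    if instance is not None:
        body["instance"] = instance
    if errors is not None:
        body["errors"] = errors                 # validation failures only
    return body
```

A handler would serialise this dictionary with the application/problem+json media type.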
- class protea.api.problem_details.ProblemDetail(*, type: str, title: str, status: Annotated[int, Ge(ge=100), Le(le=599)], detail: str | None = None, instance: str | None = None)¶
Bases: BaseModel
Pydantic model for an RFC 7807 problem-details payload.
- detail: str | None¶
- instance: str | None¶
- model_config = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- status: int¶
- title: str¶
- type: str¶
- protea.api.problem_details.install_problem_openapi_schema(app: FastAPI) None¶
Document RFC 7807 as the default 4xx/5xx schema in the OpenAPI spec.
FastAPI’s stock OpenAPI generator points 4xx/5xx responses at application/json with HTTPValidationError (or nothing). This hook patches every operation so every 4xx/5xx response advertises application/problem+json referencing ProblemDetail, matching what the runtime handlers (installed by register_problem_handlers()) actually emit.
The hook lives behind app.openapi (FastAPI’s documented override point) so re-runs return the cached schema. Idempotent: an existing application/problem+json content entry on a given operation is left in place so endpoints can still document a more specific error shape later.
- protea.api.problem_details.register_problem_handlers(app: FastAPI) None¶
Install RFC 7807 handlers for the standard FastAPI error paths.
Three handlers cover every framework-emitted error:
StarletteHTTPException: covers FastAPI’s HTTPException (which subclasses Starlette’s). Handles 4xx/5xx raised from route bodies + dependency callables.
RequestValidationError: Pydantic body / query / path validation failures (HTTP 422). The original FastAPI body lives under errors so clients can still drill into the per-field details.
Exception: catch-all so unhandled crashes still produce a structured 500 instead of an HTML traceback. detail is intentionally generic: the full traceback is logged via the framework’s normal error path; the wire surface stays opaque.
Middleware
protea.api.middleware.visitor_counter is the ASGI middleware that
logs one VisitorEvent row per HTTP GET to a non-asset path. It
extracts the client IP, combines it with a daily salt, and stores the
first 16 hex characters of the resulting SHA-256 hash.
Anonymous visitor counting middleware.
Records one row per user-visible request into the visitor_event table so
that Grafana (or any SQL client) can compute “unique visitors per day” and
similar aggregate traffic metrics without storing IP addresses or using
cookies.
Privacy design¶
The client IP is never persisted. Instead, we compute a short hash:
visitor_hash = sha256(daily_salt || client_ip)[:16]
where daily_salt is a 32-byte random value held only in process memory
and rotated on every calendar day (UTC). When the day rolls over the old
salt is discarded, so cross-day correlation becomes cryptographically
infeasible — the same “rotating salt” approach used by Plausible and Fathom.
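The rotating-salt scheme can be sketched as follows. The DailySalt class and its today parameter are hypothetical conveniences for illustration; the hash itself follows the formula given above, truncating the hex digest to 16 characters.

```python
import hashlib
import secrets
from datetime import date, datetime, timezone
from typing import Optional


class DailySalt:
    """In-memory salt that is regenerated when the UTC day changes."""

    def __init__(self) -> None:
        self._day: Optional[date] = None
        self._salt = b""

    def current(self, today: Optional[date] = None) -> bytes:
        today = today or datetime.now(timezone.utc).date()
        if today != self._day:
            # The previous salt is overwritten, never persisted.
            self._day = today
            self._salt = secrets.token_bytes(32)
        return self._salt


def visitor_hash(daily_salt: bytes, client_ip: str) -> str:
    """sha256(daily_salt || client_ip), first 16 hex characters."""
    digest = hashlib.sha256(daily_salt + client_ip.encode("utf-8"))
    return digest.hexdigest()[:16]
```

Within one day the same address maps to the same hash, so unique-visitor counts work; once the salt rotates, the old and new hashes for an address can no longer be linked.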
Noise filters¶
The middleware is deliberately narrow in scope: it only counts requests that
represent actual user navigation. It skips assets, polling endpoints, health
probes and metrics scrapes. See _should_record.
- class protea.api.middleware.visitor_counter.VisitorCounterMiddleware(app: Callable[[MutableMapping[str, Any], Callable[[], Awaitable[MutableMapping[str, Any]]], Callable[[MutableMapping[str, Any]], Awaitable[None]]], Awaitable[None]])¶
Bases:
BaseHTTPMiddlewareWrites one
VisitorEventrow per recorded request.The session factory is read from
app.state.session_factory— set bycreate_app()at startup. If the factory isn’t present (e.g. during tests that instantiate a bare FastAPI app), the middleware degrades to a no-op so it never breaks the request.- async dispatch(request, call_next)¶
Metrics router
The /metrics router exposes Prometheus-compatible scrape metrics for
the API process. Response time histograms, active-connection gauges, and
job-state counters are surfaced at GET /metrics.
Prometheus scrape endpoint (T5.2).
Exposes GET /metrics returning the standard Prometheus text-based
exposition format. The collector registry is built once in
protea.api.app.create_app() and stashed on app.state.metrics
so requests do not pay the registration cost on every scrape.
T5.2 scope is intentionally narrow: the endpoint is always served (so Prometheus can be wired up at deploy time without flipping any feature flag), and the five baseline metrics are registered up-front so they appear in the output even before any sample has been observed. Call sites that increment counters / observe histograms land in follow-up slices.
The protea_db_pool_in_use gauge is refreshed on each scrape by
reading the SQLAlchemy pool’s checkedout() count from the session
factory’s bound engine. This keeps the gauge accurate without needing
event-listener wiring, at the cost of one cheap method call per
scrape (typically every 15s).
- protea.api.routers.metrics.get_metrics(request: Request) Response¶
Render the live Prometheus exposition payload.
Returns 503 when the API was booted without the prometheus_client dependency (a minimal worker image, for example). This keeps the endpoint shape stable for Prometheus scrapers, which retry on 5xx, instead of leaking an import error.
Authentication and rate limits¶
Five POST routes require a credential (T5.6a + T5.6b):
POST /v1/jobs
POST /v1/datasets
POST /v1/datasets/import-by-reference
POST /v1/reranker-models/import
POST /v1/reranker-models/import-by-reference
Three header forms are accepted, any one of which satisfies the gate:
Authorization: ApiKey <raw_key>
X-Api-Key: <raw_key>
Authorization: Bearer <jwt>
The API key path uses protea.api.auth.require_api_key_or_bearer()
(sha256 hash verification). The Bearer path uses HS256 with the
PROTEA_JWT_SECRET env var; minimum token claims are sub,
iat, and exp. A missing or invalid credential returns 401 with
WWW-Authenticate: ApiKey, Bearer. Rate limits on these routes are
enforced by slowapi per principal (API-key prefix or JWT sub);
exceeding the limit returns 429 with a Retry-After header.
See Authentication for the complete auth and rate-limit
reference, and Configuration Reference for the
PROTEA_AUTHN_REQUIRED, PROTEA_JWT_SECRET, and
PROTEA_RATELIMIT_* knobs.
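From the client side, the accepted header forms might be used as in the sketch below, which only builds the request object and does not send it. The helper names and the base URL are placeholders; preferring the Bearer header when both credentials are supplied mirrors the server-side precedence but is a choice made here for illustration.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def auth_headers(*, api_key: Optional[str] = None,
                 jwt: Optional[str] = None) -> Dict[str, str]:
    """Pick one of the accepted credential headers (hypothetical helper)."""
    if jwt:
        return {"Authorization": f"Bearer {jwt}"}
    if api_key:
        return {"X-Api-Key": api_key}
    raise ValueError("provide api_key or jwt")


def build_job_request(base_url: str, body: Dict[str, Any],
                      **credentials: Optional[str]) -> urllib.request.Request:
    """Prepare an authenticated POST /v1/jobs request."""
    headers = {"Content-Type": "application/json"}
    headers.update(auth_headers(**credentials))
    return urllib.request.Request(
        f"{base_url}/v1/jobs",
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )
```

A caller would pass the result to urllib.request.urlopen and, on a 429 response, wait for the number of seconds given in Retry-After before retrying.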
Endpoints summary¶
| Method | Path | Description |
|---|---|---|
| Health | | |
| | | Liveness probe: returns 200 if the API process is up. |
| | | Readiness probe: verifies database and RabbitMQ connections. |
| Jobs | | |
| | | Create a job and publish its UUID to RabbitMQ. |
| | | List jobs; filter by |
| | | Retrieve a single job with full payload and meta. |
| | | Retrieve the event timeline for a job (up to 2 000 events). Cursor pagination (T4.2): pass |
| | | Transition a |
| | | Delete a job that is not in |
| | | Append a |
| | | List the |
| Proteins | | |
| | | Aggregate protein statistics (total, canonical, reviewed, organisms). |
| | | List proteins with pagination; filter by |
| | | Retrieve a single protein with its UniProt metadata. |
| | | List GO annotations for a protein across all annotation sets. |
| Annotations | | |
| | | List ontology snapshots with GO term counts per aspect. |
| | | Retrieve a snapshot with its full list of GO terms. |
| | | Set the Information Accretion (IA) file URL on an ontology snapshot. |
| | | Queue a |
| | | BFS ancestor subgraph for a given set of GO term IDs. |
| | | List annotation sets with protein GO annotation counts. |
| | | Retrieve a single annotation set with summary statistics. |
| | | Delete an annotation set and all its annotations. |
| | | Queue a |
| | | Queue a |
| | | Queue a |
| | | List evaluation sets with summary statistics. |
| | | Get evaluation set details. |
| | | Delete an evaluation set. |
| | | Download NK ground truth in CAFA format. |
| | | Download LK ground truth in CAFA format. |
| | | Download PK ground truth in CAFA format. |
| | | Download known terms from old annotation set (for PK evaluation). |
| | | Download delta proteins as FASTA. |
| | | Queue a |
| | | List evaluation results for an evaluation set. |
| | | Download evaluation metrics as TSV. |
| | | Download all cafaeval artifacts as a zip. |
| | | Delete an evaluation result. |
| Embeddings | | |
| | | List all embedding configurations. |
| | | Create a new (immutable) embedding configuration. |
| | | Retrieve an embedding configuration by UUID. |
| | | Delete an embedding configuration. |
| | | Queue a |
| | | List prediction sets with entry counts. |
| | | Retrieve a prediction set with summary statistics. |
| | | List proteins in a prediction set. |
| | | Get predictions for one protein. |
| | | GO term distribution in a prediction set. |
| | | Stream all predictions as TSV (filtered by accession / aspect / distance). |
| | | Download predictions in CAFA submission format. |
| | | Delete a prediction set. |
| Scoring | | |
| | | List scoring configurations. |
| | | Create a scoring configuration. |
| | | Create preset scoring configurations. |
| | | Retrieve a scoring configuration. |
| | | Delete a scoring configuration. |
| | | Stream scored predictions as TSV. |
| | | Compute CAFA-style metrics for scored predictions. |
| | | Export labeled training data for the re-ranker. |
| | | List all trained re-ranker models. |
| | | Retrieve a re-ranker model’s metadata, metrics, and feature importance. |
| | | Delete a trained re-ranker model. |
| | | Apply a re-ranker to a prediction set and stream re-scored TSV. |
| | | Compute CAFA Fmax and AUC-PR using re-ranker scores. |
| Query Sets | | |
| | | Upload a FASTA file and create a |
| | | List all query sets with entry counts. |
| | | Retrieve a query set with its full entry list. |
| | | Delete a query set and all its entries. |
| Annotate | | |
| | | One-click annotation: upload FASTA, auto-run the full pipeline. |
| Maintenance | | |
| | | Count orphan sequences (preview). |
| | | Delete orphan sequences. |
| | | Count unindexed embeddings (preview). |
| | | Delete unindexed embeddings. |
| Admin | | |
| | | Drop and recreate the public schema (requires admin token). |
| Showcase | | |
| | | Platform statistics and best evaluation results. |
| Support | | |
| | | Total thumbs-up count and recent comments. |
| | | Submit a thumbs-up with optional comment. |
| Benchmark | | |
| | | List embedding configs with persisted display metadata. |
| | | Per-embedding / per-stage Fmax matrix across all evaluation results. |
| Datasets | | |
| | | Enqueue an |
| | | Register a |
| | | List registered re-ranker datasets. Cursor pagination (T4.2): pass |
| | | Get a dataset by id or name. |
| Plugin Registry | | |
| | | List installed embedding-backend plugins. |
| | | List installed annotation-source plugins. |
| | | List installed experiment-runner plugins. |
| Reranker Models | | |
| | | Import a lab-trained booster (multipart). |
| | | Import a booster already uploaded to the artifact store (JSON). |
| Stack | | |
| | | Return the eight-repo PROTEA stack registry. |
| | | Aggregate open pull requests across every repo in the stack. |
| Experiment Runs | | |
| | | Create an |
| | | List experiment runs newest-first; filter by |
| | | Retrieve one experiment run. |
| | | Partial update (T4.9). Status transitions stamp |
| | | Delete an experiment run (returns 204). |
Request body for POST /jobs¶
The operation and queue_name fields are required. payload is
passed verbatim to the operation’s execute method after Pydantic
validation; its schema depends on the operation. meta is stored on
the Job row and never interpreted by the API. description and
tags are optional D11 narrative fields surfaced on the
GET /jobs and GET /jobs/{id} responses; they let any caller
attach human intent and ad-hoc grouping tokens at submission time
without round-tripping through a separate metadata endpoint.
{
"operation": "insert_proteins",
"queue_name": "protea.jobs",
"payload": {
"search_criteria": "reviewed:true AND organism_id:9606"
},
"meta": {},
"description": "Backfill reviewed Swiss-Prot for benchmark_v1",
"tags": ["ablation", "benchmark_v1"]
}
Common payload examples by operation:
{ "operation": "fetch_uniprot_metadata", "queue_name": "protea.jobs",
"payload": { "search_criteria": "reviewed:true AND organism_id:9606" } }
{ "operation": "compute_embeddings", "queue_name": "protea.embeddings",
"payload": { "embedding_config_id": "<uuid>", "sequences_per_job": 64 } }
{ "operation": "predict_go_terms", "queue_name": "protea.predictions",
"payload": {
"embedding_config_id": "<uuid>",
"annotation_set_id": "<uuid>",
"ontology_snapshot_id": "<uuid>",
"query_set_id": "<uuid>",
"limit_per_entry": 5
}
}
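A client can mirror the strict request contract before sending anything. The sketch below is a hypothetical client-side helper, not part of PROTEA: it takes the six field names from the description of the POST /jobs body above, rejects unknown keys in the same spirit as the server's extra="forbid" models, and returns the JSON text to post.

```python
import json
from typing import Any

ALLOWED_FIELDS = {"operation", "queue_name", "payload", "meta",
                  "description", "tags"}
REQUIRED_FIELDS = {"operation", "queue_name"}


def job_request_body(**fields: Any) -> str:
    """Validate field names locally and serialise the POST /jobs body."""
    unknown = set(fields) - ALLOWED_FIELDS
    if unknown:
        # Catches typos such as "oepration" before the server returns 422.
        raise ValueError(f"unknown field(s): {sorted(unknown)}")
    missing = REQUIRED_FIELDS - set(fields)
    if missing:
        raise ValueError(f"missing required field(s): {sorted(missing)}")
    return json.dumps(fields, sort_keys=True)


body = job_request_body(
    operation="insert_proteins",
    queue_name="protea.jobs",
    payload={"search_criteria": "reviewed:true AND organism_id:9606"},
    tags=["ablation", "benchmark_v1"],
)
```

The server remains the authority on the schema; the local check only shortens the feedback loop for obvious mistakes.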
See also
Operations: every operation referenced in a payload, with field-level documentation.
How-to Guides: concrete curl recipes that submit each endpoint end-to-end.
Job Lifecycle: how the API turns a request into a persistent Job row and a queue message.