Services
The protea.services package contains business-logic modules that
routers delegate to. Services are pure Python: they accept a SQLAlchemy
session and return domain objects or raise domain exceptions. Routers
map those exceptions to HTTP status codes. This separation allows the
same logic to be exercised from CLI tools or batch scripts without
importing FastAPI.
Public service modules
Jobs service: shared helpers for the queue-dispatch pattern.
Multiple routers (annotations, embeddings, predictions) follow the same shape when an HTTP endpoint queues background work:
1. Pydantic-validate the request body.
2. Insert a Job row with the canonical operation name + queue.
3. Insert a matching JobEvent (job.created) for the audit log.
4. Publish to RabbitMQ via protea.infrastructure.queue.publisher.publish_job().
5. Return {"id": ..., "status": "queued"}.
This module exposes enqueue_job() (steps 2–3) and the higher-level dispatch_validated_job() (steps 1–5) so routers collapse to a single try/except.
- exception protea.services.jobs_service.InvalidJobPayloadError(errors: Any)¶
Bases: Exception
Pydantic validation failed for a queue-dispatch request body.
Carries the structured errors list produced by Pydantic so the router can pass it through verbatim as the HTTP 422 detail.
- protea.services.jobs_service.compute_dedup_key(operation: str, payload: dict[str, Any]) str¶
Return a 16-hex-char deduplication key for (operation, payload).
The key is the first 16 hex digits of the SHA-256 of a canonical JSON serialisation of {"operation": ..., "payload": ...} (keys sorted, ASCII-safe). Truncating to 16 chars (a 64-bit prefix) keeps the collision probability negligible for the expected job volume.
The 16-char length fits comfortably in VARCHAR(64) and keeps the partial unique index compact.
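The derivation described above can be sketched with the standard library alone. This is a minimal sketch, assuming the canonical form comes from json.dumps with sort_keys=True, ensure_ascii=True and default separators; the real function may canonicalise differently, so keys produced here are not guaranteed to match those stored by PROTEA. The name compute_dedup_key_sketch is hypothetical.

```python
import hashlib
import json
from typing import Any


def compute_dedup_key_sketch(operation: str, payload: dict[str, Any]) -> str:
    """Return the first 16 hex digits of SHA-256 over a canonical JSON form.

    Assumption: sorted keys + ASCII-safe output with default separators.
    """
    canonical = json.dumps(
        {"operation": operation, "payload": payload},
        sort_keys=True,      # key order must not influence the hash
        ensure_ascii=True,   # non-ASCII characters become \uXXXX escapes
    )
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return digest[:16]       # 16 hex chars = 64-bit prefix
```

Because the keys are sorted before hashing, two payloads that differ only in dict insertion order produce the same key.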
- protea.services.jobs_service.dispatch_validated_job(factory: sessionmaker[Session], amqp_url: str, body: dict[str, Any], payload_model: type[BaseModel], *, operation: str, queue_name: str) dict[str, Any]¶
End-to-end queue dispatch: validate, persist, publish, respond.
Pydantic-validates body against payload_model (raising InvalidJobPayloadError on failure for the router to map to a 422), inserts a Job + JobEvent pair inside a fresh session, then publishes to RabbitMQ. Returns the canonical {"id": <uuid>, "status": "queued"} response shape used by every dispatch endpoint.
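The router-side "single try/except" can be illustrated without FastAPI, RabbitMQ, or a database. This is a sketch only: InvalidJobPayloadError here is a local stand-in for the service exception, fake_dispatch is a hypothetical replacement for dispatch_validated_job(), the "accession" field is invented for the example, and the 202 success status is an assumption (the source only specifies the 422 mapping and the response body shape).

```python
from typing import Any


class InvalidJobPayloadError(Exception):
    """Local stand-in for protea.services.jobs_service.InvalidJobPayloadError."""

    def __init__(self, errors: Any) -> None:
        super().__init__("invalid job payload")
        self.errors = errors


def fake_dispatch(body: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for dispatch_validated_job(): validate, then 'queue'."""
    if "accession" not in body:  # invented validation rule for the example
        raise InvalidJobPayloadError([{"loc": ["accession"], "msg": "Field required"}])
    return {"id": "00000000-0000-0000-0000-000000000000", "status": "queued"}


def handle_request(body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Router-shaped wrapper returning (status_code, response_body)."""
    try:
        return 202, fake_dispatch(body)  # success status is an assumption
    except InvalidJobPayloadError as exc:
        # The structured error list is passed through verbatim as the 422 detail.
        return 422, {"detail": exc.errors}
```

The point of the shape is that the router owns only the exception-to-status mapping; everything else stays in the service.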
- protea.services.jobs_service.enqueue_job(session: Session, *, operation: str, queue_name: str, payload: dict[str, Any]) UUID¶
Insert Job + JobEvent rows for a background task.
Returns the new job’s UUID. The caller is responsible for publishing to the queue (via protea.infrastructure.queue.publisher.publish_job()) after committing this session, and for any payload validation that should happen before the row hits the database.
Both rows are flushed but not committed; the caller’s session_scope context manager owns the transaction.
Annotations service: pure-logic helpers extracted from protea.api.routers.annotations.
ORM ↔ dict serialisers and the read-side handlers (snapshot/IA-url operations) live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.
The router translates the domain exceptions raised here to HTTP responses:
EntityNotFoundError → 404 Not Found (e.g. an OntologySnapshot or AnnotationSet UUID does not resolve).
- exception protea.services.annotations_service.AnnotationSetReferencedError¶
Bases: AnnotationsServiceError
An AnnotationSet cannot be deleted because PredictionSet rows still reference it; the FK CASCADE is intentionally absent. Maps to HTTP 409 at the router boundary.
- exception protea.services.annotations_service.AnnotationsServiceError¶
Bases: Exception
Base class for annotations-service domain errors.
- exception protea.services.annotations_service.EntityNotFoundError(entity: str, entity_id: UUID)¶
Bases: AnnotationsServiceError
Generic 404; a referenced entity does not exist.
Pickle-safe via __reduce__ so the structured entity / entity_id attrs survive a round-trip without tripping flake8-bugbear B042.
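Why __reduce__ matters: by default an exception is rebuilt from its args tuple, which here would hold only the formatted message, so a two-argument constructor would fail on unpickling. The following is a minimal sketch of the pattern, not the project's class; the name EntityNotFoundSketch is hypothetical and the message format follows the "<entity> not found" wording documented for the embeddings variant.

```python
import pickle
from uuid import UUID


class EntityNotFoundSketch(Exception):
    """Illustrative exception with structured attributes."""

    def __init__(self, entity: str, entity_id: UUID) -> None:
        super().__init__(f"{entity} not found")
        self.entity = entity
        self.entity_id = entity_id

    def __reduce__(self):
        # Rebuild from the original constructor arguments instead of
        # self.args, which contains only the formatted message.
        return (type(self), (self.entity, self.entity_id))


def round_trip(err: EntityNotFoundSketch) -> EntityNotFoundSketch:
    """Serialise and restore the exception, e.g. across a process boundary."""
    return pickle.loads(pickle.dumps(err))
```

After round_trip(), both entity and entity_id are still available on the restored exception, which is what lets a worker process report a structured 404 back to its caller.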
- protea.services.annotations_service.annotation_set_to_dict(a: AnnotationSet, count: int) dict[str, Any]¶
Serialise an AnnotationSet to its API dict shape.
- protea.services.annotations_service.delete_annotation_set_data(session: Session, set_id: UUID) dict[str, Any]¶
Delete an annotation set and all its annotations.
Returns the deletion summary dict.
Raises:
EntityNotFoundError if the UUID does not resolve.
AnnotationSetReferencedError if a PredictionSet references this set (router maps to 409).
- protea.services.annotations_service.delete_eval_result_collect_keys(session: Session, eval_id: UUID, result_id: UUID) list[str]¶
Delete the EvaluationResult and return the artifact keys to clean up.
Same split as delete_evaluation_set_collect_keys(): the DB delete happens here; the artifact-store deletion is the router’s responsibility (it owns the ArtifactStore factory).
- protea.services.annotations_service.delete_evaluation_set_collect_keys(session: Session, eval_id: UUID) list[str]¶
Delete the EvaluationSet and return the artifact-store keys to clean.
The DB delete cascades to EvaluationResult rows; this helper walks the results before deleting and returns the union of all artifact keys those rows referenced (per-result cafaeval outputs) so the caller can wipe them from the store. The caller is also expected to delete the set’s ground-truth artifact via protea.core.evaluation.groundtruth_key_for(eval_id); that key is not included here because it is a fixed function of eval_id.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.evaluation_result_to_dict(r: EvaluationResult) dict[str, Any]¶
Serialise an EvaluationResult to its API dict shape.
- protea.services.annotations_service.evaluation_set_to_dict(e: EvaluationSet) dict[str, Any]¶
Serialise an EvaluationSet to its API dict shape.
- protea.services.annotations_service.get_annotation_set_data(session: Session, set_id: UUID) dict[str, Any]¶
Return a single annotation set with its annotation count.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.get_eval_result_with_keys(session: Session, eval_id: UUID, result_id: UUID) tuple[EvaluationResult, list[str]]¶
Fetch an EvaluationResult belonging to eval_id; return (row, artifact_keys).
Raises EntityNotFoundError (“EvaluationResult”) when the result does not exist or does not belong to eval_id.
- protea.services.annotations_service.get_evaluation_set_data(session: Session, eval_id: UUID) dict[str, Any]¶
Return a single evaluation set.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.get_go_subgraph_data(session: Session, snapshot_id: UUID, go_ids: str, depth: int) dict[str, Any]¶
BFS the GO DAG upward from the requested seed terms.
Returns {"nodes": [...], "edges": [...]} ready for the API. Each node has id (DB id), go_id, name, aspect, is_query (True for the seed terms). Each edge has source (child id), target (parent id), relation_type.
Raises EntityNotFoundError when the snapshot does not resolve. The exception class is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
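An upward breadth-first walk of this kind can be sketched over an in-memory parent map. This is an illustration under stated assumptions: the real function queries the database and uses DB ids, whereas this sketch uses GO ids directly; the parents mapping, the bfs_upward name, and the reading of depth as "number of parent levels to climb" are all hypothetical.

```python
from collections import deque


def bfs_upward(
    parents: dict[str, list[tuple[str, str]]],
    seeds: list[str],
    depth: int,
) -> dict[str, list[dict]]:
    """Walk child -> parent edges breadth-first, at most `depth` levels up.

    `parents` maps a term to its (parent_term, relation_type) pairs.
    """
    unique_seeds = list(dict.fromkeys(seeds))  # drop duplicates, keep order
    seen = set(unique_seeds)
    edges: list[dict] = []
    queue = deque((seed, 0) for seed in unique_seeds)
    while queue:
        term, level = queue.popleft()
        if level >= depth:
            continue  # depth budget spent; do not climb further from here
        for parent, relation in parents.get(term, []):
            edges.append({"source": term, "target": parent, "relation_type": relation})
            if parent not in seen:
                seen.add(parent)
                queue.append((parent, level + 1))
    nodes = [{"go_id": t, "is_query": t in unique_seeds} for t in sorted(seen)]
    return {"nodes": nodes, "edges": edges}
```

Each term is enqueued once, so its outgoing edges are emitted once even when several children share the same parent.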
- protea.services.annotations_service.get_snapshot_data(session: Session, snapshot_id: UUID) dict[str, Any]¶
Return a single snapshot with its GO term count.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.annotations_service.iter_delta_proteins_fasta(session: Session, eval_id: UUID, category: str) list[str]¶
Return FASTA lines for delta proteins (nk/lk/pk/all).
Only proteins whose sequence is in the DB are emitted. Header is >ACCESSION entry_name OS=organism OX=taxon (NK|LK|PK); the sequence is wrapped at 60 chars per line.
Empty result returns an empty list. Raises EntityNotFoundError if the EvaluationSet does not resolve. The exception class is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
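The header layout and 60-character wrapping can be sketched as below. The function name and parameters are hypothetical, and whether the real lines carry trailing newlines is not stated in the source; this sketch returns them without.

```python
def fasta_lines_sketch(
    accession: str,
    entry_name: str,
    organism: str,
    taxon_id: str,
    category: str,
    sequence: str,
    width: int = 60,
) -> list[str]:
    """Build one FASTA record: a header line plus the sequence wrapped at `width`."""
    header = f">{accession} {entry_name} OS={organism} OX={taxon_id} ({category.upper()})"
    # Slice the sequence into fixed-width chunks; the last chunk may be shorter.
    body = [sequence[i : i + width] for i in range(0, len(sequence), width)]
    return [header, *body]
```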
- protea.services.annotations_service.iter_groundtruth_tsv(session: Session, eval_id: UUID, category: str) list[str]¶
Return the rows for a CAFA ground_truth_<CATEGORY>.tsv download.
category is "nk", "lk", "pk" or "known". Each row is "<protein>\t<go_id>\n"; sorted by protein then GO id so the output is deterministic. The caller wraps the list in a StreamingResponse (the materialised list is small enough, a few thousand rows for typical CAFA splits, to fit in memory and keeps the streaming generator simple).
Raises EntityNotFoundError when the EvaluationSet does not resolve.
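The row format and ordering can be illustrated with plain tuples. This is a sketch only: the input shape (an iterable of (protein, go_id) pairs) and the function name are assumptions, since the real function derives its rows from the database.

```python
from collections.abc import Iterable


def groundtruth_rows_sketch(pairs: Iterable[tuple[str, str]]) -> list[str]:
    """Format (protein, go_id) pairs as TSV rows, sorted by protein then GO id."""
    # Tuple ordering compares the protein first and the GO id second,
    # which gives the deterministic order described above.
    return [f"{protein}\t{go_id}\n" for protein, go_id in sorted(pairs)]
```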
- protea.services.annotations_service.list_annotation_sets_data(session: Session, source: str | None = None) list[dict[str, Any]]¶
List all annotation sets with their per-set annotation counts (newest first).
Optionally filter by source (e.g. goa or quickgo). Pure read; the caller caches at the API boundary.
- protea.services.annotations_service.list_evaluation_results_data(session: Session, eval_id: UUID) list[dict[str, Any]]¶
List EvaluationResult rows for one EvaluationSet (newest first).
Raises EntityNotFoundError when the EvaluationSet does not resolve.
- protea.services.annotations_service.list_evaluation_sets_data(session: Session) list[dict[str, Any]]¶
List all evaluation sets, newest first.
- protea.services.annotations_service.list_snapshots_data(session: Session) list[dict[str, Any]]¶
Return all loaded snapshots with their GO term counts (newest first).
Pure read; the caller is responsible for caching at the API boundary if desired (the GROUP BY over the multi-million row go_term table is the slow part).
- protea.services.annotations_service.render_evaluation_metrics_tsv(result: EvaluationResult, aspect_codes: tuple[str, ...]) Any¶
Yield TSV rows for the per-(setting, namespace) metrics summary.
The caller passes the aspect-codes tuple (ASPECT_CAFA_CODES) so the service stays free of the domain layer. Returns a generator suitable for StreamingResponse.
- protea.services.annotations_service.set_snapshot_ia_url(session: Session, snapshot_id: UUID, ia_url: str | None) dict[str, Any]¶
Update the IA URL on a snapshot. Empty string is treated as None.
Returns a small confirmation dict shape compatible with the legacy endpoint. Raises EntityNotFoundError for the 404 path. The caller (router) is responsible for validating request body shape (e.g. presence of the ia_url key) before calling.
- protea.services.annotations_service.snapshot_to_dict(s: OntologySnapshot, term_count: int) dict[str, Any]¶
Serialise an OntologySnapshot to its API dict shape.
Embeddings service: pure-logic helpers extracted from protea.api.routers.embeddings.
Validation rules, ORM ↔ dict serialisers, and the predictions-TSV streaming generator live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.
The router translates the domain exceptions raised here to HTTP responses:
InvalidEmbeddingConfigError → 422 Unprocessable Entity (validation errors carry a list of human-readable messages in .errors).
EntityNotFoundError → 404 Not Found (e.g. a PredictionSet UUID does not resolve).
- exception protea.services.embeddings_service.EmbeddingsServiceError
Bases: Exception
Base class for embeddings-service domain errors.
- exception protea.services.embeddings_service.EntityNotFoundError(entity: str, entity_id: UUID)
Bases: EmbeddingsServiceError
Generic 404; a referenced entity does not exist.
Construct with the entity label (e.g. "PredictionSet") and the looked-up UUID; the message becomes "<entity> not found". Pickle-safe via __reduce__ so the structured entity / entity_id attributes survive a round-trip without tripping flake8-bugbear B042.
- exception protea.services.embeddings_service.InvalidEmbeddingConfigError(errors: list[str])
Bases: EmbeddingsServiceError
Validation failure for an EmbeddingConfig request body.
errors carries a list of human-readable messages, one per failed rule, suitable for inclusion in the HTTP 422 response body.
- exception protea.services.embeddings_service.InvalidUUIDFieldError(field: str)
Bases: EmbeddingsServiceError
Predict request body had a field that does not parse as UUID.
Carries the offending field name in field; the router translates this to 422 with detail "<field> must be a valid UUID".
- protea.services.embeddings_service.assert_prediction_set_exists(session: Session, prediction_set_id: UUID) None
Raise EntityNotFoundError if the PredictionSet UUID is unknown.
- protea.services.embeddings_service.config_to_dict(c: EmbeddingConfig, embedding_count: int | None = None) dict[str, Any]
Serialise an EmbeddingConfig ORM row to its API dict shape.
The embedding_count field is only included when the caller has a number to report (the bare GET /configs/{id} endpoint does not).
- protea.services.embeddings_service.delete_embedding_config_cascade(session: Session, config_id: UUID) dict[str, Any]
Cascade-delete an EmbeddingConfig and all linked rows.
Raises EntityNotFoundError when config_id does not resolve. Body lives in _embeddings_admin_helpers.cascade_delete_embedding_config().
- protea.services.embeddings_service.delete_prediction_set_cascade(session: Session, prediction_set_id: UUID) dict[str, Any]
Delete a PredictionSet and all its GOPrediction rows.
Returns {"deleted": <id>, "predictions_deleted": <count>}. Raises EntityNotFoundError when the UUID does not resolve so the router can translate to 404.
- protea.services.embeddings_service.get_go_term_distribution_data(session: Session, *, prediction_set_id: UUID, limit: int = 50) dict[str, Any]
Return the most-frequent GO terms predicted in this set + per-aspect totals.
Raises EntityNotFoundError when the PredictionSet does not resolve. Body lives in _embeddings_admin_helpers.compute_go_term_distribution().
- protea.services.embeddings_service.get_prediction_set_data(session: Session, prediction_set_id: UUID) dict[str, Any]
Retrieve a prediction set with total + per-protein GO term counts.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services.embeddings_service.get_predictions_for_protein(session: Session, *, prediction_set_id: UUID, accession: str) list[dict[str, Any]]
Return all predicted GO terms for one protein, sorted by distance.
Raises EntityNotFoundError when the PredictionSet does not resolve. (No 404 for unknown accession; returns empty list, matching the legacy endpoint’s behaviour.)
- protea.services.embeddings_service.iter_predictions_cafa_tsv(factory: sessionmaker[Session], *, prediction_set_id: UUID, aspect: str | None, max_distance: float | None, delta_proteins: set[str] | None) Iterator[str]
Stream the CAFA-format prediction TSV.
DB-level deduplication: a GROUP BY (protein_accession, go_term_id) + MIN(distance) subquery keeps the best row per pair so the Python side never needs an unbounded seen set; true streaming. Score is max(0.0, 1.0 - distance), clamped to [0, 1].
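The dedup-in-SQL idea can be tried out with the standard-library sqlite3 module. This is a sketch under assumptions, not the project's query: the go_prediction table, its columns, the use of a textual go_id instead of a go_term_id foreign key, and the two-decimal score format are all hypothetical, and the real code runs against Postgres through SQLAlchemy.

```python
import sqlite3
from collections.abc import Iterator


def cafa_score(distance: float) -> float:
    """Convert a distance to a score and clamp it to [0, 1]."""
    return min(1.0, max(0.0, 1.0 - distance))


def iter_best_rows(conn: sqlite3.Connection) -> Iterator[str]:
    """Yield one TSV row per (protein, GO term) pair, keeping the smallest distance.

    The GROUP BY does the deduplication, so no Python-side `seen` set is needed.
    """
    sql = """
        SELECT protein_accession, go_id, MIN(distance) AS best_distance
        FROM go_prediction
        GROUP BY protein_accession, go_id
        ORDER BY protein_accession, go_id
    """
    for accession, go_id, best_distance in conn.execute(sql):
        yield f"{accession}\t{go_id}\t{cafa_score(best_distance):.2f}\n"
```

Because the generator yields rows as the cursor produces them, memory use on the Python side stays flat regardless of how many duplicate predictions the table holds.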
- protea.services.embeddings_service.iter_predictions_tsv(factory: Any, *, prediction_set_id: UUID, accession: str | None = None, aspect: str | None = None, max_distance: float | None = None) Iterator[str]
Yield TSV rows (as str) of every GOPrediction in a set.
Opens its own session inside the generator so the caller’s existence-check session can close cleanly. The first yielded chunk is the header line; one row per (GOPrediction, GOTerm) pair follows, ordered by (protein_accession, distance).
Optional filters: accession (single query protein), aspect (F/P/C), max_distance.
- protea.services.embeddings_service.list_prediction_sets_data(session: Session) list[dict[str, Any]]
Top 100 most-recent PredictionSet rows joined with their context.
Returns a list of dicts each carrying the embedding-config name, annotation-set label, ontology version, plus the per-set prediction_count. The per-set count comes from a single GROUP BY over GOPrediction (one index-only scan) rather than a correlated subquery; for ~10⁷-row tables Postgres’ planner falls into a per-row index probe with the correlated form (~30s per outer row). The grouped form returns all 100 counts at once.
- protea.services.embeddings_service.list_proteins_in_prediction_set(session: Session, *, prediction_set_id: UUID, search: str | None = None, limit: int = 50, offset: int = 0) dict[str, Any]
Paginated list of proteins in a prediction set with derived stats.
For each row returns go_count (number of predicted terms), min_distance (closest neighbour), annotation_count (known annotations against the same AnnotationSet) and match_count (predictions whose (protein, go_id) is in the known set; a precision proxy).
Decomposed into private helpers (_paginate_protein_rows, _load_protein_orm_map, _load_annotation_counts, _load_match_counts) so this orchestrator stays under the §3 method-LOC ceiling.
Raises EntityNotFoundError (imported lazily to avoid the circular dependency with embeddings_service) when prediction_set_id does not resolve.
- protea.services.embeddings_service.prepare_cafa_export(session: Session, *, prediction_set_id: UUID, eval_id: UUID | None) set[str] | None
Preflight CAFA export: validate the PredictionSet exists and, if an EvaluationSet was supplied, compute the union of NK + LK delta proteins to restrict the export.
Returns the delta-protein accession set when eval_id is provided (the streaming generator filters on it), otherwise None.
Raises EntityNotFoundError for missing PredictionSet or EvaluationSet so the router can translate to 404.
- protea.services.embeddings_service.validate_embedding_config_body(body: dict[str, Any]) dict[str, Any]
Validate a request body for POST /embeddings/configs.
Returns the canonicalised dict (defaults filled in) on success. Raises InvalidEmbeddingConfigError (imported lazily to avoid the circular dep with embeddings_service) with the full list of failures otherwise; the router translates that to a 422 with the same shape it produced before extraction.
Decomposed into per-field-group helpers so neither this orchestrator nor any helper breaches the 60-LOC method ceiling.
- protea.services.embeddings_service.validate_predict_request(session: Session, body: dict[str, Any]) dict[str, UUID]
Parse + validate the three required UUID fields of a predict request.
Returns a dict mapping field name to its parsed uuid.UUID. Raises InvalidUUIDFieldError for parse failures (router → 422) or EntityNotFoundError if a referenced entity does not exist (router → 404). Field order is preserved so the first failure wins, matching the previous in-router behaviour.
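The "first failure wins" parsing step can be sketched as follows. The three field names are hypothetical (the source does not list them), InvalidUUIDFieldSketch is a local stand-in for the service exception, and the entity-existence lookups that lead to EntityNotFoundError are omitted because they need a database session.

```python
from typing import Any
from uuid import UUID

# Hypothetical field names; the source only says there are three required UUID fields.
REQUIRED_FIELDS = ("embedding_config_id", "annotation_set_id", "ontology_snapshot_id")


class InvalidUUIDFieldSketch(Exception):
    """Local stand-in for InvalidUUIDFieldError; carries the offending field name."""

    def __init__(self, field: str) -> None:
        super().__init__(f"{field} must be a valid UUID")
        self.field = field


def parse_uuid_fields(
    body: dict[str, Any],
    fields: tuple[str, ...] = REQUIRED_FIELDS,
) -> dict[str, UUID]:
    """Parse each field in declaration order; stop at the first bad value."""
    parsed: dict[str, UUID] = {}
    for name in fields:
        try:
            # str() turns a missing value (None) into "None", which fails to parse.
            parsed[name] = UUID(str(body.get(name)))
        except ValueError as exc:
            raise InvalidUUIDFieldSketch(name) from exc
    return parsed
```

Iterating a fixed tuple rather than the request dict is what makes the reported field deterministic when several values are malformed.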
Internal helper modules
The following modules are internal helpers that implement specific phases of each service. They are documented here for completeness but are not intended to be called directly by routers or external code.
Annotations service helpers
EvaluationSet + EvaluationResult helpers for annotations_service.
The evaluation-side reads (evaluation_set_to_dict,
evaluation_result_to_dict, the list/get/delete handlers) plus the
baseline-scoring auto-attach helper share a divergent-change axis with
the benchmark matrix and the CAFA evaluation pipeline. Snapshot and
annotation-set CRUD evolve independently, so this cluster moves out
into its own sibling module while the parent service keeps the public
import surface via re-export.
Domain exceptions (EntityNotFoundError) are imported lazily inside
the functions that raise them to avoid a circular dependency with the
parent module.
- protea.services._annotations_evaluation_helpers.apply_baseline_scoring_default(session: Session, body: dict[str, Any], baseline_scoring_name: str | None) dict[str, Any]¶
Auto-attach the baseline scoring_config_id to a CAFA evaluation payload when no scoring + reranker selection is provided.
Without this, eval_result rows with both scoring_config_id and reranker_model_id NULL are filtered out of the benchmark matrix (_stage_of() excludes them). When the caller supplies any of scoring_config_id / reranker_model_id / rerankers, or when no baseline name is configured, the body is returned unchanged.
- protea.services._annotations_evaluation_helpers.assert_evaluation_set_exists(session: Session, eval_id: UUID) None¶
Raise EntityNotFoundError when the EvaluationSet UUID does not resolve. Cheap preflight for endpoints that dispatch background work but still need a 404 path.
- protea.services._annotations_evaluation_helpers.delete_eval_result_collect_keys(session: Session, eval_id: UUID, result_id: UUID) list[str]¶
Delete the EvaluationResult and return the artifact keys to clean up.
Same split as delete_evaluation_set_collect_keys(): the DB delete happens here; the artifact-store deletion is the router’s responsibility (it owns the ArtifactStore factory).
- protea.services._annotations_evaluation_helpers.delete_evaluation_set_collect_keys(session: Session, eval_id: UUID) list[str]¶
Delete the EvaluationSet and return the artifact-store keys to clean.
The DB delete cascades to EvaluationResult rows; this helper walks the results before deleting and returns the union of all artifact keys those rows referenced (per-result cafaeval outputs) so the caller can wipe them from the store. The caller is also expected to delete the set’s ground-truth artifact via protea.core.evaluation.groundtruth_key_for(eval_id); that key is not included here because it is a fixed function of eval_id.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services._annotations_evaluation_helpers.evaluation_result_to_dict(r: EvaluationResult) dict[str, Any]¶
Serialise an EvaluationResult to its API dict shape.
- protea.services._annotations_evaluation_helpers.evaluation_set_to_dict(e: EvaluationSet) dict[str, Any]¶
Serialise an EvaluationSet to its API dict shape.
- protea.services._annotations_evaluation_helpers.get_eval_result_with_keys(session: Session, eval_id: UUID, result_id: UUID) tuple[EvaluationResult, list[str]]¶
Fetch an EvaluationResult belonging to eval_id; return (row, artifact_keys).
Raises EntityNotFoundError (“EvaluationResult”) when the result does not exist or does not belong to eval_id.
- protea.services._annotations_evaluation_helpers.get_evaluation_set_data(session: Session, eval_id: UUID) dict[str, Any]¶
Return a single evaluation set.
Raises EntityNotFoundError when the UUID does not resolve.
- protea.services._annotations_evaluation_helpers.list_evaluation_results_data(session: Session, eval_id: UUID) list[dict[str, Any]]¶
List EvaluationResult rows for one EvaluationSet (newest first).
Raises EntityNotFoundError when the EvaluationSet does not resolve.
- protea.services._annotations_evaluation_helpers.list_evaluation_sets_data(session: Session) list[dict[str, Any]]¶
List all evaluation sets, newest first.
Helpers for the two long methods of annotations_service.
Splits get_go_subgraph_data (DAG BFS) and
iter_delta_proteins_fasta (FASTA emission) so neither orchestrator
nor any helper exceeds the §3 method-LOC ceiling.
Both functions are re-exported from protea.services.annotations_service
for backwards compatibility with router and CLI callers.
- protea.services._annotations_method_helpers.get_go_subgraph_data(session: Session, snapshot_id: UUID, go_ids: str, depth: int) dict[str, Any]¶
BFS the GO DAG upward from the requested seed terms.
Returns {"nodes": [...], "edges": [...]} ready for the API. Each node has id (DB id), go_id, name, aspect, is_query (True for the seed terms). Each edge has source (child id), target (parent id), relation_type.
Raises EntityNotFoundError when the snapshot does not resolve. The exception class is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
- protea.services._annotations_method_helpers.iter_delta_proteins_fasta(session: Session, eval_id: UUID, category: str) list[str]¶
Return FASTA lines for delta proteins (nk/lk/pk/all).
Only proteins whose sequence is in the DB are emitted. Header is >ACCESSION entry_name OS=organism OX=taxon (NK|LK|PK); the sequence is wrapped at 60 chars per line.
Empty result returns an empty list. Raises EntityNotFoundError if the EvaluationSet does not resolve. The exception class is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
Streaming + per-row TSV/zip helpers extracted from annotations_service.
These pure functions own the streaming bytes/strings that
StreamingResponse consumes (eval-artifacts zip and per-namespace
metrics TSV). They have no domain-exception coupling, so they live
in this sibling module independently of the service layer.
- protea.services._annotations_streaming_helpers.iter_evaluation_artifacts_zip(keys: list[str], result_id: UUID) Iterator[bytes]¶
Stream a deflate-compressed zip of the evaluation-result artifacts.
The artifact store is resolved internally from settings; the caller does not need to thread the store through. Keys are rewritten to drop the eval_artifacts/<result_id>/ prefix so the zip entries are the relative file names cafaeval emitted.
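The key rewriting can be illustrated with the standard-library zipfile module. Note the deliberate simplification: this sketch builds the whole archive in memory and returns bytes, whereas the documented function streams chunks; the blobs mapping stands in for the artifact store, and zip_artifacts_sketch is a hypothetical name.

```python
import io
import zipfile
from uuid import UUID


def zip_artifacts_sketch(blobs: dict[str, bytes], result_id: UUID) -> bytes:
    """Pack artifacts into a deflate-compressed zip with prefix-free entry names.

    `blobs` maps full store keys to their content (stand-in for the artifact store).
    """
    prefix = f"eval_artifacts/{result_id}/"
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for key, data in blobs.items():
            # Drop the store prefix so entries are the relative file names.
            archive.writestr(key.removeprefix(prefix), data)
    return buffer.getvalue()
```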
- protea.services._annotations_streaming_helpers.render_evaluation_metrics_tsv(result: EvaluationResult, aspect_codes: tuple[str, ...]) Any¶
Yield TSV rows for the per-(setting, namespace) metrics summary.
The caller passes the aspect-codes tuple (ASPECT_CAFA_CODES) so the service stays free of the domain layer. Returns a generator suitable for StreamingResponse.
Embeddings service helpers
Admin / analytics helpers extracted from embeddings_service.
These pure functions own the heavy DB work for admin operations
(cascade-delete an EmbeddingConfig and all dependent rows)
and read-only analytics (per-aspect GO-term distribution for a
PredictionSet). The service layer keeps the existence check that
maps to EntityNotFoundError and delegates the body here, so this
module has zero coupling to the domain-exception classes.
- protea.services._embeddings_admin_helpers.cascade_delete_embedding_config(session: Session, config: EmbeddingConfig) dict[str, Any]¶
Cascade-delete the given EmbeddingConfig and all linked rows.
Bulk-deletes the dependent GOPrediction (via PredictionSet), PredictionSet, and SequenceEmbedding rows; then the config itself. Returns a summary dict with the deletion counts.
The ORM-level ondelete cascade would handle this on session.delete(c) alone, but we bulk-delete explicitly here so the response reports per-table counts the UI surfaces.
The caller is responsible for verifying the config exists before calling (the existence check belongs in the service layer next to the EntityNotFoundError definition).
- protea.services._embeddings_admin_helpers.compute_go_term_distribution(session: Session, *, prediction_set_id: UUID, limit: int = 50) dict[str, Any]¶
Return the most-frequent GO terms predicted in a set + per-aspect totals.
The caller is responsible for verifying the PredictionSet exists before calling (the existence check stays in the service layer).
CAFA-export helpers for embeddings_service.
The preflight (prepare_cafa_export) and the streaming TSV generator
(iter_predictions_cafa_tsv) own a self-contained divergent-change
cluster: they evolve with the CAFA file-format spec and the evaluation
data shape, not with the rest of the predictions API. Moving them out
keeps embeddings_service.py under the file-LOC budget while
preserving the public import surface (both names are re-exported from
protea.services.embeddings_service).
Domain exceptions (EntityNotFoundError) are imported lazily inside
the functions that raise them to avoid a circular dependency with the
parent module.
- protea.services._embeddings_cafa_helpers.iter_predictions_cafa_tsv(factory: sessionmaker[Session], *, prediction_set_id: UUID, aspect: str | None, max_distance: float | None, delta_proteins: set[str] | None) Iterator[str]¶
Stream the CAFA-format prediction TSV.
DB-level deduplication: a GROUP BY (protein_accession, go_term_id) + MIN(distance) subquery keeps the best row per pair so the Python side never needs an unbounded seen set; true streaming. Score is max(0.0, 1.0 - distance), clamped to [0, 1].
- protea.services._embeddings_cafa_helpers.prepare_cafa_export(session: Session, *, prediction_set_id: UUID, eval_id: UUID | None) set[str] | None¶
Preflight CAFA export: validate the PredictionSet exists and, if an EvaluationSet was supplied, compute the union of NK + LK delta proteins to restrict the export.
Returns the delta-protein accession set when eval_id is provided (the streaming generator filters on it), otherwise None.
Raises EntityNotFoundError for missing PredictionSet or EvaluationSet so the router can translate to 404.
Per-PredictionSet query + TSV-formatting helpers extracted from
embeddings_service.
These pure functions own the GOPrediction/GOTerm join queries and the
output formatting (TSV columns, optional / float coercions). The
service layer keeps existence checks and re-exports the public API so
external callers continue importing
protea.services.embeddings_service unchanged.
- protea.services._embeddings_predictions_helpers.build_predictions_for_protein(session: Session, *, prediction_set_id: UUID, accession: str) list[dict[str, Any]]¶
Return all predicted GO terms for one protein, sorted by distance.
The PredictionSet existence check stays in the service layer (so the domain exception type lives next to the rest of the validation logic); this helper assumes the caller has already verified the id.
- protea.services._embeddings_predictions_helpers.iter_predictions_tsv(factory: Any, *, prediction_set_id: UUID, accession: str | None = None, aspect: str | None = None, max_distance: float | None = None) Iterator[str]¶
Yield TSV rows (as str) of every GOPrediction in a set.
Opens its own session inside the generator so the caller’s existence-check session can close cleanly. The first yielded chunk is the header line; one row per (GOPrediction, GOTerm) pair follows, ordered by (protein_accession, distance).
Optional filters: accession (single query protein), aspect (F/P/C), max_distance.
Per-protein listing helpers for embeddings_service.
The orchestrator list_proteins_in_prediction_set lives here split
into private helpers (paginate / load / count) so neither it nor any
helper exceeds the §3 method-LOC ceiling, and embeddings_service.py
can shrink toward the file-LOC budget.
The function is re-exported from protea.services.embeddings_service
for backwards compatibility with existing router and CLI callers.
- protea.services._embeddings_proteins_helpers.list_proteins_in_prediction_set(session: Session, *, prediction_set_id: UUID, search: str | None = None, limit: int = 50, offset: int = 0) dict[str, Any]¶
Paginated list of proteins in a prediction set with derived stats.
For each row returns go_count (number of predicted terms), min_distance (closest neighbour), annotation_count (known annotations against the same AnnotationSet) and match_count (predictions whose (protein, go_id) is in the known set; a precision proxy).
Decomposed into private helpers (_paginate_protein_rows, _load_protein_orm_map, _load_annotation_counts, _load_match_counts) so this orchestrator stays under the §3 method-LOC ceiling.
Raises EntityNotFoundError (imported lazily to avoid the circular dependency with embeddings_service) when prediction_set_id does not resolve.
Embedding-config request-body validation helpers for embeddings_service.
The orchestrator validate_embedding_config_body lives here split
into per-field-group helpers so neither it nor any helper exceeds the
§3 method-LOC ceiling, and embeddings_service.py can shrink
toward the file-LOC budget.
The function is re-exported from protea.services.embeddings_service
for backwards compatibility with router and CLI callers.
Validation is duck-typed (manual isinstance checks) rather than
Pydantic to preserve the exact response payload shape and message
wording the existing tests assert on.
- protea.services._embeddings_validation_helpers.validate_embedding_config_body(body: dict[str, Any]) dict[str, Any]¶
Validate a request body for POST /embeddings/configs.
Returns the canonicalised dict (defaults filled in) on success. Raises InvalidEmbeddingConfigError (imported lazily to avoid the circular dep with embeddings_service) with the full list of failures otherwise; the router translates that to a 422 with the same shape it produced before extraction.
Decomposed into per-field-group helpers so neither this orchestrator nor any helper breaches the 60-LOC method ceiling.
Scoring service helpers¶
Pydantic response models, ORM ↔ response serialisers, and preset
ScoringConfig definitions extracted from scoring_service.
The service module re-exports every public symbol so existing router/CLI imports keep working unchanged.
- protea.services._scoring_models.PRESET_CONFIGS: list[dict[str, Any]] = [{'description': 'Pure cosine similarity of the winning neighbour, converted to [0, 1]. Baseline; tests the hypothesis that the nearest-neighbour distance alone is enough signal.', 'formula': 'linear', 'name': 'embedding_only', 'weights': {'embedding_similarity': 1.0, 'evidence_weight': 0.0, 'identity_nw': 0.0, 'identity_sw': 0.0, 'neighbor_vote_fraction': 0.0, 'taxonomic_proximity': 0.0}}, {'description': 'Canonical KNN score: fraction of the K neighbours that vote for each GO term. Tests the hypothesis that consensus across neighbours beats the raw cosine distance of the top-1 neighbour.', 'formula': 'linear', 'name': 'vote_fraction', 'weights': {'embedding_similarity': 0.0, 'evidence_weight': 0.0, 'identity_nw': 0.0, 'identity_sw': 0.0, 'neighbor_vote_fraction': 1.0, 'taxonomic_proximity': 0.0}}, {'description': 'Pure sequence-identity score (NW global 60 % + SW local 40 %), no embedding. Tests whether classical sequence alignment alone can match PLM-based KNN. Requires compute_alignments=True.', 'formula': 'linear', 'name': 'alignment_only', 'weights': {'embedding_similarity': 0.0, 'evidence_weight': 0.0, 'identity_nw': 0.6, 'identity_sw': 0.4, 'neighbor_vote_fraction': 0.0, 'taxonomic_proximity': 0.0}}, {'description': 'Embedding (50 %) refined with global NW identity (30 %) and local SW identity (20 %). Tests whether alignment adds a usable signal on top of the embedding. Requires compute_alignments=True.', 'formula': 'linear', 'name': 'embedding_plus_alignment', 'weights': {'embedding_similarity': 0.5, 'evidence_weight': 0.0, 'identity_nw': 0.3, 'identity_sw': 0.2, 'neighbor_vote_fraction': 0.0, 'taxonomic_proximity': 0.0}}, {'description': 'Nearest-neighbour distance (50 %) combined with K-neighbour consensus (50 %). Tests whether adding voting on top of cosine distance improves the ranking vs either signal alone.', 'formula': 'linear', 'name': 'embedding_plus_vote', 'weights': {'embedding_similarity': 0.5, 'evidence_weight': 0.0, 'identity_nw': 0.0, 'identity_sw': 0.0, 'neighbor_vote_fraction': 0.5, 'taxonomic_proximity': 0.0}}, {'description': 'Embedding similarity, multiplied by the resolved evidence weight as a final veto (evidence_weighted formula with evidence_weight=0 in the linear sum to avoid double-counting). Tests whether down-ranking IEA/ND predictions via a clean multiplier beats feeding evidence into the sum.', 'formula': 'evidence_weighted', 'name': 'evidence_veto', 'weights': {'embedding_similarity': 1.0, 'evidence_weight': 0.0, 'identity_nw': 0.0, 'identity_sw': 0.0, 'neighbor_vote_fraction': 0.0, 'taxonomic_proximity': 0.0}}, {'description': 'Kitchen-sink linear mix: embedding + alignment + taxonomy + voting. evidence_weight excluded from the linear sum (use evidence_veto when you want the multiplier). Requires compute_alignments=True and compute_taxonomy=True; tests whether more signals beat fewer.', 'formula': 'linear', 'name': 'composite', 'weights': {'embedding_similarity': 0.4, 'evidence_weight': 0.0, 'identity_nw': 0.2, 'identity_sw': 0.1, 'neighbor_vote_fraction': 0.2, 'taxonomic_proximity': 0.1}}]¶
Built-in preset ScoringConfigs seeded by the POST /configs/presets endpoint. Documents what the system defaults produce; none of them override evidence weights.
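To make the preset descriptions concrete, here is a hedged sketch of how the two formula names could be evaluated. It assumes that 'linear' is a plain weighted sum of the signals and that 'evidence_weighted' multiplies that sum by a resolved evidence weight; the function names, the signal values, and the 0.3 multiplier are illustrative and not taken from the service.

```python
def linear_score(weights: dict[str, float], signals: dict[str, float]) -> float:
    """Weighted sum over the signals with a non-zero weight (assumed formula)."""
    return sum(w * signals[name] for name, w in weights.items() if w != 0.0)


def evidence_weighted_score(
    weights: dict[str, float],
    signals: dict[str, float],
    evidence_multiplier: float,
) -> float:
    """Linear sum scaled by the evidence weight as a final multiplier (assumed)."""
    return linear_score(weights, signals) * evidence_multiplier


# Weights copied from the embedding_plus_vote preset listed above.
EMBEDDING_PLUS_VOTE = {
    "embedding_similarity": 0.5,
    "evidence_weight": 0.0,
    "identity_nw": 0.0,
    "identity_sw": 0.0,
    "neighbor_vote_fraction": 0.5,
    "taxonomic_proximity": 0.0,
}

# Made-up signal values for a single prediction.
signals = {"embedding_similarity": 0.8, "neighbor_vote_fraction": 0.4}

# 0.5 * 0.8 + 0.5 * 0.4, up to floating-point rounding.
vote_score = linear_score(EMBEDDING_PLUS_VOTE, signals)

# evidence_veto-style scoring with an illustrative multiplier of 0.3.
veto_score = evidence_weighted_score({"embedding_similarity": 1.0}, signals, 0.3)
```

Zero-weight signals are skipped, so a prediction only needs values for the signals its config actually uses.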
- class protea.services._scoring_models.RerankerResponse(*, id: UUID, name: str, prediction_set_id: UUID | None, evaluation_set_id: UUID | None, category: str, aspect: str | None, metrics: dict[str, Any], feature_importance: dict[str, Any], feature_schema_sha: str | None = None, feature_selection: dict[str, Any] | None = None, producer_version: str | None = None, producer_git_sha: str | None = None, external_source: str | None = None, dataset_id: UUID | None = None, dataset_name: str | None = None, dataset_schema_sha: str | None = None, dataset_manifest_sha: str | None = None, spec_yaml: str | None = None, created_at: Any)¶
Bases: BaseModel
Serialised representation of a stored RerankerModel.
- aspect: str | None¶
- category: str¶
- created_at: Any¶
- dataset_id: uuid.UUID | None¶
- dataset_manifest_sha: str | None¶
- dataset_name: str | None¶
- dataset_schema_sha: str | None¶
- evaluation_set_id: uuid.UUID | None¶
- external_source: str | None¶
- feature_importance: dict[str, Any]¶
- feature_schema_sha: str | None¶
- feature_selection: dict[str, Any] | None¶
- id: uuid.UUID¶
- metrics: dict[str, Any]¶
- model_config = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- name: str¶
- prediction_set_id: uuid.UUID | None¶
- producer_git_sha: str | None¶
- producer_version: str | None¶
- spec_yaml: str | None¶
- class protea.services._scoring_models.ScoringConfigCreate(*, name: ~typing.Annotated[str, ~annotated_types.MinLen(min_length=1), ~annotated_types.MaxLen(max_length=255)], formula: str = 'linear', weights: dict[str, float] = <factory>, evidence_weights: dict[str, float] | None = None, description: str | None = None)¶
Bases: BaseModel
Request body for POST /scoring/configs.
Lives in the service module so non-router callers (CLI tools, batch scripts) can reuse the validation rules without pulling FastAPI in.
- description: str | None¶
- evidence_weights: dict[str, float] | None¶
- formula: str¶
- model_config = {'extra': 'forbid'}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- name: str¶
- classmethod validate_evidence_weights(v: dict[str, float] | None) dict[str, float] | None¶
Ensure all keys are known GO evidence codes and all values are in [0, 1].
- classmethod validate_formula(v: str) str¶
- classmethod validate_weights(v: dict[str, float]) dict[str, float]¶
- weights: dict[str, float]¶
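The rule stated for validate_evidence_weights can be sketched as a plain function so that it runs with the standard library alone. This is an illustration, not the model's validator: the function name is hypothetical, the keys are assumed to be GO evidence codes, and KNOWN_EVIDENCE_CODES is a small illustrative subset rather than the set the service accepts.

```python
from typing import Optional

# Illustrative subset of GO evidence codes; the real accepted set is not shown here.
KNOWN_EVIDENCE_CODES = frozenset({"EXP", "IDA", "TAS", "IEA", "ND"})


def check_evidence_weights(
    value: Optional[dict[str, float]],
) -> Optional[dict[str, float]]:
    """Reject unknown codes and weights outside [0, 1]; pass None through."""
    if value is None:
        return None
    unknown = sorted(set(value) - KNOWN_EVIDENCE_CODES)
    if unknown:
        raise ValueError(f"unknown evidence codes: {unknown}")
    out_of_range = sorted(code for code, w in value.items() if not 0.0 <= w <= 1.0)
    if out_of_range:
        raise ValueError(f"weights outside [0, 1] for: {out_of_range}")
    return value
```

In a Pydantic v2 model a function of this shape would typically be attached as a classmethod field validator, which turns the ValueError into a structured validation error.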
- class protea.services._scoring_models.ScoringConfigResponse(*, id: UUID, name: str, formula: str, weights: dict[str, Any], evidence_weights: dict[str, Any] | None, description: str | None, created_at: Any)¶
Bases: BaseModel
Serialised representation of a stored ScoringConfig.
- created_at: Any¶
- description: str | None¶
- evidence_weights: dict[str, Any] | None¶
- formula: str¶
- id: uuid.UUID¶
- model_config = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- name: str¶
- weights: dict[str, Any]¶
- protea.services._scoring_models.to_reranker_response(m: RerankerModel, dataset: Dataset | None = None) RerankerResponse¶
Convert an ORM RerankerModel to its API response model.
dataset is the resolved linked Dataset row (when the caller has joined it). When provided, its content fingerprints and name are surfaced for provenance; None leaves those fields unset.
- protea.services._scoring_models.to_response(c: ScoringConfig) ScoringConfigResponse¶
Convert an ORM ScoringConfig to its API response model.
Prediction-metrics computation helpers for scoring_service.
The orchestrator compute_prediction_metrics lives here, split
into private helpers (validate / score / format) so that neither it nor
any helper exceeds the §3 method-LOC ceiling and scoring_service.py
can shrink toward the file-LOC budget.
The function is re-exported from protea.services.scoring_service
for backwards compatibility with router and CLI callers.
- protea.services._scoring_prediction_metrics_helpers.compute_prediction_metrics(session: Session, *, prediction_set_id: UUID, scoring_config_id: UUID, eval_context: EvalContext, category: str) dict[str, Any]¶
Compute CAFA Fmax and AUC-PR for a PredictionSet under a ScoringConfig.
Decomposed into private helpers (_validate_*, _score_*, _format_*) so this orchestrator stays under the §3 method-LOC ceiling.
- Raises:
EntityNotFoundError – Either PredictionSet or ScoringConfig does not exist.
SignalCoverageError – The config requires signals absent from the PredictionSet.
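For readers unfamiliar with the metric, the sketch below computes a protein-centric Fmax in the usual CAFA sense: at each threshold, precision is averaged over proteins with at least one prediction at or above it, recall is averaged over all benchmark proteins, and the best F1 across thresholds is returned. It is a simplified illustration under stated assumptions: the function name and input shapes are hypothetical, every benchmark protein is assumed to have at least one true term, and no GO ancestor propagation is performed. AUC-PR is not covered.

```python
from typing import Optional, Sequence


def fmax(
    predictions: dict[str, dict[str, float]],
    truth: dict[str, set[str]],
    thresholds: Optional[Sequence[float]] = None,
) -> float:
    """Protein-centric Fmax.

    predictions: {protein: {go_id: score}}; truth: {protein: {go_id, ...}}.
    """
    if thresholds is None:
        thresholds = [i / 100 for i in range(1, 101)]
    best = 0.0
    for t in thresholds:
        precisions: list[float] = []
        recall_sum = 0.0
        for protein, true_terms in truth.items():
            predicted = {
                go_id
                for go_id, score in predictions.get(protein, {}).items()
                if score >= t
            }
            hits = len(predicted & true_terms)
            if predicted:
                # Precision only counts proteins that have a prediction at t.
                precisions.append(hits / len(predicted))
            # Recall counts every benchmark protein.
            recall_sum += hits / len(true_terms)
        if not precisions:
            continue
        precision = sum(precisions) / len(precisions)
        recall = recall_sum / len(truth)
        if precision + recall > 0:
            best = max(best, 2 * precision * recall / (precision + recall))
    return best
```

The asymmetry between the two averages is the defining feature of the metric: a method that predicts for few proteins can keep high precision but pays for it in recall.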
Streaming-TSV generators extracted from scoring_service.
These pure functions own the per-row formatting + yield loop for
the three TSV streams the scoring router serves: scored, training,
and re-ranker outputs. The service layer keeps re-exports for
back-compat with router/CLI imports.
- protea.services._scoring_streaming_helpers.iter_reranked_predictions_tsv(df: Any, *, min_score: float | None = None) Any¶
Yield TSV rows (as bytes) from the scored DataFrame produced by score_predictions_with_reranker().
Empty input emits a header-only response (matches the legacy endpoint’s “no predictions” shape).
- protea.services._scoring_streaming_helpers.iter_scored_predictions(factory: Any, *, prediction_set_id: UUID, config_snap: ScoringConfig, min_score: float | None = None, accession: str | None = None) Any¶
Yield TSV rows (as bytes) of scored predictions.
Opens its own session inside the generator so the route’s initial validation phase can close its session before streaming starts; avoids holding a DB connection open for the duration of the response. The first yielded chunk is the header line; one row per GOPrediction follows.
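The session-inside-the-generator idea can be shown without a database. In this sketch the factory is a hypothetical context manager that stands in for a session factory and simply yields rows; the three column names and the four-decimal score format are assumptions, not the real TSV layout.

```python
from contextlib import contextmanager
from typing import Callable, ContextManager, Iterable, Iterator, Optional

Row = tuple[str, str, float]

# Hypothetical column set; the real stream has its own header.
HEADER = ("protein_accession", "go_id", "score")


def iter_scored_tsv(
    factory: Callable[[], ContextManager[Iterable[Row]]],
    *,
    min_score: Optional[float] = None,
) -> Iterator[bytes]:
    """Yield the header first, then one TSV row per prediction, as bytes."""
    yield ("\t".join(HEADER) + "\n").encode()
    # The resource is opened here, so nothing is held until iteration starts,
    # and it is released as soon as the generator finishes.
    with factory() as rows:
        for accession, go_id, score in rows:
            if min_score is not None and score < min_score:
                continue
            yield f"{accession}\t{go_id}\t{score:.4f}\n".encode()


opened: list[str] = []  # records the resource lifecycle for the demo


@contextmanager
def fake_factory() -> Iterator[list[Row]]:
    """Stand-in for a session factory; yields canned rows."""
    opened.append("open")
    try:
        yield [("P1", "GO:0001", 0.91), ("P2", "GO:0002", 0.12)]
    finally:
        opened.append("close")


stream = iter_scored_tsv(fake_factory, min_score=0.5)
```

Creating the generator does not touch the factory; the resource is acquired only once a consumer, such as a streaming HTTP response, starts pulling chunks.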
- protea.services._scoring_streaming_helpers.iter_training_data(factory: Any, *, prediction_set_id: UUID, gt_pairs: set[tuple[str, str]]) Any¶
Yield TSV rows (as bytes) of labeled training data for the re-ranker.
Streaming generator over GOPrediction rows; opens its own session inside so the caller can close the validation session before streaming starts. Each row carries the canonical GOPrediction feature vector plus a binary label (1 if the (protein_accession, go_id) pair is in gt_pairs, else 0). Row formatting is delegated to _format_training_row in _scoring_training_helpers so the orchestrator stays under the §3 method-LOC ceiling.
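The labeling rule is a set-membership test on the (protein_accession, go_id) pair. A minimal sketch follows; the function name and the shape of the feature list are hypothetical, and rendering a missing feature as an empty cell is an assumption about the TSV convention.

```python
from typing import Iterable, Iterator, Optional, Sequence

Prediction = tuple[str, str, Sequence[Optional[float]]]


def iter_labeled_rows(
    predictions: Iterable[Prediction],
    gt_pairs: set[tuple[str, str]],
) -> Iterator[bytes]:
    """Yield one TSV row per prediction with a trailing binary label."""
    for accession, go_id, features in predictions:
        # 1 when the pair is in the ground truth, else 0.
        label = 1 if (accession, go_id) in gt_pairs else 0
        # Assumption: a missing feature value becomes an empty cell.
        cells = [
            accession,
            go_id,
            *("" if value is None else str(value) for value in features),
            str(label),
        ]
        yield ("\t".join(cells) + "\n").encode()
```

Because gt_pairs is a set, each membership test is constant time on average, which matters when the generator walks every prediction row in a set.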
Training-data row formatting for scoring_service.iter_training_data.
Lifts the 30-column TSV row builder out of the streaming generator so the orchestrator stays under the §3 method-LOC ceiling.
- protea.services._scoring_training_helpers.format_training_row(pred: Any, go_id: str, aspect: str | None, label: int) str¶
Render one labeled training-data TSV row (no trailing newline).
Column order matches TRAINING_TSV_COLUMNS. label is binary (0 / 1). Uses lazy import of _format_optional from _scoring_streaming_helpers (where it canonically lives) to avoid a circular dep with the re-exporting module.
Validation + signal-coverage helpers extracted from scoring_service.
These pure functions own the heavy DB introspection (per-signal NULL
counts, temporal NK/LK/PK delta materialisation) for the scoring API.
They return raw results; the service-layer wrappers translate them
into domain exceptions (SignalCoverageError, EntityNotFoundError)
so the helper module stays free of those types.
- protea.services._scoring_validation_helpers.build_training_gt_pairs(session: Session, *, prediction_set: PredictionSet, evaluation_set: Any, category: str) set[tuple[str, str]]¶
Compute the (protein, go_id) ground-truth pair set for training.
The caller (service layer) is responsible for resolving the PredictionSet and EvaluationSet rows and raising EntityNotFoundError for missing ids. evaluation_set is typed as Any so the helper can stay free of the EvaluationSet ORM import (which lives at module level via lazy import in the service).
- protea.services._scoring_validation_helpers.compute_missing_signals(session: Session, prediction_set_id: UUID, config_snap: ScoringConfig) list[str]¶
Return human-readable strings for every required signal absent from the set.
For each signal with a non-zero weight in config_snap.weights (plus evidence_code when the formula is evidence_weighted, where the multiplier is always applied), count how many rows in the PredictionSet have the backing column non-NULL. Empty list means full coverage. The service-layer wrapper raises SignalCoverageError when this list is non-empty.
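The coverage check can be sketched over in-memory rows instead of SQL. Several things here are assumptions: the function name, rows as plain dicts, column names equal to signal names, the message wording, and in particular the rule that a signal is reported only when no row carries a value for it (a stricter rule that flags any NULL would be equally plausible from the description).

```python
from typing import Any


def find_missing_signals(
    rows: list[dict[str, Any]],
    weights: dict[str, float],
    formula: str,
) -> list[str]:
    """Return one message per required signal that no row has a value for."""
    required = [name for name, weight in weights.items() if weight != 0.0]
    # The evidence multiplier is always applied under this formula.
    if formula == "evidence_weighted" and "evidence_code" not in required:
        required.append("evidence_code")

    total = len(rows)
    problems: list[str] = []
    for name in required:
        present = sum(1 for row in rows if row.get(name) is not None)
        if present == 0:  # assumed meaning of "absent from the set"
            problems.append(f"{name}: {present}/{total} rows have a value")
    return problems
```

A caller in the role of the service-layer wrapper would raise its coverage exception whenever the returned list is non-empty.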
See also
HTTP API: routers that call into these service modules.
Infrastructure: ORM models and session utilities used by services.