Services

The protea.services package contains business-logic modules that routers delegate to. Services are pure Python: they accept a SQLAlchemy session and return domain objects or raise domain exceptions. Routers map those exceptions to HTTP status codes. This separation allows the same logic to be exercised from CLI tools or batch scripts without importing FastAPI.

Public service modules

Jobs service: shared helpers for the queue-dispatch pattern.

Multiple routers (annotations, embeddings, predictions) follow the same shape when an HTTP endpoint queues background work:

  1. Pydantic-validate the request body.

  2. Insert a Job row with the canonical operation name + queue.

  3. Insert a matching JobEvent (job.created) for the audit log.

  4. Publish to RabbitMQ via protea.infrastructure.queue.publisher.publish_job().

  5. Return {"id": ..., "status": "queued"}.

This module exposes enqueue_job() (steps 2–3) and the higher-level dispatch_validated_job() (steps 1–5) so routers collapse to a single try/except.
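The five-step shape above can be sketched without a database or broker. This is a minimal illustration under stated assumptions, not the protea implementation. InMemorySession, dispatch, the validate/publish callables and the published message body are all hypothetical stand-ins for the SQLAlchemy session, dispatch_validated_job(), Pydantic validation and publish_job(). The ordering is the point: validation runs before any row is written, and publishing happens only after the commit.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


class InvalidJobPayloadError(Exception):
    """Carries the structured validation errors for a 422 response."""

    def __init__(self, errors: Any) -> None:
        super().__init__("invalid job payload")
        self.errors = errors


@dataclass
class InMemorySession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    rows: list[dict[str, Any]] = field(default_factory=list)
    committed: bool = False

    def add(self, row: dict[str, Any]) -> None:
        self.rows.append(row)

    def commit(self) -> None:
        self.committed = True


def dispatch(
    body: dict[str, Any],
    validate: Callable[[dict[str, Any]], list[str]],
    publish: Callable[[str, dict[str, Any]], None],
    session: InMemorySession,
    *,
    operation: str,
    queue_name: str,
) -> dict[str, Any]:
    # Step 1: validate before anything touches the database.
    errors = validate(body)
    if errors:
        raise InvalidJobPayloadError(errors)
    job_id = uuid.uuid4()
    # Step 2: the Job row with the operation name and queue.
    session.add({"table": "job", "id": job_id, "operation": operation,
                 "queue": queue_name, "payload": body})
    # Step 3: the matching audit event.
    session.add({"table": "job_event", "job_id": job_id, "event": "job.created"})
    session.commit()
    # Step 4: publish only after the rows are committed (hypothetical message shape).
    publish(queue_name, {"job_id": str(job_id)})
    # Step 5: the canonical response shape.
    return {"id": str(job_id), "status": "queued"}
```

Committing before publishing means a worker can never receive a job id whose row is not yet visible.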

exception protea.services.jobs_service.InvalidJobPayloadError(errors: Any)

Bases: Exception

Pydantic validation failed for a queue-dispatch request body.

Carries the structured errors list produced by Pydantic so the router can pass it through verbatim as the HTTP 422 detail.

protea.services.jobs_service.compute_dedup_key(operation: str, payload: dict[str, Any]) → str

Return a 16-hex-char deduplication key for (operation, payload).

The key is the first 16 hex digits (a 64-bit prefix) of the SHA-256 digest of a canonical JSON serialisation of {"operation": ..., "payload": ...} (keys sorted, ASCII-safe). At 64 bits, the collision probability is negligible for the expected job volume.

The 16-char length fits comfortably in VARCHAR(64) and keeps the partial unique index compact.
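A sketch of the key derivation described above, using only the standard library. The keys-sorted and ASCII-safe settings come from the description. The compact separators are an assumption, so this version may not reproduce protea's exact hex values.

```python
import hashlib
import json
from typing import Any


def compute_dedup_key(operation: str, payload: dict[str, Any]) -> str:
    # Canonical JSON: sorted keys and ASCII-only output, so equal payloads
    # serialise to the same bytes regardless of dict insertion order.
    canonical = json.dumps(
        {"operation": operation, "payload": payload},
        sort_keys=True,
        ensure_ascii=True,
        separators=(",", ":"),  # assumption: compact separators
    )
    # Keep the first 16 hex digits (64 bits) of the SHA-256 digest.
    return hashlib.sha256(canonical.encode("ascii")).hexdigest()[:16]
```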

protea.services.jobs_service.dispatch_validated_job(factory: sessionmaker[Session], amqp_url: str, body: dict[str, Any], payload_model: type[BaseModel], *, operation: str, queue_name: str) → dict[str, Any]

End-to-end queue dispatch: validate, persist, publish, respond.

Pydantic-validates body against payload_model (raising InvalidJobPayloadError on failure for the router to map to a 422), inserts a Job + JobEvent pair inside a fresh session, then publishes to RabbitMQ. Returns the canonical {"id": <uuid>, "status": "queued"} response shape used by every dispatch endpoint.

protea.services.jobs_service.enqueue_job(session: Session, *, operation: str, queue_name: str, payload: dict[str, Any]) → UUID

Insert Job + JobEvent rows for a background task.

Returns the new job’s UUID. The caller is responsible for publishing to the queue (via protea.infrastructure.queue.publisher.publish_job()) after committing this session, and for any payload validation that should happen before the row hits the database.

Both rows are flushed but not committed; the caller’s session_scope context manager owns the transaction.

Annotations service: pure-logic helpers extracted from protea.api.routers.annotations.

ORM ↔ dict serialisers and the read-side handlers (snapshot/IA-url operations) live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.

The router translates the domain exceptions raised here to HTTP responses:

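  • EntityNotFoundError → 404 Not Found (e.g. an OntologySnapshot or AnnotationSet UUID does not resolve).

  • AnnotationSetReferencedError → 409 Conflict (the AnnotationSet is still referenced by PredictionSet rows).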

exception protea.services.annotations_service.AnnotationSetReferencedError

Bases: AnnotationsServiceError

An AnnotationSet cannot be deleted because PredictionSet rows still reference it; the FK CASCADE is intentionally absent. Maps to HTTP 409 at the router boundary.

exception protea.services.annotations_service.AnnotationsServiceError

Bases: Exception

Base class for annotations-service domain errors.

exception protea.services.annotations_service.EntityNotFoundError(entity: str, entity_id: UUID)

Bases: AnnotationsServiceError

Generic 404; a referenced entity does not exist.

Pickle-safe via __reduce__ so the structured entity / entity_id attrs survive a round-trip without tripping flake8-bugbear B042.

protea.services.annotations_service.annotation_set_to_dict(a: AnnotationSet, count: int) → dict[str, Any]

Serialise an AnnotationSet to its API dict shape.

protea.services.annotations_service.delete_annotation_set_data(session: Session, set_id: UUID) → dict[str, Any]

Delete an annotation set and all its annotations.

Returns the deletion summary dict.

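Raises EntityNotFoundError when the UUID does not resolve, or AnnotationSetReferencedError when PredictionSet rows still reference the set.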

protea.services.annotations_service.delete_eval_result_collect_keys(session: Session, eval_id: UUID, result_id: UUID) → list[str]

Delete the EvaluationResult and return the artifact keys to clean up.

Same split as delete_evaluation_set_collect_keys(): the DB delete happens here; the artifact-store deletion is the router’s responsibility (it owns the ArtifactStore factory).

protea.services.annotations_service.delete_evaluation_set_collect_keys(session: Session, eval_id: UUID) → list[str]

Delete the EvaluationSet and return the artifact-store keys to clean.

The DB delete cascades to EvaluationResult rows; this helper walks the results before deleting and returns the union of all artifact keys those rows referenced (per-result cafaeval outputs) so the caller can wipe them from the store. The caller is also expected to delete the set’s ground-truth artifact via protea.core.evaluation.groundtruth_key_for(eval_id); that key is not included here because it is a fixed function of eval_id.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.evaluation_result_to_dict(r: EvaluationResult) → dict[str, Any]

Serialise an EvaluationResult to its API dict shape.

protea.services.annotations_service.evaluation_set_to_dict(e: EvaluationSet) → dict[str, Any]

Serialise an EvaluationSet to its API dict shape.

protea.services.annotations_service.get_annotation_set_data(session: Session, set_id: UUID) → dict[str, Any]

Return a single annotation set with its annotation count.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.get_eval_result_with_keys(session: Session, eval_id: UUID, result_id: UUID) → tuple[EvaluationResult, list[str]]

Fetch an EvaluationResult belonging to eval_id; return (row, artifact_keys).

Raises EntityNotFoundError (“EvaluationResult”) when the result does not exist or does not belong to eval_id.

protea.services.annotations_service.get_evaluation_set_data(session: Session, eval_id: UUID) → dict[str, Any]

Return a single evaluation set.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.get_go_subgraph_data(session: Session, snapshot_id: UUID, go_ids: str, depth: int) → dict[str, Any]

BFS the GO DAG upward from the requested seed terms.

Returns {"nodes": [...], "edges": [...]} ready for the API. Each node has id (DB id), go_id, name, aspect, is_query (True for the seed terms). Each edge has source (child id), target (parent id), relation_type.

Raises EntityNotFoundError when the snapshot does not resolve. Imports it lazily to avoid the circular dependency with the re-exporting annotations_service module.
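The upward BFS can be illustrated over an in-memory parent map. The following assumptions hold for this sketch:

- go_subgraph and the parents dict are hypothetical.
- depth is read as the maximum number of child-to-parent hops.
- Nodes carry GO ids rather than the DB ids and name/aspect fields the real function returns.

```python
from collections import deque
from typing import Any


def go_subgraph(
    seeds: list[str],
    parents: dict[str, list[tuple[str, str]]],
    depth: int,
) -> dict[str, Any]:
    """Breadth-first walk from the seed terms along child -> parent edges."""
    seed_set = set(seeds)
    seen = set(seeds)
    queue = deque((term, 0) for term in dict.fromkeys(seeds))
    edges: list[dict[str, str]] = []
    while queue:
        term, hops = queue.popleft()
        if hops >= depth:
            continue  # depth limit reached: keep the node, do not expand it
        for parent, relation in parents.get(term, []):
            edges.append({"source": term, "target": parent, "relation_type": relation})
            if parent not in seen:
                seen.add(parent)
                queue.append((parent, hops + 1))
    nodes = [{"go_id": t, "is_query": t in seed_set} for t in sorted(seen)]
    return {"nodes": nodes, "edges": edges}
```

Each term is expanded at most once, so shared ancestors (common in the GO DAG) do not produce duplicate edges.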

protea.services.annotations_service.get_snapshot_data(session: Session, snapshot_id: UUID) → dict[str, Any]

Return a single snapshot with its GO term count.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.annotations_service.iter_delta_proteins_fasta(session: Session, eval_id: UUID, category: str) → list[str]

Return FASTA lines for delta proteins (nk / lk / pk / all).

Only proteins whose sequence is in the DB are emitted. Header is >ACCESSION entry_name OS=organism OX=taxon (NK|LK|PK); the sequence is wrapped at 60 chars per line.

Returns an empty list when no proteins match. Raises EntityNotFoundError if the EvaluationSet does not resolve; the exception is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.
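A sketch of the per-record formatting described above. The following assumptions apply:

- The helper fasta_record is hypothetical.
- The trailing tag is read as the upper-cased category in parentheses.
- Lines are returned without trailing newlines.

```python
def fasta_record(accession: str, entry_name: str, organism: str,
                 taxon_id: int, category: str, sequence: str) -> list[str]:
    # Header: >ACCESSION entry_name OS=organism OX=taxon (CATEGORY)
    header = f">{accession} {entry_name} OS={organism} OX={taxon_id} ({category.upper()})"
    # Sequence wrapped at 60 residues per line.
    body = [sequence[i:i + 60] for i in range(0, len(sequence), 60)]
    return [header, *body]
```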

protea.services.annotations_service.iter_groundtruth_tsv(session: Session, eval_id: UUID, category: str) → list[str]

Return the rows for a CAFA ground_truth_<CATEGORY>.tsv download.

category is "nk", "lk", "pk" or "known". Each row is "<protein>\t<go_id>\n", sorted by protein and then by GO id so the output is deterministic. The caller wraps the list in a StreamingResponse. Materialising the list is acceptable because it is small (a few thousand rows for typical CAFA splits), and it keeps the streaming generator simple.

Raises EntityNotFoundError when the EvaluationSet does not resolve.
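The deterministic row ordering can be shown in a few lines. groundtruth_rows is a hypothetical helper operating on an in-memory set of (protein, go_id) pairs instead of a DB query. Sorting the tuples orders by protein first and GO id second.

```python
def groundtruth_rows(pairs: set[tuple[str, str]]) -> list[str]:
    # Tuple sort = protein first, then GO id, so repeated downloads are identical.
    return [f"{protein}\t{go_id}\n" for protein, go_id in sorted(pairs)]
```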

protea.services.annotations_service.list_annotation_sets_data(session: Session, source: str | None = None) → list[dict[str, Any]]

List all annotation sets with their per-set annotation counts (newest first).

Optionally filter by source (e.g. goa or quickgo). Pure read; the caller caches at the API boundary.

protea.services.annotations_service.list_evaluation_results_data(session: Session, eval_id: UUID) → list[dict[str, Any]]

List EvaluationResult rows for one EvaluationSet (newest first).

Raises EntityNotFoundError when the EvaluationSet does not resolve.

protea.services.annotations_service.list_evaluation_sets_data(session: Session) → list[dict[str, Any]]

List all evaluation sets, newest first.

protea.services.annotations_service.list_snapshots_data(session: Session) → list[dict[str, Any]]

Return all loaded snapshots with their GO term counts (newest first).

Pure read; the caller is responsible for caching at the API boundary if desired (the GROUP BY over the multi-million row go_term table is the slow part).

protea.services.annotations_service.render_evaluation_metrics_tsv(result: EvaluationResult, aspect_codes: tuple[str, ...]) → Any

Yield TSV rows for the per-(setting, namespace) metrics summary.

The caller passes the aspect-codes tuple (ASPECT_CAFA_CODES) so the service stays free of the domain layer. Returns a generator suitable for StreamingResponse.

protea.services.annotations_service.set_snapshot_ia_url(session: Session, snapshot_id: UUID, ia_url: str | None) → dict[str, Any]

Update the IA URL on a snapshot. An empty string is treated as None.

Returns a small confirmation dict shape compatible with the legacy endpoint. Raises EntityNotFoundError for the 404 path. The caller (router) is responsible for validating request body shape (e.g. presence of the ia_url key) before calling.

protea.services.annotations_service.snapshot_to_dict(s: OntologySnapshot, term_count: int) → dict[str, Any]

Serialise an OntologySnapshot to its API dict shape.

Embeddings service: pure-logic helpers extracted from protea.api.routers.embeddings.

Validation rules, ORM ↔ dict serialisers, and the predictions-TSV streaming generator live here so non-router callers (CLI tools, batch scripts) can reuse them without pulling FastAPI in.

The router translates the domain exceptions raised here to HTTP responses:

  • InvalidEmbeddingConfigError → 422 Unprocessable Entity (validation errors carry a list of human-readable messages in .errors).

  • InvalidUUIDFieldError → 422 Unprocessable Entity (detail "<field> must be a valid UUID").

  • EntityNotFoundError → 404 Not Found (e.g. a PredictionSet UUID does not resolve).

exception protea.services.embeddings_service.EmbeddingsServiceError

Bases: Exception

Base class for embeddings-service domain errors.

exception protea.services.embeddings_service.EntityNotFoundError(entity: str, entity_id: UUID)

Bases: EmbeddingsServiceError

Generic 404; a referenced entity does not exist.

Construct with the entity label (e.g. "PredictionSet") and the looked-up UUID; the message becomes "<entity> not found". Pickle-safe via __reduce__ so the structured entity / entity_id attributes survive a round-trip without tripping flake8-bugbear B042.
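The need for __reduce__ can be shown with a short sketch of how such an exception might be written. This is an illustration, not code copied from protea.

By default, BaseException pickles itself as its class plus self.args. Here args holds only the formatted message, so unpickling would call EntityNotFoundError("PredictionSet not found") and fail with a missing-argument TypeError. Returning the constructor arguments from __reduce__ avoids that.

```python
import pickle
import uuid
from uuid import UUID


class EntityNotFoundError(Exception):
    def __init__(self, entity: str, entity_id: UUID) -> None:
        # Only the message is passed to Exception, so self.args == (message,).
        super().__init__(f"{entity} not found")
        self.entity = entity
        self.entity_id = entity_id

    def __reduce__(self):
        # Rebuild from the constructor arguments instead of self.args.
        return (type(self), (self.entity, self.entity_id))
```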

exception protea.services.embeddings_service.InvalidEmbeddingConfigError(errors: list[str])

Bases: EmbeddingsServiceError

Validation failure for an EmbeddingConfig request body.

errors carries a list of human-readable messages, one per failed rule, suitable for inclusion in the HTTP 422 response body.

exception protea.services.embeddings_service.InvalidUUIDFieldError(field: str)

Bases: EmbeddingsServiceError

The predict request body contained a field that does not parse as a UUID.

Carries the offending field name in field; the router translates this to 422 with detail "<field> must be a valid UUID".

protea.services.embeddings_service.assert_prediction_set_exists(session: Session, prediction_set_id: UUID) → None

Raise EntityNotFoundError if the PredictionSet UUID is unknown.

protea.services.embeddings_service.config_to_dict(c: EmbeddingConfig, embedding_count: int | None = None) → dict[str, Any]

Serialise an EmbeddingConfig ORM row to its API dict shape.

The embedding_count field is only included when the caller has a number to report (the bare GET /configs/{id} endpoint does not).

protea.services.embeddings_service.delete_embedding_config_cascade(session: Session, config_id: UUID) → dict[str, Any]

Cascade-delete an EmbeddingConfig and all linked rows.

Raises EntityNotFoundError when config_id does not resolve. Body lives in _embeddings_admin_helpers.cascade_delete_embedding_config().

protea.services.embeddings_service.delete_prediction_set_cascade(session: Session, prediction_set_id: UUID) → dict[str, Any]

Delete a PredictionSet and all its GOPrediction rows.

Returns {"deleted": <id>, "predictions_deleted": <count>}. Raises EntityNotFoundError when the UUID does not resolve so the router can translate to 404.

protea.services.embeddings_service.get_go_term_distribution_data(session: Session, *, prediction_set_id: UUID, limit: int = 50) → dict[str, Any]

Return the most-frequent GO terms predicted in this set + per-aspect totals.

Raises EntityNotFoundError when the PredictionSet does not resolve. Body lives in _embeddings_admin_helpers.compute_go_term_distribution().

protea.services.embeddings_service.get_prediction_set_data(session: Session, prediction_set_id: UUID) → dict[str, Any]

Retrieve a prediction set with total + per-protein GO term counts.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services.embeddings_service.get_predictions_for_protein(session: Session, *, prediction_set_id: UUID, accession: str) → list[dict[str, Any]]

Return all predicted GO terms for one protein, sorted by distance.

Raises EntityNotFoundError when the PredictionSet does not resolve. An unknown accession does not produce a 404; the function returns an empty list, matching the legacy endpoint’s behaviour.

protea.services.embeddings_service.iter_predictions_cafa_tsv(factory: sessionmaker[Session], *, prediction_set_id: UUID, aspect: str | None, max_distance: float | None, delta_proteins: set[str] | None) → Iterator[str]

Stream the CAFA-format prediction TSV.

DB-level deduplication: a subquery that groups by (protein_accession, go_term_id) and takes MIN(distance) keeps the best row per pair. The Python side therefore never needs an unbounded seen set, and the export is truly streamed. The score is max(0.0, 1.0 - distance), clamped to [0, 1].
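The GROUP BY + MIN(distance) idea can be tried against an in-memory SQLite table. This is a sketch under assumptions:

- The go_prediction table with a denormalised go_id column is hypothetical.
- The real query joins GOTerm and applies the aspect, max_distance and delta-protein filters, all of which are omitted here.
- The three-decimal score formatting is an assumption.

```python
import sqlite3
from typing import Iterator


def iter_cafa_rows(conn: sqlite3.Connection) -> Iterator[str]:
    # One row per (protein, GO term): the database keeps only the minimum
    # distance, so Python never tracks which pairs it has already emitted.
    query = """
        SELECT protein_accession, go_id, MIN(distance) AS best
        FROM go_prediction
        GROUP BY protein_accession, go_id
        ORDER BY protein_accession, go_id
    """
    for accession, go_id, best in conn.execute(query):
        score = min(1.0, max(0.0, 1.0 - best))  # clamp to [0, 1]
        yield f"{accession}\t{go_id}\t{score:.3f}\n"
```

Because the cursor is iterated lazily, rows are yielded as the database produces them rather than being collected first.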

protea.services.embeddings_service.iter_predictions_tsv(factory: Any, *, prediction_set_id: UUID, accession: str | None = None, aspect: str | None = None, max_distance: float | None = None) → Iterator[str]

Yield TSV rows (as str) of every GOPrediction in a set.

Opens its own session inside the generator so the caller’s existence-check session can close cleanly. The first yielded chunk is the header line; one row per (GOPrediction, GOTerm) pair follows, ordered by (protein_accession, distance).

Optional filters: accession (single query protein), aspect (F / P / C), max_distance.

protea.services.embeddings_service.list_prediction_sets_data(session: Session) → list[dict[str, Any]]

Return the 100 most recent PredictionSet rows joined with their context.

Returns a list of dicts, each carrying the embedding-config name, annotation-set label, ontology version and the per-set prediction_count. The per-set count comes from a single GROUP BY over GOPrediction (one index-only scan) rather than a correlated subquery. On ~10⁷-row tables, the correlated form makes Postgres’ planner fall back to a per-row index probe (~30 s per outer row), whereas the grouped form returns all 100 counts at once.

protea.services.embeddings_service.list_proteins_in_prediction_set(session: Session, *, prediction_set_id: UUID, search: str | None = None, limit: int = 50, offset: int = 0) → dict[str, Any]

Paginated list of proteins in a prediction set with derived stats.

For each row returns go_count (number of predicted terms), min_distance (closest neighbour), annotation_count (known annotations against the same AnnotationSet) and match_count (predictions whose (protein, go_id) is in the known set; a precision proxy).

Decomposed into private helpers (_paginate_protein_rows, _load_protein_orm_map, _load_annotation_counts, _load_match_counts) so this orchestrator stays under the §3 method-LOC ceiling.

Raises EntityNotFoundError (imported lazily to avoid the circular dependency with embeddings_service) when prediction_set_id does not resolve.
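The four derived statistics can be illustrated in memory. The real helpers compute them in SQL with pagination; protein_stats and its input shapes are hypothetical. Predictions are (protein, go_id, distance) tuples, and the known set holds (protein, go_id) pairs from the AnnotationSet. go_count is assumed to count distinct predicted terms.

```python
from collections import Counter
from typing import Any


def protein_stats(
    predictions: list[tuple[str, str, float]],
    known: set[tuple[str, str]],
) -> dict[str, dict[str, Any]]:
    predicted: dict[str, set[str]] = {}
    min_distance: dict[str, float] = {}
    for protein, go_id, distance in predictions:
        predicted.setdefault(protein, set()).add(go_id)
        min_distance[protein] = min(distance, min_distance.get(protein, distance))
    # Known annotations per protein from the same AnnotationSet.
    annotation_count = Counter(protein for protein, _ in known)
    return {
        protein: {
            "go_count": len(terms),
            "min_distance": min_distance[protein],
            "annotation_count": annotation_count[protein],
            # Predicted pairs that are already annotated: a precision proxy.
            "match_count": sum((protein, go_id) in known for go_id in terms),
        }
        for protein, terms in predicted.items()
    }
```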

protea.services.embeddings_service.prepare_cafa_export(session: Session, *, prediction_set_id: UUID, eval_id: UUID | None) → set[str] | None

Preflight CAFA export: validate the PredictionSet exists and, if an EvaluationSet was supplied, compute the union of NK + LK delta proteins to restrict the export.

Returns the delta-protein accession set when eval_id is provided (the streaming generator filters on it), otherwise None.

Raises EntityNotFoundError for missing PredictionSet or EvaluationSet so the router can translate to 404.

protea.services.embeddings_service.validate_embedding_config_body(body: dict[str, Any]) → dict[str, Any]

Validate a request body for POST /embeddings/configs.

Returns the canonicalised dict (defaults filled in) on success. Otherwise raises InvalidEmbeddingConfigError (imported lazily to avoid the circular dependency with embeddings_service) with the full list of failures. The router translates that to a 422 with the same shape it produced before extraction.

Decomposed into per-field-group helpers so neither this orchestrator nor any helper breaches the 60-LOC method ceiling.

protea.services.embeddings_service.validate_predict_request(session: Session, body: dict[str, Any]) → dict[str, UUID]

Parse + validate the three required UUID fields of a predict request.

Returns a dict mapping field name to its parsed uuid.UUID. Raises InvalidUUIDFieldError for parse failures (router → 422) or EntityNotFoundError if a referenced entity does not exist (router → 404). Field order is preserved so the first failure wins, matching the previous in-router behaviour.
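A sketch of the ordered parsing step, where the first bad field wins. The following assumptions apply:

- The three field names in REQUIRED_UUID_FIELDS are hypothetical, since the text only says there are three required UUID fields.
- parse_uuid_fields is hypothetical.
- The entity-existence checks that would raise EntityNotFoundError are omitted.

```python
import uuid
from typing import Any
from uuid import UUID


class InvalidUUIDFieldError(Exception):
    def __init__(self, field: str) -> None:
        super().__init__(field)
        self.field = field


# Hypothetical names; tuple order decides which failure is reported first.
REQUIRED_UUID_FIELDS = ("embedding_config_id", "annotation_set_id", "ontology_snapshot_id")


def parse_uuid_fields(
    body: dict[str, Any], fields: tuple[str, ...] = REQUIRED_UUID_FIELDS
) -> dict[str, UUID]:
    parsed: dict[str, UUID] = {}
    for name in fields:
        try:
            # Missing keys become "None", which also fails to parse.
            parsed[name] = uuid.UUID(str(body.get(name)))
        except ValueError:
            raise InvalidUUIDFieldError(name) from None
    return parsed
```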

Internal helper modules

The following modules are internal helpers that implement specific phases of each service. They are documented here for completeness but are not intended to be called directly by routers or external code.

Annotations service helpers

EvaluationSet + EvaluationResult helpers for annotations_service.

The evaluation-side reads (evaluation_set_to_dict, evaluation_result_to_dict, the list/get/delete handlers) plus the baseline-scoring auto-attach helper share a divergent-change axis with the benchmark matrix and the CAFA evaluation pipeline. Snapshot and annotation-set CRUD evolve independently, so this cluster moves out into its own sibling module while the parent service keeps the public import surface via re-export.

Domain exceptions (EntityNotFoundError) are imported lazily inside the functions that raise them to avoid a circular dependency with the parent module.

protea.services._annotations_evaluation_helpers.apply_baseline_scoring_default(session: Session, body: dict[str, Any], baseline_scoring_name: str | None) → dict[str, Any]

Auto-attach the baseline scoring_config_id to a CAFA evaluation payload when no scoring + reranker selection is provided.

Without this, eval_result rows with both scoring_config_id and reranker_model_id NULL are filtered out of the benchmark matrix (_stage_of() excludes them). When the caller supplies any of scoring_config_id / reranker_model_id / rerankers, or when no baseline name is configured, the body is returned unchanged.
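The decision rule above can be sketched with a plain lookup callable in place of the session query. The following assumptions apply:

- The function apply_baseline_default and lookup_id are hypothetical.
- A key counts as "supplied" only when its value is truthy.
- An unknown baseline name leaves the body unchanged.
- A new dict is returned rather than the input being mutated.

```python
from typing import Any, Callable, Optional

_SELECTION_KEYS = ("scoring_config_id", "reranker_model_id", "rerankers")


def apply_baseline_default(
    body: dict[str, Any],
    baseline_name: Optional[str],
    lookup_id: Callable[[str], Optional[str]],
) -> dict[str, Any]:
    # Respect any explicit scoring / reranker choice, and do nothing
    # when no baseline is configured.
    if not baseline_name or any(body.get(key) for key in _SELECTION_KEYS):
        return body
    config_id = lookup_id(baseline_name)  # hypothetical name -> id lookup
    if config_id is None:
        return body
    return {**body, "scoring_config_id": config_id}
```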

protea.services._annotations_evaluation_helpers.assert_evaluation_set_exists(session: Session, eval_id: UUID) → None

Raise EntityNotFoundError when the EvaluationSet UUID does not resolve. Cheap preflight for endpoints that dispatch background work but still need a 404 path.

protea.services._annotations_evaluation_helpers.delete_eval_result_collect_keys(session: Session, eval_id: UUID, result_id: UUID) → list[str]

Delete the EvaluationResult and return the artifact keys to clean up.

Same split as delete_evaluation_set_collect_keys(): the DB delete happens here; the artifact-store deletion is the router’s responsibility (it owns the ArtifactStore factory).

protea.services._annotations_evaluation_helpers.delete_evaluation_set_collect_keys(session: Session, eval_id: UUID) → list[str]

Delete the EvaluationSet and return the artifact-store keys to clean.

The DB delete cascades to EvaluationResult rows; this helper walks the results before deleting and returns the union of all artifact keys those rows referenced (per-result cafaeval outputs) so the caller can wipe them from the store. The caller is also expected to delete the set’s ground-truth artifact via protea.core.evaluation.groundtruth_key_for(eval_id); that key is not included here because it is a fixed function of eval_id.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services._annotations_evaluation_helpers.evaluation_result_to_dict(r: EvaluationResult) → dict[str, Any]

Serialise an EvaluationResult to its API dict shape.

protea.services._annotations_evaluation_helpers.evaluation_set_to_dict(e: EvaluationSet) → dict[str, Any]

Serialise an EvaluationSet to its API dict shape.

protea.services._annotations_evaluation_helpers.get_eval_result_with_keys(session: Session, eval_id: UUID, result_id: UUID) → tuple[EvaluationResult, list[str]]

Fetch an EvaluationResult belonging to eval_id; return (row, artifact_keys).

Raises EntityNotFoundError (“EvaluationResult”) when the result does not exist or does not belong to eval_id.

protea.services._annotations_evaluation_helpers.get_evaluation_set_data(session: Session, eval_id: UUID) → dict[str, Any]

Return a single evaluation set.

Raises EntityNotFoundError when the UUID does not resolve.

protea.services._annotations_evaluation_helpers.list_evaluation_results_data(session: Session, eval_id: UUID) → list[dict[str, Any]]

List EvaluationResult rows for one EvaluationSet (newest first).

Raises EntityNotFoundError when the EvaluationSet does not resolve.

protea.services._annotations_evaluation_helpers.list_evaluation_sets_data(session: Session) → list[dict[str, Any]]

List all evaluation sets, newest first.

Helpers for the two long methods of annotations_service.

Splits get_go_subgraph_data (DAG BFS) and iter_delta_proteins_fasta (FASTA emission) so neither orchestrator nor any helper exceeds the §3 method-LOC ceiling.

Both functions are re-exported from protea.services.annotations_service for backwards compatibility with router and CLI callers.

protea.services._annotations_method_helpers.get_go_subgraph_data(session: Session, snapshot_id: UUID, go_ids: str, depth: int) → dict[str, Any]

BFS the GO DAG upward from the requested seed terms.

Returns {"nodes": [...], "edges": [...]} ready for the API. Each node has id (DB id), go_id, name, aspect, is_query (True for the seed terms). Each edge has source (child id), target (parent id), relation_type.

Raises EntityNotFoundError when the snapshot does not resolve. Imports it lazily to avoid the circular dependency with the re-exporting annotations_service module.

protea.services._annotations_method_helpers.iter_delta_proteins_fasta(session: Session, eval_id: UUID, category: str) → list[str]

Return FASTA lines for delta proteins (nk / lk / pk / all).

Only proteins whose sequence is in the DB are emitted. Header is >ACCESSION entry_name OS=organism OX=taxon (NK|LK|PK); the sequence is wrapped at 60 chars per line.

Returns an empty list when no proteins match. Raises EntityNotFoundError if the EvaluationSet does not resolve; the exception is imported lazily to avoid the circular dependency with the re-exporting annotations_service module.

Streaming + per-row TSV/zip helpers extracted from annotations_service.

These pure functions own the streaming bytes/strings that StreamingResponse consumes (eval-artifacts zip and per-namespace metrics TSV). They have no domain-exception coupling, so they live in this sibling module independently of the service layer.

protea.services._annotations_streaming_helpers.iter_evaluation_artifacts_zip(keys: list[str], result_id: UUID) → Iterator[bytes]

Stream a deflate-compressed zip of the evaluation-result artifacts.

The artifact store is resolved internally from settings; the caller does not need to thread the store through. Keys are rewritten to drop the eval_artifacts/<result_id>/ prefix so the zip entries are the relative file names cafaeval emitted.
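One way to stream a deflate zip with only the standard library is to give zipfile a non-seekable sink. zipfile then writes data descriptors instead of seeking back to patch headers, so each finished entry can be yielded immediately.

This is a sketch, not protea's helper, under these assumptions:

- read_artifact is a hypothetical callable standing in for the settings-resolved artifact store.
- _ChunkSink and iter_artifacts_zip are illustrative names.

```python
import io
import zipfile
from typing import Callable, Iterator
from uuid import UUID


class _ChunkSink(io.RawIOBase):
    """Write-only, non-seekable sink that buffers bytes until drained."""

    def __init__(self) -> None:
        self._chunks: list[bytes] = []

    def writable(self) -> bool:
        return True

    def write(self, b) -> int:
        self._chunks.append(bytes(b))
        return len(b)

    def drain(self) -> bytes:
        data = b"".join(self._chunks)
        self._chunks.clear()
        return data


def iter_artifacts_zip(
    keys: list[str], result_id: UUID, read_artifact: Callable[[str], bytes]
) -> Iterator[bytes]:
    prefix = f"eval_artifacts/{result_id}/"
    sink = _ChunkSink()
    with zipfile.ZipFile(sink, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for key in keys:
            # Drop the storage prefix so entries keep cafaeval's relative names.
            arcname = key[len(prefix):] if key.startswith(prefix) else key
            zf.writestr(arcname, read_artifact(key))
            yield sink.drain()  # one complete local entry
    yield sink.drain()  # central directory, written when the archive closes
```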

protea.services._annotations_streaming_helpers.render_evaluation_metrics_tsv(result: EvaluationResult, aspect_codes: tuple[str, ...]) → Any

Yield TSV rows for the per-(setting, namespace) metrics summary.

The caller passes the aspect-codes tuple (ASPECT_CAFA_CODES) so the service stays free of the domain layer. Returns a generator suitable for StreamingResponse.

Embeddings service helpers

Admin / analytics helpers extracted from embeddings_service.

These pure functions own the heavy DB work for admin operations (cascade-delete an EmbeddingConfig and all dependent rows) and read-only analytics (per-aspect GO-term distribution for a PredictionSet). The service layer keeps the existence check that maps to EntityNotFoundError and delegates the body here, so this module has zero coupling to the domain-exception classes.

protea.services._embeddings_admin_helpers.cascade_delete_embedding_config(session: Session, config: EmbeddingConfig) → dict[str, Any]

Cascade-delete the given EmbeddingConfig and all linked rows.

Bulk-deletes the dependent GOPrediction (via PredictionSet), PredictionSet, and SequenceEmbedding rows; then the config itself. Returns a summary dict with the deletion counts.

The ORM-level ondelete cascade would handle this on session.delete(c) alone, but we bulk-delete explicitly here so the response reports per-table counts the UI surfaces.

The caller is responsible for verifying the config exists before calling (the existence check belongs in the service layer next to the EntityNotFoundError definition).

protea.services._embeddings_admin_helpers.compute_go_term_distribution(session: Session, *, prediction_set_id: UUID, limit: int = 50) → dict[str, Any]

Return the most-frequent GO terms predicted in a set + per-aspect totals.

The caller is responsible for verifying the PredictionSet exists before calling (the existence check stays in the service layer).

CAFA-export helpers for embeddings_service.

The preflight (prepare_cafa_export) and the streaming TSV generator (iter_predictions_cafa_tsv) own a self-contained divergent-change cluster: they evolve with the CAFA file-format spec and the evaluation data shape, not with the rest of the predictions API. Moving them out keeps embeddings_service.py under the file-LOC budget while preserving the public import surface (both names are re-exported from protea.services.embeddings_service).

Domain exceptions (EntityNotFoundError) are imported lazily inside the functions that raise them to avoid a circular dependency with the parent module.

protea.services._embeddings_cafa_helpers.iter_predictions_cafa_tsv(factory: sessionmaker[Session], *, prediction_set_id: UUID, aspect: str | None, max_distance: float | None, delta_proteins: set[str] | None) → Iterator[str]

Stream the CAFA-format prediction TSV.

DB-level deduplication: a subquery that groups by (protein_accession, go_term_id) and takes MIN(distance) keeps the best row per pair. The Python side therefore never needs an unbounded seen set, and the export is truly streamed. The score is max(0.0, 1.0 - distance), clamped to [0, 1].

protea.services._embeddings_cafa_helpers.prepare_cafa_export(session: Session, *, prediction_set_id: UUID, eval_id: UUID | None) → set[str] | None

Preflight CAFA export: validate the PredictionSet exists and, if an EvaluationSet was supplied, compute the union of NK + LK delta proteins to restrict the export.

Returns the delta-protein accession set when eval_id is provided (the streaming generator filters on it), otherwise None.

Raises EntityNotFoundError for missing PredictionSet or EvaluationSet so the router can translate to 404.

Per-PredictionSet query + TSV-formatting helpers extracted from embeddings_service.

These pure functions own the GOPrediction/GOTerm join queries and the output formatting (TSV columns, optional / float coercions). The service layer keeps existence checks and re-exports the public API so external callers continue importing protea.services.embeddings_service unchanged.

protea.services._embeddings_predictions_helpers.build_predictions_for_protein(session: Session, *, prediction_set_id: UUID, accession: str) → list[dict[str, Any]]

Return all predicted GO terms for one protein, sorted by distance.

The PredictionSet existence check stays in the service layer (so the domain exception type lives next to the rest of the validation logic); this helper assumes the caller has already verified the id.

protea.services._embeddings_predictions_helpers.iter_predictions_tsv(factory: Any, *, prediction_set_id: UUID, accession: str | None = None, aspect: str | None = None, max_distance: float | None = None) → Iterator[str]

Yield TSV rows (as str) of every GOPrediction in a set.

Opens its own session inside the generator so the caller’s existence-check session can close cleanly. The first yielded chunk is the header line; one row per (GOPrediction, GOTerm) pair follows, ordered by (protein_accession, distance).

Optional filters: accession (single query protein), aspect (F / P / C), max_distance.

Per-protein listing helpers for embeddings_service.

The orchestrator list_proteins_in_prediction_set lives here, split into private helpers (paginate / load / count). This keeps it and every helper under the §3 method-LOC ceiling, and lets embeddings_service.py shrink toward the file-LOC budget.

The function is re-exported from protea.services.embeddings_service for backwards compatibility with existing router and CLI callers.

protea.services._embeddings_proteins_helpers.list_proteins_in_prediction_set(session: Session, *, prediction_set_id: UUID, search: str | None = None, limit: int = 50, offset: int = 0) → dict[str, Any]

Paginated list of proteins in a prediction set with derived stats.

For each row returns go_count (number of predicted terms), min_distance (closest neighbour), annotation_count (known annotations against the same AnnotationSet) and match_count (predictions whose (protein, go_id) is in the known set; a precision proxy).

Decomposed into private helpers (_paginate_protein_rows, _load_protein_orm_map, _load_annotation_counts, _load_match_counts) so this orchestrator stays under the §3 method-LOC ceiling.

Raises EntityNotFoundError (imported lazily to avoid the circular dependency with embeddings_service) when prediction_set_id does not resolve.

Embedding-config request-body validation helpers for embeddings_service.

The orchestrator validate_embedding_config_body lives here, split into per-field-group helpers. This keeps it and every helper under the §3 method-LOC ceiling, and lets embeddings_service.py shrink toward the file-LOC budget.

The function is re-exported from protea.services.embeddings_service for backwards compatibility with router and CLI callers.

Validation is duck-typed (manual isinstance checks) rather than Pydantic to preserve the exact response payload shape and message wording the existing tests assert on.

protea.services._embeddings_validation_helpers.validate_embedding_config_body(body: dict[str, Any]) → dict[str, Any]

Validate a request body for POST /embeddings/configs.

Returns the canonicalised dict (defaults filled in) on success. Otherwise raises InvalidEmbeddingConfigError (imported lazily to avoid the circular dependency with embeddings_service) with the full list of failures. The router translates that to a 422 with the same shape it produced before extraction.

Decomposed into per-field-group helpers so neither this orchestrator nor any helper breaches the 60-LOC method ceiling.
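The collect-all-failures style with per-field-group helpers can be sketched as follows. This is a minimal sketch under stated assumptions: the fields model_name (required non-empty string) and batch_size (optional positive int, default 8), the function validate_body, and the local InvalidEmbeddingConfigError stand-in are all hypothetical; the real field groups and message wording are not documented here.

```python
from typing import Any


class InvalidEmbeddingConfigError(Exception):
    """Local stand-in carrying the full list of validation failures."""

    def __init__(self, errors: list[str]) -> None:
        super().__init__("; ".join(errors))
        self.errors = errors


def _check_model(body: dict[str, Any], errors: list[str]) -> dict[str, Any]:
    # Hypothetical field group: model identity.
    name = body.get("model_name")
    if not isinstance(name, str) or not name.strip():
        errors.append("model_name must be a non-empty string")
    return {"model_name": name}


def _check_batching(body: dict[str, Any], errors: list[str]) -> dict[str, Any]:
    # Hypothetical field group: batching, with a default filled in.
    size = body.get("batch_size", 8)
    # bool is a subclass of int, so reject it explicitly.
    if not isinstance(size, int) or isinstance(size, bool) or size <= 0:
        errors.append("batch_size must be a positive integer")
    return {"batch_size": size}


def validate_body(body: dict[str, Any]) -> dict[str, Any]:
    errors: list[str] = []
    canonical: dict[str, Any] = {}
    # Every group runs, so the caller sees all failures at once.
    for group in (_check_model, _check_batching):
        canonical.update(group(body, errors))
    if errors:
        raise InvalidEmbeddingConfigError(errors)
    return canonical
```

Because each group only appends to a shared list, adding a field group means adding one small helper rather than growing the orchestrator.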

Scoring service helpers

Pydantic response models, ORM ↔ response serialisers, and preset ScoringConfig definitions extracted from scoring_service.

The service module re-exports every public symbol so existing router/CLI imports keep working unchanged.

protea.services._scoring_models.PRESET_CONFIGS: list[dict[str, Any]]

Seven presets, in this order. Each entry carries name, formula, description and a weights dict over the six signals embedding_similarity, evidence_weight, identity_nw, identity_sw, neighbor_vote_fraction and taxonomic_proximity. Weights not listed below are 0.0; evidence_weight is 0.0 in every preset.

  • embedding_only (formula linear; embedding_similarity 1.0). Pure cosine similarity of the winning neighbour, converted to [0, 1]. Baseline; tests the hypothesis that the nearest-neighbour distance alone is enough signal.

  • vote_fraction (formula linear; neighbor_vote_fraction 1.0). Canonical KNN score: fraction of the K neighbours that vote for each GO term. Tests the hypothesis that consensus across neighbours beats the raw cosine distance of the top-1 neighbour.

  • alignment_only (formula linear; identity_nw 0.6, identity_sw 0.4). Pure sequence-identity score (NW global 60 % + SW local 40 %), no embedding. Tests whether classical sequence alignment alone can match PLM-based KNN. Requires compute_alignments=True.

  • embedding_plus_alignment (formula linear; embedding_similarity 0.5, identity_nw 0.3, identity_sw 0.2). Embedding (50 %) refined with global NW identity (30 %) and local SW identity (20 %). Tests whether alignment adds a usable signal on top of the embedding. Requires compute_alignments=True.

  • embedding_plus_vote (formula linear; embedding_similarity 0.5, neighbor_vote_fraction 0.5). Nearest-neighbour distance (50 %) combined with K-neighbour consensus (50 %). Tests whether adding voting on top of cosine distance improves the ranking vs either signal alone.

  • evidence_veto (formula evidence_weighted; embedding_similarity 1.0). Embedding similarity, multiplied by the resolved evidence weight as a final veto (evidence_weighted formula with evidence_weight=0 in the linear sum to avoid double-counting). Tests whether down-ranking IEA/ND predictions via a clean multiplier beats feeding evidence into the sum.

  • composite (formula linear; embedding_similarity 0.4, identity_nw 0.2, identity_sw 0.1, neighbor_vote_fraction 0.2, taxonomic_proximity 0.1). Kitchen-sink linear mix: embedding + alignment + taxonomy + voting. evidence_weight excluded from the linear sum (use evidence_veto when you want the multiplier). Requires compute_alignments=True and compute_taxonomy=True; tests whether more signals beat fewer.

Built-in preset ScoringConfigs seeded by the POST /configs/presets endpoint. Because none of them overrides evidence weights, they document what the system defaults produce.
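The two formula values used by the presets can be read as follows. This is a minimal sketch, assuming linear means a weighted sum of the signals and evidence_weighted means that same sum multiplied by the resolved evidence weight, as the evidence_veto description suggests; the function score, its evidence_multiplier parameter, and the signal values are hypothetical, not the scoring_service implementation.

```python
def score(
    signals: dict[str, float],
    weights: dict[str, float],
    formula: str = "linear",
    evidence_multiplier: float = 1.0,
) -> float:
    # Weighted sum over the configured signals. A missing signal
    # contributes 0.0 here; the real API rejects incomplete coverage.
    total = sum(w * signals.get(name, 0.0) for name, w in weights.items())
    if formula == "evidence_weighted":
        # Evidence acts as a final multiplicative veto, not a summand.
        return total * evidence_multiplier
    if formula == "linear":
        return total
    raise ValueError(f"unknown formula: {formula!r}")
```

Under this reading, the embedding_plus_alignment weights applied to signals of 0.8, 0.6 and 0.4 give 0.5·0.8 + 0.3·0.6 + 0.2·0.4 = 0.66, while evidence_veto with a similarity of 0.9 and an evidence weight of 0.5 gives 0.45.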

class protea.services._scoring_models.RerankerResponse(*, id: UUID, name: str, prediction_set_id: UUID | None, evaluation_set_id: UUID | None, category: str, aspect: str | None, metrics: dict[str, Any], feature_importance: dict[str, Any], feature_schema_sha: str | None = None, feature_selection: dict[str, Any] | None = None, producer_version: str | None = None, producer_git_sha: str | None = None, external_source: str | None = None, dataset_id: UUID | None = None, dataset_name: str | None = None, dataset_schema_sha: str | None = None, dataset_manifest_sha: str | None = None, spec_yaml: str | None = None, created_at: Any)

Bases: BaseModel

Serialised representation of a stored RerankerModel.

aspect: str | None
category: str
created_at: Any
dataset_id: uuid.UUID | None
dataset_manifest_sha: str | None
dataset_name: str | None
dataset_schema_sha: str | None
evaluation_set_id: uuid.UUID | None
external_source: str | None
feature_importance: dict[str, Any]
feature_schema_sha: str | None
feature_selection: dict[str, Any] | None
id: uuid.UUID
metrics: dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

name: str
prediction_set_id: uuid.UUID | None
producer_git_sha: str | None
producer_version: str | None
spec_yaml: str | None
class protea.services._scoring_models.ScoringConfigCreate(*, name: Annotated[str, MinLen(min_length=1), MaxLen(max_length=255)], formula: str = 'linear', weights: dict[str, float] = <factory>, evidence_weights: dict[str, float] | None = None, description: str | None = None)

Bases: BaseModel

Request body for POST /scoring/configs.

Lives in the service module so non-router callers (CLI tools, batch scripts) can reuse the validation rules without pulling FastAPI in.

description: str | None
evidence_weights: dict[str, float] | None
formula: str
model_config = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

name: str
classmethod validate_evidence_weights(v: dict[str, float] | None) dict[str, float] | None

Ensure all keys are known GO evidence codes and all values are in [0, 1].

classmethod validate_formula(v: str) str
classmethod validate_weights(v: dict[str, float]) dict[str, float]
weights: dict[str, float]
class protea.services._scoring_models.ScoringConfigResponse(*, id: UUID, name: str, formula: str, weights: dict[str, Any], evidence_weights: dict[str, Any] | None, description: str | None, created_at: Any)

Bases: BaseModel

Serialised representation of a stored ScoringConfig.

created_at: Any
description: str | None
evidence_weights: dict[str, Any] | None
formula: str
id: uuid.UUID
model_config = {}

Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

name: str
weights: dict[str, Any]
protea.services._scoring_models.to_reranker_response(m: RerankerModel, dataset: Dataset | None = None) RerankerResponse

Convert an ORM RerankerModel to its API response model.

dataset is the resolved linked Dataset row (when the caller has joined it). When provided, its content fingerprints and name are surfaced for provenance; None leaves those fields unset.
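The optional-dataset behaviour can be sketched with a plain dataclass. This is a minimal sketch, assuming the Dataset row exposes name, schema_sha and manifest_sha attributes; DatasetRow and provenance_fields are hypothetical names, and only the response field names come from RerankerResponse.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DatasetRow:
    # Hypothetical stand-in for the Dataset ORM row.
    name: str
    schema_sha: str
    manifest_sha: str


def provenance_fields(dataset: DatasetRow | None) -> dict[str, str | None]:
    if dataset is None:
        # No joined row: leave every provenance field unset.
        return {"dataset_name": None, "dataset_schema_sha": None, "dataset_manifest_sha": None}
    return {
        "dataset_name": dataset.name,
        "dataset_schema_sha": dataset.schema_sha,
        "dataset_manifest_sha": dataset.manifest_sha,
    }
```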

protea.services._scoring_models.to_response(c: ScoringConfig) ScoringConfigResponse

Convert an ORM ScoringConfig to its API response model.

Prediction-metrics computation helpers for scoring_service.

The orchestrator compute_prediction_metrics lives here, split into private helpers (validate / score / format), so that neither it nor any helper exceeds the §3 method-LOC ceiling and scoring_service.py can shrink toward the file-LOC budget.

The function is re-exported from protea.services.scoring_service for backwards compatibility with router and CLI callers.

protea.services._scoring_prediction_metrics_helpers.compute_prediction_metrics(session: Session, *, prediction_set_id: UUID, scoring_config_id: UUID, eval_context: EvalContext, category: str) dict[str, Any]

Compute CAFA Fmax and AUC-PR for a PredictionSet under a ScoringConfig.

Decomposed into private helpers (_validate_*, _score_*, _format_*) so this orchestrator stays under the §3 method-LOC ceiling.

Raises:
  • EntityNotFoundError – Either PredictionSet or ScoringConfig does not exist.

  • SignalCoverageError – The config requires signals absent from the PredictionSet.
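For reference, the protein-centric CAFA Fmax can be sketched as below: precision is averaged over benchmark proteins with at least one prediction at or above the threshold, recall over all benchmark proteins, and Fmax is the best harmonic mean across thresholds. This is a simplified sketch, not compute_prediction_metrics itself: it skips GO ancestor propagation and AUC-PR, uses every distinct score as a threshold instead of a fixed grid, and assumes every truth set is non-empty; the name fmax is hypothetical.

```python
from collections import defaultdict


def fmax(
    scores: dict[tuple[str, str], float],
    truth: dict[str, set[str]],
) -> float:
    """Protein-centric Fmax over (protein, go_id) scores, no propagation."""
    best = 0.0
    for tau in sorted(set(scores.values())):
        predicted: dict[str, set[str]] = defaultdict(set)
        for (protein, go_id), s in scores.items():
            # Only benchmark proteins are evaluated.
            if s >= tau and protein in truth:
                predicted[protein].add(go_id)
        if not predicted:
            continue
        # Precision: mean over proteins with at least one prediction.
        pr = sum(len(p & truth[prot]) / len(p) for prot, p in predicted.items()) / len(predicted)
        # Recall: mean over every benchmark protein.
        rc = sum(len(predicted.get(prot, set()) & t) / len(t) for prot, t in truth.items()) / len(truth)
        if pr + rc > 0:
            best = max(best, 2 * pr * rc / (pr + rc))
    return best
```

F only changes when the threshold crosses a score value, so scanning the distinct scores finds the same maximum as a fine grid would.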

Streaming-TSV generators extracted from scoring_service.

These pure functions own the per-row formatting + yield loop for the three TSV streams the scoring router serves: scored, training, and re-ranker outputs. The service layer keeps re-exports for back-compat with router/CLI imports.

protea.services._scoring_streaming_helpers.iter_reranked_predictions_tsv(df: Any, *, min_score: float | None = None) Any

Yield TSV rows (as bytes) from the scored DataFrame produced by score_predictions_with_reranker().

Empty input emits a header-only response (matches the legacy endpoint’s “no predictions” shape).

protea.services._scoring_streaming_helpers.iter_scored_predictions(factory: Any, *, prediction_set_id: UUID, config_snap: ScoringConfig, min_score: float | None = None, accession: str | None = None) Any

Yield TSV rows (as bytes) of scored predictions.

Opens its own session inside the generator so the route’s initial validation phase can close its session before streaming starts; this avoids holding a DB connection open for the duration of the response. The first yielded chunk is the header line; one row per GOPrediction follows.
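The reason this works is that a generator body does not execute until it is first iterated. This is a minimal sketch of the pattern, assuming factory is any zero-argument callable returning a context manager (a SQLAlchemy sessionmaker behaves this way in recent versions) and query is a hypothetical callable producing row strings; iter_rows is a hypothetical name.

```python
from typing import Any, Callable, Iterable, Iterator


def iter_rows(
    factory: Callable[[], Any],
    query: Callable[[Any], Iterable[str]],
) -> Iterator[bytes]:
    # A generator body does not run until the first next() call, so the
    # session below is only opened once streaming actually begins.
    with factory() as session:
        yield b"header\n"
        for row in query(session):
            yield (row + "\n").encode()
    # Exiting the with-block closes the streaming session.
```

If the client disconnects mid-stream, closing the generator raises GeneratorExit at the paused yield, which still exits the with-block and releases the session.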

protea.services._scoring_streaming_helpers.iter_training_data(factory: Any, *, prediction_set_id: UUID, gt_pairs: set[tuple[str, str]]) Any

Yield TSV rows (as bytes) of labeled training data for the re-ranker.

Streaming generator over GOPrediction rows; it opens its own session so the caller can close the validation session before streaming starts. Each row carries the canonical GOPrediction feature vector plus a binary label (1 if the (protein_accession, go_id) pair is in gt_pairs, else 0). Row formatting is delegated to format_training_row in _scoring_training_helpers so the orchestrator stays under the §3 method-LOC ceiling.

Training-data row formatting for scoring_service.iter_training_data.

Lifts the 30-column TSV row builder out of the streaming generator so the orchestrator stays under the §3 method-LOC ceiling.

protea.services._scoring_training_helpers.format_training_row(pred: Any, go_id: str, aspect: str | None, label: int) str

Render one labeled training-data TSV row (no trailing newline).

Column order matches TRAINING_TSV_COLUMNS; label is binary (0 / 1). Uses a lazy import of _format_optional from _scoring_streaming_helpers (where it canonically lives) to avoid a circular dependency with the re-exporting module.

Validation + signal-coverage helpers extracted from scoring_service.

These pure functions own the heavy DB introspection (per-signal NULL counts, temporal NK/LK/PK delta materialisation) for the scoring API. They return raw results; the service-layer wrappers translate them into domain exceptions (SignalCoverageError, EntityNotFoundError) so the helper module stays free of those types.

protea.services._scoring_validation_helpers.build_training_gt_pairs(session: Session, *, prediction_set: PredictionSet, evaluation_set: Any, category: str) set[tuple[str, str]]

Compute the (protein, go_id) ground-truth pair set for training.

The caller (service layer) is responsible for resolving the PredictionSet and EvaluationSet rows and for raising EntityNotFoundError on missing ids. evaluation_set is typed as Any so the helper can stay free of the EvaluationSet ORM import, which the service layer obtains via a lazy import.

protea.services._scoring_validation_helpers.compute_missing_signals(session: Session, prediction_set_id: UUID, config_snap: ScoringConfig) list[str]

Return human-readable strings for every required signal absent from the set.

For each signal with a non-zero weight in config_snap.weights (plus evidence_code when the formula is evidence_weighted, where the multiplier is always applied), count how many rows in the PredictionSet have the backing column non-NULL. Empty list means full coverage. The service-layer wrapper raises SignalCoverageError when this list is non-empty.
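The helper/wrapper split can be sketched with an in-memory SQLite table, relying on the fact that COUNT(column) counts only non-NULL values. This is a minimal sketch under stated assumptions: the go_prediction table, the SIGNAL_COLUMNS mapping, the message format, and the rule that a signal is missing when any row lacks it are all hypothetical, and the real query also filters by prediction_set_id.

```python
import sqlite3

# Hypothetical signal -> backing-column map; real column names may differ.
SIGNAL_COLUMNS = {
    "embedding_similarity": "distance",
    "evidence_weight": "evidence_code",
    "identity_nw": "identity_nw",
    "identity_sw": "identity_sw",
    "neighbor_vote_fraction": "vote_count",
    "taxonomic_proximity": "taxonomic_distance",
    "evidence_code": "evidence_code",
}


class SignalCoverageError(Exception):
    """Local stand-in for the domain exception."""


def missing_signals(conn: sqlite3.Connection, weights: dict[str, float], formula: str) -> list[str]:
    required = [name for name, w in weights.items() if w != 0]
    if formula == "evidence_weighted":
        required.append("evidence_code")  # the multiplier always applies
    total = conn.execute("SELECT COUNT(*) FROM go_prediction").fetchone()[0]
    missing = []
    for name in required:
        column = SIGNAL_COLUMNS[name]  # trusted constant, safe to interpolate
        # COUNT(column) counts only non-NULL values.
        present = conn.execute(f"SELECT COUNT({column}) FROM go_prediction").fetchone()[0]
        if present < total:
            missing.append(f"{name}: {present}/{total} rows populated")
    return missing


def require_signals(conn: sqlite3.Connection, weights: dict[str, float], formula: str) -> None:
    # Service-layer wrapper: translate the raw list into a domain exception.
    missing = missing_signals(conn, weights, formula)
    if missing:
        raise SignalCoverageError("; ".join(missing))
```

Keeping the helper exception-free means it can be reused wherever only the list is needed, while the wrapper owns the mapping to SignalCoverageError.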

See also

  • HTTP API: routers that call into these service modules.

  • Infrastructure: ORM models and session utilities used by services.