Core

The protea.core package contains all domain logic. It has no dependency on the infrastructure layer: operations receive an open SQLAlchemy session and an emit callback, but they do not manage connections, queues, or transactions themselves. This strict boundary makes every operation independently testable and trivially substitutable.

Axis tuple

protea.core.axis_tuple defines the canonical axis-tuple key set and the shortid helper used to identify a training configuration in the multi-PLM sweep. The axes are the PLM identifier, neighbourhood size k, reranker spec ID, feature schema SHA, evaluation set name, evaluation set manifest SHA, propagation mode, and ensemble spec (see CANONICAL_AXIS_KEYS). All serialised config keys in Dataset and ExperimentRun rows use the canonical axis-tuple form derived from these keys.

Axis-tuple shortid: thin shim that prefers the protea-contracts helper.

The canonical implementation lives in protea_contracts.axis_tuple (shipped by the FARM-EXP.1 companion PR on protea-contracts). PROTEA pins protea-contracts@main in pyproject.toml. Once that PR merges and we re-lock, the try / except ImportError path picks up the upstream helper and the local fallback below is no longer used.

The local fallback is byte-for-byte identical to the upstream helper so the cross-repo shortid contract holds during the brief window where PROTEA is merged before the contracts release that ships the helper.

Formula (mirrors ExperimentSpec.hash() in protea-reranker-lab/src/protea_reranker_lab/experiment.py:108-111):

sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()[:12]

When the upstream helper is available, this module re-exports it verbatim (same symbol, same digest). Callers should import from here so the swap is invisible:

from protea.core.axis_tuple import axis_tuple_shortid

protea.core.axis_tuple.CANONICAL_AXIS_KEYS: tuple[str, ...] = ('plm', 'k', 'reranker_spec_id', 'feature_schema_sha', 'eval_set_name', 'eval_set_manifest_sha', 'propagation', 'ensemble_spec')

Canonical axis-tuple key set. Falls back to the locally-pinned tuple while the upstream contracts release is being rolled out. Order does NOT affect the digest (json.dumps(..., sort_keys=True)).

protea.core.axis_tuple.SHORTID_HEX_LEN: int = 12

Length of the truncated hex digest. Matches the lab convention.

protea.core.axis_tuple.axis_tuple_shortid(axis_tuple: Mapping[str, Any]) → str

Return the canonical 12-hex shortid for an axis tuple.

Parameters:

axis_tuple – mapping of axis name to value. Every value must be JSON-encodable by the standard json module without a custom default. Path / UUID / datetime objects must be pre-stringified by the caller (mirrors the lab path's Pydantic model_dump(mode="json") step).

Returns:

12 lowercase hex chars ([0-9a-f]{12}).

Raises:

TypeError – when a value in axis_tuple is not JSON-encodable. Surfaced as a hard error so schema drift is caught at the call site rather than producing a silent shortid mismatch.

The function is intentionally pure: same input -> same output, no hidden state, no environment lookup. Order of keys in axis_tuple is irrelevant (json.dumps(..., sort_keys=True) canonicalises).
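The following is a minimal sketch of the documented formula, for readers who want to reproduce a shortid outside PROTEA. The function name axis_tuple_shortid_sketch is hypothetical. The body uses only the standard library and follows the sha256 / sort_keys / 12-hex recipe stated above. The example axis values in the tests are made up.

```python
import hashlib
import json
from collections.abc import Mapping
from typing import Any

SHORTID_HEX_LEN = 12  # mirrors protea.core.axis_tuple.SHORTID_HEX_LEN


def axis_tuple_shortid_sketch(axis_tuple: Mapping[str, Any]) -> str:
    """Hypothetical re-implementation of the documented shortid formula.

    sort_keys=True makes the digest independent of key order; json.dumps
    raises TypeError for values it cannot encode (sets, UUIDs, Paths, ...).
    dict() lets non-dict Mappings through; for plain dicts it is a no-op.
    """
    payload = json.dumps(dict(axis_tuple), sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:SHORTID_HEX_LEN]
```

The digest covers the sorted JSON text. As a result, {"k": 5, "plm": "esm2"} and {"plm": "esm2", "k": 5} hash identically, while a change in a value's type (5 vs "5") changes the shortid.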

Domain types

protea.core.domain.aspect defines the Aspect enum used throughout the codebase to identify the three GO namespaces: MFO (Molecular Function), BPO (Biological Process), and CCO (Cellular Component).

GO aspect (namespace) — the three-way partition of the Gene Ontology.

Two encodings circulate in the codebase, both of them load-bearing:

  • Single-char codes ("P" / "F" / "C") — the wire format used by GOTerm.aspect in PostgreSQL and the go-basic.obo file itself. The go_term table CHECK constraint is on these codes.

  • Three-char CAFA codes ("BPO" / "MFO" / "CCO") — the format expected by cafaeval (the upstream Fmax / AuPRC evaluator) and surfaced in the UI for human readers.

Until this module landed, both encodings appeared as bare string literals in 30+ places — a textbook Primitive Obsession smell. The enum is the single source of truth; everything else converts at the boundary.

Typical usage:

from protea.core.domain.aspect import Aspect

# Iterate the three aspects in a stable canonical order
for aspect in Aspect:
    ...

# Convert from a DB row
aspect = Aspect.from_code(row.aspect)

# Hand off to cafaeval
result_dict[aspect.cafa] = ...

class protea.core.domain.aspect.Aspect(*values)

Bases: Enum

Gene Ontology aspect / namespace.

The three GO sub-ontologies. Iteration order is the canonical PROTEA order (P → F → C), matching the historical _ASPECTS = ("P", "F", "C") tuples this enum replaces.

BIOLOGICAL_PROCESS = 'P'
CELLULAR_COMPONENT = 'C'
MOLECULAR_FUNCTION = 'F'
property cafa: str

Three-char CAFA code ("BPO" / "MFO" / "CCO").

Format expected by the upstream cafaeval package and the evaluation results JSON; also the canonical UI label.

property code: str

Single-char code ("P" / "F" / "C").

Wire format in PostgreSQL (go_term.aspect column) and the go-basic.obo file. Use this when reading/writing the DB or comparing against the ORM column.

classmethod from_cafa(cafa: str) → Aspect

Build an Aspect from its three-char CAFA code.

classmethod from_code(code: str) → Aspect

Build an Aspect from its single-char wire code.
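The enum contract described above can be sketched as follows. This is an illustrative re-implementation under stated assumptions, not the PROTEA source. The class name AspectSketch and the module-level lookup tables are hypothetical. The error behaviour for unknown codes (ValueError / KeyError) is assumed rather than documented.

```python
from enum import Enum


class AspectSketch(Enum):
    # Definition order is iteration order: P -> F -> C, as documented.
    BIOLOGICAL_PROCESS = "P"
    MOLECULAR_FUNCTION = "F"
    CELLULAR_COMPONENT = "C"

    @property
    def code(self) -> str:
        """Single-char wire code, i.e. the enum value itself."""
        return self.value

    @property
    def cafa(self) -> str:
        """Three-char CAFA code looked up from the wire code."""
        return _CODE_TO_CAFA[self.value]

    @classmethod
    def from_code(cls, code: str) -> "AspectSketch":
        return cls(code)  # Enum lookup by value; ValueError if unknown

    @classmethod
    def from_cafa(cls, cafa: str) -> "AspectSketch":
        return cls(_CAFA_TO_CODE[cafa])  # KeyError if unknown


# Defined outside the class body so Enum does not turn them into members.
_CODE_TO_CAFA = {"P": "BPO", "F": "MFO", "C": "CCO"}
_CAFA_TO_CODE = {cafa: code for code, cafa in _CODE_TO_CAFA.items()}
```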

Contracts

The contracts module defines the interfaces that every operation must satisfy and the shared types used across the entire codebase.

Operation is a structural Protocol: any class that exposes a name string and an execute(session, payload, *, emit) method conforms to it, without needing to inherit from a base class. ProteaPayload is the immutable, strictly-typed Pydantic base class for all operation payloads: strict mode prevents silent type coercion, and frozen configuration prevents accidental mutation after validation. OperationResult is the return value of every execute call; its deferred flag tells BaseWorker that completion will be signalled by child workers rather than immediately. RetryLaterError is raised when a shared resource (e.g. the GPU) is occupied; BaseWorker catches it, resets the job to QUEUED, and re-publishes the message after a configurable delay.

class protea.core.contracts.operation.Operation(*args, **kwargs)

Bases: Protocol

Protocol that every domain operation must satisfy.

Operations are pure domain logic: they receive an open SQLAlchemy session and an emit callback for structured event logging, and return an OperationResult. They must not manage sessions or queue connections.

description is a short, static, human-readable explanation of what the operation does in general. The jobs UI surfaces it to give context to the operations history.

summarize_payload returns a 1-line dynamic summary of the most informative fields in the payload (e.g. “release 211 ← OBO 2022-07-01”), so each individual job in the history tells its own story. Returning an empty string is acceptable for operations with no useful payload.

description: str
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name: str
summarize_payload(payload: dict[str, Any]) → str
class protea.core.contracts.operation.OperationResult(result: dict[str, Any] = <factory>, progress_current: int | None = None, progress_total: int | None = None, deferred: bool = False, publish_after_commit: list[tuple[str, UUID]] = <factory>, publish_operations: list[tuple[str, dict[str, Any]]] = <factory>)

Bases: object

Return value of every Operation.execute() call.

result is a free-form dict that gets stored in Job.meta and surfaced in the job detail view. progress_current / progress_total are written back to the Job row so the UI can render a progress bar.

deferred: if True, BaseWorker will NOT transition the job to SUCCEEDED. Use this for coordinator operations that delegate work to child jobs; the last child is responsible for marking the parent SUCCEEDED.

publish_after_commit: list of (queue_name, job_id) pairs that BaseWorker will publish to RabbitMQ after the DB commit, guaranteeing workers always find the child job row before they try to claim it.

deferred: bool = False
progress_current: int | None = None
progress_total: int | None = None
publish_after_commit: list[tuple[str, UUID]]
publish_operations: list[tuple[str, dict[str, Any]]]
result: dict[str, Any]
exception protea.core.contracts.operation.RetryLaterError(reason: str, delay_seconds: int = 60)

Bases: Exception

Raised by an operation when it cannot run yet but should be retried.

BaseWorker resets the job to QUEUED and the consumer re-publishes the message after delay_seconds, leaving the GPU free for other work.

protea.core.contracts.operation.make_safe_emit(raw_emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]

Wrap a raw EmitFn so failures are logged and never propagate.

The platform-level emit may fail for transient reasons (DB connection lost mid-operation, JobEvent insert conflict, etc.). Operations should not crash because the audit trail hiccupped: the operation’s primary work matters more than the event row. Failures are logged at ERROR with full traceback so they remain visible in observability without breaking the running job.

OperationRegistry is a simple dict-backed mapping from operation name strings to instances. Workers resolve the correct operation at message dispatch time; new operations are registered at process startup in scripts/worker.py without modifying any worker code.
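A dict-backed registry of this kind can be sketched in a few lines. OperationRegistrySketch is hypothetical. The documentation does not say what happens on a duplicate name or an unknown lookup, so the ValueError and KeyError below are assumptions.

```python
from typing import Any


class OperationRegistrySketch:
    """Hypothetical name -> operation map resolved at dispatch time."""

    def __init__(self) -> None:
        self._ops: dict[str, Any] = {}

    def register(self, op: Any) -> None:
        # Assumption: duplicates are rejected rather than silently replaced.
        if op.name in self._ops:
            raise ValueError(f"operation {op.name!r} already registered")
        self._ops[op.name] = op

    def get(self, name: str) -> Any:
        try:
            return self._ops[name]
        except KeyError:
            known = sorted(self._ops)
            raise KeyError(f"unknown operation {name!r}; known: {known}") from None
```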

class protea.core.contracts.registry.OperationRegistry

Bases: object

get(name: str) → Operation
register(op: Operation) → None

parent_progress exposes the shared update_parent_progress helper. The child operations of the coordinator operations (compute_embeddings, predict_go_terms) call it to advance the parent job's progress as they finish their batches. It was extracted to its own module in F0 (T0.7) to remove duplicated copies across coordinators.

Shared helper for child operations that report progress to a parent job.

Used by store_embeddings and store_predictions: each child batch increments the parent’s progress_current and, if it was the last batch, transitions the parent from RUNNING to SUCCEEDED.

protea.core.contracts.parent_progress.update_parent_progress(session: Session, parent_job_id: UUID, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None], *, event_name: str) → None

Atomically increment parent progress; mark SUCCEEDED if last batch.

Returns silently if the parent has not yet seen all its batches (progress_current < progress_total) or if no longer RUNNING.

The event_name is the operation-specific suffix used in the emit call when the parent transitions to SUCCEEDED, e.g. "store_embeddings.parent_succeeded" or "store_predictions.parent_succeeded". The DB-level event row is always written as job.succeeded so downstream consumers see a uniform name regardless of which child closed the parent.
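The increment-then-maybe-close logic can be sketched against an in-memory SQLite table. Everything here is hypothetical: the job table schema, the status strings and the boolean return value (the documented helper returns None) exist only to make the sketch testable. The emit / event_name handling is omitted. The sketch relies on putting the increment and the RUNNING guard in a single UPDATE, so the counter is never read, modified and written back from Python.

```python
import sqlite3

# Hypothetical minimal job table; PROTEA's real Job model has more columns.
SCHEMA = (
    "CREATE TABLE job (id TEXT PRIMARY KEY, status TEXT NOT NULL, "
    "progress_current INTEGER NOT NULL, progress_total INTEGER NOT NULL)"
)


def update_parent_progress_sketch(conn: sqlite3.Connection, parent_job_id: str) -> bool:
    """Count one finished batch; close the parent if it was the last one.

    Returns True only when this call moved the parent to SUCCEEDED.
    """
    with conn:  # single transaction: commit on normal exit, rollback on error
        cur = conn.execute(
            "UPDATE job SET progress_current = progress_current + 1 "
            "WHERE id = ? AND status = 'RUNNING'",
            (parent_job_id,),
        )
        if cur.rowcount == 0:
            return False  # unknown parent, or no longer RUNNING
        current, total = conn.execute(
            "SELECT progress_current, progress_total FROM job WHERE id = ?",
            (parent_job_id,),
        ).fetchone()
        if current < total:
            return False  # more batches outstanding
        conn.execute(
            "UPDATE job SET status = 'SUCCEEDED' WHERE id = ? AND status = 'RUNNING'",
            (parent_job_id,),
        )
        return True
```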

Retry middleware

protea.core.retry exposes with_retry, a wrapper function that BaseWorker uses to retry the execute session on transient database errors (deadlocks, connection drops, serialisation failures) and brief network blips. It applies exponential backoff with jitter. All knobs (max_attempts, base_delay, max_delay, jitter_ratio, predicate, on_retry) are bundled in a RetryPolicy frozen dataclass passed via the policy keyword argument (T-CONTEXTS, PR #237). BaseWorker instantiates a fixed policy at the call site (RetryPolicy(max_attempts=3, base_delay=1.0, max_delay=10.0, jitter_ratio=0.3)); there is no global TuningSettings field for these values. Added as part of F0 (T0.3) of the master plan revision 3.

Generic retry middleware for transient infrastructure errors.

Used by BaseWorker to survive Postgres deadlocks, serialization failures and brief connection interruptions without marking the job as failed. Application errors (validation, missing data) are NOT retried; only transient infrastructure conditions are.

Example:

from protea.core.retry import RetryPolicy, with_retry

def do_work():
    ...

with_retry(do_work, policy=RetryPolicy(max_attempts=3, base_delay=1.0))

class protea.core.retry.RetryPolicy(max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 30.0, jitter_ratio: float = 0.5, predicate: Callable[[BaseException], bool] = is_retryable, on_retry: Callable[[int, BaseException, float], None] | None = None)

Bases: object

Bundle of tunable retry knobs consumed by with_retry().

Frozen so a default-constructed policy can sit in a function default safely; callers override individual fields by passing a new RetryPolicy(...) through the policy keyword. Fields:

  • max_attempts: total attempts including the first; < 1 is rejected at call time so RetryPolicy(max_attempts=0) does not silently no-op.

  • base_delay / max_delay / jitter_ratio: exponential backoff schedule used between retryable failures.

  • predicate: classifier returning True for retryable exceptions; defaults to is_retryable() (DB + transient network errors).

  • on_retry: optional callback fired before each sleep with (attempt, exc, sleep_seconds). Defaults to a structured WARNING log under protea.retry.

base_delay: float = 1.0
jitter_ratio: float = 0.5
max_attempts: int = 3
max_delay: float = 30.0
on_retry: Callable[[int, BaseException, float], None] | None = None
predicate: Callable[[BaseException], bool] = is_retryable

Defaults to is_retryable(), which accepts retryable DB errors plus transient network errors.

protea.core.retry.is_retryable(exc: BaseException) → bool

Default predicate: retryable DB errors plus transient network errors.

protea.core.retry.is_retryable_connection_error(exc: BaseException) → bool

Return True if the exception represents a transient network condition.

These are typically raised by pika or low-level socket layers when the broker is briefly unreachable. The publisher retry loop handles these on the publish side; this predicate exists for the consume side.

protea.core.retry.is_retryable_db_error(exc: BaseException) → bool

Return True if the exception represents a transient DB condition.

protea.core.retry.with_retry(fn: Callable[P, R], *args: P.args, policy: RetryPolicy = RetryPolicy(max_attempts=3, base_delay=1.0, max_delay=30.0, jitter_ratio=0.5, predicate=is_retryable, on_retry=None), **kwargs: P.kwargs) → R

Run fn(*args, **kwargs) with exponential backoff and jitter.

The callable is invoked up to policy.max_attempts times. After each retryable failure (per policy.predicate), the loop sleeps for min(policy.base_delay * 2**(attempt-1), policy.max_delay) seconds, jittered by a random factor in [1 - jitter_ratio, 1 + jitter_ratio].

Exceptions that do not match policy.predicate propagate immediately. Once policy.max_attempts retryable failures accumulate, the last exception propagates to the caller.

Operation catalogue

protea.core.operation_catalog builds the singleton OperationRegistry that workers consult at message dispatch. The public function build_operation_registry() instantiates each operation class and registers it under its canonical name. Adding a new operation is a one-line edit here plus a new module under protea/core/operations/.

Plugin discovery

protea.core.plugins centralises importlib.metadata.entry_points discovery for every PROTEA plugin group (protea.backends, protea.sources, protea.runners). discover_plugins(group) returns a cached {name: plugin} map and hard-errors with RuntimeError if a plugin’s name attribute drifts from its entry-point name. reset_plugin_cache is a test-only seam for suites that install/uninstall plugins between cases. Added in T2A.5 for backend dispatch and generalised in T2A.8 (PR #240).

Generic entry-point plugin discovery (T2A.5 + T2A.8).

Both backends (T2A.5) and runners (T2A.8) follow the same shape: a package declares plugins in [tool.poetry.plugins."protea.<group>"] mapping name = "module:plugin", PROTEA discovers them via importlib.metadata.entry_points, and a resolver maps a string identifier to a plugin instance for dispatch.

This module centralises that discovery + cache + name-mismatch hard error so the backend and runner code paths share one implementation instead of forking. See protea.core.runners and the _get_backend_plugins shim in protea.core.operations.compute_embeddings for the call sites.

protea.core.plugins.discover_plugins(group: str) → dict[str, Any]

Discover and cache plugins in the given entry_points group.

Returns a dict keyed by plugin.name. Each plugin must declare a name class attribute that matches its entry-point name; mismatches raise RuntimeError at discovery time so the failure surface is a clear “your declaration drifted from your entry_points file” rather than a confusing “Unknown <something>” later on.

Discovery happens once per process per group; subsequent calls return the cached map. The cache key is the group string itself, so the same group cannot be discovered twice (which keeps the import-time side effects stable across reload-heavy test runs).

protea.core.plugins.reset_plugin_cache(group: str | None = None) → None

Drop the cached plugin map for one (or every) group.

Test-only seam: lets unit tests force re-discovery without restarting the process. Passing group=None clears every group; passing a specific name clears only that one. Production callers should not invoke this; the cache is intentionally process-local.

Feature registry

protea.core.features.registry is the central registry that maps feature-family names to their producer functions. The ALL_FEATURES constant lists every column the pipeline may populate; adding a column without wiring a producer here causes a T1.8 invariant failure in the dataset-export pipeline.

Concrete in-process FeatureRegistry for PROTEA (T2B.1).

The registry is the single point of truth for which feature columns are computed and which families they belong to. It implements the protea_contracts.FeatureRegistry ABC and, at import time, is populated from the canonical protea_contracts.FEATURE_FAMILIES map so every consumer sees the same column set and family layout the lab was trained against.

Compute callables are intentionally placeholders in this slice (T2B.1). T2B.2 of master plan v3.2 §24 wires the existing per-feature compute helpers (feature_engineering, feature_enricher, anc2vec_embeddings, pca_cache) into CanonicalFeatureRegistry.bind_compute() so parquet_export and _predict_batch can drive feature generation through the registry instead of the open-coded loops they use today.

Calling a placeholder compute raises NotImplementedError with the feature name so a premature integration fails loudly rather than silently emitting empty columns.

A module-level REGISTRY singleton mirrors the cross-repo contract test pinned in tests.test_feature_contract (TestRegistryCoversContracts). The test activates automatically on the presence of this module and pins the registry-vs-contracts invariant.

class protea.core.features.registry.CanonicalFeatureRegistry

Bases: FeatureRegistry

In-memory FeatureRegistry populated from the contracts.

Two membership views co-exist:

  • Per-feature Feature.family scalar (the most specific family, used for metadata / single-tag dispatch).

  • The overlapping protea_contracts.FEATURE_FAMILIES map, returned by families() and consulted by selected(). This is what the lab trains against and is the invariant pinned by tests.test_feature_contract.TestRegistryCoversContracts.

Insertion order is preserved so names() returns features in the order they were registered.

bind_compute(name: str, compute: Any) → None

Replace the placeholder compute for name.

T2B.2 calls this once per feature at module import time to swap the _placeholder() raiser for the real compute helper. The feature must already be registered.

families() → dict[str, list[str]]

Return the overlapping family map for currently registered features.

Walks FEATURE_FAMILIES so the result mirrors the canonical contracts grouping (knn is the union of knn_distance and knn_vote plus the std feature, etc.). Only features actually registered are included; the map is empty for a fresh registry.

get(name: str) → Feature

Return the Feature registered under name.

Raises:

KeyError – if name is not registered.

names() → list[str]

All registered feature names, in registration order.

register(feature: Feature) → None

Add a feature to the registry. Re-registering an existing name should raise ValueError so a typo never silently shadows a canonical column.

selected(active_families: list[str], drop: list[str] | None = None) → list[Feature]

Union the features in every requested family, then apply drop.

Preserves registration order (which mirrors ALL_FEATURES) so the returned list is deterministic across calls. Unknown family names raise KeyError to mirror the lab’s strict family validation.

protea.core.features.registry.REGISTRY: CanonicalFeatureRegistry = <protea.core.features.registry.CanonicalFeatureRegistry object>

Process-wide canonical registry instance. Importing this name activates the cross-repo contract test tests.test_feature_contract.TestRegistryCoversContracts which pins the registry-vs-FEATURE_FAMILIES invariant.

protea.core.features.registry.get_canonical_registry() → CanonicalFeatureRegistry

Return the process-wide canonical registry, building it on first use.

The registry is rebuilt only after reset_canonical_registry() (test-only seam). Callers that need to bind real compute helpers (T2B.2) acquire the singleton and call CanonicalFeatureRegistry.bind_compute() per feature.

protea.core.features.registry.reset_canonical_registry() → None

Drop the cached canonical registry. Test-only seam.

Schema SHA

protea.core.schema_sha_v2 gates the dual-write of the schema_sha_v2 column, a 16-character deterministic fingerprint of the feature schema version. Fingerprints of this kind are embedded in Dataset.schema_sha and RerankerModel.feature_schema_sha. A mismatch at inference time indicates that the booster was trained on a different feature set and must be rejected.

schema_sha_v2 dual-write feature flag (T1.6 of master plan v3).

The protea.infrastructure.orm.models Dataset, RerankerModel, and ExperimentRun rows carry both the legacy schema_sha / feature_schema_sha column and the canonical schema_sha_v2 column added by ADR D10. Until the production cut-over window, write paths only fill the legacy column. Once the operator flips PROTEA_SCHEMA_SHA_V2_WRITE_ENABLED=true, new rows dual-write both columns.

The flag defaults to false (off) so a normal deploy of this slice is a pure no-op for the data plane. The accompanying alembic migration ships the column and a one-shot backfill; production reads still come off schema_sha until a later slice flips the routers.

Truthy values follow the same convention as protea.api.auth._authn_required() so operators do not learn two different env-flag dialects: 1, true, yes, on (case insensitive). Anything else (including unset) is treated as false.

protea.core.schema_sha_v2.is_dual_write_enabled() → bool

Return True when schema_sha_v2 should be written on inserts.

The check is read-fresh each call so tests can flip the value via pytest.MonkeyPatch.setenv() without restarting the process and operators can roll the flag forward without redeploying.

protea.core.schema_sha_v2.maybe_v2(value: str | None) → str | None

Pass-through helper for dual-write call sites.

Returns value when the flag is on, None otherwise. Lets write paths stay terse: schema_sha_v2=maybe_v2(legacy_sha) instead of an inline ternary at every insert site.

JSONB dual-write

protea.core.jsonb_dual_write provides helpers for writing structured data to both a typed column and a JSONB fallback column in the same transaction. Used during schema-migration windows where old and new code may run concurrently against the same database.

Env-flag helper for the T3.1 GOPrediction.predictions_jsonb dual-write.

T3.1 scaffolds the prediction-tuple JSONB dual-write: writers continue filling the typed prediction columns (go_term_id, distance, evidence_code) and additionally serialise the same tuple into the predictions_jsonb blob when the environment opt-in is set. This keeps the scaffolding inert in production until a human-coordinated deploy window flips the flag.

Environment variable

PROTEA_GO_PREDICTION_JSONB_WRITE_ENABLED

Truthy values (1, true, yes, on; case-insensitive) enable the dual-write. Anything else (unset, empty, 0, false…) leaves the writer skipping the JSONB column so predictions_jsonb stays NULL on new rows. Default off.

Design notes

The helper API is intentionally tiny so every writer site can opt in with a single import + one-line call:

from protea.core.jsonb_dual_write import maybe_jsonb

row["predictions_jsonb"] = maybe_jsonb(
    [(pred["go_term_id"], pred["distance"], pred.get("evidence_code"))]
)

maybe_jsonb returns None when the flag is off (so the row dict carries an explicit NULL for SQL) and a compact dict when on. The compact shape is a single-level dict with two keys:

  • predictions: a list of {"go_term_id": int, "score": float, "evidence": str | None} records, preserving caller order.

  • count: len(predictions), redundant but useful for cheap GIN filtering and sanity checks (WHERE predictions_jsonb @> '{"count": 5}' etc.).

The list shape lets a single row carry one or more prediction tuples without re-shaping the helper at the T3.2 backfill / T3.3 reader cut-over.

protea.core.jsonb_dual_write.JSONB_DUAL_WRITE_ENV: str = 'PROTEA_GO_PREDICTION_JSONB_WRITE_ENABLED'

Environment variable name; exported for tests / docs.

protea.core.jsonb_dual_write.is_jsonb_dual_write_enabled(env: dict[str, str] | None = None) → bool

Return True iff the env opt-in is set to a truthy value.

env defaults to os.environ; the explicit parameter is a test hook so callers can verify the helper without mutating process state.

protea.core.jsonb_dual_write.maybe_jsonb(predictions: Sequence[tuple[int, float, str | None]], *, env: dict[str, str] | None = None) → dict[str, Any] | None

Serialise a list of (go_term_id, score, evidence) tuples.

Returns None when the dual-write env opt-in is off so the caller can assign the result directly to a row dict / ORM attribute without an explicit guard at every writer site.

When the flag is on, returns the compact JSONB shape documented in the module docstring. Empty input still returns a well-formed dict ({"predictions": [], "count": 0}) so downstream consumers can rely on the keys being present.

Experiment runners

protea.core.runners adapts the generic plugin discovery to the protea.runners group. resolve_runner(name) maps an identifier ("knn" / "baseline" / "lightgbm") to a runner plugin instance implementing the protea_contracts.ExperimentRunner interface; unknown names raise ValueError listing the discovered set. PROTEA does not yet dispatch to runners at inference time (the active KNN + reranker path stays in PredictGOTermsBatchOperation until F2C of master plan revision 3 hoists the inference core into a shared package). The adapter exists so GET /v1/runners has a stable resolver and future code has a one-line entry. Closes T2A.8 (PR #240).

Experiment-runner plugin adapter (T2A.8).

Mirrors the backend-plugin adapter in protea.core.operations.compute_embeddings: discovers runners via importlib.metadata.entry_points("protea.runners") and exposes a resolve_runner() lookup that maps a string identifier (e.g. "knn" / "baseline" / "lightgbm") to a runner plugin instance implementing protea_contracts.ExperimentRunner.

The protea-runners package declares the canonical plugin set:

  • knn: KNN-only baseline (no reranker), evaluates against a frozen dataset for ablation.

  • baseline: predict-by-frequency baseline (most-frequent GO terms in the training set).

  • lightgbm: the v9 / v18 LightGBM reranker, currently trained out-of-tree in protea-reranker-lab.

PROTEA does not yet dispatch to runners at inference time (the active KNN + reranker path lives in PredictGOTermsBatchOperation and stays there until F2C of master plan v3 hoists the inference core into a package both PROTEA and protea-runners can depend on). The adapter exists today so:

  1. The plugin registry endpoint (GET /v1/registry/runners) has a stable resolver under the hood (currently still using its own _discover for the API shape; that path is untouched here).

  2. Future code that wants a runner instance has a one-line entry: from protea.core.runners import resolve_runner.

  3. The discovery + name-mismatch hard error follows the same pattern as backends, so the lab + plugin authoring workflow is uniform across both groups.

protea.core.runners.get_runner_plugins() → dict[str, Any]

Return the discovered runner plugins keyed by plugin.name.

Wraps protea.core.plugins.discover_plugins() so the runner callers do not need to know the entry-point group string; the constant lives here next to the consumers.

protea.core.runners.resolve_runner(name: str) → Any

Resolve a runner identifier to a plugin instance.

Raises ValueError listing the discovered runners when the identifier is unknown, so the failure message is actionable. The returned object implements protea_contracts.ExperimentRunner.
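The lookup-with-actionable-error behaviour can be sketched as below. resolve_runner_sketch is hypothetical and takes the plugin map as an argument for testability. The documented resolve_runner takes only the name and obtains the map from get_runner_plugins(). The exact error wording is also assumed.

```python
from typing import Any


def resolve_runner_sketch(name: str, plugins: dict[str, Any]) -> Any:
    try:
        return plugins[name]
    except KeyError:
        known = ", ".join(sorted(plugins)) or "<none>"
        raise ValueError(f"Unknown runner {name!r}; discovered runners: {known}") from None
```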

Utilities

protea.core.utils provides two shared utilities used across multiple operations.

utcnow() returns a timezone-aware UTC datetime, avoiding the common mistake of calling datetime.utcnow(), which returns a naive object. chunks(seq, n) splits any sequence into fixed-size chunks, used by coordinator operations to partition work into batches.

The previous UniProtHttpMixin (exponential backoff with jitter, Retry-After header parsing, cursor extraction for paginated UniProt REST endpoints) was inlined into InsertProteinsOperation and FetchUniProtMetadataOperation when those operations were rewritten; the retry/backoff/cursor logic now lives directly in each operation.

protea.core.utils.chunks(seq: Sequence[Any], n: int) → Iterable[Sequence[Any]]

Yield successive n-sized chunks from seq.

protea.core.utils.utcnow() → datetime

Return the current UTC datetime (timezone-aware).
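Both utilities are small enough to sketch directly with the standard library. These are illustrative versions with hypothetical names. Rejecting n < 1 is an assumption. Because chunks_sketch is a generator, that check only fires when iteration starts.

```python
from collections.abc import Iterator, Sequence
from datetime import datetime, timezone
from typing import Any


def utcnow_sketch() -> datetime:
    # Aware datetime; datetime.utcnow() would return a naive one.
    return datetime.now(timezone.utc)


def chunks_sketch(seq: Sequence[Any], n: int) -> Iterator[Sequence[Any]]:
    if n < 1:
        raise ValueError("chunk size must be >= 1")  # assumed guard
    for start in range(0, len(seq), n):
        yield seq[start:start + n]  # slicing keeps the input's sequence type
```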

Feature engineering

protea.core.feature_engineering enriches each query–reference pair in a prediction result with sequence-level and phylogenetic signals. These features are opt-in: they are computed only when compute_alignments=true and/or compute_taxonomy=true are set in the prediction payload.

Pairwise alignment is computed via the parasail library using the BLOSUM62 substitution matrix with gap-open/extend penalties of 10/1. Both global (Needleman–Wunsch) and local (Smith–Waterman) alignments are run for each pair, producing identity, similarity, raw score, gap percentage, and alignment length for each. These metrics capture sequence similarity beyond what the embedding distance alone encodes, which is especially valuable for distant homologues where embedding geometry may be unreliable.
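To make the identity and gap-percentage metrics concrete, the sketch below derives them from an already-aligned pair of strings, the kind of gapped query/reference rows an aligner's traceback produces. It does not call parasail. It also does not compute similarity, which needs the BLOSUM62 matrix. gapped_metrics is hypothetical. The denominators (alignment length for both metrics, identity as a fraction, gaps as a percentage) are assumptions rather than PROTEA's documented definitions.

```python
from typing import Any


def gapped_metrics(aligned_query: str, aligned_ref: str, gap: str = "-") -> dict[str, Any]:
    if len(aligned_query) != len(aligned_ref):
        raise ValueError("aligned strings must have equal length")
    length = len(aligned_query)
    if length == 0:
        return {"identity": 0.0, "gaps_pct": 0.0, "alignment_length": 0}
    columns = list(zip(aligned_query, aligned_ref))
    # A column is an identity when both residues match and neither is a gap.
    matches = sum(1 for q, r in columns if q == r and q != gap)
    # A column counts as a gap when either row has a gap there.
    gap_columns = sum(1 for q, r in columns if q == gap or r == gap)
    return {
        "identity": matches / length,
        "gaps_pct": 100.0 * gap_columns / length,
        "alignment_length": length,
    }
```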

Taxonomic distance is computed via ete3 and the NCBI taxonomy tree (local SQLite, downloaded on first use). For each (query, reference) pair where taxonomy IDs are available from UniProt metadata, PROTEA finds the lowest common ancestor and computes the edge count through it. Results are cached with an LRU cache keyed by taxon-ID pair to avoid redundant tree traversals across a batch.

Feature engineering utilities for functional annotation enrichment.

Provides pairwise alignment metrics (Needleman–Wunsch and Smith–Waterman) via parasail and taxonomic distance computation via ete3 NCBITaxa.

These features complement the embedding-space KNN distance stored in GOPrediction.distance with sequence-level and phylogenetic signals.

Performance notes:

  • Alignment is O(m*n) per pair; parasail uses SIMD acceleration.

  • Taxonomy lookups use an LRU cache over lineage queries (ete3 local SQLite). First call may trigger a DB download if the ete3 database is absent.

protea.core.feature_engineering.compute_alignment(seq1: str, seq2: str) → dict[str, Any]

Compute both NW and SW alignment metrics in one call.

protea.core.feature_engineering.compute_nw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) dict[str, Any]

Global alignment (Needleman–Wunsch) via parasail/BLOSUM62.

Returns a dict with keys:

identity_nw, similarity_nw, alignment_score_nw, gaps_pct_nw, alignment_length_nw, length_query, length_ref

protea.core.feature_engineering.compute_sw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) dict[str, Any]

Local alignment (Smith–Waterman) via parasail/BLOSUM62.

Returns a dict with keys:

identity_sw, similarity_sw, alignment_score_sw, gaps_pct_sw, alignment_length_sw

protea.core.feature_engineering.compute_taxonomy(t1_raw: Any, t2_raw: Any) dict[str, Any]

Compute taxonomic distance between two NCBI taxonomy IDs.

Returns a dict with keys:

taxonomic_lca, taxonomic_distance, taxonomic_common_ancestors, taxonomic_relation

protea.core.feature_engineering.warmup_taxonomy_db() None

Pre-initialize the NCBITaxa database.

Call at worker startup so the download (~100 MB on first run) happens before any batch is processed, not mid-flight.

Re-ranker

protea.core.reranker implements a LightGBM binary classifier that re-scores GO term predictions using 20 numeric features (embedding distance, NW/SW alignment metrics, sequence lengths, taxonomic distance and common ancestors, and 5 aggregate re-ranker signals) plus 3 categorical features (qualifier, evidence code, taxonomic relation). The full feature list is documented in train_reranker.

The module provides:

  • prepare_dataset(df): extracts and coerces feature columns. Numeric columns are coerced with errors="coerce" (invalid strings become NaN); categorical columns are converted to pandas category dtype, which LightGBM consumes directly without manual label encoding.

  • train(df): stratified positive/negative split with early-stopping on a held-out validation set (default 20 %). Returns a TrainResult with the Booster, validation metrics (AUC, logloss, precision, recall, F1 at the 0.5 threshold), the best boosting iteration, and gain-based feature importance.

  • predict(model, df): returns probability scores in [0, 1].

  • model_to_string() / model_from_string(): serialization for DB storage in the RerankerModel table.

  • load_training_tsv(): parses a training data TSV as produced by the /scoring/prediction-sets/{id}/training-data.tsv endpoint.
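The threshold-based validation metrics reported by train() can be illustrated with a stdlib sketch. It computes precision, recall, and F1 at the 0.5 threshold, plus mean binary log-loss over the predicted probabilities. This is not PROTEA's implementation, which may compute these values through LightGBM or another library. The function name is hypothetical, treating p >= threshold as positive is an assumption, and AUC is omitted for brevity.

```python
import math


def threshold_metrics(y_true: list[int], y_prob: list[float],
                      threshold: float = 0.5) -> dict[str, float]:
    """Precision/recall/F1 at a fixed threshold plus mean binary log-loss."""
    pairs = list(zip(y_true, y_prob))
    tp = sum(1 for y, p in pairs if p >= threshold and y == 1)
    fp = sum(1 for y, p in pairs if p >= threshold and y == 0)
    fn = sum(1 for y, p in pairs if p < threshold and y == 1)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    eps = 1e-15  # clip probabilities so log(0) never happens
    losses = []
    for y, p in pairs:
        p = min(max(p, eps), 1 - eps)
        losses.append(-(y * math.log(p) + (1 - y) * math.log(1 - p)))
    return {"precision": precision, "recall": recall, "f1": f1,
            "logloss": sum(losses) / len(losses)}
```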

Note

load_reranker / apply_reranker / infer_active_feature_families were originally split into a sibling protea.core.reranking module; they were folded back into protea.core.reranker to remove a naming trap (reranker vs reranking were impossible to grep apart). This module is now the single inference-side surface.

Parquet export (protea.core.parquet_export)

protea.core.parquet_export consolidates per-split, per-category parquet shards produced by the KNN + feature pipeline into the frozen dataset layout consumed by protea-reranker-lab: exactly train.parquet, eval.parquet and manifest.json under a single directory. The manifest follows ManifestV1 (schema version 2) and records PROTEA’s producer_version + producer_git_sha.

The single public function export_reranker_parquets(...) is shared by two callers:

  • training_dump_helpers._dump_frozen_dataset: thin wrapper that uses this helper to emit the dataset alongside a training-data dump.

  • ExportResearchDatasetOperation: stand-alone operation that only materialises and publishes the dataset, without running LightGBM.

When store is provided, the three consolidated files are additionally uploaded under key_prefix using the ArtifactStore interface, and the returned dict includes train_uri / eval_uri / manifest_uri.

Scoring

protea.core.scoring implements the scoring engine that applies weighted formulas to GO predictions. A ScoringConfig defines a set of weights for each feature column (embedding distance, alignment metrics, taxonomy, re-ranker features). The engine computes a composite score per prediction row and can stream scored results as TSV or compute CAFA-style metrics (Fmax, AUC-PR) against an evaluation set.

Scoring engine for GOPrediction rows.

Applies a ScoringConfig formula to raw prediction signals and returns a normalised [0, 1] confidence score.

The engine is intentionally stateless: every call to compute_score() is self-contained, which means any ScoringConfig can be applied to an existing PredictionSet at any time without re-running the KNN search.

Evidence-code weights

Evidence code quality is resolved through a two-level lookup with a final fallback:

  1. If config.evidence_weights is not None, that dict is checked first.

  2. For codes absent from the override (or when no override exists), the module-level DEFAULT_EVIDENCE_WEIGHTS table is used.

  3. Codes unknown to both tables fall back to DEFAULT_EVIDENCE_WEIGHT_FALLBACK (0.5).

This means a ScoringConfig may carry a partial override — e.g. zeroing the IEA weight for an experiment-only study — without having to redeclare every other code. The resolution order ensures backwards compatibility: configs stored without evidence_weights behave identically to older configs.

protea.core.scoring.compute_score(pred: dict[str, Any], config: ScoringConfig) float

Compute a [0, 1] confidence score for a single GOPrediction dict.

Signals are normalised to [0, 1] then weighted-averaged. Signals whose value is None (feature-engineering flag off at predict time) drop from both numerator and denominator. Returns round(base_score, 6); FORMULA_EVIDENCE_WEIGHTED multiplies the average by the resolved evidence weight even when the evidence_weight signal carries weight 0 — so IEA / ND annotations get down-ranked regardless of feature configuration.

protea.core.scoring.evidence_weight(code: str | None, *, overrides: dict[str, float] | None = None) float

Resolve the [0, 1] quality weight for a GO evidence code or ECO ID.

Resolution order

  1. Normalise code from ECO ID to GO code via ECO_TO_CODE if needed.

  2. Look up the normalised code in overrides (if provided).

  3. Fall back to DEFAULT_EVIDENCE_WEIGHTS.

  4. If still not found, return DEFAULT_EVIDENCE_WEIGHT_FALLBACK.

param code:

A GO evidence code (e.g. "IEA") or an ECO URI (e.g. "ECO:0000501"). None returns the fallback weight.

param overrides:

Optional per-config evidence weight table. May be a partial dict; codes not present here are resolved via DEFAULT_EVIDENCE_WEIGHTS.

rtype:

float in [0, 1].

protea.core.scoring.propagate_ground_truth_to_ancestors(ground_truth: dict[str, set[str]], parent_map: dict[str, set[str]]) dict[str, set[str]]

Expand each protein’s GO set with every ancestor in parent_map.

Returns a new dict; the input is not modified. Pair with propagate_scores_to_ancestors() when the downstream metric treats predictions and ground truth symmetrically (e.g. cafaeval-style Fmax).
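Ground-truth propagation amounts to a transitive closure over parent_map. The sketch below is a minimal version of that idea, not the module's code. The helper names and the GO IDs in the usage are toy placeholders.

```python
def ancestors(term: str, parent_map: dict[str, set[str]]) -> set[str]:
    """All transitive parents of term (iterative DFS; term itself excluded)."""
    seen: set[str] = set()
    stack = [term]
    while stack:
        for parent in parent_map.get(stack.pop(), ()):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


def propagate_truth(ground_truth: dict[str, set[str]],
                    parent_map: dict[str, set[str]]) -> dict[str, set[str]]:
    """New dict: each protein's terms plus every ancestor; the input is left untouched."""
    return {
        protein: set(terms).union(*(ancestors(t, parent_map) for t in terms))
        for protein, terms in ground_truth.items()
    }
```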

protea.core.scoring.propagate_scores_to_ancestors(scored_predictions: list[dict[str, Any]], parent_map: dict[str, set[str]]) list[dict[str, Any]]

Max-propagate GO scores to ancestors per protein (True Path Rule).

For every (protein, go_id, score) row, ensures every ancestor a of go_id in parent_map also has score ≥ that row’s score for the same protein. The final score of an ancestor is the max over all its descendants predicted for the protein (including itself if predicted).

This mirrors cafaeval’s behaviour: annotating a child implies every ancestor, so predictions must propagate upward before PR metrics. parent_map maps a child go_id (string) to the set of its direct parent go_id strings — typically the union of is_a + part_of edges for the relevant ontology snapshot.

Returns a new list; the input is not modified. Rows that share a (protein, go_id) collapse to a single row keyed by the max score.
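Max-propagation under the True Path Rule can be sketched by pushing each row's score to every ancestor and keeping the maximum per (protein, go_id). The row keys follow those required by compute_cafa_metrics. The helper names are hypothetical, and this sketch drops any extra row fields, which the real function may preserve.

```python
from typing import Any


def ancestors(term: str, parent_map: dict[str, set[str]]) -> set[str]:
    seen: set[str] = set()
    stack = [term]
    while stack:
        for parent in parent_map.get(stack.pop(), ()):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


def propagate_scores(rows: list[dict[str, Any]],
                     parent_map: dict[str, set[str]]) -> list[dict[str, Any]]:
    best: dict[tuple[str, str], float] = {}
    for row in rows:
        protein, go_id, score = row["protein_accession"], row["go_id"], row["score"]
        # The term itself and all its ancestors inherit at least this score.
        for term in {go_id} | ancestors(go_id, parent_map):
            key = (protein, term)
            if score > best.get(key, float("-inf")):
                best[key] = score
    return [{"protein_accession": p, "go_id": g, "score": s} for (p, g), s in best.items()]
```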

protea.core.scoring.score_predictions(predictions: list[dict[str, Any]], config: ScoringConfig) list[dict[str, Any]]

Add a score key to each prediction dict and return them sorted descending.

Parameters:
  • predictions – List of raw prediction dicts (same format as accepted by compute_score()).

  • config – The ScoringConfig to apply.

Returns:

  • A new list with a score key added to each item, sorted by score in descending order. The original list is not modified.

Metrics

protea.core.metrics implements CAFA-style precision-recall evaluation. It provides functions for computing Fmax (maximum F-measure over all thresholds), weighted precision/recall, and coverage for a set of predictions against ground-truth annotations.

CAFA-style precision-recall metrics for GO term prediction evaluation.

Takes scored GOPrediction rows and EvaluationData (ground truth) and computes Fmax, AUC-PR, and the full precision-recall curve following the CAFA protocol.

CAFA protocol summary

  • Evaluate only on proteins present in the ground truth (NK, LK, or PK).

  • At each score threshold t:

    precision(t) = mean over proteins-with-predictions of
                   card(pred & true) / card(pred)
    recall(t)    = mean over ALL ground-truth proteins of
                   card(pred & true) / card(true)
    
  • Fmax = max_t(2 * P(t) * R(t) / (P(t) + R(t)))

  • AUC-PR via trapezoidal integration of the PR curve.

Note: This implementation uses exact GO term matching (no DAG propagation). Ancestor propagation is intentionally left for a future iteration.
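The protocol summary translates into a short stdlib sketch. It sweeps thresholds, computes precision over ground-truth proteins with at least one prediction at or above t, computes recall over all ground-truth proteins, and keeps the best F. It uses exact term matching, as the note states. The function name, the input shapes, and the uniform 100-step threshold grid are assumptions, and AUC-PR is omitted.

```python
def cafa_fmax(predictions: dict[str, dict[str, float]],
              truth: dict[str, set[str]],
              n_thresholds: int = 100) -> tuple[float, float]:
    """Return (fmax, threshold) with exact GO-term matching (no propagation).

    predictions: protein -> {go_id: score}; truth: protein -> non-empty set of go_ids.
    """
    best_f, best_t = 0.0, 0.0
    for k in range(1, n_thresholds + 1):
        t = k / n_thresholds
        precisions: list[float] = []
        recall_sum = 0.0
        for protein, true_terms in truth.items():  # only ground-truth proteins are evaluated
            pred = {go for go, s in predictions.get(protein, {}).items() if s >= t}
            hits = len(pred & true_terms)
            if pred:  # precision: mean over proteins with >= 1 prediction at t
                precisions.append(hits / len(pred))
            recall_sum += hits / len(true_terms)  # recall: mean over ALL ground-truth proteins
        if not precisions:
            continue
        p = sum(precisions) / len(precisions)
        r = recall_sum / len(truth)
        if p + r > 0 and 2 * p * r / (p + r) > best_f:
            best_f, best_t = 2 * p * r / (p + r), t
    return best_f, best_t
```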

class protea.core.metrics.CAFAMetrics(category: str, fmax: float, threshold_at_fmax: float, auc_pr: float, n_ground_truth_proteins: int, n_predicted_proteins: int, n_predictions: int, curve: list[PRPoint] = <factory>)

Bases: object

CAFA evaluation results for one (PredictionSet, ScoringConfig, category) triple.

auc_pr: float
category: str
curve: list[PRPoint]
fmax: float
n_ground_truth_proteins: int
n_predicted_proteins: int
n_predictions: int
summary() dict[str, Any]
threshold_at_fmax: float
class protea.core.metrics.PRPoint(threshold: 'float', precision: 'float', recall: 'float', f1: 'float')

Bases: object

f1: float
precision: float
recall: float
threshold: float
protea.core.metrics.compute_cafa_metrics(scored_predictions: list[dict[str, Any]], evaluation_data: EvaluationData, category: str = 'nk') CAFAMetrics

Compute CAFA Fmax and PR curve.

scored_predictions must carry protein_accession, go_id, score per row. category selects nk / lk / pk from the evaluation ground truth.

Evidence codes

protea.core.evidence_codes provides mappings between ECO (Evidence and Conclusion Ontology) identifiers and GO evidence codes used in GAF files. Used by the QuickGO annotation loader to resolve ECO IDs to standard three-letter evidence codes.

GO evidence code utilities.

Provides normalisation from ECO IDs to canonical GO evidence codes and classification of codes as experimental vs. non-experimental.

Mapping source (canonical ECO → GO code):

https://github.com/evidenceontology/evidenceontology/blob/master/gaf-eco-mapping.txt

Additional ECO IDs used by UniProt GAF (not in the canonical mapping file) are resolved here using the EBI QuickGO ECO term definitions; all are “used in automatic assertion” and therefore map to IEA.

protea.core.evidence_codes.is_experimental(code: str) bool

Return True if code (GO or ECO) represents experimental evidence.

Examples:

is_experimental("IDA")          # True
is_experimental("ECO:0000314")  # True  (IDA)
is_experimental("IEA")          # False
is_experimental("ECO:0000501")  # False (IEA)
protea.core.evidence_codes.normalize(code: str) str

Return the canonical GO evidence code for code.

If code is already a GO root evidence code it is returned as-is. ECO IDs are translated via ECO_TO_CODE. Unknown codes are returned unchanged so that no information is silently discarded.

Examples:

normalize("IDA")           # → "IDA"
normalize("ECO:0000314")   # → "IDA"
normalize("ECO:0000256")   # → "IEA"
normalize("UNKNOWN")       # → "UNKNOWN"

Evaluation

protea.core.evaluation implements the CAFA5 evaluation protocol for computing the ground-truth delta between two annotation snapshots.

The module’s central data structure is EvaluationData, a frozen dataclass that holds the NK, LK, PK, known, and pk_known annotation dictionaries. Each dictionary maps a protein accession to a set of GO term IDs.

EvaluationData fields:

  • nk: delta annotations for No-Knowledge proteins (no prior annotations in any namespace at t0).

  • lk: delta annotations for Limited-Knowledge proteins (had annotations in some namespaces but gained new terms in a previously empty namespace).

  • pk: novel annotations for Partial-Knowledge proteins (gained new terms in a namespace where they already had annotations).

  • pk_known: old experimental annotations for PK proteins in the relevant namespaces; passed as -known to cafaeval to exclude them from scoring.

  • known: all old experimental annotations flattened across namespaces; available for download via the reference endpoint.

The public entry point is compute_evaluation_data(session, old_annotation_set_id, new_annotation_set_id, ontology_snapshot_id). It loads the GO DAG for NOT-propagation, builds a per-namespace annotation map for both old and new sets, and classifies each (protein, namespace) pair into NK, LK, or PK. The same protein can appear in multiple categories across different namespaces simultaneously (e.g., LK in CCO and PK in BPO).

CAFA-style evaluation data computation.

This module computes the ground-truth delta between two AnnotationSets (old → new) following the official CAFA5 evaluation protocol:

  1. Experimental evidence codes only (EXP, IDA, IMP, …)

  2. NOT-qualifier annotations are excluded — including their GO descendants propagated transitively through the is_a / part_of DAG.

  3. Classification is per (protein, namespace), not globally per protein:

    • NK: the protein had NO experimental annotations in ANY namespace at t0. All novel terms across all namespaces are ground truth.

    • LK: the protein had annotations in SOME namespaces at t0, but NOT in namespace S. Novel terms in S are ground truth for LK.

    • PK: the protein had annotations in namespace S at t0 AND gained new terms in S at t1. Novel terms in S are ground truth for PK; old terms in S form the -known file for the CAFA evaluator.

Note: the same protein can be LK in one namespace and PK in another simultaneously (e.g. had MFO+BPO at t0, gains CCO → LK in CCO, gains new BPO → PK in BPO).
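A minimal sketch of the per-(protein, namespace) classification. It assumes the old and new annotations are already filtered (experimental codes only, NOT-qualified terms removed) and grouped as protein → namespace → set of GO IDs. The data layout and function name are hypothetical. The pk_known handling mirrors the EvaluationData description.

```python
Annotations = dict[str, dict[str, set[str]]]  # protein -> namespace -> GO IDs


def classify_delta(old: Annotations, new: Annotations) -> dict[str, dict[str, set[str]]]:
    out: dict[str, dict[str, set[str]]] = {"nk": {}, "lk": {}, "pk": {}, "pk_known": {}}
    for protein, new_by_ns in new.items():
        old_by_ns = old.get(protein, {})
        had_any = any(old_by_ns.values())  # any experimental term in any namespace at t0
        for ns, terms in new_by_ns.items():
            old_terms = old_by_ns.get(ns, set())
            novel = terms - old_terms
            if not novel:
                continue
            if not had_any:
                category = "nk"  # nothing known anywhere at t0
            elif not old_terms:
                category = "lk"  # known elsewhere, but this namespace was empty
            else:
                category = "pk"  # namespace already annotated; keep old terms as -known
                out["pk_known"].setdefault(protein, set()).update(old_terms)
            out[category].setdefault(protein, set()).update(novel)
    return out
```

The usage mirrors the note: a protein with MFO and BPO terms at t0 that gains a CCO term and a new BPO term lands in LK for CCO and in PK for BPO.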

When the two annotation sets use different OntologySnapshots, compute_evaluation_data_reconciled implements the CAFA reconciliation protocol: propagate ancestors under each side’s native DAG and intersect with a frozen pivot snapshot; the final re-propagation under the pivot DAG is left to downstream cafaeval (idempotent under closure semantics). Reference implementation: anphan0828/democafa_package, utils/ontology.filter_terms_given_obo.

Output format (matching CAFA evaluator): 2-column TSV, no header.

protein_accession<TAB>go_id

class protea.core.evaluation.EvalContext(old_annotation_set_id: uuid.UUID, new_annotation_set_id: uuid.UUID, ontology_snapshot_id: uuid.UUID)

Bases: NamedTuple

The (old, new, snapshot) triple that identifies one evaluation delta.

Bundles the three IDs that travel together through the metrics endpoints (/scoring/prediction-sets/{id}/metrics) and the compute_evaluation_data() / _reconciled helpers, keeping downstream signatures within the six-parameter ceiling set in §3 of the master plan.

new_annotation_set_id: UUID

Alias for field number 1

old_annotation_set_id: UUID

Alias for field number 0

ontology_snapshot_id: UUID

Alias for field number 2

class protea.core.evaluation.EvaluationData(nk: dict[str, set[str]]=<factory>, lk: dict[str, set[str]]=<factory>, pk: dict[str, set[str]]=<factory>, known: dict[str, set[str]]=<factory>, pk_known: dict[str, set[str]]=<factory>)

Bases: object

Computed ground-truth delta between two annotation sets.

property delta_proteins: int
known: dict[str, set[str]]
property known_terms_count: int
lk: dict[str, set[str]]
property lk_annotations: int
property lk_proteins: int
nk: dict[str, set[str]]
property nk_annotations: int
property nk_proteins: int
pk: dict[str, set[str]]
property pk_annotations: int
pk_known: dict[str, set[str]]
property pk_proteins: int
stats() dict
protea.core.evaluation.compute_evaluation_data(session: Session, old_annotation_set_id: UUID, new_annotation_set_id: UUID, ontology_snapshot_id: UUID) EvaluationData

Compute NK/LK/PK ground truth following the CAFA5 protocol.

Classification is per (protein, namespace):

  • NK — protein had no experimental annotations in any namespace at t0.

  • LK — protein had annotations in some namespaces at t0, but not in namespace S; gained new terms in S → those terms are LK ground truth.

  • PK — protein had annotations in namespace S at t0 and gained new terms in S → those novel terms are PK ground truth; old terms in S are stored in pk_known for the cafaeval -known flag.

The same protein can be simultaneously LK in one namespace and PK in another.

protea.core.evaluation.compute_evaluation_data_reconciled(session: Session, old_annotation_set_id: UUID, new_annotation_set_id: UUID, old_native_snapshot_id: UUID, new_native_snapshot_id: UUID, pivot_snapshot_id: UUID) EvaluationData

CAFA-compliant evaluation delta across mismatched ontology snapshots.

Applies the democafa filter_terms_given_obo protocol per side:

  1. Load experimental annotations under each set’s native snapshot.

  2. Propagate ancestors under the native DAG (True Path Rule).

  3. Intersect with the pivot snapshot’s term universe.

Step 4 (re-propagate under pivot) is handled by cafaeval downstream, which applies ancestor propagation before scoring — prop(prop(x)) == prop(x).

NOT-qualifier exclusion preserves PROTEA’s True Path Rule contrapositive: NOT terms are propagated to descendants under the native DAG, intersected with pivot, then further propagated under the pivot DAG. The union across both annotation sets is applied to both sides, matching same-snapshot behaviour.
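Steps 1 to 3 can be sketched per side as "closure under the native DAG, then intersect with the pivot term universe". The closure is idempotent, which is why step 4 can safely be left to cafaeval. Helper names and toy term IDs are hypothetical. Dropping proteins left with no pivot terms is an assumption, and NOT handling is omitted.

```python
def closure(terms: set[str], parents: dict[str, set[str]]) -> set[str]:
    """terms plus all their ancestors under the given DAG (True Path Rule)."""
    out = set(terms)
    stack = list(terms)
    while stack:
        for parent in parents.get(stack.pop(), ()):
            if parent not in out:
                out.add(parent)
                stack.append(parent)
    return out


def reconcile_side(annotations: dict[str, set[str]],
                   native_parents: dict[str, set[str]],
                   pivot_terms: set[str]) -> dict[str, set[str]]:
    """Propagate under the native DAG, then keep only terms present in the pivot snapshot."""
    reconciled: dict[str, set[str]] = {}
    for protein, terms in annotations.items():
        kept = closure(terms, native_parents) & pivot_terms
        if kept:  # assumption: proteins with no surviving pivot terms are dropped
            reconciled[protein] = kept
    return reconciled
```

A term that exists only in the native snapshot still contributes its ancestors before it is filtered out. This is the point of propagating before intersecting.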

protea.core.evaluation.deserialize_evaluation_data_from_bytes(blob: bytes) EvaluationData

Parse parquet bytes (as returned by ArtifactStore.get) into EvaluationData.

protea.core.evaluation.groundtruth_key_for(eval_set_id) str

Storage key under which an EvaluationSet’s ground-truth parquet lives.

protea.core.evaluation.load_evaluation_data_for_set(session: Session, eval_set) tuple[EvaluationData, UUID]

Load ground-truth for an EvaluationSet row.

Strict reuse path: if eval_set.groundtruth_uri is set, deserializes the persisted parquet via the configured ArtifactStore and returns it. If not set, raises — recomputation on-the-fly is intentionally not allowed (see the project’s “no on-the-fly reuse” rule). Use scripts/backfill_evaluation_groundtruth.py to materialize artifacts for legacy EvaluationSet rows that predate this column.

Returns the EvaluationData plus the pivot OntologySnapshot ID — the caller should use the pivot snapshot (not the old set’s) when loading the OBO for cafaeval, since propagated go_ids live in pivot term space.

protea.core.evaluation.serialize_evaluation_data_to_parquet(data: EvaluationData, dest: Path) Path

Write data to a parquet file at dest (creates parent dirs).

Returns dest for convenience. Uses snappy compression and the long layout produced by _eval_data_to_dataframe — kept stable since consumers parse it back with deserialize_evaluation_data_from_bytes.

Provenance

protea.core.provenance provides capture_provenance(extra=None), a side-effect-free runtime snapshot for jobs / experiments / artefacts to carry an audit trail without DB or network probes. Returns a fresh dict[str, Any] with auto-keys protea_version (from importlib.metadata), protea_git_sha (delegates to parquet_export.resolve_protea_git_sha), python_version, platform, hostname, and captured_at (ISO-8601 UTC). Any caller-supplied extra mapping is overlaid last, so callers always win on key collisions.

Every probe is wrapped: missing distribution metadata, a non-git checkout, or an absent git binary all degrade to None rather than raising. Added in T3.11 of master plan v3.2 §24, Phase 4.

Operations

PROTEA ships a curated set of registered operation instances at worker startup via protea.core.operation_catalog.build_operation_registry, which is the authoritative list (read the function body for the live count). The catalog splits into job-backed entries (reachable through POST /jobs) and ephemeral consumers (dispatched internally by the compute_embeddings and predict_go_terms coordinators; see Operations for that taxonomy). Each operation is a class that implements the Operation protocol: a name string and an execute method. Operations are stateless with respect to infrastructure: they receive a session and emit structured events, but do not open connections or manage transactions. The job-backed entries are documented below; the four ephemeral siblings (compute_embeddings_batch, store_embeddings, predict_go_terms_batch, store_predictions) live in Operations.
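The following is a minimal sketch of the contract described above: a name plus an execute method that receives a session and an emit callable and touches no infrastructure. That separation is what makes an operation testable with a fake session and an in-memory emit. SketchResult, PingLike, run_with_recorder, and the event names are hypothetical. Reading emit's four positional arguments as (event, message, fields, level) is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Literal, Optional, Protocol

Level = Literal["info", "warning", "error"]
# Same shape as the emit callable in the signatures below.
Emit = Callable[[str, Optional[str], dict[str, Any], Level], None]


@dataclass
class SketchResult:
    """Hypothetical stand-in for OperationResult."""
    result: dict[str, Any] = field(default_factory=dict)


class OperationLike(Protocol):
    name: str

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Emit) -> SketchResult: ...


class PingLike:
    """Toy operation: emits two events and returns immediately."""
    name = "ping"

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Emit) -> SketchResult:
        emit("ping.start", None, {}, "info")
        emit("ping.done", "pong", {"payload_keys": sorted(payload)}, "info")
        return SketchResult({"pong": True})


def run_with_recorder(op: OperationLike,
                      payload: dict[str, Any]) -> tuple[SketchResult, list[tuple[Any, ...]]]:
    """Run an operation with no session and an emit that just records events."""
    events: list[tuple[Any, ...]] = []
    result = op.execute(None, payload, emit=lambda *args: events.append(args))
    return result, events
```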

ping

Smoke-test operation. Returns immediately with a success result. Used to verify end-to-end connectivity between the API, RabbitMQ, and worker processes.

class protea.core.operations.ping.PingOperation(*args, **kwargs)

Bases: Operation

description: str = 'Smoke-test operation that emits two events and returns immediately.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name: str = 'ping'
summarize_payload(payload: dict[str, Any]) str
insert_proteins

Fetches protein sequences from the UniProt REST API using cursor-based FASTA streaming. Sequences are deduplicated by MD5 hash before upsert; proteins are upserted by accession. Exponential backoff with jitter and Retry-After header handling are implemented inline in the operation. Isoforms are parsed and stored separately, sharing the canonical sequence where the amino-acid string is identical.

class protea.core.operations.insert_proteins.InsertProteinsOperation

Bases: Operation

Fetches protein sequences from UniProt (FASTA) and upserts them into the DB.

Uses cursor-based pagination, exponential backoff with jitter, and MD5-based sequence deduplication. Many proteins can share one Sequence row. Isoforms (<canonical>-<n>) are stored as separate Protein rows grouped by canonical_accession.
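MD5-based deduplication and isoform grouping can be sketched as follows. Each distinct amino-acid string is stored once under its MD5 digest, each accession points at a digest, and an isoform accession of the form <canonical>-<n> maps back to its canonical accession. The function names, the in-memory dicts standing in for Sequence/Protein rows, and the accession regex are assumptions.

```python
import hashlib
import re

# Assumed accession shape: uppercase alphanumerics, optionally followed by "-<n>".
ISOFORM_RE = re.compile(r"^(?P<canonical>[A-Z0-9]+)-(?P<n>\d+)$")


def canonical_accession(accession: str) -> str:
    m = ISOFORM_RE.match(accession)
    return m.group("canonical") if m else accession


def dedupe_sequences(records: list[tuple[str, str]]) -> tuple[dict[str, str], dict[str, str]]:
    """Return (md5 -> sequence, accession -> md5); identical strings share one entry."""
    sequences: dict[str, str] = {}
    protein_to_seq: dict[str, str] = {}
    for accession, seq in records:
        digest = hashlib.md5(seq.encode("ascii")).hexdigest()
        sequences.setdefault(digest, seq)
        protein_to_seq[accession] = digest
    return sequences, protein_to_seq
```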

description: str = 'Fetch protein sequences from UniProt (FASTA, cursor-paginated) and upsert Protein + Sequence rows; isoforms are stored grouped by canonical accession.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name: str = 'insert_proteins'
summarize_payload(payload: dict[str, Any]) str
class protea.core.operations.insert_proteins.InsertProteinsPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, include_isoforms: bool = True, compressed: bool = False, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/insert_proteins (contact: you@example.org)')

Bases: ProteaPayload

backoff_base_seconds: NonNegativeFloat
backoff_max_seconds: NonNegativeFloat
compressed: bool
include_isoforms: bool
jitter_seconds: NonNegativeFloat
max_retries: PositiveInt
model_config = {'frozen': True, 'strict': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str) str
page_size: PositiveInt
search_criteria: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None
user_agent: str
fetch_uniprot_metadata

Downloads TSV functional annotation data from UniProt and upserts ProteinUniProtMetadata rows keyed by canonical accession. Fields include functional description, EC numbers, pathway membership, and kinetics. Isoforms inherit metadata through the canonical_accession join; no duplicate rows are created.

class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataOperation

Bases: object

Fetches functional annotations from UniProt (TSV) and upserts ProteinUniProtMetadata rows.

One metadata row is stored per canonical accession. Isoforms share the same metadata record. Optionally updates core Protein fields (reviewed, organism, gene_name, length) if they are missing. Uses the same cursor-based pagination and backoff strategy as InsertProteinsOperation.

FIELD_MAP: dict[str, str] = {'absorption': 'Absorption', 'active_site': 'Active site', 'activity_regulation': 'Activity regulation', 'binding_site': 'Binding site', 'catalytic_activity': 'Catalytic activity', 'cofactor': 'Cofactor', 'dna_binding': 'DNA binding', 'ec_number': 'EC number', 'features': 'Features', 'function_cc': 'Function [CC]', 'keywords': 'Keywords', 'kinetics': 'Kinetics', 'pathway': 'Pathway', 'ph_dependence': 'pH dependence', 'redox_potential': 'Redox potential', 'rhea_id': 'Rhea ID', 'site': 'Site', 'temperature_dependence': 'Temperature dependence'}
UNIPROT_FIELDS: list[str] = ['accession', 'reviewed', 'id', 'protein_name', 'gene_names', 'organism_name', 'length', 'absorption', 'ft_act_site', 'ft_binding', 'cc_catalytic_activity', 'cc_cofactor', 'ft_dna_bind', 'ec', 'cc_activity_regulation', 'cc_function', 'cc_pathway', 'kinetics', 'ph_dependence', 'redox_potential', 'rhea', 'ft_site', 'temp_dependence', 'keyword', 'feature_count']
description = 'Fetch functional annotations (TSV) from UniProt and upsert ProteinUniProtMetadata rows keyed by canonical accession.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'fetch_uniprot_metadata'
summarize_payload(payload: dict[str, Any]) str
class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, compressed: bool = True, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/fetch_uniprot_metadata (contact: you@example.org)', commit_every_page: bool = True, update_protein_core: bool = True)

Bases: ProteaPayload

backoff_base_seconds: NonNegativeFloat
backoff_max_seconds: NonNegativeFloat
commit_every_page: bool
compressed: bool
jitter_seconds: NonNegativeFloat
max_retries: PositiveInt
model_config = {'frozen': True, 'strict': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str) str
page_size: PositiveInt
search_criteria: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None
update_protein_core: bool
user_agent: str
load_ontology_snapshot

Downloads a GO OBO file and populates OntologySnapshot, GOTerm, and GOTermRelationship rows. The obo_version field carries a unique constraint so that re-importing the same release is idempotent. If a snapshot already exists but its relationships are missing, they are backfilled automatically.

class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotOperation

Bases: object

Downloads a go.obo file and upserts an OntologySnapshot together with its GOTerm rows.

The data-version: header of the OBO file is used as the canonical version identifier (e.g. releases/2024-01-17). If a snapshot with that version already exists, the operation is a no-op and returns the existing snapshot id, making it safe to re-run.

GO term aspect is mapped from the OBO namespace field: biological_process → P, molecular_function → F, cellular_component → C.

description = 'Download a GO OBO file and persist it as an OntologySnapshot with its GOTerm + GOTermRelationship rows; idempotent on the OBO data-version.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'load_ontology_snapshot'
summarize_payload(payload: dict[str, Any]) str
class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotPayload(*, obo_url: str, timeout_seconds: int = 120, force_relationships: bool = False)

Bases: ProteaPayload

force_relationships: bool
model_config = {'frozen': True, 'strict': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str) str
obo_url: str
timeout_seconds: int
load_goa_annotations

Bulk-loads a GAF (Gene Association Format) file. Annotations are filtered against canonical accessions present in the database, avoiding orphaned foreign keys. Each batch is committed independently to bound transaction size.

class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsOperation

Bases: object

Streams a GOA GAF file (gzip or plain) and upserts ProteinGOAnnotation rows.

The GAF file is streamed line by line from gaf_url; it is never fully loaded into memory, making it suitable for the full UniProt GAF (hundreds of millions of lines).

Only accessions present in the protein table are stored; all others are silently skipped. The canonical accession set is loaded once from the DB at the start of the operation.

GAF 2.2 columns used (1-indexed, tab-separated):

  • 2 → DB_Object_ID (accession)

  • 4 → Qualifier

  • 5 → GO ID

  • 6 → DB:Reference

  • 7 → Evidence Code

  • 8 → With/From

  • 14 → Date (YYYYMMDD)

  • 15 → Assigned By

description = 'Stream a UniProt GOA GAF release line by line and bulk-insert ProteinGOAnnotation rows for accessions already present in the DB.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'load_goa_annotations'
summarize_payload(payload: dict[str, Any]) str
class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsPayload(*, ontology_snapshot_id: str, gaf_url: str, source_version: str, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None)

Bases: ProteaPayload

commit_every_page: bool
gaf_url: str
model_config = {'frozen': True, 'strict': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str) str
ontology_snapshot_id: str
page_size: PositiveInt
source_version: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None
load_quickgo_annotations

Streams GO annotations from the QuickGO bulk download API (paginated TSV). Supports optional ECO→GO evidence code mapping and per-page commits. Filters out annotations whose accessions are not already in the database.

class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsOperation

Bases: object

Streams GO annotations from the QuickGO bulk download API.

Proteins to annotate are determined by the canonical accessions already present in the DB; no external FASTA or accession list is needed.

The QuickGO TSV columns used:

  • GENE PRODUCT ID → protein accession

  • GO TERM → GO identifier

  • QUALIFIER → qualifier (enables, involved_in…)

  • ECO ID → mapped to evidence_code via eco_mapping_url (or stored raw)

  • REFERENCE → db_reference

  • WITH/FROM → with_from

  • ASSIGNED BY → assigned_by

  • DATE → annotation_date

description = "Stream GO annotations from QuickGO's bulk download endpoint and insert ProteinGOAnnotation rows for accessions already present in the DB."
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'load_quickgo_annotations'
summarize_payload(payload: dict[str, Any]) → str
class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsPayload(*, ontology_snapshot_id: str, source_version: str, quickgo_base_url: str = 'https://www.ebi.ac.uk/QuickGO/services/annotation/downloadSearch', gene_product_ids: list[str] | None = None, use_db_accessions: bool = True, eco_mapping_url: str | None = None, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, Gt(gt=0)] | None = None, gene_product_batch_size: Annotated[int, Gt(gt=0)] = 200)

Bases: ProteaPayload

Payload for loading GO annotations from the QuickGO bulk download endpoint.

QuickGO returns a single streamed TSV filtered by the canonical accessions already present in the DB; no external accession list is needed.

eco_mapping_url (optional) points to a GAF-ECO mapping file (space-separated: ECO:XXXXXXX  CODE). When provided, ECO IDs are resolved to GO evidence codes (IDA, IEA…) before insertion. If omitted, the raw ECO ID is stored as-is in evidence_code.
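A minimal sketch of the ECO to GO evidence-code resolution, assuming the mapping file has whitespace-separated columns with the ECO ID first and the GO code second (extra columns ignored), # comment lines, and first-occurrence-wins for duplicate ECO IDs. Both function names are hypothetical; so is the fallback of storing the raw ECO ID when a mapping is supplied but has no entry for it, since the source only specifies the no-mapping case.

```python
from __future__ import annotations

from typing import Iterable


def parse_eco_mapping(lines: Iterable[str]) -> dict[str, str]:
    """Build {ECO ID: GO evidence code} from a GAF-ECO mapping file (hypothetical helper)."""
    mapping: dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # assumption: '#' marks comment lines
        parts = line.split()  # splits on any run of spaces or tabs
        if len(parts) < 2 or not parts[0].startswith("ECO:"):
            continue
        mapping.setdefault(parts[0], parts[1])  # first occurrence wins
    return mapping


def resolve_evidence(eco_id: str, mapping: dict[str, str] | None) -> str:
    """Return the mapped GO code, or the raw ECO ID when no mapping applies."""
    return (mapping or {}).get(eco_id, eco_id)
```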

commit_every_page: bool
eco_mapping_url: str | None
gene_product_batch_size: PositiveInt
gene_product_ids: list[str] | None
model_config = {'frozen': True, 'strict': True}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str) → str
ontology_snapshot_id: str
page_size: PositiveInt
quickgo_base_url: str
source_version: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None
use_db_accessions: bool
compute_embeddings

Coordinator operation that partitions the target sequence set into batches and dispatches one compute_embeddings_batch message per batch to protea.embeddings.batch. The coordinator serialises on the protea.embeddings queue (one at a time) to prevent concurrent model loads from exhausting GPU memory. Batch and write workers scale independently. Returns deferred=True; the parent job is closed by the last write worker.
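The coordinator and last-writer-closes pattern described above can be sketched in memory as follows. This is a sketch under stated assumptions: the queue name protea.embeddings.batch comes from the text, while partition, ParentJob, coordinate and the publish callback are hypothetical, and a lock-protected counter stands in for whatever shared state the real write workers use to detect that they are last.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass, field
from typing import Any, Callable, Sequence


def partition(items: Sequence[str], batch_size: int) -> list[list[str]]:
    """Split the target set into fixed-size batches (the last one may be short)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]


@dataclass
class ParentJob:
    """In-memory stand-in for the parent job's progress bookkeeping."""
    total_batches: int
    done: int = 0
    status: str = "running"
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)

    def batch_written(self) -> bool:
        """Called by each write worker; returns True only for the last one."""
        with self._lock:
            self.done += 1
            if self.done == self.total_batches:
                self.status = "succeeded"  # the last writer closes the parent
                return True
            return False


def coordinate(
    sequence_ids: Sequence[str],
    batch_size: int,
    publish: Callable[[str, dict[str, Any]], None],
) -> tuple[dict[str, Any], ParentJob]:
    batches = partition(sequence_ids, batch_size)
    job = ParentJob(total_batches=len(batches))
    if not batches:
        # Assumption: an empty target set completes synchronously.
        job.status = "succeeded"
        return {"deferred": False, "batches": 0}, job
    for index, batch in enumerate(batches):
        publish("protea.embeddings.batch", {"batch_index": index, "sequence_ids": batch})
    # Return without waiting: the parent job stays open until the last write.
    return {"deferred": True, "batches": len(batches)}, job
```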

predict_go_terms

Coordinator operation that loads reference embeddings into a process-level float16 cache, partitions the query set into batches, and dispatches one predict_go_terms_batch message per batch to protea.predictions.batch. Feature engineering (alignments, taxonomy) is opt-in via payload flags. Returns deferred=True; the parent job is closed by the last write worker.

generate_evaluation_set

Computes the NK/LK/PK evaluation delta between two annotation sets using the CAFA5 protocol (experimental evidence only, propagation of NOT qualifiers through the GO DAG, per-namespace classification). Stores an EvaluationSet row with summary statistics. Ground-truth files are generated on demand by the download endpoints.
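A minimal sketch of the per-namespace classification, following the usual CAFA definitions as we understand them: NK means no experimental annotation in any namespace before; LK means annotated in some other namespace but newly annotated in this one; PK means already annotated in this namespace and gaining more terms. It assumes both inputs are already restricted to experimental evidence with NOT-qualified terms removed; the namespace labels and the nested-dict shape are hypothetical.

```python
from __future__ import annotations

from typing import Mapping

NAMESPACES = ("BPO", "CCO", "MFO")  # hypothetical labels for BP / CC / MF
Annotations = Mapping[str, Mapping[str, set[str]]]  # protein -> namespace -> GO ids


def classify_delta(old: Annotations, new: Annotations) -> dict[str, dict[str, str]]:
    """Return {protein: {namespace: "NK" | "LK" | "PK"}} for every gained namespace."""
    out: dict[str, dict[str, str]] = {}
    for protein, new_by_ns in new.items():
        old_by_ns = old.get(protein, {})
        had_any = any(old_by_ns.get(ns) for ns in NAMESPACES)
        for ns in NAMESPACES:
            gained = new_by_ns.get(ns, set()) - old_by_ns.get(ns, set())
            if not gained:
                continue  # no new terms here: not part of the delta
            if not had_any:
                label = "NK"  # nothing experimental in any namespace before
            elif not old_by_ns.get(ns):
                label = "LK"  # known elsewhere, new in this namespace
            else:
                label = "PK"  # already known here, gained more terms
            out.setdefault(protein, {})[ns] = label
    return out
```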

class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetOperation

Bases: object

Computes the CAFA evaluation delta between two GOA annotation sets.

Applies experimental evidence code filtering, NOT-qualifier exclusion with GO DAG descendant propagation, and classifies delta proteins into NK/LK.
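The NOT-qualifier exclusion with descendant propagation can be sketched as a breadth-first walk down the DAG from each negated term. This sketch assumes a child-to-parents map, GAF-style pipe-separated qualifiers such as "NOT|enables", and a hypothetical EXPERIMENTAL_CODES set; whether the real operation also requires experimental evidence on the NOT rows is an assumption here.

```python
from __future__ import annotations

from collections import defaultdict, deque
from typing import Iterable, Optional

# Hypothetical experimental evidence set; the real filter list may differ.
EXPERIMENTAL_CODES = frozenset({"EXP", "IDA", "IPI", "IMP", "IGI", "IEP", "TAS", "IC"})


def descendants(terms: set[str], parent_map: dict[str, set[str]]) -> set[str]:
    """Return the terms plus everything below them in the DAG."""
    children: dict[str, set[str]] = defaultdict(set)
    for child, parents in parent_map.items():
        for parent in parents:
            children[parent].add(child)
    seen = set(terms)
    queue = deque(terms)
    while queue:
        term = queue.popleft()
        for child in children.get(term, ()):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


def clean_protein(
    rows: Iterable[tuple[str, Optional[str], str]],
    parent_map: dict[str, set[str]],
) -> set[str]:
    """rows are (go_id, qualifier, evidence_code) for one protein."""
    positive: set[str] = set()
    negated: set[str] = set()
    for go_id, qualifier, evidence in rows:
        if evidence not in EXPERIMENTAL_CODES:
            continue  # experimental evidence only
        if "NOT" in (qualifier or "").split("|"):
            negated.add(go_id)
        else:
            positive.add(go_id)
    # A NOT on a term also negates every descendant of that term.
    return positive - descendants(negated, parent_map)
```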

Stores an EvaluationSet row with summary statistics. The actual ground-truth rows are computed on-demand by the download endpoints using the same logic.

description = 'Compute the CAFA delta between an old and a new GOA annotation set, split delta proteins into NK/LK and persist an EvaluationSet. Supports cross-OBO reconciliation via an optional pivot snapshot.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'generate_evaluation_set'
summarize_payload(payload: dict[str, Any], *, session: Session | None = None) → str
class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetPayload(*, old_annotation_set_id: str, new_annotation_set_id: str, pivot_ontology_snapshot_id: str | None = None)

Bases: ProteaPayload

model_config = {'frozen': True, 'strict': True}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str) → str
new_annotation_set_id: str
old_annotation_set_id: str
pivot_ontology_snapshot_id: str | None
classmethod pivot_opt_non_empty(v)
run_cafa_evaluation

Runs cafaeval for NK, LK, and PK settings against a given prediction set. Downloads the OBO file, writes ground-truth and prediction TSVs, calls cafa_eval() three times (NK and LK without -known, PK with pk_known_terms.tsv as -known), and persists an EvaluationResult row with per-namespace Fmax, precision, recall, τ, and coverage.
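For orientation, an unweighted, CAFA-style Fmax for one namespace can be sketched as below. This is not cafaeval's implementation: it skips ontology propagation and information-accretion weighting, and the function name is hypothetical. Precision is averaged over proteins with at least one prediction at threshold τ, recall over all benchmark proteins, and coverage is read here as the fraction of benchmark proteins with at least one prediction at the best τ.

```python
from __future__ import annotations


def fmax(
    predictions: dict[str, dict[str, float]],
    truth: dict[str, set[str]],
    step: float = 0.01,
) -> tuple[float, float, float]:
    """Return (Fmax, tau, coverage) for one namespace, unweighted."""
    truth = {p: terms for p, terms in truth.items() if terms}
    n = len(truth)
    best = (0.0, 0.0, 0.0)
    if n == 0:
        return best
    for i in range(1, int(round(1 / step)) + 1):
        tau = round(i * step, 6)
        precisions: list[float] = []
        recall_sum = 0.0
        for protein, true_terms in truth.items():
            predicted = {t for t, s in predictions.get(protein, {}).items() if s >= tau}
            if not predicted:
                continue  # uncovered protein: recall 0, excluded from precision
            hits = len(predicted & true_terms)
            precisions.append(hits / len(predicted))
            recall_sum += hits / len(true_terms)
        if not precisions:
            continue
        pr = sum(precisions) / len(precisions)  # mean over covered proteins
        rc = recall_sum / n  # mean over all benchmark proteins
        if pr + rc == 0:
            continue
        f = 2 * pr * rc / (pr + rc)
        if f > best[0]:
            best = (f, tau, len(precisions) / n)
    return best
```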

load_interpro_go_mapping

Downloads and persists the InterPro-to-GO mapping file, linking InterPro domain entries to their associated GO terms. Used as a prerequisite step before running InterProScan-based annotation.

IP.4a — InterPro2GO mapping loader.

Streams the EBI interpro2go flat file (one line per InterPro-entry-to-GO-term mapping) and upserts rows into interpro_go_mapping. The file is small enough (a few thousand lines, ~1 MB) to hold in memory, but the operation still parses it lazily so that a partial read degrades gracefully.

Idempotent: the unique key (ipr_accession, go_id, source_version) is enforced by an ON CONFLICT DO NOTHING upsert, so re-running the operation for the same release is a no-op.
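The ON CONFLICT DO NOTHING idempotency can be demonstrated with SQLite, which accepts the same upsert clause as PostgreSQL (SQLite 3.24+). This is a sketch only: the table layout and the upsert_mappings helper are hypothetical stand-ins for the real PostgreSQL table and SQLAlchemy statement, and chunk_size batching is omitted.

```python
from __future__ import annotations

import sqlite3
from typing import Iterable


def upsert_mappings(
    conn: sqlite3.Connection,
    rows: Iterable[tuple[str, str]],
    source_version: str,
    evidence: str = "IEA",
) -> int:
    """Insert (ipr_accession, go_id) rows; return how many were actually new."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS interpro_go_mapping ("
        " ipr_accession TEXT NOT NULL, go_id TEXT NOT NULL,"
        " source_version TEXT NOT NULL, evidence TEXT NOT NULL,"
        " UNIQUE (ipr_accession, go_id, source_version))"
    )
    before = conn.total_changes
    conn.executemany(
        "INSERT INTO interpro_go_mapping (ipr_accession, go_id, source_version, evidence)"
        " VALUES (?, ?, ?, ?)"
        " ON CONFLICT (ipr_accession, go_id, source_version) DO NOTHING",
        [(ipr, go_id, source_version, evidence) for ipr, go_id in rows],
    )
    conn.commit()
    # Skipped conflicts do not count as changes, so this is the number of new rows.
    return conn.total_changes - before
```

Re-running with the same source_version inserts nothing, while a new release tag produces a fresh set of rows alongside the old ones.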

InterPro2GO line format (text, comment-prefixed with !):

InterPro:IPR000001 Kringle > GO:protein binding ; GO:0005515

The pattern is:

InterPro:<IPR_ACCESSION> <human_label> > GO:<label> ; <GO_ID>
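The pattern above maps naturally onto a regular expression, and a generator keeps the parse lazy. A minimal sketch, assuming GO IDs always have seven digits and that unexpected lines are skipped rather than aborting the load (both assumptions); parse_interpro2go and MappingRow are hypothetical names.

```python
from __future__ import annotations

import re
from typing import Iterable, Iterator, NamedTuple

# InterPro:<IPR_ACCESSION> <human_label> > GO:<label> ; <GO_ID>
_LINE = re.compile(r"^InterPro:(IPR\d+)\s+(.*?)\s+>\s+GO:(.*?)\s+;\s+(GO:\d{7})\s*$")


class MappingRow(NamedTuple):
    ipr_accession: str
    ipr_label: str
    go_label: str
    go_id: str


def parse_interpro2go(lines: Iterable[str]) -> Iterator[MappingRow]:
    """Lazily yield one MappingRow per well-formed interpro2go line."""
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("!"):
            continue  # header / comment lines
        match = _LINE.match(line)
        if match is None:
            continue  # assumption: skip unexpected lines instead of failing
        yield MappingRow(*match.groups())
```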

Evidence is implicit (curator-asserted IEA in the consuming pipeline) so the evidence column captures the literal upstream label ("IEA" by default) without inventing extra metadata.

class protea.core.operations.load_interpro_go_mapping.LoadInterProGoMappingOperation

Bases: object

Stream the EBI InterPro2GO file and upsert mapping rows.

description = 'Download the EBI InterPro2GO flat file and upsert (ipr_accession, go_id) rows into interpro_go_mapping; idempotent per source_version.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'load_interpro_go_mapping'
summarize_payload(payload: dict[str, Any]) → str
class protea.core.operations.load_interpro_go_mapping.LoadInterProGoMappingPayload(*, source_version: str, mapping_url: str = 'https://ftp.ebi.ac.uk/pub/databases/GO/goa/external2go/interpro2go', evidence: str = 'IEA', timeout_seconds: Annotated[int, Gt(gt=0)] = 120, chunk_size: Annotated[int, Gt(gt=0)] = 1000)

Bases: ProteaPayload

Payload for load_interpro_go_mapping.

mapping_url defaults to the canonical EBI endpoint, so callers can omit it in the common case. source_version is required so the snapshot is queryable later; pass the InterPro release tag (e.g. "InterPro-104.0") here.

chunk_size: PositiveInt
evidence: str
mapping_url: str
model_config = {'frozen': True, 'strict': True}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

source_version: str
timeout_seconds: PositiveInt
run_interproscan_batch

Submits sequences to the EBI InterProScan REST API in configurable batch sizes. Stores InterProAnnotation rows linking each protein to its domain signatures. Supports resume via previously stored job IDs.

run_interproscan_batch operation.

Runs InterProScan against a batch of proteins in fixed-size chunks, parses the resulting TSV through the InterProSource plugin (IP.1b), and bulk-inserts InterProAnnotation rows (IP.2).

Inputs are either a query_set_id (resolved via QuerySetEntry joins) or an explicit accessions list. An optional ipr_release_floor lets re-runs at a newer release skip proteins whose latest persisted annotation already meets the floor.

Per-chunk commits and emitted progress events make the run resumable: re-dispatching the same payload picks up where the prior run left off, because already-annotated proteins are filtered out by the same ipr_release_floor check on every invocation.

Out of scope for IP.3 (see slice spec):

  • The payload class lives in PROTEA for now; promotion to protea-contracts is a follow-up bookkeeping commit.

  • InterProScan installation runbook (post-defense).

class protea.core.operations.run_interproscan_batch.RunInterProScanBatchOperation

Bases: object

Annotate a batch of proteins with InterProScan and persist hits.

Workflow:

  1. Resolve target accessions (query set or explicit list).

  2. Drop already-annotated ones if an ipr_release_floor is set.

  3. Fetch sequences in one DB roundtrip.

  4. For each chunk_size chunk: write a temp FASTA, invoke InterProSource.run, bulk-insert the parsed records, commit (optionally), and emit a progress event.
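Step 4 of the workflow can be sketched as a loop over fixed-size chunks, each written to a temporary FASTA file. This is a sketch under assumptions: run_tool is a hypothetical stand-in for InterProSource.run (whose real signature is not shown here), and insert, commit and emit are injected callables rather than the operation's session and emit hook.

```python
from __future__ import annotations

import os
import tempfile
from typing import Any, Callable, Iterator, Sequence


def chunked(items: Sequence[str], size: int) -> Iterator[list[str]]:
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def write_fasta(path: str, sequences: dict[str, str], width: int = 60) -> None:
    with open(path, "w", encoding="ascii") as fh:
        for accession, seq in sequences.items():
            fh.write(f">{accession}\n")
            for i in range(0, len(seq), width):
                fh.write(seq[i:i + width] + "\n")  # wrap residues at `width`


def run_batch(
    accessions: Sequence[str],
    sequences: dict[str, str],
    chunk_size: int,
    run_tool: Callable[[str], list[Any]],
    insert: Callable[[list[Any]], None],
    commit: Callable[[], None],
    emit: Callable[[dict[str, int]], None],
    commit_every_chunk: bool = True,
) -> int:
    done = 0
    for chunk in chunked(accessions, chunk_size):
        with tempfile.TemporaryDirectory() as tmp:
            fasta_path = os.path.join(tmp, "chunk.fasta")
            write_fasta(fasta_path, {acc: sequences[acc] for acc in chunk})
            records = run_tool(fasta_path)  # must return a materialised list
        insert(records)
        if commit_every_chunk:
            commit()  # committed chunks survive a crash in a later chunk
        done += len(chunk)
        emit({"done": done, "total": len(accessions)})
    if not commit_every_chunk and done:
        commit()
    return done
```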

description = 'Run InterProScan on a batch of proteins in fixed-size chunks and persist parsed domain hits into the interpro_annotation table.'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'run_interproscan_batch'
summarize_payload(payload: dict[str, Any]) → str
class protea.core.operations.run_interproscan_batch.RunInterProScanBatchPayload(*, query_set_id: str | None = None, accessions: list[str] | None = None, ipr_release_floor: str | None = None, chunk_size: Annotated[int, Gt(gt=0)] = 50, timeout_seconds: Annotated[int, Gt(gt=0)] = 3600, extra_args: list[str] = <factory>, binary_path: str | None = None, commit_every_chunk: bool = True)

Bases: ProteaPayload

Inputs for one run_interproscan_batch invocation.

Exactly one of query_set_id / accessions must be provided. ipr_release_floor is optional: when set, proteins whose latest persisted InterProAnnotation.ipr_release is lexicographically greater than or equal to the floor are skipped, which gives the re-dispatch flow its resume semantics.
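The input rule and the release-floor filter can be sketched in plain Python. Both helpers are hypothetical illustrations of the documented behaviour, not the payload's actual validator; latest_release stands for each protein's latest persisted ipr_release.

```python
from __future__ import annotations

from typing import Iterable


def check_exactly_one(query_set_id: str | None, accessions: list[str] | None) -> None:
    """Raise when both or neither of the two inputs are set."""
    if (query_set_id is None) == (accessions is None):
        raise ValueError("exactly one of query_set_id / accessions must be provided")


def needs_annotation(
    latest_release: dict[str, str],
    accessions: Iterable[str],
    floor: str | None,
) -> list[str]:
    """Keep proteins with no persisted release or a release below the floor."""
    if floor is None:
        return list(accessions)
    # Plain string comparison, i.e. lexicographic, as described above.
    return [
        acc for acc in accessions
        if acc not in latest_release or latest_release[acc] < floor
    ]
```

Because the comparison is lexicographic, release tags with different digit counts can order unexpectedly: "99.0" >= "104.0" is true as strings, so a protein last annotated at 99.0 would be skipped under a floor of "104.0".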

accessions: list[str] | None
binary_path: str | None
chunk_size: PositiveInt
commit_every_chunk: bool
exactly_one_input() → RunInterProScanBatchPayload
extra_args: list[str]
ipr_release_floor: str | None
model_config = {'frozen': True, 'strict': True}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str | None) → str | None
classmethod must_be_non_empty_list(v: list[str] | None) → list[str] | None
query_set_id: str | None
timeout_seconds: PositiveInt
predict_go_terms_from_interpro

Derives GO term predictions from stored InterProAnnotation rows and the InterProGoMapping table, without running a KNN search. Produces a PredictionSet whose entries are backed directly by domain matches rather than by embedding distance.

IP.4b-PRED — InterPro-only GO term predictor.

A parallel-signal GO term predictor. Given a set of query proteins whose InterProScan hits are already persisted as InterProAnnotation rows (IP.2 / IP.3), this operation:

  1. Joins each protein’s distinct InterPro accessions against the InterProGoMapping table (IP.4a — InterPro2GO loader) at the requested source_version.

  2. Resolves the right-hand GO ids against an OntologySnapshot so the output is keyed by go_term_id like every other GOPrediction.

  3. Aggregates the per-protein votes: if N distinct InterPro entries on the same protein map to the same GO term, that’s N supporting votes and a stronger prediction than a single mapping.

  4. Persists the result as a fresh PredictionSet with meta["algorithm"] = "interpro_propagation" so downstream ensemble / cafaeval routers can pick it up alongside KNN-based PredictionSet rows (T-RES.3 ensemble multi-K scaffolding).

The operation is single-stage (no batch fan-out): the join is a cheap index scan and the per-protein InterPro hit count is tiny (O(10-100)) so partitioning would be pure overhead.

Idempotency

Within one call, duplicate per-protein (go_term_id) votes are merged before insert. Across calls, every call materialises a new PredictionSet; the predictions under it are unique by uq_go_prediction_set_protein_term, and inserts additionally use ON CONFLICT DO NOTHING so that a partial-failure retry on the same prediction set inserts no duplicates.

GOPrediction column mapping

The GOPrediction schema was designed for KNN transfer, so a few NOT-NULL columns need a sensible value for the InterPro path:

  • ref_protein_accession — the InterPro accession that voted for this term, e.g. "IPR000001". When several entries vote, the lexicographically smallest accession is used (stable and observable).

  • distance — the inverse vote count (1.0 / vote_count). Lower = stronger support, matching the KNN convention; a 1-voter prediction has distance 1.0, a 5-voter prediction 0.2.

  • evidence_code — propagated from the InterProGoMapping.evidence column (defaults to "IEA").

  • vote_count — the literal number of distinct IPR entries on this protein that map to this GO id at this source_version.
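The column mapping above amounts to a small group-by over the join output. A minimal sketch, assuming the join yields (protein_accession, ipr_accession, go_id, evidence) tuples; aggregate_votes and the output dict keys are hypothetical, and taking the evidence of the representative (smallest) voter is an assumption.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any, Iterable


def aggregate_votes(hits: Iterable[tuple[str, str, str, str]]) -> list[dict[str, Any]]:
    """hits are (protein_accession, ipr_accession, go_id, evidence) join rows."""
    voters: dict[tuple[str, str], set[str]] = defaultdict(set)
    evidence: dict[tuple[str, str, str], str] = {}
    for protein, ipr, go_id, ev in hits:
        voters[(protein, go_id)].add(ipr)  # a set, so each IPR entry votes once
        evidence.setdefault((protein, go_id, ipr), ev)
    rows: list[dict[str, Any]] = []
    for (protein, go_id), iprs in sorted(voters.items()):
        representative = min(iprs)  # lexicographically smallest voter
        vote_count = len(iprs)
        rows.append({
            "protein_accession": protein,
            "go_id": go_id,
            "ref_protein_accession": representative,
            "vote_count": vote_count,
            "distance": 1.0 / vote_count,  # lower = stronger, as in KNN
            "evidence_code": evidence[(protein, go_id, representative)],
        })
    return rows
```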

Out of scope (see IP.4 plan-first sub-plan):

  • Reranker feature integration (IP.4c, requires_human=true).

  • Score blending across PredictionSets (T-RES.3 ensemble router).

  • Ancestor / aspect propagation (the caller is expected to enable the existing post-process pipeline if needed).

class protea.core.operations.predict_go_terms_from_interpro.PredictGOTermsFromInterProOperation

Bases: object

Emit GO predictions for proteins via their persisted InterPro hits.

Single-stage operation: validates FKs, materialises one PredictionSet, performs an indexed three-table join (interpro_annotation → interpro_go_mapping → go_term), aggregates per-protein votes, and bulk-inserts the GOPrediction rows.

description = "Predict GO terms for a query set by joining each protein's InterProAnnotation rows against the InterPro2GO mapping table; emits a PredictionSet tagged algorithm=interpro_propagation."
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'predict_go_terms_from_interpro'
summarize_payload(payload: dict[str, Any]) → str
class protea.core.operations.predict_go_terms_from_interpro.PredictGOTermsFromInterProPayload(*, embedding_config_id: str, annotation_set_id: str, ontology_snapshot_id: str, source_version: str, query_set_id: str | None = None, query_accessions: list[str] | None = None, ipr_release_floor: str | None = None, chunk_size: int = 1000)

Bases: ProteaPayload

Inputs for predict_go_terms_from_interpro.

Either query_set_id or query_accessions is required. Both may be set — the union is used. embedding_config_id, annotation_set_id, ontology_snapshot_id are required by the existing PredictionSet schema (NOT NULL FKs); for this parallel signal they act as trace pointers (the model wasn’t actually used during prediction).

source_version is the InterPro2GO mapping release tag (see LoadInterProGoMappingPayload). Predictions are filtered to mappings tagged with this exact source_version so a release skew between snapshots is observable rather than silently mixed.

annotation_set_id: str
chunk_size: int
embedding_config_id: str
ipr_release_floor: str | None
model_config = {'frozen': True, 'strict': True}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

ontology_snapshot_id: str
query_accessions: list[str] | None
query_set_id: str | None
source_version: str
export_research_dataset

Publishes the frozen re-ranker dataset (train.parquet / eval.parquet / manifest.json) consumed by protea-reranker-lab. Runs the KNN + feature-generation pipeline via TrainRerankerAutoOperation in dump_only mode and uploads the resulting artefacts through the configured ArtifactStore (local FS by default, MinIO when the storage compose profile is active). The manifest records PROTEA’s producer_version / producer_git_sha for full traceability from lab runs back to PROTEA HEAD.

Training-dump helpers

protea.core.training_dump_helpers is the home of the KNN / feature-generation helpers that were extracted in F0 (T0.6) when protea.core.operations.train_reranker was deleted. The module is deliberately not an operation: it is reused in-process by ExportResearchDatasetOperation to materialise train and eval shards before the parquet_export consolidation pass. LightGBM training itself lives in protea-reranker-lab, which consumes the published Dataset rows produced by export_research_dataset.

As of T-RES.1 (PR #368), parent_map_str is always passed to the KNN transfer runner, regardless of the expand_votes_to_ancestors flag. The lineage producer (protea_method.lineage.compute_lineage_features) requires the parent map for its lineage_* column computation; the flag only controls ancestor-vote expansion, not the availability of the map itself.

Internal helpers

These modules are imported by the operations and the feature engineering layer; they are documented here for completeness but are not part of the public API.

  • protea.core.anc2vec_embeddings: anc2vec ancestry embeddings for GO terms, used as features by the re-ranker.

  • protea.core.annotation_intern: string interning helper for reducing memory pressure when loading large annotation sets.

  • protea.core.disk_cache: generic on-disk cache with TTL used by the KNN reference loader and the PCA cache.

  • protea.core.feature_enricher: orchestrator that combines alignment, taxonomy and anc2vec features into a single per-candidate row.

  • protea.core.pca_cache: per-PLM PCA projection cache, used to pre-compute the emb_pca feature family.

  • protea.core._anc2vec_phases: phase helpers for the anc2vec embedding pipeline.

  • protea.core._feature_enricher_helpers: low-level helpers extracted from feature_enricher to keep per-phase logic self-contained.

  • protea.core.features._bindings: internal feature-column binding declarations consumed by features.registry.

  • protea.core._knn_transfer_runner: worker-side KNN transfer runner dispatched by the predictions coordinator.

  • protea.core._leaf_record_builder: builder for leaf-prediction records in the KNN transfer pipeline.

  • protea.core._pair_feature_compute: pair-level feature computation with optional process-pool parallelism and SQLite alignment cache.

  • protea.core._training_dump_loaders: data-loading helpers used by the training-dump pipeline.

  • protea.core.training_dump._constants: shared constants for the training-dump subpackage.

  • protea.core.training_dump._contexts: context dataclasses bundling session, settings, and configuration for training-dump passes.

  • protea.core.training_dump._data_loaders: loaders that hydrate protein, embedding, and annotation data for the training-dump pipeline.

  • protea.core.training_dump._knn_transfer: KNN search and GO-term transfer logic specific to the training-dump path.

  • protea.core.training_dump._payload: Pydantic payload model for the export_research_dataset operation.

  • protea.core.training_dump._runner: top-level runner that coordinates the training-dump passes.

  • protea.core.training_dump._test_split: helper that materialises the eval/test shard of the frozen dataset.

  • protea.core.training_dump._train_split: helper that materialises the train shard of the frozen dataset.

  • protea.core.operations._compute_embeddings_backends: backend dispatch logic for the compute_embeddings coordinator.

  • protea.core.operations._compute_embeddings_helpers: batch construction and progress helpers for compute_embeddings.

  • protea.core.operations._load_ontology_helpers: OBO parsing helpers used by load_ontology_snapshot.

  • protea.core.operations._predict_go_terms_adapter: adapter that routes the predict_go_terms coordinator to the unified-path or batch-op implementations.

  • protea.core.operations.predict_go_terms._aspect_helpers: per-aspect splitting and merging helpers for the predict pipeline.

  • protea.core.operations.predict_go_terms._batch_op: batch operation executed by OperationConsumer workers.

  • protea.core.operations.predict_go_terms._batch_op_feature: feature generation phase of the batch operation.

  • protea.core.operations.predict_go_terms._batch_op_reference: reference-embedding loading and KNN search for batch operations.

  • protea.core.operations.predict_go_terms._batch_op_reranker: reranker scoring phase of the batch operation.

  • protea.core.operations.predict_go_terms._common: shared types and constants used across the predict-go-terms subpackage.

  • protea.core.operations.predict_go_terms._coordinator: coordinator that partitions the query set and dispatches batch messages.

  • protea.core.operations.predict_go_terms._post_knn_pipeline: post-KNN pipeline steps (feature enrichment, reranker, aggregation).

  • protea.core.operations.predict_go_terms._reranker_scorer: RerankerScorer, a helper class (used via composition) that applies a loaded booster to batch-prediction DataFrames.

  • protea.core.operations.predict_go_terms._store: bulk-insert helpers for writing GOPrediction rows.

  • protea.core.operations.predict_go_terms._unified_path: unified in-process execution path used when a single worker handles both KNN and write phases.

  • protea.core.operations._run_cafa_artifacts: artifact download and staging helpers for run_cafa_evaluation.

  • protea.core.operations._run_cafa_data_helpers: data-loading helpers for the CAFA evaluation pipeline.

  • protea.core.operations._run_cafa_eval_driver: driver that calls cafaeval.cafa_eval and collects per-namespace results.

  • protea.core.operations._run_cafa_helpers: shared helper functions used across the _run_cafa_* modules.

  • protea.core.operations._run_cafa_reranker_loader: loads and applies the reranker model to CAFA prediction TSVs.

  • protea.core.operations._run_cafa_setup: environment and directory setup for a CAFA evaluation run.

  • protea.core.operations._run_cafa_summary: summarises cafaeval results into EvaluationResult rows.

Process-local string interning for annotation hot loops.

The predict / train pipelines build millions of per-row dicts of the shape {"go_term_id": int, "qualifier": str|None, "evidence_code": str|None} when they materialise annotations from PostgreSQL. SQLAlchemy returns each qualifier and evidence_code as a fresh Python string — even though across a million rows there are only ~5-10 distinct values ("IEA", "IDA", "EXP", "TAS", …) and most rows have qualifier = None.

Without interning, each duplicate string allocates ~50 B in CPython, so a 5 M-row batch can carry ~500 MB of redundant string objects. Interning collapses every duplicate to a single shared instance, a Flyweight-style intrinsic-state share. Python already does this implicitly for short identifier-like literals; this module forces the same dedup for the strings that come back from the DB driver.

Process-local by design (one cache per worker), trivially small (the domain has fewer than 50 distinct codes/qualifiers in practice), and thread-safe via the GIL — setdefault is atomic for built-in dicts.

protea.core.annotation_intern.intern_string(value: str | None) → str | None

Return a shared instance of value if it has been seen before.

Pass None through unchanged so callers don’t need a guard. The pool grows monotonically; reset only when the worker restarts.

protea.core.annotation_intern.pool_size() → int

Diagnostic — how many distinct strings the pool has retained.
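The interning behaviour described above fits in a few lines. This is a minimal sketch of the documented contract (None passes through, the pool only grows, dict.setdefault does the dedup), not necessarily the module's exact code. In this sketch a plain dict is used instead of sys.intern, which would reject None.

```python
from __future__ import annotations

_POOL: dict[str, str] = {}  # process-local: one pool per worker


def intern_string(value: str | None) -> str | None:
    """Return the shared instance of value, registering it on first sight."""
    if value is None:
        return None  # callers need no guard for NULL qualifiers
    # setdefault returns the existing instance if an equal string is pooled.
    return _POOL.setdefault(value, value)


def pool_size() -> int:
    """Number of distinct strings retained so far."""
    return len(_POOL)
```

Usage: when rebuilding the per-row dicts, wrap the driver-returned fields, e.g. {"go_term_id": gid, "qualifier": intern_string(q), "evidence_code": intern_string(ev)}, so duplicate codes share one object.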

On-disk caches for the predict_go_terms pipeline.

The reference embedding pool for an (EmbeddingConfig, AnnotationSet) pair is large (millions of rows × ~1280 dims × 2 bytes = several GB). Re-fetching it from PostgreSQL on every batch worker is the main bottleneck, so the pool is materialised once into data/ref_cache/ and read back via numpy mmap on subsequent runs. Annotations are stored in a CSR-style layout (offsets + flat go_term_ids / qualifiers / evidence_codes arrays) so per-protein lookups stay O(1) instead of dictionary-of-list-of-dict.
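The CSR-style annotation layout described above can be illustrated with plain lists. This sketch assumes one shared offsets array indexing three parallel flat arrays; build_csr and annotations_for are hypothetical names, and the real cache stores the same shape as .npy arrays read through numpy mmap rather than Python lists.

```python
from __future__ import annotations

from typing import Optional, Sequence

Annotation = tuple[int, Optional[str], Optional[str]]  # (go_term_id, qualifier, evidence_code)


def build_csr(per_protein: Sequence[Sequence[Annotation]]):
    """Flatten per-protein annotation lists into offsets + parallel arrays."""
    offsets = [0]
    go_ids: list[int] = []
    qualifiers: list[Optional[str]] = []
    evidence: list[Optional[str]] = []
    for annotations in per_protein:
        for go_id, qualifier, code in annotations:
            go_ids.append(go_id)
            qualifiers.append(qualifier)
            evidence.append(code)
        offsets.append(len(go_ids))  # end of this protein's run
    return offsets, go_ids, qualifiers, evidence


def annotations_for(i, offsets, go_ids, qualifiers, evidence) -> list[Annotation]:
    """Protein i's annotations live in [offsets[i], offsets[i + 1])."""
    start, end = offsets[i], offsets[i + 1]  # O(1) to locate the slice
    return list(zip(go_ids[start:end], qualifiers[start:end], evidence[start:end]))
```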

Files (under _DISK_CACHE_DIR):

  • {cfg}__{ann}_embeddings.npy — float16 reference embeddings

  • {cfg}__{ann}_accessions.npy — aligned accession list

  • {cfg}__{ann}__{aspect}_indices.npy — int32 indices into the unified pool

  • {cfg}__{ann}__{aspect}_anno_*.npy — CSR annotation arrays

Invalidation: AnnotationSet rows are immutable once loaded, so the cache stays valid for as long as the files exist. Delete the files manually to force a reload after a model change.

Per-group helpers extracted from expand_predictions_to_ancestors.

The ancestor-expansion loop in feature_enricher mutates either an existing leaf record (when an ancestor coincides with a sibling leaf prediction) or a synthetic ancestor record (cloned from the leaf with go_id overridden). Both branches were inlined in the orchestrator, pushing it well past the master plan v3.2 §3 method-LOC ceiling. This module hosts the two branch helpers and the small label-config bundle so that the orchestrator stays short and readable.

class protea.core._feature_enricher_helpers.LabelConfig(q_acc: str, gt_pairs: set[tuple[str, str]] | None, column: str, present: bool)

Bases: NamedTuple

Bundle for the per-query label-injection knobs.

Carries the query accession (used as half of the (q_acc, anc) ground-truth lookup) plus the optional gt_pairs set and the column / presence flags that drive whether a synthesized ancestor record should carry a label at all. The training dump path passes a populated set; the live predict_go_terms path leaves gt_pairs as None and present as False.

column: str

Alias for field number 2

gt_pairs: set[tuple[str, str]] | None

Alias for field number 1

present: bool

Alias for field number 3

q_acc: str

Alias for field number 0

protea.core._feature_enricher_helpers.compute_ia_weight(anc_gid: str, leaf_gid: str, ia_weights: dict[str, float] | None) → float

Weight contributed by leaf_gid to its ancestor anc_gid.

Without IA weights every edge counts as 1.0. When weights are provided, the contribution is the IA ratio anc/leaf (with a guard against zero-weight leaves to avoid division blow-ups when an incomplete IA file leaves a term at IA=0).
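A sketch of the weighting rule as described: unit weight without IA, otherwise the ratio IA(anc) / IA(leaf). The source does not say what the zero-weight guard returns, so falling back to 1.0 here, and treating a missing ancestor IA as 0.0, are both assumptions.

```python
from __future__ import annotations


def compute_ia_weight(
    anc_gid: str,
    leaf_gid: str,
    ia_weights: dict[str, float] | None,
) -> float:
    """Weight contributed by leaf_gid to its ancestor anc_gid (sketch)."""
    if ia_weights is None:
        return 1.0  # no IA file: every edge counts as 1.0
    leaf_ia = ia_weights.get(leaf_gid, 0.0)
    if leaf_ia <= 0.0:
        return 1.0  # assumption: guard against division by a zero-IA leaf
    return ia_weights.get(anc_gid, 0.0) / leaf_ia
```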

protea.core._feature_enricher_helpers.make_ancestor_closure(parent_map: dict[str, set[str]] | dict[str, list[str]] | None) → Callable[[str], frozenset[str]]

Return a memoized go_id -> ancestor closure callable.

The orchestrator calls the result repeatedly for every leaf GO id; caching is_a / part_of closures here keeps the inner loop O(1) after the first visit. parent_map is normalised to frozenset parents up front so subsequent traversal cannot accidentally mutate the caller’s dict.
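A memoized ancestor closure can be sketched with functools.lru_cache over a normalised copy of the parent map. This assumes the map is acyclic (true for the GO is_a / part_of graph) and that a term is not its own ancestor; both are assumptions about the real helper.

```python
from __future__ import annotations

from functools import lru_cache
from typing import Callable, Iterable, Mapping


def make_ancestor_closure(
    parent_map: Mapping[str, Iterable[str]] | None,
) -> Callable[[str], frozenset[str]]:
    # Copy into frozensets so traversal can never mutate the caller's dict.
    parents = {child: frozenset(ps) for child, ps in (parent_map or {}).items()}

    @lru_cache(maxsize=None)
    def closure(go_id: str) -> frozenset[str]:
        out: set[str] = set()
        for parent in parents.get(go_id, frozenset()):
            out.add(parent)
            out |= closure(parent)  # reuses cached closures of shared ancestors
        return frozenset(out)

    return closure
```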

protea.core._feature_enricher_helpers.merge_into_existing_leaf(leaf_anc: dict[str, Any], leaf_rec: dict[str, Any], vote_increment: float, leaf_d: float) → None

Fold a leaf-as-ancestor vote into the existing leaf candidate.

Called when the ancestor of a leaf prediction is itself already a leaf candidate for the same (protein, aspect) group. Bumps the target’s neighbor_vote_fraction and lowers its neighbor_min_distance if the contributing leaf is closer.

protea.core._feature_enricher_helpers.update_synth_entry(synth: dict[str, dict[str, Any]], anc: str, leaf_rec: dict[str, Any], leaf_d: float, vote_increment: float, label_ctx: LabelConfig) → None

Insert or update a synthetic ancestor record.

When the ancestor is not already a leaf candidate the record is fabricated from the closest contributing leaf. If a synthetic entry already exists, the closer leaf wins the per-pair feature payload (alignment, taxonomy, anc2vec, emb_pca) so the booster sees a consistent training-time view; otherwise only the vote fraction is bumped.
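The two branch helpers can be sketched on plain dicts. This is an illustration under assumptions: record keys other than neighbor_vote_fraction, neighbor_min_distance and go_id are hypothetical, the label is written as 0/1 only when present is set and gt_pairs is given, and the leaf_rec argument of merge_into_existing_leaf is unused in this sketch.

```python
from __future__ import annotations

from typing import Any, NamedTuple


class LabelConfig(NamedTuple):
    q_acc: str
    gt_pairs: set[tuple[str, str]] | None
    column: str
    present: bool


def merge_into_existing_leaf(leaf_anc: dict[str, Any], leaf_rec: dict[str, Any],
                             vote_increment: float, leaf_d: float) -> None:
    """Fold a vote into an ancestor that is already a leaf candidate."""
    leaf_anc["neighbor_vote_fraction"] = leaf_anc.get("neighbor_vote_fraction", 0.0) + vote_increment
    if leaf_d < leaf_anc.get("neighbor_min_distance", float("inf")):
        leaf_anc["neighbor_min_distance"] = leaf_d  # a closer contributing leaf


def update_synth_entry(synth: dict[str, dict[str, Any]], anc: str, leaf_rec: dict[str, Any],
                       leaf_d: float, vote_increment: float, label_ctx: LabelConfig) -> None:
    """Insert or update a synthetic ancestor record; the closest leaf wins."""
    entry = synth.get(anc)
    if entry is None or leaf_d < entry["neighbor_min_distance"]:
        prev_votes = entry["neighbor_vote_fraction"] if entry else 0.0
        entry = dict(leaf_rec)  # clone the closer leaf's per-pair features
        entry["go_id"] = anc
        entry["neighbor_vote_fraction"] = prev_votes + vote_increment
        entry["neighbor_min_distance"] = leaf_d
        if label_ctx.present and label_ctx.gt_pairs is not None:
            entry[label_ctx.column] = int((label_ctx.q_acc, anc) in label_ctx.gt_pairs)
        synth[anc] = entry
    else:
        entry["neighbor_vote_fraction"] += vote_increment  # features unchanged
```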

Per-feature compute bindings for the canonical FeatureRegistry (T2B.2).

T2B.1 populated protea.core.features.registry with placeholder compute callables that raise NotImplementedError. This module wires each feature in protea_contracts.ALL_FEATURES to the legacy producer function that fills it in the current code path, and calls CanonicalFeatureRegistry.bind_compute() once at import time to replace the placeholders.

Mapping (family group -> producer):

  • KNN-derived columns (distance, vote_count, k_position, go_term_frequency, ref_annotation_density, neighbor_distance_std, neighbor_vote_fraction, neighbor_min_distance, neighbor_mean_distance) come from the per-aspect record builder driven by protea.core._knn_transfer_runner._KnnTransferRunner. They are bound to _knn_record_producer(), a marker callable that carries a reference to the runner on its __protea_producer__ attribute.

  • NW / SW alignment columns and length_query / length_ref come from protea.core.feature_engineering.compute_alignment() (called per (query, ref) pair in _knn_transfer_runner).

  • Taxonomy pair columns (taxonomic_distance, taxonomic_common_ancestors, taxonomic_relation) come from protea.core.feature_engineering.compute_taxonomy().

  • The v6 enrichment columns (tax_voters_*, go_term_frequency share, anc2vec_*, emb_pca_query_*) come from protea.core.feature_enricher.enrich_v6_features().

  • Lineage columns (lineage_is_ancestor_of_known, lineage_is_descendant_of_known, lineage_ancestor_of_count, lineage_descendant_of_count) come from protea_method.lineage.compute_lineage_features(). Added in T-RES.1 to consume the lineage feature family registered by protea-contracts v0.3.0.

  • Categorical metadata (qualifier, evidence_code, aspect) is sourced from annotation rows during record construction; the marker _annotation_metadata_producer() carries that intent.

The bound compute callables are not invoked by parquet_export (the exporter reads pre-computed shards from disk). They exist so that any downstream consumer of the registry can introspect which legacy function produces each column. The __protea_producer__ attribute makes that legacy reference machine-readable.

Idempotency: apply_canonical_bindings() is safe to call more than once on the same registry. It walks every feature in protea_contracts.ALL_FEATURES and rebinds it, so a fresh registry (after reset_canonical_registry()) is restored to a fully bound state by calling this module’s public function again.

protea.core.features._bindings.apply_canonical_bindings(registry: CanonicalFeatureRegistry) → int

Bind every ALL_FEATURES feature on registry to its legacy producer reference. Returns the count of features bound.

Idempotent: rebinding an already-bound feature is a no-op (the underlying CanonicalFeatureRegistry.bind_compute() replaces the callable without touching dtype / family).
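The marker-callable idea can be sketched with a toy registry. ToyRegistry, make_marker, apply_bindings and the PRODUCERS subset are hypothetical and not the CanonicalFeatureRegistry API; the producer paths are the ones named above. The sketch shows the two documented properties: the legacy reference is machine-readable via __protea_producer__, and rebinding replaces callables without touching dtypes.

```python
from __future__ import annotations

from typing import Any, Callable

# Hypothetical subset of the feature -> legacy producer mapping.
PRODUCERS = {
    "distance": "protea.core._knn_transfer_runner._KnnTransferRunner",
    "length_query": "protea.core.feature_engineering.compute_alignment",
    "taxonomic_distance": "protea.core.feature_engineering.compute_taxonomy",
}


def make_marker(producer_ref: str) -> Callable[..., Any]:
    """A callable that only records which legacy function fills the column."""
    def compute(*_args: Any, **_kwargs: Any) -> Any:
        raise RuntimeError(f"column is produced upstream by {producer_ref}")
    compute.__protea_producer__ = producer_ref  # machine-readable reference
    return compute


class ToyRegistry:
    def __init__(self, dtypes: dict[str, str]) -> None:
        self.dtypes = dict(dtypes)  # never modified by binding
        self._compute: dict[str, Callable[..., Any]] = {}

    def bind_compute(self, name: str, fn: Callable[..., Any]) -> None:
        if name not in self.dtypes:
            raise KeyError(name)
        self._compute[name] = fn  # replaces any previous binding

    def producer_of(self, name: str) -> str | None:
        return getattr(self._compute.get(name), "__protea_producer__", None)


def apply_bindings(registry: ToyRegistry, producers: dict[str, str]) -> int:
    for name, ref in producers.items():
        registry.bind_compute(name, make_marker(ref))
    return len(producers)
```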

Parallel + persistent computation of per-pair alignment features.

The export pipeline’s hotspot is the per-(query, reference) alignment in _KnnTransferRunner._compute_pair_features: a single-threaded triple loop calling compute_alignment (parasail NW+SW with traceback) at a few hundred pairs/s. Two structural wins live here:

  1. Process-level parallelism over the unique alignment pairs. parasail’s traceback variants do not release the GIL well enough for threads to scale, so a ProcessPoolExecutor is used (benchmarked ~3x on a 12-core box). The taxonomy lookups stay in the parent process because they are cheap (an lru_cache over ete3 lineages) and process re-init of the ete3 sqlite handle would dwarf the work.

  2. A persistent on-disk cache keyed by the ordered sequence pair plus the alignment parameters. Within a single PLM, the K=10 neighbour set is a superset of the K=5 and K=3 sets, so alignments computed once for the largest-K dataset make the smaller-K datasets (and any re-run) near-free. protea.training is serialised (one job at a time), so there is no concurrent-write contention; a plain sqlite file is enough.

This module is value-preserving by construction: the alignment feature dict it returns is exactly what compute_alignment returns, only computed concurrently and memoised. With parallelism disabled (workers <= 1) and the cache bypassed (PROTEA_ALIGN_CACHE_DIR unset, or every lookup a cache miss), it reduces to the original serial code path.
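The persistent cache idea can be sketched with the standard-library sqlite3 module. This is an illustration under stated assumptions, not the module's actual schema: AlignmentCache, cached_align, the align table layout and the params dict are hypothetical, and cached values are assumed to be JSON-serialisable dicts.

```python
import hashlib
import json
import sqlite3
from typing import Any, Callable, Dict, Optional


class AlignmentCache:
    """Sketch of a sqlite memo keyed by the ordered sequence pair + params."""

    def __init__(self, path: str, params: Dict[str, Any]) -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS align (key TEXT PRIMARY KEY, value TEXT NOT NULL)"
        )
        # Canonical params string: changing any alignment parameter yields
        # new keys, so stale alignments are never served.
        self._params = json.dumps(params, sort_keys=True)

    def _key(self, q_seq: str, r_seq: str) -> str:
        # Ordered pair: (q, r) and (r, q) hash to different keys.
        blob = json.dumps([q_seq, r_seq, self._params])
        return hashlib.sha256(blob.encode()).hexdigest()

    def get(self, q_seq: str, r_seq: str) -> Optional[Dict[str, Any]]:
        row = self._conn.execute(
            "SELECT value FROM align WHERE key = ?", (self._key(q_seq, r_seq),)
        ).fetchone()
        return None if row is None else json.loads(row[0])

    def put(self, q_seq: str, r_seq: str, value: Dict[str, Any]) -> None:
        self._conn.execute(
            "INSERT OR REPLACE INTO align (key, value) VALUES (?, ?)",
            (self._key(q_seq, r_seq), json.dumps(value)),
        )
        self._conn.commit()

    def __enter__(self) -> "AlignmentCache":
        return self

    def __exit__(self, *_exc: Any) -> None:
        self._conn.close()


def cached_align(
    cache: AlignmentCache,
    q_seq: str,
    r_seq: str,
    align_fn: Callable[[str, str], Dict[str, Any]],
) -> Dict[str, Any]:
    """Return the cached alignment dict, computing and storing it on a miss."""
    hit = cache.get(q_seq, r_seq)
    if hit is not None:
        return hit
    value = align_fn(q_seq, r_seq)
    cache.put(q_seq, r_seq, value)
    return value
```

Because the process is the only writer (training jobs are serialised), a single connection with a commit per insert is enough; no locking scheme is needed.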

class protea.core._pair_feature_compute.PairAlignmentComputer

Bases: object

Compute alignment-feature dicts for many pairs, cached + parallel.

Use as a context manager so the sqlite handle and process pool are released deterministically.

compute(pairs: list[tuple[str, str]]) → dict[tuple[str, str], dict[str, Any]]

Return {(q_seq, r_seq): alignment_dict} for every input pair.

Empty-sequence pairs are skipped by the caller; here every pair is assumed to have two non-empty sequences. Deduplicates on the ordered sequence pair so identical pairs are aligned once.
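The deduplicate-then-dispatch step can be sketched with concurrent.futures. compute_unique_pairs is a hypothetical helper, not the PairAlignmentComputer implementation; it shows the ordered-pair dedupe and the serial fallback for workers <= 1.

```python
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Callable, Dict, List, Tuple

Pair = Tuple[str, str]


def compute_unique_pairs(
    pairs: List[Pair],
    align_fn: Callable[[str, str], Dict[str, Any]],
    workers: int = 1,
) -> Dict[Pair, Dict[str, Any]]:
    # dict.fromkeys keeps first-seen order and drops repeated ordered pairs,
    # so each distinct (q_seq, r_seq) is aligned exactly once.
    unique = list(dict.fromkeys(pairs))
    if workers <= 1:
        results = [align_fn(q, r) for q, r in unique]
    else:
        # align_fn must be a picklable module-level function for the pool.
        with ProcessPoolExecutor(max_workers=workers) as pool:
            results = list(
                pool.map(align_fn, [q for q, _ in unique], [r for _, r in unique])
            )
    return dict(zip(unique, results))
```

Processes rather than threads are used here for the reason given above: the traceback alignments do not release the GIL well enough for a thread pool to scale.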

protea.core._pair_feature_compute.build_pair_feature_dict(q_acc: str, ref_acc: str, align_by_pair: dict[tuple[str, str], dict[str, Any]], *, do_alignments: bool, do_taxonomy: bool, tax_ids: tuple[dict[str, Any] | None, dict[str, Any] | None]) → dict[str, Any]

Merge the precomputed alignment + per-pair taxonomy for one pair.

Value-identical to the original inline compute_alignment + compute_taxonomy merge: the alignment half is looked up from the batch-computed map, the taxonomy half is computed here (cheap, lru_cache-backed). Either family is skipped when its flag is off.

protea.core._pair_feature_compute.precompute_alignment_features(*, aspects: Sequence[str], neighbors_by_aspect: dict[str, list[list[tuple[str, float]]]], valid_queries: Sequence[str], query_sequences: dict[str, str], ref_sequences: dict[str, str]) → dict[tuple[str, str], dict[str, Any]]

Batch-align every unique (q_acc, ref_acc) neighbour pair.

Drives PairAlignmentComputer (parallel + on-disk cache) over the deduplicated pair set and remaps the result back onto the (q_acc, ref_acc) accession keys the runner indexes by. Returns an empty map when there are no alignable pairs.

protea.core._pair_feature_compute.resolve_workers() → int

Number of process-pool workers for pair-feature computation.

PROTEA_PAIR_FEATURE_WORKERS overrides; default is the CPU count. A value <= 1 forces the serial path (no pool spin-up).
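A sketch of the worker-count rule, assuming PROTEA_PAIR_FEATURE_WORKERS is read from the process environment (the other PROTEA_* knobs on this page suggest so, but that is an assumption). resolve_workers_sketch and use_serial_path are hypothetical names.

```python
import os


def resolve_workers_sketch(env_var: str = "PROTEA_PAIR_FEATURE_WORKERS") -> int:
    """Worker count: the env override if set, otherwise the CPU count."""
    raw = os.environ.get(env_var, "").strip()
    if raw:
        return int(raw)  # a non-integer value raises ValueError
    # os.cpu_count() may return None; fall back to the serial path then.
    return os.cpu_count() or 1


def use_serial_path(workers: int) -> bool:
    # Mirrors the documented rule: <= 1 skips the process pool entirely.
    return workers <= 1
```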

Bulk DB loaders extracted out of training_dump_helpers.

Three private helpers used by the dump pipeline:

  • _count_embeddings_with_dim(): single COUNT + vector_dims probe for a given embedding-config UUID.

  • _stream_embeddings(): pre-allocated np.empty matrix filled via yield_per over the sequence_embedding join.

  • _load_annotation_aggregations(): per-aspect grouping of annotation rows under the GO P/F/C aspects, filtering against a provided acc_to_idx so only proteins with a preloaded embedding contribute to the reference set.

All three are intentionally I/O-only: they take a SQLAlchemy Connection (already opened by the caller) and return plain Python / NumPy structures. Keeping them out of training_dump_helpers lets _preload_all_embeddings and _build_reference_from_cache stay under the §3 60-LOC method ceiling without the dump-helper file ballooning further.
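The per-aspect grouping done by _load_annotation_aggregations() can be illustrated in pure Python. This sketch assumes rows arrive as (accession, go_id, aspect) tuples and that the result maps each aspect to {accession: set of GO ids}; aggregate_by_aspect and that output shape are hypothetical, and the real helper reads from a SQLAlchemy Connection instead.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

GO_ASPECTS = ("P", "F", "C")


def aggregate_by_aspect(
    rows: Iterable[Tuple[str, str, str]],
    acc_to_idx: Dict[str, int],
) -> Dict[str, Dict[str, Set[str]]]:
    """Group (accession, go_id, aspect) rows into per-aspect {acc: {go_id}}."""
    grouped = {aspect: defaultdict(set) for aspect in GO_ASPECTS}
    for acc, go_id, aspect in rows:
        # Only proteins with a preloaded embedding can act as KNN references.
        if acc not in acc_to_idx or aspect not in grouped:
            continue
        grouped[aspect][acc].add(go_id)
    # Convert defaultdicts to plain dicts so callers get ordinary structures.
    return {aspect: dict(by_acc) for aspect, by_acc in grouped.items()}
```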

Shared constants for the dump pipeline submodules.

Kept here so the split helpers, the test-split helpers, and the runner can share them without forming an import cycle (T2B.6).

Parameter objects for the dump pipeline.

Originally lived in protea/core/training_dump_helpers.py. Extracted to a leaf submodule (T2B.6) so the type-only consumers in protea/core/_knn_transfer_runner.py can import them without dragging the larger orchestration code into the import graph.

class protea.core.training_dump._contexts.KnnTransferContext(valid_queries: list[str], query_emb: np.ndarray, ref_by_aspect: dict[str, dict[str, Any]], go_id_map: dict[int, str], aspect_map: dict[int, str], gt_pairs: set[tuple[str, str]], query_known_gos: dict[str, set[str]] | None = None, parent_map_str: dict[str, set[str]] | None = None, ia_weights: dict[str, float] | None = None, pca_state: tuple[np.ndarray, np.ndarray] | None = None, pivot_go_ids: set[str] | frozenset[str] | None = None, embedding_pool: np.ndarray | None = None)

Bases: object

Bundle of KNN inputs + enrichment maps for _knn_transfer_and_label.

Groups the 12 per-call data arguments (queries, references, ontology maps, optional enrichment helpers) so the entry-point signature stays under flake8-bugbear’s parameter ceiling. session, payload p, sequence_context, and stream_output remain standalone arguments because they are configuration / IO concerns, not data.

aspect_map: dict[int, str]
embedding_pool: np.ndarray | None = None
go_id_map: dict[int, str]
gt_pairs: set[tuple[str, str]]
ia_weights: dict[str, float] | None = None
parent_map_str: dict[str, set[str]] | None = None
pca_state: tuple[np.ndarray, np.ndarray] | None = None
pivot_go_ids: set[str] | frozenset[str] | None = None
query_emb: np.ndarray
query_known_gos: dict[str, set[str]] | None = None
ref_by_aspect: dict[str, dict[str, Any]]
valid_queries: list[str]
class protea.core.training_dump._contexts.SequenceContext(query_sequences: dict[str, str] | None = None, ref_sequences: dict[str, str] | None = None, query_tax_ids: dict[str, int | None] | None = None, ref_tax_ids: dict[str, int | None] | None = None)

Bases: object

Per-protein sequence and taxonomy lookups.

All four attributes are optional; passing None disables the corresponding feature family (alignment / taxonomy).

query_sequences: dict[str, str] | None = None
query_tax_ids: dict[str, int | None] | None = None
ref_sequences: dict[str, str] | None = None
ref_tax_ids: dict[str, int | None] | None = None
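A sketch of how the optional SequenceContext fields could map onto the do_alignments / do_taxonomy flags that build_pair_feature_dict takes. SequenceContextSketch and feature_family_flags are hypothetical, and requiring both the query-side and reference-side map for a family to be enabled is an assumption.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class SequenceContextSketch:
    # Field names copied from SequenceContext; everything else is illustrative.
    query_sequences: Optional[Dict[str, str]] = None
    ref_sequences: Optional[Dict[str, str]] = None
    query_tax_ids: Optional[Dict[str, Optional[int]]] = None
    ref_tax_ids: Optional[Dict[str, Optional[int]]] = None


def feature_family_flags(ctx: SequenceContextSketch) -> Tuple[bool, bool]:
    """Return (do_alignments, do_taxonomy) derived from which lookups exist."""
    # Assumption: a family needs both the query-side and reference-side map.
    do_alignments = ctx.query_sequences is not None and ctx.ref_sequences is not None
    do_taxonomy = ctx.query_tax_ids is not None and ctx.ref_tax_ids is not None
    return do_alignments, do_taxonomy
```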
class protea.core.training_dump._contexts.StreamOutput(output_parquet: Path, chunk_rows: int = 100000)

Bases: object

Streaming parquet output for memory-bounded dataset generation.

When provided, _knn_transfer_and_label writes labeled rows directly to output_parquet in chunks of chunk_rows instead of accumulating the full result list in memory.

chunk_rows: int = 100000
output_parquet: Path
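The chunked streaming idea behind StreamOutput can be sketched without a parquet dependency by flushing to a callback. ChunkedRowWriter is hypothetical; in a real writer, flush_fn might wrap pyarrow.parquet.ParquetWriter.write_table, but that wiring is an assumption, not what _knn_transfer_and_label is documented to do.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


class ChunkedRowWriter:
    """Buffer rows and hand them to flush_fn every chunk_rows rows."""

    def __init__(self, flush_fn: Callable[[List[Row]], None], chunk_rows: int = 100_000) -> None:
        if chunk_rows <= 0:
            raise ValueError("chunk_rows must be positive")
        self._flush_fn = flush_fn
        self._chunk_rows = chunk_rows
        self._buffer: List[Row] = []

    def append(self, row: Row) -> None:
        self._buffer.append(row)
        if len(self._buffer) >= self._chunk_rows:
            self.flush()

    def flush(self) -> None:
        # Emit whatever is buffered, then start a fresh list so memory stays bounded.
        if self._buffer:
            self._flush_fn(self._buffer)
            self._buffer = []

    def __enter__(self) -> "ChunkedRowWriter":
        return self

    def __exit__(self, *_exc: Any) -> None:
        # Write the trailing partial chunk on exit.
        self.flush()
```

Peak memory is bounded by chunk_rows rows instead of the full result list, which is the trade-off the chunk_rows default of 100000 controls.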

Reference / sequence / taxonomy loaders for the dump pipeline.

Originally lived in protea/core/training_dump_helpers.py. Extracted to a leaf submodule (T2B.6) so the orchestration code can import the loaders without pulling in the entire split runner. Behaviour is byte-identical.

Thin facade over the _knn_transfer_runner Method Object.

Kept as a separate submodule (T2B.6) so the test-split and train-split helpers can import the entry point without each pulling the runner module into their import graph at top level.

Payload for the auto dump operation.

Originally lived in protea/core/training_dump_helpers.py. The model shape is unchanged; only the file location moved (T2B.6).

class protea.core.training_dump._payload.TrainRerankerAutoPayload(*, name: str, embedding_config_id: str, ontology_snapshot_id: str, train_versions: list[int], test_versions: list[int], annotation_source: str = 'goa', limit_per_entry: Annotated[int, Gt(gt=0)] = 5, distance_threshold: float | None = None, search_backend: str = 'faiss', metric: str = 'cosine', faiss_index_type: str = 'IVFFlat', faiss_nlist: int = 256, faiss_nprobe: int = 32, num_boost_round: int = 1000, early_stopping_rounds: int = 50, val_fraction: float = 0.2, neg_pos_ratio: float | None = None, reranker_objective: str = 'binary', compute_alignments: bool = False, compute_taxonomy: bool = False, ia_file: str | None = None, expand_votes_to_ancestors: bool = False, use_embedding_pca: bool = False, training_scope: str = 'per_category', per_cell_min_positives: Annotated[int, Gt(gt=0)] = 200, dump_to: str | None = None, dump_only: bool = False)

Bases: ProteaPayload

Payload for the dump_helper operation.

Generates consecutive temporal pairs from train_versions, runs KNN once per pair, then trains 3 per-category LightGBM models (NK, LK, PK) and evaluates each on the held-out test split.

annotation_source: str
classmethod at_least_one_test(v: list[int]) → list[int]
classmethod at_least_two_train(v: list[int]) → list[int]
compute_alignments: bool
compute_taxonomy: bool
distance_threshold: float | None
dump_only: bool
dump_to: str | None
early_stopping_rounds: int
embedding_config_id: str
expand_votes_to_ancestors: bool
faiss_index_type: str
faiss_nlist: int
faiss_nprobe: int
ia_file: str | None
limit_per_entry: PositiveInt
metric: str
model_config = {'frozen': True, 'strict': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

classmethod must_be_non_empty(v: str) → str
name: str
neg_pos_ratio: float | None
num_boost_round: int
ontology_snapshot_id: str
per_cell_min_positives: PositiveInt
reranker_objective: str
classmethod scope_is_valid(v: str) → str
search_backend: str
test_versions: list[int]
train_versions: list[int]
training_scope: str
use_embedding_pca: bool
val_fraction: float
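The "consecutive temporal pairs" step and the two version validators can be sketched in plain Python. validate_versions and consecutive_temporal_pairs are hypothetical; sorting the versions first is an assumption (the payload may instead expect callers to pass them in order).

```python
from typing import List, Tuple


def validate_versions(train_versions: List[int], test_versions: List[int]) -> None:
    # Mirrors the intent of at_least_two_train / at_least_one_test.
    if len(train_versions) < 2:
        raise ValueError("train_versions needs at least two versions")
    if not test_versions:
        raise ValueError("test_versions needs at least one version")


def consecutive_temporal_pairs(train_versions: List[int]) -> List[Tuple[int, int]]:
    """Pair each version with its successor: [v1, v2, v3] -> [(v1, v2), (v2, v3)]."""
    ordered = sorted(train_versions)  # assumption: versions sort chronologically
    return list(zip(ordered, ordered[1:]))
```

With N train versions this yields N - 1 pairs, which is why at least two versions are required before any KNN pass can run.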

Pure helpers extracted from compute_embeddings operation classes.

Keeps the operation file (compute_embeddings.py) under the master plan v3.2 §3 method-LOC ceiling while leaving the operation classes focused on payload validation, model loading, and the publish path.

protea.core.operations._compute_embeddings_helpers.build_batch_dispatch_messages(p: ComputeEmbeddingsPayload, parent_job_id: uuid.UUID, sequence_ids: list[int]) → list[tuple[str, dict]]

Partition sequences into per-job batches and build their queue messages.

Coordinator-side helper for ComputeEmbeddingsOperation.execute: splits the full sequence_ids list into chunks of p.sequences_per_job and emits one compute_embeddings_batch message per chunk addressed to the GPU batch queue. The messages carry every payload field that the child workers need (no DB lookups happen between coordinator and worker).
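A sketch of the chunk-and-message pattern, assuming a (queue_name, body) tuple shape. build_batch_messages, the queue name and the body keys other than the operation name are hypothetical; the real helper reads sequences_per_job and the worker fields from the ComputeEmbeddingsPayload.

```python
import uuid
from typing import Any, Dict, List, Tuple


def build_batch_messages(
    parent_job_id: uuid.UUID,
    sequence_ids: List[int],
    sequences_per_job: int,
    shared_fields: Dict[str, Any],
    queue: str = "embeddings.batch",  # hypothetical queue name
) -> List[Tuple[str, Dict[str, Any]]]:
    """Split sequence_ids into fixed-size chunks, one queue message per chunk."""
    if sequences_per_job <= 0:
        raise ValueError("sequences_per_job must be positive")
    messages = []
    for start in range(0, len(sequence_ids), sequences_per_job):
        body = {
            "operation": "compute_embeddings_batch",
            "parent_job_id": str(parent_job_id),
            "sequence_ids": sequence_ids[start : start + sequences_per_job],
            # Every field the worker needs travels in the message itself.
            **shared_fields,
        }
        messages.append((queue, body))
    return messages
```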

protea.core.operations._compute_embeddings_helpers.build_embedding_rows(session: Session, p: StoreEmbeddingsPayload, config_id: uuid.UUID) → tuple[list[dict], int, int]

Materialise SequenceEmbedding insert rows for a store-embeddings batch.

Iterates p.sequences and skips entries whose (sequence_id, config_id) pair already exists when p.skip_existing is true; otherwise deletes the existing rows so the bulk insert can replace them. Returns (rows, embeddings_stored, sequences_skipped); callers run the bulk insert + per-job-progress update.
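The skip-or-replace decision can be sketched without a database by returning the keys to delete instead of issuing deletes. plan_embedding_rows, the row keys and the one-row-per-sequence simplification are hypothetical; the real helper works against a Session and may store one row per chunk.

```python
from typing import Any, Dict, Iterable, List, Set, Tuple

Key = Tuple[int, str]


def plan_embedding_rows(
    sequences: Iterable[Dict[str, Any]],
    config_id: str,
    existing: Set[Key],
    skip_existing: bool,
) -> Tuple[List[Dict[str, Any]], List[Key], int]:
    """Return (rows_to_insert, keys_to_delete_first, sequences_skipped)."""
    rows: List[Dict[str, Any]] = []
    to_delete: List[Key] = []
    skipped = 0
    for seq in sequences:
        key = (seq["sequence_id"], config_id)
        if key in existing:
            if skip_existing:
                skipped += 1  # keep the stored embedding untouched
                continue
            to_delete.append(key)  # replace: delete first, then re-insert
        rows.append(
            {"sequence_id": seq["sequence_id"], "embedding_config_id": config_id, "embedding": seq["embedding"]}
        )
    return rows, to_delete, skipped
```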

protea.core.operations._compute_embeddings_helpers.build_store_message(parent_job_id: uuid.UUID, p: ComputeEmbeddingsBatchPayload, write_sequences: list[dict]) → tuple[str, dict]

Build the queue tuple that hands off inferred batches to the write worker.

protea.core.operations._compute_embeddings_helpers.serialize_inferred_chunks(sequences: list[Sequence], batch_chunks: list[list[ChunkEmbedding]]) → list[dict]

Build per-sequence dicts the store_embeddings worker consumes.

Pairs each input Sequence row with its inferred chunk list and flattens each ChunkEmbedding into JSON-friendly fields. The write worker uses these dicts directly without re-fetching from the DB.

protea.core.operations._compute_embeddings_helpers.t5_forward_pass(model: Any, input_ids: Any, attention_mask: Any) → Any

Run a T5 encoder forward pass under no_grad; return hidden_states.

Lazy-imports torch so this module stays importable on machines without it (e.g. the API process where embedding code is never exercised). Frees the outputs reference and triggers a CUDA cache flush before returning so the caller can reuse the residual GPU memory for the per-sequence pool step.

Pulled out of _embed_t5 to keep that backend entry point under the §3 60-LOC ceiling.

Pure helpers extracted from LoadOntologySnapshotOperation.execute.

The OBO load path branches into a fresh-snapshot insert and an already-exists-but-needs-backfill case; both paths build GOTermRelationship rows via the same loop. This module hosts the shared relationship builder plus the two branch handlers, so the operation file (load_ontology_snapshot.py) can keep execute under the master plan v3.2 §3 method-LOC ceiling.

protea.core.operations._load_ontology_helpers.build_relationships(terms: list[dict[str, Any]], go_id_to_db_id: dict[str, UUID], snapshot_id: UUID) → list[GOTermRelationship]

Materialise GOTermRelationship rows for the given parsed terms.

Each entry in terms carries its parent edges as ("relation_type", "GO:nnn") tuples under the relationships key (parser output). Edges pointing at GO ids absent from go_id_to_db_id are silently skipped so a partial snapshot does not abort the whole insert.
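A minimal sketch of the skip-unknown-edges loop, assuming each parsed term exposes its own id under a "go_id" key (an assumption; only the relationships key is documented). RelationshipRow is a hypothetical stand-in for the GOTermRelationship ORM row.

```python
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class RelationshipRow:
    # Hypothetical stand-in for a GOTermRelationship ORM row.
    child_id: uuid.UUID
    parent_id: uuid.UUID
    relation_type: str
    snapshot_id: uuid.UUID


def build_relationship_rows(
    terms: List[Dict[str, Any]],
    go_id_to_db_id: Dict[str, uuid.UUID],
    snapshot_id: uuid.UUID,
) -> List[RelationshipRow]:
    rows = []
    for term in terms:
        child_id = go_id_to_db_id.get(term["go_id"])  # "go_id" key is assumed
        if child_id is None:
            continue
        for relation_type, parent_go_id in term.get("relationships", []):
            parent_id = go_id_to_db_id.get(parent_go_id)
            if parent_id is None:
                continue  # edge leaves the snapshot: skip instead of aborting
            rows.append(RelationshipRow(child_id, parent_id, relation_type, snapshot_id))
    return rows
```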

protea.core.operations._load_ontology_helpers.handle_existing_snapshot(session: Session, existing: OntologySnapshot, terms: list[dict[str, Any]], obo_version: str, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult

Resolve an idempotent re-run of load_ontology_snapshot.

Returns the existing snapshot id as a no-op when its relationship rows are already present; otherwise dispatches to _backfill_relationships() (the GO terms themselves are assumed present from the original load).

protea.core.operations._load_ontology_helpers.insert_new_snapshot(session: Session, obo_url: str, obo_version: str, terms: list[dict[str, Any]], started_at: float, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult

Create the snapshot, terms, and relationship rows in a single flow.

The caller has already validated the payload, downloaded the OBO, and parsed the terms; this function commits the three inserts and emits the done audit event with the elapsed runtime.

Pre-cafaeval data preparation helpers extracted from RunCafaEvaluationOperation.execute.

These pure functions operate on the in-memory EvaluationData buckets before cafaeval is invoked: they restrict the ground truth to the cohort of proteins that were actually predicted. This follows standard CAFA practice and prevents delta proteins outside the PredictionSet's query coverage from lowering Fmax / coverage.

protea.core.operations._run_cafa_data_helpers.restrict_data_to_predicted(session: Session, *, prediction_set_id: UUID, data: EvaluationData, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → EvaluationData

Drop ground-truth proteins not present in the PredictionSet cohort.

Without this filter, delta proteins that the PredictionSet never actually predicted (e.g. the booster failed for them, or they were excluded at query time) would still count as “missing” against Fmax / coverage. CAFA practice is to evaluate only on the predicted cohort.

Returns a new EvaluationData with each of the five {protein → set[go_id]} mappings filtered to the predicted accessions. Emits a gt_restricted_to_predicted event with before/after counts for the audit trail.
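The filtering and audit event can be sketched over plain dicts. restrict_to_predicted, the bucket labels and the "before"/"after" payload keys are hypothetical; the emit signature follows the Callable type documented for these helpers, and the input mappings are left unmodified.

```python
from typing import Any, Callable, Dict, Literal, Optional, Set

Emit = Callable[[str, Optional[str], Dict[str, Any], Literal["info", "warning", "error"]], None]
Buckets = Dict[str, Dict[str, Set[str]]]


def restrict_to_predicted(buckets: Buckets, predicted: Set[str], emit: Emit) -> Buckets:
    """Return new buckets keeping only proteins that appear in `predicted`."""
    restricted = {
        label: {acc: gos for acc, gos in mapping.items() if acc in predicted}
        for label, mapping in buckets.items()
    }
    emit(
        "gt_restricted_to_predicted",
        None,
        {
            "before": {label: len(m) for label, m in buckets.items()},
            "after": {label: len(m) for label, m in restricted.items()},
        },
        "info",
    )
    return restricted
```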

protea.core.operations._run_cafa_data_helpers.write_ground_truth_files(artifacts_root: Any, data: EvaluationData) → dict[str, str]

Write the five gt_*.tsv / known_terms.tsv files used by cafaeval.

Returns a dict {label → path} so the caller can pass each path to the per-setting cafaeval driver. artifacts_root is a pathlib.Path (or anything convertible via str); kept typed as Any so the helper does not import pathlib just for one signature.

protea.core.operations._run_cafa_data_helpers.write_terms_of_interest(toi_path: str, toi_go_ids: list[str], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → None

Persist the CAFA6 terms-of-interest file (one GO id per line, sorted).

cafaeval’s -toi flag restricts evaluation to this set so that terms not in the frozen pivot graph (e.g. new since t-1) are excluded from scoring.
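The file format is simple enough to sketch directly: one GO id per line, sorted. write_toi is a hypothetical name; the real helper also emits an audit event, which is omitted here, and this sketch does not deduplicate ids.

```python
from typing import List


def write_toi(toi_path: str, toi_go_ids: List[str]) -> int:
    """Write sorted GO ids, one per line; return how many lines were written."""
    lines = sorted(toi_go_ids)
    with open(toi_path, "w", encoding="utf-8") as fh:
        fh.writelines(f"{go_id}\n" for go_id in lines)
    return len(lines)
```

Sorting makes the file deterministic, so two runs over the same pivot graph produce byte-identical terms-of-interest files.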

Setup-phase helpers extracted from run_cafa_evaluation.

Holds the two NamedTuple bundles threaded through the run pipeline plus the small functions that load the term universe and emit the setup events. Living in a sibling module keeps the operation file lean and under the master-plan v3.2 §3 LOC budget.

Re-exported by run_cafa_evaluation for backwards compatibility.

Sibling helper for RunCafaEvaluationOperation.summarize_payload.

Builds the per-job CLI / event summary that the jobs router exposes. Lives outside run_cafa_evaluation.py so the operation module stays under the §3 file-LOC budget. Each _bits_from_* helper is a small, test-friendly chunk.

protea.core.operations._run_cafa_summary.summarize_payload(payload: dict[str, Any], session: Session | None) → str

Build the human-readable run summary for the CAFA evaluator.

Mirrors the Job summarize_payload contract: never raises, falls back to UUID prefixes when a FK row is missing or no session is available.
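A sketch of the "never raises, fall back to UUID prefixes" contract. summarize_sketch, the lookup callback (standing in for a session query) and the payload keys are hypothetical; only the fallback behaviour comes from the description above.

```python
from typing import Any, Callable, Dict, Optional

Lookup = Callable[[str, str], Optional[str]]


def summarize_sketch(payload: Dict[str, Any], lookup: Optional[Lookup]) -> str:
    """Build a one-line summary; never raises, falls back to UUID prefixes."""
    bits = []
    for key in ("prediction_set_id", "evaluation_set_id"):  # hypothetical keys
        raw = payload.get(key)
        if not raw:
            continue
        label = None
        if lookup is not None:
            try:
                label = lookup(key, str(raw))
            except Exception:  # a missing FK row or DB error must not break the summary
                label = None
        # key[:-3] drops the "_id" suffix; use an 8-char UUID prefix if no label.
        bits.append(f"{key[:-3]}={label or str(raw)[:8]}")
    return ", ".join(bits) if bits else "(no inputs)"
```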

See also

  • Operations: narrative documentation for every operation listed above, with payload examples and execution flow.

  • Infrastructure: the ORM models that protea.core reads and writes.

  • How-to Guides: task-oriented recipes that exercise these modules end-to-end.