Core
The protea.core package contains all domain logic. It has no dependency
on the infrastructure layer: operations receive an open SQLAlchemy session
and an emit callback, but they do not manage connections, queues, or
transactions themselves. This strict boundary makes every operation
independently testable and trivially substitutable.
Axis tuple
protea.core.axis_tuple defines the AxisTuple named-tuple used to
identify a training configuration in the multi-PLM sweep. Each axis tuple
carries the PLM identifier, neighbourhood size k, reranker spec ID,
feature schema SHA, evaluation set name and manifest SHA, propagation
mode, and ensemble spec (the authoritative key list is CANONICAL_AXIS_KEYS
below). All serialised config keys in Dataset and ExperimentRun
rows use the canonical axis-tuple string form derived from this type.
Axis-tuple shortid: thin shim that prefers the protea-contracts helper.
The canonical implementation lives in protea_contracts.axis_tuple
(shipped by the FARM-EXP.1 companion PR on protea-contracts). PROTEA pins
protea-contracts@main in pyproject.toml, so once that PR
merges and we re-lock, the try / except ImportError path picks up the
upstream helper and the local fallback below is no longer used.
The local fallback is byte-for-byte identical to the upstream helper so the cross-repo shortid contract holds during the brief window where PROTEA is merged before the contracts release that ships the helper.
Formula (mirrors ExperimentSpec.hash() in protea-reranker-lab/src/protea_reranker_lab/experiment.py:108-111):
sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()[:12]
When the upstream helper is available, this module re-exports it verbatim (same symbol, same digest). Callers should import from here so the swap is invisible:
from protea.core.axis_tuple import axis_tuple_shortid
- protea.core.axis_tuple.CANONICAL_AXIS_KEYS: tuple[str, ...] = ('plm', 'k', 'reranker_spec_id', 'feature_schema_sha', 'eval_set_name', 'eval_set_manifest_sha', 'propagation', 'ensemble_spec')
Canonical axis-tuple key set. Falls back to the locally-pinned tuple while the upstream contracts release is being rolled out. Order does NOT affect the digest (json.dumps(..., sort_keys=True) canonicalises it).
- protea.core.axis_tuple.SHORTID_HEX_LEN: int = 12
Length of the truncated hex digest. Matches the lab convention.
- protea.core.axis_tuple.axis_tuple_shortid(axis_tuple: Mapping[str, Any]) → str
Return the canonical 12-hex shortid for an axis tuple.
- Parameters:
axis_tuple – mapping of axis name to value. Every value must be JSON-encodable by the standard json module without a custom default. Path / UUID / datetime objects must be pre-stringified by the caller (mirrors the lab path’s pydantic.model_dump(mode="json") step).
- Returns:
12 lowercase hex chars ([0-9a-f]{12}).
- Raises:
TypeError – when a value in axis_tuple is not JSON-encodable. Surfaced as a hard error so schema drift is caught at the call site rather than producing a silent shortid mismatch.
The function is intentionally pure: same input -> same output, no hidden state, no environment lookup. The order of keys in axis_tuple is irrelevant (json.dumps(..., sort_keys=True) canonicalises it).
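A minimal sketch of the documented shortid formula, handy for checking a digest by hand. It reproduces only the `sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()[:12]` recipe above. The name `shortid_sketch` is hypothetical, and converting the mapping with `dict(...)` is an assumption so that non-dict mappings serialise. Production code should keep importing `axis_tuple_shortid` from `protea.core.axis_tuple`.

```python
import hashlib
import json
from collections.abc import Mapping
from typing import Any

SHORTID_HEX_LEN = 12  # mirrors the documented constant


def shortid_sketch(axis_tuple: Mapping[str, Any]) -> str:
    """Hypothetical re-implementation of the documented shortid formula."""
    # dict(...) lets any Mapping serialise (assumption; the real helper may differ).
    # sort_keys=True makes the digest independent of key insertion order.
    # json.dumps raises TypeError for values such as Path, UUID, datetime or set.
    payload = json.dumps(dict(axis_tuple), sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:SHORTID_HEX_LEN]
```

Two mappings with the same items in a different order produce the same shortid. A value that is not JSON-encodable fails loudly instead of hashing to an unexpected digest.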
Domain types
protea.core.domain.aspect defines the Aspect enum used throughout
the codebase to identify the three GO namespaces: MFO (Molecular Function),
BPO (Biological Process), and CCO (Cellular Component).
GO aspect (namespace): the three-way partition of the Gene Ontology.
Two encodings circulate in the codebase, both of them load-bearing:
- Single-char codes ("P"/"F"/"C"): the wire format used by GOTerm.aspect in PostgreSQL and the go-basic.obo file itself. The go_term table CHECK constraint is on these codes.
- Three-char CAFA codes ("BPO"/"MFO"/"CCO"): the format expected by cafaeval (the upstream Fmax / AuPRC evaluator) and surfaced in the UI for human readers.
Until this module landed, both encodings appeared as bare string literals in 30+ places, a textbook Primitive Obsession smell. The enum is the single source of truth; everything else converts at the boundary.
Typical usage:
from protea.core.domain.aspect import Aspect
# Iterate the three aspects in a stable canonical order
for aspect in Aspect:
    ...
# Convert from a DB row
aspect = Aspect.from_code(row.aspect)
# Hand off to cafaeval
result_dict[aspect.cafa] = ...
- class protea.core.domain.aspect.Aspect(*values)
Bases: Enum
Gene Ontology aspect / namespace.
The three GO sub-ontologies. Iteration order is the canonical PROTEA order (P → F → C), matching the historical _ASPECTS = ("P", "F", "C") tuples this enum replaces.
- BIOLOGICAL_PROCESS = 'P'
- CELLULAR_COMPONENT = 'C'
- MOLECULAR_FUNCTION = 'F'
- property cafa: str
Three-char CAFA code ("BPO"/"MFO"/"CCO").
Format expected by the upstream cafaeval package and the evaluation results JSON; also the canonical UI label.
- property code: str
Single-char code ("P"/"F"/"C").
Wire format in PostgreSQL (go_term.aspect column) and the go-basic.obo file. Use this when reading/writing the DB or comparing against the ORM column.
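The single-source-of-truth idea can be sketched with a plain Enum whose values are the single-char codes and whose properties convert at the boundary. `AspectSketch` and the `_CAFA` table are hypothetical stand-ins, not the shipped class. The lookup rules of the real `Aspect.from_code` (for example, whether it also accepts CAFA codes) are an assumption here.

```python
from enum import Enum


class AspectSketch(Enum):
    """Hypothetical stand-in mirroring the documented Aspect enum."""

    # Declaration order is iteration order: P -> F -> C.
    BIOLOGICAL_PROCESS = "P"
    MOLECULAR_FUNCTION = "F"
    CELLULAR_COMPONENT = "C"

    @property
    def code(self) -> str:
        # Single-char wire format (go_term.aspect column, go-basic.obo).
        return self.value

    @property
    def cafa(self) -> str:
        # Three-char code expected by cafaeval and shown in the UI.
        return _CAFA[self]

    @classmethod
    def from_code(cls, code: str) -> "AspectSketch":
        # Enum lookup by value; raises ValueError for unknown codes.
        return cls(code)


_CAFA = {
    AspectSketch.BIOLOGICAL_PROCESS: "BPO",
    AspectSketch.MOLECULAR_FUNCTION: "MFO",
    AspectSketch.CELLULAR_COMPONENT: "CCO",
}
```

Because the enum value *is* the DB code, converting a row is a single constructor call. Only the CAFA spelling needs a side table.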
Contracts
The contracts module defines the interfaces that every operation must satisfy and the shared types used across the entire codebase.
Operation is a structural Protocol: any class that exposes a name
string and an execute(session, payload, *, emit) method conforms to it,
without needing to inherit from a base class. ProteaPayload is the
immutable, strictly-typed Pydantic base class for all operation payloads:
strict mode prevents silent type coercion, and frozen configuration prevents
accidental mutation after validation. OperationResult is the return value
of every execute call; its deferred flag tells BaseWorker that
completion will be signalled by child workers rather than immediately.
RetryLaterError is raised when a shared resource (e.g. the GPU) is
occupied; BaseWorker catches it, resets the job to QUEUED, and
re-publishes the message after a configurable delay.
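The contract can be exercised without any infrastructure. The sketch below re-declares trimmed stand-ins for Operation, OperationResult and RetryLaterError; they are not imported from protea, and ProteaPayload is omitted. It adds a hypothetical `PingOperation` that conforms structurally without inheriting anything. A hypothetical `run_job` function approximates the BaseWorker transitions described above. The real worker also manages the session, commits, records events and re-publishes messages.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, Protocol, runtime_checkable

EmitFn = Callable[[str, Optional[str], dict, str], None]


@dataclass
class OperationResult:
    # Subset of the documented fields, enough for this sketch.
    result: dict[str, Any] = field(default_factory=dict)
    deferred: bool = False


class RetryLaterError(Exception):
    def __init__(self, reason: str, delay_seconds: int = 60) -> None:
        super().__init__(reason)
        self.delay_seconds = delay_seconds


@runtime_checkable
class Operation(Protocol):
    name: str

    def execute(self, session: Any, payload: dict[str, Any], *, emit: EmitFn) -> OperationResult: ...


class PingOperation:
    """Hypothetical operation: no base class, conforms structurally."""

    name = "ping"

    def execute(self, session: Any, payload: dict[str, Any], *, emit: EmitFn) -> OperationResult:
        emit("ping.start", None, {"payload": payload}, "info")
        if payload.get("gpu_busy"):
            raise RetryLaterError("GPU occupied", delay_seconds=30)
        return OperationResult(result={"pong": True})


def run_job(op: Operation, payload: dict[str, Any], emit: EmitFn) -> tuple[str, int]:
    """Hypothetical, simplified BaseWorker: returns (new_status, requeue_delay)."""
    try:
        outcome = op.execute(None, payload, emit=emit)  # real worker passes a Session
    except RetryLaterError as exc:
        return "QUEUED", exc.delay_seconds  # message re-published after the delay
    # Deferred jobs stay RUNNING; the last child marks the parent SUCCEEDED.
    return ("RUNNING" if outcome.deferred else "SUCCEEDED"), 0
```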
- class protea.core.contracts.operation.Operation(*args, **kwargs)
Bases: Protocol
Protocol that every domain operation must satisfy.
Operations are pure domain logic: they receive an open SQLAlchemy session and an emit callback for structured event logging, and return an OperationResult. They must not manage sessions or queue connections.
description is a short, static, human-readable explanation of what the operation does in general, surfaced in the jobs UI to give the operations history context.
summarize_payload returns a 1-line dynamic summary of the most informative fields in the payload (e.g. “release 211 ← OBO 2022-07-01”), so each individual job in the history tells its own story. Returning an empty string is acceptable for operations with no useful payload.
- description: str
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
- name: str
- summarize_payload(payload: dict[str, Any]) → str
- class protea.core.contracts.operation.OperationResult(result: dict[str, Any] = <factory>, progress_current: int | None = None, progress_total: int | None = None, deferred: bool = False, publish_after_commit: list[tuple[str, UUID]] = <factory>, publish_operations: list[tuple[str, dict[str, Any]]] = <factory>)
Bases: object
Return value of every Operation.execute() call.
result is a free-form dict that gets stored in Job.meta and surfaced in the job detail view.
progress_current / progress_total are written back to the Job row so the UI can render a progress bar.
deferred: if True, BaseWorker will NOT transition the job to SUCCEEDED. Use this for coordinator operations that delegate work to child jobs; the last child is responsible for marking the parent SUCCEEDED.
publish_after_commit: list of (queue_name, job_id) pairs that BaseWorker will publish to RabbitMQ after the DB commit, guaranteeing workers always find the child job row before they try to claim it.
- deferred: bool = False
- progress_current: int | None = None
- progress_total: int | None = None
- publish_after_commit: list[tuple[str, UUID]]
- publish_operations: list[tuple[str, dict[str, Any]]]
- result: dict[str, Any]
- exception protea.core.contracts.operation.RetryLaterError(reason: str, delay_seconds: int = 60)
Bases: Exception
Raised by an operation when it cannot run yet but should be retried.
BaseWorker resets the job to QUEUED and the consumer re-publishes the message after delay_seconds, leaving the GPU free for other work.
- protea.core.contracts.operation.make_safe_emit(raw_emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]
Wrap a raw EmitFn so failures are logged and never propagate.
The platform-level emit may fail for transient reasons (DB connection lost mid-operation, JobEvent insert conflict, etc.). Operations should not crash because the audit trail hiccupped: the operation’s primary work matters more than the event row. Failures are logged at ERROR with full traceback so they remain visible in observability without breaking the running job.
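A minimal sketch of the wrap-and-log idea behind make_safe_emit, assuming a closure over the raw callable. The argument names (`event`, `message`, `fields`, `level`) and the logger name are hypothetical; only the positional types come from the documented signature. The sketch catches `Exception` rather than `BaseException`, so Ctrl-C and interpreter shutdown still propagate.

```python
import logging
from typing import Any, Callable, Literal, Optional

Level = Literal["info", "warning", "error"]
EmitFn = Callable[[str, Optional[str], dict[str, Any], Level], None]

logger = logging.getLogger("protea.emit_sketch")  # hypothetical logger name


def make_safe_emit_sketch(raw_emit: EmitFn) -> EmitFn:
    """Wrap raw_emit so any exception is logged with a traceback and swallowed."""

    def safe_emit(event: str, message: Optional[str], fields: dict[str, Any], level: Level) -> None:
        try:
            raw_emit(event, message, fields, level)
        except Exception:
            # logger.exception logs at ERROR level and attaches the traceback.
            logger.exception("emit failed for event %r", event)

    return safe_emit
```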
OperationRegistry is a simple dict-backed mapping from operation name
strings to instances. Workers resolve the correct operation at message
dispatch time; new operations are registered at process startup in
scripts/worker.py without modifying any worker code.
- class protea.core.contracts.registry.OperationRegistry
Bases: object
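The page does not list OperationRegistry's methods. The following is therefore a hypothetical sketch of a dict-backed name-to-instance mapping; the `register` / `get` method names are illustrative. Rejecting duplicate names is also an assumption, added so a second registration cannot silently replace the first.

```python
from typing import Any


class OperationRegistrySketch:
    """Hypothetical dict-backed registry; method names are illustrative."""

    def __init__(self) -> None:
        self._ops: dict[str, Any] = {}

    def register(self, op: Any) -> None:
        # Key each instance by its own `name` attribute.
        if op.name in self._ops:
            raise ValueError(f"operation {op.name!r} already registered")
        self._ops[op.name] = op

    def get(self, name: str) -> Any:
        # Resolve at message dispatch time; fail with the known names listed.
        try:
            return self._ops[name]
        except KeyError:
            raise KeyError(f"unknown operation {name!r}; known: {sorted(self._ops)}") from None
```

Under this design, adding an operation only touches the startup code that calls `register`. The dispatch path never changes.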
parent_progress exposes the shared
update_parent_progress helper that the child operations of every
coordinator (compute_embeddings, predict_go_terms) call to advance
the parent job’s progress as child workers finish their batches.
Extracted to its own module in F0 (T0.7) to remove duplicated copies
across coordinators.
Shared helper for child operations that report progress to a parent job.
Used by store_embeddings and store_predictions: each child
batch increments the parent’s progress_current and, if it was
the last batch, transitions the parent from RUNNING to SUCCEEDED.
- protea.core.contracts.parent_progress.update_parent_progress(session: Session, parent_job_id: UUID, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None], *, event_name: str) → None
Atomically increment parent progress; mark SUCCEEDED if last batch.
Returns silently if the parent has not yet seen all its batches (progress_current < progress_total) or if it is no longer RUNNING.
The event_name is the operation-specific suffix used in the emit call when the parent transitions to SUCCEEDED, e.g. "store_embeddings.parent_succeeded" or "store_predictions.parent_succeeded". The DB-level event row is always written as job.succeeded so downstream consumers see a uniform name regardless of which child closed the parent.
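The increment-then-conditional-transition pattern can be sketched with the standard-library sqlite3 module so it runs standalone. PROTEA itself uses SQLAlchemy against PostgreSQL. The `job` table schema and the function name below are hypothetical, and event emission is omitted. Both statements are guarded by `status = 'RUNNING'`, so only the batch that reaches the total closes the parent, and later calls are no-ops.

```python
import sqlite3


def update_parent_progress_sketch(conn: sqlite3.Connection, parent_id: str) -> bool:
    """Increment the parent's counter; flip RUNNING -> SUCCEEDED on the last batch.

    Returns True only for the call that closed the parent.
    """
    with conn:  # one transaction: commit on success, roll back on error
        # In-SQL increment avoids a read-modify-write race between children.
        conn.execute(
            "UPDATE job SET progress_current = progress_current + 1 "
            "WHERE id = ? AND status = 'RUNNING'",
            (parent_id,),
        )
        # Conditional transition; matches no row until progress is complete.
        cur = conn.execute(
            "UPDATE job SET status = 'SUCCEEDED' "
            "WHERE id = ? AND status = 'RUNNING' "
            "AND progress_current >= progress_total",
            (parent_id,),
        )
        return cur.rowcount == 1
```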
Retry middleware
protea.core.retry exposes with_retry, a wrapper function that
BaseWorker uses to retry the execute session on transient
database errors (deadlocks, connection drops, serialisation
failures) and brief network blips. Retries use exponential backoff
with jitter; all knobs (max_attempts, base_delay, max_delay,
jitter_ratio, predicate, on_retry) are bundled in a
RetryPolicy frozen dataclass passed via the policy keyword
argument (T-CONTEXTS, PR #237). BaseWorker instantiates a fixed
policy at the call site (RetryPolicy(max_attempts=3, base_delay=1.0,
max_delay=10.0, jitter_ratio=0.3)); there is no global
TuningSettings field for these values. Added as part of F0
(T0.3) of the master plan revision 3.
Generic retry middleware for transient infrastructure errors.
Used by BaseWorker to survive Postgres deadlocks, serialization failures and brief connection interruptions without marking the job as failed. Application errors (validation, missing data) are NOT retried; only transient infrastructure conditions are.
Example:
from protea.core.retry import RetryPolicy, with_retry
def do_work():
    ...
with_retry(do_work, policy=RetryPolicy(max_attempts=3, base_delay=1.0))
- class protea.core.retry.RetryPolicy(max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 30.0, jitter_ratio: float = 0.5, predicate: Callable[[BaseException], bool] = <function is_retryable>, on_retry: Callable[[int, BaseException, float], None] | None = None)
Bases: object
Bundle of tunable retry knobs consumed by with_retry().
Frozen so a default-constructed policy can sit in a function default safely; callers override individual fields by constructing a new RetryPolicy(...) with keyword arguments. Fields:
  - max_attempts: total attempts including the first; < 1 is rejected at call time so RetryPolicy(max_attempts=0) does not silently no-op.
  - base_delay / max_delay / jitter_ratio: exponential backoff schedule used between retryable failures.
  - predicate: classifier returning True for retryable exceptions; defaults to is_retryable() (DB + transient network errors).
  - on_retry: optional callback fired before each sleep with (attempt, exc, sleep_seconds). Defaults to a structured WARNING log under protea.retry.
- base_delay: float = 1.0
- jitter_ratio: float = 0.5
- max_attempts: int = 3
- max_delay: float = 30.0
- on_retry: Callable[[int, BaseException, float], None] | None = None
- predicate: Callable[[BaseException], bool] = <function is_retryable>
Default predicate: retryable DB errors plus transient network errors.
- protea.core.retry.is_retryable(exc: BaseException) → bool
Default predicate: retryable DB errors plus transient network errors.
- protea.core.retry.is_retryable_connection_error(exc: BaseException) → bool
Return True if the exception represents a transient network condition.
These are typically raised by pika or low-level socket layers when the broker is briefly unreachable. The publisher retry loop handles these on the publish side; this predicate exists for the consume side.
- protea.core.retry.is_retryable_db_error(exc: BaseException) → bool
Return True if the exception represents a transient DB condition.
- protea.core.retry.with_retry(fn: Callable[P, R], *args: P.args, policy: RetryPolicy = RetryPolicy(max_attempts=3, base_delay=1.0, max_delay=30.0, jitter_ratio=0.5, predicate=<function is_retryable>, on_retry=None), **kwargs: P.kwargs) → R
Run fn(*args, **kwargs) with exponential backoff and jitter.
The callable is invoked up to policy.max_attempts times. After each retryable failure (per policy.predicate), the loop sleeps for min(policy.base_delay * 2**(attempt-1), policy.max_delay) seconds, jittered by a random factor in [1 - jitter_ratio, 1 + jitter_ratio].
Exceptions that do not match policy.predicate propagate immediately. Once policy.max_attempts retryable failures accumulate, the last exception propagates to the caller.
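A self-contained sketch of the backoff loop as documented. The sleep is `min(base_delay * 2**(attempt-1), max_delay)` scaled by a uniform factor in `[1 - jitter_ratio, 1 + jitter_ratio]`, and non-matching errors or the final failure are re-raised. `RetryPolicySketch` and `with_retry_sketch` are hypothetical names. The default predicate here treats only `ConnectionError` as retryable, a stand-in for the real DB-plus-network `is_retryable`. Raising `ValueError` for `max_attempts < 1` is also an assumption, and the default WARNING log for `on_retry` is not reproduced.

```python
import random
import time
from dataclasses import dataclass
from typing import Callable, Optional, TypeVar

R = TypeVar("R")


@dataclass(frozen=True)
class RetryPolicySketch:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    jitter_ratio: float = 0.5
    # Stand-in predicate (assumption): only ConnectionError is retryable.
    predicate: Callable[[BaseException], bool] = lambda exc: isinstance(exc, ConnectionError)
    on_retry: Optional[Callable[[int, BaseException, float], None]] = None


def with_retry_sketch(fn: Callable[..., R], *args, policy: RetryPolicySketch = RetryPolicySketch(), **kwargs) -> R:
    if policy.max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")  # reject instead of silently no-op
    for attempt in range(1, policy.max_attempts + 1):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            # Non-retryable errors and the final failure propagate unchanged.
            if not policy.predicate(exc) or attempt == policy.max_attempts:
                raise
            delay = min(policy.base_delay * 2 ** (attempt - 1), policy.max_delay)
            delay *= random.uniform(1 - policy.jitter_ratio, 1 + policy.jitter_ratio)
            if policy.on_retry is not None:
                policy.on_retry(attempt, exc, delay)
            time.sleep(delay)
    raise AssertionError("unreachable")
```

With the defaults (base 1 s, cap 30 s, jitter 0.5), the nominal waits before attempts 2 and 3 are 1 s and 2 s, each scaled by a factor between 0.5 and 1.5.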
Operation catalogue
protea.core.operation_catalog builds the singleton
OperationRegistry that workers consult at message dispatch. The
public function build_operation_registry() instantiates each
operation class and registers it under its canonical name. Adding a
new operation is a one-line edit here plus a new module under
protea/core/operations/.
Plugin discovery
protea.core.plugins centralises importlib.metadata.entry_points
discovery for every PROTEA plugin group (protea.backends,
protea.sources, protea.runners). discover_plugins(group)
returns a cached {name: plugin} map and hard-errors with
RuntimeError if a plugin’s name attribute drifts from its
entry-point name. reset_plugin_cache is a test-only seam for
suites that install/uninstall plugins between cases. Added in T2A.5
for backend dispatch and generalised in T2A.8 (PR #240).
Generic entry-point plugin discovery (T2A.5 + T2A.8).
Both backends (T2A.5) and runners (T2A.8) follow the same shape: a
package declares plugins in [tool.poetry.plugins."protea.<group>"]
mapping name = "module:plugin", PROTEA discovers them via
importlib.metadata.entry_points, and a resolver maps a string
identifier to a plugin instance for dispatch.
This module centralises that discovery + cache + name-mismatch hard
error so the backend and runner code paths share one implementation
instead of forking. See protea.core.runners and the
_get_backend_plugins shim in
protea.core.operations.compute_embeddings for the call sites.
- protea.core.plugins.discover_plugins(group: str) → dict[str, Any]
Discover and cache plugins in the given entry_points group.
Returns a dict keyed by plugin.name. Each plugin must declare a name class attribute that matches its entry-point name; mismatches raise RuntimeError at discovery time so the failure surface is a clear “your declaration drifted from your entry_points file” rather than a confusing “Unknown <something>” later on.
Discovery happens once per process per group; subsequent calls return the cached map. The cache key is the group string itself, so the same group is never discovered twice (which keeps the import-time side effects stable across reload-heavy test runs).
- protea.core.plugins.reset_plugin_cache(group: str | None = None) → None
Drop the cached plugin map for one (or every) group.
Test-only seam: lets unit tests force re-discovery without restarting the process. Passing group=None clears every group; passing a specific name clears only that one. Production callers should not invoke this; the cache is intentionally process-local.
Feature registry
protea.core.features.registry is the central registry that maps
feature-family names to their producer functions. The ALL_FEATURES
constant lists every column the pipeline may populate; adding a column
without wiring a producer here causes a T1.8 invariant failure in the
dataset-export pipeline.
Concrete in-process FeatureRegistry for PROTEA (T2B.1).
The registry is the single point of truth for which feature columns
are computed and which families they belong to. It implements the
protea_contracts.FeatureRegistry ABC and, at import time, is
populated from the canonical
protea_contracts.FEATURE_FAMILIES map so every consumer sees
the same column set and family layout the lab was trained against.
Compute callables are intentionally placeholders in this slice (T2B.1).
T2B.2 of master plan v3.2 §24 wires the existing per-feature compute
helpers (feature_engineering, feature_enricher,
anc2vec_embeddings, pca_cache) into
CanonicalFeatureRegistry.bind_compute() so parquet_export and
_predict_batch can drive feature generation through the registry
instead of the open-coded loops they use today.
Calling a placeholder compute raises NotImplementedError with the
feature name so a premature integration fails loudly rather than
silently emitting empty columns.
A module-level REGISTRY singleton mirrors the cross-repo
contract test pinned in tests.test_feature_contract
(TestRegistryCoversContracts). The test activates automatically
on the presence of this module and pins the
registry-vs-contracts invariant.
- class protea.core.features.registry.CanonicalFeatureRegistry
Bases: FeatureRegistry
In-memory FeatureRegistry populated from the contracts.
Two membership views co-exist:
  - The per-feature Feature.family scalar (the most specific family, used for metadata / single-tag dispatch).
  - The overlapping protea_contracts.FEATURE_FAMILIES map, returned by families() and consulted by selected(). This is what the lab trains against and is the invariant pinned by tests.test_feature_contract.TestRegistryCoversContracts.
Insertion order is preserved so names() returns features in the order they were registered.
- bind_compute(name: str, compute: Any) → None
Replace the placeholder compute for name.
T2B.2 calls this once per feature at module import time to swap the _placeholder() raiser for the real compute helper. The feature must already be registered.
- families() → dict[str, list[str]]
Return the overlapping family map for currently registered features.
Walks FEATURE_FAMILIES so the result mirrors the canonical contracts grouping (knn is the union of knn_distance and knn_vote plus the std feature, etc.). Only features actually registered are included; the map is empty for a fresh registry.
- get(name: str) → Feature
Return the Feature registered under name.
- Raises:
KeyError – if name is not registered.
- names() → list[str]
All registered feature names, in registration order.
- register(feature: Feature) → None
Add a feature to the registry. Re-registering an existing name should raise ValueError so a typo never silently shadows a canonical column.
- selected(active_families: list[str], drop: list[str] | None = None) → list[Feature]
Union the features in every requested family, then apply drop.
Preserves registration order (which mirrors ALL_FEATURES) so the returned list is deterministic across calls. Unknown family names raise KeyError to mirror the lab’s strict family validation.
- protea.core.features.registry.REGISTRY: CanonicalFeatureRegistry = <protea.core.features.registry.CanonicalFeatureRegistry object>
Process-wide canonical registry instance. Importing this name activates the cross-repo contract test tests.test_feature_contract.TestRegistryCoversContracts, which pins the registry-vs-FEATURE_FAMILIES invariant.
- protea.core.features.registry.get_canonical_registry() → CanonicalFeatureRegistry
Return the process-wide canonical registry, building it on first use.
The registry is rebuilt only after reset_canonical_registry() (test-only seam). Callers that need to bind real compute helpers (T2B.2) acquire the singleton and call CanonicalFeatureRegistry.bind_compute() per feature.
- protea.core.features.registry.reset_canonical_registry() → None
Drop the cached canonical registry. Test-only seam.
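The selected() semantics (union of possibly overlapping families, then drop, returned in registration order, `KeyError` on unknown families) can be illustrated with plain lists. The family map and feature names below are a toy example, not the contents of `protea_contracts.FEATURE_FAMILIES`. The sketch returns names rather than `Feature` objects.

```python
from typing import Optional

# Toy, hypothetical families; "knn" overlaps "knn_distance" on purpose.
FAMILIES: dict[str, list[str]] = {
    "knn": ["knn_distance_min", "knn_vote_frac", "knn_distance_std"],
    "knn_distance": ["knn_distance_min", "knn_distance_std"],
    "alignment": ["identity_nw", "identity_sw"],
    "taxonomy": ["taxonomic_distance"],
}
# Registration order, standing in for ALL_FEATURES.
REGISTERED = ["knn_distance_min", "knn_vote_frac", "knn_distance_std",
              "identity_nw", "identity_sw", "taxonomic_distance"]


def selected_sketch(active_families: list[str], drop: Optional[list[str]] = None) -> list[str]:
    unknown = [f for f in active_families if f not in FAMILIES]
    if unknown:
        raise KeyError(f"unknown feature families: {unknown}")
    wanted: set[str] = set()
    for family in active_families:
        wanted.update(FAMILIES[family])  # overlapping families union without duplicates
    wanted.difference_update(drop or [])
    # Filter in registration order so the result is deterministic.
    return [name for name in REGISTERED if name in wanted]
```

Because the output is filtered through the registration list, the order of `active_families` never changes the column order fed to the booster.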
Schema SHA
protea.core.schema_sha_v2 computes a 16-character deterministic
fingerprint of the feature schema version. The hash is embedded in
Dataset.schema_sha and RerankerModel.feature_schema_sha; a
mismatch at inference time indicates that the booster was trained on a
different feature set and must be rejected.
schema_sha_v2 dual-write feature flag (T1.6 of master plan v3).
The protea.infrastructure.orm.models Dataset, RerankerModel, and
ExperimentRun rows carry both the legacy schema_sha /
feature_schema_sha column and the canonical
schema_sha_v2 column added by ADR D10. Until the production
cut-over window, write paths only fill the legacy column; once the
operator flips PROTEA_SCHEMA_SHA_V2_WRITE_ENABLED=true new rows
dual-write both columns.
The flag defaults to false (off) so a normal deploy of this slice
is a pure no-op for the data plane. The accompanying alembic
migration ships the column and a one-shot backfill; production reads
still come off schema_sha until a later slice flips the routers.
Truthy values follow the same convention as
protea.api.auth._authn_required() so operators do not learn two
different env-flag dialects: 1, true, yes, on (case
insensitive). Anything else (including unset) is treated as false.
- protea.core.schema_sha_v2.is_dual_write_enabled() → bool
Return True when schema_sha_v2 should be written on inserts.
The check is read-fresh each call so tests can flip the value via pytest.MonkeyPatch.setenv() without restarting the process and operators can roll the flag forward without redeploying.
- protea.core.schema_sha_v2.maybe_v2(value: str | None) → str | None
Pass-through helper for dual-write call sites.
Returns value when the flag is on, None otherwise. Lets write paths stay terse: schema_sha_v2=maybe_v2(legacy_sha) instead of an inline ternary at every insert site.
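A minimal sketch of the flag check and pass-through described above. It uses the documented variable name `PROTEA_SCHEMA_SHA_V2_WRITE_ENABLED` and the documented truthy set (1, true, yes, on, case-insensitive). The `_sketch` function names are hypothetical. The sketch reads `os.environ` on every call, which is what lets a flag flip take effect without a restart.

```python
import os
from typing import Optional

FLAG = "PROTEA_SCHEMA_SHA_V2_WRITE_ENABLED"
_TRUTHY = {"1", "true", "yes", "on"}


def is_dual_write_enabled_sketch() -> bool:
    # Read fresh on each call; unset or any non-truthy value means off.
    return os.environ.get(FLAG, "").lower() in _TRUTHY


def maybe_v2_sketch(value: Optional[str]) -> Optional[str]:
    # Pass the value through only when the dual-write flag is on.
    return value if is_dual_write_enabled_sketch() else None
```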
JSONB dual-write
protea.core.jsonb_dual_write provides helpers for writing structured
data to both a typed column and a JSONB fallback column in the same
transaction. Used during schema-migration windows where old and new code
may run concurrently against the same database.
Env-flag helper for the T3.1 GOPrediction.predictions_jsonb dual-write.
T3.1 scaffolds the prediction-tuple JSONB dual-write: writers continue
filling the typed prediction columns (go_term_id, distance,
evidence_code) and additionally serialise the same tuple into the
predictions_jsonb blob when the environment opt-in is set. This
keeps the scaffolding inert in production until a human-coordinated
deploy window flips the flag.
Environment variable
PROTEA_GO_PREDICTION_JSONB_WRITE_ENABLED
Truthy values (1, true, yes, on; case-insensitive) enable the dual-write. Anything else (unset, empty, 0, false…) leaves the writer skipping the JSONB column so predictions_jsonb stays NULL on new rows. Default off.
Design notes
The helper API is intentionally tiny so every writer site can opt in with a single import + one-line call:
from protea.core.jsonb_dual_write import maybe_jsonb
row["predictions_jsonb"] = maybe_jsonb(
[(pred["go_term_id"], pred["distance"], pred.get("evidence_code"))]
)
maybe_jsonb returns None when the flag is off (so the row
dict carries an explicit NULL for SQL) and a compact dict when on.
The compact shape is a single-level dict with two keys:
- predictions: a list of {"go_term_id": int, "score": float, "evidence": str | None} records, preserving caller order.
- count: len(predictions), redundant but useful for cheap GIN filtering and sanity checks (WHERE predictions_jsonb @> '{"count": 5}' etc.).
The list shape lets a single row carry one or more prediction tuples without re-shaping the helper at the T3.2 backfill / T3.3 reader cut-over.
- protea.core.jsonb_dual_write.JSONB_DUAL_WRITE_ENV: str = 'PROTEA_GO_PREDICTION_JSONB_WRITE_ENABLED'
Environment variable name; exported for tests / docs.
- protea.core.jsonb_dual_write.is_jsonb_dual_write_enabled(env: dict[str, str] | None = None) → bool
Return True iff the env opt-in is set to a truthy value.
env defaults to os.environ; the explicit parameter is a test hook so callers can verify the helper without mutating process state.
- protea.core.jsonb_dual_write.maybe_jsonb(predictions: Sequence[tuple[int, float, str | None]], *, env: dict[str, str] | None = None) → dict[str, Any] | None
Serialise a list of (go_term_id, score, evidence) tuples.
Returns None when the dual-write env opt-in is off so the caller can assign the result directly to a row dict / ORM attribute without an explicit guard at every writer site.
When the flag is on, returns the compact JSONB shape documented in the module docstring. Empty input still returns a well-formed dict ({"predictions": [], "count": 0}) so downstream consumers can rely on the keys being present.
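A sketch of the documented compact shape, useful for checking what lands in `predictions_jsonb`. It follows the env-injection test hook described above. The `_sketch` function names are hypothetical, and the real helper's handling of edge cases such as whitespace in the flag value may differ.

```python
import os
from typing import Any, Mapping, Optional, Sequence, Tuple

JSONB_DUAL_WRITE_ENV = "PROTEA_GO_PREDICTION_JSONB_WRITE_ENABLED"
_TRUTHY = {"1", "true", "yes", "on"}


def is_enabled_sketch(env: Optional[Mapping[str, str]] = None) -> bool:
    # Explicit env is a test hook; otherwise read the live process environment.
    env = os.environ if env is None else env
    return env.get(JSONB_DUAL_WRITE_ENV, "").lower() in _TRUTHY


def maybe_jsonb_sketch(
    predictions: Sequence[Tuple[int, float, Optional[str]]],
    *,
    env: Optional[Mapping[str, str]] = None,
) -> Optional[dict[str, Any]]:
    if not is_enabled_sketch(env):
        return None  # caller stores SQL NULL
    records = [
        {"go_term_id": go_term_id, "score": score, "evidence": evidence}
        for go_term_id, score, evidence in predictions  # caller order preserved
    ]
    return {"predictions": records, "count": len(records)}
```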
Experiment runners
protea.core.runners adapts the generic plugin discovery to the
protea.runners group. resolve_runner(name) maps an identifier
("knn" / "baseline" / "lightgbm") to a runner plugin
instance implementing the protea_contracts.ExperimentRunner
interface; unknown names raise ValueError listing the discovered
set. PROTEA does not yet dispatch to runners at inference time (the
active KNN + reranker path stays in PredictGOTermsBatchOperation
until F2C of master plan revision 3 hoists the inference core into a
shared package). The adapter exists so GET /v1/runners has a
stable resolver and future code has a one-line entry. Closes T2A.8
(PR #240).
Experiment-runner plugin adapter (T2A.8).
Mirrors the backend-plugin adapter in
protea.core.operations.compute_embeddings: discovers runners via
importlib.metadata.entry_points("protea.runners") and exposes a
resolve_runner() lookup that maps a string identifier (e.g.
"knn" / "baseline" / "lightgbm") to a runner plugin
instance implementing protea_contracts.ExperimentRunner.
The protea-runners package declares the canonical plugin set:
- knn: KNN-only baseline (no reranker), evaluates against a frozen dataset for ablation.
- baseline: predict-by-frequency baseline (most-frequent GO terms in the training set).
- lightgbm: the v9 / v18 LightGBM reranker, currently trained out-of-tree in protea-reranker-lab.
PROTEA does not yet dispatch to runners at inference time (the active
KNN + reranker path lives in PredictGOTermsBatchOperation and stays
there until F2C of master plan v3 hoists the inference core into a
package both PROTEA and protea-runners can depend on). The adapter
exists today so:
- The plugin registry endpoint (GET /v1/registry/runners) has a stable resolver under the hood (currently still using its own _discover for the API shape; that path is untouched here).
- Future code that wants a runner instance has a one-line entry: from protea.core.runners import resolve_runner.
The discovery + name-mismatch hard error follows the same pattern as backends, so the lab + plugin authoring workflow is uniform across both groups.
- protea.core.runners.get_runner_plugins() → dict[str, Any]
Return the discovered runner plugins keyed by plugin.name.
Wraps protea.core.plugins.discover_plugins() so the runner callers do not need to know the entry-point group string; the constant lives here next to the consumers.
- protea.core.runners.resolve_runner(name: str) → Any
Resolve a runner identifier to a plugin instance.
Raises ValueError listing the discovered runners when the identifier is unknown, so the failure message is actionable. The returned object implements protea_contracts.ExperimentRunner.
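A sketch of the actionable-error lookup. Unlike the real `resolve_runner(name)`, which obtains its map from `get_runner_plugins()`, this hypothetical variant takes the plugin map as a parameter so it can run without any installed runners.

```python
from typing import Any, Mapping


def resolve_runner_sketch(name: str, plugins: Mapping[str, Any]) -> Any:
    """Look up a runner by identifier; list the discovered set on failure."""
    try:
        return plugins[name]
    except KeyError:
        known = ", ".join(sorted(plugins)) or "<none>"
        raise ValueError(f"Unknown runner {name!r}; discovered runners: {known}") from None
```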
Utilities
protea.core.utils provides two shared utilities used across multiple
operations.
utcnow() returns a timezone-aware UTC datetime, avoiding the common
mistake of calling datetime.utcnow(), which returns a naive object.
chunks(seq, n) splits any sequence into fixed-size chunks, used by
coordinator operations to partition work into batches.
The previous UniProtHttpMixin (exponential backoff with jitter,
Retry-After header parsing, cursor extraction for paginated UniProt
REST endpoints) was inlined into InsertProteinsOperation and
FetchUniProtMetadataOperation when those operations were rewritten;
the retry/backoff/cursor logic now lives directly in each operation.
- protea.core.utils.chunks(seq: Sequence[Any], n: int) → Iterable[Sequence[Any]]
Yield successive n-sized chunks from seq.
- protea.core.utils.utcnow() → datetime
Return the current UTC datetime (timezone-aware).
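A plausible implementation of the two utilities, shown to make the documented behaviour concrete. It is a sketch and the shipped code may differ. Slicing keeps the input type, so lists yield lists and the final chunk may be shorter than `n`.

```python
from collections.abc import Iterable, Sequence
from datetime import datetime, timezone
from typing import Any


def utcnow() -> datetime:
    # Aware datetime with tzinfo=UTC; datetime.utcnow() would be naive.
    return datetime.now(timezone.utc)


def chunks(seq: Sequence[Any], n: int) -> Iterable[Sequence[Any]]:
    # Successive slices of length n; the last one holds the remainder.
    for start in range(0, len(seq), n):
        yield seq[start:start + n]
```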
KNN search
protea.core.knn_search provides the nearest-neighbour search layer used
during GO term prediction. The single public entry point is search_knn(),
which dispatches to one of two backends based on the backend parameter.
The numpy backend computes exact cosine or L2 distances via matrix multiplication. It requires no additional dependencies and is the default. For cosine distance, query and reference matrices are L2-normalised and the distance is computed as \(D = 1 - \cos(\theta) \in [0, 2]\). This costs \(O(NQd)\) for \(Q\) queries against \(N\) references of dimension \(d\), and is appropriate for reference sets up to approximately 100 000 proteins when embeddings fit in RAM as float16.
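The exact cosine path can be sketched in a few lines of numpy. Rows are normalised, one matrix product gives all \(1 - \cos\theta\) distances, and `argpartition` selects the k smallest per row before sorting only those k. `exact_cosine_knn` is a hypothetical name, not the signature of `search_knn()`. The sketch assumes no all-zero embedding rows and upcasts float16 input to float32 before the arithmetic.

```python
import numpy as np


def exact_cosine_knn(queries: np.ndarray, refs: np.ndarray, k: int) -> tuple[np.ndarray, np.ndarray]:
    """Hypothetical exact cosine KNN; returns (distances, indices), each of shape (Q, k)."""
    q = queries.astype(np.float32)  # upcast, e.g. from float16 storage
    r = refs.astype(np.float32)
    q /= np.linalg.norm(q, axis=1, keepdims=True)  # assumes no zero rows
    r /= np.linalg.norm(r, axis=1, keepdims=True)
    dist = 1.0 - q @ r.T  # (Q, N) cosine distances in [0, 2]
    k = min(k, r.shape[0])
    # argpartition finds the k smallest per row without a full sort ...
    idx = np.argpartition(dist, k - 1, axis=1)[:, :k]
    # ... then only those k candidates are sorted by distance.
    order = np.argsort(np.take_along_axis(dist, idx, axis=1), axis=1)
    idx = np.take_along_axis(idx, order, axis=1)
    return np.take_along_axis(dist, idx, axis=1), idx
```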
The faiss backend wraps the FAISS library and supports three index
types: Flat (exact), IVFFlat (approximate, Voronoi partitioning),
and HNSW (approximate, hierarchical graph). IVFFlat is recommended
for datasets above 100 000 vectors: it restricts search to the nprobe
nearest Voronoi cells, reducing query time from \(O(N)\) to approximately
\(O(\sqrt{N})\) with negligible recall loss at default settings.
Important
KNN search is never performed at the database layer. pgvector index types (HNSW, IVFFlat) are not used. All search happens in Python after loading reference embeddings into a numpy array. See Predict GO terms in the how-to guides.
Feature engineering
protea.core.feature_engineering enriches each query–reference pair in a
prediction result with sequence-level and phylogenetic signals. These features
are opt-in: they are computed only when compute_alignments=true and/or
compute_taxonomy=true are set in the prediction payload.
Pairwise alignment is computed via the parasail library using the
BLOSUM62 substitution matrix with gap-open/extend penalties of 10/1. Both
global (Needleman–Wunsch) and local (Smith–Waterman) alignments are run for
each pair, producing identity, similarity, raw score, gap percentage, and
alignment length for each. These metrics capture sequence similarity beyond
what the embedding distance alone encodes, which is especially valuable for
distant homologues where embedding geometry may be unreliable.
Taxonomic distance is computed via ete3 and the NCBI taxonomy tree
(local SQLite, downloaded on first use). For each (query, reference) pair
where taxonomy IDs are available from UniProt metadata, PROTEA finds the
lowest common ancestor and computes the edge count through it. Results are
cached with an LRU cache keyed by taxon-ID pair to avoid redundant tree
traversals across a batch.
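The LCA-based edge count can be sketched on a toy taxonomy. The sketch assumes a hypothetical child → parent map PARENT with a single root; the real module reads the NCBI tree through ete3. It caches both lineages and per-pair results with functools.lru_cache, mirroring the LRU cache described above.

```python
from functools import lru_cache

# Hypothetical toy taxonomy (child -> parent) with a single root, 1.
PARENT = {2: 1, 3: 2, 4: 2, 5: 1, 6: 3}


@lru_cache(maxsize=None)
def lineage(taxid: int) -> tuple[int, ...]:
    """Root-to-node path for taxid."""
    path = [taxid]
    while path[-1] in PARENT:
        path.append(PARENT[path[-1]])
    return tuple(reversed(path))


@lru_cache(maxsize=100_000)
def taxonomic_distance(t1: int, t2: int) -> tuple[int, int]:
    """Return (lca, number of edges on the path t1 -> lca -> t2); assumes a shared root."""
    l1, l2 = lineage(t1), lineage(t2)
    depth = 0
    while depth < min(len(l1), len(l2)) and l1[depth] == l2[depth]:
        depth += 1  # length of the shared root-to-LCA prefix
    lca = l1[depth - 1]
    return lca, (len(l1) - depth) + (len(l2) - depth)
```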
Feature engineering utilities for functional annotation enrichment.
Provides pairwise alignment metrics (Needleman–Wunsch and Smith–Waterman) via parasail and taxonomic distance computation via ete3 NCBITaxa.
These features complement the embedding-space KNN distance stored in
GOPrediction.distance with sequence-level and phylogenetic signals.
Performance notes:
Alignment is O(m*n) per pair; parasail uses SIMD acceleration.
Taxonomy lookups use an LRU cache over lineage queries (ete3 local SQLite). First call may trigger a DB download if the ete3 database is absent.
- protea.core.feature_engineering.compute_alignment(seq1: str, seq2: str) → dict[str, Any]
Compute both NW and SW alignment metrics in one call.
- protea.core.feature_engineering.compute_nw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) → dict[str, Any]
Global alignment (Needleman–Wunsch) via parasail/BLOSUM62.
- Returns a dict with keys:
identity_nw, similarity_nw, alignment_score_nw, gaps_pct_nw, alignment_length_nw, length_query, length_ref
- protea.core.feature_engineering.compute_sw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) → dict[str, Any]
Local alignment (Smith–Waterman) via parasail/BLOSUM62.
- Returns a dict with keys:
identity_sw, similarity_sw, alignment_score_sw, gaps_pct_sw, alignment_length_sw
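Once an aligner has produced two gapped strings, the per-pair metrics reduce to column counting. The sketch assumes that identity and similarity are fractions of the alignment length and that gaps_pct is a percentage; PROTEA's exact definitions may differ. The similar argument is a hypothetical stand-in for BLOSUM62 positive-scoring substitutions.

```python
def alignment_metrics(aln_query: str, aln_ref: str, similar: frozenset = frozenset()) -> dict:
    """Column-wise metrics for two equal-length aligned strings ('-' marks a gap)."""
    if len(aln_query) != len(aln_ref):
        raise ValueError("aligned strings must have equal length")
    length = len(aln_query)
    if length == 0:
        return {"identity": 0.0, "similarity": 0.0, "gaps_pct": 0.0, "alignment_length": 0}
    matches = positives = gaps = 0
    for a, b in zip(aln_query, aln_ref):
        if a == "-" or b == "-":
            gaps += 1
        elif a == b:
            matches += 1
            positives += 1
        elif frozenset((a, b)) in similar:
            positives += 1  # conservative substitution, e.g. a positive BLOSUM62 score
    return {
        "identity": matches / length,
        "similarity": positives / length,
        "gaps_pct": 100.0 * gaps / length,
        "alignment_length": length,
    }
```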
- protea.core.feature_engineering.compute_taxonomy(t1_raw: Any, t2_raw: Any) → dict[str, Any]
Compute taxonomic distance between two NCBI taxonomy IDs.
- Returns a dict with keys:
taxonomic_lca, taxonomic_distance, taxonomic_common_ancestors, taxonomic_relation
- protea.core.feature_engineering.warmup_taxonomy_db() → None
Pre-initialize the NCBITaxa database.
Call at worker startup so the download (~100 MB on first run) happens before any batch is processed, not mid-flight.
Re-ranker
protea.core.reranker implements a LightGBM binary classifier that
re-scores GO term predictions using 20 numeric features (embedding distance,
NW/SW alignment metrics, sequence lengths, taxonomic distance and common
ancestors, and 5 aggregate re-ranker signals) plus 3 categorical features
(qualifier, evidence code, taxonomic relation). The full feature list is
documented in train_reranker.
The module provides:
- prepare_dataset(df): extracts and coerces feature columns. Numeric columns are coerced with errors="coerce" (invalid strings become NaN); categorical columns are converted to the pandas category dtype, which LightGBM consumes directly without manual label encoding.
- train(df): stratified positive/negative split with early stopping on a held-out validation set (default 20 %). Returns a TrainResult with the Booster, validation metrics (AUC, logloss, and precision, recall and F1 at the 0.5 threshold), the best boosting iteration, and gain-based feature importance.
- predict(model, df): returns probability scores in [0, 1].
- model_to_string() / model_from_string(): serialization for DB storage in the RerankerModel table.
- load_training_tsv(): parses a training-data TSV as produced by the /scoring/prediction-sets/{id}/training-data.tsv endpoint.
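The validation metrics that train() reports at the 0.5 threshold are standard binary-classification quantities. Below is a plain-Python sketch of how they are derived from LightGBM's probabilities. It assumes that a probability at or above the threshold counts as a positive prediction.

```python
def threshold_metrics(
    y_true: list[int], y_prob: list[float], threshold: float = 0.5
) -> dict[str, float]:
    """Precision, recall and F1 when scores >= threshold are called positive."""
    tp = sum(1 for t, p in zip(y_true, y_prob) if t == 1 and p >= threshold)
    fp = sum(1 for t, p in zip(y_true, y_prob) if t == 0 and p >= threshold)
    fn = sum(1 for t, p in zip(y_true, y_prob) if t == 1 and p < threshold)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}
```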
Note
load_reranker / apply_reranker / infer_active_feature_families
were originally split into a sibling protea.core.reranking module;
they were folded back into protea.core.reranker to remove a naming
trap (reranker vs reranking were impossible to grep apart).
This module is now the single inference-side surface.
Parquet export (protea.core.parquet_export)
protea.core.parquet_export consolidates per-split, per-category
parquet shards produced by the KNN + feature pipeline into the frozen
dataset layout consumed by protea-reranker-lab: exactly
train.parquet, eval.parquet and manifest.json under a single
directory. The manifest follows ManifestV1 (schema version 2)
and records PROTEA’s producer_version + producer_git_sha.
The single public function export_reranker_parquets(...) is shared
by two callers:
- training_dump_helpers._dump_frozen_dataset: thin wrapper that uses this helper to emit the dataset alongside a training-data dump.
- ExportResearchDatasetOperation: stand-alone operation that only materialises and publishes the dataset, without running LightGBM.
When store is provided, the three consolidated files are
additionally uploaded under key_prefix using the ArtifactStore
interface, and the returned dict includes train_uri / eval_uri
/ manifest_uri.
Scoring
protea.core.scoring implements the scoring engine that applies weighted
formulas to GO predictions. A ScoringConfig defines a set of weights for
each feature column (embedding distance, alignment metrics, taxonomy, re-ranker
features). The engine computes a composite score per prediction row and can
stream scored results as TSV or compute CAFA-style metrics (Fmax, AUC-PR)
against an evaluation set.
Scoring engine for GOPrediction rows.
Applies a ScoringConfig formula to raw prediction signals and returns a normalised [0, 1] confidence score.
The engine is intentionally stateless: every call to compute_score()
is self-contained, which means any ScoringConfig can be applied to an
existing PredictionSet at any time without re-running the KNN search.
Evidence-code weights
Evidence code quality is resolved through a two-level lookup:
1. If config.evidence_weights is not None, that dict is checked first.
2. For codes absent from the override (or when no override exists), the module-level DEFAULT_EVIDENCE_WEIGHTS table is used.
3. Codes unknown to both tables fall back to DEFAULT_EVIDENCE_WEIGHT_FALLBACK (0.5).
This means a ScoringConfig may carry a partial override — e.g. zeroing
the IEA weight for an experiment-only study — without having to redeclare
every other code. The resolution order ensures backwards compatibility:
configs stored without evidence_weights behave identically to older
configs.
- protea.core.scoring.compute_score(pred: dict[str, Any], config: ScoringConfig) → float
Compute a [0, 1] confidence score for a single GOPrediction dict.
Signals are normalised to [0, 1] and then combined as a weighted average. A signal whose value is None (feature-engineering flag off at predict time) drops out of both the numerator and the denominator. Returns round(base_score, 6). FORMULA_EVIDENCE_WEIGHTED multiplies the average by the resolved evidence weight even when the evidence_weight signal carries weight 0. As a result, IEA / ND annotations are down-ranked regardless of feature configuration.
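The weighted-average rule can be sketched as follows, including the way None-valued signals drop out of both sums. The sketch assumes signals are already normalised to [0, 1] and that the evidence-weighted formula applies a single multiplier. The signal names in the usage are hypothetical, not ScoringConfig's actual fields.

```python
from typing import Optional


def weighted_score(
    signals: dict[str, Optional[float]],
    weights: dict[str, float],
    evidence_multiplier: Optional[float] = None,
) -> float:
    """Weighted average of normalised signals; None-valued signals are skipped entirely."""
    numerator = 0.0
    denominator = 0.0
    for name, weight in weights.items():
        value = signals.get(name)
        if value is None:
            continue  # feature not computed: excluded from numerator and denominator
        numerator += weight * value
        denominator += weight
    base_score = numerator / denominator if denominator > 0 else 0.0
    if evidence_multiplier is not None:
        # Evidence-weighted variant: scale the average by the resolved evidence weight.
        base_score *= evidence_multiplier
    return round(base_score, 6)
```

For example, with signals {"embedding_similarity": 1.0, "identity_nw": None, "taxonomy": 0.5} and weights {"embedding_similarity": 1.0, "identity_nw": 2.0, "taxonomy": 1.0}, the sketch returns 0.75. The weight 2.0 attached to the missing alignment signal does not dilute the score.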
- protea.core.scoring.evidence_weight(code: str | None, *, overrides: dict[str, float] | None = None) → float
Resolve the [0, 1] quality weight for a GO evidence code or ECO ID.
Resolution order
1. Normalise code from an ECO ID to a GO code via ECO_TO_CODE if needed.
2. Look up the normalised code in overrides (if provided).
3. Fall back to DEFAULT_EVIDENCE_WEIGHTS.
4. If still not found, return DEFAULT_EVIDENCE_WEIGHT_FALLBACK.
- param code:
A GO evidence code (e.g. "IEA") or an ECO ID (e.g. "ECO:0000501"). None returns the fallback weight.
- param overrides:
Optional per-config evidence weight table. May be a partial dict; codes not present here are resolved via DEFAULT_EVIDENCE_WEIGHTS.
- rtype:
float in [0, 1].
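The four-step resolution order maps directly onto dictionary lookups. In this sketch only the fallback value 0.5 and the two ECO mappings come from this page. The other table entries are hypothetical placeholders, not PROTEA's DEFAULT_EVIDENCE_WEIGHTS.

```python
from typing import Optional

# Excerpt consistent with the examples on this page; the real ECO_TO_CODE is much larger.
ECO_TO_CODE = {"ECO:0000314": "IDA", "ECO:0000501": "IEA"}
# Hypothetical default weights, for illustration only.
DEFAULT_EVIDENCE_WEIGHTS = {"IDA": 1.0, "IEA": 0.3}
DEFAULT_EVIDENCE_WEIGHT_FALLBACK = 0.5


def resolve_evidence_weight(
    code: Optional[str], *, overrides: Optional[dict[str, float]] = None
) -> float:
    if code is None:
        return DEFAULT_EVIDENCE_WEIGHT_FALLBACK
    code = ECO_TO_CODE.get(code, code)   # 1. ECO ID -> GO code (unknown codes pass through)
    if overrides and code in overrides:  # 2. partial per-config override
        return overrides[code]
    # 3. module defaults, then 4. global fallback
    return DEFAULT_EVIDENCE_WEIGHTS.get(code, DEFAULT_EVIDENCE_WEIGHT_FALLBACK)
```

Because overrides may be partial, zeroing only IEA leaves every other code on its default.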
- protea.core.scoring.propagate_ground_truth_to_ancestors(ground_truth: dict[str, set[str]], parent_map: dict[str, set[str]]) → dict[str, set[str]]
Expand each protein’s GO set with every ancestor in parent_map.
Returns a new dict; the input is not modified. Pair with propagate_scores_to_ancestors() when the downstream metric treats predictions and ground truth symmetrically (e.g. cafaeval-style Fmax).
- protea.core.scoring.propagate_scores_to_ancestors(scored_predictions: list[dict[str, Any]], parent_map: dict[str, set[str]]) → list[dict[str, Any]]
Max-propagate GO scores to ancestors per protein (True Path Rule).
For every (protein, go_id, score) row, ensures that every ancestor a of go_id in parent_map also has a score ≥ that row’s score for the same protein. The final score of an ancestor is the maximum over all of its descendants predicted for the protein (including itself, if predicted).
This mirrors cafaeval’s behaviour: annotating a child implies every ancestor, so predictions must propagate upward before PR metrics are computed.
parent_map maps a child go_id (string) to the set of its direct parent go_id strings, typically the union of is_a + part_of edges for the relevant ontology snapshot.
Returns a new list; the input is not modified. Rows that share a (protein, go_id) pair collapse to a single row keyed by the max score.
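Both propagation helpers rest on the same ancestor closure over parent_map. The sketch below shows max-propagation of scores and set-union propagation of ground truth. It keeps only the protein_accession, go_id and score fields of each row; the real function may carry more. Function names are hypothetical.

```python
from typing import Any


def ancestors(go_id: str, parent_map: dict[str, set[str]]) -> set[str]:
    """All transitive parents of go_id (excluding go_id itself)."""
    seen: set[str] = set()
    stack = list(parent_map.get(go_id, ()))
    while stack:
        term = stack.pop()
        if term not in seen:
            seen.add(term)
            stack.extend(parent_map.get(term, ()))
    return seen


def max_propagate_scores(
    rows: list[dict[str, Any]], parent_map: dict[str, set[str]]
) -> list[dict[str, Any]]:
    best: dict[tuple[str, str], float] = {}
    for row in rows:
        protein, score = row["protein_accession"], row["score"]
        for term in {row["go_id"]} | ancestors(row["go_id"], parent_map):
            key = (protein, term)
            # Each (protein, term) keeps the max over itself and its predicted descendants.
            if key not in best or score > best[key]:
                best[key] = score
    return [{"protein_accession": p, "go_id": g, "score": s} for (p, g), s in best.items()]


def propagate_truth(
    ground_truth: dict[str, set[str]], parent_map: dict[str, set[str]]
) -> dict[str, set[str]]:
    # Builds new sets, so the input dict and its sets are left untouched.
    return {
        protein: set(terms).union(*(ancestors(t, parent_map) for t in terms))
        for protein, terms in ground_truth.items()
    }
```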
- protea.core.scoring.score_predictions(predictions: list[dict[str, Any]], config: ScoringConfig) → list[dict[str, Any]]
Add a score key to each prediction dict and return them sorted in descending order.
- Parameters:
predictions – List of raw prediction dicts (same format as accepted by compute_score()).
config – The ScoringConfig to apply.
- Returns:
A new list with a score key added to each item, sorted by score in descending order. The original list is not modified.
Metrics
protea.core.metrics implements CAFA-style precision-recall evaluation.
Provides functions for computing Fmax (maximum F-measure over all thresholds),
weighted precision/recall, and coverage for a set of predictions against
ground-truth annotations.
CAFA-style precision-recall metrics for GO term prediction evaluation.
Takes scored GOPrediction rows and EvaluationData (ground truth) and computes Fmax, AUC-PR, and the full precision-recall curve following the CAFA protocol.
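CAFA protocol summary
1. Evaluate only on proteins present in the ground truth (NK, LK, or PK).
2. At each score threshold \(t\), let \(\mathcal{P}_t\) be the ground-truth proteins that have at least one prediction retained at \(t\), and \(\mathcal{G}\) all ground-truth proteins:
\(P(t) = \frac{1}{|\mathcal{P}_t|} \sum_{p \in \mathcal{P}_t} \frac{|\mathrm{pred}_p(t) \cap \mathrm{true}_p|}{|\mathrm{pred}_p(t)|}\)
\(R(t) = \frac{1}{|\mathcal{G}|} \sum_{p \in \mathcal{G}} \frac{|\mathrm{pred}_p(t) \cap \mathrm{true}_p|}{|\mathrm{true}_p|}\)
3. \(F_{\max} = \max_t \frac{2\,P(t)\,R(t)}{P(t) + R(t)}\)
4. AUC-PR is computed by trapezoidal integration of the PR curve.
Note: This implementation uses exact GO term matching (no DAG propagation). Ancestor propagation is intentionally left for a future iteration.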
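The precision, recall and Fmax definitions of the CAFA protocol can be sketched directly. The sketch makes four assumptions:

- Predictions arrive as {protein: {go_id: score}}.
- Ground-truth sets are non-empty.
- A term counts as predicted when its score is at or above t.
- Matching is exact, with no propagation, as the note states.

AUC-PR is omitted.

```python
from typing import Optional


def cafa_fmax(
    predictions: dict[str, dict[str, float]],
    ground_truth: dict[str, set[str]],
    thresholds: list[float],
) -> tuple[float, Optional[float]]:
    best_f, best_t = 0.0, None
    for t in thresholds:
        precisions: list[float] = []
        recall_sum = 0.0
        for protein, true_terms in ground_truth.items():  # only ground-truth proteins are evaluated
            predicted = {g for g, s in predictions.get(protein, {}).items() if s >= t}
            hits = len(predicted & true_terms)
            if predicted:
                precisions.append(hits / len(predicted))  # precision: proteins with predictions only
            recall_sum += hits / len(true_terms)  # recall: every ground-truth protein counts
        if not precisions:
            continue
        p = sum(precisions) / len(precisions)
        r = recall_sum / len(ground_truth)
        if p + r > 0:
            f = 2 * p * r / (p + r)
            if f > best_f:
                best_f, best_t = f, t
    return best_f, best_t
```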
- class protea.core.metrics.CAFAMetrics(category: str, fmax: float, threshold_at_fmax: float, auc_pr: float, n_ground_truth_proteins: int, n_predicted_proteins: int, n_predictions: int, curve: list[PRPoint] = <factory>)
Bases: object
CAFA evaluation results for one (PredictionSet, ScoringConfig, category) triple.
- auc_pr: float
- category: str
- fmax: float
- n_ground_truth_proteins: int
- n_predicted_proteins: int
- n_predictions: int
- summary() → dict[str, Any]
- threshold_at_fmax: float
- class protea.core.metrics.PRPoint(threshold: float, precision: float, recall: float, f1: float)
Bases: object
- f1: float
- precision: float
- recall: float
- threshold: float
- protea.core.metrics.compute_cafa_metrics(scored_predictions: list[dict[str, Any]], evaluation_data: EvaluationData, category: str = 'nk') → CAFAMetrics
Compute CAFA Fmax and PR curve.
scored_predictions must carry protein_accession, go_id, and score per row. category selects nk, lk, or pk from the evaluation ground truth.
Evidence codes
protea.core.evidence_codes provides mappings between ECO (Evidence and
Conclusion Ontology) identifiers and GO evidence codes used in GAF files.
Used by the QuickGO annotation loader to resolve ECO IDs to standard
three-letter evidence codes.
GO evidence code utilities.
Provides normalisation from ECO IDs to canonical GO evidence codes and classification of codes as experimental vs. non-experimental.
- Mapping source (canonical ECO → GO code):
https://github.com/evidenceontology/evidenceontology/blob/master/gaf-eco-mapping.txt
Additional ECO IDs used by UniProt GAF (not in the canonical mapping file) are resolved here using the EBI QuickGO ECO term definitions; all are “used in automatic assertion” and therefore map to IEA.
- protea.core.evidence_codes.is_experimental(code: str) → bool
Return True if code (GO or ECO) represents experimental evidence.
Examples:
is_experimental("IDA")          # True
is_experimental("ECO:0000314")  # True (IDA)
is_experimental("IEA")          # False
is_experimental("ECO:0000501")  # False (IEA)
- protea.core.evidence_codes.normalize(code: str) → str
Return the canonical GO evidence code for code.
If code is already a GO root evidence code, it is returned as-is. ECO IDs are translated via ECO_TO_CODE. Unknown codes are returned unchanged so that no information is silently discarded.
Examples:
normalize("IDA")          # → "IDA"
normalize("ECO:0000314")  # → "IDA"
normalize("ECO:0000256")  # → "IEA"
normalize("UNKNOWN")      # → "UNKNOWN"
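normalize() and is_experimental() can be sketched as a pass-through lookup plus set membership. The ECO entries below are the ones used in the examples above. The experimental code set is an assumption (the six core GO experimental codes); PROTEA's actual set may include more, such as the high-throughput codes.

```python
ECO_TO_CODE = {"ECO:0000314": "IDA", "ECO:0000501": "IEA", "ECO:0000256": "IEA"}
# Assumed experimental set, for illustration only.
EXPERIMENTAL_CODES = frozenset({"EXP", "IDA", "IPI", "IMP", "IGI", "IEP"})


def normalize_code(code: str) -> str:
    # Unknown codes come back unchanged, so nothing is silently discarded.
    return ECO_TO_CODE.get(code, code)


def is_experimental_code(code: str) -> bool:
    return normalize_code(code) in EXPERIMENTAL_CODES
```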
Evaluation
protea.core.evaluation implements the CAFA5 evaluation protocol for
computing the ground-truth delta between two annotation snapshots.
The module’s central data structure is EvaluationData, a frozen dataclass
that holds the NK, LK, PK, known, and pk_known annotation dictionaries.
Each dictionary maps a protein accession to a set of GO term IDs.
EvaluationData fields:
- nk: delta annotations for No-Knowledge proteins (no prior annotations in any namespace at t0).
- lk: delta annotations for Limited-Knowledge proteins (had annotations in some namespaces but gained new terms in a previously empty namespace).
- pk: novel annotations for Partial-Knowledge proteins (gained new terms in a namespace where they already had annotations).
- pk_known: old experimental annotations for PK proteins in the relevant namespaces; passed as -known to cafaeval to exclude them from scoring.
- known: all old experimental annotations flattened across namespaces; available for download via the reference endpoint.
The public entry point is compute_evaluation_data(session,
old_annotation_set_id, new_annotation_set_id, ontology_snapshot_id).
It loads the GO DAG for NOT-propagation, builds a per-namespace annotation
map for both old and new sets, and classifies each (protein, namespace) pair
into NK, LK, or PK. The same protein can appear in multiple categories across
different namespaces simultaneously (e.g., LK in CCO and PK in BPO).
CAFA-style evaluation data computation.
This module computes the ground-truth delta between two AnnotationSets (old → new) following the official CAFA5 evaluation protocol:
- Experimental evidence codes only (EXP, IDA, IMP, …).
- NOT-qualifier annotations are excluded, including their GO descendants propagated transitively through the is_a / part_of DAG.
Classification is per (protein, namespace), not globally per protein:
- NK: protein had NO experimental annotations in ANY namespace at t0. All novel terms across all namespaces are ground truth.
- LK: protein had annotations in SOME namespaces at t0, but NOT in namespace S. Novel terms in S are ground truth for LK.
- PK: protein had annotations in namespace S at t0 AND gained new terms in S at t1. Novel terms in S are ground truth for PK; old terms in S form the -known file for the CAFA evaluator.
Note: the same protein can be LK in one namespace and PK in another simultaneously (e.g. had MFO+BPO at t0, gains CCO → LK in CCO, gains new BPO → PK in BPO).
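The per-(protein, namespace) classification can be sketched with nested dictionaries. The sketch assumes both snapshots are already reduced to experimental, NOT-filtered annotations keyed as {protein: {namespace: set(go_ids)}}. The function name and input shape are hypothetical. The output dictionaries are flat protein → set maps, like the fields of EvaluationData.

```python
def classify_delta(
    old: dict[str, dict[str, set[str]]], new: dict[str, dict[str, set[str]]]
) -> tuple[dict[str, set[str]], dict[str, set[str]], dict[str, set[str]], dict[str, set[str]]]:
    nk: dict[str, set[str]] = {}
    lk: dict[str, set[str]] = {}
    pk: dict[str, set[str]] = {}
    pk_known: dict[str, set[str]] = {}
    for protein, new_by_ns in new.items():
        old_by_ns = old.get(protein, {})
        had_any = any(old_by_ns.values())  # any experimental term in any namespace at t0
        for namespace, terms in new_by_ns.items():
            old_terms = old_by_ns.get(namespace, set())
            novel = terms - old_terms
            if not novel:
                continue
            if not had_any:
                nk.setdefault(protein, set()).update(novel)
            elif not old_terms:
                lk.setdefault(protein, set()).update(novel)
            else:
                pk.setdefault(protein, set()).update(novel)
                pk_known.setdefault(protein, set()).update(old_terms)
    return nk, lk, pk, pk_known
```

With the example above (MFO+BPO at t0, then a new CCO term and a new BPO term), the protein lands in lk for the CCO term and in pk for the BPO term.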
When the two annotation sets use different OntologySnapshots,
compute_evaluation_data_reconciled implements the CAFA reconciliation
protocol: propagate ancestors under each side’s native DAG and intersect with a
frozen pivot snapshot. Re-propagation under the pivot DAG is left to cafaeval
downstream (idempotent under closure semantics). Reference implementation:
anphan0828/democafa_package, utils/ontology.filter_terms_given_obo.
- Output format (matching the CAFA evaluator): 2-column TSV, no header.
protein_accession\tgo_id
- class protea.core.evaluation.EvalContext(old_annotation_set_id: uuid.UUID, new_annotation_set_id: uuid.UUID, ontology_snapshot_id: uuid.UUID)
Bases: NamedTuple
The (old, new, snapshot) triple that identifies one evaluation delta.
Bundles the three IDs that travel together through the metrics endpoints (/scoring/prediction-sets/{id}/metrics) and the compute_evaluation_data() / _reconciled helpers, keeping downstream signatures under the master-plan §3 6-param ceiling.
- new_annotation_set_id: UUID
Alias for field number 1
- old_annotation_set_id: UUID
Alias for field number 0
- ontology_snapshot_id: UUID
Alias for field number 2
- class protea.core.evaluation.EvaluationData(nk: dict[str, set[str]]=<factory>, lk: dict[str, set[str]]=<factory>, pk: dict[str, set[str]]=<factory>, known: dict[str, set[str]]=<factory>, pk_known: dict[str, set[str]]=<factory>)
Bases: object
Computed ground-truth delta between two annotation sets.
- property delta_proteins: int
- known: dict[str, set[str]]
- property known_terms_count: int
- lk: dict[str, set[str]]
- property lk_annotations: int
- property lk_proteins: int
- nk: dict[str, set[str]]
- property nk_annotations: int
- property nk_proteins: int
- pk: dict[str, set[str]]
- property pk_annotations: int
- pk_known: dict[str, set[str]]
- property pk_proteins: int
- stats() → dict
- protea.core.evaluation.compute_evaluation_data(session: Session, old_annotation_set_id: UUID, new_annotation_set_id: UUID, ontology_snapshot_id: UUID) → EvaluationData
Compute NK/LK/PK ground truth following the CAFA5 protocol.
Classification is per (protein, namespace):
- NK: protein had no experimental annotations in any namespace at t0.
- LK: protein had annotations in some namespaces at t0, but not in namespace S; gained new terms in S → those terms are LK ground truth.
- PK: protein had annotations in namespace S at t0 and gained new terms in S → those novel terms are PK ground truth; old terms in S are stored in pk_known for the cafaeval -known flag.
The same protein can be simultaneously LK in one namespace and PK in another.
- protea.core.evaluation.compute_evaluation_data_reconciled(session: Session, old_annotation_set_id: UUID, new_annotation_set_id: UUID, old_native_snapshot_id: UUID, new_native_snapshot_id: UUID, pivot_snapshot_id: UUID) → EvaluationData
CAFA-compliant evaluation delta across mismatched ontology snapshots.
Applies the democafa filter_terms_given_obo protocol per side:
1. Load experimental annotations under each set’s native snapshot.
2. Propagate ancestors under the native DAG (True Path Rule).
3. Intersect with the pivot snapshot’s term universe.
4. Re-propagate under the pivot DAG. This step is handled downstream by cafaeval, which applies ancestor propagation before scoring; it is safe because prop(prop(x)) == prop(x).
NOT-qualifier exclusion preserves PROTEA’s True Path Rule contrapositive: NOT terms are propagated to descendants under the native DAG, intersected with the pivot, and then further propagated under the pivot DAG. The union across both annotation sets is applied to both sides, matching same-snapshot behaviour.
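Steps 2 and 3 of the reconciled path can be sketched per side: native-DAG ancestor closure, then intersection with the pivot term universe. NOT-qualifier exclusion is left out. Dropping proteins whose terms all fall outside the pivot is an assumption of this sketch, and the function name is hypothetical.

```python
def reconcile_side(
    annotations: dict[str, set[str]],
    native_parents: dict[str, set[str]],
    pivot_terms: set[str],
) -> dict[str, set[str]]:
    out: dict[str, set[str]] = {}
    for protein, terms in annotations.items():
        closed = set(terms)
        stack = list(terms)
        while stack:  # step 2: ancestor closure under the native DAG
            for parent in native_parents.get(stack.pop(), ()):
                if parent not in closed:
                    closed.add(parent)
                    stack.append(parent)
        kept = closed & pivot_terms  # step 3: restrict to the pivot term universe
        if kept:
            out[protein] = kept
    return out
```

In the example below, GO:b exists in the native DAG but not in the pivot. It is still used to reach GO:a during closure and is then filtered out.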
- protea.core.evaluation.deserialize_evaluation_data_from_bytes(blob: bytes) → EvaluationData
Parse parquet bytes (as returned by ArtifactStore.get) into EvaluationData.
- protea.core.evaluation.groundtruth_key_for(eval_set_id) → str
Storage key under which an EvaluationSet’s ground-truth parquet lives.
- protea.core.evaluation.load_evaluation_data_for_set(session: Session, eval_set) → tuple[EvaluationData, UUID]
Load ground truth for an EvaluationSet row.
Strict reuse path: if eval_set.groundtruth_uri is set, deserializes the persisted parquet via the configured ArtifactStore and returns it. If it is not set, the function raises; recomputation on the fly is intentionally not allowed (see the project’s “no on-the-fly reuse” rule). Use scripts/backfill_evaluation_groundtruth.py to materialize artifacts for legacy EvaluationSet rows that predate this column.
Returns the EvaluationData plus the pivot OntologySnapshot ID. The caller should use the pivot snapshot (not the old set’s) when loading the OBO for cafaeval, since propagated go_ids live in pivot term space.
- protea.core.evaluation.serialize_evaluation_data_to_parquet(data: EvaluationData, dest: Path) → Path
Write data to a parquet file at dest (creates parent dirs).
Returns dest for convenience. Uses snappy compression and the long layout produced by _eval_data_to_dataframe. The layout is kept stable because consumers parse it back with deserialize_evaluation_data_from_bytes.
Provenance
protea.core.provenance provides capture_provenance(extra=None),
a side-effect-free runtime snapshot for jobs / experiments / artefacts
to carry an audit trail without DB or network probes. Returns a fresh
dict[str, Any] with auto-keys protea_version (from
importlib.metadata), protea_git_sha (delegates to
parquet_export.resolve_protea_git_sha), python_version,
platform, hostname, and captured_at (ISO-8601 UTC). Any
caller-supplied extra mapping is overlaid last, so callers always
win on key collisions.
Every probe is wrapped: missing distribution metadata, a non-git
checkout, or an absent git binary all degrade to None rather
than raising. Added in T3.11 of master plan v3.2 §24 Fase 4.
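A stdlib-only sketch of the same pattern wraps every probe so that a failure becomes None, and overlays extra last. The dist parameter and the direct git rev-parse call are assumptions made to keep the example self-contained. The real function delegates the git SHA to parquet_export.resolve_protea_git_sha.

```python
import datetime
import platform
import socket
import subprocess
from importlib import metadata
from typing import Any, Callable, Mapping, Optional


def _safe(probe: Callable[[], Any]) -> Any:
    """Run a probe; any failure degrades to None instead of raising."""
    try:
        return probe()
    except Exception:
        return None


def _git_sha() -> str:
    # Raises (and so degrades to None) if git is absent or this is not a checkout.
    out = subprocess.run(
        ["git", "rev-parse", "HEAD"], capture_output=True, text=True, check=True, timeout=5
    )
    return out.stdout.strip()


def capture_provenance_sketch(
    extra: Optional[Mapping[str, Any]] = None, dist: str = "protea"
) -> dict[str, Any]:
    info: dict[str, Any] = {
        "protea_version": _safe(lambda: metadata.version(dist)),
        "protea_git_sha": _safe(_git_sha),
        "python_version": platform.python_version(),
        "platform": platform.platform(),
        "hostname": _safe(socket.gethostname),
        "captured_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    if extra:
        info.update(extra)  # caller-supplied keys win on collisions
    return info
```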
Operations
PROTEA ships a curated set of registered operation instances at worker
startup via protea.core.operation_catalog.build_operation_registry,
which is the authoritative list (read the function body for the live
count).
The catalog splits into job-backed entries (reachable through
POST /jobs) and ephemeral consumers (dispatched internally by the
compute_embeddings and predict_go_terms coordinators; see
Operations for that taxonomy). Each operation is a
class that implements the Operation protocol: a name string and
an execute method. Operations are stateless with respect to
infrastructure: they receive a session and emit structured events, but
do not open connections or manage transactions. The job-backed entries
are documented below; the four ephemeral siblings
(compute_embeddings_batch, store_embeddings,
predict_go_terms_batch, store_predictions) live in
Operations.
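A minimal sketch of the shape this contract implies is shown below. It assumes emit's positional arguments are (event name, optional message, structured fields, level), following the Callable signature listed for the operations that follow. OperationResult is a hypothetical stand-in dataclass, and EchoOperation is not a PROTEA operation.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Literal, Optional, Protocol

Level = Literal["info", "warning", "error"]
# Assumed argument order: (event name, optional message, structured fields, level).
EmitFn = Callable[[str, Optional[str], dict[str, Any], Level], None]


@dataclass
class OperationResult:
    """Hypothetical stand-in for PROTEA's OperationResult."""
    result: dict[str, Any] = field(default_factory=dict)


class Operation(Protocol):
    name: str

    def execute(self, session: Any, payload: dict[str, Any], *, emit: EmitFn) -> OperationResult: ...


class EchoOperation:
    """Toy operation: uses only the session it is given; no connections, queues or commits."""

    name = "echo"

    def execute(self, session: Any, payload: dict[str, Any], *, emit: EmitFn) -> OperationResult:
        emit("echo.start", None, {"keys": sorted(payload)}, "info")
        emit("echo.done", "finished", {}, "info")
        return OperationResult(result=dict(payload))
```

Because the session and emit are injected, a test can pass a fake session and a list-appending emit. This is what keeps each operation independently testable.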
- ping
Smoke-test operation. Returns immediately with a success result. Used to verify end-to-end connectivity between the API, RabbitMQ, and worker processes.
- class protea.core.operations.ping.PingOperation(*args, **kwargs)
Bases: Operation
- description: str = 'Smoke-test operation that emits two events and returns immediately.'
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
- name: str = 'ping'
- summarize_payload(payload: dict[str, Any]) → str
- insert_proteins
Fetches protein sequences from the UniProt REST API using cursor-based FASTA streaming. Sequences are deduplicated by MD5 hash before upsert; proteins are upserted by accession. Exponential backoff with jitter and Retry-After header handling are implemented inline in the operation. Isoforms are parsed and stored separately, sharing the canonical sequence where the amino-acid string is identical.
- class protea.core.operations.insert_proteins.InsertProteinsOperation
Bases: Operation
Fetches protein sequences from UniProt (FASTA) and upserts them into the DB.
Uses cursor-based pagination, exponential backoff with jitter, and MD5-based sequence deduplication. Many proteins can share one Sequence row. Isoforms (<canonical>-<n>) are stored as separate Protein rows grouped by canonical_accession.
- description: str = 'Fetch protein sequences from UniProt (FASTA, cursor-paginated) and upsert Protein + Sequence rows; isoforms are stored grouped by canonical accession.'
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
- name: str = 'insert_proteins'
- summarize_payload(payload: dict[str, Any]) → str
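MD5-based deduplication and isoform grouping can be sketched with hashlib. The sketch assumes the digest is taken over the raw sequence string as received. It also assumes the canonical accession is the text before the first '-', following the <canonical>-<n> convention above. The row dictionaries are hypothetical, not the Protein / Sequence ORM models.

```python
import hashlib


def dedupe_sequences(
    records: list[tuple[str, str]],
) -> tuple[dict[str, str], list[dict[str, str]]]:
    """Return (one sequence per MD5 digest, one protein row per accession)."""
    sequences: dict[str, str] = {}
    proteins: list[dict[str, str]] = []
    for accession, sequence in records:
        digest = hashlib.md5(sequence.encode("ascii")).hexdigest()
        sequences.setdefault(digest, sequence)  # identical strings share one Sequence entry
        proteins.append({
            "accession": accession,
            "canonical_accession": accession.split("-", 1)[0],  # "P12345-2" -> "P12345"
            "sequence_md5": digest,
        })
    return sequences, proteins
```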
- class protea.core.operations.insert_proteins.InsertProteinsPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, include_isoforms: bool = True, compressed: bool = False, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/insert_proteins (contact: you@example.org)')
Bases: ProteaPayload
- backoff_base_seconds: NonNegativeFloat
- backoff_max_seconds: NonNegativeFloat
- compressed: bool
- include_isoforms: bool
- jitter_seconds: NonNegativeFloat
- max_retries: PositiveInt
- model_config = {'frozen': True, 'strict': True}
Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- classmethod must_be_non_empty(v: str) → str
- page_size: PositiveInt
- search_criteria: str
- timeout_seconds: PositiveInt
- total_limit: PositiveInt | None
- user_agent: str
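The backoff fields suggest a capped exponential delay with additive jitter. This page does not state the exact formula PROTEA uses, so the sketch below is an assumption. It computes min(backoff_max_seconds, backoff_base_seconds · 2^attempt) plus uniform jitter in [0, jitter_seconds]. A numeric Retry-After header takes precedence.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    *,
    base: float = 0.8,    # backoff_base_seconds default
    cap: float = 20.0,    # backoff_max_seconds default
    jitter: float = 0.4,  # jitter_seconds default
    retry_after: Optional[str] = None,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))  # numeric Retry-After from the server wins
        except ValueError:
            pass  # the HTTP-date form of Retry-After is not handled in this sketch
    rng = rng if rng is not None else random.Random()
    return min(cap, base * (2 ** attempt)) + rng.uniform(0.0, jitter)
```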
- fetch_uniprot_metadata
Downloads TSV functional annotation data from UniProt and upserts ProteinUniProtMetadata rows keyed by canonical accession. Fields include functional description, EC numbers, pathway membership, and kinetics. Isoforms inherit metadata through the canonical_accession join; no duplicate rows are created.
- class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataOperation
Bases: object
Fetches functional annotations from UniProt (TSV) and upserts ProteinUniProtMetadata rows.
One metadata row is stored per canonical accession. Isoforms share the same metadata record. Optionally updates core Protein fields (reviewed, organism, gene_name, length) if they are missing. Uses the same cursor-based pagination and backoff strategy as InsertProteinsOperation.
- FIELD_MAP: dict[str, str] = {'absorption': 'Absorption', 'active_site': 'Active site', 'activity_regulation': 'Activity regulation', 'binding_site': 'Binding site', 'catalytic_activity': 'Catalytic activity', 'cofactor': 'Cofactor', 'dna_binding': 'DNA binding', 'ec_number': 'EC number', 'features': 'Features', 'function_cc': 'Function [CC]', 'keywords': 'Keywords', 'kinetics': 'Kinetics', 'pathway': 'Pathway', 'ph_dependence': 'pH dependence', 'redox_potential': 'Redox potential', 'rhea_id': 'Rhea ID', 'site': 'Site', 'temperature_dependence': 'Temperature dependence'}
- UNIPROT_FIELDS: list[str] = ['accession', 'reviewed', 'id', 'protein_name', 'gene_names', 'organism_name', 'length', 'absorption', 'ft_act_site', 'ft_binding', 'cc_catalytic_activity', 'cc_cofactor', 'ft_dna_bind', 'ec', 'cc_activity_regulation', 'cc_function', 'cc_pathway', 'kinetics', 'ph_dependence', 'redox_potential', 'rhea', 'ft_site', 'temp_dependence', 'keyword', 'feature_count']
- description = 'Fetch functional annotations (TSV) from UniProt and upsert ProteinUniProtMetadata rows keyed by canonical accession.'
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
- name = 'fetch_uniprot_metadata'
- summarize_payload(payload: dict[str, Any]) → str
- class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, compressed: bool = True, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/fetch_uniprot_metadata (contact: you@example.org)', commit_every_page: bool = True, update_protein_core: bool = True)
Bases: ProteaPayload
- backoff_base_seconds: NonNegativeFloat
- backoff_max_seconds: NonNegativeFloat
- commit_every_page: bool
- compressed: bool
- jitter_seconds: NonNegativeFloat
- max_retries: PositiveInt
- model_config = {'frozen': True, 'strict': True}
Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- classmethod must_be_non_empty(v: str) → str
- page_size: PositiveInt
- search_criteria: str
- timeout_seconds: PositiveInt
- total_limit: PositiveInt | None
- update_protein_core: bool
- user_agent: str
- load_ontology_snapshot
Downloads a GO OBO file and populates OntologySnapshot, GOTerm, and GOTermRelationship rows. The obo_version field carries a unique constraint so that re-importing the same release is idempotent. If a snapshot already exists but its relationships are missing, they are backfilled automatically.
- class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotOperation
Bases: object
Downloads a go.obo file and upserts an OntologySnapshot and its GOTerm rows.
The data-version: header of the OBO file is used as the canonical version identifier (e.g. releases/2024-01-17). If a snapshot with that version already exists, the operation is a no-op and returns the existing snapshot id, making it safe to re-run.
GO term aspect is mapped from the OBO namespace field: biological_process → P, molecular_function → F, cellular_component → C.
- description = 'Download a GO OBO file and persist it as an OntologySnapshot with its GOTerm + GOTermRelationship rows; idempotent on the OBO data-version.'
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
- name = 'load_ontology_snapshot'
- summarize_payload(payload: dict[str, Any]) → str
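Reading the data-version: header and mapping each [Term]'s namespace to an aspect letter needs only line-oriented parsing. This is a hypothetical minimal reader, not PROTEA's parser. It ignores every tag except id and namespace, skips [Typedef] stanzas, and does not handle relationships or obsolete terms.

```python
from typing import Optional

ASPECT = {"biological_process": "P", "molecular_function": "F", "cellular_component": "C"}


def parse_obo(text: str) -> tuple[Optional[str], list[dict[str, Optional[str]]]]:
    version: Optional[str] = None
    terms: list[dict[str, Optional[str]]] = []
    current: Optional[dict[str, Optional[str]]] = None
    in_header = True
    for raw in text.splitlines():
        line = raw.strip()
        if line.startswith("["):
            in_header = False
            current = {} if line == "[Term]" else None  # skip [Typedef] and other stanzas
            if current is not None:
                terms.append(current)
        elif ":" in line:
            key, value = (part.strip() for part in line.split(":", 1))
            if in_header and key == "data-version":
                version = value  # e.g. "releases/2024-01-17"
            elif current is not None and key == "id":
                current["go_id"] = value
            elif current is not None and key == "namespace":
                current["aspect"] = ASPECT.get(value)
    return version, terms
```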
- class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotPayload(*, obo_url: str, timeout_seconds: int = 120, force_relationships: bool = False)
Bases: ProteaPayload
- force_relationships: bool
- model_config = {'frozen': True, 'strict': True}
Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- classmethod must_be_non_empty(v: str) → str
- obo_url: str
- timeout_seconds: int
- load_goa_annotations
Bulk-loads a GAF (Gene Association Format) file. Annotations are filtered against canonical accessions present in the database, avoiding orphaned foreign keys. Each batch is committed independently to bound transaction size.
- class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsOperation
Bases: object
Streams a GOA GAF file (gzip or plain) and upserts ProteinGOAnnotation rows.
The GAF file is streamed line by line from gaf_url; it is never fully loaded into memory, making it suitable for the full UniProt GAF (hundreds of millions of lines).
Only accessions present in the protein table are stored; all others are silently skipped. The canonical accession set is loaded once from the DB at the start of the operation.
- GAF 2.2 columns used (1-indexed, tab-separated):
2 → DB_Object_ID (accession)
5 → GO ID
4 → Qualifier
7 → Evidence Code
6 → DB:Reference
8 → With/From
15 → Assigned By
14 → Date (YYYYMMDD)
- description = 'Stream a UniProt GOA GAF release line by line and bulk-insert ProteinGOAnnotation rows for accessions already present in the DB.'
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
- name = 'load_goa_annotations'
- summarize_payload(payload: dict[str, Any]) → str
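Streaming a GAF and keeping only known accessions amounts to a generator over lines. The sketch maps the 1-indexed columns listed above to 0-based indices, and its output field names are hypothetical. A real loader would also batch rows and handle gzip, for example by passing a gzip.open(path, "rt") handle as lines.

```python
from typing import Iterable, Iterator, Optional


def parse_gaf_line(line: str, known_accessions: set[str]) -> Optional[dict[str, str]]:
    if not line.strip() or line.startswith("!"):
        return None  # blank line or GAF header/comment
    cols = line.rstrip("\n").split("\t")
    accession = cols[1]  # column 2: DB_Object_ID
    if accession not in known_accessions:
        return None  # not in the protein table: skipped silently
    return {
        "accession": accession,
        "qualifier": cols[3],         # column 4
        "go_id": cols[4],             # column 5
        "db_reference": cols[5],      # column 6
        "evidence_code": cols[6],     # column 7
        "with_from": cols[7],         # column 8
        "annotation_date": cols[13],  # column 14 (YYYYMMDD)
        "assigned_by": cols[14],      # column 15
    }


def iter_gaf(lines: Iterable[str], known_accessions: set[str]) -> Iterator[dict[str, str]]:
    """Lazily yield parsed rows; memory use does not grow with file size."""
    for line in lines:
        row = parse_gaf_line(line, known_accessions)
        if row is not None:
            yield row
```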
- class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsPayload(*, ontology_snapshot_id: str, gaf_url: str, source_version: str, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None)
Bases: ProteaPayload
- commit_every_page: bool
- gaf_url: str
- model_config = {'frozen': True, 'strict': True}
Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- classmethod must_be_non_empty(v: str) → str
- ontology_snapshot_id: str
- page_size: PositiveInt
- source_version: str
- timeout_seconds: PositiveInt
- total_limit: PositiveInt | None
- load_quickgo_annotations
Streams GO annotations from the QuickGO bulk download API (paginated TSV). Supports optional ECO→GO evidence code mapping and per-page commits. Filters out annotations whose accessions are not already in the database.
- class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsOperation¶
Bases: object
Streams GO annotations from the QuickGO bulk download API.
Proteins to annotate are determined by the canonical accessions already present in the DB; no external FASTA or accession list is needed.
- The QuickGO TSV columns used:
  GENE PRODUCT ID → protein accession
  GO TERM → GO identifier
  QUALIFIER → qualifier (enables, involved_in…)
  ECO ID → mapped to evidence_code via eco_mapping_url (or stored raw)
  REFERENCE → db_reference
  WITH/FROM → with_from
  ASSIGNED BY → assigned_by
  DATE → annotation_date
- description = "Stream GO annotations from QuickGO's bulk download endpoint and insert ProteinGOAnnotation rows for accessions already present in the DB."¶
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'load_quickgo_annotations'¶
- summarize_payload(payload: dict[str, Any]) str¶
- class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsPayload(*, ontology_snapshot_id: str, source_version: str, quickgo_base_url: str = 'https://www.ebi.ac.uk/QuickGO/services/annotation/downloadSearch', gene_product_ids: list[str] | None = None, use_db_accessions: bool = True, eco_mapping_url: str | None = None, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, gene_product_batch_size: Annotated[int, Gt(gt=0)] = 200)¶
Bases: ProteaPayload
Payload for loading GO annotations from the QuickGO bulk download endpoint.
QuickGO returns a single streamed TSV filtered by the canonical accessions already present in the DB; no external accession list is needed.
eco_mapping_url (optional) points to a GAF-ECO mapping file (space-separated: ECO:XXXXXXX CODE). When provided, ECO IDs are resolved to GO evidence codes (IDA, IEA…) before insertion. If omitted, the raw ECO ID is stored as-is in evidence_code.
- commit_every_page: bool¶
- eco_mapping_url: str | None¶
- gene_product_batch_size: PositiveInt¶
- gene_product_ids: list[str] | None¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.
- classmethod must_be_non_empty(v: str) str¶
- ontology_snapshot_id: str¶
- page_size: PositiveInt¶
- quickgo_base_url: str¶
- source_version: str¶
- timeout_seconds: PositiveInt¶
- total_limit: PositiveInt | None¶
- use_db_accessions: bool¶
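The eco_mapping_url behaviour (resolve when a mapping is given, otherwise keep the raw ECO ID) can be sketched as two small functions. This assumes the "ECO:XXXXXXX CODE" line layout stated above. Splitting on any whitespace, skipping "#" comment lines and letting the first occurrence of an ECO ID win are all assumptions. The function names are hypothetical.

```python
from __future__ import annotations

from collections.abc import Iterable


def load_eco_mapping(lines: Iterable[str]) -> dict[str, str]:
    """Parse 'ECO:XXXXXXX CODE' lines into {eco_id: go_evidence_code}."""
    mapping: dict[str, str] = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split()  # any whitespace, so spaces and tabs both work
        if len(parts) < 2 or not parts[0].startswith("ECO:"):
            continue
        # Assumption: the first mapping for an ECO id wins over later duplicates.
        mapping.setdefault(parts[0], parts[1])
    return mapping


def resolve_evidence_code(eco_id: str, mapping: dict[str, str] | None) -> str:
    """Map an ECO id to a GO evidence code, falling back to the raw ECO id."""
    if not mapping:
        return eco_id
    return mapping.get(eco_id, eco_id)
```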
- compute_embeddings
Coordinator operation that partitions the target sequence set into batches and dispatches one compute_embeddings_batch message per batch to protea.embeddings.batch. The coordinator serialises on the protea.embeddings queue (one at a time) to prevent concurrent model loads from exhausting GPU memory. Batch and write workers scale independently. Returns deferred=True; the parent job is closed by the last write worker.
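The coordinator's fan-out and the deferred close can be illustrated in memory. BatchMessage, partition_batches and ParentJobTracker are hypothetical names, not PROTEA's API. In the real system the messages travel over the protea.embeddings.batch queue, and the "last write worker closes the parent" bookkeeping would need an atomic counter in the database rather than a Python attribute.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class BatchMessage:
    """Hypothetical payload of one compute_embeddings_batch message."""

    parent_job_id: str
    batch_index: int
    sequence_ids: tuple[int, ...]


def partition_batches(parent_job_id: str, sequence_ids: list[int], batch_size: int) -> list[BatchMessage]:
    """Split the target set into fixed-size batches, one message per batch."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [
        BatchMessage(parent_job_id, start // batch_size, tuple(sequence_ids[start : start + batch_size]))
        for start in range(0, len(sequence_ids), batch_size)
    ]


class ParentJobTracker:
    """In-memory stand-in for 'the last write worker closes the parent job'."""

    def __init__(self, n_batches: int) -> None:
        self.remaining = n_batches
        self.closed = False

    def batch_done(self) -> bool:
        # A real deployment needs an atomic decrement here, because write
        # workers run concurrently in separate processes.
        self.remaining -= 1
        if self.remaining == 0:
            self.closed = True
        return self.closed
```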
- predict_go_terms
Coordinator operation that loads reference embeddings into a process-level float16 cache, partitions the query set into batches, and dispatches one predict_go_terms_batch message per batch to protea.predictions.batch. Feature engineering (alignments, taxonomy) is opt-in via payload flags. Returns deferred=True; the parent job is closed by the last write worker.
- generate_evaluation_set
Computes the NK/LK/PK evaluation delta between two annotation sets using the CAFA5 protocol (experimental evidence only, NOT-qualifier propagation through the GO DAG, per-namespace classification). Stores an EvaluationSet row with summary statistics. Ground-truth files are generated on demand by the download endpoints.
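The per-namespace split can be sketched using the usual CAFA definitions:

- A protein is NK if it had no experimental annotation in any namespace in the old set.
- It is LK for a namespace where it had none before but had some in another namespace.
- It is PK for a namespace where it already had annotations and gained new terms.

The sketch assumes both sets are already filtered to experimental evidence, with NOT-qualified terms removed. The function name and the nested-dict input shape (protein → namespace → GO ids) are hypothetical.

```python
from __future__ import annotations


def classify_delta(
    old: dict[str, dict[str, set[str]]],
    new: dict[str, dict[str, set[str]]],
) -> dict[tuple[str, str], tuple[str, set[str]]]:
    """Return {(protein, namespace): (category, newly gained GO ids)}."""
    result: dict[tuple[str, str], tuple[str, set[str]]] = {}
    for protein, new_by_ns in new.items():
        old_by_ns = old.get(protein, {})
        had_any = any(old_by_ns.values())  # any experimental term, any namespace
        for ns, new_terms in new_by_ns.items():
            gained = new_terms - old_by_ns.get(ns, set())
            if not gained:
                continue  # no delta for this protein/namespace
            if not had_any:
                category = "NK"
            elif not old_by_ns.get(ns):
                category = "LK"
            else:
                category = "PK"
            result[(protein, ns)] = (category, gained)
    return result
```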
- class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetOperation¶
Bases: object
Computes the CAFA evaluation delta between two GOA annotation sets.
Applies experimental evidence code filtering, NOT-qualifier exclusion with GO DAG descendant propagation, and classifies delta proteins into NK/LK.
Stores an EvaluationSet row with summary statistics. The actual ground-truth rows are computed on-demand by the download endpoints using the same logic.
- description = 'Compute the CAFA delta between an old and a new GOA annotation set, split delta proteins into NK/LK and persist an EvaluationSet. Supports cross-OBO reconciliation via an optional pivot snapshot.'¶
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'generate_evaluation_set'¶
- summarize_payload(payload: dict[str, Any], *, session: Session | None = None) str¶
- class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetPayload(*, old_annotation_set_id: str, new_annotation_set_id: str, pivot_ontology_snapshot_id: str | None = None)¶
Bases: ProteaPayload
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.
- classmethod must_be_non_empty(v: str) str¶
- new_annotation_set_id: str¶
- old_annotation_set_id: str¶
- pivot_ontology_snapshot_id: str | None¶
- classmethod pivot_opt_non_empty(v)¶
- run_cafa_evaluation
Runs cafaeval for NK, LK, and PK settings against a given prediction set. Downloads the OBO file, writes ground-truth and prediction TSVs, calls cafa_eval() three times (NK and LK without -known; PK with pk_known_terms.tsv as -known), and persists an EvaluationResult row with per-namespace Fmax, precision, recall, τ, and coverage.
- load_interpro_go_mapping
Downloads and persists the InterPro-to-GO mapping file, linking InterPro domain entries to their associated GO terms. Used as a prerequisite step before running InterProScan-based annotation.
IP.4a — InterPro2GO mapping loader.
Streams the EBI interpro2go flat file (one line per InterPro entry
to GO term mapping) and upserts rows into interpro_go_mapping.
The file is small enough (a few thousand lines, ~1 MB) to hold in
memory, but the operation still parses it lazily so that a partial
read is handled gracefully.
Idempotent: the unique key
(ipr_accession, go_id, source_version) is enforced by an
ON CONFLICT DO NOTHING upsert, so re-running the operation for
the same release is a no-op.
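The idempotency contract can be illustrated with the standard-library sqlite3 module. Its upsert syntax (SQLite 3.24+) accepts the same ON CONFLICT ... DO NOTHING clause as PostgreSQL. This is a stand-in sketch: the real operation targets PostgreSQL. The table layout, in particular the evidence column, and the chunked helper are assumptions drawn from the prose rather than from the schema.

```python
from __future__ import annotations

import sqlite3
from collections.abc import Iterable

DDL = """
CREATE TABLE IF NOT EXISTS interpro_go_mapping (
    ipr_accession  TEXT NOT NULL,
    go_id          TEXT NOT NULL,
    source_version TEXT NOT NULL,
    evidence       TEXT NOT NULL,
    UNIQUE (ipr_accession, go_id, source_version)
)
"""

UPSERT = """
INSERT INTO interpro_go_mapping (ipr_accession, go_id, source_version, evidence)
VALUES (?, ?, ?, ?)
ON CONFLICT (ipr_accession, go_id, source_version) DO NOTHING
"""


def _flush(conn: sqlite3.Connection, batch: list[tuple[str, str, str, str]]) -> int:
    before = conn.total_changes
    conn.executemany(UPSERT, batch)
    # total_changes only counts rows actually written, so conflicts add nothing.
    return conn.total_changes - before


def upsert_mappings(
    conn: sqlite3.Connection,
    rows: Iterable[tuple[str, str]],
    source_version: str,
    evidence: str = "IEA",
    chunk_size: int = 1000,
) -> int:
    """Insert (ipr_accession, go_id) rows in chunks; return how many were new."""
    inserted = 0
    batch: list[tuple[str, str, str, str]] = []
    for ipr_accession, go_id in rows:
        batch.append((ipr_accession, go_id, source_version, evidence))
        if len(batch) >= chunk_size:
            inserted += _flush(conn, batch)
            batch = []
    if batch:
        inserted += _flush(conn, batch)
    conn.commit()
    return inserted
```

Re-running with the same source_version inserts nothing, which is the "no-op" property described above.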
InterPro2GO line format (text, comment-prefixed with !):
InterPro:IPR000001 Kringle > GO:protein binding ; GO:0005515
The pattern is:
InterPro:<IPR_ACCESSION> <human_label> > GO:<label> ; <GO_ID>
Evidence is implicit (curator-asserted IEA in the consuming pipeline)
so the evidence column captures the literal upstream label
("IEA" by default) without inventing extra metadata.
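A minimal parser for the line format above can be written as a generator, so it consumes its input lazily. The regular expression and the function name are illustrative assumptions. Only the InterPro accession and the trailing GO id are extracted, because the evidence value comes from the payload rather than the file.

```python
from __future__ import annotations

import re
from collections.abc import Iterable, Iterator

# InterPro:<IPR_ACCESSION> <human_label> > GO:<label> ; <GO_ID>
_LINE_RE = re.compile(r"^InterPro:(IPR\d+)\s+.*?>\s*GO:.*;\s*(GO:\d{7})\s*$")


def parse_interpro2go(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (ipr_accession, go_id) pairs, skipping comments and malformed lines."""
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("!"):
            continue  # "!" marks header/comment lines
        match = _LINE_RE.match(line)
        if match is None:
            continue
        yield match.group(1), match.group(2)
```

Anchoring the GO id at the end of the line keeps the parser robust to labels that themselves contain ">" or ";".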
- class protea.core.operations.load_interpro_go_mapping.LoadInterProGoMappingOperation¶
Bases: object
Stream the EBI InterPro2GO file and upsert mapping rows.
- description = 'Download the EBI InterPro2GO flat file and upsert (ipr_accession, go_id) rows into interpro_go_mapping; idempotent per source_version.'¶
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'load_interpro_go_mapping'¶
- summarize_payload(payload: dict[str, Any]) str¶
- class protea.core.operations.load_interpro_go_mapping.LoadInterProGoMappingPayload(*, source_version: str, mapping_url: str = 'https://ftp.ebi.ac.uk/pub/databases/GO/goa/external2go/interpro2go', evidence: str = 'IEA', timeout_seconds: Annotated[int, Gt(gt=0)] = 120, chunk_size: Annotated[int, Gt(gt=0)] = 1000)¶
Bases: ProteaPayload
Payload for load_interpro_go_mapping. mapping_url defaults to the canonical EBI endpoint so callers can dispatch the op without a payload body in the common case. source_version is required so the snapshot is queryable later; pass the InterPro release tag (e.g. "InterPro-104.0") here.
- chunk_size: PositiveInt¶
- evidence: str¶
- mapping_url: str¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.
- source_version: str¶
- timeout_seconds: PositiveInt¶
- run_interproscan_batch
Submits sequences to the EBI InterProScan REST API in configurable batch sizes. Stores InterProAnnotation rows linking each protein to its domain signatures. Supports resume via previously stored job IDs.
run_interproscan_batch operation.
Runs InterProScan against a batch of proteins in fixed-size chunks,
parses the resulting TSV through the InterProSource plugin
(IP.1b), and bulk-inserts InterProAnnotation rows (IP.2).
Inputs are either a query_set_id (resolved via
QuerySetEntry joins) or an explicit accessions list. An
optional ipr_release_floor lets re-runs at a newer release skip
proteins whose latest persisted annotation already meets the floor.
Per-chunk commits + emit progress events make the run resumable:
re-dispatching the same payload picks up where the prior run left off
because already-annotated proteins are filtered out via the same
ipr_release_floor check on every invocation.
Out of scope for IP.3 (see slice spec):
- The payload class lives in PROTEA for now; promotion to protea-contracts is a follow-up bookkeeping commit.
- InterProScan installation runbook (post-defense).
- class protea.core.operations.run_interproscan_batch.RunInterProScanBatchOperation¶
Bases: object
Annotate a batch of proteins with InterProScan and persist hits.
Workflow:
1. Resolve target accessions (query set or explicit list).
2. Drop already-annotated ones if an ipr_release_floor is set.
3. Fetch sequences in one DB roundtrip.
4. For each chunk_size chunk: write a temp FASTA, invoke InterProSource.run, bulk-insert the parsed records, commit (optionally), and emit a progress event.
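The resume-by-floor filter and the chunk loop can be sketched with the per-chunk work injected as a callback. The function names and the emitted event name are hypothetical. The emit call follows the (event, message, fields, level) shape of the documented emit signature. The release comparison is plain string ordering, matching the "lexicographically" wording in the payload docs.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator, Mapping, Sequence
from typing import Any


def filter_by_release_floor(
    accessions: Sequence[str], latest_release: Mapping[str, str], floor: str | None
) -> list[str]:
    """Keep proteins with no persisted release, or one below the floor."""
    if floor is None:
        return list(accessions)
    # Lexicographic str comparison: note "InterPro-99.0" >= "InterPro-104.0" is True.
    return [acc for acc in accessions if acc not in latest_release or latest_release[acc] < floor]


def chunked(items: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    for start in range(0, len(items), size):
        yield items[start : start + size]


def run_batch(
    accessions: Sequence[str],
    latest_release: Mapping[str, str],
    floor: str | None,
    chunk_size: int,
    annotate_chunk: Callable[[Sequence[str]], None],
    emit: Callable[[str, str | None, dict[str, Any], str], None],
) -> int:
    todo = filter_by_release_floor(accessions, latest_release, floor)
    done = 0
    for chunk in chunked(todo, chunk_size):
        annotate_chunk(chunk)  # stand-in for: temp FASTA, InterProSource.run, insert, commit
        done += len(chunk)
        emit("interproscan.chunk_done", None, {"done": done, "total": len(todo)}, "info")
    return done
```

Re-dispatching the same inputs after a partial run recomputes todo and skips proteins already persisted at or above the floor.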
- description = 'Run InterProScan on a batch of proteins in fixed-size chunks and persist parsed domain hits into the interpro_annotation table.'¶
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'run_interproscan_batch'¶
- summarize_payload(payload: dict[str, Any]) str¶
- class protea.core.operations.run_interproscan_batch.RunInterProScanBatchPayload(*, query_set_id: str | None = None, accessions: list[str] | None = None, ipr_release_floor: str | None = None, chunk_size: Annotated[int, ~annotated_types.Gt(gt=0)] = 50, timeout_seconds: Annotated[int, ~annotated_types.Gt(gt=0)] = 3600, extra_args: list[str] = <factory>, binary_path: str | None = None, commit_every_chunk: bool = True)¶
Bases: ProteaPayload
Inputs for one run_interproscan_batch invocation.
Exactly one of query_set_id / accessions must be provided. ipr_release_floor is optional: when set, proteins whose latest persisted InterProAnnotation.ipr_release is lexicographically greater than or equal to the floor are skipped, which gives the re-dispatch flow its resume semantics.
- accessions: list[str] | None¶
- binary_path: str | None¶
- chunk_size: PositiveInt¶
- commit_every_chunk: bool¶
- exactly_one_input() RunInterProScanBatchPayload¶
- extra_args: list[str]¶
- ipr_release_floor: str | None¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.
- classmethod must_be_non_empty(v: str | None) str | None¶
- classmethod must_be_non_empty_list(v: list[str] | None) list[str] | None¶
- query_set_id: str | None¶
- timeout_seconds: PositiveInt¶
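The real payload is a Pydantic model whose exactly_one_input validator enforces the "exactly one input" rule. The sketch below mirrors that rule with a plain frozen dataclass so it runs without Pydantic. The class name is hypothetical. The empty-value checks are assumptions about what must_be_non_empty and must_be_non_empty_list reject.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class InterProScanBatchInputs:
    """Stdlib stand-in for the payload's exactly-one-input rule (hypothetical)."""

    query_set_id: str | None = None
    accessions: list[str] | None = None
    ipr_release_floor: str | None = None
    chunk_size: int = 50

    def __post_init__(self) -> None:
        has_query_set = self.query_set_id is not None
        has_accessions = self.accessions is not None
        if has_query_set == has_accessions:  # both set, or neither set
            raise ValueError("exactly one of query_set_id / accessions must be provided")
        if has_query_set and not self.query_set_id.strip():
            raise ValueError("query_set_id must be non-empty")
        if has_accessions and not self.accessions:
            raise ValueError("accessions must be a non-empty list")
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
```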
- predict_go_terms_from_interpro
Derives GO term predictions from stored InterProAnnotation rows and the InterProGoMapping table, without running KNN search. Produces a PredictionSet whose entries are directly evidence-backed by domain matching rather than embedding distance.
IP.4b-PRED — InterPro-only GO term predictor.
A parallel-signal GO term predictor. Given a set of query proteins
whose InterProScan hits are already persisted as
InterProAnnotation rows (IP.2 / IP.3), this operation:
1. Joins each protein’s distinct InterPro accessions against the InterProGoMapping table (IP.4a, the InterPro2GO loader) at the requested source_version.
2. Resolves the right-hand GO ids against an OntologySnapshot so the output is keyed by go_term_id like every other GOPrediction.
3. Aggregates the per-protein votes: if N distinct InterPro entries on the same protein map to the same GO term, that’s N supporting votes and a stronger prediction than a single mapping.
4. Persists the result as a fresh PredictionSet with meta["algorithm"] = "interpro_propagation" so downstream ensemble / cafaeval routers can pick it up alongside KNN-based PredictionSet rows (T-RES.3 ensemble multi-K scaffolding).
The operation is single-stage (no batch fan-out): the join is a cheap index scan and the per-protein InterPro hit count is tiny (O(10-100)) so partitioning would be pure overhead.
Idempotency¶
Within one call: per-protein duplicate (go_term_id) votes are
merged before insert. Across calls: every call materialises a new
PredictionSet; the predictions underneath it are unique
by uq_go_prediction_set_protein_term and we additionally
ON CONFLICT DO NOTHING so a partial-failure retry on the same
prediction set inserts no duplicates.
GOPrediction column mapping¶
The GOPrediction schema was designed for KNN transfer, so a
few NOT-NULL columns need a sensible value for the InterPro path:
- ref_protein_accession: the InterPro accession that voted for this term, e.g. "IPR000001". With multiple voters, the one with the lexicographically smallest accession is used (stable, observable).
- distance: the inverse vote count (1.0 / vote_count). Lower = stronger support, matching the KNN convention; a 1-voter prediction has distance 1.0, a 5-voter prediction 0.2.
- evidence_code: propagated from the InterProGoMapping.evidence column (defaults to "IEA").
- vote_count: the literal number of distinct IPR entries on this protein that map to this GO id at this source_version.
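The column mapping above can be illustrated with a small in-memory aggregation. The sketch assumes the join has already produced (protein, ipr_accession) hits and an ipr2go lookup for one source_version. The real operation does the join in SQL and resolves GO ids to go_term_id through the OntologySnapshot; both steps are skipped here. The function name and row keys are hypothetical. Keeping the evidence of the reference voter when voters disagree is also an assumption.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from typing import Any


def aggregate_votes(
    hits: Iterable[tuple[str, str]],
    ipr2go: Mapping[str, Sequence[tuple[str, str]]],
) -> list[dict[str, Any]]:
    """hits: (protein, ipr) pairs; ipr2go: ipr -> [(go_id, evidence), ...]."""
    # (protein, go_id) -> {ipr: evidence}; keying by IPR makes repeated hits count once.
    voters: dict[tuple[str, str], dict[str, str]] = defaultdict(dict)
    for protein, ipr in hits:
        for go_id, evidence in ipr2go.get(ipr, ()):
            voters[(protein, go_id)][ipr] = evidence
    rows = []
    for (protein, go_id), by_ipr in sorted(voters.items()):
        ref = min(by_ipr)  # lexicographically smallest voter
        vote_count = len(by_ipr)
        rows.append({
            "protein": protein,
            "go_id": go_id,
            "ref_protein_accession": ref,
            "vote_count": vote_count,
            "distance": 1.0 / vote_count,  # lower = stronger support
            "evidence_code": by_ipr[ref],  # assumption: evidence of the reference voter
        })
    return rows
```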
Out of scope (see IP.4 plan-first sub-plan):
- Reranker feature integration (IP.4c, requires_human=true).
- Score blending across PredictionSets (T-RES.3 ensemble router).
- Ancestor / aspect propagation (the caller is expected to enable the existing post-process pipeline if needed).
- class protea.core.operations.predict_go_terms_from_interpro.PredictGOTermsFromInterProOperation¶
Bases: object
Emit GO predictions for proteins via their persisted InterPro hits.
Single-stage operation: validates FKs, materialises one PredictionSet, performs an indexed three-table join (interpro_annotation ⨝ interpro_go_mapping ⨝ go_term), aggregates per-protein votes, and bulk-inserts the GOPrediction rows.
- description = "Predict GO terms for a query set by joining each protein's InterProAnnotation rows against the InterPro2GO mapping table; emits a PredictionSet tagged algorithm=interpro_propagation."¶
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'predict_go_terms_from_interpro'¶
- summarize_payload(payload: dict[str, Any]) str¶
- class protea.core.operations.predict_go_terms_from_interpro.PredictGOTermsFromInterProPayload(*, embedding_config_id: str, annotation_set_id: str, ontology_snapshot_id: str, source_version: str, query_set_id: str | None = None, query_accessions: list[str] | None = None, ipr_release_floor: str | None = None, chunk_size: int = 1000)¶
Bases: ProteaPayload
Inputs for predict_go_terms_from_interpro.
Either query_set_id or query_accessions is required. Both may be set; the union is used. embedding_config_id, annotation_set_id, and ontology_snapshot_id are required by the existing PredictionSet schema (NOT NULL FKs); for this parallel signal they act as trace pointers (the model wasn’t actually used during prediction). source_version is the InterPro2GO mapping release tag (see LoadInterProGoMappingPayload). Predictions are filtered to mappings tagged with this exact source_version so a release skew between snapshots is observable rather than silently mixed.
- annotation_set_id: str¶
- chunk_size: int¶
- embedding_config_id: str¶
- ipr_release_floor: str | None¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.
- ontology_snapshot_id: str¶
- query_accessions: list[str] | None¶
- query_set_id: str | None¶
- source_version: str¶
- export_research_dataset
Publishes the frozen re-ranker dataset (train.parquet / eval.parquet / manifest.json) consumed by protea-reranker-lab. Runs the KNN + feature-generation pipeline via TrainRerankerAutoOperation in dump_only mode and uploads the resulting artefacts through the configured ArtifactStore (local FS by default, MinIO when the storage compose profile is active). The manifest records PROTEA’s producer_version / producer_git_sha for full traceability from lab runs back to PROTEA HEAD.
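A manifest carrying traceability fields can be sketched as follows. Only producer_version and producer_git_sha come from the description above. The "artefacts" section with per-file digests is a hypothetical addition. The sketch writes to a local directory instead of going through ArtifactStore.

```python
from __future__ import annotations

import hashlib
import json
from collections.abc import Iterable
from pathlib import Path
from typing import Any


def sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in fixed-size blocks so large parquet shards are not loaded whole."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def write_manifest(
    out_dir: Path, artefacts: Iterable[str], producer_version: str, producer_git_sha: str
) -> dict[str, Any]:
    manifest = {
        "producer_version": producer_version,
        "producer_git_sha": producer_git_sha,
        # Hypothetical section: per-file digests let a consumer verify its download.
        "artefacts": {
            name: {"sha256": sha256_file(out_dir / name), "bytes": (out_dir / name).stat().st_size}
            for name in sorted(artefacts)
        },
    }
    (out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True))
    return manifest
```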
Training-dump helpers¶
protea.core.training_dump_helpers is the home of the KNN /
feature-generation helpers that were extracted in F0 (T0.6) when
protea.core.operations.train_reranker was deleted. The module is
deliberately not an operation: it is reused in-process by
ExportResearchDatasetOperation to materialise train and
eval shards before the parquet_export consolidation pass.
LightGBM training itself lives in
protea-reranker-lab,
which consumes the published Dataset rows produced by
export_research_dataset.
As of T-RES.1 (PR #368), parent_map_str is passed to the KNN transfer
runner unconditionally, regardless of the expand_votes_to_ancestors
flag. The lineage producer (protea_method.lineage.compute_lineage_features)
requires the parent map for its lineage_* column computation; the flag only
controls ancestor-vote expansion, not the availability of the map itself.
Internal helpers¶
These modules are imported by the operations and the feature engineering layer; they are documented here for completeness but are not part of the public API.
- protea.core.anc2vec_embeddings: anc2vec ancestry embeddings for GO terms, used as features by the re-ranker.
- protea.core.annotation_intern: string interning helper for reducing memory pressure when loading large annotation sets.
- protea.core.disk_cache: generic on-disk cache with TTL used by the KNN reference loader and the PCA cache.
- protea.core.feature_enricher: orchestrator that combines alignment, taxonomy and anc2vec features into a single per-candidate row.
- protea.core.pca_cache: per-PLM PCA projection cache, used to pre-compute the emb_pca feature family.
- protea.core._anc2vec_phases: phase helpers for the anc2vec embedding pipeline.
- protea.core._feature_enricher_helpers: low-level helpers extracted from feature_enricher to keep per-phase logic self-contained.
- protea.core.features._bindings: internal feature-column binding declarations consumed by features.registry.
- protea.core._knn_transfer_runner: worker-side KNN transfer runner dispatched by the predictions coordinator.
- protea.core._leaf_record_builder: builder for leaf-prediction records in the KNN transfer pipeline.
- protea.core._pair_feature_compute: pair-level feature computation with optional process-pool parallelism and SQLite alignment cache.
- protea.core._training_dump_loaders: data-loading helpers used by the training-dump pipeline.
- protea.core.training_dump._constants: shared constants for the training-dump subpackage.
- protea.core.training_dump._contexts: context dataclasses bundling session, settings, and configuration for training-dump passes.
- protea.core.training_dump._data_loaders: loaders that hydrate protein, embedding, and annotation data for the training-dump pipeline.
- protea.core.training_dump._knn_transfer: KNN search and GO-term transfer logic specific to the training-dump path.
- protea.core.training_dump._payload: Pydantic payload model for the export_research_dataset operation.
- protea.core.training_dump._runner: top-level runner that coordinates the training-dump passes.
- protea.core.training_dump._test_split: helper that materialises the eval/test shard of the frozen dataset.
- protea.core.training_dump._train_split: helper that materialises the train shard of the frozen dataset.
- protea.core.operations._compute_embeddings_backends: backend dispatch logic for the compute_embeddings coordinator.
- protea.core.operations._compute_embeddings_helpers: batch construction and progress helpers for compute_embeddings.
- protea.core.operations._load_ontology_helpers: OBO parsing helpers used by load_ontology_snapshot.
- protea.core.operations._predict_go_terms_adapter: adapter that routes the predict_go_terms coordinator to the unified-path or batch-op implementations.
- protea.core.operations.predict_go_terms._aspect_helpers: per-aspect splitting and merging helpers for the predict pipeline.
- protea.core.operations.predict_go_terms._batch_op: batch operation executed by OperationConsumer workers.
- protea.core.operations.predict_go_terms._batch_op_feature: feature generation phase of the batch operation.
- protea.core.operations.predict_go_terms._batch_op_reference: reference-embedding loading and KNN search for batch operations.
- protea.core.operations.predict_go_terms._batch_op_reranker: reranker scoring phase of the batch operation.
- protea.core.operations.predict_go_terms._common: shared types and constants used across the predict-go-terms subpackage.
- protea.core.operations.predict_go_terms._coordinator: coordinator that partitions the query set and dispatches batch messages.
- protea.core.operations.predict_go_terms._post_knn_pipeline: post-KNN pipeline steps (feature enrichment, reranker, aggregation).
- protea.core.operations.predict_go_terms._reranker_scorer: the RerankerScorer composite class, which applies a loaded booster to batch prediction DataFrames.
- protea.core.operations.predict_go_terms._store: bulk-insert helpers for writing GOPrediction rows.
- protea.core.operations.predict_go_terms._unified_path: unified in-process execution path used when a single worker handles both KNN and write phases.
- protea.core.operations._run_cafa_artifacts: artifact download and staging helpers for run_cafa_evaluation.
- protea.core.operations._run_cafa_data_helpers: data-loading helpers for the CAFA evaluation pipeline.
- protea.core.operations._run_cafa_eval_driver: driver that calls cafaeval.cafa_eval and collects per-namespace results.
- protea.core.operations._run_cafa_helpers: shared helper functions used across the _run_cafa_* modules.
- protea.core.operations._run_cafa_reranker_loader: loads and applies the reranker model to CAFA prediction TSVs.
- protea.core.operations._run_cafa_setup: environment and directory setup for a CAFA evaluation run.
- protea.core.operations._run_cafa_summary: summarises cafaeval results into EvaluationResult rows.
Process-local string interning for annotation hot loops.
The predict / train pipelines build millions of per-row dicts of the
shape {"go_term_id": int, "qualifier": str|None, "evidence_code": str|None}
when they materialise annotations from PostgreSQL. SQLAlchemy returns
each qualifier and evidence_code as a fresh Python string —
even though across a million rows there are only ~5-10 distinct values
("IEA", "IDA", "EXP", "TAS", …) and most rows have
qualifier = None.
Without interning, each duplicate string allocates ~50 B in CPython, so a 5 M-row batch can carry ~500 MB of redundant string objects. Interning collapses every duplicate to a single shared instance, a Flyweight-style intrinsic-state share. Python already does this implicitly for short identifier-like literals; this module forces the same dedup for the strings that come back from the DB driver.
Process-local by design (one cache per worker), trivially small (the
domain has fewer than 50 distinct codes/qualifiers in practice), and
thread-safe via the GIL — setdefault is atomic for built-in dicts.
- protea.core.annotation_intern.intern_string(value: str | None) str | None¶
Return a shared instance of value if it has been seen before.
Pass None through unchanged so callers don’t need a guard. The pool grows monotonically; reset only when the worker restarts.
- protea.core.annotation_intern.pool_size() int¶
Diagnostic — how many distinct strings the pool has retained.
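The interning behaviour described above fits in a few lines around dict.setdefault. This is a minimal sketch assuming a module-level dict as the pool. It is not necessarily the module's exact code. A private dict, rather than sys.intern, also makes pool_size() a plain len().

```python
from __future__ import annotations

# One pool per process; the GIL makes dict.setdefault atomic for str keys.
_POOL: dict[str, str] = {}


def intern_string(value: str | None) -> str | None:
    """Return the pooled instance equal to value, storing value on first sight."""
    if value is None:
        return None
    # setdefault returns the already-stored object when the key exists,
    # otherwise it stores `value` itself and returns it.
    return _POOL.setdefault(value, value)


def pool_size() -> int:
    return len(_POOL)
```

With a million rows that share a handful of evidence codes, every row ends up pointing at the same few string objects.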
On-disk caches for the predict_go_terms pipeline.
The reference embedding pool for an (EmbeddingConfig, AnnotationSet)
pair is large (millions of rows × ~1280 dims × 2 bytes = several GB).
Re-fetching it from PostgreSQL on every batch worker is the main
bottleneck, so the pool is materialised once into data/ref_cache/
and read back via numpy mmap on subsequent runs. Annotations are stored
in a CSR-style layout (offsets + flat go_term_ids /
qualifiers / evidence_codes arrays) so per-protein lookups stay
O(1) instead of dictionary-of-list-of-dict.
Files (under _DISK_CACHE_DIR):
- {cfg}__{ann}_embeddings.npy: float16 reference embeddings
- {cfg}__{ann}_accessions.npy: aligned accession list
- {cfg}__{ann}__{aspect}_indices.npy: int32 indices into the unified pool
- {cfg}__{ann}__{aspect}_anno_*.npy: CSR annotation arrays
Invalidation: AnnotationSet rows are immutable once loaded, so the
cache stays valid for as long as the files exist. Delete the files
manually to force a reload after a model change.
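The CSR layout with mmap reload can be sketched with NumPy: an offsets array of length n+1 plus one flat term array, so protein i's terms are flat[offsets[i]:offsets[i+1]]. The "_anno_offsets" / "_anno_go_term_ids" file suffixes and the helper names are hypothetical fillers for the "_anno_*" pattern above. The qualifier and evidence-code arrays are omitted.

```python
from __future__ import annotations

from pathlib import Path

import numpy as np


def build_csr(per_protein: list[list[int]]) -> tuple[np.ndarray, np.ndarray]:
    """Pack a list of per-protein go_term_id lists into (offsets, flat)."""
    lengths = np.fromiter((len(t) for t in per_protein), dtype=np.int64, count=len(per_protein))
    offsets = np.zeros(len(per_protein) + 1, dtype=np.int64)
    np.cumsum(lengths, out=offsets[1:])  # offsets[i+1] = total terms of proteins 0..i
    flat = np.fromiter((t for terms in per_protein for t in terms), dtype=np.int32, count=int(offsets[-1]))
    return offsets, flat


def save_csr(cache_dir: Path, prefix: str, offsets: np.ndarray, flat: np.ndarray) -> None:
    np.save(cache_dir / f"{prefix}_anno_offsets.npy", offsets)
    np.save(cache_dir / f"{prefix}_anno_go_term_ids.npy", flat)


def load_csr(cache_dir: Path, prefix: str) -> tuple[np.ndarray, np.ndarray]:
    # mmap_mode="r" maps the files read-only; pages are loaded lazily on access.
    offsets = np.load(cache_dir / f"{prefix}_anno_offsets.npy", mmap_mode="r")
    flat = np.load(cache_dir / f"{prefix}_anno_go_term_ids.npy", mmap_mode="r")
    return offsets, flat


def terms_for(i: int, offsets: np.ndarray, flat: np.ndarray) -> np.ndarray:
    """O(1) slice of protein i's annotations."""
    return flat[offsets[i] : offsets[i + 1]]
```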
Per-group helpers extracted from expand_predictions_to_ancestors.
The ancestor-expansion loop in feature_enricher mutates either an
existing leaf record (when an ancestor coincides with a sibling leaf
prediction) or a synthetic ancestor record (cloned from the leaf with
go_id overridden). Both branches were inlined in the orchestrator,
pushing it well past the master plan v3.2 §3 method-LOC ceiling. This
module hosts the two branch helpers + the small label-config bundle so
the orchestrator can stay readable and short.
- class protea.core._feature_enricher_helpers.LabelConfig(q_acc: str, gt_pairs: set[tuple[str, str]] | None, column: str, present: bool)¶
Bases: NamedTuple
Bundle for the per-query label-injection knobs.
Carries the query accession (used as half of the (q_acc, anc) ground-truth lookup) plus the optional gt_pairs set and the column / presence flags that drive whether a synthesized ancestor record should carry a label at all. The training dump path passes a populated set; the live predict_go_terms path leaves gt_pairs as None and present as False.
- column: str¶
Alias for field number 2
- gt_pairs: set[tuple[str, str]] | None¶
Alias for field number 1
- present: bool¶
Alias for field number 3
- q_acc: str¶
Alias for field number 0
- protea.core._feature_enricher_helpers.compute_ia_weight(anc_gid: str, leaf_gid: str, ia_weights: dict[str, float] | None) float¶
Weight contributed by leaf_gid to its ancestor anc_gid.
Without IA weights every edge counts as 1.0. When weights are provided, the contribution is the IA ratio anc/leaf (with a guard against zero-weight leaves to avoid division blow-ups when an incomplete IA file leaves a term at IA=0).
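The weighting rule can be sketched directly from its description. The doc does not say what the zero-weight guard returns. Falling back to 1.0 for a zero or missing leaf IA, and treating a missing ancestor IA as 0.0, are assumptions of this sketch, not confirmed behaviour.

```python
from __future__ import annotations


def compute_ia_weight(anc_gid: str, leaf_gid: str, ia_weights: dict[str, float] | None) -> float:
    """IA ratio anc/leaf, or 1.0 when no IA weights are supplied."""
    if ia_weights is None:
        return 1.0
    leaf_ia = ia_weights.get(leaf_gid, 0.0)
    if leaf_ia <= 0.0:
        # Assumed guard: count the edge as 1.0 instead of dividing by zero.
        return 1.0
    return ia_weights.get(anc_gid, 0.0) / leaf_ia
```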
- protea.core._feature_enricher_helpers.make_ancestor_closure(parent_map: dict[str, set[str]] | dict[str, list[str]] | None) Callable[[str], frozenset[str]]¶
Return a memoized go_id -> ancestor closure callable.
The orchestrator calls the result repeatedly for every leaf GO id; caching is_a / part_of closures here keeps the inner loop O(1) after the first visit.
parent_map is normalised to frozenset parents up front so subsequent traversal cannot accidentally mutate the caller’s dict.
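A memoized closure builder matching this description can be sketched with an iterative depth-first traversal. Excluding the GO id itself from its own closure is an assumption. The sketch memoizes per query id only; it does not reuse intermediate closures.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Mapping


def make_ancestor_closure(
    parent_map: Mapping[str, Iterable[str]] | None,
) -> Callable[[str], frozenset[str]]:
    # Normalise once: frozenset copies decouple traversal from the caller's dict.
    parents = {gid: frozenset(ps) for gid, ps in (parent_map or {}).items()}
    cache: dict[str, frozenset[str]] = {}

    def closure(go_id: str) -> frozenset[str]:
        cached = cache.get(go_id)
        if cached is not None:
            return cached  # repeat visits are a single dict lookup
        seen: set[str] = set()
        stack = list(parents.get(go_id, ()))
        while stack:  # iterative DFS avoids recursion limits on deep DAGs
            term = stack.pop()
            if term in seen:
                continue
            seen.add(term)
            stack.extend(parents.get(term, ()))
        result = frozenset(seen)
        cache[go_id] = result
        return result

    return closure
```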
- protea.core._feature_enricher_helpers.merge_into_existing_leaf(leaf_anc: dict[str, Any], leaf_rec: dict[str, Any], vote_increment: float, leaf_d: float) None¶
Fold a leaf-as-ancestor vote into the existing leaf candidate.
Called when the ancestor of a leaf prediction is itself already a leaf candidate for the same (protein, aspect) group. Bumps the target’s neighbor_vote_fraction and lowers its neighbor_min_distance if the contributing leaf is closer.
- protea.core._feature_enricher_helpers.update_synth_entry(synth: dict[str, dict[str, Any]], anc: str, leaf_rec: dict[str, Any], leaf_d: float, vote_increment: float, label_ctx: LabelConfig) None¶
Insert or update a synthetic ancestor record.
When the ancestor is not already a leaf candidate the record is fabricated from the closest contributing leaf. If a synthetic entry already exists, the closer leaf wins the per-pair feature payload (alignment, taxonomy, anc2vec, emb_pca) so the booster sees a consistent training-time view; otherwise only the vote fraction is bumped.
Per-feature compute bindings for the canonical FeatureRegistry (T2B.2).
T2B.1 populated protea.core.features.registry with placeholder
compute callables that raise NotImplementedError. This
module wires each feature in protea_contracts.ALL_FEATURES to
the legacy producer function it is filled by in the current code path
and calls CanonicalFeatureRegistry.bind_compute() once at
import time to replace the placeholders.
Mapping (family group -> producer):
- KNN-derived columns (distance, vote_count, k_position, go_term_frequency, ref_annotation_density, neighbor_distance_std, neighbor_vote_fraction, neighbor_min_distance, neighbor_mean_distance) come from the per-aspect record builder driven by protea.core._knn_transfer_runner._KnnTransferRunner. They are bound to _knn_record_producer(), a marker callable that carries a reference to the runner on its __protea_producer__ attribute.
- NW / SW alignment columns and length_query / length_ref come from protea.core.feature_engineering.compute_alignment() (called per (query, ref) pair in _knn_transfer_runner).
- Taxonomy pair columns (taxonomic_distance, taxonomic_common_ancestors, taxonomic_relation) come from protea.core.feature_engineering.compute_taxonomy().
- The v6 enrichment columns (tax_voters_*, go_term_frequency share, anc2vec_*, emb_pca_query_*) come from protea.core.feature_enricher.enrich_v6_features().
- Lineage columns (lineage_is_ancestor_of_known, lineage_is_descendant_of_known, lineage_ancestor_of_count, lineage_descendant_of_count) come from protea_method.lineage.compute_lineage_features(). Added in T-RES.1 to consume the lineage feature family registered by protea-contracts v0.3.0.
- Categorical metadata (qualifier, evidence_code, aspect) is sourced from annotation rows during record construction; the marker _annotation_metadata_producer() carries that intent.
The bound compute callables are not invoked by parquet_export
(the exporter reads pre-computed shards from disk). They exist so
that any downstream consumer of the registry can introspect which
legacy function produces each column. The __protea_producer__
attribute makes that legacy reference machine-readable.
Idempotency: apply_canonical_bindings() is safe to call more
than once on the same registry. It walks every feature in
protea_contracts.ALL_FEATURES and rebinds, so a fresh
registry (after reset_canonical_registry()) is restored to a
fully-bound state by calling this module’s public function again.
- protea.core.features._bindings.apply_canonical_bindings(registry: CanonicalFeatureRegistry) int¶
Bind every ALL_FEATURES feature on registry to its legacy producer reference. Returns the count of features bound.
Idempotent: rebinding an already-bound feature is a no-op (the underlying CanonicalFeatureRegistry.bind_compute() replaces the callable without touching dtype / family).
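The marker-callable pattern can be illustrated with a toy registry. A marker raises if invoked but records its legacy producer on __protea_producer__, and rebinding replaces only the callable. ToyRegistry, make_marker and apply_bindings are hypothetical stand-ins; the real CanonicalFeatureRegistry and the protea_contracts.ALL_FEATURES catalogue are not reproduced. The dtype/family strings in the usage check are illustrative.

```python
from __future__ import annotations

from collections.abc import Callable, Mapping
from typing import Any


def make_marker(producer: object) -> Callable[..., Any]:
    """A compute callable that only documents where the column comes from."""
    def compute(*args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError("marker only; see __protea_producer__")
    compute.__protea_producer__ = producer  # machine-readable legacy reference
    return compute


class ToyRegistry:
    def __init__(self, features: Mapping[str, tuple[str, str]]) -> None:
        self._meta = dict(features)  # name -> (dtype, family), never touched by binding
        self._compute: dict[str, Callable[..., Any]] = {}

    def bind_compute(self, name: str, fn: Callable[..., Any]) -> None:
        if name not in self._meta:
            raise KeyError(name)
        self._compute[name] = fn  # replace the callable only

    def producer_of(self, name: str) -> object:
        return getattr(self._compute.get(name), "__protea_producer__", None)


def apply_bindings(registry: ToyRegistry, producers: Mapping[str, object]) -> int:
    for name, producer in producers.items():
        registry.bind_compute(name, make_marker(producer))
    return len(producers)
```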
Parallel + persistent computation of per-pair alignment features.
The export pipeline’s hotspot is the per-(query, reference) alignment in
_KnnTransferRunner._compute_pair_features: a single-threaded triple
loop calling compute_alignment (parasail NW+SW with traceback) at a
few hundred pairs/s. Two structural wins live here:
- Process-level parallelism over the unique alignment pairs. parasail’s traceback variants do not release the GIL well enough for threads to scale, so a ProcessPoolExecutor is used (benchmarked ~3x on a 12-core box). The taxonomy lookups stay in the parent process because they are cheap (an lru_cache over ete3 lineages) and process re-init of the ete3 sqlite handle would dwarf the work.
- A persistent on-disk cache keyed by the ordered sequence pair plus the alignment parameters. Intra-PLM the K=10 neighbour set is a superset of K=5 / K=3, so alignments computed once for the largest-K dataset make the smaller-K datasets (and any re-run) near-free. protea.training is serialised (one job at a time) so there is no concurrent-write contention; a plain sqlite file is enough.
This module is value-preserving by construction: the alignment feature
dict it returns is exactly what compute_alignment returns, only
computed concurrently and memoised. Disabling parallelism (workers <= 1)
and the cache (PROTEA_ALIGN_CACHE_DIR unset / cache-miss) reduces to
the original serial code path.
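A rough sketch of the dedupe, cache, then parallelise flow described above, under explicit assumptions: `toy_align` is a hypothetical placeholder for compute_alignment (no parasail call), the sqlite schema, the `PARAMS` dict, and the cache-key format are invented, and the `workers` / `cache_path` arguments stand in for PROTEA_PAIR_FEATURE_WORKERS and PROTEA_ALIGN_CACHE_DIR. With `workers <= 1` and no cache it reduces to a plain serial loop, matching the value-preservation argument.

```python
import hashlib
import json
import sqlite3
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Optional

# Hypothetical alignment parameters; they are part of the cache key so that
# a change in scoring never returns stale results.
PARAMS = {"matrix": "blosum62", "gap_open": 10, "gap_extend": 1}


def toy_align(pair: tuple[str, str]) -> dict[str, Any]:
    """Hypothetical stand-in for compute_alignment; top-level so it pickles."""
    q, r = pair
    matches = sum(a == b for a, b in zip(q, r))
    return {"identity": matches / max(len(q), len(r)), "len_q": len(q), "len_r": len(r)}


def _key(pair: tuple[str, str]) -> str:
    # Ordered sequence pair + parameters -> stable digest.
    raw = json.dumps([pair[0], pair[1], PARAMS], sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()


def compute_pairs(
    pairs: list[tuple[str, str]], workers: int = 1, cache_path: Optional[str] = None
) -> dict[tuple[str, str], dict[str, Any]]:
    unique = list(dict.fromkeys(pairs))  # align each ordered pair once
    out: dict[tuple[str, str], dict[str, Any]] = {}
    db = sqlite3.connect(cache_path) if cache_path else None
    try:
        if db is not None:
            db.execute("CREATE TABLE IF NOT EXISTS align (k TEXT PRIMARY KEY, v TEXT)")
            for p in unique:
                row = db.execute("SELECT v FROM align WHERE k = ?", (_key(p),)).fetchone()
                if row is not None:
                    out[p] = json.loads(row[0])
        todo = [p for p in unique if p not in out]
        if workers <= 1:
            results = [toy_align(p) for p in todo]  # serial path, no pool spin-up
        else:
            with ProcessPoolExecutor(max_workers=workers) as pool:
                results = list(pool.map(toy_align, todo, chunksize=64))
        for p, res in zip(todo, results):
            out[p] = res
            if db is not None:
                db.execute("INSERT OR REPLACE INTO align VALUES (?, ?)", (_key(p), json.dumps(res)))
        if db is not None:
            db.commit()
    finally:
        if db is not None:
            db.close()
    return out
```

A second call against the same cache file answers every pair from sqlite, which is why the smaller-K datasets become near-free once the largest-K dataset has been processed.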
- class protea.core._pair_feature_compute.PairAlignmentComputer
Bases: object
Compute alignment-feature dicts for many pairs, cached + parallel.
Use as a context manager so the sqlite handle and process pool are released deterministically.
- compute(pairs: list[tuple[str, str]]) → dict[tuple[str, str], dict[str, Any]]
Return {(q_seq, r_seq): alignment_dict} for every input pair.
Empty-sequence pairs are skipped by the caller; here every pair is assumed to have two non-empty sequences. Deduplicates on the ordered sequence pair so identical pairs are aligned once.
- protea.core._pair_feature_compute.build_pair_feature_dict(q_acc: str, ref_acc: str, align_by_pair: dict[tuple[str, str], dict[str, Any]], *, do_alignments: bool, do_taxonomy: bool, tax_ids: tuple[dict[str, Any] | None, dict[str, Any] | None]) → dict[str, Any]
Merge the precomputed alignment + per-pair taxonomy for one pair.
Value-identical to the original inline compute_alignment + compute_taxonomy merge: the alignment half is looked up from the batch-computed map, and the taxonomy half is computed here (cheap, lru_cache-backed). Either family is skipped when its flag is off.
- protea.core._pair_feature_compute.precompute_alignment_features(*, aspects: Sequence[str], neighbors_by_aspect: dict[str, list[list[tuple[str, float]]]], valid_queries: Sequence[str], query_sequences: dict[str, str], ref_sequences: dict[str, str]) → dict[tuple[str, str], dict[str, Any]]
Batch-align every unique (q_acc, ref_acc) neighbour pair.
Drives PairAlignmentComputer (parallel + on-disk cache) over the deduplicated pair set and remaps the result back onto the (q_acc, ref_acc) accession keys the runner indexes by. Returns an empty map when there are no alignable pairs.
- protea.core._pair_feature_compute.resolve_workers() → int
Number of process-pool workers for pair-feature computation.
PROTEA_PAIR_FEATURE_WORKERS overrides it; the default is the CPU count. A value <= 1 forces the serial path (no pool spin-up).
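The worker resolution and the accession-to-sequence remapping can be sketched as follows. The environment variable name comes from the text above; the fallback on a malformed value, the assumption that each `neighbors_by_aspect[aspect]` list is aligned index-by-index with `valid_queries`, and the `align_many` callback (standing in for PairAlignmentComputer.compute) are hypothetical.

```python
import os
from typing import Any, Callable, Sequence

# Hypothetical callback shape: sequence pairs in, sequence-keyed results out.
AlignFn = Callable[[list[tuple[str, str]]], dict[tuple[str, str], dict[str, Any]]]


def resolve_workers_sketch() -> int:
    # PROTEA_PAIR_FEATURE_WORKERS overrides; default is the CPU count.
    raw = os.environ.get("PROTEA_PAIR_FEATURE_WORKERS")
    if raw is not None:
        try:
            return int(raw)
        except ValueError:
            pass  # assumption: ignore malformed values
    return os.cpu_count() or 1


def precompute_sketch(
    aspects: Sequence[str],
    neighbors_by_aspect: dict[str, list[list[tuple[str, float]]]],
    valid_queries: Sequence[str],
    query_sequences: dict[str, str],
    ref_sequences: dict[str, str],
    align_many: AlignFn,
) -> dict[tuple[str, str], dict[str, Any]]:
    # Collect unique (q_acc, ref_acc) pairs across all aspects.
    acc_pairs: dict[tuple[str, str], tuple[str, str]] = {}
    for aspect in aspects:
        for q_acc, neighbours in zip(valid_queries, neighbors_by_aspect.get(aspect, [])):
            q_seq = query_sequences.get(q_acc, "")
            for ref_acc, _distance in neighbours:
                r_seq = ref_sequences.get(ref_acc, "")
                if q_seq and r_seq:  # empty sequences are not alignable
                    acc_pairs[(q_acc, ref_acc)] = (q_seq, r_seq)
    if not acc_pairs:
        return {}
    # Align each distinct sequence pair once, then remap onto accession keys.
    by_seq = align_many(list(set(acc_pairs.values())))
    return {acc: by_seq[seq_pair] for acc, seq_pair in acc_pairs.items()}
```

Deduplicating across aspects matters because the same (query, reference) neighbour typically shows up under several GO aspects.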
Bulk DB loaders extracted out of training_dump_helpers.
Three private helpers used by the dump pipeline:
- _count_embeddings_with_dim(): a single COUNT + vector_dims probe for a given embedding-config UUID.
- _stream_embeddings(): a pre-allocated np.empty matrix filled via yield_per over the sequence_embedding join.
- _load_annotation_aggregations(): per-aspect grouping of annotation rows under the GO P/F/C aspects, filtering against a provided acc_to_idx so only proteins with a preloaded embedding contribute to the reference set.
All three are intentionally I/O-only: they take a SQLAlchemy
Connection (already opened by the caller) and return plain Python /
NumPy structures. Keeping them out of training_dump_helpers lets
_preload_all_embeddings and _build_reference_from_cache stay
under the §3 60-LOC method ceiling without the dump-helper file
ballooning further.
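An I/O-free sketch of the per-aspect grouping performed by _load_annotation_aggregations. It takes already-fetched rows instead of a SQLAlchemy Connection; the `(accession, go_id, aspect)` row shape, the `group_annotations` name, and the output layout are assumptions for illustration only.

```python
from collections import defaultdict
from typing import Iterable

ASPECTS = ("P", "F", "C")


def group_annotations(
    rows: Iterable[tuple[str, str, str]], acc_to_idx: dict[str, int]
) -> dict[str, dict[str, set[str]]]:
    """Group hypothetical (accession, go_id, aspect) rows per GO aspect.

    Only accessions present in acc_to_idx (i.e. with a preloaded embedding)
    contribute, so the reference set never points at a missing vector.
    """
    out: dict[str, dict[str, set[str]]] = {a: defaultdict(set) for a in ASPECTS}
    for acc, go_id, aspect in rows:
        if aspect not in out or acc not in acc_to_idx:
            continue
        out[aspect][acc].add(go_id)
    # Plain dicts, so callers receive ordinary Python structures.
    return {a: dict(per_acc) for a, per_acc in out.items()}
```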
Shared constants for the dump pipeline submodules.
Kept here so the split helpers, the test-split helpers, and the runner can share them without forming an import cycle (T2B.6).
Parameter objects for the dump pipeline.
Originally lived in protea/core/training_dump_helpers.py. Extracted
to a leaf submodule (T2B.6) so the type-only consumers in
protea/core/_knn_transfer_runner.py can import them without
dragging the larger orchestration code into the import graph.
- class protea.core.training_dump._contexts.KnnTransferContext(valid_queries: list[str], query_emb: np.ndarray, ref_by_aspect: dict[str, dict[str, Any]], go_id_map: dict[int, str], aspect_map: dict[int, str], gt_pairs: set[tuple[str, str]], query_known_gos: dict[str, set[str]] | None = None, parent_map_str: dict[str, set[str]] | None = None, ia_weights: dict[str, float] | None = None, pca_state: tuple[np.ndarray, np.ndarray] | None = None, pivot_go_ids: set[str] | frozenset[str] | None = None, embedding_pool: np.ndarray | None = None)
Bases: object
Bundle of KNN inputs + enrichment maps for _knn_transfer_and_label.
Groups the 12 per-call data arguments (queries, references, ontology maps, optional enrichment helpers) so the entry-point signature stays under flake8-bugbear’s parameter ceiling.
session, payload p, sequence_context, and stream_output remain standalone arguments because they are configuration / IO concerns, not data.
- aspect_map: dict[int, str]
- embedding_pool: np.ndarray | None = None
- go_id_map: dict[int, str]
- gt_pairs: set[tuple[str, str]]
- ia_weights: dict[str, float] | None = None
- parent_map_str: dict[str, set[str]] | None = None
- pca_state: tuple[np.ndarray, np.ndarray] | None = None
- pivot_go_ids: set[str] | frozenset[str] | None = None
- query_emb: np.ndarray
- query_known_gos: dict[str, set[str]] | None = None
- ref_by_aspect: dict[str, dict[str, Any]]
- valid_queries: list[str]
- class protea.core.training_dump._contexts.SequenceContext(query_sequences: dict[str, str] | None = None, ref_sequences: dict[str, str] | None = None, query_tax_ids: dict[str, int | None] | None = None, ref_tax_ids: dict[str, int | None] | None = None)
Bases: object
Per-protein sequence and taxonomy lookups.
All four attributes are optional; passing None disables the corresponding feature family (alignment / taxonomy).
- query_sequences: dict[str, str] | None = None
- query_tax_ids: dict[str, int | None] | None = None
- ref_sequences: dict[str, str] | None = None
- ref_tax_ids: dict[str, int | None] | None = None
- class protea.core.training_dump._contexts.StreamOutput(output_parquet: Path, chunk_rows: int = 100000)
Bases: object
Streaming parquet output for memory-bounded dataset generation.
When provided, _knn_transfer_and_label writes labeled rows directly to output_parquet in chunks of chunk_rows instead of accumulating the full result list in memory.
- chunk_rows: int = 100000
- output_parquet: Path
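The chunked-flush idea behind StreamOutput can be sketched with a generic `write_chunk` callback in place of a parquet writer. `ChunkedSink` is a hypothetical name, and the parquet wiring itself is not shown in the source; the point is that peak memory is bounded by chunk_rows rows rather than by the full result list.

```python
from typing import Any, Callable


class ChunkedSink:
    """Hypothetical helper: buffer rows and hand them off every chunk_rows rows."""

    def __init__(
        self, write_chunk: Callable[[list[dict[str, Any]]], None], chunk_rows: int = 100_000
    ) -> None:
        if chunk_rows <= 0:
            raise ValueError("chunk_rows must be positive")
        self._write_chunk = write_chunk
        self._chunk_rows = chunk_rows
        self._buf: list[dict[str, Any]] = []
        self.rows_written = 0

    def add(self, row: dict[str, Any]) -> None:
        self._buf.append(row)
        if len(self._buf) >= self._chunk_rows:
            self.flush()

    def flush(self) -> None:
        if self._buf:
            self._write_chunk(self._buf)
            self.rows_written += len(self._buf)
            self._buf = []  # drop the reference so the chunk can be freed

    def __enter__(self) -> "ChunkedSink":
        return self

    def __exit__(self, *exc: object) -> None:
        self.flush()  # write the final partial chunk
```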
Reference / sequence / taxonomy loaders for the dump pipeline.
Originally lived in protea/core/training_dump_helpers.py. Extracted
to a leaf submodule (T2B.6) so the orchestration code can import the
loaders without pulling in the entire split runner. Behaviour is
byte-identical.
Thin facade over the _knn_transfer_runner Method Object.
Kept as a separate submodule (T2B.6) so the test-split and train-split helpers can import the entry point without each pulling the runner module into their import graph at top level.
Payload for the auto dump operation.
Originally lived in protea/core/training_dump_helpers.py. The model
shape is unchanged; only the file location moved (T2B.6).
- class protea.core.training_dump._payload.TrainRerankerAutoPayload(*, name: str, embedding_config_id: str, ontology_snapshot_id: str, train_versions: list[int], test_versions: list[int], annotation_source: str = 'goa', limit_per_entry: Annotated[int, Gt(gt=0)] = 5, distance_threshold: float | None = None, search_backend: str = 'faiss', metric: str = 'cosine', faiss_index_type: str = 'IVFFlat', faiss_nlist: int = 256, faiss_nprobe: int = 32, num_boost_round: int = 1000, early_stopping_rounds: int = 50, val_fraction: float = 0.2, neg_pos_ratio: float | None = None, reranker_objective: str = 'binary', compute_alignments: bool = False, compute_taxonomy: bool = False, ia_file: str | None = None, expand_votes_to_ancestors: bool = False, use_embedding_pca: bool = False, training_scope: str = 'per_category', per_cell_min_positives: Annotated[int, Gt(gt=0)] = 200, dump_to: str | None = None, dump_only: bool = False)
Bases: ProteaPayload
Payload for the dump_helper operation.
Generates consecutive temporal pairs from train_versions, runs KNN once per pair, then trains three per-category LightGBM models (NK, LK, PK) and evaluates each on the held-out test split.
- annotation_source: str
- classmethod at_least_one_test(v: list[int]) → list[int]
- classmethod at_least_two_train(v: list[int]) → list[int]
- compute_alignments: bool
- compute_taxonomy: bool
- distance_threshold: float | None
- dump_only: bool
- dump_to: str | None
- early_stopping_rounds: int
- embedding_config_id: str
- expand_votes_to_ancestors: bool
- faiss_index_type: str
- faiss_nlist: int
- faiss_nprobe: int
- ia_file: str | None
- limit_per_entry: PositiveInt
- metric: str
- model_config = {'frozen': True, 'strict': True}
Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.
- classmethod must_be_non_empty(v: str) → str
- name: str
- neg_pos_ratio: float | None
- num_boost_round: int
- ontology_snapshot_id: str
- per_cell_min_positives: PositiveInt
- reranker_objective: str
- classmethod scope_is_valid(v: str) → str
- search_backend: str
- test_versions: list[int]
- train_versions: list[int]
- training_scope: str
- use_embedding_pca: bool
- val_fraction: float
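The consecutive temporal pairing of train_versions can be sketched in plain Python. The pairing rule comes from the docstring above; the validator semantics (at least two training versions, at least one test version) are inferred from the classmethod names, the helper names and error messages are hypothetical, and this sketch does not reproduce the frozen, strict pydantic model.

```python
def consecutive_pairs(train_versions: list[int]) -> list[tuple[int, int]]:
    """Pair each version with the next one: [a, b, c] -> [(a, b), (b, c)].

    Assumption: versions are used in the order given (callers pass them sorted).
    """
    if len(train_versions) < 2:
        # Presumably what at_least_two_train enforces (inferred from its name).
        raise ValueError("need at least two train_versions")
    return list(zip(train_versions, train_versions[1:]))


def check_test_versions(test_versions: list[int]) -> list[int]:
    # Presumably what at_least_one_test enforces (inferred from its name).
    if not test_versions:
        raise ValueError("need at least one test version")
    return test_versions
```

With N training versions this yields N - 1 KNN runs, one per consecutive pair.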
Pure helpers extracted from compute_embeddings operation classes.
Keeps the operation file (compute_embeddings.py) under the master
plan v3.2 §3 method-LOC ceiling while leaving the operation classes
focused on payload validation, model loading, and the publish path.
- protea.core.operations._compute_embeddings_helpers.build_batch_dispatch_messages(p: ComputeEmbeddingsPayload, parent_job_id: uuid.UUID, sequence_ids: list[int]) → list[tuple[str, dict]]
Partition sequences into per-job batches and build their queue messages.
Coordinator-side helper for ComputeEmbeddingsOperation.execute: splits the full sequence_ids list into chunks of p.sequences_per_job and emits one compute_embeddings_batch message per chunk addressed to the GPU batch queue. The messages carry every payload field that the child workers need (no DB lookups happen between coordinator and worker).
- protea.core.operations._compute_embeddings_helpers.build_embedding_rows(session: Session, p: StoreEmbeddingsPayload, config_id: uuid.UUID) → tuple[list[dict], int, int]
Materialise SequenceEmbedding insert rows for a store-embeddings batch.
Iterates p.sequences and skips entries whose (sequence_id, config_id) pair already exists when p.skip_existing is true; otherwise deletes the existing rows so the bulk insert can replace them. Returns (rows, embeddings_stored, sequences_skipped); callers run the bulk insert + per-job-progress update.
- protea.core.operations._compute_embeddings_helpers.build_store_message(parent_job_id: uuid.UUID, p: ComputeEmbeddingsBatchPayload, write_sequences: list[dict]) → tuple[str, dict]
Build the queue tuple that hands off inferred batches to the write worker.
- protea.core.operations._compute_embeddings_helpers.serialize_inferred_chunks(sequences: list[Sequence], batch_chunks: list[list[ChunkEmbedding]]) → list[dict]
Build per-sequence dicts the store_embeddings worker consumes.
Pairs each input Sequence row with its inferred chunk list and flattens each ChunkEmbedding into JSON-friendly fields. The write worker uses these dicts directly without re-fetching from the DB.
- protea.core.operations._compute_embeddings_helpers.t5_forward_pass(model: Any, input_ids: Any, attention_mask: Any) → Any
Run a T5 encoder forward pass under no_grad; return hidden_states.
Lazy-imports torch so this module stays importable on machines without it (e.g. the API process, where embedding code is never exercised). Frees the outputs reference and triggers a CUDA cache flush before returning so the caller can reuse the residual GPU memory for the per-sequence pool step.
Pulled out of _embed_t5 to keep that backend entry point under the §3 60-LOC ceiling.
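The coordinator-side partitioning done by build_batch_dispatch_messages can be sketched as below. The queue name `protea.embeddings.batch`, the message field names, and the plain `payload` dict are hypothetical placeholders; the real helper reads p.sequences_per_job from its payload and copies every field the workers need.

```python
import uuid
from typing import Any


def chunk_ids(sequence_ids: list[int], size: int) -> list[list[int]]:
    """Split ids into consecutive chunks of at most `size` items."""
    if size <= 0:
        raise ValueError("sequences_per_job must be positive")
    return [sequence_ids[i : i + size] for i in range(0, len(sequence_ids), size)]


def dispatch_messages(
    parent_job_id: uuid.UUID,
    sequence_ids: list[int],
    sequences_per_job: int,
    payload: dict[str, Any],
) -> list[tuple[str, dict[str, Any]]]:
    queue = "protea.embeddings.batch"  # hypothetical queue name
    messages = []
    for chunk in chunk_ids(sequence_ids, sequences_per_job):
        body = {
            **payload,  # carry payload fields so workers need no DB lookup
            "operation": "compute_embeddings_batch",
            "parent_job_id": str(parent_job_id),
            "sequence_ids": chunk,
        }
        messages.append((queue, body))
    return messages
```

Spreading the payload first means the per-chunk fields always win if the payload happens to contain a key with the same name.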
Pure helpers extracted from LoadOntologySnapshotOperation.execute.
The OBO load path branches into a fresh-snapshot insert and an
already-exists-but-needs-backfill case; both paths build
GOTermRelationship rows via the same loop. This module hosts the
shared relationship builder plus the two branch handlers, so the
operation file (load_ontology_snapshot.py) can keep execute
under the master plan v3.2 §3 method-LOC ceiling.
- protea.core.operations._load_ontology_helpers.build_relationships(terms: list[dict[str, Any]], go_id_to_db_id: dict[str, UUID], snapshot_id: UUID) → list[GOTermRelationship]
Materialise GOTermRelationship rows for the given parsed terms.
terms carry their parent edges as ("relation_type", "GO:nnn") tuples under the relationships key (parser output). Edges pointing at GO ids absent from go_id_to_db_id are silently skipped so a partial snapshot does not abort the whole insert.
- protea.core.operations._load_ontology_helpers.handle_existing_snapshot(session: Session, existing: OntologySnapshot, terms: list[dict[str, Any]], obo_version: str, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
Resolve an idempotent re-run of load_ontology_snapshot.
Returns the existing snapshot id as a no-op when its relationship rows are already present; otherwise dispatches to _backfill_relationships() (the GO terms themselves are assumed present from the original load).
- protea.core.operations._load_ontology_helpers.insert_new_snapshot(session: Session, obo_url: str, obo_version: str, terms: list[dict[str, Any]], started_at: float, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
Create the snapshot, terms, and relationship rows in a single flow.
The caller has already validated the payload, downloaded the OBO, and parsed the terms; this function commits the three inserts and emits the done audit event with the elapsed runtime.
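The edge-filtering rule in build_relationships can be sketched with plain tuples instead of GOTermRelationship ORM rows. The `go_id` term key, the `build_relationship_tuples` name, and the output tuple shape are assumptions; only the `relationships` key and the skip-unknown-parents behaviour come from the docstring above.

```python
from typing import Any
from uuid import UUID


def build_relationship_tuples(
    terms: list[dict[str, Any]], go_id_to_db_id: dict[str, UUID], snapshot_id: UUID
) -> list[tuple[UUID, UUID, str, UUID]]:
    """Return hypothetical (child_db_id, parent_db_id, relation_type, snapshot_id) tuples."""
    rows = []
    for term in terms:
        child = go_id_to_db_id.get(term["go_id"])  # "go_id" key is an assumption
        if child is None:
            continue
        for relation_type, parent_go_id in term.get("relationships", []):
            parent = go_id_to_db_id.get(parent_go_id)
            if parent is None:
                # Unknown parent: skip the edge instead of aborting the insert.
                continue
            rows.append((child, parent, relation_type, snapshot_id))
    return rows
```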
Pre-cafaeval data preparation helpers extracted from
RunCafaEvaluationOperation.execute.
These pure functions operate on the in-memory EvaluationData
buckets before cafaeval is invoked: they restrict the ground truth
to the actually-predicted protein cohort, which is the standard CAFA
practice that prevents delta proteins outside the PredictionSet’s
query coverage from hurting Fmax / coverage.
- protea.core.operations._run_cafa_data_helpers.restrict_data_to_predicted(session: Session, *, prediction_set_id: UUID, data: EvaluationData, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → EvaluationData
Drop ground-truth proteins not present in the PredictionSet cohort.
Without this filter, delta proteins that the PredictionSet never actually predicted (e.g. the booster failed for them, or they were excluded at query time) would still count as “missing” against Fmax / coverage. CAFA practice is to evaluate only on the predicted cohort.
Returns a new EvaluationData with each of the five {protein → set[go_id]} mappings filtered to the predicted accessions. Emits a gt_restricted_to_predicted event with before/after counts for the audit trail.
- protea.core.operations._run_cafa_data_helpers.write_ground_truth_files(artifacts_root: Any, data: EvaluationData) → dict[str, str]
Write the five gt_*.tsv / known_terms.tsv files used by cafaeval.
Returns a dict {label → path} so the caller can pass each path to the per-setting cafaeval driver. artifacts_root is a pathlib.Path (or anything convertible via str); it is kept typed as Any so the helper does not import pathlib just for one signature.
- protea.core.operations._run_cafa_data_helpers.write_terms_of_interest(toi_path: str, toi_go_ids: list[str], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → None
Persist the CAFA6 terms-of-interest file (one GO id per line, sorted).
cafaeval’s -toi flag restricts evaluation to this set so that terms not in the frozen pivot graph (e.g. new since t-1) are excluded from scoring.
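The cohort restriction and the terms-of-interest file can be sketched under stated assumptions: EvaluationData is replaced by a plain dict of the five `{protein: set[go_id]}` mappings (the bucket names used in the usage check are placeholders), the emit callback takes the four-argument shape from the signatures with the event name assumed to be first, and the event payload layout is invented.

```python
from pathlib import Path
from typing import Any, Callable, Optional

# Hypothetical aliases: (event, message, fields, level) and bucket -> {protein: GO ids}.
Emit = Callable[[str, Optional[str], dict[str, Any], str], None]
Buckets = dict[str, dict[str, set[str]]]


def restrict_to_predicted(data: Buckets, predicted: set[str], emit: Emit) -> Buckets:
    """Return a new bucket dict keeping only proteins that were actually predicted."""
    restricted = {
        name: {prot: set(gos) for prot, gos in mapping.items() if prot in predicted}
        for name, mapping in data.items()
    }
    emit(
        "gt_restricted_to_predicted",
        None,
        {name: {"before": len(data[name]), "after": len(restricted[name])} for name in data},
        "info",
    )
    return restricted


def write_toi(path: Path, toi_go_ids: list[str]) -> None:
    # One GO id per line, sorted, for cafaeval's -toi flag.
    path.write_text("".join(f"{go}\n" for go in sorted(toi_go_ids)))
```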
Setup-phase helpers extracted from run_cafa_evaluation.
Holds the two NamedTuple bundles threaded through the run pipeline
plus the small functions that load the term universe and emit the
setup events. Living in a sibling keeps the operation file lean and
under the master-plan v3.2 §3 LOC budget.
Re-exported by run_cafa_evaluation for backwards compatibility.
Sibling helper for RunCafaEvaluationOperation.summarize_payload.
Builds the per-job CLI / event summary that the jobs router exposes.
Lives outside run_cafa_evaluation.py so the operation module stays
under the §3 file-LOC budget. Each _bits_from_* helper is a small,
test-friendly chunk.
- protea.core.operations._run_cafa_summary.summarize_payload(payload: dict[str, Any], session: Session | None) → str
Build the human-readable run summary for the CAFA evaluator.
Mirrors the Job summarize_payload contract: never raises, falls back to UUID prefixes when a FK row is missing or no session is available.
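The "never raise, fall back to UUID prefixes" contract can be sketched as follows. The payload keys, the `lookup` callback (standing in for a session query that resolves a display name), the `cafa-eval` prefix, and the 8-character prefix length are all hypothetical.

```python
from typing import Any, Callable, Optional

Lookup = Callable[[str, str], Optional[str]]  # hypothetical: (kind, id) -> display name


def _label(kind: str, raw_id: Any, lookup: Optional[Lookup]) -> str:
    text = str(raw_id) if raw_id is not None else "?"
    if lookup is not None:
        try:
            name = lookup(kind, text)
        except Exception:  # a summary must never break the jobs listing
            name = None
        if name:
            return name
    return text[:8]  # UUID prefix fallback when the row or session is missing


def summarize(payload: dict[str, Any], lookup: Optional[Lookup] = None) -> str:
    try:
        bits = [
            f"prediction_set={_label('prediction_set', payload.get('prediction_set_id'), lookup)}",
            f"evaluation_set={_label('evaluation_set', payload.get('evaluation_set_id'), lookup)}",
        ]
        return "cafa-eval " + " ".join(bits)
    except Exception:
        return "cafa-eval"  # last-resort summary; never raise
```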
See also
Operations: narrative documentation for every operation listed above, with payload examples and execution flow.
Infrastructure: the ORM models that protea.core reads and writes.
How-to Guides: task-oriented recipes that exercise these modules end-to-end.