Infrastructure

The protea.infrastructure package implements the persistence and messaging layer. It is the only package that imports SQLAlchemy, psycopg2, or aio-pika directly. All other layers interact with the database through the session factory and with the queue through the publisher interface.

Database engine

protea.infrastructure.database.engine creates the SQLAlchemy engine from the configured database URL. The engine is constructed once at application startup and shared across all session factories.

protea.infrastructure.database.engine.build_engine(db_url: str) → Engine

Settings

protea.infrastructure.settings loads configuration from protea/config/system.yaml and applies environment variable overrides. The two mandatory settings are db_url (PostgreSQL connection string) and amqp_url (RabbitMQ connection string). Both can be overridden at runtime via the PROTEA_DB_URL and PROTEA_AMQP_URL environment variables, which take precedence over the YAML file. This makes the same configuration file usable across local development, CI, and production deployments.

Runtime settings loader for PROTEA (T-OPS.6).

Backs the historical Settings frozen dataclass with a pydantic-settings BaseSettings pipeline. The public surface (Settings + load_settings(project_root, *, env_prefix=...)) stays byte-identical so the dozens of call sites and the tests/test_settings.py suite keep working without edits.

Source priority (highest first)

  1. process environment (PROTEA_* by default)

  2. .env file at the project root (read by python-dotenv)

  3. protea/config/system.yaml (flattened from nested YAML keys)

  4. built-in defaults (declared on the model)

The YAML loader is a custom PydanticBaseSettingsSource so we can keep the historical nested YAML shape (database.url, storage.minio.endpoint) while exposing flat field names to the rest of the codebase.
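The priority order and the nested-to-flat YAML mapping can be pictured as a chain of dictionary lookups. The following is a minimal sketch and not PROTEA's actual loader: flatten, resolve, the KEY_MAP table, and all sample values are hypothetical, and real .env and YAML parsing is replaced by plain dictionaries.

```python
from __future__ import annotations

from collections import ChainMap
from typing import Any


def flatten(nested: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Turn {"database": {"url": ...}} into {"database.url": ...}."""
    flat: dict[str, Any] = {}
    for key, value in nested.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


# Hypothetical mapping from dotted YAML paths to flat field names.
KEY_MAP = {"database.url": "db_url", "storage.minio.endpoint": "minio_endpoint"}


def resolve(env, dotenv, yaml_doc, defaults, prefix: str = "PROTEA_") -> dict[str, Any]:
    """Merge the four sources; earlier ChainMap layers win over later ones."""

    def from_env(source: dict[str, str]) -> dict[str, str]:
        # PROTEA_DB_URL -> db_url; variables without the prefix are ignored.
        return {k[len(prefix):].lower(): v for k, v in source.items() if k.startswith(prefix)}

    yaml_flat = {KEY_MAP[k]: v for k, v in flatten(yaml_doc).items() if k in KEY_MAP}
    return dict(ChainMap(from_env(env), from_env(dotenv), yaml_flat, defaults))


settings = resolve(
    env={"PROTEA_DB_URL": "postgresql://prod/protea"},
    dotenv={"PROTEA_DB_URL": "postgresql://dotenv/protea", "PROTEA_MINIO_BUCKET": "scratch"},
    yaml_doc={"database": {"url": "postgresql://yaml/protea"},
              "storage": {"minio": {"endpoint": "minio:9000"}}},
    defaults={"minio_bucket": "protea", "storage_backend": "local"},
)
```

In this sketch db_url comes from the process environment, minio_bucket from the .env layer, minio_endpoint from the YAML layer, and storage_backend from the defaults.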

Deprecation warnings

Two backward-compat affordances emit DeprecationWarning:

  • legacy unprefixed env aliases (DATABASE_URL, AMQP_URL) are honoured but flagged so deployment manifests migrate to the PROTEA_* prefix at their own pace

  • callers passing env_prefix other than PROTEA_ get a warning; pydantic-settings handles the prefix natively, but the override path is undocumented and only kept for one or two legacy scripts
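The legacy-alias behaviour can be illustrated with the standard warnings module. This is a sketch under assumptions: read_with_alias and LEGACY_ALIASES are hypothetical names, the pairing of each legacy name with its PROTEA_* replacement is inferred from the variable names mentioned in this section, and the rule that the prefixed name wins when both are set is assumed rather than confirmed.

```python
import warnings

# Assumed pairing of legacy unprefixed names with their prefixed replacements.
LEGACY_ALIASES = {"DATABASE_URL": "PROTEA_DB_URL", "AMQP_URL": "PROTEA_AMQP_URL"}


def read_with_alias(env, preferred):
    """Return env[preferred], falling back to a legacy alias with a DeprecationWarning."""
    if preferred in env:
        return env[preferred]
    for legacy, target in LEGACY_ALIASES.items():
        if target == preferred and legacy in env:
            warnings.warn(
                f"{legacy} is deprecated; set {preferred} instead",
                DeprecationWarning,
                stacklevel=2,
            )
            return env[legacy]
    return None


# Capture warnings so the deprecation can be inspected instead of printed.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    value = read_with_alias({"DATABASE_URL": "postgresql://localhost/protea"}, "PROTEA_DB_URL")
```

The legacy value is still honoured, and the caller sees exactly one DeprecationWarning naming the variable to migrate to.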

class protea.infrastructure.settings.Settings(db_url: str, amqp_url: str, artifacts_dir: Path, storage_backend: str = 'local', storage_root: Path | None = None, minio_endpoint: str | None = None, minio_bucket: str = 'protea', minio_access_key: str | None = None, minio_secret_key: str | None = None, minio_secure: bool = False, allowed_origins: tuple[str, ...] = ('http://localhost:3000', 'http://127.0.0.1:3000', 'https://protea.ngrok.app'), anc2vec_path: str | None = None, anon_quota_per_day: int = 5, user_quota_per_day: dict[str, int] = <factory>, smtp_enabled: bool = False, smtp_host: str | None = None, smtp_port: int = 587, smtp_user: str | None = None, smtp_password: str | None = None, smtp_from_addr: str | None = None)

Bases: object

Immutable view of resolved runtime settings.

Kept as a frozen dataclass (not a pydantic BaseModel) so the historical attribute access patterns and MagicMock test doubles keep working unchanged. Constructed by load_settings() from a private _ProteaBaseSettings after path-resolution + tuple coercion.

allowed_origins: tuple[str, ...] = ('http://localhost:3000', 'http://127.0.0.1:3000', 'https://protea.ngrok.app')
amqp_url: str
anc2vec_path: str | None = None
anon_quota_per_day: int = 5
artifacts_dir: Path
db_url: str
minio_access_key: str | None = None
minio_bucket: str = 'protea'
minio_endpoint: str | None = None
minio_secret_key: str | None = None
minio_secure: bool = False
smtp_enabled: bool = False
smtp_from_addr: str | None = None
smtp_host: str | None = None
smtp_password: str | None = None
smtp_port: int = 587
smtp_user: str | None = None
storage_backend: str = 'local'
storage_root: Path | None = None
user_quota_per_day: dict[str, int]
protea.infrastructure.settings.load_settings(project_root: Path, *, env_prefix: str = 'PROTEA_') → Settings

Resolve runtime settings via env > .env > system.yaml > defaults.

See the module docstring for the full env-var contract and the deprecation matrix. env_prefix defaults to "PROTEA_"; passing anything else emits a DeprecationWarning.

Session management

session_scope() is the standard entry point for database access in PROTEA. It is a context manager that commits on normal exit, rolls back on any exception, and always closes the session. It is used throughout the API layer and in tests; workers running long-lived operations open and close sessions explicitly instead.

The build_session_factory() function creates a SQLAlchemy sessionmaker bound to the given database URL. It is called once at application startup and stored on app.state.session_factory, keeping the router free of global state.

protea.infrastructure.session.build_session_factory(db_url: str) → sessionmaker[Session]

Create a SQLAlchemy session factory bound to the given database URL.

protea.infrastructure.session.session_scope(factory: sessionmaker[Session]) → Iterator[Session]

Context manager that commits on success and rolls back on exception.
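The commit, rollback, and close ordering can be sketched without a database. The version below is an illustration and not the PROTEA source: scope_sketch mirrors the documented behaviour, and FakeSession is a hypothetical stand-in that only records which methods were called.

```python
from contextlib import contextmanager


class FakeSession:
    """Stand-in for a SQLAlchemy Session that records the calls made on it."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


@contextmanager
def scope_sketch(factory):
    """Commit on normal exit, roll back on any exception, always close."""
    session = factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Normal exit: commit, then close.
with scope_sketch(FakeSession) as ok:
    pass

# Exception inside the block: rollback, then close, and the error propagates.
failed = FakeSession()
try:
    with scope_sketch(lambda: failed):
        raise ValueError("boom")
except ValueError:
    pass
```

Re-raising after the rollback matters: the caller still sees the original exception, so an API handler can translate it into an error response.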

Telemetry

protea.infrastructure.telemetry configures OpenTelemetry tracing and Prometheus metrics collection. It instruments the FastAPI application, SQLAlchemy engine, and pika AMQP client, emitting spans and counters for every request, query, and queue publish.

OpenTelemetry SDK boot for PROTEA (T5.1a + T5.1b).

This module is the single entry point for tracing in PROTEA. It wires a global opentelemetry.sdk.trace.TracerProvider configured from environment variables (per ADR D07) and instruments FastAPI, SQLAlchemy engines, and pika.

The OTel SDK is treated as optional at import time: if the libraries are not installed (e.g. minimal worker images), configure_telemetry() logs a single warning and returns the resolved config instead of raising. This keeps the existing developer workflow (poetry install) green until the F-OPS stack rolls out and is the pattern recommended by the OTel docs for SDK-optional applications.

T5.1a scope: SDK boot + env-driven exporter URL + FastAPI instrumentation. T5.1b scope: SQLAlchemy engine instrumentation + pika producer/consumer context propagation via the W3C traceparent header so spans stitch across HTTP -> queue -> worker boundaries.

Environment variables consumed

PROTEA_OTEL_ENABLED

Truthy values (1, true, yes, on) enable tracing. Default false so opting in is explicit and the developer workflow never blocks on a running collector.

PROTEA_OTEL_ENDPOINT

OTLP HTTP exporter endpoint (e.g. http://otel-collector:4318). When unset and tracing is enabled, the OTLP HTTP exporter falls back to its own default (http://localhost:4318).

PROTEA_OTEL_SERVICE_NAME

service.name resource attribute. Defaults to protea-api. Workers set this to protea-worker-<queue> at boot.

PROTEA_OTEL_SAMPLE_RATIO

ParentBased(TraceIdRatioBased(<ratio>)) sampler ratio. 0.0 disables sampling, 1.0 samples every trace. Default 1.0 (sampling is expected to be tuned via the collector once F-OPS sets up budgets).
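The four variables above can be resolved from any mapping, which is what makes the configuration testable without touching os.environ. The sketch below is illustrative only: ConfigSketch and resolve_sketch are hypothetical names, and the case-insensitive, whitespace-tolerant truthy check is an assumption about how the flag is parsed.

```python
from dataclasses import dataclass
from typing import Optional

TRUTHY = {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class ConfigSketch:
    enabled: bool
    endpoint: Optional[str]
    service_name: str
    sample_ratio: float


def resolve_sketch(env, default_service_name="protea-api"):
    """Read the PROTEA_OTEL_* variables from a mapping, applying the documented defaults."""
    return ConfigSketch(
        # Assumption: the flag is compared case-insensitively after stripping whitespace.
        enabled=env.get("PROTEA_OTEL_ENABLED", "").strip().lower() in TRUTHY,
        # An unset or empty endpoint stays None so the exporter can use its own default.
        endpoint=env.get("PROTEA_OTEL_ENDPOINT") or None,
        service_name=env.get("PROTEA_OTEL_SERVICE_NAME", default_service_name),
        sample_ratio=float(env.get("PROTEA_OTEL_SAMPLE_RATIO", "1.0")),
    )


default_cfg = resolve_sketch({})
worker_cfg = resolve_sketch(
    {"PROTEA_OTEL_ENABLED": "Yes", "PROTEA_OTEL_SAMPLE_RATIO": "0.25"},
    default_service_name="protea-worker-embeddings",
)
```

With an empty environment tracing stays off and the defaults apply; the worker example opts in and lowers the sampling ratio while inheriting the service name passed at boot.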

class protea.infrastructure.telemetry.MetricRegistry(registry: Any, jobs_total: Any, job_duration_seconds: Any, embeddings_batch_seconds: Any, predictions_batch_seconds: Any, db_pool_in_use: Any, http_requests_total: Any, http_request_duration_seconds: Any, http_requests_in_flight: Any)

Bases: object

Bag of the Prometheus collectors exposed by /v1/metrics.

The registry is intentionally a Parameter Object (not a module-level set of globals) so tests can build a throwaway one and the API can stash a single instance on app.state.metrics.

db_pool_in_use: Any
embeddings_batch_seconds: Any
http_request_duration_seconds: Any
http_requests_in_flight: Any
http_requests_total: Any
job_duration_seconds: Any
jobs_total: Any
predictions_batch_seconds: Any
registry: Any
class protea.infrastructure.telemetry.SdkBundle(TracerProvider: Any, Resource: Any, BatchSpanProcessor: Any, OTLPSpanExporter: Any, ParentBased: Any, TraceIdRatioBased: Any)

Bases: object

Bag of OTel SDK classes used by _build_provider().

Wrapped in a Parameter Object so _build_provider()’s signature stays under the project-wide 6-arg ceiling (see check_smells.py) while remaining testable with stand-ins.

BatchSpanProcessor: Any
OTLPSpanExporter: Any
ParentBased: Any
Resource: Any
TraceIdRatioBased: Any
TracerProvider: Any
class protea.infrastructure.telemetry.TelemetryConfig(enabled: bool, endpoint: str | None, service_name: str, sample_ratio: float)

Bases: object

Resolved telemetry settings.

Built by resolve_telemetry_config() from the environment so callers can introspect what the SDK boot will actually do without triggering the boot itself (handy for /health reporting and tests).

enabled: bool
endpoint: str | None
sample_ratio: float
service_name: str
protea.infrastructure.telemetry.build_metric_registry() → MetricRegistry | None

Construct the metric registry, returning None when prometheus_client is not installed.

Mirrors the soft-fail pattern of configure_telemetry(): a minimal worker image that does not pull the observability extras keeps booting; the /v1/metrics router degrades to a 503 instead of crashing the whole API.

protea.infrastructure.telemetry.configure_telemetry(app: Any | None = None, *, config: TelemetryConfig | None = None, default_service_name: str = 'protea-api') → TelemetryConfig

Boot the OTel SDK and (optionally) instrument a FastAPI app.

Idempotent: a second call with an already-configured global provider is a no-op (logged at DEBUG). Returns the resolved TelemetryConfig regardless of whether tracing was actually enabled, so callers can stash it on app.state and surface it via /health.

protea.infrastructure.telemetry.extract_trace_context(headers: dict[str, Any] | None) → Any

Return an OTel context extracted from inbound AMQP headers.

The returned object is opaque (opentelemetry.context.Context); callers pass it to tracer.start_as_current_span(..., context=ctx). Returns None when the OTel API is not installed so callers can short-circuit to a no-op path.

protea.infrastructure.telemetry.get_tracer(name: str) → Any

Return an OTel tracer for name or a no-op tracer when the API is not installed.

Keeps call sites (queue consumer, etc.) free of optional-import boilerplate: they unconditionally call get_tracer(__name__) and use the resulting object via the standard tracer API.

protea.infrastructure.telemetry.inject_trace_context(headers: dict[str, Any]) → dict[str, Any]

Inject the current trace context into a mutable header mapping.

Used by the pika publisher to propagate traceparent (and tracestate) onto outbound AMQP messages so consumers can stitch spans back to the producing HTTP span. Returns the same mapping for convenience; mutates in place. When the OTel API is not installed or no context is active this is a no-op.
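To make the header round trip concrete, the sketch below writes and parses a W3C traceparent value by hand. It is not how the module works internally (the real functions delegate to the OTel propagation API); inject and extract here are hypothetical helpers that only show the version-00 header shape: version, 32-hex trace id, 16-hex parent span id, and 2-hex flags.

```python
import re
import secrets

# version "00" - trace id (32 hex) - parent span id (16 hex) - flags (2 hex)
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject(headers, trace_id, span_id, sampled=True):
    """Write a traceparent value into headers (mutated in place) and return the mapping."""
    headers["traceparent"] = f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"
    return headers


def extract(headers):
    """Return (trace_id, parent_span_id, sampled), or None when absent or malformed."""
    match = TRACEPARENT_RE.match((headers or {}).get("traceparent", ""))
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    return trace_id, span_id, bool(int(flags, 16) & 0x01)


trace_id = secrets.token_hex(16)  # 32 hex characters
span_id = secrets.token_hex(8)    # 16 hex characters

# Producer side: attach the context to outbound message headers.
outbound = inject({"content-type": "application/json"}, trace_id, span_id)

# Consumer side: recover the context from inbound headers.
inbound = extract(outbound)
```

Because the consumer recovers the same trace id, a span it starts can be parented to the producing span, which is what stitches HTTP, queue, and worker spans into one trace.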

protea.infrastructure.telemetry.instrument_sqlalchemy_engine(engine: Any) → None

Wrap a SQLAlchemy Engine with the OTel instrumentor.

Safe to call when telemetry is disabled or the instrumentor is not installed: both cases log at DEBUG / WARNING and return. Workers and the API boot call this from build_engine() so every engine created in the process emits db.* spans without each call site needing to know about OTel.

protea.infrastructure.telemetry.refresh_db_pool_gauge(metrics: MetricRegistry, engine: Any) → None

Read the SQLAlchemy pool’s checked-out count and set the gauge.

Called from the /v1/metrics handler on every scrape so the gauge reflects the live pool state without requiring event-listener plumbing. engine is expected to be a SQLAlchemy Engine; we duck-type via engine.pool.checkedout() so tests can pass a minimal stub. Any AttributeError is swallowed (the pool may not expose the method on some dialects).
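The duck-typing described here means a test needs only an object with a pool attribute. A minimal sketch, with every class below a hypothetical stub and refresh_gauge a simplified stand-in that takes the gauge directly rather than a full MetricRegistry:

```python
class StubGauge:
    """Minimal stand-in for a Prometheus gauge: only set() is needed here."""

    def __init__(self):
        self.value = None

    def set(self, value):
        self.value = value


class StubPool:
    def checkedout(self):
        return 3


class StubEngine:
    pool = StubPool()


class EngineWithoutCounter:
    pool = object()  # exposes no checkedout() method


def refresh_gauge(gauge, engine):
    """Copy the pool's checked-out count onto the gauge; ignore pools that lack it."""
    try:
        gauge.set(engine.pool.checkedout())
    except AttributeError:
        pass  # leave the gauge at its previous value


gauge = StubGauge()
refresh_gauge(gauge, StubEngine())

untouched = StubGauge()
refresh_gauge(untouched, EngineWithoutCounter())
```

The first gauge picks up the stub's count; the second is left alone because the pool offers no checkedout() method, so a scrape never fails on an unusual pool implementation.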

protea.infrastructure.telemetry.render_metrics(metrics: MetricRegistry) → tuple[bytes, str]

Generate the Prometheus exposition payload for metrics.

Returns the body bytes + the canonical Content-Type header value so the router can return a Response without re-importing prometheus_client itself.

protea.infrastructure.telemetry.resolve_telemetry_config(env: dict[str, str] | None = None, *, default_service_name: str = 'protea-api') → TelemetryConfig

Resolve telemetry settings from the environment.

env defaults to os.environ. default_service_name lets workers override the default protea-api value at boot.

Benchmark configuration

protea.infrastructure.benchmark_config loads the protea/config/benchmark.yaml file that drives the /benchmark router. The file defines scoring-config display names, GO categories, and the baseline tag used in the Fmax comparison grid.

Loader for protea/config/benchmark.yaml.

Mirrors the load_settings pattern in settings.py but returns a typed dataclass specific to the benchmark matrix view. Consumed by the /benchmark/matrix router to avoid hardcoding stage taxonomy, labels, or GO-namespace constants.

class protea.infrastructure.benchmark_config.BenchmarkConfig(preferred_default_stages: tuple[str, ...], baseline_scoring_name: str | None, hidden_stages: frozenset[str], hidden_embeddings: frozenset[str], stage_labels: dict[str, str], eval_set_labels: dict[str, str], categories: tuple[str, ...], aspects: tuple[str, ...])

Bases: object

aspects: tuple[str, ...]
baseline_scoring_name: str | None
categories: tuple[str, ...]
eval_set_labels: dict[str, str]
hidden_embeddings: frozenset[str]
hidden_stages: frozenset[str]
label_for_stage(name: str) → str

Return the human-readable label for a stage.

Falls back to a Title-Cased version of the raw name if the YAML does not define an explicit label.

preferred_default_stages: tuple[str, ...]
stage_labels: dict[str, str]
protea.infrastructure.benchmark_config.load_benchmark_config(project_root: Path) → BenchmarkConfig

Load protea/config/benchmark.yaml into a BenchmarkConfig.

Missing file → sane defaults (no hidden stages, no custom labels, categories = NK/LK/PK, aspects = BPO/MFO/CCO). This keeps the API functional even in fresh checkouts without the YAML.
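The label fallback documented for label_for_stage can be sketched as a dictionary lookup with a default. This is an assumption-laden illustration: the free function below is hypothetical (the real method lives on BenchmarkConfig), the stage names are made up, and replacing underscores with spaces before title-casing is a guess at what "Title-Cased" means for raw names.

```python
def label_for_stage_sketch(name, stage_labels):
    """Prefer the explicit YAML label; otherwise title-case the raw stage name."""
    if name in stage_labels:
        return stage_labels[name]
    # Assumption: underscores become spaces before title-casing.
    return name.replace("_", " ").title()


explicit = label_for_stage_sketch("knn_baseline", {"knn_baseline": "KNN baseline"})
fallback = label_for_stage_sketch("knn_baseline", {})
```

The explicit label wins when present; with no entry the raw name is rendered as "Knn Baseline", which keeps the grid readable in a fresh checkout that has no benchmark.yaml.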

ORM models

All models use SQLAlchemy 2.x declarative style with Mapped[] type annotations. The schema is managed by Alembic; migrations are generated via alembic revision --autogenerate and stored under alembic/versions/.

The ORM declarative base is defined in protea.infrastructure.orm.base. All models inherit from this base, which applies the default naming convention (snake-case table names, consistent constraint prefixes).

class protea.infrastructure.orm.base.Base(**kwargs: Any)

Bases: DeclarativeBase

registry: ClassVar[registry] = <sqlalchemy.orm.decl_api.registry object>

Refers to the registry in use where new Mapper objects will be associated.

Job, JobEvent, and JobComment

Job is the central entity of the job queue. It implements a five-state machine (QUEUED → RUNNING → SUCCEEDED | FAILED, or QUEUED → CANCELLED). The parent_job_id foreign key links batch child jobs to their coordinator parent. payload and meta are PostgreSQL JSONB columns, allowing arbitrary structured data without schema migrations for new operation types. progress_current and progress_total are updated atomically by write workers to drive the frontend progress bar. The narrative columns description / findings (Text, nullable) and tags (Text[], default empty) carry operator-supplied context and never affect dispatch (T3.9 / D11).

JobEvent is an append-only audit log: rows are written by the emit callback during execution and are never updated or deleted. The frontend renders them as a chronological event timeline.

JobComment is the human-authored note thread attached to a Job (T3.10 / D11). One row per comment with author (Text, nullable), body (Text, required), and created_at (Timestamptz). Foreign key to job(id) cascades on delete. Written via POST /jobs/{job_id}/comments and read chronologically via GET /jobs/{job_id}/comments.
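The Job state machine described in this section (QUEUED to RUNNING to SUCCEEDED or FAILED, or QUEUED to CANCELLED) can be written down as a transition table. This is a sketch: TRANSITIONS and can_transition are hypothetical names, and the table encodes only the transitions named in the prose, so any additional paths the real workers allow are not represented.

```python
from enum import Enum


class JobStatus(str, Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Allowed transitions as described in the prose; terminal states map to an empty set.
TRANSITIONS = {
    JobStatus.QUEUED: {JobStatus.RUNNING, JobStatus.CANCELLED},
    JobStatus.RUNNING: {JobStatus.SUCCEEDED, JobStatus.FAILED},
    JobStatus.SUCCEEDED: set(),
    JobStatus.FAILED: set(),
    JobStatus.CANCELLED: set(),
}


def can_transition(current, target):
    """Return True when moving from current to target is listed in TRANSITIONS."""
    return target in TRANSITIONS[current]
```

Keeping the table as data makes terminal states explicit: SUCCEEDED, FAILED, and CANCELLED have no outgoing edges, so a late worker update cannot resurrect a finished job in this model.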

protea.infrastructure.orm.models.job.ACTIVE_STATUSES: tuple[str, ...] = (JobStatus.QUEUED, JobStatus.RUNNING)

Statuses that are considered “active” for deduplication purposes. A new job with a matching dedup_key is rejected (409) only when an existing job is in one of these states.
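The deduplication rule can be sketched over plain dictionaries. find_conflict is a hypothetical helper, the sample rows are invented, and treating a missing dedup_key as "never conflicts" is an assumption; the real check presumably runs as a database query rather than a Python loop.

```python
ACTIVE = {"QUEUED", "RUNNING"}


def find_conflict(existing_jobs, dedup_key):
    """Return the first active job sharing dedup_key, or None if submission may proceed."""
    if dedup_key is None:
        return None  # assumption: jobs without a key are never deduplicated
    for job in existing_jobs:
        if job["dedup_key"] == dedup_key and job["status"] in ACTIVE:
            return job
    return None


jobs = [
    {"id": 1, "dedup_key": "embed:cfg-a", "status": "SUCCEEDED"},
    {"id": 2, "dedup_key": "embed:cfg-b", "status": "RUNNING"},
]

resubmit_finished = find_conflict(jobs, "embed:cfg-a")  # earlier run finished
resubmit_running = find_conflict(jobs, "embed:cfg-b")   # still in flight
```

A finished job does not block resubmission, while the in-flight one does; the API would turn the second result into the 409 response described above.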

class protea.infrastructure.orm.models.job.Job(**kwargs)

Bases: Base

created_at: Mapped[datetime]
dedup_key: Mapped[str | None]
description: Mapped[str | None]
error_code: Mapped[str | None]
error_message: Mapped[str | None]
events: Mapped[list[JobEvent]]
findings: Mapped[str | None]
finished_at: Mapped[datetime | None]
id: Mapped[UUID]
leased_until: Mapped[datetime | None]
meta: Mapped[dict[str, Any]]
operation: Mapped[str]
parent_job_id: Mapped[UUID | None]
payload: Mapped[dict[str, Any]]
progress_current: Mapped[int | None]
progress_total: Mapped[int | None]
queue_name: Mapped[str]
started_at: Mapped[datetime | None]
status: Mapped[JobStatus]
tags: Mapped[list[str]]
class protea.infrastructure.orm.models.job.JobComment(**kwargs)

Bases: Base

Free-form note attached to a Job (decision D11 thread).

Unlike JobEvent (machine-emitted, structured fields), a JobComment carries an opinionated curator/operator note plus its author tag. Cascades on parent delete; sorted chronologically by created_at via the supporting index.

author: Mapped[str | None]
body: Mapped[str]
created_at: Mapped[datetime]
id: Mapped[int]
job_id: Mapped[UUID]
class protea.infrastructure.orm.models.job.JobEvent(**kwargs)

Bases: Base

event: Mapped[str]
fields: Mapped[dict[str, Any]]
id: Mapped[int]
job: Mapped[Job]
job_id: Mapped[UUID]
level: Mapped[str]
message: Mapped[str | None]
ts: Mapped[datetime]
class protea.infrastructure.orm.models.job.JobStatus(*values)

Bases: StrEnum

CANCELLED = 'CANCELLED'
FAILED = 'FAILED'
QUEUED = 'QUEUED'
RUNNING = 'RUNNING'
SUCCEEDED = 'SUCCEEDED'

Protein and Sequence

Sequence stores unique amino-acid strings, deduplicated by MD5 hash. Multiple Protein rows (canonical accessions and isoforms) may reference the same Sequence row, preventing redundant embedding computation for sequences that appear under different accessions.
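Hash-based deduplication can be sketched with hashlib. This illustration makes assumptions: md5_of, get_or_create, and the in-memory store are hypothetical, the example peptide strings are arbitrary, and the real compute_hash (forwarded to protea_contracts) may normalise the input differently before hashing.

```python
import hashlib

store = {}  # sequence hash -> surrogate sequence id


def md5_of(seq):
    """Hex MD5 digest of the raw amino-acid string."""
    return hashlib.md5(seq.encode("utf-8")).hexdigest()


def get_or_create(seq):
    """Return the id for seq, allocating a new one only for unseen hashes."""
    key = md5_of(seq)
    if key not in store:
        store[key] = len(store) + 1
    return store[key]


first = get_or_create("MKTAYIAKQR")
second = get_or_create("MKTAYIAKQR")  # same string under another accession
other = get_or_create("MKTAYIAKQL")
```

Two accessions carrying the same string resolve to one id, so an embedding computed for that sequence is reused rather than recomputed.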

Protein stores one row per UniProt accession, including isoforms (<canonical>-<n>). The canonical_accession field groups isoforms together, and the view-only relationship to ProteinUniProtMetadata is joined on this field rather than a foreign key, so metadata rows are not duplicated for each isoform.
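The "<canonical>-<n>" convention maps onto the (canonical accession, is_canonical, isoform index) triple returned by Protein.parse_isoform. The regex below is a sketch and an assumption; the canonical implementation lives in protea_contracts and may handle edge cases differently.

```python
import re

# "<canonical>-<n>": everything before the final "-<digits>" is the canonical accession.
ISOFORM_RE = re.compile(r"^(?P<canonical>.+)-(?P<index>\d+)$")


def parse_isoform_sketch(accession):
    """Split an accession into (canonical_accession, is_canonical, isoform_index)."""
    match = ISOFORM_RE.match(accession)
    if match is None:
        return accession, True, None
    return match.group("canonical"), False, int(match.group("index"))


canonical = parse_isoform_sketch("P12345")
isoform = parse_isoform_sketch("P12345-2")
```

Both results share the same first element, which is the value stored in canonical_accession and used to join isoforms to a single metadata row.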

class protea.infrastructure.orm.models.protein.protein.Protein(**kwargs)

Bases: Base

One row per UniProt accession, including isoforms (<canonical>-<n>).

Isoforms are grouped by canonical_accession. Many proteins can share the same Sequence row — sequence_id is deliberately non-unique. The uniprot_metadata relationship is view-only, joined by canonical_accession.

accession: Mapped[str]
canonical_accession: Mapped[str]
created_at: Mapped[datetime]
entry_name: Mapped[str | None]
gene_name: Mapped[str | None]
is_canonical: Mapped[bool]
isoform_index: Mapped[int | None]
length: Mapped[int | None]
organism: Mapped[str | None]
static parse_isoform(accession: str) → tuple[str, bool, int | None]

Parse isoform accession pattern "<canonical>-<n>".

Forwards to protea_contracts.parse_isoform(). The canonical implementation lives in protea-contracts.bio_utils so the FASTA parser in protea-sources can reuse it without inverting the C-stack dependency direction (D-MIGR-04 of master plan v3).

reviewed: Mapped[bool | None]
sequence: Mapped[Sequence | None]
sequence_id: Mapped[int | None]
taxonomy_id: Mapped[str | None]
uniprot_metadata: Mapped[ProteinUniProtMetadata | None]
updated_at: Mapped[datetime]
class protea.infrastructure.orm.models.protein.protein_metadata.ProteinUniProtMetadata(**kwargs)

Bases: Base

Raw UniProt metadata stored ONCE per canonical accession.

  • Primary key: canonical_accession (e.g., X6R8D5)

  • Isoforms reuse the same metadata via Protein.canonical_accession.

  • Relationship to Protein is view-only (no FK required).

absorption: Mapped[str | None]
active_site: Mapped[str | None]
activity_regulation: Mapped[str | None]
binding_site: Mapped[str | None]
canonical_accession: Mapped[str]
catalytic_activity: Mapped[str | None]
cofactor: Mapped[str | None]
created_at: Mapped[datetime]
dna_binding: Mapped[str | None]
ec_number: Mapped[str | None]
features: Mapped[str | None]
function_cc: Mapped[str | None]
keywords: Mapped[str | None]
kinetics: Mapped[str | None]
pathway: Mapped[str | None]
ph_dependence: Mapped[str | None]
proteins: Mapped[list[Protein]]
redox_potential: Mapped[str | None]
rhea_id: Mapped[str | None]
site: Mapped[str | None]
temperature_dependence: Mapped[str | None]
updated_at: Mapped[datetime]
class protea.infrastructure.orm.models.sequence.sequence.Sequence(*args, **kwargs)

Bases: Base

Production-grade Sequence schema.

Key points:

  • Stores raw amino-acid sequence.

  • Deduplicated by sequence_hash (MD5).

  • A Sequence can be referenced by MANY proteins (Sequence.proteins).

static compute_hash(seq: str) → str

Forward to protea_contracts.compute_sequence_hash().

The canonical implementation lives in protea-contracts.bio_utils (D-MIGR-04 of master plan v3) so the FASTA parser in protea-sources can populate UniProtProteinRecord.sequence_hash without depending on PROTEA’s ORM. The wrapper keeps every existing call site (Sequence.compute_hash(seq)) working unchanged.

created_at: Mapped[datetime]
id: Mapped[int]
proteins: Mapped[list[Protein]]
sequence: Mapped[str]
sequence_hash: Mapped[str]
updated_at: Mapped[datetime]

GO Ontology

OntologySnapshot records one complete GO OBO release, versioned by the obo_version string from the OBO file header. The unique constraint on obo_version makes repeated imports idempotent. GOTerm stores one row per term per snapshot; GOTermRelationship stores the directed edges of the GO DAG with their relation types (is_a, part_of, regulates, etc.). The /annotations/snapshots/{id}/subgraph endpoint uses BFS over these edges to return ancestor subgraphs for a given set of GO term IDs.
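An ancestor subgraph is a breadth-first walk from the seed terms along child-to-parent edges. The sketch below uses placeholder term names rather than real GO ids, and ancestor_subgraph is a hypothetical helper, not the endpoint's actual implementation.

```python
from collections import deque


def ancestor_subgraph(edges, seeds):
    """edges: iterable of (child, parent, relation_type).

    Returns (nodes, kept_edges) reachable upward from the seed terms.
    """
    parents = {}
    for child, parent, relation in edges:
        parents.setdefault(child, []).append((parent, relation))

    visited = set(seeds)
    queue = deque(seeds)
    kept = []
    while queue:
        node = queue.popleft()
        for parent, relation in parents.get(node, []):
            kept.append((node, parent, relation))
            if parent not in visited:  # enqueue each ancestor once
                visited.add(parent)
                queue.append(parent)
    return visited, kept


edges = [
    ("leaf", "mid", "is_a"),
    ("mid", "root", "is_a"),
    ("leaf", "root", "part_of"),
    ("other", "root", "is_a"),
]
nodes, kept = ancestor_subgraph(edges, ["leaf"])
```

The unrelated term "other" is excluded even though it shares the root, because the walk only follows edges upward from the seeds.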

class protea.infrastructure.orm.models.annotation.ontology_snapshot.OntologySnapshot(**kwargs)

Bases: Base

One row per loaded go.obo file.

obo_version is extracted from the data-version: header of the OBO file (e.g. releases/2024-01-17). obo_url is the URL from which the file was downloaded, providing full provenance. Multiple AnnotationSet rows can reference the same snapshot when they were built against the same ontology release.

ia_url optionally points to the Information Accretion (IA) TSV file associated with this ontology release. IA files are published alongside each CAFA benchmark (e.g. IA_cafa6.tsv) and contain per-term information-content weights that make cafaeval penalise predictions of common, easy-to-predict terms less than rare, specific ones. When present, run_cafa_evaluation downloads and passes this file to cafaeval automatically — no manual path is required in the job payload. Set a new ia_url on each future snapshot (CAFA7, etc.) to keep evaluations comparable across benchmark generations.

annotation_sets: Mapped[list[AnnotationSet]]
go_terms: Mapped[list[GOTerm]]
ia_url: Mapped[str | None]
id: Mapped[uuid.UUID]
loaded_at: Mapped[datetime]
obo_url: Mapped[str]
obo_version: Mapped[str]
class protea.infrastructure.orm.models.annotation.go_term.GOTerm(**kwargs)

Bases: Base

One row per GO term per ontology snapshot.

GO terms are scoped to an OntologySnapshot so that the meaning of a term at a specific ontology release is preserved. (go_id, ontology_snapshot_id) is unique — the same GO:XXXXXXX can exist in multiple snapshots with potentially different names or definitions.

annotations: Mapped[list[ProteinGOAnnotation]]
aspect: Mapped[str | None]
definition: Mapped[str | None]
go_id: Mapped[str]
id: Mapped[int]
is_obsolete: Mapped[bool]
name: Mapped[str | None]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
class protea.infrastructure.orm.models.annotation.go_term_relationship.GOTermRelationship(**kwargs)

Bases: Base

Directed edge in the GO DAG for a specific ontology snapshot.

child_go_term_id → parent_go_term_id with a given relation_type (is_a, part_of, regulates, negatively_regulates, positively_regulates).

child: Mapped[GOTerm]
child_go_term_id: Mapped[int]
id: Mapped[int]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
parent: Mapped[GOTerm]
parent_go_term_id: Mapped[int]
relation_type: Mapped[str]

InterPro Annotations

InterProAnnotation stores the domain signature results returned by the EBI InterProScan API for each protein. InterProGoMapping stores the static mapping from InterPro domain entries to their associated GO terms, as distributed by InterPro’s interpro2go file.

class protea.infrastructure.orm.models.annotation.interpro_annotation.InterProAnnotation(**kwargs)

Bases: Base

One InterProScan domain hit persisted for a protein.

Mirrors the IP.1a parser record (protea_sources.interpro.parser.InterProAnnotation) one-to-one, plus a surrogate UUID primary key and the ipr_release tag so multiple InterProScan runs at different release versions can coexist for the same protein without conflict.

The protein_accession FK follows the same pattern as ProteinGOAnnotation: PROTEA’s Protein PK is the UniProt accession string, not a surrogate integer, so domain tables key by accession directly. The IP.2 spec language protein_id is preserved at the relationship level (self.protein) but the column is protein_accession to stay consistent with the rest of the annotation schema.

Coordinate invariants (start >= 1, end >= start) are enforced as table-level CHECK constraints so a buggy upstream caller cannot smuggle nonsense rows past the database.

accession: Mapped[str]
created_at: Mapped[datetime]
end: Mapped[int]
evidence: Mapped[str | None]
id: Mapped[uuid.UUID]
ipr_release: Mapped[str]
protein: Mapped[Protein]
protein_accession: Mapped[str]
source_db: Mapped[str]
start: Mapped[int]
class protea.infrastructure.orm.models.annotation.interpro_go_mapping.InterProGoMapping(**kwargs)

Bases: Base

One InterPro entry to GO term mapping row (InterPro2GO).

IP.4a of the executor plan (F-IP annotation ensemble). Persists the EBI interpro2go flat file as rows keyed by (ipr_accession, go_id). Each line of the upstream file declares that the InterPro entry on the left implies the GO term on the right; the same IPR can map to many GO ids, and a single GO id can be claimed by many IPR entries, so the natural key is the pair.

Loaded by LoadInterProGoMappingOperation (idempotent upsert keyed by the uq_interpro_go_mapping_pair unique constraint). Released as a versioned snapshot via source_version so callers can pin the InterPro release used by a given prediction batch.

Intentionally NOT FK-linked to GOTerm: the EBI mapping file is independent of any specific OBO snapshot, so it can reference GO ids that are missing or obsolete in the active OntologySnapshot. Consumers join in application code where appropriate, so a release skew between InterPro2GO and the GO OBO is observable rather than a hard load error.

created_at: Mapped[datetime]
evidence: Mapped[str | None]
go_id: Mapped[str]
id: Mapped[UUID]
ipr_accession: Mapped[str]
source_version: Mapped[str]

Annotation Sets

AnnotationSet groups a batch of protein GO annotations by source (goa or quickgo) and ontology snapshot version. This design allows side-by-side comparison of annotation sets from different sources or dates and ties every prediction result to a specific, versioned annotation input. ProteinGOAnnotation stores all GAF/QuickGO evidence fields verbatim: qualifier, evidence code, assigned-by, database reference, with/from, and annotation date.

class protea.infrastructure.orm.models.annotation.annotation_set.AnnotationSet(**kwargs)

Bases: Base

A versioned batch of GO annotations from a single source.

Each load operation (QuickGO download, GOA GAF ingest, CAFA dataset) creates one AnnotationSet row. This allows multiple temporal snapshots of the same source to coexist and be queried independently.

ontology_snapshot_id pins the exact GO ontology release used to interpret the annotations in this set. job_id links back to the PROTEA job that created it, providing full audit trail.

annotations: Mapped[list[ProteinGOAnnotation]]
created_at: Mapped[datetime]
id: Mapped[uuid.UUID]
job: Mapped[Job | None]
job_id: Mapped[uuid.UUID | None]
meta: Mapped[dict[str, Any]]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
source: Mapped[str]
source_published_at: Mapped[datetime | None]
source_version: Mapped[str | None]
class protea.infrastructure.orm.models.annotation.protein_go_annotation.ProteinGOAnnotation(**kwargs)

Bases: Base

Association between a protein and a GO term within an annotation set.

Fields map directly from GAF/QuickGO columns:

  • qualifier: e.g. enables, involved_in, located_in

  • evidence_code: GO evidence code resolved from ECO (IDA, IEA, ISS…)

  • assigned_by: database that made the annotation (UniProt, RHEA…)

  • db_reference: supporting reference (PMID:…, GO_REF:…)

  • with_from: with/from field from GAF column 8

  • annotation_date: YYYYMMDD string from the source file

annotation_date: Mapped[str | None]
annotation_set: Mapped[AnnotationSet]
annotation_set_id: Mapped[uuid.UUID]
assigned_by: Mapped[str | None]
db_reference: Mapped[str | None]
evidence_code: Mapped[str | None]
go_term: Mapped[GOTerm]
go_term_id: Mapped[int]
id: Mapped[int]
protein: Mapped[Protein]
protein_accession: Mapped[str]
qualifier: Mapped[str | None]
with_from: Mapped[str | None]

Evaluation Sets

EvaluationSet stores the CAFA-style temporal holdout delta between two annotation sets (old → new). It keeps summary statistics (NK/LK/PK protein and annotation counts) in a JSONB stats column. A DB-level UNIQUE(old_annotation_set_id, new_annotation_set_id) constraint (alembic revision b8e3f1a7c2d9) enforces that each (old, new) pair can have at most one EvaluationSet, making generate_evaluation_set idempotent at the schema layer. EvaluationResult stores the output of running cafaeval against a prediction set: per-namespace Fmax, precision, recall, τ, and coverage for NK, LK, and PK settings.

class protea.infrastructure.orm.models.annotation.evaluation_set.EvaluationSet(**kwargs)

Bases: Base

Result of comparing two AnnotationSets to produce CAFA-style evaluation data.

Stores metadata and statistics about the delta between an old and a new GOA annotation set. The actual ground-truth rows are computed on-demand from the stored annotation sets and streamed directly to the client.

old_annotation_set_id is the reference (training) snapshot. new_annotation_set_id is the evaluation (ground-truth) snapshot. Delta proteins are those that gained at least one new experimental annotation between old → new. NK/LK classification and NOT-qualifier propagation are applied during both generation and export.

created_at: Mapped[datetime]
groundtruth_uri: Mapped[str | None]
id: Mapped[uuid.UUID]
job: Mapped[Job | None]
job_id: Mapped[uuid.UUID | None]
new_annotation_set: Mapped[AnnotationSet]
new_annotation_set_id: Mapped[uuid.UUID]
old_annotation_set: Mapped[AnnotationSet]
old_annotation_set_id: Mapped[uuid.UUID]
stats: Mapped[dict[str, Any]]
class protea.infrastructure.orm.models.annotation.evaluation_result.EvaluationResult(**kwargs)

Bases: Base

Fmax / PR / RC results from running the CAFA evaluator on a prediction set.

Stores per-setting (NK/LK/PK) and per-namespace (BPO/MFO/CCO) metrics in a JSONB column so they can be displayed in the UI without additional queries.

results structure:

{
  "NK": {
    "BPO": {"fmax": 0.45, "precision": 0.51, "recall": 0.40,
            "tau": 0.32, "coverage": 0.95, "n_proteins": 100},
    "MFO": {...},
    "CCO": {...}
  },
  "LK": {...},
  "PK": {...}
}
created_at: Mapped[datetime]
evaluation_set: Mapped[EvaluationSet]
evaluation_set_id: Mapped[uuid.UUID]
id: Mapped[uuid.UUID]
job: Mapped[Job | None]
job_id: Mapped[uuid.UUID | None]
prediction_set: Mapped[PredictionSet]
prediction_set_id: Mapped[uuid.UUID]
reranker_config: Mapped[dict[str, Any] | None]
reranker_model: Mapped[RerankerModel | None]
reranker_model_id: Mapped[uuid.UUID | None]
results: Mapped[dict[str, Any]]
scoring_config: Mapped[ScoringConfig | None]
scoring_config_id: Mapped[uuid.UUID | None]
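
As an illustration of the results layout documented for EvaluationResult, the sketch below flattens the nested dictionary into (setting, namespace, Fmax) rows for display. The helper name iter_metric_rows and the sample values are hypothetical; only the setting / namespace / metric nesting comes from the documented structure.

```python
from typing import Any, Iterator

# Sample payload shaped like the documented results column (values are made up).
results: dict[str, Any] = {
    "NK": {
        "BPO": {"fmax": 0.45, "precision": 0.51, "recall": 0.40,
                "tau": 0.32, "coverage": 0.95, "n_proteins": 100},
    },
    "LK": {},
    "PK": {},
}


def iter_metric_rows(results: dict[str, Any]) -> Iterator[tuple[str, str, float]]:
    """Yield (setting, namespace, fmax) for every populated cell."""
    for setting in ("NK", "LK", "PK"):
        for namespace, metrics in results.get(setting, {}).items():
            yield setting, namespace, metrics["fmax"]


rows = list(iter_metric_rows(results))
```

Because everything lives in one JSONB value, a caller can build such a table from a single row without further queries.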

Embeddings

EmbeddingConfig defines a reproducible embedding recipe: model identifier, layer selection, pooling strategy, normalisation flags, and chunking parameters. Its UUID primary key is stable; changing any parameter creates a new configuration row. Both SequenceEmbedding rows and PredictionSet rows reference the same EmbeddingConfig, guaranteeing that query and reference embeddings are always comparable.

SequenceEmbedding stores a pgvector VECTOR for each (sequence, config, chunk) triple. When chunking is disabled the chunk index is 0 and the end index is NULL. pgvector is used for storage only; nearest-neighbour queries are performed in Python via protea.core.knn_search.
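
To make "nearest-neighbour queries in Python" concrete, here is a minimal brute-force sketch using cosine distance (the metric recorded in GOPrediction.distance). It is not the API of protea.core.knn_search; the function names and the dictionary-of-vectors input are assumptions chosen for illustration.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Return 1 - cosine similarity of two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def nearest_neighbours(query, references, k):
    """Return the k (reference_id, distance) pairs closest to query."""
    scored = [(ref_id, cosine_distance(query, vec)) for ref_id, vec in references.items()]
    scored.sort(key=lambda pair: pair[1])  # smaller distance = more similar
    return scored[:k]


refs = {"P1": [1.0, 0.0], "P2": [0.0, 1.0], "P3": [1.0, 1.0]}
top = nearest_neighbours([1.0, 0.1], refs, k=2)
```

A production search would normally vectorise this over a matrix of reference embeddings; the loop form is kept here only for readability.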

class protea.infrastructure.orm.models.embedding.embedding_config.EmbeddingConfig(**kwargs)

Bases: Base

Defines a reproducible recipe for computing protein embeddings.

Every SequenceEmbedding row points to exactly one EmbeddingConfig, providing complete provenance: which model, which transformer layers, how layers are aggregated, how the sequence is pooled, and whether the vector is L2-normalised.

Layer indexing convention (reverse, consistent with PIS)

layer_indices use reverse indexing: 0 = last (most semantic) layer, 1 = penultimate, 2 = antepenultimate, etc. This matches the convention used across all backends in PIS / FANTASIA.
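
The reverse convention maps directly onto negative indexing. The sketch below assumes the per-layer outputs are held in a list ordered from first to last layer; select_layers is a hypothetical helper, not part of PROTEA.

```python
def select_layers(hidden_states: list, layer_indices: list[int]) -> list:
    """Pick layers using reverse indexing: 0 is the last layer, 1 the penultimate."""
    return [hidden_states[-(i + 1)] for i in layer_indices]


layers = ["emb", "L1", "L2", "L3"]  # stand-ins for per-layer tensors
picked = select_layers(layers, [0, 2])
```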

Layer aggregation strategies

  • mean : element-wise average across selected layers (dim unchanged).

  • concat : concatenation of all selected layers (dim × n_layers).

Sequence pooling strategies

  • mean : mean over residue representations.

  • max : max over residue representations.

  • cls : CLS/BOS token only (position 0 of raw hidden states).

  • mean_max : concatenation of mean and max (dim × 2).
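
The aggregation and pooling strategies listed above can be sketched in plain Python as follows. This is an illustration of the arithmetic only: the function names are hypothetical, matrices are lists of per-residue vectors, and for cls the sketch assumes row 0 of the input is the CLS/BOS token.

```python
Matrix = list[list[float]]  # one row per position, one column per dimension


def aggregate_layers(layers: list[Matrix], how: str) -> Matrix:
    """Combine per-layer matrices with 'mean' (dim unchanged) or 'concat' (dim x n_layers)."""
    n_rows = len(layers[0])
    if how == "mean":
        return [
            [sum(vals) / len(layers) for vals in zip(*(layer[r] for layer in layers))]
            for r in range(n_rows)
        ]
    if how == "concat":
        return [[v for layer in layers for v in layer[r]] for r in range(n_rows)]
    raise ValueError(f"unknown layer_agg: {how}")


def pool(rows: Matrix, how: str) -> list[float]:
    """Reduce a positions x dim matrix to a single vector."""
    cols = list(zip(*rows))
    mean = [sum(c) / len(c) for c in cols]
    peak = [max(c) for c in cols]
    if how == "mean":
        return mean
    if how == "max":
        return peak
    if how == "cls":
        return list(rows[0])  # assumes position 0 holds the CLS/BOS token
    if how == "mean_max":
        return mean + peak  # dim x 2
    raise ValueError(f"unknown pooling: {how}")


layer_a = [[1.0, 2.0], [3.0, 4.0]]
layer_b = [[3.0, 2.0], [5.0, 0.0]]
agg = aggregate_layers([layer_a, layer_b], "mean")
vec = pool(agg, "mean_max")
```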

Model backends

  • esm : HuggingFace EsmModel (ESM-2 family).

  • esm3c : ESM SDK ESMC (ESM3c family). No external tokenizer. Runs FP16 on GPU. CLS and EOS tokens stripped for pooling.

  • t5 : HuggingFace T5EncoderModel (ProstT5, prot_t5_xl…). ProstT5 mode auto-detected from model_name.

  • ankh : HuggingFace T5EncoderModel loaded via AutoTokenizer (Ankh base/large). No <AA2fold> prefix; ambiguous residues are substituted with X like the other T5 path.

  • auto : falls back to esm.

Normalisation

  • normalize_residues : L2-normalise each residue representation before pooling (applied after layer aggregation).

  • normalize : L2-normalise the final pooled vector.

Chunking

Long sequences can be split into overlapping chunks before pooling. Each chunk produces one SequenceEmbedding row identified by chunk_index_s and chunk_index_e.
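
One plausible way to derive overlapping chunk boundaries from chunk_size and chunk_overlap is sketched below. The stride (chunk_size - chunk_overlap) and the half-open (start, end) convention are assumptions; the source does not state how chunk_index_s and chunk_index_e are computed.

```python
def chunk_spans(seq_len: int, chunk_size: int, chunk_overlap: int) -> list[tuple[int, int]]:
    """Return half-open (start, end) spans that cover seq_len with the given overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    spans = []
    start = 0
    while True:
        end = min(start + chunk_size, seq_len)
        spans.append((start, end))
        if end == seq_len:  # the last span reaches the end of the sequence
            break
        start += step
    return spans


spans = chunk_spans(10, chunk_size=4, chunk_overlap=1)
```

Each span would then correspond to one SequenceEmbedding row for the same sequence and configuration.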

chunk_overlap: Mapped[int]
chunk_size: Mapped[int]
created_at: Mapped[datetime]
description: Mapped[str | None]
display_name: Mapped[str | None]
family: Mapped[str | None]
id: Mapped[UUID]
layer_agg: Mapped[str]
layer_indices: Mapped[list[Any]]
max_length: Mapped[int]
model_backend: Mapped[str]
model_name: Mapped[str]
normalize: Mapped[bool]
normalize_residues: Mapped[bool]
param_count: Mapped[int | None]
pooling: Mapped[str]
use_chunking: Mapped[bool]

class protea.infrastructure.orm.models.embedding.sequence_embedding.SequenceEmbedding(**kwargs)

Bases: Base

Stores a computed embedding for a sequence under a specific EmbeddingConfig.

One row per (sequence, config, chunk_start). When chunking is disabled chunk_index_s=0 and chunk_index_e=None (NULL in the DB), meaning the embedding covers the full sequence. When chunking is enabled, each chunk produces a separate row identified by its start/end residue indices.

Proteins sharing the same amino-acid sequence share one set of embedding rows per config (deduplicated at the Sequence level).

Full traceability: embedding_config records the exact model, transformer layers, aggregation strategy, pooling, normalisation, and chunking parameters used.

chunk_index_e: Mapped[int | None]
chunk_index_s: Mapped[int]
created_at: Mapped[datetime]
embedding: Mapped[Any]
embedding_config: Mapped[EmbeddingConfig]
embedding_config_id: Mapped[uuid.UUID]
embedding_dim: Mapped[int]
id: Mapped[int]
sequence: Mapped[Sequence]
sequence_id: Mapped[int]

GO Prediction Features

GOPredictionFeatures stores the per-prediction feature vector used by the re-ranker as a separate table, normalised away from GOPrediction to keep the hot predictions table slim. Each row carries the full 56-column feature set produced by the feature-enrichment pipeline.

Dual-write helpers for the GOPrediction.features JSONB column.

T3.1a of the executor plan ships the JSONB column as a unidirectional write target: every typed feature column on GOPrediction is mirrored into the blob at insert time. Readers stay on the typed columns until T3.1b. from_json round-trips the blob into a feature dict so the T3.1b cut-over can land without changing the helper’s contract.

JSONB key naming follows the typed column names (snake_case, identical), so a future reader cut-over is a column-rename-free swap.

protea.infrastructure.orm.models.embedding.go_prediction_features.FEATURE_JSONB_KEYS: tuple[str, ...] = ('distance', 'qualifier', 'evidence_code', 'identity_nw', 'similarity_nw', 'alignment_score_nw', 'gaps_pct_nw', 'alignment_length_nw', 'identity_sw', 'similarity_sw', 'alignment_score_sw', 'gaps_pct_sw', 'alignment_length_sw', 'length_query', 'length_ref', 'vote_count', 'k_position', 'go_term_frequency', 'ref_annotation_density', 'neighbor_distance_std', 'neighbor_vote_fraction', 'neighbor_min_distance', 'neighbor_mean_distance', 'query_taxonomy_id', 'ref_taxonomy_id', 'taxonomic_lca', 'taxonomic_distance', 'taxonomic_common_ancestors', 'taxonomic_relation', 'anc2vec_neighbor_cos', 'anc2vec_neighbor_maxcos', 'anc2vec_has_emb', 'anc2vec_query_known_cos', 'anc2vec_query_known_maxcos', 'anc2vec_query_known_count', 'tax_voters_same_frac', 'tax_voters_close_frac', 'tax_voters_mean_common_ancestors', 'emb_pca_query_0', 'emb_pca_query_1', 'emb_pca_query_2', 'emb_pca_query_3', 'emb_pca_query_4', 'emb_pca_query_5', 'emb_pca_query_6', 'emb_pca_query_7', 'emb_pca_query_8', 'emb_pca_query_9', 'emb_pca_query_10', 'emb_pca_query_11', 'emb_pca_query_12', 'emb_pca_query_13', 'emb_pca_query_14', 'emb_pca_query_15')

Canonical feature keys mirrored into GOPrediction.features. Order is stable so JSONB dumps diff predictably.

protea.infrastructure.orm.models.embedding.go_prediction_features.build_feature_jsonb(typed_row: Mapping[str, Any]) dict[str, Any]

Mirror the typed-column values into a JSONB-shaped dict.

Single source of truth for the T3.1a dual-write: every writer hands its already-cleaned GOPrediction row dict to this helper, and the result is what lands in the features JSONB column.

Identity columns (prediction_set_id, protein_accession …) are dropped because they are already keys on the row; only the feature signals are mirrored. Keys absent from typed_row map to None so the blob shape stays stable across writer sites.

protea.infrastructure.orm.models.embedding.go_prediction_features.from_json(blob: Mapping[str, Any] | None) dict[str, Any]

Round-trip a JSONB blob back into a feature dict.

Reserved for the T3.1b reader cut-over: readers will call this on the JSONB column and get the same shape build_feature_jsonb produced. None (legacy / pre-dual-write rows) returns an empty dict so callers can treat missing as “no features”.
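
The contract described for the two helpers can be sketched as below. This is a re-statement of the documented behaviour, not the module's source: the _sketch function names are hypothetical and KEYS is a shortened stand-in for FEATURE_JSONB_KEYS.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any

# Shortened stand-in for FEATURE_JSONB_KEYS; the full tuple is listed above.
KEYS: tuple[str, ...] = ("distance", "qualifier", "evidence_code", "identity_nw")


def build_feature_jsonb_sketch(typed_row: Mapping[str, Any]) -> dict[str, Any]:
    """Keep only feature keys; absent keys map to None so the blob shape stays stable."""
    return {key: typed_row.get(key) for key in KEYS}


def from_json_sketch(blob: Mapping[str, Any] | None) -> dict[str, Any]:
    """Return the blob as a plain dict; None (pre-dual-write rows) becomes {}."""
    return dict(blob) if blob is not None else {}


row = {"prediction_set_id": "abc", "protein_accession": "P12345", "distance": 0.12}
blob = build_feature_jsonb_sketch(row)
```

Identity columns such as prediction_set_id drop out simply because they are not in the key tuple.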

Predictions

PredictionSet is the result container for one run of predict_go_terms. It links the query set, embedding configuration, annotation set, and ontology snapshot used, making every prediction set fully reproducible. GOPrediction stores one row per (query protein, GO term, reference protein) triple. The 14 optional feature-engineering columns (alignment statistics and taxonomy fields) and 5 re-ranker aggregate features (vote_count, k_position, go_term_frequency, ref_annotation_density, neighbor_distance_std) are NULL unless the corresponding flags were set in the prediction payload.

class protea.infrastructure.orm.models.embedding.prediction_set.PredictionSet(**kwargs)

Bases: Base

Groups GO predictions from a single prediction run.

Records which EmbeddingConfig was used for similarity search and which AnnotationSet was used as the reference, providing complete traceability for every predicted GO term.

annotation_set: Mapped[AnnotationSet]
annotation_set_id: Mapped[uuid.UUID]
created_at: Mapped[datetime]
distance_threshold: Mapped[float | None]
embedding_config: Mapped[EmbeddingConfig]
embedding_config_id: Mapped[uuid.UUID]
id: Mapped[uuid.UUID]
limit_per_entry: Mapped[int]
meta: Mapped[dict[str, Any]]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
predictions: Mapped[list[GOPrediction]]
query_set: Mapped[QuerySet | None]
query_set_id: Mapped[uuid.UUID | None]

class protea.infrastructure.orm.models.embedding.go_prediction.GOPrediction(**kwargs)

Bases: Base

One predicted GO term for a protein within a prediction set.

The prediction is derived by transferring annotations from the nearest reference protein (ref_protein_accession) in embedding space. The distance field records the cosine distance to that neighbor, which serves as a proxy for prediction confidence (lower = more similar).

alignment_length_nw: Mapped[float | None]
alignment_length_sw: Mapped[float | None]
alignment_score_nw: Mapped[float | None]
alignment_score_sw: Mapped[float | None]
anc2vec_has_emb: Mapped[float | None]
anc2vec_neighbor_cos: Mapped[float | None]
anc2vec_neighbor_maxcos: Mapped[float | None]
anc2vec_query_known_cos: Mapped[float | None]
anc2vec_query_known_count: Mapped[float | None]
anc2vec_query_known_maxcos: Mapped[float | None]
distance: Mapped[float]
emb_pca_query_0: Mapped[float | None]
emb_pca_query_1: Mapped[float | None]
emb_pca_query_10: Mapped[float | None]
emb_pca_query_11: Mapped[float | None]
emb_pca_query_12: Mapped[float | None]
emb_pca_query_13: Mapped[float | None]
emb_pca_query_14: Mapped[float | None]
emb_pca_query_15: Mapped[float | None]
emb_pca_query_2: Mapped[float | None]
emb_pca_query_3: Mapped[float | None]
emb_pca_query_4: Mapped[float | None]
emb_pca_query_5: Mapped[float | None]
emb_pca_query_6: Mapped[float | None]
emb_pca_query_7: Mapped[float | None]
emb_pca_query_8: Mapped[float | None]
emb_pca_query_9: Mapped[float | None]
evidence_code: Mapped[str | None]
features: Mapped[dict | None]
gaps_pct_nw: Mapped[float | None]
gaps_pct_sw: Mapped[float | None]
go_term: Mapped[GOTerm]
go_term_frequency: Mapped[int | None]
go_term_id: Mapped[int]
id: Mapped[int]
identity_nw: Mapped[float | None]
identity_sw: Mapped[float | None]
k_position: Mapped[int | None]
length_query: Mapped[int | None]
length_ref: Mapped[int | None]
neighbor_distance_std: Mapped[float | None]
neighbor_mean_distance: Mapped[float | None]
neighbor_min_distance: Mapped[float | None]
neighbor_vote_fraction: Mapped[float | None]
prediction_set: Mapped[PredictionSet]
prediction_set_id: Mapped[uuid.UUID]
predictions_jsonb: Mapped[dict | None]
protein_accession: Mapped[str]
qualifier: Mapped[str | None]
query_taxonomy_id: Mapped[int | None]
ref_annotation_density: Mapped[int | None]
ref_protein_accession: Mapped[str]
ref_taxonomy_id: Mapped[int | None]
similarity_nw: Mapped[float | None]
similarity_sw: Mapped[float | None]
tax_voters_close_frac: Mapped[float | None]
tax_voters_mean_common_ancestors: Mapped[float | None]
tax_voters_same_frac: Mapped[float | None]
taxonomic_common_ancestors: Mapped[int | None]
taxonomic_distance: Mapped[int | None]
taxonomic_lca: Mapped[int | None]
taxonomic_relation: Mapped[str | None]
vote_count: Mapped[int | None]

Re-ranker Models

RerankerModel stores a trained LightGBM re-ranker for re-scoring GO term predictions. The booster itself can live either inline in the legacy model_data (Text, nullable) column or by reference in artifact_uri (String(512)) resolved through an ArtifactStore. New rows registered via scripts/register_reranker.py from a protea-reranker-lab run always use the artifact-backed path.

Provenance columns travel with every artifact-backed row: feature_schema_sha (String(16), load-bearing at inference time), embedding_config_id / ontology_snapshot_id (FKs, both SET NULL), producer_version (String(64)), producer_git_sha (String(40)) and spec_yaml (Text). metrics and feature_importance remain JSONB.

class protea.infrastructure.orm.models.embedding.reranker_model.RerankerModel(**kwargs)

Bases: Base

A trained LightGBM re-ranker model.

The booster can be stored inline (model_data, legacy) or by reference (artifact_uri, preferred). Rows registered through scripts/register_reranker.py always point at the artifact store; older rows still serialize the booster inline.

Provenance columns (feature_schema_sha, producer_version, producer_git_sha, spec_yaml) let us reproduce and audit a model without re-running the lab. feature_schema_sha is load-bearing at inference time: the predict operation refuses to use a booster whose expected feature schema does not match the live pipeline (fallback to no-reranking).
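
The schema guard can be pictured as follows. How PROTEA actually derives feature_schema_sha is not documented here, so the fingerprint recipe below (first 16 hex characters of a SHA-256 over the ordered feature names) is purely an assumption, as are the function names.

```python
from __future__ import annotations

import hashlib


def schema_sha(feature_names: list[str]) -> str:
    """Hypothetical fingerprint: first 16 hex chars of sha256 over the ordered names."""
    payload = "\n".join(feature_names).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]


def should_rerank(model_sha: str | None, live_features: list[str]) -> bool:
    """Use the booster only when its recorded schema matches the live pipeline."""
    return model_sha is not None and model_sha == schema_sha(live_features)


live = ["distance", "vote_count", "k_position"]
stored = schema_sha(live)
```

When should_rerank returns False, the documented behaviour is to fall back to predictions without re-ranking rather than fail.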

artifact_uri: Mapped[str | None]
aspect: Mapped[str | None]
category: Mapped[str]
created_at: Mapped[datetime]
dataset_id: Mapped[UUID | None]
embedding_config_id: Mapped[UUID | None]
evaluation_set_id: Mapped[UUID | None]
external_source: Mapped[str | None]
feature_importance: Mapped[dict[str, Any]]
feature_schema_sha: Mapped[str | None]
id: Mapped[UUID]
metrics: Mapped[dict[str, Any]]
model_data: Mapped[str | None]
name: Mapped[str]
ontology_snapshot_id: Mapped[UUID | None]
prediction_set_id: Mapped[UUID | None]
producer_git_sha: Mapped[str | None]
producer_version: Mapped[str | None]
schema_sha_v2: Mapped[str | None]
spec_yaml: Mapped[str | None]

Scoring Configurations

ScoringConfig defines a set of feature weights and parameters for scoring GO predictions. Each config is a named, immutable recipe that can be applied to any prediction set to produce a composite score per prediction row.

ORM model for ScoringConfig — reproducible scoring formulas for GOPrediction rows.

A ScoringConfig stores two complementary layers of configuration:

  1. Signal weights (weights field): a dict mapping each composite signal (e.g. embedding_similarity, identity_nw) to a relative weight [0, 1]. Missing signals — because the corresponding feature-engineering flag was not enabled at prediction time — are automatically excluded from the denominator, so the remaining active signals always produce a normalised [0, 1] score.

  2. Evidence-code weights (evidence_weights field, optional JSONB): a per-GO-evidence-code quality multiplier, also in [0, 1]. When None, DEFAULT_EVIDENCE_WEIGHTS is used as the fallback. Supplying a partial dict overrides only the codes present; codes absent from the dict fall back to the system default, making partial overrides safe.

This two-layer design separates how much a signal matters (signal weights) from how trustworthy the underlying annotation is (evidence weights), which are independent research decisions.

Formulas

linear

score = Σ(w_i · s_i) / Σ(w_i) for all active (w_i > 0, s_i available) signals.

evidence_weighted

Same as linear but the resolved evidence weight is always applied as a final multiplier on top of the weighted sum — even when its signal weight is 0. This allows down-ranking IEA-sourced predictions regardless of how strong the embedding or alignment signals are.

Recommended usage: set evidence_weight = 0 in weights when using this formula. The multiplier is always applied, so including evidence_weight in the linear sum compounds the evidence signal twice — usually not intentional. If you want a preset that only vetoes low-evidence predictions without bending the rest of the ranking, use formula=evidence_weighted with evidence_weight=0 in weights.
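
A minimal sketch of the two formulas, assuming signals are floats in [0, 1] and that a missing signal is represented by None. The function names and the choice to return 0.0 when no signal is active are assumptions, not the module's implementation.

```python
from __future__ import annotations


def linear_score(weights: dict[str, float], signals: dict[str, float | None]) -> float:
    """Weighted mean over active signals (weight > 0 and value available)."""
    active = [
        (w, signals[name])
        for name, w in weights.items()
        if w > 0 and signals.get(name) is not None
    ]
    total = sum(w for w, _ in active)
    if total == 0:
        return 0.0  # assumption: no active signal yields a score of 0
    return sum(w * s for w, s in active) / total


def evidence_weighted_score(weights, signals, evidence_multiplier: float) -> float:
    """Linear score with the resolved evidence weight applied as a final multiplier."""
    return linear_score(weights, signals) * evidence_multiplier


weights = {"embedding_similarity": 1.0, "identity_nw": 1.0, "evidence_weight": 0.0}
signals = {"embedding_similarity": 0.8, "identity_nw": None, "evidence_weight": 0.3}
base = linear_score(weights, signals)
final = evidence_weighted_score(weights, signals, evidence_multiplier=0.5)
```

Here identity_nw is unavailable and evidence_weight has weight 0, so only embedding_similarity enters the denominator; the evidence multiplier then halves the result.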

protea.infrastructure.orm.models.embedding.scoring_config.DEFAULT_EVIDENCE_WEIGHT_FALLBACK: float = 0.5

Fallback weight applied when a code is not found in any lookup table.
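
The lookup order implied by the text (per-config overrides, then system defaults, then the fallback) can be sketched like this. DEFAULTS_SKETCH holds made-up values standing in for DEFAULT_EVIDENCE_WEIGHTS, whose real contents are not shown in this section, and resolve_evidence_weight is a hypothetical name.

```python
# Made-up stand-in for DEFAULT_EVIDENCE_WEIGHTS (real values not shown here).
DEFAULTS_SKETCH = {"IDA": 1.0, "ISS": 0.7, "IEA": 0.3}
FALLBACK = 0.5  # mirrors DEFAULT_EVIDENCE_WEIGHT_FALLBACK


def resolve_evidence_weight(code, overrides=None):
    """Look up per-config overrides first, then defaults, then the fallback."""
    if overrides and code in overrides:
        return overrides[code]
    return DEFAULTS_SKETCH.get(code, FALLBACK)


partial = {"IEA": 0.1}  # a partial override touches only the codes it names
```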

protea.infrastructure.orm.models.embedding.scoring_config.EVIDENCE_CODE_GROUPS: dict[str, list[str]] = {'Computational / Phylogenetic': ['ISS', 'ISO', 'ISA', 'ISM', 'IGC', 'IBA', 'IBD', 'IKR', 'IRD', 'RCA'], 'Electronic': ['NAS', 'IEA'], 'Experimental': ['EXP', 'IDA', 'IPI', 'IMP', 'IGI', 'IEP', 'HTP', 'HDA', 'HMP', 'HGI', 'HEP', 'IC', 'TAS'], 'No data': ['ND']}

Ordered grouping of evidence codes used for UI rendering and documentation. Preserves the biological meaning of each tier.

class protea.infrastructure.orm.models.embedding.scoring_config.ScoringConfig(**kwargs)

Bases: Base

Persistent scoring formula definition.

Instances are stored in the scoring_config table and referenced by evaluation endpoints and the UI scoring selector. Every field that influences score computation is serialised, making any result fully reproducible by re-applying the same ScoringConfig to the raw GOPrediction rows.

The formula column must be one of VALID_FORMULAS. The weights JSONB maps signal keys (see DEFAULT_WEIGHTS) to their relative weights — a weight of 0 deactivates the signal. The optional evidence_weights JSONB maps GO evidence codes to per-code quality multipliers in [0, 1] and falls back to DEFAULT_EVIDENCE_WEIGHTS for absent codes.

created_at: Mapped[datetime]
description: Mapped[str | None]
evidence_weights: Mapped[dict[str, Any] | None]
formula: Mapped[str]
id: Mapped[UUID]
name: Mapped[str]
weights: Mapped[dict[str, Any]]

Datasets

Dataset is the durable handle for a frozen re-ranker training dataset published to the artifact store. One row per export_research_dataset run that completes successfully; protea-reranker-lab’s pull_dataset.py resolves either a UUID or a human name against this table to fetch the exact train.parquet / eval.parquet / manifest.json triple that produced a given booster.

Storage is backend-agnostic: train_uri / eval_uri / manifest_uri are opaque URIs (file://… for the local backend, s3://bucket/key for MinIO) resolved through the ArtifactStore interface; callers never need to know which backend is active. Two content fingerprints provide drift detection: schema_sha (16-char) records the feature-set version (must match the booster’s feature_schema_sha at inference time) and manifest_sha (64-char) is the sha256 of the serialized manifest bytes. Provenance lives in producer_version / producer_git_sha so any registered booster can be traced back to the PROTEA HEAD that emitted its dataset.

class protea.infrastructure.orm.models.embedding.dataset.Dataset(**kwargs)

Bases: Base

A frozen re-ranker dataset published to the artifact store.

One row per export_research_dataset (or dump_reranker_dataset) run that completes successfully. The row is the durable handle the lab uses to pull an exact dump — lookups by name (human label) or id (UUID) both resolve here.

train_uri / eval_uri / manifest_uri are opaque URIs in the backend scheme (file://…, s3://bucket/key). The caller does not need to know the backend; ArtifactStore resolves them.

Content addressing: schema_sha fingerprints the feature set (must match the booster’s feature_schema_sha at inference) and manifest_sha is the sha256 of the serialized manifest bytes — two independent dumps with identical provenance will collide on this field, making drift detectable.
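
For reference, a 64-character manifest fingerprint of this kind can be computed with the standard library as shown below. The manifest content is invented, and serialising with sorted keys is an assumption made so that the digest is deterministic; the source only states that manifest_sha is the sha256 of the serialized manifest bytes.

```python
import hashlib
import json

manifest = {"name": "demo", "k": 5, "schema_sha": "0123456789abcdef"}  # made-up content

# Assumption: the manifest is serialised deterministically before hashing.
manifest_bytes = json.dumps(manifest, sort_keys=True).encode("utf-8")
manifest_sha = hashlib.sha256(manifest_bytes).hexdigest()
```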

annotation_source: Mapped[str]
created_at: Mapped[datetime]
embedding_config_id: Mapped[UUID | None]
eval_snapshot_pair: Mapped[str | None]
eval_uri: Mapped[str | None]
id: Mapped[UUID]
job_id: Mapped[UUID | None]
k: Mapped[int]
key_prefix: Mapped[str]
manifest_sha: Mapped[str | None]
manifest_uri: Mapped[str]
meta: Mapped[dict[str, Any]]
n_eval_rows: Mapped[int]
n_train_rows: Mapped[int]
name: Mapped[str]
ontology_snapshot_id: Mapped[UUID | None]
operation: Mapped[str]
producer_git_sha: Mapped[str | None]
producer_version: Mapped[str | None]
schema_sha: Mapped[str]
schema_sha_v2: Mapped[str | None]
storage_backend: Mapped[str]
train_snapshot_pairs: Mapped[list[str]]
train_uri: Mapped[str | None]

API Keys

ApiKey stores hashed API keys together with a name label, an 8-character prefix, and a last_used_at timestamp. The raw key is never stored; only the SHA-256 hex digest is persisted. The revoked_at tombstone allows revocation without deletion.

ApiKey ORM model — first iteration of authn (T5.6a).

Stores hashed API keys for HTTP authentication of the most sensitive PROTEA endpoints (POST /jobs, POST /datasets, the reranker import paths). The raw key is shown to the caller exactly once at creation time; subsequent reads expose only the human label and the 8-character prefix (first chars of the raw key) so an operator can identify a key in listings without ever giving the actual secret back.

Storage rules (security-load-bearing)

  • key_hash: sha256 hex digest of the raw key. Lookup is by prefix (indexed) followed by a constant-time hash comparison. The plaintext key never touches the database or any log line.

  • revoked_at is a tombstone column: once set, the key is rejected by require_api_key even if the hash still matches. Revocation is irreversible (no resurrection endpoint) to avoid confusion.

  • last_used_at is best-effort. Updates happen in a background task so they never block the originating request, and a missed update (e.g. process crash between auth and BackgroundTasks.run) is acceptable: it is observability data, not a security gate.
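
The storage rules above can be sketched with the standard library. The key format (secrets.token_urlsafe) and the function names are assumptions; the parts taken from the text are the 8-character prefix, the SHA-256 hex digest, the revocation check, and the constant-time comparison.

```python
import hashlib
import hmac
import secrets


def create_key() -> tuple[str, str, str]:
    """Return (raw_key, prefix, key_hash); only prefix and hash would be persisted."""
    raw = secrets.token_urlsafe(32)
    return raw, raw[:8], hashlib.sha256(raw.encode("utf-8")).hexdigest()


def verify(presented: str, stored_hash: str, revoked: bool) -> bool:
    """Reject revoked keys, then compare digests in constant time."""
    if revoked:
        return False
    candidate = hashlib.sha256(presented.encode("utf-8")).hexdigest()
    return hmac.compare_digest(candidate, stored_hash)


raw_key, prefix, key_hash = create_key()
```

In a real lookup the prefix would first narrow the candidate rows, and verify would then run against each candidate's key_hash.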

class protea.infrastructure.orm.models.api_key.ApiKey(**kwargs)

Bases: Base

A hashed API key used to authenticate sensitive HTTP requests.

The raw key is generated server-side, returned exactly once by POST /auth/api-keys, and never stored. Persisted columns:

  • id — UUID primary key.

  • key_hash — sha256 hex digest of the raw key (64 chars).

  • prefix — first 8 chars of the raw key, used as a non-unique lookup index and as the display handle in listings.

  • name — human-readable label supplied by the caller.

  • created_at / revoked_at / last_used_at — lifecycle timestamps. revoked_at is a tombstone; last_used_at is best-effort observability.

created_at: Mapped[datetime]
id: Mapped[UUID]
key_hash: Mapped[str]
last_used_at: Mapped[datetime | None]
name: Mapped[str]
prefix: Mapped[str]
revoked_at: Mapped[datetime | None]
role: Mapped[str | None]

Support Entries

SupportEntry stores community feedback: a thumbs-up with an optional comment. Used by the /support router.

class protea.infrastructure.orm.models.support_entry.SupportEntry(**kwargs)

Bases: Base

A thumbs-up + optional comment submitted by a visitor.

comment: Mapped[str | None]
created_at: Mapped[datetime]
id: Mapped[UUID]

Query Sets

QuerySet represents a user-uploaded FASTA dataset. QuerySetEntry stores one row per FASTA entry, preserving the original accession header and linking to the deduplicated Sequence row. If the amino-acid string already exists in the database, the existing Sequence row is reused, avoiding redundant embedding computation.
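
The deduplication idea can be illustrated without a database: parse the FASTA text, then hand out one sequence id per distinct amino-acid string. Both helpers are hypothetical, the dictionary stands in for the Sequence table, and taking the first header token as the accession is an assumption.

```python
def parse_fasta(text: str) -> list[tuple[str, str]]:
    """Return (accession, sequence) pairs; the accession is the first header token."""
    entries, header, chunks = [], None, []
    for line in text.splitlines():
        line = line.strip()
        if line.startswith(">"):
            if header is not None:
                entries.append((header, "".join(chunks)))
            header, chunks = (line[1:].split() or [""])[0], []
        elif line:
            chunks.append(line)
    if header is not None:
        entries.append((header, "".join(chunks)))
    return entries


def link_entries(entries, sequence_ids: dict[str, int]) -> list[tuple[str, int]]:
    """Map each accession to a sequence id, reusing the id of an identical string."""
    linked = []
    for accession, seq in entries:
        seq_id = sequence_ids.setdefault(seq, len(sequence_ids) + 1)
        linked.append((accession, seq_id))
    return linked


fasta = ">q1 first\nMKT\nAA\n>q2\nMKTAA\n>q3\nGG\n"
known: dict[str, int] = {}
linked = link_entries(parse_fasta(fasta), known)
```

Entries q1 and q2 carry the same residues, so they end up pointing at the same sequence id and would share one set of embeddings.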

class protea.infrastructure.orm.models.query.query_set.QuerySet(**kwargs)

Bases: Base

A named collection of sequences uploaded by the user for GO term prediction.

Each uploaded FASTA file creates one QuerySet row. Entries preserve the original accession strings from the FASTA headers and link to the deduplicated Sequence rows. This allows the same physical sequence to appear in multiple query sets without duplication.

created_at: Mapped[datetime]
description: Mapped[str | None]
entries: Mapped[list[QuerySetEntry]]
id: Mapped[UUID]
name: Mapped[str]

class protea.infrastructure.orm.models.query.query_set.QuerySetEntry(**kwargs)

Bases: Base

One sequence within a QuerySet, preserving the original FASTA accession.

accession is the raw identifier from the FASTA header (may not exist in the protein table). sequence_id links to the deduplicated Sequence row used for embedding computation and similarity search.

accession: Mapped[str]
created_at: Mapped[datetime]
id: Mapped[int]
query_set: Mapped[QuerySet]
query_set_id: Mapped[uuid.UUID]
sequence: Mapped[Sequence]
sequence_id: Mapped[int]
species: Mapped[str | None]
taxonomy_id: Mapped[int | None]

Visitor Events

VisitorEvent is the append-only log used by the Grafana “unique visitors” dashboard. One row is written per HTTP GET to a non-asset path. The schema deliberately omits IP addresses: each row stores only visitor_hash, the first 16 hex chars of sha256(daily_salt || client_ip) where daily_salt is a 32-byte random value held in process memory and rotated every calendar day. Once the day rolls over the salt is gone, so cross-day correlation of a visitor becomes cryptographically infeasible. This is the same no-PII model used by Plausible and Fathom.

class protea.infrastructure.orm.models.visitor_event.VisitorEvent(**kwargs)

Bases: Base

Anonymous visit record used for aggregate traffic analytics.

Privacy model: the visitor_hash column never stores an IP address. It is the first 16 hex chars of sha256(daily_salt || client_ip) where daily_salt is a 32-byte random value kept only in process memory and regenerated every calendar day. Once the day rolls over, correlating a visitor across days becomes cryptographically infeasible. This is the same approach Plausible and Fathom use to avoid storing PII while still being able to report “unique visitors per day”.
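
A small sketch of the hashing scheme described above. The DailySalt class and the function name are hypothetical; the 32-byte in-memory salt, the daily rotation, and the 16-hex-character truncation of sha256(daily_salt || client_ip) follow the text.

```python
import hashlib
import secrets
from datetime import date


class DailySalt:
    """Hold a random 32-byte salt in memory and regenerate it when the day changes."""

    def __init__(self) -> None:
        self._day = None
        self._salt = b""

    def get(self, today: date) -> bytes:
        if today != self._day:
            # The previous salt is discarded and cannot be recovered.
            self._day, self._salt = today, secrets.token_bytes(32)
        return self._salt


def visitor_hash(salt: bytes, client_ip: str) -> str:
    """First 16 hex chars of sha256(salt || ip)."""
    return hashlib.sha256(salt + client_ip.encode("utf-8")).hexdigest()[:16]


salts = DailySalt()
today_hash = visitor_hash(salts.get(date(2024, 1, 1)), "203.0.113.7")
```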

created_at: Mapped[datetime]
day: Mapped[date]
id: Mapped[int]
method: Mapped[str]
path: Mapped[str]
status: Mapped[int]
visitor_hash: Mapped[str]

Experiment Runs

ExperimentRun is the per-research-run narrative + provenance anchor introduced in T3.8 (Fase 4). A single row aggregates multiple Job / EvaluationResult / RerankerModel rows under one human name (unique). The narrative trio (description / hypothesis / findings) mirrors Job’s T3.9 D11 columns, and the JSONB config + provenance bags are designed to receive snapshots from protea.core.provenance.capture_provenance(). The ExperimentRunStatus enum is planned, running, done or abandoned; planned (rather than queued) reflects the draft-first lifecycle of a research run, and abandoned (rather than failed) covers stops without a hard error. Linkage to sibling rows lands in T4.7-T4.9.

ExperimentRun ORM (T3.8 of master plan v3.2 §24 Fase 4).

The narrative anchor that F-EXP campaigns and the F8b Experiments page hang their per-run metadata off of. Mirrors Job’s narrative trio (description / findings / tags from T3.9) but for the broader research-run scope: a single ExperimentRun typically aggregates multiple Jobs / EvaluationResults / RerankerModels under one name.

Linkage to those sibling rows (Job FK array, EvaluationResult join, etc.) lives in follow-up tasks T4.7-T4.9 + the F-EXP campaign work so this slice stays a clean additive ORM bring-up.

class protea.infrastructure.orm.models.experiment_run.ExperimentRun(**kwargs)

Bases: Base

Per-research-run narrative + provenance anchor.

axis_tuple_shortid: Mapped[str | None]
config: Mapped[dict[str, Any]]
created_at: Mapped[datetime]
description: Mapped[str | None]
ensemble_spec: Mapped[dict[str, Any] | None]
eval_set_manifest_sha: Mapped[str | None]
eval_set_name: Mapped[str | None]
feature_schema_sha: Mapped[str | None]
findings: Mapped[str | None]
finished_at: Mapped[datetime | None]
hypothesis: Mapped[str | None]
id: Mapped[UUID]
k: Mapped[int | None]
name: Mapped[str]
plm: Mapped[str | None]
propagation: Mapped[str | None]
provenance: Mapped[dict[str, Any]]
reranker_spec_id: Mapped[str | None]
schema_sha_v2: Mapped[str | None]
started_at: Mapped[datetime | None]
status: Mapped[ExperimentRunStatus]
tags: Mapped[list[str]]

class protea.infrastructure.orm.models.experiment_run.ExperimentRunStatus(*values)

Bases: StrEnum

Lifecycle states for an ExperimentRun.

Mirrors JobStatus semantically but with planned as the initial state (Jobs default to QUEUED; experiments often live as drafts before any compute kicks off) and abandoned instead of failed (a research run can be stopped without a hard error).

ABANDONED = 'abandoned'
DONE = 'done'
PLANNED = 'planned'
RUNNING = 'running'

Logging

protea.infrastructure.logging provides structured JSON logging via a custom JSONFormatter. The configure_logging() function sets up the root logger with either JSON or plain text output, used by worker processes and the API server.

Structured logging configuration for PROTEA.

Provides a JSON formatter using only the Python standard library and a configure_logging() helper that workers and the API can call at startup.

class protea.infrastructure.logging.JSONFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)

Bases: Formatter

Formats log records as single-line JSON objects.

Each line contains at least timestamp, level, logger, and message. Any extra fields attached to the record are merged into the top-level JSON object, making it easy to add structured context (e.g. logger.info("started", extra={"queue": "protea.jobs"})).

format(record: LogRecord) str

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

protea.infrastructure.logging.configure_logging(*, json: bool = True, level: str = 'INFO') None

Configure the root logger for the process.

Parameters:
  • json – When True (the default), use JSONFormatter so that every log line is a valid JSON object. When False, fall back to the plain-text format used during local development.

  • level – Root log level name (e.g. "INFO", "DEBUG").
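
To show what single-line JSON output with merged extra fields looks like, here is a standard-library sketch. JSONFormatterSketch is a hypothetical re-implementation of the behaviour described for JSONFormatter, not the class itself; the way extra fields are detected (anything not present on a freshly built LogRecord) is an assumption.

```python
import json
import logging

# Attribute names present on every LogRecord; anything else arrived via extra=.
_STANDARD = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class JSONFormatterSketch(logging.Formatter):
    """Emit one JSON object per record, merging extra fields at the top level."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD:
                payload[key] = value
        return json.dumps(payload, default=str)


record = logging.LogRecord("protea.worker", logging.INFO, "sketch.py", 1, "started", (), None)
record.queue = "protea.jobs"  # what extra={"queue": "protea.jobs"} would attach
line = JSONFormatterSketch().format(record)
```

Attaching the formatter to a handler on the root logger is then all that a configure_logging-style helper needs to do for the JSON case.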

Artifact storage

protea.infrastructure.storage defines the ArtifactStore Protocol and its two concrete backends. It is the single surface for writing and reading large produced blobs (re-ranker boosters, frozen datasets) and is kept strictly separate from the evaluation-artifacts directory consumed by run_cafa_evaluation.

ArtifactStore is a typing.Protocol with five methods:

  • put(key: str, src: Path | bytes) -> str: store a blob under key and return its URI.

  • get(key: str) -> bytes: fetch raw bytes stored at key.

  • url(key: str) -> str: return the backend-specific URI for key without performing I/O.

  • exists(key: str) -> bool: check whether key is present.

  • delete(key: str) -> bool: remove key from the store; returns True if an object was deleted and False if the key did not exist.

URIs are always persisted verbatim in the database so consumers can resolve them without knowing the concrete backend:

  • LocalFsArtifactStore emits file:///absolute/path/... URIs.

  • MinioArtifactStore emits s3://<bucket>/<key> URIs.
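
Because ArtifactStore is a Protocol, any object with matching methods satisfies it structurally. The sketch below restates the interface (including delete(), which appears in the class reference) and adds an in-memory test double. InMemoryStore and its mem:// scheme are invented for this illustration and are not PROTEA backends.

```python
from __future__ import annotations

from pathlib import Path
from typing import Protocol


class ArtifactStoreSketch(Protocol):
    """Restatement of the documented interface for illustration."""

    def put(self, key: str, src: Path | bytes) -> str: ...
    def get(self, key: str) -> bytes: ...
    def url(self, key: str) -> str: ...
    def exists(self, key: str) -> bool: ...
    def delete(self, key: str) -> bool: ...


class InMemoryStore:
    """Hypothetical test double; no inheritance from the Protocol is needed."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put(self, key: str, src: Path | bytes) -> str:
        self._blobs[key] = src.read_bytes() if isinstance(src, Path) else bytes(src)
        return self.url(key)

    def get(self, key: str) -> bytes:
        return self._blobs[key]

    def url(self, key: str) -> str:
        return f"mem://{key}"  # invented scheme, computed without I/O

    def exists(self, key: str) -> bool:
        return key in self._blobs

    def delete(self, key: str) -> bool:
        return self._blobs.pop(key, None) is not None


store: ArtifactStoreSketch = InMemoryStore()
uri = store.put("datasets/demo/manifest.json", b"{}")
```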

The MinioArtifactStore client is imported lazily so PROTEA can be installed without the minio package; the constructor raises a clear ImportError pointing at the [storage] extra when the dependency is missing.

get_artifact_store(settings) is the entry point used by every operation that needs to write a blob. It reads settings.storage_backend ("local" or "minio") and returns the appropriate instance. If MinIO is selected but the endpoint or credentials are incomplete, or the server is unreachable at construction time, the factory logs a warning and returns a LocalFsArtifactStore rooted at settings.storage_root (or settings.artifacts_dir as a fallback). This prevents missing optional infrastructure from crashing the stack in development.
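
The fallback decision can be sketched as below. SettingsSketch and its minio_* field names are hypothetical (the real Settings fields are not listed in this section), pick_backend is an invented helper, and the reachability probe mentioned above is left out.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass

logger = logging.getLogger("storage_sketch")


@dataclass
class SettingsSketch:
    """Hypothetical subset of settings relevant to backend selection."""

    storage_backend: str = "local"
    minio_endpoint: str | None = None
    minio_access_key: str | None = None
    minio_secret_key: str | None = None


def pick_backend(settings: SettingsSketch) -> str:
    """Return 'minio' only when it is selected and fully configured."""
    if settings.storage_backend == "minio":
        required = (settings.minio_endpoint, settings.minio_access_key, settings.minio_secret_key)
        if all(required):
            return "minio"
        logger.warning("MinIO selected but not fully configured; falling back to local storage")
    return "local"


incomplete = SettingsSketch(storage_backend="minio", minio_endpoint="localhost:9000")
chosen = pick_backend(incomplete)
```

Degrading to the local store with a warning, rather than raising, keeps a development stack usable when MinIO is not running.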

Artifact storage abstraction.

An ArtifactStore is the single write/read surface for large blobs produced by PROTEA (exported datasets, reranker models, etc.) — separate from the filesystem-based artifacts_dir used by the evaluation pipeline.

Two backends are provided:

  • LocalFsArtifactStore — a plain directory on disk. Default, always available.

  • MinioArtifactStore — S3-compatible object storage (self-hosted). Requires the minio extra (pip install 'protea[storage]').

Choose a backend through get_artifact_store() which reads PROTEA_STORAGE_BACKEND / settings.storage_backend.

class protea.infrastructure.storage.ArtifactStore(*args, **kwargs)

Bases: Protocol

Minimal blob store interface.

Keys are forward-slash separated strings (foo/bar/baz.txt). URIs returned by put() / url() are backend-specific (file:///… for local, s3://bucket/key for MinIO) and should be persisted as-is in the database so consumers can resolve them without knowing the backend.

delete(key: str) bool

Remove key from the store. Returns True if an object was deleted, False if the key did not exist. Idempotent.

exists(key: str) bool

Whether key is present in the store.

get(key: str) bytes

Fetch raw bytes stored at key.

put(key: str, src: Path | bytes) str

Store src under key and return its URI.

url(key: str) str

Return the URI for key without performing I/O.

class protea.infrastructure.storage.LocalFsArtifactStore(root: Path)

Bases: object

Stores blobs under root as regular files.

URIs are file:///absolute/path/… so a consumer that reads a URI out of the DB can open(urlparse(uri).path, "rb") without any client.

delete(key: str) bool
exists(key: str) bool
get(key: str) bytes
put(key: str, src: Path | bytes) str
url(key: str) str

protea.infrastructure.storage.get_artifact_store(settings: Settings)

Filesystem-backed artifact store.

class protea.infrastructure.storage.local.LocalFsArtifactStore(root: Path)

Bases: object

Stores blobs under root as regular files.

URIs are file:///absolute/path/… so a consumer that reads a URI out of the DB can open(urlparse(uri).path, "rb") without any client.

delete(key: str) bool
exists(key: str) bool
get(key: str) bytes
put(key: str, src: Path | bytes) str
url(key: str) str

MinIO-backed artifact store.

The minio client is imported lazily so the package can be installed without it (protea[storage] opts in). A clear ImportError is raised on instantiation if the dependency is missing.

class protea.infrastructure.storage.minio_store.MinioArtifactStore(*, endpoint: str, bucket: str, access_key: str, secret_key: str, secure: bool = False)

Bases: object

S3-compatible object store (MinIO).

URIs are s3://<bucket>/<key>. The endpoint, credentials, and bucket come from settings; the caller should not need to touch them directly.

delete(key: str) bool
exists(key: str) bool
get(key: str) bytes
put(key: str, src: Path | bytes) str
url(key: str) str

Backend selection for ArtifactStore.

Reads settings.storage_backend (with the PROTEA_STORAGE_BACKEND env var override resolved by load_settings()) and returns a concrete store. Missing MinIO configuration (no endpoint/credentials) falls back to the local filesystem with a warning so dev stacks boot cleanly. An explicit backend: minio with an unreachable endpoint raises ArtifactStoreUnavailable — silent degradation here would produce Dataset rows with storage_backend=local + file://… URIs that the lab cannot resolve from another host, which is a harder bug to spot than a startup failure.

exception protea.infrastructure.storage.factory.ArtifactStoreUnavailable

Bases: RuntimeError

Raised when the configured MinIO backend cannot be reached.

protea.infrastructure.storage.factory.get_artifact_store(settings: Settings)
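The selection rules in the module description can be summarised in a small decision function. This is a minimal sketch, not the factory's actual code: SketchSettings, its minio_* field names, StoreUnavailable, and the injected is_reachable probe are all hypothetical stand-ins, and the function returns a backend name rather than constructing a store.

```python
import logging
from dataclasses import dataclass
from typing import Callable, Optional

logger = logging.getLogger("storage_factory_sketch")


class StoreUnavailable(RuntimeError):
    """Stand-in for ArtifactStoreUnavailable."""


@dataclass(frozen=True)
class SketchSettings:
    # Field names other than storage_backend are assumptions for this example.
    storage_backend: str = "local"
    minio_endpoint: Optional[str] = None
    minio_access_key: Optional[str] = None
    minio_secret_key: Optional[str] = None


def choose_backend(settings: SketchSettings, is_reachable: Callable[[str], bool]) -> str:
    """Return "local" or "minio", or raise when MinIO is configured but down."""
    if settings.storage_backend != "minio":
        return "local"
    required = (
        settings.minio_endpoint,
        settings.minio_access_key,
        settings.minio_secret_key,
    )
    if not all(required):
        # Incomplete configuration: degrade with a warning so dev stacks boot.
        logger.warning("MinIO configuration incomplete; using local filesystem")
        return "local"
    if not is_reachable(settings.minio_endpoint):
        # Explicit, complete configuration that cannot be reached: fail loudly.
        raise StoreUnavailable(f"MinIO endpoint unreachable: {settings.minio_endpoint}")
    return "minio"


full = SketchSettings("minio", "minio.example:9000", "access", "secret")
```

Injecting the reachability probe keeps the rule testable without a running MinIO server.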

Queue

The queue layer provides two classes: QueueConsumer and OperationConsumer.

QueueConsumer reads a job UUID from a RabbitMQ queue and delegates to BaseWorker.handle_job(). It is used for queues where every message corresponds to a tracked Job row: protea.ping, protea.jobs, and protea.embeddings. Before dispatching, QueueConsumer checks whether the Job row is already in CANCELLED state and, if so, nacks the delivery with requeue=False. This prevents pointless worker execution on stale pre-queued messages and avoids a prefetch=1 deadlock on queues full of orphaned cancellations (T-INFRA.NACK, PR #373).

OperationConsumer reads a raw serialised operation payload from the queue and executes it directly, without creating a Job row. It is used for high-throughput batch queues (protea.embeddings.batch, protea.embeddings.write, protea.predictions.batch, protea.predictions.write) where creating thousands of child Job rows would bloat the database. Progress is tracked at the parent job level only, via the atomic progress_current increment.
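One common way to make such a counter update atomic is to let the database do the arithmetic in a single UPDATE, so concurrent workers never read a stale value and write it back. The sketch below demonstrates the idea with the standard-library sqlite3 module purely to stay self-contained; PROTEA uses PostgreSQL through SQLAlchemy, and the table name job and the helper bump_progress are assumptions made for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id TEXT PRIMARY KEY, progress_current INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO job (id) VALUES ('parent-1')")


def bump_progress(connection: sqlite3.Connection, job_id: str, delta: int = 1) -> None:
    """Increment the parent's counter inside the database, in one statement."""
    with connection:  # commits on success, rolls back on error
        connection.execute(
            "UPDATE job SET progress_current = progress_current + ? WHERE id = ?",
            (delta, job_id),
        )


# Three batch messages finishing, each reporting one unit of progress.
for _ in range(3):
    bump_progress(conn, "parent-1")

progress = conn.execute(
    "SELECT progress_current FROM job WHERE id = 'parent-1'"
).fetchone()[0]
```

The increment expression is evaluated by the database rather than in Python, which is what avoids the read-modify-write race.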

The publisher module provides publish_job() and publish_operation() helpers. Both are called by BaseWorker after the DB commit (not before), so a worker that receives the message always finds the committed row when it tries to claim it.
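The ordering guarantee can be illustrated with a small sketch in which the commit and publish steps are passed in as callables. commit_then_publish is a hypothetical helper, not a BaseWorker method; the point is only that publishing is never reached if the commit fails.

```python
from typing import Callable
from uuid import UUID, uuid4


def commit_then_publish(
    commit: Callable[[], None],
    publish: Callable[[UUID], None],
    job_id: UUID,
) -> None:
    """Make the row durable first, then announce it on the queue."""
    commit()         # if this raises, nothing is published
    publish(job_id)  # consumers can now rely on the row existing


calls: list[str] = []
job_id = uuid4()
commit_then_publish(
    lambda: calls.append("commit"),
    lambda jid: calls.append(f"publish:{jid}"),
    job_id,
)
```

Reversing the two calls would open a window in which a fast consumer receives the message before the row is visible.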

protea.infrastructure.queue.publisher.publish_job(amqp_url: str, queue_name: str, job_id: UUID) None

Publish a job dispatch message {"job_id": "<uuid>"} to a queue.

protea.infrastructure.queue.publisher.publish_operation(amqp_url: str, queue_name: str, payload: dict[str, Any]) None

Publish an ephemeral operation message to a queue.

Unlike publish_job, publishes a full payload dict consumed by OperationConsumer. Expected format:

{
    "operation": "<name>",
    "job_id":    "<parent-uuid>",
    "payload":   { ... operation-specific fields ... },
}
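A message in the format above could be assembled as in the following sketch. The helper build_operation_message, the operation name, and the payload fields are hypothetical, and JSON with UTF-8 encoding is an assumption about the wire format that this page does not confirm for operation messages.

```python
import json
from typing import Any
from uuid import UUID


def build_operation_message(operation: str, job_id: UUID, payload: dict[str, Any]) -> bytes:
    """Wrap operation-specific fields in the three-key envelope."""
    envelope = {
        "operation": operation,
        "job_id": str(job_id),  # UUIDs are not JSON-serialisable as-is
        "payload": payload,
    }
    return json.dumps(envelope).encode("utf-8")


body = build_operation_message(
    "embeddings.batch",  # hypothetical operation name
    UUID("12345678-1234-5678-1234-567812345678"),
    {"sequence_ids": [1, 2, 3]},  # hypothetical payload fields
)
```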

protea.infrastructure.queue.publisher.safe_republish_job(amqp_url: str, queue_name: str, job_id: UUID) None

Re-publish a re-queued job; logs but does NOT raise on failure.

Used by the SIGTERM grace watchdog after BaseWorker.requeue_on_shutdown() flips the row back to QUEUED. Failures fall through to the StaleJobReaper via the lease-expiry path so a flaky RabbitMQ never leaves the row stranded.

class protea.infrastructure.queue.consumer.ConsumerOptions(prefetch_count: int = 1, requeue_on_failure: bool = False)

Bases: NamedTuple

Tunable knobs shared by QueueConsumer / OperationConsumer.

Bundles the two AMQP-side tunables (prefetch_count and requeue_on_failure) so consumer constructors stay under the §3 6-param ceiling. Call sites can pass ConsumerOptions(...) or omit it to accept the defaults (prefetch=1, no requeue on failure).

prefetch_count: int

Alias for field number 0

requeue_on_failure: bool

Alias for field number 1
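Since ConsumerOptions is a NamedTuple, an instance built with the defaults compares equal to the plain tuple (1, False), which is why the constructor signatures render the default that way. The sketch below reproduces the documented fields and defaults under a hypothetical name, ConsumerOptionsSketch, so it does not depend on PROTEA being installed.

```python
from typing import NamedTuple


class ConsumerOptionsSketch(NamedTuple):
    """Illustrative copy of the two documented fields and their defaults."""

    prefetch_count: int = 1
    requeue_on_failure: bool = False


defaults = ConsumerOptionsSketch()
tuned = ConsumerOptionsSketch(prefetch_count=8)

# Fields are reachable by name or by position ("field number 0" and "1").
first_field = tuned[0]
```

Overriding one field by keyword leaves the other at its default, so call sites only spell out what they change.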

class protea.infrastructure.queue.consumer.OperationConsumer(amqp_url: str, queue_name: str, registry: OperationRegistry, session_factory: sessionmaker[Session], *, options: ConsumerOptions = (1, False))

Bases: object

RabbitMQ consumer for ephemeral operation messages.

Unlike QueueConsumer (which manages the full Job lifecycle via BaseWorker), this consumer handles lightweight operation messages that have no DB Job row of their own. Workers process the operation, write results directly to the DB, and atomically update the parent Job’s progress counter.

Expected message format:

{
    "operation": "<operation-name>",
    "job_id":    "<parent-job-uuid>",
    "payload":   { ... operation-specific fields ... }
}
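On the consuming side, routing a message of this shape might look like the sketch below. It is an assumption-laden illustration: a plain dict stands in for OperationRegistry (whose API is not shown on this page), dispatch_operation is a hypothetical helper, and JSON decoding is assumed.

```python
import json
from typing import Any, Callable
from uuid import UUID

Handler = Callable[[UUID, dict[str, Any]], None]


def dispatch_operation(body: bytes, handlers: dict[str, Handler]) -> str:
    """Validate the envelope, then call the handler registered for it."""
    message = json.loads(body)
    if not isinstance(message, dict):
        raise ValueError("operation message must be a JSON object")
    missing = {"operation", "job_id", "payload"} - message.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    name = message["operation"]
    if name not in handlers:
        raise KeyError(f"unknown operation: {name}")
    # The handler receives the parent job id so it can report progress.
    handlers[name](UUID(message["job_id"]), message["payload"])
    return name


seen: list[tuple[UUID, dict[str, Any]]] = []
handlers: dict[str, Handler] = {"demo.op": lambda job_id, payload: seen.append((job_id, payload))}
raw = json.dumps(
    {"operation": "demo.op", "job_id": "12345678-1234-5678-1234-567812345678", "payload": {"n": 1}}
).encode("utf-8")
handled = dispatch_operation(raw, handlers)
```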

run() None

class protea.infrastructure.queue.consumer.QueueConsumer(amqp_url: str, queue_name: str, worker: BaseWorker, *, options: ConsumerOptions = (1, False))

Bases: object

Thin RabbitMQ consumer that delegates job execution to BaseWorker.

Responsibilities are strictly limited to transport concerns:

  • Connect to RabbitMQ and declare the queue.

  • Receive messages containing a JSON {"job_id": "<uuid>"} body.

  • Call BaseWorker.handle_job(job_id) for each valid message.

  • Ack on success, nack on failure or invalid message.

  • Graceful shutdown on SIGINT / SIGTERM.

All business logic, DB state transitions, and event emission happen inside BaseWorker — this class knows nothing about operations.
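The per-message decision, including the CANCELLED pre-check described in the Queue overview, can be sketched as a pure function that returns what to do with the delivery. This is not the consumer's source: decide, get_status, and the returned (action, requeue) tuple are hypothetical, and dropping invalid messages without requeue is an assumption, since the page only says they are nacked.

```python
import json
from typing import Callable
from uuid import UUID


def decide(
    body: bytes,
    get_status: Callable[[UUID], str],
    handle_job: Callable[[UUID], None],
    requeue_on_failure: bool = False,
) -> tuple[str, bool]:
    """Return (action, requeue) for one delivery."""
    try:
        job_id = UUID(json.loads(body)["job_id"])
    except (ValueError, KeyError, TypeError, AttributeError):
        # Malformed body: assumed to be dropped rather than redelivered.
        return ("nack", False)
    if get_status(job_id) == "CANCELLED":
        # Stale message for a cancelled job: drop without running the worker.
        return ("nack", False)
    try:
        handle_job(job_id)
    except Exception:
        return ("nack", requeue_on_failure)
    return ("ack", False)


valid = json.dumps({"job_id": "12345678-1234-5678-1234-567812345678"}).encode("utf-8")
ran: list[UUID] = []
cancelled_result = decide(valid, lambda job_id: "CANCELLED", ran.append)
success_result = decide(valid, lambda job_id: "QUEUED", ran.append)
```

Keeping the decision separate from the AMQP calls makes the transport rules easy to unit test without a broker.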

run() None

See also