
Marconi

Marconi is the in-memory signal-mesh service that replaced signal_service.py’s synchronous Redis-and-PG hot path in May 2026. Sender shim → Marconi → receiver shim. That is the whole signal flow. Everything else (durable cache, audit archive, OTEL) sits downstream of Marconi as an asynchronous fan-out and never blocks delivery.

If you want to understand how to use the signal mesh, read Signal Mesh first. This page is the architectural deep-dive: what Marconi owns, what tables it keeps, why Redis came off the hot path, and where the stage ladder stands today.

The canonical specification is SPEC-101 v0.4. The loss budget and recovery invariants are in SPEC-101 loss budget + recovery. The operator-facing runbook is Marconi disaster recovery.

The architectural lock

The architecture is locked by two operator quotes (Frank, 2026-05-10):
“The signal flow is sender shim → Marconi → receiver shim. That is it. Marconi then pushes signal to Redis, then Redis pushes to PG. Redis is by no means in the signal flow.”
“Marconi switch in-mem → Marconi Cache 7d → PG audit. Once it hits cache it immediately goes to PG.”
Three tiers, two flows, zero Redis on the signal-delivery path, zero MCP-to-PG shortcuts. The v0.4 topology is final for single-instance backends; cross-instance routing remains reserved (MARCONI_CROSS_INSTANCE_FORWARD) for a future revision.

What Marconi replaces

The legacy paths used Redis pubsub as an in-process message bus:
  • The sender thread did PUBLISH session_channel
  • The recipient’s WebSocket handler did SUBSCRIBE session_channel
  • A forwarding loop pumped messages from Redis pubsub onto the actual WebSocket
  • A synchronous INSERT into signal_queue ran on the hot path before the API returned 200
This worked correctly but added a Redis round-trip for every signal even when sender and receiver were in the same backend process, which they were in the single-process deployment. The synchronous PG write tied delivery latency to disk I/O, and any Redis blip cascaded into signal-delivery latency. The loopback diagnostic (SPEC-100, PR #276) and postmortem 1736e40d made the drift visible.

Marconi eliminates the pubsub layer entirely. The receiver’s WebSocket handler registers itself with Marconi’s routing table at connect time (storing the WS handle or a per-session asyncio.Queue alongside the SessionRecord). The sender’s prism_signal looks up the recipient in Marconi’s routing table and pushes directly to the WS handle. No Redis publish, no Redis subscribe, no forwarding loop, no synchronous PG write.

Module path: backend/app/marconi/. Log prefix: marconi:. Metric namespace: marconi_*. The MCP verb surface (prism_signal, prism_signal_ack, prism_signal_trace, prism_signals_pending, prism_signal_recall) is unchanged: Marconi sits behind a stable external API.

In-memory tables (Marconi-owned)

All tables are per-process, hash-keyed, and protected by asyncio.Lock.

Routing table

Key | Value
(tenant_id, project_id, identity) | LiveSession{ session_id, surface, machine_id, ws_handle, registered_at, last_heartbeat }
Lookup is O(1). Updated on register, deregister, heartbeat, WS connect, WS disconnect. Read on every send_signal.

Registration table

Key | Value
session_id | Registration{ tenant_id, project_id, identity, surface, machine_id, registered_at, last_heartbeat, ws_handle }
Sister index of routing; allows session-id-keyed lookups for heartbeat, deregister, and WS-attach.
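The relationship between the two tables can be sketched as a pair of dictionaries updated under one lock. This is a minimal illustration, not Marconi's actual code: the SessionEntry and SessionTables names are hypothetical, and sharing a single record object between both indexes is an assumption made to keep the two views consistent.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple

RouteKey = Tuple[str, str, str]  # (tenant_id, project_id, identity)


@dataclass
class SessionEntry:
    """Hypothetical record shared by both indexes (an assumption)."""
    session_id: str
    tenant_id: str
    project_id: str
    identity: str
    surface: str
    machine_id: str
    ws_handle: Optional[Any] = None
    registered_at: float = field(default_factory=time.time)
    last_heartbeat: float = field(default_factory=time.time)


class SessionTables:
    """Routing table plus its session-id-keyed sister index."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.routing: Dict[RouteKey, SessionEntry] = {}
        self.registration: Dict[str, SessionEntry] = {}

    async def register(self, entry: SessionEntry) -> None:
        key = (entry.tenant_id, entry.project_id, entry.identity)
        async with self._lock:
            previous = self.routing.get(key)
            if previous is not None:
                # Drop the stale session so the two indexes stay in step.
                self.registration.pop(previous.session_id, None)
            self.routing[key] = entry
            self.registration[entry.session_id] = entry

    async def heartbeat(self, session_id: str) -> bool:
        async with self._lock:
            entry = self.registration.get(session_id)
            if entry is None:
                return False
            entry.last_heartbeat = time.time()
            return True

    async def deregister(self, session_id: str) -> None:
        async with self._lock:
            entry = self.registration.pop(session_id, None)
            if entry is None:
                return
            key = (entry.tenant_id, entry.project_id, entry.identity)
            # Only remove the route if it still points at this session.
            if self.routing.get(key) is entry:
                del self.routing[key]
```

Because both dictionaries hold the same object, a heartbeat applied through the session-id index is immediately visible to a routing lookup.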

WS handle storage

Each Marconi entry holds either the live WebSocket object directly OR a per-session asyncio.Queue that the WebSocket handler reads from. When send_signal resolves a recipient, it:
  1. Fetches the entry from the routing table.
  2. Pushes the signal envelope into the entry’s WS handle / queue.
  3. Returns 200 to the sender.
If the recipient is offline (no entry) or has an entry but no live WS attached, the signal is enqueued in the pending-signal index and outcome=queued_offline is recorded.
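The three steps and the offline fallback can be sketched as follows. This is a hedged illustration assuming the per-session asyncio.Queue variant; the Switch class, its attribute names, and the use of None to mean "registered but no live WS" are all hypothetical choices for the example.

```python
import asyncio
from collections import defaultdict, deque
from typing import Deque, Dict, Optional, Tuple

RouteKey = Tuple[str, str, str]  # (tenant_id, project_id, identity)


class Switch:
    """Hypothetical stand-in for the Marconi switch; not the real class."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        # None means "registered, but no live WS attached" (an assumption).
        self.routing: Dict[RouteKey, Optional[asyncio.Queue]] = {}
        self.pending: Dict[RouteKey, Deque[dict]] = defaultdict(deque)

    async def send_signal(self, key: RouteKey, envelope: dict) -> str:
        async with self._lock:
            queue = self.routing.get(key)   # step 1: O(1) lookup
            if queue is not None:
                queue.put_nowait(envelope)  # step 2: push to the WS queue
                return "pushed"             # step 3: caller can return 200
            # No entry, or an entry without a live WS: park the signal.
            self.pending[key].append(envelope)
            return "queued_offline"
```

The returned string corresponds to the outcome label recorded for the signal; the sketch omits the audit-queue append that would follow in either branch.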

Obligation index

Per-tenant in-memory map of open obligations:
Key | Value
signal_id | Obligation{ kind, ack_sla_seconds, terminal_sla_seconds, created_at, ack_received_at, terminal_received_at, durability_status }
In-memory primary; durability_status on each entry tracks the Redis Stream handoff. SLA sweepers operate on the in-memory map.
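A sweeper pass over the in-memory map might look like the sketch below. The field names come from the Obligation record above; the sweep function, the "pending" default, and the rule that both SLAs are measured from created_at are assumptions for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Obligation:
    kind: str
    ack_sla_seconds: float
    terminal_sla_seconds: float
    created_at: float
    ack_received_at: Optional[float] = None
    terminal_received_at: Optional[float] = None
    durability_status: str = "pending"  # placeholder value (assumption)


def sweep(obligations: Dict[str, Obligation], now: float) -> List[Tuple[str, str]]:
    """Return (signal_id, sla) pairs whose SLA has lapsed without evidence."""
    violations: List[Tuple[str, str]] = []
    for signal_id, ob in obligations.items():
        age = now - ob.created_at
        if ob.ack_received_at is None and age > ob.ack_sla_seconds:
            violations.append((signal_id, "ack"))
        if ob.terminal_received_at is None and age > ob.terminal_sla_seconds:
            violations.append((signal_id, "terminal"))
    return violations
```

Passing now explicitly keeps the function deterministic under test; each returned pair would map onto one increment of a violation counter labelled by sla.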

Pending-signal index

Per-recipient queue of undelivered signals, each with a TTL. Drained on recipient registration or WS connect, and on explicit prism_signals_pending calls. Entries reference audit-queue rows by id.
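One way to picture the index is a per-recipient deque of (enqueued_at, audit_row_id) pairs that is filtered by TTL when drained. The PendingIndex class and its method names are hypothetical, and dropping expired entries at drain time (rather than with a background reaper) is an assumption of this sketch.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple


class PendingIndex:
    """Hypothetical per-recipient pending queue holding audit-row references."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._queues: Dict[str, Deque[Tuple[float, str]]] = defaultdict(deque)

    def enqueue(self, recipient: str, audit_row_id: str, now: float) -> None:
        self._queues[recipient].append((now, audit_row_id))

    def drain(self, recipient: str, now: float) -> List[str]:
        """Remove the recipient's queue and return the unexpired row ids in order."""
        queue = self._queues.pop(recipient, deque())
        return [row_id for enqueued_at, row_id in queue
                if now - enqueued_at <= self.ttl_seconds]
```

A registration or WS-connect hook would call drain once and push each referenced envelope to the freshly attached handle.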

In-memory audit queue

Internal Marconi buffer that feeds the audit fan-out. Not on the delivery path.

Append-only queue per tenant_id holding accepted signal envelopes between the moment send_signal returns 200 and the moment the Redis Stream writer successfully appends the entry to marconi:signals:{tenant_id}. Each entry is a full signal envelope plus arrival timestamp plus delivery outcome (pushed, queued_offline, no_subscriber, etc.).

Sizing (per-tenant config):
Field | Meaning
max_entries | Hard cap on queue size
max_bytes | Hard cap on queue memory footprint
max_age_seconds | Eject entries older than this regardless of size
overwrite_policy | oldest (default), reject_new, bounded_back_pressure
Sizing formula: max_entries ≥ burst_rate_per_sec × max_redis_writer_lag_seconds × safety_factor (≥2), bounded by max_bytes.

Defaults (personal install): max_entries=50_000, max_bytes=512MB, max_age_seconds=600, overwrite_policy=oldest. Production multi-tenant tuning requires observed lag/overwrite metrics for at least one steady-state day.

Marconi does not block on the Redis writer’s health. When the writer is unreachable, the audit queue grows; when the queue fills, the oldest entries are overwritten and counted as marconi_audit_queue_overwrite_total, the operator-visible loss event under v0.4’s loss budget.
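The sizing formula and the oldest overwrite policy can be worked through in a few lines. This is a sketch under stated assumptions: min_entries and AuditQueue are hypothetical names, the burst and lag figures are illustrative rather than measured, and the max_bytes and max_age_seconds limits are left out for brevity.

```python
import math
from collections import deque
from typing import Deque


def min_entries(burst_rate_per_sec: float,
                max_redis_writer_lag_seconds: float,
                safety_factor: float = 2.0) -> int:
    """Lower bound on max_entries from the sizing formula."""
    if safety_factor < 2:
        raise ValueError("safety_factor must be at least 2")
    return math.ceil(burst_rate_per_sec * max_redis_writer_lag_seconds * safety_factor)


class AuditQueue:
    """Bounded queue with overwrite_policy=oldest (entry-count cap only)."""

    def __init__(self, max_entries: int) -> None:
        self.max_entries = max_entries
        self.overwrite_total = 0  # stands in for the overwrite counter
        self._buf: Deque[dict] = deque()

    def append(self, envelope: dict) -> None:
        if len(self._buf) >= self.max_entries:
            self._buf.popleft()        # evict the oldest entry
            self.overwrite_total += 1  # each eviction is a loss event
        self._buf.append(envelope)

    def __len__(self) -> int:
        return len(self._buf)
```

With an illustrative burst of 100 signals per second and a tolerated writer lag of 120 seconds, min_entries(100, 120) gives 24,000, which sits inside the personal-install default of 50,000.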

Cache invalidation — direct write-through, no pubsub

All session-state changes happen inside the same Python process as Marconi’s tables. Invalidation is a direct function call. No Redis pubsub anywhere in Marconi’s surface. The canonical hook table:
Hook site | Marconi call
controller_service.register (after SessionStore.register success) | invalidator.on_register(tenant_id, project_id, session_id)
controller_service.deregister_session (after SessionStore.release success) | invalidator.on_deregister(tenant_id, project_id, session_id)
controller_service.handoff_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id)
controller_service.claim_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id)
controller_service.stamp_heartbeat / stamp_engagement | invalidator.on_heartbeat(...) (cheap freshness update; does not full-invalidate)
KeyspaceReaper (Redis TTL expiry) | invalidator.on_session_expired(tenant_id, project_id, session_id)
WS connect (session_stream handler attach) | invalidator.on_ws_connect(session_id, ws_handle)
WS disconnect | invalidator.on_ws_disconnect(session_id)
marconi/lifecycle.py provides warm_caches_from_store(store), which is called from the FastAPI lifespan after SessionStore connect. Every hook above has a unit test covering the cache state transition; marconi_invalidator_errors_total must be zero in steady state.
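Since invalidation is a plain function call, a hook failure can be counted without propagating into the caller. The sketch below shows that shape with in-process counters; run_hook and the two Counter objects are hypothetical stand-ins for the real invalidator and its metrics, and swallowing the exception is an assumption about how "non-fatal" is handled.

```python
from collections import Counter
from typing import Any, Callable

invalidator_calls: Counter = Counter()   # keyed by hook name
invalidator_errors: Counter = Counter()  # keyed by (hook, reason)


def run_hook(hook: str, fn: Callable[..., Any], *args: Any) -> bool:
    """Invoke one invalidation hook directly; count errors instead of raising."""
    invalidator_calls[hook] += 1
    try:
        fn(*args)
        return True
    except Exception as exc:
        # Non-fatal: record the failure so it surfaces on a dashboard.
        invalidator_errors[(hook, type(exc).__name__)] += 1
        return False
```

For example, run_hook("on_ws_disconnect", handles.pop, session_id) would record a KeyError against the hook if the session was never attached, while the calling handler carries on.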

Audit fan-out

Three FastAPI in-process background tasks consume the audit queue downstream of the hot path:
  1. Redis Stream writer — drains the in-memory audit queue into marconi:signals:{tenant_id} Redis Stream with MAXLEN ~7d (approximate trim by time). This stream is the Marconi Cache and the durable boundary for v0.4. Writer resumes from last-acknowledged audit-queue offset (held in Redis as a key for restart safety). Every successful XADD wakes the PG archiver’s blocking consumer-group read.
  2. PG archiver — consumes the Marconi Cache as a consumer group with XREADGROUP COUNT 1 BLOCK 0. Each stream entry drains near-immediately into PG audit with idempotent signal_id UPSERT semantics against signal_queue, signal_trace_events, and signal_obligations. No batching. The archiver optimizes for near-immediate audit persistence, not throughput batching.
  3. OTEL emitter — increments marconi_signals_received_total, marconi_signals_delivered_total, etc. Background; never blocks.
The Neo4j edge builder follows the same pattern: its consumer reads from the Redis Stream, or directly from the audit queue if co-located, and builds edges asynchronously.

The audit pipeline operates under the MCP boundary rule: MCP shim → FastAPI → store. Both audit workers are FastAPI in-process background tasks, never MCP shortcuts. No sidecar script or host-local process may write the Marconi PG audit tables directly.
MCP shim ──▶ FastAPI API ──▶ Marconi switch ──▶ Marconi Cache ──▶ PG audit
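The archiver's idempotent signal_id UPSERT is what makes consumer-group redelivery safe: replaying a stream entry after a crash must not create a second audit row. The sketch below illustrates the idea with SQLite from the standard library standing in for PG (a deliberate substitution; it needs SQLite 3.24 or newer for ON CONFLICT). The signal_queue table name comes from the text above, while the column layout, open_audit_db, and archive are assumptions.

```python
import sqlite3


def open_audit_db() -> sqlite3.Connection:
    """In-memory SQLite database standing in for the PG audit store."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE signal_queue ("
        " signal_id TEXT PRIMARY KEY,"
        " tenant_id TEXT NOT NULL,"
        " outcome TEXT NOT NULL)"
    )
    return conn


def archive(conn: sqlite3.Connection, signal_id: str,
            tenant_id: str, outcome: str) -> None:
    """Write one stream entry; a replay of the same signal_id updates in place."""
    conn.execute(
        "INSERT INTO signal_queue (signal_id, tenant_id, outcome) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(signal_id) DO UPDATE SET outcome = excluded.outcome",
        (signal_id, tenant_id, outcome),
    )
    conn.commit()
```

Calling archive twice with the same signal_id leaves exactly one row, so the archiver can acknowledge a stream entry only after the write commits and tolerate at-least-once delivery.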

Stage ladder — current shipped status

The migration landed as a sequence of feature-flagged PRs. Each stage cleared exit criteria before the next stage’s flag flipped. Flag flips, in shipping order:
Flag | Default | Flipped on | PR
MARCONI_ROUTING_TABLE_READS | OFF | Stage 1 exit gate | #280
MARCONI_REGISTRATION_TABLE_READS | OFF | Stage 1 exit gate | #280
MARCONI_AUDIT_QUEUE_WRITE | OFF | Stage 3 ship | #284
MARCONI_REDIS_STREAM_WRITER | OFF | Stage 3 ship | #284 / #297
MARCONI_PG_ARCHIVER_SHADOW | OFF | Stage 4 ship | #297
MARCONI_PG_ARCHIVER_PRIMARY | OFF | Stage 4 → primary | #299
MARCONI_HOT_PATH_SEND | OFF | Stage 5 cutover | #283 / #284
MARCONI_OBLIGATIONS_MEMORY_PRIMARY | OFF | Stage 5 sibling | #284
Stage 5 could not ship until the Stage 4 archiver was primary; otherwise accepted signals would be delivered but not recorded. The dead-branch cleanup in PR #301 removed the AUDIT_PENDING_MARCONI and ACK_DEFERRED_MARCONI paths that the cutover obsoleted. PR #300 normalized the publish_path value from the internal pushed_to_marconi to the external pushed_to_ws at the archiver boundary.

Additional shipping work:
  • Sign-coded result envelope (PR #285, #286) — [stage=…] banner the shim renders alongside the doorbell to make the delivery stage observable without a separate trace call.
  • Channel-probe verbs (PR #287, #290) — prism_channel_probe and prism_channel_probe_ack for operator-invoked end-to-end loopback diagnostics. Default ACK timeout bumped 5s → 10s in PR #290 to accommodate per-persona daemon turn-boundary holds.

Counters — full §9 catalog

Every metric below must exist; the Stage 5 hot-path cutover required them in place before the flag flip.

Hot-path counters (per tenant)

  • marconi_signals_received_total{tenant,signal_type} — at API entry
  • marconi_signals_delivered_total{tenant,signal_type,outcome} — outcome ∈ pushed, queued_offline, no_subscriber, dropped
  • marconi_signals_acked_total{tenant,signal_type,ack_kind} — final-delivery evidence
  • marconi_signal_send_duration_seconds{tenant} — histogram, p50/p95/p99
  • marconi_routing_table_size{tenant} — gauge
  • marconi_routing_cache_hits_total{tenant} / marconi_routing_cache_misses_total{tenant}

Audit queue + fan-out counters

  • marconi_audit_queue_depth{tenant} — gauge
  • marconi_audit_queue_overwrite_total{tenant} — counter (loss event; non-zero is a paging incident)
  • marconi_redis_writer_lag_seconds{tenant} — gauge
  • marconi_redis_writer_errors_total{tenant,reason} — counter
  • marconi_pg_archiver_lag_seconds{tenant} — gauge
  • marconi_pg_archiver_errors_total{tenant,reason} — counter

Cache-invalidator counters (Stage 2)

  • marconi_invalidator_calls_total{hook} — counter (hook ∈ on_register, on_deregister, on_master_change, on_heartbeat, on_session_expired, on_ws_connect, on_ws_disconnect)
  • marconi_invalidator_errors_total{hook,reason} — counter (non-fatal hook errors; non-zero in steady state is a paging incident)

Obligation counters

  • marconi_obligations_open{tenant,kind} — gauge
  • marconi_obligations_durable_total{tenant,kind} — counter
  • marconi_obligations_degraded_not_durable_total{tenant,kind} — counter
  • marconi_obligation_sla_violation_total{tenant,kind,sla} — counter
The OTEL → Prometheus bridge exposes the full marconi_* namespace via /metrics; see Metrics for the full Prism counter catalog and dashboard wiring.

Failure modes (summary)

Full table in the SPEC-101 loss budget. The short version: delivery is the contract; durability is best-effort with a bounded, observable loss surface.
Scenario | Loss
Backend graceful restart (SIGTERM) | 0 (audit queue flushed during shutdown)
Backend hard kill (OOM / SIGKILL / host crash) | Up to audit-queue depth that hadn’t reached Redis yet
Redis transient down | 0 (audit queue buffers; writer resumes from offset)
Redis down longer than queue holds | Audit-queue overwrites; operator-visible (marconi_audit_queue_overwrite_total > 0)
PG down | 0 (Redis Stream is the upstream cache; archiver resumes from consumer-group checkpoint)
Recipient WS disconnect mid-delivery | 0 (replay-eligible until ACK evidence)
Cold-start window after process restart: ~1–2 seconds where new signals route to outcome=queued_offline while shims reconnect. Acceptable for restart cadence; routed signals land in the audit queue and reach the recipient on the next push or via startup drain.

Future work (carrying beyond v0.4)

  1. Cross-tenant isolation cost analysis at production multi-tenant scale. v0.4 sizes for personal install; production multi-tenant tuning requires observed steady-state lag/overwrite metrics.
  2. Multi-process cross-instance routing. When backend instances scale out, cross-instance routing becomes necessary. Either sticky-partitioned routing by tenant or pubsub-driven invalidation across instances. Out of scope for v0.4; MARCONI_CROSS_INSTANCE_FORWARD remains reserved.
  3. Durability-recovery algorithms for the “delivered but not recorded” set when Redis is down longer than the audit queue holds. Currently surfaced as an operator alert (marconi_audit_queue_overwrite_total) without an automatic recovery path.
  4. Per-call durable=true opt-in. Some callers may require synchronous-on-return durability (prism_postmortem, prism_decide, etc.). These are already separate verbs and mostly already go PG-direct through FastAPI. A per-call flag is proposed but out of scope for v0.4.

References

  • SPEC-101 v0.4 — canonical spec
  • SPEC-101 loss budget + recovery — Stage 0 gate: loss budget, recovery invariants, rollback procedures
  • SPEC-100 — operator-invoked signal-mesh loopback probe (prism_channel_probe)
  • Marconi disaster-recovery runbook — operational steps for archiver lag, Redis outage, audit-queue overwrite incidents
  • ADR-56 — locks the MUST and the rename
  • Postmortem 1736e40d — drift root-cause that motivated the rewrite
  • Texi ratification chain — signals 37140b98, 2e1d6a4e, 4302012d, 8620d2f5
  • Frank operator architecture lock (2026-05-10) and v0.4 GO (2026-05-10 23:30Z)
Last modified on May 18, 2026