Marconi
Marconi is the in-memory signal-mesh service that replaced signal_service.py’s synchronous Redis-and-PG hot path in May 2026. Sender shim → Marconi → receiver shim. That is the whole signal flow. Everything else — durable cache, audit archive, OTEL — sits downstream of Marconi as an asynchronous fan-out and never blocks delivery.
If you want to understand how to use the signal mesh, read Signal Mesh first. This page is the architectural deep-dive: what Marconi owns, what tables it keeps, why Redis came off the hot path, and where the stage ladder stands today.
The canonical specification is SPEC-101 v0.4. The loss budget and recovery invariants are in SPEC-101 loss budget + recovery. The operator-facing runbook is Marconi disaster recovery.
The architectural lock
The architecture is locked by two operator quotes (Frank, 2026-05-10):
“The signal flow is sender shim → Marconi → receiver shim. That is it. Marconi then pushes signal to Redis, then Redis pushes to PG. Redis is by no means in the signal flow.”
“Marconi switch in-mem → Marconi Cache 7d → PG audit. Once it hits cache it immediately goes to PG.”
Three tiers, two flows, zero Redis on the signal-delivery path, zero MCP-to-PG shortcuts. The v0.4 topology is final for single-instance backends; cross-instance routing remains reserved (MARCONI_CROSS_INSTANCE_FORWARD) for a future revision.
What Marconi replaces
The legacy paths used Redis pubsub as an in-process message bus:
- The sender thread did PUBLISH session_channel
- The recipient’s WebSocket handler did SUBSCRIBE session_channel
- A forwarding loop pumped messages from Redis pubsub onto the actual WebSocket
- A synchronous INSERT into signal_queue ran on the hot path before the API returned 200
This worked correctly but added a Redis round-trip for every signal even when sender and receiver were in the same backend process — which they were in the single-process deployment. The synchronous PG write tied delivery latency to disk I/O, and any Redis blip cascaded into signal-delivery latency. The loopback diagnostic (SPEC-100, PR #276) and postmortem 1736e40d made the drift visible.
Marconi eliminates the pubsub layer entirely. The receiver’s WebSocket handler registers itself with Marconi’s routing table at connect time (storing the WS handle or a per-session asyncio.Queue alongside the SessionRecord). The sender’s prism_signal looks up the recipient in Marconi’s routing table and pushes directly to the WS handle. No Redis publish, no Redis subscribe, no forwarding loop, no synchronous PG write.
Module path: backend/app/marconi/. Log prefix: marconi:. Metric namespace: marconi_*. The MCP verb surface (prism_signal, prism_signal_ack, prism_signal_trace, prism_signals_pending, prism_signal_recall) is unchanged — Marconi sits behind a stable external API.
In-memory tables (Marconi-owned)
All tables are per-process, hash-keyed, and protected by asyncio.Lock.
Routing table
| Key | Value |
|---|---|
| (tenant_id, project_id, identity) | LiveSession{ session_id, surface, machine_id, ws_handle, registered_at, last_heartbeat } |
Lookup is O(1). Updated on register, deregister, heartbeat, WS connect, WS disconnect. Read on every send_signal.
Registration table
| Key | Value |
|---|---|
| session_id | Registration{ tenant_id, project_id, identity, surface, machine_id, registered_at, last_heartbeat, ws_handle } |
Sister index of routing; allows session-id-keyed lookups for heartbeat, deregister, and WS-attach.
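As a rough illustration of the two sister indexes, the sketch below keeps one record reachable under both keys and guards both dicts with a single asyncio.Lock. The class name SessionTables, the shared LiveSession record (the page lists LiveSession and Registration as separate shapes), and the "newer session replaces older one" policy are assumptions for illustration, not taken from SPEC-101.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple

RoutingKey = Tuple[str, str, str]  # (tenant_id, project_id, identity)


@dataclass
class LiveSession:
    session_id: str
    tenant_id: str
    project_id: str
    identity: str
    surface: str
    machine_id: str
    ws_handle: Optional[Any] = None
    registered_at: float = field(default_factory=time.time)
    last_heartbeat: float = field(default_factory=time.time)

    @property
    def routing_key(self) -> RoutingKey:
        return (self.tenant_id, self.project_id, self.identity)


class SessionTables:
    """Hypothetical pair of hash indexes over the same session records."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._routing: Dict[RoutingKey, LiveSession] = {}
        self._registrations: Dict[str, LiveSession] = {}

    async def register(self, session: LiveSession) -> None:
        async with self._lock:
            previous = self._routing.get(session.routing_key)
            if previous is not None:
                # Assumed policy: a newer session for the same identity wins.
                self._registrations.pop(previous.session_id, None)
            self._routing[session.routing_key] = session
            self._registrations[session.session_id] = session

    async def deregister(self, session_id: str) -> None:
        async with self._lock:
            session = self._registrations.pop(session_id, None)
            if session is not None and self._routing.get(session.routing_key) is session:
                del self._routing[session.routing_key]

    async def heartbeat(self, session_id: str) -> bool:
        async with self._lock:
            session = self._registrations.get(session_id)
            if session is None:
                return False
            session.last_heartbeat = time.time()
            return True

    async def lookup(self, tenant_id: str, project_id: str, identity: str) -> Optional[LiveSession]:
        async with self._lock:
            return self._routing.get((tenant_id, project_id, identity))
```

Both lookups are plain dict reads, so the O(1) claim holds for either key; the lock only serializes coroutines that interleave at await points.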
WS handle storage
Each Marconi entry holds either the live WebSocket object directly OR a per-session asyncio.Queue that the WebSocket handler reads from. When send_signal resolves a recipient, it:
- Fetches the entry from the routing table.
- Pushes the signal envelope into the entry’s WS handle / queue.
- Returns 200 to the sender.
If the recipient is offline (no entry) or has an entry but no live WS attached, the signal is enqueued in the pending-signal index and outcome=queued_offline is recorded.
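The resolve-and-push steps above can be sketched as follows, using the per-session asyncio.Queue variant. The module-level dicts, the use of None to mean "registered but no live WS", and the function signature are assumptions made for the example; only the outcome values pushed and queued_offline come from this page.

```python
import asyncio
from collections import defaultdict, deque
from typing import Deque, Dict, Optional, Tuple

RoutingKey = Tuple[str, str, str]  # (tenant_id, project_id, identity)

# None stands for "entry exists, but no live WebSocket is attached".
routing: Dict[RoutingKey, Optional[asyncio.Queue]] = {}
pending: Dict[RoutingKey, Deque[dict]] = defaultdict(deque)


def send_signal(recipient: RoutingKey, envelope: dict) -> str:
    """Resolve the recipient and push in-process; never touches Redis or PG."""
    ws_queue = routing.get(recipient)
    if ws_queue is None:
        # Offline, or registered without a WS: park the signal for later drain.
        pending[recipient].append(envelope)
        return "queued_offline"
    # Unbounded queue, so put_nowait cannot raise QueueFull here.
    ws_queue.put_nowait(envelope)
    return "pushed"
```

The WebSocket handler would read from its queue in a loop and write each envelope to the socket; that half is omitted here.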
Obligation index
Per-tenant in-memory map of open obligations:
| Key | Value |
|---|---|
| signal_id | Obligation{ kind, ack_sla_seconds, terminal_sla_seconds, created_at, ack_received_at, terminal_received_at, durability_status } |
In-memory primary; the Redis Stream handoff reflects durability_status. SLA sweepers operate on the in-memory map.
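A minimal sketch of an SLA sweep over the in-memory map is shown below. It assumes both SLAs are measured from created_at, that missing evidence is represented by None, and that the sla label takes the values "ack" and "terminal"; none of these details are confirmed by this page, and the "pending" default for durability_status is a placeholder.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Obligation:
    kind: str
    ack_sla_seconds: float
    terminal_sla_seconds: float
    created_at: float
    ack_received_at: Optional[float] = None
    terminal_received_at: Optional[float] = None
    durability_status: str = "pending"  # placeholder value, not from the spec


def sweep(obligations: Dict[str, Obligation], now: float) -> List[Tuple[str, str]]:
    """Return (signal_id, sla) pairs whose SLA elapsed without evidence."""
    violations: List[Tuple[str, str]] = []
    for signal_id, ob in obligations.items():
        age = now - ob.created_at
        if ob.ack_received_at is None and age > ob.ack_sla_seconds:
            violations.append((signal_id, "ack"))
        if ob.terminal_received_at is None and age > ob.terminal_sla_seconds:
            violations.append((signal_id, "terminal"))
    return violations
```

Each returned pair would correspond to one increment of marconi_obligation_sla_violation_total with the matching kind and sla labels.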
Pending-signal index
Per-recipient queue of undelivered signals with a TTL. Drained on recipient registration / WS connect, and on explicit prism_signals_pending calls. Entries reference audit-queue rows by id.
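One way to picture the pending-signal index is a per-recipient deque of (expiry, audit-row id) pairs that is emptied in one step on drain. The class name PendingIndex, the per-entry TTL, and the choice to silently drop expired entries at drain time are assumptions for this sketch.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Hashable, List, Optional, Tuple


class PendingIndex:
    """Hypothetical per-recipient queue of audit-row ids with a per-entry TTL."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._queues: Dict[Hashable, Deque[Tuple[float, str]]] = defaultdict(deque)

    def enqueue(self, recipient: Hashable, audit_row_id: str, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self._queues[recipient].append((now + self._ttl, audit_row_id))

    def drain(self, recipient: Hashable, now: Optional[float] = None) -> List[str]:
        """Remove the recipient's queue and return ids that have not expired."""
        now = time.monotonic() if now is None else now
        entries = self._queues.pop(recipient, ())
        return [row_id for expires_at, row_id in entries if expires_at > now]
```

A registration or WS-connect hook would call drain and push the referenced envelopes; an explicit prism_signals_pending call would do the same on demand.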
In-memory audit queue
Internal Marconi buffer that feeds the audit fan-out. Not on the delivery path.
Append-only queue per tenant_id holding accepted signal envelopes between the moment send_signal returns 200 and the moment the Redis Stream writer successfully appends the entry to marconi:signals:{tenant_id}. Each entry is a full signal envelope plus arrival timestamp plus delivery outcome (pushed, queued_offline, no_subscriber, etc.).
Sizing — per-tenant config:
| Field | Meaning |
|---|---|
| max_entries | Hard cap on queue size |
| max_bytes | Hard cap on queue memory footprint |
| max_age_seconds | Eject entries older than this regardless of size |
| overwrite_policy | oldest (default), reject_new, bounded_back_pressure |
Sizing formula: max_entries ≥ burst_rate_per_sec × max_redis_writer_lag_seconds × safety_factor (≥2), bounded by max_bytes.
Defaults (personal install): max_entries=50_000, max_bytes=512MB, max_age_seconds=600, overwrite_policy=oldest. Production multi-tenant tuning requires observed lag/overwrite metrics for at least one steady-state day.
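To make the sizing formula concrete, the helper below computes the lower bound on max_entries and checks it against a byte cap. The burst rate (40 signals/s), the average envelope size (4 KiB), and the reading of 512MB as 512 MiB are hypothetical inputs chosen for illustration, not measured values.

```python
import math


def min_max_entries(burst_rate_per_sec: float,
                    max_redis_writer_lag_seconds: float,
                    safety_factor: float = 2.0) -> int:
    """Lower bound on max_entries from the sizing formula."""
    if safety_factor < 2:
        raise ValueError("safety_factor must be >= 2")
    return math.ceil(burst_rate_per_sec * max_redis_writer_lag_seconds * safety_factor)


def fits_byte_cap(max_entries: int, avg_envelope_bytes: int, max_bytes: int) -> bool:
    """Check that a full queue of average-sized envelopes stays under max_bytes."""
    return max_entries * avg_envelope_bytes <= max_bytes


# Hypothetical workload: 40 signals/s burst, writer may lag up to 600 s.
needed = min_max_entries(40, 600)           # 40 * 600 * 2 = 48000
within_default = needed <= 50_000           # default max_entries covers it
within_bytes = fits_byte_cap(50_000, 4096, 512 * 1024 * 1024)
```

Under these assumed inputs the personal-install defaults leave a small margin on entries and a large one on bytes; a higher burst rate or longer tolerated lag would require raising max_entries.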
Marconi does not block on the Redis writer’s health. When the writer is unreachable, the audit queue grows; when the queue fills, the oldest entries are overwritten and counted as marconi_audit_queue_overwrite_total — the operator-visible loss event under v0.4’s loss budget.
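The overwrite behaviour can be sketched with a bounded deque that evicts from the head and counts each eviction. This covers only the max_entries cap under overwrite_policy=oldest; max_bytes, max_age_seconds, and the other policies are left out, and the class and attribute names are illustrative.

```python
from collections import deque
from typing import Deque, List


class AuditQueue:
    """Hypothetical bounded audit queue with overwrite_policy=oldest."""

    def __init__(self, max_entries: int) -> None:
        self._entries: Deque[dict] = deque()
        self._max_entries = max_entries
        # Stand-in for marconi_audit_queue_overwrite_total.
        self.overwrite_total = 0

    def append(self, envelope: dict) -> None:
        """Never blocks and never rejects: a full queue loses its oldest entry."""
        if len(self._entries) >= self._max_entries:
            self._entries.popleft()
            self.overwrite_total += 1
        self._entries.append(envelope)

    def drain(self, batch_size: int) -> List[dict]:
        """Hand up to batch_size entries, oldest first, to the stream writer."""
        out: List[dict] = []
        while self._entries and len(out) < batch_size:
            out.append(self._entries.popleft())
        return out

    def __len__(self) -> int:
        return len(self._entries)
```

Because append never waits on the writer, the send path stays independent of Redis health, at the cost of the counted loss described above.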
Cache invalidation — direct write-through, no pubsub
All session-state changes happen inside the same Python process as Marconi’s tables. Invalidation is a direct function call. No Redis pubsub anywhere in Marconi’s surface.
The canonical hook table:
| Hook site | Marconi call |
|---|---|
| controller_service.register (after SessionStore.register success) | invalidator.on_register(tenant_id, project_id, session_id) |
| controller_service.deregister_session (after SessionStore.release success) | invalidator.on_deregister(tenant_id, project_id, session_id) |
| controller_service.handoff_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id) |
| controller_service.claim_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id) |
| controller_service.stamp_heartbeat / stamp_engagement | invalidator.on_heartbeat(...) (cheap freshness update; does not full-invalidate) |
| KeyspaceReaper (Redis TTL expiry) | invalidator.on_session_expired(tenant_id, project_id, session_id) |
| WS connect (session_stream handler attach) | invalidator.on_ws_connect(session_id, ws_handle) |
| WS disconnect | invalidator.on_ws_disconnect(session_id) |
marconi/lifecycle.py provides warm_caches_from_store(store) called from the FastAPI lifespan after SessionStore connect. Every hook above has a unit test covering the cache state transition; marconi_invalidator_errors_total must be zero in steady state.
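The direct write-through idea can be illustrated with two of the hooks from the table. The sketch below mutates an in-process dict inside an ordinary method call and records call and error counts per hook; the class layout, the ws_handles dict, and the Counter objects standing in for marconi_invalidator_calls_total and marconi_invalidator_errors_total are assumptions, not the actual module.

```python
from collections import Counter
from typing import Any, Callable, Dict


class Invalidator:
    """Hypothetical in-process invalidator: plain method calls, no pubsub."""

    def __init__(self) -> None:
        self.ws_handles: Dict[str, Any] = {}
        self.calls: Counter = Counter()   # keyed by hook name
        self.errors: Counter = Counter()  # keyed by (hook name, reason)

    def _run(self, hook: str, mutate: Callable[[], None]) -> None:
        self.calls[hook] += 1
        try:
            mutate()
        except Exception as exc:
            # Hook errors are counted rather than raised into the caller.
            self.errors[(hook, type(exc).__name__)] += 1

    def on_ws_connect(self, session_id: str, ws_handle: Any) -> None:
        def mutate() -> None:
            self.ws_handles[session_id] = ws_handle
        self._run("on_ws_connect", mutate)

    def on_ws_disconnect(self, session_id: str) -> None:
        def mutate() -> None:
            self.ws_handles.pop(session_id, None)
        self._run("on_ws_disconnect", mutate)
```

A unit test per hook then reduces to calling the method and asserting on the resulting table state, which matches the testing requirement stated above.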
Audit fan-out
Three FastAPI in-process background tasks consume the audit queue downstream of the hot path:
- Redis Stream writer: drains the in-memory audit queue into the marconi:signals:{tenant_id} Redis Stream with MAXLEN ~7d (approximate trim by time). This stream is the Marconi Cache and the durable boundary for v0.4. Writer resumes from last-acknowledged audit-queue offset (held in Redis as a key for restart safety). Every successful XADD wakes the PG archiver’s blocking consumer-group read.
- PG archiver: consumes the Marconi Cache as a consumer group with XREADGROUP COUNT 1 BLOCK 0. Each stream entry drains near-immediately into PG audit with idempotent signal_id UPSERT semantics against signal_queue, signal_trace_events, and signal_obligations. No batching. The archiver optimizes for near-immediate audit persistence, not throughput batching.
- OTEL emitter: increments marconi_signals_received_total, marconi_signals_delivered_total, etc. Background; never blocks.
The Neo4j edge builder follows the same pattern (consumer reads from the Redis Stream or directly from the audit queue if co-located, builds edges asynchronously).
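The archiver's idempotent signal_id UPSERT matters because a consumer-group entry can be redelivered after a crash. The sketch below shows the idea with SQLite standing in for PG so that it runs without a server; the single-table schema and the publish_path column are simplifications chosen for the example, and the real archiver's SQL is not shown on this page.

```python
import sqlite3


def open_audit_db() -> sqlite3.Connection:
    """In-memory stand-in for the PG audit store (illustrative schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE signal_queue ("
        " signal_id TEXT PRIMARY KEY,"
        " publish_path TEXT NOT NULL)"
    )
    return conn


def archive_entry(conn: sqlite3.Connection, signal_id: str, publish_path: str) -> None:
    """Write one stream entry; replaying the same signal_id is harmless."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO signal_queue (signal_id, publish_path) VALUES (?, ?) "
            "ON CONFLICT(signal_id) DO UPDATE SET publish_path = excluded.publish_path",
            (signal_id, publish_path),
        )


conn = open_audit_db()
archive_entry(conn, "sig-1", "pushed_to_ws")
archive_entry(conn, "sig-1", "pushed_to_ws")  # redelivery after a restart
row_count = conn.execute("SELECT COUNT(*) FROM signal_queue").fetchone()[0]
```

With this shape the archiver can acknowledge a stream entry only after the write commits, and a replay of an unacknowledged entry leaves a single row.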
The audit pipeline operates under the MCP boundary rule: MCP shim → FastAPI → store. Both audit workers are FastAPI in-process background tasks, never MCP shortcuts. No sidecar script or host-local process may write the Marconi PG audit tables directly.
MCP shim ──▶ FastAPI API ──▶ Marconi switch ──▶ Marconi Cache ──▶ PG audit
Stage ladder — current shipped status
The migration landed as a sequence of feature-flagged PRs. Each stage cleared exit criteria before the next stage’s flag flipped.
Flag flips, in shipping order:
| Flag | Default | Flipped on | PR |
|---|---|---|---|
| MARCONI_ROUTING_TABLE_READS | OFF | Stage 1 exit gate | #280 |
| MARCONI_REGISTRATION_TABLE_READS | OFF | Stage 1 exit gate | #280 |
| MARCONI_AUDIT_QUEUE_WRITE | OFF | Stage 3 ship | #284 |
| MARCONI_REDIS_STREAM_WRITER | OFF | Stage 3 ship | #284 / #297 |
| MARCONI_PG_ARCHIVER_SHADOW | OFF | Stage 4 ship | #297 |
| MARCONI_PG_ARCHIVER_PRIMARY | OFF | Stage 4 → primary | #299 |
| MARCONI_HOT_PATH_SEND | OFF | Stage 5 cutover | #283 / #284 |
| MARCONI_OBLIGATIONS_MEMORY_PRIMARY | OFF | Stage 5 sibling | #284 |
Stage 5 could not ship until Stage 4 archiver was primary — otherwise accepted signals would be delivered but not recorded. The dead-branch cleanup in PR #301 removed AUDIT_PENDING_MARCONI and ACK_DEFERRED_MARCONI paths that the cutover obsoleted. PR #300 normalized the publish_path value from internal pushed_to_marconi to external pushed_to_ws at the archiver boundary.
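The ordering constraint between Stage 4 and Stage 5 could be enforced at startup with a small dependency check like the one below. Only the first dependency (hot-path send requires the primary archiver) is stated on this page; the other two entries are inferred from the pipeline order and are assumptions, as is the function itself.

```python
from typing import Dict, List

# Flag -> flags that must already be ON before it may be enabled.
REQUIRES: Dict[str, List[str]] = {
    # Stated above: Stage 5 needs the Stage 4 archiver as primary.
    "MARCONI_HOT_PATH_SEND": ["MARCONI_PG_ARCHIVER_PRIMARY"],
    # Assumed from pipeline order: the archiver reads what the writer appends.
    "MARCONI_PG_ARCHIVER_PRIMARY": ["MARCONI_REDIS_STREAM_WRITER"],
    # Assumed from pipeline order: the writer drains the audit queue.
    "MARCONI_REDIS_STREAM_WRITER": ["MARCONI_AUDIT_QUEUE_WRITE"],
}


def validate_flags(flags: Dict[str, bool]) -> List[str]:
    """Return one message per enabled flag whose prerequisite is off."""
    problems: List[str] = []
    for flag, prerequisites in REQUIRES.items():
        if not flags.get(flag, False):
            continue
        for prerequisite in prerequisites:
            if not flags.get(prerequisite, False):
                problems.append(f"{flag} requires {prerequisite}")
    return problems
```

Failing fast on a non-empty result would prevent the "delivered but not recorded" configuration the stage gating was designed to avoid.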
Additional shipping work:
- Sign-coded result envelope (PR #285, #286): a [stage=…] banner the shim renders alongside the doorbell to make the delivery stage observable without a separate trace call.
- Channel-probe verbs (PR #287, #290): prism_channel_probe and prism_channel_probe_ack for operator-invoked end-to-end loopback diagnostics. Default ACK timeout bumped 5s → 10s in PR #290 to accommodate per-persona daemon turn-boundary holds.
Counters — full §9 catalog
Every metric below must exist; the Stage 5 hot-path cutover required them in place before the flag flip.
Hot-path counters (per tenant)
marconi_signals_received_total{tenant,signal_type} — at API entry
marconi_signals_delivered_total{tenant,signal_type,outcome} — outcome ∈ pushed, queued_offline, no_subscriber, dropped
marconi_signals_acked_total{tenant,signal_type,ack_kind} — final-delivery evidence
marconi_signal_send_duration_seconds{tenant} — histogram, p50/p95/p99
marconi_routing_table_size{tenant} — gauge
marconi_routing_cache_hits_total{tenant} / marconi_routing_cache_misses_total{tenant}
Audit queue + fan-out counters
marconi_audit_queue_depth{tenant} — gauge
marconi_audit_queue_overwrite_total{tenant} — counter (loss event; non-zero is a paging incident)
marconi_redis_writer_lag_seconds{tenant} — gauge
marconi_redis_writer_errors_total{tenant,reason} — counter
marconi_pg_archiver_lag_seconds{tenant} — gauge
marconi_pg_archiver_errors_total{tenant,reason} — counter
Cache-invalidator counters (Stage 2)
marconi_invalidator_calls_total{hook} — counter (hook ∈ on_register, on_deregister, on_master_change, on_heartbeat, on_session_expired, on_ws_connect, on_ws_disconnect)
marconi_invalidator_errors_total{hook,reason} — counter (non-fatal hook errors; non-zero in steady state is a paging incident)
Obligation counters
marconi_obligations_open{tenant,kind} — gauge
marconi_obligations_durable_total{tenant,kind} — counter
marconi_obligations_degraded_not_durable_total{tenant,kind} — counter
marconi_obligation_sla_violation_total{tenant,kind,sla} — counter
The OTEL → Prometheus bridge exposes the full marconi_* namespace via /metrics; see Metrics for the full Prism counter catalog and dashboard wiring.
Failure modes (summary)
Full table in the SPEC-101 loss budget. The short version: delivery is the contract; durability is best-effort with a bounded, observable loss surface.
| Scenario | Loss |
|---|---|
| Backend graceful restart (SIGTERM) | 0 — audit queue flushed during shutdown |
| Backend hard kill (OOM / SIGKILL / host crash) | Up to audit-queue depth that hadn’t reached Redis yet |
| Redis transient down | 0 — audit queue buffers; writer resumes from offset |
| Redis down longer than queue holds | Audit-queue overwrites; operator-visible (marconi_audit_queue_overwrite_total > 0) |
| PG down | 0 — Redis Stream is the upstream cache; archiver resumes from consumer-group checkpoint |
| Recipient WS disconnect mid-delivery | 0 — replay-eligible until ACK evidence |
Cold-start window after process restart: for roughly 1–2 seconds, new signals route to outcome=queued_offline while shims reconnect. This is acceptable at restart cadence; signals routed during the window land in the audit queue and reach the recipient on the next push or via startup drain.
Future work (carrying beyond v0.4)
- Cross-tenant isolation cost analysis at production multi-tenant scale. v0.4 sizes for personal install; production multi-tenant tuning requires observed steady-state lag/overwrite metrics.
- Multi-process cross-instance routing. When backend instances scale out, cross-instance routing becomes necessary. The candidate approaches are sticky-partitioned routing by tenant and pubsub-driven invalidation across instances. Out of scope for v0.4; MARCONI_CROSS_INSTANCE_FORWARD remains reserved.
- Durability-recovery algorithms for the “delivered but not recorded” set when Redis is down longer than the audit queue holds. Currently surfaced as an operator alert (marconi_audit_queue_overwrite_total) without an automatic recovery path.
- Per-call durable=true opt-in. Some callers may require synchronous-on-return durability (prism_postmortem, prism_decide, etc.). These are already separate verbs, and most of them already write to PG directly through FastAPI. A per-call flag is proposed; out of scope for v0.4.
References
- SPEC-101 v0.4: canonical spec
- SPEC-101 loss budget + recovery: Stage 0 gate covering loss budget, recovery invariants, rollback procedures
- SPEC-100: operator-invoked signal-mesh loopback probe (prism_channel_probe)
- Marconi disaster-recovery runbook: operational steps for archiver lag, Redis outage, audit-queue overwrite incidents
- ADR-56: locks the MUST and the rename
- Postmortem 1736e40d: drift root-cause that motivated the rewrite
- Texi ratification chain: signals 37140b98 → 2e1d6a4e → 4302012d → 8620d2f5
- Frank operator architecture lock (2026-05-10) and v0.4 GO (2026-05-10 23:30Z)
Last modified on May 18, 2026