SPEC-101 v0.4 — Marconi: in-memory signal mesh
Operator architecture lock (Frank, 2026-05-10): “The signal flow is sender shim → Marconi → receiver shim. That is it. Marconi then pushes signal to Redis, then Redis pushes to PG. Redis is by no means in the signal flow.” “Marconi switch in-mem → Marconi Cache 7d → PG audit.” “Once it hits cache it immediately goes to PG.” Redis is the 7-day rolling audit cache and the PG staging buffer. It is not on the signal-delivery path.
0. Glossary
- Marconi: the in-memory signal-mesh service (replacing signal_service.py’s synchronous-PG hot path). Owns the routing table, the registration table, and the live WebSocket handles for every connected shim. Sender→receiver signal delivery is a Python function call inside Marconi’s process.
- Hot path: the synchronous request-response cycle from prism_signal API entry to 200 OK. Marconi’s hot path is in-memory only.
- Marconi switch: the in-memory routing/delivery tier. Owns direct sender→receiver delivery.
- Marconi Cache: Redis Stream marconi:signals:{tenant_id} with MAXLEN ~7d. Rolling audit cache and PG staging buffer.
- PG audit: long-term signal/trace audit store in Postgres, written only by the FastAPI-side PG archiver.
- Audit fan-out: the asynchronous pipeline from Marconi switch → Marconi Cache → PG audit. Off the hot path; never blocks delivery.
- Durable verb: a verb whose API contract guarantees “200 = data is in PG before the response.” Examples today: prism_postmortem, prism_decide, prism_journal create. Not part of the signal mesh; covered by a separate per-call durable=true opt-in (out of scope for v0.4).
0.1 Naming reconciliation
This spec uses signal envelope as the logical record name. The physical PG tables use the live schema names:
| Logical record | Physical table |
|---|---|
| Signal envelope | signal_queue |
| Signal trace event | signal_trace_events |
| Signal obligation | signal_obligations |
Stage 4 shadow-mode tables add a _shadow suffix: signal_queue_shadow, signal_trace_events_shadow, and signal_obligations_shadow. Future implementation PRs MUST use these physical names in migrations, harnesses, and code comments where table names matter.
0.2 Three-tier flow lock
The SPEC-101 v0.4 topology is locked:
- Marconi switch (in-memory): signal delivery and routing.
- Marconi Cache (Redis Stream marconi:signals:{tenant_id}, MAXLEN ~7d): rolling audit cache and PG staging buffer.
- PG audit: long-term audit, recall, reporting, and analysis.
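1. Architecture (the locked picture)
Signal delivery: sender shim → Marconi switch → receiver shim.
Audit fan-out (off the hot path): Marconi switch → in-memory audit queue (§3.6) → Redis Stream writer (§4.1) → Marconi Cache → PG archiver (§4.2) → PG audit.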
2. Hot path
2.1 The contract
Target: p99 < 5 ms for prism_signal end-to-end on the same-instance hot path.
2.2 Why Redis is not on the signal flow
The legacy signal_service.py + session_stream.py paths use Redis pubsub as an in-process message bus. The sender thread does PUBLISH session_channel, the recipient’s WebSocket handler does SUBSCRIBE session_channel, and a forwarding loop pumps messages from Redis pubsub onto the actual WebSocket. This works correctly, but it adds a round-trip through Redis for every signal, even when sender and receiver are in the same backend process, which they always are in the current single-process deployment.
Marconi eliminates this layer. The receiver’s WebSocket handler registers itself with Marconi’s routing table at connect time (storing the WS handle or a per-session asyncio.Queue alongside the SessionRecord). The sender’s prism_signal looks up the recipient in Marconi’s routing table and pushes directly to the WS handle. No Redis publish, no Redis subscribe, no forwarding loop.
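The following is a minimal sketch of that in-process path. It assumes the per-session asyncio.Queue variant. The receiver’s handler attaches a queue at connect time. The sender’s push is a dictionary lookup plus put_nowait. A pump task forwards queued envelopes onto the socket. Router, ws_pump, and FakeWebSocket are hypothetical names, and FakeWebSocket stands in for a real WebSocket handle and only records payloads.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeWebSocket:
    """Stand-in for a live WebSocket handle; records what it was sent."""
    sent: list = field(default_factory=list)

    async def send_json(self, payload: dict) -> None:
        self.sent.append(payload)


class Router:
    """Hypothetical in-process routing: session_id -> per-session asyncio.Queue."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = {}
        self._lock = asyncio.Lock()

    async def attach(self, session_id: str) -> asyncio.Queue:
        # Receiver side: called by the WS handler at connect time.
        async with self._lock:
            return self._queues.setdefault(session_id, asyncio.Queue())

    async def push(self, session_id: str, envelope: dict) -> bool:
        # Sender side: a dict lookup plus a queue put; no PUBLISH/SUBSCRIBE hop.
        async with self._lock:
            queue = self._queues.get(session_id)
        if queue is None:
            return False
        queue.put_nowait(envelope)
        return True


async def ws_pump(queue: asyncio.Queue, ws: FakeWebSocket) -> None:
    # Receiver side: forward each queued envelope onto the socket.
    while True:
        envelope = await queue.get()
        await ws.send_json(envelope)
        queue.task_done()


async def demo() -> list:
    router = Router()
    ws = FakeWebSocket()
    queue = await router.attach("sess-b")
    pump = asyncio.create_task(ws_pump(queue, ws))
    await router.push("sess-b", {"signal_id": "s1", "body": "hello"})
    await queue.join()  # returns once the pump has forwarded every queued item
    pump.cancel()
    try:
        await pump
    except asyncio.CancelledError:
        pass
    return ws.sent
```

Calling asyncio.run(demo()) returns [{"signal_id": "s1", "body": "hello"}]. Redis never appears between the push and the socket write.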
2.3 Cross-instance hot path (post-v0.4 territory, separately flagged)
When backend instances scale out (multi-process behind a load balancer), cross-instance routing becomes necessary. The two candidate designs are sticky-partitioned routing by tenant and pubsub-driven invalidation across instances. Both are out of scope for v0.4; MARCONI_CROSS_INSTANCE_FORWARD remains reserved for a later revision.
2.4 MCP boundary: no direct PG writes
The MCP shim remains thin per SPEC-044 and the Ring-2 architecture boundary. MCP calls FastAPI over HTTP, and FastAPI owns business logic and store access. No MCP call writes PG directly. Marconi audit persistence follows the same boundary:
- Redis Stream writer: in-memory audit queue → Marconi Cache.
- PG archiver: Marconi Cache → PG audit.
3. In-memory tables (Marconi-owned)
All tables are per-process, hash-keyed, and guarded by an asyncio.Lock.
3.1 Routing table
| Key | Value |
|---|---|
| (tenant_id, project_id, identity) | LiveSession{ session_id, surface, machine_id, ws_handle, registered_at, last_heartbeat } |
Written on register, deregister, heartbeat, WS connect, and WS disconnect. Read on every send_signal.
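Below is a sketch of the routing table as a dict guarded by an asyncio.Lock. It assumes LiveSession records are frozen, so a heartbeat swaps in an updated copy instead of mutating shared state. The method names follow the marconi.routing.* calls listed in §5, but the signatures here are assumptions.

```python
import asyncio
import time
from dataclasses import dataclass, replace
from typing import Any, Optional

RouteKey = tuple[str, str, str]  # (tenant_id, project_id, identity)


@dataclass(frozen=True)
class LiveSession:
    session_id: str
    surface: str
    machine_id: str
    ws_handle: Any
    registered_at: float
    last_heartbeat: float


class RoutingTable:
    """Per-process, hash-keyed, asyncio.Lock-guarded routing table (sketch)."""

    def __init__(self) -> None:
        self._entries: dict[RouteKey, LiveSession] = {}
        self._lock = asyncio.Lock()

    async def put(self, key: RouteKey, session: LiveSession) -> None:
        async with self._lock:
            self._entries[key] = session

    async def get(self, key: RouteKey) -> Optional[LiveSession]:
        async with self._lock:
            return self._entries.get(key)

    async def invalidate_project(self, tenant_id: str, project_id: str) -> int:
        """Drop every identity under (tenant_id, project_id); return how many."""
        async with self._lock:
            doomed = [k for k in self._entries if k[:2] == (tenant_id, project_id)]
            for key in doomed:
                del self._entries[key]
            return len(doomed)

    async def update_freshness(self, key: RouteKey, now: Optional[float] = None) -> bool:
        """Cheap heartbeat update: swap in a copy with a new last_heartbeat."""
        async with self._lock:
            current = self._entries.get(key)
            if current is None:
                return False
            stamp = time.time() if now is None else now
            self._entries[key] = replace(current, last_heartbeat=stamp)
            return True
```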
3.2 Registration table
| Key | Value |
|---|---|
| session_id | Registration{ tenant_id, project_id, identity, surface, machine_id, registered_at, last_heartbeat, ws_handle } |
3.3 WS handle storage
Each Marconi entry holds either the live WebSocket object directly OR a per-session asyncio.Queue that the WebSocket handler reads from. When send_signal resolves a recipient, it:
- Fetches the entry.
- Pushes the signal envelope into the entry’s WS handle / queue.
- Returns 200 to the sender.
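If the recipient has no attached WS handle, the envelope is held in the pending-signal index (§3.5) and outcome=queued_offline is recorded.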
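The resolution steps can be sketched as a single synchronous function. The sketch assumes a hypothetical Marconi container with a routes map, a pending map, and a plain list standing in for the audit queue. Every recipient without an attached queue, registered or not, is recorded as queued_offline. This mirrors the restart window described in §6.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Entry:
    session_id: str
    queue: Optional[asyncio.Queue] = None  # None while no WebSocket is attached


@dataclass
class Marconi:
    routes: dict = field(default_factory=dict)   # recipient -> Entry
    pending: dict = field(default_factory=dict)  # recipient -> [envelope, ...]
    audit: list = field(default_factory=list)    # stand-in for the §3.6 audit queue

    def send_signal(self, recipient: str, envelope: dict) -> str:
        entry = self.routes.get(recipient)            # 1. fetch the entry
        if entry is not None and entry.queue is not None:
            entry.queue.put_nowait(envelope)          # 2. push into the WS queue
            outcome = "pushed"
        else:
            # No attached handle: park it for the pending-signal index (§3.5).
            self.pending.setdefault(recipient, []).append(envelope)
            outcome = "queued_offline"
        self.audit.append((envelope, outcome))        # feeds the audit fan-out
        return outcome                                # 3. caller answers 200
```

There is no await between lookup and push, so the whole resolution stays on the in-memory hot path.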
3.4 Obligation index
Per-tenant in-memory map of open obligations:
| Key | Value |
|---|---|
| signal_id | Obligation{ kind, ack_sla_seconds, terminal_sla_seconds, created_at, ack_received_at, terminal_received_at, durability_status } |
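Here is a sketch of the SLA check over one Obligation record. It assumes both SLAs are measured from created_at. An SLA counts as violated either when its event arrived late or when the event is still missing past the deadline. The "ack" and "terminal" return values are hypothetical label values for marconi_obligation_sla_violation_total{sla}, and the durability_status default is a placeholder.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Obligation:
    kind: str
    ack_sla_seconds: float
    terminal_sla_seconds: float
    created_at: float
    ack_received_at: Optional[float] = None
    terminal_received_at: Optional[float] = None
    durability_status: str = "pending"  # placeholder value


def sla_violations(ob: Obligation, now: float) -> list[str]:
    """Return the SLAs this obligation has breached as of `now`."""
    violated = []
    # A missing event is judged against `now`; an arrived one by its timestamp.
    ack_at = ob.ack_received_at if ob.ack_received_at is not None else now
    if ack_at > ob.created_at + ob.ack_sla_seconds:
        violated.append("ack")
    term_at = ob.terminal_received_at if ob.terminal_received_at is not None else now
    if term_at > ob.created_at + ob.terminal_sla_seconds:
        violated.append("terminal")
    return violated
```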
3.5 Pending-signal index (per recipient)
For offline-recipient delivery: a per-recipient queue of undelivered signals with a TTL. Drained on recipient registration / WS connect, and on explicit prism_signals_pending calls. Entries reference audit-queue entries (§3.6) instead of holding copies of the envelopes.
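A minimal sketch of the pending-signal index follows. It assumes entries store only (enqueued_at, signal_id) references and that expired references are dropped at drain time. PendingIndex and its injectable clock are hypothetical. The clock parameter exists so the TTL behavior can be exercised deterministically.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class PendingIndex:
    """Per-recipient queue of undelivered signal_id references with a TTL."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._by_recipient: dict[str, deque] = defaultdict(deque)

    def enqueue(self, recipient: str, signal_id: str) -> None:
        self._by_recipient[recipient].append((self._clock(), signal_id))

    def drain(self, recipient: str) -> list[str]:
        """Called on registration / WS connect / prism_signals_pending."""
        now = self._clock()
        queued = self._by_recipient.pop(recipient, deque())
        # Keep arrival order; silently drop references older than the TTL.
        return [sid for enqueued_at, sid in queued if now - enqueued_at <= self._ttl]
```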
3.6 In-memory audit queue (per tenant)
Internal Marconi buffer that feeds the audit fan-out (§4); not on the delivery path. Append-only queue per tenant_id. It holds accepted signal envelopes from the moment send_signal returns 200 to the caller until the Redis Stream writer (§4.1) successfully appends the entry to marconi:signals:{tenant_id}. Each entry is a full signal envelope plus arrival timestamp plus delivery outcome (pushed, queued_offline, no_subscriber, etc.).
Sizing — per-tenant config:
| Field | Meaning |
|---|---|
| max_entries | Hard cap on queue size |
| max_bytes | Hard cap on queue memory footprint |
| max_age_seconds | Eject entries older than this regardless of size |
| overwrite_policy | oldest (default), reject_new, bounded_back_pressure |
Sizing rule: max_entries ≥ burst_rate_per_sec × max_redis_writer_lag_seconds × safety_factor (safety_factor ≥ 2), bounded above by max_bytes.
Defaults (personal install): max_entries=50_000, max_bytes=512MB, max_age_seconds=600, overwrite_policy=oldest. Production multi-tenant tuning requires observed lag/overwrite metrics for at least one steady-state day.
Marconi does not block on the Redis writer’s health. When the writer is unreachable, the audit queue grows; when the queue fills, the oldest entries are overwritten and counted as marconi_audit_queue_overwrite_total — the operator-visible loss event under SPEC-101 v0.4 §1 loss-budget.
This is a Stage 3 deliverable; the queue does not exist in Stage 1 / Stage 2. It is named here so the §4 and §6 references resolve to a concrete in-memory structure rather than an undefined “ring.”
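Below is a sketch of the sizing rule and the oldest-overwrite policy, under stated assumptions:
- max_bytes and the other two overwrite policies are omitted.
- Each entry carries a monotonically increasing sequence number that plays the role of the writer’s audit-queue offset.
- The writer removes an entry only after its XADD succeeds.

AuditQueue, min_max_entries, and expired_total are hypothetical names. As a worked example, a burst of 100 signals/s with a 60 s worst-case writer lag and safety_factor 2 needs max_entries ≥ 12,000, well under the personal-install default of 50,000.

```python
import math
import time
from collections import deque
from typing import Callable, Optional


def min_max_entries(burst_rate_per_sec: float, max_writer_lag_seconds: float,
                    safety_factor: float = 2.0) -> int:
    """Lower bound for max_entries from the §3.6 sizing rule."""
    if safety_factor < 2:
        raise ValueError("safety_factor must be >= 2")
    return math.ceil(burst_rate_per_sec * max_writer_lag_seconds * safety_factor)


class AuditQueue:
    """Per-tenant audit queue with overwrite_policy=oldest (sketch; max_bytes omitted)."""

    def __init__(self, max_entries: int, max_age_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_entries = max_entries
        self._max_age = max_age_seconds
        self._clock = clock
        self._entries: deque = deque()  # (seq, enqueued_at, envelope, outcome)
        self._next_seq = 0
        self.overwrite_total = 0  # backs marconi_audit_queue_overwrite_total
        self.expired_total = 0    # hypothetical counter for max_age ejections

    def __len__(self) -> int:
        return len(self._entries)

    def append(self, envelope: dict, outcome: str) -> int:
        """Called synchronously from send_signal; never blocks or raises when full."""
        self._evict_expired()
        if len(self._entries) >= self._max_entries:
            self._entries.popleft()   # drop the oldest entry: an operator-visible loss
            self.overwrite_total += 1
        seq = self._next_seq
        self._next_seq += 1
        self._entries.append((seq, self._clock(), envelope, outcome))
        return seq

    def peek(self) -> Optional[tuple]:
        """Writer side: the oldest entry not yet appended to the Redis Stream."""
        self._evict_expired()
        return self._entries[0] if self._entries else None

    def ack(self, seq: int) -> None:
        """Remove the head only if it is still the entry the writer just XADDed."""
        if self._entries and self._entries[0][0] == seq:
            self._entries.popleft()

    def _evict_expired(self) -> None:
        now = self._clock()
        while self._entries and now - self._entries[0][1] > self._max_age:
            self._entries.popleft()
            self.expired_total += 1
```

ack(seq) checks the head’s sequence number for a reason. An append that runs while the writer awaits its XADD may already have overwritten the entry the writer peeked, and a blind popleft would then discard an unwritten entry.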
4. Audit fan-out (FastAPI background pipelines)
Each pipeline is a FastAPI in-process background task with its own failure mode. This preserves the SPEC-044 boundary: MCP shim → FastAPI → store. Marconi continues serving the hot path regardless of any pipeline’s health.
4.1 Redis Stream writer
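Consumes the in-memory audit queue (§3.6) and appends each entry to the marconi:signals:{tenant_id} Redis Stream with MAXLEN ~7d (approximate trim by time). Redis MAXLEN trims by entry count, so in practice the 7-day window is expressed as an approximate MINID trim (XADD ... MINID ~ <now minus 7d, in ms>). This works because auto-generated stream IDs begin with a millisecond timestamp. This stream is the Marconi Cache and the durable boundary for v0.4.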
- Failure: the writer falls behind and audit-queue depth grows. Operators alert on marconi_redis_writer_lag_seconds and marconi_audit_queue_depth.
- Restart: the writer resumes from the last-acknowledged audit-queue offset (held in Redis as a key for restart safety).
- Handoff: every successful XADD wakes the PG archiver’s blocking consumer-group read; “once it hits cache it immediately goes to PG.”
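MAXLEN counts entries, so one way to express the ~7d window is XADD’s MINID option with the approximate (~) modifier. The sketch below only builds the command arguments. The field names (signal_id, outcome, envelope) for the stream entry are hypothetical. With redis-py the list could be sent through execute_command(*argv).

```python
import json

SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000  # 604_800_000


def stream_key(tenant_id: str) -> str:
    return f"marconi:signals:{tenant_id}"


def xadd_argv(tenant_id: str, envelope: dict, outcome: str, now_ms: int) -> list[str]:
    """Build an XADD that appends one audit entry and trims to roughly 7 days.

    MINID ~ <threshold> lets Redis evict entries whose IDs (millisecond
    timestamps, for auto-generated IDs) fall below now - 7d, approximately.
    """
    threshold = now_ms - SEVEN_DAYS_MS
    return [
        "XADD", stream_key(tenant_id),
        "MINID", "~", str(threshold),
        "*",  # let Redis assign the entry ID
        "signal_id", envelope["signal_id"],
        "outcome", outcome,
        "envelope", json.dumps(envelope, sort_keys=True),
    ]
```

When this XADD executes, any consumer blocked in XREADGROUP ... BLOCK 0 on the same key is unblocked. That is the handoff described above.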
4.2 PG archiver
Consumes the Marconi Cache Redis Stream as a consumer group with XREADGROUP COUNT 1 BLOCK 0. Each stream entry is drained near-immediately into PG audit with idempotent signal_id UPSERT semantics against signal_queue. Same pattern for signal_trace_events and signal_obligations.
No batching. The v0.3 placeholder “batches writes (e.g., 100 signals per INSERT)” is explicitly replaced. The archiver optimizes for near-immediate audit persistence, not throughput batching. If future production scale needs batching, that requires a new SPEC amendment and must not weaken the “cache immediately goes to PG” operator contract.
- Failure: the archiver falls behind and the Redis Stream grows; alert at marconi_pg_archiver_lag_seconds > N. Signals still flow.
- Recoverability: the Redis Stream is the upstream. If sustained archiver lag exceeds the Redis MAXLEN ~7d window, the trimmed window is the audit gap (operator-visible, alerted).
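This sketch covers one archiver step. It uses the standard-library sqlite3 module as a stand-in for Postgres, because both accept the INSERT ... ON CONFLICT (...) DO UPDATE form used here. Postgres drivers would use %s or $1 placeholders instead of ?. The signal_queue columns and the XREADGROUP group and consumer names are assumptions, not the live schema. The ordering is the important part: commit the UPSERT first, XACK the stream entry second. A crash in between then causes a redelivery that the UPSERT absorbs.

```python
import json
import sqlite3


def xreadgroup_argv(tenant_id: str, group: str, consumer: str) -> list[str]:
    # One entry per read; block until the writer's next XADD (">" = never-delivered).
    return ["XREADGROUP", "GROUP", group, consumer, "COUNT", "1", "BLOCK", "0",
            "STREAMS", f"marconi:signals:{tenant_id}", ">"]


UPSERT_SQL = """
INSERT INTO signal_queue (signal_id, tenant_id, outcome, envelope)
VALUES (?, ?, ?, ?)
ON CONFLICT (signal_id) DO UPDATE SET
    outcome = excluded.outcome,
    envelope = excluded.envelope
"""


def archive_entry(conn: sqlite3.Connection, tenant_id: str, fields: dict) -> None:
    """Write one stream entry to the audit table; replaying it is harmless."""
    with conn:  # commits on success, rolls back on error
        conn.execute(UPSERT_SQL, (fields["signal_id"], tenant_id,
                                  fields["outcome"], fields["envelope"]))
    # Only after this commit would the archiver XACK the stream entry ID.
```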
4.3 OTEL emitter
Per-signal: increments marconi_signals_received_total, marconi_signals_delivered_total, etc. Background; never blocks.
4.4 Neo4j edge builder
Same pattern: a consumer reads from the Redis Stream (or directly from the audit queue if co-located) and builds edges asynchronously.
5. Cache invalidation
No Redis pubsub. All session-state changes happen inside the same Python process as Marconi’s tables. Invalidation is a direct function call:
| Source of change | Hook |
|---|---|
| controller_service.register | After SessionStore write success → marconi.routing.put(...) and marconi.registration.put(...) |
| controller_service.deregister_session | marconi.routing.invalidate_project(...), marconi.registration.invalidate_session(...) |
| controller_service.handoff_master / claim_master | marconi.routing.invalidate_project(...) |
| controller_service.stamp_heartbeat / stamp_engagement | marconi.routing.update_freshness(...) (cheap field update; does not invalidate the entire entry) |
| KeyspaceReaper (Redis TTL expiry) | On expired session key → marconi.routing.invalidate_project(...) + marconi.registration.invalidate_session(...) |
| WS connect | marconi.routing.attach_ws_handle(...) |
| WS disconnect | marconi.routing.detach_ws_handle(...) |
6. Process-restart recovery
Cold start: in-memory tables are empty until rehydrated.
- Backend starts → Marconi initializes empty tables.
- Rehydrate routing + registration from SessionStore (Redis): sub-second (Redis remains the cross-restart durability anchor for active sessions).
- Backend accepts WS connections; shims reconnect, re-register (idempotent), and Marconi attaches WS handles.
- Window: ~1-2s where new signals route to “no subscriber, queued offline.” Acceptable for restart cadence.
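The rehydrate step can be sketched as follows. The sketch assumes a hypothetical SessionStore method iter_active() that yields active session records. The spec names the entry point warm_caches_from_store(store); this version also takes the target tables as arguments so it is self-contained. Routes come back with no WS handle, which is why signals in the reconnect window are queued offline.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class SessionRecord:
    session_id: str
    tenant_id: str
    project_id: str
    identity: str


class FakeSessionStore:
    """Stand-in for the Redis-backed SessionStore; iter_active() is hypothetical."""

    def __init__(self, records: list[SessionRecord]) -> None:
        self._records = records

    async def iter_active(self) -> AsyncIterator[SessionRecord]:
        for rec in self._records:
            yield rec


async def warm_caches_from_store(store, routing: dict, registration: dict) -> int:
    """Rehydrate Marconi tables after a cold start; return sessions loaded."""
    loaded = 0
    async for rec in store.iter_active():
        # ws_handle stays None until the shim reconnects and the WS-connect hook
        # attaches it, so signals sent in this window are recorded as queued_offline.
        key = (rec.tenant_id, rec.project_id, rec.identity)
        routing[key] = {"session_id": rec.session_id, "ws_handle": None}
        registration[rec.session_id] = rec
        loaded += 1
    return loaded
```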
7. Durability opt-in (separate SPEC)
Some callers may require synchronous-on-return durability: prism_postmortem, prism_decide, etc. These are already separate verbs, mostly already PG-direct through FastAPI. A per-call durable=true flag is proposed; out of scope for v0.4.
8. Migration path (v0.4 scope)
Each implementation stage lands as its own PR with fine-grain feature flags. The old code path remains operational; flags flip traffic. Rollback per flag.
Stage 0 — Design gate (DONE; merged in PR #279, updated by this v0.4 rev)
Loss budget, recovery invariants, runbook, and test stubs are all in place. Frank loss-budget signoff captured. v0.3 simplifies further (R5/R6/R8 from §2 sub-doc remain; R4/R7 already removed in v0.2; pubsub-related rollback wording cleaned up).
Stage 1 — Process-local routing + registration cache (DONE; PR #280)
backend/app/marconi/{_cache,routing,registration}.py + flag-gated wiring. Default OFF.
Stage 2 — Cache lifecycle: direct write-through hooks + startup warmup
Wire direct invalidation hooks at every site that mutates session state. No pubsub subscriber anywhere. Required before MARCONI_ROUTING_TABLE_READS or MARCONI_REGISTRATION_TABLE_READS flips ON.
Hook list (canonical, must all exist):
| Hook site | Marconi call |
|---|---|
| controller_service.register (after SessionStore.register success) | invalidator.on_register(tenant_id, project_id, session_id) |
| controller_service.deregister_session (after SessionStore.release success) | invalidator.on_deregister(tenant_id, project_id, session_id) |
| controller_service.handoff_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id) |
| controller_service.claim_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id) |
| controller_service.stamp_heartbeat / stamp_engagement | invalidator.on_heartbeat(tenant_id, project_id, session_id) (cheap freshness update; does not full-invalidate) |
| KeyspaceReaper (Redis TTL expiry) | invalidator.on_session_expired(tenant_id, project_id, session_id) |
| WS connect (session_stream handler attach) | invalidator.on_ws_connect(session_id, ws_handle) |
| WS disconnect | invalidator.on_ws_disconnect(session_id) |
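A sketch of the invalidator’s shape follows. Plain dicts stand in for both SessionStore and the routing table. It shows three of the hooks and the two counters. Each hook is a direct call. An exception inside a hook is counted under marconi_invalidator_errors_total{hook,reason} instead of being raised back into controller_service. Using the exception class name as the reason label is an assumption.

```python
from collections import Counter
from typing import Callable


class Invalidator:
    """Direct write-through hooks (sketch); each hook is a plain method call."""

    def __init__(self, store: dict) -> None:
        self.store = store                # stand-in for SessionStore: session_id -> identity
        self.routing: dict = {}           # (tenant_id, project_id, identity) -> session_id
        self.calls: Counter = Counter()   # marconi_invalidator_calls_total{hook}
        self.errors: Counter = Counter()  # marconi_invalidator_errors_total{hook,reason}

    def _run(self, hook: str, apply: Callable[[], None]) -> None:
        self.calls[hook] += 1
        try:
            apply()
        except Exception as exc:  # non-fatal: count it, never raise into the caller
            self.errors[(hook, type(exc).__name__)] += 1

    def on_register(self, tenant_id: str, project_id: str, session_id: str) -> None:
        def apply() -> None:
            identity = self.store[session_id]  # written by SessionStore.register just before
            self.routing[(tenant_id, project_id, identity)] = session_id
        self._run("on_register", apply)

    def on_deregister(self, tenant_id: str, project_id: str, session_id: str) -> None:
        def apply() -> None:
            for key in [k for k, v in self.routing.items() if v == session_id]:
                del self.routing[key]
        self._run("on_deregister", apply)

    def on_master_change(self, tenant_id: str, project_id: str) -> None:
        def apply() -> None:
            for key in [k for k in self.routing if k[:2] == (tenant_id, project_id)]:
                del self.routing[key]
        self._run("on_master_change", apply)
```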
marconi/lifecycle.py provides warm_caches_from_store(store) called from FastAPI lifespan after SessionStore connect.
Test gates required before either flag flips ON:
- Every hook above has a unit test covering the cache state transition.
- Startup-warmup correctness test: post-warmup, the next routing read for an active session is a cache hit.
- Stale-route bound test: under continuous register/deregister churn, the maximum staleness window observed is < hook-latency + cache-TTL backstop.
- marconi_invalidator_errors_total is zero in steady state.
Stage 3 — In-memory audit queue + Redis Stream writer (audit fan-out lands BEFORE the hot-path cutover)
Add the Marconi audit queue (§3.6) and the Redis Stream writer (§4.1) consuming it. Marconi appends every accepted signal envelope to the audit queue synchronously inside send_signal; the writer drains it asynchronously to the marconi:signals:{tenant} Redis Stream.
Critically: Stage 3 lands the audit pipeline while the legacy signal_service.py synchronous PG write is still active. Both paths produce records in parallel. This is intentional — the hot-path cutover (Stage 5) cannot remove the legacy PG write until the new audit pipeline is proven downstream-equivalent.
Flags: MARCONI_AUDIT_QUEUE_WRITE (audit queue active), MARCONI_REDIS_STREAM_WRITER (writer active).
Exit criteria: every accepted signal lands in marconi:signals:{tenant} via the audit queue; writer lag stable; marconi_audit_queue_overwrite_total is zero in steady state.
Stage 4 — PG archiver (Marconi Cache → PG audit)
Consumer group on the Marconi Cache Redis Stream with XREADGROUP COUNT 1 BLOCK 0. It performs an idempotent UPSERT into the existing signal_queue table, plus companion writes to signal_trace_events and signal_obligations where the stream entry carries those records. It runs in shadow mode first. Shadow mode writes to the parallel signal_queue_shadow, signal_trace_events_shadow, and signal_obligations_shadow tables for byte-for-byte comparison against the legacy synchronous PG path. Once parallel correctness is proven, flip to primary; the legacy PG write then becomes redundant.
The archiver is a FastAPI in-process background task, not an MCP-side worker. Every cache write should trigger a near-immediate PG write; the expected steady-state lag is effectively one stream entry.
Flags: MARCONI_PG_ARCHIVER_SHADOW, MARCONI_PG_ARCHIVER_PRIMARY.
Exit criteria: shadow archiver matches direct-write 100% over 24h; primary archiver runs cleanly with idempotent UPSERTs; marconi_pg_archiver_lag_seconds remains stable and near-zero under normal load; ready for Stage 5 cutover.
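The 24h shadow comparison can be reduced to a keyed diff. The sketch assumes rows from signal_queue and signal_queue_shadow arrive as tuples whose first element is signal_id, and that full-tuple equality is the match criterion. shadow_diff is a hypothetical helper; the 100% match exit criterion corresponds to all three lists being empty.

```python
def shadow_diff(legacy_rows: list[tuple], shadow_rows: list[tuple]) -> dict[str, list]:
    """Compare legacy rows with shadow rows, keyed by signal_id (tuple element 0)."""
    legacy = {row[0]: row for row in legacy_rows}
    shadow = {row[0]: row for row in shadow_rows}
    return {
        "missing_in_shadow": sorted(legacy.keys() - shadow.keys()),
        "extra_in_shadow": sorted(shadow.keys() - legacy.keys()),
        "mismatched": sorted(k for k in legacy.keys() & shadow.keys() if legacy[k] != shadow[k]),
    }
```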
Stage 5 — Hot-path inversion (status: shipped path)
The two changes that flip the architecture from legacy to Marconi:
- WS delivery direct-push. Replace the signal_service.publish_targeted_event → Redis pubsub → session_stream._forward_pubsub chain with direct push: signal_service.send_signal looks up the recipient’s WS handle in Marconi’s routing table and pushes directly. The WS handler attaches/detaches via the §3.3 attach hooks (already in Stage 2) and no longer subscribes to Redis pubsub when Marconi-attached.
- Synchronous PG write removed. With the Stage 4 archiver primary, the legacy synchronous PG INSERT in send_signal is dead code; remove it.
Flags: MARCONI_HOT_PATH_SEND, MARCONI_OBLIGATIONS_MEMORY_PRIMARY.
Exit criteria: p99 send latency drops to target (<5ms); zero Redis-pubsub publishes on the signal hot path; WS handler does not subscribe to Redis pubsub when Marconi-attached; legacy synchronous PG INSERT removed from send_signal.
Stage 5 cannot ship until Stage 4 archiver is primary — otherwise accepted signals would be delivered but not recorded.
Pre-stamped shipped path: once a code PR demonstrates all Stage 5 exit criteria with MARCONI_HOT_PATH_SEND=true, MARCONI_PG_ARCHIVER_PRIMARY=true, and no synchronous PG write in send_signal, SPEC-101 Stage 5 may be marked status: shipped without another architecture revision. The implementation PR must cite this v0.4 clause and include the validation output.
Fine-grain feature flags
- MARCONI_ROUTING_TABLE_READS: read routing from the process-local cache vs SessionStore fallback (Stage 1; default OFF)
- MARCONI_REGISTRATION_TABLE_READS: sister flag for registration (Stage 1; default OFF)
- MARCONI_AUDIT_QUEUE_WRITE: Marconi appends accepted signals to the in-memory audit queue (Stage 3; default OFF)
- MARCONI_REDIS_STREAM_WRITER: Marconi async-drains the audit queue to the Redis Stream (Stage 3; default OFF)
- MARCONI_PG_ARCHIVER_SHADOW: PG archiver runs in shadow mode for byte-comparison (Stage 4; default OFF)
- MARCONI_PG_ARCHIVER_PRIMARY: PG archiver is the only writer to canonical PG signal tables (Stage 4; default OFF)
- MARCONI_HOT_PATH_SEND: prism_signal API uses direct WS push from Marconi’s table (Stage 5 cutover; default OFF)
- MARCONI_OBLIGATIONS_MEMORY_PRIMARY: obligations operate on the in-memory index (Stage 5 sibling; default OFF)
- MARCONI_VERB_<name>: per-verb migration flag (Stage 5; default OFF)
- MARCONI_CROSS_INSTANCE_FORWARD: multi-process cross-instance routing (post-v0.4; default OFF)
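Here is a sketch of a startup check for these flags. Only the MARCONI_HOT_PATH_SEND → MARCONI_PG_ARCHIVER_PRIMARY dependency is stated explicitly in §8 (Stage 5 cannot ship until the Stage 4 archiver is primary). The other entries in REQUIRES are assumptions derived from the stage ordering, and so are the accepted truthy spellings. MARCONI_VERB_<name> is a pattern rather than a fixed name, so it is left out.

```python
import os
from typing import Mapping

FLAGS = [
    "MARCONI_ROUTING_TABLE_READS",
    "MARCONI_REGISTRATION_TABLE_READS",
    "MARCONI_AUDIT_QUEUE_WRITE",
    "MARCONI_REDIS_STREAM_WRITER",
    "MARCONI_PG_ARCHIVER_SHADOW",
    "MARCONI_PG_ARCHIVER_PRIMARY",
    "MARCONI_HOT_PATH_SEND",
    "MARCONI_OBLIGATIONS_MEMORY_PRIMARY",
    "MARCONI_CROSS_INSTANCE_FORWARD",
]

# flag -> flags that must also be ON (assumed from stage ordering, except where noted)
REQUIRES = {
    "MARCONI_REDIS_STREAM_WRITER": ["MARCONI_AUDIT_QUEUE_WRITE"],
    "MARCONI_PG_ARCHIVER_SHADOW": ["MARCONI_REDIS_STREAM_WRITER"],
    "MARCONI_PG_ARCHIVER_PRIMARY": ["MARCONI_REDIS_STREAM_WRITER"],
    "MARCONI_HOT_PATH_SEND": ["MARCONI_PG_ARCHIVER_PRIMARY"],  # stated in §8, Stage 5
}

TRUTHY = {"1", "true", "on", "yes"}  # assumed spellings


def read_flags(env: Mapping[str, str]) -> dict[str, bool]:
    """Every flag defaults to OFF when unset."""
    return {name: env.get(name, "false").strip().lower() in TRUTHY for name in FLAGS}


def flag_violations(flags: Mapping[str, bool]) -> list[str]:
    """List every ON flag whose prerequisite flag is OFF."""
    problems = []
    for flag, deps in REQUIRES.items():
        if flags.get(flag):
            problems.extend(f"{flag} requires {dep}" for dep in deps if not flags.get(dep))
    return problems


if __name__ == "__main__":
    for problem in flag_violations(read_flags(os.environ)):
        print("marconi: invalid flag combination:", problem)
```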
9. Counters / observability
Every metric below MUST exist before the Stage 5 hot-path cutover.
Hot-path counters (per tenant)
- marconi_signals_received_total{tenant,signal_type}: at API entry
- marconi_signals_delivered_total{tenant,signal_type,outcome}: outcome ∈ {pushed, queued_offline, no_subscriber, …}
- marconi_signals_acked_total{tenant,signal_type,ack_kind}: final-delivery evidence
- marconi_signal_send_duration_seconds{tenant}: histogram, p50/p95/p99
- marconi_routing_table_size{tenant}: gauge
- marconi_routing_cache_hits_total{tenant} / marconi_routing_cache_misses_total{tenant}
Audit queue + fanout counters
- marconi_audit_queue_depth{tenant}: gauge
- marconi_audit_queue_overwrite_total{tenant}: counter (loss event)
- marconi_redis_writer_lag_seconds{tenant}: gauge
- marconi_redis_writer_errors_total{tenant,reason}: counter
- marconi_pg_archiver_lag_seconds{tenant}: gauge
- marconi_pg_archiver_errors_total{tenant,reason}: counter
Stage 2 invalidator counters
- marconi_invalidator_calls_total{hook}: counter (hook ∈ on_register, on_deregister, on_master_change, on_heartbeat, on_session_expired, on_ws_connect, on_ws_disconnect)
- marconi_invalidator_errors_total{hook,reason}: counter (non-fatal hook errors; non-zero in steady state is a paging incident)
Obligation counters
- marconi_obligations_open{tenant,kind}: gauge
- marconi_obligations_durable_total{tenant,kind}: counter
- marconi_obligations_degraded_not_durable_total{tenant,kind}: counter
- marconi_obligation_sla_violation_total{tenant,kind,sla}: counter
10. Naming propagation
- Module/package: backend/app/marconi/ (replacing backend/app/services/signal_service.py)
- Logs: marconi: prefix on all log lines from the service
- Metrics: marconi_* namespace
- Documentation: SPECs/ADRs/postmortems refer to “Marconi” as a proper noun
- MCP verbs: unchanged; keep the prism_signal_* external API contract stable
- Operator-facing surfaces (prism_status, dashboard widgets): refer to “Marconi”
11. Open questions
Resolved in v0.1.x → v0.2 → v0.3
- Ring buffer sizing, obligation persistence, session disambiguation (v0.1.1)
- Local durable spool (v0.2 operator scope reduction)
- Stage 1 PG-fallback wording (v0.2.2 alignment)
- Pubsub-driven cache invalidation (v0.3: direct write-through hooks; pubsub fully out)
Carrying beyond v0.4
- Cross-tenant isolation cost analysis at production multi-tenant scale.
- Multi-process cross-instance routing — sticky-partition or pubsub-invalidation; pick during a later revision.
- Durability-recovery algorithms for the “delivered but not recorded” set when Redis is down longer than the audit queue holds.
12. References
- ADR-56: locks the MUST and the rename
- SPEC-100 v0.1: loopback diagnostic; the PR (#276) where the original drift surfaced
- Postmortem 1736e40d: drift root-cause
- Texi ratification chain: signals 37140b98 → 2e1d6a4e → 4302012d → 8620d2f5
- Frank operator architecture lock (2026-05-10): “the signal flow is sender shim → Marconi → receiver shim. That is it. Marconi then pushes signal to Redis, then Redis pushes to PG. Redis is by no means in the signal flow.”
- Frank operator v0.4 GO (2026-05-10 23:30Z): “Marconi switch in-mem → Marconi Cache 7d → PG audit”; “once it hits cache it immediately goes to PG.”
- SPEC-044: MCP boundary (editor-host MCP shim remains thin; FastAPI owns backend store writes)
- Memory: feedback_async_cancellation_pool_leak, feedback_optimize_later, feedback_eliminate_failures_improve_perf

