
SPEC-101 v0.4 — Marconi: in-memory signal mesh

Operator architecture lock (Frank, 2026-05-10): “The signal flow is sender shim → Marconi → receiver shim. That is it. Marconi then pushes signal to Redis, then Redis pushes to PG. Redis is by no means in the signal flow.” “Marconi switch in-mem → Marconi Cache 7d → PG audit.” “Once it hits cache it immediately goes to PG.” Redis is the 7-day rolling audit cache and the PG staging buffer. It is not on the signal-delivery path.

0. Glossary

  • Marconi — the in-memory signal-mesh service (replacing signal_service.py’s synchronous-PG hot path). Owns the routing table, the registration table, and the live WebSocket handles for every connected shim. Sender→receiver signal delivery is a Python function call from inside Marconi’s process.
  • Hot path — the synchronous request-response cycle from prism_signal API entry to 200 OK. Marconi’s hot path is in-memory only.
  • Marconi switch — the in-memory routing/delivery tier. Owns direct sender→receiver delivery.
  • Marconi Cache — Redis Stream marconi:signals:{tenant_id} with MAXLEN ~7d. Rolling audit cache and PG staging buffer.
  • PG audit — long-term signal/trace audit store in Postgres, written only by the FastAPI-side PG archiver.
  • Audit fan-out — the asynchronous pipeline from Marconi switch → Marconi Cache → PG audit. Off the hot path; never blocks delivery.
  • Durable verb — a verb whose API contract guarantees “200 = data is in PG before the response.” Examples today: prism_postmortem, prism_decide, prism_journal create. NOT signal mesh; covered by separate per-call durable=true opt-in (out of scope for v0.4).

0.1 Naming reconciliation

This spec uses signal envelope as the logical record name. The physical PG tables use the live schema names:
| Logical record | Physical table |
|---|---|
| Signal envelope | signal_queue |
| Signal trace event | signal_trace_events |
| Signal obligation | signal_obligations |
Stage 4 shadow tables mirror the live names: signal_queue_shadow, signal_trace_events_shadow, and signal_obligations_shadow. Future implementation PRs MUST use these physical names in migrations, harnesses, and code comments where table names matter.

0.2 Three-tier flow lock

The SPEC-101 v0.4 topology is locked:
  1. Marconi switch (in-memory) — signal delivery and routing.
  2. Marconi Cache (Redis Stream marconi:signals:{tenant_id}, MAXLEN ~7d) — rolling audit cache and PG staging buffer.
  3. PG audit — long-term audit, recall, reporting, and analysis.
There is no fourth tier and no bypass. Redis is never in the signal-delivery flow; PG is never written from the MCP shim.

1. Architecture (the locked picture)

┌──────────────┐                      ┌──────────────┐
│ Sender shim  │ ── prism_signal ───▶ │  Marconi     │
└──────────────┘                      │ (in-memory)  │
                                      │              │
                                      │  routing:    │
                                      │  identity →  │
                                      │  WS handle   │
                                      └──────┬───────┘
                                             │
                              ┌──────────────┴──────────────┐
                              │ (sync, on hot path)         │ (async, off hot path)
                              ▼                             ▼
                     ┌──────────────────┐        ┌──────────────────────┐
                     │ Receiver shim WS │        │ Marconi Cache        │
                     │ (direct push)    │        │ Redis Stream         │
                     └──────────────────┘        │ marconi:signals:{t}  │
                                                 │ MAXLEN ~7d           │
                                                 └──────────┬───────────┘
                                                            │ (near-immediate
                                                            │  XREADGROUP COUNT 1)
                                                            ▼
                                                 ┌──────────────────────┐
                                                 │ PG audit             │
                                                 │ (long-term audit,    │
                                                 │  recall, reporting)  │
                                                 └──────────────────────┘
Three tiers. Two flows. Zero Redis on the signal flow. Zero MCP-to-PG shortcuts.

2. Hot path

2.1 The contract

[Sender]                    [Marconi: API]                 [Receiver]
   │                              │                            │
   │ POST /signal ──────────────▶ │                            │
   │                              │ in-memory routing lookup   │
   │                              │ in-memory WS handle        │
   │                              │ ─── direct WS push ───────▶│
   │ ◀─── 200 ────────────────── │                            │
MUST: zero synchronous PG, Redis, Neo4j, or OTEL call on this path. Marconi’s in-memory state IS the live source of truth. Audit fan-out happens after the 200 returns. Latency target: p99 < 5ms for prism_signal end-to-end on the same-instance hot path.

2.2 Why Redis is not on the signal flow

The legacy signal_service.py + session_stream.py paths use Redis pubsub as an in-process message bus: the sender thread does PUBLISH session_channel, the recipient’s WebSocket handler does SUBSCRIBE session_channel, and a forwarding loop pumps messages from Redis pubsub onto the actual WebSocket. This works correctly but adds a round-trip through Redis for every signal even when sender and receiver are in the same backend process — which they are in the current single-process deployment.
Marconi eliminates this layer. The receiver’s WebSocket handler registers itself with Marconi’s routing table at connect time (storing the WS handle or a per-session asyncio.Queue alongside the SessionRecord). The sender’s prism_signal looks up the recipient in Marconi’s routing table and pushes directly to the WS handle. No Redis publish, no Redis subscribe, no forwarding loop.

2.3 Cross-instance hot path (post-v0.4 territory, separately flagged)

When backend instances scale out (multi-process behind a load balancer), cross-instance routing becomes necessary. The two candidate approaches are sticky-partitioned routing by tenant and pubsub-driven invalidation across instances. This is out of scope for v0.4; MARCONI_CROSS_INSTANCE_FORWARD remains reserved for a later revision.

2.4 MCP boundary: no direct PG writes

The MCP shim remains thin per SPEC-044 and the Ring-2 architecture boundary: MCP calls HTTP into FastAPI; FastAPI owns business logic and store access. No MCP call writes PG directly. Marconi audit persistence follows the same boundary:
MCP shim ──▶ FastAPI API ──▶ Marconi switch ──▶ Marconi Cache ──▶ PG audit
Both audit workers are FastAPI in-process background tasks:
  • Redis Stream writer: in-memory audit queue → Marconi Cache.
  • PG archiver: Marconi Cache → PG audit.
No MCP shortcut, sidecar script, or host-local process may write the Marconi PG audit tables directly.

3. In-memory tables (Marconi-owned)

All tables are per-process, hash-keyed, async-Lock-guarded.

3.1 Routing table

| Key | Value |
|---|---|
| (tenant_id, project_id, identity) | LiveSession{ session_id, surface, machine_id, ws_handle, registered_at, last_heartbeat } |
Lookup is O(1). Updated on register, deregister, heartbeat, WS connect, WS disconnect. Read on every send_signal.

3.2 Registration table

| Key | Value |
|---|---|
| session_id | Registration{ tenant_id, project_id, identity, surface, machine_id, registered_at, last_heartbeat, ws_handle } |
Sister index of routing; allows session-id-keyed lookups for heartbeat, deregister, and WS-attach.
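The two tables can be pictured as one lock-guarded structure with a primary map and a session_id-keyed sister index. The following is a minimal sketch under stated assumptions: the class name MarconiTables and its method names are hypothetical, and the sister index stores only the routing key rather than a full Registration record, which is a simplification of §3.2.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Optional

RouteKey = tuple[str, str, str]  # (tenant_id, project_id, identity)


@dataclass
class LiveSession:
    session_id: str
    surface: str
    machine_id: str
    ws_handle: Optional[Any] = None  # live WebSocket or per-session queue
    registered_at: float = field(default_factory=time.time)
    last_heartbeat: float = field(default_factory=time.time)


class MarconiTables:
    """Hypothetical holder for the routing table and its sister index."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._routing: dict[RouteKey, LiveSession] = {}
        self._by_session: dict[str, RouteKey] = {}

    async def put(self, key: RouteKey, session: LiveSession) -> None:
        async with self._lock:
            old = self._routing.get(key)
            if old is not None:
                # Keep the sister index consistent when a route is replaced.
                self._by_session.pop(old.session_id, None)
            self._routing[key] = session
            self._by_session[session.session_id] = key

    async def lookup(self, key: RouteKey) -> Optional[LiveSession]:
        async with self._lock:
            return self._routing.get(key)  # O(1) dict lookup

    async def heartbeat(self, session_id: str) -> bool:
        async with self._lock:
            key = self._by_session.get(session_id)
            if key is None:
                return False
            self._routing[key].last_heartbeat = time.time()
            return True

    async def remove_session(self, session_id: str) -> None:
        async with self._lock:
            key = self._by_session.pop(session_id, None)
            if key is not None:
                self._routing.pop(key, None)
```

Keeping both maps behind a single lock means a heartbeat or deregister keyed by session_id can never observe a routing entry that the sister index has already dropped.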

3.3 WS handle storage

Each Marconi entry holds either the live WebSocket object directly OR a per-session asyncio.Queue that the WebSocket handler reads from. When send_signal resolves a recipient, it:
  1. Fetches the entry.
  2. Pushes the signal envelope into the entry’s WS handle / queue.
  3. Returns 200 to the sender.
If the recipient is offline (no entry) or has no live WS attached (entry but no handle), the signal is enqueued in the pending-signal index (§3.5) and outcome=queued_offline is recorded.
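The three steps and the offline fallback can be sketched as a single in-memory function. This is an illustration only: it assumes the per-session asyncio.Queue variant of the WS handle, and the plain-dict routing and pending structures are hypothetical stand-ins for the Marconi tables.

```python
import asyncio
from typing import Any, Optional

RouteKey = tuple[str, str, str]  # (tenant_id, project_id, identity)


def send_signal(
    routing: dict[RouteKey, Optional[asyncio.Queue]],
    pending: dict[RouteKey, list[dict[str, Any]]],
    recipient: RouteKey,
    envelope: dict[str, Any],
) -> str:
    """Deliver in memory and return the delivery outcome."""
    handle = routing.get(recipient)  # step 1: fetch the entry
    if handle is not None:
        handle.put_nowait(envelope)  # step 2: push into the queue
        return "pushed"              # step 3: caller returns 200
    # No entry, or an entry without a live handle: queue for later delivery.
    pending.setdefault(recipient, []).append(envelope)
    return "queued_offline"
```

Nothing in the function awaits or touches a store, which is the property the hot-path contract in §2.1 depends on.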

3.4 Obligation index

Per-tenant in-memory map of open obligations:
| Key | Value |
|---|---|
| signal_id | Obligation{ kind, ack_sla_seconds, terminal_sla_seconds, created_at, ack_received_at, terminal_received_at, durability_status } |
In-memory primary; Redis Stream handoff (§4.1) reflects durability_status. SLA sweepers operate on the in-memory map.
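An SLA sweep over the in-memory map could look like the sketch below. The field names come from the table above; everything else is an assumption made for illustration: both SLAs are measured from created_at, the "ack" and "terminal" labels are hypothetical, and the "pending" default for durability_status is a placeholder value.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Obligation:
    kind: str
    ack_sla_seconds: float
    terminal_sla_seconds: float
    created_at: float
    ack_received_at: Optional[float] = None
    terminal_received_at: Optional[float] = None
    durability_status: str = "pending"  # hypothetical placeholder value


def sweep_sla(obligations: dict[str, Obligation], now: float) -> list[tuple[str, str]]:
    """Return (signal_id, sla) pairs whose deadline passed without the event."""
    violations: list[tuple[str, str]] = []
    for signal_id, ob in obligations.items():
        age = now - ob.created_at
        if ob.ack_received_at is None and age > ob.ack_sla_seconds:
            violations.append((signal_id, "ack"))
        if ob.terminal_received_at is None and age > ob.terminal_sla_seconds:
            violations.append((signal_id, "terminal"))
    return violations
```

Each returned pair would map naturally onto one increment of marconi_obligation_sla_violation_total with the matching sla label.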

3.5 Pending-signal index (per recipient)

For offline-recipient delivery: per-recipient queue of undelivered signals + TTL. Drained on recipient registration / WS connect, and on explicit prism_signals_pending calls. Backed by audit-queue entries (§3.6) by reference.

3.6 In-memory audit queue (per tenant)

Internal Marconi buffer that feeds the audit fan-out (§4). Not on the delivery path. Append-only queue per tenant_id holding accepted signal envelopes between the moment send_signal returns 200 to the caller and the moment the Redis Stream writer (§4.1) successfully appends the entry to marconi:signals:{tenant_id}. Each entry is a full signal envelope + arrival timestamp + delivery outcome (pushed, queued_offline, no_subscriber, etc.).
Sizing — per-tenant config:
| Field | Meaning |
|---|---|
| max_entries | Hard cap on queue size |
| max_bytes | Hard cap on queue memory footprint |
| max_age_seconds | Eject entries older than this regardless of size |
| overwrite_policy | oldest (default), reject_new, bounded_back_pressure |
Sizing formula: max_entries ≥ burst_rate_per_sec × max_redis_writer_lag_seconds × safety_factor (≥2), bounded by max_bytes.
Defaults (personal install): max_entries=50_000, max_bytes=512MB, max_age_seconds=600, overwrite_policy=oldest. Production multi-tenant tuning requires observed lag/overwrite metrics for at least one steady-state day.
Marconi does not block on the Redis writer’s health. When the writer is unreachable, the audit queue grows; when the queue fills, the oldest entries are overwritten and counted as marconi_audit_queue_overwrite_total — the operator-visible loss event under SPEC-101 v0.4 §1 loss-budget.
This is a Stage 3 deliverable; the queue does not exist in Stage 1 / Stage 2. It is named here so the §4 and §6 references resolve to a concrete in-memory structure rather than an undefined “ring.”
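The sizing formula and the default oldest overwrite policy can be sketched as follows. This is a sketch under stated assumptions, not the Stage 3 implementation: the class and function names are hypothetical, max_bytes accounting is left out for brevity, and age-based ejection is assumed not to count toward the overwrite counter.

```python
import math
import time
from collections import deque
from dataclasses import dataclass
from typing import Any, Optional


def min_max_entries(burst_rate_per_sec: float, max_writer_lag_seconds: float,
                    safety_factor: float = 2.0) -> int:
    """Lower bound for max_entries from the sizing formula."""
    return math.ceil(burst_rate_per_sec * max_writer_lag_seconds * safety_factor)


@dataclass
class AuditEntry:
    envelope: dict[str, Any]
    outcome: str       # e.g. "pushed", "queued_offline", "no_subscriber"
    arrived_at: float


class AuditQueue:
    """Per-tenant queue with overwrite_policy=oldest (max_bytes omitted)."""

    def __init__(self, max_entries: int, max_age_seconds: float) -> None:
        self.max_entries = max_entries
        self.max_age_seconds = max_age_seconds
        self.overwrite_total = 0  # would feed marconi_audit_queue_overwrite_total
        self._entries: deque[AuditEntry] = deque()

    def append(self, envelope: dict[str, Any], outcome: str,
               now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        # Eject entries older than max_age_seconds regardless of size.
        while self._entries and now - self._entries[0].arrived_at > self.max_age_seconds:
            self._entries.popleft()
        if len(self._entries) >= self.max_entries:
            self._entries.popleft()  # overwrite the oldest entry
            self.overwrite_total += 1
        self._entries.append(AuditEntry(envelope, outcome, now))

    def __len__(self) -> int:
        return len(self._entries)
```

As a worked example with made-up inputs, a burst of 500 signals per second and a tolerated writer lag of 30 seconds give min_max_entries(500, 30) == 30000, which fits under the personal-install default of 50_000.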

4. Audit fan-out (FastAPI background pipelines)

Each pipeline is a FastAPI in-process background task with its own failure mode. This preserves the SPEC-044 boundary: MCP shim → FastAPI → store. Marconi continues serving the hot path regardless of any pipeline’s health.

4.1 Redis Stream writer

Consumes the in-memory audit queue (§3.6) and appends each entry to marconi:signals:{tenant_id} Redis Stream with MAXLEN ~7d (approximate trim by time). This stream is the Marconi Cache and the durable boundary for v0.4.
  • Failure: writer falls behind, audit-queue depth grows. Operator alerts on marconi_redis_writer_lag_seconds and marconi_audit_queue_depth.
  • Restart: writer resumes from last-acknowledged audit-queue offset (held in Redis as a key for restart safety).
  • Handoff: every successful XADD wakes the PG archiver’s blocking consumer group read; “once it hits cache it immediately goes to PG.”
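A drain pass for the writer might look like the sketch below. One caveat on the trim notation: in Redis, the MAXLEN option of XADD takes an entry count, while a time window is normally expressed with MINID and a millisecond-timestamp threshold. The sketch therefore assumes the "~7d" intent is enforced with an approximate MINID trim; the spec does not say how the window is enforced. The StreamClient protocol mirrors the keyword arguments of redis-py's xadd but is declared locally, so the block runs without a Redis dependency, and drain_once and stream_key are hypothetical names.

```python
import json
import time
from collections import deque
from typing import Any, Optional, Protocol

SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000


class StreamClient(Protocol):
    """Subset of a Redis client used here (modelled on redis-py's xadd)."""

    def xadd(self, name: str, fields: dict, *, minid: Optional[str] = None,
             approximate: bool = True) -> Any: ...


def stream_key(tenant_id: str) -> str:
    return f"marconi:signals:{tenant_id}"


def drain_once(client: StreamClient, tenant_id: str, queue: deque,
               now_ms: Optional[int] = None) -> int:
    """Append queued entries to the stream; stop at the first failure."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    minid = str(now_ms - SEVEN_DAYS_MS)  # trim entries older than ~7 days
    written = 0
    while queue:
        entry = queue[0]  # peek first, so a failed write loses nothing
        try:
            client.xadd(stream_key(tenant_id), {"envelope": json.dumps(entry)},
                        minid=minid, approximate=True)
        except ConnectionError:
            # Assumption: a real client raises its own error type instead.
            break
        queue.popleft()
        written += 1
    return written
```

Peeking before popping keeps an entry in the audit queue until the append succeeds, so a Redis outage shows up as growing queue depth rather than as silent loss inside the writer.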

4.2 PG archiver

Consumes the Marconi Cache Redis Stream as a consumer group with XREADGROUP COUNT 1 BLOCK 0. Each stream entry is drained near-immediately into PG audit with idempotent signal_id UPSERT semantics against signal_queue. Same pattern for signal_trace_events and signal_obligations. No batching. The v0.3 placeholder “batches writes (e.g., 100 signals per INSERT)” is explicitly replaced. The archiver optimizes for near-immediate audit persistence, not throughput batching. If future production scale needs batching, that requires a new SPEC amendment and must not weaken the “cache immediately goes to PG” operator contract.
  • Failure: archiver falls behind, Redis Stream grows; alert at marconi_pg_archiver_lag_seconds > N. Signals still flow.
  • Recoverability: Redis Stream is the upstream. If sustained archiver lag exceeds Redis MAXLEN ~7d, the trimmed window is the audit gap (operator-visible, alerted).
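The idempotent UPSERT is what makes redelivery from the consumer group safe. The sketch below demonstrates the pattern with the standard-library sqlite3 module standing in for Postgres, since both accept the same INSERT ... ON CONFLICT ... DO UPDATE form. The column set, the entry fields (stream_id, signal_id, tenant_id, payload), and the ack callback are all hypothetical; the real signal_queue schema is not given in this spec.

```python
import sqlite3
from typing import Callable

# Demo schema only; the real signal_queue columns are not specified here.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS signal_queue (
    signal_id TEXT PRIMARY KEY,
    tenant_id TEXT NOT NULL,
    payload   TEXT NOT NULL
)
"""

UPSERT_SQL = """
INSERT INTO signal_queue (signal_id, tenant_id, payload)
VALUES (?, ?, ?)
ON CONFLICT (signal_id) DO UPDATE SET
    tenant_id = excluded.tenant_id,
    payload   = excluded.payload
"""


def archive_entry(conn: sqlite3.Connection, entry: dict,
                  ack: Callable[[str], None]) -> None:
    """Write one stream entry, then acknowledge it (no batching)."""
    with conn:  # commits on success, rolls back on exception
        conn.execute(UPSERT_SQL,
                     (entry["signal_id"], entry["tenant_id"], entry["payload"]))
    # Ack only after the commit: a crash in between causes a redelivery,
    # which the UPSERT absorbs without creating a duplicate row.
    ack(entry["stream_id"])
```

Processing the same entry twice leaves exactly one row, which is the property a shadow-versus-legacy comparison would rely on when counting records.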

4.3 OTEL emitter

Per-signal: increments marconi_signals_received_total, marconi_signals_delivered_total, etc. Background; never blocks.

4.4 Neo4j edge builder

Same pattern: consumer reads from the Redis Stream (or directly from the audit queue if co-located), builds edges asynchronously.

5. Cache invalidation

No Redis pubsub. All session-state changes happen inside the same Python process as Marconi’s tables. Invalidation is a direct function call:
| Source of change | Hook |
|---|---|
| controller_service.register | After SessionStore write success → marconi.routing.put(...) and marconi.registration.put(...) |
| controller_service.deregister_session | marconi.routing.invalidate_project(...), marconi.registration.invalidate_session(...) |
| controller_service.handoff_master / claim_master | marconi.routing.invalidate_project(...) |
| controller_service.stamp_heartbeat / stamp_engagement | marconi.routing.update_freshness(...) (cheap field update; does not invalidate the entire entry) |
| KeyspaceReaper (Redis TTL expiry) | On expired session key → marconi.routing.invalidate_project(...) + marconi.registration.invalidate_session(...) |
| WS connect | marconi.routing.attach_ws_handle(...) |
| WS disconnect | marconi.routing.detach_ws_handle(...) |
All synchronous, all in-process, zero pubsub round-trips.
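The difference between a cheap freshness update and a project-wide invalidation can be illustrated with a small routing class. The method names follow the hooks listed above, but the spec elides their arguments, so every signature here is an assumption; locking is also left out because the sketch is purely synchronous.

```python
from dataclasses import dataclass
from typing import Any, Optional

Key = tuple[str, str, str]  # (tenant_id, project_id, identity)


@dataclass
class Entry:
    session_id: str
    last_heartbeat: float
    ws_handle: Optional[Any] = None


class Routing:
    """Hypothetical routing table exposing the direct-call hooks."""

    def __init__(self) -> None:
        self.entries: dict[Key, Entry] = {}

    def put(self, key: Key, entry: Entry) -> None:
        self.entries[key] = entry

    def update_freshness(self, key: Key, now: float) -> None:
        # Field update only: the entry and its WS handle stay in place.
        entry = self.entries.get(key)
        if entry is not None:
            entry.last_heartbeat = now

    def attach_ws_handle(self, key: Key, ws_handle: Any) -> None:
        entry = self.entries.get(key)
        if entry is not None:
            entry.ws_handle = ws_handle

    def detach_ws_handle(self, key: Key) -> None:
        entry = self.entries.get(key)
        if entry is not None:
            entry.ws_handle = None

    def invalidate_project(self, tenant_id: str, project_id: str) -> int:
        # Drop every route in the project; returns how many were removed.
        doomed = [k for k in self.entries if k[:2] == (tenant_id, project_id)]
        for k in doomed:
            del self.entries[k]
        return len(doomed)
```

A caller such as controller_service.stamp_heartbeat would invoke update_freshness directly after its SessionStore write, with no message bus in between.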

6. Process-restart recovery

Cold start: in-memory tables empty until rehydrated.
  1. Backend starts → Marconi initializes empty tables.
  2. Rehydrate routing + registration from SessionStore (Redis) — sub-second (Redis remains the cross-restart durability anchor for active sessions).
  3. Backend accepts WS connections; shims reconnect, re-register (idempotent), and Marconi attaches WS handles.
  4. Window: ~1-2s where new signals route to “no subscriber, queued offline.” Acceptable for restart cadence.
In-flight audit-queue entries (§3.6) that hadn’t yet been written to the Redis Stream at restart time are lost (network-grade durability per Frank’s loss-budget signoff).
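Steps 2 to 4 can be sketched as rehydrating entries without WS handles and attaching handles as shims reconnect. The record shape and the three function names below are hypothetical; the point is only that a rehydrated session routes to queued_offline until its WebSocket is back.

```python
from typing import Any, Iterable

Key = tuple[str, str, str]  # (tenant_id, project_id, identity)


def rehydrate(records: Iterable[dict]) -> tuple[dict[Key, dict], dict[str, Key]]:
    """Rebuild routing and registration from SessionStore-style records."""
    routing: dict[Key, dict] = {}
    registration: dict[str, Key] = {}
    for rec in records:
        key = (rec["tenant_id"], rec["project_id"], rec["identity"])
        # WS handles do not survive a restart, so every entry starts detached.
        routing[key] = {"session_id": rec["session_id"], "ws_handle": None}
        registration[rec["session_id"]] = key
    return routing, registration


def attach_ws(routing: dict[Key, dict], registration: dict[str, Key],
              session_id: str, ws_handle: Any) -> bool:
    key = registration.get(session_id)
    if key is None:
        return False
    routing[key]["ws_handle"] = ws_handle
    return True


def route_outcome(routing: dict[Key, dict], key: Key) -> str:
    entry = routing.get(key)
    if entry is None or entry["ws_handle"] is None:
        return "queued_offline"  # the restart window described in step 4
    return "pushed"
```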

7. Durability opt-in (separate SPEC)

Some callers may require synchronous-on-return durability, for example prism_postmortem and prism_decide. These are already separate verbs, and most already write to PG directly through FastAPI. A per-call durable=true flag is proposed but is out of scope for v0.4.

8. Migration path (v0.4 scope)

Each implementation stage lands as its own PR with fine-grain feature flags. Old code path remains operational; flags flip traffic. Rollback per flag.

Stage 0 — Design gate (DONE; merged in PR #279, updated by this v0.4 rev)

Loss budget, recovery invariants, runbook, test stubs all in place. Frank loss-budget signoff captured. v0.3 simplifies further (R5/R6/R8 from §2 sub-doc remain; R4/R7 already removed in v0.2; pubsub-related rollback wording cleaned up).

Stage 1 — Process-local routing + registration cache (DONE; PR #280)

backend/app/marconi/{_cache,routing,registration}.py + flag-gated wiring. Default OFF.

Stage 2 — Cache lifecycle: direct write-through hooks + startup warmup

Wire direct invalidation hooks at every site that mutates session state. No pubsub subscriber anywhere. Required before MARCONI_ROUTING_TABLE_READS / MARCONI_REGISTRATION_TABLE_READS flips ON. Hook list (canonical, must all exist):
| Hook site | Marconi call |
|---|---|
| controller_service.register (after SessionStore.register success) | invalidator.on_register(tenant_id, project_id, session_id) |
| controller_service.deregister_session (after SessionStore.release success) | invalidator.on_deregister(tenant_id, project_id, session_id) |
| controller_service.handoff_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id) |
| controller_service.claim_master (after master flag flip) | invalidator.on_master_change(tenant_id, project_id) |
| controller_service.stamp_heartbeat / stamp_engagement | invalidator.on_heartbeat(tenant_id, project_id, session_id) (cheap freshness update; does not full-invalidate) |
| KeyspaceReaper (Redis TTL expiry) | invalidator.on_session_expired(tenant_id, project_id, session_id) |
| WS connect (session_stream handler attach) | invalidator.on_ws_connect(session_id, ws_handle) |
| WS disconnect | invalidator.on_ws_disconnect(session_id) |
marconi/lifecycle.py provides warm_caches_from_store(store) called from FastAPI lifespan after SessionStore connect.
Test gates required before either flag flips ON:
  • Every hook above has a unit test covering the cache state transition.
  • Startup-warmup correctness test: post-warmup, the next routing read for an active session is a cache hit.
  • Stale-route bound test: under continuous register/deregister churn, the maximum staleness window observed is < hook-latency + cache-TTL backstop.
Exit criteria: cache hit-rate ≥ 99%; SessionStore reads converge to startup-rehydrate + write-through misses only; marconi_invalidator_errors_total is zero in steady state.

Stage 3 — In-memory audit queue + Redis Stream writer (audit fan-out lands BEFORE the hot-path cutover)

Add the Marconi audit queue (§3.6) and the Redis Stream writer (§4.1) consuming it. Marconi appends every accepted signal envelope to the audit queue synchronously inside send_signal; the writer drains async to the marconi:signals:{tenant_id} Redis Stream.
Critically: Stage 3 lands the audit pipeline while the legacy signal_service.py synchronous PG write is still active. Both paths produce records in parallel. This is intentional: the hot-path cutover (Stage 5) cannot remove the legacy PG write until the new audit pipeline is proven downstream-equivalent.
Flags: MARCONI_AUDIT_QUEUE_WRITE (audit queue active), MARCONI_REDIS_STREAM_WRITER (writer active).
Exit criteria: every accepted signal lands in marconi:signals:{tenant_id} via the audit queue; writer lag stable; marconi_audit_queue_overwrite_total is zero in steady state.

Stage 4 — PG archiver (Marconi Cache → PG audit)

Consumer group on Marconi Cache Redis Stream with XREADGROUP COUNT 1 BLOCK 0; idempotent UPSERT into the existing signal_queue table plus companion writes to signal_trace_events and signal_obligations where the stream entry carries those records.
Runs in shadow mode first: writes to parallel signal_queue_shadow, signal_trace_events_shadow, and signal_obligations_shadow tables for byte-for-byte comparison against the legacy synchronous PG path. Once parallel correctness is proven, flip to primary and the legacy PG write becomes redundant.
The archiver is a FastAPI in-process background task, not an MCP-side worker. Every cache write should trigger a near-immediate PG write; the expected steady-state lag is effectively one stream entry.
Flags: MARCONI_PG_ARCHIVER_SHADOW, MARCONI_PG_ARCHIVER_PRIMARY.
Exit criteria: shadow archiver matches direct-write 100% over 24h; primary archiver runs cleanly with idempotent UPSERTs; marconi_pg_archiver_lag_seconds remains stable and near-zero under normal load; ready for Stage 5 cutover.

Stage 5 — Hot-path inversion (status: shipped path)

The two changes that flip the architecture from legacy to Marconi:
  1. WS delivery direct-push. Replace the signal_service.publish_targeted_event → Redis pubsub → session_stream._forward_pubsub chain with: signal_service.send_signal looks up the recipient’s WS handle in Marconi’s routing table and pushes directly. WS handler attaches/detaches via the §3.3 attach hooks (already in Stage 2) and no longer subscribes to Redis pubsub when Marconi-attached.
  2. Synchronous PG write removed. With Stage 4 archiver primary, the legacy synchronous PG INSERT in send_signal is dead code; remove it.
Flags: MARCONI_HOT_PATH_SEND, MARCONI_OBLIGATIONS_MEMORY_PRIMARY.
Exit criteria: p99 send latency drops to target (<5ms); zero Redis-pubsub publishes on the signal hot path; WS handler does not subscribe to Redis pubsub when Marconi-attached; legacy synchronous PG INSERT removed from send_signal.
Stage 5 cannot ship until Stage 4 archiver is primary; otherwise accepted signals would be delivered but not recorded.
Pre-stamped shipped path: once a code PR demonstrates all Stage 5 exit criteria with MARCONI_HOT_PATH_SEND=true, MARCONI_PG_ARCHIVER_PRIMARY=true, and no synchronous PG write in send_signal, SPEC-101 Stage 5 may be marked status: shipped without another architecture revision. The implementation PR must cite this v0.4 clause and include the validation output.

Fine-grain feature flags

  • MARCONI_ROUTING_TABLE_READS — read routing from process-local cache vs SessionStore fallback (Stage 1, default OFF)
  • MARCONI_REGISTRATION_TABLE_READS — sister flag for registration (Stage 1, default OFF)
  • MARCONI_AUDIT_QUEUE_WRITE — Marconi appends accepted signals to in-memory audit queue (Stage 3; default OFF)
  • MARCONI_REDIS_STREAM_WRITER — Marconi async-drains the audit queue to Redis Stream (Stage 3; default OFF)
  • MARCONI_PG_ARCHIVER_SHADOW — PG archiver runs in shadow mode for byte-comparison (Stage 4; default OFF)
  • MARCONI_PG_ARCHIVER_PRIMARY — PG archiver is the only writer to canonical PG signal tables (Stage 4; default OFF)
  • MARCONI_HOT_PATH_SEND — prism_signal API uses direct WS push from Marconi’s table (Stage 5 cutover; default OFF)
  • MARCONI_OBLIGATIONS_MEMORY_PRIMARY — obligations operate on in-memory index (Stage 5 sibling; default OFF)
  • MARCONI_VERB_<name> — per-verb migration flag (Stage 5; default OFF)
  • MARCONI_CROSS_INSTANCE_FORWARD — multi-process cross-instance routing (post-v0.4; default OFF)
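The Stage 5 ordering rule (no hot-path cutover before the archiver is primary) lends itself to a startup check. The sketch below assumes the flags are read from environment variables and that "1", "true", "yes", and "on" count as enabled; neither detail is specified in this document, and both function names are hypothetical.

```python
import os
from typing import Mapping

TRUTHY = {"1", "true", "yes", "on"}


def flag_enabled(name: str, env: Mapping[str, str] = os.environ) -> bool:
    """Unset or unrecognised values mean OFF, matching the default-OFF rule."""
    return env.get(name, "").strip().lower() in TRUTHY


def stage5_problems(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return human-readable violations of the Stage 5 precondition."""
    problems: list[str] = []
    if flag_enabled("MARCONI_HOT_PATH_SEND", env) and not flag_enabled(
        "MARCONI_PG_ARCHIVER_PRIMARY", env
    ):
        problems.append(
            "MARCONI_HOT_PATH_SEND requires MARCONI_PG_ARCHIVER_PRIMARY: "
            "signals would be delivered but not recorded"
        )
    return problems
```

Failing fast on a non-empty result at startup would turn a misordered flag flip into a visible configuration error instead of an audit gap.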

9. Counters / observability

Every metric below MUST exist before Stage 5 hot-path cutover.

Hot-path counters (per tenant)

  • marconi_signals_received_total{tenant,signal_type} — at API entry
  • marconi_signals_delivered_total{tenant,signal_type,outcome} — outcome ∈ {pushed, queued_offline, no_subscriber, …} (the delivery outcomes recorded per §3.6)
  • marconi_signals_acked_total{tenant,signal_type,ack_kind} — final-delivery evidence
  • marconi_signal_send_duration_seconds{tenant} — histogram, p50/p95/p99
  • marconi_routing_table_size{tenant} — gauge
  • marconi_routing_cache_hits_total{tenant} / marconi_routing_cache_misses_total{tenant}

Audit queue + fanout counters

  • marconi_audit_queue_depth{tenant} — gauge
  • marconi_audit_queue_overwrite_total{tenant} — counter (loss event)
  • marconi_redis_writer_lag_seconds{tenant} — gauge
  • marconi_redis_writer_errors_total{tenant,reason} — counter
  • marconi_pg_archiver_lag_seconds{tenant} — gauge
  • marconi_pg_archiver_errors_total{tenant,reason} — counter

Stage 2 invalidator counters

  • marconi_invalidator_calls_total{hook} — counter (hook ∈ on_register, on_deregister, on_master_change, on_heartbeat, on_session_expired, on_ws_connect, on_ws_disconnect)
  • marconi_invalidator_errors_total{hook,reason} — counter (non-fatal hook errors; non-zero in steady state is a paging incident)

Obligation counters

  • marconi_obligations_open{tenant,kind} — gauge
  • marconi_obligations_durable_total{tenant,kind} — counter
  • marconi_obligations_degraded_not_durable_total{tenant,kind} — counter
  • marconi_obligation_sla_violation_total{tenant,kind,sla} — counter

10. Naming propagation

  • Module/package: backend/app/marconi/ (replacing backend/app/services/signal_service.py)
  • Logs: marconi: prefix on all log lines from the service
  • Metrics: marconi_* namespace
  • Documentation: SPECs/ADRs/postmortems refer to “Marconi” as a proper noun
  • MCP verbs: unchanged — keep prism_signal_* external API contract stable
  • Operator-facing surfaces (prism_status, dashboard widgets): refer to “Marconi”

11. Open questions

Resolved in v0.1.x → v0.2 → v0.3

  • Ring buffer sizing, obligation persistence, session disambiguation (v0.1.1)
  • Local durable spool (v0.2 operator scope reduction)
  • Stage 1 PG-fallback wording (v0.2.2 alignment)
  • Pubsub-driven cache invalidation (v0.3 — direct write-through hooks; pubsub fully out)

Carrying beyond v0.4

  1. Cross-tenant isolation cost analysis at production multi-tenant scale.
  2. Multi-process cross-instance routing — sticky-partition or pubsub-invalidation; pick during a later revision.
  3. Durability-recovery algorithms for the “delivered but not recorded” set when Redis is down longer than the audit queue holds.

12. References

  • ADR-56 — locks the MUST and the rename
  • SPEC-100 v0.1 — loopback diagnostic; the PR (#276) where the original drift surfaced
  • Postmortem 1736e40d — drift root-cause
  • Texi ratification chain — signals 37140b982e1d6a4e4302012d8620d2f5
  • Frank operator architecture lock (2026-05-10): “the signal flow is sender shim → Marconi → receiver shim. That is it. Marconi then pushes signal to Redis, then Redis pushes to PG. Redis is by no means in the signal flow.”
  • Frank operator v0.4 GO (2026-05-10 23:30Z): “Marconi switch in-mem → Marconi Cache 7d → PG audit”; “once it hits cache it immediately goes to PG.”
  • SPEC-044 — MCP boundary: editor-host MCP shim remains thin; FastAPI owns backend store writes.
  • Memory: feedback_async_cancellation_pool_leak, feedback_optimize_later, feedback_eliminate_failures_improve_perf
Last modified on June 7, 2026