Pub-Sub & Semantic Subscriptions

Build event-driven workflows with run-scoped control streams and semantic subscriptions on the data plane.

Polling state endpoints does not scale for reactive systems. MuBit exposes two streaming surfaces so workers wake on state changes instead of spinning.

Decision model

| Stream | Scope | Method | Best use |
| --- | --- | --- | --- |
| Run-scoped control events | run_id | client.control.subscribe(...) / client.advanced.subscribe(...) (SSE or gRPC) | Workflow coordination inside one run: ingestion completion, checkpoints, lesson validation and promotion, handoffs, outcome recording, drift signals |
| Core data-plane pub-sub | User + filter | client.core.subscribe_events(...) (SSE or gRPC) | Topic-triggered automation across broader state: wake on node insert/update/delete, wake when a newly-written node crosses a semantic-similarity threshold |

Both surfaces stream events as they happen. A subscription lives as long as its HTTP or gRPC stream; when the stream disconnects, the server cleans up the subscription automatically.

Control event stream

Run-scoped events fire for every meaningful control-plane state change. Roughly 24 event types cover ingest, reflection, checkpointing, outcome recording, handoffs, prompt activations, drift, project/agent/skill CRUD, and run-monitor signals. Filter with event_types to keep the stream narrow.

Lesson lifecycle events flow through this stream too, and reflect a gated candidate lifecycle. A freshly reflected lesson enters as a pending candidate and is not trusted immediately — it stays pending until it passes a held-out validation gate. Watch for context.lesson_validation_passed (candidate marked active) and context.lesson_validation_failed (candidate rejected and down-weighted) alongside context.lesson_promoted; promotion now reflects a validated lesson rather than an unconditional one.
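The gated lifecycle described above can be tracked with a small state map. This is a minimal sketch that works on plain event dicts rather than a live stream. The event type names come from the text; the "lesson_id" payload key and the state labels are assumptions made for illustration, not documented fields.

```python
from typing import Dict

# Event type -> resulting lesson state. The state labels are hypothetical.
TRANSITIONS = {
    "context.lesson_validation_passed": "active",
    "context.lesson_validation_failed": "rejected",
    "context.lesson_promoted": "promoted",
}


def apply_lesson_event(states: Dict[str, str], event: dict) -> Dict[str, str]:
    """Update the lesson-state map in place from one control event."""
    new_state = TRANSITIONS.get(event.get("type"))
    if new_state is None:
        return states  # not a lesson lifecycle event
    # "lesson_id" is an assumed payload key; adjust to the real payload shape.
    lesson_id = (event.get("payload") or {}).get("lesson_id")
    if lesson_id is not None:
        states[lesson_id] = new_state
    return states


def lesson_state(states: Dict[str, str], lesson_id: str) -> str:
    """A lesson with no validation event yet is still a pending candidate."""
    return states.get(lesson_id, "pending")
```

Feeding each event from the control stream through apply_lesson_event keeps a local view of which candidates are still pending and which have passed the gate.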

```python
events = client.advanced.subscribe({
    "run_id": run_id,
    "event_types": ["context.ingest_completed", "context.checkpoint_created"],
})
for event in events:
    print(event["type"], event["payload"])
```

```javascript
const stream = await client.control.subscribe({
  run_id: runId,
  event_types: ["context.ingest_completed", "context.checkpoint_created"],
});
for await (const event of stream) {
  console.log(event.type, event.payload);
}
```

Each event includes id, type, run_id, agent_id, payload (event-specific JSON), and created_at. Recent events can be replayed from Redis Streams; the default retention is 10,000 events per run and is configurable through the environment.
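Because recent events are replayable, a consumer that reconnects may receive an event it has already processed. The following is a minimal sketch of de-duplicating by the id field with bounded memory. It assumes id is unique per event and hashable, which the text does not state explicitly; dedupe_events and max_remembered are hypothetical names.

```python
from collections import OrderedDict
from typing import Iterable, Iterator


def dedupe_events(events: Iterable[dict], max_remembered: int = 10_000) -> Iterator[dict]:
    """Yield each event once, keyed by its "id" field."""
    seen: "OrderedDict[object, None]" = OrderedDict()
    for event in events:
        event_id = event.get("id")
        if event_id is None:
            yield event  # nothing to key on, so pass it through
            continue
        if event_id in seen:
            continue  # already processed, e.g. delivered again on replay
        seen[event_id] = None
        if len(seen) > max_remembered:
            seen.popitem(last=False)  # forget the oldest remembered id
        yield event
```

Wrapping the subscription iterator, as in `for event in dedupe_events(events): ...`, keeps downstream handlers idempotent without each handler tracking ids itself.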

Core semantic subscriptions

core.subscribe_events opens an SSE (or server-streaming gRPC) connection that delivers pub-sub events as they are published by the storage layer. Every event yielded by the SDK has the same shape across Python, JavaScript, and Rust — and across both HTTP and gRPC transports. The first event on the stream is always subscribed; after that, events are node.inserted, node.updated, node.deleted, or memory.added.

Event shape

Every event is a dict / object with a type discriminator plus the fields relevant to that type. The SDK wrappers parse metadata_json / entry_json wire strings into native metadata / entry objects and strip proto3 scalar defaults so callers see a tight shape:

| type | Fields |
| --- | --- |
| subscribed | subscription_id |
| node.inserted / node.updated | node_id, run_id, metadata, created_at, updated_at |
| node.deleted | node_id |
| memory.added | session_id, entry |

Example stream:

```json
{"type":"subscribed","subscription_id":42}
{"type":"node.inserted","node_id":7,"run_id":"r1","metadata":{"intent":"fact"},"created_at":1714000000,"updated_at":1714000000}
{"type":"node.deleted","node_id":7}
```

```python
# Fire only when a node whose vector is near the "billing escalation" embedding lands.
events = client.core.subscribe_events({
    "filter_type": "semantic",
    "query_text": "billing escalation",
    "threshold": 0.82,
})

for event in events:
    if event["type"] == "subscribed":
        print("subscription id:", event["subscription_id"])
        continue
    if event["type"] == "node.inserted":
        node_id = event["node_id"]
        # Wake the agent loop, page an on-call, enqueue a follow-up recall, …
        handle_billing_escalation(node_id)
```

```rust
use serde_json::json;
use tokio_stream::StreamExt;

let mut stream = client.core.subscribe_events(json!({
    "filter_type": "semantic",
    "query_text": "billing escalation",
    "threshold": 0.82
})).await?;

while let Some(event) = stream.next().await {
    let evt = event?;
    if evt["type"] == "node.inserted" {
        let node_id = evt["node_id"].as_u64().unwrap_or(0);
        // react…
    }
}
```
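
Since every event carries a type discriminator, consumers that react to several event types may find a handler table easier to maintain than a chain of if statements. A minimal sketch, run against plain dicts shaped like the example stream rather than a live connection; dispatch and the handler names are hypothetical and not part of the SDK.

```python
from typing import Callable, Dict, Iterable, List

Handler = Callable[[dict], None]


def dispatch(events: Iterable[dict], handlers: Dict[str, Handler]) -> List[dict]:
    """Route each event to the handler registered for its type.

    Returns the events that had no handler so the caller can log them.
    """
    unhandled = []
    for event in events:
        handler = handlers.get(event.get("type"))
        if handler is None:
            unhandled.append(event)
        else:
            handler(event)
    return unhandled


# Usage with events shaped like the example stream (no live connection needed).
sample = [
    {"type": "subscribed", "subscription_id": 42},
    {"type": "node.inserted", "node_id": 7, "run_id": "r1",
     "metadata": {"intent": "fact"}, "created_at": 1714000000, "updated_at": 1714000000},
    {"type": "node.deleted", "node_id": 7},
]
inserted_ids: List[int] = []
leftover = dispatch(sample, {
    "subscribed": lambda e: None,  # nothing to do beyond noting the id
    "node.inserted": lambda e: inserted_ids.append(e["node_id"]),
})
```

With a real subscription, the iterator returned by client.core.subscribe_events(...) would take the place of sample.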

Filter types

| filter_type | Wakes on |
| --- | --- |
| "all" | Any node insert/update/delete (use for admin tooling, not production) |
| "node" | Events touching a single node_id (needs node_id in the request) |
| "semantic" | Inserts/updates whose vector exceeds threshold cosine similarity to the encoded query_text. Default threshold 0.8. |
| "session" | memory.added events scoped to one session id |

ACL filtering runs server-side: subscribers only see events for nodes they have permission to read.
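
To build intuition for choosing a semantic threshold, the comparison can be sketched in plain Python. This is an illustration only, not MuBit's server-side code: the function names are hypothetical, the vectors are toy values rather than real embeddings, and the strict > comparison is an assumption that follows the word "exceeds" above.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # treat a zero vector as matching nothing
    return dot / (norm_a * norm_b)


def would_wake(node_vec: Sequence[float], query_vec: Sequence[float],
               threshold: float = 0.8) -> bool:
    """True when the node vector is similar enough to the query vector."""
    return cosine_similarity(node_vec, query_vec) > threshold
```

Raising the threshold shrinks the set of node vectors that pass, which is why a low threshold is listed as a cause of noisy streams in the troubleshooting table.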

Cleanup and lifecycle

  • Close the SSE response / drop the gRPC stream to end the subscription. The server unsubscribes automatically — you do not need to call unsubscribe explicitly.
  • POST /v2/core/pubsub/unsubscribe { subscription_id } exists for admin tooling or for cleaning up subscriptions whose originating client was killed before it could close the stream.
  • POST /v2/core/pubsub/list returns the active subscription IDs for the calling user plus the server-wide count — useful for monitoring.
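
For the admin cleanup case, the unsubscribe call can be made with any HTTP client. The sketch below only builds the request using the standard library. The path and the subscription_id body field come from the list above; the base URL, the bearer-token header, and the function name are placeholders assumed for illustration.

```python
import json
import urllib.request

# Placeholder host: replace with your deployment's base URL.
BASE_URL = "https://mubit.example.invalid"


def build_unsubscribe_request(subscription_id: int, token: str) -> urllib.request.Request:
    """Build (but do not send) a POST to the unsubscribe route."""
    body = json.dumps({"subscription_id": subscription_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/v2/core/pubsub/unsubscribe",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumed auth scheme; use whatever your deployment requires.
            "Authorization": f"Bearer {token}",
        },
    )


req = build_unsubscribe_request(42, "TOKEN")
# urllib.request.urlopen(req) would send it; omitted here so the sketch has no side effects.
```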

Failure modes and troubleshooting

| Symptom | Root cause | Fix |
| --- | --- | --- |
| Too many events | Broad filter ("all") or a low semantic threshold | Narrow filter_type, raise threshold, or filter client-side by event_types on the control stream |
| Missing events | Subscriber scoped to a different run_id (control stream) or subscribed before the data plane had permission to read the emitting node (core stream) | Verify the producer and subscriber share a run_id; check ACLs on the node class you expect to see |
| SSE stream idle for minutes | Proxy / load balancer idle timeout | The server emits SSE keep-alives, but some proxies still cut idle connections; configure the proxy for long-lived SSE or fall back to the gRPC surface |
| Core subscribe returns 404 / denied | Data-plane access policy not enabled for the route | /v2/core/pubsub/* sits under the core route policy. See Core Direct Lanes and Policy for the flag set and rollout guidance |
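
When an intermediary cuts an idle stream, the subscription is gone and the client has to open a new one. One way to handle that is a wrapper that reopens the stream with exponential backoff. This is a sketch under stated assumptions: resubscribing and open_stream are hypothetical names, and it assumes a dropped connection surfaces as ConnectionError, which may differ in the actual SDK.

```python
import time
from typing import Callable, Iterable, Iterator


def resubscribing(
    open_stream: Callable[[], Iterable[dict]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[dict]:
    """Yield events from open_stream(), reopening it after a dropped connection."""
    failures = 0
    while True:
        try:
            for event in open_stream():
                failures = 0  # a delivered event resets the backoff
                yield event
            return  # the stream ended cleanly
        except ConnectionError:  # assumed error type for a cut connection
            failures += 1
            if failures > max_retries:
                raise
            # Wait 1x, 2x, 4x, ... base_delay between consecutive failures.
            sleep(base_delay * 2 ** (failures - 1))
```

A caller would pass something like `lambda: client.core.subscribe_events({...})` as open_stream. On the core stream each reopened connection begins with a fresh subscribed event, so handlers should expect more than one.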
