Event bus (NATS)
Every MTS1B service communicates over NATS for live trading and event observability. This page explains the subject taxonomy, the wire format, and the durability and delivery guarantees (at-least-once delivery, with deduplication where replays must be safe).
Why NATS
| Want | Why NATS |
|---|---|
| Low latency (≤ 1ms p99) | NATS Core is in-memory pub/sub, no Kafka-style commit log |
| At-least-once for orders/fills | NATS JetStream gives durable streams + ack semantics |
| Wildcards for ops dashboards | mts.*.oms.> subscribes to everything OMS publishes, across schema versions |
| Self-hosted, no cloud dep | Single Go binary; runs on Proxmox LXC |
| TLS + JWT auth out of the box | mTLS for service-to-service, JWT for human consumers |
We considered Kafka (too heavy for our scale), Redis Streams (not durable enough), and RabbitMQ (operationally finicky). NATS won on simplicity.
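The wildcard row above depends on NATS subject matching: subjects are dot-separated tokens, * matches exactly one token, and > matches one or more trailing tokens. The function below is a minimal pure-Python sketch of those rules, handy for checking what a dashboard subscription will catch. subject_matches is a hypothetical helper, not part of nats-py or mts1b-platform, and it skips edge cases such as empty tokens.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS-style `pattern` (hypothetical helper).

    '*' matches exactly one token; '>' matches one or more trailing tokens.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, token in enumerate(p_tokens):
        if token == ">":
            # '>' is only valid as the last token and must consume at least one token
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # subject ran out of tokens before the pattern did
        if token != "*" and token != s_tokens[i]:
            return False  # literal token mismatch
    # Without '>', pattern and subject must have the same number of tokens
    return len(p_tokens) == len(s_tokens)


if __name__ == "__main__":
    dashboard = "mts.*.oms.>"  # every OMS subject, any schema version
    print(subject_matches(dashboard, "mts.v1.oms.orders.created"))  # True
    print(subject_matches(dashboard, "mts.v2.oms.fills.created"))   # True
    print(subject_matches(dashboard, "mts.v1.risk.gate.failed"))    # False
```

Note that mts.v1.oms.orders.* (the OMS_ORDERS stream filter used later on this page) does not match a fund-partitioned subject such as mts.v1.oms.orders.created.fund.f1, because * covers exactly one token.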
Subject taxonomy
mts.<version>.<source-repo>.<noun>.<verb>[.fund.<fund_id>]
Conventions:
- <version>: schema version (v1, v2, ...); allows in-place upgrades
- <source-repo>: short name of the repo that PUBLISHES this subject (e.g. oms for mts1b-oms); single producer per subject
- <noun>: domain entity (orders, fills, quotes, signals, ...)
- <verb>: past-tense action (created, updated, filled, canceled, ...)
- fund.<fund_id>: optional partition key for fund-scoped subscriptions
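A minimal sketch of a helper that assembles subjects per these conventions and rejects malformed tokens. build_subject and its allowed-character rule are assumptions for illustration, not an existing mts1b-platform API, and the fund id f42 is made up.

```python
import re
from typing import Optional

# Allowed characters per token: an assumption, stricter than what NATS itself permits.
_TOKEN = re.compile(r"[A-Za-z0-9_-]+")
_VERSION = re.compile(r"v[0-9]+")


def build_subject(version: str, repo: str, noun: str, verb: str,
                  fund_id: Optional[str] = None) -> str:
    """Compose mts.<version>.<source-repo>.<noun>.<verb>[.fund.<fund_id>] (hypothetical helper)."""
    if not _VERSION.fullmatch(version):
        raise ValueError(f"version must look like v1, v2, ...: {version!r}")
    tokens = ["mts", version, repo, noun, verb]
    if fund_id is not None:
        tokens += ["fund", fund_id]
    for token in tokens:
        # A dot would split into extra tokens; '*' and '>' are wildcards, never literal tokens
        if not _TOKEN.fullmatch(token):
            raise ValueError(f"invalid subject token: {token!r}")
    return ".".join(tokens)


print(build_subject("v1", "oms", "orders", "created"))
# mts.v1.oms.orders.created
print(build_subject("v1", "oms", "orders", "created", fund_id="f42"))
# mts.v1.oms.orders.created.fund.f42
```

Centralizing subject construction like this keeps producers from hand-concatenating strings that drift from the convention.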
Stable subjects (Wave 1)
| Subject | Producer | Payload |
|---|---|---|
| mts.v1.oms.orders.created | mts1b-oms | Order |
| mts.v1.oms.orders.accepted | mts1b-oms | Order |
| mts.v1.oms.orders.rejected | mts1b-oms | Order (with rejected_reason) |
| mts.v1.oms.orders.canceled | mts1b-oms | Order |
| mts.v1.oms.fills.created | mts1b-oms | Fill |
| mts.v1.brokers.<broker>.fills.raw | mts1b-brokers | broker-native fill record |
| mts.v1.marketdata.quotes.<symbol> | mts1b-marketdata | Quote |
| mts.v1.marketdata.bars.<symbol>.<interval> | mts1b-marketdata | Bar |
| mts.v1.research.signals.published | mts1b-research | Signal |
| mts.v1.risk.envelope.updated | mts1b-riskengine | RiskEnvelope |
| mts.v1.risk.gate.failed | mts1b-riskengine | OrderRejection |
| mts.v1.treasury.nav.<fund_id> | mts1b-treasury | NavSnapshot |
| mts.v1.operations.halt.requested | mts1b-operations | HaltRequest |
Durability tiers
Not every event needs the same guarantees:
| Tier | Subjects | NATS feature | Use |
|---|---|---|---|
| Best-effort | quotes, bars, raw market data | NATS Core | High-frequency, ephemeral. Drop on reconnect is fine. |
| At-least-once | orders, fills, signals, risk envelopes | JetStream stream + consumer ack | Trading events. Must survive restart. |
| At-least-once + dedupe | order intents | JetStream + idempotency key on consumer | Replay-safe. Consumer dedupes on idempotency_key. |
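At-least-once delivery means a consumer can see the same order intent more than once (after a nak, a restart, or a replay), so the third tier makes the consumer idempotent on idempotency_key. The class below is a minimal in-memory sketch of that idea. IdempotentHandler is hypothetical, and keeping keys only in a bounded in-process map is an assumption for illustration: a real consumer would likely persist processed keys so dedupe survives restarts.

```python
from collections import OrderedDict
from typing import Callable


class IdempotentHandler:
    """Run `handle` at most once per idempotency_key (hypothetical sketch).

    Remembers the most recent `max_keys` keys in memory; older keys are evicted,
    so a very late replay of an evicted key would be processed again.
    """

    def __init__(self, handle: Callable[[dict], None], max_keys: int = 100_000):
        self._handle = handle
        self._seen: OrderedDict = OrderedDict()
        self._max_keys = max_keys

    def __call__(self, key: str, payload: dict) -> bool:
        """Return True if the payload was handled, False if skipped as a duplicate."""
        if key in self._seen:
            self._seen.move_to_end(key)  # keep recently seen keys from being evicted
            return False
        self._handle(payload)  # record the key only after the handler succeeds
        self._seen[key] = None
        if len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)  # evict the oldest key
        return True


processed = []
handler = IdempotentHandler(processed.append)
handler("intent-1", {"qty": 10})
handler("intent-1", {"qty": 10})  # redelivery of the same intent: skipped
print(processed)  # [{'qty': 10}]
```

Recording the key only after the handler returns keeps a failed attempt retryable; the trade-off is that a crash between the side effect and recording the key can still produce one duplicate.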
Streams are configured in mts1b-platform/eventbus/streams.py:
# OMS_ORDERS stream
{
    "name": "OMS_ORDERS",
    "subjects": ["mts.v1.oms.orders.*"],
    "retention": "limits",
    "max_age": "7d",
    "max_msgs_per_subject": 1_000_000,
    "storage": "file",
    "num_replicas": 1,  # 3 in production
    "discard": "old",
}
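The config writes max_age as a human-readable string, while the JetStream API expresses it in nanoseconds and nats-py's StreamConfig takes it in seconds, so something has to convert it. How streams.py actually does this is not shown here; the sketch below assumes only s/m/h/d suffixes, and parse_duration is a hypothetical helper.

```python
import re

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}  # assumed suffix set


def parse_duration(text: str) -> float:
    """Convert a duration such as '7d', '90m' or '30s' to seconds (hypothetical helper)."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([smhd])\s*", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return float(value) * _UNIT_SECONDS[unit]


stream = {"name": "OMS_ORDERS", "max_age": "7d"}
max_age_seconds = parse_duration(stream["max_age"])   # 604800.0
max_age_nanos = int(max_age_seconds * 1_000_000_000)  # unit used on the JetStream wire API
```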
Wire format
All payloads are JSON-encoded pydantic models from mts1b-foundation. Headers carry deduplication, tracing, and provenance metadata:
Nats-Msg-Id: <foundation Order.idempotency_key>
content-type: application/json
mts-trace-id: <opentelemetry trace id>
mts-source-repo: mts1b-oms
mts-schema: mts1b_foundation.orders.Order:v1
mts-actor: crypto_dual_paper
The mts-schema header records which model and schema version the producer serialized against. Consumers reject any message they cannot validate against their own schema and increment the nats.consume.schema_mismatch metric.
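A minimal sketch of that consumer-side check, with a Counter standing in for the nats.consume.schema_mismatch metric. parse_schema and accept_schema are hypothetical helpers; requiring an exact model-and-version match is an assumption, and a real consumer might accept compatible versions instead.

```python
from collections import Counter
from typing import Mapping, Tuple

# Stand-in for the real metric, which lives in mts1b-platform/observability
metrics: Counter = Counter()


def parse_schema(header: str) -> Tuple[str, str]:
    """Split 'mts1b_foundation.orders.Order:v1' into ('mts1b_foundation.orders.Order', 'v1')."""
    model_path, sep, version = header.rpartition(":")
    if not sep or not model_path or not version:
        raise ValueError(f"malformed mts-schema header: {header!r}")
    return model_path, version


def accept_schema(headers: Mapping[str, str], expected: str) -> bool:
    """Return True if the message's mts-schema matches what this consumer validates against."""
    try:
        ok = parse_schema(headers.get("mts-schema", "")) == parse_schema(expected)
    except ValueError:
        ok = False  # missing or malformed header counts as a mismatch
    if not ok:
        metrics["nats.consume.schema_mismatch"] += 1
    return ok
```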
Subscribing — minimal example
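import asyncio

from mts1b_platform.eventbus import connect
from mts1b_foundation.orders import Order


def handle(order: Order) -> None:
    ...  # placeholder for your business logic


async def main():
    nc = await connect()  # uses platform/config; TLS + auth handled
    js = nc.jetstream()
    sub = await js.subscribe(
        "mts.v1.oms.orders.created",
        durable="my-consumer",  # JetStream tracks ack state for this durable consumer
        manual_ack=True,
    )
    async for msg in sub.messages:
        try:
            order = Order.model_validate_json(msg.data)
        except ValueError:  # pydantic's ValidationError subclasses ValueError
            await msg.term()  # unparseable payload: redelivery cannot help
            continue
        try:
            handle(order)
            await msg.ack()
        except Exception:
            # Redelivered after 30 s, up to the consumer's max_deliver. After that JetStream
            # stops redelivering and emits a MAX_DELIVERIES advisory; it has no built-in DLQ.
            await msg.nak(delay=30.0)


asyncio.run(main())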
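The subscriber naks with a fixed 30 s delay. If repeated failures should back off instead, the delay can be derived from the delivery count that nats-py exposes as msg.metadata.num_delivered (1 on the first delivery). A minimal sketch; nak_delay and its base and cap values are assumptions, not platform defaults. JetStream consumers can alternatively be configured with a server-side backoff list.

```python
def nak_delay(num_delivered: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Exponential backoff in seconds for msg.nak(delay=...) (hypothetical helper).

    Doubles the delay on each redelivery, starting at `base`, never exceeding `cap`.
    """
    if num_delivered < 1:
        raise ValueError("num_delivered starts at 1")
    return min(cap, base * 2 ** (num_delivered - 1))


print([nak_delay(n) for n in range(1, 8)])
# [5.0, 10.0, 20.0, 40.0, 80.0, 160.0, 300.0]
```

Inside the except branch this would read await msg.nak(delay=nak_delay(msg.metadata.num_delivered)), so a message that keeps failing is retried progressively less often until max_deliver is reached.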
Producing — minimal example
import asyncio

from mts1b_platform.eventbus import connect
from mts1b_foundation.orders import Order


async def main():
    nc = await connect()
    js = nc.jetstream()
    order = Order(...)  # validated pydantic model (fields elided)
    ack = await js.publish(
        subject="mts.v1.oms.orders.created",
        payload=order.model_dump_json().encode(),
        headers={
            "Nats-Msg-Id": order.idempotency_key,  # dedupe key for JetStream
            "content-type": "application/json",
            "mts-schema": "mts1b_foundation.orders.Order:v1",
            "mts-source-repo": "mts1b-oms",
            "mts-actor": order.actor,
        },
    )
    # ack.stream and ack.seq identify the stored message: log them for audit.
    # ack.duplicate is True if JetStream dropped this publish as a duplicate.
    await nc.close()


asyncio.run(main())
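JetStream's built-in Nats-Msg-Id deduplication (the stream's duplicate window, 2 minutes by default) drops any publish whose Msg-Id was already stored within the window, so a producer can safely retry a publish with the same idempotency key as long as the retry lands inside that window.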
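To make the window concrete, here is a small in-memory model of the server-side check with an injectable clock. DuplicateWindow is hypothetical and only approximates what the JetStream server does; it illustrates why a retry inside the window is harmless while one after it stores a second copy.

```python
from typing import Dict


class DuplicateWindow:
    """Toy model of JetStream Msg-Id deduplication (hypothetical; the real check runs in the server).

    A Msg-Id published again within `window` seconds of its accepted publish is dropped.
    """

    def __init__(self, window: float = 120.0):
        self.window = window
        self._stored_at: Dict[str, float] = {}

    def publish(self, msg_id: str, now: float) -> bool:
        """Return True if the message would be stored, False if dropped as a duplicate."""
        stored_at = self._stored_at.get(msg_id)
        if stored_at is not None and now - stored_at < self.window:
            return False
        self._stored_at[msg_id] = now
        return True


w = DuplicateWindow()
print(w.publish("order-123", now=0.0))    # True: first publish is stored
print(w.publish("order-123", now=5.0))    # False: retry inside the window is dropped
print(w.publish("order-123", now=200.0))  # True: outside the window it is stored again
```

The last case is why consumers of order intents still dedupe on idempotency_key themselves: the publish-side window only covers short retries.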
Observability
Every service uses mts1b-platform/observability:
- Prometheus counters nats_publish_total, nats_consume_total, and nats_consume_errors_total, labeled by subject and repo
- OpenTelemetry traces propagate via the mts-trace-id header
- Slow-consumer alerts in mts1b-operations fire if a JetStream consumer lags by more than 30 s
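Trace propagation only works if every service copies the inbound mts-trace-id onto whatever it publishes while handling that message. The helper below sketches just that header handoff; outbound_headers is hypothetical, and the real mts1b-platform/observability code presumably goes through the OpenTelemetry SDK rather than raw strings.

```python
import secrets
from typing import Dict, Mapping

TRACE_HEADER = "mts-trace-id"


def outbound_headers(inbound: Mapping[str, str], source_repo: str) -> Dict[str, str]:
    """Headers for an event published while handling `inbound` (hypothetical helper).

    Reuses the inbound trace id so the causal chain shares one trace; otherwise starts
    a new 128-bit id (32 hex characters, the size of an OpenTelemetry trace id).
    """
    trace_id = inbound.get(TRACE_HEADER) or secrets.token_hex(16)
    return {TRACE_HEADER: trace_id, "mts-source-repo": source_repo}


fill_headers = {"mts-trace-id": "4bf92f3577b34da6a3ce929d0e0e4736"}
print(outbound_headers(fill_headers, "mts1b-riskengine"))
# {'mts-trace-id': '4bf92f3577b34da6a3ce929d0e0e4736', 'mts-source-repo': 'mts1b-riskengine'}
```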
Local dev: running NATS
# Single-node, no auth (dev only — DO NOT use in production)
docker run -p 4222:4222 -p 8222:8222 nats:2-alpine -js -m 8222
# With mts1b-deploy
mts1b-deploy install --profile minimal --include nats
For production, see mts1b-deploy/nats/ for the full TLS + clustered + JetStream config.
Subject reservation
If you're adding a new subject:
- Choose mts.v1.<your-repo>.<noun>.<verb> per convention.
- Add the schema to mts1b-foundation.
- Open a PR against mts1b-foundation updating subjects.md.
- CI enforces uniqueness: no two repos can claim the same subject prefix.
This prevents the "two services publishing slightly different orders.created shapes" problem we saw in the monorepo.
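A uniqueness check of this kind can be small. The sketch below is not the actual CI job: the input shape (repo name mapped to claimed subject prefixes) and find_conflicts are hypothetical. It also flags one claim nested inside another repo's prefix, which is an assumption about how strict the real check is.

```python
from typing import Dict, List, Tuple


def find_conflicts(claims: Dict[str, List[str]]) -> List[Tuple[str, str, str]]:
    """Return (prefix, repo_a, repo_b) for each prefix claimed by two different repos.

    Comparison is token-wise, so 'mts.v1.om' does not collide with 'mts.v1.oms'.
    """
    owners = [(prefix.split("."), repo) for repo, prefixes in claims.items() for prefix in prefixes]
    conflicts = []
    for i, (tokens_a, repo_a) in enumerate(owners):
        for tokens_b, repo_b in owners[i + 1:]:
            if repo_a == repo_b:
                continue
            shorter, longer = sorted((tokens_a, tokens_b), key=len)
            if longer[: len(shorter)] == shorter:  # same prefix, or one nested in the other
                conflicts.append((".".join(shorter), repo_a, repo_b))
    return conflicts


claims = {
    "mts1b-oms": ["mts.v1.oms"],
    "mts1b-research": ["mts.v1.research"],
    "mts1b-rogue": ["mts.v1.oms.orders"],  # hypothetical repo squatting on OMS subjects
}
print(find_conflicts(claims))
# [('mts.v1.oms', 'mts1b-oms', 'mts1b-rogue')]
```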