mts1b-platform
Shared platform layer: auth, eventbus, observability, messaging, logging, config, DB pools, retry, ratelimit, HTTP, secrets redaction. Transport-only.
Repo: github.com/MTS1B/mts1b-platform
Layer: 1
Depends on: foundation + (httpx, pydantic-settings, structlog, opentelemetry, nats-py, redis-py, psycopg, ...)
Audience: every service repo
What it is
The single canonical implementation of 11 cross-cutting concerns that institutional codebases typically centralize but that were scattered across the source monorepo. Each module replaces several duplicate copies:
| Module | Was | Now |
|---|---|---|
| messaging | 6 places sending Telegram | one dispatcher |
| calendars | 5+ places computing market sessions | one singleton |
| symbology | 3+ places normalizing BTC-USD vs BTCUSD | one normalizer |
| logging | scattered handler wiring | one factory |
| config | scattered BaseSettings | one config tree |
| db | scattered pool wrappers | one pool registry |
| retry | scattered tenacity wrappers | one decorator family |
| ratelimit | scattered token buckets | one limiter |
| http | scattered httpx factories | one client factory |
| security/redact | scattered log filters | one redactor |
| observability | scattered metrics + tracing wiring | one OTel setup |
Module layout
mts1b_platform/
├── auth/ # JWT, mTLS service tokens
├── audit/ # Merkle-hashed audit chain
├── eventbus/ # NATS connect + JetStream wrappers + schema validation
├── observability/ # OpenTelemetry + Prometheus + structured logs
├── messaging/ # Telegram, Slack, Discord, PagerDuty dispatch
├── calendars/ # NYSE, NASDAQ, CME, Coinbase 24/7, ...
├── symbology/ # symbol normalization across venues
├── logging/ # get_logger() factory + JSON formatter
├── config/ # Vault-aware Pydantic Settings
├── db/ # Postgres pool, DuckDB wrapper, NATS K/V
├── retry/ # @with_retry, async_retry context manager
├── ratelimit/ # RateLimiter (token bucket, fixed window, sliding window)
├── http/ # http_client(name, retries, timeout) factory
└── security/
    └── redact.py # log filter + manual redact()
Top APIs
get_logger
from mts1b_platform.logging import get_logger
log = get_logger(__name__)
log.info("order received", extra={"order_id": "abc", "symbol": "SPY"})
# {"ts": "2026-05-23T19:00:00Z", "level": "INFO", "msg": "order received",
# "logger": "mts1b_oms.service", "order_id": "abc", "symbol": "SPY", ...}
JSON-structured by default, human-readable in dev (MTS1B_LOG_FORMAT=text). Secrets are auto-redacted by the redact filter.
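To illustrate how `extra` fields can end up as top-level keys in the JSON line, here is a minimal sketch built on the standard `logging` module. `JsonFormatterSketch` is a hypothetical name and this is not the mts1b_platform implementation; the redact filter and the MTS1B_LOG_FORMAT switch are not shown.

```python
import json
import logging
from datetime import datetime, timezone

# Attribute names every LogRecord carries; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {
    "message",
    "asctime",
}


class JsonFormatterSketch(logging.Formatter):
    """Hypothetical formatter for illustration; not the platform's own class."""

    def format(self, record: logging.LogRecord) -> str:
        ts = datetime.fromtimestamp(record.created, tz=timezone.utc)
        payload = {
            "ts": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "msg": record.getMessage(),
            "logger": record.name,
        }
        # Copy caller-supplied `extra` fields to the top level of the JSON object.
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload, default=str)
```

Attaching this formatter to any `logging.Handler` is enough to get one JSON object per log call.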
load_config
from pydantic_settings import BaseSettings

from mts1b_foundation.funds import FundConfig
from mts1b_platform.config import load_config

class OmsSettings(BaseSettings):
    nats_url: str = "nats://localhost:4222"
    db_dsn: str  # required: no default
    log_level: str = "INFO"
    funds: list[FundConfig]

settings = load_config(OmsSettings, vault_path="secret/mts1b/oms")
Resolution order: env var → Vault → .env → defaults (the first source that defines a key wins). Vault paths support templating in YAML, e.g. {{ env "FUND_ID" }}.
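The resolution order can be pictured as a layered lookup. The sketch below is an assumption about the semantics, not the code behind `load_config`: `resolve_settings` is a hypothetical helper, and each source is represented as a plain mapping that has already been loaded.

```python
from collections import ChainMap
from typing import Any, Mapping


def resolve_settings(
    env: Mapping[str, Any],
    vault: Mapping[str, Any],
    dotenv: Mapping[str, Any],
    defaults: Mapping[str, Any],
) -> dict[str, Any]:
    """Merge four sources; earlier arguments win over later ones."""
    # ChainMap looks each key up left to right, which mirrors
    # env var -> Vault -> .env -> defaults.
    return dict(ChainMap(dict(env), dict(vault), dict(dotenv), dict(defaults)))
```

With this layering, a key set in the environment shadows the same key in Vault, and defaults only apply when no other source defines the key.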
http_client
from mts1b_platform.http import http_client
async with http_client("ibkr", base_url="https://api.ibkr.com",
                       timeout=20, retries=3) as c:
    r = await c.get("/v1/positions")
Pre-configured with OTel tracing headers, retry with backoff, rate-limit awareness, and secret-aware log filtering (the request body is redacted before logging).
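As a rough picture of what redacting a request body before logging could look like, here is a minimal sketch. `redact_body` and the `SENSITIVE_MARKERS` list are hypothetical and chosen for illustration; the actual rules live in security/redact.py and may differ.

```python
from typing import Any

# Hypothetical markers: a key containing any of these is treated as secret.
SENSITIVE_MARKERS = ("password", "secret", "token", "api_key", "authorization")


def redact_body(value: Any) -> Any:
    """Return a copy of a JSON-like value with secret-looking fields masked."""
    if isinstance(value, dict):
        return {
            key: "***"
            if any(marker in str(key).lower() for marker in SENSITIVE_MARKERS)
            else redact_body(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact_body(item) for item in value]
    # Scalars are returned unchanged; only keys decide what gets masked.
    return value
```

The function returns a new structure, so the original body can still be sent on the wire unmodified.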
with_retry
import asyncio
import httpx
from mts1b_platform.retry import with_retry

@with_retry(retries=3, backoff="exp", base=1.0, max_delay=30.0,
            retry_on=(httpx.HTTPStatusError, asyncio.TimeoutError))
async def fetch_quote(symbol: str) -> Quote:
    ...
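For readers unfamiliar with capped exponential backoff, the following sketch shows one way a decorator with these parameters might behave. `with_retry_sketch` and `backoff_delays` are hypothetical names, and the sketch assumes that `retries=3` means three retries after the first attempt and that delays double from `base` up to `max_delay`; the real `with_retry` may interpret its arguments differently.

```python
import asyncio
import functools
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base: float, max_delay: float) -> list[float]:
    """Delay before each retry: base, 2*base, 4*base, ... capped at max_delay."""
    return [min(base * (2**attempt), max_delay) for attempt in range(retries)]


def with_retry_sketch(
    retries: int = 3,
    base: float = 1.0,
    max_delay: float = 30.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
):
    def decorator(fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs) -> T:
            for delay in backoff_delays(retries, base, max_delay):
                try:
                    return await fn(*args, **kwargs)
                except retry_on:
                    # Only the listed exception types trigger a retry.
                    await asyncio.sleep(delay)
            # Final attempt: any exception now propagates to the caller.
            return await fn(*args, **kwargs)

        return wrapper

    return decorator
```

Under these assumptions, `base=1.0` and `max_delay=30.0` give waits of 1, 2, and 4 seconds before the last attempt.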
RateLimiter
from mts1b_platform.ratelimit import RateLimiter
limiter = RateLimiter(name="polygon", rate=5, per_seconds=60)
async with limiter:
    quote = await fetch_quote_from_polygon(...)
Per-name registry; same name across services shares the budget.
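The token-bucket strategy listed for this module can be sketched in a few lines. `TokenBucketSketch` is a hypothetical, process-local class written for illustration; it does not show the per-name registry or how a budget would be shared across services, which would need some shared store that this README does not describe.

```python
import time
from typing import Callable


class TokenBucketSketch:
    """Allow up to `rate` acquisitions per `per_seconds`, refilled continuously."""

    def __init__(self, rate: int, per_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.per_seconds = per_seconds
        self.tokens = float(rate)  # start with a full bucket
        self._clock = clock
        self._last = clock()

    def try_acquire(self) -> bool:
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill in proportion to elapsed time, never above capacity.
        refill = elapsed * self.rate / self.per_seconds
        self.tokens = min(float(self.rate), self.tokens + refill)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

An async context manager like the one in the example above could wrap `try_acquire` and sleep until a token becomes available. The injectable `clock` exists only to make the sketch testable without real waiting.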
messaging.send
from mts1b_platform.messaging import send
await send(channels=["telegram", "slack"],
           level="warning",
           subject="Risk gate fired",
           body=f"Order {order.order_id} rejected: max_position_pct exceeded",
           data={"order_id": order.order_id, "envelope_id": envelope.envelope_id})
Messages are routed to the channels configured for each level. Per-service Vault paths control which channels are wired. Free-tier rate limits are applied per channel.
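One possible reading of per-level routing is sketched below. The routing table, the level names other than "warning", and the `resolve_channels` helper are all assumptions made for illustration; the actual routing rules are defined by the dispatcher and its Vault configuration.

```python
# Hypothetical routing table: which channels each level is allowed to reach.
ROUTES_SKETCH: dict[str, tuple[str, ...]] = {
    "info": ("slack",),
    "warning": ("slack", "telegram"),
    "critical": ("slack", "telegram", "pagerduty"),
}


def resolve_channels(level: str, requested: list[str], wired: set[str]) -> list[str]:
    """Keep the requested channels that the level allows and the service has wired."""
    allowed = ROUTES_SKETCH.get(level, ())
    return [channel for channel in requested if channel in allowed and channel in wired]
```

In this reading, a channel receives the message only if the caller requested it, the level permits it, and the service has it wired.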
calendars
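from datetime import date, datetime, timezone
from mts1b_platform.calendars import market_calendar
cal = market_calendar("NYSE")
cal.is_trading_day(date(2026, 5, 25)) # False (Memorial Day)
now = datetime.now(timezone.utc) # datetime.utcnow() is deprecated since Python 3.12
cal.next_open(now) # next NYSE open
cal.sessions(asof, lookback_days=21) # asof: caller-supplied reference date; returns a list of (open, close) datetimes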
# 24/7 markets
crypto = market_calendar("COINBASE")
crypto.is_trading_day(date(2026, 12, 25)) # True (crypto never closes)
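The difference between an exchange calendar and a 24/7 venue can be shown with a small sketch. `CalendarSketch` is a hypothetical class with a hand-supplied holiday set; it is not how `market_calendar` is implemented and it ignores half-days and session times.

```python
from datetime import date


class CalendarSketch:
    """Trading-day check: weekdays minus holidays, or every day for 24/7 venues."""

    def __init__(self, holidays: frozenset[date] = frozenset(), always_open: bool = False):
        self.holidays = holidays
        self.always_open = always_open

    def is_trading_day(self, day: date) -> bool:
        if self.always_open:
            return True
        # weekday() is 0 for Monday through 6 for Sunday.
        return day.weekday() < 5 and day not in self.holidays


# Hypothetical holiday data, limited to the one date used in the example above.
nyse_sketch = CalendarSketch(holidays=frozenset({date(2026, 5, 25)}))
crypto_sketch = CalendarSketch(always_open=True)
```

A real calendar would load a full holiday schedule per venue; the single-date set here only keeps the sketch short.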
symbology.normalize
from mts1b_platform.symbology import normalize
normalize("BTCUSD", venue="coinbase") # "BTC-USD"
normalize("btc/usd", venue="binance") # "BTC-USD"
normalize("BTC.USD", venue="kraken") # "BTC-USD"
One canonical format (BASE-QUOTE) inside MTS1B; adapter-specific renderers handle the wire format.
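A minimal sketch of BASE-QUOTE normalization follows. `normalize_sketch` and the `KNOWN_QUOTES` list are hypothetical; unlike the real `normalize`, the sketch ignores the `venue` argument and only guesses the split for unseparated symbols from a short list of quote currencies.

```python
import re

# Hypothetical quote currencies, longest first so "USDT" is tried before "USD".
KNOWN_QUOTES = ("USDT", "USDC", "USD", "EUR", "BTC")


def normalize_sketch(symbol: str) -> str:
    """Return the symbol in BASE-QUOTE form, e.g. 'BTC-USD'."""
    cleaned = symbol.strip().upper()
    parts = [part for part in re.split(r"[-/._]", cleaned) if part]
    if len(parts) == 2:
        return f"{parts[0]}-{parts[1]}"
    if len(parts) == 1:
        # No separator: split on the first known quote currency that is a suffix.
        for quote in KNOWN_QUOTES:
            if cleaned.endswith(quote) and len(cleaned) > len(quote):
                return f"{cleaned[:-len(quote)]}-{quote}"
    raise ValueError(f"cannot normalize symbol: {symbol!r}")
```

Suffix matching is ambiguous for some pairs, which is one reason a venue-aware normalizer is preferable to this heuristic.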