Market Data
The market-data service is a shared, in-memory quote cache. Quote producers push the latest snapshot for each instrument; policies pull the current snapshot when they need a price. It is the data source the spot funds policy uses to price market orders, and it can be passed to any policy that needs live quotes.
The service is pull-based: it never calls back into producer or policy code. A producer writes the newest snapshot whenever it has one, and a reader gets whatever is currently stored. There are no callbacks, queues, or background threads.
This page covers registration, synchronization, quote buckets, and the read/write API. Two related topics live on their own pages:
- Market Data TTL - quote freshness and the eight-tier TTL cascade.
- Market Data Pricing - how the spot funds policy prices market orders from the cache.
For the full threading model see Threading Contract.
Every instrument gets a stable identifier the first time it is registered. All subsequent reads and writes use that identifier, which is cheaper than resolving the instrument by name on every quote. Registration is one-time; a second attempt to register the same instrument fails.
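The registration contract can be pictured with a toy model. This is a minimal sketch, not the library's implementation: `ToyRegistry`, `AlreadyRegistered`, and the integer identifiers are hypothetical names chosen for illustration.

```python
from typing import Dict, Optional


class AlreadyRegistered(Exception):
    """Hypothetical error: the instrument already has an identifier."""


class ToyRegistry:
    """Toy model of one-time registration; not the library's implementation."""

    def __init__(self) -> None:
        self._ids: Dict[str, int] = {}

    def register(self, name: str) -> int:
        # Registration is one-time: a repeat attempt fails instead of
        # returning the existing identifier.
        if name in self._ids:
            raise AlreadyRegistered(name)
        self._ids[name] = len(self._ids)
        return self._ids[name]

    def resolve(self, name: str) -> Optional[int]:
        # Name lookup stays available, but hot-path calls pass the id.
        return self._ids.get(name)


registry = ToyRegistry()
aapl_id = registry.register("AAPL/USD")
assert registry.resolve("AAPL/USD") == aapl_id
```

Handing out the identifier once and reusing it keeps the per-quote path free of name lookups, which is the cost argument made in the paragraph this sketch follows.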
The market-data builder comes from the engine builder via the market data builder
method, so its synchronization mode is derived from the engine's - there is no
silent default:
- A no-sync engine yields a no-sync, single-threaded service whose internal
  locks are a genuine no-op (free). Push and read it from the same thread the
  engine runs on. Call full sync on the market-data builder to upgrade it to a
  fully-synchronized service when a background producer must publish quotes
  concurrently with the engine.
- A full-sync or account-sync engine yields a fully-synchronized service (real
  locks); it cannot be downgraded.
Across every SDK the engine-builder path is the standard way to obtain a service; prefer it in all normal code. Where an SDK also exposes a lower-level constructor for the service, treat it as an advanced escape hatch rather than the default.
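The difference between a no-op lock and a real lock can be sketched with the Python standard library. This is an illustration only, not how the service is built: `ToyQuoteCache` and its `full_sync` flag are hypothetical, and the real service takes its mode from the engine builder.

```python
import contextlib
import threading


class ToyQuoteCache:
    """Toy cache whose lock flavor is fixed once, at construction time."""

    def __init__(self, full_sync: bool) -> None:
        # Hypothetical flag: a real lock when producers run on other
        # threads, otherwise a do-nothing context manager.
        self._lock = threading.Lock() if full_sync else contextlib.nullcontext()
        self._quotes = {}

    def push(self, instrument_id, quote):
        with self._lock:
            self._quotes[instrument_id] = quote

    def get_optional(self, instrument_id):
        with self._lock:
            return self._quotes.get(instrument_id)


# Fully synchronized: a background producer publishes, the caller reads.
shared = ToyQuoteCache(full_sync=True)
producer = threading.Thread(target=lambda: shared.push(1, {"mark": "150"}))
producer.start()
producer.join()

# No-sync: push and read happen on the same thread.
local = ToyQuoteCache(full_sync=False)
local.push(1, {"mark": "150"})
```

Both variants share the same read and write code paths; only the lock object differs, which is why the no-sync flavor pays nothing for synchronization.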
Quotes are stored per instrument in three conceptual buckets:
- Per-account bucket - a quote targeted at one specific account via push for.
- Per-group bucket - a quote targeted at an entire account group via push for.
- Default bucket - the "everyone-else" quote written by the plain
  push / push patch calls; internally it is the bucket of the reserved
  default account group (id 0).
A push for call targets any combination of individual accounts and groups in
one shot, all sharing the same pushed at instant. Passing the
default account group in the group list targets the default bucket directly.
A quote carries an optional mark, bid, and ask. There are two writing
semantics:
- Replace (push / push for) - writes the supplied quote in full; any field left
  unset is cleared from the target bucket.
- Patch (push patch / push for patch) - merges into the stored snapshot; only
  the fields you set change, the rest keep their prior values.
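The two writing semantics reduce to a small merge rule. The sketch below models it with a hypothetical `ToyQuote` dataclass and two helper functions; none of these names come from the SDK, and prices are plain strings to keep the example short.

```python
from dataclasses import asdict, dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyQuote:
    mark: Optional[str] = None
    bid: Optional[str] = None
    ask: Optional[str] = None


def apply_replace(stored: Optional[ToyQuote], incoming: ToyQuote) -> ToyQuote:
    # Replace: the incoming quote wins in full, so unset fields end up cleared.
    return incoming


def apply_patch(stored: Optional[ToyQuote], incoming: ToyQuote) -> ToyQuote:
    # Patch: only the fields set on the incoming quote overwrite stored values.
    base = stored if stored is not None else ToyQuote()
    changes = {k: v for k, v in asdict(incoming).items() if v is not None}
    return replace(base, **changes)


stored = ToyQuote(mark="100", bid="99", ask="101")
tick = ToyQuote(mark="105")

patched = apply_patch(stored, tick)    # bid and ask survive
replaced = apply_replace(stored, tick)  # bid and ask are cleared
```

The same mark-only tick therefore leaves a full top-of-book under patch semantics and a mark-only snapshot under replace semantics.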
A read supplies the reading account, an account info object (which
provides the account group lazily on demand), and a resolution mode that
controls which buckets are consulted, in order:
| Mode | Buckets consulted |
|---|---|
| account only | Per-account only |
| account then group | Per-account, then the account's group |
| account then group then default | Per-account, then group, then default |
account then group then default is the widest mode and is what the
spot funds policy uses internally.
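The lookup order of the three modes can be written as a short fall-through. This is a toy model under stated assumptions: the `Resolution` enum, `resolve_quote`, and the plain dictionaries are hypothetical, the default bucket is keyed by group id 0 as this page describes, and quote freshness (TTL) is ignored entirely.

```python
from enum import Enum

DEFAULT_GROUP = 0  # reserved default account group id, as described on this page


class Resolution(Enum):
    ACCOUNT_ONLY = 1
    ACCOUNT_THEN_GROUP = 2
    ACCOUNT_THEN_GROUP_THEN_DEFAULT = 3


def resolve_quote(by_account, by_group, account_id, group_id, mode):
    """Return the first quote found in the buckets the mode allows, else None."""
    if account_id in by_account:
        return by_account[account_id]
    if mode is Resolution.ACCOUNT_ONLY:
        return None
    # group_id is None when the reading account belongs to no group.
    if group_id is not None and group_id in by_group:
        return by_group[group_id]
    if mode is Resolution.ACCOUNT_THEN_GROUP:
        return None
    return by_group.get(DEFAULT_GROUP)


by_account = {10: "account quote"}
by_group = {7: "group quote", DEFAULT_GROUP: "default quote"}

widest = Resolution.ACCOUNT_THEN_GROUP_THEN_DEFAULT
hit_account = resolve_quote(by_account, by_group, 10, 7, widest)
hit_group = resolve_quote(by_account, by_group, 11, 7, widest)
hit_default = resolve_quote(by_account, by_group, 11, None, widest)
```

An account with no group skips the group step and, in the widest mode, lands on the default bucket, which matches the "no group" reads in the examples on this page.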
Use get optional when absence is normal. Use get when you need to distinguish
an unknown instrument from a quote that exists but is unusable: for an expired
quote, get raises an error that carries the stale value.
Go
```go
// noGroupInfo is a minimal AccountInfo whose reading account has no group.
type noGroupInfo struct{}

func (noGroupInfo) AccountGroup() optional.Option[param.AccountGroupID] {
	return optional.None[param.AccountGroupID]()
}

service, err := openpit.NewEngineBuilder().
	FullSync().
	MarketData(marketdata.InfiniteTTL()).
	Build()
if err != nil {
	panic(err)
}
defer service.Close()

aapl, _ := param.NewAsset("AAPL")
usd, _ := param.NewAsset("USD")
instrument := param.NewInstrument(aapl, usd)
aaplID, err := service.Register(instrument)
if err != nil {
	panic(err)
}

// Publish a full snapshot into the default ("everyone-else") bucket.
mark, _ := param.NewPriceFromString("150")
bid, _ := param.NewPriceFromString("149.5")
ask, _ := param.NewPriceFromString("150.5")
if err := service.Push(
	aaplID,
	marketdata.NewQuote().WithMark(mark).WithBid(bid).WithAsk(ask),
); err != nil {
	panic(err)
}

// Read for an account with no group: the lookup falls through to the
// default bucket. Pass any marketdata.AccountInfo; in policy code this is
// usually the pretrade.Context. The test mirror uses a no-group stub.
accountID := param.NewAccountIDFromUint64(1)
quote, ok := service.GetOptional(
	aaplID,
	accountID,
	noGroupInfo{},
	marketdata.QuoteResolutionAccountThenGroupThenDefault,
).Get()
if !ok {
	panic("quote must be present")
}
gotMark, _ := quote.Mark().Get()
if !gotMark.Equal(mark) {
	panic("unexpected mark")
}
gotBid, _ := quote.Bid().Get()
if !gotBid.Equal(bid) {
	panic("unexpected bid")
}

// Resolve recovers the id from the instrument name.
resolved, ok := service.Resolve(instrument)
if !ok || resolved.String() != aaplID.String() {
	panic("unexpected resolve result")
}
```
Python
```python
import types

import openpit
import openpit.marketdata

# no_sync: the engine spawns no OS threads; each call runs on the caller's
# thread. See Threading-Contract for the full model.
service = (
    openpit.Engine.builder()
    .no_sync()
    .market_data(openpit.marketdata.QuoteTtl.infinite())
    .build()
)

aapl = openpit.Instrument("AAPL", "USD")
aapl_id = service.register(aapl)

# Publish a full snapshot into the default ("everyone-else") bucket.
service.push(
    aapl_id,
    openpit.marketdata.Quote(mark="150", bid="149.5", ask="150.5"),
)

# Read for an account with no group: the lookup falls through to the
# default bucket. account_info exposes .account_group returning
# AccountGroupId | None. In policy code this is usually the pre-trade
# context; here we use a simple stand-in.
account_id = openpit.param.AccountId.from_int(1)
account_info = types.SimpleNamespace(account_group=None)
quote = service.get(
    aapl_id,
    account_id,
    account_info,
    openpit.marketdata.QuoteResolution.ACCOUNT_THEN_GROUP_THEN_DEFAULT,
)
assert quote.mark == openpit.param.Price("150")
assert quote.bid == openpit.param.Price("149.5")

# resolve recovers the id from the instrument name.
assert service.resolve(aapl) == aapl_id
```
Rust
```rust
use openpit::param::{AccountId, AccountGroupId, Asset, Price};
use openpit::{Engine, Instrument, Quote, QuoteResolution, QuoteTtl};

let service = Engine::builder::<(), (), ()>()
    .no_sync()
    .market_data(QuoteTtl::Infinite)
    .build();

let aapl = Instrument::new(Asset::new("AAPL")?, Asset::new("USD")?);
let aapl_id = service.register(aapl.clone())?;

// Publish a full snapshot into the default ("everyone-else") bucket.
service.push(
    aapl_id,
    Quote::new()
        .with_mark(Price::from_str("150")?)
        .with_bid(Price::from_str("149.5")?)
        .with_ask(Price::from_str("150.5")?),
)?;

// Read for an account with no group: the lookup falls through to the
// default bucket.
let account = AccountId::from_u64(1);
let quote = service
    .get(
        aapl_id,
        account,
        &None::<AccountGroupId>,
        QuoteResolution::AccountThenGroupThenDefault,
    )
    .expect("quote must be present");
assert_eq!(quote.mark, Some(Price::from_str("150")?));
assert_eq!(quote.bid, Some(Price::from_str("149.5")?));

// resolve recovers the id from the instrument name.
assert_eq!(service.resolve(&aapl), Some(aapl_id));
```
push for (and push for patch for patch semantics) fans a single quote out to
a list of accounts and a list of groups in one call. All targets share the same
pushed at instant. Passing the default account group in the group list writes
the default bucket. Calling push for with both lists empty is a caller error
(no target).
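The fan-out rules in this paragraph can be modeled in a few lines. The sketch is a toy, not the SDK call: `toy_push_for`, `NoTarget`, and the dictionary buckets are hypothetical, and `time.monotonic()` stands in for whatever clock the service uses for the pushed at instant.

```python
import time

DEFAULT_GROUP = 0  # reserved default account group id


class NoTarget(ValueError):
    """Hypothetical error for a fan-out call with no accounts and no groups."""


def toy_push_for(by_account, by_group, quote, accounts, groups):
    if not accounts and not groups:
        raise NoTarget("at least one account or group is required")
    # One timestamp is taken up front and shared by every target.
    pushed_at = time.monotonic()
    for account_id in accounts:
        by_account[account_id] = (quote, pushed_at)
    for group_id in groups:
        # Listing DEFAULT_GROUP here writes the default bucket.
        by_group[group_id] = (quote, pushed_at)


by_account, by_group = {}, {}
toy_push_for(by_account, by_group, {"mark": "150"}, [10, 11], [7, DEFAULT_GROUP])
```

Taking the timestamp once, before the loops, is what makes all targets agree on the pushed at instant regardless of how many buckets are written.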
Go
```go
groupID, _ := param.NewAccountGroupIDFromUint32(7)

// Fan out to two accounts and one group simultaneously.
if err := service.PushFor(
	aaplID,
	marketdata.NewQuote().WithMark(mark),
	[]param.AccountID{
		param.NewAccountIDFromUint64(10),
		param.NewAccountIDFromUint64(11),
	},
	[]param.AccountGroupID{groupID},
); err != nil {
	panic(err)
}

// Read back for account 10 under AccountOnly - hits the per-account bucket.
quote, ok := service.GetOptional(
	aaplID,
	param.NewAccountIDFromUint64(10),
	noGroupInfo{},
	marketdata.QuoteResolutionAccountOnly,
).Get()
if !ok {
	panic("quote must be present for account 10")
}
gotMark, _ := quote.Mark().Get()
if !gotMark.Equal(mark) {
	panic("unexpected mark for account 10")
}
```
Python
```python
group_id = openpit.param.AccountGroupId.from_int(7)

# Fan out to two accounts and one group simultaneously.
service.push_for(
    aapl_id,
    openpit.marketdata.Quote(mark="150"),
    [
        openpit.param.AccountId.from_int(10),
        openpit.param.AccountId.from_int(11),
    ],
    [group_id],
)

# Read back for account 10 under AccountOnly - hits the per-account bucket.
account_info = types.SimpleNamespace(account_group=None)
quote = service.get(
    aapl_id,
    openpit.param.AccountId.from_int(10),
    account_info,
    openpit.marketdata.QuoteResolution.ACCOUNT_ONLY,
)
assert quote.mark == openpit.param.Price("150")
```
Rust
```rust
use openpit::param::{AccountGroupId, AccountId, Asset, Price};
use openpit::{Engine, Instrument, Quote, QuoteResolution, QuoteTtl};

let service = Engine::builder::<(), (), ()>()
    .no_sync()
    .market_data(QuoteTtl::Infinite)
    .build();
let aapl_id = service.register(Instrument::new(Asset::new("AAPL")?, Asset::new("USD")?))?;
let group_id = AccountGroupId::from_u32(7)?;

// Fan out to two accounts and one group simultaneously.
service.push_for(
    aapl_id,
    Quote::new().with_mark(Price::from_str("150")?),
    &[AccountId::from_u64(10), AccountId::from_u64(11)],
    &[group_id],
)?;

// Read back for account 10 under AccountOnly - hits the per-account bucket.
let quote = service
    .get(
        aapl_id,
        AccountId::from_u64(10),
        &None::<AccountGroupId>,
        QuoteResolution::AccountOnly,
    )
    .expect("quote must be present");
assert_eq!(quote.mark, Some(Price::from_str("150")?));
```
push is the right call for a complete top-of-book snapshot where every field is
authoritative. push patch is for incremental feeds where each message updates
only some fields - for example a mark-price tick that should not disturb the last
known bid and ask. The same replace/patch distinction applies to push for
and push for patch.
Go
```go
service, err := openpit.NewEngineBuilder().
	FullSync().
	MarketData(marketdata.InfiniteTTL()).
	Build()
if err != nil {
	panic(err)
}
defer service.Close()

aapl, _ := param.NewAsset("AAPL")
usd, _ := param.NewAsset("USD")
aaplID, err := service.Register(param.NewInstrument(aapl, usd))
if err != nil {
	panic(err)
}

mark, _ := param.NewPriceFromString("100")
bid, _ := param.NewPriceFromString("99")
ask, _ := param.NewPriceFromString("101")
if err := service.Push(
	aaplID,
	marketdata.NewQuote().WithMark(mark).WithBid(bid).WithAsk(ask),
); err != nil {
	panic(err)
}

// Patch only the mark; bid and ask are preserved.
newMark, _ := param.NewPriceFromString("105")
if err := service.PushPatch(
	aaplID,
	marketdata.NewQuote().WithMark(newMark),
); err != nil {
	panic(err)
}

accountID := param.NewAccountIDFromUint64(1)
quote, ok := service.GetOptional(
	aaplID,
	accountID,
	noGroupInfo{},
	marketdata.QuoteResolutionAccountThenGroupThenDefault,
).Get()
if !ok {
	panic("quote must be present")
}
gotMark, _ := quote.Mark().Get()
if !gotMark.Equal(newMark) {
	panic("unexpected mark after patch")
}
gotBid, _ := quote.Bid().Get()
if !gotBid.Equal(bid) {
	panic("bid must be preserved after patch")
}
gotAsk, _ := quote.Ask().Get()
if !gotAsk.Equal(ask) {
	panic("ask must be preserved after patch")
}
```
Python
```python
import types

import openpit
import openpit.marketdata

service = (
    openpit.Engine.builder()
    .no_sync()
    .market_data(openpit.marketdata.QuoteTtl.infinite())
    .build()
)
aapl_id = service.register(openpit.Instrument("AAPL", "USD"))

service.push(
    aapl_id,
    openpit.marketdata.Quote(mark="100", bid="99", ask="101"),
)

# Patch only the mark; bid and ask are preserved.
service.push_patch(aapl_id, openpit.marketdata.Quote(mark="105"))

account_id = openpit.param.AccountId.from_int(1)
account_info = types.SimpleNamespace(account_group=None)
quote = service.get(
    aapl_id,
    account_id,
    account_info,
    openpit.marketdata.QuoteResolution.ACCOUNT_THEN_GROUP_THEN_DEFAULT,
)
assert quote.mark == openpit.param.Price("105")
assert quote.bid == openpit.param.Price("99")
assert quote.ask == openpit.param.Price("101")
```
Rust
```rust
use openpit::param::{AccountId, AccountGroupId, Asset, Price};
use openpit::{Engine, Instrument, Quote, QuoteResolution, QuoteTtl};

let service = Engine::builder::<(), (), ()>()
    .no_sync()
    .market_data(QuoteTtl::Infinite)
    .build();
let aapl_id = service.register(Instrument::new(Asset::new("AAPL")?, Asset::new("USD")?))?;

service.push(
    aapl_id,
    Quote::new()
        .with_mark(Price::from_str("100")?)
        .with_bid(Price::from_str("99")?)
        .with_ask(Price::from_str("101")?),
)?;

// Patch only the mark; bid and ask are preserved.
service.push_patch(aapl_id, Quote::new().with_mark(Price::from_str("105")?))?;

let account = AccountId::from_u64(1);
let quote = service
    .get(
        aapl_id,
        account,
        &None::<AccountGroupId>,
        QuoteResolution::AccountThenGroupThenDefault,
    )
    .expect("quote must be present");
assert_eq!(quote.mark, Some(Price::from_str("105")?));
assert_eq!(quote.bid, Some(Price::from_str("99")?));
assert_eq!(quote.ask, Some(Price::from_str("101")?));
```
clear hides the current quote for an instrument across all three buckets without
unregistering it. A subsequent read reports the quote as absent, just as if it
had never been pushed, while the instrument keeps its identifier. Pushing again
restores visibility. Clearing an unknown instrument is a no-op.
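One way to picture why clear keeps the identifier is to store identifiers and quotes in separate maps. The model below is a sketch under that assumption; `ToyService` and its bucket keys are hypothetical and say nothing about how the real service lays out its storage.

```python
class ToyService:
    """Toy model: identifiers live in one map, quotes in another."""

    def __init__(self) -> None:
        self._ids = {}     # instrument name -> id
        self._quotes = {}  # id -> {bucket key -> quote}

    def register(self, name):
        instrument_id = len(self._ids)
        self._ids[name] = instrument_id
        self._quotes[instrument_id] = {}
        return instrument_id

    def resolve(self, name):
        return self._ids.get(name)

    def push(self, instrument_id, quote, bucket="default"):
        self._quotes[instrument_id][bucket] = quote

    def get_optional(self, instrument_id, bucket="default"):
        return self._quotes.get(instrument_id, {}).get(bucket)

    def clear(self, instrument_id):
        # Drop the quotes in every bucket; unknown ids are silently ignored.
        if instrument_id in self._quotes:
            self._quotes[instrument_id] = {}


svc = ToyService()
aapl_id = svc.register("AAPL/USD")
svc.push(aapl_id, "200")
svc.push(aapl_id, "201", bucket=("account", 10))

svc.clear(aapl_id)  # hides quotes in all buckets
svc.clear(999)      # unknown instrument: no-op
```

Because clear touches only the quote map, the identifier returned by register stays valid and a later push makes the instrument readable again.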
Go
```go
service, err := openpit.NewEngineBuilder().
	FullSync().
	MarketData(marketdata.InfiniteTTL()).
	Build()
if err != nil {
	panic(err)
}
defer service.Close()

aapl, _ := param.NewAsset("AAPL")
usd, _ := param.NewAsset("USD")
aaplID, err := service.Register(param.NewInstrument(aapl, usd))
if err != nil {
	panic(err)
}

mark, _ := param.NewPriceFromString("200")
if err := service.Push(
	aaplID,
	marketdata.NewQuote().WithMark(mark),
); err != nil {
	panic(err)
}

// Clear hides the quote but keeps the instrument registered.
service.Clear(aaplID)
accountID := param.NewAccountIDFromUint64(1)
if _, ok := service.GetOptional(
	aaplID, accountID, noGroupInfo{},
	marketdata.QuoteResolutionAccountThenGroupThenDefault,
).Get(); ok {
	panic("quote must be absent after clear")
}

// Pushing again restores a quote for the same id.
recovered, _ := param.NewPriceFromString("210")
if err := service.Push(
	aaplID,
	marketdata.NewQuote().WithMark(recovered),
); err != nil {
	panic(err)
}
quote, ok := service.GetOptional(
	aaplID, accountID, noGroupInfo{},
	marketdata.QuoteResolutionAccountThenGroupThenDefault,
).Get()
if !ok {
	panic("quote must be present after recovery push")
}
if got, _ := quote.Mark().Get(); !got.Equal(recovered) {
	panic("unexpected mark after recovery push")
}
```
Python
```python
import types

import openpit
import openpit.marketdata

service = (
    openpit.Engine.builder()
    .no_sync()
    .market_data(openpit.marketdata.QuoteTtl.infinite())
    .build()
)
aapl_id = service.register(openpit.Instrument("AAPL", "USD"))
account_id = openpit.param.AccountId.from_int(1)
account_info = types.SimpleNamespace(account_group=None)

service.push(aapl_id, openpit.marketdata.Quote(mark="200"))

# clear hides the quote but keeps the instrument registered.
service.clear(aapl_id)
assert (
    service.get_optional(
        aapl_id,
        account_id,
        account_info,
        openpit.marketdata.QuoteResolution.ACCOUNT_THEN_GROUP_THEN_DEFAULT,
    )
    is None
)

# Pushing again restores a quote for the same id.
service.push(aapl_id, openpit.marketdata.Quote(mark="210"))
assert (
    service.get_optional(
        aapl_id,
        account_id,
        account_info,
        openpit.marketdata.QuoteResolution.ACCOUNT_THEN_GROUP_THEN_DEFAULT,
    )
    is not None
)
```
Rust
```rust
use openpit::param::{AccountId, AccountGroupId, Asset, Price};
use openpit::{Engine, Instrument, Quote, QuoteResolution, QuoteTtl};

let service = Engine::builder::<(), (), ()>()
    .no_sync()
    .market_data(QuoteTtl::Infinite)
    .build();
let aapl_id = service.register(Instrument::new(Asset::new("AAPL")?, Asset::new("USD")?))?;
let account = AccountId::from_u64(1);

let read = |id| {
    service
        .get(
            id,
            account,
            &None::<AccountGroupId>,
            QuoteResolution::AccountThenGroupThenDefault,
        )
        .ok()
};

service.push(aapl_id, Quote::new().with_mark(Price::from_str("200")?))?;
assert!(read(aapl_id).is_some());

// clear hides the quote but keeps the instrument registered.
service.clear(aapl_id);
assert!(read(aapl_id).is_none());

// Pushing again restores a quote for the same id.
service.push(aapl_id, Quote::new().with_mark(Price::from_str("210")?))?;
let quote = read(aapl_id).expect("quote must be present");
assert_eq!(quote.mark, Some(Price::from_str("210")?));
```
- Market Data TTL - quote freshness and the eight-tier TTL cascade.
- Market Data Pricing - market-order pricing from the quote cache.
- Spot Funds - uses market data to price market orders.
- Account Groups - account group identifiers; membership drives
  account then group and account then group then default resolution and the
  TTL cascade group tiers.
- Policies - the full built-in policy catalog.
- Domain Types - the price and instrument value types used here.