Orderbook Streaming
Real-time orderbook data from ob.polynode.dev. Three levels of abstraction: raw stream, local state manager, or the fully managed engine.
Raw Stream
For full control over message processing:
use polynode::{PolyNodeClient, ObStreamOptions};
use polynode::types::orderbook::ObMessage;
// Raw-stream example: build a client, open an orderbook stream, subscribe to
// one token and log every message until the stream yields `None`.
#[tokio::main]
async fn main() -> polynode::Result<()> {
    let api_key = "pn_live_test_session_tracking_51eca107e9b347b589f5b0a04f98eb1d";
    let client = PolyNodeClient::new(api_key)?;
    let mut stream = client
        .orderbook_stream(ObStreamOptions::default())
        .await?;

    // Subscribe to specific tokens
    stream
        .subscribe(vec![
            "51037625779056581606819614184446816710505006861008496087735536016411882582167".into(),
        ])
        .await?;

    while let Some(msg) = stream.next().await {
        // Report a failed item on stderr and keep reading; an error does not
        // end the loop.
        let message = match msg {
            Ok(message) => message,
            Err(e) => {
                eprintln!("error: {}", e);
                continue;
            }
        };

        match message {
            ObMessage::Update(update) => println!("{:?}", update),
            ObMessage::Subscribed { markets } => println!("tracking {} markets", markets),
            ObMessage::SnapshotsDone { total } => println!("all {} snapshots loaded", total),
            // Any other message kind is ignored in this example.
            _ => {}
        }
    }

    Ok(())
}
Local Orderbook State
Apply snapshots and deltas to maintain a sorted local copy of the book:
use polynode::LocalOrderbook;
use polynode::types::orderbook::{ObMessage, OrderbookUpdate};
# async fn example(stream: &mut polynode::ObStream) -> polynode::Result<()> {
// Token whose top of book is printed after every received message.
let token = "21742633...";
let mut book = LocalOrderbook::new();

while let Some(msg) = stream.next().await {
    match msg {
        // Only successfully received book updates touch the local state.
        Ok(ObMessage::Update(update)) => match &update {
            OrderbookUpdate::Snapshot(snap) => book.apply_snapshot(snap),
            OrderbookUpdate::Update(delta) => book.apply_update(delta),
            // Price-change events are not applied to the book here.
            OrderbookUpdate::PriceChange(_) => {}
        },
        // Errors and every other message kind leave the book as it is.
        _ => {}
    }

    // The quotes below are printed for every message, not just for updates.
    if let Some(top_bid) = book.best_bid(token) {
        println!("best bid: {} x {}", top_bid.price, top_bid.size);
    }
    if let Some(top_ask) = book.best_ask(token) {
        println!("best ask: {} x {}", top_ask.price, top_ask.size);
    }
    if let Some(width) = book.spread(token) {
        println!("spread: {:.4}", width);
    }
}
# Ok(())
# }
Orderbook Engine
The highest-level abstraction. One shared WebSocket, automatic state management, and filtered views for different consumers:
use polynode::{OrderbookEngine, EngineOptions};
// Engine example: connect once, subscribe to two tokens, query the engine
// directly, then follow a filtered view for a single token.
#[tokio::main]
async fn main() -> polynode::Result<()> {
    let engine = OrderbookEngine::connect(
        "pn_live_test_session_tracking_51eca107e9b347b589f5b0a04f98eb1d",
        EngineOptions::default(),
    )
    .await?;

    // Subscribe to tokens
    engine
        .subscribe(vec!["token_a".into(), "token_b".into()])
        .await?;

    // Query the shared state directly
    if let Some(mid) = engine.midpoint("token_a").await {
        println!("midpoint: {:.4}", mid);
    }
    if let Some(spread) = engine.spread("token_a").await {
        println!("spread: {:.4}", spread);
    }
    if let Some((bids, asks)) = engine.book("token_a").await {
        println!("bids: {}, asks: {}", bids.len(), asks.len());
    }

    // Create a filtered view for a subset of tokens
    let mut view = engine.view(vec!["token_a".into()]);

    // The yielded item only paces the loop; the midpoint is read back from the
    // view. The leading underscore keeps the binding (and its drop point)
    // while silencing the `unused_variables` warning.
    while let Some(_update) = view.next().await {
        if let Some(mid) = view.midpoint("token_a").await {
            println!("token_a midpoint: {:.4}", mid);
        }
    }

    // Reached only once `view.next()` has yielded `None`.
    engine.close().await?;
    Ok(())
}

