Skip to main content

Orderbook Streaming

Real-time orderbook data from ob.polynode.dev. Three levels of abstraction: raw stream, local state manager, or the fully managed engine.

Raw Stream

For full control over message processing:
use polynode::{PolyNodeClient, ObStreamOptions};
use polynode::types::orderbook::ObMessage;

#[tokio::main]
async fn main() -> polynode::Result<()> {
    let client = PolyNodeClient::new("pn_live_test_session_tracking_51eca107e9b347b589f5b0a04f98eb1d")?;

    let mut stream = client.orderbook_stream(ObStreamOptions::default()).await?;

    // Subscribe to specific tokens
    stream.subscribe(vec![
        "51037625779056581606819614184446816710505006861008496087735536016411882582167".into(),
    ]).await?;

    while let Some(msg) = stream.next().await {
        match msg {
            Ok(ObMessage::Update(update)) => {
                println!("{:?}", update);
            }
            Ok(ObMessage::Subscribed { markets }) => {
                println!("tracking {} markets", markets);
            }
            Ok(ObMessage::SnapshotsDone { total }) => {
                println!("all {} snapshots loaded", total);
            }
            Err(e) => eprintln!("error: {}", e),
            _ => {}
        }
    }

    Ok(())
}

Local Orderbook State

Apply snapshots and deltas to maintain a sorted local copy of the book:
use polynode::LocalOrderbook;
use polynode::types::orderbook::{ObMessage, OrderbookUpdate};

# async fn example(stream: &mut polynode::ObStream) -> polynode::Result<()> {
let mut book = LocalOrderbook::new();

while let Some(msg) = stream.next().await {
    if let Ok(ObMessage::Update(update)) = msg {
        match &update {
            OrderbookUpdate::Snapshot(snap) => book.apply_snapshot(snap),
            OrderbookUpdate::Update(delta) => book.apply_update(delta),
            OrderbookUpdate::PriceChange(_) => {}
        }
    }

    let token = "21742633...";
    if let Some(bid) = book.best_bid(token) {
        println!("best bid: {} x {}", bid.price, bid.size);
    }
    if let Some(ask) = book.best_ask(token) {
        println!("best ask: {} x {}", ask.price, ask.size);
    }
    if let Some(spread) = book.spread(token) {
        println!("spread: {:.4}", spread);
    }
}
# Ok(())
# }

Orderbook Engine

The highest-level abstraction. One shared WebSocket, automatic state management, and filtered views for different consumers:
use polynode::{OrderbookEngine, EngineOptions};

#[tokio::main]
async fn main() -> polynode::Result<()> {
    let engine = OrderbookEngine::connect(
        "pn_live_test_session_tracking_51eca107e9b347b589f5b0a04f98eb1d",
        EngineOptions::default(),
    ).await?;

    // Subscribe to tokens
    engine.subscribe(vec![
        "token_a".into(),
        "token_b".into(),
    ]).await?;

    // Query the shared state directly
    if let Some(mid) = engine.midpoint("token_a").await {
        println!("midpoint: {:.4}", mid);
    }
    if let Some(spread) = engine.spread("token_a").await {
        println!("spread: {:.4}", spread);
    }
    if let Some((bids, asks)) = engine.book("token_a").await {
        println!("bids: {}, asks: {}", bids.len(), asks.len());
    }

    // Create a filtered view for a subset of tokens
    let mut view = engine.view(vec!["token_a".into()]);
    while let Some(update) = view.next().await {
        if let Some(mid) = view.midpoint("token_a").await {
            println!("token_a midpoint: {:.4}", mid);
        }
    }

    engine.close().await?;
    Ok(())
}