Skip to main content

Install

Add to your Cargo.toml:
[dependencies]
polynode = "0.4"
# `#[tokio::main]` defaults to the multi-threaded runtime, which requires "rt-multi-thread".
# "time" is required by the `tokio::time::sleep` call in the OrderbookEngine example.
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }

# For local cache (optional):
# polynode = { version = "0.4", features = ["cache"] }

Quick Start

use polynode::PolyNodeClient;

/// Quick-start entry point: builds a client, lists markets, then runs a search.
///
/// Any error returned by the client is propagated through `polynode::Result`.
#[tokio::main]
async fn main() -> polynode::Result<()> {
    let client = PolyNodeClient::new("pn_live_...")?;

    // Request up to 10 markets and report the page count next to the overall total.
    let top = client.markets(Some(10)).await?;
    println!("{} markets, {} total", top.count, top.total);

    // Search for "bitcoin", capped at 5 hits; a hit without a question prints "?".
    let hits = client.search("bitcoin", Some(5), None).await?;
    for hit in &hits.results {
        let question = hit.question.as_deref().unwrap_or("?");
        println!("{}", question);
    }

    Ok(())
}

REST Methods

All endpoints are async methods on PolyNodeClient:
// System
client.healthz().await?;
client.status().await?;
client.create_key(Some("my-bot")).await?;

// Markets
client.markets(Some(10)).await?;
client.market("token_id").await?;
client.market_by_slug("bitcoin-100k").await?;
client.market_by_condition("0xabc...").await?;
client.list_markets(&ListMarketsParams {
    count: Some(20),
    sort: Some("volume".into()),
    ..Default::default()
}).await?;
client.search("ethereum", Some(5), None).await?;

// Pricing
client.candles("token_id", Some(CandleResolution::OneHour), Some(100)).await?;
client.stats("token_id").await?;

// Settlements
client.recent_settlements(Some(20)).await?;
client.token_settlements("token_id", Some(10)).await?;
client.wallet_settlements("0xabc...", Some(10)).await?;

// Wallets
client.wallet("0xabc...").await?;

// RPC (rpc.polynode.dev)
client.rpc_call("eth_blockNumber", serde_json::json!([])).await?;

WebSocket Streaming

Open a stream and subscribe with the builder pattern:
use polynode::ws::{Subscription, SubscriptionType, StreamOptions};
use polynode::WsMessage;

let mut stream = client.stream(StreamOptions {
    compress: true,
    auto_reconnect: true,
    ..Default::default()
}).await?;

stream.subscribe(
    Subscription::new(SubscriptionType::Settlements)
        .min_size(100.0)
        .status("pending")
        .snapshot_count(20)
).await?;

Consuming Events

// Drive the stream until it yields `None`; `msg?` propagates any error item to the caller.
while let Some(msg) = stream.next().await {
    match msg? {
        // Event payloads are matched with nested patterns so the whole loop is one flat `match`.
        WsMessage::Event(polynode::PolyNodeEvent::Settlement(s)) => {
            println!("{} ${:.2} on {}",
                s.taker_side, s.taker_size,
                s.market_title.as_deref().unwrap_or("unknown"));
        }
        WsMessage::Event(polynode::PolyNodeEvent::StatusUpdate(u)) => {
            println!("Confirmed in {}ms", u.latency_ms);
        }
        WsMessage::Snapshot(events) => {
            println!("Snapshot: {} events", events.len());
        }
        WsMessage::Subscribed { subscription_id, .. } => {
            println!("Subscribed: {}", subscription_id);
        }
        // `..` instead of binding `ts`: the field is not used here, so binding it
        // would only produce an unused-variable warning.
        WsMessage::Heartbeat { .. } => {
            // Server heartbeat every 30s
        }
        WsMessage::Error { message, .. } => {
            eprintln!("Error: {}", message);
        }
        // All other events and message kinds are ignored.
        _ => {}
    }
}

Subscription Filters

All filters from the Subscriptions & Filters page are supported:
Subscription::new(SubscriptionType::Settlements)
    .wallets(vec!["0xabc...".into()])
    .tokens(vec!["21742633...".into()])
    .slugs(vec!["bitcoin-100k".into()])
    .condition_ids(vec!["0xabc...".into()])
    .side("BUY")
    .status("pending")
    .min_size(100.0)
    .max_size(10000.0)
    .event_types(vec!["settlement".into()])
    .snapshot_count(50)
    .feeds(vec!["BTC/USD".into()])

Subscription Types

SubscriptionType::Settlements   // pending + confirmed settlements
SubscriptionType::Trades        // all trade activity
SubscriptionType::Prices        // price-moving events
SubscriptionType::Blocks        // new Polygon blocks
SubscriptionType::Wallets       // all wallet activity
SubscriptionType::Markets       // all market activity
SubscriptionType::LargeTrades   // $1K+ trades
SubscriptionType::Oracle        // UMA resolution events
SubscriptionType::Chainlink     // real-time price feeds

Multiple Subscriptions

Subscriptions stack on the same connection:
stream.subscribe(
    Subscription::new(SubscriptionType::LargeTrades)
        .min_size(5000.0)
).await?;

stream.subscribe(
    Subscription::new(SubscriptionType::Wallets)
        .wallets(vec!["0xabc...".into()])
).await?;

// Both active, events deduplicated

Compression

Zlib compression is enabled by default for all WebSocket connections (~50% bandwidth savings). Binary frames are transparently decompressed with flate2.
// To disable (not recommended):
let mut stream = client.stream(StreamOptions {
    compress: false,
    ..Default::default()
}).await?;

Auto-Reconnect

Enabled by default with exponential backoff:
// `Duration` is used for the backoff bounds below, so bring it into scope.
use std::time::Duration;

// Every field not listed here falls back to `StreamOptions::default()`.
let mut stream = client.stream(StreamOptions {
    auto_reconnect: true,              // default: true
    max_reconnect_attempts: None,      // None = unlimited
    initial_backoff: Duration::from_secs(1),
    max_backoff: Duration::from_secs(30),
    ..Default::default()
}).await?;
On reconnect, all active subscriptions are replayed automatically.

Cleanup

stream.unsubscribe(Some("sub_id".into())).await?;  // remove one
stream.unsubscribe(None).await?;                     // remove all
stream.close().await?;                                // close connection

Local Cache

Store trades and positions in a local SQLite database. Enable the cache feature:
polynode = { version = "0.4", features = ["cache"] }
use polynode::{PolyNodeClient, cache::{PolyNodeCache, QueryOptions, EntityType}};
use std::sync::Arc;

// The cache builder takes the client wrapped in an `Arc`.
let client = Arc::new(PolyNodeClient::new("pn_live_...")?);
let mut cache = PolyNodeCache::builder(client)
    .db_path("./cache.db")
    .watchlist_path("./polynode.watch.json")
    .on_backfill_progress(|progress| {
        println!("{}: {} trades ({})", progress.label, progress.fetched, progress.status)
    })
    .build()?;

cache.start().await?;

// Query locally — instant, no API calls
let recent = QueryOptions { limit: Some(50), ..Default::default() };
let trades = cache.wallet_trades("0xabc...", &recent)?;
let positions = cache.wallet_positions("0xabc...")?;
let summary = cache.stats()?;
println!("{} trades, {} positions", summary.trade_count, positions.len());

// Add wallets at runtime
cache.add_to_watchlist(&[(EntityType::Wallet, "0xnew...".into(), "whale".into(), true)])?;

cache.stop().await?;
See Local Cache for full documentation, watchlist configuration, backfill timing, and all query methods.

Configuration

use std::time::Duration;

let client = PolyNodeClient::builder("pn_live_...")
    .base_url("https://api.polynode.dev")
    .ws_url("wss://ws.polynode.dev/ws")
    .rpc_url("https://rpc.polynode.dev")
    .timeout(Duration::from_secs(10))
    .build()?;

Error Handling

use polynode::{PolyNodeClient, Error};

match client.market("invalid-id").await {
    Ok(detail) => println!("{:?}", detail),
    Err(Error::NotFound(msg)) => println!("Not found: {}", msg),
    Err(Error::Auth(msg)) => println!("Auth failed: {}", msg),
    Err(Error::RateLimited(msg)) => println!("Rate limited: {}", msg),
    Err(e) => println!("Other error: {}", e),
}

Orderbook Streaming

The SDK includes a dedicated orderbook stream for real-time book data from ob.polynode.dev. This is a separate WebSocket connection from the event stream, with its own URL, protocol, and message format.

Connect and Subscribe

use polynode::{PolyNodeClient, ObStreamOptions, ObMessage, OrderbookUpdate};

let client = PolyNodeClient::new("pn_live_...")?;
let mut stream = client.orderbook_stream(ObStreamOptions::default()).await?;

stream.subscribe(vec![
    "token_id_1".into(),
    "token_id_2".into(),
]).await?;

Consuming Messages

Batches are flattened automatically. next() yields individual ObMessage items:
// Consume orderbook messages until the stream yields `None`.
// `msg?` propagates an error item out of the enclosing function.
while let Some(msg) = stream.next().await {
    match msg? {
        // Snapshot: carries `bids` and `asks` collections for a single `asset_id`.
        ObMessage::Update(OrderbookUpdate::Snapshot(snap)) => {
            println!("{}: {} bids, {} asks",
                snap.asset_id, snap.bids.len(), snap.asks.len());
        }
        ObMessage::Update(OrderbookUpdate::Update(delta)) => {
            // Incremental delta. A level with size "0" means removal.
            println!("{}: {} bid changes", delta.asset_id, delta.bids.len());
        }
        // Price change: one `market` with a list of per-asset `outcome` / `price` entries.
        ObMessage::Update(OrderbookUpdate::PriceChange(change)) => {
            for asset in &change.assets {
                println!("{} {}: {}", change.market, asset.outcome, asset.price);
            }
        }
        // Reports `total`, printed here as the number of snapshots received.
        ObMessage::SnapshotsDone { total } => {
            println!("All {} snapshots received", total);
        }
        ObMessage::Subscribed { markets } => {
            println!("Subscribed to {} markets", markets);
        }
        // Error message: both the `error` and `message` fields are logged to stderr.
        ObMessage::Error { error, message } => {
            eprintln!("Error ({}): {}", error, message);
        }
        // Any other message kind is ignored.
        _ => {}
    }
}

LocalOrderbook Helper

Maintain a sorted local copy of the book:
use polynode::LocalOrderbook;

let mut book = LocalOrderbook::new();

// Apply updates as they arrive
// `update` is not defined in this fragment: it is an `OrderbookUpdate` obtained elsewhere,
// e.g. the payload of an `ObMessage::Update` from the orderbook stream.
match update {
    OrderbookUpdate::Snapshot(snap) => book.apply_snapshot(&snap),
    OrderbookUpdate::Update(delta) => book.apply_update(&delta),
    // Other variants (such as `PriceChange`) are not applied to the local book here.
    _ => {}
}

// Query state
// `get_book` yields `Some((bids, asks))` when the lookup for the token succeeds.
if let Some((bids, asks)) = book.get_book("token_id") {
    println!("{} bids, {} asks", bids.len(), asks.len());
}
// Per-token helpers; the results are only bound here, not used further.
let best_bid = book.best_bid("token_id");
let best_ask = book.best_ask("token_id");
let spread = book.spread("token_id");

Options

use std::time::Duration;

let mut stream = client.orderbook_stream(ObStreamOptions {
    compress: true,
    auto_reconnect: true,
    max_reconnect_attempts: None,       // None = unlimited
    initial_backoff: Duration::from_secs(1),
    max_backoff: Duration::from_secs(30),
}).await?;

Custom URL

let client = PolyNodeClient::builder("pn_live_...")
    .ob_url("wss://ob.polynode.dev/ws")
    .build()?;

Cleanup

stream.unsubscribe().await?;  // unsubscribe from all markets
stream.close().await?;        // close connection

OrderbookEngine

The OrderbookEngine is a higher-level wrapper around the orderbook stream. It manages one WebSocket connection, maintains local state for all subscribed tokens, and lets you create filtered views that only deliver updates for specific token subsets.

Create and Subscribe

use polynode::orderbook::engine::{OrderbookEngine, EngineOptions};

// Connect with the `pn_live_...` key string; options not listed keep their `Default` values.
let engine = OrderbookEngine::connect("pn_live_...", EngineOptions {
    compress: true,
    ..Default::default()
}).await?;

// Subscribe with token IDs, slugs, or condition IDs
engine.subscribe(vec![
    "114694726451307654528948558967898493662917070661203465131156925998487819889437".into(),
    "66255671088804707681511323064315150986307471908131081808279119719218775249892".into(),
]).await?;

// Wait for snapshots to load
// NOTE(review): this is a fixed 5-second pause, not a readiness signal — books may still be
// loading afterwards, so rely on the `engine.len()` count rather than assuming all arrived.
// `tokio::time::sleep` needs tokio's "time" feature.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
println!("{} books loaded", engine.len().await);

Query State

The engine exposes computed helpers that read from the shared local orderbook. All queries are async because they acquire a read lock on the shared state:
engine.midpoint(token_id).await;   // Option<f64> — (best_bid + best_ask) / 2
engine.spread(token_id).await;     // Option<f64> — best_ask - best_bid
engine.best_bid(token_id).await;   // Option<OrderbookLevel>
engine.best_ask(token_id).await;   // Option<OrderbookLevel>
engine.book(token_id).await;       // Option<(Vec<OrderbookLevel>, Vec<OrderbookLevel>)>

Filtered Views

Create views that only receive updates for specific tokens. No extra connections — views filter the shared stream.
let mut trade_view = engine.view(vec![token_a.into(), token_b.into()]);

// Receive filtered updates via async channel
while let Some(update) = trade_view.next().await {
    match update {
        OrderbookUpdate::Snapshot(snap) => {
            println!("{}: {} bids", snap.asset_id, snap.bids.len());
        }
        OrderbookUpdate::Update(delta) => {
            println!("{}: {} changes", delta.asset_id, delta.bids.len());
        }
        _ => {}
    }
}
Views have the same query helpers as the engine:
trade_view.midpoint(token_a).await;
trade_view.spread(token_a).await;
trade_view.book(token_a).await;

Swap Tokens

Change which tokens a view tracks at runtime:
trade_view.set_tokens(vec![new_token_x.into(), new_token_y.into()]).await;

Cleanup

engine.close().await?;

Event Types

All event types are fully typed with serde:
use polynode::{
    PolyNodeEvent,         // tagged enum of all events
    SettlementEvent,
    TradeEvent,
    StatusUpdateEvent,
    BlockEvent,
    PositionChangeEvent,
    DepositEvent,
    PositionSplitEvent,
    PositionMergeEvent,
    OracleEvent,
    PriceFeedEvent,        // Chainlink (separate from PolyNodeEvent)

    // Orderbook types
    OrderbookLevel,
    BookSnapshot,
    BookUpdate,
    PriceChange,
    OrderbookUpdate,       // tagged enum of snapshot | update | price_change
    ObMessage,             // what the stream yields
};

Source

crates.io | GitHub