Skip to main content

WebSocket Streaming

Subscribe to real-time events with 3-5 second pre-confirmation lead time on settlements.

Connect and Subscribe

use polynode::{PolyNodeClient, ws::{StreamOptions, Subscription, SubscriptionType}};
use polynode::ws_messages::WsMessage;

#[tokio::main]
async fn main() -> polynode::Result<()> {
    let client = PolyNodeClient::new("pn_live_test_session_tracking_51eca107e9b347b589f5b0a04f98eb1d")?;

    let mut stream = client.stream(StreamOptions {
        compress: true,
        auto_reconnect: true,
        ..Default::default()
    }).await?;

    // Subscribe to settlements with a minimum size filter
    let sub = Subscription::new(SubscriptionType::Settlements)
        .min_size(100.0)
        .status("pending")
        .snapshot_count(20);
    stream.subscribe(sub).await?;

    // Read events
    while let Some(msg) = stream.next().await {
        match msg {
            Ok(WsMessage::Event(event)) => {
                println!("{:?}", event);
            }
            Ok(WsMessage::Snapshot(events)) => {
                println!("snapshot: {} events", events.len());
            }
            Ok(WsMessage::Subscribed { subscription_id, .. }) => {
                println!("subscribed: {}", subscription_id);
            }
            Ok(WsMessage::PriceFeed(feed)) => {
                // Chainlink price feed update
                println!("{:?}", feed);
            }
            Ok(WsMessage::Heartbeat { ts }) => {
                // Connection alive
            }
            Err(e) => eprintln!("error: {}", e),
            _ => {}
        }
    }

    Ok(())
}

Subscription Types

use polynode::ws::{Subscription, SubscriptionType};

# fn example() {
Subscription::new(SubscriptionType::Settlements);   // pending + confirmed settlements
Subscription::new(SubscriptionType::Trades);         // all trade activity
Subscription::new(SubscriptionType::Prices);         // price-moving events
Subscription::new(SubscriptionType::Blocks);         // new Polygon blocks
Subscription::new(SubscriptionType::Wallets);        // all wallet activity
Subscription::new(SubscriptionType::Markets);        // all market activity
Subscription::new(SubscriptionType::LargeTrades);    // $1K+ trades
Subscription::new(SubscriptionType::Oracle);         // UMA resolution events
Subscription::new(SubscriptionType::Chainlink);      // real-time price feeds
Subscription::new(SubscriptionType::Global);         // everything
# }

Subscription Filters

All filters from the Subscriptions & Filters page are supported via builder methods:
use polynode::ws::{Subscription, SubscriptionType};

# fn example() {
let sub = Subscription::new(SubscriptionType::Settlements)
    .wallets(vec!["0xabc...".into()])          // filter by wallet
    .tokens(vec!["21742633...".into()])         // filter by token ID
    .slugs(vec!["bitcoin-100k".into()])         // filter by market slug
    .condition_ids(vec!["0xabc...".into()])     // filter by condition ID
    .side("BUY")                                // BUY or SELL
    .status("pending")                          // pending, confirmed, or all
    .min_size(100.0)                            // minimum USD size
    .max_size(10000.0)                          // maximum USD size
    .event_types(vec!["settlement".into()])     // override event types
    .snapshot_count(50)                         // initial snapshot (max 200)
    .feeds(vec!["BTC/USD".into()]);             // chainlink feeds
# }

Multiple Subscriptions

Subscriptions stack on the same connection. Events are deduplicated server-side:
# async fn example(stream: &polynode::ws::WsStream) -> polynode::Result<()> {
use polynode::ws::{Subscription, SubscriptionType};

// Whale trades
stream.subscribe(
    Subscription::new(SubscriptionType::LargeTrades).min_size(5000.0)
).await?;

// Specific wallet activity
stream.subscribe(
    Subscription::new(SubscriptionType::Wallets)
        .wallets(vec!["0xabc...".into()])
).await?;
# Ok(())
# }

Auto-Reconnect

Enabled by default. The SDK reconnects with exponential backoff and replays all active subscriptions automatically:
use polynode::ws::StreamOptions;
use std::time::Duration;

# fn example() {
let options = StreamOptions {
    compress: true,
    auto_reconnect: true,
    max_reconnect_attempts: None,          // unlimited by default
    initial_backoff: Duration::from_secs(1),
    max_backoff: Duration::from_secs(30),
};
# }

Cleanup

# async fn example(stream: polynode::ws::WsStream) -> polynode::Result<()> {
// Unsubscribe from a specific subscription
stream.unsubscribe(Some("sub_id_here".into())).await?;

// Unsubscribe from all
stream.unsubscribe(None).await?;

// Close the connection
stream.close().await?;
# Ok(())
# }