WebSocket Streaming
Subscribe to real-time events with a 3-5 second pre-confirmation lead time on settlements.
Connect and Subscribe
use polynode::{PolyNodeClient, ws::{StreamOptions, Subscription, SubscriptionType}};
use polynode::ws_messages::WsMessage;
/// Connects to the PolyNode WebSocket stream, subscribes to pending
/// settlements with a minimum size of 100.0, and prints each message received.
///
/// # Errors
///
/// Returns the error from creating the client, opening the stream, or sending
/// the subscription. Errors delivered as stream items are written to stderr
/// and the read loop continues.
#[tokio::main]
async fn main() -> polynode::Result<()> {
    // NOTE(review): this literal looks like a real key — confirm it is a
    // placeholder. Real keys belong in configuration, not in source.
    let client = PolyNodeClient::new("pn_live_test_session_tracking_51eca107e9b347b589f5b0a04f98eb1d")?;

    // Override only the two options of interest; the rest keep their defaults.
    let mut stream = client.stream(StreamOptions {
        compress: true,
        auto_reconnect: true,
        ..Default::default()
    }).await?;

    // Subscribe to settlements with a minimum size filter
    let sub = Subscription::new(SubscriptionType::Settlements)
        .min_size(100.0)
        .status("pending")
        .snapshot_count(20);
    stream.subscribe(sub).await?;

    // Read events until the stream yields `None`.
    while let Some(msg) = stream.next().await {
        match msg {
            Ok(WsMessage::Event(event)) => {
                println!("{:?}", event);
            }
            Ok(WsMessage::Snapshot(events)) => {
                println!("snapshot: {} events", events.len());
            }
            Ok(WsMessage::Subscribed { subscription_id, .. }) => {
                println!("subscribed: {}", subscription_id);
            }
            Ok(WsMessage::PriceFeed(feed)) => {
                // Chainlink price feed update
                println!("{:?}", feed);
            }
            // `ts: _` keeps the field name visible to readers without creating
            // an unused binding (which would trigger `unused_variables`).
            Ok(WsMessage::Heartbeat { ts: _ }) => {
                // Connection alive
            }
            Err(e) => eprintln!("error: {}", e),
            // Any other message variant is ignored.
            _ => {}
        }
    }
    Ok(())
}
Subscription Types
use polynode::ws::{Subscription, SubscriptionType};
# fn example() {
// Each line constructs a `Subscription` for one `SubscriptionType` variant.
// The values are discarded because this snippet only lists the variants;
// a subscription takes effect once passed to `stream.subscribe(...)`.
Subscription::new(SubscriptionType::Settlements); // pending + confirmed settlements
Subscription::new(SubscriptionType::Trades); // all trade activity
Subscription::new(SubscriptionType::Prices); // price-moving events
Subscription::new(SubscriptionType::Blocks); // new Polygon blocks
Subscription::new(SubscriptionType::Wallets); // all wallet activity
Subscription::new(SubscriptionType::Markets); // all market activity
Subscription::new(SubscriptionType::LargeTrades); // $1K+ trades
Subscription::new(SubscriptionType::Oracle); // UMA resolution events
Subscription::new(SubscriptionType::Chainlink); // real-time price feeds
Subscription::new(SubscriptionType::Global); // everything
# }
Subscription Filters
All filters from the Subscriptions & Filters page are supported via builder methods:
use polynode::ws::{Subscription, SubscriptionType};
# fn example() {
// All builder methods are chained on a single subscription purely to list
// them in one place; apply only the filters a given subscription needs.
let sub = Subscription::new(SubscriptionType::Settlements)
.wallets(vec!["0xabc...".into()]) // filter by wallet
.tokens(vec!["21742633...".into()]) // filter by token ID
.slugs(vec!["bitcoin-100k".into()]) // filter by market slug
.condition_ids(vec!["0xabc...".into()]) // filter by condition ID
.side("BUY") // BUY or SELL
.status("pending") // pending, confirmed, or all
.min_size(100.0) // minimum USD size
.max_size(10000.0) // maximum USD size
.event_types(vec!["settlement".into()]) // override event types
.snapshot_count(50) // initial snapshot (max 200)
.feeds(vec!["BTC/USD".into()]); // chainlink feeds
# }
Multiple Subscriptions
Subscriptions stack on the same connection. Events are deduplicated server-side:
# async fn example(stream: &polynode::ws::WsStream) -> polynode::Result<()> {
use polynode::ws::{Subscription, SubscriptionType};
// Whale trades: the LargeTrades feed narrowed to a minimum size of 5000.0
stream.subscribe(
Subscription::new(SubscriptionType::LargeTrades).min_size(5000.0)
).await?;
// Specific wallet activity: a second subscription on the same stream,
// restricted to the listed wallet address
stream.subscribe(
Subscription::new(SubscriptionType::Wallets)
.wallets(vec!["0xabc...".into()])
).await?;
# Ok(())
# }
Auto-Reconnect
Enabled by default. The SDK reconnects with exponential backoff and replays all active subscriptions automatically:
use polynode::ws::StreamOptions;
use std::time::Duration;
# fn example() {
// No `..Default::default()` here, so the literal names each field explicitly.
let options = StreamOptions {
compress: true,
auto_reconnect: true,
max_reconnect_attempts: None, // unlimited by default
initial_backoff: Duration::from_secs(1), // presumably the delay before the first retry — confirm
max_backoff: Duration::from_secs(30), // presumably the upper bound on the backoff delay — confirm
};
# }
Cleanup
# async fn example(stream: polynode::ws::WsStream) -> polynode::Result<()> {
// Unsubscribe from a specific subscription
// NOTE(review): "sub_id_here" is a placeholder; presumably the real id is the
// `subscription_id` reported by `WsMessage::Subscribed` — confirm.
stream.unsubscribe(Some("sub_id_here".into())).await?;
// Unsubscribe from all
stream.unsubscribe(None).await?;
// Close the connection
stream.close().await?;
# Ok(())
# }

