Skip to main content

WebSocket Streaming

Subscribe to real-time events using a builder pattern. The WebSocket API is async-only (following Python convention), so use it through `AsyncPolyNode` inside an `asyncio` event loop:
import asyncio
from polynode import AsyncPolyNode

async def main():
    """Stream settlement events and print a short summary of each one."""
    async with AsyncPolyNode(api_key="pn_live_...") as pn:
        # Parenthesized chain rather than backslash continuations: a stray
        # space after a trailing backslash is a syntax error.
        sub = await (
            pn.ws.subscribe("settlements")
            .min_size(100)          # min USD size
            .status("pending")      # pending settlements only
            .snapshot_count(20)     # initial snapshot size
            .send()
        )

        # The subscription is an async iterator of events.
        async for event in sub:
            print(f"{event.taker_side} ${event.taker_size:.0f} on {event.market_title}")
            print(f"  status: {event.status}, tx: {event.tx_hash[:20]}...")

asyncio.run(main())

Event Callbacks

# Register handlers on an existing subscription (`sub`), keyed by event type.
# Each handler receives the event object as its only argument.
sub.on("settlement", lambda e: print(f"{e.taker_side} ${e.taker_size} on {e.market_title}"))
sub.on("status_update", lambda e: print(f"Confirmed in {e.latency_ms}ms"))
sub.on("*", lambda e: print(e.event_type))  # catch-all

Subscription Filters

All filters from the Subscriptions & Filters page are supported:
# Filter methods are chained on the builder returned by subscribe(); the chain
# is finished with .send().
# NOTE(review): every other example on this page awaits the result of .send();
# do the same when using this chain inside a coroutine.
(
    pn.ws.subscribe("settlements")
    .wallets(["0xabc..."])          # by wallet
    .tokens(["21742633..."])        # by token ID
    .slugs(["bitcoin-100k"])        # by market slug
    .condition_ids(["0xabc..."])    # by condition ID
    .side("BUY")                    # BUY or SELL
    .status("pending")              # pending, confirmed, or all
    .min_size(100)                  # min USD size
    .max_size(10000)                # max USD size
    .event_types(["settlement"])    # override event types
    .snapshot_count(50)             # initial snapshot (max 200)
    .feeds(["BTC/USD"])             # chainlink feeds
    .send()
)

Subscription Types

# Each call starts a builder for one subscription type; add filters as needed
# and finish the chain with .send().
pn.ws.subscribe("settlements")   # pending + confirmed settlements
pn.ws.subscribe("trades")        # all trade activity
pn.ws.subscribe("prices")        # price-moving events
pn.ws.subscribe("blocks")        # new Polygon blocks
pn.ws.subscribe("wallets")       # all wallet activity
pn.ws.subscribe("markets")       # all market activity
pn.ws.subscribe("large_trades")  # $1K+ trades
pn.ws.subscribe("oracle")        # UMA resolution events
pn.ws.subscribe("chainlink")     # real-time price feeds

Multiple Subscriptions

Subscriptions stack on the same connection:
# Large trades with a minimum USD size of 5000.
whales = await pn.ws.subscribe("large_trades").min_size(5000).send()
# Wallet activity restricted to one wallet address.
my_wallet = await pn.ws.subscribe("wallets").wallets(["0xabc..."]).send()

# Both subscriptions are now active at the same time; events are deduplicated.

Context Manager

# The awaited subscription is used as an async context manager, so its
# lifetime is scoped to the `async with` block.
async with await pn.ws.subscribe("settlements").send() as sub:
    async for event in sub:
        print(event.market_title, event.taker_price)
        break  # stop after the first event
# Auto-unsubscribes on exit

Compression

Zlib compression is enabled by default for all WebSocket connections (~50% bandwidth savings). No configuration needed.

Auto-Reconnect

Auto-reconnect is enabled by default. The SDK reconnects with exponential backoff and replays all active subscriptions. The options can also be set explicitly:
from polynode.ws import PolyNodeWS
from polynode.types.ws import WsOptions

# Build the WebSocket client directly: API key, endpoint URL, then options.
ws = PolyNodeWS("pn_live_...", "wss://ws.polynode.dev/ws", WsOptions(
    compress=True,                  # zlib compression
    auto_reconnect=True,            # reconnect after a dropped connection
    max_reconnect_attempts=0,       # 0 = unlimited
    reconnect_base_delay=1.0,       # seconds
    reconnect_max_delay=30.0,       # seconds
))

# Connection lifecycle callbacks.
ws.on_connect(lambda: print("connected"))
ws.on_disconnect(lambda reason: print(f"disconnected: {reason}"))
ws.on_reconnect(lambda attempt: print(f"reconnected, attempt {attempt}"))
ws.on_error(lambda err: print(f"error: {err}"))

Cleanup

# Tear down in increasing scope: one subscription, all subscriptions, then
# the connection itself.
# NOTE(review): the WebSocket API is described as async-only, so these calls
# may be coroutines that need `await` -- confirm against the SDK reference.
sub.unsubscribe()        # remove one subscription
pn.ws.unsubscribe_all()  # remove all
pn.ws.disconnect()       # close connection