Skip to main content

Install

pip install polynode
Requires Python 3.10+. For trading support (order placement on Polymarket):
pip install polynode[trading]

Quick Start

from polynode import PolyNode

pn = PolyNode(api_key="pn_live_...")

# Fetch top markets
markets = pn.markets(count=10)
print(f"{markets.count} markets, {markets.total} total")

# Search
results = pn.search("bitcoin")
print(results.results[0].question)

pn.close()

Context Manager

with PolyNode(api_key="pn_live_...") as pn:
    status = pn.status()
    print(f"Tracking {status.state.market_count} markets")

Async Client

Every REST method is available in both sync (PolyNode) and async (AsyncPolyNode) variants; the streaming APIs described below are async-only:
import asyncio
from polynode import AsyncPolyNode

async def main():
    async with AsyncPolyNode(api_key="pn_live_...") as pn:
        status = await pn.status()
        markets = await pn.markets(count=3)
        print(f"{status.state.market_count} markets, {status.ws_subscribers} ws subs")

asyncio.run(main())

REST Methods

Every REST endpoint has a typed method on the PolyNode client. Most return Pydantic models with full IDE autocomplete; a few return plain values such as dicts, as annotated in the comments below.
# System
pn.healthz()                           # "ok"
pn.status()                            # StatusResponse
pn.create_key("my-bot")               # ApiKeyResponse

# Markets
pn.markets(count=10)                   # MarketsResponse
pn.market(token_id)                    # dict
pn.market_by_slug("bitcoin-100k")     # dict
pn.market_by_condition(condition_id)   # dict
pn.markets_list(count=20, sort="volume")  # MarketsListResponse
pn.search("ethereum", limit=5)        # SearchResponse

# Pricing
pn.candles(token_id, resolution="1h", limit=100)  # CandlesResponse
pn.stats(token_id)                     # dict

# Settlements
pn.recent_settlements(count=20)        # SettlementsResponse
pn.token_settlements(token_id, count=10)
pn.wallet_settlements(address, count=10)

# Wallets
pn.wallet(address)                     # WalletResponse
pn.wallet_trades(address, limit=50)    # dict
pn.wallet_positions(address, limit=50) # dict
pn.wallet_onchain_positions(address)   # dict

# Orderbook (REST)
pn.orderbook_rest(token_id)           # OrderbookResponse
pn.midpoint(token_id)                 # MidpointResponse
pn.spread(token_id)                   # SpreadResponse

# Enriched Data (1 req/sec rate limit)
pn.leaderboard(period="weekly", sort="profit")  # LeaderboardResponse
pn.trending()                          # TrendingResponse
pn.activity()                          # ActivityResponse
pn.movers()                           # MoversResponse
pn.trader_profile("0xabc...")         # TraderProfile
pn.trader_pnl("0xabc...", period="1W")  # TraderPnlResponse
pn.event("how-many-fed-rate-cuts-2026")  # EventDetailResponse
pn.search_events("recession", limit=5)  # EventSearchResponse
pn.markets_by_category("crypto")      # MarketsListResponse

# RPC (rpc.polynode.dev)
pn.rpc("eth_blockNumber")
pn.rpc("eth_getBlockByNumber", ["latest", False])

Example: Market Data

with PolyNode(api_key="pn_live_...") as pn:
    # Top 3 markets by volume
    markets = pn.markets(count=3)
    for m in markets.markets:
        print(f"{m.question} — ${m.volume_24h:,.0f} vol")

    # OHLCV candles
    candles = pn.candles(markets.markets[0].token_id, resolution="1h", limit=3)
    for c in candles.candles:
        print(f"  O={c.open} H={c.high} L={c.low} C={c.close} V={c.volume:.0f}")

Example: Wallet Activity

with PolyNode(api_key="pn_live_...") as pn:
    wallet = pn.wallet("0xB27BC932bf8110D8F78e55da7d5f0497A18B5b82")
    a = wallet.activity
    print(f"Trades: {a.trade_count}, Volume: ${a.trade_volume_usd:,.0f}")

    profile = pn.trader_profile(wallet.wallet)
    print(f"{profile.pseudonym}: PnL ${profile.totalPnl:,.0f}")

WebSocket Streaming

Subscribe to real-time events with a builder pattern. WebSocket is async-only (Python convention):
import asyncio
from polynode import AsyncPolyNode

async def main():
    async with AsyncPolyNode(api_key="pn_live_...") as pn:
        sub = await pn.ws.subscribe("settlements") \
            .min_size(100) \
            .status("pending") \
            .snapshot_count(20) \
            .send()

        async for event in sub:
            print(f"{event.taker_side} ${event.taker_size:.0f} on {event.market_title}")
            print(f"  status: {event.status}, tx: {event.tx_hash[:20]}...")

asyncio.run(main())

Event Callbacks

sub.on("settlement", lambda e: print(f"{e.taker_side} ${e.taker_size} on {e.market_title}"))
sub.on("status_update", lambda e: print(f"Confirmed in {e.latency_ms}ms"))
sub.on("*", lambda e: print(e.event_type))  # catch-all

Subscription Filters

All filters from the Subscriptions & Filters page are supported:
(
    pn.ws.subscribe("settlements")
    .wallets(["0xabc..."])          # by wallet
    .tokens(["21742633..."])        # by token ID
    .slugs(["bitcoin-100k"])        # by market slug
    .condition_ids(["0xabc..."])    # by condition ID
    .side("BUY")                    # BUY or SELL
    .status("pending")              # pending, confirmed, or all
    .min_size(100)                  # min USD size
    .max_size(10000)                # max USD size
    .event_types(["settlement"])    # override event types
    .snapshot_count(50)             # initial snapshot (max 200)
    .feeds(["BTC/USD"])             # chainlink feeds
    .send()
)

Subscription Types

pn.ws.subscribe("settlements")   # pending + confirmed settlements
pn.ws.subscribe("trades")        # all trade activity
pn.ws.subscribe("prices")        # price-moving events
pn.ws.subscribe("blocks")        # new Polygon blocks
pn.ws.subscribe("wallets")       # all wallet activity
pn.ws.subscribe("markets")       # all market activity
pn.ws.subscribe("large_trades")  # $1K+ trades
pn.ws.subscribe("oracle")        # UMA resolution events
pn.ws.subscribe("chainlink")     # real-time price feeds

Multiple Subscriptions

Subscriptions stack on the same connection:
whales = await pn.ws.subscribe("large_trades").min_size(5000).send()
my_wallet = await pn.ws.subscribe("wallets").wallets(["0xabc..."]).send()

# Both active simultaneously, events deduplicated

Context Manager

async with await pn.ws.subscribe("settlements").send() as sub:
    async for event in sub:
        print(event.market_title, event.taker_price)
        break
# Auto-unsubscribes on exit

Compression

Zlib compression is enabled by default for all WebSocket connections (~50% bandwidth savings). No configuration needed.

Auto-Reconnect

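Auto-reconnect is enabled by default: the SDK reconnects with exponential backoff and replays all active subscriptions. The options can also be set explicitly when constructing the WebSocket client directly: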
from polynode.ws import PolyNodeWS
from polynode.types.ws import WsOptions

ws = PolyNodeWS("pn_live_...", "wss://ws.polynode.dev/ws", WsOptions(
    compress=True,
    auto_reconnect=True,
    max_reconnect_attempts=0,       # 0 = unlimited
    reconnect_base_delay=1.0,       # seconds
    reconnect_max_delay=30.0,       # seconds
))

ws.on_connect(lambda: print("connected"))
ws.on_disconnect(lambda reason: print(f"disconnected: {reason}"))
ws.on_reconnect(lambda attempt: print(f"reconnected, attempt {attempt}"))
ws.on_error(lambda err: print(f"error: {err}"))

Cleanup

sub.unsubscribe()        # remove one subscription
pn.ws.unsubscribe_all()  # remove all
pn.ws.disconnect()       # close connection

Orderbook Streaming

The SDK includes a dedicated orderbook client for real-time book data from ob.polynode.dev. This is a separate WebSocket connection from the event stream.

Subscribe

# Lazy-initialized, connects on first subscribe
await pn.orderbook.subscribe(["token_id_1", "token_id_2"])

Event Handlers

pn.orderbook.on("snapshot", lambda snap: print(f"{snap.asset_id}: {len(snap.bids)} bids"))
pn.orderbook.on("update", lambda delta: print(f"{delta.asset_id} updated"))
pn.orderbook.on("price", lambda c: print(f"price: {c.assets[0].price}"))
pn.orderbook.on("snapshots_done", lambda msg: print(f"All {msg.total} snapshots received"))
pn.orderbook.on("*", lambda u: print(u.type))  # catch-all

LocalOrderbook

Maintain a sorted local copy of the book:
from polynode import LocalOrderbook

book = LocalOrderbook()

# Wire to orderbook WS events
pn.orderbook.on("snapshot", lambda snap: book.apply_snapshot(snap))
pn.orderbook.on("update", lambda delta: book.apply_update(delta))

# Query state
full_book = book.get_book(token_id)    # (bids, asks) or None
best_bid = book.get_best_bid(token_id)  # OrderbookLevel or None
best_ask = book.get_best_ask(token_id)  # OrderbookLevel or None
spread = book.get_spread(token_id)      # float or None

Cleanup

pn.orderbook.unsubscribe()    # unsubscribe from all markets
pn.orderbook.disconnect()     # close connection

OrderbookEngine

OrderbookEngine is a higher-level wrapper that manages one connection, maintains local state, and routes updates to filtered views.

Create and Subscribe

from polynode import OrderbookEngine

engine = OrderbookEngine(api_key="pn_live_...")
await engine.subscribe([token_a, token_b, token_c])

engine.on("ready", lambda: print(f"{engine.size} books loaded"))

Query State

engine.midpoint(token_id)    # float | None
engine.spread(token_id)      # float | None
engine.best_bid(token_id)    # OrderbookLevel | None
engine.best_ask(token_id)    # OrderbookLevel | None
engine.book(token_id)        # (bids, asks) | None

Filtered Views

Create lightweight views that only receive updates for specific tokens:
trade_view = engine.view([token_a, token_b])
portfolio_view = engine.view(my_position_tokens)

trade_view.on("update", lambda u: print(f"{u.asset_id} changed"))
trade_view.on("price", lambda c: print(f"price: {c.assets}"))

trade_view.midpoint(token_a)
trade_view.spread(token_a)

# Swap to different tokens
trade_view.set_tokens([new_token_x, new_token_y])

# Or destroy entirely
trade_view.destroy()

Cleanup

engine.close()  # disconnects WS, destroys all views, clears state

Trading

Place orders on Polymarket with local credential custody and builder attribution. Requires the trading extras:
pip install polynode[trading]

Generate a Wallet

import asyncio
from polynode.trading import PolyNodeTrader

async def main():
    wallet = await PolyNodeTrader.generate_wallet()
    print(f"Address: {wallet.address}")
    print(f"Private key: {wallet.private_key}")
    # BACK UP THE PRIVATE KEY — it cannot be recovered

asyncio.run(main())

One-Call Onboarding

from polynode.trading import PolyNodeTrader, TraderConfig

trader = PolyNodeTrader(TraderConfig(polynode_key="pn_live_..."))

# Auto-detects wallet type, deploys Safe if needed, sets approvals, creates CLOB credentials
status = await trader.ensure_ready("0xYourPrivateKey...")
print(f"Wallet: {status.wallet}")
print(f"Funder: {status.funder_address}")
print(f"Type: {status.signature_type.name}")  # POLY_GNOSIS_SAFE
print(f"Actions: {status.actions}")

Place Orders

from polynode.trading import OrderParams

result = await trader.order(OrderParams(
    token_id="51037625779056581606819614184446816710505006861008496087735536016411882582167",
    side="BUY",
    price=0.55,
    size=100,
))
print(f"Success: {result.success}, Order ID: {result.order_id}")

Cancel Orders

# Cancel one
cancel = await trader.cancel_order("order-id-here")

# Cancel all
cancel = await trader.cancel_all()

# Cancel all in a market
cancel = await trader.cancel_all(market="condition-id")

Open Orders

orders = await trader.get_open_orders()
for o in orders:
    print(f"{o.side} {o.original_size} @ {o.price} — {o.status}")

Pre-Trade Checks

# Check token approvals
approvals = await trader.check_approvals()
print(f"All approved: {approvals.all_approved}")

# Check balances
balance = await trader.check_balance()
print(f"USDC: {balance.usdc}, MATIC: {balance.matic}")

Wallet Management

# Link existing credentials (no signing needed)
trader.link_credentials(
    wallet="0x...",
    api_key="...",
    api_secret="...",
    api_passphrase="...",
)

# Export for backup
exported = trader.export_wallet()
# Import on another machine
trader.import_wallet(exported)

# List all linked wallets
wallets = trader.get_linked_wallets()

Address Derivation

from polynode.trading import derive_safe_address, derive_proxy_address

eoa = "0xacd89cFCB82Ae1f843467D56b58796bb928C9E1A"
safe = derive_safe_address(eoa)    # 0xf75564cEe0ed847463E18D20B5005c9db245c374
proxy = derive_proxy_address(eoa)  # 0x65FA7F1aFB344A3F78a62D662798A32d850738c8

Configuration

from polynode.trading import TraderConfig, SignatureType

config = TraderConfig(
    polynode_key="pn_live_...",                          # for builder attribution
    db_path="./polynode-trading.db",                     # local SQLite storage
    cosigner_url="https://trade.polynode.dev",           # co-signer proxy
    fallback_direct=True,                                 # direct CLOB if co-signer down
    default_signature_type=SignatureType.POLY_GNOSIS_SAFE,  # Safe (2), Proxy (1), or EOA (0)
    rpc_url="https://polygon-bor-rpc.publicnode.com",    # for on-chain reads
)

trader = PolyNodeTrader(config)

Signature Types

| Type | Value | Description |
| --- | --- | --- |
| SignatureType.EOA | 0 | Direct EOA signing (user pays gas for approvals) |
| SignatureType.POLY_PROXY | 1 | Legacy Polymarket proxy wallet |
| SignatureType.POLY_GNOSIS_SAFE | 2 | Gnosis Safe (default, gasless onboarding) |

Privy Signer (Server-Side Wallets)

Use Privy-managed wallets for headless server-side trading. No private key needed — signing is done through Privy’s wallet API:
from polynode.trading import PolyNodeTrader, TraderConfig
from polynode.trading.privy import PrivySigner, PrivyConfig

signer = PrivySigner(
    PrivyConfig(
        app_id="your-privy-app-id",
        app_secret="your-privy-app-secret",
        authorization_key="wallet-auth:your-authorization-key",
    ),
    wallet_id="your-privy-wallet-id",
    wallet_address="0xYourWalletAddress",
)

trader = PolyNodeTrader(TraderConfig(polynode_key="pn_live_..."))
status = await trader.ensure_ready(signer)
result = await trader.order(OrderParams(token_id="...", side="BUY", price=0.50, size=100))
The Privy signer implements the same RouterSigner interface and works with all trading methods (ensure_ready, order, cancel_all, etc.). Gnosis Safe wallets (type 2) are fully gasless.

Cleanup

trader.close()  # closes SQLite, clears active signer

Configuration

pn = PolyNode(
    api_key="pn_live_...",                       # required
    base_url="https://api.polynode.dev",         # default
    ws_url="wss://ws.polynode.dev/ws",           # default
    ob_url="wss://ob.polynode.dev/ws",           # default
    rpc_url="https://rpc.polynode.dev",          # default
    timeout=10.0,                                 # seconds, default 10
)

Error Handling

from polynode import PolyNode, ApiError, WsError, PolyNodeError

try:
    pn.market("invalid-id")
except ApiError as e:
    print(e.status)   # 404
    print(e.message)  # "Market not found"
except PolyNodeError as e:
    print(e.message)  # base error class

Pydantic Models

All event types are Pydantic v2 models with full IDE support:
from polynode.types.events import (
    SettlementEvent,
    TradeEvent,
    StatusUpdateEvent,
    BlockEvent,
    PositionChangeEvent,
    DepositEvent,
    PositionSplitEvent,
    PositionMergeEvent,
    OracleEvent,
    PriceFeedEvent,
    PolyNodeEvent,          # discriminated union of all events
)

from polynode.types.orderbook import (
    OrderbookLevel,
    BookSnapshot,
    BookUpdate,
    PriceChange,
)

from polynode.types.rest import (
    StatusResponse,
    MarketsResponse,
    MarketSummary,
    CandlesResponse,
    SettlementsResponse,
    WalletResponse,
    OrderbookResponse,
    LeaderboardResponse,
    TrendingResponse,
    TraderProfile,
)
PolyNodeEvent is a Pydantic discriminated union keyed on the event_type field, so a TypeAdapter can validate raw data into the matching event model:
from pydantic import TypeAdapter

adapter = TypeAdapter(PolyNodeEvent)
event = adapter.validate_python({"event_type": "settlement", ...})
# Returns a SettlementEvent instance

Source

PyPI