Market Data Streaming
Subscribe to real-time market data updates including order book depth, instrument states, and trade statistics using Python and gRPC.
Service Definition
Service: connamara.ep3.v1beta1.MarketDataSubscriptionAPI
RPC: CreateMarketDataSubscription
Type: Server-side streaming
service MarketDataSubscriptionAPI {
rpc CreateMarketDataSubscription(CreateMarketDataSubscriptionRequest)
returns (stream CreateMarketDataSubscriptionResponse);
}
Request Parameters
CreateMarketDataSubscriptionRequest
| Field | Type | Required | Description |
|---|
symbols | list[str] | No | List of symbols to subscribe to. Empty list subscribes to all instruments. |
unaggregated | bool | No | If True, receive raw order book. If False (default), receive aggregated book by price level. |
depth | int | No | Number of price levels to include in order book. Default: 10 |
snapshot_only | bool | No | If True, receive only initial snapshot then close stream. If False (default), receive continuous updates. |
Example Request
from connamara.ep3.v1beta1 import market_data_pb2
# Subscribe to specific symbols
request = market_data_pb2.CreateMarketDataSubscriptionRequest(
symbols=["mlb-ari-sf-2025-09-08", "nba-lal-gsw-2025-01-20"],
unaggregated=False,
depth=10,
snapshot_only=False
)
# Subscribe to all symbols
request = market_data_pb2.CreateMarketDataSubscriptionRequest(
symbols=[], # Empty = all symbols
depth=20
)
Response Messages
The stream returns CreateMarketDataSubscriptionResponse messages with two possible event types:
1. Heartbeat Messages
Keep-alive messages to confirm connection is active.
if response.HasField('heartbeat'):
print(f"[{datetime.now().strftime('%H:%M:%S')}] Heartbeat received")
If you stop receiving heartbeats, the connection may be stale. Consider reconnecting.
2. Market Data Updates
Real-time market data changes.
if response.HasField('update'):
update = response.update
print(f"Symbol: {update.symbol}")
print(f"State: {update.state}")
print(f"Bids: {len(update.bids)}")
print(f"Offers: {len(update.offers)}")
Update Message Structure
Fields
| Field | Type | Description |
|---|
symbol | str | Instrument symbol (e.g., “mlb-ari-sf-2025-09-08”) |
bids | list[BookEntry] | Bid side of order book (buy orders) |
offers | list[BookEntry] | Offer/ask side of order book (sell orders) |
state | InstrumentState | Current trading state of instrument |
stats | InstrumentStats | Market statistics (optional) |
transact_time | Timestamp | Server timestamp of update |
book_hidden | bool | If True, order book is hidden |
BookEntry Structure
Each price level in the order book contains:
| Field | Type | Description |
|---|
px | int64 | Price as integer (divide by price_scale) |
qty | int64 | Aggregate quantity at this price level |
symbol_sub_type | str | Symbol subtype (if applicable) |
Price Representation:
All prices are int64 values. Divide by the instrument’s price_scale to get the decimal value.price_scale varies by instrument. Query instrument metadata to get the correct value.px = bid.px / price_scale # Convert from price representation
print(f"${px:.4f}")
InstrumentStats Structure
Market statistics include:
| Field | Type | Description |
|---|
last_trade_px | int64 | Last trade price (÷ price_scale) |
open_px | int64 | Opening price (÷ price_scale) |
high_px | int64 | High price of session (÷ price_scale) |
low_px | int64 | Low price of session (÷ price_scale) |
close_px | int64 | Closing price (÷ price_scale) |
shares_traded | int64 | Total volume traded |
open_interest | int64 | Current open interest |
notional_traded | int64 | Total notional value traded |
Stats fields use protobuf oneof, so they may not always be present. Always check with HasField() before accessing.if update.HasField('stats') and update.stats.HasField('last_trade_px'):
last_px = update.stats.last_trade_px / price_scale
Instrument States
| State | Value | Description |
|---|
INSTRUMENT_STATE_CLOSED | 0 | Market closed, no trading allowed |
INSTRUMENT_STATE_OPEN | 1 | Active trading, continuous matching |
INSTRUMENT_STATE_PREOPEN | 2 | Orders accepted but no matching (opening auction) |
INSTRUMENT_STATE_SUSPENDED | 3 | Trading suspended, cancel-only |
INSTRUMENT_STATE_EXPIRED | 4 | Instrument expired, no new orders |
INSTRUMENT_STATE_TERMINATED | 5 | Instrument terminated, book closed |
INSTRUMENT_STATE_HALTED | 6 | Trading halted, no orders or cancels |
INSTRUMENT_STATE_MATCH_AND_CLOSE_AUCTION | 7 | Closing auction, will match upon state change |
# Get state name
state_name = market_data_pb2.InstrumentState.Name(update.state)
print(f"State: {state_name}")
Complete Example (from stream.py)
This example matches the implementation from the Python examples repository:
import grpc
import requests
from datetime import datetime
from typing import Optional
from connamara.ep3.v1beta1 import market_data_pb2
from connamara.ep3.v1beta1 import market_data_pb2_grpc
class PolymarketStreamer:
def __init__(self, base_url: str = "https://rest.preprod.polymarketexchange.com",
grpc_server: str = "traderapi.us-east-1.privatelink.preprod.polymarketexchange.com:443"):
self.base_url = base_url
self.grpc_server = grpc_server
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.access_expiration: Optional[datetime] = None
self.price_scales: dict = {} # symbol -> price_scale cache
def get_price_scale(self, symbol: str) -> int:
"""Get price_scale for symbol from cache. Populate via list_instruments API."""
# WARNING: Replace this with actual API lookup. Do not rely on default.
return self.price_scales.get(symbol, 1000)
def login(self, username: str, password: str) -> dict:
"""Authenticate with the Polymarket API and store tokens."""
url = f"{self.base_url}/auth/v1beta1/login"
headers = {
"accept": "application/json",
"Content-Type": "application/json"
}
data = {
"username": username,
"password": password
}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.refresh_token = token_data["refresh_token"]
self.access_expiration = datetime.fromisoformat(
token_data["access_expiration_time"].replace("Z", "+00:00")
)
return token_data
def stream_market_data(self, symbols: list, unaggregated: bool = False,
depth: int = 10, snapshot_only: bool = False):
"""Stream market data for the given symbols using gRPC."""
if not self.access_token:
raise ValueError("Not authenticated. Please login first.")
# Create credentials
credentials = grpc.ssl_channel_credentials()
# Create channel
channel = grpc.secure_channel(self.grpc_server, credentials)
# Create stub
stub = market_data_pb2_grpc.MarketDataSubscriptionAPIStub(channel)
# Create request
request = market_data_pb2.CreateMarketDataSubscriptionRequest(
symbols=symbols,
unaggregated=unaggregated,
depth=depth,
snapshot_only=snapshot_only
)
# Set up metadata with authorization
metadata = [
('authorization', self.access_token)
]
try:
print(f"Starting market data stream for symbols: {symbols}")
print(f"Parameters: unaggregated={unaggregated}, depth={depth}, snapshot_only={snapshot_only}")
print("-" * 60)
# Start streaming
response_stream = stub.CreateMarketDataSubscription(request, metadata=metadata)
for response in response_stream:
self._process_market_data_response(response)
except grpc.RpcError as e:
print(f"gRPC error: {e.code()} - {e.details()}")
raise
except KeyboardInterrupt:
print("\nStream interrupted by user")
finally:
channel.close()
def _process_market_data_response(self, response):
"""Process and display market data response."""
if response.HasField('heartbeat'):
print(f"[{datetime.now().strftime('%H:%M:%S')}] Heartbeat received")
elif response.HasField('update'):
update = response.update
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Market Update for {update.symbol}")
# Display instrument state
state_name = market_data_pb2.InstrumentState.Name(update.state)
print(f" State: {state_name}")
# Get price_scale from instrument metadata (via list_instruments API)
price_scale = self.get_price_scale(update.symbol)
# Display order book
if update.bids:
print(" Bids:")
for i, bid in enumerate(update.bids[:5]): # Show top 5 bids
px = bid.px / price_scale # Convert from price representation
qty = bid.qty
print(f" [{i+1}] ${px:.4f} x {qty}")
if update.offers:
print(" Offers:")
for i, offer in enumerate(update.offers[:5]): # Show top 5 offers
px = offer.px / price_scale # Convert from price representation
qty = offer.qty
print(f" [{i+1}] ${px:.4f} x {qty}")
# Display stats if available
if update.HasField('stats'):
stats = update.stats
print(" Stats:")
if stats.HasField('last_trade_px'):
last_px = stats.last_trade_px / price_scale
print(f" Last Trade: ${last_px:.4f}")
if stats.HasField('open_px'):
open_px = stats.open_px / price_scale
print(f" Open: ${open_px:.4f}")
if stats.HasField('high_px'):
high_px = stats.high_px / price_scale
print(f" High: ${high_px:.4f}")
if stats.HasField('low_px'):
low_px = stats.low_px / price_scale
print(f" Low: ${low_px:.4f}")
if stats.HasField('shares_traded'):
print(f" Shares Traded: {stats.shares_traded}")
if stats.HasField('open_interest'):
print(f" Open Interest: {stats.open_interest}")
print("-" * 60)
# Usage
if __name__ == "__main__":
streamer = PolymarketStreamer()
# Login
streamer.login("your_username", "your_password")
# Stream market data
streamer.stream_market_data(
symbols=["mlb-ari-sf-2025-09-08"],
depth=10
)
Next Steps