import grpc
import requests
from datetime import datetime
from typing import Optional
from connamara.ep3.v1beta1 import order_entry_pb2
from connamara.ep3.v1beta1 import order_entry_pb2_grpc
from connamara.ep3.orders.v1beta1 import orders_pb2
class PolymarketOrderStreamer:
def __init__(self, base_url: str = "https://rest.preprod.polymarketexchange.com",
grpc_server: str = "traderapi.us-east-1.privatelink.preprod.polymarketexchange.com:443"):
self.base_url = base_url
self.grpc_server = grpc_server
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.access_expiration: Optional[datetime] = None
self.session_id: Optional[str] = None
def login(self, username: str, password: str) -> dict:
"""Authenticate with the Polymarket API and store tokens."""
url = f"{self.base_url}/auth/v1beta1/login"
headers = {
"accept": "application/json",
"Content-Type": "application/json"
}
data = {
"username": username,
"password": password
}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.refresh_token = token_data["refresh_token"]
self.access_expiration = datetime.fromisoformat(
token_data["access_expiration_time"].replace("Z", "+00:00")
)
return token_data
def stream_orders(self, symbols: list = None, accounts: list = None, snapshot_only: bool = False):
"""Stream order updates using gRPC."""
if not self.access_token:
raise ValueError("Not authenticated. Please login first.")
# Create credentials
credentials = grpc.ssl_channel_credentials()
# Create channel
channel = grpc.secure_channel(self.grpc_server, credentials)
# Create stub
stub = order_entry_pb2_grpc.OrderEntryAPIStub(channel)
# Create request
request = order_entry_pb2.CreateOrderSubscriptionRequest(
symbols=symbols or [],
accounts=accounts or [],
snapshot_only=snapshot_only
)
# Set up metadata with authorization
metadata = [
('authorization', self.access_token)
]
try:
print(f"Starting order stream")
print(f"Symbols: {symbols or 'ALL'}")
print(f"Accounts: {accounts or 'ALL'}")
print(f"Snapshot only: {snapshot_only}")
print("-" * 60)
# Start streaming
response_stream = stub.CreateOrderSubscription(request, metadata=metadata)
for response in response_stream:
self._process_order_response(response)
except grpc.RpcError as e:
print(f"gRPC error: {e.code()} - {e.details()}")
raise
except KeyboardInterrupt:
print("\nStream interrupted by user")
finally:
channel.close()
def _process_order_response(self, response):
"""Process and display order response."""
# Capture session ID on first message
if response.session_id and not self.session_id:
self.session_id = response.session_id
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Session established")
print(f" Session ID: {self.session_id}")
print("-" * 60)
if response.HasField('heartbeat'):
print(f"[{datetime.now().strftime('%H:%M:%S')}] Heartbeat received")
elif response.HasField('snapshot'):
snapshot = response.snapshot
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Order Snapshot")
print(f" Total orders: {len(snapshot.orders)}")
for order in snapshot.orders:
self._display_order(order)
print("-" * 60)
elif response.HasField('update'):
update = response.update
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Order Update")
# Display executions
if update.executions:
print(f" Executions: {len(update.executions)}")
for execution in update.executions:
self._display_execution(execution)
# Display cancel rejects
if update.HasField('cancel_reject'):
self._display_cancel_reject(update.cancel_reject)
print("-" * 60)
def _display_order(self, order):
"""Display order details."""
print(f" Order ID: {order.id}")
print(f" Client Order ID: {order.clord_id}")
print(f" Symbol: {order.symbol}")
print(f" Side: {orders_pb2.Side.Name(order.side)}")
print(f" Type: {orders_pb2.OrderType.Name(order.type)}")
print(f" State: {orders_pb2.OrderState.Name(order.state)}")
if order.price > 0:
price = order.price / order.price_scale
print(f" Price: ${price:.4f}")
print(f" Order Qty: {order.order_qty}")
print(f" Filled Qty: {order.cum_qty}")
print(f" Remaining Qty: {order.leaves_qty}")
if order.avg_px > 0:
avg_px = order.avg_px / order.price_scale
print(f" Avg Price: ${avg_px:.4f}")
if order.account:
print(f" Account: {order.account}")
print()
def _display_execution(self, execution):
"""Display execution details."""
print(f" Execution ID: {execution.id}")
print(f" Type: {orders_pb2.ExecutionType.Name(execution.type)}")
if execution.HasField('order'):
order = execution.order
print(f" Order ID: {order.id}")
print(f" Symbol: {order.symbol}")
print(f" Side: {orders_pb2.Side.Name(order.side)}")
print(f" State: {orders_pb2.OrderState.Name(order.state)}")
if execution.last_shares > 0:
print(f" Last Shares: {execution.last_shares}")
if execution.last_px > 0:
last_px = execution.last_px / execution.order.price_scale
print(f" Last Price: ${last_px:.4f}")
if execution.trade_id:
print(f" Trade ID: {execution.trade_id}")
if execution.text:
print(f" Text: {execution.text}")
if execution.order_reject_reason != orders_pb2.ORD_REJECT_REASON_UNDEFINED:
print(f" Reject Reason: {orders_pb2.OrdRejectReason.Name(execution.order_reject_reason)}")
print()
def _display_cancel_reject(self, cancel_reject):
"""Display cancel reject details."""
print(f" Cancel Reject:")
print(f" ID: {cancel_reject.id}")
print(f" Client Order ID: {cancel_reject.clord_id}")
print(f" Original Client Order ID: {cancel_reject.orig_clord_id}")
print(f" Response To: {orders_pb2.CxlRejResponseTo.Name(cancel_reject.response_to)}")
print(f" Reject Reason: {orders_pb2.CxlRejReason.Name(cancel_reject.reject_reason)}")
if cancel_reject.text:
print(f" Text: {cancel_reject.text}")
print()
# Usage
if __name__ == "__main__":
streamer = PolymarketOrderStreamer()
# Login
streamer.login("your_username", "your_password")
# Stream orders
streamer.stream_orders(
symbols=["mlb-ari-sf-2025-09-08"],
accounts=[]
)