Overview
The Limitless WebSocket API provides real-time market prices, orderbook updates, and position notifications. This guide covers connecting with Python using python-socketio with asyncio.
Prerequisites
Install dependencies
Install the required packages:
pip install "python-socketio[asyncio]" eth-account==0.10.0 requests
Use a virtual environment to isolate dependencies. Run python -m venv venv then source venv/bin/activate (Unix) or venv\Scripts\activate (Windows) before installing.
Authentication Modes
Public (no auth)
Authenticated (API key)
Connect without authentication to receive public market data only:
newPriceData (AMM market prices)
orderbookUpdate (CLOB orderbook)
system (system notifications)
No API key required. Use this for read-only market monitoring. To enable authenticated features, pass the X-API-Key header during connection:
All public events
positions (position balance updates)
authenticated (auth confirmation)
Required for subscribe_positions. Generate keys at limitless.exchange → profile → Api keys.
Never commit API keys to version control or expose them in client-side code. Use environment variables (e.g. API_KEY) and load them at runtime.
Cookie-based session authentication is deprecated. Use API key authentication via the X-API-Key header.
Connection Details
| Setting | Value |
| --- | --- |
| URL | wss://ws.limitless.exchange |
| Namespace | /markets |
| Transport | WebSocket only (no polling) |
Public (no auth)
Authenticated (API key)
import asyncio
import os
import socketio
# Verbose python-socketio logging is opt-in through the DEBUG environment variable.
debug_enabled = os.environ.get("DEBUG") == "1"
sio = socketio.AsyncClient(logger=debug_enabled)


@sio.event(namespace="/markets")
async def connect():
    """Report that the /markets namespace has been joined."""
    print("Connected to /markets")


async def main():
    """Open a WebSocket-only connection to /markets and wait on it."""
    await sio.connect(
        "wss://ws.limitless.exchange",
        namespaces=["/markets"],
        transports=["websocket"],
    )
    await sio.wait()


asyncio.run(main())
Subscribing to Market Prices
Emit subscribe_market_prices with market identifiers. Subscriptions replace previous ones, so include all markets you want in a single call.
AMM markets (contract addresses)
CLOB markets (slugs)
Both AMM and CLOB
# Market addresses are Ethereum contract addresses in hex format
await sio.emit(
"subscribe_market_prices" ,
{ "marketAddresses" : [ "0x1234567890abcdef1234567890abcdef12345678" ]},
namespace = "/markets" ,
)
Subscriptions replace previous ones. If you want both AMM prices and CLOB orderbook, send both marketAddresses and marketSlugs together in a single subscribe_market_prices call.
Subscribing to Positions
Position updates require authentication. Emit subscribe_positions after connecting:
# Requires X-API-Key header during connect
await sio.emit( "subscribe_positions" , {}, namespace = "/markets" )
Handling Events
Register handlers for each event type. All handlers use namespace="/markets".
| Event | Auth Required | Description |
| --- | --- | --- |
| newPriceData | No | AMM market price update |
| orderbookUpdate | No | CLOB orderbook update |
| positions | Yes | Position balance update |
| system | No | System notifications |
| authenticated | Yes | Authentication confirmation |
| exception | No | Error notifications |
Event Payloads
newPriceData — AMM market price update:
{
"marketAddress" : "0x1234..." ,
"updatedPrices" : { "yes" : "0.65" , "no" : "0.35" },
"blockNumber" : 12345678 ,
"timestamp" : "2024-01-01T00:00:00.000Z"
}
positions — Position balance update (auth required):
{
"account" : "0xabcd..." ,
"marketAddress" : "0x1234..." ,
"positions" : [
{ "tokenId" : "123456" , "balance" : "1000000" , "outcomeIndex" : 0 }
],
"type" : "AMM"
}
system — System notification:
{
"message" : "Notification text" ,
"markets" : [ "0x1234..." ]
}
Complete Async Python Client
import asyncio
import logging
import os
from typing import Optional
import socketio
# DEBUG=1 in the environment switches the root logger from INFO to DEBUG.
_verbose = os.environ.get("DEBUG") == "1"
logging.basicConfig(level=logging.DEBUG if _verbose else logging.INFO)
logger = logging.getLogger(__name__)

# Endpoint and Socket.IO namespace used by the client below.
WS_URL = "wss://ws.limitless.exchange"
NAMESPACE = "/markets"
class LimitlessWebSocketClient:
    """Async Socket.IO client for the Limitless ``/markets`` namespace.

    The client remembers the requested market and position subscriptions and
    replays them every time the namespace (re)connects. Subscriptions
    requested while the namespace is not connected are stored and sent by the
    connect handler instead of being emitted on a closed connection.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create the client and register its event handlers.

        Args:
            api_key: API key sent as the ``X-API-Key`` header. Falls back to
                the ``API_KEY`` environment variable when omitted.
        """
        self.api_key = api_key or os.environ.get("API_KEY")
        self.sio = socketio.AsyncClient(
            reconnection=True,
            # 0 means "retry forever" in python-socketio.
            reconnection_attempts=0,
            logger=os.environ.get("DEBUG") == "1",
        )
        self._market_addresses: list[str] = []
        self._market_slugs: list[str] = []
        self._subscribe_positions = False
        # Tracked by the connect/disconnect handlers so subscribe calls know
        # whether emitting right now is possible.
        self._namespace_connected = False
        self._setup_handlers()

    @staticmethod
    def _as_list(values) -> list:
        """Return an independent list copy of *values* (``None`` -> ``[]``).

        A lone string is wrapped rather than split into characters.
        """
        if values is None:
            return []
        if isinstance(values, str):
            return [values]
        return list(values)

    def _setup_handlers(self) -> None:
        """Register one handler per server event on the namespace."""

        @self.sio.event(namespace=NAMESPACE)
        async def connect():
            logger.info("Connected to %s ", NAMESPACE)
            self._namespace_connected = True
            # Replay stored subscriptions on every (re)connect.
            await self._resubscribe()

        @self.sio.event(namespace=NAMESPACE)
        async def disconnect():
            self._namespace_connected = False
            logger.info("Disconnected from %s ", NAMESPACE)

        @self.sio.event(namespace=NAMESPACE)
        async def newPriceData(data: dict):
            logger.info("Price update: %s ", data)

        @self.sio.event(namespace=NAMESPACE)
        async def orderbookUpdate(data: dict):
            logger.info("Orderbook update: %s ", data)

        @self.sio.event(namespace=NAMESPACE)
        async def positions(data: dict):
            logger.info("Positions: %s ", data)

        @self.sio.event(namespace=NAMESPACE)
        async def system(data: dict):
            logger.info("System: %s ", data)

        @self.sio.event(namespace=NAMESPACE)
        async def authenticated(data: dict):
            logger.info("Authenticated: %s ", data)

        @self.sio.event(namespace=NAMESPACE)
        async def exception(data: dict):
            logger.error("Exception: %s ", data)

    async def _resubscribe(self) -> None:
        """Emit every stored subscription on the namespace."""
        if self._market_addresses or self._market_slugs:
            # Addresses and slugs travel in one payload; only non-empty keys
            # are included.
            payload = {}
            if self._market_addresses:
                payload["marketAddresses"] = self._market_addresses
            if self._market_slugs:
                payload["marketSlugs"] = self._market_slugs
            await self.sio.emit(
                "subscribe_market_prices",
                payload,
                namespace=NAMESPACE,
            )
        if self._subscribe_positions:
            await self.sio.emit("subscribe_positions", {}, namespace=NAMESPACE)

    async def subscribe_market_prices(
        self,
        market_addresses: Optional[list[str]] = None,
        market_slugs: Optional[list[str]] = None,
    ) -> None:
        """Replace the stored market subscription and send it if connected.

        Args:
            market_addresses: AMM market contract addresses.
            market_slugs: CLOB market slugs.

        When the namespace is not connected the request is only stored; the
        connect handler sends it once the connection is up.
        """
        # Copy so later mutation of the caller's lists cannot alter what is
        # replayed on reconnect.
        self._market_addresses = self._as_list(market_addresses)
        self._market_slugs = self._as_list(market_slugs)
        if self._namespace_connected:
            await self._resubscribe()

    async def subscribe_positions(self) -> None:
        """Enable the position subscription and send it if connected.

        Raises:
            ValueError: If the client has no API key.
        """
        if not self.api_key:
            raise ValueError("API key required for position subscription")
        self._subscribe_positions = True
        if self._namespace_connected:
            await self._resubscribe()

    async def connect(self) -> None:
        """Connect to the server over WebSocket, sending the API key if set."""
        kwargs = {
            "transports": ["websocket"],
            "namespaces": [NAMESPACE],
        }
        if self.api_key:
            kwargs["headers"] = {"X-API-Key": self.api_key}
        await self.sio.connect(WS_URL, **kwargs)

    async def disconnect(self) -> None:
        """Close the connection to the server."""
        await self.sio.disconnect()

    async def run(self) -> None:
        """Connect, then wait on the underlying Socket.IO client."""
        await self.connect()
        await self.sio.wait()
Usage Examples
Public market data only
Authenticated with positions
import asyncio
from limitless_ws_client import LimitlessWebSocketClient
async def main():
    """Subscribe to one AMM market and one CLOB market, then wait for events."""
    amm_addresses = ["0x1234567890abcdef1234567890abcdef12345678"]
    clob_slugs = ["btc-100k-weekly"]

    client = LimitlessWebSocketClient()
    await client.connect()
    await client.subscribe_market_prices(
        market_addresses=amm_addresses,
        market_slugs=clob_slugs,
    )
    await client.sio.wait()


asyncio.run(main())
Environment Variables
| Variable | Description |
| --- | --- |
| API_KEY | API key for authenticated features (positions). Omit for public-only. |
| DEBUG | Set to 1 to enable verbose socket.io logging. |
Auto-Reconnection
The client uses reconnection=True by default. On reconnect, call _resubscribe() in the connect handler to restore market and position subscriptions, since the server does not persist them across disconnects.
Store your API key securely. Rotate keys if exposure is suspected. Do not log or print API keys.