Skip to main content

Overview

The Limitless WebSocket API provides real-time market prices, orderbook updates, and position notifications. This guide covers connecting with Python using python-socketio with asyncio.

Prerequisites

1

Install dependencies

Install the required packages:
pip install "python-socketio[asyncio]" eth-account==0.10.0 requests
Use a virtual environment to isolate dependencies. Run python -m venv venv then source venv/bin/activate (Unix) or venv\Scripts\activate (Windows) before installing.

Authentication Modes

Connect without authentication to receive public market data only:
  • newPriceData (AMM market prices)
  • orderbookUpdate (CLOB orderbook)
  • system (system notifications)
No API key required. Use this for read-only market monitoring.
Never commit API keys to version control or expose them in client-side code. Use environment variables (e.g. API_KEY) and load them at runtime.
Cookie-based session authentication is deprecated. Use API key authentication via the X-API-Key header.

Connection Details

SettingValue
URLwss://ws.limitless.exchange
Namespace/markets
TransportWebSocket only (no polling)
import asyncio
import os
import socketio

sio = socketio.AsyncClient(logger=os.environ.get("DEBUG") == "1")

@sio.event(namespace="/markets")
async def connect():
    print("Connected to /markets")

async def main():
    await sio.connect(
        "wss://ws.limitless.exchange",
        transports=["websocket"],
        namespaces=["/markets"],
    )
    await sio.wait()

asyncio.run(main())

Subscribing to Market Prices

Emit subscribe_market_prices with market identifiers. Subscriptions replace previous ones, so include all markets you want in a single call.
# Market addresses are Ethereum contract addresses in hex format
await sio.emit(
    "subscribe_market_prices",
    {"marketAddresses": ["0x1234567890abcdef1234567890abcdef12345678"]},
    namespace="/markets",
)
Subscriptions replace previous ones. If you want both AMM prices and CLOB orderbook, send both marketAddresses and marketSlugs together in a single subscribe_market_prices call.

Subscribing to Positions

Position updates require authentication. Emit subscribe_positions after connecting:
# Requires X-API-Key header during connect
await sio.emit("subscribe_positions", {}, namespace="/markets")

Handling Events

Register handlers for each event type. All handlers use namespace="/markets".
EventAuth RequiredDescription
newPriceDataNoAMM market price update
orderbookUpdateNoCLOB orderbook update
positionsYesPosition balance update
systemNoSystem notifications
authenticatedYesAuthentication confirmation
exceptionNoError notifications

Event Payloads

newPriceData — AMM market price update:
{
  "marketAddress": "0x1234...",
  "updatedPrices": { "yes": "0.65", "no": "0.35" },
  "blockNumber": 12345678,
  "timestamp": "2024-01-01T00:00:00.000Z"
}
positions — Position balance update (auth required):
{
  "account": "0xabcd...",
  "marketAddress": "0x1234...",
  "positions": [
    { "tokenId": "123456", "balance": "1000000", "outcomeIndex": 0 }
  ],
  "type": "AMM"
}
system — System notification:
{
  "message": "Notification text",
  "markets": ["0x1234..."]
}

Complete Async Python Client

import asyncio
import logging
import os
from typing import Optional

import socketio

logging.basicConfig(
    level=logging.DEBUG if os.environ.get("DEBUG") == "1" else logging.INFO
)
logger = logging.getLogger(__name__)

WS_URL = "wss://ws.limitless.exchange"
NAMESPACE = "/markets"


class LimitlessWebSocketClient:
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("API_KEY")
        self.sio = socketio.AsyncClient(
            reconnection=True,
            reconnection_attempts=0,
            logger=os.environ.get("DEBUG") == "1",
        )
        self._market_addresses: list[str] = []
        self._market_slugs: list[str] = []
        self._subscribe_positions = False
        self._setup_handlers()

    def _setup_handlers(self) -> None:
        @self.sio.event(namespace=NAMESPACE)
        async def connect():
            logger.info("Connected to %s", NAMESPACE)
            await self._resubscribe()

        @self.sio.event(namespace=NAMESPACE)
        async def disconnect():
            logger.info("Disconnected from %s", NAMESPACE)

        @self.sio.event(namespace=NAMESPACE)
        async def newPriceData(data: dict):
            logger.info("Price update: %s", data)

        @self.sio.event(namespace=NAMESPACE)
        async def orderbookUpdate(data: dict):
            logger.info("Orderbook update: %s", data)

        @self.sio.event(namespace=NAMESPACE)
        async def positions(data: dict):
            logger.info("Positions: %s", data)

        @self.sio.event(namespace=NAMESPACE)
        async def system(data: dict):
            logger.info("System: %s", data)

        @self.sio.event(namespace=NAMESPACE)
        async def authenticated(data: dict):
            logger.info("Authenticated: %s", data)

        @self.sio.event(namespace=NAMESPACE)
        async def exception(data: dict):
            logger.error("Exception: %s", data)

    async def _resubscribe(self) -> None:
        if self._market_addresses or self._market_slugs:
            payload = {}
            if self._market_addresses:
                payload["marketAddresses"] = self._market_addresses
            if self._market_slugs:
                payload["marketSlugs"] = self._market_slugs
            await self.sio.emit(
                "subscribe_market_prices",
                payload,
                namespace=NAMESPACE,
            )
        if self._subscribe_positions:
            await self.sio.emit("subscribe_positions", {}, namespace=NAMESPACE)

    async def subscribe_market_prices(
        self,
        market_addresses: Optional[list[str]] = None,
        market_slugs: Optional[list[str]] = None,
    ) -> None:
        self._market_addresses = market_addresses or []
        self._market_slugs = market_slugs or []
        await self._resubscribe()

    async def subscribe_positions(self) -> None:
        if not self.api_key:
            raise ValueError("API key required for position subscription")
        self._subscribe_positions = True
        await self._resubscribe()

    async def connect(self) -> None:
        kwargs = {
            "transports": ["websocket"],
            "namespaces": [NAMESPACE],
        }
        if self.api_key:
            kwargs["headers"] = {"X-API-Key": self.api_key}

        await self.sio.connect(WS_URL, **kwargs)

    async def disconnect(self) -> None:
        await self.sio.disconnect()

    async def run(self) -> None:
        await self.connect()
        await self.sio.wait()

Usage Examples

import asyncio
from limitless_ws_client import LimitlessWebSocketClient

async def main():
    client = LimitlessWebSocketClient()
    await client.connect()

    await client.subscribe_market_prices(
        market_addresses=["0x1234567890abcdef1234567890abcdef12345678"],
        market_slugs=["btc-100k-weekly"],
    )

    await client.sio.wait()

asyncio.run(main())

Environment Variables

VariableDescription
API_KEYAPI key for authenticated features (positions). Omit for public-only.
DEBUGSet to 1 to enable verbose socket.io logging.

Auto-Reconnection

The client uses reconnection=True by default. On reconnect, call _resubscribe() in the connect handler to restore market and position subscriptions, since the server does not persist them across disconnects.
Store your API key securely. Rotate keys if exposure is suspected. Do not log or print API keys.