A WebSocket Client for Trading Systems
Overview
In modern trading systems, real-time communication is crucial for maintaining market data feeds, executing trades, and managing orders. Our WebSocket client implementation addresses these needs through a carefully designed architecture that combines Python's asyncio capabilities with Protocol Buffers for efficient message handling. This design ensures reliable, type-safe, and maintainable code that can handle the demands of production trading environments.
Core Architecture
At the heart of our implementation lies a set of type definitions that give each kind of message a clear interface. We use type aliases to declare distinct handler signatures for responses, events, and exit conditions. Python does not enforce these annotations when the code runs, but they improve readability, enable IDE completion, and let a static type checker such as mypy flag mismatched handlers before the code is ever executed.
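import logging
from dataclasses import dataclass
from typing import Callable, TypeAlias

# Response and ChannelMessage are the Protocol Buffers message classes
# used throughout this article; import them from your generated module.

logger = logging.getLogger(__name__)

# Core type definitions
ResponseHandler: TypeAlias = Callable[["WebsocketClient", Response], None]
EventHandler: TypeAlias = Callable[["WebsocketClient", ChannelMessage], None]
ExitHandler: TypeAlias = Callable[["WebsocketClient"], None]

@dataclass
class ConnectionConfig:
    """Configuration for WebSocket connection."""
    ws_url: str
    api_token: str
    ping_interval: int = 20  # seconds between keepalive pings
    ping_timeout: int = 20   # seconds to wait for the matching pong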
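As a usage sketch, the configuration can be built once and checked before any connection attempt. The validate_config helper and the example URL are hypothetical additions for illustration, not part of the client described here.

```python
from dataclasses import dataclass


@dataclass
class ConnectionConfig:
    """Configuration for WebSocket connection (mirrors the dataclass above)."""
    ws_url: str
    api_token: str
    ping_interval: int = 20
    ping_timeout: int = 20


def validate_config(config: ConnectionConfig) -> list[str]:
    """Hypothetical helper: return a list of human-readable problems."""
    problems = []
    if not config.ws_url.startswith(("ws://", "wss://")):
        problems.append("ws_url must start with ws:// or wss://")
    if not config.api_token:
        problems.append("api_token must not be empty")
    if config.ping_interval <= 0 or config.ping_timeout <= 0:
        problems.append("ping_interval and ping_timeout must be positive")
    return problems


# Placeholder endpoint and token; a real deployment would load these
# from its own settings or environment.
config = ConnectionConfig(ws_url="wss://example.invalid/ws", api_token="secret")
problems = validate_config(config)
```

Returning a list of problems rather than raising on the first one lets a caller report every misconfiguration in a single pass.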
Messages are parsed with Protocol Buffers, which gives us a compact binary encoding, a schema shared between client and server, and generated classes with typed fields. parse_response accepts a message only if at least one of the expected fields is set. A payload that fails to decode is logged at debug level with its traceback and yields None, so process_message can go on to try it as a channel message.
class WebsocketClient:
    def parse_response(self, data: bytes) -> Response | None:
        """Parse binary WebSocket messages into Response objects."""
        try:
            response = Response()
            response.ParseFromString(data)
            # Validate message has required fields
            if (response.HasField("error") or
                    response.HasField("auth") or
                    response.HasField("subscription") or
                    response.HasField("order") or
                    response.HasField("response")):
                return response
            return None
        except Exception:
            logger.debug("Failed to parse as Response", exc_info=True)
            return None

    async def process_message(self, message: bytes) -> None:
        """Process incoming WebSocket messages."""
        if response := self.parse_response(message):
            if self.response_handler:
                self.response_handler(self, response)
            return
        if channel_msg := self.parse_channel_message(message):
            if self.event_handler:
                self.event_handler(self, channel_msg)
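To show how the handler aliases and the two-stage parse fit together, here is a self-contained sketch that runs without Protocol Buffers. FakeResponse, FakeChannelMessage, SketchClient, dispatch_all, and the one-byte tag used to tell messages apart are all assumptions made for illustration; the real client relies on the generated message classes instead.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeResponse:
    """Stand-in for the protobuf Response class."""
    payload: bytes


@dataclass
class FakeChannelMessage:
    """Stand-in for the protobuf ChannelMessage class."""
    payload: bytes


class SketchClient:
    def __init__(
        self,
        response_handler: Optional[Callable[["SketchClient", FakeResponse], None]] = None,
        event_handler: Optional[Callable[["SketchClient", FakeChannelMessage], None]] = None,
    ) -> None:
        self.response_handler = response_handler
        self.event_handler = event_handler

    def parse_response(self, data: bytes) -> Optional[FakeResponse]:
        # Assumption: a leading b"R" marks a response in this toy encoding.
        return FakeResponse(data[1:]) if data[:1] == b"R" else None

    def parse_channel_message(self, data: bytes) -> Optional[FakeChannelMessage]:
        # Assumption: a leading b"E" marks a channel event.
        return FakeChannelMessage(data[1:]) if data[:1] == b"E" else None

    async def process_message(self, message: bytes) -> None:
        # Same order as the client above: try Response first, then fall back.
        if (response := self.parse_response(message)) is not None:
            if self.response_handler:
                self.response_handler(self, response)
            return
        if (event := self.parse_channel_message(message)) is not None:
            if self.event_handler:
                self.event_handler(self, event)


def dispatch_all(messages: list[bytes]) -> list[tuple[str, bytes]]:
    """Feed messages through a SketchClient and record which handler fired."""
    seen: list[tuple[str, bytes]] = []
    client = SketchClient(
        response_handler=lambda c, r: seen.append(("response", r.payload)),
        event_handler=lambda c, e: seen.append(("event", e.payload)),
    )

    async def _run() -> None:
        for m in messages:
            await client.process_message(m)

    asyncio.run(_run())
    return seen
```

Calling dispatch_all([b"Rack", b"Etick", b"?junk"]) routes the first message to the response handler, the second to the event handler, and silently drops the third. The sketch compares against None explicitly so that dispatch does not depend on the truthiness of the parsed object.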
Connection Management
Connection stability is paramount in trading systems. The client sends the API token as a bearer Authorization header during the opening handshake, uses a configurable ping interval and ping timeout as its keepalive, and checks whether the socket is closed before each reconnection attempt. When the connection drops, it reconnects with exponential backoff that starts at one second and is capped at 60 seconds.
import asyncio

import websockets

class WebsocketClient:
    async def connect(self) -> None:
        """Establish WebSocket connection with configured parameters."""
        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "User-Agent": f"WebsocketClient/{self.version}"
        }
        # Note: extra_headers and the .closed attribute used below belong to
        # the legacy websockets client API; the newer asyncio client names the
        # header argument additional_headers and tracks state differently.
        self.websocket = await websockets.connect(
            self.ws_url,
            extra_headers=self.headers,
            ping_interval=self.config.ping_interval,
            ping_timeout=self.config.ping_timeout
        )

    async def reconnect(self) -> None:
        """Handle reconnection with exponential backoff."""
        retry_delay = 1.0
        while not self.websocket or self.websocket.closed:
            try:
                await self.connect()
                logger.info("Reconnected successfully")
                break
            except Exception:
                logger.error("Reconnection failed", exc_info=True)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 60)  # Max 60 second delay
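The delay schedule used by reconnect can be factored into a pure generator, which makes it easy to inspect and test in isolation. The backoff_delays name and the optional jitter parameter are illustrative additions; the reconnect method above uses no jitter.

```python
import random
from itertools import islice
from typing import Iterator, Optional


def backoff_delays(
    initial: float = 1.0,
    factor: float = 2.0,
    cap: float = 60.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield an endless sequence of retry delays, growing by `factor` up to `cap`.

    With jitter in (0, 1], each delay is shortened by a random fraction of up
    to `jitter`, so a jittered delay never exceeds the un-jittered one.
    """
    rng = rng or random.Random()
    delay = initial
    while True:
        if jitter:
            yield delay * (1.0 - jitter * rng.random())
        else:
            yield delay
        delay = min(delay * factor, cap)


# First eight delays with the defaults used by reconnect().
schedule = list(islice(backoff_delays(), 8))
# schedule == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Jitter is worth considering when many clients may lose the same server at once, since it spreads their reconnection attempts over time instead of letting them retry in lockstep.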
Event Processing
The event loop awaits each incoming message, so waiting on the socket never blocks other asyncio tasks. A closed connection triggers a reconnect, after which the loop resumes; any other exception is logged, passed to the exit handler if one is set, and re-raised. Responses and channel events go to separate handlers, which keeps protocol handling apart from trading logic. Because the handlers are called synchronously, they should return quickly to avoid stalling the loop.
class WebsocketClient:
    async def run(self) -> None:
        """Main event loop for processing messages."""
        while True:
            try:
                if not self.websocket or self.websocket.closed:
                    await self.reconnect()
                    continue
                message = await self.websocket.recv()
                await self.process_message(message)
            except websockets.ConnectionClosed:
                logger.warning("Connection closed, attempting reconnect")
                await self.reconnect()
            except Exception:
                logger.error("Fatal error in message processing", exc_info=True)
                if self.exit_handler:
                    self.exit_handler(self)
                raise
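The control flow of run can be exercised without a network by substituting a scripted connection. Everything in this sketch (FakeConnection, ConnectionLost, run_loop, demo) is a hypothetical stand-in written for illustration; it does not use the websockets library and simplifies reconnection to a single call.

```python
import asyncio
from typing import Callable, Optional


class ConnectionLost(Exception):
    """Stand-in for websockets.ConnectionClosed."""


class FakeConnection:
    """Scripted connection: recv() returns bytes or raises a scripted exception."""

    def __init__(self, script: list) -> None:
        self._script = list(script)

    async def recv(self) -> bytes:
        item = self._script.pop(0)
        if isinstance(item, BaseException):
            raise item
        return item


async def run_loop(
    connect: Callable[[], FakeConnection],
    on_message: Callable[[bytes], None],
    on_exit: Optional[Callable[[], None]] = None,
) -> None:
    """Receive until an unexpected error; reconnect on ConnectionLost."""
    conn = connect()
    while True:
        try:
            on_message(await conn.recv())
        except ConnectionLost:
            conn = connect()  # recoverable: swap in a fresh connection
        except Exception:
            if on_exit:
                on_exit()     # fatal: notify, then propagate
            raise


def demo() -> tuple[list[bytes], int, bool]:
    """Drive run_loop through one dropped connection and one fatal error."""
    scripts = [[b"a", ConnectionLost()], [b"b", RuntimeError("boom")]]
    received: list[bytes] = []
    connects = 0
    exited = False

    def connect() -> FakeConnection:
        nonlocal connects
        connects += 1
        return FakeConnection(scripts.pop(0))

    def on_exit() -> None:
        nonlocal exited
        exited = True

    try:
        asyncio.run(run_loop(connect, received.append, on_exit))
    except RuntimeError:
        pass  # the fatal error is re-raised after the exit handler runs
    return received, connects, exited
```

In this scripted run, both messages are delivered, the connection is opened twice, and the exit handler fires once before the fatal error propagates.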
Conclusion
Our WebSocket client implementation aims for a balance between robustness and maintainability. It provides the features needed for production trading systems while remaining clean and understandable. The combination of typed handler interfaces, schema-based message parsing, automatic reconnection, and careful error handling makes it a reliable foundation for building complex trading applications, whether they consume market data feeds or manage orders.