Overview

In modern trading systems, real-time communication is crucial for maintaining market data feeds, executing trades, and managing orders. Our WebSocket client addresses these needs by combining Python's asyncio with Protocol Buffers for compact, schema-driven message handling. The goal is code that is reliable, type-annotated, and maintainable enough for production trading environments.

Core Architecture

At the heart of our implementation lies a set of type definitions that give each kind of message a clear interface. We use type aliases to declare distinct handler signatures for responses, events, and exit conditions. Besides making the code easier to read, these annotations enable IDE support and let a static type checker such as mypy flag mismatched handlers before the code runs, rather than at runtime.


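from typing import Callable, TypeAlias  # TypeAlias requires Python 3.10+
from dataclasses import dataclass

# Response and ChannelMessage are the generated Protocol Buffers classes.
# The module name below is a placeholder; import from your own generated *_pb2 module.
from messages_pb2 import ChannelMessage, Response

# Core type definitions. Each handler receives the client instance first;
# the client class is referenced by name because it is defined later.
ResponseHandler: TypeAlias = Callable[["WebsocketClient", Response], None]
EventHandler: TypeAlias = Callable[["WebsocketClient", ChannelMessage], None]
ExitHandler: TypeAlias = Callable[["WebsocketClient"], None]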

@dataclass
class ConnectionConfig:
    """Configuration for WebSocket connection."""
    ws_url: str
    api_token: str
    ping_interval: int = 20
    ping_timeout: int = 20

The message parsing system uses Protocol Buffers, a compact binary format with a shared schema. Generated message classes give us typed field access and efficient decoding. Because every incoming frame is raw bytes, the client first tries to decode it as a Response and accepts the result only if at least one expected field is set; anything else falls through to channel-message parsing. Parse failures are logged at debug level and reported as None, so a malformed frame never crashes the receive loop while still leaving a trace for debugging.


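import logging

from google.protobuf.message import DecodeError

logger = logging.getLogger(__name__)

# Fields of which at least one must be set for a payload to count as a Response.
RESPONSE_FIELDS = ("error", "auth", "subscription", "order", "response")


class WebsocketClient:
    def parse_response(self, data: bytes) -> Response | None:
        """Parse a binary WebSocket message into a Response, or return None."""
        try:
            response = Response()
            response.ParseFromString(data)
        except DecodeError:
            logger.debug("Failed to parse as Response", exc_info=True)
            return None

        # Bytes meant for another message type can decode without error,
        # so accept the result only if an expected field is actually set.
        if any(response.HasField(name) for name in RESPONSE_FIELDS):
            return response
        return None

    async def process_message(self, message: bytes) -> None:
        """Route an incoming message to the response or event handler."""
        response = self.parse_response(message)
        if response is not None:
            if self.response_handler:
                self.response_handler(self, response)
            return

        # parse_channel_message (not shown) is expected to mirror parse_response.
        channel_msg = self.parse_channel_message(message)
        if channel_msg is not None:
            if self.event_handler:
                self.event_handler(self, channel_msg)
            return

        logger.debug("Dropping message that matched no known type")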
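
The routing order used by process_message (try the response parser first, then the channel-message parser) can be sketched without any protobuf dependency. In the example below the one-byte tags, the two toy parsers, and the route function are all assumptions made up for illustration; the real client inspects protobuf fields rather than a leading byte.

```python
from typing import Callable, Optional

# Hypothetical one-byte tags standing in for the protobuf field checks.
RESPONSE_TAG = b"R"
CHANNEL_TAG = b"C"


def parse_response(data: bytes) -> Optional[str]:
    """Toy parser: succeeds only for frames that start with RESPONSE_TAG."""
    return data[1:].decode() if data[:1] == RESPONSE_TAG else None


def parse_channel_message(data: bytes) -> Optional[str]:
    """Toy parser: succeeds only for frames that start with CHANNEL_TAG."""
    return data[1:].decode() if data[:1] == CHANNEL_TAG else None


def route(
    data: bytes,
    on_response: Callable[[str], None],
    on_event: Callable[[str], None],
) -> str:
    """Try each parser in order and report which path handled the frame."""
    if (response := parse_response(data)) is not None:
        on_response(response)
        return "response"
    if (event := parse_channel_message(data)) is not None:
        on_event(event)
        return "event"
    return "dropped"


responses: list[str] = []
events: list[str] = []
results = [
    route(frame, responses.append, events.append)
    for frame in (b"Rok", b"Ctick", b"?unknown")
]
```

Because the first successful parser wins, the order of the checks matters whenever a frame could plausibly satisfy more than one parser.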

Connection Management

Connection stability is paramount in trading systems. Our client manages the connection with configurable keepalive pings, token-based authentication, and automatic reconnection. The connection layer sends the API token as a bearer header during the opening handshake, relies on periodic pings (ping_interval and ping_timeout, both 20 seconds by default) to detect dead connections, and reconnects with exponential backoff capped at 60 seconds.

import asyncio

import websockets


class WebsocketClient:
    async def connect(self) -> None:
        """Establish the WebSocket connection using the stored ConnectionConfig."""
        self.headers = {
            "Authorization": f"Bearer {self.config.api_token}",
            "User-Agent": f"WebsocketClient/{self.version}",
        }

        # Note: extra_headers is the keyword used by the legacy websockets client;
        # newer releases name it additional_headers, so check your installed version.
        self.websocket = await websockets.connect(
            self.config.ws_url,
            extra_headers=self.headers,
            ping_interval=self.config.ping_interval,
            ping_timeout=self.config.ping_timeout,
        )

    async def reconnect(self) -> None:
        """Reconnect with exponential backoff, retrying until a connection opens."""
        retry_delay = 1.0
        while not self.websocket or self.websocket.closed:
            try:
                await self.connect()
                logger.info("Reconnected successfully")
                break
            except Exception:
                logger.error("Reconnection failed", exc_info=True)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 60.0)  # Cap the delay at 60 seconds
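
The delay schedule produced by reconnect can be made explicit with a small generator. This is a sketch rather than part of the client: the name backoff_delays and its parameters are hypothetical, with defaults chosen to match the values used above (start at 1 second, double each time, cap at 60 seconds).

```python
import itertools
from typing import Iterator


def backoff_delays(
    initial: float = 1.0, factor: float = 2.0, cap: float = 60.0
) -> Iterator[float]:
    """Yield an endless sequence of retry delays that grows up to a cap."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)


# The first eight waits a client would observe if every attempt failed.
first_eight = list(itertools.islice(backoff_delays(), 8))
print(first_eight)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Production clients often add a small random jitter to each delay so that many clients disconnected at the same moment do not all retry in lockstep; it is left out here to keep the schedule deterministic.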

Event Processing

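The event processing system is a single asynchronous loop. Awaiting recv() suspends the coroutine until a message arrives, so other tasks on the event loop keep running while the client is idle. Each message is then routed to the response or event handler, which keeps message-type logic separate from connection logic. Note that handlers are invoked synchronously inside the loop, so they should return quickly; long-running work is best handed off to a separate task. A closed connection triggers a reconnect, while any other exception is treated as fatal: the exit handler is called and the exception is re-raised.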

class WebsocketClient:
    async def run(self) -> None:
        """Main event loop for processing messages."""
        while True:
            try:
                if not self.websocket or self.websocket.closed:
                    await self.reconnect()
                    continue

                message = await self.websocket.recv()
                await self.process_message(message)
                
            except websockets.ConnectionClosed:
                logger.warning("Connection closed, attempting reconnect")
                await self.reconnect()
                
            except Exception:
                logger.error("Fatal error in message processing", exc_info=True)
                if self.exit_handler:
                    self.exit_handler(self)
                raise
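
The control flow of this loop (receive, process, reconnect on close) can be exercised without a network by scripting the socket. The sketch below is an illustration under stated assumptions: FakeSocket, FakeConnectionClosed, and run_loop are hypothetical names, and "reconnecting" simply means switching to the next scripted socket. Unlike the real loop, it stops once no sockets remain so the example terminates.

```python
import asyncio
from typing import List


class FakeConnectionClosed(Exception):
    """Hypothetical stand-in for websockets.ConnectionClosed."""


class FakeSocket:
    """Hypothetical socket that replays scripted frames, then reports a close."""

    def __init__(self, frames: List[bytes]) -> None:
        self._frames = list(frames)

    async def recv(self) -> bytes:
        if not self._frames:
            raise FakeConnectionClosed()
        return self._frames.pop(0)


async def run_loop(sockets: List[FakeSocket]) -> List[str]:
    """Drain each scripted socket in turn and record what the loop did."""
    log: List[str] = []
    pending = list(sockets)
    socket = pending.pop(0)
    while True:
        try:
            frame = await socket.recv()
            log.append(f"message:{frame.decode()}")
        except FakeConnectionClosed:
            if not pending:
                # Nothing left to reconnect to, so end the demonstration.
                log.append("stopped")
                return log
            log.append("reconnect")
            socket = pending.pop(0)


log = asyncio.run(run_loop([FakeSocket([b"a", b"b"]), FakeSocket([b"c"])]))
```

A scripted socket of this kind is also a convenient seam for unit tests, since the reconnect path can be triggered deterministically.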

Conclusion

Our WebSocket client implementation aims for a balance between robustness and maintainability. It provides the features needed for production trading systems while remaining clean and understandable: typed handler interfaces, Protocol Buffers parsing that tolerates malformed frames, keepalive and reconnection with capped exponential backoff, and an event loop with explicit error handling. Whether handling market data feeds or managing orders, it offers a solid foundation for trading system integration.