1- import asyncio
2- import json
31import logging
42import os
3+ from asyncio import wait_for
4+ from json import loads
55from typing import (
66 Any ,
77 Dict ,
8+ Final ,
89 Optional ,
910 Union ,
11+ final ,
1012)
1113
1214from eth_typing import (
3638 RPCResponse ,
3739)
3840
39- DEFAULT_PING_INTERVAL = 30 # 30 seconds
40- DEFAULT_PING_TIMEOUT = 300 # 5 minutes
41+ DEFAULT_PING_INTERVAL : Final = 30 # 30 seconds
42+ DEFAULT_PING_TIMEOUT : Final = 300 # 5 minutes
4143
42- VALID_WEBSOCKET_URI_PREFIXES = {"ws://" , "wss://" }
43- RESTRICTED_WEBSOCKET_KWARGS = {"uri" , "loop" }
44- DEFAULT_WEBSOCKET_KWARGS = {
44+ VALID_WEBSOCKET_URI_PREFIXES : Final = frozenset ( {"ws://" , "wss://" })
45+ RESTRICTED_WEBSOCKET_KWARGS : Final = frozenset ( {"uri" , "loop" })
46+ DEFAULT_WEBSOCKET_KWARGS : Final = {
4547 # set how long to wait between pings from the server
4648 "ping_interval" : DEFAULT_PING_INTERVAL ,
4749 # set how long to wait without a pong response before closing the connection
@@ -53,9 +55,10 @@ def get_default_endpoint() -> URI:
5355 return URI (os .environ .get ("WEB3_WS_PROVIDER_URI" , "ws://127.0.0.1:8546" ))
5456
5557
58+ @final
5659class WebSocketProvider (PersistentConnectionProvider ):
57- logger = logging .getLogger ("faster_web3.providers.WebSocketProvider" )
58- is_async : bool = True
60+ logger : Final = logging .getLogger ("faster_web3.providers.WebSocketProvider" )
61+ is_async : Final = True
5962
6063 def __init__ (
6164 self ,
@@ -67,33 +70,32 @@ def __init__(
6770 ** kwargs : Any ,
6871 ) -> None :
6972 # initialize the endpoint_uri before calling the super constructor
70- self .endpoint_uri = (
71- URI (endpoint_uri ) if endpoint_uri is not None else get_default_endpoint ()
72- )
73+ endpoint = URI (endpoint_uri ) or get_default_endpoint ()
74+ self .endpoint_uri : Final = endpoint
7375 super ().__init__ (** kwargs )
74- self .use_text_frames = use_text_frames
76+ self .use_text_frames : Final = use_text_frames
7577 self ._ws : Optional [WebSocketClientProtocol ] = None
7678
7779 if not any (
78- self .endpoint_uri .startswith (prefix )
79- for prefix in VALID_WEBSOCKET_URI_PREFIXES
80+ map (endpoint .startswith , VALID_WEBSOCKET_URI_PREFIXES )
8081 ):
8182 raise Web3ValidationError (
8283 "WebSocket endpoint uri must begin with 'ws://' or 'wss://': "
8384 f"{ self .endpoint_uri } "
8485 )
8586
8687 if websocket_kwargs is not None :
87- found_restricted_keys = set (websocket_kwargs ).intersection (
88+ if found_restricted_keys : = set (websocket_kwargs ).intersection (
8889 RESTRICTED_WEBSOCKET_KWARGS
89- )
90- if found_restricted_keys :
90+ ):
9191 raise Web3ValidationError (
9292 "Found restricted keys for websocket_kwargs: "
9393 f"{ found_restricted_keys } ."
9494 )
9595
96- self .websocket_kwargs = merge (DEFAULT_WEBSOCKET_KWARGS , websocket_kwargs or {})
96+ self .websocket_kwargs : Final [Dict [str , Any ]] = merge (
97+ DEFAULT_WEBSOCKET_KWARGS , websocket_kwargs or {}
98+ )
9799
98100 def __str__ (self ) -> str :
99101 return f"WebSocket connection: { self .endpoint_uri } "
@@ -123,11 +125,11 @@ async def socket_send(self, request_data: bytes) -> None:
123125 if self .use_text_frames :
124126 payload = request_data .decode ("utf-8" )
125127
126- await asyncio . wait_for (self ._ws .send (payload ), timeout = self .request_timeout )
128+ await wait_for (self ._ws .send (payload ), timeout = self .request_timeout )
127129
128130 async def socket_recv (self ) -> RPCResponse :
129131 raw_response = await self ._ws .recv ()
130- return json . loads (raw_response )
132+ return loads (raw_response )
131133
132134 # -- private methods -- #
133135
0 commit comments