@@ -37,7 +37,7 @@ def __init__(self, host: str, port: int, comm_addr: int, timeout: int, retries:
37
37
self ._timer : asyncio .TimerHandle | None = None
38
38
self .timeout : int = timeout
39
39
self .retries : int = retries
40
- self .keep_alive : bool = True
40
+ self .keep_alive : bool = False
41
41
self .protocol : asyncio .Protocol | None = None
42
42
self .response_future : Future | None = None
43
43
self .command : ProtocolCommand | None = None
@@ -62,6 +62,24 @@ def _ensure_lock(self) -> asyncio.Lock:
62
62
self ._close_transport ()
63
63
return self ._lock
64
64
65
+ def _max_retries_reached (self ) -> Future :
66
+ logger .debug ("Max number of retries (%d) reached, request %s failed." , self .retries , self .command )
67
+ self ._close_transport ()
68
+ self .response_future = asyncio .get_running_loop ().create_future ()
69
+ self .response_future .set_exception (MaxRetriesException )
70
+ return self .response_future
71
+
72
+ def _close_transport (self ) -> None :
73
+ if self ._transport :
74
+ try :
75
+ self ._transport .close ()
76
+ except RuntimeError :
77
+ logger .debug ("Failed to close transport." )
78
+ self ._transport = None
79
+ # Cancel Future on connection lost
80
+ if self .response_future and not self .response_future .done ():
81
+ self .response_future .cancel ()
82
+
65
83
async def close (self ) -> None :
66
84
"""Close the underlying transport/connection."""
67
85
raise NotImplementedError ()
@@ -133,15 +151,16 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
133
151
self ._partial_missing = 0
134
152
if self .command .validator (data ):
135
153
logger .debug ("Received: %s" , data .hex ())
154
+ self ._retry = 0
136
155
self .response_future .set_result (data )
137
156
else :
138
157
logger .debug ("Received invalid response: %s" , data .hex ())
139
- asyncio .get_running_loop ().call_soon (self ._retry_mechanism )
158
+ asyncio .get_running_loop ().call_soon (self ._timeout_mechanism )
140
159
except PartialResponseException as ex :
141
160
logger .debug ("Received response fragment (%d of %d): %s" , ex .length , ex .expected , data .hex ())
142
161
self ._partial_data = data
143
162
self ._partial_missing = ex .expected - ex .length
144
- self ._timer = asyncio .get_running_loop ().call_later (self .timeout , self ._retry_mechanism )
163
+ self ._timer = asyncio .get_running_loop ().call_later (self .timeout , self ._timeout_mechanism )
145
164
except asyncio .InvalidStateError :
146
165
logger .debug ("Response already handled: %s" , data .hex ())
147
166
except RequestRejectedException as ex :
@@ -158,13 +177,28 @@ def error_received(self, exc: Exception) -> None:
158
177
159
178
async def send_request (self , command : ProtocolCommand ) -> Future :
160
179
"""Send message via transport"""
161
- async with self ._ensure_lock ():
180
+ await self ._ensure_lock ().acquire ()
181
+ try :
162
182
await self ._connect ()
163
183
response_future = asyncio .get_running_loop ().create_future ()
164
- self ._retry = 0
165
184
self ._send_request (command , response_future )
166
185
await response_future
167
186
return response_future
187
+ except asyncio .CancelledError :
188
+ if self ._retry < self .retries :
189
+ self ._retry += 1
190
+ if self ._lock and self ._lock .locked ():
191
+ self ._lock .release ()
192
+ if not self .keep_alive :
193
+ self ._close_transport ()
194
+ return await self .send_request (command )
195
+ else :
196
+ return self ._max_retries_reached ()
197
+ finally :
198
+ if self ._lock and self ._lock .locked ():
199
+ self ._lock .release ()
200
+ if not self .keep_alive :
201
+ self ._close_transport ()
168
202
169
203
def _send_request (self , command : ProtocolCommand , response_future : Future ) -> None :
170
204
"""Send message via transport"""
@@ -178,32 +212,19 @@ def _send_request(self, command: ProtocolCommand, response_future: Future) -> No
178
212
else :
179
213
logger .debug ("Sending: %s" , self .command )
180
214
self ._transport .sendto (payload )
181
- self ._timer = asyncio .get_running_loop ().call_later (self .timeout , self ._retry_mechanism )
215
+ self ._timer = asyncio .get_running_loop ().call_later (self .timeout , self ._timeout_mechanism )
182
216
183
- def _retry_mechanism (self ) -> None :
184
- """Retry mechanism to prevent hanging transport"""
185
- if self .response_future .done ():
217
+ def _timeout_mechanism (self ) -> None :
218
+ """Timeout mechanism to prevent hanging transport"""
219
+ if self .response_future and self . response_future .done ():
186
220
logger .debug ("Response already received." )
187
- elif self ._retry < self .retries :
221
+ self ._retry = 0
222
+ else :
188
223
if self ._timer :
189
224
logger .debug ("Failed to receive response to %s in time (%ds)." , self .command , self .timeout )
190
- self ._retry += 1
191
- self ._send_request (self .command , self .response_future )
192
- else :
193
- logger .debug ("Max number of retries (%d) reached, request %s failed." , self .retries , self .command )
194
- self .response_future .set_exception (MaxRetriesException )
195
- self ._close_transport ()
196
-
197
- def _close_transport (self ) -> None :
198
- if self ._transport :
199
- try :
200
- self ._transport .close ()
201
- except RuntimeError :
202
- logger .debug ("Failed to close transport." )
203
- self ._transport = None
204
- # Cancel Future on connection close
205
- if self .response_future and not self .response_future .done ():
206
- self .response_future .cancel ()
225
+ self ._timer = None
226
+ if self .response_future and not self .response_future .done ():
227
+ self .response_future .cancel ()
207
228
208
229
async def close (self ):
209
230
self ._close_transport ()
@@ -358,24 +379,6 @@ def _timeout_mechanism(self) -> None:
358
379
self ._timer = None
359
380
self ._close_transport ()
360
381
361
- def _max_retries_reached (self ) -> Future :
362
- logger .debug ("Max number of retries (%d) reached, request %s failed." , self .retries , self .command )
363
- self ._close_transport ()
364
- self .response_future = asyncio .get_running_loop ().create_future ()
365
- self .response_future .set_exception (MaxRetriesException )
366
- return self .response_future
367
-
368
- def _close_transport (self ) -> None :
369
- if self ._transport :
370
- try :
371
- self ._transport .close ()
372
- except RuntimeError :
373
- logger .debug ("Failed to close transport." )
374
- self ._transport = None
375
- # Cancel Future on connection lost
376
- if self .response_future and not self .response_future .done ():
377
- self .response_future .cancel ()
378
-
379
382
async def close (self ):
380
383
await self ._ensure_lock ().acquire ()
381
384
try :
0 commit comments