4
4
Contains the ABC bus implementation and its documentation.
5
5
"""
6
6
7
+ from typing import Any , Iterator , List , Optional , Sequence , Tuple , Union
8
+
9
+ import can .typechecking
10
+
7
11
from abc import ABCMeta , abstractmethod
8
12
import can
9
13
import logging
@@ -38,7 +42,12 @@ class BusABC(metaclass=ABCMeta):
38
42
RECV_LOGGING_LEVEL = 9
39
43
40
44
@abstractmethod
41
- def __init__ (self , channel , can_filters = None , ** kwargs ):
45
+ def __init__ (
46
+ self ,
47
+ channel : Any ,
48
+ can_filters : Optional [can .typechecking .CanFilters ] = None ,
49
+ ** kwargs : object
50
+ ):
42
51
"""Construct and open a CAN bus instance of the specified type.
43
52
44
53
Subclasses should call though this method with all given parameters
@@ -47,26 +56,24 @@ def __init__(self, channel, can_filters=None, **kwargs):
47
56
:param channel:
48
57
The can interface identifier. Expected type is backend dependent.
49
58
50
- :param list can_filters:
59
+ :param can_filters:
51
60
See :meth:`~can.BusABC.set_filters` for details.
52
61
53
62
:param dict kwargs:
54
63
Any backend dependent configurations are passed in this dictionary
55
64
"""
56
- self ._periodic_tasks = []
65
+ self ._periodic_tasks : List [ can . broadcastmanager . CyclicSendTaskABC ] = []
57
66
self .set_filters (can_filters )
58
67
59
- def __str__ (self ):
68
+ def __str__ (self ) -> str :
60
69
return self .channel_info
61
70
62
- def recv (self , timeout = None ):
71
+ def recv (self , timeout : Optional [ float ] = None ) -> Optional [ can . Message ] :
63
72
"""Block waiting for a message from the Bus.
64
73
65
- :type timeout: float or None
66
74
:param timeout:
67
75
seconds to wait for a message or None to wait indefinitely
68
76
69
- :rtype: can.Message or None
70
77
:return:
71
78
None on timeout or a :class:`can.Message` object.
72
79
:raises can.CanError:
@@ -100,7 +107,9 @@ def recv(self, timeout=None):
100
107
else :
101
108
return None
102
109
103
- def _recv_internal (self , timeout ):
110
+ def _recv_internal (
111
+ self , timeout : Optional [float ]
112
+ ) -> Tuple [Optional [can .Message ], bool ]:
104
113
"""
105
114
Read a message from the bus and tell whether it was filtered.
106
115
This methods may be called by :meth:`~can.BusABC.recv`
@@ -128,7 +137,6 @@ def _recv_internal(self, timeout):
128
137
:param float timeout: seconds to wait for a message,
129
138
see :meth:`~can.BusABC.send`
130
139
131
- :rtype: tuple[can.Message, bool] or tuple[None, bool]
132
140
:return:
133
141
1. a message that was read or None on timeout
134
142
2. a bool that is True if message filtering has already
@@ -144,14 +152,13 @@ def _recv_internal(self, timeout):
144
152
raise NotImplementedError ("Trying to read from a write only bus?" )
145
153
146
154
@abstractmethod
147
- def send (self , msg , timeout = None ):
155
+ def send (self , msg : can . Message , timeout : Optional [ float ] = None ):
148
156
"""Transmit a message to the CAN bus.
149
157
150
158
Override this method to enable the transmit path.
151
159
152
160
:param can.Message msg: A message object.
153
161
154
- :type timeout: float or None
155
162
:param timeout:
156
163
If > 0, wait up to this many seconds for message to be ACK'ed or
157
164
for transmit queue to be ready depending on driver implementation.
@@ -164,7 +171,13 @@ def send(self, msg, timeout=None):
164
171
"""
165
172
raise NotImplementedError ("Trying to write to a readonly bus?" )
166
173
167
- def send_periodic (self , msgs , period , duration = None , store_task = True ):
174
+ def send_periodic (
175
+ self ,
176
+ msgs : Union [Sequence [can .Message ], can .Message ],
177
+ period : float ,
178
+ duration : Optional [float ] = None ,
179
+ store_task : bool = True ,
180
+ ) -> can .broadcastmanager .CyclicSendTaskABC :
168
181
"""Start sending messages at a given period on this bus.
169
182
170
183
The task will be active until one of the following conditions are met:
@@ -175,20 +188,19 @@ def send_periodic(self, msgs, period, duration=None, store_task=True):
175
188
- :meth:`BusABC.stop_all_periodic_tasks()` is called
176
189
- the task's :meth:`CyclicTask.stop()` method is called.
177
190
178
- :param Union[Sequence[can.Message], can.Message] msgs:
191
+ :param msgs:
179
192
Messages to transmit
180
- :param float period:
193
+ :param period:
181
194
Period in seconds between each message
182
- :param float duration:
195
+ :param duration:
183
196
Approximate duration in seconds to continue sending messages. If
184
197
no duration is provided, the task will continue indefinitely.
185
- :param bool store_task:
198
+ :param store_task:
186
199
If True (the default) the task will be attached to this Bus instance.
187
200
Disable to instead manage tasks manually.
188
201
:return:
189
202
A started task instance. Note the task can be stopped (and depending on
190
203
the backend modified) by calling the :meth:`stop` method.
191
- :rtype: can.broadcastmanager.CyclicSendTaskABC
192
204
193
205
.. note::
194
206
@@ -223,30 +235,34 @@ def wrapped_stop_method(remove_task=True):
223
235
pass
224
236
original_stop_method ()
225
237
226
- task . stop = wrapped_stop_method
238
+ setattr ( task , " stop" , wrapped_stop_method )
227
239
228
240
if store_task :
229
241
self ._periodic_tasks .append (task )
230
242
231
243
return task
232
244
233
- def _send_periodic_internal (self , msgs , period , duration = None ):
245
+ def _send_periodic_internal (
246
+ self ,
247
+ msgs : Union [Sequence [can .Message ], can .Message ],
248
+ period : float ,
249
+ duration : Optional [float ] = None ,
250
+ ) -> can .broadcastmanager .CyclicSendTaskABC :
234
251
"""Default implementation of periodic message sending using threading.
235
252
236
253
Override this method to enable a more efficient backend specific approach.
237
254
238
- :param Union[Sequence[can.Message], can.Message] msgs:
255
+ :param msgs:
239
256
Messages to transmit
240
- :param float period:
257
+ :param period:
241
258
Period in seconds between each message
242
- :param float duration:
259
+ :param duration:
243
260
The duration between sending each message at the given rate. If
244
261
no duration is provided, the task will continue indefinitely.
245
262
:return:
246
263
A started task instance. Note the task can be stopped (and
247
264
depending on the backend modified) by calling the :meth:`stop`
248
265
method.
249
- :rtype: can.broadcastmanager.CyclicSendTaskABC
250
266
"""
251
267
if not hasattr (self , "_lock_send_periodic" ):
252
268
# Create a send lock for this bus, but not for buses which override this method
@@ -275,7 +291,7 @@ def stop_all_periodic_tasks(self, remove_tasks=True):
275
291
if remove_tasks :
276
292
self ._periodic_tasks = []
277
293
278
- def __iter__ (self ):
294
+ def __iter__ (self ) -> Iterator [ can . Message ] :
279
295
"""Allow iteration on messages as they are received.
280
296
281
297
>>> for msg in bus:
@@ -291,18 +307,18 @@ def __iter__(self):
291
307
yield msg
292
308
293
309
@property
294
- def filters (self ):
310
+ def filters (self ) -> Optional [ can . typechecking . CanFilters ] :
295
311
"""
296
312
Modify the filters of this bus. See :meth:`~can.BusABC.set_filters`
297
313
for details.
298
314
"""
299
315
return self ._filters
300
316
301
317
@filters .setter
302
- def filters (self , filters ):
318
+ def filters (self , filters : Optional [ can . typechecking . CanFilters ] ):
303
319
self .set_filters (filters )
304
320
305
- def set_filters (self , filters = None ):
321
+ def set_filters (self , filters : Optional [ can . typechecking . CanFilters ] = None ):
306
322
"""Apply filtering to all messages received by this Bus.
307
323
308
324
All messages that match at least one filter are returned.
@@ -327,25 +343,24 @@ def set_filters(self, filters=None):
327
343
self ._filters = filters or None
328
344
self ._apply_filters (self ._filters )
329
345
330
- def _apply_filters (self , filters ):
346
+ def _apply_filters (self , filters : Optional [ can . typechecking . CanFilters ] ):
331
347
"""
332
348
Hook for applying the filters to the underlying kernel or
333
349
hardware if supported/implemented by the interface.
334
350
335
- :param Iterator[dict] filters:
351
+ :param filters:
336
352
See :meth:`~can.BusABC.set_filters` for details.
337
353
"""
338
354
339
- def _matches_filters (self , msg ) :
355
+ def _matches_filters (self , msg : can . Message ) -> bool :
340
356
"""Checks whether the given message matches at least one of the
341
357
current filters. See :meth:`~can.BusABC.set_filters` for details
342
358
on how the filters work.
343
359
344
360
This method should not be overridden.
345
361
346
- :param can.Message msg:
362
+ :param msg:
347
363
the message to check if matching
348
- :rtype: bool
349
364
:return: whether the given message matches at least one filter
350
365
"""
351
366
@@ -388,33 +403,28 @@ def __exit__(self, exc_type, exc_val, exc_tb):
388
403
self .shutdown ()
389
404
390
405
@property
391
- def state (self ):
406
+ def state (self ) -> BusState :
392
407
"""
393
408
Return the current state of the hardware
394
-
395
- :type: can.BusState
396
409
"""
397
410
return BusState .ACTIVE
398
411
399
412
@state .setter
400
- def state (self , new_state ):
413
+ def state (self , new_state : BusState ):
401
414
"""
402
415
Set the new state of the hardware
403
-
404
- :type: can.BusState
405
416
"""
406
417
raise NotImplementedError ("Property is not implemented." )
407
418
408
419
@staticmethod
409
- def _detect_available_configs ():
420
+ def _detect_available_configs () -> Iterator [ dict ] :
410
421
"""Detect all configurations/channels that this interface could
411
422
currently connect with.
412
423
413
424
This might be quite time consuming.
414
425
415
426
May not to be implemented by every interface on every platform.
416
427
417
- :rtype: Iterator[dict]
418
428
:return: an iterable of dicts, each being a configuration suitable
419
429
for usage in the interface's bus constructor.
420
430
"""
0 commit comments