Skip to content

Add typing annotations for functions in can.bus #652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 50 additions & 40 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
Contains the ABC bus implementation and its documentation.
"""

from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union

import can.typechecking

from abc import ABCMeta, abstractmethod
import can
import logging
Expand Down Expand Up @@ -38,7 +42,12 @@ class BusABC(metaclass=ABCMeta):
RECV_LOGGING_LEVEL = 9

@abstractmethod
def __init__(self, channel, can_filters=None, **kwargs):
def __init__(
self,
channel: Any,
can_filters: Optional[can.typechecking.CanFilters] = None,
**kwargs: object
):
"""Construct and open a CAN bus instance of the specified type.

Subclasses should call though this method with all given parameters
Expand All @@ -47,26 +56,24 @@ def __init__(self, channel, can_filters=None, **kwargs):
:param channel:
The can interface identifier. Expected type is backend dependent.

:param list can_filters:
:param can_filters:
See :meth:`~can.BusABC.set_filters` for details.

:param dict kwargs:
Any backend dependent configurations are passed in this dictionary
"""
self._periodic_tasks = []
self._periodic_tasks: List[can.broadcastmanager.CyclicSendTaskABC] = []
self.set_filters(can_filters)

def __str__(self):
def __str__(self) -> str:
return self.channel_info

def recv(self, timeout=None):
def recv(self, timeout: Optional[float] = None) -> Optional[can.Message]:
"""Block waiting for a message from the Bus.

:type timeout: float or None
:param timeout:
seconds to wait for a message or None to wait indefinitely

:rtype: can.Message or None
:return:
None on timeout or a :class:`can.Message` object.
:raises can.CanError:
Expand Down Expand Up @@ -100,7 +107,9 @@ def recv(self, timeout=None):
else:
return None

def _recv_internal(self, timeout):
def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[can.Message], bool]:
"""
Read a message from the bus and tell whether it was filtered.
This methods may be called by :meth:`~can.BusABC.recv`
Expand Down Expand Up @@ -128,7 +137,6 @@ def _recv_internal(self, timeout):
:param float timeout: seconds to wait for a message,
see :meth:`~can.BusABC.send`

:rtype: tuple[can.Message, bool] or tuple[None, bool]
:return:
1. a message that was read or None on timeout
2. a bool that is True if message filtering has already
Expand All @@ -144,14 +152,13 @@ def _recv_internal(self, timeout):
raise NotImplementedError("Trying to read from a write only bus?")

@abstractmethod
def send(self, msg, timeout=None):
def send(self, msg: can.Message, timeout: Optional[float] = None):
"""Transmit a message to the CAN bus.

Override this method to enable the transmit path.

:param can.Message msg: A message object.

:type timeout: float or None
:param timeout:
If > 0, wait up to this many seconds for message to be ACK'ed or
for transmit queue to be ready depending on driver implementation.
Expand All @@ -164,7 +171,13 @@ def send(self, msg, timeout=None):
"""
raise NotImplementedError("Trying to write to a readonly bus?")

def send_periodic(self, msgs, period, duration=None, store_task=True):
def send_periodic(
self,
msgs: Union[Sequence[can.Message], can.Message],
period: float,
duration: Optional[float] = None,
store_task: bool = True,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.

The task will be active until one of the following conditions are met:
Expand All @@ -175,20 +188,19 @@ def send_periodic(self, msgs, period, duration=None, store_task=True):
- :meth:`BusABC.stop_all_periodic_tasks()` is called
- the task's :meth:`CyclicTask.stop()` method is called.

:param Union[Sequence[can.Message], can.Message] msgs:
:param msgs:
Messages to transmit
:param float period:
:param period:
Period in seconds between each message
:param float duration:
:param duration:
Approximate duration in seconds to continue sending messages. If
no duration is provided, the task will continue indefinitely.
:param bool store_task:
:param store_task:
If True (the default) the task will be attached to this Bus instance.
Disable to instead manage tasks manually.
:return:
A started task instance. Note the task can be stopped (and depending on
the backend modified) by calling the :meth:`stop` method.
:rtype: can.broadcastmanager.CyclicSendTaskABC

.. note::

Expand Down Expand Up @@ -223,30 +235,34 @@ def wrapped_stop_method(remove_task=True):
pass
original_stop_method()

task.stop = wrapped_stop_method
setattr(task, "stop", wrapped_stop_method)

if store_task:
self._periodic_tasks.append(task)

return task

def _send_periodic_internal(self, msgs, period, duration=None):
def _send_periodic_internal(
self,
msgs: Union[Sequence[can.Message], can.Message],
period: float,
duration: Optional[float] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.

Override this method to enable a more efficient backend specific approach.

:param Union[Sequence[can.Message], can.Message] msgs:
:param msgs:
Messages to transmit
:param float period:
:param period:
Period in seconds between each message
:param float duration:
:param duration:
The duration between sending each message at the given rate. If
no duration is provided, the task will continue indefinitely.
:return:
A started task instance. Note the task can be stopped (and
depending on the backend modified) by calling the :meth:`stop`
method.
:rtype: can.broadcastmanager.CyclicSendTaskABC
"""
if not hasattr(self, "_lock_send_periodic"):
# Create a send lock for this bus, but not for buses which override this method
Expand Down Expand Up @@ -275,7 +291,7 @@ def stop_all_periodic_tasks(self, remove_tasks=True):
if remove_tasks:
self._periodic_tasks = []

def __iter__(self):
def __iter__(self) -> Iterator[can.Message]:
"""Allow iteration on messages as they are received.

>>> for msg in bus:
Expand All @@ -291,18 +307,18 @@ def __iter__(self):
yield msg

@property
def filters(self):
def filters(self) -> Optional[can.typechecking.CanFilters]:
"""
Modify the filters of this bus. See :meth:`~can.BusABC.set_filters`
for details.
"""
return self._filters

@filters.setter
def filters(self, filters):
def filters(self, filters: Optional[can.typechecking.CanFilters]):
self.set_filters(filters)

def set_filters(self, filters=None):
def set_filters(self, filters: Optional[can.typechecking.CanFilters] = None):
"""Apply filtering to all messages received by this Bus.

All messages that match at least one filter are returned.
Expand All @@ -327,25 +343,24 @@ def set_filters(self, filters=None):
self._filters = filters or None
self._apply_filters(self._filters)

def _apply_filters(self, filters):
def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]):
"""
Hook for applying the filters to the underlying kernel or
hardware if supported/implemented by the interface.

:param Iterator[dict] filters:
:param filters:
See :meth:`~can.BusABC.set_filters` for details.
"""

def _matches_filters(self, msg):
def _matches_filters(self, msg: can.Message) -> bool:
"""Checks whether the given message matches at least one of the
current filters. See :meth:`~can.BusABC.set_filters` for details
on how the filters work.

This method should not be overridden.

:param can.Message msg:
:param msg:
the message to check if matching
:rtype: bool
:return: whether the given message matches at least one filter
"""

Expand Down Expand Up @@ -388,33 +403,28 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()

@property
def state(self):
def state(self) -> BusState:
"""
Return the current state of the hardware

:type: can.BusState
"""
return BusState.ACTIVE

@state.setter
def state(self, new_state):
def state(self, new_state: BusState):
"""
Set the new state of the hardware

:type: can.BusState
"""
raise NotImplementedError("Property is not implemented.")

@staticmethod
def _detect_available_configs():
def _detect_available_configs() -> Iterator[dict]:
"""Detect all configurations/channels that this interface could
currently connect with.

This might be quite time consuming.

May not to be implemented by every interface on every platform.

:rtype: Iterator[dict]
:return: an iterable of dicts, each being a configuration suitable
for usage in the interface's bus constructor.
"""
Expand Down
10 changes: 10 additions & 0 deletions can/typechecking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Types for mypy type-checking
"""
import typing

import mypy_extensions

CanFilter = mypy_extensions.TypedDict(
"CanFilter", {"can_id": int, "can_mask": int, "extended": bool}
)
CanFilters = typing.Iterable[CanFilter]
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
"aenum",
'windows-curses;platform_system=="Windows"',
"filelock",
"mypy_extensions >= 0.4.0, < 0.5.0",
],
setup_requires=pytest_runner,
extras_require=extras_require,
Expand Down