Skip to content

Add typing annotations for functions in can.bus #652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 29, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 40 additions & 18 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Contains the ABC bus implementation and its documentation.
"""

from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Union

from abc import ABCMeta, abstractmethod
import can
import logging
Expand Down Expand Up @@ -38,7 +40,12 @@ class BusABC(metaclass=ABCMeta):
RECV_LOGGING_LEVEL = 9

@abstractmethod
def __init__(self, channel, can_filters=None, **kwargs):
def __init__(
self,
channel: Any,
can_filters: Optional[List[Dict[str, Union[bool, int, str]]]] = None,
**kwargs: object
):
"""Construct and open a CAN bus instance of the specified type.

Subclasses should call though this method with all given parameters
Expand All @@ -53,13 +60,13 @@ def __init__(self, channel, can_filters=None, **kwargs):
:param dict kwargs:
Any backend dependent configurations are passed in this dictionary
"""
self._periodic_tasks = []
self._periodic_tasks: List[can.broadcastmanager.CyclicSendTaskABC] = []
self.set_filters(can_filters)

def __str__(self):
def __str__(self) -> str:
return self.channel_info

def recv(self, timeout=None):
def recv(self, timeout: Optional[float] = None) -> Optional[can.Message]:
"""Block waiting for a message from the Bus.

:type timeout: float or None
Expand Down Expand Up @@ -100,7 +107,7 @@ def recv(self, timeout=None):
else:
return None

def _recv_internal(self, timeout):
def _recv_internal(self, timeout: Optional[float]):
"""
Read a message from the bus and tell whether it was filtered.
This methods may be called by :meth:`~can.BusABC.recv`
Expand Down Expand Up @@ -144,7 +151,7 @@ def _recv_internal(self, timeout):
raise NotImplementedError("Trying to read from a write only bus?")

@abstractmethod
def send(self, msg, timeout=None):
def send(self, msg: can.Message, timeout: Optional[float] = None):
"""Transmit a message to the CAN bus.

Override this method to enable the transmit path.
Expand All @@ -164,7 +171,13 @@ def send(self, msg, timeout=None):
"""
raise NotImplementedError("Trying to write to a readonly bus?")

def send_periodic(self, msgs, period, duration=None, store_task=True):
def send_periodic(
self,
msgs: Union[Sequence[can.Message], can.Message],
period: float,
duration: Optional[float] = None,
store_task: bool = True,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.

The task will be active until one of the following conditions are met:
Expand Down Expand Up @@ -223,14 +236,19 @@ def wrapped_stop_method(remove_task=True):
pass
original_stop_method()

task.stop = wrapped_stop_method
setattr(task, "stop", wrapped_stop_method)

if store_task:
self._periodic_tasks.append(task)

return task

def _send_periodic_internal(self, msgs, period, duration=None):
def _send_periodic_internal(
self,
msgs: Union[Sequence[can.Message], can.Message],
period: float,
duration: Optional[float] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.

Override this method to enable a more efficient backend specific approach.
Expand Down Expand Up @@ -275,7 +293,7 @@ def stop_all_periodic_tasks(self, remove_tasks=True):
if remove_tasks:
self._periodic_tasks = []

def __iter__(self):
def __iter__(self) -> Iterator[can.Message]:
"""Allow iteration on messages as they are received.

>>> for msg in bus:
Expand All @@ -291,18 +309,20 @@ def __iter__(self):
yield msg

@property
def filters(self):
def filters(self) -> Optional[Iterable[dict]]:
"""
Modify the filters of this bus. See :meth:`~can.BusABC.set_filters`
for details.
"""
return self._filters

@filters.setter
def filters(self, filters):
def filters(self, filters: Optional[Iterable[Dict[str, Union[bool, int, str]]]]):
self.set_filters(filters)

def set_filters(self, filters=None):
def set_filters(
self, filters: Optional[Iterable[Dict[str, Union[bool, int, str]]]] = None
):
"""Apply filtering to all messages received by this Bus.

All messages that match at least one filter are returned.
Expand All @@ -327,7 +347,9 @@ def set_filters(self, filters=None):
self._filters = filters or None
self._apply_filters(self._filters)

def _apply_filters(self, filters):
def _apply_filters(
self, filters: Optional[Iterable[Dict[str, Union[bool, int, str]]]]
):
"""
Hook for applying the filters to the underlying kernel or
hardware if supported/implemented by the interface.
Expand All @@ -336,7 +358,7 @@ def _apply_filters(self, filters):
See :meth:`~can.BusABC.set_filters` for details.
"""

def _matches_filters(self, msg):
def _matches_filters(self, msg: can.Message) -> bool:
"""Checks whether the given message matches at least one of the
current filters. See :meth:`~can.BusABC.set_filters` for details
on how the filters work.
Expand Down Expand Up @@ -388,7 +410,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()

@property
def state(self):
def state(self) -> BusState:
"""
Return the current state of the hardware

Expand All @@ -397,7 +419,7 @@ def state(self):
return BusState.ACTIVE

@state.setter
def state(self, new_state):
def state(self, new_state: BusState):
"""
Set the new state of the hardware

Expand All @@ -406,7 +428,7 @@ def state(self, new_state):
raise NotImplementedError("Property is not implemented.")

@staticmethod
def _detect_available_configs():
def _detect_available_configs() -> Iterator[dict]:
"""Detect all configurations/channels that this interface could
currently connect with.

Expand Down