parent
267b0a723d
commit
0bf7a78553
|
@ -228,7 +228,7 @@ class _DeviceMonitor(Thread):
|
||||||
|
|
||||||
def _match(
|
def _match(
|
||||||
action: str,
|
action: str,
|
||||||
device,
|
device: dict[str, Any],
|
||||||
filter_func: Callable[[int, int, int, bool, bool], dict[str, Any]],
|
filter_func: Callable[[int, int, int, bool, bool], dict[str, Any]],
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
|
@ -393,7 +393,7 @@ def open(vendor_id, product_id, serial=None):
|
||||||
return device_handle
|
return device_handle
|
||||||
|
|
||||||
|
|
||||||
def open_path(device_path) -> Any:
|
def open_path(device_path: str) -> int:
|
||||||
"""Open a HID device by its path name.
|
"""Open a HID device by its path name.
|
||||||
|
|
||||||
:param device_path: the path of a ``DeviceInfo`` tuple returned by enumerate().
|
:param device_path: the path of a ``DeviceInfo`` tuple returned by enumerate().
|
||||||
|
|
|
@ -66,7 +66,7 @@ class HIDAPI(typing.Protocol):
|
||||||
def open(self, vendor_id, product_id, serial=None):
|
def open(self, vendor_id, product_id, serial=None):
|
||||||
...
|
...
|
||||||
|
|
||||||
def open_path(self, path):
|
def open_path(self, path) -> int:
|
||||||
...
|
...
|
||||||
|
|
||||||
def enumerate(self, filter_func: Callable[[int, int, int, bool, bool], dict[str, typing.Any]]) -> DeviceInfo:
|
def enumerate(self, filter_func: Callable[[int, int, int, bool, bool], dict[str, typing.Any]]) -> DeviceInfo:
|
||||||
|
@ -233,7 +233,7 @@ def notify_on_receivers_glib(glib: GLib, callback: Callable):
|
||||||
return hidapi.monitor_glib(glib, callback, _filter_products_of_interest)
|
return hidapi.monitor_glib(glib, callback, _filter_products_of_interest)
|
||||||
|
|
||||||
|
|
||||||
def open_path(path):
|
def open_path(path) -> int:
|
||||||
"""Checks if the given Linux device path points to the right UR device.
|
"""Checks if the given Linux device path points to the right UR device.
|
||||||
|
|
||||||
:param path: the Linux device path.
|
:param path: the Linux device path.
|
||||||
|
@ -356,7 +356,7 @@ def _is_relevant_message(data: bytes) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _read(handle, timeout):
|
def _read(handle, timeout) -> tuple[int, int, bytes]:
|
||||||
"""Read an incoming packet from the receiver.
|
"""Read an incoming packet from the receiver.
|
||||||
|
|
||||||
:returns: a tuple of (report_id, devnumber, data), or `None`.
|
:returns: a tuple of (report_id, devnumber, data), or `None`.
|
||||||
|
|
|
@ -23,7 +23,6 @@ import threading
|
||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from typing import Any
|
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from typing import Protocol
|
from typing import Protocol
|
||||||
|
@ -51,7 +50,7 @@ _hidpp20 = hidpp20.Hidpp20()
|
||||||
|
|
||||||
|
|
||||||
class LowLevelInterface(Protocol):
|
class LowLevelInterface(Protocol):
|
||||||
def open_path(self, path) -> Any:
|
def open_path(self, path) -> int:
|
||||||
...
|
...
|
||||||
|
|
||||||
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
|
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
|
||||||
|
|
|
@ -22,7 +22,6 @@ import struct
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
from time import time
|
from time import time
|
||||||
from typing import Any
|
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
from typing import Protocol
|
from typing import Protocol
|
||||||
|
|
||||||
|
@ -1890,7 +1889,7 @@ class SettingsProtocol(Protocol):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
def check_feature(device, settings_class: SettingsProtocol) -> None | bool | Any:
|
def check_feature(device, settings_class: SettingsProtocol) -> None | bool | SettingsProtocol:
|
||||||
if settings_class.feature not in device.features:
|
if settings_class.feature not in device.features:
|
||||||
return
|
return
|
||||||
if settings_class.min_version > device.features.get_feature_version(settings_class.feature):
|
if settings_class.min_version > device.features.get_feature_version(settings_class.feature):
|
||||||
|
|
|
@ -15,10 +15,13 @@
|
||||||
## with this program; if not, write to the Free Software Foundation, Inc.,
|
## with this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import errno
|
import errno
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
import time
|
||||||
|
import typing
|
||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
@ -36,9 +39,15 @@ from . import configuration
|
||||||
from . import dbus
|
from . import dbus
|
||||||
from . import i18n
|
from . import i18n
|
||||||
|
|
||||||
|
if typing.TYPE_CHECKING:
|
||||||
|
from hidapi.common import DeviceInfo
|
||||||
|
|
||||||
gi.require_version("Gtk", "3.0") # NOQA: E402
|
gi.require_version("Gtk", "3.0") # NOQA: E402
|
||||||
from gi.repository import GLib # NOQA: E402 # isort:skip
|
from gi.repository import GLib # NOQA: E402 # isort:skip
|
||||||
|
|
||||||
|
if typing.TYPE_CHECKING:
|
||||||
|
from logitech_receiver.device import Device
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
ACTION_ADD = "add"
|
ACTION_ADD = "add"
|
||||||
|
@ -235,7 +244,7 @@ class SolaarListener(listener.EventsListener):
|
||||||
return f"<SolaarListener({self.receiver.path},{self.receiver.handle})>"
|
return f"<SolaarListener({self.receiver.path},{self.receiver.handle})>"
|
||||||
|
|
||||||
|
|
||||||
def _process_bluez_dbus(device, path, dictionary, signature):
|
def _process_bluez_dbus(device: Device, path, dictionary: dict, signature):
|
||||||
"""Process bluez dbus property changed signals for device status
|
"""Process bluez dbus property changed signals for device status
|
||||||
changes to discover disconnections and connections.
|
changes to discover disconnections and connections.
|
||||||
"""
|
"""
|
||||||
|
@ -251,7 +260,7 @@ def _process_bluez_dbus(device, path, dictionary, signature):
|
||||||
_cleanup_bluez_dbus(device)
|
_cleanup_bluez_dbus(device)
|
||||||
|
|
||||||
|
|
||||||
def _cleanup_bluez_dbus(device):
|
def _cleanup_bluez_dbus(device: Device):
|
||||||
"""Remove dbus signal receiver for device"""
|
"""Remove dbus signal receiver for device"""
|
||||||
if logger.isEnabledFor(logging.INFO):
|
if logger.isEnabledFor(logging.INFO):
|
||||||
logger.info("bluez cleanup for %s", device)
|
logger.info("bluez cleanup for %s", device)
|
||||||
|
@ -261,10 +270,10 @@ def _cleanup_bluez_dbus(device):
|
||||||
_all_listeners = {} # all known receiver listeners, listeners that stop on their own may remain here
|
_all_listeners = {} # all known receiver listeners, listeners that stop on their own may remain here
|
||||||
|
|
||||||
|
|
||||||
def _start(device_info):
|
def _start(device_info: DeviceInfo):
|
||||||
assert _status_callback and _setting_callback
|
assert _status_callback and _setting_callback
|
||||||
isDevice = device_info.isDevice
|
|
||||||
if not isDevice:
|
if not device_info.isDevice:
|
||||||
receiver_ = logitech_receiver.receiver.create_receiver(base, device_info, _setting_callback)
|
receiver_ = logitech_receiver.receiver.create_receiver(base, device_info, _setting_callback)
|
||||||
else:
|
else:
|
||||||
receiver_ = logitech_receiver.device.create_device(base, device_info, _setting_callback)
|
receiver_ = logitech_receiver.device.create_device(base, device_info, _setting_callback)
|
||||||
|
@ -345,7 +354,7 @@ def setup_scanner(status_changed_callback, setting_changed_callback, error_callb
|
||||||
base.notify_on_receivers_glib(GLib, _process_receiver_event)
|
base.notify_on_receivers_glib(GLib, _process_receiver_event)
|
||||||
|
|
||||||
|
|
||||||
def _process_add(device_info, retry):
|
def _process_add(device_info: DeviceInfo, retry):
|
||||||
try:
|
try:
|
||||||
_start(device_info)
|
_start(device_info)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
|
|
|
@ -31,7 +31,7 @@ class LowLevelInterfaceFake:
|
||||||
def __init__(self, responses=None):
|
def __init__(self, responses=None):
|
||||||
self.responses = responses
|
self.responses = responses
|
||||||
|
|
||||||
def open_path(self, path):
|
def open_path(self, path) -> int:
|
||||||
return fake_hidpp.open_path(path)
|
return fake_hidpp.open_path(path)
|
||||||
|
|
||||||
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
|
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
|
||||||
|
|
Loading…
Reference in New Issue