diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index b68954b9..df488b43 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -20,6 +20,7 @@ import logging import threading as _threading import time +from typing import Callable from typing import Optional import hidapi as _hid @@ -66,8 +67,8 @@ class DeviceFactory: class Device: instances = [] - read_register = hidpp10.read_register - write_register = hidpp10.write_register + read_register: Callable = hidpp10.read_register + write_register: Callable = hidpp10.write_register def __init__(self, receiver, number, online, pairing_info=None, handle=None, device_info=None, setting_callback=None): assert receiver or device_info diff --git a/lib/logitech_receiver/hidpp10.py b/lib/logitech_receiver/hidpp10.py index 80d3ccda..d6358c3c 100644 --- a/lib/logitech_receiver/hidpp10.py +++ b/lib/logitech_receiver/hidpp10.py @@ -13,10 +13,17 @@ ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +from __future__ import annotations import logging -from .common import Battery as _Battery +from typing import Any + +from typing_extensions import Protocol + +from .common import Battery +from .common import BatteryChargeApproximation +from .common import BatteryStatus from .common import FirmwareInfo as _FirmwareInfo from .common import bytes2int as _bytes2int from .common import int2bytes as _int2bytes @@ -26,19 +33,36 @@ from .hidpp20_constants import FIRMWARE_KIND logger = logging.getLogger(__name__) -# -# functions -# + +class Device(Protocol): + def request(self, request_id, *params): + ... + + @property + def kind(self) -> Any: + ... + + @property + def online(self) -> bool: + ... + + @property + def protocol(self) -> Any: + ... + + @property + def registers(self) -> list: + ... -def read_register(device, register_number, *params): +def read_register(device: Device, register_number, *params): assert device is not None, f"tried to read register {register_number:02X} from invalid device {device}" # support long registers by adding a 2 in front of the register number request_id = 0x8100 | (int(register_number) & 0x2FF) return device.request(request_id, *params) -def write_register(device, register_number, *value): +def write_register(device: Device, register_number, *value): assert device is not None, f"tried to write register {register_number:02X} to invalid device {device}" # support long registers by adding a 2 in front of the register number request_id = 0x8000 | (int(register_number) & 0x2FF) @@ -59,7 +83,7 @@ def set_configuration_pending_flags(receiver, devices): class Hidpp10: - def get_battery(self, device): + def get_battery(self, device: Device): assert device is not None assert device.kind is not None if not device.online: @@ -89,7 +113,7 @@ class Hidpp10: device.registers.append(REGISTERS.battery_status) return parse_battery_status(REGISTERS.battery_status, reply) - def get_firmware(self, device): + def get_firmware(self, device: Device): assert device is not None firmware = [None, None, None] @@ -124,7 +148,7 @@ class Hidpp10: if any(firmware): return tuple(f for f in firmware if f) - def set_3leds(self, device, battery_level=None, charging=None, warning=None): + def set_3leds(self, device: Device, battery_level=None, charging=None, warning=None): assert device is not None assert device.kind is not None if not device.online: @@ -134,17 +158,17 @@ class Hidpp10: return if battery_level is not None: - if battery_level < _Battery.APPROX.critical: + if battery_level < BatteryChargeApproximation.CRITICAL: # 1 orange, and force blink v1, v2 = 0x22, 0x00 warning = True - elif battery_level < _Battery.APPROX.low: + elif battery_level < BatteryChargeApproximation.LOW: # 1 orange v1, v2 = 0x22, 0x00 - elif battery_level < _Battery.APPROX.good: + elif battery_level < BatteryChargeApproximation.GOOD: # 1 green v1, v2 = 0x20, 0x00 - elif battery_level < _Battery.APPROX.full: + elif battery_level < BatteryChargeApproximation.FULL: # 2 greens v1, v2 = 0x20, 0x02 else: @@ -166,10 +190,10 @@ class Hidpp10: write_register(device, REGISTERS.three_leds, v1, v2) - def get_notification_flags(self, device): + def get_notification_flags(self, device: Device): return self._get_register(device, REGISTERS.notifications) - def set_notification_flags(self, device, *flag_bits): + def set_notification_flags(self, device: Device, *flag_bits): assert device is not None # Avoid a call if the device is not online, @@ -184,10 +208,10 @@ class Hidpp10: result = write_register(device, REGISTERS.notifications, _int2bytes(flag_bits, 3)) return result is not None - def get_device_features(self, device): + def get_device_features(self, device: Device): return self._get_register(device, REGISTERS.mouse_button_flags) - def _get_register(self, device, register): + def _get_register(self, device: Device, register): assert device is not None # Avoid a call if the device is not online, @@ -203,50 +227,61 @@ class Hidpp10: return _bytes2int(flags) -def parse_battery_status(register, reply): +def parse_battery_status(register, reply) -> Battery | None: + def status_byte_to_charge(status_byte_: int) -> BatteryChargeApproximation: + if status_byte_ == 7: + charge_ = BatteryChargeApproximation.FULL + elif status_byte_ == 5: + charge_ = BatteryChargeApproximation.GOOD + elif status_byte_ == 3: + charge_ = BatteryChargeApproximation.LOW + elif status_byte_ == 1: + charge_ = BatteryChargeApproximation.CRITICAL + else: + # pure 'charging' notifications may come without a status + charge_ = BatteryChargeApproximation.EMPTY + return charge_ + + def status_byte_to_battery_status(status_byte_: int) -> BatteryStatus: + if status_byte_ == 0x30: + status_text_ = BatteryStatus.DISCHARGING + elif status_byte_ == 0x50: + status_text_ = BatteryStatus.RECHARGING + elif status_byte_ == 0x90: + status_text_ = BatteryStatus.FULL + else: + status_text_ = None + return status_text_ + + def charging_byte_to_status_text(charging_byte_: int) -> BatteryStatus: + if charging_byte_ == 0x00: + status_text_ = BatteryStatus.DISCHARGING + elif charging_byte_ & 0x21 == 0x21: + status_text_ = BatteryStatus.RECHARGING + elif charging_byte_ & 0x22 == 0x22: + status_text_ = BatteryStatus.FULL + else: + logger.warning("could not parse 0x07 battery status: %02X (level %02X)", charging_byte_, status_byte) + status_text_ = None + return status_text_ + if register == REGISTERS.battery_charge: charge = ord(reply[:1]) status_byte = ord(reply[2:3]) & 0xF0 - status_text = ( - _Battery.STATUS.discharging - if status_byte == 0x30 - else _Battery.STATUS.recharging - if status_byte == 0x50 - else _Battery.STATUS.full - if status_byte == 0x90 - else None - ) - return _Battery(charge, None, status_text, None) + + battery_status = status_byte_to_battery_status(status_byte) + return Battery(charge, None, battery_status, None) if register == REGISTERS.battery_status: status_byte = ord(reply[:1]) - charge = ( - _Battery.APPROX.full - if status_byte == 7 # full - else _Battery.APPROX.good - if status_byte == 5 # good - else _Battery.APPROX.low - if status_byte == 3 # low - else _Battery.APPROX.critical - if status_byte == 1 # critical - # pure 'charging' notifications may come without a status - else _Battery.APPROX.empty - ) - charging_byte = ord(reply[1:2]) - if charging_byte == 0x00: - status_text = _Battery.STATUS.discharging - elif charging_byte & 0x21 == 0x21: - status_text = _Battery.STATUS.recharging - elif charging_byte & 0x22 == 0x22: - status_text = _Battery.STATUS.full - else: - logger.warning("could not parse 0x07 battery status: %02X (level %02X)", charging_byte, status_byte) - status_text = None + + status_text = charging_byte_to_status_text(charging_byte) + charge = status_byte_to_charge(status_byte) if charging_byte & 0x03 and status_byte == 0: # some 'charging' notifications may come with no battery level information charge = None # Return None for next charge level and voltage as these are not in HID++ 1.0 spec - return _Battery(charge, None, status_text, None) + return Battery(charge, None, status_text, None) diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 1acb30a8..a738e66e 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -27,6 +27,7 @@ from typing import Optional import yaml as _yaml from solaar.i18n import _ +from typing_extensions import Protocol from . import exceptions from . import hidpp10_constants as _hidpp10_constants @@ -53,6 +54,30 @@ logger = logging.getLogger(__name__) KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KIND} +class Device(Protocol): + def feature_request(self, feature: FEATURE) -> Any: + ... + + def request(self) -> Any: + ... + + @property + def features(self) -> Any: + ... + + @property + def _gestures(self) -> Any: + ... + + @property + def _backlight(self) -> Any: + ... + + @property + def _profiles(self) -> Any: + ... + + class FeaturesArray(dict): def __init__(self, device): assert device is not None @@ -159,7 +184,7 @@ class ReprogrammableKey: - flags {List[str]} -- capabilities and desired software handling of the control """ - def __init__(self, device, index, cid, tid, flags): + def __init__(self, device: Device, index, cid, tid, flags): self._device = device self.index = index self._cid = cid @@ -201,7 +226,7 @@ class ReprogrammableKeyV4(ReprogrammableKey): - mapping_flags {List[str]} -- mapping flags set on the control """ - def __init__(self, device, index, cid, tid, flags, pos, group, gmask): + def __init__(self, device: Device, index, cid, tid, flags, pos, group, gmask): ReprogrammableKey.__init__(self, device, index, cid, tid, flags) self.pos = pos self.group = group @@ -460,7 +485,7 @@ class KeysArray: class KeysArrayV2(KeysArray): - def __init__(self, device, count, version=1): + def __init__(self, device: Device, count, version=1): super().__init__(device, count, version) """The mapping from Control IDs to their native Task IDs. For example, Control "Left Button" is mapped to Task "Left Click". @@ -1417,7 +1442,7 @@ class Hidpp20: offset = offset + 2 return (unitId.hex().upper(), modelId.hex().upper(), tid_map) - def get_kind(self, device): + def get_kind(self, device: Device): """Reads a device's type. :see DEVICE_KIND: @@ -1431,7 +1456,7 @@ class Hidpp20: # logger.debug("device %d type %d = %s", devnumber, kind, DEVICE_KIND[kind]) return KIND_MAP[DEVICE_KIND[kind]] - def get_name(self, device): + def get_name(self, device: Device): """Reads a device's name. :returns: a string with the device name, or ``None`` if the device is not @@ -1452,7 +1477,7 @@ class Hidpp20: return name.decode("utf-8") - def get_friendly_name(self, device): + def get_friendly_name(self, device: Device): """Reads a device's friendly name. :returns: a string with the device name, or ``None`` if the device is not @@ -1473,22 +1498,22 @@ class Hidpp20: return name.decode("utf-8") - def get_battery_status(self, device): + def get_battery_status(self, device: Device): report = device.feature_request(FEATURE.BATTERY_STATUS) if report: return decipher_battery_status(report) - def get_battery_unified(self, device): + def get_battery_unified(self, device: Device): report = device.feature_request(FEATURE.UNIFIED_BATTERY, 0x10) if report is not None: return decipher_battery_unified(report) - def get_battery_voltage(self, device): + def get_battery_voltage(self, device: Device): report = device.feature_request(FEATURE.BATTERY_VOLTAGE) if report is not None: return decipher_battery_voltage(report) - def get_adc_measurement(self, device): + def get_adc_measurement(self, device: Device): try: # this feature call produces an error for headsets that are connected but inactive report = device.feature_request(FEATURE.ADC_MEASUREMENT) if report is not None: @@ -1513,7 +1538,7 @@ class Hidpp20: return result return 0 - def get_keys(self, device): + def get_keys(self, device: Device): # TODO: add here additional variants for other REPROG_CONTROLS count = None if FEATURE.REPROG_CONTROLS_V2 in device.features: @@ -1524,30 +1549,30 @@ class Hidpp20: return KeysArrayV4(device, ord(count[:1])) return None - def get_remap_keys(self, device): + def get_remap_keys(self, device: Device): count = device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10) if count: return KeysArrayPersistent(device, ord(count[:1])) - def get_gestures(self, device): + def get_gestures(self, device: Device): if getattr(device, "_gestures", None) is not None: return device._gestures if FEATURE.GESTURE_2 in device.features: return Gestures(device) - def get_backlight(self, device): + def get_backlight(self, device: Device): if getattr(device, "_backlight", None) is not None: return device._backlight if FEATURE.BACKLIGHT2 in device.features: return Backlight(device) - def get_profiles(self, device): + def get_profiles(self, device: Device): if getattr(device, "_profiles", None) is not None: return device._profiles if FEATURE.ONBOARD_PROFILES in device.features: return OnboardProfiles.from_device(device) - def get_mouse_pointer_info(self, device): + def get_mouse_pointer_info(self, device: Device): pointer_info = device.feature_request(FEATURE.MOUSE_POINTER) if pointer_info: dpi, flags = _unpack("!HB", pointer_info[:3]) @@ -1561,7 +1586,7 @@ class Hidpp20: "suggest_vertical_orientation": suggest_vertical_orientation, } - def get_vertical_scrolling_info(self, device): + def get_vertical_scrolling_info(self, device: Device): vertical_scrolling_info = device.feature_request(FEATURE.VERTICAL_SCROLLING) if vertical_scrolling_info: roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3]) @@ -1577,13 +1602,13 @@ class Hidpp20: )[roller] return {"roller": roller_type, "ratchet": ratchet, "lines": lines} - def get_hi_res_scrolling_info(self, device): + def get_hi_res_scrolling_info(self, device: Device): hi_res_scrolling_info = device.feature_request(FEATURE.HI_RES_SCROLLING) if hi_res_scrolling_info: mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2]) return mode, resolution - def get_pointer_speed_info(self, device): + def get_pointer_speed_info(self, device: Device): pointer_speed_info = device.feature_request(FEATURE.POINTER_SPEED) if pointer_speed_info: pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2]) @@ -1591,14 +1616,14 @@ class Hidpp20: # pointer_speed_lo = pointer_speed_lo return pointer_speed_hi + pointer_speed_lo / 256 - def get_lowres_wheel_status(self, device): + def get_lowres_wheel_status(self, device: Device): lowres_wheel_status = device.feature_request(FEATURE.LOWRES_WHEEL) if lowres_wheel_status: wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0] wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01] return wheel_reporting - def get_hires_wheel(self, device): + def get_hires_wheel(self, device: Device): caps = device.feature_request(FEATURE.HIRES_WHEEL, 0x00) mode = device.feature_request(FEATURE.HIRES_WHEEL, 0x10) ratchet = device.feature_request(FEATURE.HIRES_WHEEL, 0x030) @@ -1624,7 +1649,7 @@ class Hidpp20: return multi, has_invert, has_ratchet, inv, res, target, ratchet - def get_new_fn_inversion(self, device): + def get_new_fn_inversion(self, device: Device): state = device.feature_request(FEATURE.NEW_FN_INVERSION, 0x00) if state: inverted, default_inverted = _unpack("!BB", state[:2]) @@ -1632,7 +1657,7 @@ class Hidpp20: default_inverted = (default_inverted & 0x01) != 0 return inverted, default_inverted - def get_host_names(self, device): + def get_host_names(self, device: Device): state = device.feature_request(FEATURE.HOSTS_INFO, 0x00) host_names = {} if state: @@ -1658,7 +1683,7 @@ class Hidpp20: host_names[currentHost] = (host_names[currentHost][0], hostname) return host_names - def set_host_name(self, device, name, currentName=""): + def set_host_name(self, device: Device, name, currentName=""): name = bytearray(name, "utf-8") currentName = bytearray(currentName, "utf-8") if logger.isEnabledFor(logging.INFO): @@ -1680,18 +1705,18 @@ class Hidpp20: chunk += 14 return True - def get_onboard_mode(self, device): + def get_onboard_mode(self, device: Device): state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x20) if state: mode = _unpack("!B", state[:1])[0] return mode - def set_onboard_mode(self, device, mode): + def set_onboard_mode(self, device: Device, mode): state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x10, mode) return state - def get_polling_rate(self, device): + def get_polling_rate(self, device: Device): state = device.feature_request(FEATURE.REPORT_RATE, 0x10) if state: rate = _unpack("!B", state[:1])[0] @@ -1703,14 +1728,14 @@ class Hidpp20: rate = _unpack("!B", state[:1])[0] return rates[rate] - def get_remaining_pairing(self, device): + def get_remaining_pairing(self, device: Device): result = device.feature_request(FEATURE.REMAINING_PAIRING, 0x0) if result: result = _unpack("!B", result[:1])[0] FEATURE._fallback = lambda x: f"unknown:{x:04X}" return result - def config_change(self, device, configuration, no_reply=False): + def config_change(self, device: Device, configuration, no_reply=False): return device.feature_request(FEATURE.CONFIG_CHANGE, 0x10, configuration, no_reply=no_reply) diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 6a983ce0..4a9b29f5 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -37,8 +37,6 @@ from .common import strhex as _strhex logger = logging.getLogger(__name__) -_hidpp10 = hidpp10.Hidpp10() -_hidpp20 = hidpp20.Hidpp20() _R = _hidpp10_constants.REGISTERS _F = _hidpp20_constants.FEATURE diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index 859fccac..c60474bd 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -20,6 +20,7 @@ import logging import time from dataclasses import dataclass +from typing import Callable from typing import Optional import hidapi as _hid @@ -62,6 +63,8 @@ class Receiver: The paired devices are available through the sequence interface. """ + read_register: Callable = hidpp10.read_register + write_register: Callable = hidpp10.write_register number = 0xFF kind = None @@ -255,9 +258,6 @@ class Receiver: def reset_pairing(self): self.pairing = Pairing() - read_register = hidpp10.read_register - write_register = hidpp10.write_register - def __iter__(self): connected_devices = self.count() found_devices = 0