Introduce Device protocol and type hints

This commit is contained in:
Matthias Hagmann 2024-04-12 02:58:15 +02:00 committed by Peter F. Patel-Schneider
parent 675cd6ee34
commit 469c04faaf
5 changed files with 146 additions and 87 deletions

View File

@ -20,6 +20,7 @@ import logging
import threading as _threading
import time
from typing import Callable
from typing import Optional
import hidapi as _hid
@ -66,8 +67,8 @@ class DeviceFactory:
class Device:
instances = []
read_register = hidpp10.read_register
write_register = hidpp10.write_register
read_register: Callable = hidpp10.read_register
write_register: Callable = hidpp10.write_register
def __init__(self, receiver, number, online, pairing_info=None, handle=None, device_info=None, setting_callback=None):
assert receiver or device_info

View File

@ -13,10 +13,17 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from __future__ import annotations
import logging
from .common import Battery as _Battery
from typing import Any
from typing_extensions import Protocol
from .common import Battery
from .common import BatteryChargeApproximation
from .common import BatteryStatus
from .common import FirmwareInfo as _FirmwareInfo
from .common import bytes2int as _bytes2int
from .common import int2bytes as _int2bytes
@ -26,19 +33,36 @@ from .hidpp20_constants import FIRMWARE_KIND
logger = logging.getLogger(__name__)
#
# functions
#
class Device(Protocol):
def request(self, request_id, *params):
...
@property
def kind(self) -> Any:
...
@property
def online(self) -> bool:
...
@property
def protocol(self) -> Any:
...
@property
def registers(self) -> list:
...
def read_register(device, register_number, *params):
def read_register(device: Device, register_number, *params):
assert device is not None, f"tried to read register {register_number:02X} from invalid device {device}"
# support long registers by adding a 2 in front of the register number
request_id = 0x8100 | (int(register_number) & 0x2FF)
return device.request(request_id, *params)
def write_register(device, register_number, *value):
def write_register(device: Device, register_number, *value):
assert device is not None, f"tried to write register {register_number:02X} to invalid device {device}"
# support long registers by adding a 2 in front of the register number
request_id = 0x8000 | (int(register_number) & 0x2FF)
@ -59,7 +83,7 @@ def set_configuration_pending_flags(receiver, devices):
class Hidpp10:
def get_battery(self, device):
def get_battery(self, device: Device):
assert device is not None
assert device.kind is not None
if not device.online:
@ -89,7 +113,7 @@ class Hidpp10:
device.registers.append(REGISTERS.battery_status)
return parse_battery_status(REGISTERS.battery_status, reply)
def get_firmware(self, device):
def get_firmware(self, device: Device):
assert device is not None
firmware = [None, None, None]
@ -124,7 +148,7 @@ class Hidpp10:
if any(firmware):
return tuple(f for f in firmware if f)
def set_3leds(self, device, battery_level=None, charging=None, warning=None):
def set_3leds(self, device: Device, battery_level=None, charging=None, warning=None):
assert device is not None
assert device.kind is not None
if not device.online:
@ -134,17 +158,17 @@ class Hidpp10:
return
if battery_level is not None:
if battery_level < _Battery.APPROX.critical:
if battery_level < BatteryChargeApproximation.CRITICAL:
# 1 orange, and force blink
v1, v2 = 0x22, 0x00
warning = True
elif battery_level < _Battery.APPROX.low:
elif battery_level < BatteryChargeApproximation.LOW:
# 1 orange
v1, v2 = 0x22, 0x00
elif battery_level < _Battery.APPROX.good:
elif battery_level < BatteryChargeApproximation.GOOD:
# 1 green
v1, v2 = 0x20, 0x00
elif battery_level < _Battery.APPROX.full:
elif battery_level < BatteryChargeApproximation.FULL:
# 2 greens
v1, v2 = 0x20, 0x02
else:
@ -166,10 +190,10 @@ class Hidpp10:
write_register(device, REGISTERS.three_leds, v1, v2)
def get_notification_flags(self, device):
def get_notification_flags(self, device: Device):
return self._get_register(device, REGISTERS.notifications)
def set_notification_flags(self, device, *flag_bits):
def set_notification_flags(self, device: Device, *flag_bits):
assert device is not None
# Avoid a call if the device is not online,
@ -184,10 +208,10 @@ class Hidpp10:
result = write_register(device, REGISTERS.notifications, _int2bytes(flag_bits, 3))
return result is not None
def get_device_features(self, device):
def get_device_features(self, device: Device):
return self._get_register(device, REGISTERS.mouse_button_flags)
def _get_register(self, device, register):
def _get_register(self, device: Device, register):
assert device is not None
# Avoid a call if the device is not online,
@ -203,50 +227,61 @@ class Hidpp10:
return _bytes2int(flags)
def parse_battery_status(register, reply):
def parse_battery_status(register, reply) -> Battery | None:
def status_byte_to_charge(status_byte_: int) -> BatteryChargeApproximation:
if status_byte_ == 7:
charge_ = BatteryChargeApproximation.FULL
elif status_byte_ == 5:
charge_ = BatteryChargeApproximation.GOOD
elif status_byte_ == 3:
charge_ = BatteryChargeApproximation.LOW
elif status_byte_ == 1:
charge_ = BatteryChargeApproximation.CRITICAL
else:
# pure 'charging' notifications may come without a status
charge_ = BatteryChargeApproximation.EMPTY
return charge_
def status_byte_to_battery_status(status_byte_: int) -> BatteryStatus:
if status_byte_ == 0x30:
status_text_ = BatteryStatus.DISCHARGING
elif status_byte_ == 0x50:
status_text_ = BatteryStatus.RECHARGING
elif status_byte_ == 0x90:
status_text_ = BatteryStatus.FULL
else:
status_text_ = None
return status_text_
def charging_byte_to_status_text(charging_byte_: int) -> BatteryStatus:
if charging_byte_ == 0x00:
status_text_ = BatteryStatus.DISCHARGING
elif charging_byte_ & 0x21 == 0x21:
status_text_ = BatteryStatus.RECHARGING
elif charging_byte_ & 0x22 == 0x22:
status_text_ = BatteryStatus.FULL
else:
logger.warning("could not parse 0x07 battery status: %02X (level %02X)", charging_byte_, status_byte)
status_text_ = None
return status_text_
if register == REGISTERS.battery_charge:
charge = ord(reply[:1])
status_byte = ord(reply[2:3]) & 0xF0
status_text = (
_Battery.STATUS.discharging
if status_byte == 0x30
else _Battery.STATUS.recharging
if status_byte == 0x50
else _Battery.STATUS.full
if status_byte == 0x90
else None
)
return _Battery(charge, None, status_text, None)
battery_status = status_byte_to_battery_status(status_byte)
return Battery(charge, None, battery_status, None)
if register == REGISTERS.battery_status:
status_byte = ord(reply[:1])
charge = (
_Battery.APPROX.full
if status_byte == 7 # full
else _Battery.APPROX.good
if status_byte == 5 # good
else _Battery.APPROX.low
if status_byte == 3 # low
else _Battery.APPROX.critical
if status_byte == 1 # critical
# pure 'charging' notifications may come without a status
else _Battery.APPROX.empty
)
charging_byte = ord(reply[1:2])
if charging_byte == 0x00:
status_text = _Battery.STATUS.discharging
elif charging_byte & 0x21 == 0x21:
status_text = _Battery.STATUS.recharging
elif charging_byte & 0x22 == 0x22:
status_text = _Battery.STATUS.full
else:
logger.warning("could not parse 0x07 battery status: %02X (level %02X)", charging_byte, status_byte)
status_text = None
status_text = charging_byte_to_status_text(charging_byte)
charge = status_byte_to_charge(status_byte)
if charging_byte & 0x03 and status_byte == 0:
# some 'charging' notifications may come with no battery level information
charge = None
# Return None for next charge level and voltage as these are not in HID++ 1.0 spec
return _Battery(charge, None, status_text, None)
return Battery(charge, None, status_text, None)

View File

@ -27,6 +27,7 @@ from typing import Optional
import yaml as _yaml
from solaar.i18n import _
from typing_extensions import Protocol
from . import exceptions
from . import hidpp10_constants as _hidpp10_constants
@ -53,6 +54,30 @@ logger = logging.getLogger(__name__)
KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KIND}
class Device(Protocol):
def feature_request(self, feature: FEATURE) -> Any:
...
def request(self) -> Any:
...
@property
def features(self) -> Any:
...
@property
def _gestures(self) -> Any:
...
@property
def _backlight(self) -> Any:
...
@property
def _profiles(self) -> Any:
...
class FeaturesArray(dict):
def __init__(self, device):
assert device is not None
@ -159,7 +184,7 @@ class ReprogrammableKey:
- flags {List[str]} -- capabilities and desired software handling of the control
"""
def __init__(self, device, index, cid, tid, flags):
def __init__(self, device: Device, index, cid, tid, flags):
self._device = device
self.index = index
self._cid = cid
@ -201,7 +226,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
- mapping_flags {List[str]} -- mapping flags set on the control
"""
def __init__(self, device, index, cid, tid, flags, pos, group, gmask):
def __init__(self, device: Device, index, cid, tid, flags, pos, group, gmask):
ReprogrammableKey.__init__(self, device, index, cid, tid, flags)
self.pos = pos
self.group = group
@ -460,7 +485,7 @@ class KeysArray:
class KeysArrayV2(KeysArray):
def __init__(self, device, count, version=1):
def __init__(self, device: Device, count, version=1):
super().__init__(device, count, version)
"""The mapping from Control IDs to their native Task IDs.
For example, Control "Left Button" is mapped to Task "Left Click".
@ -1417,7 +1442,7 @@ class Hidpp20:
offset = offset + 2
return (unitId.hex().upper(), modelId.hex().upper(), tid_map)
def get_kind(self, device):
def get_kind(self, device: Device):
"""Reads a device's type.
:see DEVICE_KIND:
@ -1431,7 +1456,7 @@ class Hidpp20:
# logger.debug("device %d type %d = %s", devnumber, kind, DEVICE_KIND[kind])
return KIND_MAP[DEVICE_KIND[kind]]
def get_name(self, device):
def get_name(self, device: Device):
"""Reads a device's name.
:returns: a string with the device name, or ``None`` if the device is not
@ -1452,7 +1477,7 @@ class Hidpp20:
return name.decode("utf-8")
def get_friendly_name(self, device):
def get_friendly_name(self, device: Device):
"""Reads a device's friendly name.
:returns: a string with the device name, or ``None`` if the device is not
@ -1473,22 +1498,22 @@ class Hidpp20:
return name.decode("utf-8")
def get_battery_status(self, device):
def get_battery_status(self, device: Device):
report = device.feature_request(FEATURE.BATTERY_STATUS)
if report:
return decipher_battery_status(report)
def get_battery_unified(self, device):
def get_battery_unified(self, device: Device):
report = device.feature_request(FEATURE.UNIFIED_BATTERY, 0x10)
if report is not None:
return decipher_battery_unified(report)
def get_battery_voltage(self, device):
def get_battery_voltage(self, device: Device):
report = device.feature_request(FEATURE.BATTERY_VOLTAGE)
if report is not None:
return decipher_battery_voltage(report)
def get_adc_measurement(self, device):
def get_adc_measurement(self, device: Device):
try: # this feature call produces an error for headsets that are connected but inactive
report = device.feature_request(FEATURE.ADC_MEASUREMENT)
if report is not None:
@ -1513,7 +1538,7 @@ class Hidpp20:
return result
return 0
def get_keys(self, device):
def get_keys(self, device: Device):
# TODO: add here additional variants for other REPROG_CONTROLS
count = None
if FEATURE.REPROG_CONTROLS_V2 in device.features:
@ -1524,30 +1549,30 @@ class Hidpp20:
return KeysArrayV4(device, ord(count[:1]))
return None
def get_remap_keys(self, device):
def get_remap_keys(self, device: Device):
count = device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10)
if count:
return KeysArrayPersistent(device, ord(count[:1]))
def get_gestures(self, device):
def get_gestures(self, device: Device):
if getattr(device, "_gestures", None) is not None:
return device._gestures
if FEATURE.GESTURE_2 in device.features:
return Gestures(device)
def get_backlight(self, device):
def get_backlight(self, device: Device):
if getattr(device, "_backlight", None) is not None:
return device._backlight
if FEATURE.BACKLIGHT2 in device.features:
return Backlight(device)
def get_profiles(self, device):
def get_profiles(self, device: Device):
if getattr(device, "_profiles", None) is not None:
return device._profiles
if FEATURE.ONBOARD_PROFILES in device.features:
return OnboardProfiles.from_device(device)
def get_mouse_pointer_info(self, device):
def get_mouse_pointer_info(self, device: Device):
pointer_info = device.feature_request(FEATURE.MOUSE_POINTER)
if pointer_info:
dpi, flags = _unpack("!HB", pointer_info[:3])
@ -1561,7 +1586,7 @@ class Hidpp20:
"suggest_vertical_orientation": suggest_vertical_orientation,
}
def get_vertical_scrolling_info(self, device):
def get_vertical_scrolling_info(self, device: Device):
vertical_scrolling_info = device.feature_request(FEATURE.VERTICAL_SCROLLING)
if vertical_scrolling_info:
roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3])
@ -1577,13 +1602,13 @@ class Hidpp20:
)[roller]
return {"roller": roller_type, "ratchet": ratchet, "lines": lines}
def get_hi_res_scrolling_info(self, device):
def get_hi_res_scrolling_info(self, device: Device):
hi_res_scrolling_info = device.feature_request(FEATURE.HI_RES_SCROLLING)
if hi_res_scrolling_info:
mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2])
return mode, resolution
def get_pointer_speed_info(self, device):
def get_pointer_speed_info(self, device: Device):
pointer_speed_info = device.feature_request(FEATURE.POINTER_SPEED)
if pointer_speed_info:
pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2])
@ -1591,14 +1616,14 @@ class Hidpp20:
# pointer_speed_lo = pointer_speed_lo
return pointer_speed_hi + pointer_speed_lo / 256
def get_lowres_wheel_status(self, device):
def get_lowres_wheel_status(self, device: Device):
lowres_wheel_status = device.feature_request(FEATURE.LOWRES_WHEEL)
if lowres_wheel_status:
wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0]
wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01]
return wheel_reporting
def get_hires_wheel(self, device):
def get_hires_wheel(self, device: Device):
caps = device.feature_request(FEATURE.HIRES_WHEEL, 0x00)
mode = device.feature_request(FEATURE.HIRES_WHEEL, 0x10)
ratchet = device.feature_request(FEATURE.HIRES_WHEEL, 0x030)
@ -1624,7 +1649,7 @@ class Hidpp20:
return multi, has_invert, has_ratchet, inv, res, target, ratchet
def get_new_fn_inversion(self, device):
def get_new_fn_inversion(self, device: Device):
state = device.feature_request(FEATURE.NEW_FN_INVERSION, 0x00)
if state:
inverted, default_inverted = _unpack("!BB", state[:2])
@ -1632,7 +1657,7 @@ class Hidpp20:
default_inverted = (default_inverted & 0x01) != 0
return inverted, default_inverted
def get_host_names(self, device):
def get_host_names(self, device: Device):
state = device.feature_request(FEATURE.HOSTS_INFO, 0x00)
host_names = {}
if state:
@ -1658,7 +1683,7 @@ class Hidpp20:
host_names[currentHost] = (host_names[currentHost][0], hostname)
return host_names
def set_host_name(self, device, name, currentName=""):
def set_host_name(self, device: Device, name, currentName=""):
name = bytearray(name, "utf-8")
currentName = bytearray(currentName, "utf-8")
if logger.isEnabledFor(logging.INFO):
@ -1680,18 +1705,18 @@ class Hidpp20:
chunk += 14
return True
def get_onboard_mode(self, device):
def get_onboard_mode(self, device: Device):
state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x20)
if state:
mode = _unpack("!B", state[:1])[0]
return mode
def set_onboard_mode(self, device, mode):
def set_onboard_mode(self, device: Device, mode):
state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x10, mode)
return state
def get_polling_rate(self, device):
def get_polling_rate(self, device: Device):
state = device.feature_request(FEATURE.REPORT_RATE, 0x10)
if state:
rate = _unpack("!B", state[:1])[0]
@ -1703,14 +1728,14 @@ class Hidpp20:
rate = _unpack("!B", state[:1])[0]
return rates[rate]
def get_remaining_pairing(self, device):
def get_remaining_pairing(self, device: Device):
result = device.feature_request(FEATURE.REMAINING_PAIRING, 0x0)
if result:
result = _unpack("!B", result[:1])[0]
FEATURE._fallback = lambda x: f"unknown:{x:04X}"
return result
def config_change(self, device, configuration, no_reply=False):
def config_change(self, device: Device, configuration, no_reply=False):
return device.feature_request(FEATURE.CONFIG_CHANGE, 0x10, configuration, no_reply=no_reply)

View File

@ -37,8 +37,6 @@ from .common import strhex as _strhex
logger = logging.getLogger(__name__)
_hidpp10 = hidpp10.Hidpp10()
_hidpp20 = hidpp20.Hidpp20()
_R = _hidpp10_constants.REGISTERS
_F = _hidpp20_constants.FEATURE

View File

@ -20,6 +20,7 @@ import logging
import time
from dataclasses import dataclass
from typing import Callable
from typing import Optional
import hidapi as _hid
@ -62,6 +63,8 @@ class Receiver:
The paired devices are available through the sequence interface.
"""
read_register: Callable = hidpp10.read_register
write_register: Callable = hidpp10.write_register
number = 0xFF
kind = None
@ -255,9 +258,6 @@ class Receiver:
def reset_pairing(self):
self.pairing = Pairing()
read_register = hidpp10.read_register
write_register = hidpp10.write_register
def __iter__(self):
connected_devices = self.count()
found_devices = 0