From 47ba1402ed5428ffc14edfc3767a3921d8972316 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Fri, 22 Mar 2024 13:25:36 -0400 Subject: [PATCH] device: use feature_request from the device everywhere in hidpp20 --- lib/logitech_receiver/hidpp20.py | 112 +++++++++++++++---------------- 1 file changed, 56 insertions(+), 56 deletions(-) diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 15c1c475..384a91d1 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -269,7 +269,7 @@ class ReprogrammableKeyV4(ReprogrammableKey): def _getCidReporting(self): try: - mapped_data = feature_request(self._device, FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(_pack("!H", self._cid))) + mapped_data = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(_pack("!H", self._cid))) if mapped_data: cid, mapping_flags_1, mapped_to = _unpack("!HBH", mapped_data[:5]) if cid != self._cid and logger.isEnabledFor(logging.WARNING): @@ -343,7 +343,7 @@ class ReprogrammableKeyV4(ReprogrammableKey): pkt = tuple(_pack("!HBH", self._cid, bfield & 0xFF, remap)) # TODO: to fully support version 4 of REPROG_CONTROLS_V4, append `(bfield >> 8) & 0xff` here. # But older devices might behave oddly given that byte, so we don't send it. - ret = feature_request(self._device, FEATURE.REPROG_CONTROLS_V4, 0x30, *pkt) + ret = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x30, *pkt) if ret is None or _unpack("!BBBBB", ret[:5]) != pkt and logger.isEnabledFor(logging.DEBUG): logger.debug(f"REPROG_CONTROLS_v4 setCidReporting on device {self._device} didn't echo request packet.") @@ -402,13 +402,13 @@ class PersistentRemappableAction: def remap(self, data_bytes): cid = _int2bytes(self._cid, 2) if _bytes2int(data_bytes) == special_keys.KEYS_Default: # map back to default - feature_request(self._device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x50, cid, 0xFF) + self._device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x50, cid, 0xFF) self._device.remap_keys._query_key(self.index) return self._device.remap_keys.keys[self.index].data_bytes else: self._actionId, self._code, self._modifierMask = _unpack("!BHB", data_bytes) self.cidStatus = 0x01 - feature_request(self._device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x40, cid, 0xFF, data_bytes) + self._device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x40, cid, 0xFF, data_bytes) return True @@ -436,13 +436,13 @@ class KeysArray: # TODO: add here additional variants for other REPROG_CONTROLS if self.keyversion == FEATURE.REPROG_CONTROLS_V2: - keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V2, 0x10, index) + keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS_V2, 0x10, index) if keydata: cid, tid, flags = _unpack("!HHB", keydata[:5]) self.keys[index] = ReprogrammableKey(self.device, index, cid, tid, flags) self.cid_to_tid[cid] = tid elif self.keyversion == FEATURE.REPROG_CONTROLS_V4: - keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V4, 0x10, index) + keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x10, index) if keydata: cid, tid, flags1, pos, group, gmask, flags2 = _unpack("!HHBBBBB", keydata[:9]) flags = flags1 | (flags2 << 8) @@ -507,7 +507,7 @@ class KeysArrayV1(KeysArray): def _query_key(self, index: int): if index < 0 or index >= len(self.keys): raise IndexError(index) - keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS, 0x10, index) + keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS, 0x10, index) if keydata: cid, tid, flags = _unpack("!HHB", keydata[:5]) self.keys[index] = ReprogrammableKey(self.device, index, cid, tid, flags) @@ -523,7 +523,7 @@ class KeysArrayV4(KeysArrayV1): def _query_key(self, index: int): if index < 0 or index >= len(self.keys): raise IndexError(index) - keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V4, 0x10, index) + keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x10, index) if keydata: cid, tid, flags1, pos, group, gmask, flags2 = _unpack("!HHBBBBB", keydata[:9]) flags = flags1 | (flags2 << 8) @@ -552,12 +552,12 @@ class KeysArrayPersistent(KeysArray): def _query_key(self, index: int): if index < 0 or index >= len(self.keys): raise IndexError(index) - keydata = feature_request(self.device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF) + keydata = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF) if keydata: key = _unpack("!H", keydata[:2])[0] try: - mapped_data = feature_request( - self.device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key & 0xFF00, key & 0xFF, 0xFF + mapped_data = self.device.feature_request( + FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key & 0xFF00, key & 0xFF, 0xFF ) if mapped_data: _ignore, _ignore, actionId, remapped, modifiers, status = _unpack("!HBBHBB", mapped_data[:8]) @@ -689,7 +689,7 @@ class Gesture: def enabled(self): # is the gesture enabled? if self._enabled is None and self.index is not None: offset, mask = self.enable_offset_mask() - result = feature_request(self._device, FEATURE.GESTURE_2, 0x10, offset, 0x01, mask) + result = self._device.feature_request(FEATURE.GESTURE_2, 0x10, offset, 0x01, mask) self._enabled = bool(result[0] & mask) if result else None return self._enabled @@ -698,13 +698,13 @@ class Gesture: return None if self.index is not None: offset, mask = self.enable_offset_mask() - reply = feature_request(self._device, FEATURE.GESTURE_2, 0x20, offset, 0x01, mask, mask if enable else 0x00) + reply = self._device.feature_request(FEATURE.GESTURE_2, 0x20, offset, 0x01, mask, mask if enable else 0x00) return reply def diverted(self): # is the gesture diverted? if self._diverted is None and self.diversion_index is not None: offset, mask = self.diversion_offset_mask() - result = feature_request(self._device, FEATURE.GESTURE_2, 0x30, offset, 0x01, mask) + result = self._device.feature_request(FEATURE.GESTURE_2, 0x30, offset, 0x01, mask) self._diverted = bool(result[0] & mask) if result else None return self._diverted @@ -713,7 +713,7 @@ class Gesture: return None if self.diversion_index is not None: offset, mask = self.diversion_offset_mask() - reply = feature_request(self._device, FEATURE.GESTURE_2, 0x40, offset, 0x01, mask, mask if diverted else 0x00) + reply = self._device.feature_request(FEATURE.GESTURE_2, 0x40, offset, 0x01, mask, mask if diverted else 0x00) return reply def as_int(self): @@ -753,7 +753,7 @@ class Param: return self._value if self._value is not None else self.read() def read(self): # returns the bytes for the parameter - result = feature_request(self._device, FEATURE.GESTURE_2, 0x70, self.index, 0xFF) + result = self._device.feature_request(FEATURE.GESTURE_2, 0x70, self.index, 0xFF) if result: self._value = _bytes2int(result[: self.size]) return self._value @@ -765,14 +765,14 @@ class Param: return self._default_value def _read_default(self): - result = feature_request(self._device, FEATURE.GESTURE_2, 0x60, self.index, 0xFF) + result = self._device.feature_request(FEATURE.GESTURE_2, 0x60, self.index, 0xFF) if result: self._default_value = _bytes2int(result[: self.size]) return self._default_value def write(self, bytes): self._value = bytes - return feature_request(self._device, FEATURE.GESTURE_2, 0x80, self.index, bytes, 0xFF) + return self._device.feature_request(FEATURE.GESTURE_2, 0x80, self.index, bytes, 0xFF) def __str__(self): return str(self.param) @@ -797,7 +797,7 @@ class Spec: def read(self): try: - value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF) + value = self._device.feature_request(FEATURE.GESTURE_2, 0x50, self.id, 0xFF) except exceptions.FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus) if logger.isEnabledFor(logging.WARNING): logger.warning( @@ -826,7 +826,7 @@ class Gestures: field_high = 0x00 while field_high != 0x01: # end of fields # retrieve the next eight fields - fields = feature_request(device, FEATURE.GESTURE_2, 0x00, index >> 8, index & 0xFF) + fields = device.feature_request(FEATURE.GESTURE_2, 0x00, index >> 8, index & 0xFF) if not fields: break for offset in range(8): @@ -1394,13 +1394,13 @@ class Hidpp20: :returns: a list of FirmwareInfo tuples, ordered by firmware layer. """ - count = feature_request(device, FEATURE.DEVICE_FW_VERSION) + count = device.feature_request(FEATURE.DEVICE_FW_VERSION) if count: count = ord(count[:1]) fw = [] for index in range(0, count): - fw_info = feature_request(device, FEATURE.DEVICE_FW_VERSION, 0x10, index) + fw_info = device.feature_request(FEATURE.DEVICE_FW_VERSION, 0x10, index) if fw_info: level = ord(fw_info[:1]) & 0x0F if level == 0 or level == 1: @@ -1422,7 +1422,7 @@ class Hidpp20: def get_ids(self, device): """Reads a device's ids (unit and model numbers)""" - ids = feature_request(device, FEATURE.DEVICE_FW_VERSION) + ids = device.feature_request(FEATURE.DEVICE_FW_VERSION) if ids: unitId = ids[1:5] modelId = ids[7:13] @@ -1442,7 +1442,7 @@ class Hidpp20: :returns: a string describing the device type, or ``None`` if the device is not available or does not support the ``DEVICE_NAME`` feature. """ - kind = feature_request(device, FEATURE.DEVICE_NAME, 0x20) + kind = device.feature_request(FEATURE.DEVICE_NAME, 0x20) if kind: kind = ord(kind[:1]) # if logger.isEnabledFor(logging.DEBUG): @@ -1455,13 +1455,13 @@ class Hidpp20: :returns: a string with the device name, or ``None`` if the device is not available or does not support the ``DEVICE_NAME`` feature. """ - name_length = feature_request(device, FEATURE.DEVICE_NAME) + name_length = device.feature_request(FEATURE.DEVICE_NAME) if name_length: name_length = ord(name_length[:1]) name = b"" while len(name) < name_length: - fragment = feature_request(device, FEATURE.DEVICE_NAME, 0x10, len(name)) + fragment = device.feature_request(FEATURE.DEVICE_NAME, 0x10, len(name)) if fragment: name += fragment[: name_length - len(name)] else: @@ -1476,13 +1476,13 @@ class Hidpp20: :returns: a string with the device name, or ``None`` if the device is not available or does not support the ``DEVICE_NAME`` feature. """ - name_length = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME) + name_length = device.feature_request(FEATURE.DEVICE_FRIENDLY_NAME) if name_length: name_length = ord(name_length[:1]) name = b"" while len(name) < name_length: - fragment = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME, 0x10, len(name)) + fragment = device.feature_request(FEATURE.DEVICE_FRIENDLY_NAME, 0x10, len(name)) if fragment: name += fragment[1 : name_length - len(name) + 1] else: @@ -1492,23 +1492,23 @@ class Hidpp20: return name.decode("utf-8") def get_battery_status(self, device): - report = feature_request(device, FEATURE.BATTERY_STATUS) + report = device.feature_request(FEATURE.BATTERY_STATUS) if report: return decipher_battery_status(report) def get_battery_unified(self, device): - report = feature_request(device, FEATURE.UNIFIED_BATTERY, 0x10) + report = device.feature_request(FEATURE.UNIFIED_BATTERY, 0x10) if report is not None: return decipher_battery_unified(report) def get_battery_voltage(self, device): - report = feature_request(device, FEATURE.BATTERY_VOLTAGE) + report = device.feature_request(FEATURE.BATTERY_VOLTAGE) if report is not None: return decipher_battery_voltage(report) def get_adc_measurement(self, device): try: # this feature call produces an error for headsets that are connected but inactive - report = feature_request(device, FEATURE.ADC_MEASUREMENT) + report = device.feature_request(FEATURE.ADC_MEASUREMENT) if report is not None: return decipher_adc_measurement(report) except exceptions.FeatureCallError: @@ -1535,15 +1535,15 @@ class Hidpp20: # TODO: add here additional variants for other REPROG_CONTROLS count = None if FEATURE.REPROG_CONTROLS_V2 in device.features: - count = feature_request(device, FEATURE.REPROG_CONTROLS_V2) + count = device.feature_request(FEATURE.REPROG_CONTROLS_V2) return KeysArrayV1(device, ord(count[:1])) elif FEATURE.REPROG_CONTROLS_V4 in device.features: - count = feature_request(device, FEATURE.REPROG_CONTROLS_V4) + count = device.feature_request(FEATURE.REPROG_CONTROLS_V4) return KeysArrayV4(device, ord(count[:1])) return None def get_remap_keys(self, device): - count = feature_request(device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10) + count = device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10) if count: return KeysArrayPersistent(device, ord(count[:1])) @@ -1566,7 +1566,7 @@ class Hidpp20: return OnboardProfiles.from_device(device) def get_mouse_pointer_info(self, device): - pointer_info = feature_request(device, FEATURE.MOUSE_POINTER) + pointer_info = device.feature_request(FEATURE.MOUSE_POINTER) if pointer_info: dpi, flags = _unpack("!HB", pointer_info[:3]) acceleration = ("none", "low", "med", "high")[flags & 0x3] @@ -1580,7 +1580,7 @@ class Hidpp20: } def get_vertical_scrolling_info(self, device): - vertical_scrolling_info = feature_request(device, FEATURE.VERTICAL_SCROLLING) + vertical_scrolling_info = device.feature_request(FEATURE.VERTICAL_SCROLLING) if vertical_scrolling_info: roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3]) roller_type = ( @@ -1596,13 +1596,13 @@ class Hidpp20: return {"roller": roller_type, "ratchet": ratchet, "lines": lines} def get_hi_res_scrolling_info(self, device): - hi_res_scrolling_info = feature_request(device, FEATURE.HI_RES_SCROLLING) + hi_res_scrolling_info = device.feature_request(FEATURE.HI_RES_SCROLLING) if hi_res_scrolling_info: mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2]) return mode, resolution def get_pointer_speed_info(self, device): - pointer_speed_info = feature_request(device, FEATURE.POINTER_SPEED) + pointer_speed_info = device.feature_request(FEATURE.POINTER_SPEED) if pointer_speed_info: pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2]) # if pointer_speed_lo > 0: @@ -1610,16 +1610,16 @@ class Hidpp20: return pointer_speed_hi + pointer_speed_lo / 256 def get_lowres_wheel_status(self, device): - lowres_wheel_status = feature_request(device, FEATURE.LOWRES_WHEEL) + lowres_wheel_status = device.feature_request(FEATURE.LOWRES_WHEEL) if lowres_wheel_status: wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0] wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01] return wheel_reporting def get_hires_wheel(self, device): - caps = feature_request(device, FEATURE.HIRES_WHEEL, 0x00) - mode = feature_request(device, FEATURE.HIRES_WHEEL, 0x10) - ratchet = feature_request(device, FEATURE.HIRES_WHEEL, 0x030) + caps = device.feature_request(FEATURE.HIRES_WHEEL, 0x00) + mode = device.feature_request(FEATURE.HIRES_WHEEL, 0x10) + ratchet = device.feature_request(FEATURE.HIRES_WHEEL, 0x030) if caps and mode and ratchet: # Parse caps @@ -1643,7 +1643,7 @@ class Hidpp20: return multi, has_invert, has_ratchet, inv, res, target, ratchet def get_new_fn_inversion(self, device): - state = feature_request(device, FEATURE.NEW_FN_INVERSION, 0x00) + state = device.feature_request(FEATURE.NEW_FN_INVERSION, 0x00) if state: inverted, default_inverted = _unpack("!BB", state[:2]) inverted = (inverted & 0x01) != 0 @@ -1651,18 +1651,18 @@ class Hidpp20: return inverted, default_inverted def get_host_names(self, device): - state = feature_request(device, FEATURE.HOSTS_INFO, 0x00) + state = device.feature_request(FEATURE.HOSTS_INFO, 0x00) host_names = {} if state: capability_flags, _ignore, numHosts, currentHost = _unpack("!BBBB", state[:4]) if capability_flags & 0x01: # device can get host names for host in range(0, numHosts): - hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, host) + hostinfo = device.feature_request(FEATURE.HOSTS_INFO, 0x10, host) _ignore, status, _ignore, _ignore, nameLen, _ignore = _unpack("!BBBBBB", hostinfo[:6]) name = "" remaining = nameLen while remaining > 0: - name_piece = feature_request(device, FEATURE.HOSTS_INFO, 0x30, host, nameLen - remaining) + name_piece = device.feature_request(FEATURE.HOSTS_INFO, 0x30, host, nameLen - remaining) if name_piece: name += name_piece[2 : 2 + min(remaining, 14)].decode() remaining = max(0, remaining - 14) @@ -1681,55 +1681,55 @@ class Hidpp20: currentName = bytearray(currentName, "utf-8") if logger.isEnabledFor(logging.INFO): logger.info("Setting host name to %s", name) - state = feature_request(device, FEATURE.HOSTS_INFO, 0x00) + state = device.feature_request(FEATURE.HOSTS_INFO, 0x00) if state: flags, _ignore, _ignore, currentHost = _unpack("!BBBB", state[:4]) if flags & 0x02: - hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, currentHost) + hostinfo = device.feature_request(FEATURE.HOSTS_INFO, 0x10, currentHost) _ignore, _ignore, _ignore, _ignore, _ignore, maxNameLen = _unpack("!BBBBBB", hostinfo[:6]) if name[:maxNameLen] == currentName[:maxNameLen] and False: return True length = min(maxNameLen, len(name)) chunk = 0 while chunk < length: - response = feature_request(device, FEATURE.HOSTS_INFO, 0x40, currentHost, chunk, name[chunk : chunk + 14]) + response = device.feature_request(FEATURE.HOSTS_INFO, 0x40, currentHost, chunk, name[chunk : chunk + 14]) if not response: return False chunk += 14 return True def get_onboard_mode(self, device): - state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x20) + state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x20) if state: mode = _unpack("!B", state[:1])[0] return mode def set_onboard_mode(self, device, mode): - state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x10, mode) + state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x10, mode) return state def get_polling_rate(self, device): - state = feature_request(device, FEATURE.REPORT_RATE, 0x10) + state = device.feature_request(FEATURE.REPORT_RATE, 0x10) if state: rate = _unpack("!B", state[:1])[0] return str(rate) + "ms" else: rates = ["8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us"] - state = feature_request(device, FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20) + state = device.feature_request(FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20) if state: rate = _unpack("!B", state[:1])[0] return rates[rate] def get_remaining_pairing(self, device): - result = feature_request(device, FEATURE.REMAINING_PAIRING, 0x0) + result = device.feature_request(FEATURE.REMAINING_PAIRING, 0x0) if result: result = _unpack("!B", result[:1])[0] FEATURE._fallback = lambda x: f"unknown:{x:04X}" return result def config_change(self, device, configuration, no_reply=False): - return feature_request(device, FEATURE.CONFIG_CHANGE, 0x10, configuration, no_reply=no_reply) + return device.feature_request(FEATURE.CONFIG_CHANGE, 0x10, configuration, no_reply=no_reply) battery_functions = {