device: use feature_request from the device everywhere in hidpp20

This commit is contained in:
Peter F. Patel-Schneider 2024-03-22 13:25:36 -04:00
parent 7d6428a03b
commit 47ba1402ed
1 changed files with 56 additions and 56 deletions

View File

@ -269,7 +269,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
def _getCidReporting(self):
try:
mapped_data = feature_request(self._device, FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(_pack("!H", self._cid)))
mapped_data = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(_pack("!H", self._cid)))
if mapped_data:
cid, mapping_flags_1, mapped_to = _unpack("!HBH", mapped_data[:5])
if cid != self._cid and logger.isEnabledFor(logging.WARNING):
@ -343,7 +343,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
pkt = tuple(_pack("!HBH", self._cid, bfield & 0xFF, remap))
# TODO: to fully support version 4 of REPROG_CONTROLS_V4, append `(bfield >> 8) & 0xff` here.
# But older devices might behave oddly given that byte, so we don't send it.
ret = feature_request(self._device, FEATURE.REPROG_CONTROLS_V4, 0x30, *pkt)
ret = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x30, *pkt)
if ret is None or _unpack("!BBBBB", ret[:5]) != pkt and logger.isEnabledFor(logging.DEBUG):
logger.debug(f"REPROG_CONTROLS_v4 setCidReporting on device {self._device} didn't echo request packet.")
@ -402,13 +402,13 @@ class PersistentRemappableAction:
def remap(self, data_bytes):
cid = _int2bytes(self._cid, 2)
if _bytes2int(data_bytes) == special_keys.KEYS_Default: # map back to default
feature_request(self._device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x50, cid, 0xFF)
self._device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x50, cid, 0xFF)
self._device.remap_keys._query_key(self.index)
return self._device.remap_keys.keys[self.index].data_bytes
else:
self._actionId, self._code, self._modifierMask = _unpack("!BHB", data_bytes)
self.cidStatus = 0x01
feature_request(self._device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x40, cid, 0xFF, data_bytes)
self._device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x40, cid, 0xFF, data_bytes)
return True
@ -436,13 +436,13 @@ class KeysArray:
# TODO: add here additional variants for other REPROG_CONTROLS
if self.keyversion == FEATURE.REPROG_CONTROLS_V2:
keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V2, 0x10, index)
keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS_V2, 0x10, index)
if keydata:
cid, tid, flags = _unpack("!HHB", keydata[:5])
self.keys[index] = ReprogrammableKey(self.device, index, cid, tid, flags)
self.cid_to_tid[cid] = tid
elif self.keyversion == FEATURE.REPROG_CONTROLS_V4:
keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V4, 0x10, index)
keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x10, index)
if keydata:
cid, tid, flags1, pos, group, gmask, flags2 = _unpack("!HHBBBBB", keydata[:9])
flags = flags1 | (flags2 << 8)
@ -507,7 +507,7 @@ class KeysArrayV1(KeysArray):
def _query_key(self, index: int):
if index < 0 or index >= len(self.keys):
raise IndexError(index)
keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS, 0x10, index)
keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS, 0x10, index)
if keydata:
cid, tid, flags = _unpack("!HHB", keydata[:5])
self.keys[index] = ReprogrammableKey(self.device, index, cid, tid, flags)
@ -523,7 +523,7 @@ class KeysArrayV4(KeysArrayV1):
def _query_key(self, index: int):
if index < 0 or index >= len(self.keys):
raise IndexError(index)
keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V4, 0x10, index)
keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x10, index)
if keydata:
cid, tid, flags1, pos, group, gmask, flags2 = _unpack("!HHBBBBB", keydata[:9])
flags = flags1 | (flags2 << 8)
@ -552,12 +552,12 @@ class KeysArrayPersistent(KeysArray):
def _query_key(self, index: int):
if index < 0 or index >= len(self.keys):
raise IndexError(index)
keydata = feature_request(self.device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF)
keydata = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF)
if keydata:
key = _unpack("!H", keydata[:2])[0]
try:
mapped_data = feature_request(
self.device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key & 0xFF00, key & 0xFF, 0xFF
mapped_data = self.device.feature_request(
FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key & 0xFF00, key & 0xFF, 0xFF
)
if mapped_data:
_ignore, _ignore, actionId, remapped, modifiers, status = _unpack("!HBBHBB", mapped_data[:8])
@ -689,7 +689,7 @@ class Gesture:
def enabled(self): # is the gesture enabled?
if self._enabled is None and self.index is not None:
offset, mask = self.enable_offset_mask()
result = feature_request(self._device, FEATURE.GESTURE_2, 0x10, offset, 0x01, mask)
result = self._device.feature_request(FEATURE.GESTURE_2, 0x10, offset, 0x01, mask)
self._enabled = bool(result[0] & mask) if result else None
return self._enabled
@ -698,13 +698,13 @@ class Gesture:
return None
if self.index is not None:
offset, mask = self.enable_offset_mask()
reply = feature_request(self._device, FEATURE.GESTURE_2, 0x20, offset, 0x01, mask, mask if enable else 0x00)
reply = self._device.feature_request(FEATURE.GESTURE_2, 0x20, offset, 0x01, mask, mask if enable else 0x00)
return reply
def diverted(self): # is the gesture diverted?
if self._diverted is None and self.diversion_index is not None:
offset, mask = self.diversion_offset_mask()
result = feature_request(self._device, FEATURE.GESTURE_2, 0x30, offset, 0x01, mask)
result = self._device.feature_request(FEATURE.GESTURE_2, 0x30, offset, 0x01, mask)
self._diverted = bool(result[0] & mask) if result else None
return self._diverted
@ -713,7 +713,7 @@ class Gesture:
return None
if self.diversion_index is not None:
offset, mask = self.diversion_offset_mask()
reply = feature_request(self._device, FEATURE.GESTURE_2, 0x40, offset, 0x01, mask, mask if diverted else 0x00)
reply = self._device.feature_request(FEATURE.GESTURE_2, 0x40, offset, 0x01, mask, mask if diverted else 0x00)
return reply
def as_int(self):
@ -753,7 +753,7 @@ class Param:
return self._value if self._value is not None else self.read()
def read(self): # returns the bytes for the parameter
result = feature_request(self._device, FEATURE.GESTURE_2, 0x70, self.index, 0xFF)
result = self._device.feature_request(FEATURE.GESTURE_2, 0x70, self.index, 0xFF)
if result:
self._value = _bytes2int(result[: self.size])
return self._value
@ -765,14 +765,14 @@ class Param:
return self._default_value
def _read_default(self):
result = feature_request(self._device, FEATURE.GESTURE_2, 0x60, self.index, 0xFF)
result = self._device.feature_request(FEATURE.GESTURE_2, 0x60, self.index, 0xFF)
if result:
self._default_value = _bytes2int(result[: self.size])
return self._default_value
def write(self, bytes):
self._value = bytes
return feature_request(self._device, FEATURE.GESTURE_2, 0x80, self.index, bytes, 0xFF)
return self._device.feature_request(FEATURE.GESTURE_2, 0x80, self.index, bytes, 0xFF)
def __str__(self):
return str(self.param)
@ -797,7 +797,7 @@ class Spec:
def read(self):
try:
value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF)
value = self._device.feature_request(FEATURE.GESTURE_2, 0x50, self.id, 0xFF)
except exceptions.FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus)
if logger.isEnabledFor(logging.WARNING):
logger.warning(
@ -826,7 +826,7 @@ class Gestures:
field_high = 0x00
while field_high != 0x01: # end of fields
# retrieve the next eight fields
fields = feature_request(device, FEATURE.GESTURE_2, 0x00, index >> 8, index & 0xFF)
fields = device.feature_request(FEATURE.GESTURE_2, 0x00, index >> 8, index & 0xFF)
if not fields:
break
for offset in range(8):
@ -1394,13 +1394,13 @@ class Hidpp20:
:returns: a list of FirmwareInfo tuples, ordered by firmware layer.
"""
count = feature_request(device, FEATURE.DEVICE_FW_VERSION)
count = device.feature_request(FEATURE.DEVICE_FW_VERSION)
if count:
count = ord(count[:1])
fw = []
for index in range(0, count):
fw_info = feature_request(device, FEATURE.DEVICE_FW_VERSION, 0x10, index)
fw_info = device.feature_request(FEATURE.DEVICE_FW_VERSION, 0x10, index)
if fw_info:
level = ord(fw_info[:1]) & 0x0F
if level == 0 or level == 1:
@ -1422,7 +1422,7 @@ class Hidpp20:
def get_ids(self, device):
"""Reads a device's ids (unit and model numbers)"""
ids = feature_request(device, FEATURE.DEVICE_FW_VERSION)
ids = device.feature_request(FEATURE.DEVICE_FW_VERSION)
if ids:
unitId = ids[1:5]
modelId = ids[7:13]
@ -1442,7 +1442,7 @@ class Hidpp20:
:returns: a string describing the device type, or ``None`` if the device is
not available or does not support the ``DEVICE_NAME`` feature.
"""
kind = feature_request(device, FEATURE.DEVICE_NAME, 0x20)
kind = device.feature_request(FEATURE.DEVICE_NAME, 0x20)
if kind:
kind = ord(kind[:1])
# if logger.isEnabledFor(logging.DEBUG):
@ -1455,13 +1455,13 @@ class Hidpp20:
:returns: a string with the device name, or ``None`` if the device is not
available or does not support the ``DEVICE_NAME`` feature.
"""
name_length = feature_request(device, FEATURE.DEVICE_NAME)
name_length = device.feature_request(FEATURE.DEVICE_NAME)
if name_length:
name_length = ord(name_length[:1])
name = b""
while len(name) < name_length:
fragment = feature_request(device, FEATURE.DEVICE_NAME, 0x10, len(name))
fragment = device.feature_request(FEATURE.DEVICE_NAME, 0x10, len(name))
if fragment:
name += fragment[: name_length - len(name)]
else:
@ -1476,13 +1476,13 @@ class Hidpp20:
:returns: a string with the device name, or ``None`` if the device is not
available or does not support the ``DEVICE_NAME`` feature.
"""
name_length = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME)
name_length = device.feature_request(FEATURE.DEVICE_FRIENDLY_NAME)
if name_length:
name_length = ord(name_length[:1])
name = b""
while len(name) < name_length:
fragment = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME, 0x10, len(name))
fragment = device.feature_request(FEATURE.DEVICE_FRIENDLY_NAME, 0x10, len(name))
if fragment:
name += fragment[1 : name_length - len(name) + 1]
else:
@ -1492,23 +1492,23 @@ class Hidpp20:
return name.decode("utf-8")
def get_battery_status(self, device):
report = feature_request(device, FEATURE.BATTERY_STATUS)
report = device.feature_request(FEATURE.BATTERY_STATUS)
if report:
return decipher_battery_status(report)
def get_battery_unified(self, device):
report = feature_request(device, FEATURE.UNIFIED_BATTERY, 0x10)
report = device.feature_request(FEATURE.UNIFIED_BATTERY, 0x10)
if report is not None:
return decipher_battery_unified(report)
def get_battery_voltage(self, device):
report = feature_request(device, FEATURE.BATTERY_VOLTAGE)
report = device.feature_request(FEATURE.BATTERY_VOLTAGE)
if report is not None:
return decipher_battery_voltage(report)
def get_adc_measurement(self, device):
try: # this feature call produces an error for headsets that are connected but inactive
report = feature_request(device, FEATURE.ADC_MEASUREMENT)
report = device.feature_request(FEATURE.ADC_MEASUREMENT)
if report is not None:
return decipher_adc_measurement(report)
except exceptions.FeatureCallError:
@ -1535,15 +1535,15 @@ class Hidpp20:
# TODO: add here additional variants for other REPROG_CONTROLS
count = None
if FEATURE.REPROG_CONTROLS_V2 in device.features:
count = feature_request(device, FEATURE.REPROG_CONTROLS_V2)
count = device.feature_request(FEATURE.REPROG_CONTROLS_V2)
return KeysArrayV1(device, ord(count[:1]))
elif FEATURE.REPROG_CONTROLS_V4 in device.features:
count = feature_request(device, FEATURE.REPROG_CONTROLS_V4)
count = device.feature_request(FEATURE.REPROG_CONTROLS_V4)
return KeysArrayV4(device, ord(count[:1]))
return None
def get_remap_keys(self, device):
count = feature_request(device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10)
count = device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10)
if count:
return KeysArrayPersistent(device, ord(count[:1]))
@ -1566,7 +1566,7 @@ class Hidpp20:
return OnboardProfiles.from_device(device)
def get_mouse_pointer_info(self, device):
pointer_info = feature_request(device, FEATURE.MOUSE_POINTER)
pointer_info = device.feature_request(FEATURE.MOUSE_POINTER)
if pointer_info:
dpi, flags = _unpack("!HB", pointer_info[:3])
acceleration = ("none", "low", "med", "high")[flags & 0x3]
@ -1580,7 +1580,7 @@ class Hidpp20:
}
def get_vertical_scrolling_info(self, device):
vertical_scrolling_info = feature_request(device, FEATURE.VERTICAL_SCROLLING)
vertical_scrolling_info = device.feature_request(FEATURE.VERTICAL_SCROLLING)
if vertical_scrolling_info:
roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3])
roller_type = (
@ -1596,13 +1596,13 @@ class Hidpp20:
return {"roller": roller_type, "ratchet": ratchet, "lines": lines}
def get_hi_res_scrolling_info(self, device):
hi_res_scrolling_info = feature_request(device, FEATURE.HI_RES_SCROLLING)
hi_res_scrolling_info = device.feature_request(FEATURE.HI_RES_SCROLLING)
if hi_res_scrolling_info:
mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2])
return mode, resolution
def get_pointer_speed_info(self, device):
pointer_speed_info = feature_request(device, FEATURE.POINTER_SPEED)
pointer_speed_info = device.feature_request(FEATURE.POINTER_SPEED)
if pointer_speed_info:
pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2])
# if pointer_speed_lo > 0:
@ -1610,16 +1610,16 @@ class Hidpp20:
return pointer_speed_hi + pointer_speed_lo / 256
def get_lowres_wheel_status(self, device):
lowres_wheel_status = feature_request(device, FEATURE.LOWRES_WHEEL)
lowres_wheel_status = device.feature_request(FEATURE.LOWRES_WHEEL)
if lowres_wheel_status:
wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0]
wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01]
return wheel_reporting
def get_hires_wheel(self, device):
caps = feature_request(device, FEATURE.HIRES_WHEEL, 0x00)
mode = feature_request(device, FEATURE.HIRES_WHEEL, 0x10)
ratchet = feature_request(device, FEATURE.HIRES_WHEEL, 0x030)
caps = device.feature_request(FEATURE.HIRES_WHEEL, 0x00)
mode = device.feature_request(FEATURE.HIRES_WHEEL, 0x10)
ratchet = device.feature_request(FEATURE.HIRES_WHEEL, 0x030)
if caps and mode and ratchet:
# Parse caps
@ -1643,7 +1643,7 @@ class Hidpp20:
return multi, has_invert, has_ratchet, inv, res, target, ratchet
def get_new_fn_inversion(self, device):
state = feature_request(device, FEATURE.NEW_FN_INVERSION, 0x00)
state = device.feature_request(FEATURE.NEW_FN_INVERSION, 0x00)
if state:
inverted, default_inverted = _unpack("!BB", state[:2])
inverted = (inverted & 0x01) != 0
@ -1651,18 +1651,18 @@ class Hidpp20:
return inverted, default_inverted
def get_host_names(self, device):
state = feature_request(device, FEATURE.HOSTS_INFO, 0x00)
state = device.feature_request(FEATURE.HOSTS_INFO, 0x00)
host_names = {}
if state:
capability_flags, _ignore, numHosts, currentHost = _unpack("!BBBB", state[:4])
if capability_flags & 0x01: # device can get host names
for host in range(0, numHosts):
hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, host)
hostinfo = device.feature_request(FEATURE.HOSTS_INFO, 0x10, host)
_ignore, status, _ignore, _ignore, nameLen, _ignore = _unpack("!BBBBBB", hostinfo[:6])
name = ""
remaining = nameLen
while remaining > 0:
name_piece = feature_request(device, FEATURE.HOSTS_INFO, 0x30, host, nameLen - remaining)
name_piece = device.feature_request(FEATURE.HOSTS_INFO, 0x30, host, nameLen - remaining)
if name_piece:
name += name_piece[2 : 2 + min(remaining, 14)].decode()
remaining = max(0, remaining - 14)
@ -1681,55 +1681,55 @@ class Hidpp20:
currentName = bytearray(currentName, "utf-8")
if logger.isEnabledFor(logging.INFO):
logger.info("Setting host name to %s", name)
state = feature_request(device, FEATURE.HOSTS_INFO, 0x00)
state = device.feature_request(FEATURE.HOSTS_INFO, 0x00)
if state:
flags, _ignore, _ignore, currentHost = _unpack("!BBBB", state[:4])
if flags & 0x02:
hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, currentHost)
hostinfo = device.feature_request(FEATURE.HOSTS_INFO, 0x10, currentHost)
_ignore, _ignore, _ignore, _ignore, _ignore, maxNameLen = _unpack("!BBBBBB", hostinfo[:6])
if name[:maxNameLen] == currentName[:maxNameLen] and False:
return True
length = min(maxNameLen, len(name))
chunk = 0
while chunk < length:
response = feature_request(device, FEATURE.HOSTS_INFO, 0x40, currentHost, chunk, name[chunk : chunk + 14])
response = device.feature_request(FEATURE.HOSTS_INFO, 0x40, currentHost, chunk, name[chunk : chunk + 14])
if not response:
return False
chunk += 14
return True
def get_onboard_mode(self, device):
state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x20)
state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x20)
if state:
mode = _unpack("!B", state[:1])[0]
return mode
def set_onboard_mode(self, device, mode):
state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x10, mode)
state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x10, mode)
return state
def get_polling_rate(self, device):
state = feature_request(device, FEATURE.REPORT_RATE, 0x10)
state = device.feature_request(FEATURE.REPORT_RATE, 0x10)
if state:
rate = _unpack("!B", state[:1])[0]
return str(rate) + "ms"
else:
rates = ["8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us"]
state = feature_request(device, FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20)
state = device.feature_request(FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20)
if state:
rate = _unpack("!B", state[:1])[0]
return rates[rate]
def get_remaining_pairing(self, device):
result = feature_request(device, FEATURE.REMAINING_PAIRING, 0x0)
result = device.feature_request(FEATURE.REMAINING_PAIRING, 0x0)
if result:
result = _unpack("!B", result[:1])[0]
FEATURE._fallback = lambda x: f"unknown:{x:04X}"
return result
def config_change(self, device, configuration, no_reply=False):
return feature_request(device, FEATURE.CONFIG_CHANGE, 0x10, configuration, no_reply=no_reply)
return device.feature_request(FEATURE.CONFIG_CHANGE, 0x10, configuration, no_reply=no_reply)
battery_functions = {