1828 lines
71 KiB
Python
1828 lines
71 KiB
Python
# -*- python-mode -*-
|
|
|
|
## Copyright (C) 2012-2013 Daniel Pavel
|
|
##
|
|
## This program is free software; you can redistribute it and/or modify
|
|
## it under the terms of the GNU General Public License as published by
|
|
## the Free Software Foundation; either version 2 of the License, or
|
|
## (at your option) any later version.
|
|
##
|
|
## This program is distributed in the hope that it will be useful,
|
|
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
## GNU General Public License for more details.
|
|
##
|
|
## You should have received a copy of the GNU General Public License along
|
|
## with this program; if not, write to the Free Software Foundation, Inc.,
|
|
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
|
|
# Logitech Unifying Receiver API.
|
|
|
|
import logging
|
|
import socket
|
|
import threading as _threading
|
|
|
|
from struct import pack as _pack
|
|
from struct import unpack as _unpack
|
|
from typing import List, Optional
|
|
|
|
import yaml as _yaml
|
|
|
|
from . import exceptions, special_keys
|
|
from .common import BATTERY_APPROX as _BATTERY_APPROX
|
|
from .common import FirmwareInfo as _FirmwareInfo
|
|
from .common import NamedInt as _NamedInt
|
|
from .common import NamedInts as _NamedInts
|
|
from .common import UnsortedNamedInts as _UnsortedNamedInts
|
|
from .common import bytes2int as _bytes2int
|
|
from .common import crc16 as _crc16
|
|
from .common import int2bytes as _int2bytes
|
|
from .hidpp20_constants import BATTERY_STATUS, CHARGE_LEVEL, CHARGE_STATUS, CHARGE_TYPE, DEVICE_KIND, ERROR, FEATURE, GESTURE
|
|
from .hidpp20_constants import FIRMWARE_KIND as _FIRMWARE_KIND
|
|
from .i18n import _
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def hexint_presenter(dumper, data):
    """Represent an integer in hexadecimal notation when dumping YAML.

    `data` is rendered with `hex()` and the resulting text is passed to
    `dumper.represent_int`, whose result is returned unchanged.
    """
    as_hex = hex(data)
    return dumper.represent_int(as_hex)
|
|
|
|
|
|
# Register the hexadecimal presenter for `int` values dumped through the yaml module.
_yaml.add_representer(int, hexint_presenter)
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
|
|
class FeaturesArray(dict):
    """Lazily filled mapping from a HID++ 2.0 feature to its index on a device.

    The dict itself maps feature -> index, or feature -> `False` once the
    device has replied with index 0 for that feature.  `inverse` maps
    index -> feature and `version` maps feature -> the version byte taken
    from the device's reply.  Entries are requested from the device on
    first access and cached afterwards.
    """

    def __init__(self, device):
        assert device is not None
        self.supported = True  # Actually don't know whether it is supported yet
        self.device = device
        self.inverse = {}
        self.version = {}
        self.count = 0

    def _check(self) -> bool:
        """Return True when the feature table of the device can be queried.

        On the first successful call this reads the FEATURE_SET index and the
        feature count, and seeds the ROOT and FEATURE_SET entries.  The method
        doubles as `__bool__`, so every path has to return a real bool.
        """
        if not self.device.online:
            return False
        if self.supported is False:
            return False
        if self.device.protocol and self.device.protocol < 2.0:
            self.supported = False
            return False
        if self.count > 0:
            return True  # already initialised
        reply = self.device.request(0x0000, _pack("!H", FEATURE.FEATURE_SET))
        if reply is not None:
            fs_index = reply[0]
            if fs_index:
                count = self.device.request(fs_index << 8)
                if count is None:
                    logger.warning("FEATURE_SET found, but failed to read features count")
                    return False
                self.count = count[0] + 1  # ROOT feature not included in count
                self[FEATURE.ROOT] = 0
                self[FEATURE.FEATURE_SET] = fs_index
                return True
            # a reply with index 0 means the device has no FEATURE_SET
            self.supported = False
        return False

    def get_feature(self, index: int) -> Optional[_NamedInt]:
        """Return the feature stored at `index`, asking the device if it is not cached.

        Returns None when the table cannot be queried or the device gives no reply.
        """
        feature = self.inverse.get(index)
        if feature is not None:
            return feature
        if self._check():
            response = self.device.feature_request(FEATURE.FEATURE_SET, 0x10, index)
            if response:
                feature = FEATURE[_unpack("!H", response[:2])[0]]
                self[feature] = index
                self.version[feature] = response[3]
                return feature
        return None

    def enumerate(self):  # return all features and their index, ordered by index
        """Yield `(feature, index)` pairs for every index below `self.count`."""
        if self._check():
            for index in range(self.count):
                feature = self.get_feature(index)
                yield feature, index

    def get_feature_version(self, feature: _NamedInt) -> Optional[int]:
        """Return the cached version of `feature` (0 if none was recorded).

        Returns None when the lookup of `feature` yields a falsy index.
        """
        if self[feature]:
            return self.version.get(feature, 0)
        return None

    def __contains__(self, feature: _NamedInt) -> bool:
        index = self.__getitem__(feature)
        return index is not None and index is not False

    def __getitem__(self, feature: _NamedInt) -> Optional[int]:
        """Return the index of `feature`, `False` if the device replied with
        index 0, or None if the device could not be asked or did not reply."""
        index = super().get(feature)
        if index is not None:
            return index
        if self._check():
            response = self.device.request(0x0000, _pack("!H", feature))
            if response:
                index = response[0]
                self[feature] = index if index else False
                self.version[feature] = response[2]
                return index if index else False
        return None

    def __setitem__(self, feature, index):
        # Exact type test on purpose: the `False` marker is a bool (an int
        # subclass) and must not end up in the inverse table.
        previous = super().get(feature)
        if type(previous) is int:
            self.inverse.pop(previous)
        super().__setitem__(feature, index)
        if type(index) is int:
            self.inverse[index] = feature

    def __delitem__(self, feature):
        raise Exception("Don't delete features from FeatureArray")

    def __len__(self) -> int:
        return self.count

    __bool__ = __nonzero__ = _check
|
|
|
|
|
|
class ReprogrammableKey:
    """A control reported by a device that has the `REPROG_CONTROLS` feature.

    Ref: https://drive.google.com/file/d/0BxbRzx7vEV7eU3VfMnRuRXktZ3M/view

    Read-only properties:
    - index {int} -- index in the control ID table
    - key {_NamedInt} -- the name of this control
    - default_task {_NamedInt} -- the native function of this control
    - flags {List[str]} -- capabilities and desired software handling of the control
    """

    def __init__(self, device, index, cid, tid, flags):
        self.index = index
        self._device = device
        self._cid, self._tid = cid, tid
        self._flags = flags

    @property
    def key(self) -> _NamedInt:
        """The entry of `special_keys.CONTROL` for this control ID."""
        control = special_keys.CONTROL[self._cid]
        return control

    @property
    def default_task(self) -> _NamedInt:
        """NOTE: The returned NamedInt is deliberately mixed: its value is the
        Control ID while its name is that control's native task.  This makes
        more sense than exposing the controls-vs-tasks distinction in the
        interface.  The same convention applies to `mapped_to`,
        `remappable_to`, `remap` in `ReprogrammableKeyV4`."""
        task_name = str(special_keys.TASK[self._tid])
        return _NamedInt(self._cid, task_name)

    @property
    def flags(self) -> List[str]:
        """Names of the key flags set in this control's flag bits."""
        names = special_keys.KEY_FLAG.flag_names(self._flags)
        return names
|
|
|
|
|
|
class ReprogrammableKeyV4(ReprogrammableKey):
    """Information about a control present on a device with the `REPROG_CONTROLS_V4` feature.

    Ref (v2): https://lekensteyn.nl/files/logitech/x1b04_specialkeysmsebuttons.html
    Ref (v4): https://drive.google.com/file/d/10imcbmoxTJ1N510poGdsviEhoFfB_Ua4/view

    Contains all the functionality of `ReprogrammableKey` plus remapping keys and /diverting/ them
    in order to handle keypresses in a custom way.

    Additional read-only properties:
    - pos {int} -- position of this control on the device; 1-16 for FN-keys, otherwise 0
    - group {int} -- the group this control belongs to; other controls with this group in their
    `group_mask` can be remapped to this control
    - group_mask {List[str]} -- this control can be remapped to any control ID in these groups
    - mapped_to {_NamedInt} -- which action this control is mapped to; usually itself
    - remappable_to {List[_NamedInt]} -- list of actions which this control can be remapped to
    - mapping_flags {List[str]} -- mapping flags set on the control
    """

    def __init__(self, device, index, cid, tid, flags, pos, group, gmask):
        ReprogrammableKey.__init__(self, device, index, cid, tid, flags)
        self.pos = pos
        self.group = group
        self._gmask = gmask
        # Both are read lazily from the device by _getCidReporting().
        self._mapping_flags = None
        self._mapped_to = None

    @property
    def group_mask(self):
        return special_keys.CID_GROUP_BIT.flag_names(self._gmask)

    @property
    def mapped_to(self) -> _NamedInt:
        if self._mapped_to is None:
            self._getCidReporting()
        self._device.keys._ensure_all_keys_queried()
        task = str(special_keys.TASK[self._device.keys.cid_to_tid[self._mapped_to]])
        return _NamedInt(self._mapped_to, task)

    @property
    def remappable_to(self) -> _NamedInts:
        self._device.keys._ensure_all_keys_queried()
        ret = _UnsortedNamedInts()
        groups = self.group_mask
        if groups != []:  # only keys with a non-zero gmask are remappable
            ret[self.default_task] = self.default_task  # it should always be possible to map the key to itself
        for g in groups:
            g = special_keys.CID_GROUP[str(g)]
            for tgt_cid in self._device.keys.group_cids[g]:
                tgt_task = str(special_keys.TASK[self._device.keys.cid_to_tid[tgt_cid]])
                tgt_task = _NamedInt(tgt_cid, tgt_task)
                if tgt_task != self.default_task:  # don't put itself in twice
                    ret[tgt_task] = tgt_task
        return ret

    @property
    def mapping_flags(self) -> List[str]:
        if self._mapping_flags is None:
            self._getCidReporting()
        return special_keys.MAPPING_FLAG.flag_names(self._mapping_flags)

    def set_diverted(self, value: bool):
        """If set, the control is diverted temporarily and reports presses as HID++ events."""
        flags = {special_keys.MAPPING_FLAG.diverted: value}
        self._setCidReporting(flags=flags)

    def set_persistently_diverted(self, value: bool):
        """If set, the control is diverted permanently and reports presses as HID++ events."""
        flags = {special_keys.MAPPING_FLAG.persistently_diverted: value}
        self._setCidReporting(flags=flags)

    def set_rawXY_reporting(self, value: bool):
        """If set, the mouse temporarily reports all its raw XY events while this control is pressed as HID++ events."""
        flags = {special_keys.MAPPING_FLAG.raw_XY_diverted: value}
        self._setCidReporting(flags=flags)

    def remap(self, to: _NamedInt):
        """Temporarily remaps this control to another action."""
        self._setCidReporting(remap=int(to))

    def _getCidReporting(self):
        """Read the mapping flags and mapping target of this control from the device.

        On a feature call error, or when the device gives no reply, a warning
        is logged and the defaults are used: no flags, mapped to itself.
        """
        try:
            mapped_data = feature_request(self._device, FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(_pack("!H", self._cid)))
            if mapped_data:
                cid, mapping_flags_1, mapped_to = _unpack("!HBH", mapped_data[:5])
                if cid != self._cid and logger.isEnabledFor(logging.WARNING):
                    logger.warning(
                        f"REPROG_CONTROLS_V4 endpoint getCidReporting on device {self._device} replied "
                        + f"with a different control ID ({cid}) than requested ({self._cid})."
                    )
                self._mapped_to = mapped_to if mapped_to != 0 else self._cid
                if len(mapped_data) > 5:
                    (mapping_flags_2,) = _unpack("!B", mapped_data[5:6])
                else:
                    mapping_flags_2 = 0
                self._mapping_flags = mapping_flags_1 | (mapping_flags_2 << 8)
            else:
                raise exceptions.FeatureCallError(msg="No reply from device.")
        except exceptions.FeatureCallError:  # if the key hasn't ever been configured only produce a warning
            if logger.isEnabledFor(logging.WARNING):
                logger.warning(
                    f"Feature Call Error in _getCidReporting on device {self._device} for cid {self._cid} - use defaults"
                )
            # Clear flags and set mapping target to self
            self._mapping_flags = 0
            self._mapped_to = self._cid

    def _setCidReporting(self, flags=None, remap=0):
        """Sends a `setCidReporting` request with the given parameters. Raises an exception if the parameters are invalid.

        Parameters:
        - flags {Dict[_NamedInt,bool]} -- a dictionary of which mapping flags to set/unset
        - remap {int} -- which control ID to remap to; or 0 to keep current mapping

        Raises `exceptions.FeatureNotSupported` when a flag is set that the
        control lacks the capability for, or when `remap` is not among
        `remappable_to`.
        """
        flags = flags if flags else {}  # See flake8 B006

        # if special_keys.MAPPING_FLAG.raw_XY_diverted in flags and flags[special_keys.MAPPING_FLAG.raw_XY_diverted]:
        # We need diversion to report raw XY, so divert temporarily (since XY reporting is also temporary)
        # flags[special_keys.MAPPING_FLAG.diverted] = True
        # if special_keys.MAPPING_FLAG.diverted in flags and not flags[special_keys.MAPPING_FLAG.diverted]:
        # flags[special_keys.MAPPING_FLAG.raw_XY_diverted] = False

        # The capability required to set a given reporting flag.
        FLAG_TO_CAPABILITY = {
            special_keys.MAPPING_FLAG.diverted: special_keys.KEY_FLAG.divertable,
            special_keys.MAPPING_FLAG.persistently_diverted: special_keys.KEY_FLAG.persistently_divertable,
            special_keys.MAPPING_FLAG.analytics_key_events_reporting: special_keys.KEY_FLAG.analytics_key_events,
            special_keys.MAPPING_FLAG.force_raw_XY_diverted: special_keys.KEY_FLAG.force_raw_XY,
            special_keys.MAPPING_FLAG.raw_XY_diverted: special_keys.KEY_FLAG.raw_XY,
        }

        bfield = 0
        for f, v in flags.items():
            if v and FLAG_TO_CAPABILITY[f] not in self.flags:
                raise exceptions.FeatureNotSupported(
                    msg=f'Tried to set mapping flag "{f}" on control "{self.key}" '
                    + f'which does not support "{FLAG_TO_CAPABILITY[f]}" on device {self._device}.'
                )
            bfield |= int(f) if v else 0
            bfield |= int(f) << 1  # The 'Xvalid' bit
            # Update the cached flags if they were already read.  The test is
            # against None because a cached value of 0 (no flag set) is a
            # valid, already-read state that must be updated as well.
            if self._mapping_flags is not None:
                if v:
                    self._mapping_flags |= int(f)
                else:
                    self._mapping_flags &= ~int(f)

        if remap != 0 and remap not in self.remappable_to:
            raise exceptions.FeatureNotSupported(
                msg=f'Tried to remap control "{self.key}" to a control ID {remap} which it is not remappable to '
                + f"on device {self._device}."
            )
        if remap != 0:  # update mapping if changing (even if not already read)
            self._mapped_to = remap

        pkt = tuple(_pack("!HBH", self._cid, bfield & 0xFF, remap))
        # TODO: to fully support version 4 of REPROG_CONTROLS_V4, append `(bfield >> 8) & 0xff` here.
        # But older devices might behave oddly given that byte, so we don't send it.
        ret = feature_request(self._device, FEATURE.REPROG_CONTROLS_V4, 0x30, *pkt)
        # Compare the first five reply bytes with the request; converting the
        # slice to a tuple cannot raise on a short reply the way unpacking can.
        if logger.isEnabledFor(logging.DEBUG) and (ret is None or tuple(ret[:5]) != pkt):
            logger.debug(f"REPROG_CONTROLS_v4 setCidReporting on device {self._device} didn't echo request packet.")
|
|
|
|
|
|
class PersistentRemappableAction:
    """One entry of a device's persistent remappable action table.

    Holds the control ID, the action kind (`actionId`), the action's target
    (`remapped`), a modifier mask and the status reported for the control.
    """

    def __init__(self, device, index, cid, actionId, remapped, modifierMask, cidStatus):
        self._device = device
        self.index = index
        self._cid = cid
        self.actionId = actionId
        self.remapped = remapped
        self._modifierMask = modifierMask
        self.cidStatus = cidStatus

    @property
    def key(self) -> _NamedInt:
        return special_keys.CONTROL[self._cid]

    @property
    def actionType(self) -> _NamedInt:
        # Reads the attribute that __init__ sets; the former `_actionId` only
        # existed after remap() had been called.
        return special_keys.ACTIONID[self.actionId]

    @property
    def action(self):
        """Human-readable description of the current action, or None when empty."""
        if self.actionId == special_keys.ACTIONID.Empty:
            return None
        elif self.actionId == special_keys.ACTIONID.Key:
            return "Key: " + str(self.modifiers) + str(self.remapped)
        elif self.actionId == special_keys.ACTIONID.Mouse:
            return "Mouse Button: " + str(self.remapped)
        elif self.actionId == special_keys.ACTIONID.Xdisp:
            return "X Displacement " + str(self.remapped)
        elif self.actionId == special_keys.ACTIONID.Ydisp:
            return "Y Displacement " + str(self.remapped)
        elif self.actionId == special_keys.ACTIONID.Vscroll:
            return "Vertical Scroll " + str(self.remapped)
        elif self.actionId == special_keys.ACTIONID.Hscroll:
            return "Horizontal Scroll: " + str(self.remapped)
        elif self.actionId == special_keys.ACTIONID.Consumer:
            return "Consumer: " + str(self.remapped)
        elif self.actionId == special_keys.ACTIONID.Internal:
            return "Internal Action " + str(self.remapped)
        # NOTE(review): this branch used to test ACTIONID.Internal a second
        # time and was unreachable; assumes ACTIONID defines `Power` - confirm.
        elif self.actionId == special_keys.ACTIONID.Power:
            return "Power " + str(self.remapped)
        else:
            return "Unknown"

    @property
    def modifiers(self):
        return special_keys.modifiers[self._modifierMask]

    @property
    def data_bytes(self):
        """The action encoded as 1 byte action id + 2 bytes target + 1 byte modifier mask."""
        return _int2bytes(self.actionId, 1) + _int2bytes(self.remapped, 2) + _int2bytes(self._modifierMask, 1)

    def remap(self, data_bytes):
        """Write a new mapping for this control to the device.

        When `data_bytes` equals `special_keys.KEYS_Default` the control is
        reset, the key is queried again and the re-read entry's `data_bytes`
        are returned.  Otherwise the four bytes (action id, target, modifier
        mask) are sent to the device and True is returned.
        """
        cid = _int2bytes(self._cid, 2)
        if _bytes2int(data_bytes) == special_keys.KEYS_Default:  # map back to default
            feature_request(self._device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x50, cid, 0xFF)
            self._device.remap_keys._query_key(self.index)
            return self._device.remap_keys.keys[self.index].data_bytes
        else:
            action_id, code, modifier_mask = _unpack("!BHB", data_bytes)
            # Keep the attributes read by `action`, `actionType` and
            # `data_bytes` in step with what is written to the device.
            self.actionId = special_keys.ACTIONID[action_id]
            self.remapped = code
            self._modifierMask = modifier_mask
            # Attribute names the previous implementation assigned, kept for
            # any code that still reads them.
            self._actionId, self._code = action_id, code
            self.cidStatus = 0x01
            feature_request(self._device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x40, cid, 0xFF, data_bytes)
            return True
|
|
|
|
|
|
class KeysArray:
    """A sequence of key mappings supported by a HID++ 2.0 device.

    Entries start out as None and are filled in by `_query_key` the first
    time they are accessed.
    """

    def __init__(self, device, count, version):
        assert device is not None
        self.device = device
        self.lock = _threading.Lock()
        if FEATURE.REPROG_CONTROLS_V4 in self.device.features:
            self.keyversion = FEATURE.REPROG_CONTROLS_V4
        elif FEATURE.REPROG_CONTROLS_V2 in self.device.features:
            self.keyversion = FEATURE.REPROG_CONTROLS_V2
        else:
            if logger.isEnabledFor(logging.ERROR):
                logger.error(f"Trying to read keys on device {device} which has no REPROG_CONTROLS(_VX) support.")
            self.keyversion = None
        self.keys = [None] * count

    def _query_key(self, index: int):
        """Queries the device for a given key and stores it in self.keys.

        Raises IndexError when `index` is outside the table.
        NOTE(review): `cid_to_tid` and `group_cids` are not set by this class's
        `__init__`; they are expected to be provided by a subclass.
        """
        if index < 0 or index >= len(self.keys):
            raise IndexError(index)

        # TODO: add here additional variants for other REPROG_CONTROLS
        if self.keyversion == FEATURE.REPROG_CONTROLS_V2:
            keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V2, 0x10, index)
            if keydata:
                cid, tid, flags = _unpack("!HHB", keydata[:5])
                self.keys[index] = ReprogrammableKey(self.device, index, cid, tid, flags)
                self.cid_to_tid[cid] = tid
        elif self.keyversion == FEATURE.REPROG_CONTROLS_V4:
            keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V4, 0x10, index)
            if keydata:
                cid, tid, flags1, pos, group, gmask, flags2 = _unpack("!HHBBBBB", keydata[:9])
                flags = flags1 | (flags2 << 8)
                self.keys[index] = ReprogrammableKeyV4(self.device, index, cid, tid, flags, pos, group, gmask)
                self.cid_to_tid[cid] = tid
                if group != 0:  # 0 = does not belong to a group
                    self.group_cids[special_keys.CID_GROUP[group]].append(cid)
        elif logger.isEnabledFor(logging.WARNING):
            logger.warning(f"Key with index {index} was expected to exist but device doesn't report it.")

    def _ensure_all_keys_queried(self):
        """The retrieval of key information is lazy, but for certain functionality
        we need to know all keys. This function makes sure that's the case."""
        with self.lock:  # don't want two threads doing this
            for i, k in enumerate(self.keys):
                if k is None:
                    self._query_key(i)

    def __getitem__(self, index):
        """Return the key at an int index, or a list of keys for a slice.

        Negative or too large int indexes raise IndexError; any other kind of
        index yields None.
        """
        if isinstance(index, int):
            if index < 0 or index >= len(self.keys):
                raise IndexError(index)

            if self.keys[index] is None:
                self._query_key(index)

            return self.keys[index]

        elif isinstance(index, slice):
            indices = index.indices(len(self.keys))
            return [self.__getitem__(i) for i in range(*indices)]

        return None

    def index(self, value):
        """Return the position of the key whose `key` equals `value` as an int, or None."""
        self._ensure_all_keys_queried()
        for index, k in enumerate(self.keys):
            if k is not None and int(value) == int(k.key):
                return index
        return None

    def __iter__(self):
        for k in range(0, len(self.keys)):
            yield self.__getitem__(k)

    def __len__(self):
        return len(self.keys)
|
|
|
|
|
|
class KeysArrayV1(KeysArray):
    """Keys of a device queried through the `REPROG_CONTROLS` feature."""

    def __init__(self, device, count, version=1):
        super().__init__(device, count, version)
        # The mapping from Control IDs to their native Task IDs.
        # For example, Control "Left Button" is mapped to Task "Left Click".
        # When remapping controls, we point the control we want to remap
        # at a target Control ID rather than a target Task ID. This has the
        # effect of performing the native task of the target control,
        # even if the target itself is also remapped. So remapping
        # is not recursive.
        self.cid_to_tid = {}
        # The mapping from Control ID groups to Controls IDs that belong to it.
        # A key k can only be remapped to targets in groups within k.group_mask.
        self.group_cids = {g: [] for g in special_keys.CID_GROUP}

    def _query_key(self, index: int):
        """Query key `index` from the device and store it in `self.keys`.

        Raises IndexError when `index` is outside the table; logs a warning
        when the device gives no data for the key.
        """
        if index < 0 or index >= len(self.keys):
            raise IndexError(index)
        keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS, 0x10, index)
        if keydata:
            cid, tid, flags = _unpack("!HHB", keydata[:5])
            self.keys[index] = ReprogrammableKey(self.device, index, cid, tid, flags)
            self.cid_to_tid[cid] = tid
        elif logger.isEnabledFor(logging.WARNING):
            logger.warning(f"Key with index {index} was expected to exist but device doesn't report it.")
|
|
|
|
|
|
class KeysArrayV4(KeysArrayV1):
    """Keys of a device queried through the `REPROG_CONTROLS_V4` feature."""

    def __init__(self, device, count):
        super().__init__(device, count, 4)

    def _query_key(self, index: int):
        """Query key `index` from the device and store it in `self.keys`.

        Also records the key's task in `cid_to_tid` and, when the key belongs
        to a group, its control ID in `group_cids`.  Raises IndexError when
        `index` is outside the table; logs a warning when the device gives no
        data for the key.
        """
        if index < 0 or index >= len(self.keys):
            raise IndexError(index)
        keydata = feature_request(self.device, FEATURE.REPROG_CONTROLS_V4, 0x10, index)
        if keydata:
            cid, tid, flags1, pos, group, gmask, flags2 = _unpack("!HHBBBBB", keydata[:9])
            flags = flags1 | (flags2 << 8)  # second flag byte forms the high bits
            self.keys[index] = ReprogrammableKeyV4(self.device, index, cid, tid, flags, pos, group, gmask)
            self.cid_to_tid[cid] = tid
            if group != 0:  # 0 = does not belong to a group
                self.group_cids[special_keys.CID_GROUP[group]].append(cid)
        elif logger.isEnabledFor(logging.WARNING):
            logger.warning(f"Key with index {index} was expected to exist but device doesn't report it.")
|
|
|
|
|
|
# we are only interested in the current host, so use 0xFF for the host throughout
|
|
class KeysArrayPersistent(KeysArray):
    """Keys of a device queried through the `PERSISTENT_REMAPPABLE_ACTION` feature."""

    def __init__(self, device, count):
        super().__init__(device, count, 5)
        self._capabilities = None

    @property
    def capabilities(self):
        """Capability flags of the feature, read once from the device while it is online."""
        if self._capabilities is None and self.device.online:
            capabilities = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x00)
            assert capabilities, "Oops, persistent remappable key capabilities cannot be retrieved!"
            self._capabilities = _unpack("!H", capabilities[:2])[0]  # flags saying what the mappings are possible
        return self._capabilities

    def _query_key(self, index: int):
        """Query key `index` and its current mapping and store it in `self.keys`.

        When the mapping cannot be read (error or no reply) the key is stored
        with an all-zero mapping.  Raises IndexError when `index` is outside
        the table; logs a warning when the device gives no data for the key.
        """
        if index < 0 or index >= len(self.keys):
            raise IndexError(index)
        keydata = feature_request(self.device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF)
        if keydata:
            key = _unpack("!H", keydata[:2])[0]
            # Defaults, so the names are bound even when the device gives no mapping reply.
            actionId = remapped = modifiers = status = 0
            try:
                # The control ID is sent as two separate bytes, high byte first.
                mapped_data = feature_request(
                    self.device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key >> 8, key & 0xFF, 0xFF
                )
                if mapped_data:
                    _ignore, _ignore, actionId, remapped, modifiers, status = _unpack("!HBBHBB", mapped_data[:8])
            except Exception:
                # Deliberate best effort: fall back to an empty mapping.
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"Failed to read persistent mapping for key with index {index} - use defaults")
                actionId = remapped = modifiers = status = 0
            actionId = special_keys.ACTIONID[actionId]
            if actionId == special_keys.ACTIONID.Key:
                remapped = special_keys.USB_HID_KEYCODES[remapped]
            elif actionId == special_keys.ACTIONID.Mouse:
                remapped = special_keys.MOUSE_BUTTONS[remapped]
            elif actionId == special_keys.ACTIONID.Hscroll:
                remapped = special_keys.HORIZONTAL_SCROLL[remapped]
            elif actionId == special_keys.ACTIONID.Consumer:
                remapped = special_keys.HID_CONSUMERCODES[remapped]
            elif actionId == special_keys.ACTIONID.Empty:  # purge data from empty value
                remapped = modifiers = 0
            self.keys[index] = PersistentRemappableAction(self.device, index, key, actionId, remapped, modifiers, status)
        elif logger.isEnabledFor(logging.WARNING):
            logger.warning(f"Key with index {index} was expected to exist but device doesn't report it.")
|
|
|
|
|
|
# Param Ids for feature GESTURE_2
|
|
# Identifiers of the parameters of feature GESTURE_2.
#   ExtraCapabilities -- not suitable for use
#   PixelZone         -- 4 2-byte integers, left, bottom, width, height; pixels
#   RatioZone         -- 4 bytes, left, bottom, width, height; unit 1/240 pad size
#   ScaleFactor       -- 2-byte integer, with 256 as normal scale
PARAM = _NamedInts(ExtraCapabilities=1, PixelZone=2, RatioZone=3, ScaleFactor=4)
# Name used for ids that are not listed above.
PARAM._fallback = lambda value: "unknown:%04X" % value
|
|
|
|
|
|
class SubParam:
    """Description of one component of a gesture parameter.

    `length` is the size in bytes.  Omitted bounds default to 0 and to the
    largest unsigned value that fits in `length` bytes; an omitted widget
    defaults to "Scale".
    """

    __slots__ = ("id", "length", "minimum", "maximum", "widget")

    def __init__(self, id, length, minimum=None, maximum=None, widget=None):
        if minimum is None:
            minimum = 0
        if maximum is None:
            maximum = (1 << (8 * length)) - 1
        if widget is None:
            widget = "Scale"
        self.id, self.length = id, length
        self.minimum, self.maximum = minimum, maximum
        self.widget = widget

    def __str__(self):
        return self.id

    # The textual and debugging representations are the same: the bare id.
    __repr__ = __str__
|
|
|
|
|
|
# Components of each gesture parameter: SubParam(name, byte count, minimum, maximum, widget).
SUB_PARAM = {
    PARAM["ExtraCapabilities"]: None,  # ignore
    # TODO: replace min and max with the correct values
    PARAM["PixelZone"]: tuple(
        SubParam(side, 2, 0x0000, 0xFFFF, "SpinButton") for side in ("left", "bottom", "width", "height")
    ),
    # TODO: replace min and max with the correct values
    PARAM["RatioZone"]: tuple(
        SubParam(side, 1, 0x00, 0xFF, "SpinButton") for side in ("left", "bottom", "width", "height")
    ),
    PARAM["ScaleFactor"]: (SubParam("scale", 2, 0x002E, 0x01FF, "Scale"),),
}
|
|
|
|
# Spec Ids for feature GESTURE_2
|
|
# Identifiers of the specs of feature GESTURE_2, numbered 1 to 10.
SPEC = _NamedInts(
    DVI_field_width=1, field_widths=2, period_unit=3, resolution=4, multiplier=5,
    sensor_size=6, finger_width_and_height=7, finger_major_minor_axis=8, finger_force=9, zone=10,
)
# Name used for ids that are not listed above.
SPEC._fallback = lambda value: "unknown:%04X" % value
|
|
|
|
# Action Ids for feature GESTURE_2
|
|
# Identifiers of the actions of feature GESTURE_2, numbered 1 to 12.
ACTION_ID = _NamedInts(
    MovePointer=1, ScrollHorizontal=2, WheelScrolling=3, ScrollVertial=4,
    ScrollOrPageXY=5, ScrollOrPageHorizontal=6, PageScreen=7, Drag=8,
    SecondaryDrag=9, Zoom=10, ScrollHorizontalOnly=11, ScrollVerticalOnly=12,
)
# Name used for ids that are not listed above.
ACTION_ID._fallback = lambda value: "unknown:%04X" % value
|
|
|
|
|
|
class Gesture:
    """One gesture field of feature GESTURE_2.

    `low` is the gesture id and `high` carries the capability bits.  A gesture
    that can be enabled (or is enabled by default) takes `next_index` as its
    position in the enable bitmap; one that can be diverted takes
    `next_diversion_index` as its position in the diversion bitmap.
    """

    def __init__(self, device, low, high, next_index, next_diversion_index):
        self._device = device
        self.id = low
        self.gesture = GESTURE[low]
        enableable = high & 0x01
        divertable = high & 0x02
        default_on = high & 0x20
        self.can_be_enabled = enableable
        self.can_be_diverted = divertable
        self.show_in_ui = high & 0x04
        self.desired_software_default = high & 0x08
        self.persistent = high & 0x10
        self.default_enabled = default_on
        if enableable or default_on:
            self.index = next_index
        else:
            self.index = None
        if divertable:
            self.diversion_index = next_diversion_index
        else:
            self.diversion_index = None
        # Both states are read from the device on first use.
        self._enabled = None
        self._diverted = None

    def _offset_mask(self, index):  # offset and mask
        """Return (byte offset, bit mask) of `index` in a bitmap, or (None, None)."""
        if index is None:
            return (None, None)
        byte_offset, bit = divmod(index, 8)  # 8 gestures per byte
        return (byte_offset, 0x1 << bit)

    def enable_offset_mask(gesture):
        return gesture._offset_mask(gesture.index)

    def diversion_offset_mask(gesture):
        return gesture._offset_mask(gesture.diversion_index)

    def enabled(self):  # is the gesture enabled?
        if self._enabled is not None or self.index is None:
            return self._enabled
        offset, mask = self.enable_offset_mask()
        result = feature_request(self._device, FEATURE.GESTURE_2, 0x10, offset, 0x01, mask)
        if result:
            self._enabled = bool(result[0] & mask)
        else:
            self._enabled = None
        return self._enabled

    def set(self, enable):  # enable or disable the gesture
        if not self.can_be_enabled or self.index is None:
            return None
        offset, mask = self.enable_offset_mask()
        new_bits = mask if enable else 0x00
        return feature_request(self._device, FEATURE.GESTURE_2, 0x20, offset, 0x01, mask, new_bits)

    def diverted(self):  # is the gesture diverted?
        if self._diverted is not None or self.diversion_index is None:
            return self._diverted
        offset, mask = self.diversion_offset_mask()
        result = feature_request(self._device, FEATURE.GESTURE_2, 0x30, offset, 0x01, mask)
        if result:
            self._diverted = bool(result[0] & mask)
        else:
            self._diverted = None
        return self._diverted

    def divert(self, diverted):  # divert or undivert the gesture
        if not self.can_be_diverted or self.diversion_index is None:
            return None
        offset, mask = self.diversion_offset_mask()
        new_bits = mask if diverted else 0x00
        return feature_request(self._device, FEATURE.GESTURE_2, 0x40, offset, 0x01, mask, new_bits)

    def as_int(self):
        return self.gesture

    def __int__(self):
        return self.id

    def __repr__(self):
        return f"<Gesture {self.gesture} index={self.index} diversion_index={self.diversion_index}>"

    # allow a gesture to be used as a settings reader/writer to enable and disable the gesture
    read = enabled
    write = set
|
|
|
|
|
|
class Param:
    """One parameter field of feature GESTURE_2.

    `low` is the parameter id; the low four bits of `high` give the size of
    the value in bytes.  Each instance gets the next free index for its
    device, tracked in the class-level `param_index` table.
    """

    # Next parameter index per device; shared by all instances.
    param_index = {}

    def __init__(self, device, low, high):
        self._device = device
        self.id = low
        self.param = PARAM[low]
        self.size = high & 0x0F
        # NOTE(review): the 0x1F mask overlaps the size bits above - confirm against the spec.
        self.show_in_ui = bool(high & 0x1F)
        self._value = None
        self._default_value = None
        own_index = Param.param_index.get(device, 0)
        self.index = own_index
        Param.param_index[device] = own_index + 1

    @property
    def sub_params(self):
        return SUB_PARAM.get(self.id, None)

    @property
    def value(self):
        cached = self._value
        if cached is None:
            return self.read()
        return cached

    def read(self):  # returns the bytes for the parameter
        reply = feature_request(self._device, FEATURE.GESTURE_2, 0x70, self.index, 0xFF)
        if not reply:
            return None
        self._value = _bytes2int(reply[: self.size])
        return self._value

    @property
    def default_value(self):
        if self._default_value is None:
            self._default_value = self._read_default()
        return self._default_value

    def _read_default(self):
        reply = feature_request(self._device, FEATURE.GESTURE_2, 0x60, self.index, 0xFF)
        if not reply:
            return None
        self._default_value = _bytes2int(reply[: self.size])
        return self._default_value

    def write(self, bytes):
        # The written value is cached as given, then sent to the device.
        self._value = bytes
        return feature_request(self._device, FEATURE.GESTURE_2, 0x80, self.index, bytes, 0xFF)

    def __str__(self):
        return str(self.param)

    def __int__(self):
        return self.id
|
|
|
|
|
|
class Spec:
    """One spec field of feature GESTURE_2.

    `low` is the spec id; the low four bits of `high` give the number of
    bytes of the spec's value.
    """

    def __init__(self, device, low, high):
        self._device = device
        self.id = low
        self.spec = SPEC[low]
        self.byte_count = high & 0x0F
        self._value = None

    @property
    def value(self):
        """The value of the spec, read from the device until a non-None value is obtained."""
        if self._value is None:
            self._value = self.read()
        return self._value

    def read(self):
        """Read the value of the spec from the device.

        Returns the first `byte_count` bytes of the reply as an int, or None
        when the call fails with a FeatureCallError or gives no reply.
        """
        try:
            value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF)
        except exceptions.FeatureCallError:  # some calls produce an error (notably spec 5 multiplier on K400Plus)
            if logger.isEnabledFor(logging.WARNING):
                logger.warning(f"Feature Call Error reading Gesture Spec on device {self._device} for spec {self.id} - use None")
            return None
        if not value:  # no reply: nothing to slice
            return None
        return _bytes2int(value[: self.byte_count])

    def __repr__(self):
        return f"[{self.spec}={self.value}]"
|
|
|
|
|
|
class Gestures:
    """Information about the gestures that a device supports.
    Right now only some information fields are supported.
    WARNING: Assumes that parameters are always global, which is not the case.

    The fields are read from the device when the object is created and
    sorted into `gestures`, `params` and `specs`.
    """

    def __init__(self, device):
        self.device = device
        self.gestures = {}
        self.params = {}
        self.specs = {}
        index = 0
        next_gesture_index = next_divsn_index = 0
        field_high = 0x00
        while field_high != 0x01:  # end of fields
            # retrieve the next eight fields
            fields = feature_request(device, FEATURE.GESTURE_2, 0x00, index >> 8, index & 0xFF)
            if not fields:
                break
            for offset in range(8):
                field_high = fields[offset * 2]
                field_low = fields[offset * 2 + 1]
                if field_high == 0x1:  # end of fields
                    break
                elif field_high & 0x80:
                    gesture = Gesture(device, field_low, field_high, next_gesture_index, next_divsn_index)
                    # only gestures that actually took an index advance the counters
                    next_gesture_index = next_gesture_index if gesture.index is None else next_gesture_index + 1
                    next_divsn_index = next_divsn_index if gesture.diversion_index is None else next_divsn_index + 1
                    self.gestures[gesture.gesture] = gesture
                elif field_high & 0xF0 == 0x30 or field_high & 0xF0 == 0x20:
                    param = Param(device, field_low, field_high)
                    self.params[param.param] = param
                elif field_high == 0x04:
                    if field_low != 0x00:
                        logger.error(f"Unimplemented GESTURE_2 grouping {field_low} {field_high} found.")
                elif field_high & 0xF0 == 0x40:
                    spec = Spec(device, field_low, field_high)
                    self.specs[spec.spec] = spec
                else:
                    logger.warning(f"Unimplemented GESTURE_2 field {field_low} {field_high} found.")
                index += 1  # one step per field read

    def gesture(self, gesture):
        """Return the `Gesture` stored under `gesture`, or None."""
        return self.gestures.get(gesture, None)

    def gesture_enabled(self, gesture):  # is the gesture enabled?
        g = self.gestures.get(gesture, None)
        # Gesture.enabled() takes no argument; the gesture holds its own device.
        return g.enabled() if g else None

    def enable_gesture(self, gesture):
        g = self.gestures.get(gesture, None)
        return g.set(True) if g else None

    def disable_gesture(self, gesture):
        g = self.gestures.get(gesture, None)
        return g.set(False) if g else None

    def param(self, param):
        """Return the `Param` stored under `param`, or None."""
        return self.params.get(param, None)

    def get_param(self, param):
        """Read the value of `param` from the device; None if the parameter is unknown."""
        g = self.params.get(param, None)
        return g.read() if g else None

    def set_param(self, param, value):
        """Write `value` to `param` on the device; None if the parameter is unknown."""
        g = self.params.get(param, None)
        return g.write(value) if g else None
|
|
|
|
|
|
class Backlight:
    """Information about the current settings of x1982 Backlight2 v3, but also works for previous versions"""

    def __init__(self, device):
        """Query BACKLIGHT2 function 0x00 on ``device`` and decode the reply.

        Raises ``exceptions.FeatureCallError`` when the device does not reply.
        """
        reply = device.feature_request(FEATURE.BACKLIGHT2, 0x00)
        if not reply:
            raise exceptions.FeatureCallError(msg="No reply from device.")
        self.device = device
        # little-endian layout: enabled, options, supported, effects, level, dho, dhi, dpow
        decoded = _unpack("<BBBHBHHH", reply[:12])
        self.enabled, self.options, supported, _effects, self.level, self.dho, self.dhi, self.dpow = decoded
        # capability bits taken from the "supported" byte
        self.auto_supported = supported & 0x08
        self.temp_supported = supported & 0x10
        self.perm_supported = supported & 0x20
        # the mode lives in bits 3-4 of the options byte
        self.mode = (self.options >> 3) & 0x03

    def write(self):
        """Fold ``mode`` back into ``options`` and send the settings with function 0x10."""
        self.options = (self.options & 0x07) | (self.mode << 3)
        # the stored level is only sent in mode 0x3; otherwise 0 is sent
        level = self.level if self.mode == 0x3 else 0
        payload = _pack("<BBBBHHH", self.enabled, self.options, 0xFF, level, self.dho, self.dhi, self.dpow)
        self.device.feature_request(FEATURE.BACKLIGHT2, 0x00)  # for testing - remove later
        self.device.feature_request(FEATURE.BACKLIGHT2, 0x10, payload)
|
|
|
|
|
|
class LEDParam:
    """Names of the parameters that an LED effect can carry.

    Each constant's value is its own name; the values are used as dictionary keys
    in ``LEDParamSize`` and ``LEDEffects``.
    """

    color = "color"
    speed = "speed"
    period = "period"
    intensity = "intensity"
    ramp = "ramp"
    form = "form"
|
|
|
|
|
|
# Named values for the "ramp" and "form" parameters of LED effects.
LEDRampChoices = _NamedInts(default=0, yes=1, no=2)
LEDFormChoices = _NamedInts(
    default=0,
    sine=1,
    square=2,
    triangle=3,
    sawtooth=4,
    sharkfin=5,
    exponential=6,
)

# Width in bytes of each parameter.
LEDParamSize = {
    LEDParam.color: 3,
    LEDParam.speed: 1,
    LEDParam.period: 2,
    LEDParam.intensity: 1,
    LEDParam.ramp: 1,
    LEDParam.form: 1,
}

# Effect ID -> [named effect ID, {parameter: byte offset of the parameter after the ID byte}]
# Wave=0x04, Stars=0x05, Press=0x06, Audio=0x07 are not implemented.
LEDEffects = {
    0x0: [_NamedInt(0x0, _("Disabled")), {}],
    0x1: [_NamedInt(0x1, _("Static")), {LEDParam.color: 0, LEDParam.ramp: 3}],
    0x2: [_NamedInt(0x2, _("Pulse")), {LEDParam.color: 0, LEDParam.speed: 3}],
    0x3: [_NamedInt(0x3, _("Cycle")), {LEDParam.period: 5, LEDParam.intensity: 7}],
    0x8: [_NamedInt(0x8, _("Boot")), {}],
    0x9: [_NamedInt(0x9, _("Demo")), {}],
    0xA: [
        _NamedInt(0xA, _("Breathe")),
        {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6},
    ],
    0xB: [_NamedInt(0xB, _("Ripple")), {LEDParam.color: 0, LEDParam.period: 4}],
}
|
|
|
|
|
|
class LEDEffectSetting:  # an effect plus its parameters
    """An LED effect together with the values of its parameters.

    ``ID`` is None when the effect is not one listed in ``LEDEffects``; such a setting
    may carry the undecoded data in a ``bytes`` attribute instead.
    """

    def __init__(self, **kwargs):
        """Store every keyword argument as an attribute; ``ID`` defaults to None."""
        self.ID = None
        for name, value in kwargs.items():
            setattr(self, name, value)

    @classmethod
    def from_bytes(cls, bytes, options=None):
        """Decode a setting from ``bytes``.

        Without ``options`` the first byte is the effect ID. With ``options`` the first byte
        is matched against each option's ``index`` and that option's ``ID`` is used.
        """
        if options is None:
            ID = bytes[0]
        else:
            ID = next((ze.ID for ze in options if ze.index == bytes[0]), None)
        effect = LEDEffects.get(ID)
        if not effect:
            # unknown effect: keep the raw data so that it can be written back unchanged
            return cls(ID=None, bytes=bytes)
        named_id, layout = effect
        args = {"ID": named_id}
        for param, offset in layout.items():
            start = 1 + offset  # parameter offsets are relative to the byte after the ID
            args[str(param)] = _bytes2int(bytes[start : start + LEDParamSize[param]])
        return cls(**args)

    def to_bytes(self, options=None):
        """Encode the setting: one ID byte followed by a 10-byte parameter area.

        With ``options`` the ID byte is the ``index`` of the option whose ``ID`` matches.
        A setting without an ID yields its raw ``bytes`` if present, else eleven 0xFF bytes.
        """
        ID = self.ID
        if ID is None:
            return getattr(self, "bytes", b"\xff" * 11)
        area = [0] * 10
        for param, offset in LEDEffects[ID][1].items():
            width = LEDParamSize[param]
            area[offset : offset + width] = _int2bytes(getattr(self, str(param), 0), width)
        if options is not None:
            ID = next((ze.index for ze in options if ze.ID == ID), None)
        return _int2bytes(ID, 1) + bytes(area)

    @classmethod
    def from_yaml(cls, loader, node):
        """Build a setting from a YAML mapping node."""
        mapping = loader.construct_mapping(node)
        return cls(**mapping)

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent ``data`` as a flow-style ``!LEDEffectSetting`` mapping of its attributes."""
        return dumper.represent_mapping("!LEDEffectSetting", data.__dict__, flow_style=True)

    def __str__(self):
        dumped = _yaml.dump(self, width=float("inf"))
        return dumped.rstrip("\n")
|
|
|
|
|
|
# Teach YAML to dump LEDEffectSetting objects and to load them back with the safe loader.
_yaml.add_representer(LEDEffectSetting, LEDEffectSetting.to_yaml)
_yaml.SafeLoader.add_constructor("!LEDEffectSetting", LEDEffectSetting.from_yaml)
|
|
|
|
|
|
class LEDEffectInfo:  # an effect that a zone can do
    """Description of one effect of one LED zone, as reported by the device."""

    def __init__(self, device, zindex, eindex):
        """Query COLOR_LED_EFFECTS function 0x20 for effect ``eindex`` of zone ``zindex``."""
        reply = device.feature_request(FEATURE.COLOR_LED_EFFECTS, 0x20, zindex, eindex)
        # big-endian layout: zone index, effect index, effect ID, capabilities, period
        decoded = _unpack("!BBHHH", reply[0:8])
        self.zindex, self.index, self.ID, self.capabilities, self.period = decoded

    def __str__(self):
        # capabilities are shown in hexadecimal
        return f"LEDEffectInfo({self.zindex}, {self.index}, {self.ID}, {self.capabilities: x}, {self.period})"
|
|
|
|
|
|
# Names for the LED zone location codes reported by devices.
LEDZoneLocations = _NamedInts()
for _location_code, _location_name in (
    (0x00, _("Unknown Location")),
    (0x01, _("Primary")),
    (0x02, _("Logo")),
    (0x03, _("Left Side")),
    (0x04, _("Right Side")),
    (0x05, _("Combined")),
    (0x06, _("Primary 1")),
    (0x07, _("Primary 2")),
    (0x08, _("Primary 3")),
    (0x09, _("Primary 4")),
    (0x0A, _("Primary 5")),
    (0x0B, _("Primary 6")),
):
    LEDZoneLocations[_location_code] = _location_name
del _location_code, _location_name  # do not leave the loop variables in the module namespace
|
|
|
|
|
|
class LEDZoneInfo:  # effects that a zone can do
    """Description of one LED zone and the effects it supports."""

    def __init__(self, device, index):
        """Query COLOR_LED_EFFECTS function 0x10 for zone ``index`` and read all of its effects."""
        info = device.feature_request(FEATURE.COLOR_LED_EFFECTS, 0x10, index)
        self.index, self.location, self.count = _unpack("!BHB", info[0:4])
        # use the named location when there is one, otherwise keep the raw code
        self.location = LEDZoneLocations[self.location] or self.location
        self.effects = [LEDEffectInfo(device, index, i) for i in range(self.count)]

    def to_command(self, setting):
        """Return the command bytes that apply ``setting`` to this zone.

        The result is the zone index byte, the position of the matching effect in
        ``effects``, and the encoded setting without its leading ID byte.
        Returns None when the zone has no effect whose ID equals ``setting.ID``.
        """
        for position, effect in enumerate(self.effects):
            if effect.ID == setting.ID:
                # explicit one-byte widths, as in every other _int2bytes call in this module
                return _int2bytes(self.index, 1) + _int2bytes(position, 1) + setting.to_bytes()[1:]
        return None

    def __str__(self):
        return f"LEDZoneInfo({self.index}, {self.location}, {[str(z) for z in self.effects]})"
|
|
|
|
|
|
class LEDEffectsInfo:  # effects that the LEDs can do
    """All LED zones of a device together with the effects each zone supports."""

    def __init__(self, device):
        """Query COLOR_LED_EFFECTS function 0x00 and read the information for every zone."""
        reply = device.feature_request(FEATURE.COLOR_LED_EFFECTS, 0x00)
        self.device = device
        # big-endian layout: zone count, an ignored 16-bit field, capabilities
        self.count, _unused, capabilities = _unpack("!BHH", reply[0:5])
        self.readable = capabilities & 0x1
        self.zones = [LEDZoneInfo(device, zone_index) for zone_index in range(self.count)]

    def to_command(self, index, setting):
        """Return the command that applies ``setting`` to the zone at position ``index``."""
        zone = self.zones[index]
        return zone.to_command(setting)

    def __str__(self):
        zones = "\n".join(str(z) for z in self.zones)
        return f"LEDEffectsInfo({self.device}, readable {self.readable}\n{zones})"
|
|
|
|
|
|
# Behavior code: the high nibble of the first byte of a button mapping.
ButtonBehaviors = _NamedInts(
    MacroExecute=0x0,
    MacroStop=0x1,
    MacroStopAll=0x2,
    Send=0x8,
    Function=0x9,
)

# What a "Send" mapping sends: the second byte of the mapping.
ButtonMappingTypes = _NamedInts(
    No_Action=0x0,
    Button=0x1,
    Modifier_And_Key=0x2,
    Consumer_Key=0x3,
)

# What a "Function" mapping does: the second byte of the mapping.
ButtonFunctions = _NamedInts(
    No_Action=0x0, Tilt_Left=0x1, Tilt_Right=0x2,
    Next_DPI=0x3, Previous_DPI=0x4, Cycle_DPI=0x5, Default_DPI=0x6, Shift_DPI=0x7,
    Next_Profile=0x8, Previous_Profile=0x9, Cycle_Profile=0xA,
    G_Shift=0xB, Battery_Status=0xC, Profile_Select=0xD, Mode_Switch=0xE, Host_Button=0xF,
    Scroll_Down=0x10, Scroll_Up=0x11,
)  # yapf: disable

# Value tables for the different kinds of "Send" mappings.
ButtonButtons = special_keys.MOUSE_BUTTONS
ButtonModifiers = special_keys.modifiers
ButtonKeys = special_keys.USB_HID_KEYCODES
ButtonConsumerKeys = special_keys.HID_CONSUMERCODES
|
|
|
|
|
|
class Button:
    """A button mapping

    A mapping is four bytes long. The high nibble of the first byte is the behavior
    (see ``ButtonBehaviors``); the meaning of the remaining bits depends on the behavior.
    Mappings that cannot be decoded keep their raw data in a ``bytes`` attribute.
    """

    def __init__(self, **kwargs):
        """Store every keyword argument as an attribute; ``behavior`` defaults to None."""
        self.behavior = None
        for key, val in kwargs.items():
            setattr(self, key, val)

    @classmethod
    def from_yaml(cls, loader, node):
        """Build a button from a YAML mapping node."""
        args = loader.construct_mapping(node)
        return cls(**args)

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent ``data`` as a flow-style ``!Button`` mapping of its attributes."""
        return dumper.represent_mapping("!Button", data.__dict__, flow_style=True)

    @classmethod
    def from_bytes(cls, bytes):
        """Decode a button mapping from its four-byte representation."""
        behavior = ButtonBehaviors[bytes[0] >> 4]
        if behavior == ButtonBehaviors.MacroExecute or behavior == ButtonBehaviors.MacroStop:
            # "<<" binds less tightly than "+", so the shifts must be parenthesized
            sector = ((bytes[0] & 0x0F) << 8) + bytes[1]
            address = (bytes[2] << 8) + bytes[3]
            return cls(behavior=behavior, sector=sector, address=address)
        if behavior == ButtonBehaviors.Send:
            mapping_type = ButtonMappingTypes[bytes[1]]
            if mapping_type == ButtonMappingTypes.Button:
                value = ButtonButtons[(bytes[2] << 8) + bytes[3]]
                return cls(behavior=behavior, type=mapping_type, value=value)
            if mapping_type == ButtonMappingTypes.Modifier_And_Key:
                return cls(behavior=behavior, type=mapping_type, modifiers=bytes[2], value=ButtonKeys[bytes[3]])
            if mapping_type == ButtonMappingTypes.Consumer_Key:
                value = ButtonConsumerKeys[(bytes[2] << 8) + bytes[3]]
                return cls(behavior=behavior, type=mapping_type, value=value)
            if mapping_type == ButtonMappingTypes.No_Action:
                return cls(behavior=behavior, type=mapping_type)
            # unknown mapping type: keep the raw data instead of failing on an unbound result
            return cls(behavior=bytes[0] >> 4, bytes=bytes)
        if behavior == ButtonBehaviors.Function:
            function = ButtonFunctions[bytes[1]]
            value = function if function is not None else bytes[1]
            return cls(behavior=behavior, value=value, data=bytes[3])
        # unknown behavior: keep the raw data
        return cls(behavior=bytes[0] >> 4, bytes=bytes)

    def to_bytes(self):
        """Encode the mapping into its four-byte representation.

        A mapping that cannot be encoded (no behavior, unknown behavior, or a Send mapping
        without a known type) yields its raw ``bytes`` if present, else four 0xFF bytes.
        """
        behavior = self.behavior
        raw = getattr(self, "bytes", None) or b"\xff\xff\xff\xff"
        if behavior == ButtonBehaviors.MacroExecute or behavior == ButtonBehaviors.MacroStop:
            # the behavior nibble shares the first byte with the top four bits of the sector;
            # bytes objects are immutable, so the nibble is merged in before encoding
            head = (behavior << 12) | (self.sector & 0x0FFF)
            return _int2bytes(head, 2) + _int2bytes(self.address, 2)
        if behavior == ButtonBehaviors.Send:
            mapping_type = getattr(self, "type", None)
            if mapping_type == ButtonMappingTypes.Button:
                tail = _int2bytes(self.value, 2)
            elif mapping_type == ButtonMappingTypes.Modifier_And_Key:
                tail = _int2bytes(self.modifiers, 1) + _int2bytes(self.value, 1)
            elif mapping_type == ButtonMappingTypes.Consumer_Key:
                tail = _int2bytes(self.value, 2)
            elif mapping_type == ButtonMappingTypes.No_Action:
                tail = b"\xff\xff"
            else:
                return raw
            return _int2bytes(behavior << 4, 1) + _int2bytes(mapping_type, 1) + tail
        if behavior == ButtonBehaviors.Function:
            data = getattr(self, "data", None)
            tail = _int2bytes(data, 1) if data else b"\x00"
            return _int2bytes(behavior << 4, 1) + _int2bytes(self.value, 1) + b"\xff" + tail
        return raw

    def __repr__(self):
        return "%s{%s}" % (
            self.__class__.__name__,
            ", ".join([str(key) + ":" + str(val) for key, val in self.__dict__.items()]),
        )
|
|
|
|
|
|
# Teach YAML to dump Button objects and to load them back with the safe loader.
_yaml.add_representer(Button, Button.to_yaml)
_yaml.SafeLoader.add_constructor("!Button", Button.from_yaml)
|
|
|
|
|
|
class OnboardProfile:
    """A single onboard profile"""

    def __init__(self, **kwargs):
        """Store every keyword argument as an attribute."""
        for key, val in kwargs.items():
            setattr(self, key, val)

    @classmethod
    def from_yaml(cls, loader, node):
        """Build a profile from a YAML mapping node."""
        args = loader.construct_mapping(node)
        return cls(**args)

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent ``data`` as an ``!OnboardProfile`` mapping of its attributes."""
        return dumper.represent_mapping("!OnboardProfile", data.__dict__)

    @classmethod
    def from_bytes(cls, sector, enabled, buttons, gbuttons, bytes):
        """Decode a profile from the contents of its sector.

        ``buttons`` and ``gbuttons`` are the numbers of button and G-shift button
        mappings to decode; ``sector`` and ``enabled`` are stored unchanged.
        """
        return cls(
            sector=sector,
            enabled=enabled,
            report_rate=bytes[0],
            resolution_default_index=bytes[1],
            resolution_shift_index=bytes[2],
            # five little-endian 16-bit resolutions starting at offset 3
            resolutions=[_unpack("<H", bytes[i * 2 + 3 : i * 2 + 5])[0] for i in range(0, 5)],
            red=bytes[13],
            green=bytes[14],
            blue=bytes[15],
            power_mode=bytes[16],
            angle_snap=bytes[17],
            write_count=_unpack("<H", bytes[18:20])[0],
            reserved=bytes[20:28],
            ps_timeout=_unpack("<H", bytes[28:30])[0],
            po_timeout=_unpack("<H", bytes[30:32])[0],
            # four-byte mappings: buttons start at offset 32, G-shift buttons at offset 96
            buttons=[Button.from_bytes(bytes[32 + i * 4 : 32 + i * 4 + 4]) for i in range(0, buttons)],
            gbuttons=[Button.from_bytes(bytes[96 + i * 4 : 96 + i * 4 + 4]) for i in range(0, gbuttons)],
            name=bytes[160:208].decode("utf-16le").rstrip("\x00").rstrip("\uFFFF"),
            # four eleven-byte LED effect settings starting at offset 208
            lighting=[LEDEffectSetting.from_bytes(bytes[208 + i * 11 : 219 + i * 11]) for i in range(0, 4)],
        )

    @classmethod
    def from_dev(cls, dev, i, sector, s, enabled, buttons, gbuttons):
        """Read sector ``sector`` of size ``s`` from ``dev`` and decode it (``i`` is unused)."""
        bytes = OnboardProfiles.read_sector(dev, sector, s)
        return cls.from_bytes(sector, enabled, buttons, gbuttons, bytes)

    def to_bytes(self, length):
        """Encode the profile into ``length`` bytes: data, 0xFF padding and a trailing CRC-16."""
        bytes = _int2bytes(self.report_rate, 1)
        bytes += _int2bytes(self.resolution_default_index, 1) + _int2bytes(self.resolution_shift_index, 1)
        bytes += b"".join([self.resolutions[i].to_bytes(2, "little") for i in range(0, 5)])
        bytes += _int2bytes(self.red, 1) + _int2bytes(self.green, 1) + _int2bytes(self.blue, 1)
        bytes += _int2bytes(self.power_mode, 1) + _int2bytes(self.angle_snap, 1)
        bytes += self.write_count.to_bytes(2, "little") + self.reserved
        bytes += self.ps_timeout.to_bytes(2, "little") + self.po_timeout.to_bytes(2, "little")
        # always sixteen slots of each kind; missing mappings are filled with 0xFF
        for i in range(0, 16):
            bytes += self.buttons[i].to_bytes() if i < len(self.buttons) else b"\xff\xff\xff\xff"
        for i in range(0, 16):
            bytes += self.gbuttons[i].to_bytes() if i < len(self.gbuttons) else b"\xff\xff\xff\xff"
        if self.enabled:
            # 24 UTF-16 code units, NUL padded
            bytes += self.name[0:24].ljust(24, "\x00").encode("utf-16le")
        else:
            bytes += b"\xff" * 48
        for i in range(0, 4):
            bytes += self.lighting[i].to_bytes()
        while len(bytes) < length - 2:  # leave room for the CRC
            bytes += b"\xff"
        bytes += _int2bytes(_crc16(bytes), 2)
        return bytes

    def dump(self):
        """Print a human-readable summary of the profile to standard output."""
        print(f" Onboard Profile: {self.name}")
        print(f" Report Rate {self.report_rate} ms")
        print(f" DPI Resolutions {self.resolutions}")
        # the attributes are named resolution_*_index everywhere else in this class
        print(
            f" Default Resolution Index {self.resolution_default_index},"
            f" Shift Resolution Index {self.resolution_shift_index}"
        )
        print(f" Colors {self.red} {self.green} {self.blue}")
        print(f" Power {self.power_mode}, Angle Snapping {self.angle_snap}")
        for i, button in enumerate(self.buttons):
            if button.behavior is not None:
                print(" BUTTON", i + 1, button)
        for i, gbutton in enumerate(self.gbuttons):
            if gbutton.behavior is not None:
                print(" G-BUTTON", i + 1, gbutton)
|
|
|
|
|
|
# Teach YAML to dump OnboardProfile objects and to load them back with the safe loader.
_yaml.add_representer(OnboardProfile, OnboardProfile.to_yaml)
_yaml.SafeLoader.add_constructor("!OnboardProfile", OnboardProfile.from_yaml)

# Stored as "version" in every OnboardProfiles object read from a device.
OnboardProfilesVersion = 3
|
|
|
|
|
|
# Doesn't handle macros
class OnboardProfiles:
    """The entire onboard profiles information"""

    def __init__(self, **kwargs):
        """Store every keyword argument as an attribute."""
        for key, val in kwargs.items():
            setattr(self, key, val)

    @classmethod
    def from_yaml(cls, loader, node):
        """Build the profiles information from a YAML mapping node."""
        args = loader.construct_mapping(node)
        return cls(**args)

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent ``data`` as an ``!OnboardProfiles`` mapping of its attributes."""
        return dumper.represent_mapping("!OnboardProfiles", data.__dict__)

    @classmethod
    def get_profile_headers(cls, device):
        """Return the (sector, enabled) pairs listed in the device's profile directory.

        The directory is read four bytes at a time until an entry starting with 0xFFFF.
        """
        i = 0
        headers = []
        chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, 0, 0, 0, i)
        s = 0x00
        if chunk[0:4] == b"\x00\x00\x00\x00":  # look in ROM instead
            chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, 0x01, 0, 0, i)
            s = 0x01
        while chunk[0:2] != b"\xff\xff":
            sector, enabled = _unpack("!HB", chunk[0:3])
            headers.append((sector, enabled))
            i += 1
            chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, s, 0, 0, i * 4)
        return headers

    @classmethod
    def from_device(cls, device):
        """Read all onboard profiles from ``device``.

        Returns None when the reported memory model is not 0x01 or the profile format
        is greater than 0x04.
        """
        if not device.online:  # wake the device up if necessary
            device.ping()
        response = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x00)
        memory, profile, _macro = _unpack("!BBB", response[0:3])
        if memory != 0x01 or profile > 0x04:
            return
        count, _oob, buttons, sectors, size, shift = _unpack("!BBBBHB", response[3:10])
        gbuttons = buttons if (shift & 0x3 == 0x2) else 0
        headers = OnboardProfiles.get_profile_headers(device)
        # profiles are numbered from 1 in directory order
        profiles = {
            i + 1: OnboardProfile.from_dev(device, i, sector, size, enabled, buttons, gbuttons)
            for i, (sector, enabled) in enumerate(headers)
        }
        return cls(
            version=OnboardProfilesVersion,
            name=device.name,
            count=count,
            buttons=buttons,
            gbuttons=gbuttons,
            sectors=sectors,
            size=size,
            profiles=profiles,
        )

    def to_bytes(self):
        """Encode the profile directory: one entry per profile, end marker, padding, CRC-16."""
        bytes = b""
        for i in range(1, len(self.profiles) + 1):
            bytes += _int2bytes(self.profiles[i].sector, 2) + _int2bytes(self.profiles[i].enabled, 1) + b"\x00"
        bytes += b"\xff\xff\x00\x00"  # marker after last profile
        while len(bytes) < self.size - 2:  # leave room for CRC
            bytes += b"\xff"
        bytes += _int2bytes(_crc16(bytes), 2)
        return bytes

    @classmethod
    def read_sector(cls, dev, sector, s):  # doesn't check for valid sector or size
        """Read ``s`` bytes of sector ``sector`` from ``dev`` in 16-byte chunks."""
        bytes = b""
        o = 0
        while o < s - 15:
            chunk = dev.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, sector >> 8, sector & 0xFF, o >> 8, o & 0xFF)
            bytes += chunk
            o += 16
        chunk = dev.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, sector >> 8, sector & 0xFF, (s - 16) >> 8, (s - 16) & 0xFF)
        bytes += chunk[16 + o - s :]  # the last chunk has to be read in an awkward way
        return bytes

    @classmethod
    def write_sector(cls, device, s, bs):  # doesn't check for valid sector or size
        """Write ``bs`` to sector ``s`` unless the sector already holds the same data.

        The last two bytes are ignored in the comparison. Returns True if the sector
        was written, False if the write was skipped.
        """
        rbs = OnboardProfiles.read_sector(device, s, len(bs))
        if rbs[:-2] == bs[:-2]:
            return False
        device.feature_request(FEATURE.ONBOARD_PROFILES, 0x60, s >> 8, s & 0xFF, 0, 0, len(bs) >> 8, len(bs) & 0xFF)
        o = 0
        while o < len(bs) - 1:
            device.feature_request(FEATURE.ONBOARD_PROFILES, 0x70, bs[o : o + 16])
            o += 16
        device.feature_request(FEATURE.ONBOARD_PROFILES, 0x80)
        return True

    def write(self, device):
        """Write the directory and every profile to ``device``.

        Returns the number of sectors actually written. Any exception is logged and
        re-raised; a profile whose sector is not below ``sectors`` raises ``Exception``.
        """
        try:
            written = 1 if OnboardProfiles.write_sector(device, 0, self.to_bytes()) else 0
        except Exception:
            logger.warning("Exception writing onboard profile control sector")
            raise  # bare raise keeps the original traceback
        for p in self.profiles.values():
            try:
                if p.sector >= self.sectors:
                    raise Exception(f"Sector {p.sector} not a writable sector")
                written += 1 if OnboardProfiles.write_sector(device, p.sector, p.to_bytes(self.size)) else 0
            except Exception:
                logger.warning(f"Exception writing onboard profile sector {p.sector}")
                raise
        return written

    def show(self):
        """Print the YAML representation of the profiles to standard output."""
        print(_yaml.dump(self))
|
|
|
|
|
|
# Teach YAML to dump OnboardProfiles objects and to load them back with the safe loader.
_yaml.add_representer(OnboardProfiles, OnboardProfiles.to_yaml)
_yaml.SafeLoader.add_constructor("!OnboardProfiles", OnboardProfiles.from_yaml)
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
|
|
def feature_request(device, feature, function=0x00, *params, no_reply=False):
    """Send a request for ``function`` of ``feature`` to ``device``.

    The request ID is the feature's index in ``device.features`` in the high byte and
    the low byte of ``function`` in the low byte.

    :returns: the result of ``device.request``, or ``None`` when the device is offline,
    has no features, or does not have ``feature``.
    """
    if not (device.online and device.features):
        return None
    if feature not in device.features:
        return None
    request_id = (device.features[feature] << 8) + (function & 0xFF)
    return device.request(request_id, *params, no_reply=no_reply)
|
|
|
|
|
|
def get_firmware(device):
    """Reads a device's firmware info.

    :returns: a tuple of FirmwareInfo tuples, ordered by firmware layer, or ``None``
    if the device does not answer the DEVICE_FW_VERSION request.
    """
    reply = feature_request(device, FEATURE.DEVICE_FW_VERSION)
    if not reply:
        return None
    entity_count = ord(reply[:1])

    firmware = []
    for index in range(entity_count):
        raw = feature_request(device, FEATURE.DEVICE_FW_VERSION, 0x10, index)
        if not raw:
            continue  # entities without a reply are left out
        level = ord(raw[:1]) & 0x0F
        if level in (0, 1):
            name, version_major, version_minor, build = _unpack("!3sBBH", raw[1:8])
            version = "%02X.%02X" % (version_major, version_minor)
            if build:
                version += ".B%04X" % build
            extras = raw[9:].rstrip(b"\x00") or None
            info = _FirmwareInfo(_FIRMWARE_KIND[level], name.decode("ascii"), version, extras)
        elif level == _FIRMWARE_KIND.Hardware:
            info = _FirmwareInfo(_FIRMWARE_KIND.Hardware, "", str(ord(raw[1:2])), None)
        else:
            info = _FirmwareInfo(_FIRMWARE_KIND.Other, "", "", None)
        firmware.append(info)
    return tuple(firmware)
|
|
|
|
|
|
def get_ids(device):
    """Reads a device's ids (unit and model numbers)

    :returns: a tuple (unit id, model id, {transport name: transport id}) of upper-case
    hexadecimal strings, or ``None`` if the device does not answer.
    """
    reply = feature_request(device, FEATURE.DEVICE_FW_VERSION)
    if not reply:
        return None
    unit_id = reply[1:5]
    model_id = reply[7:13]
    transport_bits = ord(reply[6:7])
    # the model id holds one two-byte id for each transport whose flag is set, in this order
    transports = (("btid", 0x1), ("btleid", 0x02), ("wpid", 0x04), ("usbid", 0x08))
    tid_map = {}
    position = 0
    for transport, flag in transports:
        if transport_bits & flag:
            tid_map[transport] = model_id[position : position + 2].hex().upper()
            position += 2
    return (unit_id.hex().upper(), model_id.hex().upper(), tid_map)
|
|
|
|
|
|
def get_kind(device):
    """Reads a device's type.

    :see DEVICE_KIND:
    :returns: a string describing the device type, or ``None`` if the device is
    not available or does not support the ``DEVICE_NAME`` feature.
    """
    reply = feature_request(device, FEATURE.DEVICE_NAME, 0x20)
    if not reply:
        return None
    return DEVICE_KIND[ord(reply[:1])]
|
|
|
|
|
|
def get_name(device):
    """Reads a device's name.

    :returns: a string with the device name, or ``None`` if the device is not
    available or does not support the ``DEVICE_NAME`` feature.
    """
    reply = feature_request(device, FEATURE.DEVICE_NAME)
    if not reply:
        return None
    expected = ord(reply[:1])

    collected = b""
    while len(collected) < expected:
        # ask for the part of the name that starts where the collected part ends
        fragment = feature_request(device, FEATURE.DEVICE_NAME, 0x10, len(collected))
        if not fragment:
            logger.error("failed to read whole name of %s (expected %d chars)", device, expected)
            return None
        collected += fragment[: expected - len(collected)]

    return collected.decode("utf-8")
|
|
|
|
|
|
def get_friendly_name(device):
    """Reads a device's friendly name.

    :returns: a string with the friendly name, or ``None`` if the device is not
    available or does not support the ``DEVICE_FRIENDLY_NAME`` feature.
    """
    reply = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME)
    if not reply:
        return None
    expected = ord(reply[:1])

    collected = b""
    while len(collected) < expected:
        fragment = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME, 0x10, len(collected))
        if not fragment:
            logger.error("failed to read whole name of %s (expected %d chars)", device, expected)
            return None
        skip = 0 if fragment[0] else 1  # initial null actually seen on a device
        collected += fragment[skip : expected + skip - len(collected)]

    return collected.decode("utf-8")
|
|
|
|
|
|
def get_battery_status(device):
    """Query BATTERY_STATUS and decode the reply; ``None`` if there is no (or an empty) reply."""
    reply = feature_request(device, FEATURE.BATTERY_STATUS)
    if not reply:
        return None
    return decipher_battery_status(reply)
|
|
|
|
|
|
def decipher_battery_status(report):
    """Decode a BATTERY_STATUS report.

    :returns: (FEATURE.BATTERY_STATUS, charge level or None, next level, status, None);
    a reported charge level of 0 is returned as ``None``.
    """
    discharge, next_level, status_code = _unpack("!BBB", report[:3])
    if discharge == 0:
        discharge = None
    status = BATTERY_STATUS[status_code]
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("battery status %s%% charged, next %s%%, status %s", discharge, next_level, status)
    return FEATURE.BATTERY_STATUS, discharge, next_level, status, None
|
|
|
|
|
|
def get_battery_unified(device):
    """Query UNIFIED_BATTERY function 0x10 and decode the reply; ``None`` if there is no reply."""
    reply = feature_request(device, FEATURE.UNIFIED_BATTERY, 0x10)
    if reply is None:
        return None
    return decipher_battery_unified(reply)
|
|
|
|
|
|
def decipher_battery_unified(report):
    """Decode a UNIFIED_BATTERY report.

    :returns: (FEATURE.UNIFIED_BATTERY, level, None, status, None), where the level is
    the reported percentage when it is non-zero and an approximate level otherwise.
    """
    discharge, level_code, status_code, _ignore = _unpack("!BBBB", report[:4])
    status = BATTERY_STATUS[status_code]
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("battery unified %s%% charged, level %s, charging %s", discharge, level_code, status)
    # level code -> approximate level; any other code counts as empty
    approximations = {
        8: _BATTERY_APPROX.full,
        4: _BATTERY_APPROX.good,
        2: _BATTERY_APPROX.low,
        1: _BATTERY_APPROX.critical,
    }
    level = approximations.get(level_code, _BATTERY_APPROX.empty)
    return FEATURE.UNIFIED_BATTERY, discharge if discharge else level, None, status, None
|
|
|
|
|
|
# voltage to remaining charge from Logitech
# (threshold, percent) pairs, highest threshold first; a reading maps to the percentage of
# the first pair whose threshold is below it. The last pair matches every non-negative reading.
battery_voltage_remaining = (
    (4186, 100), (4067, 90), (3989, 80), (3922, 70), (3859, 60),
    (3811, 50), (3778, 40), (3751, 30), (3717, 20), (3671, 10),
    (3646, 5), (3579, 2), (3500, 0), (-1000, 0),
)  # yapf: disable
|
|
|
|
|
|
def get_battery_voltage(device):
    """Query BATTERY_VOLTAGE and decode the reply; ``None`` if there is no reply."""
    reply = feature_request(device, FEATURE.BATTERY_VOLTAGE)
    if reply is None:
        return None
    return decipher_battery_voltage(reply)
|
|
|
|
|
|
def decipher_battery_voltage(report):
    """Decode a BATTERY_VOLTAGE report (16-bit voltage followed by a flags byte).

    :returns: (FEATURE.BATTERY_VOLTAGE, charge level, None, status, voltage), where the
    charge level is looked up in ``battery_voltage_remaining``.
    """
    voltage, flags = _unpack(">HB", report[:3])
    status = BATTERY_STATUS.discharging
    charge_sts = ERROR.unknown
    charge_lvl = CHARGE_LEVEL.average
    charge_type = CHARGE_TYPE.standard
    if flags & 0x80:  # bit 7: charging, the low two bits hold the charge status
        status = BATTERY_STATUS.recharging
        charge_sts = CHARGE_STATUS[flags & 0x03]
    if charge_sts is None:
        charge_sts = ERROR.unknown
    elif charge_sts == CHARGE_STATUS.full:
        charge_lvl = CHARGE_LEVEL.full
        status = BATTERY_STATUS.full
    if flags & 0x08:  # bit 3
        charge_type = CHARGE_TYPE.fast
    elif flags & 0x10:  # bit 4
        charge_type = CHARGE_TYPE.slow
        status = BATTERY_STATUS.slow_recharge
    elif flags & 0x20:  # bit 5
        charge_lvl = CHARGE_LEVEL.critical
    # the first table entry whose threshold is below the voltage gives the charge level
    for threshold, percent in battery_voltage_remaining:
        if threshold < voltage:
            charge_lvl = percent
            break
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            "battery voltage %d mV, charging %s, status %d = %s, level %s, type %s",
            voltage,
            status,
            (flags & 0x03),
            charge_sts,
            charge_lvl,
            charge_type,
        )
    return FEATURE.BATTERY_VOLTAGE, charge_lvl, None, status, voltage
|
|
|
|
|
|
def get_adc_measurement(device):
    """Query ADC_MEASUREMENT and decode the reply.

    When the call fails with ``FeatureCallError`` the feature itself is returned if the
    device has it, otherwise ``None``. ``None`` is also returned if there is no reply.
    """
    try:  # this feature call produces an error for headsets that are connected but inactive
        reply = feature_request(device, FEATURE.ADC_MEASUREMENT)
        if reply is None:
            return None
        return decipher_adc_measurement(reply)
    except exceptions.FeatureCallError:
        if FEATURE.ADC_MEASUREMENT in device.features:
            return FEATURE.ADC_MEASUREMENT
        return None
|
|
|
|
|
|
def decipher_adc_measurement(report):
    """Decode an ADC_MEASUREMENT report (16-bit reading followed by a flags byte).

    :returns: (FEATURE.ADC_MEASUREMENT, charge level, None, status, reading) when bit 0
    of the flags is set, otherwise ``None``.
    """
    # partial implementation - needs mapping to levels
    adc, flags = _unpack("!HB", report[:3])
    # the first table entry whose threshold is below the reading gives the charge level
    for threshold, percent in battery_voltage_remaining:
        if threshold < adc:
            charge_level = percent
            break
    if not flags & 0x01:
        return None
    if flags & 0x02:
        status = BATTERY_STATUS.recharging
    else:
        status = BATTERY_STATUS.discharging
    return FEATURE.ADC_MEASUREMENT, charge_level, None, status, adc
|
|
|
|
|
|
# Battery feature -> function that reads that feature from a device.
# get_battery tries the functions in this order when no feature is given.
battery_functions = dict(
    (
        (FEATURE.BATTERY_STATUS, get_battery_status),
        (FEATURE.BATTERY_VOLTAGE, get_battery_voltage),
        (FEATURE.UNIFIED_BATTERY, get_battery_unified),
        (FEATURE.ADC_MEASUREMENT, get_adc_measurement),
    )
)
|
|
|
|
|
|
def get_battery(device, feature):
    """Return battery information - feature, approximate level, next, charging, voltage
    or battery feature if there is one but it is not responding or None for no battery feature

    With a ``feature`` only that feature's function is tried; without one every known
    function is tried in turn. ``0`` is returned when nothing produced a result.
    """
    if feature is None:
        candidates = battery_functions.values()
    else:
        known = battery_functions.get(feature, None)
        candidates = (known,) if known else ()
    for read_battery in candidates:
        result = read_battery(device)
        if result:
            return result
    return 0
|
|
|
|
|
|
def get_keys(device):
    """Return the reprogrammable keys of ``device``.

    :returns: a ``KeysArrayV1`` for REPROG_CONTROLS_V2, a ``KeysArrayV4`` for
    REPROG_CONTROLS_V4, or ``None`` if the device has neither feature or does not
    answer the key-count request.
    """
    # TODO: add here additional variants for other REPROG_CONTROLS
    if FEATURE.REPROG_CONTROLS_V2 in device.features:
        feature, keys_class = FEATURE.REPROG_CONTROLS_V2, KeysArrayV1
    elif FEATURE.REPROG_CONTROLS_V4 in device.features:
        feature, keys_class = FEATURE.REPROG_CONTROLS_V4, KeysArrayV4
    else:
        return None
    count = feature_request(device, feature)
    if not count:
        # feature_request returns None for an offline device; do not fail on ord(None[:1])
        return None
    return keys_class(device, ord(count[:1]))
|
|
|
|
|
|
def get_remap_keys(device):
    """Return a ``KeysArrayPersistent`` for ``device``, or ``None`` if the count request gets no reply."""
    reply = feature_request(device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10)
    if not reply:
        return None
    return KeysArrayPersistent(device, ord(reply[:1]))
|
|
|
|
|
|
def get_gestures(device):
    """Return the device's cached ``_gestures`` if set, else read them when GESTURE_2 is supported.

    Returns ``None`` when nothing is cached and the feature is not supported.
    """
    cached = getattr(device, "_gestures", None)
    if cached is not None:
        return cached
    if FEATURE.GESTURE_2 not in device.features:
        return None
    return Gestures(device)
|
|
|
|
|
|
def get_backlight(device):
    """Return the device's cached ``_backlight`` if set, else read it when BACKLIGHT2 is supported.

    Returns ``None`` when nothing is cached and the feature is not supported.
    """
    cached = getattr(device, "_backlight", None)
    if cached is not None:
        return cached
    if FEATURE.BACKLIGHT2 not in device.features:
        return None
    return Backlight(device)
|
|
|
|
|
|
def get_profiles(device):
    """Return the device's cached ``_profiles`` if set, else read them when ONBOARD_PROFILES is supported.

    Returns ``None`` when nothing is cached and the feature is not supported.
    """
    cached = getattr(device, "_profiles", None)
    if cached is not None:
        return cached
    if FEATURE.ONBOARD_PROFILES not in device.features:
        return None
    return OnboardProfiles.from_device(device)
|
|
|
|
|
|
def get_mouse_pointer_info(device):
    """Query MOUSE_POINTER and return its decoded fields as a dict; ``None`` without a reply."""
    reply = feature_request(device, FEATURE.MOUSE_POINTER)
    if not reply:
        return None
    dpi, flags = _unpack("!HB", reply[:3])
    acceleration_names = ("none", "low", "med", "high")
    return {
        "dpi": dpi,
        "acceleration": acceleration_names[flags & 0x3],  # low two bits of the flags
        "suggest_os_ballistics": (flags & 0x04) != 0,
        "suggest_vertical_orientation": (flags & 0x08) != 0,
    }
|
|
|
|
|
|
def get_vertical_scrolling_info(device):
    """Query VERTICAL_SCROLLING and return roller type, ratchet and lines as a dict.

    Returns ``None`` without a reply.
    """
    reply = feature_request(device, FEATURE.VERTICAL_SCROLLING)
    if not reply:
        return None
    roller, ratchet, lines = _unpack("!BBB", reply[:3])
    # names for roller codes 0 through 7
    roller_names = (
        "reserved",
        "standard",
        "reserved",
        "3G",
        "micro",
        "normal touch pad",
        "inverted touch pad",
        "reserved",
    )
    return {"roller": roller_names[roller], "ratchet": ratchet, "lines": lines}
|
|
|
|
|
|
def get_hi_res_scrolling_info(device):
    """Query HI_RES_SCROLLING and return (mode, resolution); ``None`` without a reply."""
    reply = feature_request(device, FEATURE.HI_RES_SCROLLING)
    if not reply:
        return None
    mode, resolution = _unpack("!BB", reply[:2])
    return mode, resolution
|
|
|
|
|
|
def get_pointer_speed_info(device):
    """Query POINTER_SPEED and return the speed as a float; ``None`` without a reply.

    The first reply byte is the whole part and the second byte counts 1/256ths.
    """
    reply = feature_request(device, FEATURE.POINTER_SPEED)
    if not reply:
        return None
    whole, fraction = _unpack("!BB", reply[:2])
    return whole + fraction / 256
|
|
|
|
|
|
def get_lowres_wheel_status(device):
    """Query LOWRES_WHEEL and return "HID++" if bit 0 of the reply is set, else "HID".

    Returns ``None`` without a reply.
    """
    reply = feature_request(device, FEATURE.LOWRES_WHEEL)
    if not reply:
        return None
    (wheel_flag,) = _unpack("!B", reply[:1])
    return "HID++" if wheel_flag & 0x01 else "HID"
|
|
|
|
|
|
def get_hires_wheel(device):
    """Query capabilities, mode and ratchet switch of HIRES_WHEEL.

    :returns: (multiplier, has_invert, has_ratchet, inverted, resolution, target, ratchet),
    or ``None`` unless all three requests are answered.
    """
    caps = feature_request(device, FEATURE.HIRES_WHEEL, 0x00)
    mode = feature_request(device, FEATURE.HIRES_WHEEL, 0x10)
    ratchet = feature_request(device, FEATURE.HIRES_WHEEL, 0x030)
    if not (caps and mode and ratchet):
        return None

    # capabilities: multiplier byte and flag byte
    multi, cap_flags = _unpack("!BB", caps[:2])
    has_invert = bool(cap_flags & 0x08)
    has_ratchet = bool(cap_flags & 0x04)

    # mode: only the first byte is used
    wheel_mode, _reserved = _unpack("!BB", mode[:2])
    target = bool(wheel_mode & 0x01)
    res = bool(wheel_mode & 0x02)
    inv = bool(wheel_mode & 0x04)

    # ratchet switch: only bit 0 of the first byte is used
    ratchet_mode, _reserved = _unpack("!BB", ratchet[:2])
    ratchet_engaged = bool(ratchet_mode & 0x01)

    return multi, has_invert, has_ratchet, inv, res, target, ratchet_engaged
|
|
|
|
|
|
def get_new_fn_inversion(device):
    """Query NEW_FN_INVERSION and return (inverted, default_inverted) as booleans.

    Returns ``None`` without a reply.
    """
    reply = feature_request(device, FEATURE.NEW_FN_INVERSION, 0x00)
    if not reply:
        return None
    current, default = _unpack("!BB", reply[:2])
    # only bit 0 of each byte is significant
    return bool(current & 0x01), bool(default & 0x01)
|
|
|
|
|
|
def get_host_names(device):
    """Read the host names stored on a device through the HOSTS_INFO feature.

    Returns a dict mapping host index to a ``(status, name)`` tuple, where
    ``status`` is the boolean value of the host's status byte and ``name`` is
    the stored name. The dict is empty when the feature state cannot be read
    or the device does not report the "can get host names" capability
    (flag 0x01). Hosts whose info request fails are left out.

    Side effect: when the name stored for the current host differs from this
    machine's short host name, ``set_host_name`` is called to update the
    device and the returned entry carries the new name.
    """
    host_names = {}
    state = feature_request(device, FEATURE.HOSTS_INFO, 0x00)
    if not state:
        return host_names

    capability_flags, _ignore, numHosts, currentHost = _unpack("!BBBB", state[:4])
    if not capability_flags & 0x01:  # device cannot get host names
        return host_names

    for host in range(numHosts):
        hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, host)
        if not hostinfo:
            # No info for this host slot: skip it instead of failing on None.
            continue
        _ignore, status, _ignore, _ignore, nameLen, _ignore = _unpack("!BBBBBB", hostinfo[:6])

        # The name is fetched in pieces of at most 14 bytes. The raw bytes are
        # collected first and decoded once, so a multi-byte UTF-8 character
        # that straddles two pieces does not break decoding.
        pieces = []
        remaining = nameLen
        while remaining > 0:
            name_piece = feature_request(device, FEATURE.HOSTS_INFO, 0x30, host, nameLen - remaining)
            if not name_piece:
                break
            # The name bytes start after the first two bytes of the reply.
            pieces.append(bytes(name_piece[2 : 2 + min(remaining, 14)]))
            remaining -= 14
        name = b"".join(pieces).decode("utf-8", errors="replace")
        host_names[host] = (bool(status), name)

    # update the current host's name if it doesn't match the system name
    if currentHost in host_names:
        hostname = socket.gethostname().partition(".")[0]
        current_status, current_name = host_names[currentHost]
        if current_name != hostname:
            set_host_name(device, hostname, current_name)
            host_names[currentHost] = (current_status, hostname)
    return host_names
|
|
|
|
|
|
def set_host_name(device, name, currentName=""):
    """Write a host name for the current host through the HOSTS_INFO feature.

    Parameters:
        device: the device to send the requests to.
        name: the new host name (str); it is sent UTF-8 encoded, truncated to
            the maximum name length reported by the device.
        currentName: accepted for backward compatibility; it is not used,
            the name is always written.

    Returns:
        True when the feature state was read and either the name was written
        or capability flag 0x02 is not set (nothing is written in that case;
        presumably the flag means "can set host names" -- confirm against
        the spec). False when the host info request or any write request
        fails. None when the feature state cannot be read.
    """
    if logger.isEnabledFor(logging.INFO):
        logger.info("Setting host name to %s", name)
    encoded = bytearray(name, "utf-8")

    state = feature_request(device, FEATURE.HOSTS_INFO, 0x00)
    if not state:
        return None

    flags, _ignore, _ignore, currentHost = _unpack("!BBBB", state[:4])
    if flags & 0x02:
        hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, currentHost)
        if not hostinfo:
            # Without the host info the maximum name length is unknown.
            return False
        maxNameLen = _unpack("!BBBBBB", hostinfo[:6])[5]
        length = min(maxNameLen, len(encoded))
        # The name is written in pieces of at most 14 bytes; the final piece
        # is clipped so that no more than `length` bytes are sent in total.
        for chunk in range(0, length, 14):
            piece = encoded[chunk : min(chunk + 14, length)]
            response = feature_request(device, FEATURE.HOSTS_INFO, 0x40, currentHost, chunk, piece)
            if not response:
                return False
    return True
|
|
|
|
|
|
def get_onboard_mode(device):
    """Query function 0x20 of the ONBOARD_PROFILES feature.

    Returns the first byte of the reply as an int, or None when the request
    yields no reply.
    """
    reply = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x20)
    if not reply:
        return None
    (mode,) = _unpack("!B", reply[:1])
    return mode
|
|
|
|
|
|
def set_onboard_mode(device, mode):
    """Send ``mode`` to function 0x10 of the ONBOARD_PROFILES feature.

    Returns whatever ``feature_request`` returns for that request.
    """
    return feature_request(device, FEATURE.ONBOARD_PROFILES, 0x10, mode)
|
|
|
|
|
|
def get_polling_rate(device):
    """Return the polling (report) rate of a device as a display string.

    The REPORT_RATE feature (function 0x10) is tried first; its first reply
    byte is returned with an ``"ms"`` suffix. When that yields no reply, the
    EXTENDED_ADJUSTABLE_REPORT_RATE feature (function 0x20) is queried and its
    first reply byte is used as an index into a fixed table of rate strings.

    Returns None when neither request yields a reply, or when the reported
    index lies outside the table.
    """
    state = feature_request(device, FEATURE.REPORT_RATE, 0x10)
    if state:
        (rate,) = _unpack("!B", state[:1])
        return str(rate) + "ms"

    rates = ("8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us")
    state = feature_request(device, FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20)
    if state:
        (index,) = _unpack("!B", state[:1])
        if index < len(rates):
            return rates[index]
        # An unknown index is reported rather than raising IndexError.
        logger.warning("unknown extended report rate index %d", index)
    return None
|
|
|
|
|
|
def get_remaining_pairing(device):
    """Query function 0x0 of the REMAINING_PAIRING feature.

    Returns the first byte of the reply as an int, or None when the request
    yields no reply.
    """
    reply = feature_request(device, FEATURE.REMAINING_PAIRING, 0x0)
    if not reply:
        return None
    return _unpack("!B", reply[:1])[0]
|
|
|
|
|
|
def config_change(device, configuration, no_reply=False):
    """Send ``configuration`` to function 0x00 of the CONFIG_CHANGE feature.

    ``no_reply`` is forwarded unchanged to ``feature_request``, whose result
    is returned as-is.
    """
    reply = feature_request(device, FEATURE.CONFIG_CHANGE, 0x00, configuration, no_reply=no_reply)
    return reply
|