Unify imports in logitech package

Related #2273
This commit is contained in:
MattHag 2024-05-16 20:49:01 +02:00 committed by Peter F. Patel-Schneider
parent 90b0db6c3b
commit 815dce07be
15 changed files with 871 additions and 891 deletions

View File

@ -20,32 +20,28 @@
from __future__ import annotations
import logging
import threading as _threading
import struct
import threading
from collections import namedtuple
from contextlib import contextmanager
from random import getrandbits as _random_bits
from struct import pack as _pack
from time import time as _timestamp
from random import getrandbits
from time import time
import hidapi as _hid
import hidapi
from . import base_usb
from . import common
from . import descriptors
from . import exceptions
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp10_constants
from . import hidpp20
from . import hidpp20_constants as _hidpp20_constants
from .base_usb import ALL as _RECEIVER_USB_IDS
from .common import strhex as _strhex
from .descriptors import DEVICES as _DEVICES
from . import hidpp20_constants
logger = logging.getLogger(__name__)
_hidpp20 = hidpp20.Hidpp20()
#
#
#
def _wired_device(product_id, interface):
return {"vendor_id": 1133, "product_id": product_id, "bus_id": 3, "usb_interface": interface, "isDevice": True}
@ -57,7 +53,7 @@ def _bt_device(product_id):
DEVICE_IDS = []
for _ignore, d in _DEVICES.items():
for _ignore, d in descriptors.DEVICES.items():
if d.usbid:
DEVICE_IDS.append(_wired_device(d.usbid, d.interface if d.interface else 2))
if d.btid:
@ -81,7 +77,7 @@ def product_information(usb_id: int | str) -> dict:
if isinstance(usb_id, str):
usb_id = int(usb_id, 16)
for r in _RECEIVER_USB_IDS:
for r in base_usb.ALL:
if usb_id == r.get("product_id"):
return r
return {}
@ -131,7 +127,7 @@ def match(record, bus_id, vendor_id, product_id):
def filter_receivers(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):
"""Check that this product is a Logitech receiver and if so return the receiver record for further checking"""
for record in _RECEIVER_USB_IDS: # known receivers
for record in base_usb.ALL: # known receivers
if match(record, bus_id, vendor_id, product_id):
return record
if vendor_id == 0x046D and 0xC500 <= product_id <= 0xC5FF: # unknown receiver
@ -140,7 +136,7 @@ def filter_receivers(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_lon
def receivers():
"""Enumerate all the receivers attached to the machine."""
yield from _hid.enumerate(filter_receivers)
yield from hidapi.enumerate(filter_receivers)
def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):
@ -159,12 +155,12 @@ def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):
def receivers_and_devices():
"""Enumerate all the receivers and devices directly attached to the machine."""
yield from _hid.enumerate(filter)
yield from hidapi.enumerate(filter)
def notify_on_receivers_glib(callback):
"""Watch for matching devices and notifies the callback on the GLib thread."""
return _hid.monitor_glib(callback, filter)
return hidapi.monitor_glib(callback, filter)
#
@ -185,7 +181,7 @@ def open_path(path):
:returns: an open receiver handle if this is the right Linux device, or
``None``.
"""
return _hid.open_path(path)
return hidapi.open_path(path)
def open():
@ -204,13 +200,11 @@ def close(handle):
if handle:
try:
if isinstance(handle, int):
_hid.close(handle)
hidapi.close(handle)
else:
handle.close()
# logger.info("closed receiver handle %r", handle)
return True
except Exception:
# logger.exception("closing receiver handle %r", handle)
pass
return False
@ -234,14 +228,21 @@ def write(handle, devnumber, data, long_message=False):
assert isinstance(data, bytes), (repr(data), type(data))
if long_message or len(data) > _SHORT_MESSAGE_SIZE - 2 or data[:1] == b"\x82":
wdata = _pack("!BB18s", HIDPP_LONG_MESSAGE_ID, devnumber, data)
wdata = struct.pack("!BB18s", HIDPP_LONG_MESSAGE_ID, devnumber, data)
else:
wdata = _pack("!BB5s", HIDPP_SHORT_MESSAGE_ID, devnumber, data)
wdata = struct.pack("!BB5s", HIDPP_SHORT_MESSAGE_ID, devnumber, data)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("(%s) <= w[%02X %02X %s %s]", handle, ord(wdata[:1]), devnumber, _strhex(wdata[2:4]), _strhex(wdata[4:]))
logger.debug(
"(%s) <= w[%02X %02X %s %s]",
handle,
ord(wdata[:1]),
devnumber,
common.strhex(wdata[2:4]),
common.strhex(wdata[4:]),
)
try:
_hid.write(int(handle), wdata)
hidapi.write(int(handle), wdata)
except Exception as reason:
logger.error("write failed, assuming handle %r no longer available", handle)
close(handle)
@ -274,7 +275,7 @@ def check_message(data):
if report_lengths.get(report_id) == len(data):
return True
else:
logger.warning(f"unexpected message size: report_id {report_id:02X} message {_strhex(data)}")
logger.warning(f"unexpected message size: report_id {report_id:02X} message {common.strhex(data)}")
return False
@ -290,7 +291,7 @@ def _read(handle, timeout):
try:
# convert timeout to milliseconds, the hidapi expects it
timeout = int(timeout * 1000)
data = _hid.read(int(handle), _MAX_READ_SIZE, timeout)
data = hidapi.read(int(handle), _MAX_READ_SIZE, timeout)
except Exception as reason:
logger.warning("read failed, assuming handle %r no longer available", handle)
close(handle)
@ -303,7 +304,9 @@ def _read(handle, timeout):
if logger.isEnabledFor(logging.DEBUG) and (
report_id != DJ_MESSAGE_ID or ord(data[2:3]) > 0x10
): # ignore DJ input messages
logger.debug("(%s) => r[%02X %02X %s %s]", handle, report_id, devnumber, _strhex(data[2:4]), _strhex(data[4:]))
logger.debug(
"(%s) => r[%02X %02X %s %s]", handle, report_id, devnumber, common.strhex(data[2:4]), common.strhex(data[4:])
)
return report_id, devnumber, data[2:]
@ -322,7 +325,7 @@ def _skip_incoming(handle, ihandle, notifications_hook):
while True:
try:
# read whatever is already in the buffer, if any
data = _hid.read(ihandle, _MAX_READ_SIZE, 0)
data = hidapi.read(ihandle, _MAX_READ_SIZE, 0)
except Exception as reason:
logger.error("read failed, assuming receiver %s no longer available", handle)
close(handle)
@ -380,14 +383,10 @@ _HIDPP_Notification.__str__ = lambda self: "Notification(%02x,%d,%02X,%02X,%s)"
self.devnumber,
self.sub_id,
self.address,
_strhex(self.data),
common.strhex(self.data),
)
#
#
#
request_lock = _threading.Lock() # serialize all requests
request_lock = threading.Lock() # serialize all requests
handles_lock = {}
@ -396,7 +395,7 @@ def handle_lock(handle):
if handles_lock.get(handle) is None:
if logger.isEnabledFor(logging.INFO):
logger.info("New lock %s", repr(handle))
handles_lock[handle] = _threading.Lock() # Serialize requests on the handle
handles_lock[handle] = threading.Lock() # Serialize requests on the handle
return handles_lock[handle]
@ -422,10 +421,6 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
:param params: parameters for the feature call, 3 to 16 bytes.
:returns: the reply data, or ``None`` if some error occurred. or no reply expected
"""
# import inspect as _inspect
# print ('\n '.join(str(s) for s in _inspect.stack()))
with acquire_timeout(handle_lock(handle), handle, 10.0):
assert isinstance(request_id, int)
if (devnumber != 0xFF or protocol >= 2.0) and request_id < 0x8000:
@ -434,7 +429,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
# most significant bit (8) in SoftwareId, to make notifications easier
# to distinguish from request replies.
# This only applies to peripheral requests, ofc.
request_id = (request_id & 0xFFF0) | 0x08 | _random_bits(3)
request_id = (request_id & 0xFFF0) | 0x08 | getrandbits(3)
timeout = _RECEIVER_REQUEST_TIMEOUT if devnumber == 0xFF else _DEVICE_REQUEST_TIMEOUT
# be extra patient on long register read
@ -442,12 +437,10 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
timeout *= 2
if params:
params = b"".join(_pack("B", p) if isinstance(p, int) else p for p in params)
params = b"".join(struct.pack("B", p) if isinstance(p, int) else p for p in params)
else:
params = b""
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("(%s) device %d request_id {%04X} params [%s]", handle, devnumber, request_id, _strhex(params))
request_data = _pack("!H", request_id) + params
request_data = struct.pack("!H", request_id) + params
ihandle = int(handle)
notifications_hook = getattr(handle, "notifications_hook", None)
@ -462,7 +455,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
return None
# we consider timeout from this point
request_started = _timestamp()
request_started = time()
delta = 0
while delta < timeout:
@ -473,7 +466,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
if reply_devnumber == devnumber or reply_devnumber == devnumber ^ 0xFF: # BT device returning 0x00
if (
report_id == HIDPP_SHORT_MESSAGE_ID
and reply_data[:1] == b"\x8F"
and reply_data[:1] == b"\x8f"
and reply_data[1:3] == request_data[:2]
):
error = ord(reply_data[3:4])
@ -485,10 +478,10 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
devnumber,
request_id,
error,
_hidpp10_constants.ERROR[error],
hidpp10_constants.ERROR[error],
)
return _hidpp10_constants.ERROR[error] if return_error else None
if reply_data[:1] == b"\xFF" and reply_data[1:3] == request_data[:2]:
return hidpp10_constants.ERROR[error] if return_error else None
if reply_data[:1] == b"\xff" and reply_data[1:3] == request_data[:2]:
# a HID++ 2.0 feature call returned with an error
error = ord(reply_data[3:4])
logger.error(
@ -497,7 +490,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
devnumber,
request_id,
error,
_hidpp20_constants.ERROR[error],
hidpp20_constants.ERROR[error],
)
raise exceptions.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)
@ -517,20 +510,13 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
else:
# a reply was received, but did not match our request in any way
# reset the timeout starting point
request_started = _timestamp()
request_started = time()
if notifications_hook:
n = make_notification(report_id, reply_devnumber, reply_data)
if n:
notifications_hook(n)
# elif logger.isEnabledFor(logging.DEBUG):
# logger.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
# elif logger.isEnabledFor(logging.DEBUG):
# logger.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
delta = _timestamp() - request_started
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("(%s) still waiting for reply, delta %f", handle, delta)
delta = time() - request_started
logger.warning(
"timeout (%0.2f/%0.2f) on device %d request {%04X} params [%s]",
@ -538,7 +524,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
timeout,
devnumber,
request_id,
_strhex(params),
common.strhex(params),
)
# raise DeviceUnreachable(number=devnumber, request=request_id)
@ -560,11 +546,11 @@ def ping(handle, devnumber, long_message=False):
# randomize the SoftwareId and mark byte to be able to identify the ping
# reply, and set most significant (0x8) bit in SoftwareId so that the reply
# is always distinguishable from notifications
request_id = 0x0018 | _random_bits(3)
request_data = _pack("!HBBB", request_id, 0, 0, _random_bits(8))
request_id = 0x0018 | getrandbits(3)
request_data = struct.pack("!HBBB", request_id, 0, 0, getrandbits(8))
write(int(handle), devnumber, request_data, long_message)
request_started = _timestamp() # we consider timeout from this point
request_started = time() # we consider timeout from this point
delta = 0
while delta < _PING_TIMEOUT:
reply = _read(handle, _PING_TIMEOUT)
@ -577,18 +563,18 @@ def ping(handle, devnumber, long_message=False):
if (
report_id == HIDPP_SHORT_MESSAGE_ID
and reply_data[:1] == b"\x8F"
and reply_data[:1] == b"\x8f"
and reply_data[1:3] == request_data[:2]
): # error response
error = ord(reply_data[3:4])
if error == _hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
if error == hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
return 1.0
if (
error == _hidpp10_constants.ERROR.resource_error
or error == _hidpp10_constants.ERROR.connection_request_failed
error == hidpp10_constants.ERROR.resource_error
or error == hidpp10_constants.ERROR.connection_request_failed
):
return # device unreachable
if error == _hidpp10_constants.ERROR.unknown_device: # no paired device with that number
if error == hidpp10_constants.ERROR.unknown_device: # no paired device with that number
logger.error("(%s) device %d error on ping request: unknown device", handle, devnumber)
raise exceptions.NoSuchDevice(number=devnumber, request=request_id)
@ -596,9 +582,7 @@ def ping(handle, devnumber, long_message=False):
n = make_notification(report_id, reply_devnumber, reply_data)
if n:
notifications_hook(n)
# elif logger.isEnabledFor(logging.DEBUG):
# logger.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
delta = _timestamp() - request_started
delta = time() - request_started
logger.warning("(%s) timeout (%0.2f/%0.2f) on device %d ping", handle, delta, _PING_TIMEOUT, devnumber)

View File

@ -14,16 +14,16 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from __future__ import annotations
# Some common functions and types.
import binascii
import dataclasses
from binascii import hexlify as _hexlify
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
from typing import Union
import yaml as _yaml
import yaml
from solaar.i18n import _
@ -348,8 +348,8 @@ class NamedInt(int):
return dumper.represent_mapping("!NamedInt", {"value": int(data), "name": data.name}, flow_style=True)
_yaml.SafeLoader.add_constructor("!NamedInt", NamedInt.from_yaml)
_yaml.add_representer(NamedInt, NamedInt.to_yaml)
yaml.SafeLoader.add_constructor("!NamedInt", NamedInt.from_yaml)
yaml.add_representer(NamedInt, NamedInt.to_yaml)
class NamedInts:
@ -512,7 +512,7 @@ class UnsortedNamedInts(NamedInts):
def strhex(x):
assert x is not None
"""Produce a hex-string representation of a sequence of bytes."""
return _hexlify(x).decode("ascii").upper()
return binascii.hexlify(x).decode("ascii").upper()
def bytes2int(x, signed=False):
@ -541,7 +541,7 @@ class KwException(Exception):
return self.args[0].get(k) # was self.args[0][k]
@dataclass
@dataclasses.dataclass
class FirmwareInfo:
kind: str
name: str
@ -567,7 +567,7 @@ class BatteryLevelApproximation(IntEnum):
FULL = 90
@dataclass
@dataclasses.dataclass
class Battery:
"""Information about the current state of a battery"""

View File

@ -22,12 +22,8 @@
# - the device uses a USB interface other than 2
# - the name or codename should be different from what the device reports
from .hidpp10_constants import DEVICE_KIND as _DK
from .hidpp10_constants import REGISTERS as _R
#
#
#
from .hidpp10_constants import DEVICE_KIND
from .hidpp10_constants import REGISTERS as REG
class _DeviceDescriptor:
@ -73,15 +69,15 @@ def _D(
):
if kind is None:
kind = (
_DK.mouse
DEVICE_KIND.mouse
if "Mouse" in name
else _DK.keyboard
else DEVICE_KIND.keyboard
if "Keyboard" in name
else _DK.numpad
else DEVICE_KIND.numpad
if "Number Pad" in name
else _DK.touchpad
else DEVICE_KIND.touchpad
if "Touchpad" in name
else _DK.trackball
else DEVICE_KIND.trackball
if "Trackball" in name
else None
)
@ -94,9 +90,12 @@ def _D(
assert w[0:1] == "4", f"{name} has protocol {protocol:0.1f}, wpid {w}"
else:
if w[0:1] == "1":
assert kind == _DK.mouse, f"{name} has protocol {protocol:0.1f}, wpid {w}"
assert kind == DEVICE_KIND.mouse, f"{name} has protocol {protocol:0.1f}, wpid {w}"
elif w[0:1] == "2":
assert kind in (_DK.keyboard, _DK.numpad), f"{name} has protocol {protocol:0.1f}, wpid {w}"
assert kind in (
DEVICE_KIND.keyboard,
DEVICE_KIND.numpad,
), f"{name} has protocol {protocol:0.1f}, wpid {w}"
device_descriptor = _DeviceDescriptor(
name=name,
@ -192,24 +191,24 @@ def get_btid(btid):
# Keyboards
_D("Wireless Keyboard EX110", codename="EX110", protocol=1.0, wpid="0055", registers=(_R.battery_status,))
_D("Wireless Keyboard S510", codename="S510", protocol=1.0, wpid="0056", registers=(_R.battery_status,))
_D("Wireless Wave Keyboard K550", codename="K550", protocol=1.0, wpid="0060", registers=(_R.battery_status,))
_D("Wireless Keyboard EX100", codename="EX100", protocol=1.0, wpid="0065", registers=(_R.battery_status,))
_D("Wireless Keyboard MK300", codename="MK300", protocol=1.0, wpid="0068", registers=(_R.battery_status,))
_D("Number Pad N545", codename="N545", protocol=1.0, wpid="2006", registers=(_R.battery_status,))
_D("Wireless Compact Keyboard K340", codename="K340", protocol=1.0, wpid="2007", registers=(_R.battery_status,))
_D("Wireless Keyboard MK700", codename="MK700", protocol=1.0, wpid="2008", registers=(_R.battery_status,))
_D("Wireless Wave Keyboard K350", codename="K350", protocol=1.0, wpid="200A", registers=(_R.battery_status,))
_D("Wireless Keyboard MK320", codename="MK320", protocol=1.0, wpid="200F", registers=(_R.battery_status,))
_D("Wireless Keyboard EX110", codename="EX110", protocol=1.0, wpid="0055", registers=(REG.battery_status,))
_D("Wireless Keyboard S510", codename="S510", protocol=1.0, wpid="0056", registers=(REG.battery_status,))
_D("Wireless Wave Keyboard K550", codename="K550", protocol=1.0, wpid="0060", registers=(REG.battery_status,))
_D("Wireless Keyboard EX100", codename="EX100", protocol=1.0, wpid="0065", registers=(REG.battery_status,))
_D("Wireless Keyboard MK300", codename="MK300", protocol=1.0, wpid="0068", registers=(REG.battery_status,))
_D("Number Pad N545", codename="N545", protocol=1.0, wpid="2006", registers=(REG.battery_status,))
_D("Wireless Compact Keyboard K340", codename="K340", protocol=1.0, wpid="2007", registers=(REG.battery_status,))
_D("Wireless Keyboard MK700", codename="MK700", protocol=1.0, wpid="2008", registers=(REG.battery_status,))
_D("Wireless Wave Keyboard K350", codename="K350", protocol=1.0, wpid="200A", registers=(REG.battery_status,))
_D("Wireless Keyboard MK320", codename="MK320", protocol=1.0, wpid="200F", registers=(REG.battery_status,))
_D(
"Wireless Illuminated Keyboard K800",
codename="K800",
protocol=1.0,
wpid="2010",
registers=(_R.battery_status, _R.three_leds),
registers=(REG.battery_status, REG.three_leds),
)
_D("Wireless Keyboard K520", codename="K520", protocol=1.0, wpid="2011", registers=(_R.battery_status,))
_D("Wireless Keyboard K520", codename="K520", protocol=1.0, wpid="2011", registers=(REG.battery_status,))
_D("Wireless Solar Keyboard K750", codename="K750", protocol=2.0, wpid="4002")
_D("Wireless Keyboard K270 (unifying)", codename="K270", protocol=2.0, wpid="4003")
_D("Wireless Keyboard K360", codename="K360", protocol=2.0, wpid="4004")
@ -234,51 +233,57 @@ _D("K845 Mechanical Keyboard", codename="K845", usbid=0xC341, interface=3)
# Mice
_D("LX5 Cordless Mouse", codename="LX5", protocol=1.0, wpid="0036", registers=(_R.battery_status,))
_D("LX7 Cordless Laser Mouse", codename="LX7", protocol=1.0, wpid="0039", registers=(_R.battery_status,))
_D("Wireless Wave Mouse M550", codename="M550", protocol=1.0, wpid="003C", registers=(_R.battery_status,))
_D("Wireless Mouse EX100", codename="EX100m", protocol=1.0, wpid="003F", registers=(_R.battery_status,))
_D("Wireless Mouse M30", codename="M30", protocol=1.0, wpid="0085", registers=(_R.battery_status,))
_D("MX610 Laser Cordless Mouse", codename="MX610", protocol=1.0, wpid="1001", registers=(_R.battery_status,))
_D("G7 Cordless Laser Mouse", codename="G7", protocol=1.0, wpid="1002", registers=(_R.battery_status,))
_D("V400 Laser Cordless Mouse", codename="V400", protocol=1.0, wpid="1003", registers=(_R.battery_status,))
_D("MX610 Left-Handled Mouse", codename="MX610L", protocol=1.0, wpid="1004", registers=(_R.battery_status,))
_D("V450 Laser Cordless Mouse", codename="V450", protocol=1.0, wpid="1005", registers=(_R.battery_status,))
_D("LX5 Cordless Mouse", codename="LX5", protocol=1.0, wpid="0036", registers=(REG.battery_status,))
_D("LX7 Cordless Laser Mouse", codename="LX7", protocol=1.0, wpid="0039", registers=(REG.battery_status,))
_D("Wireless Wave Mouse M550", codename="M550", protocol=1.0, wpid="003C", registers=(REG.battery_status,))
_D("Wireless Mouse EX100", codename="EX100m", protocol=1.0, wpid="003F", registers=(REG.battery_status,))
_D("Wireless Mouse M30", codename="M30", protocol=1.0, wpid="0085", registers=(REG.battery_status,))
_D("MX610 Laser Cordless Mouse", codename="MX610", protocol=1.0, wpid="1001", registers=(REG.battery_status,))
_D("G7 Cordless Laser Mouse", codename="G7", protocol=1.0, wpid="1002", registers=(REG.battery_status,))
_D("V400 Laser Cordless Mouse", codename="V400", protocol=1.0, wpid="1003", registers=(REG.battery_status,))
_D("MX610 Left-Handled Mouse", codename="MX610L", protocol=1.0, wpid="1004", registers=(REG.battery_status,))
_D("V450 Laser Cordless Mouse", codename="V450", protocol=1.0, wpid="1005", registers=(REG.battery_status,))
_D(
"VX Revolution",
codename="VX Revolution",
kind=_DK.mouse,
kind=DEVICE_KIND.mouse,
protocol=1.0,
wpid=("1006", "100D", "0612"),
registers=(_R.battery_charge,),
registers=(REG.battery_charge,),
)
_D("MX Air", codename="MX Air", protocol=1.0, kind=_DK.mouse, wpid=("1007", "100E"), registers=(_R.battery_charge,))
_D("MX Air", codename="MX Air", protocol=1.0, kind=DEVICE_KIND.mouse, wpid=("1007", "100E"), registers=(REG.battery_charge,))
_D(
"MX Revolution",
codename="MX Revolution",
protocol=1.0,
kind=_DK.mouse,
kind=DEVICE_KIND.mouse,
wpid=("1008", "100C"),
registers=(_R.battery_charge,),
registers=(REG.battery_charge,),
)
_D("MX620 Laser Cordless Mouse", codename="MX620", protocol=1.0, wpid=("100A", "1016"), registers=(_R.battery_charge,))
_D("VX Nano Cordless Laser Mouse", codename="VX Nano", protocol=1.0, wpid=("100B", "100F"), registers=(_R.battery_charge,))
_D("V450 Nano Cordless Laser Mouse", codename="V450 Nano", protocol=1.0, wpid="1011", registers=(_R.battery_charge,))
_D("V550 Nano Cordless Laser Mouse", codename="V550 Nano", protocol=1.0, wpid="1013", registers=(_R.battery_charge,))
_D("MX620 Laser Cordless Mouse", codename="MX620", protocol=1.0, wpid=("100A", "1016"), registers=(REG.battery_charge,))
_D("VX Nano Cordless Laser Mouse", codename="VX Nano", protocol=1.0, wpid=("100B", "100F"), registers=(REG.battery_charge,))
_D("V450 Nano Cordless Laser Mouse", codename="V450 Nano", protocol=1.0, wpid="1011", registers=(REG.battery_charge,))
_D("V550 Nano Cordless Laser Mouse", codename="V550 Nano", protocol=1.0, wpid="1013", registers=(REG.battery_charge,))
_D(
"MX 1100 Cordless Laser Mouse",
codename="MX 1100",
protocol=1.0,
kind=_DK.mouse,
kind=DEVICE_KIND.mouse,
wpid="1014",
registers=(_R.battery_charge,),
registers=(REG.battery_charge,),
)
_D("Anywhere Mouse MX", codename="Anywhere MX", protocol=1.0, wpid="1017", registers=(_R.battery_charge,))
_D("Performance Mouse MX", codename="Performance MX", protocol=1.0, wpid="101A", registers=(_R.battery_status, _R.three_leds))
_D("Marathon Mouse M705 (M-R0009)", codename="M705 (M-R0009)", protocol=1.0, wpid="101B", registers=(_R.battery_charge,))
_D("Wireless Mouse M350", codename="M350", protocol=1.0, wpid="101C", registers=(_R.battery_charge,))
_D("Wireless Mouse M505", codename="M505/B605", protocol=1.0, wpid="101D", registers=(_R.battery_charge,))
_D("Wireless Mouse M305", codename="M305", protocol=1.0, wpid="101F", registers=(_R.battery_status,))
_D("Anywhere Mouse MX", codename="Anywhere MX", protocol=1.0, wpid="1017", registers=(REG.battery_charge,))
_D(
"Performance Mouse MX",
codename="Performance MX",
protocol=1.0,
wpid="101A",
registers=(REG.battery_status, REG.three_leds),
)
_D("Marathon Mouse M705 (M-R0009)", codename="M705 (M-R0009)", protocol=1.0, wpid="101B", registers=(REG.battery_charge,))
_D("Wireless Mouse M350", codename="M350", protocol=1.0, wpid="101C", registers=(REG.battery_charge,))
_D("Wireless Mouse M505", codename="M505/B605", protocol=1.0, wpid="101D", registers=(REG.battery_charge,))
_D("Wireless Mouse M305", codename="M305", protocol=1.0, wpid="101F", registers=(REG.battery_status,))
_D("Wireless Mouse M215", codename="M215", protocol=1.0, wpid="1020")
_D(
"G700 Gaming Mouse",
@ -288,12 +293,12 @@ _D(
usbid=0xC06B,
interface=1,
registers=(
_R.battery_status,
_R.three_leds,
REG.battery_status,
REG.three_leds,
),
)
_D("Wireless Mouse M310", codename="M310", protocol=1.0, wpid="1024", registers=(_R.battery_status,))
_D("Wireless Mouse M510", codename="M510", protocol=1.0, wpid="1025", registers=(_R.battery_status,))
_D("Wireless Mouse M310", codename="M310", protocol=1.0, wpid="1024", registers=(REG.battery_status,))
_D("Wireless Mouse M510", codename="M510", protocol=1.0, wpid="1025", registers=(REG.battery_status,))
_D("Fujitsu Sonic Mouse", codename="Sonic", protocol=1.0, wpid="1029")
_D(
"G700s Gaming Mouse",
@ -303,8 +308,8 @@ _D(
usbid=0xC07C,
interface=1,
registers=(
_R.battery_status,
_R.three_leds,
REG.battery_status,
REG.three_leds,
),
)
_D("Couch Mouse M515", codename="M515", protocol=2.0, wpid="4007")
@ -348,7 +353,7 @@ _D("G502 Lightspeed Gaming Mouse", codename="G502 Lightspeed", usbid=0xC08D)
_D("MX518 Gaming Mouse", codename="MX518", usbid=0xC08E, interface=1)
_D("G703 Hero Gaming Mouse", codename="G703 Hero", usbid=0xC090)
_D("G903 Hero Gaming Mouse", codename="G903 Hero", usbid=0xC091)
_D(None, kind=_DK.mouse, usbid=0xC092, interface=1) # two mice share this ID
_D(None, kind=DEVICE_KIND.mouse, usbid=0xC092, interface=1) # two mice share this ID
_D("M500S Mouse", codename="M500S", usbid=0xC093, interface=1)
# _D('G600 Gaming Mouse', codename='G600 Gaming', usbid=0xc24a, interface=1) # not an HID++ device
_D("G500s Gaming Mouse", codename="G500s Gaming", usbid=0xC24E, interface=1, protocol=1.0)
@ -365,13 +370,15 @@ _D("Wireless Trackball M570", codename="M570")
_D("Wireless Touchpad", codename="Wireless Touch", protocol=2.0, wpid="4011")
_D("Wireless Rechargeable Touchpad T650", codename="T650", protocol=2.0, wpid="4101")
_D("G Powerplay", codename="Powerplay", protocol=2.0, kind=_DK.touchpad, wpid="405F") # To override self-identification
_D(
"G Powerplay", codename="Powerplay", protocol=2.0, kind=DEVICE_KIND.touchpad, wpid="405F"
) # To override self-identification
# Headset
_D("G533 Gaming Headset", codename="G533 Headset", protocol=2.0, interface=3, kind=_DK.headset, usbid=0x0A66)
_D("G535 Gaming Headset", codename="G535 Headset", protocol=2.0, interface=3, kind=_DK.headset, usbid=0x0AC4)
_D("G935 Gaming Headset", codename="G935 Headset", protocol=2.0, interface=3, kind=_DK.headset, usbid=0x0A87)
_D("G733 Gaming Headset", codename="G733 Headset", protocol=2.0, interface=3, kind=_DK.headset, usbid=0x0AB5)
_D("G733 Gaming Headset", codename="G733 Headset New", protocol=2.0, interface=3, kind=_DK.headset, usbid=0x0AFE)
_D("PRO X Wireless Gaming Headset", codename="PRO Headset", protocol=2.0, interface=3, kind=_DK.headset, usbid=0x0ABA)
_D("G533 Gaming Headset", codename="G533 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0A66)
_D("G535 Gaming Headset", codename="G535 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AC4)
_D("G935 Gaming Headset", codename="G935 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0A87)
_D("G733 Gaming Headset", codename="G733 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AB5)
_D("G733 Gaming Headset", codename="G733 Headset New", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AFE)
_D("PRO X Wireless Gaming Headset", codename="PRO Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0ABA)

View File

@ -14,17 +14,17 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import errno as _errno
import errno
import logging
import threading as _threading
import threading
import time
from typing import Callable
from typing import Optional
import hidapi as _hid
import solaar.configuration as _configuration
import hidapi
from solaar import configuration
from . import base
from . import descriptors
@ -34,9 +34,9 @@ from . import hidpp10_constants
from . import hidpp20
from . import hidpp20_constants
from . import settings
from . import settings_templates
from .common import Alert
from .common import Battery
from .settings_templates import check_feature_settings as _check_feature_settings
logger = logging.getLogger(__name__)
@ -59,7 +59,7 @@ class DeviceFactory:
return Device(None, None, None, handle=handle, device_info=device_info, setting_callback=setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
if e.errno == errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)
@ -70,7 +70,16 @@ class Device:
read_register: Callable = hidpp10.read_register
write_register: Callable = hidpp10.write_register
def __init__(self, receiver, number, online, pairing_info=None, handle=None, device_info=None, setting_callback=None):
def __init__(
self,
receiver,
number,
online,
pairing_info=None,
handle=None,
device_info=None,
setting_callback=None,
):
assert receiver or device_info
if receiver:
assert 0 < number <= 15 # some receivers have devices past their max # of devices
@ -110,14 +119,14 @@ class Device:
self._active = None # lags self.online - is used to help determine when to setup devices
self._feature_settings_checked = False
self._gestures_lock = _threading.Lock()
self._settings_lock = _threading.Lock()
self._persister_lock = _threading.Lock()
self._gestures_lock = threading.Lock()
self._settings_lock = threading.Lock()
self._persister_lock = threading.Lock()
self._notification_handlers = {} # See `add_notification_handler`
self.cleanups = [] # functions to run on the device when it is closed
if not self.path:
self.path = _hid.find_paired_node(receiver.path, number, 1) if receiver else None
self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None
if not self.handle:
try:
self.handle = base.open_path(self.path) if self.path else None
@ -302,9 +311,9 @@ class Device:
self._profiles = _hidpp20.get_profiles(self)
return self._profiles
def set_configuration(self, configuration, no_reply=False):
def set_configuration(self, configuration_, no_reply=False):
if self.online and self.protocol >= 2.0:
_hidpp20.config_change(self, configuration, no_reply=no_reply)
_hidpp20.config_change(self, configuration_, no_reply=no_reply)
def reset(self, no_reply=False):
self.set_configuration(0, no_reply)
@ -314,7 +323,7 @@ class Device:
if not self._persister:
with self._persister_lock:
if not self._persister:
self._persister = _configuration.persister(self)
self._persister = configuration.persister(self)
return self._persister
@property
@ -337,7 +346,7 @@ class Device:
if not self._feature_settings_checked:
with self._settings_lock:
if not self._feature_settings_checked:
self._feature_settings_checked = _check_feature_settings(self, self._settings)
self._feature_settings_checked = settings_templates.check_feature_settings(self, self._settings)
return self._settings
def battery(self): # None or level, next, status, voltage

View File

@ -14,43 +14,37 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import ctypes as _ctypes
import ctypes
import logging
import math
import numbers
import os as _os
import os.path as _path
import platform as _platform
import os
import platform
import socket
import struct
import subprocess
import sys as _sys
import time as _time
import sys
import time
from typing import Dict
from typing import Tuple
import gi
import psutil
import yaml
from keysyms import keysymdef
# There is no evdev on macOS or Windows. Diversion will not work without
# it but other Solaar functionality is available.
if _platform.system() in ("Darwin", "Windows"):
if platform.system() in ("Darwin", "Windows"):
evdev = None
else:
import evdev
from math import sqrt as _sqrt
from struct import unpack as _unpack
from yaml import add_representer as _yaml_add_representer
from yaml import dump_all as _yaml_dump_all
from yaml import safe_load_all as _yaml_safe_load_all
from .common import NamedInt
from .hidpp20 import FEATURE as _F
from .special_keys import CONTROL as _CONTROL
from .hidpp20 import FEATURE
from .special_keys import CONTROL
gi.require_version("Gdk", "3.0") # isort:skip
from gi.repository import Gdk, GLib # NOQA: E402 # isort:skip
@ -102,7 +96,7 @@ gkeymap = Gdk.Keymap.get_for_display(gdisplay) if gdisplay else None
if logger.isEnabledFor(logging.INFO):
logger.info("GDK Keymap %sset up", "" if gkeymap else "not ")
wayland = _os.getenv("WAYLAND_DISPLAY") # is this Wayland?
wayland = os.getenv("WAYLAND_DISPLAY") # is this Wayland?
if wayland:
logger.warning(
"rules cannot access modifier keys in Wayland, "
@ -137,26 +131,26 @@ thumb_wheel_displacement = 0
_dbus_interface = None
class XkbDisplay(_ctypes.Structure):
class XkbDisplay(ctypes.Structure):
"""opaque struct"""
class XkbStateRec(_ctypes.Structure):
class XkbStateRec(ctypes.Structure):
_fields_ = [
("group", _ctypes.c_ubyte),
("locked_group", _ctypes.c_ubyte),
("base_group", _ctypes.c_ushort),
("latched_group", _ctypes.c_ushort),
("mods", _ctypes.c_ubyte),
("base_mods", _ctypes.c_ubyte),
("latched_mods", _ctypes.c_ubyte),
("locked_mods", _ctypes.c_ubyte),
("compat_state", _ctypes.c_ubyte),
("grab_mods", _ctypes.c_ubyte),
("compat_grab_mods", _ctypes.c_ubyte),
("lookup_mods", _ctypes.c_ubyte),
("compat_lookup_mods", _ctypes.c_ubyte),
("ptr_buttons", _ctypes.c_ushort),
("group", ctypes.c_ubyte),
("locked_group", ctypes.c_ubyte),
("base_group", ctypes.c_ushort),
("latched_group", ctypes.c_ushort),
("mods", ctypes.c_ubyte),
("base_mods", ctypes.c_ubyte),
("latched_mods", ctypes.c_ubyte),
("locked_mods", ctypes.c_ubyte),
("compat_state", ctypes.c_ubyte),
("grab_mods", ctypes.c_ubyte),
("compat_grab_mods", ctypes.c_ubyte),
("lookup_mods", ctypes.c_ubyte),
("compat_lookup_mods", ctypes.c_ubyte),
("ptr_buttons", ctypes.c_ushort),
] # something strange is happening here but it is not being used
@ -176,7 +170,7 @@ def x11_setup():
if logger.isEnabledFor(logging.INFO):
logger.info("X11 library loaded and display set up")
except Exception:
logger.warning("X11 not available - some rule capabilities inoperable", exc_info=_sys.exc_info())
logger.warning("X11 not available - some rule capabilities inoperable", exc_info=sys.exc_info())
_x11 = False
xtest_available = False
return _x11
@ -193,7 +187,7 @@ def gnome_dbus_interface_setup():
remote_object = bus.get_object("org.gnome.Shell", "/io/github/pwr_solaar/solaar")
_dbus_interface = dbus.Interface(remote_object, "io.github.pwr_solaar.solaar")
except dbus.exceptions.DBusException:
logger.warning("Solaar Gnome extension not installed - some rule capabilities inoperable", exc_info=_sys.exc_info())
logger.warning("Solaar Gnome extension not installed - some rule capabilities inoperable", exc_info=sys.exc_info())
_dbus_interface = False
return _dbus_interface
@ -203,14 +197,14 @@ def xkb_setup():
if Xkbdisplay is not None:
return Xkbdisplay
try: # set up to get keyboard state using ctypes interface to libx11
X11Lib = _ctypes.cdll.LoadLibrary("libX11.so")
X11Lib.XOpenDisplay.restype = _ctypes.POINTER(XkbDisplay)
X11Lib.XkbGetState.argtypes = [_ctypes.POINTER(XkbDisplay), _ctypes.c_uint, _ctypes.POINTER(XkbStateRec)]
X11Lib = ctypes.cdll.LoadLibrary("libX11.so")
X11Lib.XOpenDisplay.restype = ctypes.POINTER(XkbDisplay)
X11Lib.XkbGetState.argtypes = [ctypes.POINTER(XkbDisplay), ctypes.c_uint, ctypes.POINTER(XkbStateRec)]
Xkbdisplay = X11Lib.XOpenDisplay(None)
if logger.isEnabledFor(logging.INFO):
logger.info("XKB display set up")
except Exception:
logger.warning("XKB display not available - rules cannot access keyboard group", exc_info=_sys.exc_info())
logger.warning("XKB display not available - rules cannot access keyboard group", exc_info=sys.exc_info())
Xkbdisplay = False
return Xkbdisplay
@ -262,7 +256,7 @@ if wayland: # Wayland can't use xtest so may as well set up uinput now
def kbdgroup():
if xkb_setup():
state = XkbStateRec()
X11Lib.XkbGetState(Xkbdisplay, XkbUseCoreKbd, _ctypes.pointer(state))
X11Lib.XkbGetState(Xkbdisplay, XkbUseCoreKbd, ctypes.pointer(state))
return state.group
else:
return None
@ -282,7 +276,7 @@ def signed(bytes_: bytes) -> int:
def xy_direction(_x, _y):
# normalize x and y
m = _sqrt((_x * _x) + (_y * _y))
m = math.sqrt((_x * _x) + (_y * _y))
if m == 0:
return "noop"
x = round(_x / m)
@ -419,7 +413,7 @@ def simulate_scroll(dx, dy):
def thumb_wheel_up(f, r, d, a):
global thumb_wheel_displacement
if f != _F.THUMB_WHEEL or r != 0:
if f != FEATURE.THUMB_WHEEL or r != 0:
return False
if a is None:
return signed(d[0:2]) < 0 and signed(d[0:2])
@ -432,7 +426,7 @@ def thumb_wheel_up(f, r, d, a):
def thumb_wheel_down(f, r, d, a):
global thumb_wheel_displacement
if f != _F.THUMB_WHEEL or r != 0:
if f != FEATURE.THUMB_WHEEL or r != 0:
return False
if a is None:
return signed(d[0:2]) > 0 and signed(d[0:2])
@ -445,9 +439,9 @@ def thumb_wheel_down(f, r, d, a):
def charging(f, r, d, _a):
if (
(f == _F.BATTERY_STATUS and r == 0 and 1 <= d[2] <= 4)
or (f == _F.BATTERY_VOLTAGE and r == 0 and d[2] & (1 << 7))
or (f == _F.UNIFIED_BATTERY and r == 0 and 1 <= d[2] <= 3)
(f == FEATURE.BATTERY_STATUS and r == 0 and 1 <= d[2] <= 4)
or (f == FEATURE.BATTERY_VOLTAGE and r == 0 and d[2] & (1 << 7))
or (f == FEATURE.UNIFIED_BATTERY and r == 0 and 1 <= d[2] <= 3)
):
return 1
else:
@ -455,20 +449,32 @@ def charging(f, r, d, _a):
TESTS = {
"crown_right": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[1] < 128 and d[1], False],
"crown_left": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[1] >= 128 and 256 - d[1], False],
"crown_right_ratchet": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[2] < 128 and d[2], False],
"crown_left_ratchet": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[2] >= 128 and 256 - d[2], False],
"crown_tap": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[5] == 0x01 and d[5], False],
"crown_start_press": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[6] == 0x01 and d[6], False],
"crown_end_press": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[6] == 0x05 and d[6], False],
"crown_pressed": [lambda f, r, d, a: f == _F.CROWN and r == 0 and d[6] >= 0x01 and d[6] <= 0x04 and d[6], False],
"crown_right": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[1] < 128 and d[1], False],
"crown_left": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[1] >= 128 and 256 - d[1], False],
"crown_right_ratchet": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[2] < 128 and d[2], False],
"crown_left_ratchet": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[2] >= 128 and 256 - d[2], False],
"crown_tap": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[5] == 0x01 and d[5], False],
"crown_start_press": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] == 0x01 and d[6], False],
"crown_end_press": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] == 0x05 and d[6], False],
"crown_pressed": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] >= 0x01 and d[6] <= 0x04 and d[6], False],
"thumb_wheel_up": [thumb_wheel_up, True],
"thumb_wheel_down": [thumb_wheel_down, True],
"lowres_wheel_up": [lambda f, r, d, a: f == _F.LOWRES_WHEEL and r == 0 and signed(d[0:1]) > 0 and signed(d[0:1]), False],
"lowres_wheel_down": [lambda f, r, d, a: f == _F.LOWRES_WHEEL and r == 0 and signed(d[0:1]) < 0 and signed(d[0:1]), False],
"hires_wheel_up": [lambda f, r, d, a: f == _F.HIRES_WHEEL and r == 0 and signed(d[1:3]) > 0 and signed(d[1:3]), False],
"hires_wheel_down": [lambda f, r, d, a: f == _F.HIRES_WHEEL and r == 0 and signed(d[1:3]) < 0 and signed(d[1:3]), False],
"lowres_wheel_up": [
lambda f, r, d, a: f == FEATURE.LOWRES_WHEEL and r == 0 and signed(d[0:1]) > 0 and signed(d[0:1]),
False,
],
"lowres_wheel_down": [
lambda f, r, d, a: f == FEATURE.LOWRES_WHEEL and r == 0 and signed(d[0:1]) < 0 and signed(d[0:1]),
False,
],
"hires_wheel_up": [
lambda f, r, d, a: f == FEATURE.HIRES_WHEEL and r == 0 and signed(d[1:3]) > 0 and signed(d[1:3]),
False,
],
"hires_wheel_down": [
lambda f, r, d, a: f == FEATURE.HIRES_WHEEL and r == 0 and signed(d[1:3]) < 0 and signed(d[1:3]),
False,
],
"charging": [charging, False],
"False": [lambda f, r, d, a: False, False],
"True": [lambda f, r, d, a: True, False],
@ -714,11 +720,11 @@ class MouseProcess(Condition):
class Feature(Condition):
def __init__(self, feature, warn=True):
if not (isinstance(feature, str) and feature in _F):
if not (isinstance(feature, str) and feature in FEATURE):
if warn:
logger.warning("rule Feature argument not name of a feature: %s", feature)
self.feature = None
self.feature = _F[feature]
self.feature = FEATURE[feature]
def __str__(self):
return "Feature: " + str(self.feature)
@ -857,8 +863,8 @@ class Key(Condition):
elif len(args) >= 2:
key, action = args[:2]
if isinstance(key, str) and key in _CONTROL:
self.key = _CONTROL[key]
if isinstance(key, str) and key in CONTROL:
self.key = CONTROL[key]
else:
if warn:
logger.warning(f"rule Key key name not name of a Logitech key: {key}")
@ -896,8 +902,8 @@ class KeyIsDown(Condition):
elif isinstance(args, str):
key = args
if isinstance(key, str) and key in _CONTROL:
self.key = _CONTROL[key]
if isinstance(key, str) and key in CONTROL:
self.key = CONTROL[key]
else:
if warn:
logger.warning(f"rule Key key name not name of a Logitech key: {key}")
@ -1013,7 +1019,7 @@ class MouseGesture(Condition):
if isinstance(movements, str):
movements = [movements]
for x in movements:
if x not in self.MOVEMENTS and x not in _CONTROL:
if x not in self.MOVEMENTS and x not in CONTROL:
if warn:
logger.warning("rule Mouse Gesture argument not direction or name of a Logitech key: %s", x)
self.movements = movements
@ -1024,14 +1030,14 @@ class MouseGesture(Condition):
def evaluate(self, feature, notification, device, last_result):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("evaluate condition: %s", self)
if feature == _F.MOUSE_GESTURE:
if feature == FEATURE.MOUSE_GESTURE:
d = notification.data
data = _unpack("!" + (int(len(d) / 2) * "h"), d)
data = struct.unpack("!" + (int(len(d) / 2) * "h"), d)
data_offset = 1
movement_offset = 0
if self.movements and self.movements[0] not in self.MOVEMENTS: # matching against initiating key
movement_offset = 1
if self.movements[0] != str(_CONTROL[data[0]]):
if self.movements[0] != str(CONTROL[data[0]]):
return False
for m in self.movements[movement_offset:]:
if data_offset >= len(data):
@ -1042,7 +1048,7 @@ class MouseGesture(Condition):
return False
data_offset += 3
elif data[data_offset] == 1:
if m != str(_CONTROL[data[data_offset + 1]]):
if m != str(CONTROL[data[data_offset + 1]]):
return False
data_offset += 2
return data_offset == len(data)
@ -1214,7 +1220,7 @@ class KeyPress(Action):
self.keyDown(self.key_symbols, current)
if self.action != DEPRESS:
self.keyUp(reversed(self.key_symbols), current)
_time.sleep(0.01)
time.sleep(0.01)
else:
logger.warning("no keymap so cannot determine which keycode to send")
return None
@ -1253,7 +1259,7 @@ class MouseScroll(Action):
logger.info("MouseScroll action: %s %s %s", self.amounts, last_result, amounts)
dx, dy = amounts
simulate_scroll(dx, dy)
_time.sleep(0.01)
time.sleep(0.01)
return None
def data(self):
@ -1289,7 +1295,7 @@ class MouseClick(Action):
logger.info(f"MouseClick action: {int(self.count)} {self.button}")
if self.button and self.count:
click(buttons[self.button], self.count)
_time.sleep(0.01)
time.sleep(0.01)
return None
def data(self):
@ -1438,12 +1444,12 @@ if True:
def key_is_down(key):
if key == _CONTROL.MR:
if key == CONTROL.MR:
return mr_key_down
elif _CONTROL.M1 <= key <= _CONTROL.M8:
return bool(m_keys_down & (0x01 << (key - _CONTROL.M1)))
elif _CONTROL.G1 <= key <= _CONTROL.G32:
return bool(g_keys_down & (0x01 << (key - _CONTROL.G1)))
elif CONTROL.M1 <= key <= CONTROL.M8:
return bool(m_keys_down & (0x01 << (key - CONTROL.M1)))
elif CONTROL.G1 <= key <= CONTROL.G32:
return bool(g_keys_down & (0x01 << (key - CONTROL.G1)))
else:
return key in keys_down
@ -1459,8 +1465,8 @@ def process_notification(device, notification, feature):
global keys_down, g_keys_down, m_keys_down, mr_key_down, key_down, key_up, thumb_wheel_displacement
key_down, key_up = None, None
# need to keep track of keys that are down to find a new key down
if feature == _F.REPROG_CONTROLS_V4 and notification.address == 0x00:
new_keys_down = _unpack("!4H", notification.data[:8])
if feature == FEATURE.REPROG_CONTROLS_V4 and notification.address == 0x00:
new_keys_down = struct.unpack("!4H", notification.data[:8])
for key in new_keys_down:
if key and key not in keys_down:
key_down = key
@ -1469,33 +1475,33 @@ def process_notification(device, notification, feature):
key_up = key
keys_down = new_keys_down
# and also G keys down
elif feature == _F.GKEY and notification.address == 0x00:
new_g_keys_down = _unpack("<I", notification.data[:4])[0]
elif feature == FEATURE.GKEY and notification.address == 0x00:
new_g_keys_down = struct.unpack("<I", notification.data[:4])[0]
for i in range(32):
if new_g_keys_down & (0x01 << i) and not g_keys_down & (0x01 << i):
key_down = _CONTROL["G" + str(i + 1)]
key_down = CONTROL["G" + str(i + 1)]
if g_keys_down & (0x01 << i) and not new_g_keys_down & (0x01 << i):
key_up = _CONTROL["G" + str(i + 1)]
key_up = CONTROL["G" + str(i + 1)]
g_keys_down = new_g_keys_down
# and also M keys down
elif feature == _F.MKEYS and notification.address == 0x00:
new_m_keys_down = _unpack("!1B", notification.data[:1])[0]
elif feature == FEATURE.MKEYS and notification.address == 0x00:
new_m_keys_down = struct.unpack("!1B", notification.data[:1])[0]
for i in range(1, 9):
if new_m_keys_down & (0x01 << (i - 1)) and not m_keys_down & (0x01 << (i - 1)):
key_down = _CONTROL["M" + str(i)]
key_down = CONTROL["M" + str(i)]
if m_keys_down & (0x01 << (i - 1)) and not new_m_keys_down & (0x01 << (i - 1)):
key_up = _CONTROL["M" + str(i)]
key_up = CONTROL["M" + str(i)]
m_keys_down = new_m_keys_down
# and also MR key
elif feature == _F.MR and notification.address == 0x00:
new_mr_key_down = _unpack("!1B", notification.data[:1])[0]
elif feature == FEATURE.MR and notification.address == 0x00:
new_mr_key_down = struct.unpack("!1B", notification.data[:1])[0]
if not mr_key_down and new_mr_key_down:
key_down = _CONTROL["MR"]
key_down = CONTROL["MR"]
if mr_key_down and not new_mr_key_down:
key_up = _CONTROL["MR"]
key_up = CONTROL["MR"]
mr_key_down = new_mr_key_down
# keep track of thumb wheel movment
elif feature == _F.THUMB_WHEEL and notification.address == 0x00:
elif feature == FEATURE.THUMB_WHEEL and notification.address == 0x00:
if notification.data[4] <= 0x01: # when wheel starts, zero out last movement
thumb_wheel_displacement = 0
thumb_wheel_displacement += signed(notification.data[0:2])
@ -1503,8 +1509,8 @@ def process_notification(device, notification, feature):
GLib.idle_add(evaluate_rules, feature, notification, device)
_XDG_CONFIG_HOME = _os.environ.get("XDG_CONFIG_HOME") or _path.expanduser(_path.join("~", ".config"))
_file_path = _path.join(_XDG_CONFIG_HOME, "solaar", "rules.yaml")
_XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser(os.path.join("~", ".config"))
_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "rules.yaml")
rules = built_in_rules
@ -1517,7 +1523,7 @@ def _save_config_rule_file(file_name=_file_path):
def blockseq_rep(dumper, data):
return dumper.represent_sequence("tag:yaml.org,2002:seq", data, flow_style=True)
_yaml_add_representer(inline_list, blockseq_rep)
yaml.add_representer(inline_list, blockseq_rep)
def convert(elem):
if isinstance(elem, list):
@ -1550,7 +1556,7 @@ def _save_config_rule_file(file_name=_file_path):
with open(file_name, "w") as f:
if rules_to_save:
f.write("%YAML 1.3\n") # Write version manually
_yaml_dump_all(convert([r["Rule"] for r in rules_to_save]), f, **dump_settings)
yaml.dump_all(convert([r["Rule"] for r in rules_to_save]), f, **dump_settings)
except Exception as e:
logger.error("failed to save to %s\n%s", file_name, e)
return False
@ -1561,7 +1567,7 @@ def load_config_rule_file():
"""Loads user configured rules."""
global rules
if _path.isfile(_file_path):
if os.path.isfile(_file_path):
rules = _load_rule_config(_file_path)
@ -1570,7 +1576,7 @@ def _load_rule_config(file_path: str) -> Rule:
try:
with open(file_path) as config_file:
loaded_rules = []
for loaded_rule in _yaml_safe_load_all(config_file):
for loaded_rule in yaml.safe_load_all(config_file):
rule = Rule(loaded_rule, source=file_path)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("load rule: %s", rule)

View File

@ -15,14 +15,14 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from .common import KwException as _KwException
from .common import KwException
#
# Exceptions that may be raised by this API.
#
class NoReceiver(_KwException):
class NoReceiver(KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
@ -31,25 +31,25 @@ class NoReceiver(_KwException):
pass
class NoSuchDevice(_KwException):
class NoSuchDevice(KwException):
"""Raised when trying to reach a device number not paired to the receiver."""
pass
class DeviceUnreachable(_KwException):
class DeviceUnreachable(KwException):
"""Raised when a request is made to an unreachable (turned off) device."""
pass
class FeatureNotSupported(_KwException):
class FeatureNotSupported(KwException):
"""Raised when trying to request a feature not supported by the device."""
pass
class FeatureCallError(_KwException):
class FeatureCallError(KwException):
"""Raised if the device replied to a feature call with an error."""
pass

View File

@ -21,13 +21,10 @@ from typing import Any
from typing_extensions import Protocol
from . import common
from .common import Battery
from .common import BatteryLevelApproximation
from .common import BatteryStatus
from .common import FirmwareInfo as _FirmwareInfo
from .common import bytes2int as _bytes2int
from .common import int2bytes as _int2bytes
from .common import strhex as _strhex
from .hidpp10_constants import REGISTERS
from .hidpp20_constants import FIRMWARE_KIND
@ -123,26 +120,26 @@ class Hidpp10:
# won't be able to read any of it now...
return
fw_version = _strhex(reply[1:3])
fw_version = common.strhex(reply[1:3])
fw_version = f"{fw_version[0:2]}.{fw_version[2:4]}"
reply = read_register(device, REGISTERS.firmware, 0x02)
if reply:
fw_version += ".B" + _strhex(reply[1:3])
fw = _FirmwareInfo(FIRMWARE_KIND.Firmware, "", fw_version, None)
fw_version += ".B" + common.strhex(reply[1:3])
fw = common.FirmwareInfo(FIRMWARE_KIND.Firmware, "", fw_version, None)
firmware[0] = fw
reply = read_register(device, REGISTERS.firmware, 0x04)
if reply:
bl_version = _strhex(reply[1:3])
bl_version = common.strhex(reply[1:3])
bl_version = f"{bl_version[0:2]}.{bl_version[2:4]}"
bl = _FirmwareInfo(FIRMWARE_KIND.Bootloader, "", bl_version, None)
bl = common.FirmwareInfo(FIRMWARE_KIND.Bootloader, "", bl_version, None)
firmware[1] = bl
reply = read_register(device, REGISTERS.firmware, 0x03)
if reply:
o_version = _strhex(reply[1:3])
o_version = common.strhex(reply[1:3])
o_version = f"{o_version[0:2]}.{o_version[2:4]}"
o = _FirmwareInfo(FIRMWARE_KIND.Other, "", o_version, None)
o = common.FirmwareInfo(FIRMWARE_KIND.Other, "", o_version, None)
firmware[2] = o
if any(firmware):
@ -205,7 +202,7 @@ class Hidpp10:
flag_bits = sum(int(b) for b in flag_bits)
assert flag_bits & 0x00FFFFFF == flag_bits
result = write_register(device, REGISTERS.notifications, _int2bytes(flag_bits, 3))
result = write_register(device, REGISTERS.notifications, common.int2bytes(flag_bits, 3))
return result is not None
def get_device_features(self, device: Device):
@ -224,7 +221,7 @@ class Hidpp10:
flags = read_register(device, register)
if flags is not None:
assert len(flags) == 3
return _bytes2int(flags)
return common.bytes2int(flags)
def parse_battery_status(register, reply) -> Battery | None:

View File

@ -17,33 +17,27 @@
import logging
import socket
import threading as _threading
import struct
import threading
from struct import pack as _pack
from struct import unpack as _unpack
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
import yaml as _yaml
import yaml
from solaar.i18n import _
from typing_extensions import Protocol
from . import common
from . import exceptions
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp10_constants
from . import special_keys
from .common import Battery
from .common import BatteryLevelApproximation
from .common import BatteryStatus
from .common import FirmwareInfo as _FirmwareInfo
from .common import NamedInt as _NamedInt
from .common import NamedInts as _NamedInts
from .common import UnsortedNamedInts as _UnsortedNamedInts
from .common import bytes2int as _bytes2int
from .common import crc16 as _crc16
from .common import int2bytes as _int2bytes
from .common import NamedInt
from .hidpp20_constants import CHARGE_LEVEL
from .hidpp20_constants import CHARGE_STATUS
from .hidpp20_constants import CHARGE_TYPE
@ -57,7 +51,7 @@ logger = logging.getLogger(__name__)
FixedBytes5 = bytes
KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KIND}
KIND_MAP = {kind: hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KIND}
class Device(Protocol):
@ -103,7 +97,7 @@ class FeaturesArray(dict):
return False
if self.count > 0:
return True
reply = self.device.request(0x0000, _pack("!H", FEATURE.FEATURE_SET))
reply = self.device.request(0x0000, struct.pack("!H", FEATURE.FEATURE_SET))
if reply is not None:
fs_index = reply[0]
if fs_index:
@ -120,7 +114,7 @@ class FeaturesArray(dict):
self.supported = False
return False
def get_feature(self, index: int) -> Optional[_NamedInt]:
def get_feature(self, index: int) -> Optional[NamedInt]:
feature = self.inverse.get(index)
if feature is not None:
return feature
@ -130,7 +124,7 @@ class FeaturesArray(dict):
return feature
response = self.device.feature_request(FEATURE.FEATURE_SET, 0x10, index)
if response:
feature = FEATURE[_unpack("!H", response[:2])[0]]
feature = FEATURE[struct.unpack("!H", response[:2])[0]]
self[feature] = index
self.version[feature] = response[3]
return feature
@ -141,15 +135,15 @@ class FeaturesArray(dict):
feature = self.get_feature(index)
yield feature, index
def get_feature_version(self, feature: _NamedInt) -> Optional[int]:
def get_feature_version(self, feature: NamedInt) -> Optional[int]:
if self[feature]:
return self.version.get(feature, 0)
def __contains__(self, feature: _NamedInt) -> bool:
def __contains__(self, feature: NamedInt) -> bool:
index = self.__getitem__(feature)
return index is not None and index is not False
def __getitem__(self, feature: _NamedInt) -> Optional[int]:
def __getitem__(self, feature: NamedInt) -> Optional[int]:
index = super().get(feature)
if index is not None:
return index
@ -157,7 +151,7 @@ class FeaturesArray(dict):
index = super().get(feature)
if index is not None:
return index
response = self.device.request(0x0000, _pack("!H", feature))
response = self.device.request(0x0000, struct.pack("!H", feature))
if response:
index = response[0]
self[feature] = index if index else False
@ -185,8 +179,8 @@ class ReprogrammableKey:
Ref: https://drive.google.com/file/d/0BxbRzx7vEV7eU3VfMnRuRXktZ3M/view
Read-only properties:
- index {int} -- index in the control ID table
- key {_NamedInt} -- the name of this control
- default_task {_NamedInt} -- the native function of this control
- key {NamedInt} -- the name of this control
- default_task {NamedInt} -- the native function of this control
- flags {List[str]} -- capabilities and desired software handling of the control
"""
@ -198,17 +192,17 @@ class ReprogrammableKey:
self._flags = flags
@property
def key(self) -> _NamedInt:
def key(self) -> NamedInt:
return special_keys.CONTROL[self._cid]
@property
def default_task(self) -> _NamedInt:
def default_task(self) -> NamedInt:
"""NOTE: This NamedInt is a bit mixed up, because its value is the Control ID
while the name is the Control ID's native task. But this makes more sense
than presenting details of controls vs tasks in the interface. The same
convention applies to `mapped_to`, `remappable_to`, `remap` in `ReprogrammableKeyV4`."""
task = str(special_keys.TASK[self._tid])
return _NamedInt(self._cid, task)
return NamedInt(self._cid, task)
@property
def flags(self) -> List[str]:
@ -227,8 +221,8 @@ class ReprogrammableKeyV4(ReprogrammableKey):
- group {int} -- the group this control belongs to; other controls with this group in their
`group_mask` can be remapped to this control
- group_mask {List[str]} -- this control can be remapped to any control ID in these groups
- mapped_to {_NamedInt} -- which action this control is mapped to; usually itself
- remappable_to {List[_NamedInt]} -- list of actions which this control can be remapped to
- mapped_to {NamedInt} -- which action this control is mapped to; usually itself
- remappable_to {List[NamedInt]} -- list of actions which this control can be remapped to
- mapping_flags {List[str]} -- mapping flags set on the control
"""
@ -245,24 +239,24 @@ class ReprogrammableKeyV4(ReprogrammableKey):
return special_keys.CID_GROUP_BIT.flag_names(self._gmask)
@property
def mapped_to(self) -> _NamedInt:
def mapped_to(self) -> NamedInt:
if self._mapped_to is None:
self._getCidReporting()
self._device.keys._ensure_all_keys_queried()
task = str(special_keys.TASK[self._device.keys.cid_to_tid[self._mapped_to]])
return _NamedInt(self._mapped_to, task)
return NamedInt(self._mapped_to, task)
@property
def remappable_to(self) -> _NamedInts:
def remappable_to(self) -> common.NamedInts:
self._device.keys._ensure_all_keys_queried()
ret = _UnsortedNamedInts()
ret = common.UnsortedNamedInts()
if self.group_mask != []: # only keys with a non-zero gmask are remappable
ret[self.default_task] = self.default_task # it should always be possible to map the key to itself
for g in self.group_mask:
g = special_keys.CID_GROUP[str(g)]
for tgt_cid in self._device.keys.group_cids[g]:
tgt_task = str(special_keys.TASK[self._device.keys.cid_to_tid[tgt_cid]])
tgt_task = _NamedInt(tgt_cid, tgt_task)
tgt_task = NamedInt(tgt_cid, tgt_task)
if tgt_task != self.default_task: # don't put itself in twice
ret[tgt_task] = tgt_task
return ret
@ -288,15 +282,15 @@ class ReprogrammableKeyV4(ReprogrammableKey):
flags = {special_keys.MAPPING_FLAG.raw_XY_diverted: value}
self._setCidReporting(flags=flags)
def remap(self, to: _NamedInt):
def remap(self, to: NamedInt):
"""Temporarily remaps this control to another action."""
self._setCidReporting(remap=int(to))
def _getCidReporting(self):
try:
mapped_data = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(_pack("!H", self._cid)))
mapped_data = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(struct.pack("!H", self._cid)))
if mapped_data:
cid, mapping_flags_1, mapped_to = _unpack("!HBH", mapped_data[:5])
cid, mapping_flags_1, mapped_to = struct.unpack("!HBH", mapped_data[:5])
if cid != self._cid and logger.isEnabledFor(logging.WARNING):
logger.warning(
f"REPROG_CONTROLS_V4 endpoint getCidReporting on device {self._device} replied "
@ -304,7 +298,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
)
self._mapped_to = mapped_to if mapped_to != 0 else self._cid
if len(mapped_data) > 5:
(mapping_flags_2,) = _unpack("!B", mapped_data[5:6])
(mapping_flags_2,) = struct.unpack("!B", mapped_data[5:6])
else:
mapping_flags_2 = 0
self._mapping_flags = mapping_flags_1 | (mapping_flags_2 << 8)
@ -322,7 +316,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
def _setCidReporting(self, flags=None, remap=0):
"""Sends a `setCidReporting` request with the given parameters. Raises an exception if the parameters are invalid.
Parameters:
- flags {Dict[_NamedInt,bool]} -- a dictionary of which mapping flags to set/unset
- flags {Dict[NamedInt,bool]} -- a dictionary of which mapping flags to set/unset
- remap {int} -- which control ID to remap to; or 0 to keep current mapping
"""
flags = flags if flags else {} # See flake8 B006
@ -365,11 +359,11 @@ class ReprogrammableKeyV4(ReprogrammableKey):
if remap != 0: # update mapping if changing (even if not already read)
self._mapped_to = remap
pkt = tuple(_pack("!HBH", self._cid, bfield & 0xFF, remap))
pkt = tuple(struct.pack("!HBH", self._cid, bfield & 0xFF, remap))
# TODO: to fully support version 4 of REPROG_CONTROLS_V4, append `(bfield >> 8) & 0xff` here.
# But older devices might behave oddly given that byte, so we don't send it.
ret = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x30, *pkt)
if ret is None or _unpack("!BBBBB", ret[:5]) != pkt and logger.isEnabledFor(logging.DEBUG):
if ret is None or struct.unpack("!BBBBB", ret[:5]) != pkt and logger.isEnabledFor(logging.DEBUG):
logger.debug(f"REPROG_CONTROLS_v4 setCidReporting on device {self._device} didn't echo request packet.")
@ -384,11 +378,11 @@ class PersistentRemappableAction:
self.cidStatus = cidStatus
@property
def key(self) -> _NamedInt:
def key(self) -> NamedInt:
return special_keys.CONTROL[self._cid]
@property
def actionType(self) -> _NamedInt:
def actionType(self) -> NamedInt:
return special_keys.ACTIONID[self.actionId]
@property
@ -422,16 +416,18 @@ class PersistentRemappableAction:
@property
def data_bytes(self):
return _int2bytes(self.actionId, 1) + _int2bytes(self.remapped, 2) + _int2bytes(self._modifierMask, 1)
return (
common.int2bytes(self.actionId, 1) + common.int2bytes(self.remapped, 2) + common.int2bytes(self._modifierMask, 1)
)
def remap(self, data_bytes):
cid = _int2bytes(self._cid, 2)
if _bytes2int(data_bytes) == special_keys.KEYS_Default: # map back to default
cid = common.int2bytes(self._cid, 2)
if common.bytes2int(data_bytes) == special_keys.KEYS_Default: # map back to default
self._device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x50, cid, 0xFF)
self._device.remap_keys._query_key(self.index)
return self._device.remap_keys.keys[self.index].data_bytes
else:
self.actionId, self.remapped, self._modifierMask = _unpack("!BHB", data_bytes)
self.actionId, self.remapped, self._modifierMask = struct.unpack("!BHB", data_bytes)
self.cidStatus = 0x01
self._device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x40, cid, 0xFF, data_bytes)
return True
@ -443,7 +439,7 @@ class KeysArray:
def __init__(self, device, count, version):
assert device is not None
self.device = device
self.lock = _threading.Lock()
self.lock = threading.Lock()
if FEATURE.REPROG_CONTROLS_V4 in self.device.features:
self.keyversion = FEATURE.REPROG_CONTROLS_V4
elif FEATURE.REPROG_CONTROLS_V2 in self.device.features:
@ -510,7 +506,7 @@ class KeysArrayV2(KeysArray):
raise IndexError(index)
keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS, 0x10, index)
if keydata:
cid, tid, flags = _unpack("!HHB", keydata[:5])
cid, tid, flags = struct.unpack("!HHB", keydata[:5])
self.keys[index] = ReprogrammableKey(self.device, index, cid, tid, flags)
self.cid_to_tid[cid] = tid
elif logger.isEnabledFor(logging.WARNING):
@ -526,7 +522,7 @@ class KeysArrayV4(KeysArrayV2):
raise IndexError(index)
keydata = self.device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x10, index)
if keydata:
cid, tid, flags1, pos, group, gmask, flags2 = _unpack("!HHBBBBB", keydata[:9])
cid, tid, flags1, pos, group, gmask, flags2 = struct.unpack("!HHBBBBB", keydata[:9])
flags = flags1 | (flags2 << 8)
self.keys[index] = ReprogrammableKeyV4(self.device, index, cid, tid, flags, pos, group, gmask)
self.cid_to_tid[cid] = tid
@ -547,7 +543,7 @@ class KeysArrayPersistent(KeysArray):
if self._capabilities is None and self.device.online:
capabilities = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x00)
assert capabilities, "Oops, persistent remappable key capabilities cannot be retrieved!"
self._capabilities = _unpack("!H", capabilities[:2])[0] # flags saying what the mappings are possible
self._capabilities = struct.unpack("!H", capabilities[:2])[0] # flags saying what the mappings are possible
return self._capabilities
def _query_key(self, index: int):
@ -555,10 +551,10 @@ class KeysArrayPersistent(KeysArray):
raise IndexError(index)
keydata = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF)
if keydata:
key = _unpack("!H", keydata[:2])[0]
key = struct.unpack("!H", keydata[:2])[0]
mapped_data = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key >> 8, key & 0xFF, 0xFF)
if mapped_data:
_ignore, _ignore, actionId, remapped, modifiers, status = _unpack("!HBBHBB", mapped_data[:8])
_ignore, _ignore, actionId, remapped, modifiers, status = struct.unpack("!HBBHBB", mapped_data[:8])
else:
actionId = remapped = modifiers = status = 0
actionId = special_keys.ACTIONID[actionId]
@ -578,7 +574,7 @@ class KeysArrayPersistent(KeysArray):
# Param Ids for feature GESTURE_2
PARAM = _NamedInts(
PARAM = common.NamedInts(
ExtraCapabilities=1, # not suitable for use
PixelZone=2, # 4 2-byte integers, left, bottom, width, height; pixels
RatioZone=3, # 4 bytes, left, bottom, width, height; unit 1/240 pad size
@ -622,7 +618,7 @@ SUB_PARAM = { # (byte count, minimum, maximum)
}
# Spec Ids for feature GESTURE_2
SPEC = _NamedInts(
SPEC = common.NamedInts(
DVI_field_width=1,
field_widths=2,
period_unit=3,
@ -637,7 +633,7 @@ SPEC = _NamedInts(
SPEC._fallback = lambda x: f"unknown:{x:04X}"
# Action Ids for feature GESTURE_2
ACTION_ID = _NamedInts(
ACTION_ID = common.NamedInts(
MovePointer=1,
ScrollHorizontal=2,
WheelScrolling=3,
@ -750,7 +746,7 @@ class Param:
def read(self): # returns the bytes for the parameter
result = self._device.feature_request(FEATURE.GESTURE_2, 0x70, self.index, 0xFF)
if result:
self._value = _bytes2int(result[: self.size])
self._value = common.bytes2int(result[: self.size])
return self._value
@property
@ -762,7 +758,7 @@ class Param:
def _read_default(self):
result = self._device.feature_request(FEATURE.GESTURE_2, 0x60, self.index, 0xFF)
if result:
self._default_value = _bytes2int(result[: self.size])
self._default_value = common.bytes2int(result[: self.size])
return self._default_value
def write(self, bytes):
@ -799,7 +795,7 @@ class Spec:
f"Feature Call Error reading Gesture Spec on device {self._device} for spec {self.id} - use None"
)
return None
return _bytes2int(value[: self.byte_count])
return common.bytes2int(value[: self.byte_count])
def __repr__(self):
return f"[{self.spec}={self.value}]"
@ -883,7 +879,7 @@ class Backlight:
if not response:
raise exceptions.FeatureCallError(msg="No reply from device.")
self.device = device
self.enabled, self.options, supported, effects, self.level, self.dho, self.dhi, self.dpow = _unpack(
self.enabled, self.options, supported, effects, self.level, self.dho, self.dhi, self.dpow = struct.unpack(
"<BBBHBHHH", response[:12]
)
self.auto_supported = supported & 0x08
@ -894,7 +890,7 @@ class Backlight:
def write(self):
self.options = (self.options & 0x07) | (self.mode << 3)
level = self.level if self.mode == 0x3 else 0
data_bytes = _pack("<BBBBHHH", self.enabled, self.options, 0xFF, level, self.dho, self.dhi, self.dpow)
data_bytes = struct.pack("<BBBBHHH", self.enabled, self.options, 0xFF, level, self.dho, self.dhi, self.dpow)
return self.device.feature_request(FEATURE.BACKLIGHT2, 0x10, data_bytes)
@ -908,8 +904,8 @@ class LEDParam:
saturation = "saturation"
LEDRampChoices = _NamedInts(default=0, yes=1, no=2)
LEDFormChoices = _NamedInts(default=0, sine=1, square=2, triangle=3, sawtooth=4, sharkfin=5, exponential=6)
LEDRampChoices = common.NamedInts(default=0, yes=1, no=2)
LEDFormChoices = common.NamedInts(default=0, sine=1, square=2, triangle=3, sawtooth=4, sharkfin=5, exponential=6)
LEDParamSize = {
LEDParam.color: 3,
LEDParam.speed: 1,
@ -922,18 +918,18 @@ LEDParamSize = {
# not implemented from x8070 Wave=4, Stars=5, Press=6, Audio=7
# not implemented from x8071 Custom=12, Kitt=13, HSVPulsing=20, WaveC=22, RippleC=23, SignatureActive=24, SignaturePassive=25
LEDEffects = {
0x00: [_NamedInt(0x00, _("Disabled")), {}],
0x01: [_NamedInt(0x01, _("Static")), {LEDParam.color: 0, LEDParam.ramp: 3}],
0x02: [_NamedInt(0x02, _("Pulse")), {LEDParam.color: 0, LEDParam.speed: 3}],
0x03: [_NamedInt(0x03, _("Cycle")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x08: [_NamedInt(0x08, _("Boot")), {}],
0x09: [_NamedInt(0x09, _("Demo")), {}],
0x0A: [_NamedInt(0x0A, _("Breathe")), {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6}],
0x0B: [_NamedInt(0x0B, _("Ripple")), {LEDParam.color: 0, LEDParam.period: 4}],
0x0E: [_NamedInt(0x0E, _("Decomposition")), {LEDParam.period: 6, LEDParam.intensity: 8}],
0x0F: [_NamedInt(0x0F, _("Signature1")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x10: [_NamedInt(0x10, _("Signature2")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x15: [_NamedInt(0x15, _("CycleS")), {LEDParam.saturation: 1, LEDParam.period: 6, LEDParam.intensity: 8}],
0x00: [NamedInt(0x00, _("Disabled")), {}],
0x01: [NamedInt(0x01, _("Static")), {LEDParam.color: 0, LEDParam.ramp: 3}],
0x02: [NamedInt(0x02, _("Pulse")), {LEDParam.color: 0, LEDParam.speed: 3}],
0x03: [NamedInt(0x03, _("Cycle")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x08: [NamedInt(0x08, _("Boot")), {}],
0x09: [NamedInt(0x09, _("Demo")), {}],
0x0A: [NamedInt(0x0A, _("Breathe")), {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6}],
0x0B: [NamedInt(0x0B, _("Ripple")), {LEDParam.color: 0, LEDParam.period: 4}],
0x0E: [NamedInt(0x0E, _("Decomposition")), {LEDParam.period: 6, LEDParam.intensity: 8}],
0x0F: [NamedInt(0x0F, _("Signature1")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x10: [NamedInt(0x10, _("Signature2")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x15: [NamedInt(0x15, _("CycleS")), {LEDParam.saturation: 1, LEDParam.period: 6, LEDParam.intensity: 8}],
}
@ -950,7 +946,7 @@ class LEDEffectSetting: # an effect plus its parameters
args = {"ID": effect[0] if effect else None}
if effect:
for p, b in effect[1].items():
args[str(p)] = _bytes2int(bytes[1 + b : 1 + b + LEDParamSize[p]])
args[str(p)] = common.bytes2int(bytes[1 + b : 1 + b + LEDParamSize[p]])
else:
args["bytes"] = bytes
return cls(**args)
@ -962,10 +958,10 @@ class LEDEffectSetting: # an effect plus its parameters
else:
bs = [0] * 10
for p, b in LEDEffects[ID][1].items():
bs[b : b + LEDParamSize[p]] = _int2bytes(getattr(self, str(p), 0), LEDParamSize[p])
bs[b : b + LEDParamSize[p]] = common.int2bytes(getattr(self, str(p), 0), LEDParamSize[p])
if options is not None:
ID = next((ze.index for ze in options if ze.ID == ID), None)
result = _int2bytes(ID, 1) + bytes(bs)
result = common.int2bytes(ID, 1) + bytes(bs)
return result
@classmethod
@ -980,23 +976,23 @@ class LEDEffectSetting: # an effect plus its parameters
return type(self) == type(other) and self.to_bytes() == other.to_bytes()
def __str__(self):
return _yaml.dump(self, width=float("inf")).rstrip("\n")
return yaml.dump(self, width=float("inf")).rstrip("\n")
_yaml.SafeLoader.add_constructor("!LEDEffectSetting", LEDEffectSetting.from_yaml)
_yaml.add_representer(LEDEffectSetting, LEDEffectSetting.to_yaml)
yaml.SafeLoader.add_constructor("!LEDEffectSetting", LEDEffectSetting.from_yaml)
yaml.add_representer(LEDEffectSetting, LEDEffectSetting.to_yaml)
class LEDEffectInfo: # an effect that a zone can do
def __init__(self, feature, function, device, zindex, eindex):
info = device.feature_request(feature, function, zindex, eindex, 0x00)
self.zindex, self.index, self.ID, self.capabilities, self.period = _unpack("!BBHHH", info[0:8])
self.zindex, self.index, self.ID, self.capabilities, self.period = struct.unpack("!BBHHH", info[0:8])
def __str__(self):
return f"LEDEffectInfo({self.zindex}, {self.index}, {self.ID}, {self.capabilities: x}, {self.period})"
LEDZoneLocations = _NamedInts()
LEDZoneLocations = common.NamedInts()
LEDZoneLocations[0x00] = _("Unknown Location")
LEDZoneLocations[0x01] = _("Primary")
LEDZoneLocations[0x02] = _("Logo")
@ -1014,7 +1010,7 @@ LEDZoneLocations[0x0B] = _("Primary 6")
class LEDZoneInfo: # effects that a zone can do
def __init__(self, feature, function, offset, effect_function, device, index):
info = device.feature_request(feature, function, index, 0xFF, 0x00)
self.location, self.count = _unpack("!HB", info[1 + offset : 4 + offset])
self.location, self.count = struct.unpack("!HB", info[1 + offset : 4 + offset])
self.index = index
self.location = LEDZoneLocations[self.location] if LEDZoneLocations[self.location] else self.location
self.effects = []
@ -1025,7 +1021,7 @@ class LEDZoneInfo: # effects that a zone can do
for i in range(0, len(self.effects)):
e = self.effects[i]
if e.ID == setting.ID:
return _int2bytes(self.index, 1) + _int2bytes(i, 1) + setting.to_bytes()[1:]
return common.int2bytes(self.index, 1) + common.int2bytes(i, 1) + setting.to_bytes()[1:]
return None
def __str__(self):
@ -1036,7 +1032,7 @@ class LEDEffectsInfo: # effects that the LEDs can do, using COLOR_LED_EFFECTS
def __init__(self, device):
self.device = device
info = device.feature_request(FEATURE.COLOR_LED_EFFECTS, 0x00)
self.count, _, capabilities = _unpack("!BHH", info[0:5])
self.count, _, capabilities = struct.unpack("!BHH", info[0:5])
self.readable = capabilities & 0x1
self.zones = []
for i in range(0, self.count):
@ -1054,16 +1050,16 @@ class RGBEffectsInfo(LEDEffectsInfo): # effects that the LEDs can do using RGB_
def __init__(self, device):
self.device = device
info = device.feature_request(FEATURE.RGB_EFFECTS, 0x00, 0xFF, 0xFF, 0x00)
_, _, self.count, _, capabilities = _unpack("!BBBHH", info[0:7])
_, _, self.count, _, capabilities = struct.unpack("!BBBHH", info[0:7])
self.readable = capabilities & 0x1
self.zones = []
for i in range(0, self.count):
self.zones.append(LEDZoneInfo(FEATURE.RGB_EFFECTS, 0x00, 1, 0x00, device, i))
ButtonBehaviors = _NamedInts(MacroExecute=0x0, MacroStop=0x1, MacroStopAll=0x2, Send=0x8, Function=0x9)
ButtonMappingTypes = _NamedInts(No_Action=0x0, Button=0x1, Modifier_And_Key=0x2, Consumer_Key=0x3)
ButtonFunctions = _NamedInts(
ButtonBehaviors = common.NamedInts(MacroExecute=0x0, MacroStop=0x1, MacroStopAll=0x2, Send=0x8, Function=0x9)
ButtonMappingTypes = common.NamedInts(No_Action=0x0, Button=0x1, Modifier_And_Key=0x2, Consumer_Key=0x3)
ButtonFunctions = common.NamedInts(
No_Action=0x0,
Tilt_Left=0x1,
Tilt_Right=0x2,
@ -1136,22 +1132,22 @@ class Button:
return result
def to_bytes(self):
bytes = _int2bytes(self.behavior << 4, 1) if self.behavior is not None else None
bytes = common.int2bytes(self.behavior << 4, 1) if self.behavior is not None else None
if self.behavior == ButtonBehaviors.MacroExecute or self.behavior == ButtonBehaviors.MacroStop:
bytes = _int2bytes((self.behavior << 12) + self.sector, 2) + _int2bytes(self.address, 2)
bytes = common.int2bytes((self.behavior << 12) + self.sector, 2) + common.int2bytes(self.address, 2)
elif self.behavior == ButtonBehaviors.Send:
bytes += _int2bytes(self.type, 1)
bytes += common.int2bytes(self.type, 1)
if self.type == ButtonMappingTypes.Button:
bytes += _int2bytes(self.value, 2)
bytes += common.int2bytes(self.value, 2)
elif self.type == ButtonMappingTypes.Modifier_And_Key:
bytes += _int2bytes(self.modifiers, 1)
bytes += _int2bytes(self.value, 1)
bytes += common.int2bytes(self.modifiers, 1)
bytes += common.int2bytes(self.value, 1)
elif self.type == ButtonMappingTypes.Consumer_Key:
bytes += _int2bytes(self.value, 2)
bytes += common.int2bytes(self.value, 2)
elif self.type == ButtonMappingTypes.No_Action:
bytes += b"\xff\xff"
elif self.behavior == ButtonBehaviors.Function:
bytes += _int2bytes(self.value, 1) + b"\xff" + (_int2bytes(self.data, 1) if self.data else b"\x00")
bytes += common.int2bytes(self.value, 1) + b"\xff" + (common.int2bytes(self.data, 1) if self.data else b"\x00")
else:
bytes = self.bytes if self.bytes else b"\xff\xff\xff\xff"
return bytes
@ -1163,8 +1159,8 @@ class Button:
)
_yaml.SafeLoader.add_constructor("!Button", Button.from_yaml)
_yaml.add_representer(Button, Button.to_yaml)
yaml.SafeLoader.add_constructor("!Button", Button.from_yaml)
yaml.add_representer(Button, Button.to_yaml)
class OnboardProfile:
@ -1191,19 +1187,19 @@ class OnboardProfile:
report_rate=bytes[0],
resolution_default_index=bytes[1],
resolution_shift_index=bytes[2],
resolutions=[_unpack("<H", bytes[i * 2 + 3 : i * 2 + 5])[0] for i in range(0, 5)],
resolutions=[struct.unpack("<H", bytes[i * 2 + 3 : i * 2 + 5])[0] for i in range(0, 5)],
red=bytes[13],
green=bytes[14],
blue=bytes[15],
power_mode=bytes[16],
angle_snap=bytes[17],
write_count=_unpack("<H", bytes[18:20])[0],
write_count=struct.unpack("<H", bytes[18:20])[0],
reserved=bytes[20:28],
ps_timeout=_unpack("<H", bytes[28:30])[0],
po_timeout=_unpack("<H", bytes[30:32])[0],
ps_timeout=struct.unpack("<H", bytes[28:30])[0],
po_timeout=struct.unpack("<H", bytes[30:32])[0],
buttons=[Button.from_bytes(bytes[32 + i * 4 : 32 + i * 4 + 4]) for i in range(0, buttons)],
gbuttons=[Button.from_bytes(bytes[96 + i * 4 : 96 + i * 4 + 4]) for i in range(0, gbuttons)],
name=bytes[160:208].decode("utf-16le").rstrip("\x00").rstrip("\uFFFF"),
name=bytes[160:208].decode("utf-16le").rstrip("\x00").rstrip("\uffff"),
lighting=[LEDEffectSetting.from_bytes(bytes[208 + i * 11 : 219 + i * 11]) for i in range(0, 4)],
)
@ -1213,11 +1209,11 @@ class OnboardProfile:
return cls.from_bytes(sector, enabled, buttons, gbuttons, bytes)
def to_bytes(self, length):
bytes = _int2bytes(self.report_rate, 1)
bytes += _int2bytes(self.resolution_default_index, 1) + _int2bytes(self.resolution_shift_index, 1)
bytes = common.int2bytes(self.report_rate, 1)
bytes += common.int2bytes(self.resolution_default_index, 1) + common.int2bytes(self.resolution_shift_index, 1)
bytes += b"".join([self.resolutions[i].to_bytes(2, "little") for i in range(0, 5)])
bytes += _int2bytes(self.red, 1) + _int2bytes(self.green, 1) + _int2bytes(self.blue, 1)
bytes += _int2bytes(self.power_mode, 1) + _int2bytes(self.angle_snap, 1)
bytes += common.int2bytes(self.red, 1) + common.int2bytes(self.green, 1) + common.int2bytes(self.blue, 1)
bytes += common.int2bytes(self.power_mode, 1) + common.int2bytes(self.angle_snap, 1)
bytes += self.write_count.to_bytes(2, "little") + self.reserved
bytes += self.ps_timeout.to_bytes(2, "little") + self.po_timeout.to_bytes(2, "little")
for i in range(0, 16):
@ -1232,7 +1228,7 @@ class OnboardProfile:
bytes += self.lighting[i].to_bytes()
while len(bytes) < length - 2:
bytes += b"\xff"
bytes += _int2bytes(_crc16(bytes), 2)
bytes += common.int2bytes(common.crc16(bytes), 2)
return bytes
def dump(self):
@ -1250,8 +1246,8 @@ class OnboardProfile:
print(" G-BUTTON", i + 1, self.gbuttons[i])
_yaml.SafeLoader.add_constructor("!OnboardProfile", OnboardProfile.from_yaml)
_yaml.add_representer(OnboardProfile, OnboardProfile.to_yaml)
yaml.SafeLoader.add_constructor("!OnboardProfile", OnboardProfile.from_yaml)
yaml.add_representer(OnboardProfile, OnboardProfile.to_yaml)
OnboardProfilesVersion = 3
@ -1279,11 +1275,11 @@ class OnboardProfiles:
headers = []
chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, 0, 0, 0, i)
s = 0x00
if chunk[0:4] == b"\x00\x00\x00\x00" or chunk[0:4] == b"\xFF\xFF\xFF\xFF": # look in ROM instead
if chunk[0:4] == b"\x00\x00\x00\x00" or chunk[0:4] == b"\xff\xff\xff\xff": # look in ROM instead
chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, 0x01, 0, 0, i)
s = 0x01
while chunk[0:2] != b"\xff\xff":
sector, enabled = _unpack("!HB", chunk[0:3])
sector, enabled = struct.unpack("!HB", chunk[0:3])
headers.append((sector, enabled))
i += 1
chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, s, 0, 0, i * 4)
@ -1294,10 +1290,10 @@ class OnboardProfiles:
if not device.online: # wake the device up if necessary
device.ping()
response = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x00)
memory, profile, _macro = _unpack("!BBB", response[0:3])
memory, profile, _macro = struct.unpack("!BBB", response[0:3])
if memory != 0x01 or profile > 0x04:
return
count, oob, buttons, sectors, size, shift = _unpack("!BBBBHB", response[3:10])
count, oob, buttons, sectors, size, shift = struct.unpack("!BBBBHB", response[3:10])
gbuttons = buttons if (shift & 0x3 == 0x2) else 0
headers = OnboardProfiles.get_profile_headers(device)
profiles = {}
@ -1319,11 +1315,11 @@ class OnboardProfiles:
def to_bytes(self):
bytes = b""
for i in range(1, len(self.profiles) + 1):
bytes += _int2bytes(self.profiles[i].sector, 2) + _int2bytes(self.profiles[i].enabled, 1) + b"\x00"
bytes += common.int2bytes(self.profiles[i].sector, 2) + common.int2bytes(self.profiles[i].enabled, 1) + b"\x00"
bytes += b"\xff\xff\x00\x00" # marker after last profile
while len(bytes) < self.size - 2: # leave room for CRC
bytes += b"\xff"
bytes += _int2bytes(_crc16(bytes), 2)
bytes += common.int2bytes(common.crc16(bytes), 2)
return bytes
@classmethod
@ -1368,11 +1364,11 @@ class OnboardProfiles:
return written
def show(self):
print(_yaml.dump(self))
print(yaml.dump(self))
_yaml.SafeLoader.add_constructor("!OnboardProfiles", OnboardProfiles.from_yaml)
_yaml.add_representer(OnboardProfiles, OnboardProfiles.to_yaml)
yaml.SafeLoader.add_constructor("!OnboardProfiles", OnboardProfiles.from_yaml)
yaml.add_representer(OnboardProfiles, OnboardProfiles.to_yaml)
def feature_request(device, feature, function=0x00, *params, no_reply=False):
@ -1417,20 +1413,18 @@ class Hidpp20:
if fw_info:
level = ord(fw_info[:1]) & 0x0F
if level == 0 or level == 1:
name, version_major, version_minor, build = _unpack("!3sBBH", fw_info[1:8])
name, version_major, version_minor, build = struct.unpack("!3sBBH", fw_info[1:8])
version = f"{version_major:02X}.{version_minor:02X}"
if build:
version += f".B{build:04X}"
extras = fw_info[9:].rstrip(b"\x00") or None
fw_info = _FirmwareInfo(FIRMWARE_KIND[level], name.decode("ascii"), version, extras)
fw_info = common.FirmwareInfo(FIRMWARE_KIND[level], name.decode("ascii"), version, extras)
elif level == FIRMWARE_KIND.Hardware:
fw_info = _FirmwareInfo(FIRMWARE_KIND.Hardware, "", str(ord(fw_info[1:2])), None)
fw_info = common.FirmwareInfo(FIRMWARE_KIND.Hardware, "", str(ord(fw_info[1:2])), None)
else:
fw_info = _FirmwareInfo(FIRMWARE_KIND.Other, "", "", None)
fw_info = common.FirmwareInfo(FIRMWARE_KIND.Other, "", "", None)
fw.append(fw_info)
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("device %d firmware %s", devnumber, fw_info)
return tuple(fw)
def get_ids(self, device):
@ -1458,8 +1452,6 @@ class Hidpp20:
kind = device.feature_request(FEATURE.DEVICE_NAME, 0x20)
if kind:
kind = ord(kind[:1])
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("device %d type %d = %s", devnumber, kind, DEVICE_KIND[kind])
return KIND_MAP[DEVICE_KIND[kind]]
def get_name(self, device: Device):
@ -1581,7 +1573,7 @@ class Hidpp20:
def get_mouse_pointer_info(self, device: Device):
pointer_info = device.feature_request(FEATURE.MOUSE_POINTER)
if pointer_info:
dpi, flags = _unpack("!HB", pointer_info[:3])
dpi, flags = struct.unpack("!HB", pointer_info[:3])
acceleration = ("none", "low", "med", "high")[flags & 0x3]
suggest_os_ballistics = (flags & 0x04) != 0
suggest_vertical_orientation = (flags & 0x08) != 0
@ -1595,7 +1587,7 @@ class Hidpp20:
def get_vertical_scrolling_info(self, device: Device):
vertical_scrolling_info = device.feature_request(FEATURE.VERTICAL_SCROLLING)
if vertical_scrolling_info:
roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3])
roller, ratchet, lines = struct.unpack("!BBB", vertical_scrolling_info[:3])
roller_type = (
"reserved",
"standard",
@ -1611,13 +1603,13 @@ class Hidpp20:
def get_hi_res_scrolling_info(self, device: Device):
hi_res_scrolling_info = device.feature_request(FEATURE.HI_RES_SCROLLING)
if hi_res_scrolling_info:
mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2])
mode, resolution = struct.unpack("!BB", hi_res_scrolling_info[:2])
return mode, resolution
def get_pointer_speed_info(self, device: Device):
pointer_speed_info = device.feature_request(FEATURE.POINTER_SPEED)
if pointer_speed_info:
pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2])
pointer_speed_hi, pointer_speed_lo = struct.unpack("!BB", pointer_speed_info[:2])
# if pointer_speed_lo > 0:
# pointer_speed_lo = pointer_speed_lo
return pointer_speed_hi + pointer_speed_lo / 256
@ -1625,7 +1617,7 @@ class Hidpp20:
def get_lowres_wheel_status(self, device: Device):
lowres_wheel_status = device.feature_request(FEATURE.LOWRES_WHEEL)
if lowres_wheel_status:
wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0]
wheel_flag = struct.unpack("!B", lowres_wheel_status[:1])[0]
wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01]
return wheel_reporting
@ -1636,20 +1628,20 @@ class Hidpp20:
if caps and mode and ratchet:
# Parse caps
multi, flags = _unpack("!BB", caps[:2])
multi, flags = struct.unpack("!BB", caps[:2])
has_invert = (flags & 0x08) != 0
has_ratchet = (flags & 0x04) != 0
# Parse mode
wheel_mode, reserved = _unpack("!BB", mode[:2])
wheel_mode, reserved = struct.unpack("!BB", mode[:2])
target = (wheel_mode & 0x01) != 0
res = (wheel_mode & 0x02) != 0
inv = (wheel_mode & 0x04) != 0
# Parse Ratchet switch
ratchet_mode, reserved = _unpack("!BB", ratchet[:2])
ratchet_mode, reserved = struct.unpack("!BB", ratchet[:2])
ratchet = (ratchet_mode & 0x01) != 0
@ -1658,7 +1650,7 @@ class Hidpp20:
def get_new_fn_inversion(self, device: Device):
state = device.feature_request(FEATURE.NEW_FN_INVERSION, 0x00)
if state:
inverted, default_inverted = _unpack("!BB", state[:2])
inverted, default_inverted = struct.unpack("!BB", state[:2])
inverted = (inverted & 0x01) != 0
default_inverted = (default_inverted & 0x01) != 0
return inverted, default_inverted
@ -1667,11 +1659,11 @@ class Hidpp20:
state = device.feature_request(FEATURE.HOSTS_INFO, 0x00)
host_names = {}
if state:
capability_flags, _ignore, numHosts, currentHost = _unpack("!BBBB", state[:4])
capability_flags, _ignore, numHosts, currentHost = struct.unpack("!BBBB", state[:4])
if capability_flags & 0x01: # device can get host names
for host in range(0, numHosts):
hostinfo = device.feature_request(FEATURE.HOSTS_INFO, 0x10, host)
_ignore, status, _ignore, _ignore, nameLen, _ignore = _unpack("!BBBBBB", hostinfo[:6])
_ignore, status, _ignore, _ignore, nameLen, _ignore = struct.unpack("!BBBBBB", hostinfo[:6])
name = ""
remaining = nameLen
while remaining > 0:
@ -1696,10 +1688,10 @@ class Hidpp20:
logger.info("Setting host name to %s", name)
state = device.feature_request(FEATURE.HOSTS_INFO, 0x00)
if state:
flags, _ignore, _ignore, currentHost = _unpack("!BBBB", state[:4])
flags, _ignore, _ignore, currentHost = struct.unpack("!BBBB", state[:4])
if flags & 0x02:
hostinfo = device.feature_request(FEATURE.HOSTS_INFO, 0x10, currentHost)
_ignore, _ignore, _ignore, _ignore, _ignore, maxNameLen = _unpack("!BBBBBB", hostinfo[:6])
_ignore, _ignore, _ignore, _ignore, _ignore, maxNameLen = struct.unpack("!BBBBBB", hostinfo[:6])
if name[:maxNameLen] == currentName[:maxNameLen] and False:
return True
length = min(maxNameLen, len(name))
@ -1715,7 +1707,7 @@ class Hidpp20:
state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x20)
if state:
mode = _unpack("!B", state[:1])[0]
mode = struct.unpack("!B", state[:1])[0]
return mode
def set_onboard_mode(self, device: Device, mode):
@ -1725,19 +1717,19 @@ class Hidpp20:
def get_polling_rate(self, device: Device):
state = device.feature_request(FEATURE.REPORT_RATE, 0x10)
if state:
rate = _unpack("!B", state[:1])[0]
rate = struct.unpack("!B", state[:1])[0]
return str(rate) + "ms"
else:
rates = ["8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us"]
state = device.feature_request(FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20)
if state:
rate = _unpack("!B", state[:1])[0]
rate = struct.unpack("!B", state[:1])[0]
return rates[rate]
def get_remaining_pairing(self, device: Device):
result = device.feature_request(FEATURE.REMAINING_PAIRING, 0x0)
if result:
result = _unpack("!B", result[:1])[0]
result = struct.unpack("!B", result[:1])[0]
FEATURE._fallback = lambda x: f"unknown:{x:04X}"
return result
@ -1754,7 +1746,7 @@ battery_functions = {
def decipher_battery_status(report: FixedBytes5) -> Tuple[Any, Battery]:
battery_discharge_level, battery_discharge_next_level, battery_status = _unpack("!BBB", report[:3])
battery_discharge_level, battery_discharge_next_level, battery_status = struct.unpack("!BBB", report[:3])
if battery_discharge_level == 0:
battery_discharge_level = None
try:
@ -1770,7 +1762,7 @@ def decipher_battery_status(report: FixedBytes5) -> Tuple[Any, Battery]:
def decipher_battery_voltage(report):
voltage, flags = _unpack(">HB", report[:3])
voltage, flags = struct.unpack(">HB", report[:3])
status = BatteryStatus.DISCHARGING
charge_sts = ERROR.unknown
charge_lvl = CHARGE_LEVEL.average
@ -1808,7 +1800,7 @@ def decipher_battery_voltage(report):
def decipher_battery_unified(report):
discharge, level, status_byte, _ignore = _unpack("!BBBB", report[:4])
discharge, level, status_byte, _ignore = struct.unpack("!BBBB", report[:4])
try:
status = BatteryStatus(status_byte)
except ValueError:
@ -1833,7 +1825,7 @@ def decipher_battery_unified(report):
def decipher_adc_measurement(report):
# partial implementation - needs mapping to levels
adc, flags = _unpack("!HB", report[:3])
adc, flags = struct.unpack("!HB", report[:3])
for level in battery_voltage_remaining:
if level[0] < adc:
charge_level = level[1]

View File

@ -16,10 +16,10 @@
# Translation support for the Logitech receivers library
import gettext as _gettext
import gettext
_ = _gettext.gettext
ngettext = _gettext.ngettext
_ = gettext.gettext
ngettext = gettext.ngettext
# A few common strings, not always accessible as such in the code.

View File

@ -18,31 +18,31 @@
# Handles incoming events from the receiver/devices, updating the object as appropriate.
import logging
import threading as _threading
from struct import unpack as _unpack
import struct
import threading
from solaar.i18n import _
from . import diversion as _diversion
from . import base
from . import common
from . import diversion
from . import hidpp10
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp10_constants
from . import hidpp20
from . import hidpp20_constants as _hidpp20_constants
from . import settings_templates as _st
from .base import DJ_MESSAGE_ID as _DJ_MESSAGE_ID
from . import hidpp20_constants
from . import settings_templates
from .common import Alert
from .common import Battery as _Battery
from .common import BatteryStatus
from .common import strhex as _strhex
logger = logging.getLogger(__name__)
_R = _hidpp10_constants.REGISTERS
_F = _hidpp20_constants.FEATURE
_hidpp10 = hidpp10.Hidpp10()
_hidpp20 = hidpp20.Hidpp20()
_R = hidpp10_constants.REGISTERS
_F = hidpp20_constants.FEATURE
notification_lock = _threading.Lock()
notification_lock = threading.Lock()
def process(device, notification):
@ -68,7 +68,7 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.new_device = None
pair_error = ord(n.data[:1])
if pair_error:
receiver.pairing.error = error_string = _hidpp10_constants.PAIRING_ERRORS[pair_error]
receiver.pairing.error = error_string = hidpp10_constants.PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
@ -87,7 +87,7 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.device_passkey = None
discover_error = ord(n.data[:1])
if discover_error:
receiver.pairing.error = discover_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
receiver.pairing.error = discover_string = hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True
@ -126,7 +126,7 @@ def _process_receiver_notification(receiver, n):
elif n.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(n.data[7])
if pair_error:
receiver.pairing.error = error_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
receiver.pairing.error = error_string = hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
@ -157,7 +157,7 @@ def _process_device_notification(device, n):
# 0x40 to 0x7F appear to be HID++ 1.0 or DJ notifications
if n.sub_id >= 0x40:
if n.report_id == _DJ_MESSAGE_ID:
if n.report_id == base.DJ_MESSAGE_ID:
return _process_dj_notification(device, n)
else:
return _process_hidpp10_notification(device, n)
@ -239,11 +239,11 @@ def _process_hidpp10_notification(device, n):
if n.sub_id == 0x41: # device connection (and disconnection)
flags = ord(n.data[:1]) & 0xF0
if n.address == 0x02: # very old 27 MHz protocol
wpid = "00" + _strhex(n.data[2:3])
wpid = "00" + common.strhex(n.data[2:3])
link_established = True
link_encrypted = bool(flags & 0x80)
elif n.address > 0x00: # all other protocols are supposed to be almost the same
wpid = _strhex(n.data[2:3] + n.data[1:2])
wpid = common.strhex(n.data[2:3] + n.data[1:2])
link_established = not (flags & 0x40)
link_encrypted = bool(flags & 0x20) or n.address == 0x10 # Bolt protocol always encrypted
else:
@ -288,7 +288,9 @@ def _process_hidpp10_notification(device, n):
def _process_feature_notification(device, n, feature):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: notification for feature %s, report %s, data %s", device, feature, n.address >> 4, _strhex(n.data))
logger.debug(
"%s: notification for feature %s, report %s, data %s", device, feature, n.address >> 4, common.strhex(n.data)
)
if feature == _F.BATTERY_STATUS:
if n.address == 0x00:
@ -323,16 +325,16 @@ def _process_feature_notification(device, n, feature):
elif feature == _F.SOLAR_DASHBOARD:
if n.data[5:9] == b"GOOD":
charge, lux, adc = _unpack("!BHH", n.data[:5])
charge, lux, adc = struct.unpack("!BHH", n.data[:5])
# guesstimate the battery voltage, emphasis on 'guess'
# status_text = '%1.2fV' % (adc * 2.67793237653 / 0x0672)
status_text = BatteryStatus.DISCHARGING
if n.address == 0x00:
device.set_battery_info(_Battery(charge, None, status_text, None))
device.set_battery_info(common.Battery(charge, None, status_text, None))
elif n.address == 0x10:
if lux > 200:
status_text = BatteryStatus.RECHARGING
device.set_battery_info(_Battery(charge, None, status_text, None, lux))
device.set_battery_info(common.Battery(charge, None, status_text, None, lux))
elif n.address == 0x20:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: Light Check button pressed", device)
@ -382,18 +384,18 @@ def _process_feature_notification(device, n, feature):
elif feature == _F.BACKLIGHT2:
if n.address == 0x00:
level = _unpack("!B", n.data[1:2])[0]
level = struct.unpack("!B", n.data[1:2])[0]
if device.setting_callback:
device.setting_callback(device, _st.Backlight2Level, [level])
device.setting_callback(device, settings_templates.Backlight2Level, [level])
elif feature == _F.REPROG_CONTROLS_V4:
if n.address == 0x00:
if logger.isEnabledFor(logging.DEBUG):
cid1, cid2, cid3, cid4 = _unpack("!HHHH", n.data[:8])
cid1, cid2, cid3, cid4 = struct.unpack("!HHHH", n.data[:8])
logger.debug("%s: diverted controls pressed: 0x%x, 0x%x, 0x%x, 0x%x", device, cid1, cid2, cid3, cid4)
elif n.address == 0x10:
if logger.isEnabledFor(logging.DEBUG):
dx, dy = _unpack("!hh", n.data[:4])
dx, dy = struct.unpack("!hh", n.data[:4])
logger.debug("%s: rawXY dx=%i dy=%i", device, dx, dy)
elif n.address == 0x20:
if logger.isEnabledFor(logging.DEBUG):
@ -404,7 +406,7 @@ def _process_feature_notification(device, n, feature):
elif feature == _F.HIRES_WHEEL:
if n.address == 0x00:
if logger.isEnabledFor(logging.INFO):
flags, delta_v = _unpack(">bh", n.data[:3])
flags, delta_v = struct.unpack(">bh", n.data[:3])
high_res = (flags & 0x10) != 0
periods = flags & 0x0F
logger.info("%s: WHEEL: res: %d periods: %d delta V:%-3d", device, high_res, periods, delta_v)
@ -414,7 +416,7 @@ def _process_feature_notification(device, n, feature):
logger.info("%s: WHEEL: ratchet: %d", device, ratchet)
if ratchet < 2: # don't process messages with unusual ratchet values
if device.setting_callback:
device.setting_callback(device, _st.ScrollRatchet, [2 if ratchet else 1])
device.setting_callback(device, settings_templates.ScrollRatchet, [2 if ratchet else 1])
else:
if logger.isEnabledFor(logging.INFO):
logger.info("%s: unknown WHEEL %s", device, n)
@ -425,16 +427,18 @@ def _process_feature_notification(device, n, feature):
logger.info("%s: unknown ONBOARD PROFILES %s", device, n)
else:
if n.address == 0x00:
profile_sector = _unpack("!H", n.data[:2])[0]
profile_sector = struct.unpack("!H", n.data[:2])[0]
if profile_sector:
_st.profile_change(device, profile_sector)
settings_templates.profile_change(device, profile_sector)
elif n.address == 0x10:
resolution_index = _unpack("!B", n.data[:1])[0]
profile_sector = _unpack("!H", device.feature_request(_F.ONBOARD_PROFILES, 0x40)[:2])[0]
resolution_index = struct.unpack("!B", n.data[:1])[0]
profile_sector = struct.unpack("!H", device.feature_request(_F.ONBOARD_PROFILES, 0x40)[:2])[0]
if device.setting_callback:
for profile in device.profiles.profiles.values() if device.profiles else []:
if profile.sector == profile_sector:
device.setting_callback(device, _st.AdjustableDpi, [profile.resolutions[resolution_index]])
device.setting_callback(
device, settings_templates.AdjustableDpi, [profile.resolutions[resolution_index]]
)
break
elif feature == _F.BRIGHTNESS_CONTROL:
@ -443,13 +447,13 @@ def _process_feature_notification(device, n, feature):
logger.info("%s: unknown BRIGHTNESS CONTROL %s", device, n)
else:
if n.address == 0x00:
brightness = _unpack("!H", n.data[:2])[0]
device.setting_callback(device, _st.BrightnessControl, [brightness])
brightness = struct.unpack("!H", n.data[:2])[0]
device.setting_callback(device, settings_templates.BrightnessControl, [brightness])
elif n.address == 0x10:
brightness = n.data[0] & 0x01
if brightness:
brightness = _unpack("!H", device.feature_request(_F.BRIGHTNESS_CONTROL, 0x10)[:2])[0]
device.setting_callback(device, _st.BrightnessControl, [brightness])
brightness = struct.unpack("!H", device.feature_request(_F.BRIGHTNESS_CONTROL, 0x10)[:2])[0]
device.setting_callback(device, settings_templates.BrightnessControl, [brightness])
_diversion.process_notification(device, n, feature)
diversion.process_notification(device, n, feature)
return True

View File

@ -15,7 +15,7 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import errno as _errno
import errno
import logging
import time
@ -23,12 +23,12 @@ from dataclasses import dataclass
from typing import Callable
from typing import Optional
import hidapi as _hid
import hidapi
from solaar.i18n import _
from solaar.i18n import ngettext
from . import base as _base
from . import base
from . import exceptions
from . import hidpp10
from . import hidpp10_constants
@ -108,7 +108,7 @@ class Receiver:
if d:
d.close()
self._devices.clear()
return handle and _base.close(handle)
return handle and base.close(handle)
def __del__(self):
self.close()
@ -253,7 +253,7 @@ class Receiver:
def request(self, request_id, *params):
if bool(self):
return _base.request(self.handle, 0xFF, request_id, *params)
return base.request(self.handle, 0xFF, request_id, *params)
def reset_pairing(self):
self.pairing = Pairing()
@ -451,7 +451,7 @@ class Ex100Receiver(Receiver):
return online, encrypted, wpid, kind
def device_pairing_information(self, number: int) -> dict:
wpid = _hid.find_paired_node_wpid(self.path, number) # extract WPID from udev path
wpid = hidapi.find_paired_node_wpid(self.path, number) # extract WPID from udev path
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
@ -491,9 +491,9 @@ class ReceiverFactory:
"""Opens a Logitech Receiver found attached to the machine, by Linux device path."""
try:
handle = _base.open_path(device_info.path)
handle = base.open_path(device_info.path)
if handle:
product_info = _base.product_information(device_info.product_id)
product_info = base.product_information(device_info.product_id)
if not product_info:
logger.warning("Unknown receiver type: %s", device_info.product_id)
product_info = {}
@ -502,7 +502,7 @@ class ReceiverFactory:
return rclass(kind, product_info, handle, device_info.path, device_info.product_id, setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
if e.errno == errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)

View File

@ -16,26 +16,21 @@
import logging
import math
from struct import unpack as _unpack
from time import sleep as _sleep
import struct
import time
from solaar.i18n import _
from . import hidpp20_constants as _hidpp20_constants
from .common import NamedInt as _NamedInt
from .common import NamedInts as _NamedInts
from .common import bytes2int as _bytes2int
from .common import int2bytes as _int2bytes
from . import common
from . import hidpp20_constants
from .common import NamedInt
from .common import NamedInts
logger = logging.getLogger(__name__)
#
#
#
SENSITIVITY_IGNORE = "ignore"
KIND = _NamedInts(
KIND = NamedInts(
toggle=0x01,
choice=0x02,
range=0x04,
@ -613,7 +608,7 @@ class RangeFieldSetting(Setting):
class RegisterRW:
__slots__ = ("register",)
kind = _NamedInt(0x01, _("register"))
kind = NamedInt(0x01, _("register"))
def __init__(self, register):
assert isinstance(register, int)
@ -627,12 +622,12 @@ class RegisterRW:
class FeatureRW:
kind = _NamedInt(0x02, _("feature"))
kind = NamedInt(0x02, _("feature"))
default_read_fnid = 0x00
default_write_fnid = 0x10
def __init__(self, feature, read_fnid=0x00, write_fnid=0x10, prefix=b"", suffix=b"", read_prefix=b"", no_reply=False):
assert isinstance(feature, _NamedInt)
assert isinstance(feature, NamedInt)
self.feature = feature
self.read_fnid = read_fnid
self.write_fnid = write_fnid
@ -653,7 +648,7 @@ class FeatureRW:
class FeatureRWMap(FeatureRW):
kind = _NamedInt(0x02, _("feature"))
kind = NamedInt(0x02, _("feature"))
default_read_fnid = 0x00
default_write_fnid = 0x10
default_key_byte_count = 1
@ -666,7 +661,7 @@ class FeatureRWMap(FeatureRW):
key_byte_count=default_key_byte_count,
no_reply=False,
):
assert isinstance(feature, _NamedInt)
assert isinstance(feature, NamedInt)
self.feature = feature
self.read_fnid = read_fnid
self.write_fnid = write_fnid
@ -675,12 +670,12 @@ class FeatureRWMap(FeatureRW):
def read(self, device, key):
assert self.feature is not None
key_bytes = _int2bytes(key, self.key_byte_count)
key_bytes = common.int2bytes(key, self.key_byte_count)
return device.feature_request(self.feature, self.read_fnid, key_bytes)
def write(self, device, key, data_bytes):
assert self.feature is not None
key_bytes = _int2bytes(key, self.key_byte_count)
key_bytes = common.int2bytes(key, self.key_byte_count)
reply = device.feature_request(self.feature, self.write_fnid, key_bytes, data_bytes, no_reply=self.no_reply)
return reply if not self.no_reply else True
@ -733,13 +728,13 @@ class BooleanValidator(Validator):
else:
assert isinstance(false_value, bytes)
if mask is None or mask == self.default_mask:
mask = b"\xFF" * len(true_value)
mask = b"\xff" * len(true_value)
else:
assert isinstance(mask, bytes)
assert len(mask) == len(true_value) == len(false_value)
tv = _bytes2int(true_value)
fv = _bytes2int(false_value)
mv = _bytes2int(mask)
tv = common.bytes2int(true_value)
fv = common.bytes2int(false_value)
mv = common.bytes2int(mask)
assert tv != fv # true and false might be something other than bit values
assert tv & mv == tv
assert fv & mv == fv
@ -773,14 +768,14 @@ class BooleanValidator(Validator):
return False
count = len(self.mask)
mask = _bytes2int(self.mask)
reply_value = _bytes2int(reply_bytes[:count]) & mask
mask = common.bytes2int(self.mask)
reply_value = common.bytes2int(reply_bytes[:count]) & mask
true_value = _bytes2int(self.true_value)
true_value = common.bytes2int(self.true_value)
if reply_value == true_value:
return True
false_value = _bytes2int(self.false_value)
false_value = common.bytes2int(self.false_value)
if reply_value == false_value:
return False
@ -852,7 +847,7 @@ class BitFieldValidator(Validator):
return "{" + ", ".join([element_to_string(k, value[k]) for k in value]) + "}"
def validate_read(self, reply_bytes):
r = _bytes2int(reply_bytes[: self.byte_count])
r = common.bytes2int(reply_bytes[: self.byte_count])
value = {int(k): False for k in self.options}
m = 1
for _ignore in range(8 * self.byte_count):
@ -867,7 +862,7 @@ class BitFieldValidator(Validator):
for k, v in new_value.items():
if v:
w |= int(k)
return _int2bytes(w, self.byte_count)
return common.int2bytes(w, self.byte_count)
def get_options(self):
return self.options
@ -931,7 +926,7 @@ class BitFieldWithOffsetAndMaskValidator(Validator):
for offset, mask in self._mask_from_offset.items():
b = offset << (8 * (self.byte_count + 1))
b |= (self.sep << (8 * self.byte_count)) | mask
r.append(_int2bytes(b, self.byte_count + 2))
r.append(common.int2bytes(b, self.byte_count + 2))
return r
def prepare_read_key(self, key):
@ -941,14 +936,14 @@ class BitFieldWithOffsetAndMaskValidator(Validator):
offset, mask = option.om_method(option)
b = offset << (8 * (self.byte_count + 1))
b |= (self.sep << (8 * self.byte_count)) | mask
return _int2bytes(b, self.byte_count + 2)
return common.int2bytes(b, self.byte_count + 2)
def validate_read(self, reply_bytes_dict):
values = {int(k): False for k in self.options}
for query, b in reply_bytes_dict.items():
offset = _bytes2int(query[0:1])
offset = common.bytes2int(query[0:1])
b += (self.byte_count - len(b)) * b"\x00"
value = _bytes2int(b[: self.byte_count])
value = common.bytes2int(b[: self.byte_count])
mask_to_opt = self._option_from_offset_mask.get(offset, {})
m = 1
for _ignore in range(8 * self.byte_count):
@ -968,7 +963,7 @@ class BitFieldWithOffsetAndMaskValidator(Validator):
if v:
w[offset] |= mask
return [
_int2bytes(
common.int2bytes(
(offset << (8 * (2 * self.byte_count + 1)))
| (self.sep << (16 * self.byte_count))
| (self._mask_from_offset[offset] << (8 * self.byte_count))
@ -1009,7 +1004,7 @@ class ChoicesValidator(Validator):
def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b""):
assert choices is not None
assert isinstance(choices, _NamedInts)
assert isinstance(choices, NamedInts)
assert len(choices) > 1
self.choices = choices
self.needs_current_value = False
@ -1029,7 +1024,7 @@ class ChoicesValidator(Validator):
return str(self.choices[value]) if isinstance(value, int) else str(value)
def validate_read(self, reply_bytes):
reply_value = _bytes2int(reply_bytes[self._read_skip_byte_count : self._read_skip_byte_count + self._byte_count])
reply_value = common.bytes2int(reply_bytes[self._read_skip_byte_count : self._read_skip_byte_count + self._byte_count])
valid_value = self.choices[reply_value]
assert valid_value is not None, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
return valid_value
@ -1041,7 +1036,7 @@ class ChoicesValidator(Validator):
value = self.choice(new_value)
if value is None:
raise ValueError(f"invalid choice {new_value!r}")
assert isinstance(value, _NamedInt)
assert isinstance(value, NamedInt)
return self._write_prefix_bytes + value.bytes(self._byte_count)
def choice(self, value):
@ -1083,11 +1078,11 @@ class ChoicesMapValidator(ChoicesValidator):
max_key_bits = 0
max_value_bits = 0
for key, choices in choices_map.items():
assert isinstance(key, _NamedInt)
assert isinstance(choices, _NamedInts)
assert isinstance(key, NamedInt)
assert isinstance(choices, NamedInts)
max_key_bits = max(max_key_bits, key.bit_length())
for key_value in choices:
assert isinstance(key_value, _NamedInt)
assert isinstance(key_value, NamedInt)
max_value_bits = max(max_value_bits, key_value.bit_length())
self._key_byte_count = (max_key_bits + 7) // 8
if key_byte_count:
@ -1119,7 +1114,7 @@ class ChoicesMapValidator(ChoicesValidator):
def validate_read(self, reply_bytes, key):
start = self._key_byte_count + self._read_skip_byte_count
end = start + self._byte_count
reply_value = _bytes2int(reply_bytes[start:end]) & self.mask
reply_value = common.bytes2int(reply_bytes[start:end]) & self.mask
# reprogrammable keys starts out as 0, which is not a choice, so don't use assert here
if self.extra_default is not None and self.extra_default == reply_value:
return int(self.choices[key][0])
@ -1188,7 +1183,7 @@ class RangeValidator(Validator):
assert self._byte_count < 8
def validate_read(self, reply_bytes):
reply_value = _bytes2int(reply_bytes[: self._byte_count])
reply_value = common.bytes2int(reply_bytes[: self._byte_count])
assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
return reply_value
@ -1197,7 +1192,7 @@ class RangeValidator(Validator):
if new_value < self.min_value or new_value > self.max_value:
raise ValueError(f"invalid choice {new_value!r}")
current_value = self.validate_read(current_value) if current_value is not None else None
to_write = _int2bytes(new_value, self._byte_count)
to_write = common.int2bytes(new_value, self._byte_count)
# current value is known and same as value to be written return None to signal not to write it
return None if current_value is not None and current_value == new_value else to_write
@ -1270,7 +1265,7 @@ class PackedRangeValidator(Validator):
def validate_read(self, reply_bytes):
rvs = {
n: _bytes2int(reply_bytes[self.rsbc + n * self.bc : self.rsbc + (n + 1) * self.bc], signed=True)
n: common.bytes2int(reply_bytes[self.rsbc + n * self.bc : self.rsbc + (n + 1) * self.bc], signed=True)
for n in range(self.count)
}
for n in range(self.count):
@ -1284,7 +1279,9 @@ class PackedRangeValidator(Validator):
for new_value in new_values.values():
if new_value < self.min_value or new_value > self.max_value:
raise ValueError(f"invalid value {new_value!r}")
bytes = self.write_prefix_bytes + b"".join(_int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count))
bytes = self.write_prefix_bytes + b"".join(
common.int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count)
)
return bytes
def acceptable(self, args, current):
@ -1305,12 +1302,12 @@ class MultipleRangeValidator(Validator):
assert isinstance(sub_items, dict)
# sub_items: items -> class with .minimum, .maximum, .length (in bytes), .id (a string) and .widget (e.g. 'Scale')
self.items = items
self.keys = _NamedInts(**{str(item): int(item) for item in items})
self.keys = NamedInts(**{str(item): int(item) for item in items})
self._item_from_id = {int(k): k for k in items}
self.sub_items = sub_items
def prepare_read_item(self, item):
return _int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2)
return common.int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2)
def validate_read_item(self, reply_bytes, item):
item = self._item_from_id[int(item)]
@ -1320,7 +1317,7 @@ class MultipleRangeValidator(Validator):
r = reply_bytes[start : start + sub_item.length]
if len(r) < sub_item.length:
r += b"\x00" * (sub_item.length - len(value))
v = _bytes2int(r)
v = common.bytes2int(r)
if not (sub_item.minimum < v < sub_item.maximum):
logger.warning(
f"{self.__class__.__name__}: failed to validate read value for {item}.{sub_item}: "
@ -1335,7 +1332,7 @@ class MultipleRangeValidator(Validator):
w = b""
for item in value.keys():
_item = self._item_from_id[int(item)]
b = _int2bytes(_item.index, 1)
b = common.int2bytes(_item.index, 1)
for sub_item in self.sub_items[_item]:
try:
v = value[int(item)][str(sub_item)]
@ -1345,17 +1342,17 @@ class MultipleRangeValidator(Validator):
raise ValueError(
f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]"
)
b += _int2bytes(v, sub_item.length)
b += common.int2bytes(v, sub_item.length)
if len(w) + len(b) > 15:
seq.append(b + b"\xFF")
seq.append(b + b"\xff")
w = b""
w += b
seq.append(w + b"\xFF")
seq.append(w + b"\xff")
return seq
def prepare_write_item(self, item, value):
_item = self._item_from_id[int(item)]
w = _int2bytes(_item.index, 1)
w = common.int2bytes(_item.index, 1)
for sub_item in self.sub_items[_item]:
try:
v = value[str(sub_item)]
@ -1363,8 +1360,8 @@ class MultipleRangeValidator(Validator):
return None
if not (sub_item.minimum <= v <= sub_item.maximum):
raise ValueError(f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]")
w += _int2bytes(v, sub_item.length)
return w + b"\xFF"
w += common.int2bytes(v, sub_item.length)
return w + b"\xff"
def acceptable(self, args, current):
# just one item, with at least one sub-item
@ -1418,13 +1415,13 @@ class ActionSettingRW:
pass
def read(self, device): # need to return bytes, as if read from device
return _int2bytes(self.key.key, 2) if self.active and self.key else b"\x00\x00"
return common.int2bytes(self.key.key, 2) if self.active and self.key else b"\x00\x00"
def write(self, device, data_bytes):
def handler(device, n): # Called on notification events from the device
if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == _hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
if n.address == 0x00:
cids = _unpack("!HHHH", n.data[:8])
cids = struct.unpack("!HHHH", n.data[:8])
if not self.pressed and int(self.key.key) in cids: # trigger key pressed
self.pressed = True
self.press_action()
@ -1438,7 +1435,7 @@ class ActionSettingRW:
self.key_action(key)
elif n.address == 0x10:
if self.pressed:
dx, dy = _unpack("!hh", n.data[:4])
dx, dy = struct.unpack("!hh", n.data[:4])
self.move_action(dx, dy)
divertSetting = next(filter(lambda s: s.name == self.divert_setting_name, device.settings), None)
@ -1446,7 +1443,7 @@ class ActionSettingRW:
logger.warning("setting %s not found on %s", self.divert_setting_name, device.name)
return None
self.device = device
key = _bytes2int(data_bytes)
key = common.bytes2int(data_bytes)
if key: # Enable
self.key = next((k for k in device.keys if k.key == key), None)
if self.key:
@ -1484,13 +1481,13 @@ class RawXYProcessing:
self.keys = [] # the keys that can initiate processing
self.initiating_key = None # the key that did initiate processing
self.active = False
self.feature_offset = device.features[_hidpp20_constants.FEATURE.REPROG_CONTROLS_V4]
self.feature_offset = device.features[hidpp20_constants.FEATURE.REPROG_CONTROLS_V4]
assert self.feature_offset is not False
def handler(self, device, n): # Called on notification events from the device
if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == _hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
if n.address == 0x00:
cids = _unpack("!HHHH", n.data[:8])
cids = struct.unpack("!HHHH", n.data[:8])
## generalize to list of keys
if not self.initiating_key: # no initiating key pressed
for k in self.keys:
@ -1508,7 +1505,7 @@ class RawXYProcessing:
self.key_action(key)
elif n.address == 0x10:
if self.initiating_key:
dx, dy = _unpack("!hh", n.data[:4])
dx, dy = struct.unpack("!hh", n.data[:4])
self.move_action(dx, dy)
def start(self, key):
@ -1556,8 +1553,8 @@ class RawXYProcessing:
def apply_all_settings(device):
if device.features and _hidpp20_constants.FEATURE.HIRES_WHEEL in device.features:
_sleep(0.2) # delay to try to get out of race condition with Linux HID++ driver
if device.features and hidpp20_constants.FEATURE.HIRES_WHEEL in device.features:
time.sleep(0.2) # delay to try to get out of race condition with Linux HID++ driver
persister = getattr(device, "persister", None)
sensitives = persister.get("_sensitive", {}) if persister else {}
for s in device.settings:

File diff suppressed because it is too large Load Diff

View File

@ -15,20 +15,20 @@
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# Reprogrammable keys information
# Mostly from Logitech documentation, but with some edits for better Lunix compatibility
# Mostly from Logitech documentation, but with some edits for better Linux compatibility
import os as _os
import os
import yaml as _yaml
import yaml
from .common import NamedInts as _NamedInts
from .common import UnsortedNamedInts as _UnsortedNamedInts
from .common import NamedInts
from .common import UnsortedNamedInts
_XDG_CONFIG_HOME = _os.environ.get("XDG_CONFIG_HOME") or _os.path.expanduser(_os.path.join("~", ".config"))
_keys_file_path = _os.path.join(_XDG_CONFIG_HOME, "solaar", "keys.yaml")
_XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser(os.path.join("~", ".config"))
_keys_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "keys.yaml")
# <controls.xml awk -F\" '/<Control /{sub(/^LD_FINFO_(CTRLID_)?/, "", $2);printf("\t%s=0x%04X,\n", $2, $4)}' | sort -t= -k2
CONTROL = _NamedInts(
CONTROL = NamedInts(
{
"Volume_Up": 0x0001,
"Volume_Down": 0x0002,
@ -322,7 +322,7 @@ CONTROL[0x1200] = "MR" # add in MR key - this is not really a Logitech Control
CONTROL._fallback = lambda x: f"unknown:{x:04X}"
# <tasks.xml awk -F\" '/<Task /{gsub(/ /, "_", $6); printf("\t%s=0x%04X,\n", $6, $4)}'
TASK = _NamedInts(
TASK = NamedInts(
Volume_Up=0x0001,
Volume_Down=0x0002,
Mute=0x0003,
@ -573,7 +573,7 @@ TASK._fallback = lambda x: f"unknown:{x:04X}"
# Capabilities and desired software handling for a control
# Ref: https://drive.google.com/file/d/10imcbmoxTJ1N510poGdsviEhoFfB_Ua4/view
# We treat bytes 4 and 8 of `getCidInfo` as a single bitfield
KEY_FLAG = _NamedInts(
KEY_FLAG = NamedInts(
analytics_key_events=0x400,
force_raw_XY=0x200,
raw_XY=0x100,
@ -588,16 +588,16 @@ KEY_FLAG = _NamedInts(
)
# Flags describing the reporting method of a control
# We treat bytes 2 and 5 of `get/setCidReporting` as a single bitfield
MAPPING_FLAG = _NamedInts(
MAPPING_FLAG = NamedInts(
analytics_key_events_reporting=0x100,
force_raw_XY_diverted=0x40,
raw_XY_diverted=0x10,
persistently_diverted=0x04,
diverted=0x01,
)
CID_GROUP_BIT = _NamedInts(g8=0x80, g7=0x40, g6=0x20, g5=0x10, g4=0x08, g3=0x04, g2=0x02, g1=0x01)
CID_GROUP = _NamedInts(g8=8, g7=7, g6=6, g5=5, g4=4, g3=3, g2=2, g1=1)
DISABLE = _NamedInts(
CID_GROUP_BIT = NamedInts(g8=0x80, g7=0x40, g6=0x20, g5=0x10, g4=0x08, g3=0x04, g2=0x02, g1=0x01)
CID_GROUP = NamedInts(g8=8, g7=7, g6=6, g5=5, g4=4, g3=3, g2=2, g1=1)
DISABLE = NamedInts(
Caps_Lock=0x01,
Num_Lock=0x02,
Scroll_Lock=0x04,
@ -608,7 +608,7 @@ DISABLE._fallback = lambda x: f"unknown:{x:02X}"
# HID USB Keycodes from https://www.usb.org/sites/default/files/documents/hut1_12v2.pdf
# Modified by information from Linux HID driver linux/drivers/hid/hid-input.c
USB_HID_KEYCODES = _NamedInts(
USB_HID_KEYCODES = NamedInts(
A=0x04,
B=0x05,
C=0x06,
@ -780,7 +780,7 @@ USB_HID_KEYCODES[0x26] = "9"
USB_HID_KEYCODES[0x27] = "0"
USB_HID_KEYCODES[0x64] = "102ND"
HID_CONSUMERCODES = _NamedInts(
HID_CONSUMERCODES = NamedInts(
{
# Unassigned=0x00,
# Consumer_Control=0x01,
@ -1167,9 +1167,9 @@ HID_CONSUMERCODES._fallback = lambda x: f"unknown:{x:04X}"
## Information for x1c00 Persistent from https://drive.google.com/drive/folders/0BxbRzx7vEV7eWmgwazJ3NUFfQ28
KEYMOD = _NamedInts(CTRL=0x01, SHIFT=0x02, ALT=0x04, META=0x08, RCTRL=0x10, RSHIFT=0x20, RALT=0x40, RMETA=0x80)
KEYMOD = NamedInts(CTRL=0x01, SHIFT=0x02, ALT=0x04, META=0x08, RCTRL=0x10, RSHIFT=0x20, RALT=0x40, RMETA=0x80)
ACTIONID = _NamedInts(
ACTIONID = NamedInts(
Empty=0x00,
Key=0x01,
Mouse=0x02,
@ -1182,7 +1182,7 @@ ACTIONID = _NamedInts(
Power=0x09,
)
MOUSE_BUTTONS = _NamedInts(
MOUSE_BUTTONS = NamedInts(
Mouse_Button_Left=0x0001,
Mouse_Button_Right=0x0002,
Mouse_Button_Middle=0x0004,
@ -1202,14 +1202,14 @@ MOUSE_BUTTONS = _NamedInts(
)
MOUSE_BUTTONS._fallback = lambda x: f"unknown mouse button:{x:04X}"
HORIZONTAL_SCROLL = _NamedInts(
HORIZONTAL_SCROLL = NamedInts(
Horizontal_Scroll_Left=0x4000,
Horizontal_Scroll_Right=0x8000,
)
HORIZONTAL_SCROLL._fallback = lambda x: f"unknown horizontal scroll:{x:04X}"
# Construct universe for Persistent Remappable Keys setting (only for supported values)
KEYS = _UnsortedNamedInts()
KEYS = UnsortedNamedInts()
KEYS_Default = 0x7FFFFFFF # Special value to reset key to default - has to be different from all others
KEYS[KEYS_Default] = "Default" # Value to reset to default
KEYS[0] = "None" # Value for no output
@ -1247,7 +1247,7 @@ for code in HORIZONTAL_SCROLL:
# Construct subsets for known devices
def persistent_keys(action_ids):
keys = _UnsortedNamedInts()
keys = UnsortedNamedInts()
keys[KEYS_Default] = "Default" # Value to reset to default
keys[0] = "No Output (only as default)"
for key in KEYS:
@ -1259,7 +1259,7 @@ def persistent_keys(action_ids):
KEYS_KEYS_CONSUMER = persistent_keys([ACTIONID.Key, ACTIONID.Consumer])
KEYS_KEYS_MOUSE_HSCROLL = persistent_keys([ACTIONID.Key, ACTIONID.Mouse, ACTIONID.Hscroll])
COLORS = _UnsortedNamedInts(
COLORS = UnsortedNamedInts(
{
# from Xorg rgb.txt,v 1.3 2000/08/17
"red": 0xFF0000,
@ -1400,11 +1400,11 @@ COLORS = _UnsortedNamedInts(
}
)
COLORSPLUS = _UnsortedNamedInts({"No change": -1})
COLORSPLUS = UnsortedNamedInts({"No change": -1})
for i in COLORS:
COLORSPLUS[int(i)] = str(i)
KEYCODES = _NamedInts(
KEYCODES = NamedInts(
{
"A": 1,
"B": 2,
@ -1529,11 +1529,11 @@ KEYCODES = _NamedInts(
# load in override dictionary for KEYCODES
try:
if _os.path.isfile(_keys_file_path):
if os.path.isfile(_keys_file_path):
with open(_keys_file_path) as keys_file:
keys = _yaml.safe_load(keys_file)
keys = yaml.safe_load(keys_file)
if isinstance(keys, dict):
keys = _NamedInts(**keys)
keys = NamedInts(**keys)
for k in KEYCODES:
if int(k) not in keys and str(k) not in keys:
keys[int(k)] = str(k)

View File

@ -20,9 +20,9 @@ import logging
import os as _os
import threading
import yaml as _yaml
import yaml
from logitech_receiver.common import NamedInt as _NamedInt
from logitech_receiver.common import NamedInt
from solaar import __version__
@ -49,7 +49,7 @@ def _load():
path = _yaml_file_path
try:
with open(_yaml_file_path) as config_file:
loaded_config = _yaml.safe_load(config_file)
loaded_config = yaml.safe_load(config_file)
except Exception as e:
logger.error("failed to load from %s: %s", _yaml_file_path, e)
elif _os.path.isfile(_json_file_path):
@ -153,7 +153,7 @@ def do_save():
save_timer = None
try:
with open(_yaml_file_path, "w") as config_file:
_yaml.dump(_config, config_file, default_flow_style=None, width=150)
yaml.dump(_config, config_file, default_flow_style=None, width=150)
if logger.isEnabledFor(logging.INFO):
logger.info("saved %s to %s", _config, _yaml_file_path)
except Exception as e:
@ -216,14 +216,14 @@ def device_representer(dumper, data):
return dumper.represent_mapping("tag:yaml.org,2002:map", data)
_yaml.add_representer(_DeviceEntry, device_representer)
yaml.add_representer(_DeviceEntry, device_representer)
def named_int_representer(dumper, data):
return dumper.represent_scalar("tag:yaml.org,2002:int", str(int(data)))
_yaml.add_representer(_NamedInt, named_int_representer)
yaml.add_representer(NamedInt, named_int_representer)
# A device can be identified by a combination of WPID and serial number (for receiver-connected devices)