replaced hard-coded register numbers with constants
also re-worked the battery reading code a bit
This commit is contained in:
parent
bf5fc42f07
commit
9a2a28e0aa
|
@ -98,7 +98,7 @@ class NamedInts(object):
|
|||
self.__dict__ = values
|
||||
self._values = sorted(list(values.values()))
|
||||
self._indexed = {int(v): v for v in self._values}
|
||||
assert len(values) == len(self._indexed), "(%d) %r\n=> (%d) %r" % (len(values), values, len(self._indexed), self._indexed)
|
||||
# assert len(values) == len(self._indexed), "(%d) %r\n=> (%d) %r" % (len(values), values, len(self._indexed), self._indexed)
|
||||
self._fallback = None
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -6,7 +6,9 @@ from __future__ import absolute_import, division, print_function, unicode_litera
|
|||
|
||||
from . import hidpp10 as _hidpp10
|
||||
from .common import NamedInts as _NamedInts
|
||||
from .settings_templates import Register as _R, Feature as _F
|
||||
from .settings_templates import RegisterSettings as _RS, FeatureSettings as _FS
|
||||
|
||||
_R = _hidpp10.REGISTERS
|
||||
|
||||
#
|
||||
#
|
||||
|
@ -118,29 +120,29 @@ _D('Wireless Keyboard K270')
|
|||
_D('Wireless Keyboard K350')
|
||||
_D('Wireless Keyboard K360', protocol=2.0, wpid='4004',
|
||||
settings=[
|
||||
_F.fn_swap()
|
||||
_FS.fn_swap()
|
||||
],
|
||||
)
|
||||
_D('Wireless Touch Keyboard K400', protocol=2.0, wpid='4024',
|
||||
settings=[
|
||||
_F.fn_swap()
|
||||
_FS.fn_swap()
|
||||
],
|
||||
)
|
||||
_D('Wireless Keyboard MK700', protocol=1.0, wpid='2008',
|
||||
registers={'battery_charge': -0x0D, 'battery_status': 0x07},
|
||||
registers=(_R.battery_status, ),
|
||||
settings=[
|
||||
_R.fn_swap(),
|
||||
_RS.fn_swap(),
|
||||
],
|
||||
)
|
||||
_D('Wireless Solar Keyboard K750', protocol=2.0, wpid='4002',
|
||||
settings=[
|
||||
_F.fn_swap()
|
||||
_FS.fn_swap()
|
||||
],
|
||||
)
|
||||
_D('Wireless Illuminated Keyboard K800', protocol=1.0, wpid='2010',
|
||||
registers={'battery_charge': -0x0D, 'battery_status': 0x07, '3leds': 0x51},
|
||||
registers=(_R.battery_status, _R.three_leds, ),
|
||||
settings=[
|
||||
_R.fn_swap(),
|
||||
_RS.fn_swap(),
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -152,7 +154,7 @@ _D('Wireless Mouse M187', protocol=1.0)
|
|||
_D('Wireless Mouse M215', protocol=1.0, wpid='1020')
|
||||
_D('Wireless Mouse M235', protocol=1.0)
|
||||
_D('Wireless Mouse M305', protocol=1.0, wpid='101F',
|
||||
registers={'battery_charge': -0x0D, 'battery_status': 0x07},
|
||||
registers=(_R.battery_status, ),
|
||||
)
|
||||
_D('Wireless Mouse M310', protocol=1.0)
|
||||
_D('Wireless Mouse M315', protocol=1.0)
|
||||
|
@ -161,18 +163,18 @@ _D('Wireless Mouse M325')
|
|||
_D('Wireless Mouse M345')
|
||||
_D('Wireless Mouse M505')
|
||||
_D('Wireless Mouse M510', protocol=1.0, wpid='1025',
|
||||
registers={'battery_charge': -0x0D, 'battery_status': 0x07},
|
||||
registers=(_R.battery_status, ),
|
||||
settings=[
|
||||
_R.smooth_scroll(),
|
||||
_RS.smooth_scroll(),
|
||||
],
|
||||
)
|
||||
_D('Couch Mouse M515', protocol=2.0)
|
||||
_D('Wireless Mouse M525', protocol=2.0)
|
||||
_D('Touch Mouse M600', protocol=2.0, wpid='401A')
|
||||
_D('Marathon Mouse M705', protocol=1.0, wpid='101B',
|
||||
registers={'battery_charge': 0x0D},
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[
|
||||
_R.smooth_scroll(),
|
||||
_RS.smooth_scroll(),
|
||||
],
|
||||
)
|
||||
_D('Zone Touch Mouse T400')
|
||||
|
@ -180,9 +182,9 @@ _D('Touch Mouse T620')
|
|||
_D('Logitech Cube', kind=_hidpp10.DEVICE_KIND.mouse, protocol=2.0)
|
||||
_D('Anywhere Mouse MX', codename='Anywhere MX', protocol=1.0, wpid='1017')
|
||||
_D('Performance Mouse MX', codename='Performance MX', protocol=1.0, wpid='101A',
|
||||
registers={'battery_charge': -0x0D, 'battery_status': 0x07, '3leds': 0x51},
|
||||
registers=(_R.battery_status, _R.three_leds, ),
|
||||
settings=[
|
||||
_R.dpi(choices=_PERFORMANCE_MX_DPIS),
|
||||
_RS.dpi(choices=_PERFORMANCE_MX_DPIS),
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -201,8 +203,8 @@ _D('Wireless Touchpad', codename='Wireless Touch', protocol=2.0, wpid='4011')
|
|||
#
|
||||
|
||||
_D('VX Nano Cordless Laser Mouse', codename='VX Nano', protocol=1.0, wpid='100F',
|
||||
registers={'battery_charge': 0x0D, 'battery_status': -0x07},
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[
|
||||
_R.smooth_scroll(),
|
||||
_RS.smooth_scroll(),
|
||||
],
|
||||
)
|
||||
|
|
|
@ -59,7 +59,7 @@ NOTIFICATION_FLAG = _NamedInts(
|
|||
keyboard_multimedia_raw=0x010000, # consumer controls such as Mute and Calculator
|
||||
# reserved_r1b4= 0x001000, # unknown, seen on a unifying receiver
|
||||
software_present= 0x000800, # .. no idea
|
||||
keyboard_backlight= 0x000200, # illumination brightness level changes (by pressing keys)
|
||||
keyboard_illumination= 0x000200, # illumination brightness level changes (by pressing keys)
|
||||
wireless= 0x000100, # notify when the device wireless goes on/off-line
|
||||
)
|
||||
|
||||
|
@ -90,43 +90,49 @@ BATTERY_APPOX = _NamedInts(
|
|||
good = 50,
|
||||
full = 90)
|
||||
|
||||
"""Known registers.
|
||||
Devices usually have a (small) sub-set of these. Some registers are only
|
||||
applicable to certain device kinds (e.g. smooth_scroll only applies to mice."""
|
||||
REGISTERS = _NamedInts(
|
||||
# only apply to receivers
|
||||
receiver_connection=0x02,
|
||||
receiver_pairing=0xB2,
|
||||
devices_activity=0x2B3,
|
||||
receiver_info=0x2B5,
|
||||
|
||||
# only apply to devices
|
||||
mouse_smooth_scroll=0x01,
|
||||
keyboard_hand_detection=0x01,
|
||||
battery_status=0x07,
|
||||
keyboard_fn_swap=0x09,
|
||||
battery_charge=0x0D,
|
||||
keyboard_illumination=0x17,
|
||||
three_leds=0x51,
|
||||
mouse_dpi=0x63,
|
||||
|
||||
# apply to both
|
||||
notifications=0x00,
|
||||
firmware=0xF1,
|
||||
)
|
||||
|
||||
#
|
||||
# functions
|
||||
#
|
||||
|
||||
def read_register(device, register_number, *params):
|
||||
assert device
|
||||
# support long registers by adding a 2 in front of the number
|
||||
# support long registers by adding a 2 in front of the register number
|
||||
request_id = 0x8100 | (int(register_number) & 0x2FF)
|
||||
return device.request(request_id, *params)
|
||||
|
||||
|
||||
def write_register(device, register_number, *value):
|
||||
assert device
|
||||
# support long registers by adding a 2 in front of the number
|
||||
# support long registers by adding a 2 in front of the register number
|
||||
request_id = 0x8000 | (int(register_number) & 0x2FF)
|
||||
return device.request(request_id, *value)
|
||||
|
||||
|
||||
def get_register(device, name, default_number=-1):
|
||||
assert device
|
||||
assert device.kind is not None
|
||||
if not device.online:
|
||||
return
|
||||
|
||||
known_register = device.registers.get(name)
|
||||
register = known_register or default_number
|
||||
if register > 0:
|
||||
reply = read_register(device, register)
|
||||
if reply:
|
||||
return reply
|
||||
|
||||
if not known_register and device.kind is not None and device.online:
|
||||
_log.warn("%s: failed to read register '%s' (0x%02X), blacklisting",
|
||||
device, name, default_number)
|
||||
device.registers[name] = -default_number
|
||||
|
||||
|
||||
def get_battery(device):
|
||||
assert device
|
||||
assert device.kind is not None
|
||||
|
@ -138,76 +144,96 @@ def get_battery(device):
|
|||
# let's just assume HID++ 2.0 devices do not provide the battery info in a register
|
||||
return
|
||||
|
||||
reply = get_register(device, 'battery_charge', 0x0D)
|
||||
for r in (REGISTERS.battery_status, REGISTERS.battery_charge):
|
||||
if r in device.registers:
|
||||
reply = read_register(device, r)
|
||||
if reply:
|
||||
return parse_battery_status(r, reply)
|
||||
return
|
||||
|
||||
# the descriptor does not tell us which register this device has, try them both
|
||||
reply = read_register(device, REGISTERS.battery_charge)
|
||||
if reply:
|
||||
level = ord(reply[:1])
|
||||
battery_status = ord(reply[2:3])
|
||||
return parse_battery_reply_0D(level, battery_status)
|
||||
# remember this for the next time
|
||||
device.registers.append(REGISTERS.battery_charge)
|
||||
return parse_battery_status(REGISTERS.battery_charge, reply)
|
||||
|
||||
reply = get_register(device, 'battery_status', 0x07)
|
||||
reply = read_register(device, REGISTERS.battery_status)
|
||||
if reply:
|
||||
level = ord(reply[:1])
|
||||
battery_status = ord(reply[1:2])
|
||||
return parse_battery_reply_07(level, battery_status)
|
||||
# remember this for the next time
|
||||
device.registers.append(REGISTERS.battery_status)
|
||||
return parse_battery_status(REGISTERS.battery_status, reply)
|
||||
|
||||
def parse_battery_reply_0D(level, battery_status):
|
||||
charge = level
|
||||
status = battery_status & 0xF0
|
||||
status = ('discharging' if status == 0x30
|
||||
else 'charging' if status == 0x50
|
||||
else 'fully charged' if status == 0x90
|
||||
else None)
|
||||
return charge, status
|
||||
|
||||
def parse_battery_reply_07(level, battery_status):
|
||||
charge = (BATTERY_APPOX.full if level == 7 # full
|
||||
else BATTERY_APPOX.good if level == 5 # good
|
||||
else BATTERY_APPOX.low if level == 3 # low
|
||||
else BATTERY_APPOX.critical if level == 1 # critical
|
||||
else BATTERY_APPOX.empty ) # wtf?
|
||||
def parse_battery_status(register, reply):
|
||||
if register == REGISTERS.battery_charge:
|
||||
charge = ord(reply[:1])
|
||||
status_byte = ord(reply[2:3]) & 0xF0
|
||||
status_text = ('discharging' if status_byte == 0x30
|
||||
else 'charging' if status_byte == 0x50
|
||||
else 'fully charged' if status_byte == 0x90
|
||||
else None)
|
||||
return charge, status_text
|
||||
|
||||
if battery_status == 0x00:
|
||||
status = 'discharging'
|
||||
elif battery_status & 0x21 == 0x21:
|
||||
status = 'charging'
|
||||
elif battery_status & 0x22 == 0x22:
|
||||
status = 'fully charged'
|
||||
else:
|
||||
_log.warn("could not parse 0x07 battery status: %02X (level %02X)", battery_status, level)
|
||||
status = None
|
||||
if register == REGISTERS.battery_status:
|
||||
status_byte = ord(reply[:1])
|
||||
charge = (BATTERY_APPOX.full if status_byte == 7 # full
|
||||
else BATTERY_APPOX.good if status_byte == 5 # good
|
||||
else BATTERY_APPOX.low if status_byte == 3 # low
|
||||
else BATTERY_APPOX.critical if status_byte == 1 # critical
|
||||
# pure 'charging' notifications may come without a status
|
||||
else BATTERY_APPOX.empty)
|
||||
|
||||
if battery_status & 0x03 and level == 0:
|
||||
# some 'charging' notifications may come with no battery level information
|
||||
charge = None
|
||||
charging_byte = ord(reply[1:2])
|
||||
if charging_byte == 0x00:
|
||||
status_text = 'discharging'
|
||||
elif charging_byte & 0x21 == 0x21:
|
||||
status_text = 'charging'
|
||||
elif charging_byte & 0x22 == 0x22:
|
||||
status_text = 'fully charged'
|
||||
else:
|
||||
_log.warn("could not parse 0x07 battery status: %02X (level %02X)", charging_byte, status_byte)
|
||||
status_text = None
|
||||
|
||||
return charge, status
|
||||
if charging_byte & 0x03 and status_byte == 0:
|
||||
# some 'charging' notifications may come with no battery level information
|
||||
charge = None
|
||||
|
||||
return charge, status_text
|
||||
|
||||
|
||||
def get_firmware(device):
|
||||
assert device
|
||||
|
||||
firmware = [None, None]
|
||||
firmware = [None, None, None]
|
||||
|
||||
reply = read_register(device, 0xF1, 0x01)
|
||||
reply = read_register(device, REGISTERS.firmware, 0x01)
|
||||
if not reply:
|
||||
# won't be able to read any of it now...
|
||||
return
|
||||
|
||||
fw_version = _strhex(reply[1:3])
|
||||
fw_version = '%s.%s' % (fw_version[0:2], fw_version[2:4])
|
||||
reply = read_register(device, 0xF1, 0x02)
|
||||
reply = read_register(device, REGISTERS.firmware, 0x02)
|
||||
if reply:
|
||||
fw_version += '.B' + _strhex(reply[1:3])
|
||||
fw = _FirmwareInfo(FIRMWARE_KIND.Firmware, '', fw_version, None)
|
||||
firmware[0] = fw
|
||||
|
||||
reply = read_register(device, 0xF1, 0x04)
|
||||
reply = read_register(device, REGISTERS.firmware, 0x04)
|
||||
if reply:
|
||||
bl_version = _strhex(reply[1:3])
|
||||
bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4])
|
||||
bl = _FirmwareInfo(FIRMWARE_KIND.Bootloader, '', bl_version, None)
|
||||
firmware[1] = bl
|
||||
|
||||
reply = read_register(device, REGISTERS.firmware, 0x03)
|
||||
if reply:
|
||||
o_version = _strhex(reply[1:3])
|
||||
o_version = '%s.%s' % (o_version[0:2], o_version[2:4])
|
||||
o = _FirmwareInfo(FIRMWARE_KIND.Other, '', o_version, None)
|
||||
firmware[2] = o
|
||||
|
||||
if any(firmware):
|
||||
return tuple(f for f in firmware if f)
|
||||
|
||||
|
@ -218,8 +244,7 @@ def set_3leds(device, battery_level=None, charging=None, warning=None):
|
|||
if not device.online:
|
||||
return
|
||||
|
||||
leds_register = device.registers.get('3leds')
|
||||
if leds_register is None or leds_register < 0:
|
||||
if REGISTERS.three_leds not in device.registers:
|
||||
return
|
||||
|
||||
if battery_level is not None:
|
||||
|
@ -243,6 +268,9 @@ def set_3leds(device, battery_level=None, charging=None, warning=None):
|
|||
# set the blinking flag for the leds already set
|
||||
v1 |= (v1 >> 1)
|
||||
v2 |= (v2 >> 1)
|
||||
elif charging:
|
||||
# blink all green
|
||||
v1, v2 = 0x30,0x33
|
||||
elif warning:
|
||||
# 1 red
|
||||
v1, v2 = 0x02, 0x00
|
||||
|
@ -250,19 +278,21 @@ def set_3leds(device, battery_level=None, charging=None, warning=None):
|
|||
# turn off all leds
|
||||
v1, v2 = 0x11, 0x11
|
||||
|
||||
write_register(device, leds_register, v1, v2)
|
||||
write_register(device, REGISTERS.three_leds, v1, v2)
|
||||
|
||||
|
||||
def get_notification_flags(device):
|
||||
assert device
|
||||
|
||||
# Avoid a call if the device is not online,
|
||||
# or the device does not support registers.
|
||||
if device.kind is not None:
|
||||
# peripherals with protocol >= 2.0 don't support registers
|
||||
p = device.protocol
|
||||
if p is not None and p >= 2.0:
|
||||
return
|
||||
|
||||
flags = read_register(device, 0x00)
|
||||
flags = read_register(device, REGISTERS.notifications)
|
||||
if flags is not None:
|
||||
assert len(flags) == 3
|
||||
return _bytes2int(flags)
|
||||
|
@ -271,6 +301,8 @@ def get_notification_flags(device):
|
|||
def set_notification_flags(device, *flag_bits):
|
||||
assert device
|
||||
|
||||
# Avoid a call if the device is not online,
|
||||
# or the device does not support registers.
|
||||
if device.kind is not None:
|
||||
# peripherals with protocol >= 2.0 don't support registers
|
||||
p = device.protocol
|
||||
|
@ -279,5 +311,5 @@ def set_notification_flags(device, *flag_bits):
|
|||
|
||||
flag_bits = sum(int(b) for b in flag_bits)
|
||||
assert flag_bits & 0x00FFFFFF == flag_bits
|
||||
result = write_register(device, 0x00, _int2bytes(flag_bits, 3))
|
||||
result = write_register(device, REGISTERS.notifications, _int2bytes(flag_bits, 3))
|
||||
return result is not None
|
||||
|
|
|
@ -18,6 +18,8 @@ from .common import strhex as _strhex
|
|||
from .descriptors import DEVICES as _DESCRIPTORS
|
||||
from .settings_templates import check_feature_settings as _check_feature_settings
|
||||
|
||||
_R = _hidpp10.REGISTERS
|
||||
|
||||
#
|
||||
#
|
||||
#
|
||||
|
@ -66,7 +68,7 @@ class PairedDevice(object):
|
|||
self._kind = _hidpp10.DEVICE_KIND[kind]
|
||||
else:
|
||||
# force a reading of the wpid
|
||||
pair_info = receiver.read_register(0x2B5, 0x20 + number - 1)
|
||||
pair_info = receiver.read_register(_R.receiver_info, 0x20 + number - 1)
|
||||
if pair_info:
|
||||
# may be either a Unifying receiver, or an Unifying-ready receiver
|
||||
self.wpid = _strhex(pair_info[3:5])
|
||||
|
@ -76,7 +78,7 @@ class PairedDevice(object):
|
|||
|
||||
else:
|
||||
# unifying protocol not supported, must be a Nano receiver
|
||||
device_info = self.receiver.read_register(0x2B5, 0x04)
|
||||
device_info = self.receiver.read_register(_R.receiver_info, 0x04)
|
||||
if device_info is None:
|
||||
_log.error("failed to read Nano wpid for device %d of %s", number, receiver)
|
||||
raise _base.NoSuchDevice(nuber=number, receiver=receiver, error="read Nano wpid")
|
||||
|
@ -93,7 +95,7 @@ class PairedDevice(object):
|
|||
if self.descriptor is None:
|
||||
# Last chance to correctly identify the device; many Nano receivers
|
||||
# do not support this call.
|
||||
codename = self.receiver.read_register(0x2B5, 0x40 + self.number - 1)
|
||||
codename = self.receiver.read_register(_R.receiver_info, 0x40 + self.number - 1)
|
||||
if codename:
|
||||
self._codename = codename[2:].rstrip(b'\x00').decode('utf-8')
|
||||
self.descriptor = _DESCRIPTORS.get(self._codename)
|
||||
|
@ -125,7 +127,7 @@ class PairedDevice(object):
|
|||
@property
|
||||
def codename(self):
|
||||
if self._codename is None:
|
||||
codename = self.receiver.read_register(0x2B5, 0x40 + self.number - 1)
|
||||
codename = self.receiver.read_register(_R.receiver_info, 0x40 + self.number - 1)
|
||||
if codename:
|
||||
self._codename = codename[2:].rstrip(b'\x00').decode('utf-8')
|
||||
# _log.debug("device %d codename %s", self.number, self._codename)
|
||||
|
@ -143,7 +145,7 @@ class PairedDevice(object):
|
|||
@property
|
||||
def kind(self):
|
||||
if self._kind is None:
|
||||
pair_info = self.receiver.read_register(0x2B5, 0x20 + self.number - 1)
|
||||
pair_info = self.receiver.read_register(_R.receiver_info, 0x20 + self.number - 1)
|
||||
if pair_info:
|
||||
kind = ord(pair_info[7:8]) & 0x0F
|
||||
self._kind = _hidpp10.DEVICE_KIND[kind]
|
||||
|
@ -164,7 +166,7 @@ class PairedDevice(object):
|
|||
@property
|
||||
def serial(self):
|
||||
if self._serial is None:
|
||||
serial = self.receiver.read_register(0x2B5, 0x30 + self.number - 1)
|
||||
serial = self.receiver.read_register(_R.receiver_info, 0x30 + self.number - 1)
|
||||
if serial:
|
||||
ps = ord(serial[9:10]) & 0x0F
|
||||
self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps]
|
||||
|
@ -182,7 +184,7 @@ class PairedDevice(object):
|
|||
@property
|
||||
def power_switch_location(self):
|
||||
if self._power_switch is None:
|
||||
ps = self.receiver.read_register(0x2B5, 0x30 + self.number - 1)
|
||||
ps = self.receiver.read_register(_R.receiver_info, 0x30 + self.number - 1)
|
||||
if ps is not None:
|
||||
ps = ord(ps[9:10]) & 0x0F
|
||||
self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps]
|
||||
|
@ -193,7 +195,7 @@ class PairedDevice(object):
|
|||
@property
|
||||
def polling_rate(self):
|
||||
if self._polling_rate is None:
|
||||
pair_info = self.receiver.read_register(0x2B5, 0x20 + self.number - 1)
|
||||
pair_info = self.receiver.read_register(_R.receiver_info, 0x20 + self.number - 1)
|
||||
if pair_info:
|
||||
self._polling_rate = ord(pair_info[2:3])
|
||||
else:
|
||||
|
@ -211,9 +213,9 @@ class PairedDevice(object):
|
|||
def registers(self):
|
||||
if self._registers is None:
|
||||
if self.descriptor and self.descriptor.registers:
|
||||
self._registers = dict(self.descriptor.registers)
|
||||
self._registers = list(self.descriptor.registers)
|
||||
else:
|
||||
self._registers = {}
|
||||
self._registers = []
|
||||
return self._registers
|
||||
|
||||
@property
|
||||
|
@ -235,7 +237,7 @@ class PairedDevice(object):
|
|||
|
||||
if enable:
|
||||
set_flag_bits = ( _hidpp10.NOTIFICATION_FLAG.battery_status
|
||||
| _hidpp10.NOTIFICATION_FLAG.keyboard_backlight
|
||||
| _hidpp10.NOTIFICATION_FLAG.keyboard_illumination
|
||||
| _hidpp10.NOTIFICATION_FLAG.wireless
|
||||
| _hidpp10.NOTIFICATION_FLAG.software_present )
|
||||
else:
|
||||
|
@ -308,7 +310,7 @@ class Receiver(object):
|
|||
|
||||
# read the serial immediately, so we can find out max_devices
|
||||
# this will tell us if it's a Unifying or Nano receiver
|
||||
serial_reply = self.read_register(0x2B5, 0x03)
|
||||
serial_reply = self.read_register(_R.receiver_info, 0x03)
|
||||
assert serial_reply
|
||||
self.serial = _strhex(serial_reply[1:5])
|
||||
self.max_devices = ord(serial_reply[6:7])
|
||||
|
@ -363,7 +365,7 @@ class Receiver(object):
|
|||
def notify_devices(self):
|
||||
"""Scan all devices."""
|
||||
if self.handle:
|
||||
if not self.write_register(0x02, 0x02):
|
||||
if not self.write_register(_R.receiver_connection, 0x02):
|
||||
_log.warn("%s: failed to trigger device link notifications", self)
|
||||
|
||||
def register_new_device(self, number, notification=None):
|
||||
|
@ -387,14 +389,14 @@ class Receiver(object):
|
|||
|
||||
def set_lock(self, lock_closed=True, device=0, timeout=0):
|
||||
if self.handle:
|
||||
lock = 0x02 if lock_closed else 0x01
|
||||
reply = self.write_register(0xB2, lock, device, timeout)
|
||||
action = 0x02 if lock_closed else 0x01
|
||||
reply = self.write_register(_R.receiver_pairing, action, device, timeout)
|
||||
if reply:
|
||||
return True
|
||||
_log.warn("%s: failed to %s the receiver lock", self, 'close' if lock_closed else 'open')
|
||||
|
||||
def count(self):
|
||||
count = self.read_register(0x02)
|
||||
count = self.read_register(_R.receiver_connection)
|
||||
return 0 if count is None else ord(count[1:2])
|
||||
|
||||
# def has_devices(self):
|
||||
|
@ -436,11 +438,12 @@ class Receiver(object):
|
|||
raise IndexError(key)
|
||||
|
||||
dev = self._devices[key]
|
||||
reply = self.write_register(0xB2, 0x03, int(key))
|
||||
action = 0x03
|
||||
reply = self.write_register(_R.receiver_pairing, action, int(key))
|
||||
if reply:
|
||||
# invalidate the device
|
||||
dev.wpid = None
|
||||
dev.online = False
|
||||
dev.wpid = None
|
||||
del self._devices[key]
|
||||
_log.warn("%s unpaired device %s", self, dev)
|
||||
else:
|
||||
|
|
|
@ -15,6 +15,10 @@ from .settings import (
|
|||
ChoicesValidator as _ChoicesV,
|
||||
)
|
||||
|
||||
_DK = _hidpp10.DEVICE_KIND
|
||||
_R = _hidpp10.REGISTERS
|
||||
_F = _hidpp20.FEATURE
|
||||
|
||||
#
|
||||
# pre-defined basic setting descriptors
|
||||
#
|
||||
|
@ -64,27 +68,27 @@ _FN_SWAP = ('fn-swap', 'Swap Fx function',
|
|||
#
|
||||
#
|
||||
|
||||
def _register_fn_swap(register=0x09, true_value=b'\x00\x01', mask=b'\x00\x01'):
|
||||
def _register_fn_swap(register=_R.keyboard_fn_swap, true_value=b'\x00\x01', mask=b'\x00\x01'):
|
||||
return register_toggle(_FN_SWAP[0], register, true_value=true_value, mask=mask,
|
||||
label=_FN_SWAP[1], description=_FN_SWAP[2],
|
||||
device_kind=_hidpp10.DEVICE_KIND.keyboard)
|
||||
device_kind=_DK.keyboard)
|
||||
|
||||
def _register_smooth_scroll(register=0x01, true_value=0x40, mask=0x40):
|
||||
def _register_smooth_scroll(register=_R.mouse_smooth_scroll, true_value=0x40, mask=0x40):
|
||||
return register_toggle(_SMOOTH_SCROLL[0], register, true_value=true_value, mask=mask,
|
||||
label=_SMOOTH_SCROLL[1], description=_SMOOTH_SCROLL[2],
|
||||
device_kind=_hidpp10.DEVICE_KIND.mouse)
|
||||
device_kind=_DK.mouse)
|
||||
|
||||
def _register_dpi(register=0x63, choices=None):
|
||||
def _register_dpi(register=_R.mouse_dpi, choices=None):
|
||||
return register_choices(_DPI[0], register, choices,
|
||||
label=_DPI[1], description=_DPI[2],
|
||||
device_kind=_hidpp10.DEVICE_KIND.mouse)
|
||||
device_kind=_DK.mouse)
|
||||
|
||||
|
||||
def _feature_fn_swap():
|
||||
return feature_toggle(_FN_SWAP[0], _hidpp20.FEATURE.FN_INVERSION,
|
||||
return feature_toggle(_FN_SWAP[0], _F.FN_INVERSION,
|
||||
write_returns_value=True,
|
||||
label=_FN_SWAP[1], description=_FN_SWAP[2],
|
||||
device_kind=_hidpp10.DEVICE_KIND.keyboard)
|
||||
device_kind=_DK.keyboard)
|
||||
|
||||
|
||||
#
|
||||
|
@ -101,14 +105,14 @@ _SETTINGS_LIST = namedtuple('_SETTINGS_LIST', [
|
|||
])
|
||||
del namedtuple
|
||||
|
||||
Register = _SETTINGS_LIST(
|
||||
RegisterSettings = _SETTINGS_LIST(
|
||||
fn_swap=_register_fn_swap,
|
||||
smooth_scroll=_register_smooth_scroll,
|
||||
dpi=_register_dpi,
|
||||
hand_detection=None,
|
||||
typing_illumination=None,
|
||||
)
|
||||
Feature = _SETTINGS_LIST(
|
||||
FeatureSettings = _SETTINGS_LIST(
|
||||
fn_swap=_feature_fn_swap,
|
||||
smooth_scroll=None,
|
||||
dpi=None,
|
||||
|
@ -128,6 +132,6 @@ def check_feature_settings(device, already_known):
|
|||
return
|
||||
if device.protocol is not None and device.protocol < 2.0:
|
||||
return
|
||||
if not any(s.name == _FN_SWAP[0] for s in already_known) and _hidpp20.FEATURE.FN_INVERSION in device.features:
|
||||
fn_swap = Feature.fn_swap()
|
||||
if not any(s.name == _FN_SWAP[0] for s in already_known) and _F.FN_INVERSION in device.features:
|
||||
fn_swap = FeatureSettings.fn_swap()
|
||||
already_known.append(fn_swap(device))
|
||||
|
|
|
@ -24,6 +24,8 @@ from .common import NamedInts as _NamedInts, NamedInt as _NamedInt, strhex as _s
|
|||
from . import hidpp10 as _hidpp10
|
||||
from . import hidpp20 as _hidpp20
|
||||
|
||||
_R = _hidpp10.REGISTERS
|
||||
|
||||
#
|
||||
#
|
||||
#
|
||||
|
@ -82,18 +84,18 @@ class ReceiverStatus(dict):
|
|||
# self.updated = _timestamp()
|
||||
self._changed_callback(self._receiver, alert=alert, reason=reason)
|
||||
|
||||
def poll(self, timestamp):
|
||||
r = self._receiver
|
||||
assert r
|
||||
|
||||
if _log.isEnabledFor(_DEBUG):
|
||||
_log.debug("polling status of %s", r)
|
||||
|
||||
# make sure to read some stuff that may be read later by the UI
|
||||
r.serial, r.firmware, None
|
||||
|
||||
# get an update of the notification flags
|
||||
# self[KEYS.NOTIFICATION_FLAGS] = _hidpp10.get_notification_flags(r)
|
||||
# def poll(self, timestamp):
|
||||
# r = self._receiver
|
||||
# assert r
|
||||
#
|
||||
# if _log.isEnabledFor(_DEBUG):
|
||||
# _log.debug("polling status of %s", r)
|
||||
#
|
||||
# # make sure to read some stuff that may be read later by the UI
|
||||
# r.serial, r.firmware, None
|
||||
#
|
||||
# # get an update of the notification flags
|
||||
# # self[KEYS.NOTIFICATION_FLAGS] = _hidpp10.get_notification_flags(r)
|
||||
|
||||
def process_notification(self, n):
|
||||
if n.sub_id == 0x4A:
|
||||
|
@ -271,40 +273,40 @@ class DeviceStatus(dict):
|
|||
# _log.debug("device %d changed: active=%s %s", self._device.number, self._active, dict(self))
|
||||
self._changed_callback(d, alert, reason)
|
||||
|
||||
def poll(self, timestamp):
|
||||
d = self._device
|
||||
if not d:
|
||||
_log.error("polling status of invalid device")
|
||||
return
|
||||
|
||||
if self._active:
|
||||
if _log.isEnabledFor(_DEBUG):
|
||||
_log.debug("polling status of %s", d)
|
||||
|
||||
# read these from the device, the UI may need them later
|
||||
d.protocol, d.serial, d.firmware, d.kind, d.name, d.settings, None
|
||||
|
||||
# make sure we know all the features of the device
|
||||
# if d.features:
|
||||
# d.features[:]
|
||||
|
||||
# devices may go out-of-range while still active, or the computer
|
||||
# may go to sleep and wake up without the devices available
|
||||
if timestamp - self.updated > _STATUS_TIMEOUT:
|
||||
if d.ping():
|
||||
timestamp = self.updated = _timestamp()
|
||||
else:
|
||||
self._changed(active=False, reason='out of range')
|
||||
|
||||
# if still active, make sure we know the battery level
|
||||
if KEYS.BATTERY_LEVEL not in self:
|
||||
self.read_battery(timestamp)
|
||||
|
||||
elif timestamp - self.updated > _STATUS_TIMEOUT:
|
||||
if d.ping():
|
||||
self._changed(active=True)
|
||||
else:
|
||||
self.updated = _timestamp()
|
||||
# def poll(self, timestamp):
|
||||
# d = self._device
|
||||
# if not d:
|
||||
# _log.error("polling status of invalid device")
|
||||
# return
|
||||
#
|
||||
# if self._active:
|
||||
# if _log.isEnabledFor(_DEBUG):
|
||||
# _log.debug("polling status of %s", d)
|
||||
#
|
||||
# # read these from the device, the UI may need them later
|
||||
# d.protocol, d.serial, d.firmware, d.kind, d.name, d.settings, None
|
||||
#
|
||||
# # make sure we know all the features of the device
|
||||
# # if d.features:
|
||||
# # d.features[:]
|
||||
#
|
||||
# # devices may go out-of-range while still active, or the computer
|
||||
# # may go to sleep and wake up without the devices available
|
||||
# if timestamp - self.updated > _STATUS_TIMEOUT:
|
||||
# if d.ping():
|
||||
# timestamp = self.updated = _timestamp()
|
||||
# else:
|
||||
# self._changed(active=False, reason='out of range')
|
||||
#
|
||||
# # if still active, make sure we know the battery level
|
||||
# if KEYS.BATTERY_LEVEL not in self:
|
||||
# self.read_battery(timestamp)
|
||||
#
|
||||
# elif timestamp - self.updated > _STATUS_TIMEOUT:
|
||||
# if d.ping():
|
||||
# self._changed(active=True)
|
||||
# else:
|
||||
# self.updated = _timestamp()
|
||||
|
||||
def process_notification(self, n):
|
||||
# incoming packets with SubId >= 0x80 are supposedly replies from
|
||||
|
@ -319,9 +321,9 @@ class DeviceStatus(dict):
|
|||
if self._device.protocol < 2.0:
|
||||
# README assuming HID++ 2.0 devices don't use the 0x07/0x0D registers
|
||||
# however, this has not been fully verified yet
|
||||
if n.sub_id in (0x07, 0x0D) and len(n.data) == 3 and n.data[2:3] == b'\x00':
|
||||
if n.sub_id in (_R.battery_charge, _R.battery_status) and len(n.data) == 3 and n.data[2:3] == b'\x00':
|
||||
return self._process_hidpp10_custom_notification(n)
|
||||
if n.sub_id == 0x17 and len(n.data) == 3:
|
||||
if n.sub_id == _R.illumination and len(n.data) == 3:
|
||||
return self._process_hidpp10_custom_notification(n)
|
||||
else:
|
||||
# assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications
|
||||
|
@ -337,22 +339,16 @@ class DeviceStatus(dict):
|
|||
if _log.isEnabledFor(_DEBUG):
|
||||
_log.debug("%s (%s) custom battery notification %s", self._device, self._device.protocol, n)
|
||||
|
||||
if n.sub_id == 0x07:
|
||||
# message layout: 10 ix 07("address") <LEVEL> <STATUS> 00 00
|
||||
level, status = _hidpp10.parse_battery_reply_07(n.address, ord(n.data[:1]))
|
||||
if n.sub_id in (_R.battery_status, _R.battery_charge):
|
||||
data = '%c%s' % (n.address, n.data)
|
||||
level, status = _hidpp10.parse_battery_status(n.sub_id, data)
|
||||
self.set_battery_info(level, status)
|
||||
return True
|
||||
|
||||
if n.sub_id == 0x0D:
|
||||
# message layout: 10 ix 0D("address") <CHARGE> <?> <STATUS> 00
|
||||
level, status = _hidpp10.parse_battery_reply_0D(n.address, ord(n.data[1:2]))
|
||||
self.set_battery_info(level, status)
|
||||
return True
|
||||
|
||||
if n.sub_id == 0x17:
|
||||
if n.sub_id == _R.illumination:
|
||||
# message layout: 10 ix 17("address") <??> <?> <??> <light level 1=off..5=max>
|
||||
# TODO anything we can do with this?
|
||||
_log.info("backlight event: %s", n)
|
||||
_log.info("illumination event: %s", n)
|
||||
return True
|
||||
|
||||
_log.warn("%s: unrecognized %s", self._device, n)
|
||||
|
|
|
@ -101,7 +101,7 @@ def _print_receiver(receiver, verbose=False):
|
|||
else:
|
||||
print (" Notifications: (none)")
|
||||
|
||||
activity = receiver.read_register(0x2B3)
|
||||
activity = receiver.read_register(hidpp10.REGISTERS.devices_activity)
|
||||
if activity:
|
||||
activity = [(d, ord(activity[d - 1:d])) for d in range(1, receiver.max_devices)]
|
||||
activity_text = ', '.join(('%d=%d' % (d, a)) for d, a in activity if a > 0)
|
||||
|
|
Loading…
Reference in New Issue