re-worked (AGAIN) the way the devices are initially set-up
There is absolutely no consistency between the registers and features receivers have, even if they're the same product_id!
This commit is contained in:
parent
20aa797e96
commit
3436055c7f
|
@ -154,9 +154,9 @@ class FeaturesArray(object):
|
|||
return False
|
||||
|
||||
# I _think_ this is universally true
|
||||
if self.device.protocol < 2.0:
|
||||
if self.device.protocol is not None and self.device.protocol < 2.0:
|
||||
self.supported = False
|
||||
# self.device.features = None
|
||||
self.device.features = None
|
||||
self.device = None
|
||||
return False
|
||||
|
||||
|
@ -312,7 +312,7 @@ class KeysArray(object):
|
|||
#
|
||||
|
||||
def feature_request(device, feature, function=0x00, *params):
|
||||
if device.features:
|
||||
if device.online and device.features:
|
||||
if feature in device.features:
|
||||
feature_index = device.features.index(int(feature))
|
||||
return device.request((feature_index << 8) + (function & 0xFF), *params)
|
||||
|
|
|
@ -15,19 +15,13 @@ from . import base as _base
|
|||
from . import hidpp10 as _hidpp10
|
||||
from . import hidpp20 as _hidpp20
|
||||
from .common import strhex as _strhex
|
||||
from .descriptors import (
|
||||
DEVICES as _DESCRIPTORS,
|
||||
check_features as _check_feature_settings,
|
||||
)
|
||||
from .descriptors import DEVICES as _DESCRIPTORS
|
||||
from .settings_templates import check_feature_settings as _check_feature_settings
|
||||
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
"""A receiver may have a maximum of 6 paired devices at a time."""
|
||||
MAX_PAIRED_DEVICES = 6
|
||||
|
||||
|
||||
class PairedDevice(object):
|
||||
def __init__(self, receiver, number, link_notification=None):
|
||||
assert receiver
|
||||
|
@ -41,7 +35,7 @@ class PairedDevice(object):
|
|||
|
||||
# the Wireless PID is unique per device model
|
||||
self.wpid = None
|
||||
self._descriptor = None
|
||||
self.descriptor = None
|
||||
|
||||
# mose, keyboard, etc (see _hidpp10.DEVICE_KIND)
|
||||
self._kind = None
|
||||
|
@ -64,90 +58,66 @@ class PairedDevice(object):
|
|||
self._polling_rate = None
|
||||
self._power_switch = None
|
||||
|
||||
unifying = self.receiver.unifying_supported
|
||||
|
||||
if link_notification is not None:
|
||||
self.online = bool(ord(link_notification.data[0:1]) & 0x40)
|
||||
self.wpid = _strhex(link_notification.data[2:3] + link_notification.data[1:2])
|
||||
assert link_notification.address == (0x04 if unifying else 0x03)
|
||||
# assert link_notification.address == (0x04 if unifying else 0x03)
|
||||
kind = ord(link_notification.data[0:1]) & 0x0F
|
||||
self._kind = _hidpp10.DEVICE_KIND[kind]
|
||||
|
||||
if unifying:
|
||||
if self.wpid is None:
|
||||
# force a reading of the codename
|
||||
pair_info = receiver.read_register(0x2B5, 0x20 + number - 1)
|
||||
if pair_info is None:
|
||||
raise _base.NoSuchDevice(nuber=number, receiver=receiver, error="read pair info")
|
||||
|
||||
else:
|
||||
# force a reading of the wpid
|
||||
pair_info = receiver.read_register(0x2B5, 0x20 + number - 1)
|
||||
if pair_info:
|
||||
# may be either a Unifying receiver, or an Unifying-ready receiver
|
||||
self.wpid = _strhex(pair_info[3:5])
|
||||
kind = ord(pair_info[7:8]) & 0x0F
|
||||
self._kind = _hidpp10.DEVICE_KIND[kind]
|
||||
self._polling_rate = ord(pair_info[2:3])
|
||||
|
||||
else:
|
||||
self._serial = self.receiver.serial
|
||||
self._polling_rate = 0
|
||||
self._power_switch = '(unknown)'
|
||||
|
||||
if self.wpid is None:
|
||||
else:
|
||||
# unifying protocol not supported, must be a Nano receiver
|
||||
device_info = self.receiver.read_register(0x2B5, 0x04)
|
||||
if device_info is None:
|
||||
_log.error("failed to read Nano wpid for device %d of %s", number, receiver)
|
||||
raise _base.NoSuchDevice(nuber=number, receiver=receiver, error="read Nano wpid")
|
||||
self.wpid = _strhex(device_info[3:5])
|
||||
|
||||
self._descriptor = _DESCRIPTORS.get(self.wpid)
|
||||
if self._descriptor is None:
|
||||
self._codename = self.receiver.wpid
|
||||
# actually there IS a device, just that we can't identify it
|
||||
# raise _base.NoSuchDevice(nuber=number, receiver=receiver, product_id=receiver.product_id, failed="no descriptor")
|
||||
self._name = 'Unknown device ' + self._codename
|
||||
else:
|
||||
self._codename = self._descriptor.codename
|
||||
self._name = self._descriptor.name
|
||||
self.wpid = _strhex(device_info[3:5])
|
||||
self._polling_rate = 0
|
||||
self._power_switch = '(unknown)'
|
||||
|
||||
# the wpid is necessary to properly identify wireless link on/off notifications
|
||||
# also it gets set to None when the device is unpaired
|
||||
# also it gets set to None on this object when the device is unpaired
|
||||
assert self.wpid is not None, "failed to read wpid: device %d of %s" % (number, receiver)
|
||||
|
||||
# knowing the protocol as soon as possible helps reading all other info
|
||||
# and avoids an unecessary ping
|
||||
self.descriptor = _DESCRIPTORS.get(self.wpid)
|
||||
if self.descriptor is None:
|
||||
# Last chance to correctly identify the device; many Nano receivers
|
||||
# do not support this call.
|
||||
codename = self.receiver.read_register(0x2B5, 0x40 + self.number - 1)
|
||||
if codename:
|
||||
self._codename = codename[2:].rstrip(b'\x00').decode('utf-8')
|
||||
self.descriptor = _DESCRIPTORS.get(self._codename)
|
||||
|
||||
if self.descriptor:
|
||||
self._protocol = self.descriptor.protocol if unifying else 1.0 # may be None
|
||||
else:
|
||||
_log.warn("device without descriptor found: %s - %s (%d of %s)", self.wpid, self._codename, number, receiver)
|
||||
self._protocol = None if unifying else 1.0
|
||||
self._name = self.descriptor.name
|
||||
self._protocol = self.descriptor.protocol
|
||||
if self._codename is None:
|
||||
self._codename = self.descriptor.codename
|
||||
if self._kind is None:
|
||||
self._kind = self.descriptor.kind
|
||||
|
||||
if self._protocol is not None:
|
||||
self.features = _hidpp20.FeaturesArray(self) if self._protocol >= 2.0 else None
|
||||
elif unifying:
|
||||
# may be a 2.0 device
|
||||
self.features = _hidpp20.FeaturesArray(self)
|
||||
self.features = None if self._protocol < 2.0 else _hidpp20.FeaturesArray(self)
|
||||
else:
|
||||
self.features = None
|
||||
|
||||
@property
|
||||
def descriptor(self):
|
||||
if self._descriptor is None:
|
||||
self._descriptor = _DESCRIPTORS.get(self.wpid)
|
||||
if self._descriptor is None and self._codename:
|
||||
self._descriptor = _DESCRIPTORS.get(self._codename)
|
||||
return self._descriptor
|
||||
# may be a 2.0 device; if not, it will fix itself later
|
||||
self.features = _hidpp20.FeaturesArray(self)
|
||||
|
||||
@property
|
||||
def protocol(self):
|
||||
if self._protocol is None:
|
||||
if self.descriptor:
|
||||
if self.descriptor.protocol:
|
||||
self._protocol = self.descriptor.protocol
|
||||
else:
|
||||
_log.warn("%s: descriptor has no protocol, should be %0.1f", self, self._protocol)
|
||||
|
||||
if self._protocol is None:
|
||||
self._protocol = _base.ping(self.receiver.handle, self.number)
|
||||
# if the ping failed, the peripheral is (almost) certainly offline
|
||||
self.online = self._protocol is not None
|
||||
self._protocol = _base.ping(self.receiver.handle, self.number)
|
||||
# if the ping failed, the peripheral is (almost) certainly offline
|
||||
self.online = self._protocol is not None
|
||||
|
||||
# _log.debug("device %d protocol %s", self.number, self._protocol)
|
||||
return self._protocol or 0
|
||||
|
@ -155,45 +125,40 @@ class PairedDevice(object):
|
|||
@property
|
||||
def codename(self):
|
||||
if self._codename is None:
|
||||
if self.descriptor:
|
||||
self._codename = self.descriptor.codename
|
||||
elif self.receiver.unifying_supported:
|
||||
codename = self.receiver.read_register(0x2B5, 0x40 + self.number - 1)
|
||||
if codename:
|
||||
self._codename = codename[2:].rstrip(b'\x00').decode('utf-8')
|
||||
# _log.debug("device %d codename %s", self.number, self._codename)
|
||||
return self._codename or '?'
|
||||
codename = self.receiver.read_register(0x2B5, 0x40 + self.number - 1)
|
||||
if codename:
|
||||
self._codename = codename[2:].rstrip(b'\x00').decode('utf-8')
|
||||
# _log.debug("device %d codename %s", self.number, self._codename)
|
||||
else:
|
||||
self._codename = '? (%s)' % self.wpid
|
||||
return self._codename
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if self._name is None:
|
||||
if self.descriptor:
|
||||
self._name = self.descriptor.name
|
||||
elif self.protocol >= 2.0 and self.online:
|
||||
if self.protocol >= 2.0 and self.online:
|
||||
self._name = _hidpp20.get_name(self)
|
||||
return self._name or self.codename or '?'
|
||||
return self._name or ('Unknown device %s' % self.wpid)
|
||||
|
||||
@property
|
||||
def kind(self):
|
||||
if self._kind is None:
|
||||
if self.descriptor:
|
||||
self._kind = self.descriptor.kind
|
||||
elif self.receiver.unifying_supported:
|
||||
pair_info = self.receiver.read_register(0x2B5, 0x20 + self.number - 1)
|
||||
if pair_info:
|
||||
kind = ord(pair_info[7:8]) & 0x0F
|
||||
self._kind = _hidpp10.DEVICE_KIND[kind]
|
||||
if self._kind is None and self.protocol >= 2.0 and self.online:
|
||||
pair_info = self.receiver.read_register(0x2B5, 0x20 + self.number - 1)
|
||||
if pair_info:
|
||||
kind = ord(pair_info[7:8]) & 0x0F
|
||||
self._kind = _hidpp10.DEVICE_KIND[kind]
|
||||
self._polling_rate = ord(pair_info[2:3])
|
||||
elif self.protocol >= 2.0:
|
||||
self._kind = _hidpp20.get_kind(self)
|
||||
return self._kind or '?'
|
||||
|
||||
@property
|
||||
def firmware(self):
|
||||
if self._firmware is None and self.online:
|
||||
if self.protocol < 2.0:
|
||||
self._firmware = _hidpp10.get_firmware(self)
|
||||
else:
|
||||
if self.protocol >= 2.0:
|
||||
self._firmware = _hidpp20.get_firmware(self)
|
||||
else:
|
||||
self._firmware = _hidpp10.get_firmware(self)
|
||||
return self._firmware or ()
|
||||
|
||||
@property
|
||||
|
@ -217,11 +182,10 @@ class PairedDevice(object):
|
|||
@property
|
||||
def power_switch_location(self):
|
||||
if self._power_switch is None:
|
||||
if self.receiver.unifying_supported:
|
||||
ps = self.receiver.read_register(0x2B5, 0x30 + self.number - 1)
|
||||
if ps is not None:
|
||||
ps = ord(ps[9:10]) & 0x0F
|
||||
self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps]
|
||||
ps = self.receiver.read_register(0x2B5, 0x30 + self.number - 1)
|
||||
if ps is not None:
|
||||
ps = ord(ps[9:10]) & 0x0F
|
||||
self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps]
|
||||
else:
|
||||
self._power_switch = '(unknown)'
|
||||
return self._power_switch
|
||||
|
@ -229,10 +193,9 @@ class PairedDevice(object):
|
|||
@property
|
||||
def polling_rate(self):
|
||||
if self._polling_rate is None:
|
||||
if self.receiver.unifying_supported:
|
||||
pair_info = self.receiver.read_register(0x2B5, 0x20 + self.number - 1)
|
||||
if pair_info:
|
||||
self._polling_rate = ord(pair_info[2:3])
|
||||
pair_info = self.receiver.read_register(0x2B5, 0x20 + self.number - 1)
|
||||
if pair_info:
|
||||
self._polling_rate = ord(pair_info[2:3])
|
||||
else:
|
||||
self._polling_rate = 0
|
||||
return self._polling_rate
|
||||
|
@ -240,7 +203,7 @@ class PairedDevice(object):
|
|||
@property
|
||||
def keys(self):
|
||||
if self._keys is None:
|
||||
if self.protocol >= 2.0 and self.online:
|
||||
if self.online and self.protocol >= 2.0:
|
||||
self._keys = _hidpp20.get_keys(self) or ()
|
||||
return self._keys
|
||||
|
||||
|
@ -261,8 +224,7 @@ class PairedDevice(object):
|
|||
else:
|
||||
self._settings = []
|
||||
|
||||
if self.online and self.features:
|
||||
_check_feature_settings(self, self._settings)
|
||||
_check_feature_settings(self, self._settings)
|
||||
return self._settings
|
||||
|
||||
def enable_notifications(self, enable=True):
|
||||
|
@ -301,6 +263,8 @@ class PairedDevice(object):
|
|||
"""Checks if the device is online, returns True of False"""
|
||||
protocol = _base.ping(self.receiver.handle, self.number)
|
||||
self.online = protocol is not None
|
||||
if protocol is not None:
|
||||
self._protocol = protocol
|
||||
return self.online
|
||||
|
||||
def __index__(self):
|
||||
|
@ -349,14 +313,10 @@ class Receiver(object):
|
|||
self.serial = _strhex(serial_reply[1:5])
|
||||
self.max_devices = ord(serial_reply[6:7])
|
||||
|
||||
if self.max_devices == 1:
|
||||
self.name = 'Nano Receiver'
|
||||
old_equad_reply = self.read_register(0x2B5, 0x04)
|
||||
self.unifying_supported = old_equad_reply is None
|
||||
_log.info("%s (%s) uses protocol %s", self.name, self.path, 'eQuad' if old_equad_reply else 'eQuad DJ')
|
||||
elif self.max_devices == 6:
|
||||
if self.max_devices == 6:
|
||||
self.name = 'Unifying Receiver'
|
||||
self.unifying_supported = True
|
||||
elif self.max_devices < 6:
|
||||
self.name = 'Nano Receiver'
|
||||
else:
|
||||
raise Exception("unknown receiver type", self.max_devices)
|
||||
self._str = '<%s(%s,%s%s)>' % (self.name.replace(' ', ''), self.path, '' if type(self.handle) == int else 'T', self.handle)
|
||||
|
|
|
@ -124,6 +124,8 @@ del _SETTINGS_LIST
|
|||
|
||||
def check_feature_settings(device, already_known):
|
||||
"""Try to auto-detect device settings by the HID++ 2.0 features they have."""
|
||||
if device.features is None:
|
||||
return
|
||||
if device.protocol is not None and device.protocol < 2.0:
|
||||
return
|
||||
if not any(s.name == _FN_SWAP[0] for s in already_known) and _hidpp20.FEATURE.FN_INVERSION in device.features:
|
||||
|
|
|
@ -101,12 +101,11 @@ def _print_receiver(receiver, verbose=False):
|
|||
else:
|
||||
print (" Notifications: (none)")
|
||||
|
||||
if receiver.unifying_supported:
|
||||
activity = receiver.read_register(0x2B3)
|
||||
if activity:
|
||||
activity = [(d, ord(activity[d - 1:d])) for d in range(1, receiver.max_devices)]
|
||||
activity_text = ', '.join(('%d=%d' % (d, a)) for d, a in activity if a > 0)
|
||||
print (" Device activity counters:", activity_text or '(empty)')
|
||||
activity = receiver.read_register(0x2B3)
|
||||
if activity:
|
||||
activity = [(d, ord(activity[d - 1:d])) for d in range(1, receiver.max_devices)]
|
||||
activity_text = ', '.join(('%d=%d' % (d, a)) for d, a in activity if a > 0)
|
||||
print (" Device activity counters:", activity_text or '(empty)')
|
||||
|
||||
|
||||
def _print_device(dev, verbose=False):
|
||||
|
|
|
@ -606,7 +606,7 @@ def _update_device_panel(device, panel, buttons, full=False):
|
|||
panel._lux.set_visible(False)
|
||||
|
||||
buttons._pair.set_visible(False)
|
||||
buttons._unpair.set_sensitive(device.receiver.unifying_supported)
|
||||
buttons._unpair.set_sensitive(device.receiver.max_devices >= 6)
|
||||
buttons._unpair.set_visible(True)
|
||||
|
||||
panel.set_visible(True)
|
||||
|
|
Loading…
Reference in New Issue