added read/write_register to receiver and devices

This commit is contained in:
Daniel Pavel 2013-06-21 14:52:16 +02:00
parent 1b68a3d5a9
commit 431f1c97cf
4 changed files with 53 additions and 29 deletions

View File

@ -85,11 +85,23 @@ PAIRING_ERRORS = _NamedInts(
# functions
#
def read_register(device, register_number, *params):
# support long registers by adding a 2 in front of the number
request_id = 0x8100 | (int(register_number) & 0x2FF)
return device.request(request_id, *params)
def write_register(device, register_number, *value):
# support long registers by adding a 2 in front of the number
request_id = 0x8000 | (int(register_number) & 0x2FF)
return device.request(request_id, *value)
def get_register(device, name, default_number=-1):
known_register = device.registers.get(name)
register = known_register or default_number
if register > 0:
reply = device.request(0x8100 + (register & 0xFF))
reply = read_register(device, register)
if reply:
return reply
@ -147,25 +159,28 @@ def get_serial(device):
dev_id = 0x30 + device.number - 1
receiver = device.receiver
serial = receiver.request(0x83B5, dev_id)
if serial:
serial = read_register(receiver, 0x2B5, dev_id)
if serial is not None:
return _strhex(serial[1:5])
def get_firmware(device):
firmware = [None, None]
reply = device.request(0x81F1, 0x01)
if reply:
fw_version = _strhex(reply[1:3])
fw_version = '%s.%s' % (fw_version[0:2], fw_version[2:4])
reply = device.request(0x81F1, 0x02)
if reply:
fw_version += '.B' + _strhex(reply[1:3])
fw = _FirmwareInfo(FIRMWARE_KIND.Firmware, '', fw_version, None)
firmware[0] = fw
reply = read_register(device, 0xF1, 0x01)
if not reply:
# won't be able to read any of it now...
return
reply = device.request(0x81F1, 0x04)
fw_version = _strhex(reply[1:3])
fw_version = '%s.%s' % (fw_version[0:2], fw_version[2:4])
reply = read_register(device, 0xF1, 0x02)
if reply:
fw_version += '.B' + _strhex(reply[1:3])
fw = _FirmwareInfo(FIRMWARE_KIND.Firmware, '', fw_version, None)
firmware[0] = fw
reply = read_register(device, 0xF1, 0x04)
if reply:
bl_version = _strhex(reply[1:3])
bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4])
@ -183,7 +198,7 @@ def get_notification_flags(device):
if p is None or p >= 2.0:
return
flags = device.request(0x8100)
flags = read_register(device, 0x00)
if flags is not None:
assert len(flags) == 3
return ord(flags[0:1]) << 16 | ord(flags[1:2]) << 8 | ord(flags[2:3])
@ -197,5 +212,5 @@ def set_notification_flags(device, *flag_bits):
return
flag_bits = sum(int(b) for b in flag_bits)
result = device.request(0x8000, 0xFF & (flag_bits >> 16), 0xFF & (flag_bits >> 8), 0xFF & flag_bits)
result = write_register(device, 0x00, 0xFF & (flag_bits >> 16), 0xFF & (flag_bits >> 8), 0xFF & flag_bits)
return result is not None

View File

@ -116,7 +116,8 @@ class PairedDevice(object):
@property
def power_switch_location(self):
if self._power_switch is None:
ps = self.receiver.request(0x83B5, 0x30 + self.number - 1)
assert self.receiver.unifying_supported
ps = self.receiver.read_register(0x2B5, 0x30 + self.number - 1)
if ps:
ps = ord(ps[9:10]) & 0x0F
self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps]
@ -126,7 +127,7 @@ class PairedDevice(object):
def codename(self):
if self._codename is None:
if self.receiver.unifying_supported:
codename = self.receiver.request(0x83B5, 0x40 + self.number - 1)
codename = self.receiver.read_register(0x2B5, 0x40 + self.number - 1)
if codename:
self._codename = codename[2:].rstrip(b'\x00').decode('utf-8')
# _log.debug("device %d codename %s", self.number, self._codename)
@ -148,7 +149,7 @@ class PairedDevice(object):
if self._kind is None:
# already handled in the constructor
# if self.receiver.unifying_supported:
# pair_info = self.receiver.request(0x83B5, 0x20 + self.number - 1)
# pair_info = self.receiver.read_register(0x2B5, 0x20 + self.number - 1)
# if pair_info:
# kind = ord(pair_info[7:8]) & 0x0F
# self._kind = _hidpp10.DEVICE_KIND[kind]
@ -236,6 +237,9 @@ class PairedDevice(object):
def request(self, request_id, *params):
return _base.request(self.receiver.handle, self.number, request_id, *params)
read_register = _hidpp10.read_register
write_register = _hidpp10.write_register
def feature_request(self, feature, function=0x00, *params):
if self.protocol >= 2.0:
return _hidpp20.feature_request(self, feature, function, *params)
@ -282,7 +286,7 @@ class Receiver(object):
# read the serial immediately, so we can find out max_devices
# this will tell us if it's a Unifying or Nano receiver
serial_reply = self.request(0x83B5, 0x03)
serial_reply = self.read_register(0x2B5, 0x03)
assert serial_reply
self.serial = _strhex(serial_reply[1:5])
self.max_devices = ord(serial_reply[6:7])
@ -295,7 +299,7 @@ class Receiver(object):
raise Exception("unknown receiver type")
self._str = '<%s(%s,%s%s)>' % (self.name.replace(' ', ''), self.path, '' if type(self.handle) == int else 'T', self.handle)
old_equad_reply = self.request(0x83B5, 0x04)
old_equad_reply = self.read_register(0x2B5, 0x04)
self.unifying_supported = old_equad_reply is None
_log.info("%s (%s) uses protocol %s", self.name, self.path, 'eQuad' if old_equad_reply else 'eQuad DJ')
@ -342,7 +346,7 @@ class Receiver(object):
def notify_devices(self):
"""Scan all devices."""
if self.handle:
if not self.request(0x8002, 0x02):
if not self.write_register(0x02, 0x02):
_log.warn("%s: failed to trigger device link notifications", self)
def register_new_device(self, number):
@ -364,22 +368,25 @@ class Receiver(object):
def set_lock(self, lock_closed=True, device=0, timeout=0):
if self.handle:
lock = 0x02 if lock_closed else 0x01
reply = self.request(0x80B2, lock, device, timeout)
reply = self.write_register(0xB2, lock, device, timeout)
if reply:
return True
_log.warn("%s: failed to %s the receiver lock", self, 'close' if lock_closed else 'open')
def count(self):
count = self.request(0x8102)
count = self.read_register(0x02)
return 0 if count is None else ord(count[1:2])
# def has_devices(self):
# return len(self) > 0 or self.count() > 0
def request(self, request_id, *params):
if self.handle:
if bool(self):
return _base.request(self.handle, 0xFF, request_id, *params)
read_register = _hidpp10.read_register
write_register = _hidpp10.write_register
def __iter__(self):
for number in range(1, 1 + self.max_devices):
if number in self._devices:
@ -390,7 +397,7 @@ class Receiver(object):
yield dev
def __getitem__(self, key):
if not self.handle:
if not bool(self):
return None
dev = self._devices.get(key)
@ -409,8 +416,10 @@ class Receiver(object):
raise IndexError(key)
dev = self._devices[key]
reply = self.request(0x80B2, 0x03, int(key))
reply = self.write_register(0xB2, 0x03, int(key))
if reply:
# invalidate the device
dev.wpid = None
del self._devices[key]
_log.warn("%s unpaired device %s", self, dev)
else:

View File

@ -84,10 +84,10 @@ class _RegisterRW(object):
self.register = register
def read(self, device):
return device.request(0x8100 | (self.register & 0x2FF))
return device.read_register(self.register)
def write(self, device, data_bytes):
return device.request(0x8000 | (self.register & 0x2FF), data_bytes)
return device.write_register(self.register, data_bytes)
class _FeatureRW(object):

View File

@ -101,7 +101,7 @@ def _print_receiver(receiver, verbose=False):
print (" Notifications: (none).")
if receiver.unifying_supported:
activity = receiver.request(0x83B3)
activity = receiver.read_register(0x2B3)
if activity:
activity = [(d, ord(activity[d - 1:d])) for d in range(1, receiver.max_devices)]
print (" Device activity counters:", ', '.join(('%d=%d' % (d, a)) for d, a in activity if a > 0))