From 84540fb087bc0705737d22000bdaacb088f32bb2 Mon Sep 17 00:00:00 2001 From: Daniel Pavel Date: Thu, 29 Nov 2012 04:10:16 +0200 Subject: [PATCH] re-wrote most of the app, based on latest HID++ docs from Logitech --- app/listener.py | 138 +++++ app/pairing.py | 83 --- app/receiver.py | 365 ------------ app/solaar.py | 133 ++--- app/ui/action.py | 22 +- app/ui/main_window.py | 119 ++-- app/ui/notify.py | 12 +- app/ui/pair_window.py | 210 ++++--- app/ui/status_icon.py | 39 +- bin/solaar | 2 +- lib/hidapi/udev.py | 1 + lib/logitech/devices/__init__.py | 107 ---- lib/logitech/devices/constants.py | 58 -- lib/logitech/devices/k750.py | 55 -- lib/logitech/scanner.py | 73 +-- lib/logitech/unifying_receiver/__init__.py | 30 +- lib/logitech/unifying_receiver/api.py | 566 ------------------- lib/logitech/unifying_receiver/base.py | 325 ++++++----- lib/logitech/unifying_receiver/common.py | 86 ++- lib/logitech/unifying_receiver/constants.py | 109 ---- lib/logitech/unifying_receiver/devices.py | 30 + lib/logitech/unifying_receiver/exceptions.py | 36 -- lib/logitech/unifying_receiver/hidpp10.py | 66 +++ lib/logitech/unifying_receiver/hidpp20.py | 388 +++++++++++++ lib/logitech/unifying_receiver/listener.py | 148 +++-- lib/logitech/unifying_receiver/receiver.py | 329 +++++++++++ lib/logitech/unifying_receiver/status.py | 231 ++++++++ 27 files changed, 1905 insertions(+), 1856 deletions(-) create mode 100644 app/listener.py delete mode 100644 app/pairing.py delete mode 100644 app/receiver.py delete mode 100644 lib/logitech/devices/__init__.py delete mode 100644 lib/logitech/devices/constants.py delete mode 100644 lib/logitech/devices/k750.py delete mode 100644 lib/logitech/unifying_receiver/api.py delete mode 100644 lib/logitech/unifying_receiver/constants.py create mode 100644 lib/logitech/unifying_receiver/devices.py delete mode 100644 lib/logitech/unifying_receiver/exceptions.py create mode 100644 lib/logitech/unifying_receiver/hidpp10.py create mode 100644 lib/logitech/unifying_receiver/hidpp20.py create mode 100644 lib/logitech/unifying_receiver/receiver.py create mode 100644 lib/logitech/unifying_receiver/status.py diff --git a/app/listener.py b/app/listener.py new file mode 100644 index 00000000..b130223e --- /dev/null +++ b/app/listener.py @@ -0,0 +1,138 @@ +# +# +# + +from logging import getLogger, DEBUG as _DEBUG +_log = getLogger('listener') +del getLogger + +import logitech.unifying_receiver as _lur + +# +# +# + +class _DUMMY_RECEIVER(object): + __slots__ = ['name', 'max_devices', 'status'] + name = _lur.Receiver.name + max_devices = _lur.Receiver.max_devices + status = 'Receiver not found' + __bool__ = __nonzero__ = lambda self: False + __str__ = lambda self: 'DUMMY' +DUMMY = _DUMMY_RECEIVER() + +# +# +# + +_DEVICE_TIMEOUT = 3 * 60 # seconds +_DEVICE_STATUS_POLL = 60 # seconds + +# def fake_device(listener): +# dev = _lur.PairedDevice(listener.receiver, 6) +# dev._wpid = '1234' +# dev._kind = 'touchpad' +# dev._codename = 'T650' +# dev._name = 'Wireless Rechargeable Touchpad T650' +# dev._serial = '0123456789' +# dev._protocol = 2.0 +# dev.status = _lur.status.DeviceStatus(dev, listener._status_changed) +# return dev + +class ReceiverListener(_lur.listener.EventsListener): + """Keeps the status of a Unifying Receiver. + """ + def __init__(self, receiver, status_changed_callback=None): + super(ReceiverListener, self).__init__(receiver, self._events_handler) + self.tick_period = _DEVICE_STATUS_POLL + + self.status_changed_callback = status_changed_callback + + receiver.status = _lur.status.ReceiverStatus(receiver, self._status_changed) + _lur.Receiver.create_device = self.create_device + + def create_device(self, receiver, number): + dev = _lur.PairedDevice(receiver, number) + dev.status = _lur.status.DeviceStatus(dev, self._status_changed) + return dev + + def has_started(self): + # self._status_changed(self.receiver) + self.receiver.enable_notifications() + + for dev in self.receiver: + dev.codename, dev.kind, dev.name + # dev.status._changed(dev.protocol > 0) + + # fake = fake_device(self) + # self.receiver._devices[fake.number] = fake + # self._status_changed(fake, _lur.status.ALERT.LOW) + + self.receiver.notify_devices() + self._status_changed(self.receiver, _lur.status.ALERT.LOW) + + def has_stopped(self): + if self.receiver: + self.receiver.enable_notifications(False) + self.receiver.close() + + self.receiver = None + self._status_changed(DUMMY, _lur.status.ALERT.LOW) + + def tick(self, timestamp): + if _log.isEnabledFor(_DEBUG): + _log.debug("tick: polling status") + for dev in self.receiver: + if dev.status: + dev.serial, dev.firmware + if dev.status.get(_lur.status.BATTERY_LEVEL) is None: + battery = _lur.hidpp20.get_battery(dev) or _lur.hidpp10.get_battery(dev) + if battery: + dev.status[_lur.status.BATTERY_LEVEL], dev.status[_lur.status.BATTERY_STATUS] = battery + self._status_changed(dev) + elif len(dev.status) > 0 and timestamp - dev.status.updated > _DEVICE_TIMEOUT: + dev.status.clear() + self._status_changed(dev, _lur.status.ALERT.LOW) + + def _status_changed(self, device, alert=_lur.status.ALERT.NONE, reason=None): + assert device is not None + if _log.isEnabledFor(_DEBUG): + _log.debug("status_changed %s: %s (%X) %s", device, device.status, alert, reason or '') + if self.status_changed_callback: + if device is self.receiver: + self.status_changed_callback(self.receiver or DUMMY, None, alert, reason) + else: + self.status_changed_callback(self.receiver or DUMMY, device, alert, reason) + if device.status is None: + self.status_changed_callback(self.receiver, None) + + def _events_handler(self, event): + if event.devnumber == 0xFF: + if self.receiver.status is not None: + self.receiver.status.process_event(event) + + else: + assert event.devnumber > 0 and event.devnumber <= self.receiver.max_devices + known_device = event.devnumber in self.receiver + + dev = self.receiver[event.devnumber] + if dev: + if dev.status is not None and dev.status.process_event(event): + if self.receiver.status.lock_open and not known_device: + assert event.sub_id == 0x41 + self.receiver.pairing_result = dev + return + else: + _log.warn("received event %s for invalid device %d", event, event.devnumber) + + def __str__(self): + return '' % (self.receiver.path, self.receiver.status) + + @classmethod + def open(self, status_changed_callback=None): + receiver = _lur.Receiver.open() + if receiver: + receiver.handle = _lur.listener.ThreadedHandle(receiver.handle, receiver.path) + rl = ReceiverListener(receiver, status_changed_callback) + rl.start() + return rl diff --git a/app/pairing.py b/app/pairing.py deleted file mode 100644 index 2be37f37..00000000 --- a/app/pairing.py +++ /dev/null @@ -1,83 +0,0 @@ -# -# -# - -from logging import getLogger as _Logger -_l = _Logger('pairing') - -from logitech.unifying_receiver import base as _base - -state = None - -class State(object): - TICK = 400 - PAIR_TIMEOUT = 60 * 1000 / TICK - - def __init__(self, listener): - self.listener = listener - self.reset() - - def device(self, number): - return self.listener.devices.get(number) - - def reset(self): - self.success = None - self.detected_device = None - self._countdown = self.PAIR_TIMEOUT - - def countdown(self, assistant): - if self._countdown < 0 or not self.listener: - return False - - if self._countdown == self.PAIR_TIMEOUT: - self.start_scan() - self._countdown -= 1 - return True - - self._countdown -= 1 - if self._countdown > 0 and self.success is None: - return True - - self.stop_scan() - assistant.scan_complete(assistant, self.detected_device) - return False - - def start_scan(self): - self.reset() - self.listener.events_filter = self.filter_events - reply = _base.request(self.listener.handle, 0xFF, b'\x80\xB2', b'\x01') - _l.debug("start scan reply %s", repr(reply)) - - def stop_scan(self): - if self._countdown >= 0: - self._countdown = -1 - reply = _base.request(self.listener.handle, 0xFF, b'\x80\xB2', b'\x02') - _l.debug("stop scan reply %s", repr(reply)) - self.listener.events_filter = None - - def filter_events(self, event): - if event.devnumber == 0xFF: - if event.code == 0x10: - if event.data == b'\x4A\x01\x00\x00\x00': - _l.debug("receiver listening for device wakeup") - return True - if event.data == b'\x4A\x00\x01\x00\x00': - _l.debug("receiver gave up") - self.success = False - # self.success = True - # self.detected_device = self.listener.receiver.devices[1] - return True - return False - - if event.devnumber in self.listener.receiver.devices: - return False - - _l.debug("event for new device? %s", event) - if event.code == 0x10 and event.data[0:2] == b'\x41\x04': - self.detected_device = self.listener.make_device(event) - return True - - return True - - def unpair(self, device): - return self.listener.unpair_device(device) diff --git a/app/receiver.py b/app/receiver.py deleted file mode 100644 index 3743d691..00000000 --- a/app/receiver.py +++ /dev/null @@ -1,365 +0,0 @@ -# -# -# - -from logging import getLogger as _Logger -from struct import pack as _pack -from time import time as _timestamp - -from logitech.unifying_receiver import base as _base -from logitech.unifying_receiver import api as _api -from logitech.unifying_receiver.listener import EventsListener as _EventsListener -from logitech.unifying_receiver.common import FallbackDict as _FallbackDict -from logitech import devices as _devices -from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS) - -# -# -# - -class _FeaturesArray(object): - __slots__ = ('device', 'features', 'supported') - - def __init__(self, device): - assert device is not None - self.device = device - self.features = None - self.supported = True - - def __del__(self): - self.supported = False - self.device = None - - def _check(self): - # print ("%s check" % self.device) - if self.supported: - if self.features is not None: - return True - - if self.device.protocol < 2.0: - return False - - if self.device.status >= STATUS.CONNECTED: - handle = int(self.device.handle) - try: - index = _api.get_feature_index(handle, self.device.number, _api.FEATURE.FEATURE_SET) - except _api._FeatureNotSupported: - self.supported = False - else: - count = None if index is None else _base.request(handle, self.device.number, _pack('!BB', index, 0x00)) - if count is None: - self.supported = False - else: - count = ord(count[:1]) - self.features = [None] * (1 + count) - self.features[0] = _api.FEATURE.ROOT - self.features[index] = _api.FEATURE.FEATURE_SET - return True - - return False - - __bool__ = __nonzero__ = _check - - def __getitem__(self, index): - if not self._check(): - return None - - if index < 0 or index >= len(self.features): - raise IndexError - - if self.features[index] is None: - # print ("features getitem at %d" % index) - fs_index = self.features.index(_api.FEATURE.FEATURE_SET) - # technically fs_function is 0x10 for this call, but we add the index to differentiate possibly conflicting requests - fs_function = 0x10 | (index & 0x0F) - feature = _base.request(self.device.handle, self.device.number, _pack('!BB', fs_index, fs_function), _pack('!B', index)) - if feature is not None: - self.features[index] = feature[:2] - - return self.features[index] - - def __contains__(self, value): - if self._check(): - if value in self.features: - return True - - # print ("features contains %s" % repr(value)) - for index in range(0, len(self.features)): - f = self.features[index] or self.__getitem__(index) - assert f is not None - if f == value: - return True - # we know the features are ordered by value - if f > value: - break - - return False - - def index(self, value): - if self._check(): - if self.features is not None and value in self.features: - return self.features.index(value) - raise ValueError("%s not in list" % repr(value)) - - def __iter__(self): - if self._check(): - yield _api.FEATURE.ROOT - index = 1 - last_index = len(self.features) - while index < last_index: - yield self.__getitem__(index) - index += 1 - - def __len__(self): - return len(self.features) if self._check() else 0 - -# -# -# - -class DeviceInfo(_api.PairedDevice): - """A device attached to the receiver. - """ - def __init__(self, handle, number, status_changed_callback, status=STATUS.BOOTING): - super(DeviceInfo, self).__init__(handle, number) - self.LOG = _Logger("Device[%d]" % (number)) - - assert status_changed_callback - self.status_changed_callback = status_changed_callback - self._status = status - self.status_updated = _timestamp() - self.props = {} - - self._features = _FeaturesArray(self) - - def __del__(self): - super(ReceiverListener, self).__del__() - self._features.supported = False - self._features.device = None - - @property - def status(self): - return self._status - - @status.setter - def status(self, new_status): - if new_status < STATUS.CONNECTED: - for p in list(self.props): - if p != PROPS.BATTERY_LEVEL: - del self.props[p] - else: - self._features._check() - self.protocol, self.codename, self.name, self.kind - - self.status_updated = _timestamp() - old_status = self._status - if new_status != old_status and not (new_status == STATUS.CONNECTED and old_status > new_status): - self.LOG.debug("status %d => %d", old_status, new_status) - self._status = new_status - ui_flags = STATUS.UI_NOTIFY if new_status == STATUS.UNPAIRED else 0 - self.status_changed_callback(self, ui_flags) - - @property - def status_text(self): - if self._status < STATUS.CONNECTED: - return STATUS_NAME[self._status] - return STATUS_NAME[STATUS.CONNECTED] - - @property - def properties_text(self): - t = [] - if self.props.get(PROPS.BATTERY_LEVEL) is not None: - t.append('Battery: %d%%' % self.props[PROPS.BATTERY_LEVEL]) - if self.props.get(PROPS.BATTERY_STATUS) is not None: - t.append(self.props[PROPS.BATTERY_STATUS]) - if self.props.get(PROPS.LIGHT_LEVEL) is not None: - t.append('Light: %d lux' % self.props[PROPS.LIGHT_LEVEL]) - return ', '.join(t) - - def process_event(self, code, data): - if code == 0x10 and data[:1] == b'\x8F': - self.status = STATUS.UNAVAILABLE - return True - - if code == 0x11: - status = _devices.process_event(self, data) - if status: - if type(status) == int: - self.status = status - return True - - if type(status) == tuple: - new_status, new_props = status - ui_flags = new_props.pop(PROPS.UI_FLAGS, 0) - old_props = dict(self.props) - self.props.update(new_props) - self.status = new_status - if ui_flags or old_props != self.props: - self.status_changed_callback(self, ui_flags) - return True - - self.LOG.warn("don't know how to handle processed event status %s", status) - - return False - - def __str__(self): - return '' % (self.handle, self.number, self.codename or '?', self._status) - -# -# -# - -_RECEIVER_STATUS_NAME = _FallbackDict( - lambda x: - '1 device found' if x == STATUS.CONNECTED + 1 else - ('%d devices found' % x) if x > STATUS.CONNECTED else - '?', - { - STATUS.UNKNOWN: 'Initializing...', - STATUS.UNAVAILABLE: 'Receiver not found.', - STATUS.BOOTING: 'Scanning...', - STATUS.CONNECTED: 'No devices found.', - } - ) - -class ReceiverListener(_EventsListener): - """Keeps the status of a Unifying Receiver. - """ - def __init__(self, receiver, status_changed_callback=None): - super(ReceiverListener, self).__init__(receiver.handle, self._events_handler) - self.LOG = _Logger("Receiver[%s]" % receiver.path) - - self.receiver = receiver - self.events_filter = None - self.events_handler = None - self.status_changed_callback = status_changed_callback - - receiver.kind = receiver.name - receiver.devices = {} - receiver.status = STATUS.BOOTING - receiver.status_text = _RECEIVER_STATUS_NAME[STATUS.BOOTING] - - if _base.request(receiver.handle, 0xFF, b'\x80\x00', b'\x00\x01'): - self.LOG.info("initialized") - else: - self.LOG.warn("initialization failed") - - self.LOG.info("reports %d device(s) paired", len(receiver)) - - def __del__(self): - super(ReceiverListener, self).__del__() - self.receiver = None - - def trigger_device_events(self): - if _base.request(int(self._handle), 0xFF, b'\x80\x02', b'\x02'): - self.LOG.info("triggered device events") - return True - self.LOG.warn("failed to trigger device events") - - def change_status(self, new_status): - if new_status != self.receiver.status: - self.LOG.debug("status %d => %d", self.receiver.status, new_status) - self.receiver.status = new_status - self.receiver.status_text = _RECEIVER_STATUS_NAME[new_status] - self.status_changed(None, STATUS.UI_NOTIFY) - - def status_changed(self, device=None, ui_flags=0): - if self.status_changed_callback: - self.status_changed_callback(self.receiver, device, ui_flags) - - def _device_status_from(self, event): - state_code = ord(event.data[2:3]) & 0xC0 - state = STATUS.UNAVAILABLE if state_code == 0x40 else \ - STATUS.CONNECTED if state_code == 0x80 else \ - STATUS.CONNECTED if state_code == 0x00 else \ - None - if state is None: - self.LOG.warn("failed to identify status of device %d from 0x%02X: %s", event.devnumber, state_code, event) - return state - - def _events_handler(self, event): - if self.events_filter and self.events_filter(event): - return - - if event.code == 0x10 and event.data[0:2] == b'\x41\x04': - if event.devnumber in self.receiver.devices: - status = self._device_status_from(event) - if status is not None: - self.receiver.devices[event.devnumber].status = status - else: - self.make_device(event) - return - - if event.devnumber == 0xFF: - if event.code == 0xFF and event.data is None: - self.LOG.warn("disconnected") - self.receiver.devices = {} - self.change_status(STATUS.UNAVAILABLE) - self.receiver = None - return - elif event.devnumber in self.receiver.devices: - dev = self.receiver.devices[event.devnumber] - if dev.process_event(event.code, event.data): - return - - if self.events_handler and self.events_handler(event): - return - - # self.LOG.warn("don't know how to handle event %s", event) - - def make_device(self, event): - if event.devnumber < 1 or event.devnumber > self.receiver.max_devices: - self.LOG.warn("got event for invalid device number %d: %s", event.devnumber, event) - return None - - status = self._device_status_from(event) - if status is not None: - dev = DeviceInfo(self.handle, event.devnumber, self.status_changed, status) - self.LOG.info("new device %s", dev) - dev.status = status - self.status_changed(dev, STATUS.UI_NOTIFY) - self.receiver.devices[event.devnumber] = dev - self.change_status(STATUS.CONNECTED + len(self.receiver.devices)) - if status == STATUS.CONNECTED: - dev.serial, dev.firmware - return dev - - def unpair_device(self, device): - try: - del self.receiver[device.number] - except IndexError: - self.LOG.error("failed to unpair device %s", device) - return False - - del self.receiver.devices[device.number] - self.LOG.info("unpaired device %s", device) - self.change_status(STATUS.CONNECTED + len(self.receiver.devices)) - device.status = STATUS.UNPAIRED - return True - - def __str__(self): - return '' % (self.receiver.path, int(self.handle), self.receiver.status) - - @classmethod - def open(self, status_changed_callback=None): - receiver = _api.Receiver.open() - if receiver: - handle = receiver.handle - receiver.handle = _api.ThreadedHandle(handle, receiver.path) - rl = ReceiverListener(receiver, status_changed_callback) - rl.start() - return rl - -# -# -# - -class _DUMMY_RECEIVER(object): - __slots__ = ['name', 'max_devices', 'status', 'status_text', 'devices'] - name = kind = _api.Receiver.name - max_devices = _api.Receiver.max_devices - status = STATUS.UNAVAILABLE - status_text = _RECEIVER_STATUS_NAME[STATUS.UNAVAILABLE] - devices = {} - __bool__ = __nonzero__ = lambda self: False -DUMMY = _DUMMY_RECEIVER() diff --git a/app/solaar.py b/app/solaar.py index a9d55307..5ddd48fa 100644 --- a/app/solaar.py +++ b/app/solaar.py @@ -1,7 +1,7 @@ #!/usr/bin/env python -u NAME = 'Solaar' -VERSION = '0.7.4' +VERSION = '0.8' __author__ = "Daniel Pavel " __version__ = VERSION __license__ = "GPL" @@ -10,6 +10,14 @@ __license__ = "GPL" # # +def _require(module, os_package): + try: + __import__(module) + except ImportError: + import sys + sys.exit("%s: missing required package '%s'" % (NAME, os_package)) + + def _parse_arguments(): import argparse arg_parser = argparse.ArgumentParser(prog=NAME.lower()) @@ -44,31 +52,20 @@ def _parse_arguments(): return args -def _require(module, package): - try: - __import__(module) - except ImportError: - import sys - sys.exit("%s: missing required package '%s'" % (NAME, package)) - - -if __name__ == '__main__': - _require('pyudev', 'python-pyudev') - _require('gi.repository', 'python-gi') - _require('gi.repository.Gtk', 'gir1.2-gtk-3.0') - - args = _parse_arguments() - +def _run(args): import ui - # check if the notifications are available and enabled + # even if --no-notifications is given on the command line, still have to + # check they are available args.notifications &= args.systray - if ui.notify.available and ui.notify.init(NAME): + if ui.notify.init(NAME): ui.action.toggle_notifications.set_active(args.notifications) + if not args.notifications: + ui.notify.uninit() else: ui.action.toggle_notifications = None - from receiver import DUMMY + from listener import DUMMY window = ui.main_window.create(NAME, DUMMY.name, DUMMY.max_devices, args.systray) if args.systray: menu_actions = (ui.action.toggle_notifications, @@ -78,84 +75,62 @@ if __name__ == '__main__': icon = None window.present() - import pairing - from logitech.devices.constants import STATUS from gi.repository import Gtk, GObject - listener = None - notify_missing = True - # initializes the receiver listener - from receiver import ReceiverListener - def check_for_listener(retry=True): - def _check_still_scanning(listener): - if listener.receiver.status == STATUS.BOOTING: - listener.change_status(STATUS.CONNECTED) + def check_for_listener(notify=False): + # print ("check_for_listener %s" % notify) + global listener + listener = None + + from listener import ReceiverListener + try: + listener = ReceiverListener.open(status_changed) + except OSError: + ui.error(window, 'Permissions error', + 'Found a possible Unifying Receiver device,\n' + 'but did not have permission to open it.') - global listener, notify_missing if listener is None: - try: - listener = ReceiverListener.open(status_changed) - except OSError: - ui.error(window, 'Permissions error', - 'Found a possible Unifying Receiver device,\n' - 'but did not have permission to open it.') + if notify: + status_changed(DUMMY) + else: + return True - if listener is None: - pairing.state = None - if notify_missing: - status_changed(DUMMY, None, STATUS.UI_NOTIFY) - notify_missing = False - return retry - - # print ("opened receiver", listener, listener.receiver) - notify_missing = True - status_changed(listener.receiver, None, STATUS.UI_NOTIFY) - GObject.timeout_add(3 * 1000, _check_still_scanning, listener) - pairing.state = pairing.State(listener) - listener.trigger_device_events() + from logitech.unifying_receiver import status # callback delivering status events from the receiver/devices to the UI - def status_changed(receiver, device=None, ui_flags=0): - assert receiver is not None + def status_changed(receiver, device=None, alert=status.ALERT.NONE, reason=None): if window: GObject.idle_add(ui.main_window.update, window, receiver, device) if icon: - GObject.idle_add(ui.status_icon.update, icon, receiver) - if ui_flags & STATUS.UI_POPUP: + GObject.idle_add(ui.status_icon.update, icon, receiver, device) + if alert & status.ALERT.MED: GObject.idle_add(window.popup, icon) - if device is None: + if ui.notify.available: # always notify on receiver updates - ui_flags |= STATUS.UI_NOTIFY - if ui_flags & STATUS.UI_NOTIFY and ui.notify.available: - GObject.idle_add(ui.notify.show, device or receiver) + if device is None or alert & status.ALERT.LOW: + GObject.idle_add(ui.notify.show, device or receiver, reason) - global listener - if not listener: - GObject.timeout_add(5000, check_for_listener) - listener = None + if receiver is DUMMY: + GObject.timeout_add(3000, check_for_listener) - # clears all properties of devices that have been inactive for too long - _DEVICE_TIMEOUT = 3 * 60 # seconds - _DEVICE_STATUS_CHECK = 30 # seconds - from time import time as _timestamp - - def check_for_inactive_devices(): - if listener and listener.receiver: - for dev in listener.receiver.devices.values(): - if (dev.status < STATUS.CONNECTED and - dev.props and - _timestamp() - dev.status_updated > _DEVICE_TIMEOUT): - dev.props.clear() - status_changed(listener.receiver, dev) - return True - - GObject.timeout_add(50, check_for_listener, False) - GObject.timeout_add(_DEVICE_STATUS_CHECK * 1000, check_for_inactive_devices) + GObject.timeout_add(0, check_for_listener, True) Gtk.main() - if listener is not None: + if listener: listener.stop() + listener.join() ui.notify.uninit() + + +if __name__ == '__main__': + _require('pyudev', 'python-pyudev') + _require('gi.repository', 'python-gi') + _require('gi.repository.Gtk', 'gir1.2-gtk-3.0') + + args = _parse_arguments() + listener = None + _run(args) diff --git a/app/ui/action.py b/app/ui/action.py index 72c0b508..87d58e79 100644 --- a/app/ui/action.py +++ b/app/ui/action.py @@ -39,17 +39,25 @@ toggle_notifications = _toggle_action('notifications', 'Notifications', _toggle_ def _show_about_window(action): about = Gtk.AboutDialog() + about.set_icon_name(_NAME) about.set_program_name(_NAME) about.set_logo_icon_name(_NAME) about.set_version(_VERSION) + about.set_comments('Shows status of devices connected\nto a Logitech Unifying Receiver.') + about.set_license_type(Gtk.License.GPL_2_0) about.set_copyright('\xC2\xA9 2012 Daniel Pavel') + about.set_authors(('Daniel Pavel http://github.com/pwr',)) - # about.add_credit_section('Testing', 'Douglas Wagner') + try: + about.add_credit_section('Testing', ('Douglas Wagner',)) + except Exception as e: + print e + about.set_website('http://github.com/pwr/Solaar/wiki') about.set_website_label('Solaar Wiki') - about.set_comments('Shows status of devices connected\nto a Logitech Unifying Receiver.') + about.run() about.destroy() about = _action('help-about', 'About ' + _NAME, _show_about_window) @@ -60,16 +68,14 @@ quit = _action('exit', 'Quit', Gtk.main_quit) # # -import pairing - def _pair_device(action, frame): window = frame.get_toplevel() - pair_dialog = ui.pair_window.create(action, pairing.state) - + pair_dialog = ui.pair_window.create(action, frame._device) pair_dialog.set_transient_for(window) pair_dialog.set_modal(True) pair_dialog.set_type_hint(Gdk.WindowTypeHint.DIALOG) + pair_dialog.set_position(Gtk.WindowPosition.CENTER) pair_dialog.present() def pair(frame): @@ -89,7 +95,9 @@ def _unpair_device(action, frame): choice = qdialog.run() qdialog.destroy() if choice == Gtk.ResponseType.ACCEPT: - if not pairing.state.unpair(device): + try: + del device.receiver[device.number] + except: ui.error(window, 'Unpairing failed', 'Failed to unpair device\n%s .' % device.name) def unpair(frame): diff --git a/app/ui/main_window.py b/app/ui/main_window.py index c74c92b0..b9fb17b1 100644 --- a/app/ui/main_window.py +++ b/app/ui/main_window.py @@ -5,10 +5,10 @@ from gi.repository import (Gtk, Gdk, GObject) import ui -from logitech.devices.constants import (STATUS, PROPS) +from logitech.unifying_receiver import status as _status -_SMALL_DEVICE_ICON_SIZE = Gtk.IconSize.BUTTON +_RECEIVER_ICON_SIZE = Gtk.IconSize.BUTTON _DEVICE_ICON_SIZE = Gtk.IconSize.DIALOG _STATUS_ICON_SIZE = Gtk.IconSize.LARGE_TOOLBAR _PLACEHOLDER = '~' @@ -23,12 +23,17 @@ def _make_receiver_box(name): frame.set_name(name) icon_name = ui.get_icon(name, 'preferences-desktop-peripherals') - icon = Gtk.Image.new_from_icon_name(icon_name, _SMALL_DEVICE_ICON_SIZE) + icon = Gtk.Image.new_from_icon_name(icon_name, _RECEIVER_ICON_SIZE) + icon.set_padding(2, 2) label = Gtk.Label('Scanning...') label.set_name('label') label.set_alignment(0, 0.5) + pairing_icon = Gtk.Image.new_from_icon_name('network-wireless', Gtk.IconSize.MENU) + pairing_icon.set_name('pairing-icon') + pairing_icon.set_tooltip_text('The pairing lock is open.') + toolbar = Gtk.Toolbar() toolbar.set_name('toolbar') toolbar.set_style(Gtk.ToolbarStyle.ICONS) @@ -38,9 +43,10 @@ def _make_receiver_box(name): hbox = Gtk.HBox(homogeneous=False, spacing=8) hbox.pack_start(icon, False, False, 0) hbox.pack_start(label, True, True, 0) - hbox.pack_end(toolbar, False, False, 0) + hbox.pack_start(pairing_icon, False, False, 0) + hbox.pack_start(toolbar, False, False, 0) - info_label = Gtk.Label() + info_label = Gtk.Label('Querying ...') info_label.set_name('info-label') info_label.set_alignment(0, 0.5) info_label.set_padding(8, 2) @@ -62,7 +68,9 @@ def _make_receiver_box(name): frame.add(vbox) frame.show_all() + info_box.set_visible(False) + pairing_icon.set_visible(False) return frame @@ -93,11 +101,16 @@ def _make_device_box(index): light_label.set_alignment(0, 0.5) light_label.set_width_chars(8) + not_encrypted_icon = Gtk.Image.new_from_icon_name('security-low', _STATUS_ICON_SIZE) + not_encrypted_icon.set_name('not-encrypted') + not_encrypted_icon.set_tooltip_text('The link is not encrypted!') + toolbar = Gtk.Toolbar() toolbar.set_name('toolbar') toolbar.set_style(Gtk.ToolbarStyle.ICONS) toolbar.set_icon_size(Gtk.IconSize.MENU) toolbar.set_show_arrow(False) + toolbar.set_border_width(0) status_box = Gtk.HBox(homogeneous=False, spacing=0) status_box.set_name('status') @@ -106,13 +119,14 @@ def _make_device_box(index): status_box.pack_start(light_icon, False, True, 0) status_box.pack_start(light_label, False, True, 0) status_box.pack_end(toolbar, False, False, 0) + status_box.pack_end(not_encrypted_icon, False, False, 0) - info_label = Gtk.Label() + info_label = Gtk.Label('Querying ...') info_label.set_name('info-label') info_label.set_alignment(0, 0.5) info_label.set_padding(8, 2) info_label.set_selectable(True) - info_label.fields = {} + info_label._fields = {} info_box = Gtk.Frame() info_box.add(info_label) @@ -197,103 +211,112 @@ def create(title, name, max_devices, systray=False): def _update_device_info_label(label, dev): need_update = False - if 'serial' in label.fields: - serial = label.fields['serial'] + if 'wpid' in label._fields: + wpid = label._fields['wpid'] else: - serial = label.fields['serial'] = dev.serial + wpid = label._fields['wpid'] = dev.wpid need_update = True - if 'firmware' in label.fields: - firmware = label.fields['firmware'] + if 'serial' in label._fields: + serial = label._fields['serial'] else: - if dev.status >= STATUS.CONNECTED: - firmware = label.fields['firmware'] = dev.firmware + serial = label._fields['serial'] = dev.serial + need_update = True + + if 'firmware' in label._fields: + firmware = label._fields['firmware'] + else: + if dev.status: + firmware = label._fields['firmware'] = dev.firmware need_update = True else: firmware = None - if 'hid' in label.fields: - hid = label.fields['hid'] + if 'hid' in label._fields: + hid = label._fields['hid'] else: - if dev.status >= STATUS.CONNECTED: - hid = label.fields['hid'] = dev.protocol + if dev.status: + hid = label._fields['hid'] = 'HID++ %1.1f' % dev.protocol need_update = True else: hid = None if need_update: - items = [('Serial', serial)] + items = [('Wireless PID', wpid), ('Serial', serial)] + if hid: + items += [('Protocol', hid)] if firmware: items += [(f.kind, f.name + ' ' + f.version) for f in firmware] - if hid: - items += [('HID', hid)] - label.set_markup('%s' % '\n'.join('%-10s: %s' % (item[0], str(item[1])) for item in items)) + label.set_markup('%s' % '\n'.join('%-12s: %s' % (item[0], str(item[1])) for item in items)) def _update_receiver_info_label(label, dev): - if label.get_visible() and label.get_text() == '': + if label.get_visible() and '\n' not in label.get_text(): items = [('Serial', dev.serial)] + \ [(f.kind, f.version) for f in dev.firmware] label.set_markup('%s' % '\n'.join('%-10s: %s' % (item[0], str(item[1])) for item in items)) + def _toggle_info_box(action, label_widget, box_widget, frame, update_function): if action.get_active(): box_widget.set_visible(True) - update_function(label_widget, frame._device) + GObject.timeout_add(50, update_function, label_widget, frame._device) else: box_widget.set_visible(False) def _update_receiver_box(frame, receiver): - label, toolbar, info_label = ui.find_children(frame, 'label', 'toolbar', 'info-label') + label, pairing_icon, toolbar, info_label = ui.find_children(frame, 'label', 'pairing-icon', 'toolbar', 'info-label') - label.set_text(receiver.status_text or '') - if receiver.status < STATUS.CONNECTED: - toolbar.set_sensitive(False) + if receiver.status is None: + frame._device = None + label.set_text('No receiver found.') + pairing_icon.set_visible(False) + toolbar.set_visible(False) toolbar.get_children()[0].set_active(False) info_label.set_text('') - frame._device = None else: - toolbar.set_sensitive(True) frame._device = receiver + label.set_text(str(receiver.status)) + pairing_icon.set_visible(receiver.status.lock_open) + toolbar.set_visible(True) def _update_device_box(frame, dev): - frame._device = dev # print (dev.name, dev.kind) icon, label, info_label = ui.find_children(frame, 'icon', 'label', 'info-label') first_run = frame.get_name() != dev.name if first_run: + frame._device = dev frame.set_name(dev.name) icon_name = ui.get_icon(dev.name, dev.kind) icon.set_from_icon_name(icon_name, _DEVICE_ICON_SIZE) label.set_markup('' + dev.name + '') - status = ui.find_children(frame, 'status') - status_icons = status.get_children() + status_icons = ui.find_children(frame, 'status').get_children() + battery_icon, battery_label, light_icon, light_label, not_encrypted_icon = status_icons[0:5] - if dev.status < STATUS.CONNECTED: + battery_level = dev.status.get(_status.BATTERY_LEVEL) + + if not dev.status: label.set_sensitive(False) - for c in status_icons[2:-1]: - c.set_visible(False) - battery_icon, battery_label = status_icons[0:2] battery_icon.set_sensitive(False) battery_label.set_sensitive(False) - battery_level = dev.props.get(PROPS.BATTERY_LEVEL) if battery_level is None: - battery_label.set_markup('%s' % dev.status_text) + battery_label.set_markup('inactive') else: battery_label.set_markup('%d%%' % battery_level) + for c in status_icons[2:-1]: + c.set_visible(False) + else: label.set_sensitive(True) - battery_icon, battery_label = status_icons[0:2] - battery_level = dev.props.get(PROPS.BATTERY_LEVEL) if battery_level is None: battery_icon.set_sensitive(False) battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE) @@ -306,11 +329,10 @@ def _update_device_box(frame, dev): battery_label.set_text('%d%%' % battery_level) battery_label.set_sensitive(True) - battery_status = dev.props.get(PROPS.BATTERY_STATUS) + battery_status = dev.status.get(_status.BATTERY_STATUS) battery_icon.set_tooltip_text(battery_status or '') - light_icon, light_label = status_icons[2:4] - light_level = dev.props.get(PROPS.LIGHT_LEVEL) + light_level = dev.status.get(_status.LIGHT_LEVEL) if light_level is None: light_icon.set_visible(False) light_label.set_visible(False) @@ -321,13 +343,16 @@ def _update_device_box(frame, dev): light_label.set_text('%d lux' % light_level) light_label.set_visible(True) + not_encrypted_icon.set_visible(dev.status.get(_status.ENCRYPTED) == False) + if first_run: frame.set_visible(True) - GObject.timeout_add(2000, _update_device_info_label, info_label, dev) + GObject.timeout_add(5000, _update_device_info_label, info_label, dev) def update(window, receiver, device=None): # print ("update", receiver, receiver.status, device) + assert receiver is not None window.set_icon_name(ui.appicon(receiver.status)) vbox = window.get_child() @@ -335,14 +360,14 @@ def update(window, receiver, device=None): if device is None: _update_receiver_box(frames[0], receiver) - if receiver.status < STATUS.CONNECTED: + if not receiver.status: for frame in frames[1:]: frame.set_visible(False) frame.set_name(_PLACEHOLDER) frame._device = None else: frame = frames[device.number] - if device.status == STATUS.UNPAIRED: + if device.status is None: frame.set_visible(False) frame.set_name(_PLACEHOLDER) frame._device = None diff --git a/app/ui/notify.py b/app/ui/notify.py index b6a479a3..31064fe0 100644 --- a/app/ui/notify.py +++ b/app/ui/notify.py @@ -9,7 +9,6 @@ try: from gi.repository import Notify import ui - from logitech.devices.constants import STATUS # necessary because the notifications daemon does not know about our XDG_DATA_DIRS _icons = {} @@ -47,7 +46,7 @@ try: Notify.uninit() - def show(dev): + def show(dev, reason=None): """Show a notification with title and text.""" if available and Notify.is_initted(): summary = dev.name @@ -57,8 +56,10 @@ try: if n is None: n = _notifications[summary] = Notify.Notification() - n.update(summary, dev.status_text, _icon(summary) or dev.kind) - urgency = Notify.Urgency.LOW if dev.status > STATUS.CONNECTED else Notify.Urgency.NORMAL + message = reason or ('unpaired' if dev.status is None else + (str(dev.status) or ('connected' if dev.status else 'inactive'))) + n.update(summary, message, _icon(summary) or dev.kind) + urgency = Notify.Urgency.LOW if dev.status else Notify.Urgency.NORMAL n.set_urgency(urgency) try: @@ -68,8 +69,7 @@ try: logging.exception("showing %s", n) except ImportError: - logging.warn("desktop notifications disabled") available = False init = lambda app_title: False uninit = lambda: None - show = lambda dev: None + show = lambda dev, reason: None diff --git a/app/ui/pair_window.py b/app/ui/pair_window.py index 03758755..af62003c 100644 --- a/app/ui/pair_window.py +++ b/app/ui/pair_window.py @@ -2,133 +2,185 @@ # # -# import logging +import logging from gi.repository import (Gtk, GObject) import ui -def _create_page(assistant, text, kind, icon_name=None): - p = Gtk.VBox(False, 12) - p.set_border_width(8) +_PAIRING_TIMEOUT = 30 - if text: - item = Gtk.HBox(homogeneous=False, spacing=16) + +def _create_page(assistant, kind, header=None, icon_name=None, text=None): + p = Gtk.VBox(False, 8) + assistant.append_page(p) + assistant.set_page_type(p, kind) + + if header: + item = Gtk.HBox(False, 16) p.pack_start(item, False, True, 0) - label = Gtk.Label(text) + label = Gtk.Label(header) label.set_alignment(0, 0) + label.set_line_wrap(True) item.pack_start(label, True, True, 0) if icon_name: icon = Gtk.Image.new_from_icon_name(icon_name, Gtk.IconSize.DIALOG) + icon.set_alignment(1, 0) item.pack_start(icon, False, False, 0) - assistant.append_page(p) - assistant.set_page_type(p, kind) + if text: + label = Gtk.Label(text) + label.set_alignment(0, 0) + label.set_line_wrap(True) + p.pack_start(label, False, False, 0) p.show_all() return p -def _device_confirmed(entry, _2, trigger, assistant, page): - assistant.commit() - assistant.set_page_complete(page, True) - return True +def _fake_device(receiver): + from logitech.unifying_receiver import PairedDevice + dev = PairedDevice(receiver, 6) + dev._kind = 'touchpad' + dev._codename = 'T650' + dev._name = 'Wireless Rechargeable Touchpad T650' + dev._serial = '0123456789' + dev._protocol = 2.0 + dev.status = {'encrypted': False} + return dev -def _finish(assistant): - # logging.debug("finish %s", assistant) - assistant.destroy() +def _check_lock_state(assistant, receiver): + if not assistant.is_drawable(): + return False -def _cancel(assistant, state): - # logging.debug("cancel %s", assistant) - state.stop_scan() - _finish(assistant) + if receiver.pairing_result: + receiver.pairing_result = _fake_device(receiver) + if type(receiver.pairing_result) == str: + _pairing_failed(assistant, receiver, receiver.pairing_result) + else: + assert hasattr(receiver.pairing_result, 'number') + _pairing_succeeded(assistant, receiver, receiver.pairing_result) + return False -def _prepare(assistant, page, state): + return receiver.status.lock_open + + +def _prepare(assistant, page, receiver): index = assistant.get_current_page() # logging.debug("prepare %s %d %s", assistant, index, page) if index == 0: - state.reset() - GObject.timeout_add(state.TICK, state.countdown, assistant) - spinner = page.get_children()[-1] - spinner.start() - return - - assistant.remove_page(0) - state.stop_scan() - - -def _scan_complete_ui(assistant, device): - if device is None: - page = _create_page(assistant, - 'No new device detected.\n' - '\n' - 'Make sure your device is within the\nreceiver\'s range, and it has\na decent battery charge.\n', - Gtk.AssistantPageType.CONFIRM, - 'dialog-error') + receiver.pairing_result = None + if receiver.set_lock(False, timeout=_PAIRING_TIMEOUT): + spinner = page.get_children()[-1] + spinner.start() + GObject.timeout_add(200, _check_lock_state, assistant, receiver) + assistant.set_page_complete(page, True) + else: + GObject.idle_add(_pairing_failed, assistant, receiver, 'the pairing lock did not open') else: - page = _create_page(assistant, - None, - Gtk.AssistantPageType.CONFIRM) + assistant.remove_page(0) - hbox = Gtk.HBox(False, 16) - device_icon = Gtk.Image() - device_icon.set_from_icon_name(ui.get_icon(device.name, device.kind), Gtk.IconSize.DIALOG) - hbox.pack_start(device_icon, False, False, 0) - device_label = Gtk.Label(device.kind + '\n' + device.name) - hbox.pack_start(device_label, False, False, 0) - halign = Gtk.Alignment.new(0.5, 0.5, 0, 1) - halign.add(hbox) - page.pack_start(halign, False, True, 0) - hbox = Gtk.HBox(False, 16) - hbox.pack_start(Gtk.Entry(), False, False, 0) - hbox.pack_start(Gtk.ToggleButton('Test'), False, False, 0) - halign = Gtk.Alignment.new(0.5, 0.5, 0, 1) +def _finish(assistant, receiver): + logging.debug("finish %s", assistant) + assistant.destroy() + if receiver.status.lock_open: + receiver.set_lock() + + +def _cancel(assistant, receiver): + logging.debug("cancel %s", assistant) + assistant.destroy() + device, receiver.pairing_result = receiver.pairing_result, None + if device: + assert type(device) != str + try: + del receiver[device.number] + except: + logging.error("failed to unpair %s", device) + if receiver.status.lock_open: + receiver.set_lock() + + +def _pairing_failed(assistant, receiver, error): + receiver.pairing_result = None + assistant.commit() + + header = 'Pairing failed: %s.' % error + if 'timeout' in error: + text = 'Make sure your device is within range,\nand it has a decent battery charge.' + else: + text = None + _create_page(assistant, Gtk.AssistantPageType.SUMMARY, header, 'dialog-error', text) + + assistant.next_page() + assistant.commit() + + +def _pairing_succeeded(assistant, receiver, device): + page = _create_page(assistant, Gtk.AssistantPageType.CONFIRM) + + device_icon = Gtk.Image() + device_icon.set_from_icon_name(ui.get_icon(device.name, device.kind), Gtk.IconSize.DIALOG) + device_icon.set_pixel_size(128) + device_icon.set_alignment(0.5, 1) + page.pack_start(device_icon, False, False, 0) + + device_label = Gtk.Label() + device_label.set_markup('' + device.name + '') + device_label.set_alignment(0.5, 0) + page.pack_start(device_label, False, False, 0) + + if device.status.get('encrypted') == False: + hbox = Gtk.HBox(False, 8) + hbox.pack_start(Gtk.Image.new_from_icon_name('dialog-warning', Gtk.IconSize.MENU), False, False, 0) + hbox.pack_start(Gtk.Label('The wireless link is not encrypted!'), False, False, 0) + halign = Gtk.Alignment.new(0.5, 0, 0, 0) halign.add(hbox) page.pack_start(halign, False, False, 0) - entry_info = Gtk.Label('Use the controls above to confirm\n' - 'this is the device you want to pair.') - entry_info.set_sensitive(False) - page.pack_start(entry_info, False, False, 0) + # hbox = Gtk.HBox(False, 8) + # hbox.pack_start(Gtk.Entry(), False, False, 0) + # hbox.pack_start(Gtk.ToggleButton(' Test '), False, False, 0) + # halign = Gtk.Alignment.new(0.5, 1, 0, 0) + # halign.add(hbox) + # page.pack_start(halign, True, True, 0) - page.show_all() - assistant.set_page_complete(page, True) + # entry_info = Gtk.Label() + # entry_info.set_markup('Use the controls above to confirm\n' + # 'this is the device you want to pair.') + # entry_info.set_sensitive(False) + # entry_info.set_alignment(0.5, 0) + # page.pack_start(entry_info, True, True, 0) + + page.show_all() assistant.next_page() - -def _scan_complete(assistant, device): - GObject.idle_add(_scan_complete_ui, assistant, device) + assistant.set_page_complete(page, True) -def create(action, state): +def create(action, receiver): assistant = Gtk.Assistant() assistant.set_title(action.get_label()) assistant.set_icon_name(action.get_icon_name()) - assistant.set_size_request(440, 240) + assistant.set_size_request(420, 260) assistant.set_resizable(False) assistant.set_role('pair-device') - page_intro = _create_page(assistant, - 'Turn on the device you want to pair.\n' - '\n' - 'If the device is already turned on,\nturn if off and on again.', - Gtk.AssistantPageType.INTRO, - 'preferences-desktop-peripherals') + page_intro = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, + 'Turn on the device you want to pair.', 'preferences-desktop-peripherals', + 'If the device is already turned on,\nturn if off and on again.') spinner = Gtk.Spinner() spinner.set_visible(True) - page_intro.pack_end(spinner, True, True, 16) + page_intro.pack_end(spinner, True, True, 24) - assistant.scan_complete = _scan_complete - - assistant.connect('prepare', _prepare, state) - assistant.connect('cancel', _cancel, state) - assistant.connect('close', _finish) - assistant.connect('apply', _finish) + assistant.connect('prepare', _prepare, receiver) + assistant.connect('cancel', _cancel, receiver) + assistant.connect('close', _finish, receiver) return assistant diff --git a/app/ui/status_icon.py b/app/ui/status_icon.py index 814eeb17..35710d04 100644 --- a/app/ui/status_icon.py +++ b/app/ui/status_icon.py @@ -3,8 +3,9 @@ # from gi.repository import Gtk + import ui -from logitech.devices.constants import (STATUS, PROPS) +from logitech.unifying_receiver import status as _status def create(window, menu_actions=None): @@ -31,34 +32,34 @@ def create(window, menu_actions=None): return icon -def update(icon, receiver): +def update(icon, receiver, device=None): + # print "icon update", receiver, receiver._devices, device battery_level = None - lines = [ui.NAME + ': ' + receiver.status_text, ''] - - if receiver.status > STATUS.CONNECTED: - devlist = sorted(receiver.devices.values(), key=lambda x: x.number) - for dev in devlist: + lines = [ui.NAME + ': ' + str(receiver.status), ''] + if receiver and receiver._devices: + for dev in receiver: lines.append('' + dev.name + '') - p = dev.properties_text + assert dev.status is not None + p = str(dev.status) if p: - p = '\t' + p - if dev.status < STATUS.CONNECTED: - p += ' (' + dev.status_text + ')' - lines.append(p) - elif dev.status < STATUS.CONNECTED: - lines.append('\t(' + dev.status_text + ')') - elif dev.protocol < 2.0: - lines.append('\t' + 'no status') + if not dev.status: + p += ' (inactive)' else: - lines.append('\t' + 'waiting for status...') + if dev.status: + if dev.protocol < 2.0: + p = 'no status' + else: + p = 'waiting for status...' + else: + p = '(inactive)' + lines.append('\t' + p) lines.append('') if battery_level is None: - if PROPS.BATTERY_LEVEL in dev.props: - battery_level = dev.props[PROPS.BATTERY_LEVEL] + battery_level = dev.status.get(_status.BATTERY_LEVEL) icon.set_tooltip_markup('\n'.join(lines).rstrip('\n')) diff --git a/bin/solaar b/bin/solaar index 38ba67fb..597768ee 100755 --- a/bin/solaar +++ b/bin/solaar @@ -6,7 +6,7 @@ LIB=`readlink -f $(dirname "$Z")/../lib` SHARE=`readlink -f $(dirname "$Z")/../share` export PYTHONPATH=$APP:$LIB -export XDG_DATA_DIRS=$SHARE:$XDG_DATA_DIRS +export XDG_DATA_DIRS=${SHARE}_override:$SHARE:$XDG_DATA_DIRS PYTHON=`which python python2 python3 | head -n 1` exec $PYTHON -u -m solaar "$@" diff --git a/lib/hidapi/udev.py b/lib/hidapi/udev.py index dea436b7..600a190b 100644 --- a/lib/hidapi/udev.py +++ b/lib/hidapi/udev.py @@ -32,6 +32,7 @@ DeviceInfo = namedtuple('DeviceInfo', [ ]) del namedtuple + # # exposed API # docstrings mostly copied from hidapi.h diff --git a/lib/logitech/devices/__init__.py b/lib/logitech/devices/__init__.py deleted file mode 100644 index 5cc7d0ce..00000000 --- a/lib/logitech/devices/__init__.py +++ /dev/null @@ -1,107 +0,0 @@ -# -# -# - -import logging - -from .constants import (STATUS, PROPS) -from ..unifying_receiver.constants import (FEATURE, BATTERY_STATUS, BATTERY_OK) -from ..unifying_receiver import api as _api - -# -# -# - -_DEVICE_MODULES = {} - -def _module(device): - shortname = device.codename.lower().replace(' ', '_') - if shortname not in _DEVICE_MODULES: - try: - m = __import__(shortname, globals(), level=1) - _DEVICE_MODULES[shortname] = m - except: - # logging.exception(shortname) - _DEVICE_MODULES[shortname] = None - - return _DEVICE_MODULES[shortname] - -# -# -# - -def default_request_status(devinfo): - if FEATURE.BATTERY in devinfo.features: - reply = _api.get_device_battery_level(devinfo.handle, devinfo.number, features=devinfo.features) - if reply: - b_discharge, dischargeNext, b_status = reply - return STATUS.CONNECTED, { - PROPS.BATTERY_LEVEL: b_discharge, - PROPS.BATTERY_STATUS: b_status, - } - - reply = _api.ping(devinfo.handle, devinfo.number) - return STATUS.CONNECTED if reply else STATUS.UNAVAILABLE - - -def default_process_event(devinfo, data): - feature_index = ord(data[0:1]) - if feature_index >= len(devinfo.features): - # logging.warn("mistery event %s for %s", repr(data), devinfo) - return None - - feature = devinfo.features[feature_index] - feature_function = ord(data[1:2]) & 0xF0 - - if feature == FEATURE.BATTERY: - if feature_function == 0x00: - b_discharge = ord(data[2:3]) - b_status = ord(data[3:4]) - return STATUS.CONNECTED, { - PROPS.BATTERY_LEVEL: b_discharge, - PROPS.BATTERY_STATUS: BATTERY_STATUS[b_status], - PROPS.UI_FLAGS: 0 if BATTERY_OK(b_status) else STATUS.UI_NOTIFY, - } - # ? - elif feature == FEATURE.REPROGRAMMABLE_KEYS: - if feature_function == 0x00: - logging.debug('reprogrammable key: %s', repr(data)) - # TODO - pass - # ? - elif feature == FEATURE.WIRELESS: - if feature_function == 0x00: - logging.debug("wireless status: %s", repr(data)) - if data[2:5] == b'\x01\x01\x01': - return STATUS.CONNECTED, {PROPS.UI_FLAGS: STATUS.UI_NOTIFY} - # TODO - pass - # ? - - -def request_status(devinfo): - """Trigger a status request for a device. - - :param devinfo: the device info tuple. - :param listener: the EventsListener that will be used to send the request, - and which will receive the status events from the device. - """ - m = _module(devinfo) - if m and 'request_status' in m.__dict__: - return m.request_status(devinfo) - return default_request_status(devinfo) - - -def process_event(devinfo, data): - """Process an event received for a device. - - :param devinfo: the device info tuple. - :param data: the event data (event packet sans the first two bytes: reply code and device number) - """ - default_result = default_process_event(devinfo, data) - if default_result is not None: - return default_result - - m = _module(devinfo) - if m and 'process_event' in m.__dict__: - return m.process_event(devinfo, data) diff --git a/lib/logitech/devices/constants.py b/lib/logitech/devices/constants.py deleted file mode 100644 index e7725f1e..00000000 --- a/lib/logitech/devices/constants.py +++ /dev/null @@ -1,58 +0,0 @@ -# -# -# - -STATUS = type('STATUS', (), - dict( - UI_NOTIFY=0x01, - UI_POPUP=0x02, - UNKNOWN=-0xFFFF, - UNPAIRED=-0x1000, - UNAVAILABLE=-1, - BOOTING=0, - CONNECTED=1, - )) - -STATUS_NAME = { - STATUS.UNKNOWN: '...', - STATUS.UNPAIRED: 'unpaired', - STATUS.UNAVAILABLE: 'inactive', - STATUS.BOOTING: 'initializing', - STATUS.CONNECTED: 'connected', - } - - -# device properties that may be reported -PROPS = type('PROPS', (), - dict( - BATTERY_LEVEL='battery_level', - BATTERY_STATUS='battery_status', - LIGHT_LEVEL='light_level', - UI_FLAGS='ui_flags', - )) - -# when the receiver reports a device that is not connected -# (and thus cannot be queried), guess the name and type -# based on this table -NAMES = { - 'M315': ('Wireless Mouse M315', 'mouse'), - 'M325': ('Wireless Mouse M325', 'mouse'), - 'M505': ('Wireless Mouse M505', 'mouse'), - 'M510': ('Wireless Mouse M510', 'mouse'), - 'M515': ('Couch Mouse M515', 'mouse'), - 'M525': ('Wireless Mouse M525', 'mouse'), - 'M570': ('Wireless Trackball M570', 'trackball'), - 'M600': ('Touch Mouse M600', 'mouse'), - 'M705': ('Marathon Mouse M705', 'mouse'), - 'K270': ('Wireless Keyboard K270', 'keyboard'), - 'K350': ('Wireless Keyboard K350', 'keyboard'), - 'K360': ('Wireless Keyboard K360', 'keyboard'), - 'K400': ('Wireless Touch Keyboard K400', 'keyboard'), - 'K750': ('Wireless Solar Keyboard K750', 'keyboard'), - 'K800': ('Wireless Illuminated Keyboard K800', 'keyboard'), - 'T400': ('Zone Touch Mouse T400', 'mouse'), - 'T650': ('Wireless Rechargeable Touchpad T650', 'touchpad'), - 'Cube': ('Logitech Cube', 'mouse'), - 'Anywhere MX': ('Anywhere Mouse MX', 'mouse'), - 'Performance MX': ('Performance Mouse MX', 'mouse'), - } diff --git a/lib/logitech/devices/k750.py b/lib/logitech/devices/k750.py deleted file mode 100644 index 81c139fa..00000000 --- a/lib/logitech/devices/k750.py +++ /dev/null @@ -1,55 +0,0 @@ -# -# Functions specific to the K750 solar keyboard. -# - -import logging -from struct import unpack as _unpack - -from .constants import (STATUS, PROPS) - -# -# -# - -_CHARGE_LEVELS = (10, 25, 256) -def _charge_status(data, hasLux=False): - charge, lux = _unpack('!BH', data[2:5]) - - for i in range(0, len(_CHARGE_LEVELS)): - if charge < _CHARGE_LEVELS[i]: - charge_index = i - break - - return 0x10 << charge_index, { - PROPS.BATTERY_LEVEL: charge, - PROPS.LIGHT_LEVEL: lux if hasLux else None, - } - - -def request_status(devinfo): - from ..unifying_receiver.constants import FEATURE - from ..unifying_receiver import api as _api - reply = _api.request(devinfo.handle, devinfo.number, - feature=FEATURE.SOLAR_CHARGE, function=b'\x06', params=b'\x78\x01', - features=devinfo.features) - if reply is None: - return STATUS.UNAVAILABLE - - -def process_event(devinfo, data): - if data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': - # usually sent after the keyboard is turned on or just connected - return _charge_status(data) - - if data[:2] == b'\x09\x10' and data[7:11] == b'GOOD': - # regular solar charge events - return _charge_status(data, True) - - if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD': - logging.debug("Solar key pressed") - if request_status(devinfo) == STATUS.UNAVAILABLE: - return STATUS.UNAVAILABLE, {PROPS.UI_FLAGS: STATUS.UI_POPUP | STATUS.UI_NOTIFY} - - code, props = _charge_status(data) - props[PROPS.UI_FLAGS] = STATUS.UI_POPUP - return code, props diff --git a/lib/logitech/scanner.py b/lib/logitech/scanner.py index dd11e707..3bf3f79b 100644 --- a/lib/logitech/scanner.py +++ b/lib/logitech/scanner.py @@ -7,54 +7,59 @@ def print_receiver(receiver): print (" Serial : %s" % receiver.serial) for f in receiver.firmware: print (" %-10s: %s" % (f.kind, f.version)) - print (" Reported %d paired device(s)" % len(receiver)) + notifications = receiver.request(0x8100) + if notifications: + notifications = ord(notifications[0:1]) << 16 | ord(notifications[1:2]) << 8 + if notifications: + print (" Enabled notifications: %s." % lur.hidpp10.NOTIFICATION_FLAG.flag_names(notifications)) + else: + print (" All notifications disabled.") + + print (" Reported %d paired device(s)." % len(receiver)) + activity = receiver.request(0x83B3) + if activity: + activity = [(d, ord(activity[d - 1])) for d in range(1, receiver.max_devices)] + print(" Device activity counters: %s" % ', '.join(('%d=%d' % (d, a)) for d, a in activity if a > 0)) def scan_devices(receiver): - for number in range(1, 1 + receiver.max_devices): - dev = receiver[number] - if dev is None: - dev = api.PairedDevice(receiver.handle, number) - if dev.codename is None: - continue - + for dev in receiver: print ("--------") print (str(dev)) print ("Codename : %s" % dev.codename) - print ("Name : %s" % dev.name) print ("Kind : %s" % dev.kind) + print ("Name : %s" % dev.name) + print ("Device number: %d" % dev.number) + print ("Wireless PID : %s" % dev.wpid) print ("Serial number: %s" % dev.serial) + print ("Power switch : on the %s" % dev.power_switch_location) - if not dev.protocol: + if not dev.ping(): print ("Device is not connected at this time, no further info available.") continue - print ("HID protocol : HID %01.1f" % dev.protocol) - if dev.protocol < 2.0: - print ("Features query not supported by this device") + print ("HID protocol : HID++ %01.1f" % dev.protocol) + if not dev.features: + print ("Features query not supported by this device.") continue - firmware = dev.firmware - for fw in firmware: + for fw in dev.firmware: print (" %-11s: %s %s" % (fw.kind, fw.name, fw.version)) - all_features = api.get_device_features(dev.handle, dev.number) - for index in range(0, len(all_features)): - feature = all_features[index] + print (" %d features:" % len(dev.features)) + for index, feature in enumerate(dev.features): + feature = dev.features[index] if feature: - print (" ~ Feature %-20s (%s) at index %02X" % (FEATURE_NAME[feature], api._hex(feature), index)) + flags = dev.request(0x0000, feature.bytes(2)) + flags = 0 if flags is None else ord(flags[1:2]) + flags = lur.hidpp20.FEATURE_FLAG.flag_names(flags) + print (" %2d: %-20s {%04X} %s" % (index, feature, feature, flags)) - if FEATURE.BATTERY in all_features: - discharge, dischargeNext, status = api.get_device_battery_level(dev.handle, dev.number, features=all_features) - print (" Battery %d charged (next level %d%), status %s" % (discharge, dischargeNext, status)) - - if FEATURE.REPROGRAMMABLE_KEYS in all_features: - keys = api.get_device_keys(dev.handle, dev.number, features=all_features) - if keys is not None and keys: - print (" %d reprogrammable keys found" % len(keys)) - for k in keys: - flags = ','.join(KEY_FLAG_NAME[f] for f in KEY_FLAG_NAME if k.flags & f) - print (" %2d: %-12s => %-12s : %s" % (k.index, KEY_NAME[k.id], KEY_NAME[k.task], flags)) + if dev.keys: + print (" %d reprogrammable keys:" % len(dev.keys)) + for k in dev.keys: + flags = lur.hidpp20.KEY_FLAG.flag_names(k.flags) + print (" %2d: %-20s => %-20s %s" % (k.index, lur.hidpp20.KEY[k.key], lur.hidpp20.KEY[k.task], flags)) if __name__ == '__main__': @@ -65,12 +70,12 @@ if __name__ == '__main__': args = arg_parser.parse_args() import logging - logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING) + log_format='%(asctime)s %(levelname)8s %(name)s: %(message)s' + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING, format=log_format) - from .unifying_receiver import api - from .unifying_receiver.constants import * + from . import unifying_receiver as lur - receiver = api.Receiver.open() + receiver = lur.Receiver.open() if receiver is None: print ("Logitech Unifying Receiver not found.") else: diff --git a/lib/logitech/unifying_receiver/__init__.py b/lib/logitech/unifying_receiver/__init__.py index 19dac410..15d52bf7 100644 --- a/lib/logitech/unifying_receiver/__init__.py +++ b/lib/logitech/unifying_receiver/__init__.py @@ -6,15 +6,6 @@ implementation. Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. -Strongly recommended to use these functions from a single thread; calling -multiple functions from different threads has a high chance of mixing the -replies and causing apparent failures. - -Basic order of operations is: - - open() to obtain a UR handle - - request() to make a feature call to one of the devices attached to the UR - - close() to close the UR handle - References: http://julien.danjou.info/blog/2012/logitech-k750-linux-support http://6xq.net/git/lars/lshidpp.git/plain/doc/ @@ -22,14 +13,21 @@ http://6xq.net/git/lars/lshidpp.git/plain/doc/ import logging -if logging.root.level > logging.DEBUG: - log = logging.getLogger('LUR') - log.addHandler(logging.NullHandler()) - log.propagate = 0 +_DEBUG = logging.DEBUG +_log = logging.getLogger('LUR') +_log.setLevel(logging.root.level) +# if logging.root.level > logging.DEBUG: +# _log.addHandler(logging.NullHandler()) +# _log.propagate = 0 del logging -from .constants import * -from .exceptions import * -from .api import * +from .common import strhex +from .base import NoReceiver, NoSuchDevice, DeviceUnreachable +from .receiver import Receiver, PairedDevice, MAX_PAIRED_DEVICES +from .hidpp20 import FeatureNotSupported, FeatureCallError +from .devices import DEVICES + +from . import listener +from . import status diff --git a/lib/logitech/unifying_receiver/api.py b/lib/logitech/unifying_receiver/api.py deleted file mode 100644 index 95730d15..00000000 --- a/lib/logitech/unifying_receiver/api.py +++ /dev/null @@ -1,566 +0,0 @@ -# -# Logitech Unifying Receiver API. -# - -from struct import pack as _pack -from struct import unpack as _unpack -import errno as _errno -from threading import local as _local - - -from . import base as _base -from .common import (FirmwareInfo as _FirmwareInfo, - ReprogrammableKeyInfo as _ReprogrammableKeyInfo) -from .constants import (FEATURE, FEATURE_NAME, FEATURE_FLAGS, - FIRMWARE_KIND, DEVICE_KIND, - BATTERY_STATUS, KEY_NAME, - MAX_ATTACHED_DEVICES) -from .exceptions import FeatureNotSupported as _FeatureNotSupported - - -_hex = _base._hex - -from logging import getLogger -_log = getLogger('LUR').getChild('api') -del getLogger - -# -# -# - -class ThreadedHandle(object): - __slots__ = ['path', '_local', '_handles'] - - def __init__(self, initial_handle, path): - assert initial_handle - if type(initial_handle) != int: - raise TypeError('expected int as initial handle, got %s' % repr(initial_handle)) - - assert path - self.path = path - self._local = _local() - self._local.handle = initial_handle - self._handles = [initial_handle] - - def _open(self): - handle = _base.open_path(self.path) - if handle is None: - _log.error("%s failed to open new handle", repr(self)) - else: - # _log.debug("%s opened new handle %d", repr(self), handle) - self._local.handle = handle - self._handles.append(handle) - return handle - - def close(self): - self._local = None - handles, self._handles = self._handles, [] - _log.debug("%s closing %s", repr(self), handles) - for h in handles: - _base.close(h) - - def __del__(self): - self.close() - - def __int__(self): - if self._local: - try: - return self._local.handle - except: - return self._open() - - def __str__(self): - return str(int(self)) - - def __repr__(self): - return '' % self.path - - def __bool__(self): - return bool(self._handles) - __nonzero__ = __bool__ - - -class PairedDevice(object): - def __init__(self, handle, number): - assert handle - self.handle = handle - assert number > 0 and number <= MAX_ATTACHED_DEVICES - self.number = number - - self._protocol = None - self._features = None - self._codename = None - self._name = None - self._kind = None - self._serial = None - self._firmware = None - - def __del__(self): - self.handle = None - - @property - def protocol(self): - if self._protocol is None: - self._protocol = _base.ping(self.handle, self.number) - # _log.debug("device %d protocol %s", self.number, self._protocol) - return self._protocol or 0 - - @property - def features(self): - if self._features is None: - if self.protocol >= 2.0: - self._features = [FEATURE.ROOT] - return self._features - - @property - def codename(self): - if self._codename is None: - codename = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x40 + self.number - 1) - if codename: - self._codename = codename[2:].rstrip(b'\x00').decode('ascii') - # _log.debug("device %d codename %s", self.number, self._codename) - return self._codename - - @property - def name(self): - if self._name is None: - if self.protocol < 2.0: - from ..devices.constants import NAMES as _DEVICE_NAMES - if self.codename in _DEVICE_NAMES: - self._name, self._kind = _DEVICE_NAMES[self._codename] - else: - self._name = get_device_name(self.handle, self.number, self.features) - return self._name or self.codename or '?' - - @property - def kind(self): - if self._kind is None: - if self.protocol < 2.0: - from ..devices.constants import NAMES as _DEVICE_NAMES - if self.codename in _DEVICE_NAMES: - self._name, self._kind = _DEVICE_NAMES[self._codename] - else: - self._kind = get_device_kind(self.handle, self.number, self.features) - return self._kind or '?' - - @property - def firmware(self): - if self._firmware is None and self.protocol >= 2.0: - self._firmware = get_device_firmware(self.handle, self.number, self.features) - # _log.debug("device %d firmware %s", self.number, self._firmware) - return self._firmware or () - - @property - def serial(self): - if self._serial is None: - prefix = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x20 + self.number - 1) - serial = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x30 + self.number - 1) - if prefix and serial: - self._serial = _base._hex(prefix[3:5]) + '-' + _base._hex(serial[1:5]) - # _log.debug("device %d serial %s", self.number, self._serial) - return self._serial or '?' - - def ping(self): - return _base.ping(self.handle, self.number) is not None - - def __str__(self): - return '' % (self.handle, self.number, self.codename or '?') - - -class Receiver(object): - name = 'Unifying Receiver' - max_devices = MAX_ATTACHED_DEVICES - - def __init__(self, handle, path=None): - assert handle - self.handle = handle - assert path - self.path = path - - self._serial = None - self._firmware = None - - def close(self): - handle, self.handle = self.handle, None - return (handle and _base.close(handle)) - - def __del__(self): - self.close() - - @property - def serial(self): - if self._serial is None and self.handle: - serial = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x03') - if serial: - self._serial = _hex(serial[1:5]) - return self._serial - - @property - def firmware(self): - if self._firmware is None and self.handle: - firmware = [] - - reply = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x02') - if reply and reply[0:1] == b'\x02': - fw_version = _hex(reply[1:5]) - fw_version = '%s.%s.B%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8]) - firmware.append(_FirmwareInfo(0, FIRMWARE_KIND[0], '', fw_version, None)) - - reply = _base.request(self.handle, 0xFF, b'\x81\xF1', b'\x04') - if reply and reply[0:1] == b'\x04': - bl_version = _hex(reply[1:3]) - bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4]) - firmware.append(_FirmwareInfo(1, FIRMWARE_KIND[1], '', bl_version, None)) - - self._firmware = tuple(firmware) - - return self._firmware - - def __iter__(self): - if not self.handle: - return - - for number in range(1, 1 + MAX_ATTACHED_DEVICES): - dev = get_device(self.handle, number) - if dev is not None: - yield dev - - def __getitem__(self, key): - if type(key) != int: - raise TypeError('key must be an integer') - if not self.handle or key < 0 or key > MAX_ATTACHED_DEVICES: - raise IndexError(key) - return get_device(self.handle, key) if key > 0 else None - - def __delitem__(self, key): - if type(key) != int: - raise TypeError('key must be an integer') - if not self.handle or key < 0 or key > MAX_ATTACHED_DEVICES: - raise IndexError(key) - if key > 0: - _log.debug("unpairing device %d", key) - reply = _base.request(self.handle, 0xFF, b'\x80\xB2', _pack('!BB', 0x03, key)) - if reply is None or reply[1:2] == b'\x8F': - raise IndexError(key) - - def __len__(self): - if not self.handle: - return 0 - # not really sure about this one... - count = _base.request(self.handle, 0xFF, b'\x81\x00') - return 0 if count is None else ord(count[1:2]) - - def __contains__(self, dev): - # print (self, "contains", dev) - if self.handle == 0: - return False - if type(dev) == int: - return dev > 0 and dev <= MAX_ATTACHED_DEVICES and _base.ping(self.handle, dev) is not None - return dev.ping() - - def __str__(self): - return '' % (self.handle, self.path) - - __bool__ = __nonzero__ = lambda self: self.handle != 0 - - @classmethod - def open(self): - """Opens the first Logitech Unifying Receiver found attached to the machine. - - :returns: An open file handle for the found receiver, or ``None``. - """ - exception = None - - for rawdevice in _base.list_receiver_devices(): - exception = None - try: - handle = _base.open_path(rawdevice.path) - if handle: - return Receiver(handle, rawdevice.path) - except OSError as e: - _log.exception("open %s", rawdevice.path) - if e.errno == _errno.EACCES: - exception = e - - if exception: - # only keep the last exception - raise exception - -# -# -# - -def request(handle, devnumber, feature, function=b'\x04', params=b'', features=None): - """Makes a feature call to the device, and returns the reply data. - - Basically a write() followed by (possibly multiple) reads, until a reply - matching the called feature is received. In theory the UR will always reply - to feature call; otherwise this function will wait indefinitely. - - Incoming data packets not matching the feature and function will be - delivered to the unhandled hook (if any), and ignored. - - :param function: the function to call on that feature, may be an byte value - or a bytes string of length 1. - :param params: optional bytes string to send as function parameters to the - feature; may also be an integer if the function only takes a single byte as - parameter. - - The optional ``features`` parameter is a cached result of the - get_device_features function for this device, necessary to find the feature - index. If the ``features_arrary`` is not provided, one will be obtained by - manually calling get_device_features before making the request call proper. - - :raises FeatureNotSupported: if the device does not support the feature. - """ - feature_index = None - if feature == FEATURE.ROOT: - feature_index = b'\x00' - else: - feature_index = _get_feature_index(handle, devnumber, feature, features) - if feature_index is None: - # i/o read error - return None - - feature_index = _pack('!B', feature_index) - - if type(function) == int: - function = _pack('!B', function) - if type(params) == int: - params = _pack('!B', params) - - return _base.request(handle, devnumber, feature_index + function, params) - - -def get_device(handle, devnumber, features=None): - """Gets the complete info for a device (type, features). - - :returns: a PairedDevice or ``None``. - """ - if _base.ping(handle, devnumber): - devinfo = PairedDevice(handle, devnumber) - # _log.debug("found device %s", devinfo) - return devinfo - - -def get_feature_index(handle, devnumber, feature): - """Reads the index of a device's feature. - - :returns: An int, or ``None`` if the feature is not available. - """ - # _log.debug("device %d get feature index <%s:%s>", devnumber, _hex(feature), FEATURE_NAME[feature]) - if len(feature) != 2: - raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature) - - # FEATURE.ROOT should always be available for any attached devices - reply = _base.request(handle, devnumber, FEATURE.ROOT, feature) - if reply: - feature_index = ord(reply[0:1]) - if feature_index: - feature_flags = ord(reply[1:2]) & 0xE0 - if feature_flags: - _log.debug("device %d feature <%s:%s> has index %d: %s", - devnumber, _hex(feature), FEATURE_NAME[feature], feature_index, - ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) - else: - _log.debug("device %d feature <%s:%s> has index %d", devnumber, _hex(feature), FEATURE_NAME[feature], feature_index) - - # only consider active and supported features? - # if feature_flags: - # raise E.FeatureNotSupported(devnumber, feature) - - return feature_index - - _log.warn("device %d feature <%s:%s> not supported by the device", devnumber, _hex(feature), FEATURE_NAME[feature]) - raise _FeatureNotSupported(devnumber, feature) - - -def _get_feature_index(handle, devnumber, feature, features=None): - if features is None: - return get_feature_index(handle, devnumber, feature) - - if feature in features: - return features.index(feature) - - index = get_feature_index(handle, devnumber, feature) - if index is not None: - try: - if len(features) <= index: - features += [None] * (index + 1 - len(features)) - features[index] = feature - except: - pass - # _log.debug("%s: found feature %s at %d", features, _base._hex(feature), index) - return index - - -def get_device_features(handle, devnumber): - """Returns an array of feature ids. - - Their position in the array is the index to be used when requesting that - feature on the device. - """ - # _log.debug("device %d get device features", devnumber) - - # get the index of the FEATURE_SET - # FEATURE.ROOT should always be available for all devices - fs_index = _base.request(handle, devnumber, FEATURE.ROOT, FEATURE.FEATURE_SET) - if fs_index is None: - _log.warn("device %d FEATURE_SET not available", devnumber) - return None - fs_index = fs_index[:1] - - # For debugging purposes, query all the available features on the device, - # even if unknown. - - # get the number of active features the device has - features_count = _base.request(handle, devnumber, fs_index + b'\x05') - if not features_count: - # this can happen if the device disappeard since the fs_index request - # otherwise we should get at least a count of 1 (the FEATURE_SET we've just used above) - _log.debug("device %d no features available?!", devnumber) - return None - - features_count = ord(features_count[:1]) - # _log.debug("device %d found %d features", devnumber, features_count) - - features = [None] * 0x20 - for index in range(1, 1 + features_count): - # for each index, get the feature residing at that index - feature = _base.request(handle, devnumber, fs_index + b'\x15', _pack('!B', index)) - if feature: - # feature_flags = ord(feature[2:3]) & 0xE0 - feature = feature[0:2].upper() - features[index] = feature - - # if feature_flags: - # _log.debug("device %d feature <%s:%s> at index %d: %s", - # devnumber, _hex(feature), FEATURE_NAME[feature], index, - # ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) - # else: - # _log.debug("device %d feature <%s:%s> at index %d", devnumber, _hex(feature), FEATURE_NAME[feature], index) - - features[0] = FEATURE.ROOT - while features[-1] is None: - del features[-1] - return tuple(features) - - -def get_device_firmware(handle, devnumber, features=None): - """Reads a device's firmware info. - - :returns: a list of FirmwareInfo tuples, ordered by firmware layer. - """ - fw_fi = _get_feature_index(handle, devnumber, FEATURE.FIRMWARE, features) - if fw_fi is None: - return None - - fw_count = _base.request(handle, devnumber, _pack('!BB', fw_fi, 0x05)) - if fw_count: - fw_count = ord(fw_count[:1]) - - fw = [] - for index in range(0, fw_count): - fw_info = _base.request(handle, devnumber, _pack('!BB', fw_fi, 0x15), params=index) - if fw_info: - level = ord(fw_info[:1]) & 0x0F - if level == 0 or level == 1: - kind = FIRMWARE_KIND[level] - name, = _unpack('!3s', fw_info[1:4]) - name = name.decode('ascii') - version = _hex(fw_info[4:6]) - version = '%s.%s' % (version[0:2], version[2:4]) - build, = _unpack('!H', fw_info[6:8]) - if build: - version += ' b%d' % build - extras = fw_info[9:].rstrip(b'\x00') or None - fw_info = _FirmwareInfo(level, kind, name, version, extras) - elif level == 2: - fw_info = _FirmwareInfo(2, FIRMWARE_KIND[2], '', ord(fw_info[1:2]), None) - else: - fw_info = _FirmwareInfo(level, FIRMWARE_KIND[-1], '', '', None) - - fw.append(fw_info) - # _log.debug("device %d firmware %s", devnumber, fw_info) - return tuple(fw) - - -def get_device_kind(handle, devnumber, features=None): - """Reads a device's type. - - :see DEVICE_KIND: - :returns: a string describing the device type, or ``None`` if the device is - not available or does not support the ``NAME`` feature. - """ - name_fi = _get_feature_index(handle, devnumber, FEATURE.NAME, features) - if name_fi is None: - return None - - d_kind = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x25)) - if d_kind: - d_kind = ord(d_kind[:1]) - # _log.debug("device %d type %d = %s", devnumber, d_kind, DEVICE_KIND[d_kind]) - return DEVICE_KIND[d_kind] - - -def get_device_name(handle, devnumber, features=None): - """Reads a device's name. - - :returns: a string with the device name, or ``None`` if the device is not - available or does not support the ``NAME`` feature. - """ - name_fi = _get_feature_index(handle, devnumber, FEATURE.NAME, features) - if name_fi is None: - return None - - name_length = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x05)) - if name_length: - name_length = ord(name_length[:1]) - - d_name = b'' - while len(d_name) < name_length: - name_fragment = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x15), len(d_name)) - if name_fragment: - name_fragment = name_fragment[:name_length - len(d_name)] - d_name += name_fragment - else: - break - - d_name = d_name.decode('ascii') - # _log.debug("device %d name %s", devnumber, d_name) - return d_name - - -def get_device_battery_level(handle, devnumber, features=None): - """Reads a device's battery level. - - :raises FeatureNotSupported: if the device does not support this feature. - """ - bat_fi = _get_feature_index(handle, devnumber, FEATURE.BATTERY, features) - if bat_fi is not None: - battery = _base.request(handle, devnumber, _pack('!BB', bat_fi, 0x05)) - if battery: - discharge, dischargeNext, status = _unpack('!BBB', battery[:3]) - _log.debug("device %d battery %d%% charged, next level %d%% charge, status %d = %s", - devnumber, discharge, dischargeNext, status, BATTERY_STATUS[status]) - return (discharge, dischargeNext, BATTERY_STATUS[status]) - - -def get_device_keys(handle, devnumber, features=None): - rk_fi = _get_feature_index(handle, devnumber, FEATURE.REPROGRAMMABLE_KEYS, features) - if rk_fi is None: - return None - - count = _base.request(handle, devnumber, _pack('!BB', rk_fi, 0x05)) - if count: - keys = [] - - count = ord(count[:1]) - for index in range(0, count): - keydata = _base.request(handle, devnumber, _pack('!BB', rk_fi, 0x15), index) - if keydata: - key, key_task, flags = _unpack('!HHB', keydata[:5]) - rki = _ReprogrammableKeyInfo(index, key, KEY_NAME[key], key_task, KEY_NAME[key_task], flags) - keys.append(rki) - - return keys diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index ef5794df..76b9b12b 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -3,59 +3,74 @@ # Unlikely to be used directly unless you're expanding the API. # -import os as _os from time import time as _timestamp from struct import pack as _pack -from binascii import hexlify as _hexlify -_hex = lambda d: _hexlify(d).decode('ascii').upper() +from random import getrandbits as _random_bits -from .constants import ERROR_NAME -from .exceptions import (NoReceiver as _NoReceiver, - FeatureCallError as _FeatureCallError) - -from logging import getLogger -_log = getLogger('LUR').getChild('base') +from logging import getLogger, DEBUG as _DEBUG +_log = getLogger('LUR.base') del getLogger +from .common import strhex as _strhex, KwException as _KwException +import hidpp10 as _hidpp10 +import hidpp20 as _hidpp20 import hidapi as _hid - # -# These values are defined by the Logitech documentation. -# Overstepping these boundaries will only produce log warnings. +# # -"""Minimim lenght of a feature call packet.""" -_MIN_CALL_SIZE = 7 - - -"""Maximum lenght of a feature call packet.""" -_MAX_CALL_SIZE = 20 - - -"""Minimum size of a feature reply packet.""" -_MIN_REPLY_SIZE = _MIN_CALL_SIZE - - -"""Maximum size of a feature reply packet.""" -_MAX_REPLY_SIZE = _MAX_CALL_SIZE - +_SHORT_MESSAGE_SIZE = 7 +_LONG_MESSAGE_SIZE = 20 +_MEDIUM_MESSAGE_SIZE = 15 +_MAX_READ_SIZE = 32 """Default timeout on read (in ms).""" -DEFAULT_TIMEOUT = 2000 +DEFAULT_TIMEOUT = 3000 +_RECEIVER_REQUEST_TIMEOUT = 500 +_DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT +_PING_TIMEOUT = 5000 + +# +# Exceptions that may be raised by this API. +# + +class NoReceiver(_KwException): + """Raised when trying to talk through a previously open handle, when the + receiver is no longer available. Should only happen if the receiver is + physically disconnected from the machine, or its kernel driver module is + unloaded.""" + pass + + +class NoSuchDevice(_KwException): + """Raised when trying to reach a device number not paired to the receiver.""" + pass + + +class DeviceUnreachable(_KwException): + """Raised when a request is made to an unreachable (turned off) device.""" + pass # # # -def list_receiver_devices(): +def receivers(): """List all the Linux devices exposed by the UR attached to the machine.""" # (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver') # interface 2 if the actual receiver interface + for d in _hid.enumerate(0x046d, 0xc52b, 2): if d.driver == 'logitech-djreceiver': yield d + # apparently there are TWO product ids possible for the UR + for d in _hid.enumerate(0x046d, 0xc532, 2): + if d.driver == 'logitech-djreceiver': + yield d + + def open_path(path): """Checks if the given Linux device path points to the right UR device. @@ -77,7 +92,7 @@ def open(): :returns: An open file handle for the found receiver, or ``None``. """ - for rawdevice in list_receiver_devices(): + for rawdevice in receivers(): handle = open_path(rawdevice.path) if handle: return handle @@ -94,14 +109,14 @@ def close(handle): # _log.info("closed receiver handle %s", repr(handle)) return True except: - # _log.exception("closing receiver handle %s", repr(handle)) + _log.exception("closing receiver handle %s", repr(handle)) pass return False def write(handle, devnumber, data): - """Writes some data to a certain device. + """Writes some data to the receiver, addressed to a certain device. :param handle: an open UR handle. :param devnumber: attached device number. @@ -114,70 +129,77 @@ def write(handle, devnumber, data): been physically removed from the machine, or the kernel driver has been unloaded. The handle will be closed automatically. """ - assert _MIN_CALL_SIZE == 7 - assert _MAX_CALL_SIZE == 20 # the data is padded to either 5 or 18 bytes - wdata = _pack('!BB18s' if len(data) > 5 else '!BB5s', 0x10, devnumber, data) - _log.debug("(%s) <= w[10 %02X %s %s]", handle, devnumber, _hex(wdata[2:4]), _hex(wdata[4:])) + if len(data) > _SHORT_MESSAGE_SIZE - 2: + wdata = _pack('!BB18s', 0x11, devnumber, data) + else: + wdata = _pack('!BB5s', 0x10, devnumber, data) + if _log.isEnabledFor(_DEBUG): + _log.debug("(%s) <= w[%02X %02X %s %s]", handle, ord(wdata[0]), devnumber, _strhex(wdata[2:4]), _strhex(wdata[4:])) try: _hid.write(int(handle), wdata) except Exception as reason: _log.error("write failed, assuming handle %s no longer available", repr(handle)) close(handle) - raise _NoReceiver(reason) + raise NoReceiver(reason) def read(handle, timeout=DEFAULT_TIMEOUT): """Read some data from the receiver. Usually called after a write (feature call), to get the reply. - :param handle: an open UR handle. - :param timeout: read timeout on the UR handle. - If any data was read in the given timeout, returns a tuple of - (reply_code, devnumber, message data). The reply code is generally ``0x11`` - for a successful feature call, or ``0x10`` to indicate some error, e.g. the - device is no longer available. + (code, devnumber, message data). :raises NoReceiver: if the receiver is no longer available, i.e. has been physically removed from the machine, or the kernel driver has been unloaded. The handle will be closed automatically. """ + reply = _read(handle, timeout) + if reply: + return reply[1:] + + +def _read(handle, timeout): try: - data = _hid.read(int(handle), _MAX_REPLY_SIZE, timeout) + data = _hid.read(int(handle), _MAX_READ_SIZE, timeout) except Exception as reason: _log.error("read failed, assuming handle %s no longer available", repr(handle)) close(handle) - raise _NoReceiver(reason) + raise NoReceiver(reason) if data: - if len(data) < _MIN_REPLY_SIZE: - _log.warn("(%s) => r[%s] read packet too short: %d bytes", handle, _hex(data), len(data)) - data += b'\x00' * (_MIN_REPLY_SIZE - len(data)) - if len(data) > _MAX_REPLY_SIZE: - _log.warn("(%s) => r[%s] read packet too long: %d bytes", handle, _hex(data), len(data)) - code = ord(data[:1]) + report_id = ord(data[:1]) + assert (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE or + report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE or + report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE) devnumber = ord(data[1:2]) - _log.debug("(%s) => r[%02X %02X %s %s]", handle, code, devnumber, _hex(data[2:4]), _hex(data[4:])) - return code, devnumber, data[2:] - # _l.log(_LOG_LEVEL, "(-) => r[]") + if _log.isEnabledFor(_DEBUG): + _log.debug("(%s) => r[%02X %02X %s %s]", handle, report_id, devnumber, _strhex(data[2:4]), _strhex(data[4:])) + + return report_id, devnumber, data[2:] + def _skip_incoming(handle): + """Read anything already in the input buffer.""" ihandle = int(handle) while True: try: - data = _hid.read(ihandle, _MAX_REPLY_SIZE, 0) + data = _hid.read(ihandle, _MAX_READ_SIZE, 0) except Exception as reason: _log.error("read failed, assuming receiver %s no longer available", handle) close(handle) - raise _NoReceiver(reason) + raise NoReceiver(reason) if data: - if unhandled_hook: - unhandled_hook(ord(data[:1]), ord(data[1:2]), data[2:]) + report_id = ord(data[:1]) + assert (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE or + report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE or + report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE) + _unhandled(report_id, ord(data[1:2]), data[2:]) else: return @@ -185,22 +207,43 @@ def _skip_incoming(handle): # # -"""The function that will be called on unhandled incoming events. +"""The function that may be called on incoming events. -The hook must be a function with the signature: ``_(int, int, str)``, where -the parameters are: (reply_code, devnumber, data). +The hook must be a callable accepting one tuple parameter, with the format +``( devnumber, request_id, data)``. -This hook will only be called by the request() function, when it receives -replies that do not match the requested feature call. As such, it is not -suitable for intercepting broadcast events from the device (e.g. special -keys being pressed, battery charge events, etc), at least not in a timely -manner. However, these events *may* be delivered here if they happen while -doing a feature call to the device. +This hook will only be called by the request()/ping() functions, when received +replies do not match the expected request_id. As such, it is not suitable for +intercepting broadcast events from the device (e.g. special keys being pressed, +battery charge events, etc), at least not in a timely manner. """ -unhandled_hook = None +events_hook = None + +def _unhandled(report_id, devnumber, data): + """Deliver a possible event to the unhandled_hook (if any).""" + if events_hook: + event = make_event(devnumber, data) + if event: + events_hook(event) -def request(handle, devnumber, feature_index_function, params=b'', features=None): +from collections import namedtuple +_Event = namedtuple('_Event', ['devnumber', 'sub_id', 'address', 'data']) +_Event.__str__ = lambda self: 'Event(%d,%02X,%02X,%s)' % (self.devnumber, self.sub_id, self.address, _strhex(self.data)) +del namedtuple + +def make_event(devnumber, data): + sub_id = ord(data[:1]) + if devnumber == 0xFF: + if sub_id == 0x4A: # receiver lock event + return _Event(devnumber, sub_id, ord(data[1:2]), data[2:]) + else: + address = ord(data[1:2]) + if sub_id > 0x00 and sub_id < 0x80 and (address & 0x01) == 0: + return _Event(devnumber, sub_id, address, data[2:]) + + +def request(handle, devnumber, request_id, *params): """Makes a feature call to a device and waits for a matching reply. This function will skip all incoming messages and events not related to the @@ -209,68 +252,75 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None :param handle: an open UR handle. :param devnumber: attached device number. - :param feature_index_function: a two-byte string of (feature_index, feature_function). + :param request_id: a 16-bit integer. :param params: parameters for the feature call, 3 to 16 bytes. - :param features: optional features array for the device, only used to fill - the FeatureCallError exception if one occurs. - :returns: the reply data packet, or ``None`` if the device is no longer - available. - :raisees FeatureCallError: if the feature call replied with an error. + :returns: the reply data, or ``None`` if some error occured. """ - if type(params) == int: - params = _pack('!B', params) + assert type(request_id) == int + if devnumber != 0xFF and request_id < 0x8000: + timeout = _DEVICE_REQUEST_TIMEOUT + # for HID++ 2.0 feature request, randomize the swid to make it easier to + # recognize the reply for this request. also, always set the last bit + # (0) in swid, to make events easier to identify + request_id = (request_id & 0xFFF0) | _random_bits(4) | 0x01 + else: + timeout = _RECEIVER_REQUEST_TIMEOUT + request_str = _pack('!H', request_id) - # _log.debug("%s device %d request {%s} params [%s]", handle, devnumber, _hex(feature_index_function), _hex(params)) - if len(feature_index_function) != 2: - raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hex(feature_index_function)) + params = b''.join(_pack('B', p) if type(p) == int else p for p in params) + # if _log.isEnabledFor(_DEBUG): + # _log.debug("(%s) device %d request_id {%04X} params [%s]", handle, devnumber, request_id, _strhex(params)) _skip_incoming(handle) ihandle = int(handle) - write(ihandle, devnumber, feature_index_function + params) + write(ihandle, devnumber, request_str + params) while True: now = _timestamp() - reply = read(ihandle, DEFAULT_TIMEOUT) + reply = _read(handle, timeout) delta = _timestamp() - now if reply: - reply_code, reply_devnumber, reply_data = reply + report_id, reply_devnumber, reply_data = reply if reply_devnumber == devnumber: - if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == feature_index_function: - # device not present - _log.debug("device %d request failed on {%s} call: [%s]", devnumber, _hex(feature_index_function), _hex(reply_data)) - return None + if report_id == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_str: + error = ord(reply_data[3:4]) - if reply_code == 0x10 and reply_data[:1] == b'\x8F': - # device not present - _log.debug("device %d request failed: [%s]", devnumber, _hex(reply_data)) - return None + # if error == _hidpp10.ERROR.resource_error: # device unreachable + # _log.warn("(%s) device %d error on request {%04X}: unknown device", handle, devnumber, request_id) + # raise DeviceUnreachable(number=devnumber, request=request_id) - if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function: - # the feature call returned with an error - error_code = ord(reply_data[3]) - _log.warn("device %d request feature call error %d = %s: %s", devnumber, error_code, ERROR_NAME[error_code], _hex(reply_data)) - feature_index = ord(feature_index_function[:1]) - feature_function = feature_index_function[1:2] - feature = None if features is None else features[feature_index] if feature_index < len(features) else None - raise _FeatureCallError(devnumber, feature, feature_index, feature_function, error_code, reply_data) + # if error == _hidpp10.ERROR.unknown_device: # unknown device + # _log.error("(%s) device %d error on request {%04X}: unknown device", handle, devnumber, request_id) + # raise NoSuchDevice(number=devnumber, request=request_id) - if reply_code == 0x11 and reply_data[:2] == feature_index_function: - # a matching reply - # _log.debug("device %d matched reply with feature-index-function [%s]", devnumber, _hex(reply_data[2:])) - return reply_data[2:] + _log.debug("(%s) device %d error on request {%04X}: %d = %s", + handle, devnumber, request_id, error, _hidpp10.ERROR[error]) + break - if reply_code == 0x10 and devnumber == 0xFF and reply_data[:2] == feature_index_function: - # direct calls to the receiver (device 0xFF) may also return successfully with reply code 0x10 - # _log.debug("device %d matched reply with feature-index-function [%s]", devnumber, _hex(reply_data[2:])) - return reply_data[2:] + if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_str: + # a HID++ 2.0 feature call returned with an error + error = ord(reply_data[3:4]) + _log.error("(%s) device %d error on feature request {%04X}: %d = %s", + handle, devnumber, request_id, error, _hidpp20.ERROR[error]) + raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params) - if unhandled_hook: - unhandled_hook(reply_code, reply_devnumber, reply_data) + if reply_data[:2] == request_str: + if devnumber == 0xFF: + if request_id == 0x83B5 or request_id == 0x81F1: + # these replies have to match the first parameter as well + if reply_data[2:3] == params[:1]: + return reply_data[2:] + else: + return reply_data[2:] + else: + return reply_data[2:] - if delta >= DEFAULT_TIMEOUT: - _log.warn("timeout on device %d request {%s} params[%s]", devnumber, _hex(feature_index_function), _hex(params)) - return None + _unhandled(report_id, reply_devnumber, reply_data) + + if delta >= timeout: + _log.warn("timeout on device %d request {%04X} params[%s]", devnumber, request_id, _strhex(params)) + raise DeviceUnreachable(number=devnumber, request=request_id) def ping(handle, devnumber): @@ -278,35 +328,48 @@ def ping(handle, devnumber): :returns: The HID protocol supported by the device, as a floating point number, if the device is active. """ - _log.debug("%s pinging device %d", handle, devnumber) + if _log.isEnabledFor(_DEBUG): + _log.debug("(%s) pinging device %d", handle, devnumber) _skip_incoming(handle) ihandle = int(handle) - write(ihandle, devnumber, b'\x00\x11\x00\x00\xAA') + + # randomize the swid and mark byte to positively identify the ping reply, + # and set the last (0) bit in swid to make it easier to distinguish requests + # from events + request_id = 0x0010 | _random_bits(4) | 0x01 + request_str = _pack('!H', request_id) + ping_mark = _pack('B', _random_bits(8)) + write(ihandle, devnumber, request_str + b'\x00\x00' + ping_mark) while True: now = _timestamp() - reply = read(ihandle, DEFAULT_TIMEOUT) + reply = _read(ihandle, _PING_TIMEOUT) delta = _timestamp() - now if reply: - reply_code, reply_devnumber, reply_data = reply - if reply_devnumber == devnumber: - if reply_code == 0x11 and reply_data[:2] == b'\x00\x11' and reply_data[4:5] == b'\xAA': - # HID 2.0+ device, currently connected - return ord(reply_data[2:3]) + ord(reply_data[3:4]) / 10.0 + report_id, number, data = reply + if number == devnumber: + if data[:2] == request_str and data[4:5] == ping_mark: + # HID++ 2.0+ device, currently connected + return ord(data[2:3]) + ord(data[3:4]) / 10.0 - if reply_code == 0x10 and reply_data == b'\x8F\x00\x11\x01\x00': - # HID 1.0 device, currently connected - return 1.0 + if report_id == 0x10 and data[:1] == b'\x8F' and data[1:3] == request_str: + assert data[-1:] == b'\x00' + error = ord(data[3:4]) - if reply_code == 0x10 and reply_data[:3] == b'\x8F\x00\x11': - # a disconnected device - return None + if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device + return 1.0 - if unhandled_hook: - unhandled_hook(reply_code, reply_devnumber, reply_data) + if error == _hidpp10.ERROR.resource_error: # device unreachable + raise DeviceUnreachable(number=devnumber, request=request_id) - if delta >= DEFAULT_TIMEOUT: - _log.warn("timeout on device %d ping", devnumber) - return None + if error == _hidpp10.ERROR.unknown_device: # no paired device with that number + _log.error("(%s) device %d error on ping request: unknown device", handle, devnumber) + raise NoSuchDevice(devnumber) + + _unhandled(report_id, number, data) + + if delta >= _PING_TIMEOUT: + _log.warn("(%s) timeout on device %d ping", handle, devnumber) + raise DeviceUnreachable(number=devnumber, request=request_id) diff --git a/lib/logitech/unifying_receiver/common.py b/lib/logitech/unifying_receiver/common.py index afd01330..94adc35b 100644 --- a/lib/logitech/unifying_receiver/common.py +++ b/lib/logitech/unifying_receiver/common.py @@ -2,30 +2,79 @@ # Some common functions and types. # -from collections import namedtuple from binascii import hexlify as _hexlify -_hex = lambda d: _hexlify(d).decode('ascii').upper() +from struct import pack as _pack -class FallbackDict(dict): - def __init__(self, fallback_function=lambda x: None, *args, **kwargs): - super(FallbackDict, self).__init__(*args, **kwargs) - self.fallback = fallback_function +class NamedInt(int): + """An integer with an attached name.""" + __slots__ = ['name'] - def __getitem__(self, key): + def __new__(cls, value, name): + obj = int.__new__(cls, value) + obj.name = name + return obj + + def bytes(self, count=2): + value = int(self) + if value.bit_length() > count * 8: + raise ValueError("cannot fit %X into %d bytes" % (value, count)) + + return _pack('!L', value)[-count:] + + + def __str__(self): + return self.name + + def __repr__(self): + return 'NamedInt(%d, %s)' % (int(self), repr(self.name)) + + +class NamedInts(object): + def __init__(self, **kwargs): + values = dict((k, NamedInt(v, k if k == k.upper() else k.replace('__', '/').replace('_', ' '))) for (k, v) in kwargs.items()) + self.__dict__.update(values) + self._indexed = dict((int(v), v) for v in values.values()) + self._fallback = None + + def __getitem__(self, index): + if index in self._indexed: + return self._indexed[index] + + if self._fallback: + value = NamedInt(index, self._fallback(index)) + self._indexed[index] = value + return value + + def __contains__(self, value): + return int(value) in self._indexed + + def __len__(self): + return len(self.values) + + def flag_names(self, value): + return ', '.join(str(self._indexed[k]) for k in self._indexed if k & value == k) + + +def strhex(x): + return _hexlify(x).decode('ascii').upper() + + +class KwException(Exception): + def __init__(self, **kwargs): + super(KwException, self).__init__(kwargs) + + def __getattr__(self, k): try: - return super(FallbackDict, self).__getitem__(key) - except KeyError: - return self.fallback(key) + return super(KwException, self).__getattr__(k) + except AttributeError: + return self.args[0][k] -def list2dict(values_list): - return dict(zip(range(0, len(values_list)), values_list)) - +from collections import namedtuple """Firmware information.""" FirmwareInfo = namedtuple('FirmwareInfo', [ - 'level', 'kind', 'name', 'version', @@ -34,15 +83,8 @@ FirmwareInfo = namedtuple('FirmwareInfo', [ """Reprogrammable keys informations.""" ReprogrammableKeyInfo = namedtuple('ReprogrammableKeyInfo', [ 'index', - 'id', - 'name', + 'key', 'task', - 'task_name', 'flags']) - -class Packet(namedtuple('Packet', ['code', 'devnumber', 'data'])): - def __str__(self): - return 'Packet(%02X,%02X,%s)' % (self.code, self.devnumber, 'None' if self.data is None else _hex(self.data)) - del namedtuple diff --git a/lib/logitech/unifying_receiver/constants.py b/lib/logitech/unifying_receiver/constants.py deleted file mode 100644 index cce5c6e9..00000000 --- a/lib/logitech/unifying_receiver/constants.py +++ /dev/null @@ -1,109 +0,0 @@ -# -# Constants used by the rest of the API. -# - -from struct import pack as _pack -from binascii import hexlify as _hexlify -_hex = lambda d: _hexlify(d).decode('ascii').upper() - -from .common import (FallbackDict, list2dict) - - -"""Possible features available on a Logitech device. - -A particular device might not support all these features, and may support other -unknown features as well. -""" -FEATURE = type('FEATURE', (), - dict( - ROOT=b'\x00\x00', - FEATURE_SET=b'\x00\x01', - FIRMWARE=b'\x00\x03', - NAME=b'\x00\x05', - BATTERY=b'\x10\x00', - REPROGRAMMABLE_KEYS=b'\x1B\x00', - WIRELESS=b'\x1D\x4B', - SOLAR_CHARGE=b'\x43\x01', - )) - -def _feature_name(key): - if key is None: - return None - if type(key) == int: - return FEATURE_NAME[_pack('!H', key)] - return 'UNKNOWN_' + _hex(key) - - -"""Feature names indexed by feature id.""" -FEATURE_NAME = FallbackDict(_feature_name) -FEATURE_NAME[FEATURE.ROOT] = 'ROOT' -FEATURE_NAME[FEATURE.FEATURE_SET] = 'FEATURE_SET' -FEATURE_NAME[FEATURE.FIRMWARE] = 'FIRMWARE' -FEATURE_NAME[FEATURE.NAME] = 'NAME' -FEATURE_NAME[FEATURE.BATTERY] = 'BATTERY' -FEATURE_NAME[FEATURE.REPROGRAMMABLE_KEYS] = 'REPROGRAMMABLE_KEYS' -FEATURE_NAME[FEATURE.WIRELESS] = 'WIRELESS' -FEATURE_NAME[FEATURE.SOLAR_CHARGE] = 'SOLAR_CHARGE' - - -FEATURE_FLAGS = { 0x20: 'internal', 0x40: 'hidden', 0x80: 'obsolete' } - - -_DEVICE_KINDS = ('keyboard', 'remote control', 'numpad', 'mouse', - 'touchpad', 'trackball', 'presenter', 'receiver') - -"""Possible types of devices connected to an UR.""" -DEVICE_KIND = FallbackDict(lambda x: 'unknown', list2dict(_DEVICE_KINDS)) - - -_FIRMWARE_KINDS = ('Firmware', 'Bootloader', 'Hardware', 'Other') - -"""Names of different firmware levels possible, indexed by level.""" -FIRMWARE_KIND = FallbackDict(lambda x: 'Unknown', list2dict(_FIRMWARE_KINDS)) - - -_BATTERY_STATUSES = ('Discharging (in use)', 'Recharging', 'Almost full', - 'Full', 'Slow recharge', 'Invalid battery', 'Thermal error') -BATTERY_OK = lambda status: status < 5 - -"""Names for possible battery status values.""" -BATTERY_STATUS = FallbackDict(lambda x: 'unknown', list2dict(_BATTERY_STATUSES)) - -_KEY_NAMES = ( 'unknown_0000', 'Volume up', 'Volume down', 'Mute', 'Play/Pause', - 'Next', 'Previous', 'Stop', 'Application switcher', - 'unknown_0009', 'Calculator', 'unknown_000B', 'unknown_000C', - 'unknown_000D', 'Mail') - -"""Standard names for reprogrammable keys.""" -KEY_NAME = FallbackDict(lambda x: 'unknown_%04X' % x, list2dict(_KEY_NAMES)) - -"""Possible flags on a reprogrammable key.""" -KEY_FLAG = type('KEY_FLAG', (), dict( - REPROGRAMMABLE=0x10, - FN_SENSITIVE=0x08, - NONSTANDARD=0x04, - IS_FN=0x02, - MSE=0x01, - )) - -KEY_FLAG_NAME = FallbackDict(lambda x: 'unknown') -KEY_FLAG_NAME[KEY_FLAG.REPROGRAMMABLE] = 'reprogrammable' -KEY_FLAG_NAME[KEY_FLAG.FN_SENSITIVE] = 'fn-sensitive' -KEY_FLAG_NAME[KEY_FLAG.NONSTANDARD] = 'nonstandard' -KEY_FLAG_NAME[KEY_FLAG.IS_FN] = 'is-fn' -KEY_FLAG_NAME[KEY_FLAG.MSE] = 'mse' - -_ERROR_NAMES = ('Ok', 'Unknown', 'Invalid argument', 'Out of range', - 'Hardware error', 'Logitech internal', 'Invalid feature index', - 'Invalid function', 'Busy', 'Unsupported') - -"""Names for error codes.""" -ERROR_NAME = FallbackDict(lambda x: 'Unknown error', list2dict(_ERROR_NAMES)) - - -"""Maximum number of devices that can be attached to a single receiver.""" -MAX_ATTACHED_DEVICES = 6 - - -del FallbackDict -del list2dict diff --git a/lib/logitech/unifying_receiver/devices.py b/lib/logitech/unifying_receiver/devices.py new file mode 100644 index 00000000..772b5d18 --- /dev/null +++ b/lib/logitech/unifying_receiver/devices.py @@ -0,0 +1,30 @@ +# +# +# + +from collections import namedtuple +_D = namedtuple('_DeviceDescriptor', ['codename', 'name', 'kind']) +del namedtuple + +DEVICES = ( _D('M315', 'Wireless Mouse M315', 'mouse'), + _D('M325', 'Wireless Mouse M325', 'mouse'), + _D('M505', 'Wireless Mouse M505', 'mouse'), + _D('M510', 'Wireless Mouse M510', 'mouse'), + _D('M515', 'Couch Mouse M515', 'mouse'), + _D('M525', 'Wireless Mouse M525', 'mouse'), + _D('M570', 'Wireless Trackball M570', 'trackball'), + _D('M600', 'Touch Mouse M600', 'mouse'), + _D('M705', 'Marathon Mouse M705', 'mouse'), + _D('K270', 'Wireless Keyboard K270', 'keyboard'), + _D('K350', 'Wireless Keyboard K350', 'keyboard'), + _D('K360', 'Wireless Keyboard K360', 'keyboard'), + _D('K400', 'Wireless Touch Keyboard K400', 'keyboard'), + _D('K750', 'Wireless Solar Keyboard K750', 'keyboard'), + _D('K800', 'Wireless Illuminated Keyboard K800', 'keyboard'), + _D('T400', 'Zone Touch Mouse T400', 'mouse'), + _D('T650', 'Wireless Rechargeable Touchpad T650', 'touchpad'), + _D('Cube', 'Logitech Cube', 'mouse'), + _D('Anywhere MX', 'Anywhere Mouse MX', 'mouse'), + _D('Performance MX', 'Performance Mouse MX', 'mouse'), + ) +DEVICES = { d.codename: d for d in DEVICES } diff --git a/lib/logitech/unifying_receiver/exceptions.py b/lib/logitech/unifying_receiver/exceptions.py deleted file mode 100644 index 69795cd4..00000000 --- a/lib/logitech/unifying_receiver/exceptions.py +++ /dev/null @@ -1,36 +0,0 @@ -# -# Exceptions that may be raised by this API. -# - -from .constants import (FEATURE_NAME, ERROR_NAME) - - -class NoReceiver(Exception): - """May be raised when trying to talk through a previously connected - receiver that is no longer available. Should only happen if the receiver is - physically disconnected from the machine, or its kernel driver module is - unloaded.""" - pass - - -class FeatureNotSupported(Exception): - """Raised when trying to request a feature not supported by the device.""" - def __init__(self, devnumber, feature): - super(FeatureNotSupported, self).__init__(devnumber, feature, FEATURE_NAME[feature]) - self.devnumber = devnumber - self.feature = feature - self.feature_name = FEATURE_NAME[feature] - - -class FeatureCallError(Exception): - """Raised if the device replied to a feature call with an error.""" - def __init__(self, devnumber, feature, feature_index, feature_function, error_code, data=None): - super(FeatureCallError, self).__init__(devnumber, feature, feature_index, feature_function, error_code, ERROR_NAME[error_code]) - self.devnumber = devnumber - self.feature = feature - self.feature_name = None if feature is None else FEATURE_NAME[feature] - self.feature_index = feature_index - self.feature_function = feature_function - self.error_code = error_code - self.error_string = ERROR_NAME[error_code] - self.data = data diff --git a/lib/logitech/unifying_receiver/hidpp10.py b/lib/logitech/unifying_receiver/hidpp10.py new file mode 100644 index 00000000..4f2ae120 --- /dev/null +++ b/lib/logitech/unifying_receiver/hidpp10.py @@ -0,0 +1,66 @@ +# +# +# + +from .common import NamedInts as _NamedInts + +# +# constants +# + +DEVICE_KIND = _NamedInts( + keyboard=0x01, + mouse=0x02, + numpad=0x03, + presenter=0x04, + trackball=0x08, + touchpad=0x09) + +POWER_SWITCH_LOCATION = _NamedInts( + base=0x01, + top_case=0x02, + edge_of_top_right_corner=0x03, + top_left_corner=0x05, + bottom_left_corner=0x06, + top_right_corner=0x07, + bottom_right_corner=0x08, + top_edge=0x09, + right_edge=0x0A, + left_edge=0x0B, + bottom_edge=0x0C) + +NOTIFICATION_FLAG = _NamedInts( + battery_status=0x00100000, + wireless=0x00000100, + software_present=0x000000800) + +ERROR = _NamedInts( + invalid_SubID__command=0x01, + invalid_address=0x02, + invalid_value=0x03, + connection_request_failed=0x04, + too_many_devices=0x05, + already_exists=0x06, + busy=0x07, + unknown_device=0x08, + resource_error=0x09, + request_unavailable=0x0A, + unsupported_parameter_value=0x0B, + wrong_pin_code=0x0C) + +PAIRING_ERRORS = _NamedInts( + device_timeout=0x01, + device_not_supported=0x02, + too_many_devices=0x03, + sequence_timeout=0x06) + +# +# functions +# + +def get_battery(device): + """Reads a device's battery level, if provided by the HID++ 1.0 protocol.""" + reply = device.request(0x810D) + if reply: + charge = ord(reply[:1]) + return charge, None diff --git a/lib/logitech/unifying_receiver/hidpp20.py b/lib/logitech/unifying_receiver/hidpp20.py new file mode 100644 index 00000000..85405b32 --- /dev/null +++ b/lib/logitech/unifying_receiver/hidpp20.py @@ -0,0 +1,388 @@ +# +# Logitech Unifying Receiver API. +# + +from struct import pack as _pack, unpack as _unpack + +from logging import getLogger, DEBUG as _DEBUG +_log = getLogger('LUR').getChild('hidpp20') +del getLogger + +from .common import (FirmwareInfo as _FirmwareInfo, + ReprogrammableKeyInfo as _ReprogrammableKeyInfo, + KwException as _KwException, + NamedInts as _NamedInts) + +# +# +# + +"""Possible features available on a Logitech device. + +A particular device might not support all these features, and may support other +unknown features as well. +""" +FEATURE = _NamedInts( + ROOT=0x0000, + FEATURE_SET=0x0001, + FIRMWARE=0x0003, + NAME=0x0005, + BATTERY=0x1000, + REPROGRAMMABLE_KEYS=0x1B00, + WIRELESS=0x1D4B, + SOLAR_CHARGE=0x4301, + TOUCH_MOUSE=0x6110) +FEATURE._fallback = lambda x: 'unknown:%04X' % x + +FEATURE_FLAG = _NamedInts( + internal=0x20, + hidden=0x40, + obsolete=0x80) + +DEVICE_KIND = _NamedInts( + keyboard=0x00, + remote_control=0x01, + numpad=0x02, + mouse=0x03, + touchpad=0x04, + trackball=0x05, + presenter=0x06, + receiver=0x07) + +FIRMWARE_KIND = _NamedInts( + Firmware=0x00, + Bootloader=0x01, + Hardware=0x02, + Other=0x03) + +BATTERY_OK = lambda status: status < 5 + +BATTERY_STATUS = _NamedInts( + discharging=0x00, + recharging=0x01, + almost_full=0x02, + full=0x03, + slow_recharge=0x04, + invalid_battery=0x05, + thermal_error=0x06) + +KEY = _NamedInts( + Volume_Up=0x0001, + Volume_Down=0x0002, + Mute=0x0003, + Play__Pause=0x0004, + Next=0x0005, + Previous=0x0006, + Stop=0x0007, + Application_Switcher=0x0008, + Calculator=0x000A, + Mail=0x000E, + Home=0x001A, + Tools=0x001D, + Search=0x0029, + Sleep=0x002F) +KEY._fallback = lambda x: 'unknown:%04X' % x + +KEY_FLAG = _NamedInts( + reprogrammable=0x10, + FN_sensitive=0x08, + nonstandard=0x04, + is_FN=0x02, + mse=0x01) + +ERROR = _NamedInts( + unknown=0x01, + invalid_argument=0x02, + out_of_range=0x03, + hardware_error=0x04, + logitech_internal=0x05, + invalid_feature_index=0x06, + invalid_function=0x07, + busy=0x08, + unsupported=0x09) + +# +# +# + +class FeatureNotSupported(_KwException): + """Raised when trying to request a feature not supported by the device.""" + pass + +class FeatureCallError(_KwException): + """Raised if the device replied to a feature call with an error.""" + pass + +# +# +# + +class FeaturesArray(object): + """A sequence of features supported by a HID++ 2.0 device.""" + __slots__ = ('supported', 'device', 'features') + + def __init__(self, device): + assert device is not None + self.device = device + self.supported = True + self.features = None + + def __del__(self): + self.supported = False + self.features = None + self.device = None + + def _check(self): + # print ("%s check" % self.device) + if self.supported: + assert self.device + if self.features is not None: + return True + + protocol = self.device.protocol + if protocol == 0: + # device is not connected right now, will have to try later + return False + + # I _think_ this is universally true + if protocol < 2.0: + self.supported = False + # self.device.features = None + self.device = None + return False + + reply = self.device.request(int(FEATURE.ROOT), _pack('!H', FEATURE.FEATURE_SET)) + if reply is None: + self.supported = False + else: + fs_index = ord(reply[0:1]) + if fs_index: + count = self.device.request(fs_index << 8) + if count is None: + _log.warn("FEATURE_SET found, but failed to read features count") + # most likely the device is unavailable + return False + else: + count = ord(count[:1]) + assert count >= fs_index + self.features = [None] * (1 + count) + self.features[0] = FEATURE.ROOT + self.features[fs_index] = FEATURE.FEATURE_SET + return True + else: + self.supported = False + + return False + + __bool__ = __nonzero__ = _check + + def __getitem__(self, index): + if self._check(): + assert type(index) == int + if index < 0 or index >= len(self.features): + raise IndexError(index) + + if self.features[index] is None: + feature = self.device.feature_request(FEATURE.FEATURE_SET, 0x10, index) + if feature: + feature, = _unpack('!H', feature[:2]) + self.features[index] = FEATURE[feature] + + return self.features[index] + + def __contains__(self, value): + if self._check(): + may_have = False + for f in self.features: + if f is None: + may_have = True + elif int(value) == int(f): + return True + elif int(value) < int(f): + break + + if may_have: + reply = self.device.request(int(FEATURE.ROOT), _pack('!H', value)) + if reply: + index = ord(reply[0:1]) + if index: + self.features[index] = FEATURE[int(value)] + return True + + def index(self, value): + if self._check(): + may_have = False + for index, f in enumerate(self.features): + if f is None: + may_have = True + elif int(value) == int(f): + return index + elif int(value) < int(f): + raise ValueError("%s not in list" % repr(value)) + + if may_have: + reply = self.device.request(int(FEATURE.ROOT), _pack('!H', value)) + if reply: + index = ord(reply[0:1]) + self.features[index] = FEATURE[int(value)] + return index + + raise ValueError("%s not in list" % repr(value)) + + def __iter__(self): + if self._check(): + yield FEATURE.ROOT + index = 1 + last_index = len(self.features) + while index < last_index: + yield self.__getitem__(index) + index += 1 + + def __len__(self): + return len(self.features) if self._check() else 0 + +# +# +# + +class KeysArray(object): + """A sequence of key mappings supported by a HID++ 2.0 device.""" + __slots__ = ('device', 'keys') + + def __init__(self, device, count): + assert device is not None + self.device = device + self.keys = [None] * count + + def __del__(self): + self.keys = None + self.device = None + + def __getitem__(self, index): + assert type(index) == int + if index < 0 or index >= len(self.keys): + raise IndexError(index) + + if self.keys[index] is None: + keydata = feature_request(self.device, FEATURE.REPROGRAMMABLE_KEYS, 0x10, index) + if keydata: + key, key_task, flags = _unpack('!HHB', keydata[:5]) + self.keys[index] = _ReprogrammableKeyInfo(index, KEY[key], KEY[key_task], flags) + + return self.keys[index] + + def index(self, value): + for index, k in enumerate(self.keys): + if k is not None and int(value) == int(k.key): + return index + + for index, k in enumerate(self.keys): + if k is None: + k = self.__getitem__(index) + if k is not None: + return index + + def __iter__(self): + for k in range(0, len(self.keys)): + yield self.__getitem__(k) + + def __len__(self): + return len(self.keys) + + +# +# +# + +def feature_request(device, feature, function=0x00, *params): + if device.features: + if feature in device.features: + feature_index = device.features.index(int(feature)) + return device.request((feature_index << 8) + (function & 0xFF), *params) + + +def get_firmware(device): + """Reads a device's firmware info. + + :returns: a list of FirmwareInfo tuples, ordered by firmware layer. + """ + count = feature_request(device, FEATURE.FIRMWARE) + if count: + count = ord(count[:1]) + + fw = [] + for index in range(0, count): + fw_info = feature_request(device, FEATURE.FIRMWARE, 0x10, index) + if fw_info: + level = ord(fw_info[:1]) & 0x0F + if level == 0 or level == 1: + name, version_major, version_minor, build = _unpack('!3sBBH', fw_info[1:8]) + version = '%02X.%02X' % (version_major, version_minor) + if build: + version += '.B%04X' % build + extras = fw_info[9:].rstrip(b'\x00') or None + fw_info = _FirmwareInfo(FIRMWARE_KIND[level], name.decode('ascii'), version, extras) + elif level == FIRMWARE_KIND.Hardware: + fw_info = _FirmwareInfo(FIRMWARE_KIND.Hardware, '', ord(fw_info[1:2]), None) + else: + fw_info = _FirmwareInfo(FIRMWARE_KIND.Other, '', '', None) + + fw.append(fw_info) + # _log.debug("device %d firmware %s", devnumber, fw_info) + return tuple(fw) + + +def get_kind(device): + """Reads a device's type. + + :see DEVICE_KIND: + :returns: a string describing the device type, or ``None`` if the device is + not available or does not support the ``NAME`` feature. + """ + kind = feature_request(device, FEATURE.NAME, 0x20) + if kind: + kind = ord(kind[:1]) + # _log.debug("device %d type %d = %s", devnumber, kind, DEVICE_KIND[kind]) + return DEVICE_KIND[kind] + + +def get_name(device): + """Reads a device's name. + + :returns: a string with the device name, or ``None`` if the device is not + available or does not support the ``NAME`` feature. + """ + name_length = feature_request(device, FEATURE.NAME) + if name_length: + name_length = ord(name_length[:1]) + + name = b'' + while len(name) < name_length: + fragment = feature_request(device, FEATURE.NAME, 0x10, len(name)) + if fragment: + name += fragment[:name_length - len(name)] + else: + _log.error("failed to read whole name of %s (expected %d chars)", device, name_length) + return None + + return name.decode('ascii') + + +def get_battery(device): + """Reads a device's battery level. + + :raises FeatureNotSupported: if the device does not support this feature. + """ + battery = feature_request(device, FEATURE.BATTERY) + if battery: + discharge, dischargeNext, status = _unpack('!BBB', battery[:3]) + if _log.isEnabledFor(_DEBUG): + _log.debug("device %d battery %d%% charged, next level %d%% charge, status %d = %s", + device.number, discharge, dischargeNext, status, BATTERY_STATUS[status]) + return discharge, BATTERY_STATUS[status] + + +def get_keys(device): + count = feature_request(device, FEATURE.REPROGRAMMABLE_KEYS) + if count: + return KeysArray(device, ord(count[:1])) diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py index b17ae1b4..a8f6dd4d 100644 --- a/lib/logitech/unifying_receiver/listener.py +++ b/lib/logitech/unifying_receiver/listener.py @@ -3,10 +3,7 @@ # import threading as _threading - -from . import base as _base -from .exceptions import NoReceiver as _NoReceiver -from .common import Packet as _Packet +from time import time as _timestamp # for both Python 2 and 3 try: @@ -14,84 +11,163 @@ try: except ImportError: from queue import Queue as _Queue - -from logging import getLogger +from logging import getLogger, DEBUG as _DEBUG _log = getLogger('LUR').getChild('listener') del getLogger +from . import base as _base + +# +# +# + +class ThreadedHandle(object): + """A thread-local wrapper with different open handles for each thread.""" + + __slots__ = ['path', '_local', '_handles'] + + def __init__(self, initial_handle, path): + assert initial_handle + if type(initial_handle) != int: + raise TypeError('expected int as initial handle, got %s' % repr(initial_handle)) + + assert path + self.path = path + self._local = _threading.local() + self._local.handle = initial_handle + self._handles = [initial_handle] + + def _open(self): + handle = _base.open_path(self.path) + if handle is None: + _log.error("%s failed to open new handle", repr(self)) + else: + # _log.debug("%s opened new handle %d", repr(self), handle) + self._local.handle = handle + self._handles.append(handle) + return handle + + def close(self): + if self._local: + self._local = None + handles, self._handles = self._handles, [] + if _log.isEnabledFor(_DEBUG): + _log.debug("%s closing %s", repr(self), handles) + for h in handles: + _base.close(h) + + def __del__(self): + self.close() + + def __index__(self): + if self._local: + try: + return self._local.handle + except: + return self._open() + __int__ = __index__ + + def __str__(self): + if self._local: + return str(int(self)) + + def __repr__(self): + return '' % self.path + + def __bool__(self): + return bool(self._local) + __nonzero__ = __bool__ + +# +# +# + +_EVENT_READ_TIMEOUT = 500 + class EventsListener(_threading.Thread): """Listener thread for events from the Unifying Receiver. Incoming packets will be passed to the callback function in sequence. """ - def __init__(self, receiver_handle, events_callback): + def __init__(self, receiver, events_callback): super(EventsListener, self).__init__(name=self.__class__.__name__) self.daemon = True self._active = False - self._handle = receiver_handle + self.receiver = receiver self._queued_events = _Queue(32) self._events_callback = events_callback + self.tick_period = 0 + def run(self): self._active = True - _base.unhandled_hook = self._unhandled_hook - ihandle = int(self._handle) - _log.info("started with %s (%d)", repr(self._handle), ihandle) + _base.events_hook = self._events_hook + ihandle = int(self.receiver.handle) + _log.info("started with %s (%d)", self.receiver, ihandle) + + self.has_started() + + last_tick = _timestamp() if self.tick_period else 0 while self._active: if self._queued_events.empty(): try: # _log.debug("read next event") - event = _base.read(ihandle) - # shortcut: we should only be looking at events for proper device numbers - except _NoReceiver: - self._active = False - self._handle = None + event = _base.read(ihandle, _EVENT_READ_TIMEOUT) + except _base.NoReceiver: _log.warning("receiver disconnected") - event = (0xFF, 0xFF, None) + self.receiver.close() + break + + if event: + event = _base.make_event(*event) else: # deliver any queued events event = self._queued_events.get() if event: - event = _Packet(*event) - # _log.debug("processing event %s", event) + _log.debug("processing event %s", event) try: self._events_callback(event) except: _log.exception("processing event %s", event) + elif self.tick_period: + now = _timestamp() + if now - last_tick >= self.tick_period: + last_tick = now + self.tick(now) _base.unhandled_hook = None - handle, self._handle = self._handle, None - if handle: - _base.close(handle) - _log.info("stopped %s", repr(handle)) + del self._queued_events + + self.has_stopped() def stop(self): """Tells the listener to stop as soon as possible.""" - if self._active: - _log.debug("stopping") - self._active = False - handle, self._handle = self._handle, None - if handle: - _base.close(handle) - _log.info("stopped %s", repr(handle)) + self._active = False - @property - def handle(self): - return self._handle + def has_started(self): + """Called right after the thread has started.""" + pass - def _unhandled_hook(self, reply_code, devnumber, data): + def has_stopped(self): + """Called right before the thread stops.""" + pass + + def tick(self, timestamp): + """Called about every tick_period seconds, if set.""" + pass + + def _events_hook(self, event): # only consider unhandled events that were sent from this thread, # i.e. triggered during a callback of a previous event if _threading.current_thread() == self: - event = _Packet(reply_code, devnumber, data) _log.info("queueing unhandled event %s", event) self._queued_events.put(event) def __bool__(self): - return bool(self._active and self._handle) + return bool(self._active and self.receiver) __nonzero__ = __bool__ diff --git a/lib/logitech/unifying_receiver/receiver.py b/lib/logitech/unifying_receiver/receiver.py new file mode 100644 index 00000000..c9290a1d --- /dev/null +++ b/lib/logitech/unifying_receiver/receiver.py @@ -0,0 +1,329 @@ +# +# +# + +import errno as _errno + +from logging import getLogger +_log = getLogger('LUR').getChild('receiver') +del getLogger + +from . import base as _base +from . import hidpp10 as _hidpp10 +from . import hidpp20 as _hidpp20 +from .common import strhex as _strhex, FirmwareInfo as _FirmwareInfo +from .devices import DEVICES as _DEVICES + +# +# +# + +"""A receiver may have a maximum of 6 paired devices at a time.""" +MAX_PAIRED_DEVICES = 6 + + +class PairedDevice(object): + def __init__(self, receiver, number): + assert receiver + self.receiver = receiver + assert number > 0 and number <= MAX_PAIRED_DEVICES + self.number = number + + self._protocol = None + self._wpid = None + self._power_switch = None + self._codename = None + self._name = None + self._kind = None + self._serial = None + self._firmware = None + self._keys = None + + self.features = _hidpp20.FeaturesArray(self) + + def __del__(self): + del self.receiver + del self.features + del self._keys + + @property + def protocol(self): + if self._protocol is None: + self._protocol = _base.ping(self.receiver.handle, self.number) + # _log.debug("device %d protocol %s", self.number, self._protocol) + return self._protocol or 0 + + @property + def wpid(self): + if self._wpid is None: + pair_info = self.receiver.request(0x83B5, 0x20 + self.number - 1) + if pair_info: + self._wpid = _strhex(pair_info[3:5]) + if self._kind is None: + kind = ord(pair_info[7:8]) & 0x0F + self._kind = _hidpp10.DEVICE_KIND[kind] + return self._wpid + + @property + def power_switch_location(self): + if self._power_switch is None: + self.serial + return self._power_switch + + @property + def codename(self): + if self._codename is None: + codename = self.receiver.request(0x83B5, 0x40 + self.number - 1) + if codename: + self._codename = codename[2:].rstrip(b'\x00').decode('utf-8') + # _log.debug("device %d codename %s", self.number, self._codename) + return self._codename + + @property + def name(self): + if self._name is None: + if self.protocol < 2.0: + if self.codename in _DEVICES: + _, self._name, self._kind = _DEVICES[self._codename] + else: + self._name = _hidpp20.get_name(self) + return self._name or self.codename or '?' + + @property + def kind(self): + if self._kind is None: + pair_info = self.receiver.request(0x83B5, 0x20 + self.number - 1) + if pair_info: + kind = ord(pair_info[7:8]) & 0x0F + self._kind = _hidpp10.DEVICE_KIND[kind] + if self._wpid is None: + self._wpid = _strhex(pair_info[3:5]) + if self._kind is None: + if self.protocol < 2.0: + if self.codename in _DEVICES: + _, self._name, self._kind = _DEVICES[self._codename] + else: + self._kind = _hidpp20.get_kind(self) + return self._kind or '?' + + @property + def firmware(self): + if self._firmware is None and self.protocol >= 2.0: + self._firmware = _hidpp20.get_firmware(self) + # _log.debug("device %d firmware %s", self.number, self._firmware) + return self._firmware or () + + @property + def serial(self): + if self._serial is None: + serial = self.receiver.request(0x83B5, 0x30 + self.number - 1) + if serial: + self._serial = _strhex(serial[1:5]) + # _log.debug("device %d serial %s", self.number, self._serial) + ps_location = ord(serial[9:10]) & 0x0F + self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps_location] + return self._serial or '?' + + @property + def keys(self): + if self._keys is None: + self._keys = _hidpp20.get_keys(self) or () + return self._keys + + def request(self, request_id, *params): + return _base.request(self.receiver.handle, self.number, request_id, *params) + + def feature_request(self, feature, function=0x00, *params): + return _hidpp20.feature_request(self, feature, function, *params) + + def ping(self): + return _base.ping(self.receiver.handle, self.number) is not None + + def __index__(self): + return self.number + __int__ = __index__ + + def __hash__(self): + return self.number + + def __cmp__(self, other): + return self.number - other.number + + def __eq__(self, other): + return self.receiver == other.receiver and self.number == other.number + + def __str__(self): + return '' % (self.receiver, self.number, self.codename or '?') + +# +# +# + +class Receiver(object): + """A Unifying Receiver instance. + + The paired devices are available through the sequence interface. + """ + name = 'Unifying Receiver' + max_devices = MAX_PAIRED_DEVICES + create_device = PairedDevice + + def __init__(self, handle, path=None): + assert handle + self.handle = handle + assert path + self.path = path + + self.number = 0xFF + self._serial = None + self._firmware = None + self._devices = {} + + def close(self): + handle, self.handle = self.handle, None + self._devices.clear() + return (handle and _base.close(handle)) + + def __del__(self): + self.close() + + @property + def serial(self): + if self._serial is None and self.handle: + serial = self.request(0x83B5, 0x03) + if serial: + self._serial = _strhex(serial[1:5]) + return self._serial + + @property + def firmware(self): + if self._firmware is None and self.handle: + firmware = [] + + reply = self.request(0x83B5, 0x02) + if reply: + fw_version = _strhex(reply[1:5]) + fw_version = '%s.%s.B%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8]) + firmware.append(_FirmwareInfo(_hidpp20.FIRMWARE_KIND.Firmware, '', fw_version, None)) + + reply = self.request(0x81F1, 0x04) + if reply: + bl_version = _strhex(reply[1:3]) + bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4]) + firmware.append(_FirmwareInfo(_hidpp20.FIRMWARE_KIND.Bootloader, '', bl_version, None)) + + self._firmware = tuple(firmware) + + return self._firmware + + def enable_notifications(self, enable=True): + """Enable or disable device (dis)connection events on this receiver.""" + if not self.handle: + return False + if enable: + # set all possible flags + ok = self.request(0x8000, 0xFF, 0xFF) # and self.request(0x8002, 0x02) + else: + # clear out all possible flags + ok = self.request(0x8000) + + if ok: + _log.info("device notifications %s", 'enabled' if enable else 'disabled') + else: + _log.warn("failed to %s device notifications", 'enable' if enable else 'disable') + return ok + + def notify_devices(self): + """Scan all devices.""" + if self.handle: + if not self.request(0x8002, 0x02): + _log.warn("failed to trigger device events") + + def set_lock(self, lock_closed=True, device=0, timeout=0): + if self.handle: + lock = 0x02 if lock_closed else 0x01 + reply = self.request(0x80B2, lock, device, timeout) + if reply: + return True + _log.warn("failed to %s the receiver lock", 'close' if lock_closed else 'open') + + def request(self, request_id, *params): + if self.handle: + return _base.request(self.handle, 0xFF, request_id, *params) + + def __iter__(self): + for number in range(1, 1 + MAX_PAIRED_DEVICES): + dev = self.__getitem__(number) + if dev is not None: + yield dev + + def __getitem__(self, key): + if not self.handle: + return None + + if key in self._devices: + return self._devices[key] + + if type(key) != int: + raise TypeError('key must be an integer') + if key < 1 or key > MAX_PAIRED_DEVICES: + raise IndexError(key) + + dev = Receiver.create_device(self, key) + if dev.wpid: + self._devices[key] = dev + return dev + + # no paired device at this index + self._devices[key] = None + + def __delitem__(self, key): + if self._devices.get(key) is None: + raise IndexError(key) + + dev = self._devices[key] + reply = self.request(0x80B2, 0x03, int(key)) + if reply: + del self._devices[key] + _log.warn("%s unpaired device %s", self, dev) + else: + _log.error("%s failed to unpair device %s", self, dev) + raise IndexError(key) + + def __len__(self): + count = self.request(0x8102) + return 0 if count is None else ord(count[1:2]) + + def __contains__(self, dev): + if type(dev) == int: + return dev in self._devices + + return self.__contains__(dev.number) + + def __str__(self): + return '' % (self.handle, self.path) + + __bool__ = __nonzero__ = lambda self: self.handle is not None + + @classmethod + def open(self): + """Opens the first Logitech Unifying Receiver found attached to the machine. + + :returns: An open file handle for the found receiver, or ``None``. + """ + exception = None + + for rawdevice in _base.receivers(): + exception = None + try: + handle = _base.open_path(rawdevice.path) + if handle: + return Receiver(handle, rawdevice.path) + except OSError as e: + _log.exception("open %s", rawdevice.path) + if e.errno == _errno.EACCES: + exception = e + + if exception: + # only keep the last exception + raise exception diff --git a/lib/logitech/unifying_receiver/status.py b/lib/logitech/unifying_receiver/status.py new file mode 100644 index 00000000..8cffc2f0 --- /dev/null +++ b/lib/logitech/unifying_receiver/status.py @@ -0,0 +1,231 @@ +# +# +# + +from time import time as _timestamp +from struct import unpack as _unpack + +from logging import getLogger, DEBUG as _DEBUG +_log = getLogger('LUR.status') +del getLogger + +from .common import NamedInts as _NamedInts +from . import hidpp10 as _hidpp10 +from . import hidpp20 as _hidpp20 + +# +# +# + +ALERT = _NamedInts(NONE=0x00, LOW=0x01, MED=0x02, HIGH=0xFF) + +# device properties that may be reported +ENCRYPTED='encrypted' +BATTERY_LEVEL='battery-level' +BATTERY_STATUS='battery-status' +LIGHT_LEVEL='light-level' +ERROR='error' + +# +# +# + +class ReceiverStatus(dict): + def __init__(self, receiver, changed_callback): + assert receiver + self._receiver = receiver + + assert changed_callback + self._changed_callback = changed_callback + + # self.updated = 0 + + self.lock_open = False + self[ERROR] = None + + def __str__(self): + count = len([1 for d in self._receiver if d is not None]) + return ('No devices found.' if count == 0 else + '1 device found.' if count == 1 else + '%d devices found.' % count) + + def _changed(self, alert=ALERT.LOW, reason=None): + # self.updated = _timestamp() + self._changed_callback(self._receiver, alert=alert, reason=reason) + + def process_event(self, event): + if event.sub_id == 0x4A: + self.lock_open = bool(event.address & 0x01) + reason = 'pairing lock is ' + ('open' if self.lock_open else 'closed') + _log.info("%s: %s", self._receiver, reason) + pair_error = ord(event.data[:1]) + if pair_error: + self[ERROR] = _hidpp10.PAIRING_ERRORS[pair_error] + _log.warn("pairing error %d: %s", pair_error, self[ERROR]) + else: + self[ERROR] = None + self._changed(reason=reason) + +# +# +# + +class DeviceStatus(dict): + def __init__(self, device, changed_callback): + assert device + self._device = device + + assert changed_callback + self._changed_callback = changed_callback + + self._active = None + self.updated = 0 + + def __str__(self): + t = [] + if self.get(BATTERY_LEVEL) is not None: + b = 'Battery: %d%%' % self[BATTERY_LEVEL] + if self.get(BATTERY_STATUS): + b += ' (' + self[BATTERY_STATUS] + ')' + t.append(b) + if self.get(LIGHT_LEVEL) is not None: + t.append('Light: %d lux' % self[LIGHT_LEVEL]) + return ', '.join(t) + + def __bool__(self): + return self.updated and self._active + __nonzero__ = __bool__ + + def _changed(self, active=True, alert=ALERT.NONE, reason=None): + assert self._changed_callback + self._active = active + if not active: + battery = self.get(BATTERY_LEVEL) + self.clear() + if battery is not None: + self[BATTERY_LEVEL] = battery + if self.updated == 0: + alert |= ALERT.LOW + self.updated = _timestamp() + if _log.isEnabledFor(_DEBUG): + _log.debug("device %d changed: active=%s %s", self._device.number, self._active, dict(self)) + self._changed_callback(self._device, alert, reason) + + # @property + # def battery(self): + # battery = _hidpp10.get_battery_level(self) + # if battery is None: + # battery = _hidpp20.get_battery_level(self) + # return battery + + def process_event(self, event): + if event.sub_id == 0x40: + if event.address == 0x02: + # device un-paired + self.clear() + self._device.status = None + self._changed(False, ALERT.HIGH, 'unpaired') + self._device = None + else: + _log.warn("device %d disconnection notification %s with unknown type %02X", self._device.number, event, event.address) + + elif event.sub_id == 0x41: + if event.address == 0x04: # unifying protocol + # wpid = _strhex(event.data[4:5] + event.data[3:4]) + # assert wpid == device.wpid + + flags = ord(event.data[:1]) & 0xF0 + link_encrypyed = bool(flags & 0x20) + link_established = not (flags & 0x40) + if _log.isEnabledFor(_DEBUG): + sw_present = bool(flags & 0x10) + has_payload = bool(flags & 0x80) + _log.debug("device %d connection notification: software=%s, encrypted=%s, link=%s, payload=%s", + self._device.number, sw_present, link_encrypyed, link_established, has_payload) + self[ENCRYPTED] = link_encrypyed + self._changed(link_established) + + elif event.address == 0x03: + _log.warn("device %d connection notification %s with eQuad protocol, ignored", self._device.number, event) + + else: + _log.warn("device %d connection notification %s with unknown protocol %02X", self._device.number, event, event.address) + + elif event.sub_id < 0x40: + # a feature event, assuming no device has more than 0x40 features + if event.sub_id >= len(self._device.features): + _log.warn("device %d got event from unknown feature index %02X", self._device.number, event.sub_id) + return None + + feature = self._device.features[event.sub_id] + + if feature == _hidpp20.FEATURE.BATTERY: + if event.address == 0x00: + discharge = ord(event.data[:1]) + battery_status = ord(event.data[1:2]) + self[BATTERY_LEVEL] = discharge + self[BATTERY_STATUS] = BATTERY_STATUS[battery_status] + if _hidpp20.BATTERY_OK(battery_status): + alert = ALERT.NONE + reason = self[ERROR] = None + else: + alert = ALERT.MED + reason = self[ERROR] = self[BATTERY_STATUS] + self._changed(alert=alert, reason=reason) + else: + _log.warn("don't know how to handle BATTERY event %s", event) + + elif feature == _hidpp20.FEATURE.REPROGRAMMABLE_KEYS: + if event.address == 0x00: + _log.debug('reprogrammable key: %s', event) + else: + _log.warn("don't know how to handle REPROGRAMMABLE KEYS event %s", event) + + elif feature == _hidpp20.FEATURE.WIRELESS: + if event.address == 0x00: + _log.debug("wireless status: %s", event) + if event.data[0:3] == b'\x01\x01\x01': + self._changed(alert=ALERT.LOW, reason='powered on') + else: + _log.warn("don't know how to handle WIRELESS event %s", event) + + elif feature == _hidpp20.FEATURE.SOLAR_CHARGE: + if event.data[5:9] == b'GOOD': + charge, lux, adc = _unpack('!BHH', event.data[:5]) + self[BATTERY_LEVEL] = charge + # guesstimate the battery voltage, emphasis on 'guess' + self[BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672) + if event.address == 0x00: + self[LIGHT_LEVEL] = None + self._changed() + elif event.address == 0x10: + self[LIGHT_LEVEL] = lux + if lux > 200: # guesstimate + self[BATTERY_STATUS] += ', charging' + self._changed() + elif event.address == 0x20: + _log.debug("Solar key pressed") + # first cancel any reporting + self._device.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE) + reports_count = 10 + reports_period = 3 # seconds + self._changed(alert=ALERT.MED) + # trigger a new report chain + self._device.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE, 0x00, reports_count, reports_period) + else: + self._changed() + else: + _log.warn("SOLAR_CHARGE event not GOOD? %s", event) + + elif feature == _hidpp20.FEATURE.TOUCH_MOUSE: + if event.address == 0x00: + _log.debug("TOUCH MOUSE points event: %s", event) + elif event.address == 0x10: + touch = ord(event.data[:1]) + button_down = bool(touch & 0x02) + mouse_lifted = bool(touch & 0x01) + _log.debug("TOUCH MOUSE status: button_down=%s mouse_lifted=%s", button_down, mouse_lifted) + + else: + _log.warn("don't know how to handle event %s", event) +