diff --git a/app/pairing.py b/app/pairing.py index 15b24d12..5b6efdea 100644 --- a/app/pairing.py +++ b/app/pairing.py @@ -3,10 +3,11 @@ # from logging import getLogger as _Logger - _l = _Logger('pairing') +state = None + class State(object): TICK = 300 PAIR_TIMEOUT = 60 * 1000 / TICK @@ -15,6 +16,9 @@ class State(object): self._watcher = watcher self.reset() + def device(self, number): + return self._watcher.receiver.devices.get(number) + def reset(self): self.success = None self.detected_device = None @@ -72,8 +76,6 @@ class State(object): return True - -def unpair(receiver, devnumber): - reply = receiver.request(0xFF, b'\x80\xB2', b'\x03' + chr(devnumber)) - _l.debug("unpair %d reply %s", devnumber, repr(reply)) - + def unpair(self, number): + _l.debug("unpair %d", number) + self._watcher.receiver.unpair_device(number) diff --git a/app/receiver.py b/app/receiver.py index b978e6e2..4289ed8c 100644 --- a/app/receiver.py +++ b/app/receiver.py @@ -6,7 +6,7 @@ from logging import getLogger as _Logger _LOG_LEVEL = 6 from threading import Event as _Event -from binascii import hexlify as _hexlify +from struct import pack as _pack from logitech.unifying_receiver import base as _base from logitech.unifying_receiver import api as _api @@ -22,7 +22,7 @@ class DeviceInfo(object): """A device attached to the receiver. """ def __init__(self, receiver, number, status=STATUS.UNKNOWN): - self.LOG = _Logger("Device-%d" % number) + self.LOG = _Logger("Device[%d]" % number) self.receiver = receiver self.number = number self._name = None @@ -50,6 +50,9 @@ class DeviceInfo(object): self._status = new_status self.receiver._device_changed(self, urgent) + if new_status < STATUS.CONNECTED: + self.props.clear() + @property def status_text(self): if self._status < STATUS.CONNECTED: @@ -160,7 +163,7 @@ class Receiver(_listener.EventsListener): self.status_changed.urgent = False self.status_changed.reason = None - self.LOG = _Logger("Receiver-%s" % path) + self.LOG = _Logger("Receiver[%s]" % path) self.LOG.info("initializing") self._serial = None @@ -170,8 +173,7 @@ class Receiver(_listener.EventsListener): self.events_filter = None self.events_handler = None - if (_base.request(handle, 0xFF, b'\x81\x00') and - _base.request(handle, 0xFF, b'\x80\x00', b'\x00\x01')): + if _base.request(handle, 0xFF, b'\x80\x00', b'\x00\x01'): self.LOG.info("initialized") else: self.LOG.warn("initialization failed") @@ -256,7 +258,7 @@ class Receiver(_listener.EventsListener): STATUS.CONNECTED if state_code == 0x20 else \ None if state is None: - self.LOG.warn("don't know how to handle status 0x%02x: %s", state_code, event) + self.LOG.warn("don't know how to handle status 0x%02X: %s", state_code, event) else: self.devices[event.devnumber].status = state return @@ -298,7 +300,7 @@ class Receiver(_listener.EventsListener): STATUS.CONNECTED if state_code == 0x20 else \ None if state is None: - self.LOG.warn("don't know how to handle device status 0x%02x: %s", state_code, event) + self.LOG.warn("don't know how to handle device status 0x%02X: %s", state_code, event) return None dev = DeviceInfo(self, event.devnumber, state) @@ -318,11 +320,24 @@ class Receiver(_listener.EventsListener): b[0] -= 0x10 serial = self.request(0xFF, b'\x83\xB5', bytes(b)) if serial: - dev._serial = _hexlify(serial[1:5]).decode('ascii').upper() + dev._serial = _base._hex(serial[1:5]) return dev + def unpair_device(self, number): + if number in self.devices: + dev = self.devices[number] + reply = self.request(0xFF, b'\x80\xB2', _pack('!BB', 0x03, number)) + if reply: + self.LOG.debug("remove device %s => %s", dev, _base._hex(reply)) + del self.devices[number] + self.LOG.warn("unpaired device %s", dev) + self.status = STATUS.CONNECTED + len(self.devices) + return True + self.LOG.warn("failed to unpair device %s", dev) + return False + def __str__(self): - return 'Receiver(%s,%x,%d:%d)' % (self.path, self._handle, self._active, self._status) + return 'Receiver(%s,%X,%d)' % (self.path, self._handle, self._status) @classmethod def open(self): @@ -332,7 +347,6 @@ class Receiver(_listener.EventsListener): """ for rawdevice in _base.list_receiver_devices(): _Logger("receiver").log(_LOG_LEVEL, "checking %s", rawdevice) - handle = _base.try_open(rawdevice.path) if handle: receiver = Receiver(rawdevice.path, handle) diff --git a/app/solaar.py b/app/solaar.py index 0d4806bd..8477dfd2 100644 --- a/app/solaar.py +++ b/app/solaar.py @@ -58,6 +58,8 @@ if __name__ == '__main__': watcher.DUMMY.NAME, watcher.DUMMY.max_devices, args.systray) + ui.action.pair.window = window + ui.action.unpair.window = window if args.systray: menu_actions = (ui.action.pair, @@ -74,8 +76,7 @@ if __name__ == '__main__': w.start() import pairing - ui.action.pair.connect('activate', ui.action._pair_device, - window, pairing.State(w)) + pairing.state = pairing.State(w) from gi.repository import Gtk Gtk.main() diff --git a/app/ui/__init__.py b/app/ui/__init__.py index 4552c6ff..8a74e223 100644 --- a/app/ui/__init__.py +++ b/app/ui/__init__.py @@ -15,14 +15,14 @@ def appicon(receiver_status): APPNAME) -_THEME = Gtk.IconTheme.get_default() +_ICON_THEME = Gtk.IconTheme.get_default() def get_icon(name, fallback): - return name if name and _THEME.has_icon(name) else fallback + return name if name and _ICON_THEME.has_icon(name) else fallback def icon_file(name): - if name and _THEME.has_icon(name): - return _THEME.lookup_icon(name, 0, 0).get_filename() + if name and _ICON_THEME.has_icon(name): + return _ICON_THEME.lookup_icon(name, 0, 0).get_filename() return None diff --git a/app/ui/action.py b/app/ui/action.py index 35f681b3..5d9bfd9a 100644 --- a/app/ui/action.py +++ b/app/ui/action.py @@ -46,14 +46,32 @@ def _show_about_window(action): about.destroy() about = _action('help-about', 'About ' + ui.APPNAME, _show_about_window) +quit = _action('exit', 'Quit', Gtk.main_quit) -def _pair_device(action, window, state): +# +# +# + +import pairing + +def _pair_device(action): action.set_sensitive(False) - pair_dialog = ui.pair_window.create(action, state) - # window.present() - # pair_dialog.set_transient_for(parent_window) - # pair_dialog.set_destroy_with_parent(parent_window) - # pair_dialog.set_modal(True) + pair_dialog = ui.pair_window.create(action, pairing.state) + action.window.present() + pair_dialog.set_transient_for(action.window) + pair_dialog.set_destroy_with_parent(action.window) + pair_dialog.set_modal(True) pair_dialog.present() -pair = _action('add', 'Pair new device', None) -pair.set_sensitive(False) +pair = _action('add', 'Pair new device', _pair_device) + + +def _unpair_device(action): + dev = pairing.state.device(action.devnumber) + action.devnumber = 0 + if dev: + q = Gtk.MessageDialog.new(action.window, + Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, + 'Unpair device %s?', dev.name) + if q.run() == Gtk.ResponseType.YES: + pairing.state.unpair(dev.number) +unpair = _action('remove', 'Unpair', _unpair_device) diff --git a/app/ui/main_window.py b/app/ui/main_window.py index 49d806b0..ec4aaf98 100644 --- a/app/ui/main_window.py +++ b/app/ui/main_window.py @@ -17,18 +17,20 @@ _PLACEHOLDER = '~' # # -def _show_info(action, widget): - widget.set_visible(action.get_active()) +def _toggle_info_button(label, widget): + toggle = lambda a, w: w.set_visible(a.get_active()) + action = ui.action._toggle_action('info', label, toggle, widget) + return action.create_tool_item() def _receiver_box(name): icon = Gtk.Image.new_from_icon_name(name, _SMALL_DEVICE_ICON_SIZE) label = Gtk.Label('Initializing...') - label.set_name('status-label') + label.set_name('label') label.set_alignment(0, 0.5) toolbar = Gtk.Toolbar() - toolbar.set_name('buttons') + toolbar.set_name('toolbar') toolbar.set_style(Gtk.ToolbarStyle.ICONS) toolbar.set_icon_size(Gtk.IconSize.MENU) toolbar.set_show_arrow(False) @@ -41,22 +43,25 @@ def _receiver_box(name): info_label = Gtk.Label() info_label.set_name('info-label') info_label.set_alignment(0, 0.5) - info_label.set_padding(32, 2) + info_label.set_padding(8, 2) info_label.set_selectable(True) - info_action = ui.action._toggle_action('info', 'Receiver info', _show_info, info_label) - toolbar.insert(info_action.create_tool_item(), 0) + info_box = Gtk.Frame() + info_box.add(info_label) + info_box.set_shadow_type(Gtk.ShadowType.ETCHED_IN) + + toolbar.insert(_toggle_info_button('Receiver info', info_box), 0) toolbar.insert(ui.action.pair.create_tool_item(), -1) - vbox = Gtk.VBox(homogeneous=False, spacing=4) + vbox = Gtk.VBox(homogeneous=False, spacing=2) vbox.set_border_width(4) vbox.pack_start(hbox, True, True, 0) - vbox.pack_start(info_label, True, True, 0) + vbox.pack_start(info_box, True, True, 0) frame = Gtk.Frame() frame.add(vbox) frame.show_all() - info_label.set_visible(False) + info_box.set_visible(False) return frame @@ -68,7 +73,7 @@ def _device_box(): label = Gtk.Label('Initializing...') label.set_name('label') label.set_alignment(0, 0.5) - label.set_padding(0, 2) + label.set_padding(4, 4) battery_icon = Gtk.Image.new_from_icon_name('battery_unknown', _STATUS_ICON_SIZE) @@ -83,7 +88,7 @@ def _device_box(): light_label.set_width_chars(8) toolbar = Gtk.Toolbar() - toolbar.set_name('buttons') + toolbar.set_name('toolbar') toolbar.set_style(Gtk.ToolbarStyle.ICONS) toolbar.set_icon_size(Gtk.IconSize.MENU) toolbar.set_show_arrow(False) @@ -99,21 +104,21 @@ def _device_box(): info_label = Gtk.Label() info_label.set_name('info-label') info_label.set_alignment(0, 0.5) - info_label.set_padding(6, 2) + info_label.set_padding(8, 2) info_label.set_selectable(True) - info_action = ui.action._toggle_action('info', 'Device info', _show_info, info_label) - toolbar.insert(info_action.create_tool_item(), 0) + info_box = Gtk.Frame() + info_box.add(info_label) - unpair_action = ui.action._action('remove', 'Unpair', None) - toolbar.insert(unpair_action.create_tool_item(), -1) + toolbar.insert(_toggle_info_button('Device info', info_box), 0) + toolbar.insert(ui.action.unpair.create_tool_item(), -1) - vbox = Gtk.VBox(homogeneous=False, spacing=8) + vbox = Gtk.VBox(homogeneous=False, spacing=4) vbox.pack_start(label, True, True, 0) vbox.pack_start(status_box, True, True, 0) - vbox.pack_start(info_label, True, True, 0) + vbox.pack_start(info_box, True, True, 0) - box = Gtk.HBox(homogeneous=False, spacing=10) + box = Gtk.HBox(homogeneous=False, spacing=4) box.set_border_width(4) box.pack_start(icon, False, False, 0) box.pack_start(vbox, True, True, 0) @@ -121,7 +126,7 @@ def _device_box(): frame = Gtk.Frame() frame.add(box) - info_label.set_visible(False) + info_box.set_visible(False) return frame @@ -160,7 +165,7 @@ def create(title, name, max_devices, systray=False): window.add(vbox) geometry = Gdk.Geometry() - geometry.min_width = 360 + geometry.min_width = 320 geometry.min_height = 20 window.set_geometry_hints(vbox, geometry, Gdk.WindowHints.MIN_SIZE) window.set_resizable(False) @@ -189,18 +194,18 @@ def _info_text(dev): def _update_receiver_box(frame, receiver): - label, toolbar, info_label = ui.find_children(frame, 'status-label', 'buttons', 'info-label') + label, toolbar, info_label = ui.find_children(frame, 'label', 'toolbar', 'info-label') label.set_text(receiver.status_text or '') + if receiver.status < STATUS.CONNECTED: toolbar.set_sensitive(False) - info_label.set_visible(False) + toolbar.get_children()[0].set_active(False) info_label.set_text('') else: toolbar.set_sensitive(True) if not info_label.get_text(): info_label.set_markup(_info_text(receiver)) - info_label.set_visible(toolbar.get_children()[0].get_active()) def _update_device_box(frame, dev): @@ -209,50 +214,47 @@ def _update_device_box(frame, dev): frame.set_name(_PLACEHOLDER) return - icon, label, toolbar, info_label = ui.find_children(frame, 'icon', 'label', 'buttons', 'info-label') + icon, label, info_label = ui.find_children(frame, 'icon', 'label', 'info-label') - frame.set_visible(True) if frame.get_name() != dev.name: frame.set_name(dev.name) - icon.set_tooltip_text('') icon.set_from_icon_name(ui.get_icon(dev.name, dev.kind), _DEVICE_ICON_SIZE) label.set_markup('' + dev.name + '') + frame.set_visible(True) status = ui.find_children(frame, 'status') + status_icons = status.get_children() + toolbar = status_icons[-1] if dev.status < STATUS.CONNECTED: + icon.set_sensitive(False) label.set_sensitive(False) status.set_sensitive(False) - info_label.set_visible(False) + for c in status_icons[1:-1]: + c.set_visible(False) + toolbar.get_children()[0].set_active(False) return + icon.set_sensitive(True) label.set_sensitive(True) status.set_sensitive(True) - info_label.set_visible(toolbar.get_children()[0].get_active()) - if not info_label.get_text(): info_label.set_markup(_info_text(dev)) - status_icons = status.get_children() - battery_icon, battery_label = status_icons[0:2] battery_level = dev.props.get(PROPS.BATTERY_LEVEL) if battery_level is None: - battery_icon.set_sensitive(False) battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE) - battery_label.set_sensitive(False) - battery_label.set_text('') + battery_icon.set_sensitive(False) + battery_label.set_visible(False) else: - battery_icon.set_sensitive(True) icon_name = 'battery_%03d' % (20 * ((battery_level + 10) // 20)) battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE) - battery_label.set_sensitive(True) + battery_icon.set_sensitive(True) battery_label.set_text('%d%%' % battery_level) + battery_label.set_visible(True) battery_status = dev.props.get(PROPS.BATTERY_STATUS) - if battery_status is None: - battery_icon.set_tooltip_text('') - else: - battery_icon.set_tooltip_text(battery_status) + battery_icon.set_tooltip_text(battery_status or '') light_icon, light_label = status_icons[2:4] light_level = dev.props.get(PROPS.LIGHT_LEVEL) @@ -260,11 +262,14 @@ def _update_device_box(frame, dev): light_icon.set_visible(False) light_label.set_visible(False) else: - light_icon.set_visible(True) icon_name = 'light_%03d' % (20 * ((light_level + 50) // 100)) light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE) - light_label.set_visible(True) + light_icon.set_visible(True) light_label.set_text('%d lux' % light_level) + light_label.set_visible(True) + + for b in toolbar.get_children()[:-1]: + b.set_sensitive(True) def update(window, receiver): diff --git a/app/ui/status_icon.py b/app/ui/status_icon.py index 654d3a1f..efee4426 100644 --- a/app/ui/status_icon.py +++ b/app/ui/status_icon.py @@ -19,8 +19,7 @@ def create(window, menu_actions=None): if action: menu.append(action.create_menu_item()) - quit_action = ui.action._action('exit', 'Quit', Gtk.main_quit) - menu.append(quit_action.create_menu_item()) + menu.append(ui.action.quit.create_menu_item()) menu.show_all() icon.connect('popup_menu', diff --git a/app/watcher.py b/app/watcher.py index 0609a0be..b46578f3 100644 --- a/app/watcher.py +++ b/app/watcher.py @@ -10,7 +10,7 @@ from logitech.devices.constants import STATUS from receiver import Receiver -class _DUMMY_RECEIVER: +class _DUMMY_RECEIVER(object): NAME = Receiver.NAME device_name = NAME kind = Receiver.NAME @@ -18,7 +18,7 @@ class _DUMMY_RECEIVER: status_text = 'Receiver not found.' max_devices = Receiver.max_devices devices = {} - def __nonzero__(self): return False + __bool__ = __nonzero__ = lambda self: False DUMMY = _DUMMY_RECEIVER() _l = _Logger('watcher') @@ -91,7 +91,8 @@ class Watcher(Thread): sc = self._receiver.status_changed sc.wait() sc.clear() - _l.debug("status_changed %s %d", sc.reason, sc.urgent) + if sc.urgent: + _l.info("status_changed %s", sc.reason) self.update_ui(self._receiver) if sc.reason and sc.urgent: self.notify(sc.reason) diff --git a/bin/ur_scanner b/bin/scan similarity index 73% rename from bin/ur_scanner rename to bin/scan index 202049fb..bbb8b839 100755 --- a/bin/ur_scanner +++ b/bin/scan @@ -4,4 +4,4 @@ LIB=`dirname "$0"`/../lib export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`uname -m` export PYTHONPATH=$LIB -exec python -OO -m logitech.ur_scanner "$@" +exec python -OO -m logitech.scanner "$@" diff --git a/bin/solaar b/bin/solaar index ab108e9b..9c5cc574 100755 --- a/bin/solaar +++ b/bin/solaar @@ -9,5 +9,5 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`uname -m` export PYTHONPATH=$APP:$LIB export XDG_DATA_DIRS=$SHARE:$XDG_DATA_DIRS -exec python -OO -m solaar "$@" +exec python3 -OO -m solaar "$@" #exec python -OO -m profile -o $TMPDIR/profile.log app/solaar.py "$@" diff --git a/lib/hidapi/hidconsole.py b/lib/hidapi/hidconsole.py index 86ea255e..7356ac90 100644 --- a/lib/hidapi/hidconsole.py +++ b/lib/hidapi/hidconsole.py @@ -3,6 +3,7 @@ import sys import time from binascii import hexlify, unhexlify +_hex = lambda d: hexlify(d).decode('ascii').upper() start_time = 0 @@ -13,7 +14,7 @@ except: def _print(marker, data, scroll=False): - hexs = str(hexlify(data)) + hexs = _hex(data) t = time.time() - start_time s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data)) @@ -44,17 +45,15 @@ def _continuous_read(handle, timeout=1000): if __name__ == '__main__': import argparse arg_parser = argparse.ArgumentParser() - arg_parser.add_argument('--history', default='.hidconsole-history', - help='history file') - arg_parser.add_argument('device', default=None, - help='linux device to connect to') + arg_parser.add_argument('--history', default='.hidconsole-history', help='history file') + arg_parser.add_argument('device', default=None, help='linux device to connect to') args = arg_parser.parse_args() import hidapi print (".. Opening device %s" % args.device) handle = hidapi.open_path(args.device.encode('utf-8')) if handle: - print (".. Opened handle %x, vendor %s product %s serial %s" % (handle, + print (".. Opened handle %X, vendor %s product %s serial %s" % (handle, repr(hidapi.get_manufacturer(handle)), repr(hidapi.get_product(handle)), repr(hidapi.get_serial(handle)))) @@ -84,7 +83,7 @@ if __name__ == '__main__': except Exception as e: pass - print (".. Closing handle %x" % handle) + print (".. Closing handle %X" % handle) hidapi.close(handle) readline.write_history_file(args.history) else: diff --git a/lib/hidapi/native.py b/lib/hidapi/native.py index 68dbcada..2204cbab 100644 --- a/lib/hidapi/native.py +++ b/lib/hidapi/native.py @@ -81,7 +81,7 @@ del namedtuple # create a DeviceInfo tuple from a hid_device object def _makeDeviceInfo(native_device_info): return DeviceInfo( - path=native_device_info.path, + path=native_device_info.path.decode('ascii'), vendor_id=hex(native_device_info.vendor_id)[2:], product_id=hex(native_device_info.product_id)[2:], serial=native_device_info.serial if native_device_info.serial else None, @@ -222,6 +222,8 @@ def open_path(device_path): :returns: an opaque device handle, or ``None``. """ + if type(device_path) == str: + device_path = device_path.encode('ascii') return _native.hid_open_path(device_path) or None diff --git a/lib/logitech/ur_scanner.py b/lib/logitech/scanner.py similarity index 95% rename from lib/logitech/ur_scanner.py rename to lib/logitech/scanner.py index 21de2a0c..c24a13c5 100644 --- a/lib/logitech/ur_scanner.py +++ b/lib/logitech/scanner.py @@ -4,8 +4,6 @@ import logging logging.basicConfig(level=logging.DEBUG) -from binascii import hexlify - from .unifying_receiver import api from .unifying_receiver.constants import * @@ -41,7 +39,7 @@ def scan_devices(receiver): for index in range(0, len(devinfo.features)): feature = devinfo.features[index] if feature: - print (" ~ Feature %s (%s) at index %d" % (FEATURE_NAME[feature], hexlify(feature), index)) + print (" ~ Feature %-20s (%s) at index %d" % (FEATURE_NAME[feature], api._hex(feature), index)) if FEATURE.BATTERY in devinfo.features: discharge, dischargeNext, status = api.get_device_battery_level(receiver, devinfo.number, features=devinfo.features) diff --git a/lib/logitech/unifying_receiver/api.py b/lib/logitech/unifying_receiver/api.py index 27a76cc4..50baf621 100644 --- a/lib/logitech/unifying_receiver/api.py +++ b/lib/logitech/unifying_receiver/api.py @@ -5,7 +5,7 @@ from logging import getLogger as _Logger from struct import pack as _pack from struct import unpack as _unpack -from binascii import hexlify as _hexlify + from . import base as _base from .common import (FirmwareInfo as _FirmwareInfo, @@ -18,6 +18,7 @@ from .constants import (FEATURE, FEATURE_NAME, FEATURE_FLAGS, from .exceptions import FeatureNotSupported as _FeatureNotSupported +_hex = _base._hex _LOG_LEVEL = 5 _l = _Logger('lur.api') @@ -40,19 +41,19 @@ def get_receiver_info(handle): serial = None reply = _base.request(handle, 0xFF, b'\x83\xB5', b'\x03') if reply and reply[0:1] == b'\x03': - serial = _hexlify(reply[1:5]).decode('ascii').upper() + serial = _hex(reply[1:5]) firmware = [] reply = _base.request(handle, 0xFF, b'\x83\xB5', b'\x02') if reply and reply[0:1] == b'\x02': - fw_version = _hexlify(reply[1:5]).decode('ascii') + fw_version = _hex(reply[1:5]) fw_version = '%s.%s.%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8]) firmware.append(_FirmwareInfo(0, FIRMWARE_KIND[0], '', fw_version, None)) reply = _base.request(handle, 0xFF, b'\x81\xF1', b'\x04') if reply and reply[0:1] == b'\x04': - bl_version = _hexlify(reply[1:3]).decode('ascii') + bl_version = _hex(reply[1:3]) bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4]) firmware.append(_FirmwareInfo(1, FIRMWARE_KIND[1], '', bl_version, None)) @@ -101,7 +102,7 @@ def request(handle, devnumber, feature, function=b'\x00', params=b'', features=N feature_index = _pack('!B', features.index(feature)) if feature_index is None: - _l.warn("(%d) feature <%s:%s> not supported", devnumber, _hexlify(feature), FEATURE_NAME[feature]) + _l.warn("(%d) feature <%s:%s> not supported", devnumber, _hex(feature), FEATURE_NAME[feature]) raise _FeatureNotSupported(devnumber, feature) if type(function) == int: @@ -182,7 +183,7 @@ def get_feature_index(handle, devnumber, feature): :returns: An int, or ``None`` if the feature is not available. """ - _l.log(_LOG_LEVEL, "(%d) get feature index <%s:%s>", devnumber, _hexlify(feature), FEATURE_NAME[feature]) + _l.log(_LOG_LEVEL, "(%d) get feature index <%s:%s>", devnumber, _hex(feature), FEATURE_NAME[feature]) if len(feature) != 2: raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature) @@ -196,17 +197,17 @@ def get_feature_index(handle, devnumber, feature): if _l.isEnabledFor(_LOG_LEVEL): if feature_flags: _l.log(_LOG_LEVEL, "(%d) feature <%s:%s> has index %d: %s", - devnumber, _hexlify(feature), FEATURE_NAME[feature], feature_index, + devnumber, _hex(feature), FEATURE_NAME[feature], feature_index, ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) else: - _l.log(_LOG_LEVEL, "(%d) feature <%s:%s> has index %d", devnumber, _hexlify(feature), FEATURE_NAME[feature], feature_index) + _l.log(_LOG_LEVEL, "(%d) feature <%s:%s> has index %d", devnumber, _hex(feature), FEATURE_NAME[feature], feature_index) # if feature_flags: # raise E.FeatureNotSupported(devnumber, feature) return feature_index - _l.warn("(%d) feature <%s:%s> not supported by the device", devnumber, _hexlify(feature), FEATURE_NAME[feature]) + _l.warn("(%d) feature <%s:%s> not supported by the device", devnumber, _hex(feature), FEATURE_NAME[feature]) raise _FeatureNotSupported(devnumber, feature) @@ -252,10 +253,10 @@ def get_device_features(handle, devnumber): if _l.isEnabledFor(_LOG_LEVEL): if feature_flags: _l.log(_LOG_LEVEL, "(%d) feature <%s:%s> at index %d: %s", - devnumber, _hexlify(feature), FEATURE_NAME[feature], index, + devnumber, _hex(feature), FEATURE_NAME[feature], index, ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) else: - _l.log(_LOG_LEVEL, "(%d) feature <%s:%s> at index %d", devnumber, _hexlify(feature), FEATURE_NAME[feature], index) + _l.log(_LOG_LEVEL, "(%d) feature <%s:%s> at index %d", devnumber, _hex(feature), FEATURE_NAME[feature], index) features[0] = FEATURE.ROOT while features[-1] is None: @@ -284,7 +285,7 @@ def get_device_firmware(handle, devnumber, features=None): kind = FIRMWARE_KIND[level] name, = _unpack('!3s', fw_info[1:4]) name = name.decode('ascii') - version = _hexlify(fw_info[4:6]).decode('ascii') + version = _hex(fw_info[4:6]) version = '%s.%s' % (version[0:2], version[2:4]) build, = _unpack('!H', fw_info[6:8]) if build: diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index eb871307..e15ba486 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -6,6 +6,7 @@ from logging import getLogger as _Logger from struct import pack as _pack from binascii import hexlify as _hexlify +_hex = lambda d: _hexlify(d).decode('ascii').upper() from .constants import ERROR_NAME from .exceptions import (NoReceiver as _NoReceiver, @@ -47,7 +48,7 @@ DEFAULT_TIMEOUT = 1000 def _logdebug_hook(reply_code, devnumber, data): """Default unhandled hook, logs the reply as DEBUG.""" - _l.debug("UNHANDLED %s", (reply_code, devnumber, reply_code, data)) + _l.warn("UNHANDLED [%02X %02X %s %s] (%s)", reply_code, devnumber, _hex(data[:2]), _hex(data[2:]), repr(data)) """The function that will be called on unhandled incoming events. @@ -77,6 +78,8 @@ def list_receiver_devices(): return _hid.enumerate(0x046d, 0xc52b, 2) +_PING_RECEIVER = b'\x10\xFF\x81\x00\x00\x00\x00' + def try_open(path): """Checks if the given Linux device path points to the right UR device. @@ -97,28 +100,28 @@ def try_open(path): _l.log(_LOG_LEVEL, "[%s] open failed", path) return None - _l.log(_LOG_LEVEL, "[%s] receiver handle %x", path, receiver_handle) + _l.log(_LOG_LEVEL, "[%s] receiver handle %X", path, receiver_handle) # ping on device id 0 (always an error) - _hid.write(receiver_handle, b'\x10\x00\x00\x10\x00\x00\xAA') + _hid.write(receiver_handle, _PING_RECEIVER) # if this is the right hidraw device, we'll receive a 'bad device' from the UR # otherwise, the read should produce nothing reply = _hid.read(receiver_handle, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) if reply: - if reply[:5] == b'\x10\x00\x8F\x00\x10': + if reply[:5] == _PING_RECEIVER[:5]: # 'device 0 unreachable' is the expected reply from a valid receiver handle - _l.log(_LOG_LEVEL, "[%s] success: handle %x", path, receiver_handle) + _l.log(_LOG_LEVEL, "[%s] success: handle %X", path, receiver_handle) return receiver_handle # any other replies are ignored, and will assume this is the wrong Linux device if _l.isEnabledFor(_LOG_LEVEL): if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': # no idea what this is, but it comes up occasionally - _l.log(_LOG_LEVEL, "[%s] %x mistery reply [%s]", path, receiver_handle, _hexlify(reply).decode('ascii')) + _l.log(_LOG_LEVEL, "[%s] %X mistery reply [%s]", path, receiver_handle, _hex(reply)) else: - _l.log(_LOG_LEVEL, "[%s] %x unknown reply [%s]", path, receiver_handle, _hexlify(reply).decode('ascii')) + _l.log(_LOG_LEVEL, "[%s] %X unknown reply [%s]", path, receiver_handle, _hex(reply)) else: - _l.log(_LOG_LEVEL, "[%s] %x no reply", path, receiver_handle) + _l.log(_LOG_LEVEL, "[%s] %X no reply", path, receiver_handle) close(receiver_handle) @@ -143,10 +146,10 @@ def close(handle): if handle: try: _hid.close(handle) - _l.log(_LOG_LEVEL, "%x closed", handle) + _l.log(_LOG_LEVEL, "closed receiver handle %X", handle) return True except: - _l.exception("%x closing", handle) + _l.exception("closing receiver handle %X", handle) return False @@ -165,17 +168,15 @@ def write(handle, devnumber, data): been physically removed from the machine, or the kernel driver has been unloaded. The handle will be closed automatically. """ - # assert _MIN_CALL_SIZE == 7 - # assert _MAX_CALL_SIZE == 20 + if _l.isEnabledFor(_LOG_LEVEL): + _l.log(_LOG_LEVEL, "(%d) <= w[10 %02X %s %s]", devnumber, devnumber, _hex(data[:2]), _hex(data[2:])) + + assert _MIN_CALL_SIZE == 7 + assert _MAX_CALL_SIZE == 20 # the data is padded to either 5 or 18 bytes wdata = _pack('!BB18s' if len(data) > 5 else '!BB5s', 0x10, devnumber, data) - - if _l.isEnabledFor(_LOG_LEVEL): - hexs = _hexlify(wdata).decode('ascii') - _l.log(_LOG_LEVEL, "(%d) <= w[%s %s %s %s]", devnumber, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:]) - if not _hid.write(handle, wdata): - _l.warn("(%d) write failed, assuming receiver %x no longer available", devnumber, handle) + _l.warn("(%d) write failed, assuming receiver %X no longer available", devnumber, handle) close(handle) raise _NoReceiver @@ -198,20 +199,19 @@ def read(handle, timeout=DEFAULT_TIMEOUT): """ data = _hid.read(handle, _MAX_REPLY_SIZE * 2, timeout) if data is None: - _l.warn("(-) read failed, assuming receiver %x no longer available", handle) + _l.warn("(-) read failed, assuming receiver %X no longer available", handle) close(handle) raise _NoReceiver if data: if len(data) < _MIN_REPLY_SIZE: - _l.warn("(%d) => r[%s] read packet too short: %d bytes", ord(data[1:2]), _hexlify(data), len(data)) + _l.warn("(%d) => r[%s] read packet too short: %d bytes", ord(data[1:2]), _hex(data), len(data)) if len(data) > _MAX_REPLY_SIZE: - _l.warn("(%d) => r[%s] read packet too long: %d bytes", ord(data[1:2]), _hexlify(data), len(data)) - if _l.isEnabledFor(_LOG_LEVEL): - hexs = _hexlify(data).decode('ascii') - _l.log(_LOG_LEVEL, "(%d) => r[%s %s %s %s]", ord(data[1:2]), hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:]) + _l.warn("(%d) => r[%s] read packet too long: %d bytes", ord(data[1:2]), _hex(data), len(data)) code = ord(data[:1]) devnumber = ord(data[1:2]) + if _l.isEnabledFor(_LOG_LEVEL): + _l.log(_LOG_LEVEL, "(%d) => r[%02X %02X %s %s]", devnumber, code, devnumber, _hex(data[2:4]), _hex(data[4:])) return code, devnumber, data[2:] # _l.log(_LOG_LEVEL, "(-) => r[]", handle) @@ -235,11 +235,9 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None :raisees FeatureCallError: if the feature call replied with an error. """ if _l.isEnabledFor(_LOG_LEVEL): - _l.log(_LOG_LEVEL, "(%d) request {%s} params [%s]", devnumber, - _hexlify(feature_index_function).decode('ascii'), - _hexlify(params).decode('ascii')) + _l.log(_LOG_LEVEL, "(%d) request {%s} params [%s]", devnumber, _hex(feature_index_function), _hex(params)) if len(feature_index_function) != 2: - raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hexlify(feature_index_function)) + raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hex(feature_index_function)) retries = 5 @@ -257,7 +255,7 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None if reply_devnumber != devnumber: # this message not for the device we're interested in - # _l.log(_LOG_LEVEL, "(%d) request got reply for unexpected device %d: [%s]", devnumber, reply_devnumber, _hexlify(reply_data)) + # _l.log(_LOG_LEVEL, "(%d) request got reply for unexpected device %d: [%s]", devnumber, reply_devnumber, _hex(reply_data)) # worst case scenario, this is a reply for a concurrent request # on this receiver if unhandled_hook: @@ -266,20 +264,18 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == feature_index_function: # device not present - _l.log(_LOG_LEVEL, "(%d) request ping failed on {%s} call: [%s]", devnumber, - _hexlify(feature_index_function).decode('ascii'), - _hexlify(reply_data).decode('ascii')) + _l.log(_LOG_LEVEL, "(%d) request ping failed on {%s} call: [%s]", devnumber, _hex(feature_index_function), _hex(reply_data)) return None if reply_code == 0x10 and reply_data[:1] == b'\x8F': # device not present - _l.log(_LOG_LEVEL, "(%d) request ping failed: [%s]", devnumber, _hexlify(reply_data).decode('ascii')) + _l.log(_LOG_LEVEL, "(%d) request ping failed: [%s]", devnumber, _hex(reply_data)) return None if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function: # the feature call returned with an error error_code = ord(reply_data[3]) - _l.warn("(%d) request feature call error %d = %s: %s", devnumber, error_code, ERROR_NAME[error_code], _hexlify(reply_data).decode('ascii')) + _l.warn("(%d) request feature call error %d = %s: %s", devnumber, error_code, ERROR_NAME[error_code], _hex(reply_data)) feature_index = ord(feature_index_function[:1]) feature_function = feature_index_function[1:2] feature = None if features is None else features[feature_index] if feature_index < len(features) else None @@ -287,16 +283,14 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None if reply_code == 0x11 and reply_data[:2] == feature_index_function: # a matching reply - # _l.log(_LOG_LEVEL, "(%d) matched reply with feature-index-function [%s]", devnumber, _hexlify(reply_data[2:]).decode('ascii')) + # _l.log(_LOG_LEVEL, "(%d) matched reply with feature-index-function [%s]", devnumber, _hex(reply_data[2:])) return reply_data[2:] if reply_code == 0x10 and devnumber == 0xFF and reply_data[:2] == feature_index_function: # direct calls to the receiver (device 0xFF) may also return successfully with reply code 0x10 - # _l.log(_LOG_LEVEL, "(%d) matched reply with feature-index-function [%s]", devnumber, _hexlify(reply_data[2:]).decode('ascii')) + # _l.log(_LOG_LEVEL, "(%d) matched reply with feature-index-function [%s]", devnumber, _hex(reply_data[2:])) return reply_data[2:] - # _l.log(_LOG_LEVEL, "(%d) unmatched reply {%s} (expected {%s})", devnumber, - # _hexlify(reply_data[:2]).decode('ascii'), - # _hexlify(feature_index_function).decode('ascii')) + # _l.log(_LOG_LEVEL, "(%d) unmatched reply {%s} (expected {%s})", devnumber, _hex(reply_data[:2]), _hex(feature_index_function)) if unhandled_hook: unhandled_hook(reply_code, reply_devnumber, reply_data) diff --git a/lib/logitech/unifying_receiver/common.py b/lib/logitech/unifying_receiver/common.py index d13081ba..d3e0567f 100644 --- a/lib/logitech/unifying_receiver/common.py +++ b/lib/logitech/unifying_receiver/common.py @@ -2,8 +2,9 @@ # Some common functions and types. # -from binascii import hexlify as _hexlify from collections import namedtuple +from binascii import hexlify as _hexlify +_hex = lambda d: _hexlify(d).decode('ascii').upper() class FallbackDict(dict): @@ -50,6 +51,6 @@ ReprogrammableKeyInfo = namedtuple('ReprogrammableKeyInfo', [ class Packet(namedtuple('Packet', ['code', 'devnumber', 'data'])): def __str__(self): - return 'Packet(0x%02x,%d,%s)' % (self.code, self.devnumber, '' if self.data is None else _hexlify(self.data)) + return 'Packet(%02X,%02X,%s)' % (self.code, self.devnumber, 'None' if self.data is None else _hex(self.data)) del namedtuple diff --git a/lib/logitech/unifying_receiver/constants.py b/lib/logitech/unifying_receiver/constants.py index 802a4fea..dbdbaa22 100644 --- a/lib/logitech/unifying_receiver/constants.py +++ b/lib/logitech/unifying_receiver/constants.py @@ -2,8 +2,9 @@ # Constants used by the rest of the API. # -from binascii import hexlify as _hexlify from struct import pack as _pack +from binascii import hexlify as _hexlify +_hex = lambda d: _hexlify(d).decode('ascii').upper() from .common import (FallbackDict, list2dict) @@ -30,7 +31,7 @@ def _feature_name(key): return None if type(key) == int: return FEATURE_NAME[_pack('!H', key)] - return 'UNKNOWN_' + str(_hexlify(key)) + return 'UNKNOWN_' + _hex(key) """Feature names indexed by feature id.""" @@ -70,11 +71,11 @@ BATTERY_STATUS = FallbackDict(lambda x: 'unknown', list2dict(_BATTERY_STATUSES)) _KEY_NAMES = ( 'unknown_0000', 'Volume up', 'Volume down', 'Mute', 'Play/Pause', 'Next', 'Previous', 'Stop', 'Application switcher', - 'unknown_0009', 'Calculator', 'unknown_000b', 'unknown_000c', - 'unknown_000d', 'Mail') + 'unknown_0009', 'Calculator', 'unknown_000B', 'unknown_000C', + 'unknown_000D', 'Mail') """Standard names for reprogrammable keys.""" -KEY_NAME = FallbackDict(lambda x: 'unknown_%04x' % x, list2dict(_KEY_NAMES)) +KEY_NAME = FallbackDict(lambda x: 'unknown_%04X' % x, list2dict(_KEY_NAMES)) """Possible flags on a reprogrammable key.""" KEY_FLAG = type('KEY_FLAG', (), dict( diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py index 9f10f34b..f9e7daa1 100644 --- a/lib/logitech/unifying_receiver/listener.py +++ b/lib/logitech/unifying_receiver/listener.py @@ -28,7 +28,7 @@ def _event_dispatch(listener, callback): # _l.log(_LOG_LEVEL, "starting dispatch") while listener._active: # or not listener._events.empty(): event = listener._events.get() - _l.log(_LOG_LEVEL, "delivering event %s", event) + # _l.log(_LOG_LEVEL, "delivering event %s", event) try: callback(event) except: @@ -47,7 +47,7 @@ class EventsListener(Thread): be captured by the listener and delivered to the callback. """ def __init__(self, receiver_handle, events_callback): - super(EventsListener, self).__init__(group='Unifying Receiver', name='%s-%x' % (self.__class__.__name__, receiver_handle)) + super(EventsListener, self).__init__(group='Unifying Receiver', name='%s-%X' % (self.__class__.__name__, receiver_handle)) self.daemon = True self._active = False @@ -63,7 +63,7 @@ class EventsListener(Thread): _base.unhandled_hook = self._unhandled self._dispatcher = Thread(group='Unifying Receiver', - name='%s-%x-dispatch' % (self.__class__.__name__, receiver_handle), + name='%s-%X-dispatch' % (self.__class__.__name__, receiver_handle), target=_event_dispatch, args=(self, events_callback)) self._dispatcher.daemon = True @@ -85,8 +85,9 @@ class EventsListener(Thread): break if event: + event = _Packet(*event) _l.log(_LOG_LEVEL, "queueing event %s", event) - self._events.put(_Packet(*event)) + self._events.put(event) if self._task: (api_function, args, kwargs), self._task = self._task, None @@ -152,4 +153,5 @@ class EventsListener(Thread): self._events.put(event) def __nonzero__(self): - return self._active and self._handle + return bool(self._active and self._handle) + __bool__ = __nonzero__ diff --git a/lib/logitech/unifying_receiver/tests/test_30_base.py b/lib/logitech/unifying_receiver/tests/test_30_base.py index be1dc7be..ea24d1ac 100644 --- a/lib/logitech/unifying_receiver/tests/test_30_base.py +++ b/lib/logitech/unifying_receiver/tests/test_30_base.py @@ -3,7 +3,6 @@ # import unittest -from binascii import hexlify from .. import base from ..exceptions import * @@ -64,10 +63,10 @@ class Test_UR_Base(unittest.TestCase): reply_code, reply_device, reply_data = reply self.assertEqual(reply_device, 0, "got ping reply for valid device") - self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % hexlify(reply_data)) + self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % base._hex(reply_data)) if reply_code == 0x10: # ping fail - self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % hexlify(reply_data)) + self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % base._hex(reply_data)) elif reply_code == 0x11: self.fail("Got valid ping from device 0") else: @@ -88,13 +87,13 @@ class Test_UR_Base(unittest.TestCase): reply_code, reply_device, reply_data = reply self.assertEqual(reply_device, device, "ping reply for wrong device") - self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % hexlify(reply_data)) + self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % base._hex(reply_data)) if reply_code == 0x10: # ping fail - self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % hexlify(reply_data)) + self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % base._hex(reply_data)) elif reply_code == 0x11: # ping ok - self.assertEqual(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: %s" % hexlify(reply_data)) + self.assertEqual(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: %s" % base._hex(reply_data)) self.assertEqual(reply_data[4:5], b'\xAA') devices.append(device) else: