diff --git a/app/pairing.py b/app/pairing.py index 5b6efdea..96ff9c12 100644 --- a/app/pairing.py +++ b/app/pairing.py @@ -5,6 +5,7 @@ from logging import getLogger as _Logger _l = _Logger('pairing') +from logitech.unifying_receiver import base as _base state = None @@ -12,12 +13,12 @@ class State(object): TICK = 300 PAIR_TIMEOUT = 60 * 1000 / TICK - def __init__(self, watcher): - self._watcher = watcher + def __init__(self, listener): + self.listener = listener self.reset() def device(self, number): - return self._watcher.receiver.devices.get(number) + return self.listener.devices.get(number) def reset(self): self.success = None @@ -25,14 +26,14 @@ class State(object): self._countdown = self.PAIR_TIMEOUT def countdown(self, assistant): + if self._countdown < 0 or not self.listener: + return False + if self._countdown == self.PAIR_TIMEOUT: self.start_scan() self._countdown -= 1 return True - if self._countdown < 0: - return False - self._countdown -= 1 if self._countdown > 0 and self.success is None: return True @@ -43,16 +44,16 @@ class State(object): def start_scan(self): self.reset() - self._watcher.receiver.events_filter = self.filter_events - reply = self._watcher.receiver.request(0xFF, b'\x80\xB2', b'\x01') + self.listener.events_filter = self.filter_events + reply = _base.request(self.listener.handle, 0xFF, b'\x80\xB2', b'\x01') _l.debug("start scan reply %s", repr(reply)) def stop_scan(self): if self._countdown >= 0: self._countdown = -1 - reply = self._watcher.receiver.request(0xFF, b'\x80\xB2', b'\x02') + reply = _base.request(self.listener.handle, 0xFF, b'\x80\xB2', b'\x02') _l.debug("stop scan reply %s", repr(reply)) - self._watcher.receiver.events_filter = None + self.listener.events_filter = None def filter_events(self, event): if event.devnumber == 0xFF: @@ -66,16 +67,16 @@ class State(object): return True return False - if event.devnumber in self._watcher.receiver.devices: + if event.devnumber in self.listener.receiver.devices: return False _l.debug("event for new device? %s", event) if event.code == 0x10 and event.data[0:2] == b'\x41\x04': - self.detected_device = self._watcher.receiver.make_device(event) + self.detected_device = self.listener.make_device(event) return True return True - def unpair(self, number): - _l.debug("unpair %d", number) - self._watcher.receiver.unpair_device(number) + def unpair(self, device): + _l.debug("unpair %s", device) + self.listener.unpair_device(device) diff --git a/app/receiver.py b/app/receiver.py index 93e55a80..6b794b69 100644 --- a/app/receiver.py +++ b/app/receiver.py @@ -3,13 +3,12 @@ # from logging import getLogger as _Logger - -from threading import Event as _Event from struct import pack as _pack from logitech.unifying_receiver import base as _base from logitech.unifying_receiver import api as _api -from logitech.unifying_receiver import listener as _listener +from logitech.unifying_receiver.listener import EventsListener as _EventsListener +from logitech.unifying_receiver.common import FallbackDict as _FallbackDict from logitech import devices as _devices from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS, NAMES) @@ -17,7 +16,6 @@ from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS, NAMES) # # - class _FeaturesArray(object): __slots__ = ('device', 'features', 'supported') @@ -27,31 +25,26 @@ class _FeaturesArray(object): self.supported = True def _check(self): - if not self.supported: - return False + if self.supported: + if self.features is not None: + return True - if self.features is not None: - return True - - if self.device.status >= STATUS.CONNECTED: - handle = self.device.receiver.handle - try: - index = _api.get_feature_index(handle, self.device.number, _api.FEATURE.FEATURE_SET) - except _api._FeatureNotSupported: - index = None - - if index is None: - self.supported = False - else: - count = _base.request(handle, self.device.number, _pack('!B', index) + b'\x00') - if count is None: + if self.device.status >= STATUS.CONNECTED: + handle = self.device.handle + try: + index = _api.get_feature_index(handle, self.device.number, _api.FEATURE.FEATURE_SET) + except _api._FeatureNotSupported: self.supported = False else: - count = ord(count[:1]) - self.features = [None] * (1 + count) - self.features[0] = _api.FEATURE.ROOT - self.features[index] = _api.FEATURE.FEATURE_SET - return True + count = _base.request(handle, self.device.number, _pack('!BB', index, 0x00)) + if count is None: + self.supported = False + else: + count = ord(count[:1]) + self.features = [None] * (1 + count) + self.features[0] = _api.FEATURE.ROOT + self.features[index] = _api.FEATURE.FEATURE_SET + return True return False @@ -65,7 +58,7 @@ class _FeaturesArray(object): raise IndexError if self.features[index] is None: fs_index = self.features.index(_api.FEATURE.FEATURE_SET) - feature = _base.request(self.device.receiver.handle, self.device.number, _pack('!BB', fs_index, 0x10), _pack('!B', index)) + feature = _base.request(self.device.handle, self.device.number, _pack('!BB', fs_index, 0x10), _pack('!B', index)) if feature is not None: self.features[index] = feature[:2] @@ -104,29 +97,34 @@ class _FeaturesArray(object): def __len__(self): return len(self.features) if self._check() else 0 +# +# +# -class DeviceInfo(object): +class DeviceInfo(_api.PairedDevice): """A device attached to the receiver. """ - def __init__(self, receiver, number, pair_code, status=STATUS.UNKNOWN): + def __init__(self, listener, number, pair_code, status=STATUS.UNKNOWN): + super(DeviceInfo, self).__init__(listener.handle, number) + self.LOG = _Logger("Device[%d]" % number) - self.receiver = receiver - self.number = number + self._listener = listener self._pair_code = pair_code self._serial = None self._codename = None - self._name = None - self._kind = None - self._firmware = None self._status = status self.props = {} self.features = _FeaturesArray(self) + # read them now, otherwise it it temporarily hang the UI + if status >= STATUS.CONNECTED: + n, k, s, f = self.name, self.kind, self.serial, self.firmware + @property - def handle(self): - return self.receiver.handle + def receiver(self): + return self._listener.receiver @property def status(self): @@ -138,7 +136,7 @@ class DeviceInfo(object): self.LOG.debug("status %d => %d", self._status, new_status) urgent = new_status < STATUS.CONNECTED or self._status < STATUS.CONNECTED self._status = new_status - self.receiver._device_changed(self, urgent) + self._listener.status_changed_callback(self, urgent) if new_status < STATUS.CONNECTED: self.props.clear() @@ -161,13 +159,9 @@ class DeviceInfo(object): def name(self): if self._name is None: if self._status >= STATUS.CONNECTED: - self._name = _api.get_device_name(self.receiver.handle, self.number, self.features) + self._name = _api.get_device_name(self.handle, self.number, self.features) return self._name or self.codename - @property - def device_name(self): - return self.name - @property def kind(self): if self._kind is None: @@ -176,7 +170,7 @@ class DeviceInfo(object): if codename in NAMES: self._kind = NAMES[codename][-1] else: - self._kind = _api.get_device_kind(self.receiver.handle, self.number, self.features) + self._kind = _api.get_device_kind(self.handle, self.number, self.features) return self._kind or '?' @property @@ -185,7 +179,7 @@ class DeviceInfo(object): # dodgy b = bytearray(self._pair_code) b[0] -= 0x10 - serial = _base.request(self.receiver.handle, 0xFF, b'\x83\xB5', bytes(b)) + serial = _base.request(self.handle, 0xFF, b'\x83\xB5', bytes(b)) if serial: self._serial = _base._hex(serial[1:5]) return self._serial or '?' @@ -193,7 +187,7 @@ class DeviceInfo(object): @property def codename(self): if self._codename is None: - codename = _base.request(self.receiver.handle, 0xFF, b'\x83\xB5', self._pair_code) + codename = _base.request(self.handle, 0xFF, b'\x83\xB5', self._pair_code) if codename: self._codename = codename[2:].rstrip(b'\x00').decode('ascii') return self._codename or '?' @@ -202,19 +196,16 @@ class DeviceInfo(object): def firmware(self): if self._firmware is None: if self._status >= STATUS.CONNECTED: - self._firmware = _api.get_device_firmware(self.receiver.handle, self.number, self.features) + self._firmware = _api.get_device_firmware(self.handle, self.number, self.features) return self._firmware or () - def ping(self): - return _api.ping(self.receiver.handle, self.number) - def process_event(self, code, data): if code == 0x10 and data[:1] == b'\x8F': self.status = STATUS.UNAVAILABLE return True if code == 0x11: - status = _devices.process_event(self, data, self.receiver) + status = _devices.process_event(self, data) if status: if type(status) == int: self.status = status @@ -225,161 +216,110 @@ class DeviceInfo(object): self.props.update(status[1]) if self.status == status[0]: if p != self.props: - self.receiver._device_changed(self) + self._listener.status_changed_callback(self) else: self.status = status[0] return True - self.LOG.warn("don't know how to handle status %s", status) + self.LOG.warn("don't know how to handle processed event status %s", status) return False - def __hash__(self): - return self.number - def __str__(self): - return 'DeviceInfo(%d,%s,%d)' % (self.number, self.name, self._status) - - def __repr__(self): - return '' % (self.number, self.name, self._status) + return 'DeviceInfo(%d,%s,%d)' % (self.number, self._name or '?', self._status) # # # -class Receiver(_listener.EventsListener): +_RECEIVER_STATUS_NAME = _FallbackDict( + lambda x: + '1 device found' if x == STATUS.CONNECTED + 1 else + '%d devices found' if x > STATUS.CONNECTED else + '?', + { + STATUS.UNKNOWN: 'Initializing...', + STATUS.UNAVAILABLE: 'Receiver not found.', + STATUS.BOOTING: 'Scanning...', + STATUS.CONNECTED: 'No devices found.', + } + ) + +class ReceiverListener(_EventsListener): """Keeps the status of a Unifying Receiver. """ - NAME = kind = 'Unifying Receiver' - max_devices = _api.MAX_ATTACHED_DEVICES - def __init__(self, path, handle): - super(Receiver, self).__init__(handle, self._events_handler) - self.path = path + def __init__(self, receiver, status_changed_callback): + super(ReceiverListener, self).__init__(receiver.handle, self._events_handler) + self.receiver = receiver - self._status = STATUS.BOOTING - self.status_changed = _Event() - self.status_changed.urgent = False - self.status_changed.reason = None + self.LOG = _Logger("ReceiverListener(%s)" % receiver.path) - self.LOG = _Logger("Receiver[%s]" % path) - self.LOG.info("initializing") - - self._serial = None - self._firmware = None - - self.devices = {} self.events_filter = None self.events_handler = None - if _base.request(handle, 0xFF, b'\x80\x00', b'\x00\x01'): + self.status_changed_callback = status_changed_callback or (lambda reason=None, urgent=False: None) + + receiver.kind = receiver.name + receiver.devices = {} + receiver.status = STATUS.BOOTING + receiver.status_text = _RECEIVER_STATUS_NAME[STATUS.BOOTING] + + if _base.request(receiver.handle, 0xFF, b'\x80\x00', b'\x00\x01'): self.LOG.info("initialized") else: self.LOG.warn("initialization failed") - if _base.request(handle, 0xFF, b'\x80\x02', b'\x02'): + if _base.request(receiver.handle, 0xFF, b'\x80\x02', b'\x02'): self.LOG.info("triggered device events") else: self.LOG.warn("failed to trigger device events") - def close(self): - """Closes the receiver's handle. + def change_status(self, new_status): + if new_status != self.receiver.status: + self.LOG.debug("status %d => %d", self.receiver.status, new_status) + self.receiver.status = new_status + self.receiver.status_text = _RECEIVER_STATUS_NAME[new_status] + self.status_changed_callback(self.receiver, True) - The receiver can no longer be used in API calls after this. - """ - self.LOG.info("closing") - self.stop() - - @property - def status(self): - return self._status - - @status.setter - def status(self, new_status): - if new_status != self._status: - self.LOG.debug("status %d => %d", self._status, new_status) - self._status = new_status - self.status_changed.reason = self - self.status_changed.urgent = True - self.status_changed.set() - - @property - def status_text(self): - status = self._status - if status == STATUS.UNKNOWN: - return 'Initializing...' - if status == STATUS.UNAVAILABLE: - return 'Receiver not found.' - if status == STATUS.BOOTING: - return 'Scanning...' - if status == STATUS.CONNECTED: - return 'No devices found.' - if len(self.devices) > 1: - return '%d devices found' % len(self.devices) - return '1 device found' - - @property - def device_name(self): - return self.NAME - - def count_devices(self): - return _api.count_devices(self._handle) - - @property - def serial(self): - if self._serial is None: - if self: - self._serial, self._firmware = _api.get_receiver_info(self._handle) - return self._serial or '?' - - @property - def firmware(self): - if self._firmware is None: - if self: - self._serial, self._firmware = _api.get_receiver_info(self._handle) - return self._firmware or ('?', '?') - - - def _device_changed(self, dev, urgent=False): - self.status_changed.reason = dev - self.status_changed.urgent = urgent - self.status_changed.set() + def _device_status_from(self, event): + state_code = ord(event.data[2:3]) & 0xF0 + state = STATUS.UNAVAILABLE if state_code == 0x60 else \ + STATUS.CONNECTED if state_code == 0xA0 else \ + STATUS.CONNECTED if state_code == 0x20 else \ + STATUS.UNKNOWN + if state == STATUS.UNKNOWN: + self.LOG.warn("don't know how to handle state code 0x%02X: %s", state_code, event) + return state def _events_handler(self, event): if self.events_filter and self.events_filter(event): return if event.code == 0x10 and event.data[0:2] == b'\x41\x04': - if event.devnumber in self.devices: - state_code = ord(event.data[2:3]) & 0xF0 - state = STATUS.UNAVAILABLE if state_code == 0x60 else \ - STATUS.CONNECTED if state_code == 0xA0 else \ - STATUS.CONNECTED if state_code == 0x20 else \ - None - if state is None: - self.LOG.warn("don't know how to handle status 0x%02X: %s", state_code, event) - else: - self.devices[event.devnumber].status = state - return - dev = self.make_device(event) - if dev is None: - self.LOG.warn("failed to make new device from %s", event) + if event.devnumber in self.receiver.devices: + status = self._device_status_from(event) + if status > STATUS.UNKNOWN: + self.receiver.devices[event.devnumber].status = status else: - self.devices[event.devnumber] = dev - self.LOG.info("new device ready %s", dev) - self.status = STATUS.CONNECTED + len(self.devices) + dev = self.make_device(event) + if dev is None: + self.LOG.warn("failed to make new device from %s", event) + else: + self.receiver.devices[event.devnumber] = dev + self.change_status(STATUS.CONNECTED + len(self.receiver.devices)) return if event.devnumber == 0xFF: if event.code == 0xFF and event.data is None: # receiver disconnected - self.LOG.info("disconnected") - self.devices = {} - self.status = STATUS.UNAVAILABLE + self.LOG.warn("disconnected") + self.receiver.devices = {} + self.change_status(STATUS.UNAVAILABLE) return - elif event.devnumber in self.devices: - dev = self.devices[event.devnumber] + elif event.devnumber in self.receiver.devices: + dev = self.receiver.devices[event.devnumber] if dev.process_event(event.code, event.data): return @@ -389,49 +329,50 @@ class Receiver(_listener.EventsListener): self.LOG.warn("don't know how to handle event %s", event) def make_device(self, event): - if event.devnumber < 1 or event.devnumber > self.max_devices: + if event.devnumber < 1 or event.devnumber > self.receiver.max_devices: self.LOG.warn("got event for invalid device number %d: %s", event.devnumber, event) return None - state_code = ord(event.data[2:3]) & 0xF0 - state = STATUS.UNAVAILABLE if state_code == 0x60 else \ - STATUS.CONNECTED if state_code == 0xA0 else \ - STATUS.CONNECTED if state_code == 0x20 else \ - None - if state is None: - self.LOG.warn("don't know how to handle device status 0x%02X: %s", state_code, event) - return None + status = self._device_status_from(event) - return DeviceInfo(self, event.devnumber, event.data[4:5], state) + dev = DeviceInfo(self, event.devnumber, event.data[4:5], status) + self.LOG.info("new device %s", dev) + self.status_changed_callback(dev, True) + return dev - def unpair_device(self, number): - if number in self.devices: - dev = self.devices[number] - reply = _base.request(self._handle, 0xFF, b'\x80\xB2', _pack('!BB', 0x03, number)) - if reply: - self.LOG.debug("remove device %s => %s", dev, _base._hex(reply)) - del self.devices[number] - self.LOG.warn("unpaired device %s", dev) - self.status = STATUS.CONNECTED + len(self.devices) - return True - self.LOG.warn("failed to unpair device %s", dev) - return False + def unpair_device(self, device): + try: + del self.receiver[device.number] + except IndexError: + self.LOG.error("failed to unpair device %s", device) + return False + + del self.receiver.devices[device.number] + self.LOG.info("unpaired device %s", device) + self.change_status(STATUS.CONNECTED + len(self.receiver.devices)) + device.status = STATUS.UNPAIRED + return True def __str__(self): - return 'Receiver(%s,%X,%d)' % (self.path, self._handle, self._status) + return '' % (self.path, self.receiver.status) @classmethod - def open(self): - """Opens the first Logitech Unifying Receiver found attached to the machine. + def open(self, status_changed_callback=None): + receiver = _api.Receiver.open() + if receiver: + rl = ReceiverListener(receiver, status_changed_callback) + rl.start() + return rl - :returns: An open file handle for the found receiver, or ``None``. - """ - for rawdevice in _base.list_receiver_devices(): - _Logger("receiver").debug("checking %s", rawdevice) - handle = _base.try_open(rawdevice.path) - if handle: - receiver = Receiver(rawdevice.path, handle) - receiver.start() - return receiver +# +# +# - return None +class _DUMMY_RECEIVER(object): + name = _api.Receiver.name + max_devices = _api.Receiver.max_devices + status = STATUS.UNAVAILABLE + status_text = _RECEIVER_STATUS_NAME[STATUS.UNAVAILABLE] + devices = {} + __bool__ = __nonzero__ = lambda self: False +DUMMY = _DUMMY_RECEIVER() diff --git a/app/solaar.py b/app/solaar.py index bb1c0c58..e9eb3955 100644 --- a/app/solaar.py +++ b/app/solaar.py @@ -31,9 +31,9 @@ def _parse_arguments(): args = arg_parser.parse_args() import logging - log_level = logging.root.level - 10 * args.verbose + log_level = logging.ERROR - 10 * args.verbose log_format='%(asctime)s %(levelname)8s [%(threadName)s] %(name)s: %(message)s' - logging.basicConfig(level=max(log_level, 1), format=log_format) + logging.basicConfig(level=max(log_level, logging.DEBUG), format=log_format) return args @@ -43,44 +43,64 @@ if __name__ == '__main__': import ui - # check if the notifications are available + # check if the notifications are available and enabled args.notifications &= args.systray - if ui.notify.init(APPNAME): + if ui.notify.available and ui.notify.init(APPNAME): ui.action.toggle_notifications.set_active(args.notifications) else: ui.action.toggle_notifications = None - import watcher + from receiver import (ReceiverListener, DUMMY) window = ui.main_window.create(APPNAME, - watcher.DUMMY.NAME, - watcher.DUMMY.max_devices, + DUMMY.name, + DUMMY.max_devices, args.systray) - ui.action.pair.window = window - ui.action.unpair.window = window if args.systray: - menu_actions = (ui.action.pair, - ui.action.toggle_notifications, + menu_actions = (ui.action.toggle_notifications, ui.action.about) icon = ui.status_icon.create(window, menu_actions) else: icon = None window.present() - w = watcher.Watcher(APPNAME, - lambda r: ui.update(r, icon, window), - ui.notify.show if ui.notify.available else None) - w.start() - import pairing - pairing.state = pairing.State(w) - from gi.repository import Gtk + listener = None + notify_missing = True + + def status_changed(reason, urgent=False): + global listener + receiver = DUMMY if listener is None else listener.receiver + ui.update(receiver, icon, window, reason) + if ui.notify.available and reason and urgent: + ui.notify.show(reason or receiver) + + def check_for_listener(): + global listener, notify_missing + if listener is None: + listener = ReceiverListener.open(status_changed) + if listener is None: + pairing.state = None + if notify_missing: + status_changed(DUMMY, True) + ui.notify.show(DUMMY) + notify_missing = False + else: + # print ("opened receiver", listener, listener.receiver) + pairing.state = pairing.State(listener) + notify_missing = True + status_changed(listener.receiver, True) + return True + + from gi.repository import Gtk, GObject + + GObject.timeout_add(5000, check_for_listener) + check_for_listener() Gtk.main() - w.stop() - ui.notify.uninit() + if listener is not None: + listener.stop() - import logging - logging.shutdown() + ui.notify.uninit() diff --git a/app/ui/__init__.py b/app/ui/__init__.py index 8a74e223..cc4308a0 100644 --- a/app/ui/__init__.py +++ b/app/ui/__init__.py @@ -1,7 +1,7 @@ # pass APPNAME = 'Solaar' -APPVERSION = '0.5' +APPVERSION = '0.6' from . import (notify, status_icon, main_window, pair_window, action) @@ -50,9 +50,10 @@ def find_children(container, *child_names): return tuple(result) if count > 1 else result[0] -def update(receiver, icon, window): - GObject.idle_add(action.pair.set_sensitive, receiver.status > 0) +def update(receiver, icon, window, reason): + assert receiver is not None + assert reason is not None if window: - GObject.idle_add(main_window.update, window, receiver) + GObject.idle_add(main_window.update, window, receiver, reason) if icon: GObject.idle_add(status_icon.update, icon, receiver) diff --git a/app/ui/action.py b/app/ui/action.py index 0a995200..6b3d4c31 100644 --- a/app/ui/action.py +++ b/app/ui/action.py @@ -62,23 +62,31 @@ quit = _action('exit', 'Quit', Gtk.main_quit) import pairing -def _pair_device(action): - action.set_sensitive(False) - pair_dialog = ui.pair_window.create(action, pairing.state) +def _pair_device(action, frame): + window = frame.get_toplevel() + + pair_dialog = ui.pair_window.create( action, pairing.state) + pair_dialog.set_transient_for(window) pair_dialog.set_modal(True) + + window.present() pair_dialog.present() -pair = _action('add', 'Pair new device', _pair_device) + +def pair(frame): + return _action('add', 'Pair new device', _pair_device, frame) -def _unpair_device(action): - dev = pairing.state.device(action.devnumber) - action.devnumber = 0 - if dev: - qdialog = Gtk.MessageDialog(action.window, 0, - Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, - "Unpair device '%s' ?" % dev.name) - choice = qdialog.run() - qdialog.destroy() - if choice == Gtk.ResponseType.YES: - pairing.state.unpair(dev.number) -unpair = _action('remove', 'Unpair', _unpair_device) +def _unpair_device(action, frame): + window = frame.get_toplevel() + window.present() + device = frame._device + qdialog = Gtk.MessageDialog(window, 0, + Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, + "Unpair device\n%s ?" % device.name) + choice = qdialog.run() + qdialog.destroy() + if choice == Gtk.ResponseType.YES: + pairing.state.unpair(device) + +def unpair(frame): + return _action('remove', 'Unpair', _unpair_device, frame) diff --git a/app/ui/main_window.py b/app/ui/main_window.py index 62fc99e0..9dbd1da0 100644 --- a/app/ui/main_window.py +++ b/app/ui/main_window.py @@ -2,7 +2,7 @@ # # -from gi.repository import (Gtk, Gdk, GObject) +from gi.repository import (Gtk, Gdk) import ui from logitech.devices.constants import (STATUS, PROPS) @@ -17,13 +17,27 @@ _PLACEHOLDER = '~' # # -def _toggle_info_button(label, widget): - toggle = lambda a, w: w.set_visible(a.get_active()) - action = ui.action._toggle_action('info', label, toggle, widget) - return action.create_tool_item() +def _info_text(dev): + fw_text = '\n'.join(['%-12s\t%s%s%s' % + (f.kind, f.name, ' ' if f.name else '', f.version) for f in dev.firmware]) + return ('' + 'Serial \t\t%s\n' + '%s' + '' % (dev.serial, fw_text)) + +def _toggle_info(action, label_widget, box_widget, frame): + if action.get_active(): + box_widget.set_visible(True) + if not label_widget.get_text(): + label_widget.set_markup(_info_text(frame._device)) + else: + box_widget.set_visible(False) -def _receiver_box(name): +def _make_receiver_box(name): + frame = Gtk.Frame() + frame._device = None + icon = Gtk.Image.new_from_icon_name(name, _SMALL_DEVICE_ICON_SIZE) label = Gtk.Label('Initializing...') @@ -51,22 +65,25 @@ def _receiver_box(name): info_box.add(info_label) info_box.set_shadow_type(Gtk.ShadowType.ETCHED_IN) - toolbar.insert(_toggle_info_button('Receiver info', info_box), 0) - toolbar.insert(ui.action.pair.create_tool_item(), -1) + toggle_info_action = ui.action._toggle_action('info', 'Receiver info', _toggle_info, info_label, info_box, frame) + toolbar.insert(toggle_info_action.create_tool_item(), 0) + toolbar.insert(ui.action.pair(frame).create_tool_item(), -1) vbox = Gtk.VBox(homogeneous=False, spacing=2) vbox.set_border_width(4) vbox.pack_start(hbox, True, True, 0) vbox.pack_start(info_box, True, True, 0) - frame = Gtk.Frame() frame.add(vbox) frame.show_all() info_box.set_visible(False) return frame -def _device_box(index): +def _make_device_box(index): + frame = Gtk.Frame() + frame._device = None + icon = Gtk.Image.new_from_icon_name('image-missing', _DEVICE_ICON_SIZE) icon.set_name('icon') icon.set_alignment(0.5, 0) @@ -111,11 +128,9 @@ def _device_box(index): info_box = Gtk.Frame() info_box.add(info_label) - toolbar.insert(_toggle_info_button('Device info', info_box), 0) - def _set_number(action): - action.devnumber = index - unpair_action = ui.action.wrap_action(ui.action.unpair, _set_number) - toolbar.insert(unpair_action.create_tool_item(), -1) + toggle_info_action = ui.action._toggle_action('info', 'Device info', _toggle_info, info_label, info_box, frame) + toolbar.insert(toggle_info_action.create_tool_item(), 0) + toolbar.insert(ui.action.unpair(frame).create_tool_item(), -1) vbox = Gtk.VBox(homogeneous=False, spacing=4) vbox.pack_start(label, True, True, 0) @@ -128,7 +143,6 @@ def _device_box(index): box.pack_start(vbox, True, True, 0) box.show_all() - frame = Gtk.Frame() frame.add(box) info_box.set_visible(False) return frame @@ -158,10 +172,10 @@ def create(title, name, max_devices, systray=False): vbox = Gtk.VBox(homogeneous=False, spacing=4) vbox.set_border_width(4) - rbox = _receiver_box(name) + rbox = _make_receiver_box(name) vbox.add(rbox) for i in range(1, 1 + max_devices): - dbox = _device_box(i) + dbox = _make_device_box(i) vbox.add(dbox) vbox.set_visible(True) @@ -187,31 +201,23 @@ def create(title, name, max_devices, systray=False): # # -def _info_text(dev): - fw_text = '\n'.join(['%-12s\t%s%s%s' % - (f.kind, f.name, ' ' if f.name else '', f.version) for f in dev.firmware]) - return ('' - 'Serial \t\t%s\n' - '%s' - '' % (dev.serial, fw_text)) - - def _update_receiver_box(frame, receiver): label, toolbar, info_label = ui.find_children(frame, 'label', 'toolbar', 'info-label') label.set_text(receiver.status_text or '') - if receiver.status < STATUS.CONNECTED: + frame._device = None toolbar.set_sensitive(False) toolbar.get_children()[0].set_active(False) info_label.set_text('') else: toolbar.set_sensitive(True) - if not info_label.get_text(): - info_label.set_markup(_info_text(receiver)) + frame._device = receiver def _update_device_box(frame, dev): + frame._device = dev + icon, label, info_label = ui.find_children(frame, 'icon', 'label', 'info-label') if frame.get_name() != dev.name: @@ -229,60 +235,59 @@ def _update_device_box(frame, dev): for c in status_icons[1:-1]: c.set_visible(False) toolbar.get_children()[0].set_active(False) - return - - icon.set_sensitive(True) - label.set_sensitive(True) - status.set_sensitive(True) - if not info_label.get_text(): - info_label.set_markup(_info_text(dev)) - - battery_icon, battery_label = status_icons[0:2] - battery_level = dev.props.get(PROPS.BATTERY_LEVEL) - if battery_level is None: - battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE) - battery_icon.set_sensitive(False) - battery_label.set_visible(False) else: - icon_name = 'battery_%03d' % (20 * ((battery_level + 10) // 20)) - battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE) - battery_icon.set_sensitive(True) - battery_label.set_text('%d%%' % battery_level) - battery_label.set_visible(True) + icon.set_sensitive(True) + label.set_sensitive(True) + status.set_sensitive(True) - battery_status = dev.props.get(PROPS.BATTERY_STATUS) - battery_icon.set_tooltip_text(battery_status or '') + battery_icon, battery_label = status_icons[0:2] + battery_level = dev.props.get(PROPS.BATTERY_LEVEL) + if battery_level is None: + battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE) + battery_icon.set_sensitive(False) + battery_label.set_visible(False) + else: + icon_name = 'battery_%03d' % (20 * ((battery_level + 10) // 20)) + battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE) + battery_icon.set_sensitive(True) + battery_label.set_text('%d%%' % battery_level) + battery_label.set_visible(True) - light_icon, light_label = status_icons[2:4] - light_level = dev.props.get(PROPS.LIGHT_LEVEL) - if light_level is None: - light_icon.set_visible(False) - light_label.set_visible(False) - else: - icon_name = 'light_%03d' % (20 * ((light_level + 50) // 100)) - light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE) - light_icon.set_visible(True) - light_label.set_text('%d lux' % light_level) - light_label.set_visible(True) + battery_status = dev.props.get(PROPS.BATTERY_STATUS) + battery_icon.set_tooltip_text(battery_status or '') - for b in toolbar.get_children()[:-1]: - b.set_sensitive(True) + light_icon, light_label = status_icons[2:4] + light_level = dev.props.get(PROPS.LIGHT_LEVEL) + if light_level is None: + light_icon.set_visible(False) + light_label.set_visible(False) + else: + icon_name = 'light_%03d' % (20 * ((light_level + 50) // 100)) + light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE) + light_icon.set_visible(True) + light_label.set_text('%d lux' % light_level) + light_label.set_visible(True) + + for b in toolbar.get_children()[:-1]: + b.set_sensitive(True) frame.set_visible(True) -def update(window, receiver): + +def update(window, receiver, reason): + print ("update", receiver, receiver.status, reason) window.set_icon_name(ui.appicon(receiver.status)) vbox = window.get_child() controls = list(vbox.get_children()) - GObject.idle_add(_update_receiver_box, controls[0], receiver) - - for index in range(1, len(controls)): - dev = receiver.devices[index] if index in receiver.devices else None - frame = controls[index] - if dev is None: + if reason == receiver: + _update_receiver_box(controls[0], receiver) + else: + frame = controls[reason.number] + if reason.status == STATUS.UNPAIRED: frame.set_visible(False) frame.set_name(_PLACEHOLDER) + frame._device = None else: - GObject.idle_add(_update_device_box, frame, dev) + _update_device_box(frame, reason) diff --git a/app/ui/notify.py b/app/ui/notify.py index 4bdd9a0b..5367e796 100644 --- a/app/ui/notify.py +++ b/app/ui/notify.py @@ -50,7 +50,7 @@ try: def show(dev): """Show a notification with title and text.""" if available and Notify.is_initted(): - summary = dev.device_name + summary = dev.name # if a notification with same name is already visible, reuse it to avoid spamming n = _notifications.get(summary) diff --git a/app/ui/pair_window.py b/app/ui/pair_window.py index 61e6a2e1..5ec5c1ee 100644 --- a/app/ui/pair_window.py +++ b/app/ui/pair_window.py @@ -30,15 +30,14 @@ def _device_confirmed(entry, _2, trigger, assistant, page): return True -def _finish(assistant, action): +def _finish(assistant): logging.debug("finish %s", assistant) assistant.destroy() - action.set_sensitive(True) -def _cancel(assistant, action, state): +def _cancel(assistant, state): logging.debug("cancel %s", assistant) state.stop_scan() - _finish(assistant, action) + _finish(assistant) def _prepare(assistant, page, state): index = assistant.get_current_page() @@ -119,8 +118,8 @@ def create(action, state): assistant.scan_complete = _scan_complete assistant.connect('prepare', _prepare, state) - assistant.connect('cancel', _cancel, action, state) - assistant.connect('close', _finish, action) - assistant.connect('apply', _finish, action) + assistant.connect('cancel', _cancel, state) + assistant.connect('close', _finish) + assistant.connect('apply', _finish) return assistant diff --git a/app/watcher.py b/app/watcher.py deleted file mode 100644 index 122451ac..00000000 --- a/app/watcher.py +++ /dev/null @@ -1,104 +0,0 @@ -# -# -# - -from threading import Thread -import time -from logging import getLogger as _Logger - -from logitech.devices.constants import STATUS -from receiver import Receiver - - -class _DUMMY_RECEIVER(object): - NAME = Receiver.NAME - device_name = NAME - kind = Receiver.NAME - status = STATUS.UNAVAILABLE - status_text = 'Receiver not found.' - max_devices = Receiver.max_devices - devices = {} - __bool__ = __nonzero__ = lambda self: False -DUMMY = _DUMMY_RECEIVER() - -_l = _Logger('watcher') - - -def _sleep(seconds, granularity, breakout=lambda: False): - slept = 0 - while slept < seconds and not breakout(): - time.sleep(granularity) - slept += granularity - - -class Watcher(Thread): - """Keeps an active receiver object if possible, and updates the UI when - necessary. - """ - def __init__(self, apptitle, update_ui, notify=None): - super(Watcher, self).__init__(group=apptitle, name='Watcher') - self._active = False - self._receiver = DUMMY - - self.update_ui = update_ui - self.notify = notify or (lambda d: None) - - @property - def receiver(self): - return self._receiver - - def run(self): - self._active = True - notify_missing = True - - while self._active: - if self._receiver == DUMMY: - r = Receiver.open() - if r is None: - if notify_missing: - _sleep(0.8, 0.4, lambda: not self._active) - notify_missing = False - if self._active: - self.update_ui(DUMMY) - self.notify(DUMMY) - _sleep(4, 0.4, lambda: not self._active) - continue - - _l.info("receiver %s ", r) - self._receiver = r - notify_missing = True - - self.update_ui(r) - self.notify(r) - - if self._active: - if self._receiver: - _l.debug("waiting for status_changed") - sc = self._receiver.status_changed - sc.wait() - if not self._active: - break - sc.clear() - if sc.urgent: - _l.info("status_changed %s", sc.reason) - self.update_ui(self._receiver) - if sc.reason and sc.urgent: - self.notify(sc.reason) - else: - self._receiver = DUMMY - self.update_ui(DUMMY) - self.notify(DUMMY) - - if self._receiver: - self._receiver.close() - self._receiver = _DUMMY_RECEIVER - - def stop(self): - if self._active: - _l.info("stopping %s", self) - self._active = False - if self._receiver: - # break out of an eventual wait() - self._receiver.status_changed.reason = None - self._receiver.status_changed.set() - self.join() diff --git a/lib/logitech/devices/__init__.py b/lib/logitech/devices/__init__.py index 90fe56d1..c415a415 100644 --- a/lib/logitech/devices/__init__.py +++ b/lib/logitech/devices/__init__.py @@ -30,7 +30,7 @@ def _module(device_name): # # -def default_request_status(devinfo, listener=None): +def default_request_status(devinfo): if FEATURE.BATTERY in devinfo.features: reply = _api.get_device_battery_level(devinfo.handle, devinfo.number, features=devinfo.features) if reply: @@ -41,7 +41,7 @@ def default_request_status(devinfo, listener=None): return STATUS.CONNECTED if reply else STATUS.UNAVAILABLE -def default_process_event(devinfo, data, listener=None): +def default_process_event(devinfo, data): feature_index = ord(data[0:1]) if feature_index >= len(devinfo.features): logging.warn("mistery event %s for %s", repr(data), devinfo) @@ -72,7 +72,7 @@ def default_process_event(devinfo, data, listener=None): # ? -def request_status(devinfo, listener=None): +def request_status(devinfo): """Trigger a status request for a device. :param devinfo: the device info tuple. @@ -81,20 +81,20 @@ def request_status(devinfo, listener=None): """ m = _module(devinfo.name) if m and 'request_status' in m.__dict__: - return m.request_status(devinfo, listener) - return default_request_status(devinfo, listener) + return m.request_status(devinfo) + return default_request_status(devinfo) -def process_event(devinfo, data, listener=None): +def process_event(devinfo, data): """Process an event received for a device. :param devinfo: the device info tuple. :param data: the event data (event packet sans the first two bytes: reply code and device number) """ - default_result = default_process_event(devinfo, data, listener) + default_result = default_process_event(devinfo, data) if default_result is not None: return default_result m = _module(devinfo.name) if m and 'process_event' in m.__dict__: - return m.process_event(devinfo, data, listener) + return m.process_event(devinfo, data) diff --git a/lib/logitech/devices/constants.py b/lib/logitech/devices/constants.py index 32da3911..5b802146 100644 --- a/lib/logitech/devices/constants.py +++ b/lib/logitech/devices/constants.py @@ -5,6 +5,7 @@ STATUS = type('STATUS', (), dict( UNKNOWN=-9999, + UNPAIRED=-1000, UNAVAILABLE=-1, BOOTING=0, CONNECTED=1, @@ -12,6 +13,7 @@ STATUS = type('STATUS', (), STATUS_NAME = { STATUS.UNKNOWN: '...', + STATUS.UNPAIRED: 'unpaired', STATUS.UNAVAILABLE: 'inactive', STATUS.BOOTING: 'initializing', STATUS.CONNECTED: 'connected', diff --git a/lib/logitech/devices/k750.py b/lib/logitech/devices/k750.py index c935ecc0..d3dc37d7 100644 --- a/lib/logitech/devices/k750.py +++ b/lib/logitech/devices/k750.py @@ -28,7 +28,7 @@ def _charge_status(data, hasLux=False): } -def request_status(devinfo, listener=None): +def request_status(devinfo): reply = _api.request(devinfo.handle, devinfo.number, feature=FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', features=devinfo.features) @@ -36,7 +36,7 @@ def request_status(devinfo, listener=None): return STATUS.UNAVAILABLE -def process_event(devinfo, data, listener=None): +def process_event(devinfo, data): if data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': # usually sent after the keyboard is turned on or just connected return _charge_status(data) @@ -47,4 +47,4 @@ def process_event(devinfo, data, listener=None): if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD': logging.debug("Solar key pressed") - return request_status(devinfo, listener) or _charge_status(data) + return request_status(devinfo) or _charge_status(data) diff --git a/lib/logitech/scanner.py b/lib/logitech/scanner.py index 2a5a8a93..25104c43 100644 --- a/lib/logitech/scanner.py +++ b/lib/logitech/scanner.py @@ -1,52 +1,38 @@ #!/usr/bin/env python -import logging -logging.basicConfig(level=logging.DEBUG) - -from .unifying_receiver import api -from .unifying_receiver.constants import * - - def print_receiver(receiver): - print ("Unifying Receiver") + print (str(receiver)) - serial, firmware = api.get_receiver_info(receiver) - - print (" Serial : %s" % serial) - for f in firmware: + print (" Serial : %s" % receiver.serial) + for f in receiver.firmware: print (" %-10s: %s" % (f.kind, f.version)) print ("--------") def scan_devices(receiver): - print_receiver(receiver) + for dev in receiver: + print (str(dev)) + print ("Name: %s" % dev.name) + print ("Kind: %s" % dev.kind) - devices = api.list_devices(receiver) - if not devices: - print ("!! No attached devices found.") - return - - for devinfo in devices: - print ("Device [%d] %s (%s)" % (devinfo.number, devinfo.name, devinfo.kind)) - # print " Protocol %s" % devinfo.protocol - - firmware = api.get_device_firmware(receiver, devinfo.number, features=devinfo.features) + firmware = dev.firmware for fw in firmware: print (" %-10s: %s %s" % (fw.kind, fw.name, fw.version)) - for index in range(0, len(devinfo.features)): - feature = devinfo.features[index] + all_features = api.get_device_features(dev.handle, dev.number) + for index in range(0, len(all_features)): + feature = all_features[index] if feature: print (" ~ Feature %-20s (%s) at index %02X" % (FEATURE_NAME[feature], api._hex(feature), index)) - if FEATURE.BATTERY in devinfo.features: - discharge, dischargeNext, status = api.get_device_battery_level(receiver, devinfo.number, features=devinfo.features) + if FEATURE.BATTERY in all_features: + discharge, dischargeNext, status = api.get_device_battery_level(dev.handle, dev.number, features=all_features) print (" Battery %d charged (next level %d%), status %s" % (discharge, dischargeNext, status)) - if FEATURE.REPROGRAMMABLE_KEYS in devinfo.features: - keys = api.get_device_keys(receiver, devinfo.number, features=devinfo.features) + if FEATURE.REPROGRAMMABLE_KEYS in all_features: + keys = api.get_device_keys(dev.handle, dev.number, features=all_features) if keys is not None and keys: print (" %d reprogrammable keys found" % len(keys)) for k in keys: @@ -58,20 +44,22 @@ def scan_devices(receiver): if __name__ == '__main__': import argparse - arg_parser = argparse.ArgumentParser() - arg_parser.add_argument('-v', '--verbose', action='count', default=0, - help='log the HID data traffic with the receiver') + arg_parser = argparse.ArgumentParser(prog='scan') + arg_parser.add_argument('-v', '--verbose', action='store_true', default=False, + help='log the HID data traffic') args = arg_parser.parse_args() - log_level = logging.root.level - 10 * args.verbose - logging.root.setLevel(log_level if log_level > 0 else 1) + import logging + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING) - for rawdevice in api._base.list_receiver_devices(): - receiver = api._base.try_open(rawdevice.path) - if receiver: - print ("!! Logitech Unifying Receiver found (%s)." % rawdevice.path) - scan_devices(receiver) - api.close(receiver) - break - else: + from .unifying_receiver import api + from .unifying_receiver.constants import * + + receiver = api.Receiver.open() + if receiver is None: print ("!! Logitech Unifying Receiver not found.") + else: + print ("!! Found Logitech Unifying Receiver: %s" % receiver) + print_receiver(receiver) + scan_devices(receiver) + receiver.close() diff --git a/lib/logitech/unifying_receiver/__init__.py b/lib/logitech/unifying_receiver/__init__.py index f6485ea7..19dac410 100644 --- a/lib/logitech/unifying_receiver/__init__.py +++ b/lib/logitech/unifying_receiver/__init__.py @@ -22,19 +22,11 @@ http://6xq.net/git/lars/lshidpp.git/plain/doc/ import logging -log = logging.getLogger('LUR') -log.propagate = 0 -log.setLevel(logging.DEBUG) +if logging.root.level > logging.DEBUG: + log = logging.getLogger('LUR') + log.addHandler(logging.NullHandler()) + log.propagate = 0 -if logging.root.level < logging.DEBUG: - handler = logging.FileHandler('lur.log', mode='w') - handler.setFormatter(logging.root.handlers[0].formatter) -else: - handler = logging.NullHandler() -log.addHandler(handler) -del handler - -del log del logging diff --git a/lib/logitech/unifying_receiver/api.py b/lib/logitech/unifying_receiver/api.py index 382b82ce..38411496 100644 --- a/lib/logitech/unifying_receiver/api.py +++ b/lib/logitech/unifying_receiver/api.py @@ -8,7 +8,6 @@ from struct import unpack as _unpack from . import base as _base from .common import (FirmwareInfo as _FirmwareInfo, - AttachedDeviceInfo as _AttachedDeviceInfo, ReprogrammableKeyInfo as _ReprogrammableKeyInfo) from .constants import (FEATURE, FEATURE_NAME, FEATURE_FLAGS, FIRMWARE_KIND, DEVICE_KIND, @@ -27,44 +26,153 @@ del getLogger # # -"""Opens the first Logitech Unifying Receiver found attached to the machine. +class PairedDevice(object): + def __init__(self, handle, number): + self.handle = handle + self.number = number -:returns: An open file handle for the found receiver, or ``None``. -""" -open = _base.open + self._name = None + self._kind = None + self._firmware = None + self.features = [FEATURE.ROOT] + + @property + def name(self): + if self._name is None: + self._name = get_device_name(self.handle, self.number, self.features) + return self._name or '?' + + @property + def kind(self): + if self._kind is None: + self._kind = get_device_kind(self.handle, self.number, self.features) + return self._kind or '?' + + @property + def firmware(self): + if self._firmware is None: + self._firmware = get_device_firmware(self.handle, self.number, self.features) + return self._firmware or () + + def ping(self): + reply = _base.request(self.handle, self.number, b'\x00\x10', b'\x00\x00\xAA') + return reply is not None and reply[2:3] == b'\xAA' + + def __str__(self): + return '' % (self.handle, self.number, self._name or '?') + + def __hash__(self): + return self.number -"""Closes a HID device handle.""" -close = _base.close +class Receiver(object): + name = 'Unifying Receiver' + max_devices = MAX_ATTACHED_DEVICES + def __init__(self, handle, path=None): + self.handle = handle + self.path = path -def get_receiver_info(handle): - serial = None - reply = _base.request(handle, 0xFF, b'\x83\xB5', b'\x03') - if reply and reply[0:1] == b'\x03': - serial = _hex(reply[1:5]) + self._serial = None + self._firmware = None - firmware = [] + def close(self): + handle = self.handle + self.handle = 0 + return (handle and _base.close(handle)) - reply = _base.request(handle, 0xFF, b'\x83\xB5', b'\x02') - if reply and reply[0:1] == b'\x02': - fw_version = _hex(reply[1:5]) - fw_version = '%s.%s.%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8]) - firmware.append(_FirmwareInfo(0, FIRMWARE_KIND[0], '', fw_version, None)) + @property + def serial(self): + if self._serial is None and self.handle: + serial = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x03') + if serial: + self._serial = _hex(serial[1:5]) + return self._serial - reply = _base.request(handle, 0xFF, b'\x81\xF1', b'\x04') - if reply and reply[0:1] == b'\x04': - bl_version = _hex(reply[1:3]) - bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4]) - firmware.append(_FirmwareInfo(1, FIRMWARE_KIND[1], '', bl_version, None)) + @property + def firmware(self): + if self._firmware is None and self.handle: + firmware = [] - return (serial, tuple(firmware)) + reply = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x02') + if reply and reply[0:1] == b'\x02': + fw_version = _hex(reply[1:5]) + fw_version = '%s.%s.%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8]) + firmware.append(_FirmwareInfo(0, FIRMWARE_KIND[0], '', fw_version, None)) + reply = _base.request(self.handle, 0xFF, b'\x81\xF1', b'\x04') + if reply and reply[0:1] == b'\x04': + bl_version = _hex(reply[1:3]) + bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4]) + firmware.append(_FirmwareInfo(1, FIRMWARE_KIND[1], '', bl_version, None)) -def count_devices(handle): - count = _base.request(handle, 0xFF, b'\x81\x00') - return 0 if count is None else ord(count[1:2]) + self._firmware = tuple(firmware) + return self._firmware + + def __iter__(self): + if self.handle == 0: + return + + for number in range(1, MAX_ATTACHED_DEVICES): + dev = get_device(self.handle, number) + if dev is not None: + yield dev + + def __getitem__(self, key): + if type(key) != int: + raise TypeError('key must be an integer') + if self.handle == 0 or key < 0 or key > MAX_ATTACHED_DEVICES: + raise IndexError(key) + return get_device(self.handle, key) if key > 0 else None + + def __delitem__(self, key): + if type(key) != int: + raise TypeError('key must be an integer') + if self.handle == 0 or key < 0 or key > MAX_ATTACHED_DEVICES: + raise IndexError(key) + if key > 0: + _log.debug("unpairing device %d", key) + reply = _base.request(self.handle, 0xFF, b'\x80\xB2', _pack('!BB', 0x03, key)) + if reply is None or reply[1:2] == b'\x8F': + raise IndexError(key) + + def __len__(self): + if self.handle == 0: + return 0 + # not really sure about this one... + count = _base.request(self.handle, 0xFF, b'\x81\x00') + return 0 if count is None else ord(count[1:2]) + + def __contains__(self, dev): + if self.handle == 0: + return False + if type(dev) == int: + return (dev < 1 or dev > MAX_ATTACHED_DEVICES) and ping(self.handle, dev) + return ping(self.handle, dev.number) + + def __str__(self): + return '' % (self.handle, self.path) + + def __hash__(self): + return self.handle + + __bool__ = __nonzero__ = lambda self: self.handle != 0 + + @classmethod + def open(self): + """Opens the first Logitech Unifying Receiver found attached to the machine. + + :returns: An open file handle for the found receiver, or ``None``. + """ + for rawdevice in _base.list_receiver_devices(): + handle = _base.try_open(rawdevice.path) + if handle: + return Receiver(handle, rawdevice.path) + +# +# +# def request(handle, devnumber, feature, function=b'\x00', params=b'', features=None): """Makes a feature call to the device, and returns the reply data. @@ -109,72 +217,22 @@ def request(handle, devnumber, feature, function=b'\x00', params=b'', features=N def ping(handle, devnumber): - """Pings a device to check if it is attached to the UR. - - :returns: True if the device is connected to the UR, False if the device is - not attached, None if no conclusive reply is received. + """ + :returns: True if the device is connected to the UR. """ reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA') return reply is not None and reply[2:3] == b'\xAA' -def get_device_protocol(handle, devnumber): - reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA') - if reply is not None and len(reply) > 2 and reply[2:3] == b'\xAA': - return 'HID %d.%d' % (ord(reply[0:1]), ord(reply[1:2])) +def get_device(handle, devnumber, features=None): + """Gets the complete info for a device (type, features). - -def find_device_by_name(handle, name): - """Searches for an attached device by name. - - This function does it the hard way, querying all possible device numbers. - - :returns: an AttachedDeviceInfo tuple, or ``None``. + :returns: a PairedDevice or ``None``. """ - _log.debug("searching for device '%s'", name) - - for devnumber in range(1, 1 + MAX_ATTACHED_DEVICES): - features = get_device_features(handle, devnumber) - if features: - d_name = get_device_name(handle, devnumber, features) - if d_name == name: - return get_device_info(handle, devnumber, name=d_name, features=features) - - -def list_devices(handle): - """List all devices attached to the UR. - - This function does it the hard way, querying all possible device numbers. - - :returns: a list of AttachedDeviceInfo tuples. - """ - _log.debug("listing all devices") - - devices = [] - - for device in range(1, 1 + MAX_ATTACHED_DEVICES): - features = get_device_features(handle, device) - if features: - devices.append(get_device_info(handle, device, features=features)) - - return devices - - -def get_device_info(handle, devnumber, name=None, features=None): - """Gets the complete info for a device (type, name, features). - - :returns: an AttachedDeviceInfo tuple, or ``None``. - """ - if features is None: - features = get_device_features(handle, devnumber) - if features is None: - return None - - d_kind = get_device_kind(handle, devnumber, features) - d_name = get_device_name(handle, devnumber, features) if name is None else name - devinfo = _AttachedDeviceInfo(handle, devnumber, d_kind, d_name, features) - _log.debug("(%d) found device %s", devnumber, devinfo) - return devinfo + if ping(handle, devnumber): + devinfo = PairedDevice(handle, devnumber) + # _log.debug("(%d) found device %s", devnumber, devinfo) + return devinfo def get_feature_index(handle, devnumber, feature): @@ -182,7 +240,7 @@ def get_feature_index(handle, devnumber, feature): :returns: An int, or ``None`` if the feature is not available. """ - _log.debug("(%d) get feature index <%s:%s>", devnumber, _hex(feature), FEATURE_NAME[feature]) + # _log.debug("(%d) get feature index <%s:%s>", devnumber, _hex(feature), FEATURE_NAME[feature]) if len(feature) != 2: raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature) @@ -191,13 +249,13 @@ def get_feature_index(handle, devnumber, feature): if reply: feature_index = ord(reply[0:1]) if feature_index: - feature_flags = ord(reply[1:2]) & 0xE0 - if feature_flags: - _log.debug("(%d) feature <%s:%s> has index %d: %s", - devnumber, _hex(feature), FEATURE_NAME[feature], feature_index, - ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) - else: - _log.debug("(%d) feature <%s:%s> has index %d", devnumber, _hex(feature), FEATURE_NAME[feature], feature_index) + # feature_flags = ord(reply[1:2]) & 0xE0 + # if feature_flags: + # _log.debug("(%d) feature <%s:%s> has index %d: %s", + # devnumber, _hex(feature), FEATURE_NAME[feature], feature_index, + # ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) + # else: + # _log.debug("(%d) feature <%s:%s> has index %d", devnumber, _hex(feature), FEATURE_NAME[feature], feature_index) # only consider active and supported features? # if feature_flags: @@ -218,6 +276,8 @@ def _get_feature_index(handle, devnumber, feature, features=None): index = get_feature_index(handle, devnumber, feature) if index is not None: + if len(features) <= index: + features += [None] * (index + 1 - len(features)) features[index] = feature return index @@ -228,7 +288,7 @@ def get_device_features(handle, devnumber): Their position in the array is the index to be used when requesting that feature on the device. """ - _log.debug("(%d) get device features", devnumber) + # _log.debug("(%d) get device features", devnumber) # get the index of the FEATURE_SET # FEATURE.ROOT should always be available for all devices @@ -250,23 +310,23 @@ def get_device_features(handle, devnumber): return None features_count = ord(features_count[:1]) - _log.debug("(%d) found %d features", devnumber, features_count) + # _log.debug("(%d) found %d features", devnumber, features_count) features = [None] * 0x20 for index in range(1, 1 + features_count): # for each index, get the feature residing at that index feature = _base.request(handle, devnumber, fs_index + b'\x10', _pack('!B', index)) if feature: - feature_flags = ord(feature[2:3]) & 0xE0 + # feature_flags = ord(feature[2:3]) & 0xE0 feature = feature[0:2].upper() features[index] = feature - if feature_flags: - _log.debug("(%d) feature <%s:%s> at index %d: %s", - devnumber, _hex(feature), FEATURE_NAME[feature], index, - ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) - else: - _log.debug("(%d) feature <%s:%s> at index %d", devnumber, _hex(feature), FEATURE_NAME[feature], index) + # if feature_flags: + # _log.debug("(%d) feature <%s:%s> at index %d: %s", + # devnumber, _hex(feature), FEATURE_NAME[feature], index, + # ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) + # else: + # _log.debug("(%d) feature <%s:%s> at index %d", devnumber, _hex(feature), FEATURE_NAME[feature], index) features[0] = FEATURE.ROOT while features[-1] is None: @@ -309,7 +369,7 @@ def get_device_firmware(handle, devnumber, features=None): fw_info = _FirmwareInfo(level, FIRMWARE_KIND[-1], '', '', None) fw.append(fw_info) - _log.debug("(%d) firmware %s", devnumber, fw_info) + # _log.debug("(%d) firmware %s", devnumber, fw_info) return tuple(fw) @@ -327,7 +387,7 @@ def get_device_kind(handle, devnumber, features=None): d_kind = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x20)) if d_kind: d_kind = ord(d_kind[:1]) - _log.debug("(%d) device type %d = %s", devnumber, d_kind, DEVICE_KIND[d_kind]) + # _log.debug("(%d) device type %d = %s", devnumber, d_kind, DEVICE_KIND[d_kind]) return DEVICE_KIND[d_kind] @@ -355,7 +415,7 @@ def get_device_name(handle, devnumber, features=None): break d_name = d_name.decode('ascii') - _log.debug("(%d) device name %s", devnumber, d_name) + # _log.debug("(%d) device name %s", devnumber, d_name) return d_name diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index 9974d0bf..82df116f 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -130,12 +130,9 @@ def open(): """ for rawdevice in list_receiver_devices(): _log.info("checking %s", rawdevice) - - receiver = try_open(rawdevice.path) - if receiver: - return receiver - - return None + handle = try_open(rawdevice.path) + if handle: + return handle def close(handle): @@ -143,7 +140,7 @@ def close(handle): if handle: try: _hid.close(handle) - _log.info("closed receiver handle %X", handle) + # _log.info("closed receiver handle %X", handle) return True except: _log.exception("closing receiver handle %X", handle) @@ -239,7 +236,7 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None if type(params) == int: params = _pack('!B', params) - _log.debug("(%d) request {%s} params [%s]", devnumber, _hex(feature_index_function), _hex(params)) + # _log.debug("(%d) request {%s} params [%s]", devnumber, _hex(feature_index_function), _hex(params)) if len(feature_index_function) != 2: raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hex(feature_index_function)) diff --git a/lib/logitech/unifying_receiver/common.py b/lib/logitech/unifying_receiver/common.py index d3e0567f..afd01330 100644 --- a/lib/logitech/unifying_receiver/common.py +++ b/lib/logitech/unifying_receiver/common.py @@ -23,14 +23,6 @@ def list2dict(values_list): return dict(zip(range(0, len(values_list)), values_list)) -"""Tuple returned by list_devices and find_device_by_name.""" -AttachedDeviceInfo = namedtuple('AttachedDeviceInfo', [ - 'handle', - 'number', - 'kind', - 'name', - 'features']) - """Firmware information.""" FirmwareInfo = namedtuple('FirmwareInfo', [ 'level', diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py index 220a7fcb..a3e53264 100644 --- a/lib/logitech/unifying_receiver/listener.py +++ b/lib/logitech/unifying_receiver/listener.py @@ -29,7 +29,7 @@ def _event_dispatch(listener, callback): event = listener._events.get(True, _READ_EVENT_TIMEOUT * 10) except: continue - _log.debug("delivering event %s", event) + # _log.debug("delivering event %s", event) try: callback(event) except: @@ -70,7 +70,6 @@ class EventsListener(_Thread): self._dispatcher.start() while self._active: - event = None try: # _log.debug("read next event") event = _base.read(self._handle, _READ_EVENT_TIMEOUT) @@ -79,29 +78,28 @@ class EventsListener(_Thread): _log.warn("receiver disconnected") self._events.put(_Packet(0xFF, 0xFF, None)) self._active = False - break + else: + if event is not None: + matched = False + task = None if self._tasks.empty() else self._tasks.queue[0] + if task and task[-1] is None: + devnumber, data = task[:2] + if event[1] == devnumber: + # _log.debug("matching %s to %d, %s", event, devnumber, repr(data)) + if event[0] == 0x11 or (event[0] == 0x10 and devnumber == 0xFF): + matched = (event[2][:2] == data[:2]) or (event[2][:1] == b'\xFF' and event[2][1:3] == data[:2]) + elif event[0] == 0x10: + if event[2][:1] == b'\x8F' and event[2][1:3] == data[:2]: + matched = True - if event is not None: - matched = False - task = None if self._tasks.empty() else self._tasks.queue[0] - if task and task[0] and task[-1] is None: - devnumber, data = task[1:3] - if event[1] == devnumber: - # _log.debug("matching %s to %d, %s", event, devnumber, repr(data)) - if event[0] == 0x11 or (event[0] == 0x10 and devnumber == 0xFF): - matched = (event[2][:2] == data[:2]) or (event[2][:1] == b'\xFF' and event[2][1:3] == data[:2]) - elif event[0] == 0x10: - if event[2][:1] == b'\x8F' and event[2][1:3] == data[:2]: - matched = True - - if matched: - # _log.debug("request reply %s", event) - task[-1] = event - self._tasks.task_done() - else: - event = _Packet(*event) - _log.info("queueing event %s", event) - self._events.put(event) + if matched: + # _log.debug("request reply %s", event) + task[-1] = event + self._tasks.task_done() + else: + event = _Packet(*event) + _log.info("queueing event %s", event) + self._events.put(event) _base.request_context = None _base.close(self._handle) @@ -123,11 +121,10 @@ class EventsListener(_Thread): def write(self, handle, devnumber, data): assert handle == self._handle # _log.debug("write %02X %s", devnumber, _base._hex(data)) - task = [False, devnumber, data, None] + task = [devnumber, data, None] self._tasks.put(task) _base.write(self._handle, devnumber, data) - task[0] = True - _log.debug("task queued %s", task) + # _log.debug("task queued %s", task) def read(self, handle, timeout=_base.DEFAULT_TIMEOUT): assert handle == self._handle @@ -135,7 +132,7 @@ class EventsListener(_Thread): assert not self._tasks.empty() self._tasks.join() task = self._tasks.get(False) - _log.debug("task ready %s", task) + # _log.debug("task ready %s", task) return task[-1] def unhandled_hook(self, reply_code, devnumber, data): diff --git a/lib/logitech/unifying_receiver/tests/test_50_api.py b/lib/logitech/unifying_receiver/tests/test_50_api.py index b29ccfe1..c2ee9792 100644 --- a/lib/logitech/unifying_receiver/tests/test_50_api.py +++ b/lib/logitech/unifying_receiver/tests/test_50_api.py @@ -13,21 +13,21 @@ from ..common import * class Test_UR_API(unittest.TestCase): @classmethod def setUpClass(cls): - cls.handle = None + cls.receiver = None cls.device = None cls.features = None cls.device_info = None @classmethod def tearDownClass(cls): - if cls.handle: - api.close(cls.handle) + if cls.receiver: + cls.receiver.close() cls.device = None cls.features = None cls.device_info = None def _check(self, check_device=True, check_features=False): - if self.handle is None: + if self.receiver is None: self.fail("No receiver found") if check_device and self.device is None: self.fail("Found no devices attached.") @@ -35,13 +35,13 @@ class Test_UR_API(unittest.TestCase): self.fail("no feature set available") def test_00_open_receiver(self): - Test_UR_API.handle = api.open() + Test_UR_API.receiver = api.Receiver.open() self._check(check_device=False) def test_05_ping_device_zero(self): self._check(check_device=False) - ok = api.ping(self.handle, 0) + ok = api.ping(self.receiver.handle, 0) self.assertIsNotNone(ok, "invalid ping reply") self.assertFalse(ok, "device zero replied") @@ -50,33 +50,33 @@ class Test_UR_API(unittest.TestCase): devices = [] - for device in range(1, 1 + MAX_ATTACHED_DEVICES): - ok = api.ping(self.handle, device) + for devnumber in range(1, 1 + MAX_ATTACHED_DEVICES): + ok = api.ping(self.receiver.handle, devnumber) self.assertIsNotNone(ok, "invalid ping reply") if ok: - devices.append(device) + devices.append(self.receiver[devnumber]) if devices: - Test_UR_API.device = devices[0] + Test_UR_API.device = devices[0].number def test_30_get_feature_index(self): self._check() - fs_index = api.get_feature_index(self.handle, self.device, FEATURE.FEATURE_SET) + fs_index = api.get_feature_index(self.receiver.handle, self.device, FEATURE.FEATURE_SET) self.assertIsNotNone(fs_index, "feature FEATURE_SET not available") self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index)) def test_31_bad_feature(self): self._check() - reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF') + reply = api.request(self.receiver.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF') self.assertIsNotNone(reply, "invalid reply") self.assertEqual(reply[:5], b'\x00' * 5, "invalid reply") def test_40_get_device_features(self): self._check() - features = api.get_device_features(self.handle, self.device) + features = api.get_device_features(self.receiver.handle, self.device) self.assertIsNotNone(features, "failed to read features array") self.assertIn(FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available") # cache this to simplify next tests @@ -85,7 +85,7 @@ class Test_UR_API(unittest.TestCase): def test_50_get_device_firmware(self): self._check(check_features=True) - d_firmware = api.get_device_firmware(self.handle, self.device, self.features) + d_firmware = api.get_device_firmware(self.receiver.handle, self.device, self.features) self.assertIsNotNone(d_firmware, "failed to get device firmware") self.assertGreater(len(d_firmware), 0, "device reported no firmware") for fw in d_firmware: @@ -94,30 +94,30 @@ class Test_UR_API(unittest.TestCase): def test_52_get_device_kind(self): self._check(check_features=True) - d_kind = api.get_device_kind(self.handle, self.device, self.features) + d_kind = api.get_device_kind(self.receiver.handle, self.device, self.features) self.assertIsNotNone(d_kind, "failed to get device kind") self.assertGreater(len(d_kind), 0, "empty device kind") def test_55_get_device_name(self): self._check(check_features=True) - d_name = api.get_device_name(self.handle, self.device, self.features) + d_name = api.get_device_name(self.receiver.handle, self.device, self.features) self.assertIsNotNone(d_name, "failed to read device name") self.assertGreater(len(d_name), 0, "empty device name") def test_59_get_device_info(self): self._check(check_features=True) - device_info = api.get_device_info(self.handle, self.device, features=self.features) + device_info = api.get_device(self.receiver.handle, self.device, features=self.features) self.assertIsNotNone(device_info, "failed to read full device info") - self.assertIsInstance(device_info, AttachedDeviceInfo) + self.assertIsInstance(device_info, api.PairedDevice) Test_UR_API.device_info = device_info def test_60_get_battery_level(self): self._check(check_features=True) if FEATURE.BATTERY in self.features: - battery = api.get_device_battery_level(self.handle, self.device, self.features) + battery = api.get_device_battery_level(self.receiver.handle, self.device, self.features) self.assertIsNotNone(battery, "failed to read battery level") self.assertIsInstance(battery, tuple, "result not a tuple") else: @@ -126,21 +126,9 @@ class Test_UR_API(unittest.TestCase): def test_70_list_devices(self): self._check(check_device=False) - all_devices = api.list_devices(self.handle) - if all_devices: - self.assertIsNotNone(self.device) - for device_info in all_devices: - self.assertIsInstance(device_info, AttachedDeviceInfo) - else: - self.assertIsNone(self.device) - - def test_70_find_device_by_name(self): - self._check() - - all_devices = api.list_devices(self.handle) - for device_info in all_devices: - device = api.find_device_by_name(self.handle, device_info.name) - self.assertEqual(device, device_info) + for dev in self.receiver: + self.assertIsNotNone(dev) + self.assertIsInstance(dev, api.PairedDevice) if __name__ == '__main__': unittest.main()