diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 00000000..98976f4c --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,59 @@ +# +# +# + +import threading +from gi.repository import Gtk +from gi.repository import GObject + +from . import constants as C +from .watcher import WatcherThread +from . import ui + + +def _status_updated(watcher, icon, window): + while True: + watcher.status_changed.wait() + text = watcher.status_text + watcher.status_changed.clear() + + if icon: + GObject.idle_add(icon.set_tooltip_text, text) + + if window: + ur_detected = watcher.has_receiver() + devices = [ watcher.devices[k] for k in watcher.devices ] if ur_detected else [] + GObject.idle_add(ui.window.update, window, ur_detected, devices) + + +# def _pair_new_device(trigger, watcher): +# pass + + +def run(images_path): + GObject.threads_init() + + ui.init(images_path) + ui.notify.start(C.APP_TITLE, ui.image) + + watcher = WatcherThread(ui.notify.show) + watcher.start() + + window = ui.window.create(C.APP_TITLE, ui.image) + + menu_actions = [('Scan all devices', watcher.request_all_statuses), + # ('Pair new device', _pair_new_device, watcher), + None, + ('Quit', Gtk.main_quit)] + + click_action = (ui.window.toggle, window) if window else None + tray_icon = ui.icon.create(ui.image('icon'), C.APP_TITLE, menu_actions, click_action) + + ui_update_thread = threading.Thread(target=_status_updated, name='ui_update', args=(watcher, tray_icon, window)) + ui_update_thread.daemon = True + ui_update_thread.start() + + Gtk.main() + + watcher.stop() + ui.notify.stop() diff --git a/app/constants.py b/app/constants.py new file mode 100644 index 00000000..39578bcc --- /dev/null +++ b/app/constants.py @@ -0,0 +1,10 @@ +# +# Commonly used strings +# + +APP_TITLE = 'Solaar' +UNIFYING_RECEIVER = 'Unifying Receiver' +NO_DEVICES = 'No devices attached.' +SCANNING = 'Initializing...' +NO_RECEIVER = 'Unifying Receiver not found.' +FOUND_RECEIVER = 'Unifying Receiver found.' diff --git a/app/ui/__init__.py b/app/ui/__init__.py new file mode 100644 index 00000000..73ca10ae --- /dev/null +++ b/app/ui/__init__.py @@ -0,0 +1,26 @@ +# pass + +import os.path as _os_path +from . import (icon, notify, window) + + +_images_path = None +_IMAGES = {} + + +def init(images_path=None): + global _images_path + _images_path = images_path + + +def image(name): + if name in _IMAGES: + return _IMAGES[name] + + if _images_path: + path = _os_path.join(_images_path, name + '.png') + if _os_path.isfile(path): + _IMAGES[name] = path + return path + else: + _IMAGES[name] = None diff --git a/app/ui/icon.py b/app/ui/icon.py new file mode 100644 index 00000000..8bf0fcc1 --- /dev/null +++ b/app/ui/icon.py @@ -0,0 +1,42 @@ +# +# +# + +from gi.repository import Gtk + + +def _show_icon_menu(icon, button, time, menu): + menu.popup(None, None, icon.position_menu, icon, button, time) + + +def create(app_icon, title, menu_actions, click_action=None): + icon = Gtk.StatusIcon.new_from_file(app_icon) + icon.set_title(title) + icon.set_name(title) + + if click_action: + if type(click_action) == tuple: + function = click_action[0] + args = click_action[1:] + icon.connect('activate', function, *args) + else: + icon.connect('activate', click_action) + + if menu_actions: + if type(menu_actions) == list: + menu = Gtk.Menu() + for action in menu_actions: + if action: + item = Gtk.MenuItem(action[0]) + function = action[1] + args = action[2:] if len(action) > 2 else () + item.connect('activate', function, *args) + menu.append(item) + else: + menu.append(Gtk.SeparatorMenuItem()) + menu.show_all() + icon.connect('popup_menu', _show_icon_menu, menu) + else: + icon.connect('popup_menu', menu_actions) + + return icon diff --git a/app/ui/notify.py b/app/ui/notify.py new file mode 100644 index 00000000..808553d8 --- /dev/null +++ b/app/ui/notify.py @@ -0,0 +1,56 @@ +# +# Optional desktop notifications. +# + +try: + import notify2 as _notify + + + available = True + _app_title = None + _images = lambda x: None + _notifications = {} + + + def start(app_title, images=None): + global _app_title, _images + _notify.init(app_title) + _app_title = app_title + _images = images or (lambda x: None) + + + def stop(): + global _app_title + _app_title = None + all(n.close() for n in list(_notifications.values())) + _notify.uninit() + _notifications.clear() + + + def show(status, title, text, icon=None): + if not _app_title: + return + + if title in _notifications: + notification = _notifications[title] + else: + _notifications[title] = notification = _notify.Notification(title) + + if text == notification.message: + # there's no need to show the same notification twice in a row + return + + path = _images('devices/' + title if icon is None else icon) + icon = ('error' if status < 0 else 'info') if path is None else path + + notification.update(title, text, icon) + notification.show() + +except ImportError: + import logging + logging.exception("ouch") + logging.warn("python-notify2 not found, desktop notifications are disabled") + available = False + def start(app_title): pass + def stop(): pass + def show(status, title, text, icon=None): pass diff --git a/app/ui/window.py b/app/ui/window.py new file mode 100644 index 00000000..e9544e4b --- /dev/null +++ b/app/ui/window.py @@ -0,0 +1,213 @@ +# +# +# + +from gi.repository import Gtk +from gi.repository import Gdk +from gi.repository import GdkPixbuf + +from .. import constants as C + + +_DEVICE_ICON_SIZE = 48 +_STATUS_ICON_SIZE = 64 +_PLACEHOLDER = '~' +_images = None +_MAX_DEVICES = 7 + +_ICONS = {} + + +def _icon(icon, title, size=_DEVICE_ICON_SIZE, fallback=None): + icon = icon or Gtk.Image() + + if title and title in _ICONS: + icon.set_from_pixbuf(_ICONS[title]) + else: + icon_file = _images(title) if title else None + if icon_file: + pixbuf = GdkPixbuf.Pixbuf().new_from_file(icon_file) + if pixbuf.get_width() > size or pixbuf.get_height() > size: + if pixbuf.get_width() > pixbuf.get_height(): + new_width = size + new_height = size * pixbuf.get_height() / pixbuf.get_width() + else: + new_width = size * pixbuf.get_width() / pixbuf.get_height() + new_height = size + pixbuf = pixbuf.scale_simple(new_width, new_height, GdkPixbuf.InterpType.HYPER) + icon.set_from_pixbuf(pixbuf) + _ICONS[title] = pixbuf + elif fallback: + icon.set_from_icon_name(fallback, size if size < _DEVICE_ICON_SIZE else Gtk.IconSize.DIALOG) + + if size >= _DEVICE_ICON_SIZE: + icon.set_size_request(size, size) + return icon + + +def update(window, ur_available, devices): + if not window or not window.get_child(): + return + controls = list(window.get_child().get_children()) + + first = controls[0] + first.set_visible(not ur_available or not devices) + if ur_available: + ur_status = C.FOUND_RECEIVER if devices else C.NO_DEVICES + else: + ur_status = C.NO_RECEIVER + _, label = first.get_children() + label.set_markup('%s\n%s' % (C.UNIFYING_RECEIVER, ur_status)) + + for index in range(1, _MAX_DEVICES): + box = controls[index] + devstatus = [d for d in devices if d.number == index] + devstatus = devstatus[0] if devstatus else None + box.set_visible(devstatus is not None) + + if devstatus: + box.set_sensitive(devstatus.code >= 0) + icon, expander = box.get_children() + if not expander.get_data('devstatus'): + expander.set_data('devstatus', devstatus,) + _icon(icon, 'devices/' + devstatus.name, fallback=devstatus.type.lower()) + + label = expander.get_label_widget() + if expander.get_expanded(): + label.set_markup('%s' % devstatus.name) + else: + label.set_markup('%s\n%s' % (devstatus.name, devstatus.props['text'])) + + ebox = expander.get_child() + + # refresh_button = ebox.get_children()[0] + # refresh_button.connect('activate', devstatus.refresh) + + texts = [] + + battery_icon = ebox.get_children()[-1] + if 'battery-level' in devstatus.props: + level = devstatus.props['battery-level'] + icon_name = 'battery/' + str((level + 10) // 20) + _icon(battery_icon, icon_name, _STATUS_ICON_SIZE) + texts.append('Battery: ' + str(level) + '%') + else: + _icon(battery_icon, 'battery/unknown', _STATUS_ICON_SIZE) + texts.append('Battery: unknown') + battery_icon.set_tooltip_text(texts[-1]) + + light_icon = ebox.get_children()[-2] + if 'light-level' in devstatus.props: + lux = devstatus.props['light-level'] + icon_name = 'light/' + str((lux + 50) // 100) + _icon(light_icon, icon_name, _STATUS_ICON_SIZE) + + texts.append('Light: ' + str(lux) + ' lux') + light_icon.set_tooltip_text(texts[-1]) + light_icon.set_visible(True) + else: + light_icon.set_visible(False) + + label = ebox.get_children()[-3] + label.set_text('\n'.join(texts)) + +def _expander_activate(expander): + devstatus = expander.get_data('devstatus') + if devstatus: + label = expander.get_label_widget() + if expander.get_expanded(): + label.set_markup('%s\n%s' % (devstatus.name, devstatus.props['text'])) + else: + label.set_markup('%s' % devstatus.name) + + +def _device_box(title): + icon = _icon(None, 'devices/' + title) + icon.set_alignment(0.5, 0) + + label = Gtk.Label() + label.set_markup('%s' % title) + label.set_alignment(0, 0.5) + label.set_can_focus(False) + + box = Gtk.HBox(spacing=10) + box.pack_start(icon, False, False, 0) + + if title == C.UNIFYING_RECEIVER: + box.add(label) + else: + expander = Gtk.Expander() + expander.set_can_focus(False) + expander.set_label_widget(label) + expander.connect('activate', _expander_activate) + + ebox = Gtk.HBox(False, 10) + ebox.set_border_width(4) + + # refresh_button = Gtk.Button() + # refresh_button.set_image(_icon(None, None, size=Gtk.IconSize.SMALL_TOOLBAR, fallback='reload')) + # refresh_button.set_focus_on_click(False) + # refresh_button.set_can_focus(False) + # refresh_button.set_image_position(Gtk.PositionType.TOP) + # refresh_button.set_alignment(0.5, 0.5) + # refresh_button.set_relief(Gtk.ReliefStyle.NONE) + # refresh_button.set_size_request(20, 20) + # refresh_button.set_tooltip_text('Refresh') + # ebox.pack_start(refresh_button, False, False, 2) + + label = Gtk.Label() + label.set_alignment(0, 0.5) + ebox.pack_start(label, False, True, 8) + + light_icon = _icon(None, 'light/unknown', _STATUS_ICON_SIZE) + ebox.pack_end(light_icon, False, True, 0) + + battery_icon = _icon(None, 'battery/unknown', _STATUS_ICON_SIZE) + ebox.pack_end(battery_icon, False, True, 0) + + expander.add(ebox) + box.pack_start(expander, True, True, 1) + + box.show_all() + box.set_visible(title != _PLACEHOLDER) + return box + + +def create(title, images=None): + global _images + _images = images or (lambda x: None) + + vbox = Gtk.VBox(spacing=8) + vbox.set_border_width(6) + + vbox.add(_device_box(C.UNIFYING_RECEIVER)) + for i in range(1, _MAX_DEVICES): + vbox.add(_device_box(_PLACEHOLDER)) + vbox.set_visible(True) + + window = Gtk.Window() # Gtk.WindowType.POPUP) + window.set_title(title) + window.set_icon_from_file(_images('icon')) + window.set_keep_above(True) + window.set_decorated(False) + window.set_skip_taskbar_hint(True) + window.set_skip_pager_hint(True) + window.set_deletable(False) + window.set_resizable(False) + window.set_position(Gtk.WindowPosition.MOUSE) + window.set_type_hint(Gdk.WindowTypeHint.TOOLTIP) + window.connect('focus-out-event', _hide) + + window.add(vbox) + return window + + +def _hide(window, _): + window.set_visible(False) + + +def toggle(_, window): + if window.get_visible(): + window.set_visible(False) + else: + window.present() diff --git a/app/watcher.py b/app/watcher.py new file mode 100644 index 00000000..3b058496 --- /dev/null +++ b/app/watcher.py @@ -0,0 +1,200 @@ +# +# +# + +import logging +import threading +import time + +import constants as C +from logitech.unifying_receiver import api +from logitech.unifying_receiver.listener import EventsListener +from logitech import devices + + +_STATUS_TIMEOUT = 97 # seconds +_THREAD_SLEEP = 7 # seconds +_FORGET_TIMEOUT = 5 * 60 # seconds + + +class _DevStatus(api.AttachedDeviceInfo): + timestamp = time.time() + code = devices.STATUS.CONNECTED + props = {devices.PROPS.TEXT: devices.STATUS_NAME[devices.STATUS.CONNECTED]} + refresh = None + + +class WatcherThread(threading.Thread): + def __init__(self, notify_callback=None): + super(WatcherThread, self).__init__(name='WatcherThread') + self.daemon = True + self.active = False + + self.notify = notify_callback + self.status_text = None + self.status_changed = threading.Event() + + self.listener = None + self.devices = {} + + def run(self): + self.active = True + self._notify(0, C.UNIFYING_RECEIVER, C.SCANNING) + + while self.active: + if self.listener is None: + receiver = api.open() + if receiver: + self._notify(1, C.UNIFYING_RECEIVER, C.FOUND_RECEIVER) + for devinfo in api.list_devices(receiver): + devstatus = _DevStatus(*devinfo) + self.devices[devinfo.number] = devstatus + self._notify(devices.STATUS.CONNECTED, devstatus.name, devices.STATUS_NAME[devices.STATUS.CONNECTED]) + self.listener = EventsListener(receiver, self._events_callback) + self.listener.start() + self._update_status() + else: + self._notify(-1, C.UNIFYING_RECEIVER, C.NO_RECEIVER) + elif not self.listener.active: + self.listener = None + self._notify(-1, C.UNIFYING_RECEIVER, C.NO_RECEIVER) + self.devices.clear() + + if self.active: + update_icon = True + if self.listener and self.devices: + update_icon &= self._check_old_statuses() + + if self.active: + if update_icon: + self._update_status() + time.sleep(_THREAD_SLEEP) + + def stop(self): + self.active = False + if self.listener: + self.listener.stop() + api.close(self.listener.receiver) + + def has_receiver(self): + return self.listener is not None and self.listener.active + + def request_all_statuses(self, _=None): + updated = False + + for d in range(1, 7): + devstatus = self.devices.get(d) + if devstatus: + status = devices.request_status(devstatus, self.listener) + updated |= self._device_status_changed(devstatus, status) + else: + devstatus = self._new_device(d) + updated |= devstatus is not None + + if updated: + self._update_status() + + def _check_old_statuses(self): + updated = False + + for devstatus in list(self.devices.values()): + if time.time() - devstatus.timestamp > _STATUS_TIMEOUT: + status = devices.ping(devstatus, self.listener) + updated |= self._device_status_changed(devstatus, status) + + return updated + + def _new_device(self, device): + devinfo = api.get_device_info(self.listener.receiver, device) + if devinfo: + devstatus = _DevStatus(*devinfo) + self.devices[device] = devstatus + self._notify(devstatus.code, devstatus.name, devstatus.props[devices.PROPS.TEXT]) + return devinfo + + def _events_callback(self, code, device, data): + logging.debug("%s: event %02x %d %s", time.asctime(), code, device, repr(data)) + + updated = False + + if device in self.devices: + devstatus = self.devices[device] + if code == 0x10 and data[0] == 'b\x8F': + updated = True + self._device_status_changed(devstatus, devices.STATUS.UNAVAILABLE) + elif code == 0x11: + status = devices.process_event(devstatus, self.listener, data) + updated |= self._device_status_changed(devstatus, status) + else: + logging.warn("unknown event code %02x", code) + elif device: + logging.debug("got event (%d, %d, %s) for new device", code, device, repr(data)) + self._new_device(device) + updated = True + else: + logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data) + + if updated: + self._update_status() + + def _device_status_changed(self, devstatus, status): + if status is None: + return False + + old_status_code = devstatus.code + devstatus.timestamp = time.time() + + if type(status) == int: + devstatus.code = status + if devstatus.code in devices.STATUS_NAME: + devstatus.props[devices.PROPS.TEXT] = devices.STATUS_NAME[devstatus.code] + else: + devstatus.code = status[0] + devstatus.props.update(status[1]) + + if old_status_code != devstatus.code: + logging.debug("%s: device status changed %s => %s: %s", time.asctime(), old_status_code, devstatus.code, devstatus.props) + # if not (devstatus.code == 0 and old_status_code > 0): + self._notify(devstatus.code, devstatus.name, devstatus.props[devices.PROPS.TEXT]) + + return True + + def _notify(self, *args): + if self.notify: + self.notify(*args) + + def notify_full(self): + if self.listener and self.listener.active: + if self.devices: + for devstatus in self.devices.values(): + self._notify(0, devstatus.name, devstatus.props[devices.PROPS.TEXT]) + else: + self._notify(0, C.UNIFYING_RECEIVER, C.NO_DEVICES) + else: + self._notify(-1, C.UNIFYING_RECEIVER, C.NO_RECEIVER) + + def _update_status(self): + last_status_text = self.status_text + + if self.listener and self.listener.active: + if self.devices: + all_statuses = [] + for d in self.devices: + devstatus = self.devices[d] + status_text = devstatus.props[devices.PROPS.TEXT] + if status_text: + if ' ' in status_text: + all_statuses.append(devstatus.name) + all_statuses.append(' ' + status_text) + else: + all_statuses.append(devstatus.name + ' ' + status_text) + else: + all_statuses.append(devstatus.name) + self.status_text = '\n'.join(all_statuses) + else: + self.status_text = C.NO_DEVICES + else: + self.status_text = C.NO_RECEIVER + + if self.status_text != last_status_text: + self.status_changed.set() diff --git a/images/Unifying Receiver.png b/images/Unifying Receiver.png deleted file mode 100644 index 314ed231..00000000 Binary files a/images/Unifying Receiver.png and /dev/null differ diff --git a/images/Wireless Solar Keyboard K750.png b/images/Wireless Solar Keyboard K750.png deleted file mode 100644 index 77eb391f..00000000 Binary files a/images/Wireless Solar Keyboard K750.png and /dev/null differ diff --git a/images/battery/0.png b/images/battery/0.png new file mode 100644 index 00000000..0b6a41a9 Binary files /dev/null and b/images/battery/0.png differ diff --git a/images/battery/1.png b/images/battery/1.png new file mode 100644 index 00000000..dd86a944 Binary files /dev/null and b/images/battery/1.png differ diff --git a/images/battery/2.png b/images/battery/2.png new file mode 100644 index 00000000..d03dd593 Binary files /dev/null and b/images/battery/2.png differ diff --git a/images/battery/3.png b/images/battery/3.png new file mode 100644 index 00000000..bc774f8b Binary files /dev/null and b/images/battery/3.png differ diff --git a/images/battery/4.png b/images/battery/4.png new file mode 100644 index 00000000..23e72ac3 Binary files /dev/null and b/images/battery/4.png differ diff --git a/images/battery/5.png b/images/battery/5.png new file mode 100644 index 00000000..3561ffb0 Binary files /dev/null and b/images/battery/5.png differ diff --git a/images/battery/unknown.png b/images/battery/unknown.png new file mode 100644 index 00000000..9f6cc466 Binary files /dev/null and b/images/battery/unknown.png differ diff --git a/images/devices/Unifying Receiver.png b/images/devices/Unifying Receiver.png new file mode 100644 index 00000000..15fae366 Binary files /dev/null and b/images/devices/Unifying Receiver.png differ diff --git a/images/devices/Wireless Solar Keyboard K750.png b/images/devices/Wireless Solar Keyboard K750.png new file mode 100644 index 00000000..2f746bbb Binary files /dev/null and b/images/devices/Wireless Solar Keyboard K750.png differ diff --git a/images/light/0.png b/images/light/0.png new file mode 100644 index 00000000..fb967968 Binary files /dev/null and b/images/light/0.png differ diff --git a/images/light/1.png b/images/light/1.png new file mode 100644 index 00000000..2a61b19c Binary files /dev/null and b/images/light/1.png differ diff --git a/images/light/2.png b/images/light/2.png new file mode 100644 index 00000000..0c93cbd8 Binary files /dev/null and b/images/light/2.png differ diff --git a/images/light/3.png b/images/light/3.png new file mode 100644 index 00000000..ff70a2c2 Binary files /dev/null and b/images/light/3.png differ diff --git a/images/light/4.png b/images/light/4.png new file mode 100644 index 00000000..a559d1da Binary files /dev/null and b/images/light/4.png differ diff --git a/images/light/5.png b/images/light/5.png new file mode 100644 index 00000000..0acf7a9b Binary files /dev/null and b/images/light/5.png differ diff --git a/images/light/unknown.png b/images/light/unknown.png new file mode 100644 index 00000000..c0c091b6 Binary files /dev/null and b/images/light/unknown.png differ diff --git a/lib/cli/ur_scanner.py b/lib/cli/ur_scanner.py index 1e0cc2ef..805f4cd7 100644 --- a/lib/cli/ur_scanner.py +++ b/lib/cli/ur_scanner.py @@ -24,7 +24,7 @@ def scan_devices(receiver): for index in range(0, len(devinfo.features)): feature = devinfo.features[index] if feature: - print "~ Feature %s (%s) at index %d" % (FEATURE_NAME[feature], hexlify(feature), index) + print " ~ Feature %s (%s) at index %d" % (FEATURE_NAME[feature], hexlify(feature), index) if FEATURE.BATTERY in devinfo.features: discharge, dischargeNext, status = api.get_device_battery_level(receiver, devinfo.number, features_array=devinfo.features) @@ -35,19 +35,8 @@ def scan_devices(receiver): if keys is not None and keys: print " %d reprogrammable keys found" % len(keys) for k in keys: - flags = '' - if k.flags & KEY_FLAG.REPROGRAMMABLE: - flags += ' reprogrammable' - if k.flags & KEY_FLAG.FN_SENSITIVE: - flags += ' fn-sensitive' - if k.flags & KEY_FLAG.NONSTANDARD: - flags += ' nonstandard' - if k.flags & KEY_FLAG.IS_FN: - flags += ' is-fn' - if k.flags & KEY_FLAG.MSE: - flags += ' mse' - - print " %2d: %s => %s :%s" % (k.index, KEY_NAME[k.id], KEY_NAME[k.task], flags) + flags = ','.join(KEY_FLAG_NAME[f] for f in KEY_FLAG_NAME if k.flags & f) + print " %2d: %-12s => %-12s :%s" % (k.index, KEY_NAME[k.id], KEY_NAME[k.task], flags) print "--------" diff --git a/lib/logitech/devices/__init__.py b/lib/logitech/devices/__init__.py index 9f8d5287..2300861a 100644 --- a/lib/logitech/devices/__init__.py +++ b/lib/logitech/devices/__init__.py @@ -5,20 +5,55 @@ from . import k750 from .constants import * +from ..unifying_receiver import api as _api +from ..unifying_receiver.common import FallbackDict as _FDict -_REQUEST_STATUS_FUNCTIONS = { - k750.NAME : k750.request_status, - } + +def ping(devinfo, listener): + reply = listener.request(_api.ping, devinfo.number) + return STATUS.CONNECTED if reply else STATUS.UNAVAILABLE + + +def default_request_status(devinfo, listener): + if _api.C.FEATURE.BATTERY in devinfo.features: + reply = listener.request(_api.get_device_battery_level, devinfo.number, features_array=devinfo.features) + if reply: + discharge, dischargeNext, status = reply + return STATUS.CONNECTED, {PROPS.BATTERY_LEVEL: discharge} + + +def default_process_event(devinfo, listener, data): + feature_index = ord(data[0:1]) + feature = devinfo.features[feature_index] + + if feature == _api.C.FEATURE.BATTERY: + if ord(data[1:2]) & 0xF0 == 0: + discharge = ord(data[2:3]) + status = _api.C.BATTERY_STATUS[ord(data[3:4])] + return STATUS.CONNECTED, {BATTERY_LEVEL: discharge, BATTERY_STATUS: status} + # ? + elif feature == _api.C.FEATURE.REPROGRAMMABLE_KEYS: + if ord(data[1:2]) & 0xF0 == 0: + print 'reprogrammable key', repr(data) + # TODO + pass + # ? + elif feature == _api.C.FEATURE.WIRELESS: + if ord(data[1:2]) & 0xF0 == 0: + # TODO + pass + # ? + + +_REQUEST_STATUS_FUNCTIONS = _FDict() +_REQUEST_STATUS_FUNCTIONS[k750.NAME] = k750.request_status def request_status(devinfo, listener): - if devinfo.name in _REQUEST_STATUS_FUNCTIONS: - return _REQUEST_STATUS_FUNCTIONS[devinfo.name](devinfo, listener) + return _REQUEST_STATUS_FUNCTIONS[devinfo.name](devinfo, listener) or default_request_status(devinfo, listener) or ping(devinfo, listener) -_PROCESS_EVENT_FUNCTIONS = { - k750.NAME : k750.process_event, - } +_PROCESS_EVENT_FUNCTIONS = _FDict() +_PROCESS_EVENT_FUNCTIONS[k750.NAME] = k750.process_event def process_event(devinfo, listener, data): - if devinfo.name in _PROCESS_EVENT_FUNCTIONS: - return _PROCESS_EVENT_FUNCTIONS[devinfo.name](devinfo, listener, data) + return default_process_event(devinfo, listener, data) or _PROCESS_EVENT_FUNCTIONS[devinfo.name](devinfo, listener, data) diff --git a/lib/logitech/devices/constants.py b/lib/logitech/devices/constants.py index c9344f74..1696c6b7 100644 --- a/lib/logitech/devices/constants.py +++ b/lib/logitech/devices/constants.py @@ -2,7 +2,7 @@ # # -DEVICE_STATUS = type('DEVICE_STATUS', (), +STATUS = type('STATUS', (), dict( UNKNOWN=None, UNAVAILABLE=-1, @@ -10,12 +10,21 @@ DEVICE_STATUS = type('DEVICE_STATUS', (), # ACTIVE=1, )) +PROPS = type('PROPS', (), + dict( + TEXT='text', + BATTERY_LEVEL='battery-level', + BATTERY_STATUS='battery-status', + LIGHT_LUX='lux', + LIGHT_LEVEL='light-level', + )) + from collections import defaultdict -DEVICE_STATUS_NAME = defaultdict(lambda x: None) -DEVICE_STATUS_NAME[DEVICE_STATUS.UNAVAILABLE] = 'disconnected' -DEVICE_STATUS_NAME[DEVICE_STATUS.CONNECTED] = 'connected' -# DEVICE_STATUS_NAME[DEVICE_STATUS.ACTIVE] = 'active' +STATUS_NAME = defaultdict(lambda x: None) +STATUS_NAME[STATUS.UNAVAILABLE] = 'disconnected' +STATUS_NAME[STATUS.CONNECTED] = 'connected' +# STATUS_NAME[STATUS.ACTIVE] = 'active' del defaultdict diff --git a/lib/logitech/devices/k750.py b/lib/logitech/devices/k750.py index 8d084cd2..b6c6f590 100644 --- a/lib/logitech/devices/k750.py +++ b/lib/logitech/devices/k750.py @@ -3,10 +3,10 @@ # import logging -import struct +from struct import unpack as _unpack from ..unifying_receiver import api as _api -from .constants import * +from . import constants as C # # @@ -14,10 +14,7 @@ from .constants import * NAME = 'Wireless Solar Keyboard K750' -_STATUS_NAMES = ('excellent', 'good', 'okay', 'poor', 'very low') - _CHARGE_LIMITS = (75, 40, 20, 10, -1) -_LIGHTING_LIMITS = (400, 200, 50, 20, -1) # # @@ -25,58 +22,55 @@ _LIGHTING_LIMITS = (400, 200, 50, 20, -1) def _trigger_solar_charge_events(receiver, devinfo): return _api.request(receiver, devinfo.number, - feature=_api.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', + feature=_api.C.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', features_array=devinfo.features) def _charge_status(data): - charge, lux = struct.unpack('!BH', data[2:5]) + charge, lux = _unpack('!BH', data[2:5]) + + d = {} for i in range(0, len(_CHARGE_LIMITS)): if charge >= _CHARGE_LIMITS[i]: charge_index = i break - text = 'Charge %d%% (%s)' % (charge, _STATUS_NAMES[charge_index]) + else: + charge_index = 0 + d[C.PROPS.BATTERY_LEVEL] = charge + text = 'Battery %d%%' % charge if lux > 0: - for i in range(0, len(_CHARGE_LIMITS)): - if lux > _LIGHTING_LIMITS[i]: - lighting_index = i - break - text += ', Lighting %s (%d lux)' % (_STATUS_NAMES[lighting_index], lux) + d[C.PROPS.LIGHT_LEVEL] = lux + text = 'Light: %d lux' % lux + ', ' + text - return 0x10 << charge_index, text + d[C.PROPS.TEXT] = text + return 0x10 << charge_index, d def request_status(devinfo, listener): - # Constantly requesting the solar charge status triggers a flood of events, - # which appear to drain the battery rather fast. - # Instead, ping the device for on/off status, and only ask for solar charge - # status when the user presses the solar key on the keyboard. - # - # reply = listener.request(_trigger_solar_charge_events, devinfo) - # if reply is None: - # return DEVICE_STATUS.UNAVAILABLE - - reply = listener.request(_api.ping, devinfo.number) - if not reply: - return DEVICE_STATUS.UNAVAILABLE + reply = listener.request(_trigger_solar_charge_events, devinfo) + if reply is None: + return C.STATUS.UNAVAILABLE def process_event(devinfo, listener, data): + if data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': + # usually sent after the keyboard is turned on + return _charge_status(data) + + if data[:2] == b'\x09\x10' and data[7:11] == b'GOOD': + # regular solar charge events + return _charge_status(data) + + if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD': + logging.debug("Solar key pressed") + if _trigger_solar_charge_events(listener.receiver, devinfo) is None: + return C.STATUS.UNAVAILABLE + return _charge_status(data) + if data[:2] == b'\x05\x00': # wireless device status if data[2:5] == b'\x01\x01\x01': logging.debug("Keyboard just started") - return DEVICE_STATUS.CONNECTED - elif data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': - # usually sent after the keyboard is turned on - return _charge_status(data) - elif data[:2] == b'\x09\x10' and data[7:11] == b'GOOD': - # regular solar charge events - return _charge_status(data) - elif data[:2] == b'\x09\x20' and data[7:11] == b'GOOD': - logging.debug("Solar key pressed") - if _trigger_solar_charge_events(listener.receiver, devinfo) is None: - return DEVICE_STATUS.UNAVAILABLE - return _charge_status(data) + return C.STATUS.CONNECTED diff --git a/lib/logitech/unifying_receiver/api.py b/lib/logitech/unifying_receiver/api.py index eb136be4..d615bd48 100644 --- a/lib/logitech/unifying_receiver/api.py +++ b/lib/logitech/unifying_receiver/api.py @@ -3,14 +3,17 @@ # import logging -import struct -from binascii import hexlify +from struct import pack as _pack +from struct import unpack as _unpack +from binascii import hexlify as _hexlify -from .common import * -from .constants import * -from .exceptions import * -from . import base -from .unhandled import _publish as _unhandled_publish +from .common import FirmwareInfo +from .common import AttachedDeviceInfo +from .common import ReprogrammableKeyInfo +from . import constants as C +from . import exceptions as E +from . import unhandled as _unhandled +from . import base as _base _LOG_LEVEL = 5 @@ -25,10 +28,10 @@ def open(): :returns: An open file handle for the found receiver, or ``None``. """ - for rawdevice in base.list_receiver_devices(): + for rawdevice in _base.list_receiver_devices(): _l.log(_LOG_LEVEL, "checking %s", rawdevice) - receiver = base.try_open(rawdevice.path) + receiver = _base.try_open(rawdevice.path) if receiver: return receiver @@ -36,7 +39,7 @@ def open(): """Closes a HID device handle.""" -close = base.close +close = _base.close def request(handle, device, feature, function=b'\x00', params=b'', features_array=None): @@ -58,7 +61,7 @@ def request(handle, device, feature, function=b'\x00', params=b'', features_arra """ feature_index = None - if feature == FEATURE.ROOT: + if feature == C.FEATURE.ROOT: feature_index = b'\x00' else: if features_array is None: @@ -67,13 +70,13 @@ def request(handle, device, feature, function=b'\x00', params=b'', features_arra _l.log(_LOG_LEVEL, "(%d,%d) no features array available", handle, device) return None if feature in features_array: - feature_index = struct.pack('!B', features_array.index(feature)) + feature_index = _pack('!B', features_array.index(feature)) if feature_index is None: - _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, hexlify(feature), FEATURE_NAME[feature]) - raise FeatureNotSupported(device, feature) + _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, _hexlify(feature), C.FEATURE_NAME[feature]) + raise E.FeatureNotSupported(device, feature) - return base.request(handle, device, feature_index + function, params) + return _base.request(handle, device, feature_index + function, params) def ping(handle, device): @@ -93,13 +96,13 @@ def ping(handle, device): if reply_device != device: # oops - _l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, hexlify(reply_data)) - _unhandled_publish(reply_code, reply_device, reply_data) - return _status(base.read(handle)) + _l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, _hexlify(reply_data)) + _unhandled._publish(reply_code, reply_device, reply_data) + return _status(_base.read(handle)) if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4:5] == ping_marker): # ping ok - _l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, hexlify(reply_data)) + _l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, _hexlify(reply_data)) return True if (reply_code == 0x10 and reply_data[:2] == b'\x8F\x00'): @@ -111,19 +114,19 @@ def ping(handle, device): # some devices may reply with a SOLAR_CHARGE event before the # ping_ok reply, especially right after the device connected to the # receiver - _l.log(_LOG_LEVEL, "(%d,%d) ping: solar status %s", handle, device, hexlify(reply_data)) - _unhandled_publish(reply_code, reply_device, reply_data) - return _status(base.read(handle)) + _l.log(_LOG_LEVEL, "(%d,%d) ping: solar status [%s]", handle, device, _hexlify(reply_data)) + _unhandled._publish(reply_code, reply_device, reply_data) + return _status(_base.read(handle)) # ugh - _l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device: %d=[%s]", handle, device, reply[0], hexlify(reply[2])) - _unhandled_publish(reply_code, reply_device, reply_data) + _l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device: %d=[%s]", handle, device, reply_code, _hexlify(reply_data)) + _unhandled._publish(reply_code, reply_device, reply_data) return None _l.log(_LOG_LEVEL, "(%d,%d) pinging", handle, device) - base.write(handle, device, b'\x00\x10\x00\x00' + ping_marker) + _base.write(handle, device, b'\x00\x10\x00\x00' + ping_marker) # pings may take a while to reply success - return _status(base.read(handle, base.DEFAULT_TIMEOUT * 3)) + return _status(_base.read(handle, _base.DEFAULT_TIMEOUT * 3)) def find_device_by_name(handle, device_name): @@ -133,7 +136,7 @@ def find_device_by_name(handle, device_name): """ _l.log(_LOG_LEVEL, "(%d,) searching for device '%s'", handle, device_name) - for device in range(1, 1 + base.MAX_ATTACHED_DEVICES): + for device in range(1, 1 + _base.MAX_ATTACHED_DEVICES): features_array = get_device_features(handle, device) if features_array: d_name = get_device_name(handle, device, features_array) @@ -150,7 +153,7 @@ def list_devices(handle): devices = [] - for device in range(1, 1 + base.MAX_ATTACHED_DEVICES): + for device in range(1, 1 + _base.MAX_ATTACHED_DEVICES): features_array = get_device_features(handle, device) if features_array: devices.append(get_device_info(handle, device, features_array=features_array)) @@ -181,31 +184,31 @@ def get_feature_index(handle, device, feature): :returns: An int, or ``None`` if the feature is not available. """ - _l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, hexlify(feature), FEATURE_NAME[feature]) + _l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, _hexlify(feature), C.FEATURE_NAME[feature]) if len(feature) != 2: raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature) # FEATURE.ROOT should always be available for any attached devices - reply = base.request(handle, device, FEATURE.ROOT, feature) + reply = _base.request(handle, device, C.FEATURE.ROOT, feature) if reply: # only consider active and supported features feature_index = ord(reply[0:1]) if feature_index: feature_flags = ord(reply[1:2]) & 0xE0 - _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, hexlify(feature), FEATURE_NAME[feature], feature_index, feature_flags) + _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, _hexlify(feature), C.FEATURE_NAME[feature], feature_index, feature_flags) if feature_flags == 0: return feature_index if feature_flags & 0x80: - _l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, hexlify(feature), FEATURE_NAME[feature]) + _l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, _hexlify(feature), C.FEATURE_NAME[feature]) if feature_flags & 0x40: - _l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, hexlify(feature), FEATURE_NAME[feature]) + _l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, _hexlify(feature), C.FEATURE_NAME[feature]) if feature_flags & 0x20: - _l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, hexlify(feature), FEATURE_NAME[feature]) - raise FeatureNotSupported(device, feature) + _l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, _hexlify(feature), C.FEATURE_NAME[feature]) + raise E.FeatureNotSupported(device, feature) else: - _l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, hexlify(feature), FEATURE_NAME[feature]) - raise FeatureNotSupported(device, feature) + _l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, _hexlify(feature), C.FEATURE_NAME[feature]) + raise E.FeatureNotSupported(device, feature) def get_device_features(handle, device): @@ -218,7 +221,7 @@ def get_device_features(handle, device): # get the index of the FEATURE_SET # FEATURE.ROOT should always be available for all devices - fs_index = base.request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET) + fs_index = _base.request(handle, device, C.FEATURE.ROOT, C.FEATURE.FEATURE_SET) if fs_index is None: # _l.warn("(%d,%d) FEATURE_SET not available", handle, device) return None @@ -228,7 +231,7 @@ def get_device_features(handle, device): # even if unknown. # get the number of active features the device has - features_count = base.request(handle, device, fs_index + b'\x00') + features_count = _base.request(handle, device, fs_index + b'\x00') if not features_count: # this can happen if the device disappeard since the fs_index request # otherwise we should get at least a count of 1 (the FEATURE_SET we've just used above) @@ -238,17 +241,19 @@ def get_device_features(handle, device): features_count = ord(features_count[:1]) _l.log(_LOG_LEVEL, "(%d,%d) found %d features", handle, device, features_count) - # a device may have a maximum of 15 features, other than FEATURE.ROOT - features = [None] * 0x10 + features = [None] * 0x20 for index in range(1, 1 + features_count): # for each index, get the feature residing at that index - feature = base.request(handle, device, fs_index + b'\x10', struct.pack('!B', index)) + feature = _base.request(handle, device, fs_index + b'\x10', _pack('!B', index)) if feature: feature = feature[0:2].upper() features[index] = feature - _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, hexlify(feature), FEATURE_NAME[feature], index) + _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, _hexlify(feature), C.FEATURE_NAME[feature], index) - return None if all(c == None for c in features) else features + features[0] = C.FEATURE.ROOT + while features[-1] is None: + del features[-1] + return features def get_device_firmware(handle, device, features_array=None): @@ -259,35 +264,35 @@ def get_device_firmware(handle, device, features_array=None): def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None): return FirmwareInfo(level, type, name, version, build, extras) - fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array) + fw_count = request(handle, device, C.FEATURE.FIRMWARE, features_array=features_array) if fw_count: fw_count = ord(fw_count[:1]) fw = [] for index in range(0, fw_count): - index = struct.pack('!B', index) - fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=index, features_array=features_array) + index = _pack('!B', index) + fw_info = request(handle, device, C.FEATURE.FIRMWARE, function=b'\x10', params=index, features_array=features_array) if fw_info: fw_level = ord(fw_info[:1]) & 0x0F if fw_level == 0 or fw_level == 1: - fw_type = FIRMWARE_TYPE[fw_level] - name, = struct.unpack('!3s', fw_info[1:4]) + fw_type = C.FIRMWARE_TYPE[fw_level] + name, = _unpack('!3s', fw_info[1:4]) name = name.decode('ascii') version = ( chr(0x30 + (ord(fw_info[4:5]) >> 4)) + chr(0x30 + (ord(fw_info[4:5]) & 0x0F)) + '.' + chr(0x30 + (ord(fw_info[5:6]) >> 4)) + chr(0x30 + (ord(fw_info[5:6]) & 0x0F))) - build, = struct.unpack('!H', fw_info[6:8]) + build, = _unpack('!H', fw_info[6:8]) extras = fw_info[9:].rstrip(b'\x00') if extras: fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build, extras=extras) else: fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build) elif fw_level == 2: - fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPE[2], version=ord(fw_info[1:2])) + fw_info = _makeFirmwareInfo(level=2, type=C.FIRMWARE_TYPE[2], version=ord(fw_info[1:2])) else: - fw_info = _makeFirmwareInfo(level=fw_level, type=FIRMWARE_TYPE[-1]) + fw_info = _makeFirmwareInfo(level=fw_level, type=C.FIRMWARE_TYPE[-1]) fw.append(fw_info) _l.log(_LOG_LEVEL, "(%d:%d) firmware %s", handle, device, fw_info) @@ -301,11 +306,11 @@ def get_device_type(handle, device, features_array=None): :returns: a string describing the device type, or ``None`` if the device is not available or does not support the ``NAME`` feature. """ - d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array) + d_type = request(handle, device, C.FEATURE.NAME, function=b'\x20', features_array=features_array) if d_type: d_type = ord(d_type[:1]) - _l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPE[d_type]) - return DEVICE_TYPE[d_type] + _l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, device, d_type, C.DEVICE_TYPE[d_type]) + return C.DEVICE_TYPE[d_type] def get_device_name(handle, device, features_array=None): @@ -314,14 +319,14 @@ def get_device_name(handle, device, features_array=None): :returns: a string with the device name, or ``None`` if the device is not available or does not support the ``NAME`` feature. """ - name_length = request(handle, device, FEATURE.NAME, features_array=features_array) + name_length = request(handle, device, C.FEATURE.NAME, features_array=features_array) if name_length: name_length = ord(name_length[:1]) d_name = b'' while len(d_name) < name_length: - name_index = struct.pack('!B', len(d_name)) - name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', params=name_index, features_array=features_array) + name_index = _pack('!B', len(d_name)) + name_fragment = request(handle, device, C.FEATURE.NAME, function=b'\x10', params=name_index, features_array=features_array) name_fragment = name_fragment[:name_length - len(d_name)] d_name += name_fragment @@ -335,24 +340,24 @@ def get_device_battery_level(handle, device, features_array=None): :raises FeatureNotSupported: if the device does not support this feature. """ - battery = request(handle, device, FEATURE.BATTERY, features_array=features_array) + battery = request(handle, device, C.FEATURE.BATTERY, features_array=features_array) if battery: - discharge, dischargeNext, status = struct.unpack('!BBB', battery[:3]) - _l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSE[status]) - return (discharge, dischargeNext, BATTERY_STATUS[status]) + discharge, dischargeNext, status = _unpack('!BBB', battery[:3]) + _l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, C.BATTERY_STATUSE[status]) + return (discharge, dischargeNext, C.BATTERY_STATUS[status]) def get_device_keys(handle, device, features_array=None): - count = request(handle, device, FEATURE.REPROGRAMMABLE_KEYS, features_array=features_array) + count = request(handle, device, C.FEATURE.REPROGRAMMABLE_KEYS, features_array=features_array) if count: keys = [] count = ord(count[:1]) for index in range(0, count): - keyindex = struct.pack('!B', index) - keydata = request(handle, device, FEATURE.REPROGRAMMABLE_KEYS, function=b'\x10', params=keyindex, features_array=features_array) + keyindex = _pack('!B', index) + keydata = request(handle, device, C.FEATURE.REPROGRAMMABLE_KEYS, function=b'\x10', params=keyindex, features_array=features_array) if keydata: - key, key_task, flags = struct.unpack('!HHB', keydata[:5]) - keys.append(ReprogrammableKeyInfo(index, key, KEY_NAME[key], key_task, KEY_NAME[key_task], flags)) + key, key_task, flags = _unpack('!HHB', keydata[:5]) + keys.append(ReprogrammableKeyInfo(index, key, C.KEY_NAME[key], key_task, C.KEY_NAME[key_task], flags)) return keys diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index 1e1aa30f..6b57c5db 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -4,11 +4,11 @@ # import logging -import struct -from binascii import hexlify +from struct import pack as _pack +from binascii import hexlify as _hexlify -from .constants import * -from .exceptions import * +import constants as C +import exceptions as E from . import unhandled as _unhandled import hidapi as _hid @@ -92,9 +92,9 @@ def try_open(path): # any other replies are ignored, and will assume this is the wrong receiver device if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': # no idea what this is, but it comes up occasionally - _l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, hexlify(reply)) + _l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, _hexlify(reply)) else: - _l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, hexlify(reply)) + _l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, _hexlify(reply)) else: _l.log(_LOG_LEVEL, "[%s] (%d,) no reply", path, receiver_handle) @@ -146,16 +146,18 @@ def write(handle, device, data): data += b'\x00' * (_MIN_CALL_SIZE - 2 - len(data)) elif len(data) > _MIN_CALL_SIZE - 2: data += b'\x00' * (_MAX_CALL_SIZE - 2 - len(data)) - wdata = struct.pack('!BB', 0x10, device) + data - _l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, hexlify(wdata)) + wdata = _pack('!BB', 0x10, device) + data + + _l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, _hexlify(wdata)) if len(wdata) < _MIN_CALL_SIZE: - _l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, hexlify(wdata), len(wdata)) + _l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, _hexlify(wdata), len(wdata)) if len(wdata) > _MAX_CALL_SIZE: - _l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, hexlify(wdata), len(wdata)) + _l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, _hexlify(wdata), len(wdata)) + if not _hid.write(handle, wdata): _l.warn("(%d,%d) write failed, assuming receiver no longer available", handle, device) close(handle) - raise NoReceiver() + raise E.NoReceiver def read(handle, timeout=DEFAULT_TIMEOUT): @@ -178,17 +180,18 @@ def read(handle, timeout=DEFAULT_TIMEOUT): if data is None: _l.warn("(%d,*) read failed, assuming receiver no longer available", handle) close(handle) - raise NoReceiver + raise E.NoReceiver if data: - _l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, hexlify(data)) + _l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, _hexlify(data)) if len(data) < _MIN_REPLY_SIZE: - _l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, hexlify(data), len(data)) + _l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, _hexlify(data), len(data)) if len(data) > _MAX_REPLY_SIZE: - _l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, hexlify(data), len(data)) + _l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, _hexlify(data), len(data)) code = ord(data[:1]) device = ord(data[1:2]) return code, device, data[2:] + _l.log(_LOG_LEVEL, "(%d,*) => r[]", handle) @@ -209,13 +212,16 @@ def request(handle, device, feature_index_function, params=b'', features_array=N available. :raisees FeatureCallError: if the feature call replied with an error. """ - _l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, hexlify(feature_index_function), hexlify(params)) + _l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, _hexlify(feature_index_function), _hexlify(params)) if len(feature_index_function) != 2: - raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % hexlify(feature_index_function)) + raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hexlify(feature_index_function)) + + retries = 5 write(handle, device, feature_index_function + params) - while True: + while retries > 0: reply = read(handle) + retries -= 1 if not reply: # keep waiting... @@ -225,7 +231,7 @@ def request(handle, device, feature_index_function, params=b'', features_array=N if reply_device != device: # this message not for the device we're interested in - _l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, reply_device, hexlify(reply_data)) + _l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, reply_device, _hexlify(reply_data)) # worst case scenario, this is a reply for a concurrent request # on this receiver _unhandled._publish(reply_code, reply_device, reply_data) @@ -233,27 +239,27 @@ def request(handle, device, feature_index_function, params=b'', features_array=N if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:2] == feature_index_function: # device not present - _l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, hexlify(feature_index_function), hexlify(reply_data)) + _l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, _hexlify(feature_index_function), _hexlify(reply_data)) return None if reply_code == 0x10 and reply_data[:1] == b'\x8F': # device not present - _l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, hexlify(reply_data)) + _l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, _hexlify(reply_data)) return None if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function: # an error returned from the device error_code = ord(reply_data[3]) - _l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error_code, ERROR_NAME[error_code], hexlify(reply_data)) + _l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error_code, C.ERROR_NAME[error_code], _hexlify(reply_data)) feature_index = ord(feature_index_function[:1]) feature_function = feature_index_function[1:2] feature = None if features_array is None else features_array[feature_index] - raise FeatureCallError(device, feature, feature_index, feature_function, error_code, reply_data) + raise E.FeatureCallError(device, feature, feature_index, feature_function, error_code, reply_data) if reply_code == 0x11 and reply_data[:2] == feature_index_function: # a matching reply - _l.log(_LOG_LEVEL, "(%d,%d) matched reply with data [%s]", handle, device, hexlify(reply_data[2:])) + _l.log(_LOG_LEVEL, "(%d,%d) matched reply with feature-index-function [%s]", handle, device, _hexlify(reply_data[2:])) return reply_data[2:] - _l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, hexlify(reply_data[:2]), hexlify(feature_index_function)) + _l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, _hexlify(reply_data[:2]), _hexlify(feature_index_function)) _unhandled._publish(reply_code, reply_device, reply_data) diff --git a/lib/logitech/unifying_receiver/common.py b/lib/logitech/unifying_receiver/common.py index 10adaf05..fc6096af 100644 --- a/lib/logitech/unifying_receiver/common.py +++ b/lib/logitech/unifying_receiver/common.py @@ -3,7 +3,7 @@ # class FallbackDict(dict): - def __init__(self, fallback_function, *args, **kwargs): + def __init__(self, fallback_function=lambda x: None, *args, **kwargs): super(FallbackDict, self).__init__(*args, **kwargs) self.fallback = fallback_function @@ -18,7 +18,6 @@ def list2dict(values_list): return dict(zip(range(0, len(values_list)), values_list)) - from collections import namedtuple """Tuple returned by list_devices and find_device_by_name.""" diff --git a/lib/logitech/unifying_receiver/constants.py b/lib/logitech/unifying_receiver/constants.py index a952f5a8..a217a905 100644 --- a/lib/logitech/unifying_receiver/constants.py +++ b/lib/logitech/unifying_receiver/constants.py @@ -5,7 +5,7 @@ from binascii import hexlify as _hexlify from struct import pack as _pack -from .common import * +from .common import (FallbackDict, list2dict) """Possible features available on a Logitech device. @@ -21,7 +21,7 @@ FEATURE = type('FEATURE', (), NAME=b'\x00\x05', BATTERY=b'\x10\x00', REPROGRAMMABLE_KEYS=b'\x1B\x00', - WIRELESS_STATUS=b'\x1D\x4B', + WIRELESS=b'\x1D\x4B', SOLAR_CHARGE=b'\x43\x01', )) @@ -41,7 +41,7 @@ FEATURE_NAME[FEATURE.FIRMWARE] = 'FIRMWARE' FEATURE_NAME[FEATURE.NAME] = 'NAME' FEATURE_NAME[FEATURE.BATTERY] = 'BATTERY' FEATURE_NAME[FEATURE.REPROGRAMMABLE_KEYS] = 'REPROGRAMMABLE_KEYS' -FEATURE_NAME[FEATURE.WIRELESS_STATUS] = 'WIRELESS_STATUS' +FEATURE_NAME[FEATURE.WIRELESS] = 'WIRELESS' FEATURE_NAME[FEATURE.SOLAR_CHARGE] = 'SOLAR_CHARGE' @@ -54,7 +54,7 @@ DEVICE_TYPE = FallbackDict(lambda x: 'unknown', list2dict(_DEVICE_TYPES)) _FIRMWARE_TYPES = ('Main (HID)', 'Bootloader', 'Hardware', 'Other') -"""Names of different firmware levels possible, ordered from top to bottom.""" +"""Names of different firmware levels possible, indexed by level.""" FIRMWARE_TYPE = FallbackDict(lambda x: 'Unknown', list2dict(_FIRMWARE_TYPES)) @@ -74,7 +74,7 @@ _KEY_NAMES = ( 'unknown_0000', 'Volume up', 'Volume down', 'Mute', 'Play/Pause', KEY_NAME = FallbackDict(lambda x: 'unknown_%04x' % x, list2dict(_KEY_NAMES)) """Possible flags on a reprogrammable key.""" -KEY_FLAG = type('REPROGRAMMABLE_KEY_FLAGS', (), dict( +KEY_FLAG = type('KEY_FLAG', (), dict( REPROGRAMMABLE=0x10, FN_SENSITIVE=0x08, NONSTANDARD=0x04, @@ -82,9 +82,20 @@ KEY_FLAG = type('REPROGRAMMABLE_KEY_FLAGS', (), dict( MSE=0x01, )) +KEY_FLAG_NAME = FallbackDict(lambda x: 'unknown') +KEY_FLAG_NAME[KEY_FLAG.REPROGRAMMABLE] = 'reprogrammable' +KEY_FLAG_NAME[KEY_FLAG.FN_SENSITIVE] = 'fn-sensitive' +KEY_FLAG_NAME[KEY_FLAG.NONSTANDARD] = 'nonstandard' +KEY_FLAG_NAME[KEY_FLAG.IS_FN] = 'is-fn' +KEY_FLAG_NAME[KEY_FLAG.MSE] = 'mse' + _ERROR_NAMES = ('Ok', 'Unknown', 'Invalid argument', 'Out of range', 'Hardware error', 'Logitech internal', 'Invalid feature index', 'Invalid function', 'Busy', 'Unsupported') """Names for error codes.""" ERROR_NAME = FallbackDict(lambda x: 'Unknown error', list2dict(_ERROR_NAMES)) + + +del FallbackDict +del list2dict diff --git a/lib/logitech/unifying_receiver/exceptions.py b/lib/logitech/unifying_receiver/exceptions.py index 19a46443..77e71469 100644 --- a/lib/logitech/unifying_receiver/exceptions.py +++ b/lib/logitech/unifying_receiver/exceptions.py @@ -2,8 +2,7 @@ # Exceptions that may be raised by this API. # -from .constants import FEATURE_NAME as _FEATURE_NAME -from .constants import ERROR_NAME as _ERROR_NAME +from . import constants as C class NoReceiver(Exception): @@ -17,21 +16,21 @@ class NoReceiver(Exception): class FeatureNotSupported(Exception): """Raised when trying to request a feature not supported by the device.""" def __init__(self, device, feature): - super(FeatureNotSupported, self).__init__(device, feature, _FEATURE_NAME[feature]) + super(FeatureNotSupported, self).__init__(device, feature, C.FEATURE_NAME[feature]) self.device = device self.feature = feature - self.feature_name = _FEATURE_NAME[feature] + self.feature_name = C.FEATURE_NAME[feature] class FeatureCallError(Exception): """Raised if the device replied to a feature call with an error.""" def __init__(self, device, feature, feature_index, feature_function, error_code, data=None): - super(FeatureCallError, self).__init__(device, feature, feature_index, feature_function, error_code, _ERROR_NAME[error_code]) + super(FeatureCallError, self).__init__(device, feature, feature_index, feature_function, error_code, C.ERROR_NAME[error_code]) self.device = device self.feature = feature - self.feature_name = None if feature is None else _FEATURE_NAME[feature] + self.feature_name = None if feature is None else C.FEATURE_NAME[feature] self.feature_index = feature_index self.feature_function = feature_function self.error_code = error_code - self.error_string = _ERROR_NAME[error_code] + self.error_string = C.ERROR_NAME[error_code] self.data = data diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py index 28c52113..23eabf39 100644 --- a/lib/logitech/unifying_receiver/listener.py +++ b/lib/logitech/unifying_receiver/listener.py @@ -4,11 +4,12 @@ import logging import threading -from time import sleep -from binascii import hexlify +from time import sleep as _sleep +# from binascii import hexlify as _hexlify -from . import base -from .exceptions import * +from . import base as _base +from . import exceptions as E +# from . import unhandled as _unhandled _LOG_LEVEL = 6 @@ -16,7 +17,7 @@ _l = logging.getLogger('logitech.unifying_receiver.listener') _READ_EVENT_TIMEOUT = 90 # ms -_IDLE_SLEEP = 900 # ms +_IDLE_SLEEP = 950 # ms class EventsListener(threading.Thread): @@ -34,6 +35,7 @@ class EventsListener(threading.Thread): def __init__(self, receiver, events_callback): super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + hex(receiver)) self.daemon = True + self.active = False self.receiver = receiver self.callback = events_callback @@ -45,13 +47,17 @@ class EventsListener(threading.Thread): self.task_done = threading.Event() def run(self): - _l.log(_LOG_LEVEL, "(%d) starting", self.receiver) self.active = True + _l.log(_LOG_LEVEL, "(%d) starting", self.receiver) + + # last_hook = _unhandled.hook + # _unhandled.hook = self.callback + while self.active: try: # _l.log(_LOG_LEVEL, "(%d) reading next event", self.receiver) - event = base.read(self.receiver, _READ_EVENT_TIMEOUT) - except NoReceiver: + event = _base.read(self.receiver, _READ_EVENT_TIMEOUT) + except E.NoReceiver: _l.warn("(%d) receiver disconnected", self.receiver) self.active = False break @@ -62,11 +68,13 @@ class EventsListener(threading.Thread): self.callback.__call__(*event) elif self.task is None: # _l.log(_LOG_LEVEL, "(%d) idle sleep", self.receiver) - sleep(_IDLE_SLEEP / 1000.0) + _sleep(_IDLE_SLEEP / 1000.0) else: self.task_reply = self._make_request(*self.task) self.task_done.set() + # _unhandled.hook = last_hook + def stop(self): """Tells the listener to stop as soon as possible.""" _l.log(_LOG_LEVEL, "(%d) stopping", self.receiver) @@ -88,7 +96,7 @@ class EventsListener(threading.Thread): self.task = self.task_reply = None self.task_processing.release() - # _l.log(_LOG_LEVEL, "(%d) request '%s.%s' => [%s]", self.receiver, api_function.__module__, api_function.__name__, hexlify(reply)) + # _l.log(_LOG_LEVEL, "(%d) request '%s.%s' => [%s]", self.receiver, api_function.__module__, api_function.__name__, _hexlify(reply)) if isinstance(reply, Exception): raise reply return reply @@ -97,7 +105,7 @@ class EventsListener(threading.Thread): _l.log(_LOG_LEVEL, "(%d) calling '%s.%s' with %s, %s", self.receiver, api_function.__module__, api_function.__name__, args, kwargs) try: return api_function.__call__(self.receiver, *args, **kwargs) - except NoReceiver as nr: + except E.NoReceiver as nr: self.task_reply = nr self.active = False except Exception as e: diff --git a/lib/logitech/unifying_receiver/tests/test_10_constants.py b/lib/logitech/unifying_receiver/tests/test_10_constants.py index cd56fcf8..57f7e34c 100644 --- a/lib/logitech/unifying_receiver/tests/test_10_constants.py +++ b/lib/logitech/unifying_receiver/tests/test_10_constants.py @@ -5,7 +5,7 @@ import unittest import struct -from logitech.unifying_receiver.constants import * +from ..constants import * class Test_UR_Constants(unittest.TestCase): diff --git a/lib/logitech/unifying_receiver/tests/test_30_base.py b/lib/logitech/unifying_receiver/tests/test_30_base.py index a5811449..4d5c282c 100644 --- a/lib/logitech/unifying_receiver/tests/test_30_base.py +++ b/lib/logitech/unifying_receiver/tests/test_30_base.py @@ -5,10 +5,10 @@ import unittest from binascii import hexlify -from logitech.unifying_receiver import base -from logitech.unifying_receiver.exceptions import * -from logitech.unifying_receiver.constants import * -from logitech.unifying_receiver.unhandled import * +from .. import base +from ..exceptions import * +from ..constants import * +from .. import unhandled class Test_UR_Base(unittest.TestCase): @@ -157,15 +157,15 @@ class Test_UR_Base(unittest.TestCase): global received_unhandled received_unhandled = (code, device, data) - # set_unhandled_hook(_unhandled) + unhandled.hook = _unhandled base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET) reply = base.request(self.handle, self.device, fs_index + b'\x00') self.assertIsNotNone(reply, "request returned None reply") self.assertNotEquals(reply[:1], b'\x00') - # self.assertIsNotNone(received_unhandled, "extra message not received by unhandled hook") + self.assertIsNotNone(received_unhandled, "extra message not received by unhandled hook") received_unhandled = None - # set_unhandled_hook() + unhandled.hook = None base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET) reply = base.request(self.handle, self.device, fs_index + b'\x00') self.assertIsNotNone(reply, "request returned None reply") diff --git a/lib/logitech/unifying_receiver/tests/test_50_api.py b/lib/logitech/unifying_receiver/tests/test_50_api.py index 7bb39383..8b5e347b 100644 --- a/lib/logitech/unifying_receiver/tests/test_50_api.py +++ b/lib/logitech/unifying_receiver/tests/test_50_api.py @@ -5,9 +5,9 @@ import unittest import warnings -from logitech.unifying_receiver import api -from logitech.unifying_receiver.exceptions import * -from logitech.unifying_receiver.constants import * +from .. import api +from ..constants import * +from ..common import * class Test_UR_API(unittest.TestCase): @@ -45,7 +45,7 @@ class Test_UR_API(unittest.TestCase): devices = [] - for device in range(1, 1 + api.base.MAX_ATTACHED_DEVICES): + for device in range(1, 1 + api._base.MAX_ATTACHED_DEVICES): ok = api.ping(self.handle, device) self.assertIsNotNone(ok, "invalid ping reply") if ok: @@ -98,7 +98,7 @@ class Test_UR_API(unittest.TestCase): self.assertIsNotNone(d_firmware, "failed to get device firmware") self.assertGreater(len(d_firmware), 0, "device reported no firmware") for fw in d_firmware: - self.assertIsInstance(fw, api.FirmwareInfo) + self.assertIsInstance(fw, FirmwareInfo) def test_52_get_device_type(self): if self.handle is None: @@ -134,7 +134,7 @@ class Test_UR_API(unittest.TestCase): device_info = api.get_device_info(self.handle, self.device, features_array=self.features_array) self.assertIsNotNone(device_info, "failed to read full device info") - self.assertIsInstance(device_info, api.AttachedDeviceInfo) + self.assertIsInstance(device_info, AttachedDeviceInfo) Test_UR_API.device_info = device_info def test_60_get_battery_level(self): @@ -160,7 +160,7 @@ class Test_UR_API(unittest.TestCase): if all_devices: self.assertIsNotNone(self.device) for device_info in all_devices: - self.assertIsInstance(device_info, api.AttachedDeviceInfo) + self.assertIsInstance(device_info, AttachedDeviceInfo) else: self.assertIsNone(self.device) diff --git a/lib/logitech/unifying_receiver/unhandled.py b/lib/logitech/unifying_receiver/unhandled.py index acc1cf15..0b5b151e 100644 --- a/lib/logitech/unifying_receiver/unhandled.py +++ b/lib/logitech/unifying_receiver/unhandled.py @@ -4,39 +4,33 @@ # import logging -from binascii import hexlify - -_l = logging.getLogger('logitech.unifying_receiver.unhandled') +from binascii import hexlify as _hexlify -def _logging_unhandled_hook(reply_code, device, data): +def _logdebug_hook(reply_code, device, data): """Default unhandled hook, logs the reply as DEBUG.""" - _l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, hexlify(data)) + _l = logging.getLogger('logitech.unifying_receiver.unhandled') + _l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, _hexlify(data)) -_unhandled_hook = _logging_unhandled_hook +"""The function that will be called on unhandled incoming events. + +The hook must be a function with the signature: ``_(int, int, str)``, where +the parameters are: (reply code, device number, data). + +This hook will only be called by the request() function, when it receives +replies that do not match the requested feature call. As such, it is not +suitable for intercepting broadcast events from the device (e.g. special +keys being pressed, battery charge events, etc), at least not in a timely +manner. However, these events *may* be delivered here if they happen while +doing a feature call to the device. + +The default implementation logs the unhandled reply as DEBUG. +""" +hook = _logdebug_hook def _publish(reply_code, device, data): """Delivers a reply to the unhandled hook, if any.""" - if _unhandled_hook is not None: - _unhandled_hook.__call__(reply_code, device, data) - - -def set_unhandled_hook(hook=None): - """Sets the function that will be called on unhandled incoming events. - - The hook must be a function with the signature: ``_(int, int, str)``, where - the parameters are: (reply code, device number, data). - - This hook will only be called by the request() function, when it receives - replies that do not match the requested feature call. As such, it is not - suitable for intercepting broadcast events from the device (e.g. special - keys being pressed, battery charge events, etc), at least not in a timely - manner. However, these events *may* be delivered here if they happen while - doing a feature call to the device. - - The default implementation logs the unhandled reply as DEBUG. - """ - global _unhandled_hook - _unhandled_hook = hook + if hook is not None: + hook.__call__(reply_code, device, data) diff --git a/solaar b/solaar index ee39be5a..2d96e6fb 100755 --- a/solaar +++ b/solaar @@ -2,7 +2,8 @@ cd `dirname "$0"` -export LD_LIBRARY_PATH=$PWD/lib -export PYTHONPATH=$PWD/lib +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD/lib +export PYTHONPATH=$PWD:$PWD/lib exec python -OO solaar.py "$@" +#exec python -OO -m profile -o $TMPDIR/profile.log solaar.py "$@" diff --git a/solaar.py b/solaar.py index 2d240d4b..51d86ca9 100644 --- a/solaar.py +++ b/solaar.py @@ -1,267 +1,13 @@ #!/usr/bin/env python -__version__ = '0.3' +__version__ = '0.4' # # # import logging -import time -import threading - -from gi.repository import GObject -from gi.repository import Gtk - -from logitech.unifying_receiver import api as ur -from logitech.unifying_receiver.listener import EventsListener - -from logitech.devices import * - -# -# A few constants -# - -APP_TITLE = 'Solaar' -UNIFYING_RECEIVER = 'Unifying Receiver' -NO_DEVICES = 'No devices attached.' -NO_RECEIVER = 'Unifying Receiver not found.' -FOUND_RECEIVER = 'Unifying Receiver found.' - -STATUS_TIMEOUT = 61 # seconds -ICON_UPDATE_SLEEP = 7 # seconds - -# -# Optional desktop notifications. -# - -try: - import notify2 - notify2.init(APP_TITLE) - - _notifications = {} - _ICONS = {} - import os.path - - def notify_desktop(status, title, text, icon=None): - logging.debug("notify_desktop [%d] %s: %s", status, title, text) - def _icon_path(name): - path = os.path.join(__file__, '..', 'images', name + '.png') - path = os.path.abspath(os.path.normpath(path)) - return path if os.path.isfile(path) else None - - if icon is None: - icon = title - if icon in _ICONS: - path = _ICONS[icon] - else: - _ICONS[icon] = path = _icon_path(icon) - if path: - icon = path - if icon is None: - icon = 'error' if status < 0 else 'info' - - if title in _notifications: - notification = _notifications[title] - else: - _notifications[title] = notification = notify2.Notification(title, icon=icon) - notification.set_category(APP_TITLE) - - notification.set_urgency(notify2.URGENCY_CRITICAL if status < 0 else notify2.URGENCY_NORMAL) - notification.update(title, text, icon) - notification.show() - - def clear_notifications(): - all(n.close() for n in list(_notifications.values())) - notify2.uninit() - _notifications.clear() - _ICONS.clear() - -except ImportError: - logging.warn("python-notify2 not found, desktop notifications are disabled") - def notify_desktop(status, title, text, icon=None): pass - def clear_notifications(): pass - -# -# -# - -class StatusThread(threading.Thread): - def __init__(self, status_icon): - super(StatusThread, self).__init__(name='StatusThread') - self.daemon = True - self.status_icon = status_icon - - self.last_receiver_status = None - self.listener = None - self.devices = {} - self.statuses = {} - - def run(self): - self.active = True - while self.active: - if self.listener is None: - receiver = ur.open() - if receiver: - for devinfo in ur.list_devices(receiver): - self.devices[devinfo.number] = devinfo - self.listener = EventsListener(receiver, self.events_callback) - self.listener.start() - logging.info(str(self.listener)) - notify_desktop(1, UNIFYING_RECEIVER, FOUND_RECEIVER) - self.last_receiver_status = 1 - else: - if self.last_receiver_status != -1: - notify_desktop(-1, UNIFYING_RECEIVER, NO_RECEIVER) - self.last_receiver_status = -1 - elif not self.listener.active: - logging.info(str(self.listener)) - self.listener = None - self.devices.clear() - self.statuses.clear() - notify_desktop(-1, UNIFYING_RECEIVER, NO_RECEIVER) - self.last_receiver_status = -1 - - if self.active: - update_icon = True - if self.listener and self.devices: - update_icon &= self.update_old_statuses() - - if self.active: - if update_icon: - GObject.idle_add(self.update_status_icon) - time.sleep(ICON_UPDATE_SLEEP) - - def stop(self): - self.active = False - if self.listener: - self.listener.stop() - ur.close(self.listener.receiver) - - def update_old_statuses(self): - updated = False - - for devinfo in list(self.devices.values()): - if devinfo.number in self.statuses: - last_status_time = self.statuses[devinfo.number][0] - if time.time() - last_status_time > STATUS_TIMEOUT: - status = request_status(devinfo, self.listener) - updated |= self.device_status_changed(devinfo, status) - else: - self.statuses[devinfo.number] = [0, None, None] - updated |= self.device_status_changed(devinfo, (DEVICE_STATUS.CONNECTED, None)) - - return updated - - def events_callback(self, code, device, data): - updated = False - - if device in self.devices: - devinfo = self.devices[device] - if code == 0x10 and data[0] == 'b\x8F': - updated = True - self.device_status_changed(devinfo, DEVICE_STATUS.UNAVAILABLE) - elif code == 0x11: - status = process_event(devinfo, self.listener, data) - updated |= self.device_status_changed(devinfo, status) - else: - logging.warn("unknown event code %02x", code) - elif device: - logging.debug("got event (%d, %d, %s) for new device", code, device, data) - devinfo = ur.get_device_info(self.listener.receiver, device) - if devinfo: - self.devices[device] = devinfo - self.statuses[device] = [0, None, None] - updated |= self.device_status_changed(devinfo, (DEVICE_STATUS.CONNECTED, None)) - # else: - # logging.warn("got event (%d, %d, %s) for unknown device", code, device, data) - else: - logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data) - - if updated: - GObject.idle_add(self.update_status_icon) - - def device_status_changed(self, devinfo, status): - if status is None or devinfo.number not in self.statuses: - return False - - if type(status) == int: - status_code = status - status_text = DEVICE_STATUS_NAME[status_code] - else: - status_code = status[0] - status_text = DEVICE_STATUS_NAME[status_code] if status[1] is None else status[1] - - device_status = self.statuses[devinfo.number] - old_status_code = device_status[1] - - device_status[0] = time.time() - device_status[1] = status_code - device_status[2] = status_text - - if old_status_code == status_code: - # the device has been missing twice in a row (1 to 2 minutes), so - # forget about it - if status_code == DEVICE_STATUS.UNAVAILABLE: - del self.devices[devinfo.number] - del self.statuses[devinfo.number] - else: - logging.debug("device status changed from %s => %s: %s", old_status_code, status_code, status_text) - notify_desktop(status_code, devinfo.name, status_text) - - return True - - def update_status_icon(self): - if self.listener and self.listener.active: - if self.devices: - all_statuses = [] - for d in self.devices: - devinfo = self.devices[d] - status_text = self.statuses[d][2] - if status_text: - if ' ' in status_text: - all_statuses.append(devinfo.name) - all_statuses.append(' ' + status_text) - else: - all_statuses.append(devinfo.name + ' ' + status_text) - else: - all_statuses.append(devinfo.name) - tooltip = '\n'.join(all_statuses) - else: - tooltip = NO_DEVICES - else: - tooltip = NO_RECEIVER - - self.status_icon.set_tooltip_text(tooltip) - - def activate_icon(self, _): - if self.listener and self.listener.active: - if self.devices: - for devinfo in list(self.devices.values()): - _, status_code, status_text = self.statuses[devinfo.number] - notify_desktop(status_code, devinfo.name, status_text) - else: - notify_desktop(1, UNIFYING_RECEIVER, NO_DEVICES) - self.last_receiver_status = 1 - else: - notify_desktop(-1, UNIFYING_RECEIVER, NO_RECEIVER) - self.last_receiver_status = -1 - - - -def show_icon_menu(icon, button, time, status_thread): - menu = Gtk.Menu() - - status_item = Gtk.MenuItem('Status') - status_item.connect('activate', status_thread.activate_icon) - menu.append(status_item) - - quit_item = Gtk.MenuItem('Quit') - quit_item.connect('activate', Gtk.main_quit) - menu.append(quit_item) - - menu.show_all() - menu.popup(None, None, icon.position_menu, icon, button, time) +import os.path if __name__ == '__main__': @@ -272,21 +18,10 @@ if __name__ == '__main__': args = arg_parser.parse_args() log_level = logging.root.level - 10 * args.verbose - logging.root.setLevel(log_level if log_level > 0 else 1) + logging.basicConfig(level=log_level if log_level > 0 else 1) - status_icon = Gtk.StatusIcon.new_from_file('images/' + UNIFYING_RECEIVER + '.png') - status_icon.set_title(APP_TITLE) - status_icon.set_name(APP_TITLE) - status_icon.set_tooltip_text('Searching...') - notify_desktop(0, UNIFYING_RECEIVER, 'Searching...') + images_path = os.path.join(__file__, '..', 'images') + images_path = os.path.abspath(os.path.normpath(images_path)) - GObject.threads_init() - status_thread = StatusThread(status_icon) - status_thread.start() - - status_icon.connect('popup_menu', show_icon_menu, status_thread) - status_icon.connect('activate', status_thread.activate_icon) - Gtk.main() - - status_thread.stop() - clear_notifications() + import app + app.run(images_path)