From 4100e8c62548dc2ef9a3436eed0d97cb34ad716a Mon Sep 17 00:00:00 2001 From: Daniel Pavel Date: Fri, 28 Sep 2012 00:59:03 +0300 Subject: [PATCH] small clean-ups --- lib/logitech/devices/constants.py | 7 +- lib/logitech/devices/k750.py | 32 +++-- lib/logitech/unifying_receiver/api.py | 7 +- lib/logitech/unifying_receiver/base.py | 4 - lib/logitech/unifying_receiver/constants.py | 1 - lib/logitech/unifying_receiver/exceptions.py | 1 - solaar.py | 131 ++++++++++++------- 7 files changed, 107 insertions(+), 76 deletions(-) diff --git a/lib/logitech/devices/constants.py b/lib/logitech/devices/constants.py index 6a71da53..c9344f74 100644 --- a/lib/logitech/devices/constants.py +++ b/lib/logitech/devices/constants.py @@ -2,21 +2,20 @@ # # - DEVICE_STATUS = type('DEVICE_STATUS', (), dict( UNKNOWN=None, UNAVAILABLE=-1, CONNECTED=0, - ACTIVE=1, + # ACTIVE=1, )) from collections import defaultdict DEVICE_STATUS_NAME = defaultdict(lambda x: None) -DEVICE_STATUS_NAME[DEVICE_STATUS.UNAVAILABLE] = 'not available' +DEVICE_STATUS_NAME[DEVICE_STATUS.UNAVAILABLE] = 'disconnected' DEVICE_STATUS_NAME[DEVICE_STATUS.CONNECTED] = 'connected' -DEVICE_STATUS_NAME[DEVICE_STATUS.ACTIVE] = 'active' +# DEVICE_STATUS_NAME[DEVICE_STATUS.ACTIVE] = 'active' del defaultdict diff --git a/lib/logitech/devices/k750.py b/lib/logitech/devices/k750.py index e500f925..b3b0869f 100644 --- a/lib/logitech/devices/k750.py +++ b/lib/logitech/devices/k750.py @@ -8,17 +8,20 @@ import struct from ..unifying_receiver import api as _api from .constants import * - # # # NAME = 'Wireless Solar Keyboard K750' -# -# -# +_STATUS_NAMES = ('excellent', 'good', 'okay', 'poor') +_CHARGE_LIMITS = (75, 40, 20, -1) +_LIGHTING_LIMITS = (450, 310, 190, -1) + +# +# +# def _trigger_solar_charge_events(receiver, devinfo): return _api.request(receiver, devinfo.number, @@ -26,12 +29,6 @@ def _trigger_solar_charge_events(receiver, devinfo): features_array=devinfo.features_array) -_STATUS_NAMES = ('excellent', 'good', 'okay', 'poor') - -_CHARGE_LIMITS = (75, 40, 20, -1) -_LIGHTING_LIMITS = (450, 310, 190, -1) - - def _charge_status(data): charge, lux = struct.unpack('!BH', data[2:5]) @@ -52,8 +49,17 @@ def _charge_status(data): def request_status(devinfo, listener): - reply = listener.request(_trigger_solar_charge_events, devinfo) - if reply is None: + # Constantly requesting the solar charge status triggers a flood of events, + # which appear to drain the battery rather fast. + # Instead, ping the device for on/off status, and only ask for solar charge + # status when the user presses the solar key on the keyboard. + # + # reply = listener.request(_trigger_solar_charge_events, devinfo) + # if reply is None: + # return DEVICE_STATUS.UNAVAILABLE + + reply = listener.request(_api.ping, devinfo.number) + if not reply: return DEVICE_STATUS.UNAVAILABLE @@ -64,8 +70,10 @@ def process_event(devinfo, listener, data): logging.debug("Keyboard just started") return DEVICE_STATUS.CONNECTED elif data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': + # usually sent after the keyboard is turned on return _charge_status(data) elif data[:2] == b'\x09\x10' and data[7:11] == b'GOOD': + # regular solar charge events return _charge_status(data) elif data[:2] == b'\x09\x20' and data[7:11] == b'GOOD': logging.debug("Solar key pressed") diff --git a/lib/logitech/unifying_receiver/api.py b/lib/logitech/unifying_receiver/api.py index d396dcf7..0fabe89b 100644 --- a/lib/logitech/unifying_receiver/api.py +++ b/lib/logitech/unifying_receiver/api.py @@ -15,12 +15,10 @@ from .unhandled import _publish as _unhandled_publish _LOG_LEVEL = 5 _l = logging.getLogger('logitech.unifying_receiver.api') - # # # - from collections import namedtuple """Tuple returned by list_devices and find_device_by_name.""" @@ -45,7 +43,6 @@ def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=N del namedtuple - # # # @@ -249,8 +246,8 @@ def get_device_features(handle, device): # get the index of the FEATURE_SET # FEATURE.ROOT should always be available for all devices fs_index = base.request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET) - if not fs_index: - _l.warn("(%d,%d) FEATURE_SET not available", handle, device) + if fs_index is None: + # _l.warn("(%d,%d) FEATURE_SET not available", handle, device) return None fs_index = fs_index[:1] diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index 9f34388c..6984cc7f 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -17,13 +17,11 @@ import hidapi as _hid _LOG_LEVEL = 4 _l = logging.getLogger('logitech.unifying_receiver.base') - # # These values are defined by the Logitech documentation. # Overstepping these boundaries will only produce log warnings. # - """Minimim lenght of a feature call packet.""" _MIN_CALL_SIZE = 7 @@ -47,12 +45,10 @@ DEFAULT_TIMEOUT = 1000 """Maximum number of devices attached to a UR.""" MAX_ATTACHED_DEVICES = 6 - # # # - def list_receiver_devices(): """List all the Linux devices exposed by the UR attached to the machine.""" # (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver') diff --git a/lib/logitech/unifying_receiver/constants.py b/lib/logitech/unifying_receiver/constants.py index 47e0b2f5..539740f4 100644 --- a/lib/logitech/unifying_receiver/constants.py +++ b/lib/logitech/unifying_receiver/constants.py @@ -2,7 +2,6 @@ # Constants used by the rest of the API. # - """Possible features available on a Logitech device. A particular device might not support all these features, and may support other diff --git a/lib/logitech/unifying_receiver/exceptions.py b/lib/logitech/unifying_receiver/exceptions.py index 8a2708d2..52b25bce 100644 --- a/lib/logitech/unifying_receiver/exceptions.py +++ b/lib/logitech/unifying_receiver/exceptions.py @@ -2,7 +2,6 @@ # Exceptions that may be raised by this API. # - from .constants import FEATURE_NAME as _FEATURE_NAME from .constants import ERROR_NAME as _ERROR_NAME diff --git a/solaar.py b/solaar.py index fc44d7b3..c8a35d4d 100644 --- a/solaar.py +++ b/solaar.py @@ -1,5 +1,11 @@ #!/usr/bin/env python +__version__ = '0.3' + +# +# +# + import logging import time import threading @@ -12,72 +18,73 @@ from logitech.unifying_receiver.listener import EventsListener from logitech.devices import * - # # A few constants # - APP_TITLE = 'Solaar' UNIFYING_RECEIVER = 'Unifying Receiver' NO_DEVICES = 'No devices attached.' NO_RECEIVER = 'Unifying Receiver not found.' -FOUND_RECEIVER = 'Unifying Receiver detected.' +FOUND_RECEIVER = 'Unifying Receiver found.' STATUS_TIMEOUT = 31 # seconds ICON_UPDATE_SLEEP = 7 # seconds - # +# Optional desktop notifications. # -# - try: import notify2 notify2.init(APP_TITLE) _notifications = {} - import os.path _ICONS = {} + import os.path def notify_desktop(status, title, text, icon=None): + logging.debug("notify_desktop [%d] %s: %s", status, title, text) def _icon_path(name): path = os.path.join(__file__, '..', 'images', name + '.png') - path = os.path.normpath(path) - path = os.path.abspath(path) + path = os.path.abspath(os.path.normpath(path)) return path if os.path.isfile(path) else None if icon is None: icon = title - if icon not in _ICONS: - _ICONS[icon] = _icon_path(icon) - icon_path = _ICONS[icon] - if icon_path: - icon = icon_path + if icon in _ICONS: + path = _ICONS[icon] + else: + _ICONS[icon] = path = _icon_path(icon) + if path: + icon = path if icon is None: icon = 'error' if status < 0 else 'info' if title in _notifications: notification = _notifications[title] else: - notification = notify2.Notification(title, icon=icon) + _notifications[title] = notification = notify2.Notification(title, icon=icon) notification.set_category(APP_TITLE) - _notifications[title] = notification notification.set_urgency(notify2.URGENCY_CRITICAL if status < 0 else notify2.URGENCY_NORMAL) notification.update(title, text, icon) notification.show() + + def clear_notifications(): + all(n.close() for n in list(_notifications.values())) + notify2.uninit() + _notifications.clear() + _ICONS.clear() + except ImportError: - def notify_desktop(status, title, text, icon=None): - pass - + def notify_desktop(status, title, text, icon=None): pass + def clear_notifications(): pass # # # - class StatusThread(threading.Thread): def __init__(self, status_icon): super(StatusThread, self).__init__(name='StatusThread') @@ -98,8 +105,8 @@ class StatusThread(threading.Thread): for devinfo in ur.list_devices(receiver): self.devices[devinfo.number] = devinfo self.listener = EventsListener(receiver, self.events_callback) - logging.info("started events listener %s", self.listener) self.listener.start() + logging.info(str(self.listener)) notify_desktop(1, UNIFYING_RECEIVER, FOUND_RECEIVER) self.last_receiver_status = 1 else: @@ -107,7 +114,7 @@ class StatusThread(threading.Thread): notify_desktop(-1, UNIFYING_RECEIVER, NO_RECEIVER) self.last_receiver_status = -1 elif not self.listener.active: - logging.info("events listener %s stopped", self.listener) + logging.info(str(self.listener)) self.listener = None self.devices.clear() self.statuses.clear() @@ -119,10 +126,9 @@ class StatusThread(threading.Thread): if self.listener and self.devices: update_icon &= self.update_old_statuses() - if self.active and update_icon: - GObject.idle_add(self.update_status_icon) - if self.active: + if update_icon: + GObject.idle_add(self.update_status_icon) time.sleep(ICON_UPDATE_SLEEP) def stop(self): @@ -134,14 +140,15 @@ class StatusThread(threading.Thread): def update_old_statuses(self): updated = False - for devinfo in self.devices.values(): - if devinfo.number not in self.statuses: + for devinfo in list(self.devices.values()): + if devinfo.number in self.statuses: + last_status_time = self.statuses[devinfo.number][0] + if time.time() - last_status_time > STATUS_TIMEOUT: + status = request_status(devinfo, self.listener) + updated |= self.device_status_changed(devinfo, status) + else: self.statuses[devinfo.number] = [0, None, None] - - last_status_time = self.statuses[devinfo.number][0] - if time.time() - last_status_time > STATUS_TIMEOUT: - status = request_status(devinfo, self.listener) - updated |= self.device_status_changed(devinfo, status) + updated |= self.device_status_changed(devinfo, (DEVICE_STATUS.CONNECTED, None)) return updated @@ -164,8 +171,9 @@ class StatusThread(threading.Thread): if devinfo: self.devices[device] = devinfo self.statuses[device] = [0, None, None] - else: - logging.warn("got event (%d, %d, %s) for unknown device", code, device, data) + updated |= self.device_status_changed(devinfo, (DEVICE_STATUS.CONNECTED, None)) + # else: + # logging.warn("got event (%d, %d, %s) for unknown device", code, device, data) else: logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data) @@ -197,39 +205,64 @@ class StatusThread(threading.Thread): return True def update_status_icon(self): - if self.listener: - all_statuses = [] - for d in self.devices: - devinfo = self.devices[d] - status_text = self.statuses[d][2] - if status_text: - all_statuses.append(devinfo.name + '\n\t' + status_text) - else: - all_statuses.append(devinfo.name) - - if all_statuses: + if self.listener and self.listener.active: + if self.devices: + all_statuses = [] + for d in self.devices: + devinfo = self.devices[d] + status_text = self.statuses[d][2] + if status_text: + if ' ' in status_text: + all_statuses.append(devinfo.name) + all_statuses.append(' ' + status_text) + else: + all_statuses.append(devinfo.name + ' ' + status_text) + else: + all_statuses.append(devinfo.name) tooltip = '\n'.join(all_statuses) else: tooltip = NO_DEVICES else: tooltip = NO_RECEIVER - # logging.debug("tooltip %s", tooltip) self.status_icon.set_tooltip_text(tooltip) + def activate_icon(self, status_icon): + if self.listener and self.listener.active: + if self.devices: + for devinfo in list(self.devices.values()): + _, status_code, status_text = self.statuses[devinfo.number] + notify_desktop(status_code, devinfo.name, status_text) + else: + notify_desktop(1, UNIFYING_RECEIVER, NO_DEVICES) + self.last_receiver_status = 1 + else: + notify_desktop(-1, UNIFYING_RECEIVER, NO_RECEIVER) + self.last_receiver_status = -1 + if __name__ == '__main__': - logging.basicConfig(level=6) - logging.captureWarnings(True) + import argparse + arg_parser = argparse.ArgumentParser() + arg_parser.add_argument('-v', '--verbose', action='count', default=0, + help='increase the logger verbosity') + args = arg_parser.parse_args() + + log_level = logging.root.level - 10 * args.verbose + logging.root.setLevel(log_level if log_level > 0 else 1) status_icon = Gtk.StatusIcon.new_from_file('images/' + UNIFYING_RECEIVER + '.png') status_icon.set_title(APP_TITLE) status_icon.set_name(APP_TITLE) status_icon.set_tooltip_text('Initializing...') - status_icon.connect('popup_menu', Gtk.main_quit) GObject.threads_init() status_thread = StatusThread(status_icon) status_thread.start() + + status_icon.connect('popup_menu', Gtk.main_quit) + status_icon.connect('activate', status_thread.activate_icon) Gtk.main() + status_thread.stop() + clear_notifications()