greatly simplified loading of icons

This commit is contained in:
Daniel Pavel 2012-10-05 02:39:32 +03:00
parent 9ba6aa1c31
commit cb3a42c04e
42 changed files with 376 additions and 447 deletions

View File

@ -6,11 +6,12 @@ import threading
from gi.repository import Gtk
from gi.repository import GObject
from . import constants as C
from .watcher import WatcherThread
from . import ui
APP_TITLE = 'Solaar'
def _status_updated(watcher, icon, window):
while True:
watcher.status_changed.wait()
@ -18,36 +19,32 @@ def _status_updated(watcher, icon, window):
watcher.status_changed.clear()
if icon:
GObject.idle_add(icon.set_tooltip_text, text)
GObject.idle_add(icon.set_tooltip_markup, text)
if window:
ur_detected = watcher.has_receiver()
devices = [ watcher.devices[k] for k in watcher.devices ] if ur_detected else []
GObject.idle_add(ui.window.update, window, ur_detected, devices)
GObject.idle_add(ui.window.update, window, dict(watcher.devices))
# def _pair_new_device(trigger, watcher):
# pass
def run(images_path):
def run():
GObject.threads_init()
ui.init(images_path)
ui.notify.start(C.APP_TITLE, ui.image)
ui.notify.start(APP_TITLE)
watcher = WatcherThread(ui.notify.show)
watcher.start()
window = ui.window.create(C.APP_TITLE, ui.image)
window = ui.window.create(APP_TITLE, watcher.devices[0])
menu_actions = [('Scan all devices', watcher.request_all_statuses),
menu_actions = [('Scan all devices', watcher.full_scan),
# ('Pair new device', _pair_new_device, watcher),
None,
('Quit', Gtk.main_quit)]
click_action = (ui.window.toggle, window) if window else None
tray_icon = ui.icon.create(ui.image('icon'), C.APP_TITLE, menu_actions, click_action)
tray_icon = ui.icon.create(APP_TITLE, menu_actions, (ui.window.toggle, window))
ui_update_thread = threading.Thread(target=_status_updated, name='ui_update', args=(watcher, tray_icon, window))
ui_update_thread.daemon = True

View File

@ -1,10 +0,0 @@
#
# Commonly used strings
#
APP_TITLE = 'Solaar'
UNIFYING_RECEIVER = 'Unifying Receiver'
NO_DEVICES = 'No devices attached.'
SCANNING = 'Initializing...'
NO_RECEIVER = 'Unifying Receiver not found.'
FOUND_RECEIVER = 'Unifying Receiver found.'

View File

@ -1,26 +1,3 @@
# pass
import os.path as _os_path
from . import (icon, notify, window)
_images_path = None
_IMAGES = {}
def init(images_path=None):
global _images_path
_images_path = images_path
def image(name):
if name in _IMAGES:
return _IMAGES[name]
if _images_path:
path = _os_path.join(_images_path, name + '.png')
if _os_path.isfile(path):
_IMAGES[name] = path
return path
else:
_IMAGES[name] = None

View File

@ -9,8 +9,8 @@ def _show_icon_menu(icon, button, time, menu):
menu.popup(None, None, icon.position_menu, icon, button, time)
def create(app_icon, title, menu_actions, click_action=None):
icon = Gtk.StatusIcon.new_from_file(app_icon)
def create(title, menu_actions, click_action=None):
icon = Gtk.StatusIcon.new_from_icon_name(title)
icon.set_title(title)
icon.set_name(title)
@ -28,12 +28,11 @@ def create(app_icon, title, menu_actions, click_action=None):
for action in menu_actions:
if action:
item = Gtk.MenuItem(action[0])
function = action[1]
args = action[2:] if len(action) > 2 else ()
item.connect('activate', function, *args)
menu.append(item)
item.connect('activate', action[1], *args)
else:
menu.append(Gtk.SeparatorMenuItem())
item = Gtk.SeparatorMenuItem()
menu.append(item)
menu.show_all()
icon.connect('popup_menu', _show_icon_menu, menu)
else:

View File

@ -7,28 +7,30 @@ try:
available = True
_app_title = None
_images = lambda x: None
_notifications = {}
def start(app_title, images=None):
global _app_title, _images
def start(app_title):
"""Init the notifications system."""
_notify.init(app_title)
_app_title = app_title
_images = images or (lambda x: None)
return True
def stop():
global _app_title
_app_title = None
all(n.close() for n in list(_notifications.values()))
_notify.uninit()
"""Stop the notifications system."""
for n in list(_notifications.values()):
try:
n.close()
except Exception:
# DBUS
pass
_notifications.clear()
_notify.uninit()
def show(status, title, text, icon=None):
if not _app_title:
def show(status_code, title, text, icon=None):
"""Show a notification with title and text."""
if not available:
return
if title in _notifications:
@ -40,11 +42,14 @@ try:
# there's no need to show the same notification twice in a row
return
path = _images('devices/' + title if icon is None else icon)
icon = ('error' if status < 0 else 'info') if path is None else path
icon = icon or title
notification.update(title, text, title)
try:
notification.show()
except Exception:
# DBUS
pass
notification.update(title, text, icon)
notification.show()
except ImportError:
import logging
@ -53,4 +58,4 @@ except ImportError:
available = False
def start(app_title): pass
def stop(): pass
def show(status, title, text, icon=None): pass
def show(status_code, title, text, icon=None): pass

View File

@ -4,210 +4,164 @@
from gi.repository import Gtk
from gi.repository import Gdk
from gi.repository import GdkPixbuf
from .. import constants as C
_DEVICE_ICON_SIZE = 48
_STATUS_ICON_SIZE = 64
_DEVICE_ICON_SIZE = Gtk.IconSize.DIALOG
_STATUS_ICON_SIZE = Gtk.IconSize.DIALOG
_PLACEHOLDER = '~'
_images = None
_MAX_DEVICES = 7
_ICONS = {}
_MAX_DEVICES = 6
def _icon(icon, title, size=_DEVICE_ICON_SIZE, fallback=None):
icon = icon or Gtk.Image()
if title and title in _ICONS:
icon.set_from_pixbuf(_ICONS[title])
else:
icon_file = _images(title) if title else None
if icon_file:
pixbuf = GdkPixbuf.Pixbuf().new_from_file(icon_file)
if pixbuf.get_width() > size or pixbuf.get_height() > size:
if pixbuf.get_width() > pixbuf.get_height():
new_width = size
new_height = size * pixbuf.get_height() / pixbuf.get_width()
else:
new_width = size * pixbuf.get_width() / pixbuf.get_height()
new_height = size
pixbuf = pixbuf.scale_simple(new_width, new_height, GdkPixbuf.InterpType.HYPER)
icon.set_from_pixbuf(pixbuf)
_ICONS[title] = pixbuf
elif fallback:
icon.set_from_icon_name(fallback, size if size < _DEVICE_ICON_SIZE else Gtk.IconSize.DIALOG)
if size >= _DEVICE_ICON_SIZE:
icon.set_size_request(size, size)
return icon
def update(window, ur_available, devices):
def update(window, devices):
if not window or not window.get_child():
return
controls = list(window.get_child().get_children())
first = controls[0]
first.set_visible(not ur_available or not devices)
if ur_available:
ur_status = C.FOUND_RECEIVER if devices else C.NO_DEVICES
else:
ur_status = C.NO_RECEIVER
_, label = first.get_children()
label.set_markup('<big><b>%s</b></big>\n%s' % (C.UNIFYING_RECEIVER, ur_status))
first = controls[0].get_child()
icon, label = first.get_children()
rstatus = devices[0]
label.set_markup('<big><b>%s</b></big>\n%s' % (rstatus.name, rstatus.text))
for index in range(1, _MAX_DEVICES):
box = controls[index]
devstatus = [d for d in devices if d.number == index]
devstatus = devstatus[0] if devstatus else None
box.set_visible(devstatus is not None)
for index in range(1, 1 + _MAX_DEVICES):
devstatus = devices.get(index)
controls[index].set_visible(devstatus is not None)
box = controls[index].get_child()
icon, expander = box.get_children()
if devstatus:
box.set_sensitive(devstatus.code >= 0)
icon, expander = box.get_children()
if not expander.get_data('devstatus'):
expander.set_data('devstatus', devstatus,)
_icon(icon, 'devices/' + devstatus.name, fallback=devstatus.type.lower())
if icon.get_name() != devstatus.name:
icon.set_name(devstatus.name)
icon.set_from_icon_name(devstatus.name, _DEVICE_ICON_SIZE)
label = expander.get_label_widget()
if expander.get_expanded():
label.set_markup('<big><b>%s</b></big>' % devstatus.name)
if devstatus.code < 0:
expander.set_sensitive(False)
expander.set_expanded(False)
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.text))
else:
label.set_markup('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.props['text']))
expander.set_sensitive(True)
ebox = expander.get_child()
ebox = expander.get_child()
texts = []
# refresh_button = ebox.get_children()[0]
# refresh_button.connect('activate', devstatus.refresh)
light_icon = ebox.get_children()[-2]
light_level = getattr(devstatus, 'light_level', None)
light_icon.set_visible(light_level is not None)
if light_level is not None:
texts.append('Light: %d lux' % light_level)
icon_name = 'light_%02d' % (20 * ((light_level + 50) // 100))
light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
light_icon.set_tooltip_text(texts[-1])
texts = []
battery_icon = ebox.get_children()[-1]
battery_level = getattr(devstatus, 'battery_level', None)
battery_icon.set_sensitive(battery_level is not None)
if battery_level is None:
battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
battery_icon.set_tooltip_text('Battery: unknown')
else:
texts.append('Battery: %d%%' % battery_level)
icon_name = 'battery_%02d' % (20 * ((battery_level + 10) // 20))
battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
battery_icon.set_tooltip_text(texts[-1])
battery_icon = ebox.get_children()[-1]
if 'battery-level' in devstatus.props:
level = devstatus.props['battery-level']
icon_name = 'battery/' + str((level + 10) // 20)
_icon(battery_icon, icon_name, _STATUS_ICON_SIZE)
texts.append('Battery: ' + str(level) + '%')
else:
_icon(battery_icon, 'battery/unknown', _STATUS_ICON_SIZE)
texts.append('Battery: unknown')
battery_icon.set_tooltip_text(texts[-1])
battery_status = getattr(devstatus, 'battery_status', None)
if battery_status is not None:
texts.append(battery_status)
battery_icon.set_tooltip_text(battery_icon.get_tooltip_text() + '\n' + battery_status)
light_icon = ebox.get_children()[-2]
if 'light-level' in devstatus.props:
lux = devstatus.props['light-level']
icon_name = 'light/' + str((lux + 50) // 100)
_icon(light_icon, icon_name, _STATUS_ICON_SIZE)
texts.append('Light: ' + str(lux) + ' lux')
light_icon.set_tooltip_text(texts[-1])
light_icon.set_visible(True)
else:
light_icon.set_visible(False)
label = ebox.get_children()[-3]
label.set_text('\n'.join(texts))
def _expander_activate(expander):
devstatus = expander.get_data('devstatus')
if devstatus:
label = expander.get_label_widget()
if expander.get_expanded():
label.set_markup('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.props['text']))
if texts:
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, ', '.join(texts)))
else:
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.text))
else:
label.set_markup('<big><b>%s</b></big>' % devstatus.name)
icon.set_name(_PLACEHOLDER)
expander.set_label(_PLACEHOLDER)
def _device_box(title):
icon = _icon(None, 'devices/' + title)
icon = Gtk.Image.new_from_icon_name(title, _DEVICE_ICON_SIZE)
icon.set_alignment(0.5, 0)
icon.set_name(title)
label = Gtk.Label()
label.set_markup('<big><b>%s</b></big>' % title)
label.set_alignment(0, 0.5)
label.set_can_focus(False)
box = Gtk.HBox(spacing=10)
box = Gtk.HBox(homogeneous=False, spacing=8)
box.pack_start(icon, False, False, 0)
box.set_border_width(8)
if title == C.UNIFYING_RECEIVER:
box.add(label)
else:
if title == _PLACEHOLDER:
expander = Gtk.Expander()
expander.set_can_focus(False)
expander.set_label_widget(label)
expander.connect('activate', _expander_activate)
expander.set_label(_PLACEHOLDER)
expander.set_use_markup(True)
expander.set_spacing(4)
ebox = Gtk.HBox(False, 10)
ebox.set_border_width(4)
ebox = Gtk.HBox(False, 8)
# refresh_button = Gtk.Button()
# refresh_button.set_image(_icon(None, None, size=Gtk.IconSize.SMALL_TOOLBAR, fallback='reload'))
# refresh_button.set_focus_on_click(False)
# refresh_button.set_can_focus(False)
# refresh_button.set_image_position(Gtk.PositionType.TOP)
# refresh_button.set_alignment(0.5, 0.5)
# refresh_button.set_relief(Gtk.ReliefStyle.NONE)
# refresh_button.set_size_request(20, 20)
# refresh_button.set_tooltip_text('Refresh')
# ebox.pack_start(refresh_button, False, False, 2)
label = Gtk.Label()
label.set_alignment(0, 0.5)
ebox.pack_start(label, False, True, 8)
light_icon = _icon(None, 'light/unknown', _STATUS_ICON_SIZE)
ebox.pack_end(light_icon, False, True, 0)
battery_icon = _icon(None, 'battery/unknown', _STATUS_ICON_SIZE)
battery_icon = Gtk.Image.new_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
ebox.pack_end(battery_icon, False, True, 0)
light_icon = Gtk.Image.new_from_icon_name('light_unknown', _STATUS_ICON_SIZE)
ebox.pack_end(light_icon, False, True, 0)
expander.add(ebox)
box.pack_start(expander, True, True, 1)
else:
label = Gtk.Label()
label.set_can_focus(False)
label.set_markup('<big><b>%s</b></big>' % title)
label.set_alignment(0, 0)
box.add(label)
box.show_all()
box.set_visible(title != _PLACEHOLDER)
return box
frame = Gtk.Frame()
frame.add(box)
frame.show_all()
frame.set_visible(title != _PLACEHOLDER)
return frame
def create(title, images=None):
global _images
_images = images or (lambda x: None)
def create(title, rstatus):
vbox = Gtk.VBox(homogeneous=False, spacing=4)
vbox.set_border_width(4)
vbox = Gtk.VBox(spacing=8)
vbox.set_border_width(6)
vbox.add(_device_box(C.UNIFYING_RECEIVER))
for i in range(1, _MAX_DEVICES):
vbox.add(_device_box(rstatus.name))
for i in range(1, 1 + _MAX_DEVICES):
vbox.add(_device_box(_PLACEHOLDER))
vbox.set_visible(True)
window = Gtk.Window() # Gtk.WindowType.POPUP)
window = Gtk.Window()
window.set_title(title)
window.set_icon_from_file(_images('icon'))
window.set_icon_name(title)
window.set_keep_above(True)
window.set_decorated(False)
window.set_skip_taskbar_hint(True)
window.set_skip_pager_hint(True)
# window.set_skip_taskbar_hint(True)
# window.set_skip_pager_hint(True)
window.set_deletable(False)
window.set_resizable(False)
window.set_position(Gtk.WindowPosition.MOUSE)
window.set_type_hint(Gdk.WindowTypeHint.TOOLTIP)
window.connect('focus-out-event', _hide)
window.set_type_hint(Gdk.WindowTypeHint.UTILITY)
window.set_wmclass(title, 'status-window')
window.set_role('status-window')
window.connect('window-state-event', _state_event)
window.connect('delete-event', lambda w, e: toggle(None, window) or True)
window.add(vbox)
window.present()
return window
def _hide(window, _):
window.set_visible(False)
def _state_event(window, event):
if event.new_window_state & Gdk.WindowState.ICONIFIED:
position = window.get_position()
window.hide()
window.deiconify()
window.move(*position)
return True
def toggle(_, window):
if window.get_visible():
window.set_visible(False)
position = window.get_position()
window.hide()
window.move(*position)
else:
window.present()

View File

@ -6,25 +6,31 @@ import logging
import threading
import time
import constants as C
from logitech.unifying_receiver import api
from logitech.unifying_receiver.listener import EventsListener
from logitech import devices
_STATUS_TIMEOUT = 97 # seconds
_THREAD_SLEEP = 7 # seconds
_FORGET_TIMEOUT = 5 * 60 # seconds
_STATUS_TIMEOUT = 34 # seconds
_THREAD_SLEEP = 5 # seconds
_UNIFYING_RECEIVER = 'Unifying Receiver'
_NO_DEVICES = 'No devices attached.'
_SCANNING = 'Initializing...'
_NO_RECEIVER = 'not found'
_FOUND_RECEIVER = 'found'
class _DevStatus(api.AttachedDeviceInfo):
timestamp = time.time()
code = devices.STATUS.CONNECTED
props = {devices.PROPS.TEXT: devices.STATUS_NAME[devices.STATUS.CONNECTED]}
code = devices.STATUS.UNKNOWN
text = ''
refresh = None
class WatcherThread(threading.Thread):
"""Keeps a map of all attached devices and their statuses."""
def __init__(self, notify_callback=None):
super(WatcherThread, self).__init__(name='WatcherThread')
self.daemon = True
@ -35,39 +41,46 @@ class WatcherThread(threading.Thread):
self.status_changed = threading.Event()
self.listener = None
self.devices = {}
self.rstatus = _DevStatus(0, _UNIFYING_RECEIVER, _UNIFYING_RECEIVER, None, None)
self.rstatus.refresh = self.full_scan
self.devices = {0: self.rstatus}
def run(self):
self.active = True
self._notify(0, C.UNIFYING_RECEIVER, C.SCANNING)
self._notify(0, _UNIFYING_RECEIVER, _SCANNING)
while self.active:
if self.listener is None:
receiver = api.open()
if receiver:
self._notify(1, C.UNIFYING_RECEIVER, C.FOUND_RECEIVER)
self._device_status_changed(self.rstatus, (devices.STATUS.CONNECTED, _FOUND_RECEIVER))
for devinfo in api.list_devices(receiver):
devstatus = _DevStatus(*devinfo)
self.devices[devinfo.number] = devstatus
self._notify(devices.STATUS.CONNECTED, devstatus.name, devices.STATUS_NAME[devices.STATUS.CONNECTED])
self._new_device(devinfo)
if len(self.devices) == 1:
self._device_status_changed(self.rstatus, (devices.STATUS.CONNECTED, _NO_DEVICES))
self._update_status_text()
self.listener = EventsListener(receiver, self._events_callback)
self.listener.start()
self._update_status()
else:
self._notify(-1, C.UNIFYING_RECEIVER, C.NO_RECEIVER)
self._device_status_changed(self.rstatus, (devices.STATUS.UNAVAILABLE, _NO_RECEIVER))
elif not self.listener.active:
self.listener = None
self._notify(-1, C.UNIFYING_RECEIVER, C.NO_RECEIVER)
self.devices.clear()
self._device_status_changed(self.rstatus, (devices.STATUS.UNAVAILABLE, _NO_RECEIVER))
self.devices = {0: self.rstatus}
if self.active:
update_icon = True
if self.listener and self.devices:
if self.listener and len(self.devices) > 1:
update_icon &= self._check_old_statuses()
if self.active:
if update_icon:
self._update_status()
self._update_status_text()
time.sleep(_THREAD_SLEEP)
def stop(self):
@ -76,49 +89,55 @@ class WatcherThread(threading.Thread):
self.listener.stop()
api.close(self.listener.receiver)
def has_receiver(self):
return self.listener is not None and self.listener.active
def request_all_statuses(self, _=None):
def full_scan(self, _=None):
updated = False
for d in range(1, 7):
devstatus = self.devices.get(d)
for devnumber in range(1, 1 + api.C.MAX_ATTACHED_DEVICES):
devstatus = self.devices.get(devnumber)
if devstatus:
status = devices.request_status(devstatus, self.listener)
updated |= self._device_status_changed(devstatus, status)
else:
devstatus = self._new_device(d)
devstatus = self._new_device(devnumber)
updated |= devstatus is not None
if updated:
self._update_status()
self._update_status_text()
def _request_status(self, devstatus):
if devstatus:
status = devices.request_status(devstatus, self.listener)
self._device_status_changed(devstatus, status)
def _check_old_statuses(self):
updated = False
for devstatus in list(self.devices.values()):
if time.time() - devstatus.timestamp > _STATUS_TIMEOUT:
status = devices.ping(devstatus, self.listener)
updated |= self._device_status_changed(devstatus, status)
if devstatus != self.rstatus:
if time.time() - devstatus.timestamp > _STATUS_TIMEOUT:
status = devices.ping(devstatus, self.listener)
updated |= self._device_status_changed(devstatus, status)
return updated
def _new_device(self, device):
devinfo = api.get_device_info(self.listener.receiver, device)
if devinfo:
devstatus = _DevStatus(*devinfo)
self.devices[device] = devstatus
self._notify(devstatus.code, devstatus.name, devstatus.props[devices.PROPS.TEXT])
return devinfo
def _new_device(self, dev):
if type(dev) == int:
dev = api.get_device_info(self.listener.receiver, dev)
logging.debug("new devstatus from %s", dev)
if dev:
devstatus = _DevStatus(*dev)
devstatus.refresh = self._request_status
self.devices[dev.number] = devstatus
self._device_status_changed(devstatus, devices.STATUS.CONNECTED)
return devstatus
def _events_callback(self, code, device, data):
logging.debug("%s: event %02x %d %s", time.asctime(), code, device, repr(data))
def _events_callback(self, code, devnumber, data):
logging.debug("%s: event %02x %d %s", time.asctime(), code, devnumber, repr(data))
updated = False
if device in self.devices:
devstatus = self.devices[device]
if devnumber in self.devices:
devstatus = self.devices[devnumber]
if code == 0x10 and data[0] == 'b\x8F':
updated = True
self._device_status_changed(devstatus, devices.STATUS.UNAVAILABLE)
@ -127,17 +146,16 @@ class WatcherThread(threading.Thread):
updated |= self._device_status_changed(devstatus, status)
else:
logging.warn("unknown event code %02x", code)
elif device:
logging.debug("got event (%d, %d, %s) for new device", code, device, repr(data))
self._new_device(device)
elif devnumber:
self._new_device(devnumber)
updated = True
else:
logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data)
logging.warn("don't know how to handle event (%d, %d, %s)", code, devnumber, data)
if updated:
self._update_status()
self._update_status_text()
def _device_status_changed(self, devstatus, status):
def _device_status_changed(self, devstatus, status=None):
if status is None:
return False
@ -147,15 +165,19 @@ class WatcherThread(threading.Thread):
if type(status) == int:
devstatus.code = status
if devstatus.code in devices.STATUS_NAME:
devstatus.props[devices.PROPS.TEXT] = devices.STATUS_NAME[devstatus.code]
devstatus.text = devices.STATUS_NAME[devstatus.code]
else:
devstatus.code = status[0]
devstatus.props.update(status[1])
if isinstance(status[1], str):
devstatus.text = status[1]
elif isinstance(status[1], dict):
for key, value in status[1].items():
setattr(devstatus, key, value)
if old_status_code != devstatus.code:
logging.debug("%s: device status changed %s => %s: %s", time.asctime(), old_status_code, devstatus.code, devstatus.props)
# if not (devstatus.code == 0 and old_status_code > 0):
self._notify(devstatus.code, devstatus.name, devstatus.props[devices.PROPS.TEXT])
logging.debug("%s: device '%s' status changed %s => %s: %s", time.asctime(), devstatus.name, old_status_code, devstatus.code, devstatus.text)
if devstatus.code // 256 != old_status_code // 256:
self._notify(devstatus.code, devstatus.name, devstatus.text)
return True
@ -163,38 +185,30 @@ class WatcherThread(threading.Thread):
if self.notify:
self.notify(*args)
def notify_full(self):
if self.listener and self.listener.active:
if self.devices:
for devstatus in self.devices.values():
self._notify(0, devstatus.name, devstatus.props[devices.PROPS.TEXT])
else:
self._notify(0, C.UNIFYING_RECEIVER, C.NO_DEVICES)
else:
self._notify(-1, C.UNIFYING_RECEIVER, C.NO_RECEIVER)
def _update_status(self):
def _update_status_text(self):
last_status_text = self.status_text
if self.listener and self.listener.active:
if self.devices:
all_statuses = []
for d in self.devices:
devstatus = self.devices[d]
status_text = devstatus.props[devices.PROPS.TEXT]
if status_text:
if ' ' in status_text:
all_statuses.append(devstatus.name)
all_statuses.append(' ' + status_text)
else:
all_statuses.append(devstatus.name + ' ' + status_text)
else:
all_statuses.append(devstatus.name)
self.status_text = '\n'.join(all_statuses)
else:
self.status_text = C.NO_DEVICES
if self.rstatus.code < 0:
self.status_text = '<b>' + self.rstatus.name + '</b>: ' + self.rstatus.text
else:
self.status_text = C.NO_RECEIVER
all_statuses = []
for devnumber in range(1, 1 + api.C.MAX_ATTACHED_DEVICES):
if devnumber in self.devices:
devstatus = self.devices[devnumber]
if devstatus.text:
if ' ' in devstatus.text:
all_statuses.append('<b>' + devstatus.name + '</b>')
all_statuses.append(' ' + devstatus.text)
else:
all_statuses.append('<b>' + devstatus.name + '</b>: ' + devstatus.text)
else:
all_statuses.append('<b>' + devstatus.name + '</b>')
all_statuses.append('')
if all_statuses:
self.status_text = '\n'.join(all_statuses).rstrip('\n')
else:
self.status_text = '<b>' + self.rstatus.name + '</b>: ' + _NO_DEVICES
if self.status_text != last_status_text:
self.status_changed.set()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.2 KiB

View File

@ -4,27 +4,21 @@
STATUS = type('STATUS', (),
dict(
UNKNOWN=None,
UNKNOWN=-9999,
UNAVAILABLE=-1,
CONNECTED=0,
# ACTIVE=1,
))
STATUS_NAME = {
STATUS.UNAVAILABLE: 'disconnected?',
STATUS.CONNECTED: 'connected',
}
PROPS = type('PROPS', (),
dict(
TEXT='text',
BATTERY_LEVEL='battery-level',
BATTERY_STATUS='battery-status',
LIGHT_LUX='lux',
LIGHT_LEVEL='light-level',
BATTERY_LEVEL='battery_level',
BATTERY_STATUS='battery_status',
LIGHT_LEVEL='light_level',
))
from collections import defaultdict
STATUS_NAME = defaultdict(lambda x: None)
STATUS_NAME[STATUS.UNAVAILABLE] = 'disconnected'
STATUS_NAME[STATUS.CONNECTED] = 'connected'
# STATUS_NAME[STATUS.ACTIVE] = 'active'
del defaultdict

View File

@ -14,8 +14,6 @@ from . import constants as C
NAME = 'Wireless Solar Keyboard K750'
_CHARGE_LIMITS = (75, 40, 20, 10, -1)
#
#
#
@ -26,23 +24,24 @@ def _trigger_solar_charge_events(receiver, devinfo):
features_array=devinfo.features)
def _charge_status(data):
def _charge_status(data, hasLux=False):
charge, lux = _unpack('!BH', data[2:5])
d = {}
for i in range(0, len(_CHARGE_LIMITS)):
if charge >= _CHARGE_LIMITS[i]:
_CHARGE_LEVELS = (10, 25, 256)
for i in range(0, len(_CHARGE_LEVELS)):
if charge < _CHARGE_LEVELS[i]:
charge_index = i
break
else:
charge_index = 0
d[C.PROPS.BATTERY_LEVEL] = charge
text = 'Battery %d%%' % charge
if lux > 0:
if hasLux:
d[C.PROPS.LIGHT_LEVEL] = lux
text = 'Light: %d lux' % lux + ', ' + text
else:
d[C.PROPS.LIGHT_LEVEL] = None
d[C.PROPS.TEXT] = text
return 0x10 << charge_index, d
@ -61,7 +60,7 @@ def process_event(devinfo, listener, data):
if data[:2] == b'\x09\x10' and data[7:11] == b'GOOD':
# regular solar charge events
return _charge_status(data)
return _charge_status(data, True)
if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD':
logging.debug("Solar key pressed")

View File

@ -42,7 +42,7 @@ def open():
close = _base.close
def request(handle, device, feature, function=b'\x00', params=b'', features_array=None):
def request(handle, devnumber, feature, function=b'\x00', params=b'', features_array=None):
"""Makes a feature call to the device, and returns the reply data.
Basically a write() followed by (possibly multiple) reads, until a reply
@ -65,22 +65,22 @@ def request(handle, device, feature, function=b'\x00', params=b'', features_arra
feature_index = b'\x00'
else:
if features_array is None:
features_array = get_device_features(handle, device)
features_array = get_device_features(handle, devnumber)
if features_array is None:
_l.log(_LOG_LEVEL, "(%d,%d) no features array available", handle, device)
_l.log(_LOG_LEVEL, "(%d,%d) no features array available", handle, devnumber)
return None
if feature in features_array:
feature_index = _pack('!B', features_array.index(feature))
if feature_index is None:
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, _hexlify(feature), C.FEATURE_NAME[feature])
raise E.FeatureNotSupported(device, feature)
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature])
raise E.FeatureNotSupported(devnumber, feature)
return _base.request(handle, device, feature_index + function, params)
return _base.request(handle, devnumber, feature_index + function, params)
def ping(handle, device):
"""Pings a device number to check if it is attached to the UR.
def ping(handle, devnumber):
"""Pings a device to check if it is attached to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
@ -92,39 +92,39 @@ def ping(handle, device):
if not reply:
return None
reply_code, reply_device, reply_data = reply
reply_code, reply_devnumber, reply_data = reply
if reply_device != device:
if reply_devnumber != devnumber:
# oops
_l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, _hexlify(reply_data))
_unhandled._publish(reply_code, reply_device, reply_data)
_l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, devnumber, reply_devnumber, _hexlify(reply_data))
_unhandled._publish(reply_code, reply_devnumber, reply_data)
return _status(_base.read(handle))
if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4:5] == ping_marker):
# ping ok
_l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, _hexlify(reply_data))
_l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, devnumber, _hexlify(reply_data))
return True
if (reply_code == 0x10 and reply_data[:2] == b'\x8F\x00'):
# ping failed
_l.log(_LOG_LEVEL, "(%d,%d) ping: device not present", handle, device)
_l.log(_LOG_LEVEL, "(%d,%d) ping: device not present", handle, devnumber)
return False
if (reply_code == 0x11 and reply_data[:2] == b'\x09\x00' and len(reply_data) == 18 and reply_data[7:11] == b'GOOD'):
# some devices may reply with a SOLAR_CHARGE event before the
# ping_ok reply, especially right after the device connected to the
# receiver
_l.log(_LOG_LEVEL, "(%d,%d) ping: solar status [%s]", handle, device, _hexlify(reply_data))
_unhandled._publish(reply_code, reply_device, reply_data)
_l.log(_LOG_LEVEL, "(%d,%d) ping: solar status [%s]", handle, devnumber, _hexlify(reply_data))
_unhandled._publish(reply_code, reply_devnumber, reply_data)
return _status(_base.read(handle))
# ugh
_l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device: %d=[%s]", handle, device, reply_code, _hexlify(reply_data))
_unhandled._publish(reply_code, reply_device, reply_data)
_l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device: %d=[%s]", handle, devnumber, reply_code, _hexlify(reply_data))
_unhandled._publish(reply_code, reply_devnumber, reply_data)
return None
_l.log(_LOG_LEVEL, "(%d,%d) pinging", handle, device)
_base.write(handle, device, b'\x00\x10\x00\x00' + ping_marker)
_l.log(_LOG_LEVEL, "(%d,%d) pinging", handle, devnumber)
_base.write(handle, devnumber, b'\x00\x10\x00\x00' + ping_marker)
# pings may take a while to reply success
return _status(_base.read(handle, _base.DEFAULT_TIMEOUT * 3))
@ -136,12 +136,12 @@ def find_device_by_name(handle, device_name):
"""
_l.log(_LOG_LEVEL, "(%d,) searching for device '%s'", handle, device_name)
for device in range(1, 1 + _base.MAX_ATTACHED_DEVICES):
features_array = get_device_features(handle, device)
for devnumber in range(1, 1 + C.MAX_ATTACHED_DEVICES):
features_array = get_device_features(handle, devnumber)
if features_array:
d_name = get_device_name(handle, device, features_array)
d_name = get_device_name(handle, devnumber, features_array)
if d_name == device_name:
return get_device_info(handle, device, device_name=d_name, features_array=features_array)
return get_device_info(handle, devnumber, device_name=d_name, features_array=features_array)
def list_devices(handle):
@ -153,7 +153,7 @@ def list_devices(handle):
devices = []
for device in range(1, 1 + _base.MAX_ATTACHED_DEVICES):
for device in range(1, 1 + C.MAX_ATTACHED_DEVICES):
features_array = get_device_features(handle, device)
if features_array:
devices.append(get_device_info(handle, device, features_array=features_array))
@ -161,67 +161,67 @@ def list_devices(handle):
return devices
def get_device_info(handle, device, device_name=None, features_array=None):
def get_device_info(handle, devnumber, device_name=None, features_array=None):
"""Gets the complete info for a device (type, name, firmwares, and features_array).
:returns: an AttachedDeviceInfo tuple, or ``None``.
"""
if features_array is None:
features_array = get_device_features(handle, device)
features_array = get_device_features(handle, devnumber)
if features_array is None:
return None
d_type = get_device_type(handle, device, features_array)
d_name = get_device_name(handle, device, features_array) if device_name is None else device_name
d_firmware = get_device_firmware(handle, device, features_array)
devinfo = AttachedDeviceInfo(device, d_type, d_name, d_firmware, features_array)
_l.log(_LOG_LEVEL, "(%d,%d) found device %s", handle, device, devinfo)
d_type = get_device_type(handle, devnumber, features_array)
d_name = get_device_name(handle, devnumber, features_array) if device_name is None else device_name
d_firmware = get_device_firmware(handle, devnumber, features_array)
devinfo = AttachedDeviceInfo(devnumber, d_type, d_name, d_firmware, features_array)
_l.log(_LOG_LEVEL, "(%d,%d) found device %s", handle, devnumber, devinfo)
return devinfo
def get_feature_index(handle, device, feature):
def get_feature_index(handle, devnumber, feature):
"""Reads the index of a device's feature.
:returns: An int, or ``None`` if the feature is not available.
"""
_l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, _hexlify(feature), C.FEATURE_NAME[feature])
_l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature])
if len(feature) != 2:
raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature)
# FEATURE.ROOT should always be available for any attached devices
reply = _base.request(handle, device, C.FEATURE.ROOT, feature)
reply = _base.request(handle, devnumber, C.FEATURE.ROOT, feature)
if reply:
# only consider active and supported features
feature_index = ord(reply[0:1])
if feature_index:
feature_flags = ord(reply[1:2]) & 0xE0
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, _hexlify(feature), C.FEATURE_NAME[feature], feature_index, feature_flags)
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature], feature_index, feature_flags)
if feature_flags == 0:
return feature_index
if feature_flags & 0x80:
_l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, _hexlify(feature), C.FEATURE_NAME[feature])
_l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature])
if feature_flags & 0x40:
_l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, _hexlify(feature), C.FEATURE_NAME[feature])
_l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature])
if feature_flags & 0x20:
_l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, _hexlify(feature), C.FEATURE_NAME[feature])
raise E.FeatureNotSupported(device, feature)
_l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature])
raise E.FeatureNotSupported(devnumber, feature)
else:
_l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, _hexlify(feature), C.FEATURE_NAME[feature])
raise E.FeatureNotSupported(device, feature)
_l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature])
raise E.FeatureNotSupported(devnumber, feature)
def get_device_features(handle, device):
def get_device_features(handle, devnumber):
"""Returns an array of feature ids.
Their position in the array is the index to be used when requesting that
feature on the device.
"""
_l.log(_LOG_LEVEL, "(%d,%d) get device features", handle, device)
_l.log(_LOG_LEVEL, "(%d,%d) get device features", handle, devnumber)
# get the index of the FEATURE_SET
# FEATURE.ROOT should always be available for all devices
fs_index = _base.request(handle, device, C.FEATURE.ROOT, C.FEATURE.FEATURE_SET)
fs_index = _base.request(handle, devnumber, C.FEATURE.ROOT, C.FEATURE.FEATURE_SET)
if fs_index is None:
# _l.warn("(%d,%d) FEATURE_SET not available", handle, device)
return None
@ -231,24 +231,24 @@ def get_device_features(handle, device):
# even if unknown.
# get the number of active features the device has
features_count = _base.request(handle, device, fs_index + b'\x00')
features_count = _base.request(handle, devnumber, fs_index + b'\x00')
if not features_count:
# this can happen if the device disappeard since the fs_index request
# otherwise we should get at least a count of 1 (the FEATURE_SET we've just used above)
_l.log(_LOG_LEVEL, "(%d,%d) no features available?!", handle, device)
_l.log(_LOG_LEVEL, "(%d,%d) no features available?!", handle, devnumber)
return None
features_count = ord(features_count[:1])
_l.log(_LOG_LEVEL, "(%d,%d) found %d features", handle, device, features_count)
_l.log(_LOG_LEVEL, "(%d,%d) found %d features", handle, devnumber, features_count)
features = [None] * 0x20
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = _base.request(handle, device, fs_index + b'\x10', _pack('!B', index))
feature = _base.request(handle, devnumber, fs_index + b'\x10', _pack('!B', index))
if feature:
feature = feature[0:2].upper()
features[index] = feature
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, _hexlify(feature), C.FEATURE_NAME[feature], index)
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, devnumber, _hexlify(feature), C.FEATURE_NAME[feature], index)
features[0] = C.FEATURE.ROOT
while features[-1] is None:
@ -256,7 +256,7 @@ def get_device_features(handle, device):
return features
def get_device_firmware(handle, device, features_array=None):
def get_device_firmware(handle, devnumber, features_array=None):
"""Reads a device's firmware info.
:returns: a list of FirmwareInfo tuples, ordered by firmware layer.
@ -264,14 +264,14 @@ def get_device_firmware(handle, device, features_array=None):
def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None):
return FirmwareInfo(level, type, name, version, build, extras)
fw_count = request(handle, device, C.FEATURE.FIRMWARE, features_array=features_array)
fw_count = request(handle, devnumber, C.FEATURE.FIRMWARE, features_array=features_array)
if fw_count:
fw_count = ord(fw_count[:1])
fw = []
for index in range(0, fw_count):
index = _pack('!B', index)
fw_info = request(handle, device, C.FEATURE.FIRMWARE, function=b'\x10', params=index, features_array=features_array)
fw_info = request(handle, devnumber, C.FEATURE.FIRMWARE, function=b'\x10', params=index, features_array=features_array)
if fw_info:
fw_level = ord(fw_info[:1]) & 0x0F
if fw_level == 0 or fw_level == 1:
@ -295,67 +295,67 @@ def get_device_firmware(handle, device, features_array=None):
fw_info = _makeFirmwareInfo(level=fw_level, type=C.FIRMWARE_TYPE[-1])
fw.append(fw_info)
_l.log(_LOG_LEVEL, "(%d:%d) firmware %s", handle, device, fw_info)
_l.log(_LOG_LEVEL, "(%d:%d) firmware %s", handle, devnumber, fw_info)
return fw
def get_device_type(handle, device, features_array=None):
def get_device_type(handle, devnumber, features_array=None):
"""Reads a device's type.
:see DEVICE_TYPE:
:returns: a string describing the device type, or ``None`` if the device is
not available or does not support the ``NAME`` feature.
"""
d_type = request(handle, device, C.FEATURE.NAME, function=b'\x20', features_array=features_array)
d_type = request(handle, devnumber, C.FEATURE.NAME, function=b'\x20', features_array=features_array)
if d_type:
d_type = ord(d_type[:1])
_l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, device, d_type, C.DEVICE_TYPE[d_type])
_l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, devnumber, d_type, C.DEVICE_TYPE[d_type])
return C.DEVICE_TYPE[d_type]
def get_device_name(handle, device, features_array=None):
def get_device_name(handle, devnumber, features_array=None):
"""Reads a device's name.
:returns: a string with the device name, or ``None`` if the device is not
available or does not support the ``NAME`` feature.
"""
name_length = request(handle, device, C.FEATURE.NAME, features_array=features_array)
name_length = request(handle, devnumber, C.FEATURE.NAME, features_array=features_array)
if name_length:
name_length = ord(name_length[:1])
d_name = b''
while len(d_name) < name_length:
name_index = _pack('!B', len(d_name))
name_fragment = request(handle, device, C.FEATURE.NAME, function=b'\x10', params=name_index, features_array=features_array)
name_fragment = request(handle, devnumber, C.FEATURE.NAME, function=b'\x10', params=name_index, features_array=features_array)
name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment
d_name = d_name.decode('ascii')
_l.log(_LOG_LEVEL, "(%d,%d) device name %s", handle, device, d_name)
_l.log(_LOG_LEVEL, "(%d,%d) device name %s", handle, devnumber, d_name)
return d_name
def get_device_battery_level(handle, device, features_array=None):
def get_device_battery_level(handle, devnumber, features_array=None):
"""Reads a device's battery level.
:raises FeatureNotSupported: if the device does not support this feature.
"""
battery = request(handle, device, C.FEATURE.BATTERY, features_array=features_array)
battery = request(handle, devnumber, C.FEATURE.BATTERY, features_array=features_array)
if battery:
discharge, dischargeNext, status = _unpack('!BBB', battery[:3])
_l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, C.BATTERY_STATUSE[status])
return (discharge, dischargeNext, C.BATTERY_STATUS[status])
def get_device_keys(handle, device, features_array=None):
count = request(handle, device, C.FEATURE.REPROGRAMMABLE_KEYS, features_array=features_array)
def get_device_keys(handle, devnumber, features_array=None):
count = request(handle, devnumber, C.FEATURE.REPROGRAMMABLE_KEYS, features_array=features_array)
if count:
keys = []
count = ord(count[:1])
for index in range(0, count):
keyindex = _pack('!B', index)
keydata = request(handle, device, C.FEATURE.REPROGRAMMABLE_KEYS, function=b'\x10', params=keyindex, features_array=features_array)
keydata = request(handle, devnumber, C.FEATURE.REPROGRAMMABLE_KEYS, function=b'\x10', params=keyindex, features_array=features_array)
if keydata:
key, key_task, flags = _unpack('!HHB', keydata[:5])
keys.append(ReprogrammableKeyInfo(index, key, C.KEY_NAME[key], key_task, C.KEY_NAME[key_task], flags))

View File

@ -41,10 +41,6 @@ _MAX_REPLY_SIZE = _MAX_CALL_SIZE
"""Default timeout on read (in ms)."""
DEFAULT_TIMEOUT = 1000
"""Maximum number of devices attached to a UR."""
MAX_ATTACHED_DEVICES = 6
#
#
#
@ -57,7 +53,7 @@ def list_receiver_devices():
def try_open(path):
"""Checks if the given device path points to the right UR device.
"""Checks if the given Linux device path points to the right UR device.
:param path: the Linux device path.
@ -128,11 +124,11 @@ def close(handle):
# return _write(handle, device, data)
def write(handle, device, data):
def write(handle, devnumber, data):
"""Writes some data to a certain device.
:param handle: an open UR handle.
:param device: attached device number.
:param devnumber: attached device number.
:param data: data to send, up to 5 bytes.
The first two (required) bytes of data must be the feature index for the
@ -146,16 +142,16 @@ def write(handle, device, data):
data += b'\x00' * (_MIN_CALL_SIZE - 2 - len(data))
elif len(data) > _MIN_CALL_SIZE - 2:
data += b'\x00' * (_MAX_CALL_SIZE - 2 - len(data))
wdata = _pack('!BB', 0x10, device) + data
wdata = _pack('!BB', 0x10, devnumber) + data
_l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, _hexlify(wdata))
_l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, devnumber, _hexlify(wdata))
if len(wdata) < _MIN_CALL_SIZE:
_l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, _hexlify(wdata), len(wdata))
_l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, devnumber, _hexlify(wdata), len(wdata))
if len(wdata) > _MAX_CALL_SIZE:
_l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, _hexlify(wdata), len(wdata))
_l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, devnumber, _hexlify(wdata), len(wdata))
if not _hid.write(handle, wdata):
_l.warn("(%d,%d) write failed, assuming receiver no longer available", handle, device)
_l.warn("(%d,%d) write failed, assuming receiver no longer available", handle, devnumber)
close(handle)
raise E.NoReceiver
@ -168,9 +164,9 @@ def read(handle, timeout=DEFAULT_TIMEOUT):
:param timeout: read timeout on the UR handle.
If any data was read in the given timeout, returns a tuple of
(reply_code, device, message data). The reply code should be ``0x11`` for a
successful feature call, or ``0x10`` to indicate some error, e.g. the device
is no longer available.
(reply_code, devnumber, message data). The reply code is generally ``0x11``
for a successful feature call, or ``0x10`` to indicate some error, e.g. the
device is no longer available.
:raises NoReceiver: if the receiver is no longer available, i.e. has
been physically removed from the machine, or the kernel driver has been
@ -189,21 +185,21 @@ def read(handle, timeout=DEFAULT_TIMEOUT):
if len(data) > _MAX_REPLY_SIZE:
_l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, _hexlify(data), len(data))
code = ord(data[:1])
device = ord(data[1:2])
return code, device, data[2:]
devnumber = ord(data[1:2])
return code, devnumber, data[2:]
_l.log(_LOG_LEVEL, "(%d,*) => r[]", handle)
def request(handle, device, feature_index_function, params=b'', features_array=None):
"""Makes a feature call device and waits for a matching reply.
def request(handle, devnumber, feature_index_function, params=b'', features_array=None):
"""Makes a feature call to a device and waits for a matching reply.
This function will skip all incoming messages and events not related to the
device we're requesting for, or the feature specified in the initial
request; it will also wait for a matching reply indefinitely.
:param handle: an open UR handle.
:param device: attached device number.
:param devnumber: attached device number.
:param feature_index_function: a two-byte string of (feature_index, feature_function).
:param params: parameters for the feature call, 3 to 16 bytes.
:param features_array: optional features array for the device, only used to
@ -212,13 +208,13 @@ def request(handle, device, feature_index_function, params=b'', features_array=N
available.
:raisees FeatureCallError: if the feature call replied with an error.
"""
_l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, _hexlify(feature_index_function), _hexlify(params))
_l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, devnumber, _hexlify(feature_index_function), _hexlify(params))
if len(feature_index_function) != 2:
raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hexlify(feature_index_function))
retries = 5
write(handle, device, feature_index_function + params)
write(handle, devnumber, feature_index_function + params)
while retries > 0:
reply = read(handle)
retries -= 1
@ -227,39 +223,39 @@ def request(handle, device, feature_index_function, params=b'', features_array=N
# keep waiting...
continue
reply_code, reply_device, reply_data = reply
reply_code, reply_devnumber, reply_data = reply
if reply_device != device:
if reply_devnumber != devnumber:
# this message not for the device we're interested in
_l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, reply_device, _hexlify(reply_data))
_l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, devnumber, reply_devnumber, _hexlify(reply_data))
# worst case scenario, this is a reply for a concurrent request
# on this receiver
_unhandled._publish(reply_code, reply_device, reply_data)
_unhandled._publish(reply_code, reply_devnumber, reply_data)
continue
if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:2] == feature_index_function:
# device not present
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, _hexlify(feature_index_function), _hexlify(reply_data))
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, devnumber, _hexlify(feature_index_function), _hexlify(reply_data))
return None
if reply_code == 0x10 and reply_data[:1] == b'\x8F':
# device not present
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, _hexlify(reply_data))
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, devnumber, _hexlify(reply_data))
return None
if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function:
# an error returned from the device
error_code = ord(reply_data[3])
_l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error_code, C.ERROR_NAME[error_code], _hexlify(reply_data))
_l.warn("(%d,%d) request feature call error %d = %s: %s", handle, devnumber, error_code, C.ERROR_NAME[error_code], _hexlify(reply_data))
feature_index = ord(feature_index_function[:1])
feature_function = feature_index_function[1:2]
feature = None if features_array is None else features_array[feature_index]
raise E.FeatureCallError(device, feature, feature_index, feature_function, error_code, reply_data)
raise E.FeatureCallError(devnumber, feature, feature_index, feature_function, error_code, reply_data)
if reply_code == 0x11 and reply_data[:2] == feature_index_function:
# a matching reply
_l.log(_LOG_LEVEL, "(%d,%d) matched reply with feature-index-function [%s]", handle, device, _hexlify(reply_data[2:]))
_l.log(_LOG_LEVEL, "(%d,%d) matched reply with feature-index-function [%s]", handle, devnumber, _hexlify(reply_data[2:]))
return reply_data[2:]
_l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, _hexlify(reply_data[:2]), _hexlify(feature_index_function))
_unhandled._publish(reply_code, reply_device, reply_data)
_l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, devnumber, _hexlify(reply_data[:2]), _hexlify(feature_index_function))
_unhandled._publish(reply_code, reply_devnumber, reply_data)

View File

@ -97,5 +97,9 @@ _ERROR_NAMES = ('Ok', 'Unknown', 'Invalid argument', 'Out of range',
ERROR_NAME = FallbackDict(lambda x: 'Unknown error', list2dict(_ERROR_NAMES))
"""Maximum number of devices that can be attached to a single receiver."""
MAX_ATTACHED_DEVICES = 6
del FallbackDict
del list2dict

View File

@ -15,18 +15,18 @@ class NoReceiver(Exception):
class FeatureNotSupported(Exception):
"""Raised when trying to request a feature not supported by the device."""
def __init__(self, device, feature):
super(FeatureNotSupported, self).__init__(device, feature, C.FEATURE_NAME[feature])
self.device = device
def __init__(self, devnumber, feature):
super(FeatureNotSupported, self).__init__(devnumber, feature, C.FEATURE_NAME[feature])
self.devnumber = devnumber
self.feature = feature
self.feature_name = C.FEATURE_NAME[feature]
class FeatureCallError(Exception):
"""Raised if the device replied to a feature call with an error."""
def __init__(self, device, feature, feature_index, feature_function, error_code, data=None):
super(FeatureCallError, self).__init__(device, feature, feature_index, feature_function, error_code, C.ERROR_NAME[error_code])
self.device = device
def __init__(self, devnumber, feature, feature_index, feature_function, error_code, data=None):
super(FeatureCallError, self).__init__(devnumber, feature, feature_index, feature_function, error_code, C.ERROR_NAME[error_code])
self.devnumber = devnumber
self.feature = feature
self.feature_name = None if feature is None else C.FEATURE_NAME[feature]
self.feature_index = feature_index

View File

@ -23,14 +23,14 @@ _IDLE_SLEEP = 950 # ms
class EventsListener(threading.Thread):
"""Listener thread for events from the Unifying Receiver.
Incoming events (code, device, data) will be delivered to the callback
function. The callback is called in the listener thread, so it should return
as fast as possible.
Incoming events (reply_code, devnumber, data) will be passed to the callback
function. The callback is called in the listener thread, so for best results
it should return as fast as possible.
While this listener is running, you should use the request() method to make
regular UR API calls, otherwise the replies will be captured by the listener
and delivered as events to the callback. As an exception, you can make UR
API calls in the events callback.
regular UR API calls, otherwise the replies may be captured by the listener
and delivered as events to the callback. As an exception, you can make API
calls in the events callback.
"""
def __init__(self, receiver, events_callback):
super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + hex(receiver))

View File

@ -80,7 +80,7 @@ class Test_UR_Base(unittest.TestCase):
devices = []
for device in range(1, 1 + base.MAX_ATTACHED_DEVICES):
for device in range(1, 1 + MAX_ATTACHED_DEVICES):
w = base.write(self.handle, device, b'\x00\x10\x00\x00\xAA')
self.assertIsNone(w, "write should have returned None")
reply = base.read(self.handle, base.DEFAULT_TIMEOUT * 3)

View File

@ -45,7 +45,7 @@ class Test_UR_API(unittest.TestCase):
devices = []
for device in range(1, 1 + api._base.MAX_ATTACHED_DEVICES):
for device in range(1, 1 + MAX_ATTACHED_DEVICES):
ok = api.ping(self.handle, device)
self.assertIsNotNone(ok, "invalid ping reply")
if ok:

View File

@ -7,16 +7,16 @@ import logging
from binascii import hexlify as _hexlify
def _logdebug_hook(reply_code, device, data):
def _logdebug_hook(reply_code, devnumber, data):
"""Default unhandled hook, logs the reply as DEBUG."""
_l = logging.getLogger('logitech.unifying_receiver.unhandled')
_l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, _hexlify(data))
_l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", devnumber, reply_code, _hexlify(data))
"""The function that will be called on unhandled incoming events.
The hook must be a function with the signature: ``_(int, int, str)``, where
the parameters are: (reply code, device number, data).
the parameters are: (reply_code, devnumber, data).
This hook will only be called by the request() function, when it receives
replies that do not match the requested feature call. As such, it is not
@ -30,7 +30,7 @@ The default implementation logs the unhandled reply as DEBUG.
hook = _logdebug_hook
def _publish(reply_code, device, data):
def _publish(reply_code, devnumber, data):
"""Delivers a reply to the unhandled hook, if any."""
if hook is not None:
hook.__call__(reply_code, device, data)
hook.__call__(reply_code, devnumber, data)

3
resources/README Normal file
View File

@ -0,0 +1,3 @@
Battery and weather icons from the Oxygen icon theme.
Lightbulb icon from the GNOME icon theme.
Unifying receiver and Wireless Keyboard K750 icons from Logitech web pages.

BIN
resources/icons/Solaar.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

View File

Before

Width:  |  Height:  |  Size: 17 KiB

After

Width:  |  Height:  |  Size: 17 KiB

View File

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View File

Before

Width:  |  Height:  |  Size: 17 KiB

After

Width:  |  Height:  |  Size: 17 KiB

View File

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 18 KiB

View File

Before

Width:  |  Height:  |  Size: 19 KiB

After

Width:  |  Height:  |  Size: 19 KiB

View File

Before

Width:  |  Height:  |  Size: 21 KiB

After

Width:  |  Height:  |  Size: 21 KiB

View File

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

Before

Width:  |  Height:  |  Size: 21 KiB

After

Width:  |  Height:  |  Size: 21 KiB

View File

Before

Width:  |  Height:  |  Size: 40 KiB

After

Width:  |  Height:  |  Size: 40 KiB

View File

Before

Width:  |  Height:  |  Size: 14 KiB

After

Width:  |  Height:  |  Size: 14 KiB

View File

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View File

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.4 KiB

3
solaar
View File

@ -4,6 +4,7 @@ cd `dirname "$0"`
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD/lib
export PYTHONPATH=$PWD:$PWD/lib
export XDG_DATA_DIRS=$PWD/resources:$XDG_DATA_DIRS
exec python -OO solaar.py "$@"
#exec python -OO -m profile -o $TMPDIR/profile.log solaar.py "$@"
# exec python -OO -m profile -o $TMPDIR/profile.log solaar.py "$@"

View File

@ -7,7 +7,6 @@ __version__ = '0.4'
#
import logging
import os.path
if __name__ == '__main__':
@ -20,8 +19,5 @@ if __name__ == '__main__':
log_level = logging.root.level - 10 * args.verbose
logging.basicConfig(level=log_level if log_level > 0 else 1)
images_path = os.path.join(__file__, '..', 'images')
images_path = os.path.abspath(os.path.normpath(images_path))
import app
app.run(images_path)
app.run()