cleaned-up the app a bit

This commit is contained in:
Daniel Pavel 2012-10-08 14:12:38 +03:00
parent 882332256b
commit 257f74b496
10 changed files with 246 additions and 206 deletions

View File

@ -19,14 +19,10 @@ def _status_updated(watcher, icon, window):
watcher.status_changed.clear()
if icon:
GObject.idle_add(icon.set_tooltip_markup, text)
GObject.idle_add(ui.icon.update, icon, watcher.rstatus, text)
if window:
GObject.idle_add(ui.window.update, window, dict(watcher.devices))
# def _pair_new_device(trigger, watcher):
# pass
GObject.idle_add(ui.window.update, window, watcher.rstatus, dict(watcher.devices))
def run(config):
@ -37,14 +33,8 @@ def run(config):
watcher = WatcherThread(ui.notify.show)
watcher.start()
window = ui.window.create(APP_TITLE, watcher.devices[0], not config.start_hidden, config.close_to_tray)
menu_actions = [('Scan all devices', watcher.full_scan),
# ('Pair new device', _pair_new_device, watcher),
None,
('Quit', Gtk.main_quit)]
tray_icon = ui.icon.create(APP_TITLE, menu_actions, (ui.window.toggle, window))
window = ui.window.create(APP_TITLE, watcher.rstatus, not config.start_hidden, config.close_to_tray)
tray_icon = ui.icon.create(APP_TITLE, (ui.window.toggle, window))
ui_update_thread = threading.Thread(target=_status_updated, name='ui_update', args=(watcher, tray_icon, window))
ui_update_thread.daemon = True

0
app/actions.py Normal file
View File

View File

@ -1,3 +1,3 @@
# pass
from . import (icon, notify, window)
from . import (notify, icon, window)

View File

@ -5,12 +5,12 @@
from gi.repository import Gtk
def _show_icon_menu(icon, button, time, menu):
menu.popup(None, None, icon.position_menu, icon, button, time)
_ICON_OK = 'Solaar'
_ICON_FAIL = _ICON_OK + '-fail'
def create(title, menu_actions, click_action=None):
icon = Gtk.StatusIcon.new_from_icon_name(title)
def create(title, click_action=None):
icon = Gtk.StatusIcon.new_from_icon_name(_ICON_OK)
icon.set_title(title)
icon.set_name(title)
@ -22,20 +22,23 @@ def create(title, menu_actions, click_action=None):
else:
icon.connect('activate', click_action)
if menu_actions:
if type(menu_actions) == list:
menu = Gtk.Menu()
for action in menu_actions:
if action:
item = Gtk.MenuItem(action[0])
args = action[2:] if len(action) > 2 else ()
item.connect('activate', action[1], *args)
else:
item = Gtk.SeparatorMenuItem()
menu.append(item)
menu.show_all()
icon.connect('popup_menu', _show_icon_menu, menu)
else:
icon.connect('popup_menu', menu_actions)
menu = Gtk.Menu()
item = Gtk.MenuItem('Quit')
item.connect('activate', Gtk.main_quit)
menu.append(item)
menu.show_all()
icon.connect('popup_menu',
lambda icon, button, time, menu:
menu.popup(None, None, icon.position_menu, icon, button, time),
menu)
return icon
def update(icon, rstatus, tooltip):
icon.set_tooltip_markup(tooltip)
if rstatus.code < 0:
icon.set_from_icon_name(_ICON_FAIL)
else:
icon.set_from_icon_name(_ICON_OK)

View File

@ -16,7 +16,7 @@ try:
def init(app_title, active=True):
"""Init the notifications system."""
global _app_title, _active
global _app_title
_app_title = app_title
return set_active(active)

View File

@ -12,114 +12,153 @@ _PLACEHOLDER = '~'
_MAX_DEVICES = 6
def update(window, devices):
if not window or not window.get_child():
return
def _update_receiver_box(box, receiver):
icon, vbox = box.get_children()
label, buttons_box = vbox.get_children()
label.set_text(receiver.text or '')
buttons_box.set_visible(receiver.code >= 0)
controls = list(window.get_child().get_children())
first = controls[0].get_child()
icon, label = first.get_children()
rstatus = devices[0]
if rstatus.text:
label.set_markup('<big><b>%s</b></big>\n%s' % (rstatus.name, rstatus.text))
else:
label.set_markup('<big><b>%s</b></big>' % rstatus.name)
def _update_device_box(frame, devstatus):
frame.set_visible(devstatus is not None)
for index in range(1, 1 + _MAX_DEVICES):
devstatus = devices.get(index)
controls[index].set_visible(devstatus is not None)
box = frame.get_child()
icon, expander = box.get_children()
box = controls[index].get_child()
icon, expander = box.get_children()
if devstatus:
if icon.get_name() != devstatus.name:
icon.set_name(devstatus.name)
icon.set_from_icon_name(devstatus.name, _DEVICE_ICON_SIZE)
if devstatus:
if icon.get_name() != devstatus.name:
icon.set_name(devstatus.name)
icon.set_from_icon_name(devstatus.name, _DEVICE_ICON_SIZE)
if devstatus.code < 0:
expander.set_sensitive(False)
expander.set_expanded(False)
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.text))
else:
expander.set_sensitive(True)
ebox = expander.get_child()
texts = []
light_icon = ebox.get_children()[-2]
light_level = getattr(devstatus, 'light_level', None)
light_icon.set_visible(light_level is not None)
if light_level is not None:
texts.append('Light: %d lux' % light_level)
icon_name = 'light_%02d' % (20 * ((light_level + 50) // 100))
light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
light_icon.set_tooltip_text(texts[-1])
battery_icon = ebox.get_children()[-1]
battery_level = getattr(devstatus, 'battery_level', None)
battery_icon.set_sensitive(battery_level is not None)
if battery_level is None:
battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
battery_icon.set_tooltip_text('Battery: unknown')
else:
texts.append('Battery: %d%%' % battery_level)
icon_name = 'battery_%02d' % (20 * ((battery_level + 10) // 20))
battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
battery_icon.set_tooltip_text(texts[-1])
battery_status = getattr(devstatus, 'battery_status', None)
if battery_status is not None:
texts.append(battery_status)
battery_icon.set_tooltip_text(battery_icon.get_tooltip_text() + '\n' + battery_status)
if texts:
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, ', '.join(texts)))
else:
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.text))
if devstatus.code < 0:
expander.set_sensitive(False)
expander.set_expanded(False)
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.text))
else:
icon.set_name(_PLACEHOLDER)
expander.set_label(_PLACEHOLDER)
expander.set_sensitive(True)
ebox = expander.get_child()
texts = []
light_icon = ebox.get_children()[-2]
light_level = getattr(devstatus, 'light_level', None)
light_icon.set_visible(light_level is not None)
if light_level is not None:
texts.append('Light: %d lux' % light_level)
icon_name = 'light_%02d' % (20 * ((light_level + 50) // 100))
light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
light_icon.set_tooltip_text(texts[-1])
battery_icon = ebox.get_children()[-1]
battery_level = getattr(devstatus, 'battery_level', None)
battery_icon.set_sensitive(battery_level is not None)
if battery_level is None:
battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
battery_icon.set_tooltip_text('Battery: unknown')
else:
texts.append('Battery: %d%%' % battery_level)
icon_name = 'battery_%02d' % (20 * ((battery_level + 10) // 20))
battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
battery_icon.set_tooltip_text(texts[-1])
battery_status = getattr(devstatus, 'battery_status', None)
if battery_status is not None:
texts.append(battery_status)
battery_icon.set_tooltip_text(battery_icon.get_tooltip_text() + '\n' + battery_status)
if texts:
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, ', '.join(texts)))
else:
expander.set_label('<big><b>%s</b></big>\n%s' % (devstatus.name, devstatus.text))
else:
icon.set_name(_PLACEHOLDER)
expander.set_label(_PLACEHOLDER)
def _device_box(title):
icon = Gtk.Image.new_from_icon_name(title, _DEVICE_ICON_SIZE)
def update(window, receiver, devices):
if window and window.get_child():
controls = list(window.get_child().get_children())
_update_receiver_box(controls[0], receiver)
for index in range(1, 1 + _MAX_DEVICES):
_update_device_box(controls[index], devices.get(index))
def _receiver_box(rstatus):
box = Gtk.HBox(homogeneous=False, spacing=8)
box.set_border_width(8)
icon = Gtk.Image.new_from_icon_name(rstatus.name, _DEVICE_ICON_SIZE)
icon.set_alignment(0.5, 0)
icon.set_name(title)
icon.set_name(rstatus.name)
box.pack_start(icon, False, False, 0)
vbox = Gtk.VBox(homogeneous=False, spacing=4)
box.pack_start(vbox, True, True, 0)
label = Gtk.Label()
label.set_can_focus(False)
label.set_alignment(0, 0)
vbox.pack_start(label, False, False, 0)
buttons_box = Gtk.HButtonBox()
buttons_box.set_spacing(8)
buttons_box.set_layout(Gtk.ButtonBoxStyle.START)
vbox.pack_start(buttons_box, True, True, 0)
def _action(button, action):
button.set_sensitive(False)
action()
button.set_sensitive(True)
def _add_button(name, icon, action):
button = Gtk.Button(name.split(' ')[0])
button.set_image(Gtk.Image.new_from_icon_name(icon, Gtk.IconSize.BUTTON))
button.set_relief(Gtk.ReliefStyle.HALF)
button.set_tooltip_text(name)
button.set_focus_on_click(False)
if action:
button.connect('clicked', _action, action)
else:
button.set_sensitive(False)
buttons_box.pack_start(button, False, False, 0)
_add_button('Scan for devices', 'reload', rstatus.refresh)
_add_button('Pair new device', 'add', rstatus.pair)
box.show_all()
return box
def _device_box():
icon = Gtk.Image()
icon.set_alignment(0.5, 0)
icon.set_name(_PLACEHOLDER)
box = Gtk.HBox(homogeneous=False, spacing=8)
box.pack_start(icon, False, False, 0)
box.set_border_width(8)
if title == _PLACEHOLDER:
expander = Gtk.Expander()
expander.set_can_focus(False)
expander.set_label(_PLACEHOLDER)
expander.set_use_markup(True)
expander.set_spacing(4)
expander = Gtk.Expander()
expander.set_can_focus(False)
expander.set_label(_PLACEHOLDER)
expander.set_use_markup(True)
expander.set_spacing(4)
ebox = Gtk.HBox(False, 8)
ebox = Gtk.HBox(False, 8)
battery_icon = Gtk.Image.new_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
ebox.pack_end(battery_icon, False, True, 0)
battery_icon = Gtk.Image.new_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
ebox.pack_end(battery_icon, False, True, 0)
light_icon = Gtk.Image.new_from_icon_name('light_unknown', _STATUS_ICON_SIZE)
ebox.pack_end(light_icon, False, True, 0)
light_icon = Gtk.Image.new_from_icon_name('light_unknown', _STATUS_ICON_SIZE)
ebox.pack_end(light_icon, False, True, 0)
expander.add(ebox)
box.pack_start(expander, True, True, 1)
else:
label = Gtk.Label()
label.set_can_focus(False)
label.set_markup('<big><b>%s</b></big>' % title)
label.set_alignment(0, 0)
box.add(label)
expander.add(ebox)
box.pack_start(expander, True, True, 1)
frame = Gtk.Frame()
frame.add(box)
frame.show_all()
frame.set_visible(title != _PLACEHOLDER)
frame.set_visible(False)
return frame
@ -127,29 +166,39 @@ def create(title, rstatus, show=True, close_to_tray=False):
vbox = Gtk.VBox(homogeneous=False, spacing=4)
vbox.set_border_width(4)
vbox.add(_device_box(rstatus.name))
vbox.add(_receiver_box(rstatus))
for i in range(1, 1 + _MAX_DEVICES):
vbox.add(_device_box(_PLACEHOLDER))
vbox.add(_device_box())
vbox.set_visible(True)
window = Gtk.Window()
window.add(vbox)
window.set_title(title)
Gtk.Window.set_default_icon_name('mouse')
window.set_icon_name(title)
window.set_title(title)
window.set_keep_above(True)
# window.set_skip_taskbar_hint(True)
# window.set_skip_pager_hint(True)
window.set_deletable(False)
window.set_resizable(False)
window.set_position(Gtk.WindowPosition.MOUSE)
window.set_type_hint(Gdk.WindowTypeHint.UTILITY)
window.set_wmclass(title, 'status-window')
window.set_role('status-window')
# window.set_skip_taskbar_hint(True)
# window.set_skip_pager_hint(True)
# window.set_wmclass(title, 'status-window')
# window.set_role('status-window')
if close_to_tray:
def _state_event(window, event):
if event.new_window_state & Gdk.WindowState.ICONIFIED:
position = window.get_position()
window.hide()
window.deiconify()
window.move(*position)
return True
window.connect('window-state-event', _state_event)
window.connect('delete-event', lambda w, e: toggle(None, window) or True)
else:
@ -160,14 +209,6 @@ def create(title, rstatus, show=True, close_to_tray=False):
return window
def _state_event(window, event):
if event.new_window_state & Gdk.WindowState.ICONIFIED:
position = window.get_position()
window.hide()
window.deiconify()
window.move(*position)
return True
def toggle(_, window):
if window.get_visible():
position = window.get_position()

View File

@ -5,21 +5,23 @@
import logging
import threading
import time
from binascii import hexlify as _hexlify
from logitech.unifying_receiver import api
from logitech.unifying_receiver.listener import EventsListener
from logitech import devices
_STATUS_TIMEOUT = 34 # seconds
_THREAD_SLEEP = 5 # seconds
_STATUS_TIMEOUT = 31 # seconds
_THREAD_SLEEP = 2 # seconds
_UNIFYING_RECEIVER = 'Unifying Receiver'
_NO_DEVICES = 'No devices attached.'
_NO_RECEIVER = 'Receiver not found.'
_INITIALIZING = 'Initializing...'
_SCANNING = 'Scanning...'
_NO_RECEIVER = 'not found'
_NO_DEVICES = 'No devices found.'
_OKAY = 'Status okay.'
class _DevStatus(api.AttachedDeviceInfo):
@ -44,7 +46,9 @@ class WatcherThread(threading.Thread):
self.rstatus = _DevStatus(0, _UNIFYING_RECEIVER, _UNIFYING_RECEIVER, None, None)
self.rstatus.refresh = self.full_scan
self.devices = {0: self.rstatus}
self.rstatus.pair = None
self.devices = {}
def run(self):
self.active = True
@ -56,16 +60,15 @@ class WatcherThread(threading.Thread):
receiver = api.open()
if receiver:
self._device_status_changed(self.rstatus, (devices.STATUS.CONNECTED, _SCANNING))
self._device_status_changed(self.rstatus, (-10, _SCANNING))
self._update_status_text()
for devinfo in api.list_devices(receiver):
self._new_device(devinfo)
if len(self.devices) > 1:
self._device_status_changed(self.rstatus, (devices.STATUS.CONNECTED, ''))
if self.devices:
self._device_status_changed(self.rstatus, (devices.STATUS.CONNECTED, _OKAY))
else:
self._device_status_changed(self.rstatus, (devices.STATUS.CONNECTED, _NO_DEVICES))
self._update_status_text()
self.listener = EventsListener(receiver, self._events_callback)
self.listener.start()
@ -74,11 +77,11 @@ class WatcherThread(threading.Thread):
elif not self.listener.active:
self.listener = None
self._device_status_changed(self.rstatus, (devices.STATUS.UNAVAILABLE, _NO_RECEIVER))
self.devices = {0: self.rstatus}
self.devices.clear()
if self.active:
update_icon = True
if self.listener and len(self.devices) > 1:
if self.listener and self.devices:
update_icon &= self._check_old_statuses()
if self.active:
@ -86,29 +89,33 @@ class WatcherThread(threading.Thread):
self._update_status_text()
time.sleep(_THREAD_SLEEP)
self.listener.stop()
if self.listener:
api.close(self.listener.receiver)
self.listener = None
def stop(self):
self.active = False
if self.listener:
self.listener.stop()
api.close(self.listener.receiver)
self.join()
def full_scan(self, _=None):
updated = False
def full_scan(self, *args):
if self.active and self.listener:
updated = False
for devnumber in range(1, 1 + api.C.MAX_ATTACHED_DEVICES):
devstatus = self.devices.get(devnumber)
if devstatus:
status = devices.request_status(devstatus, self.listener)
updated |= self._device_status_changed(devstatus, status)
else:
devstatus = self._new_device(devnumber)
updated |= devstatus is not None
for devnumber in range(1, 1 + api.C.MAX_ATTACHED_DEVICES):
devstatus = self.devices.get(devnumber)
if devstatus:
status = devices.request_status(devstatus, self.listener)
updated |= self._device_status_changed(devstatus, status)
else:
devstatus = self._new_device(devnumber)
updated |= devstatus is not None
if updated:
self._update_status_text()
if updated:
self._update_status_text()
def _request_status(self, devstatus):
if devstatus:
if self.listener and devstatus:
status = devices.request_status(devstatus, self.listener)
self._device_status_changed(devstatus, status)
@ -124,8 +131,12 @@ class WatcherThread(threading.Thread):
return updated
def _new_device(self, dev):
if not self.active:
return None
logging.debug("new devstatus from %s", dev)
if type(dev) == int:
dev = api.get_device_info(self.listener.receiver, dev)
dev = self.listener.request(api.get_device_info, dev)
logging.debug("new devstatus from %s", dev)
if dev:
devstatus = _DevStatus(*dev)
@ -135,13 +146,13 @@ class WatcherThread(threading.Thread):
return devstatus
def _events_callback(self, code, devnumber, data):
logging.debug("%s: event %02x %d %s", time.asctime(), code, devnumber, repr(data))
logging.debug("%s: event (%02x %02x [%s])", time.asctime(), code, devnumber, _hexlify(data))
updated = False
if devnumber in self.devices:
devstatus = self.devices[devnumber]
if code == 0x10 and data[0] == 'b\x8F':
if code == 0x10 and data[:1] == b'\x8F':
updated = True
self._device_status_changed(devstatus, devices.STATUS.UNAVAILABLE)
elif code == 0x11:
@ -153,7 +164,7 @@ class WatcherThread(threading.Thread):
self._new_device(devnumber)
updated = True
else:
logging.warn("don't know how to handle event (%d, %d, %s)", code, devnumber, data)
logging.warn("don't know how to handle event (%02x, %02x, [%s])", code, devnumber, _hexlify(data))
if updated:
self._update_status_text()
@ -189,7 +200,7 @@ class WatcherThread(threading.Thread):
devstatus.text = status_text
logging.debug("%s: device '%s' status update %s => %s: %s", time.asctime(), devstatus.name, old_status_code, status_code, status_text)
if status_code == 0 or old_status_code != status_code:
if status_code <= 0 or old_status_code <= 0 or status_code < old_status_code:
self._notify(devstatus.code, devstatus.name, devstatus.text)
return True
@ -201,27 +212,26 @@ class WatcherThread(threading.Thread):
def _update_status_text(self):
last_status_text = self.status_text
if self.rstatus.code < 0:
self.status_text = '<b>' + self.rstatus.name + '</b>: ' + self.rstatus.text
else:
all_statuses = []
for devnumber in range(1, 1 + api.C.MAX_ATTACHED_DEVICES):
if devnumber in self.devices:
devstatus = self.devices[devnumber]
if devstatus.text:
if ' ' in devstatus.text:
all_statuses.append('<b>' + devstatus.name + '</b>')
all_statuses.append(' ' + devstatus.text)
else:
all_statuses.append('<b>' + devstatus.name + '</b>: ' + devstatus.text)
else:
all_statuses.append('<b>' + devstatus.name + '</b>')
all_statuses.append('')
if self.devices:
lines = []
if self.rstatus.code < 0:
lines += (self.rstatus.text, '')
if all_statuses:
self.status_text = '\n'.join(all_statuses).rstrip('\n')
else:
self.status_text = '<b>' + self.rstatus.name + '</b>: ' + _NO_DEVICES
devstatuses = [self.devices[d] for d in range(1, 1 + api.C.MAX_ATTACHED_DEVICES) if d in self.devices]
for devstatus in devstatuses:
if devstatus.text:
if ' ' in devstatus.text:
lines.append('<b>' + devstatus.name + '</b>')
lines.append(' ' + devstatus.text)
else:
lines.append('<b>' + devstatus.name + '</b>: ' + devstatus.text)
else:
lines.append('<b>' + devstatus.name + '</b>')
lines.append('')
self.status_text = '\n'.join(lines).rstrip('\n')
else:
self.status_text = self.rstatus.text
if self.status_text != last_status_text:
self.status_changed.set()

View File

@ -12,7 +12,6 @@ from .common import AttachedDeviceInfo
from .common import ReprogrammableKeyInfo
from . import constants as C
from . import exceptions as E
from . import unhandled as _unhandled
from . import base as _base
@ -23,19 +22,11 @@ _l = logging.getLogger('lur.api')
#
#
def open():
"""Opens the first Logitech UR found attached to the machine.
"""Opens the first Logitech Unifying Receiver found attached to the machine.
:returns: An open file handle for the found receiver, or ``None``.
"""
for rawdevice in _base.list_receiver_devices():
_l.log(_LOG_LEVEL, "checking %s", rawdevice)
receiver = _base.try_open(rawdevice.path)
if receiver:
return receiver
return None
:returns: An open file handle for the found receiver, or ``None``.
"""
open = _base.open
"""Closes a HID device handle."""
@ -85,11 +76,16 @@ def ping(handle, devnumber):
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
"""
reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA')
return reply is not None and reply[2:3] == b'\xAA'
def get_device_protocol(handle, devnumber):
reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA')
if reply is not None and len(reply) > 2 and reply[2:3] == b'\xAA':
return 'HID %d.%d' % (ord(reply[0:1]), ord(reply[1:2]))
def find_device_by_name(handle, device_name):
"""Searches for an attached device by name.

View File

@ -14,7 +14,7 @@ from . import unhandled as _unhandled
import hidapi as _hid
_LOG_LEVEL = 4
_LOG_LEVEL = 5
_l = logging.getLogger('lur.base')
#

View File

@ -12,12 +12,12 @@ from . import exceptions as E
# from . import unhandled as _unhandled
_LOG_LEVEL = 6
_LOG_LEVEL = 5
_l = logging.getLogger('lur.listener')
_READ_EVENT_TIMEOUT = int(_base.DEFAULT_TIMEOUT * 0.1) # ms
_IDLE_SLEEP = int(_base.DEFAULT_TIMEOUT * 0.9) # ms
_READ_EVENT_TIMEOUT = int(_base.DEFAULT_TIMEOUT / 5) # ms
_IDLE_SLEEP = _base.DEFAULT_TIMEOUT / 2 # ms
class EventsListener(threading.Thread):