dropped the watcher thread

now the receiver is looked for and initialized on the application's main
thread
This commit is contained in:
Daniel Pavel 2012-11-01 06:30:29 +02:00
parent e7bb599689
commit a8a72f7ae5
19 changed files with 573 additions and 686 deletions

View File

@ -5,6 +5,7 @@
from logging import getLogger as _Logger from logging import getLogger as _Logger
_l = _Logger('pairing') _l = _Logger('pairing')
from logitech.unifying_receiver import base as _base
state = None state = None
@ -12,12 +13,12 @@ class State(object):
TICK = 300 TICK = 300
PAIR_TIMEOUT = 60 * 1000 / TICK PAIR_TIMEOUT = 60 * 1000 / TICK
def __init__(self, watcher): def __init__(self, listener):
self._watcher = watcher self.listener = listener
self.reset() self.reset()
def device(self, number): def device(self, number):
return self._watcher.receiver.devices.get(number) return self.listener.devices.get(number)
def reset(self): def reset(self):
self.success = None self.success = None
@ -25,14 +26,14 @@ class State(object):
self._countdown = self.PAIR_TIMEOUT self._countdown = self.PAIR_TIMEOUT
def countdown(self, assistant): def countdown(self, assistant):
if self._countdown < 0 or not self.listener:
return False
if self._countdown == self.PAIR_TIMEOUT: if self._countdown == self.PAIR_TIMEOUT:
self.start_scan() self.start_scan()
self._countdown -= 1 self._countdown -= 1
return True return True
if self._countdown < 0:
return False
self._countdown -= 1 self._countdown -= 1
if self._countdown > 0 and self.success is None: if self._countdown > 0 and self.success is None:
return True return True
@ -43,16 +44,16 @@ class State(object):
def start_scan(self): def start_scan(self):
self.reset() self.reset()
self._watcher.receiver.events_filter = self.filter_events self.listener.events_filter = self.filter_events
reply = self._watcher.receiver.request(0xFF, b'\x80\xB2', b'\x01') reply = _base.request(self.listener.handle, 0xFF, b'\x80\xB2', b'\x01')
_l.debug("start scan reply %s", repr(reply)) _l.debug("start scan reply %s", repr(reply))
def stop_scan(self): def stop_scan(self):
if self._countdown >= 0: if self._countdown >= 0:
self._countdown = -1 self._countdown = -1
reply = self._watcher.receiver.request(0xFF, b'\x80\xB2', b'\x02') reply = _base.request(self.listener.handle, 0xFF, b'\x80\xB2', b'\x02')
_l.debug("stop scan reply %s", repr(reply)) _l.debug("stop scan reply %s", repr(reply))
self._watcher.receiver.events_filter = None self.listener.events_filter = None
def filter_events(self, event): def filter_events(self, event):
if event.devnumber == 0xFF: if event.devnumber == 0xFF:
@ -66,16 +67,16 @@ class State(object):
return True return True
return False return False
if event.devnumber in self._watcher.receiver.devices: if event.devnumber in self.listener.receiver.devices:
return False return False
_l.debug("event for new device? %s", event) _l.debug("event for new device? %s", event)
if event.code == 0x10 and event.data[0:2] == b'\x41\x04': if event.code == 0x10 and event.data[0:2] == b'\x41\x04':
self.detected_device = self._watcher.receiver.make_device(event) self.detected_device = self.listener.make_device(event)
return True return True
return True return True
def unpair(self, number): def unpair(self, device):
_l.debug("unpair %d", number) _l.debug("unpair %s", device)
self._watcher.receiver.unpair_device(number) self.listener.unpair_device(device)

View File

@ -3,13 +3,12 @@
# #
from logging import getLogger as _Logger from logging import getLogger as _Logger
from threading import Event as _Event
from struct import pack as _pack from struct import pack as _pack
from logitech.unifying_receiver import base as _base from logitech.unifying_receiver import base as _base
from logitech.unifying_receiver import api as _api from logitech.unifying_receiver import api as _api
from logitech.unifying_receiver import listener as _listener from logitech.unifying_receiver.listener import EventsListener as _EventsListener
from logitech.unifying_receiver.common import FallbackDict as _FallbackDict
from logitech import devices as _devices from logitech import devices as _devices
from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS, NAMES) from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS, NAMES)
@ -17,7 +16,6 @@ from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS, NAMES)
# #
# #
class _FeaturesArray(object): class _FeaturesArray(object):
__slots__ = ('device', 'features', 'supported') __slots__ = ('device', 'features', 'supported')
@ -27,31 +25,26 @@ class _FeaturesArray(object):
self.supported = True self.supported = True
def _check(self): def _check(self):
if not self.supported: if self.supported:
return False if self.features is not None:
return True
if self.features is not None: if self.device.status >= STATUS.CONNECTED:
return True handle = self.device.handle
try:
if self.device.status >= STATUS.CONNECTED: index = _api.get_feature_index(handle, self.device.number, _api.FEATURE.FEATURE_SET)
handle = self.device.receiver.handle except _api._FeatureNotSupported:
try:
index = _api.get_feature_index(handle, self.device.number, _api.FEATURE.FEATURE_SET)
except _api._FeatureNotSupported:
index = None
if index is None:
self.supported = False
else:
count = _base.request(handle, self.device.number, _pack('!B', index) + b'\x00')
if count is None:
self.supported = False self.supported = False
else: else:
count = ord(count[:1]) count = _base.request(handle, self.device.number, _pack('!BB', index, 0x00))
self.features = [None] * (1 + count) if count is None:
self.features[0] = _api.FEATURE.ROOT self.supported = False
self.features[index] = _api.FEATURE.FEATURE_SET else:
return True count = ord(count[:1])
self.features = [None] * (1 + count)
self.features[0] = _api.FEATURE.ROOT
self.features[index] = _api.FEATURE.FEATURE_SET
return True
return False return False
@ -65,7 +58,7 @@ class _FeaturesArray(object):
raise IndexError raise IndexError
if self.features[index] is None: if self.features[index] is None:
fs_index = self.features.index(_api.FEATURE.FEATURE_SET) fs_index = self.features.index(_api.FEATURE.FEATURE_SET)
feature = _base.request(self.device.receiver.handle, self.device.number, _pack('!BB', fs_index, 0x10), _pack('!B', index)) feature = _base.request(self.device.handle, self.device.number, _pack('!BB', fs_index, 0x10), _pack('!B', index))
if feature is not None: if feature is not None:
self.features[index] = feature[:2] self.features[index] = feature[:2]
@ -104,29 +97,34 @@ class _FeaturesArray(object):
def __len__(self): def __len__(self):
return len(self.features) if self._check() else 0 return len(self.features) if self._check() else 0
#
#
#
class DeviceInfo(object): class DeviceInfo(_api.PairedDevice):
"""A device attached to the receiver. """A device attached to the receiver.
""" """
def __init__(self, receiver, number, pair_code, status=STATUS.UNKNOWN): def __init__(self, listener, number, pair_code, status=STATUS.UNKNOWN):
super(DeviceInfo, self).__init__(listener.handle, number)
self.LOG = _Logger("Device[%d]" % number) self.LOG = _Logger("Device[%d]" % number)
self.receiver = receiver self._listener = listener
self.number = number
self._pair_code = pair_code self._pair_code = pair_code
self._serial = None self._serial = None
self._codename = None self._codename = None
self._name = None
self._kind = None
self._firmware = None
self._status = status self._status = status
self.props = {} self.props = {}
self.features = _FeaturesArray(self) self.features = _FeaturesArray(self)
# read them now, otherwise it it temporarily hang the UI
if status >= STATUS.CONNECTED:
n, k, s, f = self.name, self.kind, self.serial, self.firmware
@property @property
def handle(self): def receiver(self):
return self.receiver.handle return self._listener.receiver
@property @property
def status(self): def status(self):
@ -138,7 +136,7 @@ class DeviceInfo(object):
self.LOG.debug("status %d => %d", self._status, new_status) self.LOG.debug("status %d => %d", self._status, new_status)
urgent = new_status < STATUS.CONNECTED or self._status < STATUS.CONNECTED urgent = new_status < STATUS.CONNECTED or self._status < STATUS.CONNECTED
self._status = new_status self._status = new_status
self.receiver._device_changed(self, urgent) self._listener.status_changed_callback(self, urgent)
if new_status < STATUS.CONNECTED: if new_status < STATUS.CONNECTED:
self.props.clear() self.props.clear()
@ -161,13 +159,9 @@ class DeviceInfo(object):
def name(self): def name(self):
if self._name is None: if self._name is None:
if self._status >= STATUS.CONNECTED: if self._status >= STATUS.CONNECTED:
self._name = _api.get_device_name(self.receiver.handle, self.number, self.features) self._name = _api.get_device_name(self.handle, self.number, self.features)
return self._name or self.codename return self._name or self.codename
@property
def device_name(self):
return self.name
@property @property
def kind(self): def kind(self):
if self._kind is None: if self._kind is None:
@ -176,7 +170,7 @@ class DeviceInfo(object):
if codename in NAMES: if codename in NAMES:
self._kind = NAMES[codename][-1] self._kind = NAMES[codename][-1]
else: else:
self._kind = _api.get_device_kind(self.receiver.handle, self.number, self.features) self._kind = _api.get_device_kind(self.handle, self.number, self.features)
return self._kind or '?' return self._kind or '?'
@property @property
@ -185,7 +179,7 @@ class DeviceInfo(object):
# dodgy # dodgy
b = bytearray(self._pair_code) b = bytearray(self._pair_code)
b[0] -= 0x10 b[0] -= 0x10
serial = _base.request(self.receiver.handle, 0xFF, b'\x83\xB5', bytes(b)) serial = _base.request(self.handle, 0xFF, b'\x83\xB5', bytes(b))
if serial: if serial:
self._serial = _base._hex(serial[1:5]) self._serial = _base._hex(serial[1:5])
return self._serial or '?' return self._serial or '?'
@ -193,7 +187,7 @@ class DeviceInfo(object):
@property @property
def codename(self): def codename(self):
if self._codename is None: if self._codename is None:
codename = _base.request(self.receiver.handle, 0xFF, b'\x83\xB5', self._pair_code) codename = _base.request(self.handle, 0xFF, b'\x83\xB5', self._pair_code)
if codename: if codename:
self._codename = codename[2:].rstrip(b'\x00').decode('ascii') self._codename = codename[2:].rstrip(b'\x00').decode('ascii')
return self._codename or '?' return self._codename or '?'
@ -202,19 +196,16 @@ class DeviceInfo(object):
def firmware(self): def firmware(self):
if self._firmware is None: if self._firmware is None:
if self._status >= STATUS.CONNECTED: if self._status >= STATUS.CONNECTED:
self._firmware = _api.get_device_firmware(self.receiver.handle, self.number, self.features) self._firmware = _api.get_device_firmware(self.handle, self.number, self.features)
return self._firmware or () return self._firmware or ()
def ping(self):
return _api.ping(self.receiver.handle, self.number)
def process_event(self, code, data): def process_event(self, code, data):
if code == 0x10 and data[:1] == b'\x8F': if code == 0x10 and data[:1] == b'\x8F':
self.status = STATUS.UNAVAILABLE self.status = STATUS.UNAVAILABLE
return True return True
if code == 0x11: if code == 0x11:
status = _devices.process_event(self, data, self.receiver) status = _devices.process_event(self, data)
if status: if status:
if type(status) == int: if type(status) == int:
self.status = status self.status = status
@ -225,161 +216,110 @@ class DeviceInfo(object):
self.props.update(status[1]) self.props.update(status[1])
if self.status == status[0]: if self.status == status[0]:
if p != self.props: if p != self.props:
self.receiver._device_changed(self) self._listener.status_changed_callback(self)
else: else:
self.status = status[0] self.status = status[0]
return True return True
self.LOG.warn("don't know how to handle status %s", status) self.LOG.warn("don't know how to handle processed event status %s", status)
return False return False
def __hash__(self):
return self.number
def __str__(self): def __str__(self):
return 'DeviceInfo(%d,%s,%d)' % (self.number, self.name, self._status) return 'DeviceInfo(%d,%s,%d)' % (self.number, self._name or '?', self._status)
def __repr__(self):
return '<DeviceInfo(number=%d,name=%s,status=%d)>' % (self.number, self.name, self._status)
# #
# #
# #
class Receiver(_listener.EventsListener): _RECEIVER_STATUS_NAME = _FallbackDict(
lambda x:
'1 device found' if x == STATUS.CONNECTED + 1 else
'%d devices found' if x > STATUS.CONNECTED else
'?',
{
STATUS.UNKNOWN: 'Initializing...',
STATUS.UNAVAILABLE: 'Receiver not found.',
STATUS.BOOTING: 'Scanning...',
STATUS.CONNECTED: 'No devices found.',
}
)
class ReceiverListener(_EventsListener):
"""Keeps the status of a Unifying Receiver. """Keeps the status of a Unifying Receiver.
""" """
NAME = kind = 'Unifying Receiver'
max_devices = _api.MAX_ATTACHED_DEVICES
def __init__(self, path, handle): def __init__(self, receiver, status_changed_callback):
super(Receiver, self).__init__(handle, self._events_handler) super(ReceiverListener, self).__init__(receiver.handle, self._events_handler)
self.path = path self.receiver = receiver
self._status = STATUS.BOOTING self.LOG = _Logger("ReceiverListener(%s)" % receiver.path)
self.status_changed = _Event()
self.status_changed.urgent = False
self.status_changed.reason = None
self.LOG = _Logger("Receiver[%s]" % path)
self.LOG.info("initializing")
self._serial = None
self._firmware = None
self.devices = {}
self.events_filter = None self.events_filter = None
self.events_handler = None self.events_handler = None
if _base.request(handle, 0xFF, b'\x80\x00', b'\x00\x01'): self.status_changed_callback = status_changed_callback or (lambda reason=None, urgent=False: None)
receiver.kind = receiver.name
receiver.devices = {}
receiver.status = STATUS.BOOTING
receiver.status_text = _RECEIVER_STATUS_NAME[STATUS.BOOTING]
if _base.request(receiver.handle, 0xFF, b'\x80\x00', b'\x00\x01'):
self.LOG.info("initialized") self.LOG.info("initialized")
else: else:
self.LOG.warn("initialization failed") self.LOG.warn("initialization failed")
if _base.request(handle, 0xFF, b'\x80\x02', b'\x02'): if _base.request(receiver.handle, 0xFF, b'\x80\x02', b'\x02'):
self.LOG.info("triggered device events") self.LOG.info("triggered device events")
else: else:
self.LOG.warn("failed to trigger device events") self.LOG.warn("failed to trigger device events")
def close(self): def change_status(self, new_status):
"""Closes the receiver's handle. if new_status != self.receiver.status:
self.LOG.debug("status %d => %d", self.receiver.status, new_status)
self.receiver.status = new_status
self.receiver.status_text = _RECEIVER_STATUS_NAME[new_status]
self.status_changed_callback(self.receiver, True)
The receiver can no longer be used in API calls after this. def _device_status_from(self, event):
""" state_code = ord(event.data[2:3]) & 0xF0
self.LOG.info("closing") state = STATUS.UNAVAILABLE if state_code == 0x60 else \
self.stop() STATUS.CONNECTED if state_code == 0xA0 else \
STATUS.CONNECTED if state_code == 0x20 else \
@property STATUS.UNKNOWN
def status(self): if state == STATUS.UNKNOWN:
return self._status self.LOG.warn("don't know how to handle state code 0x%02X: %s", state_code, event)
return state
@status.setter
def status(self, new_status):
if new_status != self._status:
self.LOG.debug("status %d => %d", self._status, new_status)
self._status = new_status
self.status_changed.reason = self
self.status_changed.urgent = True
self.status_changed.set()
@property
def status_text(self):
status = self._status
if status == STATUS.UNKNOWN:
return 'Initializing...'
if status == STATUS.UNAVAILABLE:
return 'Receiver not found.'
if status == STATUS.BOOTING:
return 'Scanning...'
if status == STATUS.CONNECTED:
return 'No devices found.'
if len(self.devices) > 1:
return '%d devices found' % len(self.devices)
return '1 device found'
@property
def device_name(self):
return self.NAME
def count_devices(self):
return _api.count_devices(self._handle)
@property
def serial(self):
if self._serial is None:
if self:
self._serial, self._firmware = _api.get_receiver_info(self._handle)
return self._serial or '?'
@property
def firmware(self):
if self._firmware is None:
if self:
self._serial, self._firmware = _api.get_receiver_info(self._handle)
return self._firmware or ('?', '?')
def _device_changed(self, dev, urgent=False):
self.status_changed.reason = dev
self.status_changed.urgent = urgent
self.status_changed.set()
def _events_handler(self, event): def _events_handler(self, event):
if self.events_filter and self.events_filter(event): if self.events_filter and self.events_filter(event):
return return
if event.code == 0x10 and event.data[0:2] == b'\x41\x04': if event.code == 0x10 and event.data[0:2] == b'\x41\x04':
if event.devnumber in self.devices:
state_code = ord(event.data[2:3]) & 0xF0
state = STATUS.UNAVAILABLE if state_code == 0x60 else \
STATUS.CONNECTED if state_code == 0xA0 else \
STATUS.CONNECTED if state_code == 0x20 else \
None
if state is None:
self.LOG.warn("don't know how to handle status 0x%02X: %s", state_code, event)
else:
self.devices[event.devnumber].status = state
return
dev = self.make_device(event) if event.devnumber in self.receiver.devices:
if dev is None: status = self._device_status_from(event)
self.LOG.warn("failed to make new device from %s", event) if status > STATUS.UNKNOWN:
self.receiver.devices[event.devnumber].status = status
else: else:
self.devices[event.devnumber] = dev dev = self.make_device(event)
self.LOG.info("new device ready %s", dev) if dev is None:
self.status = STATUS.CONNECTED + len(self.devices) self.LOG.warn("failed to make new device from %s", event)
else:
self.receiver.devices[event.devnumber] = dev
self.change_status(STATUS.CONNECTED + len(self.receiver.devices))
return return
if event.devnumber == 0xFF: if event.devnumber == 0xFF:
if event.code == 0xFF and event.data is None: if event.code == 0xFF and event.data is None:
# receiver disconnected # receiver disconnected
self.LOG.info("disconnected") self.LOG.warn("disconnected")
self.devices = {} self.receiver.devices = {}
self.status = STATUS.UNAVAILABLE self.change_status(STATUS.UNAVAILABLE)
return return
elif event.devnumber in self.devices: elif event.devnumber in self.receiver.devices:
dev = self.devices[event.devnumber] dev = self.receiver.devices[event.devnumber]
if dev.process_event(event.code, event.data): if dev.process_event(event.code, event.data):
return return
@ -389,49 +329,50 @@ class Receiver(_listener.EventsListener):
self.LOG.warn("don't know how to handle event %s", event) self.LOG.warn("don't know how to handle event %s", event)
def make_device(self, event): def make_device(self, event):
if event.devnumber < 1 or event.devnumber > self.max_devices: if event.devnumber < 1 or event.devnumber > self.receiver.max_devices:
self.LOG.warn("got event for invalid device number %d: %s", event.devnumber, event) self.LOG.warn("got event for invalid device number %d: %s", event.devnumber, event)
return None return None
state_code = ord(event.data[2:3]) & 0xF0 status = self._device_status_from(event)
state = STATUS.UNAVAILABLE if state_code == 0x60 else \
STATUS.CONNECTED if state_code == 0xA0 else \
STATUS.CONNECTED if state_code == 0x20 else \
None
if state is None:
self.LOG.warn("don't know how to handle device status 0x%02X: %s", state_code, event)
return None
return DeviceInfo(self, event.devnumber, event.data[4:5], state) dev = DeviceInfo(self, event.devnumber, event.data[4:5], status)
self.LOG.info("new device %s", dev)
self.status_changed_callback(dev, True)
return dev
def unpair_device(self, number): def unpair_device(self, device):
if number in self.devices: try:
dev = self.devices[number] del self.receiver[device.number]
reply = _base.request(self._handle, 0xFF, b'\x80\xB2', _pack('!BB', 0x03, number)) except IndexError:
if reply: self.LOG.error("failed to unpair device %s", device)
self.LOG.debug("remove device %s => %s", dev, _base._hex(reply)) return False
del self.devices[number]
self.LOG.warn("unpaired device %s", dev) del self.receiver.devices[device.number]
self.status = STATUS.CONNECTED + len(self.devices) self.LOG.info("unpaired device %s", device)
return True self.change_status(STATUS.CONNECTED + len(self.receiver.devices))
self.LOG.warn("failed to unpair device %s", dev) device.status = STATUS.UNPAIRED
return False return True
def __str__(self): def __str__(self):
return 'Receiver(%s,%X,%d)' % (self.path, self._handle, self._status) return '<ReceiverListener(%s,%d)>' % (self.path, self.receiver.status)
@classmethod @classmethod
def open(self): def open(self, status_changed_callback=None):
"""Opens the first Logitech Unifying Receiver found attached to the machine. receiver = _api.Receiver.open()
if receiver:
rl = ReceiverListener(receiver, status_changed_callback)
rl.start()
return rl
:returns: An open file handle for the found receiver, or ``None``. #
""" #
for rawdevice in _base.list_receiver_devices(): #
_Logger("receiver").debug("checking %s", rawdevice)
handle = _base.try_open(rawdevice.path)
if handle:
receiver = Receiver(rawdevice.path, handle)
receiver.start()
return receiver
return None class _DUMMY_RECEIVER(object):
name = _api.Receiver.name
max_devices = _api.Receiver.max_devices
status = STATUS.UNAVAILABLE
status_text = _RECEIVER_STATUS_NAME[STATUS.UNAVAILABLE]
devices = {}
__bool__ = __nonzero__ = lambda self: False
DUMMY = _DUMMY_RECEIVER()

View File

@ -31,9 +31,9 @@ def _parse_arguments():
args = arg_parser.parse_args() args = arg_parser.parse_args()
import logging import logging
log_level = logging.root.level - 10 * args.verbose log_level = logging.ERROR - 10 * args.verbose
log_format='%(asctime)s %(levelname)8s [%(threadName)s] %(name)s: %(message)s' log_format='%(asctime)s %(levelname)8s [%(threadName)s] %(name)s: %(message)s'
logging.basicConfig(level=max(log_level, 1), format=log_format) logging.basicConfig(level=max(log_level, logging.DEBUG), format=log_format)
return args return args
@ -43,44 +43,64 @@ if __name__ == '__main__':
import ui import ui
# check if the notifications are available # check if the notifications are available and enabled
args.notifications &= args.systray args.notifications &= args.systray
if ui.notify.init(APPNAME): if ui.notify.available and ui.notify.init(APPNAME):
ui.action.toggle_notifications.set_active(args.notifications) ui.action.toggle_notifications.set_active(args.notifications)
else: else:
ui.action.toggle_notifications = None ui.action.toggle_notifications = None
import watcher from receiver import (ReceiverListener, DUMMY)
window = ui.main_window.create(APPNAME, window = ui.main_window.create(APPNAME,
watcher.DUMMY.NAME, DUMMY.name,
watcher.DUMMY.max_devices, DUMMY.max_devices,
args.systray) args.systray)
ui.action.pair.window = window
ui.action.unpair.window = window
if args.systray: if args.systray:
menu_actions = (ui.action.pair, menu_actions = (ui.action.toggle_notifications,
ui.action.toggle_notifications,
ui.action.about) ui.action.about)
icon = ui.status_icon.create(window, menu_actions) icon = ui.status_icon.create(window, menu_actions)
else: else:
icon = None icon = None
window.present() window.present()
w = watcher.Watcher(APPNAME,
lambda r: ui.update(r, icon, window),
ui.notify.show if ui.notify.available else None)
w.start()
import pairing import pairing
pairing.state = pairing.State(w)
from gi.repository import Gtk listener = None
notify_missing = True
def status_changed(reason, urgent=False):
global listener
receiver = DUMMY if listener is None else listener.receiver
ui.update(receiver, icon, window, reason)
if ui.notify.available and reason and urgent:
ui.notify.show(reason or receiver)
def check_for_listener():
global listener, notify_missing
if listener is None:
listener = ReceiverListener.open(status_changed)
if listener is None:
pairing.state = None
if notify_missing:
status_changed(DUMMY, True)
ui.notify.show(DUMMY)
notify_missing = False
else:
# print ("opened receiver", listener, listener.receiver)
pairing.state = pairing.State(listener)
notify_missing = True
status_changed(listener.receiver, True)
return True
from gi.repository import Gtk, GObject
GObject.timeout_add(5000, check_for_listener)
check_for_listener()
Gtk.main() Gtk.main()
w.stop() if listener is not None:
ui.notify.uninit() listener.stop()
import logging ui.notify.uninit()
logging.shutdown()

View File

@ -1,7 +1,7 @@
# pass # pass
APPNAME = 'Solaar' APPNAME = 'Solaar'
APPVERSION = '0.5' APPVERSION = '0.6'
from . import (notify, status_icon, main_window, pair_window, action) from . import (notify, status_icon, main_window, pair_window, action)
@ -50,9 +50,10 @@ def find_children(container, *child_names):
return tuple(result) if count > 1 else result[0] return tuple(result) if count > 1 else result[0]
def update(receiver, icon, window): def update(receiver, icon, window, reason):
GObject.idle_add(action.pair.set_sensitive, receiver.status > 0) assert receiver is not None
assert reason is not None
if window: if window:
GObject.idle_add(main_window.update, window, receiver) GObject.idle_add(main_window.update, window, receiver, reason)
if icon: if icon:
GObject.idle_add(status_icon.update, icon, receiver) GObject.idle_add(status_icon.update, icon, receiver)

View File

@ -62,23 +62,31 @@ quit = _action('exit', 'Quit', Gtk.main_quit)
import pairing import pairing
def _pair_device(action): def _pair_device(action, frame):
action.set_sensitive(False) window = frame.get_toplevel()
pair_dialog = ui.pair_window.create(action, pairing.state)
pair_dialog = ui.pair_window.create( action, pairing.state)
pair_dialog.set_transient_for(window)
pair_dialog.set_modal(True) pair_dialog.set_modal(True)
window.present()
pair_dialog.present() pair_dialog.present()
pair = _action('add', 'Pair new device', _pair_device)
def pair(frame):
return _action('add', 'Pair new device', _pair_device, frame)
def _unpair_device(action): def _unpair_device(action, frame):
dev = pairing.state.device(action.devnumber) window = frame.get_toplevel()
action.devnumber = 0 window.present()
if dev: device = frame._device
qdialog = Gtk.MessageDialog(action.window, 0, qdialog = Gtk.MessageDialog(window, 0,
Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO,
"Unpair device '%s' ?" % dev.name) "Unpair device\n%s ?" % device.name)
choice = qdialog.run() choice = qdialog.run()
qdialog.destroy() qdialog.destroy()
if choice == Gtk.ResponseType.YES: if choice == Gtk.ResponseType.YES:
pairing.state.unpair(dev.number) pairing.state.unpair(device)
unpair = _action('remove', 'Unpair', _unpair_device)
def unpair(frame):
return _action('remove', 'Unpair', _unpair_device, frame)

View File

@ -2,7 +2,7 @@
# #
# #
from gi.repository import (Gtk, Gdk, GObject) from gi.repository import (Gtk, Gdk)
import ui import ui
from logitech.devices.constants import (STATUS, PROPS) from logitech.devices.constants import (STATUS, PROPS)
@ -17,13 +17,27 @@ _PLACEHOLDER = '~'
# #
# #
def _toggle_info_button(label, widget): def _info_text(dev):
toggle = lambda a, w: w.set_visible(a.get_active()) fw_text = '\n'.join(['%-12s\t<tt>%s%s%s</tt>' %
action = ui.action._toggle_action('info', label, toggle, widget) (f.kind, f.name, ' ' if f.name else '', f.version) for f in dev.firmware])
return action.create_tool_item() return ('<small>'
'Serial \t\t<tt>%s</tt>\n'
'%s'
'</small>' % (dev.serial, fw_text))
def _toggle_info(action, label_widget, box_widget, frame):
if action.get_active():
box_widget.set_visible(True)
if not label_widget.get_text():
label_widget.set_markup(_info_text(frame._device))
else:
box_widget.set_visible(False)
def _receiver_box(name): def _make_receiver_box(name):
frame = Gtk.Frame()
frame._device = None
icon = Gtk.Image.new_from_icon_name(name, _SMALL_DEVICE_ICON_SIZE) icon = Gtk.Image.new_from_icon_name(name, _SMALL_DEVICE_ICON_SIZE)
label = Gtk.Label('Initializing...') label = Gtk.Label('Initializing...')
@ -51,22 +65,25 @@ def _receiver_box(name):
info_box.add(info_label) info_box.add(info_label)
info_box.set_shadow_type(Gtk.ShadowType.ETCHED_IN) info_box.set_shadow_type(Gtk.ShadowType.ETCHED_IN)
toolbar.insert(_toggle_info_button('Receiver info', info_box), 0) toggle_info_action = ui.action._toggle_action('info', 'Receiver info', _toggle_info, info_label, info_box, frame)
toolbar.insert(ui.action.pair.create_tool_item(), -1) toolbar.insert(toggle_info_action.create_tool_item(), 0)
toolbar.insert(ui.action.pair(frame).create_tool_item(), -1)
vbox = Gtk.VBox(homogeneous=False, spacing=2) vbox = Gtk.VBox(homogeneous=False, spacing=2)
vbox.set_border_width(4) vbox.set_border_width(4)
vbox.pack_start(hbox, True, True, 0) vbox.pack_start(hbox, True, True, 0)
vbox.pack_start(info_box, True, True, 0) vbox.pack_start(info_box, True, True, 0)
frame = Gtk.Frame()
frame.add(vbox) frame.add(vbox)
frame.show_all() frame.show_all()
info_box.set_visible(False) info_box.set_visible(False)
return frame return frame
def _device_box(index): def _make_device_box(index):
frame = Gtk.Frame()
frame._device = None
icon = Gtk.Image.new_from_icon_name('image-missing', _DEVICE_ICON_SIZE) icon = Gtk.Image.new_from_icon_name('image-missing', _DEVICE_ICON_SIZE)
icon.set_name('icon') icon.set_name('icon')
icon.set_alignment(0.5, 0) icon.set_alignment(0.5, 0)
@ -111,11 +128,9 @@ def _device_box(index):
info_box = Gtk.Frame() info_box = Gtk.Frame()
info_box.add(info_label) info_box.add(info_label)
toolbar.insert(_toggle_info_button('Device info', info_box), 0) toggle_info_action = ui.action._toggle_action('info', 'Device info', _toggle_info, info_label, info_box, frame)
def _set_number(action): toolbar.insert(toggle_info_action.create_tool_item(), 0)
action.devnumber = index toolbar.insert(ui.action.unpair(frame).create_tool_item(), -1)
unpair_action = ui.action.wrap_action(ui.action.unpair, _set_number)
toolbar.insert(unpair_action.create_tool_item(), -1)
vbox = Gtk.VBox(homogeneous=False, spacing=4) vbox = Gtk.VBox(homogeneous=False, spacing=4)
vbox.pack_start(label, True, True, 0) vbox.pack_start(label, True, True, 0)
@ -128,7 +143,6 @@ def _device_box(index):
box.pack_start(vbox, True, True, 0) box.pack_start(vbox, True, True, 0)
box.show_all() box.show_all()
frame = Gtk.Frame()
frame.add(box) frame.add(box)
info_box.set_visible(False) info_box.set_visible(False)
return frame return frame
@ -158,10 +172,10 @@ def create(title, name, max_devices, systray=False):
vbox = Gtk.VBox(homogeneous=False, spacing=4) vbox = Gtk.VBox(homogeneous=False, spacing=4)
vbox.set_border_width(4) vbox.set_border_width(4)
rbox = _receiver_box(name) rbox = _make_receiver_box(name)
vbox.add(rbox) vbox.add(rbox)
for i in range(1, 1 + max_devices): for i in range(1, 1 + max_devices):
dbox = _device_box(i) dbox = _make_device_box(i)
vbox.add(dbox) vbox.add(dbox)
vbox.set_visible(True) vbox.set_visible(True)
@ -187,31 +201,23 @@ def create(title, name, max_devices, systray=False):
# #
# #
def _info_text(dev):
fw_text = '\n'.join(['%-12s\t<tt>%s%s%s</tt>' %
(f.kind, f.name, ' ' if f.name else '', f.version) for f in dev.firmware])
return ('<small>'
'Serial \t\t<tt>%s</tt>\n'
'%s'
'</small>' % (dev.serial, fw_text))
def _update_receiver_box(frame, receiver): def _update_receiver_box(frame, receiver):
label, toolbar, info_label = ui.find_children(frame, 'label', 'toolbar', 'info-label') label, toolbar, info_label = ui.find_children(frame, 'label', 'toolbar', 'info-label')
label.set_text(receiver.status_text or '') label.set_text(receiver.status_text or '')
if receiver.status < STATUS.CONNECTED: if receiver.status < STATUS.CONNECTED:
frame._device = None
toolbar.set_sensitive(False) toolbar.set_sensitive(False)
toolbar.get_children()[0].set_active(False) toolbar.get_children()[0].set_active(False)
info_label.set_text('') info_label.set_text('')
else: else:
toolbar.set_sensitive(True) toolbar.set_sensitive(True)
if not info_label.get_text(): frame._device = receiver
info_label.set_markup(_info_text(receiver))
def _update_device_box(frame, dev): def _update_device_box(frame, dev):
frame._device = dev
icon, label, info_label = ui.find_children(frame, 'icon', 'label', 'info-label') icon, label, info_label = ui.find_children(frame, 'icon', 'label', 'info-label')
if frame.get_name() != dev.name: if frame.get_name() != dev.name:
@ -229,60 +235,59 @@ def _update_device_box(frame, dev):
for c in status_icons[1:-1]: for c in status_icons[1:-1]:
c.set_visible(False) c.set_visible(False)
toolbar.get_children()[0].set_active(False) toolbar.get_children()[0].set_active(False)
return
icon.set_sensitive(True)
label.set_sensitive(True)
status.set_sensitive(True)
if not info_label.get_text():
info_label.set_markup(_info_text(dev))
battery_icon, battery_label = status_icons[0:2]
battery_level = dev.props.get(PROPS.BATTERY_LEVEL)
if battery_level is None:
battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
battery_icon.set_sensitive(False)
battery_label.set_visible(False)
else: else:
icon_name = 'battery_%03d' % (20 * ((battery_level + 10) // 20)) icon.set_sensitive(True)
battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE) label.set_sensitive(True)
battery_icon.set_sensitive(True) status.set_sensitive(True)
battery_label.set_text('%d%%' % battery_level)
battery_label.set_visible(True)
battery_status = dev.props.get(PROPS.BATTERY_STATUS) battery_icon, battery_label = status_icons[0:2]
battery_icon.set_tooltip_text(battery_status or '') battery_level = dev.props.get(PROPS.BATTERY_LEVEL)
if battery_level is None:
battery_icon.set_from_icon_name('battery_unknown', _STATUS_ICON_SIZE)
battery_icon.set_sensitive(False)
battery_label.set_visible(False)
else:
icon_name = 'battery_%03d' % (20 * ((battery_level + 10) // 20))
battery_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
battery_icon.set_sensitive(True)
battery_label.set_text('%d%%' % battery_level)
battery_label.set_visible(True)
light_icon, light_label = status_icons[2:4] battery_status = dev.props.get(PROPS.BATTERY_STATUS)
light_level = dev.props.get(PROPS.LIGHT_LEVEL) battery_icon.set_tooltip_text(battery_status or '')
if light_level is None:
light_icon.set_visible(False)
light_label.set_visible(False)
else:
icon_name = 'light_%03d' % (20 * ((light_level + 50) // 100))
light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
light_icon.set_visible(True)
light_label.set_text('%d lux' % light_level)
light_label.set_visible(True)
for b in toolbar.get_children()[:-1]: light_icon, light_label = status_icons[2:4]
b.set_sensitive(True) light_level = dev.props.get(PROPS.LIGHT_LEVEL)
if light_level is None:
light_icon.set_visible(False)
light_label.set_visible(False)
else:
icon_name = 'light_%03d' % (20 * ((light_level + 50) // 100))
light_icon.set_from_icon_name(icon_name, _STATUS_ICON_SIZE)
light_icon.set_visible(True)
light_label.set_text('%d lux' % light_level)
light_label.set_visible(True)
for b in toolbar.get_children()[:-1]:
b.set_sensitive(True)
frame.set_visible(True) frame.set_visible(True)
def update(window, receiver):
def update(window, receiver, reason):
print ("update", receiver, receiver.status, reason)
window.set_icon_name(ui.appicon(receiver.status)) window.set_icon_name(ui.appicon(receiver.status))
vbox = window.get_child() vbox = window.get_child()
controls = list(vbox.get_children()) controls = list(vbox.get_children())
GObject.idle_add(_update_receiver_box, controls[0], receiver) if reason == receiver:
_update_receiver_box(controls[0], receiver)
for index in range(1, len(controls)): else:
dev = receiver.devices[index] if index in receiver.devices else None frame = controls[reason.number]
frame = controls[index] if reason.status == STATUS.UNPAIRED:
if dev is None:
frame.set_visible(False) frame.set_visible(False)
frame.set_name(_PLACEHOLDER) frame.set_name(_PLACEHOLDER)
frame._device = None
else: else:
GObject.idle_add(_update_device_box, frame, dev) _update_device_box(frame, reason)

View File

@ -50,7 +50,7 @@ try:
def show(dev): def show(dev):
"""Show a notification with title and text.""" """Show a notification with title and text."""
if available and Notify.is_initted(): if available and Notify.is_initted():
summary = dev.device_name summary = dev.name
# if a notification with same name is already visible, reuse it to avoid spamming # if a notification with same name is already visible, reuse it to avoid spamming
n = _notifications.get(summary) n = _notifications.get(summary)

View File

@ -30,15 +30,14 @@ def _device_confirmed(entry, _2, trigger, assistant, page):
return True return True
def _finish(assistant, action): def _finish(assistant):
logging.debug("finish %s", assistant) logging.debug("finish %s", assistant)
assistant.destroy() assistant.destroy()
action.set_sensitive(True)
def _cancel(assistant, action, state): def _cancel(assistant, state):
logging.debug("cancel %s", assistant) logging.debug("cancel %s", assistant)
state.stop_scan() state.stop_scan()
_finish(assistant, action) _finish(assistant)
def _prepare(assistant, page, state): def _prepare(assistant, page, state):
index = assistant.get_current_page() index = assistant.get_current_page()
@ -119,8 +118,8 @@ def create(action, state):
assistant.scan_complete = _scan_complete assistant.scan_complete = _scan_complete
assistant.connect('prepare', _prepare, state) assistant.connect('prepare', _prepare, state)
assistant.connect('cancel', _cancel, action, state) assistant.connect('cancel', _cancel, state)
assistant.connect('close', _finish, action) assistant.connect('close', _finish)
assistant.connect('apply', _finish, action) assistant.connect('apply', _finish)
return assistant return assistant

View File

@ -1,104 +0,0 @@
#
#
#
from threading import Thread
import time
from logging import getLogger as _Logger
from logitech.devices.constants import STATUS
from receiver import Receiver
class _DUMMY_RECEIVER(object):
NAME = Receiver.NAME
device_name = NAME
kind = Receiver.NAME
status = STATUS.UNAVAILABLE
status_text = 'Receiver not found.'
max_devices = Receiver.max_devices
devices = {}
__bool__ = __nonzero__ = lambda self: False
DUMMY = _DUMMY_RECEIVER()
_l = _Logger('watcher')
def _sleep(seconds, granularity, breakout=lambda: False):
slept = 0
while slept < seconds and not breakout():
time.sleep(granularity)
slept += granularity
class Watcher(Thread):
"""Keeps an active receiver object if possible, and updates the UI when
necessary.
"""
def __init__(self, apptitle, update_ui, notify=None):
super(Watcher, self).__init__(group=apptitle, name='Watcher')
self._active = False
self._receiver = DUMMY
self.update_ui = update_ui
self.notify = notify or (lambda d: None)
@property
def receiver(self):
return self._receiver
def run(self):
self._active = True
notify_missing = True
while self._active:
if self._receiver == DUMMY:
r = Receiver.open()
if r is None:
if notify_missing:
_sleep(0.8, 0.4, lambda: not self._active)
notify_missing = False
if self._active:
self.update_ui(DUMMY)
self.notify(DUMMY)
_sleep(4, 0.4, lambda: not self._active)
continue
_l.info("receiver %s ", r)
self._receiver = r
notify_missing = True
self.update_ui(r)
self.notify(r)
if self._active:
if self._receiver:
_l.debug("waiting for status_changed")
sc = self._receiver.status_changed
sc.wait()
if not self._active:
break
sc.clear()
if sc.urgent:
_l.info("status_changed %s", sc.reason)
self.update_ui(self._receiver)
if sc.reason and sc.urgent:
self.notify(sc.reason)
else:
self._receiver = DUMMY
self.update_ui(DUMMY)
self.notify(DUMMY)
if self._receiver:
self._receiver.close()
self._receiver = _DUMMY_RECEIVER
def stop(self):
if self._active:
_l.info("stopping %s", self)
self._active = False
if self._receiver:
# break out of an eventual wait()
self._receiver.status_changed.reason = None
self._receiver.status_changed.set()
self.join()

View File

@ -30,7 +30,7 @@ def _module(device_name):
# #
# #
def default_request_status(devinfo, listener=None): def default_request_status(devinfo):
if FEATURE.BATTERY in devinfo.features: if FEATURE.BATTERY in devinfo.features:
reply = _api.get_device_battery_level(devinfo.handle, devinfo.number, features=devinfo.features) reply = _api.get_device_battery_level(devinfo.handle, devinfo.number, features=devinfo.features)
if reply: if reply:
@ -41,7 +41,7 @@ def default_request_status(devinfo, listener=None):
return STATUS.CONNECTED if reply else STATUS.UNAVAILABLE return STATUS.CONNECTED if reply else STATUS.UNAVAILABLE
def default_process_event(devinfo, data, listener=None): def default_process_event(devinfo, data):
feature_index = ord(data[0:1]) feature_index = ord(data[0:1])
if feature_index >= len(devinfo.features): if feature_index >= len(devinfo.features):
logging.warn("mistery event %s for %s", repr(data), devinfo) logging.warn("mistery event %s for %s", repr(data), devinfo)
@ -72,7 +72,7 @@ def default_process_event(devinfo, data, listener=None):
# ? # ?
def request_status(devinfo, listener=None): def request_status(devinfo):
"""Trigger a status request for a device. """Trigger a status request for a device.
:param devinfo: the device info tuple. :param devinfo: the device info tuple.
@ -81,20 +81,20 @@ def request_status(devinfo, listener=None):
""" """
m = _module(devinfo.name) m = _module(devinfo.name)
if m and 'request_status' in m.__dict__: if m and 'request_status' in m.__dict__:
return m.request_status(devinfo, listener) return m.request_status(devinfo)
return default_request_status(devinfo, listener) return default_request_status(devinfo)
def process_event(devinfo, data, listener=None): def process_event(devinfo, data):
"""Process an event received for a device. """Process an event received for a device.
:param devinfo: the device info tuple. :param devinfo: the device info tuple.
:param data: the event data (event packet sans the first two bytes: reply code and device number) :param data: the event data (event packet sans the first two bytes: reply code and device number)
""" """
default_result = default_process_event(devinfo, data, listener) default_result = default_process_event(devinfo, data)
if default_result is not None: if default_result is not None:
return default_result return default_result
m = _module(devinfo.name) m = _module(devinfo.name)
if m and 'process_event' in m.__dict__: if m and 'process_event' in m.__dict__:
return m.process_event(devinfo, data, listener) return m.process_event(devinfo, data)

View File

@ -5,6 +5,7 @@
STATUS = type('STATUS', (), STATUS = type('STATUS', (),
dict( dict(
UNKNOWN=-9999, UNKNOWN=-9999,
UNPAIRED=-1000,
UNAVAILABLE=-1, UNAVAILABLE=-1,
BOOTING=0, BOOTING=0,
CONNECTED=1, CONNECTED=1,
@ -12,6 +13,7 @@ STATUS = type('STATUS', (),
STATUS_NAME = { STATUS_NAME = {
STATUS.UNKNOWN: '...', STATUS.UNKNOWN: '...',
STATUS.UNPAIRED: 'unpaired',
STATUS.UNAVAILABLE: 'inactive', STATUS.UNAVAILABLE: 'inactive',
STATUS.BOOTING: 'initializing', STATUS.BOOTING: 'initializing',
STATUS.CONNECTED: 'connected', STATUS.CONNECTED: 'connected',

View File

@ -28,7 +28,7 @@ def _charge_status(data, hasLux=False):
} }
def request_status(devinfo, listener=None): def request_status(devinfo):
reply = _api.request(devinfo.handle, devinfo.number, reply = _api.request(devinfo.handle, devinfo.number,
feature=FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', feature=FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01',
features=devinfo.features) features=devinfo.features)
@ -36,7 +36,7 @@ def request_status(devinfo, listener=None):
return STATUS.UNAVAILABLE return STATUS.UNAVAILABLE
def process_event(devinfo, data, listener=None): def process_event(devinfo, data):
if data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': if data[:2] == b'\x09\x00' and data[7:11] == b'GOOD':
# usually sent after the keyboard is turned on or just connected # usually sent after the keyboard is turned on or just connected
return _charge_status(data) return _charge_status(data)
@ -47,4 +47,4 @@ def process_event(devinfo, data, listener=None):
if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD': if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD':
logging.debug("Solar key pressed") logging.debug("Solar key pressed")
return request_status(devinfo, listener) or _charge_status(data) return request_status(devinfo) or _charge_status(data)

View File

@ -1,52 +1,38 @@
#!/usr/bin/env python #!/usr/bin/env python
import logging
logging.basicConfig(level=logging.DEBUG)
from .unifying_receiver import api
from .unifying_receiver.constants import *
def print_receiver(receiver): def print_receiver(receiver):
print ("Unifying Receiver") print (str(receiver))
serial, firmware = api.get_receiver_info(receiver) print (" Serial : %s" % receiver.serial)
for f in receiver.firmware:
print (" Serial : %s" % serial)
for f in firmware:
print (" %-10s: %s" % (f.kind, f.version)) print (" %-10s: %s" % (f.kind, f.version))
print ("--------") print ("--------")
def scan_devices(receiver): def scan_devices(receiver):
print_receiver(receiver) for dev in receiver:
print (str(dev))
print ("Name: %s" % dev.name)
print ("Kind: %s" % dev.kind)
devices = api.list_devices(receiver) firmware = dev.firmware
if not devices:
print ("!! No attached devices found.")
return
for devinfo in devices:
print ("Device [%d] %s (%s)" % (devinfo.number, devinfo.name, devinfo.kind))
# print " Protocol %s" % devinfo.protocol
firmware = api.get_device_firmware(receiver, devinfo.number, features=devinfo.features)
for fw in firmware: for fw in firmware:
print (" %-10s: %s %s" % (fw.kind, fw.name, fw.version)) print (" %-10s: %s %s" % (fw.kind, fw.name, fw.version))
for index in range(0, len(devinfo.features)): all_features = api.get_device_features(dev.handle, dev.number)
feature = devinfo.features[index] for index in range(0, len(all_features)):
feature = all_features[index]
if feature: if feature:
print (" ~ Feature %-20s (%s) at index %02X" % (FEATURE_NAME[feature], api._hex(feature), index)) print (" ~ Feature %-20s (%s) at index %02X" % (FEATURE_NAME[feature], api._hex(feature), index))
if FEATURE.BATTERY in devinfo.features: if FEATURE.BATTERY in all_features:
discharge, dischargeNext, status = api.get_device_battery_level(receiver, devinfo.number, features=devinfo.features) discharge, dischargeNext, status = api.get_device_battery_level(dev.handle, dev.number, features=all_features)
print (" Battery %d charged (next level %d%), status %s" % (discharge, dischargeNext, status)) print (" Battery %d charged (next level %d%), status %s" % (discharge, dischargeNext, status))
if FEATURE.REPROGRAMMABLE_KEYS in devinfo.features: if FEATURE.REPROGRAMMABLE_KEYS in all_features:
keys = api.get_device_keys(receiver, devinfo.number, features=devinfo.features) keys = api.get_device_keys(dev.handle, dev.number, features=all_features)
if keys is not None and keys: if keys is not None and keys:
print (" %d reprogrammable keys found" % len(keys)) print (" %d reprogrammable keys found" % len(keys))
for k in keys: for k in keys:
@ -58,20 +44,22 @@ def scan_devices(receiver):
if __name__ == '__main__': if __name__ == '__main__':
import argparse import argparse
arg_parser = argparse.ArgumentParser() arg_parser = argparse.ArgumentParser(prog='scan')
arg_parser.add_argument('-v', '--verbose', action='count', default=0, arg_parser.add_argument('-v', '--verbose', action='store_true', default=False,
help='log the HID data traffic with the receiver') help='log the HID data traffic')
args = arg_parser.parse_args() args = arg_parser.parse_args()
log_level = logging.root.level - 10 * args.verbose import logging
logging.root.setLevel(log_level if log_level > 0 else 1) logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
for rawdevice in api._base.list_receiver_devices(): from .unifying_receiver import api
receiver = api._base.try_open(rawdevice.path) from .unifying_receiver.constants import *
if receiver:
print ("!! Logitech Unifying Receiver found (%s)." % rawdevice.path) receiver = api.Receiver.open()
scan_devices(receiver) if receiver is None:
api.close(receiver)
break
else:
print ("!! Logitech Unifying Receiver not found.") print ("!! Logitech Unifying Receiver not found.")
else:
print ("!! Found Logitech Unifying Receiver: %s" % receiver)
print_receiver(receiver)
scan_devices(receiver)
receiver.close()

View File

@ -22,19 +22,11 @@ http://6xq.net/git/lars/lshidpp.git/plain/doc/
import logging import logging
log = logging.getLogger('LUR') if logging.root.level > logging.DEBUG:
log.propagate = 0 log = logging.getLogger('LUR')
log.setLevel(logging.DEBUG) log.addHandler(logging.NullHandler())
log.propagate = 0
if logging.root.level < logging.DEBUG:
handler = logging.FileHandler('lur.log', mode='w')
handler.setFormatter(logging.root.handlers[0].formatter)
else:
handler = logging.NullHandler()
log.addHandler(handler)
del handler
del log
del logging del logging

View File

@ -8,7 +8,6 @@ from struct import unpack as _unpack
from . import base as _base from . import base as _base
from .common import (FirmwareInfo as _FirmwareInfo, from .common import (FirmwareInfo as _FirmwareInfo,
AttachedDeviceInfo as _AttachedDeviceInfo,
ReprogrammableKeyInfo as _ReprogrammableKeyInfo) ReprogrammableKeyInfo as _ReprogrammableKeyInfo)
from .constants import (FEATURE, FEATURE_NAME, FEATURE_FLAGS, from .constants import (FEATURE, FEATURE_NAME, FEATURE_FLAGS,
FIRMWARE_KIND, DEVICE_KIND, FIRMWARE_KIND, DEVICE_KIND,
@ -27,44 +26,153 @@ del getLogger
# #
# #
"""Opens the first Logitech Unifying Receiver found attached to the machine. class PairedDevice(object):
def __init__(self, handle, number):
self.handle = handle
self.number = number
:returns: An open file handle for the found receiver, or ``None``. self._name = None
""" self._kind = None
open = _base.open self._firmware = None
self.features = [FEATURE.ROOT]
@property
def name(self):
if self._name is None:
self._name = get_device_name(self.handle, self.number, self.features)
return self._name or '?'
@property
def kind(self):
if self._kind is None:
self._kind = get_device_kind(self.handle, self.number, self.features)
return self._kind or '?'
@property
def firmware(self):
if self._firmware is None:
self._firmware = get_device_firmware(self.handle, self.number, self.features)
return self._firmware or ()
def ping(self):
reply = _base.request(self.handle, self.number, b'\x00\x10', b'\x00\x00\xAA')
return reply is not None and reply[2:3] == b'\xAA'
def __str__(self):
return '<PairedDevice(%X,%d,%s)>' % (self.handle, self.number, self._name or '?')
def __hash__(self):
return self.number
"""Closes a HID device handle.""" class Receiver(object):
close = _base.close name = 'Unifying Receiver'
max_devices = MAX_ATTACHED_DEVICES
def __init__(self, handle, path=None):
self.handle = handle
self.path = path
def get_receiver_info(handle): self._serial = None
serial = None self._firmware = None
reply = _base.request(handle, 0xFF, b'\x83\xB5', b'\x03')
if reply and reply[0:1] == b'\x03':
serial = _hex(reply[1:5])
firmware = [] def close(self):
handle = self.handle
self.handle = 0
return (handle and _base.close(handle))
reply = _base.request(handle, 0xFF, b'\x83\xB5', b'\x02') @property
if reply and reply[0:1] == b'\x02': def serial(self):
fw_version = _hex(reply[1:5]) if self._serial is None and self.handle:
fw_version = '%s.%s.%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8]) serial = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x03')
firmware.append(_FirmwareInfo(0, FIRMWARE_KIND[0], '', fw_version, None)) if serial:
self._serial = _hex(serial[1:5])
return self._serial
reply = _base.request(handle, 0xFF, b'\x81\xF1', b'\x04') @property
if reply and reply[0:1] == b'\x04': def firmware(self):
bl_version = _hex(reply[1:3]) if self._firmware is None and self.handle:
bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4]) firmware = []
firmware.append(_FirmwareInfo(1, FIRMWARE_KIND[1], '', bl_version, None))
return (serial, tuple(firmware)) reply = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x02')
if reply and reply[0:1] == b'\x02':
fw_version = _hex(reply[1:5])
fw_version = '%s.%s.%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8])
firmware.append(_FirmwareInfo(0, FIRMWARE_KIND[0], '', fw_version, None))
reply = _base.request(self.handle, 0xFF, b'\x81\xF1', b'\x04')
if reply and reply[0:1] == b'\x04':
bl_version = _hex(reply[1:3])
bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4])
firmware.append(_FirmwareInfo(1, FIRMWARE_KIND[1], '', bl_version, None))
def count_devices(handle): self._firmware = tuple(firmware)
count = _base.request(handle, 0xFF, b'\x81\x00')
return 0 if count is None else ord(count[1:2])
return self._firmware
def __iter__(self):
if self.handle == 0:
return
for number in range(1, MAX_ATTACHED_DEVICES):
dev = get_device(self.handle, number)
if dev is not None:
yield dev
def __getitem__(self, key):
if type(key) != int:
raise TypeError('key must be an integer')
if self.handle == 0 or key < 0 or key > MAX_ATTACHED_DEVICES:
raise IndexError(key)
return get_device(self.handle, key) if key > 0 else None
def __delitem__(self, key):
if type(key) != int:
raise TypeError('key must be an integer')
if self.handle == 0 or key < 0 or key > MAX_ATTACHED_DEVICES:
raise IndexError(key)
if key > 0:
_log.debug("unpairing device %d", key)
reply = _base.request(self.handle, 0xFF, b'\x80\xB2', _pack('!BB', 0x03, key))
if reply is None or reply[1:2] == b'\x8F':
raise IndexError(key)
def __len__(self):
if self.handle == 0:
return 0
# not really sure about this one...
count = _base.request(self.handle, 0xFF, b'\x81\x00')
return 0 if count is None else ord(count[1:2])
def __contains__(self, dev):
if self.handle == 0:
return False
if type(dev) == int:
return (dev < 1 or dev > MAX_ATTACHED_DEVICES) and ping(self.handle, dev)
return ping(self.handle, dev.number)
def __str__(self):
return '<Receiver(%X,%s)>' % (self.handle, self.path)
def __hash__(self):
return self.handle
__bool__ = __nonzero__ = lambda self: self.handle != 0
@classmethod
def open(self):
"""Opens the first Logitech Unifying Receiver found attached to the machine.
:returns: An open file handle for the found receiver, or ``None``.
"""
for rawdevice in _base.list_receiver_devices():
handle = _base.try_open(rawdevice.path)
if handle:
return Receiver(handle, rawdevice.path)
#
#
#
def request(handle, devnumber, feature, function=b'\x00', params=b'', features=None): def request(handle, devnumber, feature, function=b'\x00', params=b'', features=None):
"""Makes a feature call to the device, and returns the reply data. """Makes a feature call to the device, and returns the reply data.
@ -109,72 +217,22 @@ def request(handle, devnumber, feature, function=b'\x00', params=b'', features=N
def ping(handle, devnumber): def ping(handle, devnumber):
"""Pings a device to check if it is attached to the UR. """
:returns: True if the device is connected to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
""" """
reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA') reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA')
return reply is not None and reply[2:3] == b'\xAA' return reply is not None and reply[2:3] == b'\xAA'
def get_device_protocol(handle, devnumber): def get_device(handle, devnumber, features=None):
reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA') """Gets the complete info for a device (type, features).
if reply is not None and len(reply) > 2 and reply[2:3] == b'\xAA':
return 'HID %d.%d' % (ord(reply[0:1]), ord(reply[1:2]))
:returns: a PairedDevice or ``None``.
def find_device_by_name(handle, name):
"""Searches for an attached device by name.
This function does it the hard way, querying all possible device numbers.
:returns: an AttachedDeviceInfo tuple, or ``None``.
""" """
_log.debug("searching for device '%s'", name) if ping(handle, devnumber):
devinfo = PairedDevice(handle, devnumber)
for devnumber in range(1, 1 + MAX_ATTACHED_DEVICES): # _log.debug("(%d) found device %s", devnumber, devinfo)
features = get_device_features(handle, devnumber) return devinfo
if features:
d_name = get_device_name(handle, devnumber, features)
if d_name == name:
return get_device_info(handle, devnumber, name=d_name, features=features)
def list_devices(handle):
"""List all devices attached to the UR.
This function does it the hard way, querying all possible device numbers.
:returns: a list of AttachedDeviceInfo tuples.
"""
_log.debug("listing all devices")
devices = []
for device in range(1, 1 + MAX_ATTACHED_DEVICES):
features = get_device_features(handle, device)
if features:
devices.append(get_device_info(handle, device, features=features))
return devices
def get_device_info(handle, devnumber, name=None, features=None):
"""Gets the complete info for a device (type, name, features).
:returns: an AttachedDeviceInfo tuple, or ``None``.
"""
if features is None:
features = get_device_features(handle, devnumber)
if features is None:
return None
d_kind = get_device_kind(handle, devnumber, features)
d_name = get_device_name(handle, devnumber, features) if name is None else name
devinfo = _AttachedDeviceInfo(handle, devnumber, d_kind, d_name, features)
_log.debug("(%d) found device %s", devnumber, devinfo)
return devinfo
def get_feature_index(handle, devnumber, feature): def get_feature_index(handle, devnumber, feature):
@ -182,7 +240,7 @@ def get_feature_index(handle, devnumber, feature):
:returns: An int, or ``None`` if the feature is not available. :returns: An int, or ``None`` if the feature is not available.
""" """
_log.debug("(%d) get feature index <%s:%s>", devnumber, _hex(feature), FEATURE_NAME[feature]) # _log.debug("(%d) get feature index <%s:%s>", devnumber, _hex(feature), FEATURE_NAME[feature])
if len(feature) != 2: if len(feature) != 2:
raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature) raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature)
@ -191,13 +249,13 @@ def get_feature_index(handle, devnumber, feature):
if reply: if reply:
feature_index = ord(reply[0:1]) feature_index = ord(reply[0:1])
if feature_index: if feature_index:
feature_flags = ord(reply[1:2]) & 0xE0 # feature_flags = ord(reply[1:2]) & 0xE0
if feature_flags: # if feature_flags:
_log.debug("(%d) feature <%s:%s> has index %d: %s", # _log.debug("(%d) feature <%s:%s> has index %d: %s",
devnumber, _hex(feature), FEATURE_NAME[feature], feature_index, # devnumber, _hex(feature), FEATURE_NAME[feature], feature_index,
','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) # ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k]))
else: # else:
_log.debug("(%d) feature <%s:%s> has index %d", devnumber, _hex(feature), FEATURE_NAME[feature], feature_index) # _log.debug("(%d) feature <%s:%s> has index %d", devnumber, _hex(feature), FEATURE_NAME[feature], feature_index)
# only consider active and supported features? # only consider active and supported features?
# if feature_flags: # if feature_flags:
@ -218,6 +276,8 @@ def _get_feature_index(handle, devnumber, feature, features=None):
index = get_feature_index(handle, devnumber, feature) index = get_feature_index(handle, devnumber, feature)
if index is not None: if index is not None:
if len(features) <= index:
features += [None] * (index + 1 - len(features))
features[index] = feature features[index] = feature
return index return index
@ -228,7 +288,7 @@ def get_device_features(handle, devnumber):
Their position in the array is the index to be used when requesting that Their position in the array is the index to be used when requesting that
feature on the device. feature on the device.
""" """
_log.debug("(%d) get device features", devnumber) # _log.debug("(%d) get device features", devnumber)
# get the index of the FEATURE_SET # get the index of the FEATURE_SET
# FEATURE.ROOT should always be available for all devices # FEATURE.ROOT should always be available for all devices
@ -250,23 +310,23 @@ def get_device_features(handle, devnumber):
return None return None
features_count = ord(features_count[:1]) features_count = ord(features_count[:1])
_log.debug("(%d) found %d features", devnumber, features_count) # _log.debug("(%d) found %d features", devnumber, features_count)
features = [None] * 0x20 features = [None] * 0x20
for index in range(1, 1 + features_count): for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index # for each index, get the feature residing at that index
feature = _base.request(handle, devnumber, fs_index + b'\x10', _pack('!B', index)) feature = _base.request(handle, devnumber, fs_index + b'\x10', _pack('!B', index))
if feature: if feature:
feature_flags = ord(feature[2:3]) & 0xE0 # feature_flags = ord(feature[2:3]) & 0xE0
feature = feature[0:2].upper() feature = feature[0:2].upper()
features[index] = feature features[index] = feature
if feature_flags: # if feature_flags:
_log.debug("(%d) feature <%s:%s> at index %d: %s", # _log.debug("(%d) feature <%s:%s> at index %d: %s",
devnumber, _hex(feature), FEATURE_NAME[feature], index, # devnumber, _hex(feature), FEATURE_NAME[feature], index,
','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k])) # ','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k]))
else: # else:
_log.debug("(%d) feature <%s:%s> at index %d", devnumber, _hex(feature), FEATURE_NAME[feature], index) # _log.debug("(%d) feature <%s:%s> at index %d", devnumber, _hex(feature), FEATURE_NAME[feature], index)
features[0] = FEATURE.ROOT features[0] = FEATURE.ROOT
while features[-1] is None: while features[-1] is None:
@ -309,7 +369,7 @@ def get_device_firmware(handle, devnumber, features=None):
fw_info = _FirmwareInfo(level, FIRMWARE_KIND[-1], '', '', None) fw_info = _FirmwareInfo(level, FIRMWARE_KIND[-1], '', '', None)
fw.append(fw_info) fw.append(fw_info)
_log.debug("(%d) firmware %s", devnumber, fw_info) # _log.debug("(%d) firmware %s", devnumber, fw_info)
return tuple(fw) return tuple(fw)
@ -327,7 +387,7 @@ def get_device_kind(handle, devnumber, features=None):
d_kind = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x20)) d_kind = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x20))
if d_kind: if d_kind:
d_kind = ord(d_kind[:1]) d_kind = ord(d_kind[:1])
_log.debug("(%d) device type %d = %s", devnumber, d_kind, DEVICE_KIND[d_kind]) # _log.debug("(%d) device type %d = %s", devnumber, d_kind, DEVICE_KIND[d_kind])
return DEVICE_KIND[d_kind] return DEVICE_KIND[d_kind]
@ -355,7 +415,7 @@ def get_device_name(handle, devnumber, features=None):
break break
d_name = d_name.decode('ascii') d_name = d_name.decode('ascii')
_log.debug("(%d) device name %s", devnumber, d_name) # _log.debug("(%d) device name %s", devnumber, d_name)
return d_name return d_name

View File

@ -130,12 +130,9 @@ def open():
""" """
for rawdevice in list_receiver_devices(): for rawdevice in list_receiver_devices():
_log.info("checking %s", rawdevice) _log.info("checking %s", rawdevice)
handle = try_open(rawdevice.path)
receiver = try_open(rawdevice.path) if handle:
if receiver: return handle
return receiver
return None
def close(handle): def close(handle):
@ -143,7 +140,7 @@ def close(handle):
if handle: if handle:
try: try:
_hid.close(handle) _hid.close(handle)
_log.info("closed receiver handle %X", handle) # _log.info("closed receiver handle %X", handle)
return True return True
except: except:
_log.exception("closing receiver handle %X", handle) _log.exception("closing receiver handle %X", handle)
@ -239,7 +236,7 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None
if type(params) == int: if type(params) == int:
params = _pack('!B', params) params = _pack('!B', params)
_log.debug("(%d) request {%s} params [%s]", devnumber, _hex(feature_index_function), _hex(params)) # _log.debug("(%d) request {%s} params [%s]", devnumber, _hex(feature_index_function), _hex(params))
if len(feature_index_function) != 2: if len(feature_index_function) != 2:
raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hex(feature_index_function)) raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % _hex(feature_index_function))

View File

@ -23,14 +23,6 @@ def list2dict(values_list):
return dict(zip(range(0, len(values_list)), values_list)) return dict(zip(range(0, len(values_list)), values_list))
"""Tuple returned by list_devices and find_device_by_name."""
AttachedDeviceInfo = namedtuple('AttachedDeviceInfo', [
'handle',
'number',
'kind',
'name',
'features'])
"""Firmware information.""" """Firmware information."""
FirmwareInfo = namedtuple('FirmwareInfo', [ FirmwareInfo = namedtuple('FirmwareInfo', [
'level', 'level',

View File

@ -29,7 +29,7 @@ def _event_dispatch(listener, callback):
event = listener._events.get(True, _READ_EVENT_TIMEOUT * 10) event = listener._events.get(True, _READ_EVENT_TIMEOUT * 10)
except: except:
continue continue
_log.debug("delivering event %s", event) # _log.debug("delivering event %s", event)
try: try:
callback(event) callback(event)
except: except:
@ -70,7 +70,6 @@ class EventsListener(_Thread):
self._dispatcher.start() self._dispatcher.start()
while self._active: while self._active:
event = None
try: try:
# _log.debug("read next event") # _log.debug("read next event")
event = _base.read(self._handle, _READ_EVENT_TIMEOUT) event = _base.read(self._handle, _READ_EVENT_TIMEOUT)
@ -79,29 +78,28 @@ class EventsListener(_Thread):
_log.warn("receiver disconnected") _log.warn("receiver disconnected")
self._events.put(_Packet(0xFF, 0xFF, None)) self._events.put(_Packet(0xFF, 0xFF, None))
self._active = False self._active = False
break else:
if event is not None:
matched = False
task = None if self._tasks.empty() else self._tasks.queue[0]
if task and task[-1] is None:
devnumber, data = task[:2]
if event[1] == devnumber:
# _log.debug("matching %s to %d, %s", event, devnumber, repr(data))
if event[0] == 0x11 or (event[0] == 0x10 and devnumber == 0xFF):
matched = (event[2][:2] == data[:2]) or (event[2][:1] == b'\xFF' and event[2][1:3] == data[:2])
elif event[0] == 0x10:
if event[2][:1] == b'\x8F' and event[2][1:3] == data[:2]:
matched = True
if event is not None: if matched:
matched = False # _log.debug("request reply %s", event)
task = None if self._tasks.empty() else self._tasks.queue[0] task[-1] = event
if task and task[0] and task[-1] is None: self._tasks.task_done()
devnumber, data = task[1:3] else:
if event[1] == devnumber: event = _Packet(*event)
# _log.debug("matching %s to %d, %s", event, devnumber, repr(data)) _log.info("queueing event %s", event)
if event[0] == 0x11 or (event[0] == 0x10 and devnumber == 0xFF): self._events.put(event)
matched = (event[2][:2] == data[:2]) or (event[2][:1] == b'\xFF' and event[2][1:3] == data[:2])
elif event[0] == 0x10:
if event[2][:1] == b'\x8F' and event[2][1:3] == data[:2]:
matched = True
if matched:
# _log.debug("request reply %s", event)
task[-1] = event
self._tasks.task_done()
else:
event = _Packet(*event)
_log.info("queueing event %s", event)
self._events.put(event)
_base.request_context = None _base.request_context = None
_base.close(self._handle) _base.close(self._handle)
@ -123,11 +121,10 @@ class EventsListener(_Thread):
def write(self, handle, devnumber, data): def write(self, handle, devnumber, data):
assert handle == self._handle assert handle == self._handle
# _log.debug("write %02X %s", devnumber, _base._hex(data)) # _log.debug("write %02X %s", devnumber, _base._hex(data))
task = [False, devnumber, data, None] task = [devnumber, data, None]
self._tasks.put(task) self._tasks.put(task)
_base.write(self._handle, devnumber, data) _base.write(self._handle, devnumber, data)
task[0] = True # _log.debug("task queued %s", task)
_log.debug("task queued %s", task)
def read(self, handle, timeout=_base.DEFAULT_TIMEOUT): def read(self, handle, timeout=_base.DEFAULT_TIMEOUT):
assert handle == self._handle assert handle == self._handle
@ -135,7 +132,7 @@ class EventsListener(_Thread):
assert not self._tasks.empty() assert not self._tasks.empty()
self._tasks.join() self._tasks.join()
task = self._tasks.get(False) task = self._tasks.get(False)
_log.debug("task ready %s", task) # _log.debug("task ready %s", task)
return task[-1] return task[-1]
def unhandled_hook(self, reply_code, devnumber, data): def unhandled_hook(self, reply_code, devnumber, data):

View File

@ -13,21 +13,21 @@ from ..common import *
class Test_UR_API(unittest.TestCase): class Test_UR_API(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
cls.handle = None cls.receiver = None
cls.device = None cls.device = None
cls.features = None cls.features = None
cls.device_info = None cls.device_info = None
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
if cls.handle: if cls.receiver:
api.close(cls.handle) cls.receiver.close()
cls.device = None cls.device = None
cls.features = None cls.features = None
cls.device_info = None cls.device_info = None
def _check(self, check_device=True, check_features=False): def _check(self, check_device=True, check_features=False):
if self.handle is None: if self.receiver is None:
self.fail("No receiver found") self.fail("No receiver found")
if check_device and self.device is None: if check_device and self.device is None:
self.fail("Found no devices attached.") self.fail("Found no devices attached.")
@ -35,13 +35,13 @@ class Test_UR_API(unittest.TestCase):
self.fail("no feature set available") self.fail("no feature set available")
def test_00_open_receiver(self): def test_00_open_receiver(self):
Test_UR_API.handle = api.open() Test_UR_API.receiver = api.Receiver.open()
self._check(check_device=False) self._check(check_device=False)
def test_05_ping_device_zero(self): def test_05_ping_device_zero(self):
self._check(check_device=False) self._check(check_device=False)
ok = api.ping(self.handle, 0) ok = api.ping(self.receiver.handle, 0)
self.assertIsNotNone(ok, "invalid ping reply") self.assertIsNotNone(ok, "invalid ping reply")
self.assertFalse(ok, "device zero replied") self.assertFalse(ok, "device zero replied")
@ -50,33 +50,33 @@ class Test_UR_API(unittest.TestCase):
devices = [] devices = []
for device in range(1, 1 + MAX_ATTACHED_DEVICES): for devnumber in range(1, 1 + MAX_ATTACHED_DEVICES):
ok = api.ping(self.handle, device) ok = api.ping(self.receiver.handle, devnumber)
self.assertIsNotNone(ok, "invalid ping reply") self.assertIsNotNone(ok, "invalid ping reply")
if ok: if ok:
devices.append(device) devices.append(self.receiver[devnumber])
if devices: if devices:
Test_UR_API.device = devices[0] Test_UR_API.device = devices[0].number
def test_30_get_feature_index(self): def test_30_get_feature_index(self):
self._check() self._check()
fs_index = api.get_feature_index(self.handle, self.device, FEATURE.FEATURE_SET) fs_index = api.get_feature_index(self.receiver.handle, self.device, FEATURE.FEATURE_SET)
self.assertIsNotNone(fs_index, "feature FEATURE_SET not available") self.assertIsNotNone(fs_index, "feature FEATURE_SET not available")
self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index)) self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index))
def test_31_bad_feature(self): def test_31_bad_feature(self):
self._check() self._check()
reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF') reply = api.request(self.receiver.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF')
self.assertIsNotNone(reply, "invalid reply") self.assertIsNotNone(reply, "invalid reply")
self.assertEqual(reply[:5], b'\x00' * 5, "invalid reply") self.assertEqual(reply[:5], b'\x00' * 5, "invalid reply")
def test_40_get_device_features(self): def test_40_get_device_features(self):
self._check() self._check()
features = api.get_device_features(self.handle, self.device) features = api.get_device_features(self.receiver.handle, self.device)
self.assertIsNotNone(features, "failed to read features array") self.assertIsNotNone(features, "failed to read features array")
self.assertIn(FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available") self.assertIn(FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available")
# cache this to simplify next tests # cache this to simplify next tests
@ -85,7 +85,7 @@ class Test_UR_API(unittest.TestCase):
def test_50_get_device_firmware(self): def test_50_get_device_firmware(self):
self._check(check_features=True) self._check(check_features=True)
d_firmware = api.get_device_firmware(self.handle, self.device, self.features) d_firmware = api.get_device_firmware(self.receiver.handle, self.device, self.features)
self.assertIsNotNone(d_firmware, "failed to get device firmware") self.assertIsNotNone(d_firmware, "failed to get device firmware")
self.assertGreater(len(d_firmware), 0, "device reported no firmware") self.assertGreater(len(d_firmware), 0, "device reported no firmware")
for fw in d_firmware: for fw in d_firmware:
@ -94,30 +94,30 @@ class Test_UR_API(unittest.TestCase):
def test_52_get_device_kind(self): def test_52_get_device_kind(self):
self._check(check_features=True) self._check(check_features=True)
d_kind = api.get_device_kind(self.handle, self.device, self.features) d_kind = api.get_device_kind(self.receiver.handle, self.device, self.features)
self.assertIsNotNone(d_kind, "failed to get device kind") self.assertIsNotNone(d_kind, "failed to get device kind")
self.assertGreater(len(d_kind), 0, "empty device kind") self.assertGreater(len(d_kind), 0, "empty device kind")
def test_55_get_device_name(self): def test_55_get_device_name(self):
self._check(check_features=True) self._check(check_features=True)
d_name = api.get_device_name(self.handle, self.device, self.features) d_name = api.get_device_name(self.receiver.handle, self.device, self.features)
self.assertIsNotNone(d_name, "failed to read device name") self.assertIsNotNone(d_name, "failed to read device name")
self.assertGreater(len(d_name), 0, "empty device name") self.assertGreater(len(d_name), 0, "empty device name")
def test_59_get_device_info(self): def test_59_get_device_info(self):
self._check(check_features=True) self._check(check_features=True)
device_info = api.get_device_info(self.handle, self.device, features=self.features) device_info = api.get_device(self.receiver.handle, self.device, features=self.features)
self.assertIsNotNone(device_info, "failed to read full device info") self.assertIsNotNone(device_info, "failed to read full device info")
self.assertIsInstance(device_info, AttachedDeviceInfo) self.assertIsInstance(device_info, api.PairedDevice)
Test_UR_API.device_info = device_info Test_UR_API.device_info = device_info
def test_60_get_battery_level(self): def test_60_get_battery_level(self):
self._check(check_features=True) self._check(check_features=True)
if FEATURE.BATTERY in self.features: if FEATURE.BATTERY in self.features:
battery = api.get_device_battery_level(self.handle, self.device, self.features) battery = api.get_device_battery_level(self.receiver.handle, self.device, self.features)
self.assertIsNotNone(battery, "failed to read battery level") self.assertIsNotNone(battery, "failed to read battery level")
self.assertIsInstance(battery, tuple, "result not a tuple") self.assertIsInstance(battery, tuple, "result not a tuple")
else: else:
@ -126,21 +126,9 @@ class Test_UR_API(unittest.TestCase):
def test_70_list_devices(self): def test_70_list_devices(self):
self._check(check_device=False) self._check(check_device=False)
all_devices = api.list_devices(self.handle) for dev in self.receiver:
if all_devices: self.assertIsNotNone(dev)
self.assertIsNotNone(self.device) self.assertIsInstance(dev, api.PairedDevice)
for device_info in all_devices:
self.assertIsInstance(device_info, AttachedDeviceInfo)
else:
self.assertIsNone(self.device)
def test_70_find_device_by_name(self):
self._check()
all_devices = api.list_devices(self.handle)
for device_info in all_devices:
device = api.find_device_by_name(self.handle, device_info.name)
self.assertEqual(device, device_info)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()