device: clean up listener and notifications code

This commit is contained in:
Peter F. Patel-Schneider 2024-02-23 11:30:04 -05:00
parent 14f19ceaaf
commit b7afc410ba
2 changed files with 27 additions and 82 deletions

View File

@ -1,6 +1,5 @@
# -*- python-mode -*-
## Copyright (C) 2012-2013 Daniel Pavel ## Copyright (C) 2012-2013 Daniel Pavel
## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/
## ##
## This program is free software; you can redistribute it and/or modify ## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by ## it under the terms of the GNU General Public License as published by
@ -17,29 +16,16 @@
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging import logging
import threading as _threading import queue
import threading
from . import base as _base from . import base, exceptions
from . import exceptions
# from time import time as _timestamp
# for both Python 2 and 3
try:
from Queue import Queue as _Queue
except ImportError:
from queue import Queue as _Queue
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
#
#
#
class _ThreadedHandle: class _ThreadedHandle:
"""A thread-local wrapper with different open handles for each thread. """A thread-local wrapper with different open handles for each thread.
Closing a ThreadedHandle will close all handles. Closing a ThreadedHandle will close all handles.
""" """
@ -53,13 +39,13 @@ class _ThreadedHandle:
self._listener = listener self._listener = listener
self.path = path self.path = path
self._local = _threading.local() self._local = threading.local()
# take over the current handle for the thread doing the replacement # take over the current handle for the thread doing the replacement
self._local.handle = handle self._local.handle = handle
self._handles = [handle] self._handles = [handle]
def _open(self): def _open(self):
handle = _base.open_path(self.path) handle = base.open_path(self.path)
if handle is None: if handle is None:
logger.error("%r failed to open new handle", self) logger.error("%r failed to open new handle", self)
else: else:
@ -76,13 +62,13 @@ class _ThreadedHandle:
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug("%r closing %s", self, handles) logger.debug("%r closing %s", self, handles)
for h in handles: for h in handles:
_base.close(h) base.close(h)
@property @property
def notifications_hook(self): def notifications_hook(self):
if self._listener: if self._listener:
assert isinstance(self._listener, _threading.Thread) assert isinstance(self._listener, threading.Thread)
if _threading.current_thread() == self._listener: if threading.current_thread() == self._listener:
return self._listener._notifications_hook return self._listener._notifications_hook
def __del__(self): def __del__(self):
@ -113,26 +99,15 @@ class _ThreadedHandle:
__nonzero__ = __bool__ __nonzero__ = __bool__
# # How long to wait during a read for the next packet, in seconds.
# # Ideally this should be rather long (10s ?), but the read is blocking and this means that when the thread
# # is signalled to stop, it would take a while for it to acknowledge it.
# Forcibly closing the file handle on another thread does _not_ interrupt the read on Linux systems.
# How long to wait during a read for the next packet, in seconds
# Ideally this should be rather long (10s ?), but the read is blocking
# and this means that when the thread is signalled to stop, it would take
# a while for it to acknowledge it.
# Forcibly closing the file handle on another thread does _not_ interrupt the
# read on Linux systems.
_EVENT_READ_TIMEOUT = 1.0 # in seconds _EVENT_READ_TIMEOUT = 1.0 # in seconds
# After this many reads that did not produce a packet, call the tick() method.
# This only happens if tick_period is enabled (>0) for the Listener instance.
# _IDLE_READS = 1 + int(5 // _EVENT_READ_TIMEOUT) # wait at least 5 seconds between ticks
class EventsListener(threading.Thread):
class EventsListener(_threading.Thread):
"""Listener thread for notifications from the Unifying Receiver. """Listener thread for notifications from the Unifying Receiver.
Incoming packets will be passed to the callback function in sequence. Incoming packets will be passed to the callback function in sequence.
""" """
@ -145,7 +120,7 @@ class EventsListener(_threading.Thread):
self.daemon = True self.daemon = True
self._active = False self._active = False
self.receiver = receiver self.receiver = receiver
self._queued_notifications = _Queue(16) self._queued_notifications = queue.Queue(16)
self._notifications_callback = notifications_callback self._notifications_callback = notifications_callback
def run(self): def run(self):
@ -163,13 +138,13 @@ class EventsListener(_threading.Thread):
while self._active: while self._active:
if self._queued_notifications.empty(): if self._queued_notifications.empty():
try: try:
n = _base.read(self.receiver.handle, _EVENT_READ_TIMEOUT) n = base.read(self.receiver.handle, _EVENT_READ_TIMEOUT)
except exceptions.NoReceiver: except exceptions.NoReceiver:
logger.warning("%s disconnected", self.receiver.name) logger.warning("%s disconnected", self.receiver.name)
self.receiver.close() self.receiver.close()
break break
if n: if n:
n = _base.make_notification(*n) n = base.make_notification(*n)
else: else:
n = self._queued_notifications.get() # deliver any queued notifications n = self._queued_notifications.get() # deliver any queued notifications
if n: if n:
@ -194,15 +169,11 @@ class EventsListener(_threading.Thread):
"""Called right before the thread stops.""" """Called right before the thread stops."""
pass pass
# def tick(self, timestamp):
# """Called about every tick_period seconds."""
# pass
def _notifications_hook(self, n): def _notifications_hook(self, n):
# Only consider unhandled notifications that were sent from this thread, # Only consider unhandled notifications that were sent from this thread,
# i.e. triggered by a callback handling a previous notification. # i.e. triggered by a callback handling a previous notification.
assert _threading.current_thread() == self assert threading.current_thread() == self
if self._active: # and _threading.current_thread() == self: if self._active: # and threading.current_thread() == self:
# if logger.isEnabledFor(logging.DEBUG): # if logger.isEnabledFor(logging.DEBUG):
# logger.debug("queueing unhandled %s", n) # logger.debug("queueing unhandled %s", n)
if not self._queued_notifications.full(): if not self._queued_notifications.full():

View File

@ -1,4 +1,5 @@
## Copyright (C) 2012-2013 Daniel Pavel ## Copyright (C) 2012-2013 Daniel Pavel
## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/
## ##
## This program is free software; you can redistribute it and/or modify ## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by ## it under the terms of the GNU General Public License as published by
@ -39,9 +40,6 @@ logger = logging.getLogger(__name__)
_R = _hidpp10.REGISTERS _R = _hidpp10.REGISTERS
_F = _hidpp20_constants.FEATURE _F = _hidpp20_constants.FEATURE
#
#
#
notification_lock = _threading.Lock() notification_lock = _threading.Lock()
@ -60,11 +58,6 @@ def process(device, notification):
return _process_device_notification(device, status, notification) return _process_device_notification(device, status, notification)
#
#
#
def _process_receiver_notification(receiver, status, n): def _process_receiver_notification(receiver, status, n):
# supposedly only 0x4x notifications arrive for the receiver # supposedly only 0x4x notifications arrive for the receiver
assert n.sub_id & 0x40 == 0x40 assert n.sub_id & 0x40 == 0x40
@ -151,21 +144,14 @@ def _process_receiver_notification(receiver, status, n):
logger.warning("%s: unhandled notification %s", receiver, n) logger.warning("%s: unhandled notification %s", receiver, n)
#
#
#
def _process_device_notification(device, status, n): def _process_device_notification(device, status, n):
# incoming packets with SubId >= 0x80 are supposedly replies from # incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here
# HID++ 1.0 requests, should never get here
assert n.sub_id & 0x80 == 0 assert n.sub_id & 0x80 == 0
if n.sub_id == 00: # no-op feature notification, dispose of it quickly if n.sub_id == 00: # no-op feature notification, dispose of it quickly
return False return False
# Allow the device object to handle the notification using custom # Allow the device object to handle the notification using custom per-device state.
# per-device state.
handling_ret = device.handle_notification(n) handling_ret = device.handle_notification(n)
if handling_ret is not None: if handling_ret is not None:
return handling_ret return handling_ret
@ -179,8 +165,7 @@ def _process_device_notification(device, status, n):
# These notifications are from the device itself, so it must be active # These notifications are from the device itself, so it must be active
device.online = True device.online = True
# At this point, we need to know the device's protocol, otherwise it's # At this point, we need to know the device's protocol, otherwise it's possible to not know how to handle it.
# possible to not know how to handle it.
assert device.protocol is not None assert device.protocol is not None
# some custom battery events for HID++ 1.0 devices # some custom battery events for HID++ 1.0 devices
@ -238,19 +223,11 @@ def _process_hidpp10_custom_notification(device, status, n):
status.set_battery_info(charge, next_charge, status_text, voltage) status.set_battery_info(charge, next_charge, status_text, voltage)
return True return True
if n.sub_id == _R.keyboard_illumination:
# message layout: 10 ix 17("address") <??> <?> <??> <light level 1=off..5=max>
# TODO anything we can do with this?
if logger.isEnabledFor(logging.INFO):
logger.info("illumination event: %s", n)
return True
logger.warning("%s: unrecognized %s", device, n) logger.warning("%s: unrecognized %s", device, n)
def _process_hidpp10_notification(device, status, n): def _process_hidpp10_notification(device, status, n):
# device unpairing if n.sub_id == 0x40: # device unpairing
if n.sub_id == 0x40:
if n.address == 0x02: if n.address == 0x02:
# device un-paired # device un-paired
status.clear() status.clear()
@ -263,8 +240,7 @@ def _process_hidpp10_notification(device, status, n):
logger.warning("%s: disconnection with unknown type %02X: %s", device, n.address, n) logger.warning("%s: disconnection with unknown type %02X: %s", device, n.address, n)
return True return True
# device connection (and disconnection) if n.sub_id == 0x41: # device connection (and disconnection)
if n.sub_id == 0x41:
flags = ord(n.data[:1]) & 0xF0 flags = ord(n.data[:1]) & 0xF0
if n.address == 0x02: # very old 27 MHz protocol if n.address == 0x02: # very old 27 MHz protocol
wpid = "00" + _strhex(n.data[2:3]) wpid = "00" + _strhex(n.data[2:3])
@ -296,12 +272,10 @@ def _process_hidpp10_notification(device, status, n):
if n.sub_id == 0x49: if n.sub_id == 0x49:
# raw input event? just ignore it # raw input event? just ignore it
# if n.address == 0x01, no idea what it is, but they keep on coming # if n.address == 0x01, no idea what it is, but they keep on coming
# if n.address == 0x03, appears to be an actual input event, # if n.address == 0x03, appears to be an actual input event, because they only come when input happents
# because they only come when input happents
return True return True
# power notification if n.sub_id == 0x4B: # power notification
if n.sub_id == 0x4B:
if n.address == 0x01: if n.address == 0x01:
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: device powered on", device) logger.debug("%s: device powered on", device)