device: move changed method from status to Device and Receiver

This commit is contained in:
Peter F. Patel-Schneider 2024-03-10 08:02:49 -04:00
parent 1fe2eab1a4
commit a1418cd834
6 changed files with 50 additions and 54 deletions

View File

@ -25,7 +25,7 @@ from typing import Optional
import hidapi as _hid
import solaar.configuration as _configuration
from . import base, descriptors, exceptions, hidpp10, hidpp10_constants, hidpp20
from . import base, descriptors, exceptions, hidpp10, hidpp10_constants, hidpp20, hidpp20_constants, settings
from .common import ALERT, Battery
from .settings_templates import check_feature_settings as _check_feature_settings
@ -66,7 +66,7 @@ class Device:
if receiver:
assert 0 < number <= 15 # some receivers have devices past their max # of devices
self.number = number # will be None at this point for directly connected devices
self.online = online
self.online = online # is the device online? - gates many atempts to contact the device
self.descriptor = None
self.isDevice = True # some devices act as receiver so we need a property to distinguish them
self.may_unpair = False
@ -95,6 +95,7 @@ class Device:
self.notification_flags = None
self.battery_info = None
self.link_encrypted = None
self._active = None # lags self.online - is used to help determine when to setup devices
self._feature_settings_checked = False
self._gestures_lock = _threading.Lock()
@ -375,7 +376,7 @@ class Device:
# update the leds on the device, if any
_hidpp10.set_3leds(self, info.level, charging=info.charging(), warning=bool(alert))
if hasattr(self, "status"):
self.status.changed(active=True, alert=alert, reason=reason)
self.changed(active=True, alert=alert, reason=reason)
# Retrieve and regularize battery status
def read_battery(self):
@ -408,6 +409,33 @@ class Device:
logger.info("%s: device notifications %s %s", self, "enabled" if enable else "disabled", flag_names)
return flag_bits if ok else None
def changed(self, active=None, alert=ALERT.NONE, reason=None, push=False):
"""The status of the device had changed, so invoke the status callback.
Also push notifications and settings to the device when necessary."""
changed_callback = self.status._changed_callback
if active is not None:
self.online = active
was_active, self._active = self._active, active
if active:
if not was_active:
if self.protocol < 2.0: # Make sure to set notification flags on the device
self.notification_flags = self.enable_connection_notifications()
self.read_battery() # battery information may have changed so try to read it now
# Push settings for new devices when devices request software reconfiguration
# and when devices become active if they don't have wireless device status feature,
if (
was_active is None
or not was_active
or push
and (not self.features or hidpp20_constants.FEATURE.WIRELESS_DEVICE_STATUS not in self.features)
):
if logger.isEnabledFor(logging.INFO):
logger.info("%s pushing device settings %s", self, self.settings)
settings.apply_all_settings(self)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("device %d changed: active=%s %s", self.number, self._active, self.battery_info)
changed_callback(self, alert, reason)
def add_notification_handler(self, id: str, fn):
"""Adds the notification handling callback `fn` to this device under name `id`.
If a callback has already been registered under this name, it's replaced with

View File

@ -133,7 +133,7 @@ class EventsListener(threading.Thread):
if self.receiver.isDevice: # ping (wired or BT) devices to see if they are really online
if self.receiver.ping():
self.receiver.status.changed(True, reason="initialization")
self.receiver.changed(active=True, reason="initialization")
while self._active:
if self._queued_notifications.empty():

View File

@ -76,7 +76,7 @@ def _process_receiver_notification(receiver, status, n):
receiver.pairing.error = error_string = _hidpp10_constants.PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
status.changed(reason=reason)
receiver.changed(reason=reason)
return True
elif n.sub_id == _R.discovery_status_notification: # Bolt pairing
@ -94,7 +94,7 @@ def _process_receiver_notification(receiver, status, n):
if discover_error:
receiver.pairing.error = discover_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
status.changed(reason=reason)
receiver.changed(reason=reason)
return True
elif n.sub_id == _R.device_discovery_notification: # Bolt pairing
@ -134,7 +134,7 @@ def _process_receiver_notification(receiver, status, n):
receiver.pairing.error = error_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
status.changed(reason=reason)
receiver.changed(reason=reason)
return True
elif n.sub_id == _R.passkey_request_notification: # Bolt pairing
@ -209,7 +209,7 @@ def _process_dj_notification(device, status, n):
connected = not n.address & 0x01
if logger.isEnabledFor(logging.INFO):
logger.info("%s: DJ connection: %s %s", device, connected, n)
status.changed(active=connected, alert=_ALERT.NONE, reason=_("connected") if connected else _("disconnected"))
device.changed(active=connected, alert=_ALERT.NONE, reason=_("connected") if connected else _("disconnected"))
return True
logger.warning("%s: unrecognized DJ %s", device, n)
@ -233,10 +233,10 @@ def _process_hidpp10_notification(device, status, n):
if n.address == 0x02:
# device un-paired
device.wpid = None
device.status = None
if device.number in device.receiver:
del device.receiver[device.number]
status.changed(active=False, alert=_ALERT.ALL, reason=_("unpaired"))
device.changed(active=False, alert=_ALERT.ALL, reason=_("unpaired"))
device.status = None
else:
logger.warning("%s: disconnection with unknown type %02X: %s", device, n.address, n)
return True
@ -267,7 +267,7 @@ def _process_hidpp10_notification(device, status, n):
bool(flags & 0x80),
)
device.link_encrypted = link_encrypted
status.changed(active=link_established)
device.changed(active=link_established)
return True
if n.sub_id == 0x49:
@ -281,7 +281,7 @@ def _process_hidpp10_notification(device, status, n):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: device powered on", device)
reason = device.status_string() or _("powered on")
status.changed(active=True, alert=_ALERT.NOTIFICATION, reason=reason)
device.changed(active=True, alert=_ALERT.NOTIFICATION, reason=reason)
else:
logger.warning("%s: unknown %s", device, n)
return True
@ -320,7 +320,7 @@ def _process_feature_notification(device, status, n, feature):
if result:
device.set_battery_info(result[1])
else: # this feature is used to signal device becoming inactive
status.changed(active=False)
device.changed(active=False)
else:
logger.warning("%s: unknown ADC MEASUREMENT %s", device, n)
@ -339,7 +339,7 @@ def _process_feature_notification(device, status, n, feature):
elif n.address == 0x20:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: Light Check button pressed", device)
status.changed(alert=_ALERT.SHOW_WINDOW)
device.changed(alert=_ALERT.SHOW_WINDOW)
# first cancel any reporting
# device.feature_request(_F.SOLAR_DASHBOARD)
# trigger a new report chain
@ -358,7 +358,7 @@ def _process_feature_notification(device, status, n, feature):
reason = "powered on" if n.data[2] == 1 else None
if n.data[1] == 1: # device is asking for software reconfiguration so need to change status
alert = _ALERT.NONE
status.changed(active=True, alert=alert, reason=reason, push=True)
device.changed(active=True, alert=alert, reason=reason, push=True)
else:
logger.warning("%s: unknown WIRELESS %s", device, n)

View File

@ -25,6 +25,7 @@ import hidapi as _hid
from . import base as _base
from . import exceptions, hidpp10, hidpp10_constants
from .common import ALERT
from .device import Device
from .i18n import _, ngettext
@ -102,6 +103,10 @@ class Receiver:
def __del__(self):
self.close()
def changed(self, alert=ALERT.NOTIFICATION, reason=None):
"""The status of the device had changed, so invoke the status callback"""
self.status._changed_callback(self, alert=alert, reason=reason)
@property
def firmware(self):
if self._firmware is None and self.handle:

View File

@ -19,8 +19,6 @@ import logging
from . import hidpp10
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp20_constants as _hidpp20_constants
from . import settings as _settings
from .common import NamedInts
logger = logging.getLogger(__name__)
@ -52,9 +50,6 @@ class ReceiverStatus:
assert changed_callback
self._changed_callback = changed_callback
def changed(self, alert=ALERT.NOTIFICATION, reason=None):
self._changed_callback(self._receiver, alert=alert, reason=reason)
class DeviceStatus:
"""Holds the 'runtime' status of a peripheral
@ -73,35 +68,3 @@ class DeviceStatus:
return bool(self._active)
__nonzero__ = __bool__
def changed(self, active=None, alert=ALERT.NONE, reason=None, push=False):
d = self._device
if active is not None:
d.online = active
was_active, self._active = self._active, active
if active:
if not was_active:
# Make sure to set notification flags on the device, they
# get cleared when the device is turned off (but not when the device
# goes idle, and we can't tell the difference right now).
if d.protocol < 2.0:
self._device.notification_flags = d.enable_connection_notifications()
# battery information may have changed so try to read it now
self._device.read_battery()
# Push settings for new devices when devices request software reconfiguration
# and when devices become active if they don't have wireless device status feature,
if (
was_active is None
or push
or not was_active
and (not d.features or _hidpp20_constants.FEATURE.WIRELESS_DEVICE_STATUS not in d.features)
):
if logger.isEnabledFor(logging.INFO):
logger.info("%s pushing device settings %s", d, d.settings)
_settings.apply_all_settings(d)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("device %d changed: active=%s %s", d.number, self._active, self.battery)
self._changed_callback(d, alert, reason)

View File

@ -291,7 +291,7 @@ def ping_all(resuming=False):
if resuming and hasattr(listener_thread.receiver, "status"):
listener_thread.receiver.status._active = None # ensure that settings are pushed
if listener_thread.receiver.ping():
listener_thread.receiver.status.changed(active=True, push=True)
listener_thread.receiver.changed(active=True, push=True)
listener_thread._status_changed(listener_thread.receiver)
else:
count = listener_thread.receiver.count()
@ -300,7 +300,7 @@ def ping_all(resuming=False):
if resuming and hasattr(dev, "status"):
dev.status._active = None # ensure that settings are pushed
if dev.ping():
dev.status.changed(active=True, push=True)
dev.changed(active=True, push=True)
listener_thread._status_changed(dev)
count -= 1
if not count: