Rename variable to full name notification

Related #2711
This commit is contained in:
MattHag 2025-01-01 19:34:46 +01:00 committed by Peter F. Patel-Schneider
parent 207be464a5
commit 64ac437b7f
1 changed files with 118 additions and 112 deletions

View File

@ -167,25 +167,25 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP
logger.warning("%s: unhandled notification %s", receiver, hidpp_notification)
def _process_device_notification(device, n):
def _process_device_notification(device, notification):
# incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here
assert n.sub_id & 0x80 == 0
assert notification.sub_id & 0x80 == 0
if n.sub_id == Notification.NO_OPERATION:
if notification.sub_id == Notification.NO_OPERATION:
# dispose it
return False
# Allow the device object to handle the notification using custom per-device state.
handling_ret = device.handle_notification(n)
handling_ret = device.handle_notification(notification)
if handling_ret is not None:
return handling_ret
# 0x40 to 0x7F appear to be HID++ 1.0 or DJ notifications
if n.sub_id >= 0x40:
if n.report_id == base.DJ_MESSAGE_ID:
return _process_dj_notification(device, n)
if notification.sub_id >= 0x40:
if notification.report_id == base.DJ_MESSAGE_ID:
return _process_dj_notification(device, notification)
else:
return _process_hidpp10_notification(device, n)
return _process_hidpp10_notification(device, notification)
# These notifications are from the device itself, so it must be active
device.online = True
@ -194,63 +194,63 @@ def _process_device_notification(device, n):
# some custom battery events for HID++ 1.0 devices
if device.protocol < 2.0:
return _process_hidpp10_custom_notification(device, n)
return _process_hidpp10_custom_notification(device, notification)
# assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications
if not device.features:
logger.warning("%s: feature notification but features not set up: %02X %s", device, n.sub_id, n)
logger.warning("%s: feature notification but features not set up: %02X %s", device, notification.sub_id, notification)
return False
try:
feature = device.features.get_feature(n.sub_id)
feature = device.features.get_feature(notification.sub_id)
except IndexError:
logger.warning("%s: notification from invalid feature index %02X: %s", device, n.sub_id, n)
logger.warning("%s: notification from invalid feature index %02X: %s", device, notification.sub_id, notification)
return False
return _process_feature_notification(device, n, feature)
return _process_feature_notification(device, notification, feature)
def _process_dj_notification(device, n):
def _process_dj_notification(device, notification):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s (%s) DJ %s", device, device.protocol, n)
logger.debug("%s (%s) DJ %s", device, device.protocol, notification)
if n.sub_id == Notification.CONNECT_DISCONNECT:
if notification.sub_id == Notification.CONNECT_DISCONNECT:
# do all DJ paired notifications also show up as HID++ 1.0 notifications?
if logger.isEnabledFor(logging.INFO):
logger.info("%s: ignoring DJ unpaired: %s", device, n)
logger.info("%s: ignoring DJ unpaired: %s", device, notification)
return True
if n.sub_id == Notification.DJ_PAIRING:
if notification.sub_id == Notification.DJ_PAIRING:
# do all DJ paired notifications also show up as HID++ 1.0 notifications?
if logger.isEnabledFor(logging.INFO):
logger.info("%s: ignoring DJ paired: %s", device, n)
logger.info("%s: ignoring DJ paired: %s", device, notification)
return True
if n.sub_id == Notification.CONNECTED:
connected = not n.address & 0x01
if notification.sub_id == Notification.CONNECTED:
connected = not notification.address & 0x01
if logger.isEnabledFor(logging.INFO):
logger.info("%s: DJ connection: %s %s", device, connected, n)
logger.info("%s: DJ connection: %s %s", device, connected, notification)
device.changed(active=connected, alert=Alert.NONE, reason=_("connected") if connected else _("disconnected"))
return True
logger.warning("%s: unrecognized DJ %s", device, n)
logger.warning("%s: unrecognized DJ %s", device, notification)
def _process_hidpp10_custom_notification(device, n):
def _process_hidpp10_custom_notification(device, notification):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s (%s) custom notification %s", device, device.protocol, n)
logger.debug("%s (%s) custom notification %s", device, device.protocol, notification)
if n.sub_id in (Registers.BATTERY_STATUS, Registers.BATTERY_CHARGE):
assert n.data[-1:] == b"\x00"
data = chr(n.address).encode() + n.data
device.set_battery_info(hidpp10.parse_battery_status(n.sub_id, data))
if notification.sub_id in (Registers.BATTERY_STATUS, Registers.BATTERY_CHARGE):
assert notification.data[-1:] == b"\x00"
data = chr(notification.address).encode() + notification.data
device.set_battery_info(hidpp10.parse_battery_status(notification.sub_id, data))
return True
logger.warning("%s: unrecognized %s", device, n)
logger.warning("%s: unrecognized %s", device, notification)
def _process_hidpp10_notification(device, n):
if n.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing
if n.address == 0x02:
def _process_hidpp10_notification(device, notification):
if notification.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing
if notification.address == 0x02:
# device un-paired
device.wpid = None
if device.number in device.receiver:
@ -258,21 +258,23 @@ def _process_hidpp10_notification(device, n):
device.changed(active=False, alert=Alert.ALL, reason=_("unpaired"))
## device.status = None
else:
logger.warning("%s: disconnection with unknown type %02X: %s", device, n.address, n)
logger.warning("%s: disconnection with unknown type %02X: %s", device, notification.address, notification)
return True
if n.sub_id == Notification.DJ_PAIRING: # device connection (and disconnection)
flags = ord(n.data[:1]) & 0xF0
if n.address == 0x02: # very old 27 MHz protocol
wpid = "00" + common.strhex(n.data[2:3])
if notification.sub_id == Notification.DJ_PAIRING: # device connection (and disconnection)
flags = ord(notification.data[:1]) & 0xF0
if notification.address == 0x02: # very old 27 MHz protocol
wpid = "00" + common.strhex(notification.data[2:3])
link_established = True
link_encrypted = bool(flags & 0x80)
elif n.address > 0x00: # all other protocols are supposed to be almost the same
wpid = common.strhex(n.data[2:3] + n.data[1:2])
elif notification.address > 0x00: # all other protocols are supposed to be almost the same
wpid = common.strhex(notification.data[2:3] + notification.data[1:2])
link_established = not (flags & 0x40)
link_encrypted = bool(flags & 0x20) or n.address == 0x10 # Bolt protocol always encrypted
link_encrypted = bool(flags & 0x20) or notification.address == 0x10 # Bolt protocol always encrypted
else:
logger.warning("%s: connection notification with unknown protocol %02X: %s", device.number, n.address, n)
logger.warning(
"%s: connection notification with unknown protocol %02X: %s", device.number, notification.address, notification
)
return True
if wpid != device.wpid:
logger.warning("%s wpid mismatch, got %s", device, wpid)
@ -280,7 +282,7 @@ def _process_hidpp10_notification(device, n):
logger.debug(
"%s: protocol %s connection notification: software=%s, encrypted=%s, link=%s, payload=%s",
device,
n.address,
notification.address,
bool(flags & 0x10),
link_encrypted,
link_established,
@ -292,75 +294,79 @@ def _process_hidpp10_notification(device, n):
device.changed(active=link_established)
return True
if n.sub_id == Notification.RAW_INPUT:
if notification.sub_id == Notification.RAW_INPUT:
# raw input event? just ignore it
# if n.address == 0x01, no idea what it is, but they keep on coming
# if n.address == 0x03, appears to be an actual input event, because they only come when input happents
# if notification.address == 0x01, no idea what it is, but they keep on coming
# if notification.address == 0x03, appears to be an actual input event, because they only come when input happents
return True
if n.sub_id == Notification.POWER:
if n.address == 0x01:
if notification.sub_id == Notification.POWER:
if notification.address == 0x01:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: device powered on", device)
reason = device.status_string() or _("powered on")
device.changed(active=True, alert=Alert.NOTIFICATION, reason=reason)
else:
logger.warning("%s: unknown %s", device, n)
logger.warning("%s: unknown %s", device, notification)
return True
logger.warning("%s: unrecognized %s", device, n)
logger.warning("%s: unrecognized %s", device, notification)
def _process_feature_notification(device, n, feature):
def _process_feature_notification(device, notification, feature):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"%s: notification for feature %s, report %s, data %s", device, feature, n.address >> 4, common.strhex(n.data)
"%s: notification for feature %s, report %s, data %s",
device,
feature,
notification.address >> 4,
common.strhex(notification.data),
)
if feature == _F.BATTERY_STATUS:
if n.address == 0x00:
device.set_battery_info(hidpp20.decipher_battery_status(n.data)[1])
elif n.address == 0x10:
if notification.address == 0x00:
device.set_battery_info(hidpp20.decipher_battery_status(notification.data)[1])
elif notification.address == 0x10:
if logger.isEnabledFor(logging.INFO):
logger.info("%s: spurious BATTERY status %s", device, n)
logger.info("%s: spurious BATTERY status %s", device, notification)
else:
logger.warning("%s: unknown BATTERY %s", device, n)
logger.warning("%s: unknown BATTERY %s", device, notification)
elif feature == _F.BATTERY_VOLTAGE:
if n.address == 0x00:
device.set_battery_info(hidpp20.decipher_battery_voltage(n.data)[1])
if notification.address == 0x00:
device.set_battery_info(hidpp20.decipher_battery_voltage(notification.data)[1])
else:
logger.warning("%s: unknown VOLTAGE %s", device, n)
logger.warning("%s: unknown VOLTAGE %s", device, notification)
elif feature == _F.UNIFIED_BATTERY:
if n.address == 0x00:
device.set_battery_info(hidpp20.decipher_battery_unified(n.data)[1])
if notification.address == 0x00:
device.set_battery_info(hidpp20.decipher_battery_unified(notification.data)[1])
else:
logger.warning("%s: unknown UNIFIED BATTERY %s", device, n)
logger.warning("%s: unknown UNIFIED BATTERY %s", device, notification)
elif feature == _F.ADC_MEASUREMENT:
if n.address == 0x00:
result = hidpp20.decipher_adc_measurement(n.data)
if notification.address == 0x00:
result = hidpp20.decipher_adc_measurement(notification.data)
if result:
device.set_battery_info(result[1])
else: # this feature is used to signal device becoming inactive
device.changed(active=False)
else:
logger.warning("%s: unknown ADC MEASUREMENT %s", device, n)
logger.warning("%s: unknown ADC MEASUREMENT %s", device, notification)
elif feature == _F.SOLAR_DASHBOARD:
if n.data[5:9] == b"GOOD":
charge, lux, adc = struct.unpack("!BHH", n.data[:5])
if notification.data[5:9] == b"GOOD":
charge, lux, adc = struct.unpack("!BHH", notification.data[:5])
# guesstimate the battery voltage, emphasis on 'guess'
# status_text = '%1.2fV' % (adc * 2.67793237653 / 0x0672)
status_text = BatteryStatus.DISCHARGING
if n.address == 0x00:
if notification.address == 0x00:
device.set_battery_info(common.Battery(charge, None, status_text, None))
elif n.address == 0x10:
elif notification.address == 0x10:
if lux > 200:
status_text = BatteryStatus.RECHARGING
device.set_battery_info(common.Battery(charge, None, status_text, None, lux))
elif n.address == 0x20:
elif notification.address == 0x20:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: Light Check button pressed", device)
device.changed(alert=Alert.SHOW_WINDOW)
@ -371,72 +377,72 @@ def _process_feature_notification(device, n, feature):
reports_period = 2 # seconds
device.feature_request(_F.SOLAR_DASHBOARD, 0x00, reports_count, reports_period)
else:
logger.warning("%s: unknown SOLAR CHARGE %s", device, n)
logger.warning("%s: unknown SOLAR CHARGE %s", device, notification)
else:
logger.warning("%s: SOLAR CHARGE not GOOD? %s", device, n)
logger.warning("%s: SOLAR CHARGE not GOOD? %s", device, notification)
elif feature == _F.WIRELESS_DEVICE_STATUS:
if n.address == 0x00:
if notification.address == 0x00:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("wireless status: %s", n)
reason = "powered on" if n.data[2] == 1 else None
if n.data[1] == 1: # device is asking for software reconfiguration so need to change status
logger.debug("wireless status: %s", notification)
reason = "powered on" if notification.data[2] == 1 else None
if notification.data[1] == 1: # device is asking for software reconfiguration so need to change status
alert = Alert.NONE
device.changed(active=True, alert=alert, reason=reason, push=True)
else:
logger.warning("%s: unknown WIRELESS %s", device, n)
logger.warning("%s: unknown WIRELESS %s", device, notification)
elif feature == _F.TOUCHMOUSE_RAW_POINTS:
if n.address == 0x00:
if notification.address == 0x00:
if logger.isEnabledFor(logging.INFO):
logger.info("%s: TOUCH MOUSE points %s", device, n)
elif n.address == 0x10:
touch = ord(n.data[:1])
logger.info("%s: TOUCH MOUSE points %s", device, notification)
elif notification.address == 0x10:
touch = ord(notification.data[:1])
button_down = bool(touch & 0x02)
mouse_lifted = bool(touch & 0x01)
if logger.isEnabledFor(logging.INFO):
logger.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", device, button_down, mouse_lifted)
else:
logger.warning("%s: unknown TOUCH MOUSE %s", device, n)
logger.warning("%s: unknown TOUCH MOUSE %s", device, notification)
# TODO: what are REPROG_CONTROLS_V{2,3}?
elif feature == _F.REPROG_CONTROLS:
if n.address == 0x00:
if notification.address == 0x00:
if logger.isEnabledFor(logging.INFO):
logger.info("%s: reprogrammable key: %s", device, n)
logger.info("%s: reprogrammable key: %s", device, notification)
else:
logger.warning("%s: unknown REPROG_CONTROLS %s", device, n)
logger.warning("%s: unknown REPROG_CONTROLS %s", device, notification)
elif feature == _F.BACKLIGHT2:
if n.address == 0x00:
level = struct.unpack("!B", n.data[1:2])[0]
if notification.address == 0x00:
level = struct.unpack("!B", notification.data[1:2])[0]
if device.setting_callback:
device.setting_callback(device, settings_templates.Backlight2Level, [level])
elif feature == _F.REPROG_CONTROLS_V4:
if n.address == 0x00:
if notification.address == 0x00:
if logger.isEnabledFor(logging.DEBUG):
cid1, cid2, cid3, cid4 = struct.unpack("!HHHH", n.data[:8])
cid1, cid2, cid3, cid4 = struct.unpack("!HHHH", notification.data[:8])
logger.debug("%s: diverted controls pressed: 0x%x, 0x%x, 0x%x, 0x%x", device, cid1, cid2, cid3, cid4)
elif n.address == 0x10:
elif notification.address == 0x10:
if logger.isEnabledFor(logging.DEBUG):
dx, dy = struct.unpack("!hh", n.data[:4])
dx, dy = struct.unpack("!hh", notification.data[:4])
logger.debug("%s: rawXY dx=%i dy=%i", device, dx, dy)
elif n.address == 0x20:
elif notification.address == 0x20:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: received analyticsKeyEvents", device)
elif logger.isEnabledFor(logging.INFO):
logger.info("%s: unknown REPROG_CONTROLS_V4 %s", device, n)
logger.info("%s: unknown REPROG_CONTROLS_V4 %s", device, notification)
elif feature == _F.HIRES_WHEEL:
if n.address == 0x00:
if notification.address == 0x00:
if logger.isEnabledFor(logging.INFO):
flags, delta_v = struct.unpack(">bh", n.data[:3])
flags, delta_v = struct.unpack(">bh", notification.data[:3])
high_res = (flags & 0x10) != 0
periods = flags & 0x0F
logger.info("%s: WHEEL: res: %d periods: %d delta V:%-3d", device, high_res, periods, delta_v)
elif n.address == 0x10:
ratchet = n.data[0]
elif notification.address == 0x10:
ratchet = notification.data[0]
if logger.isEnabledFor(logging.INFO):
logger.info("%s: WHEEL: ratchet: %d", device, ratchet)
if ratchet < 2: # don't process messages with unusual ratchet values
@ -444,19 +450,19 @@ def _process_feature_notification(device, n, feature):
device.setting_callback(device, settings_templates.ScrollRatchet, [2 if ratchet else 1])
else:
if logger.isEnabledFor(logging.INFO):
logger.info("%s: unknown WHEEL %s", device, n)
logger.info("%s: unknown WHEEL %s", device, notification)
elif feature == _F.ONBOARD_PROFILES:
if n.address > 0x10:
if notification.address > 0x10:
if logger.isEnabledFor(logging.INFO):
logger.info("%s: unknown ONBOARD PROFILES %s", device, n)
logger.info("%s: unknown ONBOARD PROFILES %s", device, notification)
else:
if n.address == 0x00:
profile_sector = struct.unpack("!H", n.data[:2])[0]
if notification.address == 0x00:
profile_sector = struct.unpack("!H", notification.data[:2])[0]
if profile_sector:
settings_templates.profile_change(device, profile_sector)
elif n.address == 0x10:
resolution_index = struct.unpack("!B", n.data[:1])[0]
elif notification.address == 0x10:
resolution_index = struct.unpack("!B", notification.data[:1])[0]
profile_sector = struct.unpack("!H", device.feature_request(_F.ONBOARD_PROFILES, 0x40)[:2])[0]
if device.setting_callback:
for profile in device.profiles.profiles.values() if device.profiles else []:
@ -467,18 +473,18 @@ def _process_feature_notification(device, n, feature):
break
elif feature == _F.BRIGHTNESS_CONTROL:
if n.address > 0x10:
if notification.address > 0x10:
if logger.isEnabledFor(logging.INFO):
logger.info("%s: unknown BRIGHTNESS CONTROL %s", device, n)
logger.info("%s: unknown BRIGHTNESS CONTROL %s", device, notification)
else:
if n.address == 0x00:
brightness = struct.unpack("!H", n.data[:2])[0]
if notification.address == 0x00:
brightness = struct.unpack("!H", notification.data[:2])[0]
device.setting_callback(device, settings_templates.BrightnessControl, [brightness])
elif n.address == 0x10:
brightness = n.data[0] & 0x01
elif notification.address == 0x10:
brightness = notification.data[0] & 0x01
if brightness:
brightness = struct.unpack("!H", device.feature_request(_F.BRIGHTNESS_CONTROL, 0x10)[:2])[0]
device.setting_callback(device, settings_templates.BrightnessControl, [brightness])
diversion.process_notification(device, n, feature)
diversion.process_notification(device, notification, feature)
return True