Introduce Enum for notification types

This commit is contained in:
MattHag 2024-05-28 01:46:24 +02:00 committed by Peter F. Patel-Schneider
parent 9726b93a78
commit d67466298b
3 changed files with 27 additions and 13 deletions

View File

@ -619,3 +619,13 @@ class Alert(IntEnum):
SHOW_WINDOW = 0x02 SHOW_WINDOW = 0x02
ATTENTION = 0x04 ATTENTION = 0x04
ALL = 0xFF ALL = 0xFF
class Notification(IntEnum):
NO_OPERATION = 0x00
CONNECT_DISCONNECT = 0x40
DJ_PAIRING = 0x41
CONNECTED = 0x42
RAW_INPUT = 0x49
PAIRING_LOCK = 0x4A
POWER = 0x4B

View File

@ -33,6 +33,7 @@ from . import hidpp20_constants
from . import settings_templates from . import settings_templates
from .common import Alert from .common import Alert
from .common import BatteryStatus from .common import BatteryStatus
from .common import Notification
from .hidpp10_constants import Registers from .hidpp10_constants import Registers
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -58,7 +59,7 @@ def _process_receiver_notification(receiver, n):
# supposedly only 0x4x notifications arrive for the receiver # supposedly only 0x4x notifications arrive for the receiver
assert n.sub_id & 0x40 == 0x40 assert n.sub_id & 0x40 == 0x40
if n.sub_id == 0x4A: # pairing lock notification if n.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(n.address & 0x01) receiver.pairing.lock_open = bool(n.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
@ -117,9 +118,10 @@ def _process_receiver_notification(receiver, n):
logger.info("%s: %s", receiver, reason) logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None receiver.pairing.error = None
if not receiver.pairing.lock_open: if not receiver.pairing.lock_open:
receiver.pairing.counter = ( receiver.pairing.counter = None
receiver.pairing.device_address receiver.pairing.device_address = None
) = receiver.pairing.device_authentication = receiver.pairing.device_name = None receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = n.data[0] pair_error = n.data[0]
if receiver.pairing.lock_open: if receiver.pairing.lock_open:
receiver.pairing.new_device = None receiver.pairing.new_device = None
@ -147,7 +149,8 @@ def _process_device_notification(device, n):
# incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here # incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here
assert n.sub_id & 0x80 == 0 assert n.sub_id & 0x80 == 0
if n.sub_id == 00: # no-op feature notification, dispose of it quickly if n.sub_id == Notification.NO_OPERATION:
# dispose it
return False return False
# Allow the device object to handle the notification using custom per-device state. # Allow the device object to handle the notification using custom per-device state.
@ -188,19 +191,19 @@ def _process_dj_notification(device, n):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s (%s) DJ %s", device, device.protocol, n) logger.debug("%s (%s) DJ %s", device, device.protocol, n)
if n.sub_id == 0x40: if n.sub_id == Notification.CONNECT_DISCONNECT:
# do all DJ paired notifications also show up as HID++ 1.0 notifications? # do all DJ paired notifications also show up as HID++ 1.0 notifications?
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
logger.info("%s: ignoring DJ unpaired: %s", device, n) logger.info("%s: ignoring DJ unpaired: %s", device, n)
return True return True
if n.sub_id == 0x41: if n.sub_id == Notification.DJ_PAIRING:
# do all DJ paired notifications also show up as HID++ 1.0 notifications? # do all DJ paired notifications also show up as HID++ 1.0 notifications?
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
logger.info("%s: ignoring DJ paired: %s", device, n) logger.info("%s: ignoring DJ paired: %s", device, n)
return True return True
if n.sub_id == 0x42: if n.sub_id == Notification.CONNECTED:
connected = not n.address & 0x01 connected = not n.address & 0x01
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
logger.info("%s: DJ connection: %s %s", device, connected, n) logger.info("%s: DJ connection: %s %s", device, connected, n)
@ -224,7 +227,7 @@ def _process_hidpp10_custom_notification(device, n):
def _process_hidpp10_notification(device, n): def _process_hidpp10_notification(device, n):
if n.sub_id == 0x40: # device unpairing if n.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing
if n.address == 0x02: if n.address == 0x02:
# device un-paired # device un-paired
device.wpid = None device.wpid = None
@ -236,7 +239,7 @@ def _process_hidpp10_notification(device, n):
logger.warning("%s: disconnection with unknown type %02X: %s", device, n.address, n) logger.warning("%s: disconnection with unknown type %02X: %s", device, n.address, n)
return True return True
if n.sub_id == 0x41: # device connection (and disconnection) if n.sub_id == Notification.DJ_PAIRING: # device connection (and disconnection)
flags = ord(n.data[:1]) & 0xF0 flags = ord(n.data[:1]) & 0xF0
if n.address == 0x02: # very old 27 MHz protocol if n.address == 0x02: # very old 27 MHz protocol
wpid = "00" + common.strhex(n.data[2:3]) wpid = "00" + common.strhex(n.data[2:3])
@ -267,13 +270,13 @@ def _process_hidpp10_notification(device, n):
device.changed(active=link_established) device.changed(active=link_established)
return True return True
if n.sub_id == 0x49: if n.sub_id == Notification.RAW_INPUT:
# raw input event? just ignore it # raw input event? just ignore it
# if n.address == 0x01, no idea what it is, but they keep on coming # if n.address == 0x01, no idea what it is, but they keep on coming
# if n.address == 0x03, appears to be an actual input event, because they only come when input happents # if n.address == 0x03, appears to be an actual input event, because they only come when input happents
return True return True
if n.sub_id == 0x4B: # power notification if n.sub_id == Notification.POWER:
if n.address == 0x01: if n.address == 0x01:
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: device powered on", device) logger.debug("%s: device powered on", device)

View File

@ -35,6 +35,7 @@ from . import exceptions
from . import hidpp10 from . import hidpp10
from . import hidpp10_constants from . import hidpp10_constants
from .common import Alert from .common import Alert
from .common import Notification
from .device import Device from .device import Device
from .hidpp10_constants import Registers from .hidpp10_constants import Registers
@ -230,7 +231,7 @@ class Receiver:
raise IndexError(f"{self}: device number {int(number)} already registered") raise IndexError(f"{self}: device number {int(number)} already registered")
assert notification is None or notification.devnumber == number assert notification is None or notification.devnumber == number
assert notification is None or notification.sub_id == 0x41 assert notification is None or notification.sub_id == Notification.DJ_PAIRING
try: try:
time.sleep(0.05) # let receiver settle time.sleep(0.05) # let receiver settle