device: Support per-slot unpair on Lightspeed receivers (CLI + GUI) (#3183)
* Add CLI --slot unpair for Lightspeed receivers Adds Receiver.force_unpair_slot(), a low-level method that writes the RECEIVER_PAIRING unpair register (action 0x03) for a given slot regardless of cache state, bypassing may_unpair and re_pairs gates. Intended for clearing stale pairings on Lightspeed receivers where Solaar cannot read the slot's pairing info or the device is no longer reachable on RF. Extends the `solaar unpair` CLI with three new flags: --receiver <name> select which receiver to target --slot <N> target a specific slot number directly --dry-run print what would happen without issuing the write The --slot path is gated to Lightspeed receivers only (by receiver_kind) so Unifying/Bolt/Nano behavior is unchanged. It populates the cache first and prints the current slot contents so the user can confirm what is about to be cleared, but does not refuse based on active/offline state — the explicit --slot N is treated as sufficient intent. Verified end-to-end on a C547 dual-slot Lightspeed receiver: stale slot cleared, RECEIVER_INFO sub-registers 0x21/0x31 went to None, connection count register dropped from 2 to 1, running solaar daemon picked up the change in real time via the existing DJ pairing notification hook. Covered by 5 unit tests against a mocked Receiver: empty slot, stale sentinel, active device invalidation, register write failure, closed handle. * Enable GUI unpair for Lightspeed receivers Flip _lightspeed_receiver() to may_unpair: True so the GUI unpair button becomes sensitive for Lightspeed-paired devices, and route the GUI unpair action through _unpair_device(n, force=True) so the unpair register write actually fires instead of short-circuiting to cache invalidation. The previous GUI path called `del receiver[n]`, which dispatches to Receiver.__delitem__ → _unpair_device(n, force=False). On receivers with re_pairs=True (Lightspeed, Nano) that hits the cache-invalidation branch at receiver.py:391 and never writes the unpair register — a "fake unpair" that would have left the slot bound on the hardware even after the button was enabled. With force=True, the GUI now issues RECEIVER_PAIRING action 0x03 for the selected slot, matching the CLI unpair path (cli/unpair.py:39) which has always used force=True. Lightspeed and Unifying unpair behavior are now symmetric: the button is enabled, the confirmation dialog is shown, and the register write is performed. The pair/add flow is untouched: it still uses set_lock(device=0) which lets the receiver firmware pick an empty slot, re_pairs remains True so the listener's silent-replace branch continues to handle re-pair into an occupied devnumber. Verified on dual-slot C547 hardware that pairing into an empty slot preserves the occupant of the other slot. Stale pairings where Solaar can't enumerate the slot (no cached device row to right-click) still require the --slot CLI from the preceding commit — that path is orthogonal to this GUI enablement. * Apply suggestion from @pfps Lightspeed receivers don't appear to re-pair. --------- Co-authored-by: Peter F. Patel-Schneider <pfpschneider@gmail.com>
This commit is contained in:
parent
8b023115d2
commit
ba0b45df22
|
|
@ -123,8 +123,8 @@ def _lightspeed_receiver(product_id: int) -> dict:
|
||||||
"usb_interface": 2,
|
"usb_interface": 2,
|
||||||
"receiver_kind": "lightspeed",
|
"receiver_kind": "lightspeed",
|
||||||
"name": _("Lightspeed Receiver"),
|
"name": _("Lightspeed Receiver"),
|
||||||
"may_unpair": False,
|
"may_unpair": True,
|
||||||
"re_pairs": True,
|
"re_pairs": False,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -413,6 +413,31 @@ class Receiver:
|
||||||
"""Receiver specific unpairing."""
|
"""Receiver specific unpairing."""
|
||||||
return self.write_register(Registers.RECEIVER_PAIRING, 0x03, key)
|
return self.write_register(Registers.RECEIVER_PAIRING, 0x03, key)
|
||||||
|
|
||||||
|
def force_unpair_slot(self, slot: int) -> bool:
|
||||||
|
"""Force-unpair a slot by writing the unpair register, ignoring cache state.
|
||||||
|
|
||||||
|
Intended for clearing stale pairings on receivers (Lightspeed in particular)
|
||||||
|
where Solaar cannot read pairing info for a slot. Bypasses the ``may_unpair``
|
||||||
|
and ``re_pairs`` gates that ``_unpair_device`` applies. Returns True if the
|
||||||
|
register write was acknowledged by the receiver.
|
||||||
|
"""
|
||||||
|
if not self.handle:
|
||||||
|
return False
|
||||||
|
slot = int(slot)
|
||||||
|
reply = self._unpair_device_per_receiver(slot)
|
||||||
|
if reply:
|
||||||
|
cached = self._devices.get(slot)
|
||||||
|
if cached:
|
||||||
|
cached.online = False
|
||||||
|
cached.wpid = None
|
||||||
|
if slot in self._devices:
|
||||||
|
del self._devices[slot]
|
||||||
|
if logger.isEnabledFor(logging.INFO):
|
||||||
|
logger.info("%s force-unpaired slot %d", self, slot)
|
||||||
|
return True
|
||||||
|
logger.warning("%s failed to force-unpair slot %d", self, slot)
|
||||||
|
return False
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return len([d for d in self._devices.values() if d is not None])
|
return len([d for d in self._devices.values() if d is not None])
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -97,7 +97,27 @@ def _create_parser():
|
||||||
sp = subparsers.add_parser("unpair", description="Unpair a device from its receiver. Not all receivers allow unpairing.")
|
sp = subparsers.add_parser("unpair", description="Unpair a device from its receiver. Not all receivers allow unpairing.")
|
||||||
sp.add_argument(
|
sp.add_argument(
|
||||||
"device",
|
"device",
|
||||||
help="device to unpair; may be a device number (1..6), a serial number, " "or a substring of a device's name.",
|
nargs="?",
|
||||||
|
help="device to unpair; may be a device number (1..6), a serial number, "
|
||||||
|
"or a substring of a device's name. Omit when using --slot.",
|
||||||
|
)
|
||||||
|
sp.add_argument(
|
||||||
|
"--receiver",
|
||||||
|
help="select receiver by name substring or serial number when more than one is present; "
|
||||||
|
"required with --slot if multiple receivers are attached.",
|
||||||
|
)
|
||||||
|
sp.add_argument(
|
||||||
|
"--slot",
|
||||||
|
type=int,
|
||||||
|
help="force-unpair a specific slot number directly, even if Solaar has no cached device there "
|
||||||
|
"or the device is currently reachable. Lightspeed receivers only. The slot contents are "
|
||||||
|
"printed before the write so you can confirm what is about to be cleared.",
|
||||||
|
)
|
||||||
|
sp.add_argument(
|
||||||
|
"--dry-run",
|
||||||
|
action="store_true",
|
||||||
|
help="with --slot, run all safety checks but do not issue the unpair register write. "
|
||||||
|
"Use to verify the active-device guard before committing to a real write.",
|
||||||
)
|
)
|
||||||
sp.set_defaults(action="unpair")
|
sp.set_defaults(action="unpair")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,12 @@
|
||||||
|
|
||||||
def run(receivers, args, find_receiver, find_device):
|
def run(receivers, args, find_receiver, find_device):
|
||||||
assert receivers
|
assert receivers
|
||||||
assert args.device
|
|
||||||
|
if getattr(args, "slot", None) is not None:
|
||||||
|
_run_slot_unpair(receivers, args, find_receiver)
|
||||||
|
return
|
||||||
|
|
||||||
|
assert args.device, "unpair requires a device name, or use --slot"
|
||||||
|
|
||||||
device_name = args.device.lower()
|
device_name = args.device.lower()
|
||||||
dev = next(find_device(receivers, device_name), None)
|
dev = next(find_device(receivers, device_name), None)
|
||||||
|
|
@ -36,3 +41,50 @@ def run(receivers, args, find_receiver, find_device):
|
||||||
print(f"Unpaired {int(number)}: {dev.name} ({codename}) [{wpid}:{serial}]")
|
print(f"Unpaired {int(number)}: {dev.name} ({codename}) [{wpid}:{serial}]")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
|
||||||
|
def _run_slot_unpair(receivers, args, find_receiver):
|
||||||
|
if args.receiver:
|
||||||
|
rcv = find_receiver(receivers, args.receiver.lower())
|
||||||
|
if not rcv:
|
||||||
|
raise Exception(f"no receiver found matching '{args.receiver}'")
|
||||||
|
elif len(receivers) == 1:
|
||||||
|
rcv = receivers[0]
|
||||||
|
else:
|
||||||
|
names = ", ".join(f"{r.name} [{r.serial}]" for r in receivers)
|
||||||
|
raise Exception(f"multiple receivers present, pass --receiver to pick one (found: {names})")
|
||||||
|
|
||||||
|
if rcv.receiver_kind != "lightspeed":
|
||||||
|
raise Exception(
|
||||||
|
f"--slot unpair is currently only supported on Lightspeed receivers "
|
||||||
|
f"(this is a {rcv.receiver_kind or 'unknown'} receiver: {rcv.name})"
|
||||||
|
)
|
||||||
|
|
||||||
|
slot = int(args.slot)
|
||||||
|
max_slots = rcv.max_devices or 1
|
||||||
|
if slot < 1 or slot > max_slots:
|
||||||
|
raise Exception(f"--slot {slot} out of range (valid: 1..{max_slots} on {rcv.name})")
|
||||||
|
|
||||||
|
# Populate the cache from the receiver's pairing registers so we can report
|
||||||
|
# what the slot currently holds. Truthy cache does NOT imply the device is
|
||||||
|
# reachable on RF — it only means the pairing registers are readable.
|
||||||
|
list(rcv)
|
||||||
|
cached = rcv._devices.get(slot)
|
||||||
|
if cached:
|
||||||
|
slot_desc = f"{cached.name} [{cached.wpid}:{cached.serial}]"
|
||||||
|
elif slot in rcv._devices:
|
||||||
|
slot_desc = "cached None sentinel (pairing info unreadable)"
|
||||||
|
else:
|
||||||
|
slot_desc = "no pairing info cached"
|
||||||
|
|
||||||
|
print(f"Slot {slot} on {rcv.name} [{rcv.serial}]: {slot_desc}")
|
||||||
|
|
||||||
|
if getattr(args, "dry_run", False):
|
||||||
|
print(f"[dry-run] would force-unpair slot {slot} — no register write issued")
|
||||||
|
return
|
||||||
|
|
||||||
|
ok = rcv.force_unpair_slot(slot)
|
||||||
|
if ok:
|
||||||
|
print(f"Slot {slot} unpair register write acknowledged by receiver")
|
||||||
|
else:
|
||||||
|
print(f"Slot {slot} unpair register write was not acknowledged (may be a no-op)")
|
||||||
|
|
|
||||||
|
|
@ -98,6 +98,10 @@ def unpair(window, device):
|
||||||
device_number = device.number
|
device_number = device.number
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del receiver[device_number]
|
# force=True ensures the unpair register write is issued even on
|
||||||
|
# re_pairs receivers (Lightspeed, Nano); otherwise _unpair_device
|
||||||
|
# short-circuits to cache-invalidation only and the slot stays
|
||||||
|
# bound on the hardware.
|
||||||
|
receiver._unpair_device(device_number, True)
|
||||||
except Exception:
|
except Exception:
|
||||||
common.error_dialog(common.ErrorReason.UNPAIR, device)
|
common.error_dialog(common.ErrorReason.UNPAIR, device)
|
||||||
|
|
|
||||||
|
|
@ -312,3 +312,60 @@ def test_extract_device_kind(data, expected_device_kind):
|
||||||
device_kind = receiver.extract_device_kind(data)
|
device_kind = receiver.extract_device_kind(data)
|
||||||
|
|
||||||
assert str(device_kind) == expected_device_kind
|
assert str(device_kind) == expected_device_kind
|
||||||
|
|
||||||
|
|
||||||
|
def _make_fake_receiver(cached_devices=None):
|
||||||
|
r = mock.Mock(spec=receiver.Receiver)
|
||||||
|
r.handle = 0x12
|
||||||
|
r._devices = dict(cached_devices or {})
|
||||||
|
r.force_unpair_slot = partial(receiver.Receiver.force_unpair_slot, r)
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def test_force_unpair_slot_empty_slot_acknowledged():
|
||||||
|
r = _make_fake_receiver(cached_devices={})
|
||||||
|
r._unpair_device_per_receiver = mock.Mock(return_value=b"\x00")
|
||||||
|
|
||||||
|
assert r.force_unpair_slot(2) is True
|
||||||
|
r._unpair_device_per_receiver.assert_called_once_with(2)
|
||||||
|
assert 2 not in r._devices
|
||||||
|
|
||||||
|
|
||||||
|
def test_force_unpair_slot_stale_sentinel_cleared():
|
||||||
|
# "Device not found" state: slot present in cache but value is None.
|
||||||
|
r = _make_fake_receiver(cached_devices={2: None})
|
||||||
|
r._unpair_device_per_receiver = mock.Mock(return_value=b"\x00")
|
||||||
|
|
||||||
|
assert r.force_unpair_slot(2) is True
|
||||||
|
assert 2 not in r._devices
|
||||||
|
|
||||||
|
|
||||||
|
def test_force_unpair_slot_active_device_invalidated():
|
||||||
|
dev = mock.Mock()
|
||||||
|
dev.online = True
|
||||||
|
dev.wpid = "40B4"
|
||||||
|
r = _make_fake_receiver(cached_devices={1: dev})
|
||||||
|
r._unpair_device_per_receiver = mock.Mock(return_value=b"\x00")
|
||||||
|
|
||||||
|
assert r.force_unpair_slot(1) is True
|
||||||
|
assert dev.online is False
|
||||||
|
assert dev.wpid is None
|
||||||
|
assert 1 not in r._devices
|
||||||
|
|
||||||
|
|
||||||
|
def test_force_unpair_slot_register_write_failed():
|
||||||
|
r = _make_fake_receiver(cached_devices={2: None})
|
||||||
|
r._unpair_device_per_receiver = mock.Mock(return_value=None)
|
||||||
|
|
||||||
|
assert r.force_unpair_slot(2) is False
|
||||||
|
# On failure, cache state is preserved so Solaar's model stays honest.
|
||||||
|
assert r._devices == {2: None}
|
||||||
|
|
||||||
|
|
||||||
|
def test_force_unpair_slot_no_handle():
|
||||||
|
r = _make_fake_receiver()
|
||||||
|
r.handle = None
|
||||||
|
r._unpair_device_per_receiver = mock.Mock()
|
||||||
|
|
||||||
|
assert r.force_unpair_slot(2) is False
|
||||||
|
r._unpair_device_per_receiver.assert_not_called()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue