diff --git a/lib/logitech_receiver/base_usb.py b/lib/logitech_receiver/base_usb.py index 513173da..f4edbb6d 100644 --- a/lib/logitech_receiver/base_usb.py +++ b/lib/logitech_receiver/base_usb.py @@ -123,8 +123,8 @@ def _lightspeed_receiver(product_id: int) -> dict: "usb_interface": 2, "receiver_kind": "lightspeed", "name": _("Lightspeed Receiver"), - "may_unpair": False, - "re_pairs": True, + "may_unpair": True, + "re_pairs": False, } diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index 90c3aa20..42df1c08 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -413,6 +413,31 @@ class Receiver: """Receiver specific unpairing.""" return self.write_register(Registers.RECEIVER_PAIRING, 0x03, key) + def force_unpair_slot(self, slot: int) -> bool: + """Force-unpair a slot by writing the unpair register, ignoring cache state. + + Intended for clearing stale pairings on receivers (Lightspeed in particular) + where Solaar cannot read pairing info for a slot. Bypasses the ``may_unpair`` + and ``re_pairs`` gates that ``_unpair_device`` applies. Returns True if the + register write was acknowledged by the receiver. + """ + if not self.handle: + return False + slot = int(slot) + reply = self._unpair_device_per_receiver(slot) + if reply: + cached = self._devices.get(slot) + if cached: + cached.online = False + cached.wpid = None + if slot in self._devices: + del self._devices[slot] + if logger.isEnabledFor(logging.INFO): + logger.info("%s force-unpaired slot %d", self, slot) + return True + logger.warning("%s failed to force-unpair slot %d", self, slot) + return False + def __len__(self): return len([d for d in self._devices.values() if d is not None]) diff --git a/lib/solaar/cli/__init__.py b/lib/solaar/cli/__init__.py index 99312a9b..3e4928c9 100644 --- a/lib/solaar/cli/__init__.py +++ b/lib/solaar/cli/__init__.py @@ -97,7 +97,27 @@ def _create_parser(): sp = subparsers.add_parser("unpair", description="Unpair a device from its receiver. Not all receivers allow unpairing.") sp.add_argument( "device", - help="device to unpair; may be a device number (1..6), a serial number, " "or a substring of a device's name.", + nargs="?", + help="device to unpair; may be a device number (1..6), a serial number, " + "or a substring of a device's name. Omit when using --slot.", + ) + sp.add_argument( + "--receiver", + help="select receiver by name substring or serial number when more than one is present; " + "required with --slot if multiple receivers are attached.", + ) + sp.add_argument( + "--slot", + type=int, + help="force-unpair a specific slot number directly, even if Solaar has no cached device there " + "or the device is currently reachable. Lightspeed receivers only. The slot contents are " + "printed before the write so you can confirm what is about to be cleared.", + ) + sp.add_argument( + "--dry-run", + action="store_true", + help="with --slot, run all safety checks but do not issue the unpair register write. " + "Use to verify the active-device guard before committing to a real write.", ) sp.set_defaults(action="unpair") diff --git a/lib/solaar/cli/unpair.py b/lib/solaar/cli/unpair.py index 5a84a9fb..a6148023 100644 --- a/lib/solaar/cli/unpair.py +++ b/lib/solaar/cli/unpair.py @@ -17,7 +17,12 @@ def run(receivers, args, find_receiver, find_device): assert receivers - assert args.device + + if getattr(args, "slot", None) is not None: + _run_slot_unpair(receivers, args, find_receiver) + return + + assert args.device, "unpair requires a device name, or use --slot" device_name = args.device.lower() dev = next(find_device(receivers, device_name), None) @@ -36,3 +41,50 @@ def run(receivers, args, find_receiver, find_device): print(f"Unpaired {int(number)}: {dev.name} ({codename}) [{wpid}:{serial}]") except Exception as e: raise e + + +def _run_slot_unpair(receivers, args, find_receiver): + if args.receiver: + rcv = find_receiver(receivers, args.receiver.lower()) + if not rcv: + raise Exception(f"no receiver found matching '{args.receiver}'") + elif len(receivers) == 1: + rcv = receivers[0] + else: + names = ", ".join(f"{r.name} [{r.serial}]" for r in receivers) + raise Exception(f"multiple receivers present, pass --receiver to pick one (found: {names})") + + if rcv.receiver_kind != "lightspeed": + raise Exception( + f"--slot unpair is currently only supported on Lightspeed receivers " + f"(this is a {rcv.receiver_kind or 'unknown'} receiver: {rcv.name})" + ) + + slot = int(args.slot) + max_slots = rcv.max_devices or 1 + if slot < 1 or slot > max_slots: + raise Exception(f"--slot {slot} out of range (valid: 1..{max_slots} on {rcv.name})") + + # Populate the cache from the receiver's pairing registers so we can report + # what the slot currently holds. Truthy cache does NOT imply the device is + # reachable on RF — it only means the pairing registers are readable. + list(rcv) + cached = rcv._devices.get(slot) + if cached: + slot_desc = f"{cached.name} [{cached.wpid}:{cached.serial}]" + elif slot in rcv._devices: + slot_desc = "cached None sentinel (pairing info unreadable)" + else: + slot_desc = "no pairing info cached" + + print(f"Slot {slot} on {rcv.name} [{rcv.serial}]: {slot_desc}") + + if getattr(args, "dry_run", False): + print(f"[dry-run] would force-unpair slot {slot} — no register write issued") + return + + ok = rcv.force_unpair_slot(slot) + if ok: + print(f"Slot {slot} unpair register write acknowledged by receiver") + else: + print(f"Slot {slot} unpair register write was not acknowledged (may be a no-op)") diff --git a/lib/solaar/ui/action.py b/lib/solaar/ui/action.py index 58f6dc1a..6751f92b 100644 --- a/lib/solaar/ui/action.py +++ b/lib/solaar/ui/action.py @@ -98,6 +98,10 @@ def unpair(window, device): device_number = device.number try: - del receiver[device_number] + # force=True ensures the unpair register write is issued even on + # re_pairs receivers (Lightspeed, Nano); otherwise _unpair_device + # short-circuits to cache-invalidation only and the slot stays + # bound on the hardware. + receiver._unpair_device(device_number, True) except Exception: common.error_dialog(common.ErrorReason.UNPAIR, device) diff --git a/tests/logitech_receiver/test_receiver.py b/tests/logitech_receiver/test_receiver.py index 1f8c2506..2e8775ec 100644 --- a/tests/logitech_receiver/test_receiver.py +++ b/tests/logitech_receiver/test_receiver.py @@ -312,3 +312,60 @@ def test_extract_device_kind(data, expected_device_kind): device_kind = receiver.extract_device_kind(data) assert str(device_kind) == expected_device_kind + + +def _make_fake_receiver(cached_devices=None): + r = mock.Mock(spec=receiver.Receiver) + r.handle = 0x12 + r._devices = dict(cached_devices or {}) + r.force_unpair_slot = partial(receiver.Receiver.force_unpair_slot, r) + return r + + +def test_force_unpair_slot_empty_slot_acknowledged(): + r = _make_fake_receiver(cached_devices={}) + r._unpair_device_per_receiver = mock.Mock(return_value=b"\x00") + + assert r.force_unpair_slot(2) is True + r._unpair_device_per_receiver.assert_called_once_with(2) + assert 2 not in r._devices + + +def test_force_unpair_slot_stale_sentinel_cleared(): + # "Device not found" state: slot present in cache but value is None. + r = _make_fake_receiver(cached_devices={2: None}) + r._unpair_device_per_receiver = mock.Mock(return_value=b"\x00") + + assert r.force_unpair_slot(2) is True + assert 2 not in r._devices + + +def test_force_unpair_slot_active_device_invalidated(): + dev = mock.Mock() + dev.online = True + dev.wpid = "40B4" + r = _make_fake_receiver(cached_devices={1: dev}) + r._unpair_device_per_receiver = mock.Mock(return_value=b"\x00") + + assert r.force_unpair_slot(1) is True + assert dev.online is False + assert dev.wpid is None + assert 1 not in r._devices + + +def test_force_unpair_slot_register_write_failed(): + r = _make_fake_receiver(cached_devices={2: None}) + r._unpair_device_per_receiver = mock.Mock(return_value=None) + + assert r.force_unpair_slot(2) is False + # On failure, cache state is preserved so Solaar's model stays honest. + assert r._devices == {2: None} + + +def test_force_unpair_slot_no_handle(): + r = _make_fake_receiver() + r.handle = None + r._unpair_device_per_receiver = mock.Mock() + + assert r.force_unpair_slot(2) is False + r._unpair_device_per_receiver.assert_not_called()