Merge branch 'master' into diversion_remove_inheritance

This commit is contained in:
MattHag 2025-03-01 12:07:04 +01:00 committed by GitHub
commit c3f3c91e59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 419 additions and 294 deletions

View File

@ -8,7 +8,7 @@ jobs:
strategy: strategy:
matrix: matrix:
python-version: [3.8, 3.12] python-version: [3.8, 3.13]
steps: steps:
- name: Checkout - name: Checkout
@ -47,7 +47,7 @@ jobs:
strategy: strategy:
matrix: matrix:
python-version: [3.8, 3.12] python-version: [3.8, 3.13]
steps: steps:
- name: Checkout - name: Checkout

6
.gitignore vendored
View File

@ -23,3 +23,9 @@ __pycache__/
/po/*.po~ /po/*.po~
/.idea/ /.idea/
.DS_Store
._*
Pipfile
Pipfile.lock

View File

@ -73,7 +73,7 @@ If you are running the system version of Python in Debian/Ubuntu you should have
In Fedora you need `gtk3` and `python3-gobject`. In Fedora you need `gtk3` and `python3-gobject`.
You may have to install `gcc` and the Python development package (`python3-dev` or `python3-devel`, You may have to install `gcc` and the Python development package (`python3-dev` or `python3-devel`,
depending on your distribution). depending on your distribution).
Other system packages may be required depending on your distribution, such as `python-gobject-common-devel`. Other system packages may be required depending on your distribution, such as `python-gobject-common-devel` and `python-typing-extensions'.
Although the Solaar CLI does not require Gtk3, Although the Solaar CLI does not require Gtk3,
`solaar config` does use Gtk3 capabilities to determine whether the Solaar GUI is running `solaar config` does use Gtk3 capabilities to determine whether the Solaar GUI is running
and thus should tell the Solaar GUI to update its information about settings and thus should tell the Solaar GUI to update its information about settings

40
docs/unistallation.md Normal file
View File

@ -0,0 +1,40 @@
---
title: Uninstalling Solaar
layout: page
---
# Uninstalling Solaar
## Uninstalling from Debian systems
If you installed Solaar using `apt`, you can remove it by running:
```bash
sudo apt remove --purge solaar
```
## Uninstalling from GitHub
If you cloned and installed Solaar from GitHub manually, navigate to the cloned directory and run:
```bash
sudo make uninstall
```
## Removing Configuration Files
Solaar may leave behind configuration files in your home directory. To delete them, run:
```bash
rm -rf ~/.config/solaar
```
## Verifying Uninstallation
To confirm that Solaar is fully removed, try running:
```bash
which solaar
```
If no output is returned, Solaar has been successfully uninstalled.

View File

@ -233,6 +233,7 @@ def _match(
vid = device["vendor_id"] vid = device["vendor_id"]
pid = device["product_id"] pid = device["product_id"]
hid_bus_type = device["bus_type"]
# Translate hidapi bus_type to the bus_id values Solaar expects # Translate hidapi bus_type to the bus_id values Solaar expects
if device.get("bus_type") == 0x01: if device.get("bus_type") == 0x01:
@ -241,31 +242,49 @@ def _match(
bus_id = 0x05 # Bluetooth bus_id = 0x05 # Bluetooth
else: else:
bus_id = None bus_id = None
logger.info(f"Device {device['path']} has an unsupported bus type {hid_bus_type:02X}")
return None
# Skip unlikely devices with all-zero VID PID or unsupported bus IDs
if vid == 0 and pid == 0:
logger.info(f"Device {device['path']} has all-zero VID and PID")
logger.info(f"Skipping unlikely device {device['path']} ({bus_id}/{vid:04X}/{pid:04X})")
return None
# Check for hidpp support # Check for hidpp support
device["hidpp_short"] = False device["hidpp_short"] = False
device["hidpp_long"] = False device["hidpp_long"] = False
device_handle = None device_handle = None
def check_hidpp_short():
report = _get_input_report(device_handle, 0x10, 32)
if len(report) == 1 + 6 and report[0] == 0x10:
device["hidpp_short"] = True
def check_hidpp_long():
report = _get_input_report(device_handle, 0x11, 32)
if len(report) == 1 + 19 and report[0] == 0x11:
device["hidpp_long"] = True
try: try:
device_handle = open_path(device["path"]) device_handle = open_path(device["path"])
try:
report = _get_input_report(device_handle, 0x10, 32) for check_func in (check_hidpp_short, check_hidpp_long):
if len(report) == 1 + 6 and report[0] == 0x10: try:
device["hidpp_short"] = True check_func()
except HIDError as e: except HIDError as e:
logger.info(f"Error opening device {device['path']} ({bus_id}/{vid:04X}/{pid:04X}) for hidpp check: {e}") logger.info(
try: f"Error while {check_func.__name__}"
report = _get_input_report(device_handle, 0x11, 32) f"on device {device['path']} ({bus_id}/{vid:04X}/{pid:04X}) for hidpp check: {e}"
if len(report) == 1 + 19 and report[0] == 0x11: )
device["hidpp_long"] = True except HIDError as e:
except HIDError as e: logger.info(f"Error opening device {device['path']} ({bus_id}/{vid:04X}/{pid:04X}) for hidpp check: {e}")
logger.info(f"Error opening device {device['path']} ({bus_id}/{vid:04X}/{pid:04X}) for hidpp check: {e}")
finally: finally:
if device_handle: if device_handle:
close(device_handle) close(device_handle)
logger.info( logger.info(
"Found device BID %s VID %04X PID %04X HID++ %s %s", "Found device BID %s VID %04X PID %04X HID++ SHORT %s LONG %s",
bus_id, bus_id,
vid, vid,
pid, pid,
@ -317,6 +336,8 @@ def _match(
) )
return d_info return d_info
logger.info(f"Finished checking HIDPP support for device {device['path']} ({bus_id}/{vid:04X}/{pid:04X})")
def find_paired_node(receiver_path: str, index: int, timeout: int): def find_paired_node(receiver_path: str, index: int, timeout: int):
"""Find the node of a device paired with a receiver""" """Find the node of a device paired with a receiver"""

View File

@ -50,7 +50,7 @@ print_lock = Lock()
def _print(marker, data, scroll=False): def _print(marker, data, scroll=False):
t = time.time() - start_time t = time.time() - start_time
if isinstance(data, str): if isinstance(data, str):
s = marker + " " + data s = f"{marker} {data}"
else: else:
hexs = strhex(data) hexs = strhex(data)
s = "%s (% 8.3f) [%s %s %s %s] %s" % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data)) s = "%s (% 8.3f) [%s %s %s %s] %s" % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
@ -90,7 +90,7 @@ def _continuous_read(handle, timeout=2000):
try: try:
reply = hidapi.read(handle, 128, timeout) reply = hidapi.read(handle, 128, timeout)
except OSError as e: except OSError as e:
_error("Read failed, aborting: " + str(e), True) _error(f"Read failed, aborting: {str(e)}", True)
break break
assert reply is not None assert reply is not None
if reply: if reply:
@ -101,7 +101,7 @@ def _validate_input(line, hidpp=False):
try: try:
data = unhexlify(line.encode("ascii")) data = unhexlify(line.encode("ascii"))
except Exception as e: except Exception as e:
_error("Invalid input: " + str(e)) _error(f"Invalid input: {str(e)}")
return None return None
if hidpp: if hidpp:

View File

@ -333,7 +333,7 @@ class NamedInt(int):
return self.name.lower() == other.lower() return self.name.lower() == other.lower()
# this should catch comparisons with bytes in Py3 # this should catch comparisons with bytes in Py3
if other is not None: if other is not None:
raise TypeError("Unsupported type " + str(type(other))) raise TypeError(f"Unsupported type {str(type(other))}")
def __ne__(self, other): def __ne__(self, other):
return not self.__eq__(other) return not self.__eq__(other)
@ -467,7 +467,7 @@ class NamedInts:
def __setitem__(self, index, name): def __setitem__(self, index, name):
assert isinstance(index, int), type(index) assert isinstance(index, int), type(index)
if isinstance(name, NamedInt): if isinstance(name, NamedInt):
assert int(index) == int(name), repr(index) + " " + repr(name) assert int(index) == int(name), f"{repr(index)} {repr(name)}"
value = name value = name
elif isinstance(name, str): elif isinstance(name, str):
value = NamedInt(index, name) value = NamedInt(index, name)

View File

@ -86,7 +86,7 @@ if available:
n.set_urgency(Notify.Urgency.NORMAL) n.set_urgency(Notify.Urgency.NORMAL)
n.set_hint("desktop-entry", GLib.Variant("s", "solaar")) # replace with better name late n.set_hint("desktop-entry", GLib.Variant("s", "solaar")) # replace with better name late
try: try:
n.show() return n.show()
except Exception: except Exception:
logger.exception(f"showing {n}") logger.exception(f"showing {n}")

View File

@ -231,14 +231,14 @@ class Device:
self._codename = codename self._codename = codename
elif self.protocol < 2.0: elif self.protocol < 2.0:
self._codename = "? (%s)" % (self.wpid or self.product_id) self._codename = "? (%s)" % (self.wpid or self.product_id)
return self._codename or "?? (%s)" % (self.wpid or self.product_id) return self._codename or f"?? ({self.wpid or self.product_id})"
@property @property
def name(self): def name(self):
if not self._name: if not self._name:
if self.online and self.protocol >= 2.0: if self.online and self.protocol >= 2.0:
self._name = _hidpp20.get_name(self) self._name = _hidpp20.get_name(self)
return self._name or self._codename or "Unknown device %s" % (self.wpid or self.product_id) return self._name or self._codename or f"Unknown device {self.wpid or self.product_id}"
def get_ids(self): def get_ids(self):
ids = _hidpp20.get_ids(self) ids = _hidpp20.get_ids(self)
@ -540,7 +540,10 @@ class Device:
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
) )
handle = self.handle or self.receiver.handle handle = self.handle or self.receiver.handle
protocol = self.low_level.ping(handle, self.number, long_message=long) try:
protocol = self.low_level.ping(handle, self.number, long_message=long)
except exceptions.NoReceiver: # if ping fails, device is offline
protocol = None
self.online = protocol is not None self.online = protocol is not None
if protocol: if protocol:
self._protocol = protocol self._protocol = protocol

View File

@ -541,7 +541,7 @@ class Rule:
self.source = source self.source = source
def __str__(self): def __str__(self):
source = "(" + self.source + ")" if self.source else "" source = f"({self.source})" if self.source else ""
return f"Rule{source}[{', '.join([c.__str__() for c in self.components])}]" return f"Rule{source}[{', '.join([c.__str__() for c in self.components])}]"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
@ -593,7 +593,7 @@ class Not(ConditionProtocol):
self.component = compile_component(op) self.component = compile_component(op)
def __str__(self): def __str__(self):
return "Not: " + str(self.component) return f"Not: {str(self.component)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -706,7 +706,7 @@ class Process(ConditionProtocol):
self.process = str(process) self.process = str(process)
def __str__(self): def __str__(self):
return "Process: " + str(self.process) return f"Process: {str(self.process)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -737,7 +737,7 @@ class MouseProcess(ConditionProtocol):
self.process = str(process) self.process = str(process)
def __str__(self): def __str__(self):
return "MouseProcess: " + str(self.process) return f"MouseProcess: {str(self.process)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -755,14 +755,14 @@ class MouseProcess(ConditionProtocol):
class Feature(ConditionProtocol): class Feature(ConditionProtocol):
def __init__(self, feature: str, warn: bool = True): def __init__(self, feature: str, warn: bool = True):
try: try:
self.feature = SupportedFeature[feature] self.feature = SupportedFeature[feature.replace(" ", "_")]
except KeyError: except KeyError:
self.feature = None self.feature = None
if warn: if warn:
logger.warning("rule Feature argument not name of a feature: %s", feature) logger.warning("rule Feature argument not name of a feature: %s", feature)
def __str__(self): def __str__(self):
return "Feature: " + str(self.feature) return f"Feature: {str(self.feature)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -783,7 +783,7 @@ class Report(ConditionProtocol):
self.report = report self.report = report
def __str__(self): def __str__(self):
return "Report: " + str(self.report) return f"Report: {str(self.report)}"
def evaluate(self, report, notification: HIDPPNotification, device, last_result): def evaluate(self, report, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -856,7 +856,7 @@ class Modifiers(ConditionProtocol):
logger.warning("unknown rule Modifier value: %s", k) logger.warning("unknown rule Modifier value: %s", k)
def __str__(self): def __str__(self):
return "Modifiers: " + str(self.desired) return f"Modifiers: {str(self.desired)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -1002,7 +1002,7 @@ class Test(ConditionProtocol):
logger.warning("rule Test argument not valid %s", test) logger.warning("rule Test argument not valid %s", test)
def __str__(self): def __str__(self):
return "Test: " + str(self.test) return f"Test: {str(self.test)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -1030,7 +1030,7 @@ class TestBytes(ConditionProtocol):
logger.warning("rule TestBytes argument not valid %s", test) logger.warning("rule TestBytes argument not valid %s", test)
def __str__(self): def __str__(self):
return "TestBytes: " + str(self.test) return f"TestBytes: {str(self.test)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -1105,7 +1105,7 @@ class Active(ConditionProtocol):
self.devID = devID self.devID = devID
def __str__(self): def __str__(self):
return "Active: " + str(self.devID) return f"Active: {str(self.devID)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -1126,7 +1126,7 @@ class Device(ConditionProtocol):
self.devID = devID self.devID = devID
def __str__(self): def __str__(self):
return "Device: " + str(self.devID) return f"Device: {str(self.devID)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -1146,7 +1146,7 @@ class Host(ConditionProtocol):
self.host = host self.host = host
def __str__(self): def __str__(self):
return "Host: " + str(self.host) return f"Host: {str(self.host)}"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
@ -1442,7 +1442,7 @@ class Later(ActionProtocol):
self.components = self.rule.components self.components = self.rule.components
def __str__(self): def __str__(self):
return "Later: [" + str(self.delay) + ", " + ", ".join(str(c) for c in self.components) + "]" return f"Later: [{str(self.delay)}, " + ", ".join(str(c) for c in self.components) + "]"
def evaluate(self, feature, notification: HIDPPNotification, device, last_result): def evaluate(self, feature, notification: HIDPPNotification, device, last_result):
if self.delay and self.rule: if self.delay and self.rule:

View File

@ -47,6 +47,7 @@ DEVICE_KIND = NamedInts(
class PowerSwitchLocation(IntEnum): class PowerSwitchLocation(IntEnum):
UNKNOWN = 0x00
BASE = 0x01 BASE = 0x01
TOP_CASE = 0x02 TOP_CASE = 0x02
EDGE_OF_TOP_RIGHT_CORNER = 0x03 EDGE_OF_TOP_RIGHT_CORNER = 0x03
@ -59,6 +60,13 @@ class PowerSwitchLocation(IntEnum):
LEFT_EDGE = 0x0B LEFT_EDGE = 0x0B
BOTTOM_EDGE = 0x0C BOTTOM_EDGE = 0x0C
@classmethod
def location(cls, loc: int) -> PowerSwitchLocation:
try:
return cls(loc)
except ValueError:
return cls.UNKNOWN
class NotificationFlag(Flag): class NotificationFlag(Flag):
"""Some flags are used both by devices and receivers. """Some flags are used both by devices and receivers.

View File

@ -452,23 +452,23 @@ class PersistentRemappableAction:
if self.actionId == special_keys.ACTIONID.Empty: if self.actionId == special_keys.ACTIONID.Empty:
return None return None
elif self.actionId == special_keys.ACTIONID.Key: elif self.actionId == special_keys.ACTIONID.Key:
return "Key: " + str(self.modifiers) + str(self.remapped) return f"Key: {str(self.modifiers)}{str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Mouse: elif self.actionId == special_keys.ACTIONID.Mouse:
return "Mouse Button: " + str(self.remapped) return f"Mouse Button: {str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Xdisp: elif self.actionId == special_keys.ACTIONID.Xdisp:
return "X Displacement " + str(self.remapped) return f"X Displacement {str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Ydisp: elif self.actionId == special_keys.ACTIONID.Ydisp:
return "Y Displacement " + str(self.remapped) return f"Y Displacement {str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Vscroll: elif self.actionId == special_keys.ACTIONID.Vscroll:
return "Vertical Scroll " + str(self.remapped) return f"Vertical Scroll {str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Hscroll: elif self.actionId == special_keys.ACTIONID.Hscroll:
return "Horizontal Scroll: " + str(self.remapped) return f"Horizontal Scroll: {str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Consumer: elif self.actionId == special_keys.ACTIONID.Consumer:
return "Consumer: " + str(self.remapped) return f"Consumer: {str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Internal: elif self.actionId == special_keys.ACTIONID.Internal:
return "Internal Action " + str(self.remapped) return f"Internal Action {str(self.remapped)}"
elif self.actionId == special_keys.ACTIONID.Internal: elif self.actionId == special_keys.ACTIONID.Internal:
return "Power " + str(self.remapped) return f"Power {str(self.remapped)}"
else: else:
return "Unknown" return "Unknown"
@ -1159,8 +1159,19 @@ class RGBEffectsInfo(LEDEffectsInfo): # effects that the LEDs can do using RGB_
self.zones.append(LEDZoneInfo(SupportedFeature.RGB_EFFECTS, 0x00, 1, 0x00, device, i)) self.zones.append(LEDZoneInfo(SupportedFeature.RGB_EFFECTS, 0x00, 1, 0x00, device, i))
ButtonBehaviors = common.NamedInts(MacroExecute=0x0, MacroStop=0x1, MacroStopAll=0x2, Send=0x8, Function=0x9) class ButtonBehavior(IntEnum):
ButtonMappingTypes = common.NamedInts(No_Action=0x0, Button=0x1, Modifier_And_Key=0x2, Consumer_Key=0x3) MACRO_EXECUTE = 0x0
MACRO_STOP = 0x1
MACRO_STOP_ALL = 0x2
SEND = 0x8
FUNCTION = 0x9
class ButtonMappingType(IntEnum):
NO_ACTION = 0x0
BUTTON = 0x1
MODIFIER_AND_KEY = 0x2
CONSUMER_KEY = 0x3
class ButtonFunctions(IntEnum): class ButtonFunctions(IntEnum):
@ -1209,27 +1220,29 @@ class Button:
@classmethod @classmethod
def from_bytes(cls, bytes_) -> Button: def from_bytes(cls, bytes_) -> Button:
behavior_id = bytes_[0] >> 4 behavior = bytes_[0] >> 4
behavior = ButtonBehaviors[behavior_id] if behavior == ButtonBehavior.MACRO_EXECUTE or behavior == ButtonBehavior.MACRO_STOP:
if behavior == ButtonBehaviors.MacroExecute or behavior == ButtonBehaviors.MacroStop:
sector = ((bytes_[0] & 0x0F) << 8) + bytes_[1] sector = ((bytes_[0] & 0x0F) << 8) + bytes_[1]
address = (bytes_[2] << 8) + bytes_[3] address = (bytes_[2] << 8) + bytes_[3]
result = cls(behavior=behavior, sector=sector, address=address) result = cls(behavior=behavior, sector=sector, address=address)
elif behavior == ButtonBehaviors.Send: elif behavior == ButtonBehavior.SEND:
mapping_type = ButtonMappingTypes[bytes_[1]] try:
if mapping_type == ButtonMappingTypes.Button: mapping_type = ButtonMappingType(bytes_[1]).value
value = ButtonButtons[(bytes_[2] << 8) + bytes_[3]] if mapping_type == ButtonMappingType.BUTTON:
result = cls(behavior=behavior, type=mapping_type, value=value) value = ButtonButtons[(bytes_[2] << 8) + bytes_[3]]
elif mapping_type == ButtonMappingTypes.Modifier_And_Key: result = cls(behavior=behavior, type=mapping_type, value=value)
modifiers = bytes_[2] elif mapping_type == ButtonMappingType.MODIFIER_AND_KEY:
value = ButtonKeys[bytes_[3]] modifiers = bytes_[2]
result = cls(behavior=behavior, type=mapping_type, modifiers=modifiers, value=value) value = ButtonKeys[bytes_[3]]
elif mapping_type == ButtonMappingTypes.Consumer_Key: result = cls(behavior=behavior, type=mapping_type, modifiers=modifiers, value=value)
value = ButtonConsumerKeys[(bytes_[2] << 8) + bytes_[3]] elif mapping_type == ButtonMappingType.CONSUMER_KEY:
result = cls(behavior=behavior, type=mapping_type, value=value) value = ButtonConsumerKeys[(bytes_[2] << 8) + bytes_[3]]
elif mapping_type == ButtonMappingTypes.No_Action: result = cls(behavior=behavior, type=mapping_type, value=value)
result = cls(behavior=behavior, type=mapping_type) elif mapping_type == ButtonMappingType.NO_ACTION:
elif behavior == ButtonBehaviors.Function: result = cls(behavior=behavior, type=mapping_type)
except Exception:
pass
elif behavior == ButtonBehavior.FUNCTION:
second_byte = bytes_[1] second_byte = bytes_[1]
try: try:
btn_func = ButtonFunctions(second_byte).value btn_func = ButtonFunctions(second_byte).value
@ -1243,20 +1256,20 @@ class Button:
def to_bytes(self): def to_bytes(self):
bytes = common.int2bytes(self.behavior << 4, 1) if self.behavior is not None else None bytes = common.int2bytes(self.behavior << 4, 1) if self.behavior is not None else None
if self.behavior == ButtonBehaviors.MacroExecute or self.behavior == ButtonBehaviors.MacroStop: if self.behavior == ButtonBehavior.MACRO_EXECUTE.value or self.behavior == ButtonBehavior.MACRO_STOP.value:
bytes = common.int2bytes((self.behavior << 12) + self.sector, 2) + common.int2bytes(self.address, 2) bytes = common.int2bytes((self.behavior << 12) + self.sector, 2) + common.int2bytes(self.address, 2)
elif self.behavior == ButtonBehaviors.Send: elif self.behavior == ButtonBehavior.SEND.value:
bytes += common.int2bytes(self.type, 1) bytes += common.int2bytes(self.type, 1)
if self.type == ButtonMappingTypes.Button: if self.type == ButtonMappingType.BUTTON:
bytes += common.int2bytes(self.value, 2) bytes += common.int2bytes(self.value, 2)
elif self.type == ButtonMappingTypes.Modifier_And_Key: elif self.type == ButtonMappingType.MODIFIER_AND_KEY:
bytes += common.int2bytes(self.modifiers, 1) bytes += common.int2bytes(self.modifiers, 1)
bytes += common.int2bytes(self.value, 1) bytes += common.int2bytes(self.value, 1)
elif self.type == ButtonMappingTypes.Consumer_Key: elif self.type == ButtonMappingType.CONSUMER_KEY:
bytes += common.int2bytes(self.value, 2) bytes += common.int2bytes(self.value, 2)
elif self.type == ButtonMappingTypes.No_Action: elif self.type == ButtonMappingType.NO_ACTION:
bytes += b"\xff\xff" bytes += b"\xff\xff"
elif self.behavior == ButtonBehaviors.Function: elif self.behavior == ButtonBehavior.FUNCTION:
data = common.int2bytes(self.data, 1) if self.data else b"\x00" data = common.int2bytes(self.data, 1) if self.data else b"\x00"
bytes += common.int2bytes(self.value, 1) + b"\xff" + data bytes += common.int2bytes(self.value, 1) + b"\xff" + data
else: else:
@ -1266,7 +1279,7 @@ class Button:
def __repr__(self): def __repr__(self):
return "%s{%s}" % ( return "%s{%s}" % (
self.__class__.__name__, self.__class__.__name__,
", ".join([str(key) + ":" + str(val) for key, val in self.__dict__.items()]), ", ".join([f"{str(key)}:{str(val)}" for key, val in self.__dict__.items()]),
) )
@ -1503,25 +1516,6 @@ def feature_request(device, feature, function=0x00, *params, no_reply=False):
return device.request((feature_index << 8) + (function & 0xFF), *params, no_reply=no_reply) return device.request((feature_index << 8) + (function & 0xFF), *params, no_reply=no_reply)
# voltage to remaining charge from Logitech
battery_voltage_remaining = (
(4186, 100),
(4067, 90),
(3989, 80),
(3922, 70),
(3859, 60),
(3811, 50),
(3778, 40),
(3751, 30),
(3717, 20),
(3671, 10),
(3646, 5),
(3579, 2),
(3500, 0),
(-1000, 0),
)
class Hidpp20: class Hidpp20:
def get_firmware(self, device) -> tuple[common.FirmwareInfo] | None: def get_firmware(self, device) -> tuple[common.FirmwareInfo] | None:
"""Reads a device's firmware info. """Reads a device's firmware info.
@ -1848,7 +1842,7 @@ class Hidpp20:
state = device.feature_request(SupportedFeature.REPORT_RATE, 0x10) state = device.feature_request(SupportedFeature.REPORT_RATE, 0x10)
if state: if state:
rate = struct.unpack("!B", state[:1])[0] rate = struct.unpack("!B", state[:1])[0]
return str(rate) + "ms" return f"{str(rate)}ms"
else: else:
rates = ["8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us"] rates = ["8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us"]
state = device.feature_request(SupportedFeature.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20) state = device.feature_request(SupportedFeature.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20)
@ -1912,10 +1906,9 @@ def decipher_battery_voltage(report: bytes):
status = BatteryStatus.SLOW_RECHARGE status = BatteryStatus.SLOW_RECHARGE
elif flags & (1 << 5): elif flags & (1 << 5):
charge_lvl = ChargeLevel.CRITICAL charge_lvl = ChargeLevel.CRITICAL
for level in battery_voltage_remaining: charge_level = estimate_battery_level_percentage(voltage)
if level[0] < voltage: if charge_level:
charge_lvl = level[1] charge_lvl = charge_level
break
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug( logger.debug(
"battery voltage %d mV, charging %s, status %d = %s, level %s, type %s", "battery voltage %d mV, charging %s, status %d = %s, level %s, type %s",
@ -1929,7 +1922,7 @@ def decipher_battery_voltage(report: bytes):
return SupportedFeature.BATTERY_VOLTAGE, Battery(charge_lvl, None, status, voltage) return SupportedFeature.BATTERY_VOLTAGE, Battery(charge_lvl, None, status, voltage)
def decipher_battery_unified(report): def decipher_battery_unified(report) -> tuple[SupportedFeature, Battery]:
discharge, level, status_byte, _ignore = struct.unpack("!BBBB", report[:4]) discharge, level, status_byte, _ignore = struct.unpack("!BBBB", report[:4])
try: try:
status = BatteryStatus(status_byte) status = BatteryStatus(status_byte)
@ -1940,27 +1933,64 @@ def decipher_battery_unified(report):
logger.debug("battery unified %s%% charged, level %s, charging %s", discharge, level, status) logger.debug("battery unified %s%% charged, level %s, charging %s", discharge, level, status)
if level == 8: if level == 8:
level = BatteryLevelApproximation.FULL approx_level = BatteryLevelApproximation.FULL
elif level == 4: elif level == 4:
level = BatteryLevelApproximation.GOOD approx_level = BatteryLevelApproximation.GOOD
elif level == 2: elif level == 2:
level = BatteryLevelApproximation.LOW approx_level = BatteryLevelApproximation.LOW
elif level == 1: elif level == 1:
level = BatteryLevelApproximation.CRITICAL approx_level = BatteryLevelApproximation.CRITICAL
else: else:
level = BatteryLevelApproximation.EMPTY approx_level = BatteryLevelApproximation.EMPTY
return SupportedFeature.UNIFIED_BATTERY, Battery(discharge if discharge else level, None, status, None) return SupportedFeature.UNIFIED_BATTERY, Battery(discharge if discharge else approx_level, None, status, None)
def decipher_adc_measurement(report): def decipher_adc_measurement(report) -> tuple[SupportedFeature, Battery]:
# partial implementation - needs mapping to levels # partial implementation - needs mapping to levels
charge_level = None adc_voltage, flags = struct.unpack("!HB", report[:3])
adc, flags = struct.unpack("!HB", report[:3]) charge_level = estimate_battery_level_percentage(adc_voltage)
for level in battery_voltage_remaining:
if level[0] < adc:
charge_level = level[1]
break
if flags & 0x01: if flags & 0x01:
status = BatteryStatus.RECHARGING if flags & 0x02 else BatteryStatus.DISCHARGING status = BatteryStatus.RECHARGING if flags & 0x02 else BatteryStatus.DISCHARGING
return SupportedFeature.ADC_MEASUREMENT, Battery(charge_level, None, status, adc) return SupportedFeature.ADC_MEASUREMENT, Battery(charge_level, None, status, adc_voltage)
def estimate_battery_level_percentage(value_millivolt: int) -> int | None:
"""Estimate battery level percentage based on battery voltage.
Uses linear approximation to estimate the battery level in percent.
Parameters
----------
value_millivolt
Measured battery voltage in millivolt.
"""
battery_voltage_to_percentage = [
(4186, 100),
(4067, 90),
(3989, 80),
(3922, 70),
(3859, 60),
(3811, 50),
(3778, 40),
(3751, 30),
(3717, 20),
(3671, 10),
(3646, 5),
(3579, 2),
(3500, 0),
]
if value_millivolt >= battery_voltage_to_percentage[0][0]:
return battery_voltage_to_percentage[0][1]
if value_millivolt <= battery_voltage_to_percentage[-1][0]:
return battery_voltage_to_percentage[-1][1]
for i in range(len(battery_voltage_to_percentage) - 1):
v_high, p_high = battery_voltage_to_percentage[i]
v_low, p_low = battery_voltage_to_percentage[i + 1]
if v_low <= value_millivolt <= v_high:
# Linear interpolation
percent = p_low + (p_high - p_low) * (value_millivolt - v_low) / (v_high - v_low)
return round(percent)
return 0

View File

@ -117,7 +117,7 @@ class EventsListener(threading.Thread):
path_name = receiver.path.split("/")[2] path_name = receiver.path.split("/")[2]
except IndexError: except IndexError:
path_name = receiver.path path_name = receiver.path
super().__init__(name=self.__class__.__name__ + ":" + path_name) super().__init__(name=f"{self.__class__.__name__}:{path_name}")
self.daemon = True self.daemon = True
self._active = False self._active = False
self.receiver = receiver self.receiver = receiver

View File

@ -108,7 +108,7 @@ def extract_codename(response: bytes) -> str:
def extract_power_switch_location(response: bytes) -> str: def extract_power_switch_location(response: bytes) -> str:
"""Extracts power switch location from response.""" """Extracts power switch location from response."""
index = response[9] & 0x0F index = response[9] & 0x0F
return hidpp10_constants.PowerSwitchLocation(index).name.lower() return hidpp10_constants.PowerSwitchLocation.location(index).name.lower()
def extract_connection_count(response: bytes) -> int: def extract_connection_count(response: bytes) -> int:

View File

@ -1167,11 +1167,11 @@ class Multiplatform(settings.Setting):
if version == 0: if version == 0:
return "" return ""
elif version & 0xFF: elif version & 0xFF:
return str(version >> 8) + "." + str(version & 0xFF) return f"{str(version >> 8)}.{str(version & 0xFF)}"
else: else:
return str(version >> 8) return str(version >> 8)
return "" if low == 0 and high == 0 else " " + _str_os_version(low) + "-" + _str_os_version(high) return "" if low == 0 and high == 0 else f" {_str_os_version(low)}-{_str_os_version(high)}"
infos = device.feature_request(_F.MULTIPLATFORM) infos = device.feature_request(_F.MULTIPLATFORM)
assert infos, "Oops, multiplatform count cannot be retrieved!" assert infos, "Oops, multiplatform count cannot be retrieved!"
@ -1227,7 +1227,7 @@ class ChangeHost(settings.Setting):
choices = common.NamedInts() choices = common.NamedInts()
for host in range(0, numHosts): for host in range(0, numHosts):
paired, hostName = hostNames.get(host, (True, "")) paired, hostName = hostNames.get(host, (True, ""))
choices[host] = str(host + 1) + ":" + hostName if hostName else str(host + 1) choices[host] = f"{str(host + 1)}:{hostName}" if hostName else str(host + 1)
return cls(choices=choices, read_skip_byte_count=1) if choices and len(choices) > 1 else None return cls(choices=choices, read_skip_byte_count=1) if choices and len(choices) > 1 else None
@ -1735,7 +1735,7 @@ class PerKeyLighting(settings.Settings):
key = ( key = (
setting_class.keys_universe[i] setting_class.keys_universe[i]
if i in setting_class.keys_universe if i in setting_class.keys_universe
else common.NamedInt(i, "KEY " + str(i)) else common.NamedInt(i, f"KEY {str(i)}")
) )
choices_map[key] = setting_class.choices_universe choices_map[key] = setting_class.choices_universe
result = cls(choices_map) if choices_map else None result = cls(choices_map) if choices_map else None

View File

@ -316,9 +316,9 @@ CONTROL = NamedInts(
) )
for i in range(1, 33): # add in G keys - these are not really Logitech Controls for i in range(1, 33): # add in G keys - these are not really Logitech Controls
CONTROL[0x1000 + i] = "G" + str(i) CONTROL[0x1000 + i] = f"G{str(i)}"
for i in range(1, 9): # add in M keys - these are not really Logitech Controls for i in range(1, 9): # add in M keys - these are not really Logitech Controls
CONTROL[0x1100 + i] = "M" + str(i) CONTROL[0x1100 + i] = f"M{str(i)}"
CONTROL[0x1200] = "MR" # add in MR key - this is not really a Logitech Control CONTROL[0x1200] = "MR" # add in MR key - this is not really a Logitech Control
CONTROL._fallback = lambda x: f"unknown:{x:04X}" CONTROL._fallback = lambda x: f"unknown:{x:04X}"

View File

@ -116,8 +116,7 @@ def _receivers(dev_path=None):
continue continue
try: try:
r = receiver.create_receiver(base, dev_info) r = receiver.create_receiver(base, dev_info)
if logger.isEnabledFor(logging.DEBUG): logger.debug("[%s] => %s", dev_info.path, r)
logger.debug("[%s] => %s", dev_info.path, r)
if r: if r:
yield r yield r
except Exception as e: except Exception as e:
@ -135,8 +134,7 @@ def _receivers_and_devices(dev_path=None):
else: else:
d = receiver.create_receiver(base, dev_info) d = receiver.create_receiver(base, dev_info)
if logger.isEnabledFor(logging.DEBUG): logger.debug("[%s] => %s", dev_info.path, d)
logger.debug("[%s] => %s", dev_info.path, d)
if d is not None: if d is not None:
yield d yield d
except Exception as e: except Exception as e:

View File

@ -45,29 +45,29 @@ def run(receivers, args, find_receiver, _ignore):
print("") print("")
print(" Register Dump") print(" Register Dump")
rgst = receiver.read_register(Registers.NOTIFICATIONS) rgst = receiver.read_register(Registers.NOTIFICATIONS)
print(" Notifications %#04x: %s" % (Registers.NOTIFICATIONS % 0x100, "0x" + strhex(rgst) if rgst else "None")) print(" Notifications %#04x: %s" % (Registers.NOTIFICATIONS % 0x100, f"0x{strhex(rgst)}" if rgst else "None"))
rgst = receiver.read_register(Registers.RECEIVER_CONNECTION) rgst = receiver.read_register(Registers.RECEIVER_CONNECTION)
print( print(
" Connection State %#04x: %s" " Connection State %#04x: %s"
% (Registers.RECEIVER_CONNECTION % 0x100, "0x" + strhex(rgst) if rgst else "None") % (Registers.RECEIVER_CONNECTION % 0x100, f"0x{strhex(rgst)}" if rgst else "None")
) )
rgst = receiver.read_register(Registers.DEVICES_ACTIVITY) rgst = receiver.read_register(Registers.DEVICES_ACTIVITY)
print( print(
" Device Activity %#04x: %s" % (Registers.DEVICES_ACTIVITY % 0x100, "0x" + strhex(rgst) if rgst else "None") " Device Activity %#04x: %s" % (Registers.DEVICES_ACTIVITY % 0x100, f"0x{strhex(rgst)}" if rgst else "None")
) )
for sub_reg in range(0, 16): for sub_reg in range(0, 16):
rgst = receiver.read_register(Registers.RECEIVER_INFO, sub_reg) rgst = receiver.read_register(Registers.RECEIVER_INFO, sub_reg)
print( print(
" Pairing Register %#04x %#04x: %s" " Pairing Register %#04x %#04x: %s"
% (Registers.RECEIVER_INFO % 0x100, sub_reg, "0x" + strhex(rgst) if rgst else "None") % (Registers.RECEIVER_INFO % 0x100, sub_reg, f"0x{strhex(rgst)}" if rgst else "None")
) )
for device in range(0, 7): for device in range(0, 7):
for sub_reg in [0x10, 0x20, 0x30, 0x50]: for sub_reg in [0x10, 0x20, 0x30, 0x50]:
rgst = receiver.read_register(Registers.RECEIVER_INFO, sub_reg + device) rgst = receiver.read_register(Registers.RECEIVER_INFO, sub_reg + device)
print( print(
" Pairing Register %#04x %#04x: %s" " Pairing Register %#04x %#04x: %s"
% (Registers.RECEIVER_INFO % 0x100, sub_reg + device, "0x" + strhex(rgst) if rgst else "None") % (Registers.RECEIVER_INFO % 0x100, sub_reg + device, f"0x{strhex(rgst)}" if rgst else "None")
) )
rgst = receiver.read_register(Registers.RECEIVER_INFO, 0x40 + device) rgst = receiver.read_register(Registers.RECEIVER_INFO, 0x40 + device)
print( print(
@ -90,7 +90,7 @@ def run(receivers, args, find_receiver, _ignore):
rgst = receiver.read_register(Registers.FIRMWARE, sub_reg) rgst = receiver.read_register(Registers.FIRMWARE, sub_reg)
print( print(
" Firmware %#04x %#04x: %s" " Firmware %#04x %#04x: %s"
% (Registers.FIRMWARE % 0x100, sub_reg, "0x" + strhex(rgst) if rgst is not None else "None") % (Registers.FIRMWARE % 0x100, sub_reg, f"0x{strhex(rgst)}" if rgst is not None else "None")
) )
print("") print("")

View File

@ -82,8 +82,8 @@ def _battery_line(dev):
level, nextLevel, status, voltage = battery.level, battery.next_level, battery.status, battery.voltage level, nextLevel, status, voltage = battery.level, battery.next_level, battery.status, battery.voltage
text = _battery_text(level) text = _battery_text(level)
if voltage is not None: if voltage is not None:
text = text + f" {voltage}mV " text = f"{text} {voltage}mV "
nextText = "" if nextLevel is None else ", next level " + _battery_text(nextLevel) nextText = "" if nextLevel is None else f", next level {_battery_text(nextLevel)}"
print(f" Battery: {text}, {status}{nextText}.") print(f" Battery: {text}, {status}{nextText}.")
else: else:
print(" Battery status unavailable.") print(" Battery status unavailable.")

View File

@ -26,8 +26,8 @@ def run(receivers, args, find_receiver, find_device):
if not dev.receiver.may_unpair: if not dev.receiver.may_unpair:
print( print(
"Receiver with USB id %s for %s [%s:%s] does not unpair, but attempting anyway." f"Receiver with USB id {dev.receiver.product_id} for {dev.name} [{dev.wpid}:{dev.serial}] does not unpair,",
% (dev.receiver.product_id, dev.name, dev.wpid, dev.serial) "but attempting anyway.",
) )
try: try:
# query these now, it's last chance to get them # query these now, it's last chance to get them

View File

@ -62,8 +62,7 @@ def _load():
loaded_config = _convert_json(loaded_config) loaded_config = _convert_json(loaded_config)
else: else:
path = None path = None
if logger.isEnabledFor(logging.DEBUG): logger.debug("load => %s", loaded_config)
logger.debug("load => %s", loaded_config)
global _config global _config
_config = _parse_config(loaded_config, path) _config = _parse_config(loaded_config, path)
@ -78,14 +77,13 @@ def _parse_config(loaded_config, config_path):
loaded_version = loaded_config[0] loaded_version = loaded_config[0]
discard_derived_properties = loaded_version != current_version discard_derived_properties = loaded_version != current_version
if discard_derived_properties: if discard_derived_properties:
if logger.isEnabledFor(logging.INFO): logger.info(
logger.info( "config file '%s' was generated by another version of solaar "
"config file '%s' was generated by another version of solaar " "(config: %s, current: %s). refreshing detected device capabilities",
"(config: %s, current: %s). refreshing detected device capabilities", config_path,
config_path, loaded_version,
loaded_version, current_version,
current_version, )
)
for device in loaded_config[1:]: for device in loaded_config[1:]:
assert isinstance(device, dict) assert isinstance(device, dict)
@ -154,8 +152,7 @@ def do_save():
try: try:
with open(_yaml_file_path, "w") as config_file: with open(_yaml_file_path, "w") as config_file:
yaml.dump(_config, config_file, default_flow_style=None, width=150) yaml.dump(_config, config_file, default_flow_style=None, width=150)
if logger.isEnabledFor(logging.INFO): logger.info("saved %s to %s", _config, _yaml_file_path)
logger.info("saved %s to %s", _config, _yaml_file_path)
except Exception as e: except Exception as e:
logger.error("failed to save to %s: %s", _yaml_file_path, e) logger.error("failed to save to %s: %s", _yaml_file_path, e)
@ -251,11 +248,9 @@ def persister(device):
break break
if not entry: if not entry:
if not device.online: # don't create entry for offline devices if not device.online: # don't create entry for offline devices
if logger.isEnabledFor(logging.INFO): logger.info("not setting up persister for offline device %s", device._name)
logger.info("not setting up persister for offline device %s", device._name)
return return
if logger.isEnabledFor(logging.INFO): logger.info("setting up persister for device %s", device.name)
logger.info("setting up persister for device %s", device.name)
entry = _DeviceEntry() entry = _DeviceEntry()
_config.append(entry) _config.append(entry)
entry.update(device.name, device.wpid, device.serial, modelId, unitId) entry.update(device.name, device.wpid, device.serial, modelId, unitId)

View File

@ -68,8 +68,7 @@ def watch_suspend_resume(
dbus_interface=_LOGIND_INTERFACE, dbus_interface=_LOGIND_INTERFACE,
path=_LOGIND_PATH, path=_LOGIND_PATH,
) )
if logger.isEnabledFor(logging.INFO): logger.info("connected to system dbus, watching for suspend/resume events")
logger.info("connected to system dbus, watching for suspend/resume events")
_BLUETOOTH_PATH_PREFIX = "/org/bluez/hci0/dev_" _BLUETOOTH_PATH_PREFIX = "/org/bluez/hci0/dev_"

View File

@ -134,9 +134,8 @@ def _parse_arguments():
logging.getLogger("").addHandler(stream_handler) logging.getLogger("").addHandler(stream_handler)
if not args.action: if not args.action:
if logger.isEnabledFor(logging.INFO): language, encoding = locale.getlocale()
language, encoding = locale.getlocale() logger.info("version %s, language %s (%s)", __version__, language, encoding)
logger.info("version %s, language %s (%s)", __version__, language, encoding)
return args return args

View File

@ -31,7 +31,7 @@ def _find_locale_path(locale_domain: str) -> str:
src_share = os.path.normpath(os.path.join(os.path.realpath(sys.path[0]), "..", "share")) src_share = os.path.normpath(os.path.join(os.path.realpath(sys.path[0]), "..", "share"))
for location in prefix_share, src_share: for location in prefix_share, src_share:
mo_files = glob(os.path.join(location, "locale", "*", "LC_MESSAGES", locale_domain + ".mo")) mo_files = glob(os.path.join(location, "locale", "*", "LC_MESSAGES", f"{locale_domain}.mo"))
if mo_files: if mo_files:
return os.path.join(location, "locale") return os.path.join(location, "locale")
raise FileNotFoundError(f"Could not find locale path for {locale_domain}") raise FileNotFoundError(f"Could not find locale path for {locale_domain}")
@ -47,7 +47,7 @@ def set_locale_to_system_default():
""" """
try: try:
locale.setlocale(locale.LC_ALL, "") locale.setlocale(locale.LC_ALL, "")
except PermissionError: except Exception:
pass pass
try: try:

View File

@ -54,18 +54,14 @@ logger = logging.getLogger(__name__)
ACTION_ADD = "add" ACTION_ADD = "add"
_GHOST_DEVICE = namedtuple("_GHOST_DEVICE", ("receiver", "number", "name", "kind", "online")) _GHOST_DEVICE = namedtuple("_GHOST_DEVICE", ("receiver", "number", "name", "kind", "online", "path"))
_GHOST_DEVICE.__bool__ = lambda self: False _GHOST_DEVICE.__bool__ = lambda self: False
_GHOST_DEVICE.__nonzero__ = _GHOST_DEVICE.__bool__ _GHOST_DEVICE.__nonzero__ = _GHOST_DEVICE.__bool__
def _ghost(device): def _ghost(device):
return _GHOST_DEVICE( return _GHOST_DEVICE(
receiver=device.receiver, receiver=device.receiver, number=device.number, name=device.name, kind=device.kind, online=False, path=None
number=device.number,
name=device.name,
kind=device.kind,
online=False,
) )
@ -79,15 +75,13 @@ class SolaarListener(listener.EventsListener):
receiver.status_callback = self._status_changed receiver.status_callback = self._status_changed
def has_started(self): def has_started(self):
if logger.isEnabledFor(logging.INFO): logger.info("%s: notifications listener has started (%s)", self.receiver, self.receiver.handle)
logger.info("%s: notifications listener has started (%s)", self.receiver, self.receiver.handle)
nfs = self.receiver.enable_connection_notifications() nfs = self.receiver.enable_connection_notifications()
if logger.isEnabledFor(logging.WARNING): if not self.receiver.isDevice and not ((nfs if nfs else 0) & hidpp10_constants.NotificationFlag.WIRELESS.value):
if not self.receiver.isDevice and not ((nfs if nfs else 0) & hidpp10_constants.NotificationFlag.WIRELESS.value): logger.warning(
logger.warning( "Receiver on %s might not support connection notifications, GUI might not show its devices",
"Receiver on %s might not support connection notifications, GUI might not show its devices", self.receiver.path,
self.receiver.path, )
)
self.receiver.notification_flags = nfs self.receiver.notification_flags = nfs
self.receiver.notify_devices() self.receiver.notify_devices()
self._status_changed(self.receiver) self._status_changed(self.receiver)
@ -95,8 +89,7 @@ class SolaarListener(listener.EventsListener):
def has_stopped(self): def has_stopped(self):
r, self.receiver = self.receiver, None r, self.receiver = self.receiver, None
assert r is not None assert r is not None
if logger.isEnabledFor(logging.INFO): logger.info("%s: notifications listener has stopped", r)
logger.info("%s: notifications listener has stopped", r)
# because udev is not notifying us about device removal, make sure to clean up in _all_listeners # because udev is not notifying us about device removal, make sure to clean up in _all_listeners
_all_listeners.pop(r.path, None) _all_listeners.pop(r.path, None)
@ -144,8 +137,7 @@ class SolaarListener(listener.EventsListener):
if not device: if not device:
# Device was unpaired, and isn't valid anymore. # Device was unpaired, and isn't valid anymore.
# We replace it with a ghost so that the UI has something to work with while cleaning up. # We replace it with a ghost so that the UI has something to work with while cleaning up.
if logger.isEnabledFor(logging.INFO): logger.info("device %s was unpaired, ghosting", device)
logger.info("device %s was unpaired, ghosting", device)
device = _ghost(device) device = _ghost(device)
self.status_changed_callback(device, alert, reason) self.status_changed_callback(device, alert, reason)
@ -163,20 +155,17 @@ class SolaarListener(listener.EventsListener):
# a notification that came in to the device listener - strange, but nothing needs to be done here # a notification that came in to the device listener - strange, but nothing needs to be done here
if self.receiver.isDevice: if self.receiver.isDevice:
if logger.isEnabledFor(logging.DEBUG): logger.debug("Notification %s via device %s being ignored.", n, self.receiver)
logger.debug("Notification %s via device %s being ignored.", n, self.receiver)
return return
# DJ pairing notification - ignore - hid++ 1.0 pairing notification is all that is needed # DJ pairing notification - ignore - hid++ 1.0 pairing notification is all that is needed
if n.sub_id == 0x41 and n.report_id == base.DJ_MESSAGE_ID: if n.sub_id == 0x41 and n.report_id == base.DJ_MESSAGE_ID:
if logger.isEnabledFor(logging.INFO): logger.info("ignoring DJ pairing notification %s", n)
logger.info("ignoring DJ pairing notification %s", n)
return return
# a device notification # a device notification
if not (0 < n.devnumber <= 16): # some receivers have devices past their max # devices if not (0 < n.devnumber <= 16): # some receivers have devices past their max # devices
if logger.isEnabledFor(logging.WARNING): logger.warning("Unexpected device number (%s) in notification %s.", n.devnumber, n)
logger.warning("Unexpected device number (%s) in notification %s.", n.devnumber, n)
return return
already_known = n.devnumber in self.receiver already_known = n.devnumber in self.receiver
@ -221,8 +210,7 @@ class SolaarListener(listener.EventsListener):
# Apply settings every time the device connects # Apply settings every time the device connects
if n.sub_id == 0x41: if n.sub_id == 0x41:
if logger.isEnabledFor(logging.INFO): logger.info("connection %s for device wpid %s kind %s serial %s", n, dev.wpid, dev.kind, dev._serial)
logger.info("connection %s for device wpid %s kind %s serial %s", n, dev.wpid, dev.kind, dev._serial)
# If there are saved configs, bring the device's settings up-to-date. # If there are saved configs, bring the device's settings up-to-date.
# They will be applied when the device is marked as online. # They will be applied when the device is marked as online.
configuration.attach_to(dev) configuration.attach_to(dev)
@ -234,10 +222,8 @@ class SolaarListener(listener.EventsListener):
if self.receiver.pairing.lock_open and not already_known: if self.receiver.pairing.lock_open and not already_known:
# this should be the first notification after a device was paired # this should be the first notification after a device was paired
if logger.isEnabledFor(logging.WARNING): logger.warning("first notification was not a connection notification")
logger.warning("first notification was not a connection notification") logger.info("%s: pairing detected new device", self.receiver)
if logger.isEnabledFor(logging.INFO):
logger.info("%s: pairing detected new device", self.receiver)
self.receiver.pairing.new_device = dev self.receiver.pairing.new_device = dev
elif dev.online is None: elif dev.online is None:
dev.ping() dev.ping()
@ -253,17 +239,16 @@ def _process_bluez_dbus(device: Device, path, dictionary: dict, signature):
if device: if device:
if dictionary.get("Connected") is not None: if dictionary.get("Connected") is not None:
connected = dictionary.get("Connected") connected = dictionary.get("Connected")
if logger.isEnabledFor(logging.INFO): logger.info("bluez dbus for %s: %s", device, "CONNECTED" if connected else "DISCONNECTED")
logger.info("bluez dbus for %s: %s", device, "CONNECTED" if connected else "DISCONNECTED")
device.changed(connected, reason=i18n._("connected") if connected else i18n._("disconnected")) device.changed(connected, reason=i18n._("connected") if connected else i18n._("disconnected"))
elif device is not None: elif device is not None:
if logger.isEnabledFor(logging.INFO): logger.info("bluez cleanup for %s", device)
logger.info("bluez cleanup for %s", device)
_cleanup_bluez_dbus(device) _cleanup_bluez_dbus(device)
def _cleanup_bluez_dbus(device: Device): def _cleanup_bluez_dbus(device: Device):
"""Remove dbus signal receiver for device""" """Remove dbus signal receiver for device"""
diversion_remove_inheritance
if device and logger.isEnabledFor(logging.INFO): if device and logger.isEnabledFor(logging.INFO):
logger.info(f"bluez cleanup for {device}") logger.info(f"bluez cleanup for {device}")
dbus.watch_bluez_connect(device.hid_serial, None) dbus.watch_bluez_connect(device.hid_serial, None)
@ -296,8 +281,7 @@ def _start(device_info: DeviceInfo):
def start_all(): def start_all():
stop_all() # just in case this it called twice in a row... stop_all() # just in case this it called twice in a row...
if logger.isEnabledFor(logging.INFO): logger.info("starting receiver listening threads")
logger.info("starting receiver listening threads")
for device_info in base.receivers_and_devices(): for device_info in base.receivers_and_devices():
_process_receiver_event(ACTION_ADD, device_info) _process_receiver_event(ACTION_ADD, device_info)
@ -306,8 +290,7 @@ def stop_all():
listeners = list(_all_listeners.values()) listeners = list(_all_listeners.values())
_all_listeners.clear() _all_listeners.clear()
if listeners: if listeners:
if logger.isEnabledFor(logging.INFO): logger.info("stopping receiver listening threads %s", listeners)
logger.info("stopping receiver listening threads %s", listeners)
for listener_thread in listeners: for listener_thread in listeners:
listener_thread.stop() listener_thread.stop()
configuration.save() configuration.save()
@ -319,8 +302,7 @@ def stop_all():
# after a resume, the device may have been off so mark its saved status to ensure # after a resume, the device may have been off so mark its saved status to ensure
# that the status is pushed to the device when it comes back # that the status is pushed to the device when it comes back
def ping_all(resuming=False): def ping_all(resuming=False):
if logger.isEnabledFor(logging.INFO): logger.info("ping all devices%s", " when resuming" if resuming else "")
logger.info("ping all devices%s", " when resuming" if resuming else "")
for listener_thread in _all_listeners.values(): for listener_thread in _all_listeners.values():
if listener_thread.receiver.isDevice: if listener_thread.receiver.isDevice:
if resuming: if resuming:
@ -363,8 +345,7 @@ def _process_add(device_info: DeviceInfo, retry):
if e.errno == errno.EACCES: if e.errno == errno.EACCES:
try: try:
output = subprocess.check_output(["/usr/bin/getfacl", "-p", device_info.path], text=True) output = subprocess.check_output(["/usr/bin/getfacl", "-p", device_info.path], text=True)
if logger.isEnabledFor(logging.WARNING): logger.warning("Missing permissions on %s\n%s.", device_info.path, output)
logger.warning("Missing permissions on %s\n%s.", device_info.path, output)
except Exception: except Exception:
pass pass
if retry: if retry:
@ -382,8 +363,7 @@ def _process_receiver_event(action, device_info):
assert action is not None assert action is not None
assert device_info is not None assert device_info is not None
assert _error_callback assert _error_callback
if logger.isEnabledFor(logging.INFO): logger.info("receiver event %s %s", action, device_info)
logger.info("receiver event %s %s", action, device_info)
# whatever the action, stop any previous receivers at this path # whatever the action, stop any previous receivers at this path
listener_thread = _all_listeners.pop(device_info.path, None) listener_thread = _all_listeners.pop(device_info.path, None)
if listener_thread is not None: if listener_thread is not None:

View File

@ -46,8 +46,7 @@ class TaskRunner(Thread):
def run(self): def run(self):
self.alive = True self.alive = True
if logger.isEnabledFor(logging.DEBUG): logger.debug("started")
logger.debug("started")
while self.alive: while self.alive:
task = self.queue.get() task = self.queue.get()
@ -59,5 +58,4 @@ class TaskRunner(Thread):
except Exception: except Exception:
logger.exception("calling %s", function) logger.exception("calling %s", function)
if logger.isEnabledFor(logging.DEBUG): logger.debug("stopped")
logger.debug("stopped")

View File

@ -56,8 +56,7 @@ class GtkSignal(Enum):
def _startup(app, startup_hook, use_tray, show_window): def _startup(app, startup_hook, use_tray, show_window):
if logger.isEnabledFor(logging.DEBUG): logger.debug("startup registered=%s, remote=%s", app.get_is_registered(), app.get_is_remote())
logger.debug("startup registered=%s, remote=%s", app.get_is_registered(), app.get_is_remote())
common.start_async() common.start_async()
desktop_notifications.init() desktop_notifications.init()
if use_tray: if use_tray:
@ -67,8 +66,7 @@ def _startup(app, startup_hook, use_tray, show_window):
def _activate(app): def _activate(app):
if logger.isEnabledFor(logging.DEBUG): logger.debug("activate")
logger.debug("activate")
if app.get_windows(): if app.get_windows():
window.popup() window.popup()
else: else:
@ -81,8 +79,7 @@ def _command_line(app, command_line):
if not args: if not args:
_activate(app) _activate(app)
elif args[0] == "config": # config call from remote instance elif args[0] == "config": # config call from remote instance
if logger.isEnabledFor(logging.INFO): logger.info("remote command line %s", args)
logger.info("remote command line %s", args)
dev = find_device(args[1]) dev = find_device(args[1])
if dev: if dev:
setting = next((s for s in dev.settings if s.name == args[2]), None) setting = next((s for s in dev.settings if s.name == args[2]), None)
@ -92,8 +89,7 @@ def _command_line(app, command_line):
def _shutdown(_app, shutdown_hook): def _shutdown(_app, shutdown_hook):
if logger.isEnabledFor(logging.DEBUG): logger.debug("shutdown")
logger.debug("shutdown")
shutdown_hook() shutdown_hook()
common.stop_async() common.stop_async()
tray.destroy() tray.destroy()
@ -127,8 +123,7 @@ def run_loop(
def _status_changed(device, alert, reason, refresh=False): def _status_changed(device, alert, reason, refresh=False):
assert device is not None assert device is not None
if logger.isEnabledFor(logging.DEBUG): logger.debug("status changed: %s (%s) %s", device, alert, reason)
logger.debug("status changed: %s (%s) %s", device, alert, reason)
if alert is None: if alert is None:
alert = Alert.NONE alert = Alert.NONE

View File

@ -34,7 +34,7 @@ class AboutView:
def init_ui(self) -> None: def init_ui(self) -> None:
self.view = Gtk.AboutDialog() self.view = Gtk.AboutDialog()
self.view.set_program_name(NAME) self.view.set_program_name(NAME)
self.view.set_icon_name(NAME.lower()) self.view.set_logo_icon_name(NAME.lower())
self.view.set_license_type(Gtk.License.GPL_2_0) self.view.set_license_type(Gtk.License.GPL_2_0)
self.view.connect(GtkSignal.RESPONSE.value, lambda x, y: self.handle_close(x)) self.view.connect(GtkSignal.RESPONSE.value, lambda x, y: self.handle_close(x))

View File

@ -55,8 +55,7 @@ def _read_async(setting, force_read, sbox, device_is_online, sensitive):
v = s.read(not force) v = s.read(not force)
except Exception as e: except Exception as e:
v = None v = None
if logger.isEnabledFor(logging.WARNING): logger.warning("%s: error reading so use None (%s): %s", s.name, s._device, repr(e))
logger.warning("%s: error reading so use None (%s): %s", s.name, s._device, repr(e))
GLib.idle_add(_update_setting_item, sb, v, online, sensitive, True, priority=99) GLib.idle_add(_update_setting_item, sb, v, online, sensitive, True, priority=99)
ui_async(_do_read, setting, force_read, sbox, device_is_online, sensitive) ui_async(_do_read, setting, force_read, sbox, device_is_online, sensitive)
@ -232,6 +231,9 @@ class ChoiceControlBig(Gtk.Entry, Control):
key = self.get_text() key = self.get_text()
return next((x for x in self.choices if x == key), None) return next((x for x in self.choices if x == key), None)
def set_choices(self, choices):
self.choices = choices
def changed(self, *args): def changed(self, *args):
self.value = self.get_choice() self.value = self.get_choice()
icon = "dialog-warning" if self.value is None else "dialog-question" if self.get_sensitive() else "" icon = "dialog-warning" if self.value is None else "dialog-question" if self.get_sensitive() else ""
@ -284,7 +286,6 @@ class MapChoiceControl(Gtk.HBox, Control):
choices = self.sbox.setting.choices[key_choice] choices = self.sbox.setting.choices[key_choice]
if choices != self.value_choices: if choices != self.value_choices:
self.value_choices = choices self.value_choices = choices
self.valueBox.remove_all()
self.valueBox.set_choices(choices) self.valueBox.set_choices(choices)
current = self.sbox.setting._value.get(key_choice) if self.sbox.setting._value else None current = self.sbox.setting._value.get(key_choice) if self.sbox.setting._value else None
if current is not None: if current is not None:
@ -387,7 +388,7 @@ class MultipleToggleControl(MultipleControl):
elem.set_state(v) elem.set_state(v)
if elem.get_state(): if elem.get_state():
active += 1 active += 1
to_join.append(lbl.get_text() + ": " + str(elem.get_state())) to_join.append(f"{lbl.get_text()}: {str(elem.get_state())}")
b = ", ".join(to_join) b = ", ".join(to_join)
self._button.set_label(f"{active} / {total}") self._button.set_label(f"{active} / {total}")
self._button.set_tooltip_text(b) self._button.set_tooltip_text(b)
@ -471,7 +472,7 @@ class MultipleRangeControl(MultipleControl):
item = ch._setting_item item = ch._setting_item
v = value.get(int(item), None) v = value.get(int(item), None)
if v is not None: if v is not None:
b += str(item) + ": (" b += f"{str(item)}: ("
to_join = [] to_join = []
for c in ch._sub_items: for c in ch._sub_items:
sub_item = c._setting_sub_item sub_item = c._setting_sub_item
@ -481,7 +482,7 @@ class MultipleRangeControl(MultipleControl):
sub_item_value = c._control.get_value() sub_item_value = c._control.get_value()
c._control.set_value(sub_item_value) c._control.set_value(sub_item_value)
n += 1 n += 1
to_join.append(str(sub_item) + f"={sub_item_value}") to_join.append(f"{str(sub_item)}={sub_item_value}")
b += ", ".join(to_join) + ") " b += ", ".join(to_join) + ") "
lbl_text = ngettext("%d value", "%d values", n) % n lbl_text = ngettext("%d value", "%d values", n) % n
self._button.set_label(lbl_text) self._button.set_label(lbl_text)
@ -534,7 +535,7 @@ class PackedRangeControl(MultipleRangeControl):
h.control.set_value(v) h.control.set_value(v)
else: else:
v = self.sbox.setting._value[int(item)] v = self.sbox.setting._value[int(item)]
b += str(item) + ": (" + str(v) + ") " b += f"{str(item)}: ({str(v)}) "
lbl_text = ngettext("%d value", "%d values", n) % n lbl_text = ngettext("%d value", "%d values", n) % n
self._button.set_label(lbl_text) self._button.set_label(lbl_text)
self._button.set_tooltip_text(b) self._button.set_tooltip_text(b)
@ -694,8 +695,7 @@ def _create_sbox(s, _device):
elif s.kind == settings.Kind.HETERO: elif s.kind == settings.Kind.HETERO:
control = HeteroKeyControl(sbox, change) control = HeteroKeyControl(sbox, change)
else: else:
if logger.isEnabledFor(logging.WARNING): logger.warning("setting %s display not implemented", s.label)
logger.warning("setting %s display not implemented", s.label)
return None return None
control.set_sensitive(False) # the first read will enable it control.set_sensitive(False) # the first read will enable it
@ -717,7 +717,10 @@ def _update_setting_item(sbox, value, is_online=True, sensitive=True, null_okay=
return return
sbox._failed.set_visible(False) sbox._failed.set_visible(False)
sbox._control.set_sensitive(False) sbox._control.set_sensitive(False)
sbox._control.set_value(value) try: # a call was producing a TypeError so guard against that
sbox._control.set_value(value)
except TypeError as e:
logger.warning("%s: error setting control value (%s): %s", sbox.setting.name, sbox.setting._device, repr(e))
sbox._control.set_sensitive(sensitive is True) sbox._control.set_sensitive(sensitive is True)
_change_icon(sensitive, sbox._change_icon) _change_icon(sensitive, sbox._change_icon)
@ -821,10 +824,9 @@ def record_setting(device, setting, values):
def _record_setting(device, setting_class, values): def _record_setting(device, setting_class, values):
if logger.isEnabledFor(logging.DEBUG): logger.debug("on %s changing setting %s to %s", device, setting_class.name, values)
logger.debug("on %s changing setting %s to %s", device, setting_class.name, values)
setting = next((s for s in device.settings if s.name == setting_class.name), None) setting = next((s for s in device.settings if s.name == setting_class.name), None)
if setting is None and logger.isEnabledFor(logging.DEBUG): if setting is None:
logger.debug( logger.debug(
"No setting for %s found on %s when trying to record a change made elsewhere", "No setting for %s found on %s when trying to record a change made elsewhere",
setting_class.name, setting_class.name,

View File

@ -58,8 +58,7 @@ if available:
global available global available
if available: if available:
if not Notify.is_initted(): if not Notify.is_initted():
if logger.isEnabledFor(logging.INFO): logger.info("starting desktop notifications")
logger.info("starting desktop notifications")
try: try:
return Notify.init(NAME.lower()) return Notify.init(NAME.lower())
except Exception: except Exception:
@ -70,8 +69,7 @@ if available:
def uninit(): def uninit():
"""Stop desktop notifications.""" """Stop desktop notifications."""
if available and Notify.is_initted(): if available and Notify.is_initted():
if logger.isEnabledFor(logging.INFO): logger.info("stopping desktop notifications")
logger.info("stopping desktop notifications")
_notifications.clear() _notifications.clear()
Notify.uninit() Notify.uninit()
@ -124,7 +122,7 @@ if available:
n.set_hint("value", GLib.Variant("i", progress)) n.set_hint("value", GLib.Variant("i", progress))
try: try:
n.show() return n.show()
except Exception: except Exception:
logger.exception(f"showing {n}") logger.exception(f"showing {n}")

View File

@ -1101,9 +1101,12 @@ class UnsupportedRuleComponentUI(RuleComponentUI):
def create_widgets(self): def create_widgets(self):
self.label = Gtk.Label(valign=Gtk.Align.CENTER, hexpand=True) self.label = Gtk.Label(valign=Gtk.Align.CENTER, hexpand=True)
self.label.set_text(_("This editor does not support the selected rule component yet.")) self.label.set_text(_("This editor does not support the selected rule component yet.") if self.component else "")
self.widgets[self.label] = (0, 0, 1, 1) self.widgets[self.label] = (0, 0, 1, 1)
def collect_value(self):
return self.component.components[:] # not editable on the bottom panel
@classmethod @classmethod
def right_label(cls, component): def right_label(cls, component):
return str(component) return str(component)

View File

@ -37,8 +37,7 @@ def _init_icon_paths():
return return
_default_theme = Gtk.IconTheme.get_default() _default_theme = Gtk.IconTheme.get_default()
if logger.isEnabledFor(logging.DEBUG): logger.debug("icon theme paths: %s", _default_theme.get_search_path())
logger.debug("icon theme paths: %s", _default_theme.get_search_path())
if gtk.battery_icons_style == "symbolic": if gtk.battery_icons_style == "symbolic":
global TRAY_OKAY global TRAY_OKAY
@ -57,8 +56,7 @@ def battery(level=None, charging=False):
if not _default_theme.has_icon(icon_name): if not _default_theme.has_icon(icon_name):
logger.warning("icon %s not found in current theme", icon_name) logger.warning("icon %s not found in current theme", icon_name)
return TRAY_OKAY # use Solaar icon if battery icon not available return TRAY_OKAY # use Solaar icon if battery icon not available
elif logger.isEnabledFor(logging.DEBUG): logger.debug("battery icon for %s:%s = %s", level, charging, icon_name)
logger.debug("battery icon for %s:%s = %s", level, charging, icon_name)
return icon_name return icon_name
@ -105,7 +103,7 @@ def device_icon_set(name="_", kind=None):
icon_set += ("input-mouse",) icon_set += ("input-mouse",)
elif str(kind) == "headset": elif str(kind) == "headset":
icon_set += ("audio-headphones", "audio-headset") icon_set += ("audio-headphones", "audio-headset")
icon_set += ("input-" + str(kind),) icon_set += (f"input-{str(kind)}",)
# icon_set += (name.replace(' ', '-'),) # icon_set += (name.replace(' ', '-'),)
_ICON_SETS[name] = icon_set _ICON_SETS[name] = icon_set
return icon_set return icon_set

View File

@ -99,8 +99,7 @@ def prepare(receiver):
def check_lock_state(assistant, receiver, count=2): def check_lock_state(assistant, receiver, count=2):
if not assistant.is_drawable(): if not assistant.is_drawable():
if logger.isEnabledFor(logging.DEBUG): logger.debug("assistant %s destroyed, bailing out", assistant)
logger.debug("assistant %s destroyed, bailing out", assistant)
return False return False
return _check_lock_state(assistant, receiver, count) return _check_lock_state(assistant, receiver, count)
@ -136,21 +135,18 @@ def _check_lock_state(assistant, receiver, count):
def _pairing_failed(assistant, receiver, error): def _pairing_failed(assistant, receiver, error):
assistant.remove_page(0) # needed to reset the window size assistant.remove_page(0) # needed to reset the window size
if logger.isEnabledFor(logging.DEBUG): logger.debug("%s fail: %s", receiver, error)
logger.debug("%s fail: %s", receiver, error)
_create_failure_page(assistant, error) _create_failure_page(assistant, error)
def _pairing_succeeded(assistant, receiver, device): def _pairing_succeeded(assistant, receiver, device):
assistant.remove_page(0) # needed to reset the window size assistant.remove_page(0) # needed to reset the window size
if logger.isEnabledFor(logging.DEBUG): logger.debug("%s success: %s", receiver, device)
logger.debug("%s success: %s", receiver, device)
_create_success_page(assistant, device) _create_success_page(assistant, device)
def _finish(assistant, receiver): def _finish(assistant, receiver):
if logger.isEnabledFor(logging.DEBUG): logger.debug("finish %s", assistant)
logger.debug("finish %s", assistant)
assistant.destroy() assistant.destroy()
receiver.pairing.new_device = None receiver.pairing.new_device = None
if receiver.pairing.lock_open: if receiver.pairing.lock_open:
@ -165,8 +161,7 @@ def _finish(assistant, receiver):
def _show_passcode(assistant, receiver, passkey): def _show_passcode(assistant, receiver, passkey):
if logger.isEnabledFor(logging.DEBUG): logger.debug("%s show passkey: %s", receiver, passkey)
logger.debug("%s show passkey: %s", receiver, passkey)
name = receiver.pairing.device_name name = receiver.pairing.device_name
authentication = receiver.pairing.device_authentication authentication = receiver.pairing.device_authentication
intro_text = _("%(receiver_name)s: pair new device") % {"receiver_name": receiver.name} intro_text = _("%(receiver_name)s: pair new device") % {"receiver_name": receiver.name}

View File

@ -138,7 +138,7 @@ class KeyPressUI(ActionUI):
@classmethod @classmethod
def right_label(cls, component): def right_label(cls, component):
return " + ".join(component.key_names) + (" (" + component.action + ")" if component.action != CLICK else "") return " + ".join(component.key_names) + (f" ({component.action})" if component.action != CLICK else "")
class MouseScrollUI(ActionUI): class MouseScrollUI(ActionUI):

View File

@ -383,7 +383,7 @@ class TestUI(ConditionUI):
@classmethod @classmethod
def right_label(cls, component): def right_label(cls, component):
return component.test + (" " + repr(component.parameter) if component.parameter is not None else "") return component.test + (f" {repr(component.parameter)}" if component.parameter is not None else "")
@dataclass @dataclass

View File

@ -132,8 +132,7 @@ def _scroll(tray_icon, event, direction=None):
_picked_device = None _picked_device = None
_picked_device = candidate or _picked_device _picked_device = candidate or _picked_device
if logger.isEnabledFor(logging.DEBUG): logger.debug("scroll: picked %s", _picked_device)
logger.debug("scroll: picked %s", _picked_device)
_update_tray_icon() _update_tray_icon()
@ -153,8 +152,7 @@ try:
# treat unavailable versions the same as unavailable packages # treat unavailable versions the same as unavailable packages
raise ImportError from exc raise ImportError from exc
if logger.isEnabledFor(logging.DEBUG): logger.debug(f"using {'Ayatana ' if ayatana_appindicator_found else ''}AppIndicator3")
logger.debug(f"using {'Ayatana ' if ayatana_appindicator_found else ''}AppIndicator3")
# Defense against AppIndicator3 bug that treats files in current directory as icon files # Defense against AppIndicator3 bug that treats files in current directory as icon files
# https://bugs.launchpad.net/ubuntu/+source/libappindicator/+bug/1363277 # https://bugs.launchpad.net/ubuntu/+source/libappindicator/+bug/1363277
@ -212,8 +210,7 @@ try:
GLib.timeout_add(10 * 1000, _icon.set_status, AppIndicator3.IndicatorStatus.ACTIVE) GLib.timeout_add(10 * 1000, _icon.set_status, AppIndicator3.IndicatorStatus.ACTIVE)
except ImportError: except ImportError:
if logger.isEnabledFor(logging.DEBUG): logger.debug("using StatusIcon")
logger.debug("using StatusIcon")
def _create(menu): def _create(menu):
icon = Gtk.StatusIcon.new_from_icon_name(icons.TRAY_INIT) icon = Gtk.StatusIcon.new_from_icon_name(icons.TRAY_INIT)
@ -317,8 +314,7 @@ def _pick_device_with_lowest_battery():
picked = info picked = info
picked_level = level or 0 picked_level = level or 0
if logger.isEnabledFor(logging.DEBUG): logger.debug("picked device with lowest battery: %s", picked)
logger.debug("picked device with lowest battery: %s", picked)
return picked return picked

View File

@ -411,8 +411,7 @@ def _receiver_row(receiver_path, receiver=None):
status_icon = None status_icon = None
row_data = (receiver_path, 0, True, receiver.name, icon_name, status_text, status_icon, receiver) row_data = (receiver_path, 0, True, receiver.name, icon_name, status_text, status_icon, receiver)
assert len(row_data) == len(_TREE_SEPATATOR) assert len(row_data) == len(_TREE_SEPATATOR)
if logger.isEnabledFor(logging.DEBUG): logger.debug("new receiver row %s", row_data)
logger.debug("new receiver row %s", row_data)
item = _model.append(None, row_data) item = _model.append(None, row_data)
if _TREE_SEPATATOR: if _TREE_SEPATATOR:
_model.append(None, _TREE_SEPATATOR) _model.append(None, _TREE_SEPATATOR)
@ -465,8 +464,7 @@ def _device_row(receiver_path, device_number, device=None):
device, device,
) )
assert len(row_data) == len(_TREE_SEPATATOR) assert len(row_data) == len(_TREE_SEPATATOR)
if logger.isEnabledFor(logging.DEBUG): logger.debug("new device row %s at index %d", row_data, new_child_index)
logger.debug("new device row %s at index %d", row_data, new_child_index)
item = _model.insert(receiver_row, new_child_index, row_data) item = _model.insert(receiver_row, new_child_index, row_data)
return item or None return item or None
@ -566,7 +564,7 @@ def _update_details(button):
def _make_text(items): def _make_text(items):
text = "\n".join("%-13s: %s" % (name, value) for name, value in items) text = "\n".join("%-13s: %s" % (name, value) for name, value in items)
return "<small><tt>" + text + "</tt></small>" return f"<small><tt>{text}</tt></small>"
def _displayable_items(items): def _displayable_items(items):
for name, value in items: for name, value in items:
@ -868,7 +866,7 @@ def update(device, need_popup=False, refresh=False):
else: else:
path = device.receiver.path if device.receiver is not None else device.path path = device.receiver.path if device.receiver is not None else device.path
assert device.number is not None and device.number >= 0, "invalid device number" + str(device.number) assert device.number is not None and device.number >= 0, f"invalid device number{str(device.number)}"
item = _device_row(path, device.number, device if bool(device) else None) item = _device_row(path, device.number, device if bool(device) else None)
if bool(device) and item: if bool(device) and item:

View File

@ -16,6 +16,7 @@ nav:
- Translation: i18n.md - Translation: i18n.md
- Implementation: implementation.md - Implementation: implementation.md
- Installation: installation.md - Installation: installation.md
- Uninstallation: uninstallation.md
- Rules: rules.md - Rules: rules.md
- Usage: usage.md - Usage: usage.md

View File

@ -76,12 +76,13 @@ setup(
"psutil (>= 5.4.3)", "psutil (>= 5.4.3)",
'dbus-python ; platform_system=="Linux"', 'dbus-python ; platform_system=="Linux"',
"PyGObject", "PyGObject",
"typing_extensions",
], ],
extras_require={ extras_require={
"report-descriptor": ["hid-parser"], "report-descriptor": ["hid-parser"],
"desktop-notifications": ["Notify (>= 0.7)"], "desktop-notifications": ["Notify (>= 0.7)"],
"git-commit": ["python-git-info"], "git-commit": ["python-git-info"],
"test": ["pytest", "pytest-mock", "pytest-cov", "typing_extensions"], "test": ["pytest", "pytest-mock", "pytest-cov"],
"dev": ["ruff"], "dev": ["ruff"],
}, },
package_dir={"": "lib"}, package_dir={"": "lib"},

View File

@ -2,22 +2,28 @@ from unittest import mock
from logitech_receiver import desktop_notifications from logitech_receiver import desktop_notifications
# depends on external environment, so make some tests dependent on availability
def test_notifications_available():
result = desktop_notifications.notifications_available()
assert not result
def test_init(): def test_init():
assert not desktop_notifications.init() result = desktop_notifications.init()
assert result == desktop_notifications.available
def test_uninit(): def test_uninit():
assert desktop_notifications.uninit() is None assert desktop_notifications.uninit() is None
class MockDevice(mock.Mock):
name = "MockDevice"
def close():
return True
def test_show(): def test_show():
dev = mock.MagicMock() dev = MockDevice()
reason = "unknown" reason = "unknown"
assert desktop_notifications.show(dev, reason) is None result = desktop_notifications.show(dev, reason)
assert result is not None if desktop_notifications.available else result is None

View File

@ -107,7 +107,7 @@ def test_get_battery_voltage():
feature, battery = _hidpp20.get_battery_voltage(device) feature, battery = _hidpp20.get_battery_voltage(device)
assert feature == SupportedFeature.BATTERY_VOLTAGE assert feature == SupportedFeature.BATTERY_VOLTAGE
assert battery.level == 90 assert battery.level == 92
assert common.BatteryStatus.RECHARGING in battery.status assert common.BatteryStatus.RECHARGING in battery.status
assert battery.voltage == 0x1000 assert battery.voltage == 0x1000
@ -130,7 +130,7 @@ def test_get_adc_measurement():
feature, battery = _hidpp20.get_adc_measurement(device) feature, battery = _hidpp20.get_adc_measurement(device)
assert feature == SupportedFeature.ADC_MEASUREMENT assert feature == SupportedFeature.ADC_MEASUREMENT
assert battery.level == 90 assert battery.level == 92
assert battery.status == common.BatteryStatus.RECHARGING assert battery.status == common.BatteryStatus.RECHARGING
assert battery.voltage == 0x1000 assert battery.voltage == 0x1000
@ -389,7 +389,7 @@ def test_decipher_battery_voltage():
feature, battery = hidpp20.decipher_battery_voltage(report) feature, battery = hidpp20.decipher_battery_voltage(report)
assert feature == SupportedFeature.BATTERY_VOLTAGE assert feature == SupportedFeature.BATTERY_VOLTAGE
assert battery.level == 90 assert battery.level == 92
assert common.BatteryStatus.RECHARGING in battery.status assert common.BatteryStatus.RECHARGING in battery.status
assert battery.voltage == 0x1000 assert battery.voltage == 0x1000
@ -410,7 +410,7 @@ def test_decipher_adc_measurement():
feature, battery = hidpp20.decipher_adc_measurement(report) feature, battery = hidpp20.decipher_adc_measurement(report)
assert feature == SupportedFeature.ADC_MEASUREMENT assert feature == SupportedFeature.ADC_MEASUREMENT
assert battery.level == 90 assert battery.level == 92
assert battery.status == common.BatteryStatus.RECHARGING assert battery.status == common.BatteryStatus.RECHARGING
assert battery.voltage == 0x1000 assert battery.voltage == 0x1000
@ -449,3 +449,36 @@ def test_feature_flag_names(code, expected_flags):
) )
def test_led_zone_locations(code, expected_name): def test_led_zone_locations(code, expected_name):
assert hidpp20.LEDZoneLocations[code] == expected_name assert hidpp20.LEDZoneLocations[code] == expected_name
@pytest.mark.parametrize(
"millivolt, expected_percentage",
[
(-1234, 0),
(500, 0),
(2000, 0),
(3500, 0),
(3519, 0),
(3520, 1),
(3559, 1),
(3579, 2),
(3646, 5),
(3671, 10),
(3717, 20),
(3751, 30),
(3778, 40),
(3811, 50),
(3859, 60),
(3922, 70),
(3989, 80),
(4067, 90),
(4180, 99),
(4181, 100),
(4186, 100),
(4500, 100),
],
)
def test_estimate_battery_level_percentage(millivolt, expected_percentage):
percentage = hidpp20.estimate_battery_level_percentage(millivolt)
assert percentage == expected_percentage

View File

@ -259,12 +259,22 @@ def test_extract_codename():
assert codename == "K520" assert codename == "K520"
def test_extract_power_switch_location(): @pytest.mark.parametrize(
response = b"0\x19\x8e>\xb8\x06\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00" "power_switch_byte, expected_location",
[
(b"\x01", "base"),
(b"\x09", "top_edge"),
(b"\x0c", "bottom_edge"),
(b"\x00", "unknown"),
(b"\x0f", "unknown"),
],
)
def test_extract_power_switch_location(power_switch_byte, expected_location):
response = b"\x19\x8e>\xb8\x06\x00\x00\x00\x00" + power_switch_byte + b"\x00\x00\x00\x00\x00"
ps_location = receiver.extract_power_switch_location(response) ps_location = receiver.extract_power_switch_location(response)
assert ps_location == "base" assert ps_location == expected_location
def test_extract_connection_count(): def test_extract_connection_count():

View File

@ -502,7 +502,8 @@ def test_simple_template(test, mocker, mock_gethostname):
assert setting.choices == test.choices assert setting.choices == test.choices
value = setting.read(cached=False) value = setting.read(cached=False)
assert value == tst.initial_value unreadable = hasattr(setting._rw, "read_fnid") and setting._rw.read_fnid is None
assert value == (tst.initial_value if not unreadable else None)
cached_value = setting.read(cached=True) cached_value = setting.read(cached=True)
assert cached_value == tst.initial_value assert cached_value == tst.initial_value

View File

@ -2,15 +2,13 @@ from unittest import mock
from solaar.ui import desktop_notifications from solaar.ui import desktop_notifications
# depends on external environment, so make some tests dependent on availability
def test_notifications_available():
result = desktop_notifications.notifications_available()
assert not result
def test_init(): def test_init():
assert not desktop_notifications.init() result = desktop_notifications.init()
assert result == desktop_notifications.available
def test_uninit(): def test_uninit():
@ -22,7 +20,20 @@ def test_alert():
assert desktop_notifications.alert(reason) is None assert desktop_notifications.alert(reason) is None
class MockDevice(mock.Mock):
name = "MockDevice"
def close():
return True
def test_show(): def test_show():
dev = mock.MagicMock() dev = MockDevice()
reason = "unknown" reason = "unknown"
assert desktop_notifications.show(dev, reason) is None available = desktop_notifications.init()
result = desktop_notifications.show(dev, reason)
if available:
assert result is not None
else:
assert result is None