Clean up imports (#2537)

* Remove import as _ in solaar startup

Related #2273

* Remove import as _ in listener

Related #2273

* Remove import as _ in cli init

Related #2273

* Remove import as _ in gtk

Related #2273

* Remove import as _ in show

Related #2273

* Remove import as _ in tray

Related #2273

* Remove import as _ in profiles

Related #2273

* Remove import as _ in config

Related #2273

* Remove import as _ in config panel

Related #2273

* Remove import as _ in window

Related #2273

* Remove import as _ in pair

Related #2273

* Remove import as _ in pair window

Related #2273

* Remove import as _ in cli package

Related #2273

* Remove import as _ in ui package

Related #2273

* Remove commented out code

Related #2273

* Use constant for Logitech ID
This commit is contained in:
MattHag 2024-07-15 14:37:18 +02:00 committed by GitHub
parent d9d67ed738
commit 67829c5807
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 368 additions and 427 deletions

View File

@ -21,10 +21,9 @@
def init_paths():
"""Make the app work in the source tree."""
import os.path as _path
import os.path
import sys
# Python 2 need conversion from utf-8 filenames
# Python 3 might have problems converting back to UTF-8 in case of Unicode surrogates
try:
decoded_path = sys.path[0]
@ -33,18 +32,17 @@ def init_paths():
except UnicodeError:
sys.stderr.write(
"ERROR: Solaar cannot recognize encoding of filesystem path, "
"this may happen because non UTF-8 characters in the pathname.\n"
"this may happen due to non UTF-8 characters in the pathname.\n"
)
sys.exit(1)
prefix = _path.normpath(_path.join(_path.realpath(decoded_path), ".."))
src_lib = _path.join(prefix, "lib")
share_lib = _path.join(prefix, "share", "solaar", "lib")
root = os.path.join(os.path.realpath(decoded_path), "..")
prefix = os.path.normpath(root)
src_lib = os.path.join(prefix, "lib")
share_lib = os.path.join(prefix, "share", "solaar", "lib")
for location in src_lib, share_lib:
init_py = _path.join(location, "solaar", "__init__.py")
# print ("sys.path[0]: checking", init_py)
if _path.exists(init_py):
# print ("sys.path[0]: found", location, "replacing", sys.path[0])
init_py = os.path.join(location, "solaar", "__init__.py")
if os.path.exists(init_py):
sys.path[0] = location
break

View File

@ -29,6 +29,8 @@ from threading import Thread
import hidapi
LOGITECH_VENDOR_ID = 0x046D
interactive = os.isatty(0)
prompt = "?? Input: " if interactive else ""
start_time = time.time()
@ -126,8 +128,8 @@ def _validate_input(line, hidpp=False):
def _open(args):
def matchfn(bid, vid, pid, _a, _b):
if vid == 0x046D:
return {"vid": 0x046D}
if vid == LOGITECH_VENDOR_ID:
return {"vid": vid}
device = args.device
if args.hidpp and not device:

View File

@ -153,8 +153,6 @@ def _match(action, device, filterfn):
return d_info
elif action == "remove":
# print (dict(device), dict(usb_device))
d_info = DeviceInfo(
path=device.device_node,
bus_id=None,
@ -217,16 +215,6 @@ def find_paired_node_wpid(receiver_path, index):
def monitor_glib(callback, filterfn):
c = pyudev.Context()
# already existing devices
# for device in c.list_devices(subsystem='hidraw'):
# # print (device, dict(device), dict(device.attributes))
# for filter in device_filters:
# d_info = _match('add', device, *filter)
# if d_info:
# GLib.idle_add(callback, 'add', d_info)
# break
m = pyudev.Monitor.from_netlink(c)
m.filter_by(subsystem="hidraw")
@ -248,15 +236,12 @@ def monitor_glib(callback, filterfn):
try:
# io_add_watch_full may not be available...
GLib.io_add_watch_full(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, filterfn)
# print ("did io_add_watch_full")
except AttributeError:
try:
# and the priority parameter appeared later in the API
GLib.io_add_watch(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, filterfn)
# print ("did io_add_watch with priority")
except Exception:
GLib.io_add_watch(m, GLib.IO_IN, _process_udev_event, callback, filterfn)
# print ("did io_add_watch")
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Starting dbus monitoring")

View File

@ -14,20 +14,18 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import pkgutil as _pkgutil
import subprocess as _subprocess
import sys as _sys
import pkgutil
import subprocess
import sys
NAME = "Solaar"
try:
__version__ = (
_subprocess.check_output(["git", "describe", "--always"], cwd=_sys.path[0], stderr=_subprocess.DEVNULL)
.strip()
.decode()
subprocess.check_output(["git", "describe", "--always"], cwd=sys.path[0], stderr=subprocess.DEVNULL).strip().decode()
)
except Exception:
try:
__version__ = _pkgutil.get_data("solaar", "commit").strip().decode()
__version__ = pkgutil.get_data("solaar", "commit").strip().decode()
except Exception:
__version__ = _pkgutil.get_data("solaar", "version").strip().decode()
__version__ = pkgutil.get_data("solaar", "version").strip().decode()

View File

@ -14,18 +14,17 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import argparse as _argparse
import argparse
import logging
import sys as _sys
import sys
from importlib import import_module
from traceback import extract_tb
from traceback import format_exc
import logitech_receiver.device as _device
import logitech_receiver.receiver as _receiver
from logitech_receiver import base
from logitech_receiver import device
from logitech_receiver import receiver
from solaar import NAME
@ -33,7 +32,7 @@ logger = logging.getLogger(__name__)
def _create_parser():
parser = _argparse.ArgumentParser(
parser = argparse.ArgumentParser(
prog=NAME.lower(), add_help=False, epilog=f"For details on individual actions, run `{NAME.lower()} <action> --help`."
)
subparsers = parser.add_subparsers(title="actions", help="optional action to perform")
@ -107,14 +106,14 @@ def _receivers(dev_path=None):
if dev_path is not None and dev_path != dev_info.path:
continue
try:
r = _receiver.ReceiverFactory.create_receiver(dev_info)
r = receiver.ReceiverFactory.create_receiver(dev_info)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, r)
if r:
yield r
except Exception as e:
logger.exception("opening " + str(dev_info))
_sys.exit(f"{NAME.lower()}: error: {str(e)}")
sys.exit(f"{NAME.lower()}: error: {str(e)}")
def _receivers_and_devices(dev_path=None):
@ -123,9 +122,9 @@ def _receivers_and_devices(dev_path=None):
continue
try:
if dev_info.isDevice:
d = _device.DeviceFactory.create_device(base, dev_info)
d = device.DeviceFactory.create_device(base, dev_info)
else:
d = _receiver.ReceiverFactory.create_receiver(dev_info)
d = receiver.ReceiverFactory.create_receiver(dev_info)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, d)
@ -133,7 +132,7 @@ def _receivers_and_devices(dev_path=None):
yield d
except Exception as e:
logger.exception("opening " + str(dev_info))
_sys.exit(f"{NAME.lower()}: error: {str(e)}")
sys.exit(f"{NAME.lower()}: error: {str(e)}")
def _find_receiver(receivers, name):
@ -185,9 +184,6 @@ def _find_device(receivers, name):
break
# raise Exception("no device found matching '%s'" % name)
def run(cli_args=None, hidraw_path=None):
if cli_args:
action = cli_args[0]
@ -197,9 +193,9 @@ def run(cli_args=None, hidraw_path=None):
# Python 3 has an undocumented 'feature' that breaks parsing empty args
# http://bugs.python.org/issue16308
if "cmd" not in args:
_cli_parser.print_usage(_sys.stderr)
_sys.stderr.write(f"{NAME.lower()}: error: too few arguments\n")
_sys.exit(2)
_cli_parser.print_usage(sys.stderr)
sys.stderr.write(f"{NAME.lower()}: error: too few arguments\n")
sys.exit(2)
action = args.action
assert action in actions
@ -215,7 +211,7 @@ def run(cli_args=None, hidraw_path=None):
m = import_module("." + action, package=__name__)
m.run(c, args, _find_receiver, _find_device)
except AssertionError:
tb_last = extract_tb(_sys.exc_info()[2])[-1]
_sys.exit(f"{NAME.lower()}: assertion failed: {tb_last[0]} line {int(tb_last[1])}")
tb_last = extract_tb(sys.exc_info()[2])[-1]
sys.exit(f"{NAME.lower()}: assertion failed: {tb_last[0]} line {int(tb_last[1])}")
except Exception:
_sys.exit(f"{NAME.lower()}: error: {format_exc()}")
sys.exit(f"{NAME.lower()}: error: {format_exc()}")

View File

@ -14,13 +14,13 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import yaml as _yaml
import yaml
from logitech_receiver import settings as _settings
from logitech_receiver import settings_templates as _settings_templates
from logitech_receiver.common import NamedInts as _NamedInts
from logitech_receiver import settings
from logitech_receiver import settings_templates
from logitech_receiver.common import NamedInts
from solaar import configuration as _configuration
from solaar import configuration
def _print_setting(s, verbose=True):
@ -28,9 +28,9 @@ def _print_setting(s, verbose=True):
if verbose:
if s.description:
print("#", s.description.replace("\n", " "))
if s.kind == _settings.KIND.toggle:
if s.kind == settings.KIND.toggle:
print("# possible values: on/true/t/yes/y/1 or off/false/f/no/n/0 or Toggle/~")
elif s.kind == _settings.KIND.choice:
elif s.kind == settings.KIND.choice:
print(
"# possible values: one of [",
", ".join(str(v) for v in s.choices),
@ -51,7 +51,7 @@ def _print_setting_keyed(s, key, verbose=True):
if verbose:
if s.description:
print("#", s.description.replace("\n", " "))
if s.kind == _settings.KIND.multiple_toggle:
if s.kind == settings.KIND.multiple_toggle:
k = next((k for k in s._labels if key == k), None)
if k is None:
print(s.name, "=? (key not found)")
@ -62,7 +62,7 @@ def _print_setting_keyed(s, key, verbose=True):
print(s.name, "= ? (failed to read from device)")
else:
print(s.name, s.val_to_string({k: value[str(int(k))]}))
elif s.kind == _settings.KIND.map_choice:
elif s.kind == settings.KIND.map_choice:
k = next((k for k in s.choices.keys() if key == k), None)
if k is None:
print(s.name, "=? (key not found)")
@ -158,8 +158,7 @@ def run(receivers, args, find_receiver, find_device):
if not args.setting: # print all settings, so first set them all up
if not dev.settings:
raise Exception(f"no settings for {dev.name}")
_configuration.attach_to(dev)
# _settings.apply_all_settings(dev)
configuration.attach_to(dev)
print(dev.name, f"({dev.codename}) [{dev.wpid}:{dev.serial}]")
for s in dev.settings:
print("")
@ -167,7 +166,7 @@ def run(receivers, args, find_receiver, find_device):
return
setting_name = args.setting.lower()
setting = _settings_templates.check_feature_setting(dev, setting_name)
setting = settings_templates.check_feature_setting(dev, setting_name)
if not setting and dev.descriptor and dev.descriptor.settings:
for sclass in dev.descriptor.settings:
if sclass.register and sclass.name == setting_name:
@ -179,7 +178,6 @@ def run(receivers, args, find_receiver, find_device):
raise Exception(f"no setting '{args.setting}' for {dev.name}")
if args.value_key is None:
# setting.apply()
_print_setting(setting)
return
@ -210,32 +208,32 @@ def run(receivers, args, find_receiver, find_device):
if remote:
argl = ["config", dev.serial or dev.unitId, setting.name]
argl.extend([a for a in [args.value_key, args.extra_subkey, args.extra2] if a is not None])
application.run(_yaml.safe_dump(argl))
application.run(yaml.safe_dump(argl))
else:
if dev.persister and setting.persist:
dev.persister[setting.name] = setting._value
def set(dev, setting, args, save):
if setting.kind == _settings.KIND.toggle:
if setting.kind == settings.KIND.toggle:
value = select_toggle(args.value_key, setting)
args.value_key = value
message = f"Setting {setting.name} of {dev.name} to {value}"
result = setting.write(value, save=save)
elif setting.kind == _settings.KIND.range:
elif setting.kind == settings.KIND.range:
value = select_range(args.value_key, setting)
args.value_key = value
message = f"Setting {setting.name} of {dev.name} to {value}"
result = setting.write(value, save=save)
elif setting.kind == _settings.KIND.choice:
elif setting.kind == settings.KIND.choice:
value = select_choice(args.value_key, setting.choices, setting, None)
args.value_key = int(value)
message = f"Setting {setting.name} of {dev.name} to {value}"
result = setting.write(value, save=save)
elif setting.kind == _settings.KIND.map_choice:
elif setting.kind == settings.KIND.map_choice:
if args.extra_subkey is None:
_print_setting_keyed(setting, args.value_key)
return (None, None, None)
@ -253,13 +251,13 @@ def set(dev, setting, args, save):
message = f"Setting {setting.name} of {dev.name} key {k!r} to {value!r}"
result = setting.write_key_value(int(k), value, save=save)
elif setting.kind == _settings.KIND.multiple_toggle:
elif setting.kind == settings.KIND.multiple_toggle:
if args.extra_subkey is None:
_print_setting_keyed(setting, args.value_key)
return (None, None, None)
key = args.value_key
all_keys = getattr(setting, "choices_universe", None)
ikey = all_keys[int(key) if key.isdigit() else key] if isinstance(all_keys, _NamedInts) else to_int(key)
ikey = all_keys[int(key) if key.isdigit() else key] if isinstance(all_keys, NamedInts) else to_int(key)
k = next((k for k in setting._labels if key == k), None)
if k is None and ikey is not None:
k = next((k for k in setting._labels if ikey == k), None)
@ -272,12 +270,12 @@ def set(dev, setting, args, save):
message = f"Setting {setting.name} key {k!r} to {value!r}"
result = setting.write_key_value(str(int(k)), value, save=save)
elif setting.kind == _settings.KIND.multiple_range:
elif setting.kind == settings.KIND.multiple_range:
if args.extra_subkey is None:
raise Exception(f"{setting.name}: setting needs both key and value to set")
key = args.value_key
all_keys = getattr(setting, "choices_universe", None)
ikey = all_keys[int(key) if key.isdigit() else key] if isinstance(all_keys, _NamedInts) else to_int(key)
ikey = all_keys[int(key) if key.isdigit() else key] if isinstance(all_keys, NamedInts) else to_int(key)
if args.extra2 is None or to_int(args.extra2) is None:
raise Exception(f"{setting.name}: setting needs an integer value, not {args.extra2}")
if not setting._value: # ensure that there are values to look through

View File

@ -14,12 +14,12 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from time import time as _timestamp
from time import time
from logitech_receiver import base as _base
from logitech_receiver import base
from logitech_receiver import hidpp10
from logitech_receiver import hidpp10_constants as _hidpp10_constants
from logitech_receiver import notifications as _notifications
from logitech_receiver import hidpp10_constants
from logitech_receiver import notifications
_hidpp10 = hidpp10.Hidpp10()
@ -39,8 +39,8 @@ def run(receivers, args, find_receiver, _ignore):
# check if it's necessary to set the notification flags
old_notification_flags = _hidpp10.get_notification_flags(receiver) or 0
if not (old_notification_flags & _hidpp10_constants.NOTIFICATION_FLAG.wireless):
_hidpp10.set_notification_flags(receiver, old_notification_flags | _hidpp10_constants.NOTIFICATION_FLAG.wireless)
if not (old_notification_flags & hidpp10_constants.NOTIFICATION_FLAG.wireless):
_hidpp10.set_notification_flags(receiver, old_notification_flags | hidpp10_constants.NOTIFICATION_FLAG.wireless)
# get all current devices
known_devices = [dev.number for dev in receiver]
@ -50,8 +50,8 @@ def run(receivers, args, find_receiver, _ignore):
nonlocal known_devices
assert n
if n.devnumber == 0xFF:
_notifications.process(receiver, n)
elif n.sub_id == 0x41 and len(n.data) == _base._SHORT_MESSAGE_SIZE - 4:
notifications.process(receiver, n)
elif n.sub_id == 0x41 and len(n.data) == base._SHORT_MESSAGE_SIZE - 4:
kd, known_devices = known_devices, None # only process one connection notification
if kd is not None:
if n.devnumber not in kd:
@ -66,13 +66,13 @@ def run(receivers, args, find_receiver, _ignore):
if receiver.receiver_kind == "bolt": # Bolt receivers require authentication to pair a device
receiver.discover(timeout=timeout)
print("Bolt Pairing: long-press the pairing key or button on your device (timing out in", timeout, "seconds).")
pairing_start = _timestamp()
pairing_start = time()
patience = 5 # the discovering notification may come slightly later, so be patient
while receiver.pairing.discovering or _timestamp() - pairing_start < patience:
while receiver.pairing.discovering or time() - pairing_start < patience:
if receiver.pairing.device_address and receiver.pairing.device_authentication and receiver.pairing.device_name:
break
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
n = base.read(receiver.handle)
n = base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
address = receiver.pairing.device_address
@ -83,15 +83,15 @@ def run(receivers, args, find_receiver, _ignore):
receiver.pair_device(
address=address,
authentication=authentication,
entropy=20 if kind == _hidpp10_constants.DEVICE_KIND.keyboard else 10,
entropy=20 if kind == hidpp10_constants.DEVICE_KIND.keyboard else 10,
)
pairing_start = _timestamp()
pairing_start = time()
patience = 5 # the discovering notification may come slightly later, so be patient
while receiver.pairing.lock_open or _timestamp() - pairing_start < patience:
while receiver.pairing.lock_open or time() - pairing_start < patience:
if receiver.pairing.device_passkey:
break
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
n = base.read(receiver.handle)
n = base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
if authentication & 0x01:
@ -102,24 +102,24 @@ def run(receivers, args, find_receiver, _ignore):
print(f"Bolt Pairing: press {passkey}")
print("and then press left and right buttons simultaneously")
while receiver.pairing.lock_open:
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
n = base.read(receiver.handle)
n = base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
else:
receiver.set_lock(False, timeout=timeout)
print("Pairing: turn your new device on (timing out in", timeout, "seconds).")
pairing_start = _timestamp()
pairing_start = time()
patience = 5 # the lock-open notification may come slightly later, wait for it a bit
while receiver.pairing.lock_open or _timestamp() - pairing_start < patience:
n = _base.read(receiver.handle)
while receiver.pairing.lock_open or time() - pairing_start < patience:
n = base.read(receiver.handle)
if n:
n = _base.make_notification(*n)
n = base.make_notification(*n)
if n:
receiver.handle.notifications_hook(n)
if not (old_notification_flags & _hidpp10_constants.NOTIFICATION_FLAG.wireless):
if not (old_notification_flags & hidpp10_constants.NOTIFICATION_FLAG.wireless):
# only clear the flags if they weren't set before, otherwise a
# concurrently running Solaar app might stop working properly
_hidpp10.set_notification_flags(receiver, old_notification_flags)

View File

@ -14,10 +14,10 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from logitech_receiver import base as _base
from logitech_receiver import hidpp10_constants as _hidpp10_constants
from logitech_receiver.common import strhex as _strhex
from logitech_receiver.hidpp10_constants import Registers as Reg
from logitech_receiver import base
from logitech_receiver import hidpp10_constants
from logitech_receiver.common import strhex
from logitech_receiver.hidpp10_constants import Registers
from solaar.cli.show import _print_device
from solaar.cli.show import _print_receiver
@ -44,37 +44,42 @@ def run(receivers, args, find_receiver, _ignore):
print("")
print(" Register Dump")
rgst = receiver.read_register(Reg.NOTIFICATIONS)
print(" Notifications %#04x: %s" % (Reg.NOTIFICATIONS % 0x100, "0x" + _strhex(rgst) if rgst else "None"))
rgst = receiver.read_register(Reg.RECEIVER_CONNECTION)
print(" Connection State %#04x: %s" % (Reg.RECEIVER_CONNECTION % 0x100, "0x" + _strhex(rgst) if rgst else "None"))
rgst = receiver.read_register(Reg.DEVICES_ACTIVITY)
print(" Device Activity %#04x: %s" % (Reg.DEVICES_ACTIVITY % 0x100, "0x" + _strhex(rgst) if rgst else "None"))
rgst = receiver.read_register(Registers.NOTIFICATIONS)
print(" Notifications %#04x: %s" % (Registers.NOTIFICATIONS % 0x100, "0x" + strhex(rgst) if rgst else "None"))
rgst = receiver.read_register(Registers.RECEIVER_CONNECTION)
print(
" Connection State %#04x: %s"
% (Registers.RECEIVER_CONNECTION % 0x100, "0x" + strhex(rgst) if rgst else "None")
)
rgst = receiver.read_register(Registers.DEVICES_ACTIVITY)
print(
" Device Activity %#04x: %s" % (Registers.DEVICES_ACTIVITY % 0x100, "0x" + strhex(rgst) if rgst else "None")
)
for sub_reg in range(0, 16):
rgst = receiver.read_register(Reg.RECEIVER_INFO, sub_reg)
rgst = receiver.read_register(Registers.RECEIVER_INFO, sub_reg)
print(
" Pairing Register %#04x %#04x: %s"
% (Reg.RECEIVER_INFO % 0x100, sub_reg, "0x" + _strhex(rgst) if rgst else "None")
% (Registers.RECEIVER_INFO % 0x100, sub_reg, "0x" + strhex(rgst) if rgst else "None")
)
for device in range(0, 7):
for sub_reg in [0x10, 0x20, 0x30, 0x50]:
rgst = receiver.read_register(Reg.RECEIVER_INFO, sub_reg + device)
rgst = receiver.read_register(Registers.RECEIVER_INFO, sub_reg + device)
print(
" Pairing Register %#04x %#04x: %s"
% (Reg.RECEIVER_INFO % 0x100, sub_reg + device, "0x" + _strhex(rgst) if rgst else "None")
% (Registers.RECEIVER_INFO % 0x100, sub_reg + device, "0x" + strhex(rgst) if rgst else "None")
)
rgst = receiver.read_register(Reg.RECEIVER_INFO, 0x40 + device)
rgst = receiver.read_register(Registers.RECEIVER_INFO, 0x40 + device)
print(
" Pairing Name %#04x %#02x: %s"
% (Reg.RECEIVER_INFO % 0x100, 0x40 + device, rgst[2 : 2 + ord(rgst[1:2])] if rgst else "None")
% (Registers.RECEIVER_INFO % 0x100, 0x40 + device, rgst[2 : 2 + ord(rgst[1:2])] if rgst else "None")
)
for part in range(1, 4):
rgst = receiver.read_register(Reg.RECEIVER_INFO, 0x60 + device, part)
rgst = receiver.read_register(Registers.RECEIVER_INFO, 0x60 + device, part)
print(
" Pairing Name %#04x %#02x %#02x: %2d %s"
% (
Reg.RECEIVER_INFO % 0x100,
Registers.RECEIVER_INFO % 0x100,
0x60 + device,
part,
ord(rgst[2:3]) if rgst else 0,
@ -82,39 +87,39 @@ def run(receivers, args, find_receiver, _ignore):
)
)
for sub_reg in range(0, 5):
rgst = receiver.read_register(Reg.FIRMWARE, sub_reg)
rgst = receiver.read_register(Registers.FIRMWARE, sub_reg)
print(
" Firmware %#04x %#04x: %s"
% (Reg.FIRMWARE % 0x100, sub_reg, "0x" + _strhex(rgst) if rgst is not None else "None")
% (Registers.FIRMWARE % 0x100, sub_reg, "0x" + strhex(rgst) if rgst is not None else "None")
)
print("")
for reg in range(0, 0xFF):
last = None
for sub in range(0, 0xFF):
rgst = _base.request(receiver.handle, 0xFF, 0x8100 | reg, sub, return_error=True)
if isinstance(rgst, int) and rgst == _hidpp10_constants.ERROR.invalid_address:
rgst = base.request(receiver.handle, 0xFF, 0x8100 | reg, sub, return_error=True)
if isinstance(rgst, int) and rgst == hidpp10_constants.ERROR.invalid_address:
break
elif isinstance(rgst, int) and rgst == _hidpp10_constants.ERROR.invalid_value:
elif isinstance(rgst, int) and rgst == hidpp10_constants.ERROR.invalid_value:
continue
else:
if not isinstance(last, bytes) or not isinstance(rgst, bytes) or last != rgst:
print(
" Register Short %#04x %#04x: %s"
% (reg, sub, "0x" + _strhex(rgst) if isinstance(rgst, bytes) else str(rgst))
% (reg, sub, "0x" + strhex(rgst) if isinstance(rgst, bytes) else str(rgst))
)
last = rgst
last = None
for sub in range(0, 0xFF):
rgst = _base.request(receiver.handle, 0xFF, 0x8100 | (0x200 + reg), sub, return_error=True)
if isinstance(rgst, int) and rgst == _hidpp10_constants.ERROR.invalid_address:
rgst = base.request(receiver.handle, 0xFF, 0x8100 | (0x200 + reg), sub, return_error=True)
if isinstance(rgst, int) and rgst == hidpp10_constants.ERROR.invalid_address:
break
elif isinstance(rgst, int) and rgst == _hidpp10_constants.ERROR.invalid_value:
elif isinstance(rgst, int) and rgst == hidpp10_constants.ERROR.invalid_value:
continue
else:
if not isinstance(last, bytes) or not isinstance(rgst, bytes) or last != rgst:
print(
" Register Long %#04x %#04x: %s"
% (reg, sub, "0x" + _strhex(rgst) if isinstance(rgst, bytes) else str(rgst))
% (reg, sub, "0x" + strhex(rgst) if isinstance(rgst, bytes) else str(rgst))
)
last = rgst

View File

@ -14,12 +14,12 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import traceback as _traceback
import traceback
import yaml as _yaml
import yaml
from logitech_receiver.hidpp20 import OnboardProfiles as _OnboardProfiles
from logitech_receiver.hidpp20 import OnboardProfilesVersion as _OnboardProfilesVersion
from logitech_receiver.hidpp20 import OnboardProfiles
from logitech_receiver.hidpp20 import OnboardProfilesVersion
def run(receivers, args, find_receiver, find_device):
@ -42,15 +42,15 @@ def run(receivers, args, find_receiver, find_device):
print(f"Device {dev.name} is either offline or has no onboard profiles")
elif not profiles_file:
print(f"#Dumping profiles from {dev.name}")
print(_yaml.dump(dev.profiles))
print(yaml.dump(dev.profiles))
else:
try:
with open(profiles_file, "r") as f:
print(f"Reading profiles from {profiles_file}")
profiles = _yaml.safe_load(f)
if not isinstance(profiles, _OnboardProfiles):
profiles = yaml.safe_load(f)
if not isinstance(profiles, OnboardProfiles):
print("Profiles file does not contain current onboard profiles")
elif getattr(profiles, "version", None) != _OnboardProfilesVersion:
elif getattr(profiles, "version", None) != OnboardProfilesVersion:
version = getattr(profiles, "version", None)
print(f"Missing or incorrect profile version {version} in loaded profile")
elif getattr(profiles, "name", None) != dev.name:
@ -62,4 +62,4 @@ def run(receivers, args, find_receiver, find_device):
print(f"Wrote {written} sectors to {dev.name}")
except Exception as exc:
print("Profiles not written:", exc)
print(_traceback.format_exc())
print(traceback.format_exc())

View File

@ -16,13 +16,14 @@
from logitech_receiver import exceptions
from logitech_receiver import hidpp10
from logitech_receiver import hidpp10_constants as _hidpp10_constants
from logitech_receiver import hidpp10_constants
from logitech_receiver import hidpp20
from logitech_receiver import hidpp20_constants as _hidpp20_constants
from logitech_receiver import receiver as _receiver
from logitech_receiver import settings_templates as _settings_templates
from logitech_receiver.common import NamedInt as _NamedInt
from logitech_receiver.common import strhex as _strhex
from logitech_receiver import hidpp20_constants
from logitech_receiver import receiver
from logitech_receiver import settings_templates
from logitech_receiver.common import LOGITECH_VENDOR_ID
from logitech_receiver.common import NamedInt
from logitech_receiver.common import strhex
from solaar import NAME
from solaar import __version__
@ -36,7 +37,7 @@ def _print_receiver(receiver):
print(receiver.name)
print(" Device path :", receiver.path)
print(f" USB id : 046d:{receiver.product_id}")
print(f" USB id : {LOGITECH_VENDOR_ID:04x}:{receiver.product_id}")
print(" Serial :", receiver.serial)
pending = hidpp10.get_configuration_pending_flags(receiver)
if pending:
@ -52,12 +53,12 @@ def _print_receiver(receiver):
notification_flags = _hidpp10.get_notification_flags(receiver)
if notification_flags is not None:
if notification_flags:
notification_names = _hidpp10_constants.NOTIFICATION_FLAG.flag_names(notification_flags)
notification_names = hidpp10_constants.NOTIFICATION_FLAG.flag_names(notification_flags)
print(f" Notifications: {', '.join(notification_names)} (0x{notification_flags:06X})")
else:
print(" Notifications: (none)")
activity = receiver.read_register(_hidpp10_constants.Registers.DEVICES_ACTIVITY)
activity = receiver.read_register(hidpp10_constants.Registers.DEVICES_ACTIVITY)
if activity:
activity = [(d, ord(activity[d - 1 : d])) for d in range(1, receiver.max_devices)]
activity_text = ", ".join(f"{int(d)}={int(a)}" for d, a in activity if a > 0)
@ -67,7 +68,7 @@ def _print_receiver(receiver):
def _battery_text(level) -> str:
if level is None:
return "N/A"
elif isinstance(level, _NamedInt):
elif isinstance(level, NamedInt):
return str(level)
else:
return f"{int(level)}%"
@ -103,7 +104,7 @@ def _print_device(dev, num=None):
if dev.wpid:
print(f" WPID : {dev.wpid}")
if dev.product_id:
print(f" USB id : 046d:{dev.product_id}")
print(f" USB id : {LOGITECH_VENDOR_ID:04x}:{dev.product_id}")
print(" Codename :", dev.codename)
print(" Kind :", dev.kind)
if dev.protocol:
@ -128,14 +129,14 @@ def _print_device(dev, num=None):
notification_flags = _hidpp10.get_notification_flags(dev)
if notification_flags is not None:
if notification_flags:
notification_names = _hidpp10_constants.NOTIFICATION_FLAG.flag_names(notification_flags)
notification_names = hidpp10_constants.NOTIFICATION_FLAG.flag_names(notification_flags)
print(f" Notifications: {', '.join(notification_names)} (0x{notification_flags:06X}).")
else:
print(" Notifications: (none).")
device_features = _hidpp10.get_device_features(dev)
if device_features is not None:
if device_features:
device_features_names = _hidpp10_constants.DEVICE_FEATURES.flag_names(device_features)
device_features_names = hidpp10_constants.DEVICE_FEATURES.flag_names(device_features)
print(f" Features: {', '.join(device_features_names)} (0x{device_features:06X})")
else:
print(" Features: (none)")
@ -143,15 +144,15 @@ def _print_device(dev, num=None):
if dev.online and dev.features:
print(f" Supports {len(dev.features)} HID++ 2.0 features:")
dev_settings = []
_settings_templates.check_feature_settings(dev, dev_settings)
settings_templates.check_feature_settings(dev, dev_settings)
for feature, index in dev.features.enumerate():
flags = dev.request(0x0000, feature.bytes(2))
flags = 0 if flags is None else ord(flags[1:2])
flags = _hidpp20_constants.FEATURE_FLAG.flag_names(flags)
flags = hidpp20_constants.FEATURE_FLAG.flag_names(flags)
version = dev.features.get_feature_version(int(feature))
version = version if version else 0
print(" %2d: %-22s {%04X} V%s %s " % (index, feature, feature, version, ", ".join(flags)))
if feature == _hidpp20_constants.FEATURE.HIRES_WHEEL:
if feature == hidpp20_constants.FEATURE.HIRES_WHEEL:
wheel = _hidpp20.get_hires_wheel(dev)
if wheel:
multi, has_invert, has_switch, inv, res, target, ratchet = wheel
@ -168,7 +169,7 @@ def _print_device(dev, num=None):
print(" HID++ notification")
else:
print(" HID notification")
elif feature == _hidpp20_constants.FEATURE.MOUSE_POINTER:
elif feature == hidpp20_constants.FEATURE.MOUSE_POINTER:
mouse_pointer = _hidpp20.get_mouse_pointer_info(dev)
if mouse_pointer:
print(f" DPI: {mouse_pointer['dpi']}")
@ -181,13 +182,13 @@ def _print_device(dev, num=None):
print(" Provide vertical tuning, trackball")
else:
print(" No vertical tuning, standard mice")
elif feature == _hidpp20_constants.FEATURE.VERTICAL_SCROLLING:
elif feature == hidpp20_constants.FEATURE.VERTICAL_SCROLLING:
vertical_scrolling_info = _hidpp20.get_vertical_scrolling_info(dev)
if vertical_scrolling_info:
print(f" Roller type: {vertical_scrolling_info['roller']}")
print(f" Ratchet per turn: {vertical_scrolling_info['ratchet']}")
print(f" Scroll lines: {vertical_scrolling_info['lines']}")
elif feature == _hidpp20_constants.FEATURE.HI_RES_SCROLLING:
elif feature == hidpp20_constants.FEATURE.HI_RES_SCROLLING:
scrolling_mode, scrolling_resolution = _hidpp20.get_hi_res_scrolling_info(dev)
if scrolling_mode:
print(" Hi-res scrolling enabled")
@ -195,49 +196,49 @@ def _print_device(dev, num=None):
print(" Hi-res scrolling disabled")
if scrolling_resolution:
print(f" Hi-res scrolling multiplier: {scrolling_resolution}")
elif feature == _hidpp20_constants.FEATURE.POINTER_SPEED:
elif feature == hidpp20_constants.FEATURE.POINTER_SPEED:
pointer_speed = _hidpp20.get_pointer_speed_info(dev)
if pointer_speed:
print(f" Pointer Speed: {pointer_speed}")
elif feature == _hidpp20_constants.FEATURE.LOWRES_WHEEL:
elif feature == hidpp20_constants.FEATURE.LOWRES_WHEEL:
wheel_status = _hidpp20.get_lowres_wheel_status(dev)
if wheel_status:
print(f" Wheel Reports: {wheel_status}")
elif feature == _hidpp20_constants.FEATURE.NEW_FN_INVERSION:
elif feature == hidpp20_constants.FEATURE.NEW_FN_INVERSION:
inversion = _hidpp20.get_new_fn_inversion(dev)
if inversion:
inverted, default_inverted = inversion
print(" Fn-swap:", "enabled" if inverted else "disabled")
print(" Fn-swap default:", "enabled" if default_inverted else "disabled")
elif feature == _hidpp20_constants.FEATURE.HOSTS_INFO:
elif feature == hidpp20_constants.FEATURE.HOSTS_INFO:
host_names = _hidpp20.get_host_names(dev)
for host, (paired, name) in host_names.items():
print(f" Host {host} ({'paired' if paired else 'unpaired'}): {name}")
elif feature == _hidpp20_constants.FEATURE.DEVICE_NAME:
elif feature == hidpp20_constants.FEATURE.DEVICE_NAME:
print(f" Name: {_hidpp20.get_name(dev)}")
print(f" Kind: {_hidpp20.get_kind(dev)}")
elif feature == _hidpp20_constants.FEATURE.DEVICE_FRIENDLY_NAME:
elif feature == hidpp20_constants.FEATURE.DEVICE_FRIENDLY_NAME:
print(f" Friendly Name: {_hidpp20.get_friendly_name(dev)}")
elif feature == _hidpp20_constants.FEATURE.DEVICE_FW_VERSION:
elif feature == hidpp20_constants.FEATURE.DEVICE_FW_VERSION:
for fw in _hidpp20.get_firmware(dev):
extras = _strhex(fw.extras) if fw.extras else ""
extras = strhex(fw.extras) if fw.extras else ""
print(f" Firmware: {fw.kind} {fw.name} {fw.version} {extras}")
ids = _hidpp20.get_ids(dev)
if ids:
unitId, modelId, tid_map = ids
print(f" Unit ID: {unitId} Model ID: {modelId} Transport IDs: {tid_map}")
elif (
feature == _hidpp20_constants.FEATURE.REPORT_RATE
or feature == _hidpp20_constants.FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE
feature == hidpp20_constants.FEATURE.REPORT_RATE
or feature == hidpp20_constants.FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE
):
print(f" Report Rate: {_hidpp20.get_polling_rate(dev)}")
elif feature == _hidpp20_constants.FEATURE.CONFIG_CHANGE:
response = dev.feature_request(_hidpp20_constants.FEATURE.CONFIG_CHANGE, 0x00)
elif feature == hidpp20_constants.FEATURE.CONFIG_CHANGE:
response = dev.feature_request(hidpp20_constants.FEATURE.CONFIG_CHANGE, 0x00)
print(f" Configuration: {response.hex()}")
elif feature == _hidpp20_constants.FEATURE.REMAINING_PAIRING:
elif feature == hidpp20_constants.FEATURE.REMAINING_PAIRING:
print(f" Remaining Pairings: {int(_hidpp20.get_remaining_pairing(dev))}")
elif feature == _hidpp20_constants.FEATURE.ONBOARD_PROFILES:
if _hidpp20.get_onboard_mode(dev) == _hidpp20_constants.ONBOARD_MODES.MODE_HOST:
elif feature == hidpp20_constants.FEATURE.ONBOARD_PROFILES:
if _hidpp20.get_onboard_mode(dev) == hidpp20_constants.ONBOARD_MODES.MODE_HOST:
mode = "Host"
else:
mode = "On-Board"
@ -266,9 +267,9 @@ def _print_device(dev, num=None):
print(f" Has {len(dev.keys)} reprogrammable keys:")
for k in dev.keys:
# TODO: add here additional variants for other REPROG_CONTROLS
if dev.keys.keyversion == _hidpp20_constants.FEATURE.REPROG_CONTROLS_V2:
if dev.keys.keyversion == hidpp20_constants.FEATURE.REPROG_CONTROLS_V2:
print(" %2d: %-26s => %-27s %s" % (k.index, k.key, k.default_task, ", ".join(k.flags)))
if dev.keys.keyversion == _hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
if dev.keys.keyversion == hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
print(" %2d: %-26s, default: %-27s => %-26s" % (k.index, k.key, k.default_task, k.mapped_to))
gmask_fmt = ",".join(k.group_mask)
gmask_fmt = gmask_fmt if gmask_fmt else "empty"
@ -311,7 +312,7 @@ def run(devices, args, find_receiver, find_device):
if device_name == "all":
for d in devices:
if isinstance(d, _receiver.Receiver):
if isinstance(d, receiver.Receiver):
_print_receiver(d)
count = d.count()
if count:

View File

@ -15,9 +15,9 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import json as _json
import json
import logging
import os as _os
import os
import threading
import yaml
@ -28,9 +28,9 @@ from solaar import __version__
logger = logging.getLogger(__name__)
_XDG_CONFIG_HOME = _os.environ.get("XDG_CONFIG_HOME") or _os.path.expanduser(_os.path.join("~", ".config"))
_yaml_file_path = _os.path.join(_XDG_CONFIG_HOME, "solaar", "config.yaml")
_json_file_path = _os.path.join(_XDG_CONFIG_HOME, "solaar", "config.json")
_XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser(os.path.join("~", ".config"))
_yaml_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "config.yaml")
_json_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "config.json")
_KEY_VERSION = "_version"
_KEY_NAME = "_NAME"
@ -45,18 +45,18 @@ _config = []
def _load():
loaded_config = []
if _os.path.isfile(_yaml_file_path):
if os.path.isfile(_yaml_file_path):
path = _yaml_file_path
try:
with open(_yaml_file_path) as config_file:
loaded_config = yaml.safe_load(config_file)
except Exception as e:
logger.error("failed to load from %s: %s", _yaml_file_path, e)
elif _os.path.isfile(_json_file_path):
elif os.path.isfile(_json_file_path):
path = _json_file_path
try:
with open(_json_file_path) as config_file:
loaded_config = _json.load(config_file)
loaded_config = json.load(config_file)
except Exception as e:
logger.error("failed to load from %s: %s", _json_file_path, e)
loaded_config = _convert_json(loaded_config)
@ -129,10 +129,10 @@ def save(defer=False):
global save_timer
if not _config:
return
dirname = _os.path.dirname(_yaml_file_path)
if not _os.path.isdir(dirname):
dirname = os.path.dirname(_yaml_file_path)
if not os.path.isdir(dirname):
try:
_os.makedirs(dirname)
os.makedirs(dirname)
except Exception:
logger.error("failed to create %s", dirname)
return

View File

@ -29,15 +29,13 @@ import tempfile
from traceback import format_exc
import solaar.cli as _cli
import solaar.configuration as _configuration
import solaar.dbus as _dbus
import solaar.listener as _listener
import solaar.ui as _ui
import solaar.ui.common as _common
from solaar import NAME
from solaar import __version__
from solaar import cli
from solaar import configuration
from solaar import dbus
from solaar import listener
from solaar import ui
logger = logging.getLogger(__name__)
@ -87,12 +85,12 @@ def _parse_arguments():
arg_parser.add_argument("--tray-icon-size", type=int, help="explicit size for tray icons")
arg_parser.add_argument("-V", "--version", action="version", version="%(prog)s " + __version__)
arg_parser.add_argument("--help-actions", action="store_true", help="print help for the optional actions")
arg_parser.add_argument("action", nargs=argparse.REMAINDER, choices=_cli.actions, help="optional actions to perform")
arg_parser.add_argument("action", nargs=argparse.REMAINDER, choices=cli.actions, help="optional actions to perform")
args = arg_parser.parse_args()
if args.help_actions:
_cli.print_help()
cli.print_help()
return
if args.window is None:
@ -146,7 +144,7 @@ def main():
return
if args.action:
# if any argument, run comandline and exit
return _cli.run(args.action, args.hidraw_path)
return cli.run(args.action, args.hidraw_path)
gi = _require("gi", "python3-gi (in Ubuntu) or python3-gobject (in Fedora)")
_require("gi.repository.Gtk", "gir1.2-gtk-3.0", gi, "Gtk", "3.0")
@ -166,17 +164,17 @@ def main():
logger.warning("Solaar udev file not found in expected location")
logger.warning("See https://pwr-solaar.github.io/Solaar/installation for more information")
try:
_listener.setup_scanner(_ui.status_changed, _ui.setting_changed, _common.error_dialog)
listener.setup_scanner(ui.status_changed, ui.setting_changed, ui.common.error_dialog)
if args.restart_on_wake_up:
_dbus.watch_suspend_resume(_listener.start_all, _listener.stop_all)
dbus.watch_suspend_resume(listener.start_all, listener.stop_all)
else:
_dbus.watch_suspend_resume(lambda: _listener.ping_all(True))
dbus.watch_suspend_resume(lambda: listener.ping_all(True))
_configuration.defer_saves = True # allow configuration saves to be deferred
configuration.defer_saves = True # allow configuration saves to be deferred
# main UI event loop
_ui.run_loop(_listener.start_all, _listener.stop_all, args.window != "only", args.window != "hide")
ui.run_loop(listener.start_all, listener.stop_all, args.window != "only", args.window != "hide")
except Exception:
sys.exit(f"{NAME.lower()}: error: {format_exc()}")

View File

@ -15,7 +15,7 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import errno as _errno
import errno
import logging
import subprocess
import time
@ -24,15 +24,13 @@ from collections import namedtuple
from functools import partial
import gi
import logitech_receiver.device as _device
import logitech_receiver.receiver as _receiver
import logitech_receiver
from logitech_receiver import base as _base
from logitech_receiver import base
from logitech_receiver import exceptions
from logitech_receiver import hidpp10_constants as _hidpp10_constants
from logitech_receiver import listener as _listener
from logitech_receiver import notifications as _notifications
from logitech_receiver.hidpp10_constants import Registers
from logitech_receiver import hidpp10_constants
from logitech_receiver import listener
from logitech_receiver import notifications
from . import configuration
from . import dbus
@ -43,9 +41,6 @@ from gi.repository import GLib # NOQA: E402 # isort:skip
logger = logging.getLogger(__name__)
_IR = _hidpp10_constants.INFO_SUBREGISTERS
_GHOST_DEVICE = namedtuple("_GHOST_DEVICE", ("receiver", "number", "name", "kind", "online"))
_GHOST_DEVICE.__bool__ = lambda self: False
_GHOST_DEVICE.__nonzero__ = _GHOST_DEVICE.__bool__
@ -55,7 +50,7 @@ def _ghost(device):
return _GHOST_DEVICE(receiver=device.receiver, number=device.number, name=device.name, kind=device.kind, online=False)
class SolaarListener(_listener.EventsListener):
class SolaarListener(listener.EventsListener):
"""Keeps the status of a Receiver or Device (member name is receiver but it can also be a device)."""
def __init__(self, receiver, status_changed_callback):
@ -69,7 +64,7 @@ class SolaarListener(_listener.EventsListener):
logger.info("%s: notifications listener has started (%s)", self.receiver, self.receiver.handle)
nfs = self.receiver.enable_connection_notifications()
if logger.isEnabledFor(logging.WARNING):
if not self.receiver.isDevice and not ((nfs if nfs else 0) & _hidpp10_constants.NOTIFICATION_FLAG.wireless):
if not self.receiver.isDevice and not ((nfs if nfs else 0) & hidpp10_constants.NOTIFICATION_FLAG.wireless):
logger.warning(
"Receiver on %s might not support connection notifications, GUI might not show its devices",
self.receiver.path,
@ -142,11 +137,9 @@ class SolaarListener(_listener.EventsListener):
def _notifications_handler(self, n):
assert self.receiver
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("%s: handling %s", self.receiver, n)
if n.devnumber == 0xFF:
# a receiver notification
_notifications.process(self.receiver, n)
notifications.process(self.receiver, n)
return
# a notification that came in to the device listener - strange, but nothing needs to be done here
@ -156,7 +149,7 @@ class SolaarListener(_listener.EventsListener):
return
# DJ pairing notification - ignore - hid++ 1.0 pairing notification is all that is needed
if n.sub_id == 0x41 and n.report_id == _base.DJ_MESSAGE_ID:
if n.sub_id == 0x41 and n.report_id == base.DJ_MESSAGE_ID:
if logger.isEnabledFor(logging.INFO):
logger.info("ignoring DJ pairing notification %s", n)
return
@ -170,7 +163,7 @@ class SolaarListener(_listener.EventsListener):
# FIXME: hacky fix for kernel/hardware race condition
# If the device was just turned on or woken up from sleep, it may not be ready to receive commands.
# The "payload" bit of the wireless tatus notification seems to tell us this. If this is the case, we
# The "payload" bit of the wireless status notification seems to tell us this. If this is the case, we
# must wait a short amount of time to avoid causing a broken pipe error.
device_ready = not bool(ord(n.data[0:1]) & 0x80) or n.sub_id != 0x41
if not device_ready:
@ -183,7 +176,13 @@ class SolaarListener(_listener.EventsListener):
if not already_known:
if n.address == 0x0A and not self.receiver.receiver_kind == "bolt":
# some Nanos send a notification even if no new pairing - check that there really is a device there
if self.receiver.read_register(Registers.RECEIVER_INFO, _IR.pairing_information + n.devnumber - 1) is None:
if (
self.receiver.read_register(
hidpp10_constants.Registers.RECEIVER_INFO,
hidpp10_constants.INFO_SUBREGISTERS.pairing_information + n.devnumber - 1,
)
is None
):
return
dev = self.receiver.register_new_device(n.devnumber, n)
elif self.receiver.pairing.lock_open and self.receiver.re_pairs and not ord(n.data[0:1]) & 0x40:
@ -212,7 +211,7 @@ class SolaarListener(_listener.EventsListener):
# the receiver changed status as well
self._status_changed(self.receiver)
_notifications.process(dev, n)
notifications.process(dev, n)
if self.receiver.pairing.lock_open and not already_known:
# this should be the first notification after a device was paired
@ -256,17 +255,17 @@ def _start(device_info):
assert _status_callback and _setting_callback
isDevice = device_info.isDevice
if not isDevice:
receiver = _receiver.ReceiverFactory.create_receiver(device_info, _setting_callback)
receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver(device_info, _setting_callback)
else:
receiver = _device.DeviceFactory.create_device(_base, device_info, _setting_callback)
if receiver:
configuration.attach_to(receiver)
if receiver.bluetooth and receiver.hid_serial:
dbus.watch_bluez_connect(receiver.hid_serial, partial(_process_bluez_dbus, receiver))
receiver.cleanups.append(_cleanup_bluez_dbus)
receiver_ = logitech_receiver.device.DeviceFactory.create_device(base, device_info, _setting_callback)
if receiver_:
configuration.attach_to(receiver_)
if receiver_.bluetooth and receiver_.hid_serial:
dbus.watch_bluez_connect(receiver_.hid_serial, partial(_process_bluez_dbus, receiver_))
receiver_.cleanups.append(_cleanup_bluez_dbus)
if receiver:
rl = SolaarListener(receiver, _status_callback)
if receiver_:
rl = SolaarListener(receiver_, _status_callback)
rl.start()
_all_listeners[device_info.path] = rl
return rl
@ -278,7 +277,7 @@ def start_all():
stop_all() # just in case this it called twice in a row...
if logger.isEnabledFor(logging.INFO):
logger.info("starting receiver listening threads")
for device_info in _base.receivers_and_devices():
for device_info in base.receivers_and_devices():
_process_receiver_event("add", device_info)
@ -333,14 +332,14 @@ def setup_scanner(status_changed_callback, setting_changed_callback, error_callb
_status_callback = status_changed_callback
_setting_callback = setting_changed_callback
_error_callback = error_callback
_base.notify_on_receivers_glib(_process_receiver_event)
base.notify_on_receivers_glib(_process_receiver_event)
def _process_add(device_info, retry):
try:
_start(device_info)
except OSError as e:
if e.errno == _errno.EACCES:
if e.errno == errno.EACCES:
try:
output = subprocess.check_output(["/usr/bin/getfacl", "-p", device_info.path], text=True)
if logger.isEnabledFor(logging.WARNING):

View File

@ -18,21 +18,21 @@
import logging
from threading import Thread as _Thread
from threading import Thread
logger = logging.getLogger(__name__)
try:
from Queue import Queue as _Queue
from Queue import Queue
except ImportError:
from queue import Queue as _Queue
from queue import Queue
class TaskRunner(_Thread):
class TaskRunner(Thread):
def __init__(self, name):
super().__init__(name=name)
self.daemon = True
self.queue = _Queue(16)
self.queue = Queue(16)
self.alive = False
def __call__(self, function, *args, **kwargs):

View File

@ -18,7 +18,7 @@
import logging
import gi
import yaml as _yaml
import yaml
from logitech_receiver.common import Alert
@ -65,7 +65,7 @@ def _activate(app):
def _command_line(app, command_line):
args = command_line.get_arguments()
args = _yaml.safe_load("".join(args)) if args else args
args = yaml.safe_load("".join(args)) if args else args
if not args:
_activate(app)
elif args[0] == "config": # config call from remote instance

View File

@ -19,7 +19,7 @@ import logging
import gi
from solaar.i18n import _
from solaar.tasks import TaskRunner as _TaskRunner
from solaar.tasks import TaskRunner
gi.require_version("Gtk", "3.0")
from gi.repository import GLib # NOQA: E402
@ -74,7 +74,7 @@ _task_runner = None
def start_async():
global _task_runner
_task_runner = _TaskRunner("AsyncUI")
_task_runner = TaskRunner("AsyncUI")
_task_runner.start()

View File

@ -18,18 +18,17 @@
import logging
import traceback
from threading import Timer as _Timer
from threading import Timer
import gi
from logitech_receiver.hidpp20 import LEDEffectSetting as _LEDEffectSetting
from logitech_receiver.settings import KIND as _SETTING_KIND
from logitech_receiver.settings import SENSITIVITY_IGNORE as _SENSITIVITY_IGNORE
from logitech_receiver import hidpp20
from logitech_receiver import settings
from solaar.i18n import _
from solaar.i18n import ngettext
from .common import ui_async as _ui_async
from .common import ui_async
gi.require_version("Gtk", "3.0")
from gi.repository import Gdk # NOQA: E402
@ -49,7 +48,7 @@ def _read_async(setting, force_read, sbox, device_is_online, sensitive):
logger.warning("%s: error reading so use None (%s): %s", s.name, s._device, repr(e))
GLib.idle_add(_update_setting_item, sb, v, online, sensitive, True, priority=99)
_ui_async(_do_read, setting, force_read, sbox, device_is_online, sensitive)
ui_async(_do_read, setting, force_read, sbox, device_is_online, sensitive)
def _write_async(setting, value, sbox, sensitive=True, key=None):
@ -71,7 +70,7 @@ def _write_async(setting, value, sbox, sensitive=True, key=None):
sbox._failed.set_visible(False)
sbox._spinner.set_visible(True)
sbox._spinner.start()
_ui_async(_do_write, setting, value, sbox, key)
ui_async(_do_write, setting, value, sbox, key)
class ComboBoxText(Gtk.ComboBoxText):
@ -105,7 +104,7 @@ class Control:
def layout(self, sbox, label, change, spinner, failed):
sbox.pack_start(label, False, False, 0)
sbox.pack_end(change, False, False, 0)
fill = sbox.setting.kind == _SETTING_KIND.range or sbox.setting.kind == _SETTING_KIND.hetero
fill = sbox.setting.kind == settings.KIND.range or sbox.setting.kind == settings.KIND.hetero
sbox.pack_end(self, fill, fill, 0)
sbox.pack_end(spinner, False, False, 0)
sbox.pack_end(failed, False, False, 0)
@ -144,7 +143,7 @@ class SliderControl(Gtk.Scale, Control):
if self.get_sensitive():
if self.timer:
self.timer.cancel()
self.timer = _Timer(0.5, lambda: GLib.idle_add(self.do_change))
self.timer = Timer(0.5, lambda: GLib.idle_add(self.do_change))
self.timer.start()
def do_change(self):
@ -435,7 +434,7 @@ class MultipleRangeControl(MultipleControl):
if control.get_sensitive():
if hasattr(control, "_timer"):
control._timer.cancel()
control._timer = _Timer(0.5, lambda: GLib.idle_add(self._write, control, item, sub_item))
control._timer = Timer(0.5, lambda: GLib.idle_add(self._write, control, item, sub_item))
control._timer.start()
def _write(self, control, item, sub_item):
@ -495,7 +494,7 @@ class PackedRangeControl(MultipleRangeControl):
if control.get_sensitive():
if hasattr(control, "_timer"):
control._timer.cancel()
control._timer = _Timer(0.5, lambda: GLib.idle_add(self._write, control, item))
control._timer = Timer(0.5, lambda: GLib.idle_add(self._write, control, item))
control._timer.start()
def _write(self, control, item):
@ -537,14 +536,14 @@ class HeteroKeyControl(Gtk.HBox, Control):
item_lblbox.set_visible(False)
else:
item_lblbox = None
if item["kind"] == _SETTING_KIND.choice:
if item["kind"] == settings.KIND.choice:
item_box = ComboBoxText()
for entry in item["choices"]:
item_box.append(str(int(entry)), str(entry))
item_box.set_active(0)
item_box.connect("changed", self.changed)
self.pack_start(item_box, False, False, 0)
elif item["kind"] == _SETTING_KIND.range:
elif item["kind"] == settings.KIND.range:
item_box = Scale()
item_box.set_range(item["min"], item["max"])
item_box.set_round_digits(0)
@ -559,7 +558,7 @@ class HeteroKeyControl(Gtk.HBox, Control):
result = {}
for k, (_lblbox, box) in self._items.items():
result[str(k)] = box.get_value()
result = _LEDEffectSetting(**result)
result = hidpp20.LEDEffectSetting(**result)
return result
def set_value(self, value):
@ -587,7 +586,7 @@ class HeteroKeyControl(Gtk.HBox, Control):
self.setup_visibles(int(self._items["ID"][1].get_value()))
if hasattr(control, "_timer"):
control._timer.cancel()
control._timer = _Timer(0.3, lambda: GLib.idle_add(self._write, control))
control._timer = Timer(0.3, lambda: GLib.idle_add(self._write, control))
control._timer.start()
def _write(self, control):
@ -598,13 +597,13 @@ class HeteroKeyControl(Gtk.HBox, Control):
_write_async(self.sbox.setting, new_state, self.sbox)
_allowables_icons = {True: "changes-allow", False: "changes-prevent", _SENSITIVITY_IGNORE: "dialog-error"}
_allowables_icons = {True: "changes-allow", False: "changes-prevent", settings.SENSITIVITY_IGNORE: "dialog-error"}
_allowables_tooltips = {
True: _("Changes allowed"),
False: _("No changes allowed"),
_SENSITIVITY_IGNORE: _("Ignore this setting"),
settings.SENSITIVITY_IGNORE: _("Ignore this setting"),
}
_next_allowable = {True: False, False: _SENSITIVITY_IGNORE, _SENSITIVITY_IGNORE: True}
_next_allowable = {True: False, False: settings.SENSITIVITY_IGNORE, settings.SENSITIVITY_IGNORE: True}
_icons_allowables = {v: k for k, v in _allowables_icons.items()}
@ -618,7 +617,7 @@ def _change_click(button, sbox):
_change_icon(new_allowed, icon)
if sbox.setting._device.persister: # remember the new setting sensitivity
sbox.setting._device.persister.set_sensitivity(sbox.setting.name, new_allowed)
if allowed == _SENSITIVITY_IGNORE: # update setting if it was being ignored
if allowed == settings.SENSITIVITY_IGNORE: # update setting if it was being ignored
setting = next((s for s in sbox.setting._device.settings if s.name == sbox.setting.name), None)
if setting:
persisted = sbox.setting._device.persister.get(setting.name) if sbox.setting._device.persister else None
@ -660,21 +659,21 @@ def _create_sbox(s, device):
change.set_sensitive(True)
change.connect("clicked", _change_click, sbox)
if s.kind == _SETTING_KIND.toggle:
if s.kind == settings.KIND.toggle:
control = ToggleControl(sbox)
elif s.kind == _SETTING_KIND.range:
elif s.kind == settings.KIND.range:
control = SliderControl(sbox)
elif s.kind == _SETTING_KIND.choice:
elif s.kind == settings.KIND.choice:
control = _create_choice_control(sbox)
elif s.kind == _SETTING_KIND.map_choice:
elif s.kind == settings.KIND.map_choice:
control = MapChoiceControl(sbox)
elif s.kind == _SETTING_KIND.multiple_toggle:
elif s.kind == settings.KIND.multiple_toggle:
control = MultipleToggleControl(sbox, change)
elif s.kind == _SETTING_KIND.multiple_range:
elif s.kind == settings.KIND.multiple_range:
control = MultipleRangeControl(sbox, change)
elif s.kind == _SETTING_KIND.packed_range:
elif s.kind == settings.KIND.packed_range:
control = PackedRangeControl(sbox, change)
elif s.kind == _SETTING_KIND.hetero:
elif s.kind == settings.KIND.hetero:
control = HeteroKeyControl(sbox, change)
else:
if logger.isEnabledFor(logging.WARNING):
@ -691,7 +690,6 @@ def _create_sbox(s, device):
def _update_setting_item(sbox, value, is_online=True, sensitive=True, nullOK=False):
# sbox._spinner.set_visible(False) # don't repack item box
sbox._spinner.stop()
sensitive = sbox._change_icon._allowed if sensitive is None else sensitive
if value is None and not nullOK:

View File

@ -35,8 +35,8 @@ from logitech_receiver.common import NamedInt
from logitech_receiver.common import NamedInts
from logitech_receiver.common import UnsortedNamedInts
from logitech_receiver.settings import KIND as _SKIND
from logitech_receiver.settings import Setting as _Setting
from logitech_receiver.settings_templates import SETTINGS as _SETTINGS
from logitech_receiver.settings import Setting
from logitech_receiver.settings_templates import SETTINGS
from solaar.i18n import _
from solaar.ui import rule_actions
@ -931,7 +931,7 @@ class DeviceInfo:
serial: str = ""
unitId: str = ""
codename: str = ""
settings: Dict[str, _Setting] = field(default_factory=dict)
settings: Dict[str, Setting] = field(default_factory=dict)
def __post_init__(self):
if self.serial is None or self.serial == "?":
@ -1281,7 +1281,7 @@ class SetValueControl(Gtk.HBox):
def _all_settings():
settings = {}
for s in sorted(_SETTINGS, key=lambda setting: setting.label):
for s in sorted(SETTINGS, key=lambda setting: setting.label):
if s.name not in settings:
settings[s.name] = [s]
else:
@ -1326,7 +1326,6 @@ class _DeviceUI:
self.device_field.set_value("")
self.device_field.set_valign(Gtk.Align.CENTER)
self.device_field.set_size_request(400, 0)
# self.device_field.connect('changed', self._changed_device)
self.device_field.connect("changed", self._on_update)
self.widgets[self.device_field] = (1, 1, 1, 1)
@ -1477,9 +1476,9 @@ class _SettingWithValueUI:
(including the extra value if it exists) and the second element is the extra value to be pinned to
the start of the list (or `None` if there is no extra value).
"""
if isinstance(setting, _Setting):
if isinstance(setting, Setting):
setting = type(setting)
if isinstance(setting, type) and issubclass(setting, _Setting):
if isinstance(setting, type) and issubclass(setting, Setting):
choices = UnsortedNamedInts()
universe = getattr(setting, "choices_universe", None)
if universe:

View File

@ -21,7 +21,7 @@ import logging
from solaar import NAME
from solaar.i18n import _
from . import icons as _icons
from . import icons
logger = logging.getLogger(__name__)
@ -66,14 +66,6 @@ if available:
_notifications.clear()
Notify.uninit()
# def toggle(action):
# if action.get_active():
# init()
# else:
# uninit()
# action.set_sensitive(available)
# return action.get_active()
def alert(reason, icon=None):
assert reason
@ -84,15 +76,13 @@ if available:
# we need to use the filename here because the notifications daemon
# is an external application that does not know about our icon sets
icon_file = _icons.icon_file(NAME.lower()) if icon is None else _icons.icon_file(icon)
icon_file = icons.icon_file(NAME.lower()) if icon is None else icons.icon_file(icon)
n.update(NAME.lower(), reason, icon_file)
n.set_urgency(Notify.Urgency.NORMAL)
n.set_hint("desktop-entry", GLib.Variant("s", NAME.lower()))
try:
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("showing %s", n)
n.show()
except Exception:
logger.exception("showing %s", n)
@ -113,16 +103,10 @@ if available:
message = reason
else:
message = _("unspecified reason")
# elif dev.status is None:
# message = _("unpaired")
# elif bool(dev.status):
# message = dev.status_string() or _("connected")
# else:
# message = _("offline")
# we need to use the filename here because the notifications daemon
# is an external application that does not know about our icon sets
icon_file = _icons.device_icon_file(dev.name, dev.kind) if icon is None else _icons.icon_file(icon)
icon_file = icons.device_icon_file(dev.name, dev.kind) if icon is None else icons.icon_file(icon)
n.update(summary, message, icon_file)
n.set_urgency(Notify.Urgency.NORMAL)
@ -131,8 +115,6 @@ if available:
n.set_hint("value", GLib.Variant("i", progress))
try:
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("showing %s", n)
n.show()
except Exception:
logger.exception("showing %s", n)

View File

@ -19,12 +19,12 @@ import logging
from gi.repository import GLib
from gi.repository import Gtk
from logitech_receiver import hidpp10_constants as _hidpp10_constants
from logitech_receiver import hidpp10_constants
from solaar.i18n import _
from solaar.i18n import ngettext
from . import icons as _icons
from . import icons
logger = logging.getLogger(__name__)
@ -108,7 +108,7 @@ def _check_lock_state(assistant, receiver, count):
return True
elif receiver.pairing.discovering and receiver.pairing.device_address and receiver.pairing.device_name:
add = receiver.pairing.device_address
ent = 20 if receiver.pairing.device_kind == _hidpp10_constants.DEVICE_KIND.keyboard else 10
ent = 20 if receiver.pairing.device_kind == hidpp10_constants.DEVICE_KIND.keyboard else 10
if receiver.pair_device(address=add, authentication=receiver.pairing.device_authentication, entropy=ent):
return True
else:
@ -200,8 +200,8 @@ def _create_success_page(assistant, device):
header = Gtk.Label(label=_("Found a new device:"))
page.pack_start(header, False, False, 0)
device_icon = Gtk.Image()
icon_name = _icons.device_icon_name(device.name, device.kind)
device_icon.set_from_icon_name(icon_name, _icons.LARGE_SIZE)
icon_name = icons.device_icon_name(device.name, device.kind)
device_icon.set_from_icon_name(icon_name, icons.LARGE_SIZE)
page.pack_start(device_icon, True, True, 0)
device_label = Gtk.Label()
device_label.set_markup(f"<b>{device.name}</b>")

View File

@ -17,12 +17,12 @@
from shlex import quote as shlex_quote
from gi.repository import Gtk
from logitech_receiver import diversion as _DIV
from logitech_receiver import diversion
from logitech_receiver.diversion import CLICK
from logitech_receiver.diversion import DEPRESS
from logitech_receiver.diversion import RELEASE
from logitech_receiver.diversion import XK_KEYS as _XK_KEYS
from logitech_receiver.diversion import buttons as _buttons
from logitech_receiver.diversion import XK_KEYS
from logitech_receiver.diversion import buttons
from solaar.i18n import _
from solaar.ui.rule_base import CompletionEntry
@ -30,7 +30,7 @@ from solaar.ui.rule_base import RuleComponentUI
class ActionUI(RuleComponentUI):
CLASS = _DIV.Action
CLASS = diversion.Action
@classmethod
def icon_name(cls):
@ -38,8 +38,8 @@ class ActionUI(RuleComponentUI):
class KeyPressUI(ActionUI):
CLASS = _DIV.KeyPress
KEY_NAMES = [k[3:] if k.startswith("XK_") else k for k, v in _XK_KEYS.items() if isinstance(v, int)]
CLASS = diversion.KeyPress
KEY_NAMES = [k[3:] if k.startswith("XK_") else k for k, v in XK_KEYS.items() if isinstance(v, int)]
def create_widgets(self):
self.widgets = {}
@ -107,7 +107,6 @@ class KeyPressUI(ActionUI):
self._create_field()
self._create_del_btn()
# self.widgets[self.add_btn] = (n + 1, 0, 1, 1)
self.widgets[self.add_btn] = (n, 1, 1, 1)
super().show(component, editable)
for i in range(n):
@ -137,7 +136,7 @@ class KeyPressUI(ActionUI):
class MouseScrollUI(ActionUI):
CLASS = _DIV.MouseScroll
CLASS = diversion.MouseScroll
MIN_VALUE = -2000
MAX_VALUE = 2000
@ -191,10 +190,10 @@ class MouseScrollUI(ActionUI):
class MouseClickUI(ActionUI):
CLASS = _DIV.MouseClick
CLASS = diversion.MouseClick
MIN_VALUE = 1
MAX_VALUE = 9
BUTTONS = list(_buttons.keys())
BUTTONS = list(buttons.keys())
ACTIONS = [CLICK, DEPRESS, RELEASE]
def create_widgets(self):
@ -249,7 +248,7 @@ class MouseClickUI(ActionUI):
class ExecuteUI(ActionUI):
CLASS = _DIV.Execute
CLASS = diversion.Execute
def create_widgets(self):
self.widgets = {}

View File

@ -17,7 +17,7 @@
from contextlib import contextmanager as contextlib_contextmanager
from gi.repository import Gtk
from logitech_receiver import diversion as _DIV
from logitech_receiver import diversion
def norm(s):
@ -47,7 +47,7 @@ class CompletionEntry(Gtk.Entry):
class RuleComponentUI:
CLASS = _DIV.RuleComponent
CLASS = diversion.RuleComponent
def __init__(self, panel, on_update=None):
self.panel = panel

View File

@ -16,10 +16,10 @@
from dataclasses import dataclass
from gi.repository import Gtk
from logitech_receiver import diversion as _DIV
from logitech_receiver.diversion import Key as _Key
from logitech_receiver.hidpp20 import FEATURE as _ALL_FEATURES
from logitech_receiver.special_keys import CONTROL as _CONTROL
from logitech_receiver import diversion
from logitech_receiver.diversion import Key
from logitech_receiver.hidpp20 import FEATURE
from logitech_receiver.special_keys import CONTROL
from solaar.i18n import _
from solaar.ui.rule_base import CompletionEntry
@ -27,7 +27,7 @@ from solaar.ui.rule_base import RuleComponentUI
class ConditionUI(RuleComponentUI):
CLASS = _DIV.Condition
CLASS = diversion.Condition
@classmethod
def icon_name(cls):
@ -35,7 +35,7 @@ class ConditionUI(RuleComponentUI):
class ProcessUI(ConditionUI):
CLASS = _DIV.Process
CLASS = diversion.Process
def create_widgets(self):
self.widgets = {}
@ -65,7 +65,7 @@ class ProcessUI(ConditionUI):
class MouseProcessUI(ConditionUI):
CLASS = _DIV.MouseProcess
CLASS = diversion.MouseProcess
def create_widgets(self):
self.widgets = {}
@ -95,17 +95,17 @@ class MouseProcessUI(ConditionUI):
class FeatureUI(ConditionUI):
CLASS = _DIV.Feature
CLASS = diversion.Feature
FEATURES_WITH_DIVERSION = [
str(_ALL_FEATURES.CROWN),
str(_ALL_FEATURES.THUMB_WHEEL),
str(_ALL_FEATURES.LOWRES_WHEEL),
str(_ALL_FEATURES.HIRES_WHEEL),
str(_ALL_FEATURES.GESTURE_2),
str(_ALL_FEATURES.REPROG_CONTROLS_V4),
str(_ALL_FEATURES.GKEY),
str(_ALL_FEATURES.MKEYS),
str(_ALL_FEATURES.MR),
str(FEATURE.CROWN),
str(FEATURE.THUMB_WHEEL),
str(FEATURE.LOWRES_WHEEL),
str(FEATURE.HIRES_WHEEL),
str(FEATURE.GESTURE_2),
str(FEATURE.REPROG_CONTROLS_V4),
str(FEATURE.GKEY),
str(FEATURE.MKEYS),
str(FEATURE.MR),
]
def create_widgets(self):
@ -118,10 +118,9 @@ class FeatureUI(ConditionUI):
for feature in self.FEATURES_WITH_DIVERSION:
self.field.append(feature, feature)
self.field.set_valign(Gtk.Align.CENTER)
# self.field.set_vexpand(True)
self.field.set_size_request(600, 0)
self.field.connect("changed", self._on_update)
all_features = [str(f) for f in _ALL_FEATURES]
all_features = [str(f) for f in FEATURE]
CompletionEntry.add_completion_to_entry(self.field.get_child(), all_features)
self.widgets[self.field] = (0, 1, 1, 1)
@ -151,7 +150,7 @@ class FeatureUI(ConditionUI):
class ReportUI(ConditionUI):
CLASS = _DIV.Report
CLASS = diversion.Report
MIN_VALUE = -1 # for invalid values
MAX_VALUE = 15
@ -164,7 +163,6 @@ class ReportUI(ConditionUI):
self.field.set_halign(Gtk.Align.CENTER)
self.field.set_valign(Gtk.Align.CENTER)
self.field.set_hexpand(True)
# self.field.set_vexpand(True)
self.field.connect("changed", self._on_update)
self.widgets[self.field] = (0, 1, 1, 1)
@ -186,7 +184,7 @@ class ReportUI(ConditionUI):
class ModifiersUI(ConditionUI):
CLASS = _DIV.Modifiers
CLASS = diversion.Modifiers
def create_widgets(self):
self.widgets = {}
@ -195,7 +193,7 @@ class ModifiersUI(ConditionUI):
self.widgets[self.label] = (0, 0, 5, 1)
self.labels = {}
self.switches = {}
for i, m in enumerate(_DIV.MODIFIERS):
for i, m in enumerate(diversion.MODIFIERS):
switch = Gtk.Switch(halign=Gtk.Align.CENTER, valign=Gtk.Align.START, hexpand=True)
label = Gtk.Label(label=m, halign=Gtk.Align.CENTER, valign=Gtk.Align.END, hexpand=True)
self.widgets[label] = (i, 1, 1, 1)
@ -207,7 +205,7 @@ class ModifiersUI(ConditionUI):
def show(self, component, editable):
super().show(component, editable)
with self.ignore_changes():
for m in _DIV.MODIFIERS:
for m in diversion.MODIFIERS:
self.switches[m].set_active(m in component.modifiers)
def collect_value(self):
@ -223,8 +221,8 @@ class ModifiersUI(ConditionUI):
class KeyUI(ConditionUI):
CLASS = _DIV.Key
KEY_NAMES = map(str, _CONTROL)
CLASS = diversion.Key
KEY_NAMES = map(str, CONTROL)
def create_widgets(self):
self.widgets = {}
@ -241,23 +239,23 @@ class KeyUI(ConditionUI):
self.key_field.connect("changed", self._on_update)
self.widgets[self.key_field] = (0, 1, 2, 1)
self.action_pressed_radio = Gtk.RadioButton.new_with_label_from_widget(None, _("Key down"))
self.action_pressed_radio.connect("toggled", self._on_update, _Key.DOWN)
self.action_pressed_radio.connect("toggled", self._on_update, Key.DOWN)
self.widgets[self.action_pressed_radio] = (2, 1, 1, 1)
self.action_released_radio = Gtk.RadioButton.new_with_label_from_widget(self.action_pressed_radio, _("Key up"))
self.action_released_radio.connect("toggled", self._on_update, _Key.UP)
self.action_released_radio.connect("toggled", self._on_update, Key.UP)
self.widgets[self.action_released_radio] = (3, 1, 1, 1)
def show(self, component, editable):
super().show(component, editable)
with self.ignore_changes():
self.key_field.set_text(str(component.key) if self.component.key else "")
if not component.action or component.action == _Key.DOWN:
if not component.action or component.action == Key.DOWN:
self.action_pressed_radio.set_active(True)
else:
self.action_released_radio.set_active(True)
def collect_value(self):
action = _Key.UP if self.action_released_radio.get_active() else _Key.DOWN
action = Key.UP if self.action_released_radio.get_active() else Key.DOWN
return [self.key_field.get_text(), action]
def _on_update(self, *args):
@ -275,8 +273,8 @@ class KeyUI(ConditionUI):
class KeyIsDownUI(ConditionUI):
CLASS = _DIV.KeyIsDown
KEY_NAMES = map(str, _CONTROL)
CLASS = diversion.KeyIsDown
KEY_NAMES = map(str, CONTROL)
def create_widgets(self):
self.widgets = {}
@ -316,7 +314,7 @@ class KeyIsDownUI(ConditionUI):
class TestUI(ConditionUI):
CLASS = _DIV.Test
CLASS = diversion.Test
def create_widgets(self):
self.widgets = {}
@ -330,13 +328,13 @@ class TestUI(ConditionUI):
self.test = Gtk.ComboBoxText.new_with_entry()
self.test.append("", "")
for t in _DIV.TESTS:
for t in diversion.TESTS:
self.test.append(t, t)
self.test.set_halign(Gtk.Align.END)
self.test.set_valign(Gtk.Align.CENTER)
self.test.set_hexpand(False)
self.test.set_size_request(300, 0)
CompletionEntry.add_completion_to_entry(self.test.get_child(), _DIV.TESTS)
CompletionEntry.add_completion_to_entry(self.test.get_child(), diversion.TESTS)
self.test.connect("changed", self._on_update)
self.widgets[self.test] = (1, 1, 1, 1)
@ -350,7 +348,7 @@ class TestUI(ConditionUI):
with self.ignore_changes():
self.test.set_active_id(component.test)
self.parameter.set_text(str(component.parameter) if component.parameter is not None else "")
if component.test not in _DIV.TESTS:
if component.test not in diversion.TESTS:
self.test.get_child().set_text(component.test)
self._change_status_icon()
@ -367,7 +365,7 @@ class TestUI(ConditionUI):
self._change_status_icon()
def _change_status_icon(self):
icon = "dialog-warning" if (self.test.get_active_text() or "").strip() not in _DIV.TESTS else ""
icon = "dialog-warning" if (self.test.get_active_text() or "").strip() not in diversion.TESTS else ""
self.test.get_child().set_icon_from_icon_name(Gtk.EntryIconPosition.SECONDARY, icon)
@classmethod
@ -395,7 +393,7 @@ class TestBytesMode:
class TestBytesUI(ConditionUI):
CLASS = _DIV.TestBytes
CLASS = diversion.TestBytes
_common_elements = [
TestBytesElement("begin", _("begin (inclusive)"), 0, 16),
@ -508,7 +506,7 @@ class TestBytesUI(ConditionUI):
class MouseGestureUI(ConditionUI):
CLASS = _DIV.MouseGesture
CLASS = diversion.MouseGesture
MOUSE_GESTURE_NAMES = [
"Mouse Up",
"Mouse Down",
@ -519,7 +517,7 @@ class MouseGestureUI(ConditionUI):
"Mouse Down-left",
"Mouse Down-right",
]
MOVE_NAMES = list(map(str, _CONTROL)) + MOUSE_GESTURE_NAMES
MOVE_NAMES = list(map(str, CONTROL)) + MOUSE_GESTURE_NAMES
def create_widgets(self):
self.widgets = {}

View File

@ -18,7 +18,7 @@
import logging
import os
from time import time as _timestamp
from time import time
import gi
@ -31,11 +31,10 @@ import solaar.gtk as gtk
from solaar import NAME
from solaar.i18n import _
from . import icons as _icons
from .about import show_window as _show_about_window
from .action import make_image_menu_item
from .window import popup as _window_popup
from .window import toggle as _window_toggle
from . import about
from . import action
from . import icons
from . import window
logger = logging.getLogger(__name__)
@ -52,8 +51,8 @@ def _create_menu(quit_handler):
menu.append(no_receiver)
menu.append(Gtk.SeparatorMenuItem.new())
menu.append(make_image_menu_item(_("About %s") % NAME, "help-about", _show_about_window))
menu.append(make_image_menu_item(_("Quit %s") % NAME, "application-exit", quit_handler))
menu.append(action.make_image_menu_item(_("About %s") % NAME, "help-about", about.show_window))
menu.append(action.make_image_menu_item(_("Quit %s") % NAME, "application-exit", quit_handler))
menu.show_all()
return menu
@ -78,7 +77,7 @@ def _scroll(tray_icon, event, direction=None):
# scroll events come way too fast (at least 5-6 at once) so take a little break between them
global _last_scroll
now = now or _timestamp()
now = now or time()
if now - _last_scroll < 0.33: # seconds
return
_last_scroll = now
@ -162,13 +161,13 @@ try:
return icon_info.get_filename() if icon_info else icon_name
def _create(menu):
_icons._init_icon_paths()
icons._init_icon_paths()
ind = AppIndicator3.Indicator.new(
"indicator-solaar", _icon_file(_icons.TRAY_INIT), AppIndicator3.IndicatorCategory.HARDWARE
"indicator-solaar", _icon_file(icons.TRAY_INIT), AppIndicator3.IndicatorCategory.HARDWARE
)
ind.set_title(NAME)
ind.set_status(AppIndicator3.IndicatorStatus.ACTIVE)
# ind.set_attention_icon_full(_icon_file(_icons.TRAY_ATTENTION), '') # works poorly for XFCE 16
# ind.set_attention_icon_full(_icon_file(icons.TRAY_ATTENTION), '') # works poorly for XFCE 16
# ind.set_label(NAME.lower(), NAME.lower())
ind.set_menu(menu)
@ -187,21 +186,21 @@ try:
_ignore, _ignore, name, device = _picked_device
battery_level = device.battery_info.level if device.battery_info is not None else None
battery_charging = device.battery_info.charging() if device.battery_info is not None else None
tray_icon_name = _icons.battery(battery_level, battery_charging)
tray_icon_name = icons.battery(battery_level, battery_charging)
description = f"{name}: {device.status_string()}"
else:
# there may be a receiver, but no peripherals
tray_icon_name = _icons.TRAY_OKAY if _devices_info else _icons.TRAY_INIT
tray_icon_name = icons.TRAY_OKAY if _devices_info else icons.TRAY_INIT
description_lines = _generate_description_lines()
description = "\n".join(description_lines).rstrip("\n")
# icon_file = _icons.icon_file(icon_name, _TRAY_ICON_SIZE)
# icon_file = icons.icon_file(icon_name, _TRAY_ICON_SIZE)
_icon.set_icon_full(_icon_file(tray_icon_name), description)
def attention(reason=None):
if _icon.get_status() != AppIndicator3.IndicatorStatus.ATTENTION:
# _icon.set_attention_icon_full(_icon_file(_icons.TRAY_ATTENTION), reason or '') # works poorly for XFCe 16
# _icon.set_attention_icon_full(_icon_file(icons.TRAY_ATTENTION), reason or '') # works poorly for XFCe 16
_icon.set_status(AppIndicator3.IndicatorStatus.ATTENTION)
GLib.timeout_add(10 * 1000, _icon.set_status, AppIndicator3.IndicatorStatus.ACTIVE)
@ -210,11 +209,11 @@ except ImportError:
logger.debug("using StatusIcon")
def _create(menu):
icon = Gtk.StatusIcon.new_from_icon_name(_icons.TRAY_INIT)
icon = Gtk.StatusIcon.new_from_icon_name(icons.TRAY_INIT)
icon.set_name(NAME.lower())
icon.set_title(NAME)
icon.set_tooltip_text(NAME)
icon.connect("activate", _window_toggle)
icon.connect("activate", window.toggle)
icon.connect("scroll-event", _scroll)
icon.connect("popup-menu", lambda icon, button, time: menu.popup(None, None, icon.position_menu, icon, button, time))
@ -235,10 +234,10 @@ except ImportError:
_ignore, _ignore, name, device = _picked_device
battery_level = device.battery_info.level if device.battery_info is not None else None
battery_charging = device.battery_info.charging() if device.battery_info is not None else None
tray_icon_name = _icons.battery(battery_level, battery_charging)
tray_icon_name = icons.battery(battery_level, battery_charging)
else:
# there may be a receiver, but no peripherals
tray_icon_name = _icons.TRAY_OKAY if _devices_info else _icons.TRAY_ATTENTION
tray_icon_name = icons.TRAY_OKAY if _devices_info else icons.TRAY_ATTENTION
_icon.set_from_icon_name(tray_icon_name)
_icon_before_attention = None
@ -246,7 +245,7 @@ except ImportError:
def _blink(count):
global _icon_before_attention
if count % 2:
_icon.set_from_icon_name(_icons.TRAY_ATTENTION)
_icon.set_from_icon_name(icons.TRAY_ATTENTION)
else:
_icon.set_from_icon_name(_icon_before_attention)
@ -337,7 +336,7 @@ def _add_device(device):
_devices_info.insert(index, new_device_info)
label = (" " if device.number else "") + device.name
new_menu_item = make_image_menu_item(label, None, _window_popup, receiver_path, device.number)
new_menu_item = action.make_image_menu_item(label, None, window.popup, receiver_path, device.number)
_menu.insert(new_menu_item, index)
return index
@ -360,8 +359,8 @@ def _add_receiver(receiver):
index = len(_devices_info)
new_receiver_info = (receiver.path, None, receiver.name, None)
_devices_info.insert(index, new_receiver_info)
icon_name = _icons.device_icon_name(receiver.name, receiver.kind)
new_menu_item = make_image_menu_item(receiver.name, icon_name, _window_popup, receiver.path)
icon_name = icons.device_icon_name(receiver.name, receiver.kind)
new_menu_item = action.make_image_menu_item(receiver.name, icon_name, window.popup, receiver.path)
_menu.insert(new_menu_item, index)
return 0
@ -385,7 +384,7 @@ def _update_menu_item(index, device):
menu_item = menu_items[index]
level = device.battery_info.level if device.battery_info is not None else None
charging = device.battery_info.charging() if device.battery_info is not None else None
icon_name = _icons.battery(level, charging)
icon_name = icons.battery(level, charging)
menu_item.label.set_label((" " if 0 < device.number <= 6 else "") + device.name + ": " + device.status_string())
image_widget = menu_item.icon
image_widget.set_sensitive(bool(device.online))

View File

@ -20,22 +20,21 @@ import logging
import gi
from gi.repository.GObject import TYPE_PYOBJECT
from logitech_receiver import hidpp10_constants as _hidpp10_constants
from logitech_receiver.common import NamedInt as _NamedInt
from logitech_receiver.common import NamedInts as _NamedInts
from logitech_receiver import hidpp10_constants
from logitech_receiver.common import LOGITECH_VENDOR_ID
from logitech_receiver.common import NamedInt
from logitech_receiver.common import NamedInts
from solaar import NAME
from solaar.i18n import _
from solaar.i18n import ngettext
from . import action as _action
from . import config_panel as _config_panel
from . import icons as _icons
from .about import show_window as _show_about_window
from .common import ui_async as _ui_async
from .diversion_rules import show_window as _show_diversion_window
# from solaar import __version__ as VERSION
from . import about
from . import action
from . import config_panel
from . import diversion_rules
from . import icons
from .common import ui_async
gi.require_version("Gdk", "3.0")
from gi.repository import Gdk # NOQA: E402
@ -56,7 +55,7 @@ except (ValueError, AttributeError):
_CAN_SET_ROW_NONE = ""
# tree model columns
_COLUMN = _NamedInts(PATH=0, NUMBER=1, ACTIVE=2, NAME=3, ICON=4, STATUS_TEXT=5, STATUS_ICON=6, DEVICE=7)
_COLUMN = NamedInts(PATH=0, NUMBER=1, ACTIVE=2, NAME=3, ICON=4, STATUS_TEXT=5, STATUS_ICON=6, DEVICE=7)
_COLUMN_TYPES = (str, int, bool, str, str, str, str, TYPE_PYOBJECT)
_TREE_SEPATATOR = (None, 0, False, None, None, None, None, None)
assert len(_TREE_SEPATATOR) == len(_COLUMN_TYPES)
@ -131,7 +130,7 @@ def _create_device_panel():
p.pack_start(Gtk.Separator.new(Gtk.Orientation.HORIZONTAL), False, False, 0) # spacer
p._config = _config_panel.create()
p._config = config_panel.create()
p.pack_end(p._config, True, True, 4)
return p
@ -174,7 +173,7 @@ def _create_buttons_box():
assert receiver is not None
assert bool(receiver)
assert receiver.kind is None
_action.pair(_window, receiver)
action.pair(_window, receiver)
bb._pair = _new_button(_("Pair new device"), "list-add", clicked=_pair_new_device)
bb.add(bb._pair)
@ -184,7 +183,7 @@ def _create_buttons_box():
device = _find_selected_device()
assert device is not None
assert device.kind is not None
_action.unpair(_window, device)
action.unpair(_window, device)
bb._unpair = _new_button(_("Unpair"), "edit-delete", clicked=_unpair_current_device)
bb.add(bb._unpair)
@ -233,7 +232,6 @@ def _create_tree(model):
tree.set_headers_visible(False)
tree.set_show_expanders(False)
tree.set_level_indentation(20)
# tree.set_fixed_height_mode(True)
tree.set_enable_tree_lines(True)
tree.set_reorderable(False)
tree.set_enable_search(False)
@ -307,19 +305,14 @@ def _create_window_layout():
bottom_buttons_box.set_spacing(20)
quit_button = _new_button(_("Quit %s") % NAME, "application-exit", _SMALL_BUTTON_ICON_SIZE, clicked=destroy)
bottom_buttons_box.add(quit_button)
about_button = _new_button(_("About %s") % NAME, "help-about", _SMALL_BUTTON_ICON_SIZE, clicked=_show_about_window)
about_button = _new_button(_("About %s") % NAME, "help-about", _SMALL_BUTTON_ICON_SIZE, clicked=about.show_window)
bottom_buttons_box.add(about_button)
diversion_button = _new_button(
_("Rule Editor"), "", _SMALL_BUTTON_ICON_SIZE, clicked=lambda *_trigger: _show_diversion_window(_model)
_("Rule Editor"), "", _SMALL_BUTTON_ICON_SIZE, clicked=lambda *_trigger: diversion_rules.show_window(_model)
)
bottom_buttons_box.add(diversion_button)
bottom_buttons_box.set_child_secondary(diversion_button, True)
# solaar_version = Gtk.Label()
# solaar_version.set_markup('<small>' + NAME + ' v' + VERSION + '</small>')
# bottom_buttons_box.add(solaar_version)
# bottom_buttons_box.set_child_secondary(solaar_version, True)
vbox = Gtk.Box.new(Gtk.Orientation.VERTICAL, 8)
vbox.set_border_width(8)
vbox.pack_start(panel, True, True, 0)
@ -335,10 +328,6 @@ def _create(delete_action):
window = Gtk.Window()
window.set_title(NAME)
window.set_role("status-window")
# window.set_type_hint(Gdk.WindowTypeHint.UTILITY)
# window.set_skip_taskbar_hint(True)
# window.set_skip_pager_hint(True)
window.connect("delete-event", delete_action)
vbox = _create_window_layout()
@ -373,8 +362,6 @@ def _find_selected_device_id():
def _device_selected(selection):
model, item = selection.get_selected()
device = model.get_value(item, _COLUMN.DEVICE) if item else None
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug("window tree selected device %s", device)
if device:
_update_info_panel(device, full=True)
else:
@ -399,7 +386,7 @@ def _receiver_row(receiver_path, receiver=None):
item = _model.iter_next(item)
if not item and receiver:
icon_name = _icons.device_icon_name(receiver.name)
icon_name = icons.device_icon_name(receiver.name)
status_text = None
status_icon = None
row_data = (receiver_path, 0, True, receiver.name, icon_name, status_text, status_icon, receiver)
@ -444,7 +431,7 @@ def _device_row(receiver_path, device_number, device=None):
item = _model.iter_next(item)
if not item and device:
icon_name = _icons.device_icon_name(device.name, device.kind)
icon_name = icons.device_icon_name(device.name, device.kind)
status_text = None
status_icon = None
row_data = (
@ -516,8 +503,7 @@ def _update_details(button):
yield (_("Path"), device.path)
if device.kind is None:
# 046d is the Logitech vendor id
yield (_("USB ID"), "046d:" + device.product_id)
yield (_("USB ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id)
if read_all:
yield (_("Serial"), device.serial)
@ -530,7 +516,7 @@ def _update_details(button):
if device.wpid:
yield (_("Wireless PID"), device.wpid)
if device.product_id:
yield (_("Product ID"), "046d:" + device.product_id)
yield (_("Product ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id)
hid_version = device.protocol
yield (_("Protocol"), f"HID++ {hid_version:1.1f}" if hid_version else _("Unknown"))
if read_all and device.polling_rate:
@ -553,7 +539,7 @@ def _update_details(button):
flag_bits = device.notification_flags
if flag_bits is not None:
flag_names = (
(f"({_('none')})",) if flag_bits == 0 else _hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits)
(f"({_('none')})",) if flag_bits == 0 else hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits)
)
yield (_("Notifications"), (f"\n{' ':15}").join(flag_names))
@ -588,7 +574,7 @@ def _update_details(button):
if read_all:
_details._current_device = None
else:
_ui_async(_read_slow, selected_device)
ui_async(_read_slow, selected_device)
_details.set_visible(visible)
@ -670,7 +656,7 @@ def _update_device_panel(device, panel, buttons, full=False):
panel._battery.set_visible(True)
battery_next_level = device.battery_info.next_level
charging = device.battery_info.charging() if device.battery_info is not None else None
icon_name = _icons.battery(battery_level, charging)
icon_name = icons.battery(battery_level, charging)
panel._battery._icon.set_from_icon_name(icon_name, _INFO_ICON_SIZE)
panel._battery._icon.set_sensitive(True)
panel._battery._text.set_sensitive(is_online)
@ -686,9 +672,9 @@ def _update_device_panel(device, panel, buttons, full=False):
if battery_voltage is not None and battery_level is not None:
text += ", "
if battery_level is not None:
text += _(str(battery_level)) if isinstance(battery_level, _NamedInt) else f"{int(battery_level)}%"
text += _(str(battery_level)) if isinstance(battery_level, NamedInt) else f"{int(battery_level)}%"
if battery_next_level is not None and not charging:
if isinstance(battery_next_level, _NamedInt):
if isinstance(battery_next_level, NamedInt):
text += "<small> (" + _("next reported ") + _(str(battery_next_level)) + ")</small>"
else:
text += "<small> (" + _("next reported ") + f"{int(battery_next_level)}%" + ")</small>"
@ -731,7 +717,7 @@ def _update_device_panel(device, panel, buttons, full=False):
if light_level is None:
panel._lux.set_visible(False)
else:
panel._lux._icon.set_from_icon_name(_icons.lux(light_level), _INFO_ICON_SIZE)
panel._lux._icon.set_from_icon_name(icons.lux(light_level), _INFO_ICON_SIZE)
panel._lux._text.set_text(_("%(light_level)d lux") % {"light_level": light_level})
panel._lux.set_visible(True)
else:
@ -744,7 +730,7 @@ def _update_device_panel(device, panel, buttons, full=False):
panel.set_visible(True)
if full:
_config_panel.update(device, is_online)
config_panel.update(device, is_online)
def _update_info_panel(device, full=False):
@ -760,7 +746,7 @@ def _update_info_panel(device, full=False):
assert device
_info._title.set_markup(f"<b>{device.name}</b>")
icon_name = _icons.device_icon_name(device.name, device.kind)
icon_name = icons.device_icon_name(device.name, device.kind)
_info._icon.set_from_icon_name(icon_name, _DEVICE_ICON_SIZE)
if device.kind is None:
@ -821,7 +807,7 @@ def destroy(_ignore1=None, _ignore2=None):
w, _window = _window, None
w.destroy()
w = None
_config_panel.destroy()
config_panel.destroy()
_empty = None
_info = None
@ -871,7 +857,7 @@ def update(device, need_popup=False, refresh=False):
update_device(device, item, selected_device_id, need_popup, full=refresh)
elif item:
_model.remove(item)
_config_panel.clean(device)
config_panel.clean(device)
# make sure all rows are visible
_tree.expand_all()
@ -890,14 +876,14 @@ def update_device(device, item, selected_device_id, need_popup, full=False):
else:
if battery_voltage is not None and False: # Use levels instead of voltage here
status_text = f"{int(battery_voltage)}mV"
elif isinstance(battery_level, _NamedInt):
elif isinstance(battery_level, NamedInt):
status_text = _(str(battery_level))
else:
status_text = f"{int(battery_level)}%"
_model.set_value(item, _COLUMN.STATUS_TEXT, status_text)
charging = device.battery_info.charging() if device.battery_info is not None else None
icon_name = _icons.battery(battery_level, charging)
icon_name = icons.battery(battery_level, charging)
_model.set_value(item, _COLUMN.STATUS_ICON, icon_name)
_model.set_value(item, _COLUMN.NAME, device.codename)