Add input support to the sono-os #5

Merged
hatam merged 49 commits from input into master 2 years ago
  1. 155
      .gitignore
  2. 49
      setup.sh
  3. 1
      src/rules/90-drm.rules
  4. 13
      src/rules/90-hid-mouse.rules
  5. 46
      src/scripts/python/changemouse.py
  6. 3
      src/scripts/python/conf/desktop.conf
  7. 211
      src/scripts/python/setupmonitor.py
  8. 64
      src/scripts/python/udevhandle.py
  9. 1
      src/scripts/python/util/__init__.py
  10. 63
      src/scripts/python/util/common.py
  11. 80
      src/scripts/python/util/egalax.py
  12. 198
      src/scripts/python/util/pointer.py
  13. 113
      src/scripts/python/util/randr.py
  14. 106
      src/scripts/python/util/x.py
  15. 12
      src/scripts/setupmonitor.sh
  16. 195
      src/scripts/utils/progressbar.sh
  17. 400
      src/scripts/utils/source.sh

155
.gitignore

@ -125,3 +125,158 @@ _deps
# .nfs files are created when an open file is removed but is still being accessed
.nfs*
# ------> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/

49
setup.sh

@ -0,0 +1,49 @@
#!/bin/env bash
source ./src/scripts/utils/source.sh
source ./src/scripts/utils/progressbar.sh
while getopts 'v' OPTION; do
case "$OPTION" in
v)
_V=1
;;
?)
echo "usage: ./setup.sh [-v]" >&2
exit 1
;;
esac
done
enable_trapping
draw_progress_bar 0
log '.: Setting up sono-os v0.1.0 :.'
sleep 1
draw_progress_bar 5
log 'Installing dependancies ...'
# TODO
sleep 1
draw_progress_bar 15
log 'Installing scripts ...'
# TODO
sleep 1
draw_progress_bar 45
log 'Installing config files ...'
# TODO
sleep 1
draw_progress_bar 65
log 'Configuring Logger ...'
# TODO
sleep 1
draw_progress_bar 85
log 'Copying rules to udev ...'
# TODO
sleep 1
draw_progress_bar 100
destroy_scroll_area

1
src/rules/90-drm.rules

@ -0,0 +1 @@
SUBSYSTEM=="drm", ENV{MONITOR_LOCK}="/tmp/monitorlock", ENV{SONOLOG}="/tmp/sonolog.log", RUN+="/usr/local/bin/setupmonitor.sh"

13
src/rules/90-hid-mouse.rules

@ -0,0 +1,13 @@
# Following rule will take affect after binding/unbinding action of HID devices
# which is expected. These rules only take effect one time for each mouse
# insertion, removal. Actually, usb events are triggered when usb and
# usb interface are binding but hid is more specific and happen once for each
# action. change action is not used because
# XAUTHORUTY and DISPLAY must be set and related to the current session. It may
# vary with different graphic drivers.
SUBSYSTEM=="hid", ENV{DISPLAY}=":0", ENV{XAUTHORITY}="/run/user/1000/gdm/Xauthority", ACTION=="bind", RUN+="/usr/local/bin/changemouse.py"
# the following line may not be needed as when we unplug a mouse it would
# automatically remove devices from the list and they need no furthur
# configuration
# SUBSYSTEM=="hid", ACTION=="unbind", RUN+="/usr/local/bin/changemouse.py"

46
src/scripts/python/changemouse.py

@ -0,0 +1,46 @@
#!/usr/bin/env python
"""Change mouse script
Whenever a mouse is changed to the system, this script will be executed. These
are the rules:
+ All regular mouses should connect to the "Virtual core pointer" of the
system
+ Touchpanel should be connected to the master "touch-pointer"
NOTE: if master `touch` is not present in the system, the script will create a
master `touch` itself and hides the pointer. eGalax device input will be
attached to this master. TODO: make master's cursor invisible.
Currently we'll use xinput command-line application, but, It is possible
to write a specified c program that uses Xlib efficiently.
Steps:
+ List all pointer as we don't know what pointer is added to the system
+ group them by their usage by name rules (As if the pointer is eGalax
touch pointer it should be attached to the touch-pointer o.w. it
should be attached to Vitual core pointer (OR trackball TODO))
utility functions to find and group pointers aer available in xutil module.
NOTE: In case of psyco adds and removes mouses with intervals smaller than run
time of this code (which is not probabale at all) a lockfile is used so that
only one instance of this code is running at a moment so no conflicts occur.
"""
import util.pointer as putil
import util.x as xutil
if __name__ == "__main__":
"""Configure Pointers
Execution time: 140ms avg -> Tolarable
"""
v_core, touch_master, e_galax, pointers = putil.get_pointers_categorized()
if e_galax:
xutil.reattach(e_galax.id, touch_master.id)
else:
# TODO: disable touch?
pass
for p in pointers:
xutil.reattach(p.id, v_core.id)

3
src/scripts/python/conf/desktop.conf

@ -0,0 +1,3 @@
[DEFAULT]
MainDisplay=HDMI-3
Policy=Mirror

211
src/scripts/python/setupmonitor.py

@ -0,0 +1,211 @@
"""Setup Monitor Script
Author: Ali Hatami Tajik [hatam](mailto:a.hatam008@gmail.com)
Date: 2023 Mar 04
This script should be used whenever a change happen in the rdm system, I
guess! But as I investigate a rdm change event will happen many time in case
of addition or removal of a monitor auto configuration is done by the
org.mate.SettingsDaemon.plugins.xrandr deamon. I'll searching for a way to
change that event to run our specified script.
This script will do the following:
1. List currently available monitors
2. Map those to config file
3. Replace default values for missing configs
4. Handle missing touch-screen/main monitor
NOTE: In general we use at least two monitoers with our system.
One is for touch screen and one is for regular screen. In
case of one missing a callback is run to handle that
occurance (an event may send to the software or a temp log
file may be updated so the software adjust ifself
correspondigly).
Config files are in JSON format and spesify screen output name and their
mode (xrandr resolution and stuff may be added but for now --auto/--prefered
option is used). The format is:
"""
import Xlib.display
import Xlib.ext.randr
import configparser
from util import edit_distance
from util.egalax import get_egalax_drm_pure_name
import subprocess
from pathlib import Path
import os
CONFIG_NAME = Path(os.path.dirname(os.path.realpath(__file__))) / "conf/desktop.conf"
XRANDR = "/usr/bin/xrandr"
def read_config():
"""Reads config file of desktop setup
This function will reeds the config file of desktop. Config file contins
a default monitor port name which will be the main Monitor display.
NOTE: eGalax devide will be detected automatically from the /dev mountings.
"""
config = configparser.ConfigParser()
read = config.read(CONFIG_NAME)
if not read:
raise FileNotFoundError("Desktop config file not found")
return config
def all_connected_monitor():
"""Generates all connected monitors
as a tuple of (atom name: str, width, height)
"""
display = Xlib.display.Display()
root = display.screen().root
for m in root.xrandr_get_monitors().monitors:
yield (
display.get_atom_name(m.name),
m.width_in_pixels,
m.height_in_pixels,
)
def get_edid_name(drm_name: str):
"""Change eGalax DRM name to atom name
This function is very sensitive to kernel version and might not work
with some kernels.
"""
card_num, name = drm_name[4:].split("-", maxsplit=1)
first, second = name.rsplit("-", maxsplit=1)
return first + "-" + card_num + "-" + second
def prepare_monitors(config):
"""Prepare monitor names
Rules:
- Use default monitor port as the main monitor if it is connected
- If default monitor is not connected then use a monitor with
minimum edit distance.
- Use eGalax as touchpanel and if it's not connected return None.
- other connected monitors will be returned as a list
each monitor is returen as a
Returns
tuple: of
- main monitor -> tuple | None if no monitor available other than
touchpanel
- touchpanel -> tuple | None if touchpanel did not exist
- other -> list: this list may be empty if there is no other
monitor connected
"""
main = config["DEFAULT"]["MainDisplay"]
all_monitors = list(all_connected_monitor())
egalax_drm = get_egalax_drm_pure_name()
egalax_name = get_edid_name(egalax_drm) if egalax_drm else None
egalax_monitor = None
main_monitor = None
for mon in all_monitors:
if egalax_name == mon[0]:
egalax_monitor = mon
if main == mon[0]:
main_monitor = mon
if egalax_monitor:
all_monitors.remove(egalax_monitor)
if not main_monitor:
try:
min_monitor = min(
all_monitors,
key=lambda x: edit_distance(main, x, len(main), len(x)),
)
main_monitor = min_monitor
except:
main_monitor = None
assert len(all_monitors) == 0
if main_monitor:
all_monitors.remove(main_monitor)
return main_monitor, egalax_monitor, all_monitors
def baseline(main, egalax):
"""Base of xrandr arguments
Both main and egalax are monitor tuples mentioned in prepare_monitors"""
if not main and not egalax:
return [], None
elif not main and egalax:
return ["--output", egalax[0], "--primary",
"--mode", f"{egalax[1]}x{egalax[2]}"], None
elif main and not egalax:
return ["--output", main[0], "--primary",
"--mode", f"{main[1]}x{main[2]}"], main[0]
else:
return ["--output", main[0], "--primary",
"--mode", f"{main[1]}x{main[2]}",
"--output", egalax[0], "--right-of", main[0],
"--mode", f"{egalax[1]}x{egalax[1]}"], egalax[0]
def mirror_displays(main, egalax, others: list):
base, should_mirror = baseline(main, egalax)
if should_mirror:
for name, width, height in others:
base.extend(["--output", name, "--mode", f"{width}x{height}",
"--same-as", main[0],
"--scale-from", f"{main[1]}x{main[2]}"])
return base
def off_displays(main, egalax, others: list):
base, should_off = baseline(main, egalax)
if should_off:
for name, width, height in others:
base.extend(["--output", name, "--off"])
return base
def stand_alone_displays(main, egalax, others: list):
base, rightmost = baseline(main, egalax)
if rightmost:
for name, width, height in others:
base.extend(["--output", name, "--mode", f"{width}x{height}",
"--right-of", rightmost])
rightmost = name
return base
POLICY = {
'Off': off_displays,
'Mirror': mirror_displays,
'StandAlone': stand_alone_displays
}
def config_xrandr(conf, main, egalax, others: list):
"""Executes xrandr with policy in the conf
Policies:
Policies are about monitors other than main and touch panel monitors.
There are three supported policies:
- Off: Disables other monitors (default policy if not in config)
- Mirror: Mirror other displays from main monitor.
- StandAlone: Each monitor is mapped to the right of each other
"""
try:
policy = conf['DEFAULT']['Policy']
except:
policy = 'Off'
args = POLICY[policy](main, egalax, others)
cmd = [XRANDR] + args
subprocess.run(cmd)
if __name__ == "__main__":
conf = read_config()
main, egalax, others = prepare_monitors(conf)
config_xrandr(conf, main, egalax, others)

64
src/scripts/python/udevhandle.py

@ -0,0 +1,64 @@
from abc import ABC, abstractmethod
from pyudev import Context, Monitor, MonitorObserver, Device
class Handler(ABC):
"""Abstract Handler calss for device monitoring
NOTE: No checking are done for overlaping filters and callback will be
even by multiple handlers.
Args:
ABC: Abstract Base Class, provides abstract method functionality and
readability.
"""
def __init__(self, filter) -> None:
"""Initiate a monitor observer and applies `filter` if any provided
Args:
filter (_type_): _description_
"""
monitor = Monitor.from_netlink(Context())
if filter:
monitor.filter_by(filter)
self.observer = MonitorObserver(monitor, callback=self.handler)
self.observer.start()
@abstractmethod
def callback(self, device: Device):
"""Callback
This method must be implemented by child calsses. This method is
responsible for further managments of the devices related to its filter.
Args:
device (pyudev.Device): device passed by observer through handler
"""
raise NotImplemented("Callback MUST be implemented")
def handler(self, device):
"""wrapper around callback implemented
Args:
device (pyudev.Device): modified device passed by self.observer
"""
self.callback(device)
class MouseHandler(Handler):
def __init__(self) -> None:
"""Initiate UsbHanlder
Initialization contains two major steps. First it would do a
configuration for currently available devices and then it would wait for
USB udev events to reconfigure the settings. configurations would be
done by (This part is not decided yet. it could be done by BASH SCRIPTS
or we can invoke xinput binaries via python itself. a bash script
solution would be benefitial since it can used as utility).
"""
# TODO: use somthing that only captures
super().__init__("usb")
def callback(self, device):
print(device.action)

1
src/scripts/python/util/__init__.py

@ -0,0 +1 @@
from .common import edit_distance, max_match

63
src/scripts/python/util/common.py

@ -0,0 +1,63 @@
"""Common Utilities"""
def max_match(a: str, b: str) -> str:
"""Maximum matching of intersection of pair of string
This function will return the intersection of two strings from the start.
Example:
>>> a = "/sys/devices/folan/bahman"
>>> b = "/sys/devices/fol/bahman"
>>> max_match(a,b)
'/sys/dedices/fol'
Args:
a (str): firsrt string
b (str): second string
Returns:
str: intersection of two strings OR None if one or both strings are
empty or None
"""
i = 0
if not a or not b:
return None
if len(b) < len(a):
a, b = b, a
for c in a:
if b[i] == c:
i += 1
else:
return a[0:i]
def edit_distance(str1, str2, m, n):
# If first string is empty, the only option is to
# insert all characters of second string into first
if m == 0:
return n
# If second string is empty, the only option is to
# remove all characters of first string
if n == 0:
return m
# If last characters of two strings are same, nothing
# much to do. Ignore last characters and get count for
# remaining strings.
if str1[m - 1] == str2[n - 1]:
return edit_distance(str1, str2, m - 1, n - 1)
# If last characters are not same, consider all three
# operations on last character of first string, recursively
# compute minimum cost for all three operations and take
# minimum of three values.
return 1 + min(
edit_distance(str1, str2, m, n - 1), # Insert
edit_distance(str1, str2, m - 1, n), # Remove
edit_distance(str1, str2, m - 1, n - 1), # Replace
)

80
src/scripts/python/util/egalax.py

@ -0,0 +1,80 @@
"""eGalax
This module is responsible for detecting the touchpanel. It would detect the
touchpannel's drm output and its overal status.
"""
from pathlib import Path
from .x import get_edid_dev_path
from .common import max_match
VENDOR_ID = "0EEF"
DEVICE_ID = "C000"
def get_egalax_path() -> Path:
"""Get device path
This function will return the path of the HID device related to the pannel.
NOTE that it is not the path of the EDID but it can be extracted from it.
Returns:
Path: Path of the eGalax hid device OR None if device is not ceonnected
"""
query = "*" + VENDOR_ID + ":" + DEVICE_ID + "*"
devices = list(Path("/sys/devices").rglob(query))
if devices:
return devices[0]
else:
return None
def is_egalax_connected() -> bool:
"""Checks if device is connected
avaiability of the device is checked by existing a path of the device in the
/sys/devices directory.
Returns:
bool: True if device is connected
"""
devpath = get_egalax_path()
return bool(devpath)
def get_egalax_edid_path() -> Path:
"""return EDID path of touchpannel rdm
This function will find intersection of the edid pathes and eGalax hid
device and if this intersection and returns the maximum match.
Runtime: 160ms on average -> Not efficient
Returns:
Path: edid path of eGalax OR None if not found or device is'nt connected
"""
egalax_dev = get_egalax_path()
if not egalax_dev:
return None
max_dir = "/sys/devices"
max_path = None
for path in get_edid_dev_path():
base_dir = max_match(str(path), str(egalax_dev))
if len(max_dir) < len(base_dir):
max_dir = base_dir
max_path = path
# TODO add sanity check (both edid and VENDOR:DEVICE are in that base)
return max_path
def get_egalax_drm_pure_name() -> str:
"""Extract DRM name form edid path
Returns:
str: pure drm name OR none if device is not found
"""
edid_path = get_egalax_edid_path()
if edid_path:
return str(edid_path.parent.stem)
else:
return None

198
src/scripts/python/util/pointer.py

@ -0,0 +1,198 @@
import util.x as xutil
# Pointer states
SLAVE, MASTER, FLOATING = range(3)
class XInput:
"""Base XInput class
Attributes:
name (str): name of the input
id (int): id of the input
is_master (bool): True if device is master
"""
def __init__(self, name, id, state) -> None:
"""Initializes the class with name, id and master status
Args:
name (str): name of the input. No processing is done on the name
id (int): id of the input
is_master (bool): master status of the input device
"""
self.name = name
self.id = id
self.state = state
class Pointer(XInput):
"""Pointer class
This class is a wrapper around xinput commandline --list output.
Attrs:
is_master (bool): True if the pointer is a master pointer else False
"""
def __init__(self, name, id, state) -> None:
super().__init__(name, id, state)
def __repr__(self) -> str:
return f"<Pointer: {self.name}-{self.id}-{self.state}>"
@property
def slave(self):
return self.state == SLAVE
@property
def master(self):
return self.state == MASTER
def floating(self):
return self.state == FLOATING
def get_short_pointer(id) -> Pointer:
"""Generates Pointer object corresponding to id (short attrs)
Args:
id (int): pointer id
Returns:
Pointer: pointer object with name, id and is_master props
Rises:
ValueError: if id is not reistered with xinput
ValueError: if id is not a pointer id
"""
desc = xutil.get_list_short_with(id)
name, props = desc.rsplit("id=", 1)
if "pointer" in props:
state = FLOATING
if "master" in props:
state = MASTER
elif "slave" in props:
state = SLAVE
return Pointer(name.strip(), props.split(maxsplit=1)[0], state)
else:
raise TypeError(f"id[{id}] is not a pointer id")
def get_ids_iter():
"""xinput id generator
Yields:
int: id of xinput devices
"""
ids = xutil.get_ids()
for id in ids:
yield id
def get_pointer_iter(is_short=True):
"""xinput pointers generator
Args:
is_short (bool, optional): if True generates short type pointers.
Defaults to True.
Yields:
Pointer: xinput pointers
"""
for id in get_ids_iter():
if is_short:
try:
yield get_short_pointer(id)
except TypeError as e:
# ignore if the id is not pointer
pass
except e:
# TODO: logging
pass
else:
pass # TODO
def get_pointers(is_short=True):
"""Wraps pointers in `xinput --list` in Pointer class
Creation of the pointer is done by getting the list description of
each id. if the is_short arg is True, then short list description will be
used which will provide the class `name`, `is_master` and `id` values.
Getting this pointers is done by first calling `xinput --list --id-only` to
get ids and then execute `xinput --list {id}` to get the description with
less-complicated output compare to iterating over `xinput --list --short`
line by line (--short option has some special characters that cause overhead
to the system for processing them individually and per-case).
"""
pointers = []
for pointer in get_pointer_iter(is_short):
pointers.append(pointer)
return pointers
def create_touch_master():
"""_summary_
Raises:
SystemError: If creation of touch pointer failed
Returns:
Pointer: pointer object corresponding to `touch` master
"""
touch = None
xutil.create_master("touch")
id = xutil.get_xi_id_by_name("touch pointer")
if id:
try:
touch = get_short_pointer(id)
except:
raise SystemError(
"touch pointer is not available. cannot create touch pointer"
)
else:
raise SystemError(
"touch pointer is not available. cannot create touch pointer"
)
# TODO configure cursor bitmap
return touch
def get_pointers_categorized():
"""Categorized Pointers
Categories:
1. VCore: Pointer
2. Touch Master: Pointer
3. eGalax: Pointer | None
4. Other non-masters: List[Pinter]
Raises:
SystemError: If creation of touch pointer failed
Returns:
"""
v_core = None
touch = None
e_galax = None
pointers = []
# filter pointers
for pointer in get_pointer_iter():
if pointer.name == "Virtual core pointer":
v_core = pointer
elif "eGalax" in pointer.name:
e_galax = pointer
elif pointer.name == "touch pointer":
touch = pointer
elif not pointer.master:
pointers.append(pointer)
if not touch:
touch = create_touch_master()
return v_core, touch, e_galax, pointers

113
src/scripts/python/util/randr.py

@ -0,0 +1,113 @@
"""RandR
Author: Ali Hatami Tajik [hatam](mailto:a.hatam008@gmail.com)
Creation Date: 2023 Mar 04
---
This module provides a wrapper utility around xrandr.
Classes:
Mode
Setting
Screen
Dir
Pos
Utilities:
get_screens
"""
from enum import Enum
from dataclasses import dataclass
from typing import List
from Xlib.ext import randr as rnd
# TODO: Option class which can be applied by get_args method
# TODO: Screen-related option class ~
# TODO: abs position
class Pos(Enum):
"""Position types in xrandr
Position the output relative to the position of another output.
"""
LEFT_OF = (0,)
RIGHT_OF = (1,)
ABOVE = (2,)
BELOW = (3,)
SAME_AS = 4
class RotationDir(Enum):
"""Rotation direction
This causes the output contents to be rotated in the specified direction.
"""
NORMAL = (0,)
LEFT = (1,)
RIGHT = (2,)
INVERTED = 3
class ReflectDir(Enum):
"""Reflection direction
This causes the output contents to be reflected across the specified axes.
"""
NORMAL = (0,)
X = (1,)
Y = (2,)
XY = 3
@dataclass
class Setting:
"""Settings of a screen
This data struct will be used as the config of each screen. Note that
default screen cannot be use
"""
resolution = (None,)
is_primary = (False,)
is_enabeled = (True,)
rotation = None
position = None
reflection = None
@dataclass
class Mode:
"""Mode
Mode of the screen including width, height, refresh rate(s)
"""
height: int = 0
width: int = 0
frequency: List[int] = []
class Screen:
"""Screen class
This class will hold screen properties and methods related to the screens.
At the time it will use xrandr (and not the verbose mode) to list the
screens and modes.
"""
class Monitor:
"""Monitor Class
List Monitor Outputs and their states
"""

106
src/scripts/python/util/x.py

@ -0,0 +1,106 @@
import subprocess
from pathlib import Path
import os
ENCODING = "utf-8"
XINPUT = "/ust/bin/xinput"
def exec_xinput(args: list):
args.insert(0, XINPUT)
_read, _write = os.pipe()
write_fd = os.fdopen(_write, "w", 0)
os.read()
def get_list_short():
"""Returns string output of the `xinput --list --short` command encoded as
UTF-8"""
completed = subprocess.run(
[XINPUT, "--list", "--short"], capture_output=True
)
return completed.stdout.decode(ENCODING)
def get_list_short_with(id):
"""Short List of the id
Args:
id (int): id registered in xinput
Rises:
ValueError: in case of id not found in devices
"""
completed = subprocess.run(
[XINPUT, "--list", "--short", str(id)], capture_output=True
)
if completed.returncode == 0:
return completed.stdout.decode(ENCODING)
else:
ValueError(f"id[{id}] is not registered")
def reattach(id, master):
"""Reattach a device to a master
Args:
id (str|int): name of the slave or id
master (_type_): _description_
TODO: Error handling should be done. BUT, if the master is not a master or
id is not valid, xinput will not do anything and nothing bad will happen :)
"""
completed = subprocess.run(
[XINPUT, "--reattach", str(id), str(master)], capture_output=True
)
return completed.returncode
def get_ids():
"""returns list of ids registered in xinput"""
completed = subprocess.run(
[XINPUT, "--list", "--id-only"], capture_output=True
)
return list(map(int, completed.stdout.decode(ENCODING).split()))
def create_master(name: str = "touch"):
"""Creates master with specified name
Args:
name (str, optional): name of the master. Defaults to 'touch'.
"""
completed = subprocess.run([XINPUT, "create-master", name])
return completed.returncode
def get_xi_id_by_name(name):
"""find device id from name
Args:
name (str): name of the device
"""
completed = subprocess.run(
[XINPUT, "list", "--id-only", name], capture_output=True
)
if completed.returncode == 1:
return None
else:
return int(completed.stdout.decode(ENCODING))
def map_to_output(output, device_id):
# TODO
pass
def get_edid_dev_path():
"""returns iterator of pathes of devices with edid
devices which has EDID are monitors.
"""
return Path("/sys/devices").rglob("edid")

12
src/scripts/setupmonitor.sh

@ -0,0 +1,12 @@
#!/bin/bash
# This script will run when drm change event detected.
# This sctipt should be placed in /usr/local/bin
# SONOLOG file must be set beforehand in the udev rule
# MONITOR_LOCK should be set
(
flock -n 100 || exit 1
sleep 1 # wait until all changes take place
xrandr --auto
python3 /usr/bin/local/python/setupmonitor.py
echo $(data) - INFO - Setup Monitor Done >> $SONOLOG
) 100> $MONITOR_LOCK

195
src/scripts/utils/progressbar.sh

@ -0,0 +1,195 @@
#!/bin/bash
# https://github.com/pollev/bash_progress_bar - See license at end of file
# Constants
CODE_SAVE_CURSOR="\033[s"
CODE_RESTORE_CURSOR="\033[u"
CODE_CURSOR_IN_SCROLL_AREA="\033[1A"
COLOR_FG="\e[30m"
COLOR_BG="\e[42m"
COLOR_BG_BLOCKED="\e[43m"
RESTORE_FG="\e[39m"
RESTORE_BG="\e[49m"
# Variables
PROGRESS_BLOCKED="false"
TRAPPING_ENABLED="false"
TRAP_SET="false"
CURRENT_NR_LINES=0
setup_scroll_area() {
# If trapping is enabled, we will want to activate it whenever we setup the scroll area and remove it when we break the scroll area
if [ "$TRAPPING_ENABLED" = "true" ]; then
trap_on_interrupt
fi
lines=$(tput lines)
CURRENT_NR_LINES=$lines
let lines=$lines-1
# Scroll down a bit to avoid visual glitch when the screen area shrinks by one row
log -en "\n"
# Save cursor
log -en "$CODE_SAVE_CURSOR"
# Set scroll region (this will place the cursor in the top left)
log -en "\033[0;${lines}r"
# Restore cursor but ensure its inside the scrolling area
log -en "$CODE_RESTORE_CURSOR"
log -en "$CODE_CURSOR_IN_SCROLL_AREA"
# Start empty progress bar
draw_progress_bar 0
}
destroy_scroll_area() {
lines=$(tput lines)
# Save cursor
log -en "$CODE_SAVE_CURSOR"
# Set scroll region (this will place the cursor in the top left)
log -en "\033[0;${lines}r"
# Restore cursor but ensure its inside the scrolling area
log -en "$CODE_RESTORE_CURSOR"
log -en "$CODE_CURSOR_IN_SCROLL_AREA"
# We are done so clear the scroll bar
clear_progress_bar
# Scroll down a bit to avoid visual glitch when the screen area grows by one row
log -en "\n\n"
# Once the scroll area is cleared, we want to remove any trap previously set. Otherwise, ctrl+c will exit our shell
if [ "$TRAP_SET" = "true" ]; then
trap - INT
fi
}
draw_progress_bar() {
percentage=$1
lines=$(tput lines)
let lines=$lines
# Check if the window has been resized. If so, reset the scroll area
if [ "$lines" -ne "$CURRENT_NR_LINES" ]; then
setup_scroll_area
fi
# Save cursor
log -en "$CODE_SAVE_CURSOR"
# Move cursor position to last row
log -en "\033[${lines};0f"
# Clear progress bar
tput el
# Draw progress bar
PROGRESS_BLOCKED="false"
print_bar_text $percentage
# Restore cursor position
log -en "$CODE_RESTORE_CURSOR"
}
block_progress_bar() {
percentage=$1
lines=$(tput lines)
let lines=$lines
# Save cursor
log -en "$CODE_SAVE_CURSOR"
# Move cursor position to last row
log -en "\033[${lines};0f"
# Clear progress bar
tput el
# Draw progress bar
PROGRESS_BLOCKED="true"
print_bar_text $percentage
# Restore cursor position
log -en "$CODE_RESTORE_CURSOR"
}
clear_progress_bar() {
lines=$(tput lines)
let lines=$lines
# Save cursor
log -en "$CODE_SAVE_CURSOR"
# Move cursor position to last row
log -en "\033[${lines};0f"
# clear progress bar
tput el
# Restore cursor position
log -en "$CODE_RESTORE_CURSOR"
}
print_bar_text() {
local percentage=$1
local cols=$(tput cols)
let bar_size=$cols-17
local color="${COLOR_FG}${COLOR_BG}"
if [ "$PROGRESS_BLOCKED" = "true" ]; then
color="${COLOR_FG}${COLOR_BG_BLOCKED}"
fi
# Prepare progress bar
let complete_size=($bar_size*$percentage)/100
let remainder_size=$bar_size-$complete_size
progress_bar=$(log -ne "["; log -en "${color}"; printf_new "#" $complete_size; log -en "${RESTORE_FG}${RESTORE_BG}"; printf_new "." $remainder_size; log -ne "]");
# Print progress bar
log -ne " Progress ${percentage}% ${progress_bar}"
}
enable_trapping() {
TRAPPING_ENABLED="true"
}
trap_on_interrupt() {
# If this function is called, we setup an interrupt handler to cleanup the progress bar
TRAP_SET="true"
trap cleanup_on_interrupt INT
}
cleanup_on_interrupt() {
destroy_scroll_area
exit
}
printf_new() {
str=$1
num=$2
v=$(printf "%-${num}s" "$str")
log -ne "${v// /$str}"
}
# SPDX-License-Identifier: MIT
#
# Copyright (c) 2018--2020 Polle Vanhoof
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

400
src/scripts/utils/source.sh

@ -0,0 +1,400 @@
#!/usr/bin/env bash
# DESC: Handler for unexpected errors
# ARGS: $1 (optional): Exit code (defaults to 1)
# OUTS: None
function script_trap_err() {
local exit_code=1
# Disable the error trap handler to prevent potential recursion
trap - ERR
# Consider any further errors non-fatal to ensure we run to completion
set +o errexit
set +o pipefail
# Validate any provided exit code
if [[ ${1-} =~ ^[0-9]+$ ]]; then
exit_code="$1"
fi
# Output debug data if in Cron mode
if [[ -n ${cron-} ]]; then
# Restore original file output descriptors
if [[ -n ${script_output-} ]]; then
exec 1>&3 2>&4
fi
# Print basic debugging information
printf '%b\n' "$ta_none"
printf '***** Abnormal termination of script *****\n'
printf 'Script Path: %s\n' "$script_path"
printf 'Script Parameters: %s\n' "$script_params"
printf 'Script Exit Code: %s\n' "$exit_code"
# Print the script log if we have it. It's possible we may not if we
# failed before we even called cron_init(). This can happen if bad
# parameters were passed to the script so we bailed out very early.
if [[ -n ${script_output-} ]]; then
# shellcheck disable=SC2312
printf 'Script Output:\n\n%s' "$(cat "$script_output")"
else
printf 'Script Output: None (failed before log init)\n'
fi
fi
# Exit with failure status
exit "$exit_code"
}
# DESC: Handler for exiting the script
# ARGS: None
# OUTS: None
function script_trap_exit() {
cd "$orig_cwd"
# Remove Cron mode script log
if [[ -n ${cron-} && -f ${script_output-} ]]; then
rm "$script_output"
fi
# Remove script execution lock
if [[ -d ${script_lock-} ]]; then
rmdir "$script_lock"
fi
# Restore terminal colours
printf '%b' "$ta_none"
}
# DESC: Exit script with the given message
# ARGS: $1 (required): Message to print on exit
# $2 (optional): Exit code (defaults to 0)
# OUTS: None
# NOTE: The convention used in this script for exit codes is:
# 0: Normal exit
# 1: Abnormal exit due to external error
# 2: Abnormal exit due to script error
function script_exit() {
if [[ $# -eq 1 ]]; then
printf '%s\n' "$1"
exit 0
fi
if [[ ${2-} =~ ^[0-9]+$ ]]; then
printf '%b\n' "$1"
# If we've been provided a non-zero exit code run the error trap
if [[ $2 -ne 0 ]]; then
script_trap_err "$2"
else
exit 0
fi
fi
script_exit 'Missing required argument to script_exit()!' 2
}
# DESC: Generic script initialisation
# ARGS: $@ (optional): Arguments provided to the script
# OUTS: $orig_cwd: The current working directory when the script was run
# $script_path: The full path to the script
# $script_dir: The directory path of the script
# $script_name: The file name of the script
# $script_params: The original parameters provided to the script
# $ta_none: The ANSI control code to reset all text attributes
# NOTE: $script_path only contains the path that was used to call the script
# and will not resolve any symlinks which may be present in the path.
# You can use a tool like realpath to obtain the "true" path. The same
# caveat applies to both the $script_dir and $script_name variables.
# shellcheck disable=SC2034
function script_init() {
# Useful variables
readonly orig_cwd="$PWD"
readonly script_params="$*"
readonly script_path="${BASH_SOURCE[1]}"
script_dir="$(dirname "$script_path")"
script_name="$(basename "$script_path")"
readonly script_dir script_name
# Important to always set as we use it in the exit handler
# shellcheck disable=SC2155
readonly ta_none="$(tput sgr0 2> /dev/null || true)"
}
# DESC: Initialise colour variables
# ARGS: None
# OUTS: Read-only variables with ANSI control codes
# NOTE: If --no-colour was set the variables will be empty. The output of the
# $ta_none variable after each tput is redundant during normal execution,
# but ensures the terminal output isn't mangled when running with xtrace.
# shellcheck disable=SC2034,SC2155
function colour_init() {
if [[ -z ${no_colour-} ]]; then
# Text attributes
readonly ta_bold="$(tput bold 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly ta_uscore="$(tput smul 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly ta_blink="$(tput blink 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly ta_reverse="$(tput rev 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly ta_conceal="$(tput invis 2> /dev/null || true)"
printf '%b' "$ta_none"
# Foreground codes
readonly fg_black="$(tput setaf 0 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly fg_blue="$(tput setaf 4 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly fg_cyan="$(tput setaf 6 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly fg_green="$(tput setaf 2 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly fg_magenta="$(tput setaf 5 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly fg_red="$(tput setaf 1 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly fg_white="$(tput setaf 7 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly fg_yellow="$(tput setaf 3 2> /dev/null || true)"
printf '%b' "$ta_none"
# Background codes
readonly bg_black="$(tput setab 0 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly bg_blue="$(tput setab 4 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly bg_cyan="$(tput setab 6 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly bg_green="$(tput setab 2 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly bg_magenta="$(tput setab 5 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly bg_red="$(tput setab 1 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly bg_white="$(tput setab 7 2> /dev/null || true)"
printf '%b' "$ta_none"
readonly bg_yellow="$(tput setab 3 2> /dev/null || true)"
printf '%b' "$ta_none"
else
# Text attributes
readonly ta_bold=''
readonly ta_uscore=''
readonly ta_blink=''
readonly ta_reverse=''
readonly ta_conceal=''
# Foreground codes
readonly fg_black=''
readonly fg_blue=''
readonly fg_cyan=''
readonly fg_green=''
readonly fg_magenta=''
readonly fg_red=''
readonly fg_white=''
readonly fg_yellow=''
# Background codes
readonly bg_black=''
readonly bg_blue=''
readonly bg_cyan=''
readonly bg_green=''
readonly bg_magenta=''
readonly bg_red=''
readonly bg_white=''
readonly bg_yellow=''
fi
}
# DESC: Initialise Cron mode
# ARGS: None
# OUTS: $script_output: Path to the file stdout & stderr was redirected to
function cron_init() {
if [[ -n ${cron-} ]]; then
# Redirect all output to a temporary file
script_output="$(mktemp --tmpdir "$script_name".XXXXX)"
readonly script_output
exec 3>&1 4>&2 1> "$script_output" 2>&1
fi
}
# DESC: Acquire script lock
# ARGS: $1 (optional): Scope of script execution lock (system or user)
# OUTS: $script_lock: Path to the directory indicating we have the script lock
# NOTE: This lock implementation is extremely simple but should be reliable
# across all platforms. It does *not* support locking a script with
# symlinks or multiple hardlinks as there's no portable way of doing so.
# If the lock was acquired it's automatically released on script exit.
function lock_init() {
local lock_dir
if [[ $1 = 'system' ]]; then
lock_dir="/tmp/$script_name.lock"
elif [[ $1 = 'user' ]]; then
lock_dir="/tmp/$script_name.$UID.lock"
else
script_exit 'Missing or invalid argument to lock_init()!' 2
fi
if mkdir "$lock_dir" 2> /dev/null; then
readonly script_lock="$lock_dir"
verbose_print "Acquired script lock: $script_lock"
else
script_exit "Unable to acquire script lock: $lock_dir" 1
fi
}
# DESC: Pretty print the provided string
# ARGS: $1 (required): Message to print (defaults to a green foreground)
# $2 (optional): Colour to print the message with. This can be an ANSI
# escape code or one of the prepopulated colour variables.
# $3 (optional): Set to any value to not append a new line to the message
# OUTS: None
function pretty_print() {
if [[ $# -lt 1 ]]; then
script_exit 'Missing required argument to pretty_print()!' 2
fi
if [[ -z ${no_colour-} ]]; then
if [[ -n ${2-} ]]; then
printf '%b' "$2"
else
printf '%b' "$fg_green"
fi
fi
# Print message & reset text attributes
if [[ -n ${3-} ]]; then
printf '%s%b' "$1" "$ta_none"
else
printf '%s%b\n' "$1" "$ta_none"
fi
}
# DESC: Only pretty_print() the provided string if verbose mode is enabled
# ARGS: $@ (required): Passed through to pretty_print() function
# OUTS: None
function verbose_print() {
if [[ -n ${verbose-} ]]; then
pretty_print "$@"
fi
}
# DESC: Combines two path variables and removes any duplicates
# ARGS: $1 (required): Path(s) to join with the second argument
# $2 (optional): Path(s) to join with the first argument
# OUTS: $build_path: The constructed path
# NOTE: Heavily inspired by: https://unix.stackexchange.com/a/40973
function build_path() {
if [[ $# -lt 1 ]]; then
script_exit 'Missing required argument to build_path()!' 2
fi
local new_path path_entry temp_path
temp_path="$1:"
if [[ -n ${2-} ]]; then
temp_path="$temp_path$2:"
fi
new_path=
while [[ -n $temp_path ]]; do
path_entry="${temp_path%%:*}"
case "$new_path:" in
*:"$path_entry":*) ;;
*)
new_path="$new_path:$path_entry"
;;
esac
temp_path="${temp_path#*:}"
done
# shellcheck disable=SC2034
build_path="${new_path#:}"
}
# DESC: Check a binary exists in the search path
# ARGS: $1 (required): Name of the binary to test for existence
# $2 (optional): Set to any value to treat failure as a fatal error
# OUTS: None
function check_binary() {
if [[ $# -lt 1 ]]; then
script_exit 'Missing required argument to check_binary()!' 2
fi
if ! command -v "$1" > /dev/null 2>&1; then
if [[ -n ${2-} ]]; then
script_exit "Missing dependency: Couldn't locate $1." 1
else
verbose_print "Missing dependency: $1" "${fg_red-}"
return 1
fi
fi
verbose_print "Found dependency: $1"
return 0
}
# DESC: Validate we have superuser access as root (via sudo if requested)
# ARGS: $1 (optional): Set to any value to not attempt root access via sudo
# OUTS: None
function check_superuser() {
local superuser
if [[ $EUID -eq 0 ]]; then
superuser=true
elif [[ -z ${1-} ]]; then
# shellcheck disable=SC2310
if check_binary sudo; then
verbose_print 'Sudo: Updating cached credentials ...'
if ! sudo -v; then
verbose_print "Sudo: Couldn't acquire credentials ..." \
"${fg_red-}"
else
local test_euid
test_euid="$(sudo -H -- "$BASH" -c 'printf "%s" "$EUID"')"
if [[ $test_euid -eq 0 ]]; then
superuser=true
fi
fi
fi
fi
if [[ -z ${superuser-} ]]; then
verbose_print 'Unable to acquire superuser credentials.' "${fg_red-}"
return 1
fi
verbose_print 'Successfully acquired superuser credentials.'
return 0
}
# DESC: Run the requested command as root (via sudo if requested)
# ARGS: $1 (optional): Set to zero to not attempt execution via sudo
# $@ (required): Passed through for execution as root user
# OUTS: None
function run_as_root() {
if [[ $# -eq 0 ]]; then
script_exit 'Missing required argument to run_as_root()!' 2
fi
if [[ ${1-} =~ ^0$ ]]; then
local skip_sudo=true
shift
fi
if [[ $EUID -eq 0 ]]; then
"$@"
elif [[ -z ${skip_sudo-} ]]; then
sudo -H -- "$@"
else
script_exit "Unable to run requested command as root: $*" 1
fi
}
function log () {
if [[ $_V -eq 1 ]]; then
echo "$@"
fi
}
# vim: syntax=sh cc=80 tw=79 ts=4 sw=4 sts=4 et sr
Loading…
Cancel
Save