refactored
This commit is contained in:
+167
-138
@@ -2,201 +2,230 @@
|
|||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import os
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional, Union
|
||||||
|
|
||||||
VERBOSE = True
|
# Configuration
|
||||||
|
LOG_FORMAT = "%(message)s"
|
||||||
|
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def log(*args):
|
# Constants for attribute mappings
|
||||||
if VERBOSE:
|
BOOL_ENABLE_MAP = {"true": "enable", "false": "disable"}
|
||||||
print(*args)
|
BOOL_ALLOW_MAP = {"true": "allow", "false": "disallow"}
|
||||||
|
RGB_RANGE_MAP = {
|
||||||
|
"0": "automatic", "1": "full", "2": "limited",
|
||||||
|
"automatic": "automatic", "full": "full", "limited": "limited"
|
||||||
|
}
|
||||||
|
ROTATION_MAP = {"1": "normal", "2": "left", "4": "inverted", "8": "right"}
|
||||||
|
VRR_POLICY_MAP = {
|
||||||
|
"0": "never", "1": "always", "2": "automatic",
|
||||||
|
"never": "never", "always": "always", "automatic": "automatic"
|
||||||
|
}
|
||||||
|
|
||||||
def run_command(cmd):
|
@dataclass
|
||||||
log(f"[CMD] {cmd}")
|
class OutputConfig:
|
||||||
try:
|
"""Represents the configuration for a single display output."""
|
||||||
subprocess.run(cmd, shell=True, check=True)
|
data: Dict[str, Any]
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
log(f"Error executing command: {e}")
|
|
||||||
|
|
||||||
def get_mode_string(output, mode_id):
|
@property
|
||||||
for mode in output.get('modes', []):
|
def name(self) -> str:
|
||||||
if mode['id'] == mode_id:
|
return self.data.get("name", "Unknown")
|
||||||
width = mode['size']['width']
|
|
||||||
height = mode['size']['height']
|
|
||||||
refresh_rate = round(mode['refreshRate'])
|
|
||||||
return f"{width}x{height}@{refresh_rate}"
|
|
||||||
return None
|
|
||||||
|
|
||||||
def save_profile(profile_path):
|
@property
|
||||||
log(f"Saving current display profile to {profile_path}...")
|
def is_enabled(self) -> bool:
|
||||||
try:
|
return bool(self.data.get("enabled", False))
|
||||||
with open(profile_path, 'w') as f:
|
|
||||||
subprocess.run(['kscreen-doctor', '--json'], stdout=f, check=True)
|
|
||||||
log("Profile saved successfully.")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error saving profile: {e}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def apply_attribute(output_name, attribute, value, value_map=None):
|
def get(self, key: str, default: Any = None) -> Any:
|
||||||
if value is None:
|
return self.data.get(key, default)
|
||||||
log(f"Output {output_name} has no attribute {attribute}, skipping...")
|
|
||||||
return
|
|
||||||
|
|
||||||
# If it's a list or dict, something might be wrong or it's a complex structure we don't handle this way
|
class KDEProfileManager:
|
||||||
if isinstance(value, (list, dict)):
|
"""Manages KDE display profiles using kscreen-doctor."""
|
||||||
log(f"Output {output_name} attribute {attribute} has complex value {value}, skipping...")
|
|
||||||
return
|
|
||||||
|
|
||||||
str_value = str(value).lower()
|
def __init__(self, dry_run: bool = False):
|
||||||
if value_map:
|
self.dry_run = dry_run
|
||||||
if str_value in value_map:
|
|
||||||
value = value_map[str_value]
|
|
||||||
else:
|
|
||||||
log(f"Warning: Value {str_value} not found in map for {attribute}, using as is.")
|
|
||||||
|
|
||||||
cmd = f"kscreen-doctor output.{output_name}.{attribute}.{value}"
|
def _run_command(self, cmd: Union[str, List[str]]) -> None:
|
||||||
run_command(cmd)
|
"""Executes a shell command."""
|
||||||
|
cmd_str = cmd if isinstance(cmd, str) else " ".join(cmd)
|
||||||
|
logger.info(f"[CMD] {cmd_str}")
|
||||||
|
|
||||||
def load_profile(profile_path):
|
if self.dry_run:
|
||||||
log(f"Loading display profile from {profile_path}...")
|
return
|
||||||
if not os.path.exists(profile_path):
|
|
||||||
print(f"Profile file not found: {profile_path}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(profile_path, 'r') as f:
|
subprocess.run(cmd, shell=isinstance(cmd, str), check=True, capture_output=False)
|
||||||
profile = json.load(f)
|
except subprocess.CalledProcessError as e:
|
||||||
except Exception as e:
|
logger.error(f"Error executing command: {e}")
|
||||||
print(f"Error reading profile JSON: {e}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
outputs = profile.get('outputs', [])
|
def save_profile(self, profile_path: Path) -> None:
|
||||||
|
"""Saves the current display configuration to a JSON file."""
|
||||||
|
logger.info(f"Saving current display profile to {profile_path}...")
|
||||||
|
try:
|
||||||
|
profile_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with open(profile_path, "w") as f:
|
||||||
|
subprocess.run(["kscreen-doctor", "--json"], stdout=f, check=True)
|
||||||
|
logger.info("Profile saved successfully.")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to save profile: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
# Maps from JSON values to kscreen-doctor options
|
def _apply_attribute(self, output_name: str, attribute: str, value: Any, value_map: Optional[Dict[str, str]] = None) -> None:
|
||||||
bool_enable_map = {"true": "enable", "false": "disable"}
|
"""Applies a single attribute to an output."""
|
||||||
bool_allow_map = {"true": "allow", "false": "disallow"}
|
if value is None:
|
||||||
rgb_range_map = {
|
logger.debug(f"Output {output_name} has no attribute {attribute}, skipping...")
|
||||||
"0": "automatic", "1": "full", "2": "limited",
|
return
|
||||||
"automatic": "automatic", "full": "full", "limited": "limited"
|
|
||||||
}
|
|
||||||
rotation_map = {"1": "normal", "2": "left", "4": "inverted", "8": "right"}
|
|
||||||
vrr_policy_map = {
|
|
||||||
"0": "never", "1": "always", "2": "automatic",
|
|
||||||
"never": "never", "always": "always", "automatic": "automatic"
|
|
||||||
}
|
|
||||||
|
|
||||||
# 1. Handle Enable/Disable (Enable first to avoid no-output state)
|
if isinstance(value, (list, dict)):
|
||||||
log("Enabling enabled outputs...")
|
logger.warning(f"Output {output_name} attribute {attribute} has complex value {value}, skipping...")
|
||||||
for out in outputs:
|
return
|
||||||
if out.get('enabled'):
|
|
||||||
run_command(f"kscreen-doctor output.{out['name']}.enable")
|
|
||||||
|
|
||||||
log("Disabling disabled outputs...")
|
str_value = str(value).lower()
|
||||||
for out in outputs:
|
if value_map:
|
||||||
if not out.get('enabled'):
|
if str_value in value_map:
|
||||||
run_command(f"kscreen-doctor output.{out['name']}.disable")
|
value = value_map[str_value]
|
||||||
|
else:
|
||||||
|
logger.warning(f"Value '{str_value}' not found in map for '{attribute}', using as is.")
|
||||||
|
|
||||||
# 2. Apply other attributes
|
self._run_command(f"kscreen-doctor output.{output_name}.{attribute}.{value}")
|
||||||
for out in outputs:
|
|
||||||
name = out['name']
|
|
||||||
log(f"Configuring output: {name}")
|
|
||||||
|
|
||||||
# WCG
|
def _get_mode_string(self, output: OutputConfig, mode_id: str) -> Optional[str]:
|
||||||
apply_attribute(name, "wcg", out.get('wcg'), bool_enable_map)
|
"""Finds the mode string (WxH@R) for a given mode ID."""
|
||||||
|
for mode in output.get("modes", []):
|
||||||
|
if str(mode.get("id")) == str(mode_id):
|
||||||
|
size = mode.get("size", {})
|
||||||
|
width = size.get("width")
|
||||||
|
height = size.get("height")
|
||||||
|
refresh = round(mode.get("refreshRate", 0))
|
||||||
|
if width and height:
|
||||||
|
return f"{width}x{height}@{refresh}"
|
||||||
|
return None
|
||||||
|
|
||||||
# SDR Brightness
|
def _configure_output(self, output: OutputConfig, all_outputs: List[OutputConfig]) -> None:
|
||||||
apply_attribute(name, "sdr-brightness", out.get('sdr-brightness'))
|
"""Configures all attributes for a single output."""
|
||||||
|
name = output.name
|
||||||
|
logger.info(f"Configuring output: {name}")
|
||||||
|
|
||||||
# VRR Policy
|
# Direct mappings
|
||||||
apply_attribute(name, "vrrpolicy", out.get('vrrPolicy'), vrr_policy_map)
|
self._apply_attribute(name, "wcg", output.get("wcg"), BOOL_ENABLE_MAP)
|
||||||
|
self._apply_attribute(name, "sdr-brightness", output.get("sdr-brightness"))
|
||||||
|
self._apply_attribute(name, "vrrpolicy", output.get("vrrPolicy"), VRR_POLICY_MAP)
|
||||||
|
self._apply_attribute(name, "rgbrange", output.get("rgbRange"), RGB_RANGE_MAP)
|
||||||
|
self._apply_attribute(name, "overscan", output.get("overscan"))
|
||||||
|
self._apply_attribute(name, "hdr", output.get("hdr"), BOOL_ENABLE_MAP)
|
||||||
|
|
||||||
# RGB Range
|
# Brightness (normalized 0.0-1.0 to 0-100)
|
||||||
apply_attribute(name, "rgbrange", out.get('rgbRange'), rgb_range_map)
|
brightness = output.get("brightness")
|
||||||
|
|
||||||
# Overscan
|
|
||||||
apply_attribute(name, "overscan", out.get('overscan'))
|
|
||||||
|
|
||||||
# HDR
|
|
||||||
apply_attribute(name, "hdr", out.get('hdr'), bool_enable_map)
|
|
||||||
|
|
||||||
# Brightness (0.0-1.0 to 0-100)
|
|
||||||
brightness = out.get('brightness')
|
|
||||||
if brightness is not None:
|
if brightness is not None:
|
||||||
apply_attribute(name, "brightness", int(float(brightness) * 100))
|
self._apply_attribute(name, "brightness", int(float(brightness) * 100))
|
||||||
|
|
||||||
# Max BPC
|
# Max BPC
|
||||||
max_bpc = out.get('maxBpc')
|
max_bpc = output.get("maxBpc")
|
||||||
if max_bpc == 0:
|
if max_bpc == 0:
|
||||||
max_bpc = "automatic"
|
max_bpc = "automatic"
|
||||||
apply_attribute(name, "maxbpc", max_bpc)
|
self._apply_attribute(name, "maxbpc", max_bpc)
|
||||||
|
|
||||||
# DDC/CI
|
# DDC/CI
|
||||||
apply_attribute(name, "ddcCi", out.get('ddcCiAllowed'), bool_allow_map)
|
self._apply_attribute(name, "ddcCi", output.get("ddcCiAllowed"), BOOL_ALLOW_MAP)
|
||||||
|
|
||||||
# Mirroring
|
# Mirroring
|
||||||
replication_source_id = out.get('replicationSource', 0)
|
mirror_source_id = output.get("replicationSource", 0)
|
||||||
if replication_source_id != 0:
|
if mirror_source_id != 0:
|
||||||
# Find the name of the replication source
|
source_name = next((o.name for o in all_outputs if o.get("id") == mirror_source_id), "none")
|
||||||
source_name = "none"
|
self._apply_attribute(name, "mirror", source_name)
|
||||||
for other_out in outputs:
|
|
||||||
if other_out.get('id') == replication_source_id:
|
|
||||||
source_name = other_out['name']
|
|
||||||
break
|
|
||||||
apply_attribute(name, "mirror", source_name)
|
|
||||||
else:
|
else:
|
||||||
apply_attribute(name, "mirror", "none")
|
self._apply_attribute(name, "mirror", "none")
|
||||||
|
|
||||||
# ICC Profile
|
# ICC Profile
|
||||||
icc_path = out.get('iccProfilePath')
|
self._apply_attribute(name, "iccProfilePath", output.get("iccProfilePath"))
|
||||||
if icc_path:
|
|
||||||
apply_attribute(name, "iccProfilePath", icc_path)
|
|
||||||
|
|
||||||
# Mode
|
# Mode
|
||||||
mode_id = out.get('currentModeId')
|
mode_id = output.get("currentModeId")
|
||||||
if mode_id:
|
if mode_id:
|
||||||
mode_str = get_mode_string(out, mode_id)
|
mode_str = self._get_mode_string(output, mode_id)
|
||||||
if mode_str:
|
if mode_str:
|
||||||
apply_attribute(name, "mode", mode_str)
|
self._apply_attribute(name, "mode", mode_str)
|
||||||
|
|
||||||
# Position
|
# Position
|
||||||
pos = out.get('pos')
|
pos = output.get("pos")
|
||||||
if pos:
|
if pos:
|
||||||
apply_attribute(name, "position", f"{pos['x']},{pos['y']}")
|
self._apply_attribute(name, "position", f"{pos.get('x', 0)},{pos.get('y', 0)}")
|
||||||
|
|
||||||
# Scale
|
# Scale and Rotation
|
||||||
apply_attribute(name, "scale", out.get('scale'))
|
self._apply_attribute(name, "scale", output.get("scale"))
|
||||||
|
self._apply_attribute(name, "rotation", output.get("rotation"), ROTATION_MAP)
|
||||||
|
|
||||||
# Rotation
|
# Priority and Primary status
|
||||||
apply_attribute(name, "rotation", out.get('rotation'), rotation_map)
|
priority = output.get("priority")
|
||||||
|
|
||||||
# Priority and Primary
|
|
||||||
priority = out.get('priority')
|
|
||||||
if priority is not None:
|
if priority is not None:
|
||||||
if priority == 1:
|
if priority == 1:
|
||||||
run_command(f"kscreen-doctor output.{name}.primary")
|
self._run_command(f"kscreen-doctor output.{name}.primary")
|
||||||
run_command(f"kscreen-doctor output.{name}.priority.{priority}")
|
self._run_command(f"kscreen-doctor output.{name}.priority.{priority}")
|
||||||
|
|
||||||
log("Display configuration restored.")
|
def load_profile(self, profile_path: Path) -> None:
|
||||||
|
"""Loads and applies a display configuration from a JSON file."""
|
||||||
|
logger.info(f"Loading display profile from {profile_path}...")
|
||||||
|
|
||||||
|
if not profile_path.exists():
|
||||||
|
logger.error(f"Profile file not found: {profile_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(profile_path, "r") as f:
|
||||||
|
data = json.load(f)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error reading profile JSON: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
outputs = [OutputConfig(out) for out in data.get("outputs", [])]
|
||||||
|
|
||||||
|
# 1. Handle Enable/Disable (Enable first to ensure we always have an active display)
|
||||||
|
logger.info("Enabling requested outputs...")
|
||||||
|
for out in outputs:
|
||||||
|
if out.is_enabled:
|
||||||
|
self._run_command(f"kscreen-doctor output.{out.name}.enable")
|
||||||
|
|
||||||
|
logger.info("Disabling requested outputs...")
|
||||||
|
for out in outputs:
|
||||||
|
if not out.is_enabled:
|
||||||
|
self._run_command(f"kscreen-doctor output.{out.name}.disable")
|
||||||
|
|
||||||
|
# 2. Configure each output
|
||||||
|
for out in outputs:
|
||||||
|
if out.is_enabled:
|
||||||
|
self._configure_output(out, outputs)
|
||||||
|
|
||||||
|
logger.info("Display configuration restored.")
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="KDE Display Profile Manager")
|
parser = argparse.ArgumentParser(description="KDE Display Profile Manager")
|
||||||
|
parser.add_argument("--dry-run", action="store_true", help="Print commands without executing them")
|
||||||
|
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
|
||||||
|
|
||||||
subparsers = parser.add_subparsers(dest="command", required=True)
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
||||||
|
|
||||||
# Save command
|
save_parser = subparsers.add_parser("save", help="Save current configuration")
|
||||||
save_parser = subparsers.add_parser("save", help="Save current display configuration to a profile")
|
save_parser.add_argument("profile", type=Path, help="Path to save the profile")
|
||||||
save_parser.add_argument("profile", help="Path to the profile file (e.g., profiles/myprofile.json)")
|
|
||||||
|
|
||||||
# Load command
|
load_parser = subparsers.add_parser("load", help="Load a configuration")
|
||||||
load_parser = subparsers.add_parser("load", help="Load a display configuration from a profile")
|
load_parser.add_argument("profile", type=Path, help="Path to the profile to load")
|
||||||
load_parser.add_argument("profile", help="Path to the profile file")
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.verbose:
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
manager = KDEProfileManager(dry_run=args.dry_run)
|
||||||
|
|
||||||
if args.command == "save":
|
if args.command == "save":
|
||||||
save_profile(args.profile)
|
manager.save_profile(args.profile)
|
||||||
elif args.command == "load":
|
elif args.command == "load":
|
||||||
load_profile(args.profile)
|
manager.load_profile(args.profile)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user