201 lines
7.7 KiB
Python
Executable File
201 lines
7.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
"""Adjust Kraken X42/X52/X62/X72 speeds dynamically, with software.
|
||
|
||
Deprecated proof of concept, use ./yoda instead.
|
||
|
||
Usage:
|
||
krakencurve-poc [options] show-sensors
|
||
krakencurve-poc [options] control --pump <profile> --fan <profile>
|
||
krakencurve-poc --help
|
||
krakencurve-poc --version
|
||
|
||
Options:
|
||
--pump <profile> Profile to use for the pump
|
||
--fan <profile> Profile to use for the fan
|
||
--pump-sensor <sensor> Select alternate sensor for pump speed
|
||
--fan-sensor <sensor> Select alternate sensor for fan speed
|
||
--interval <seconds> Update interval in seconds [default: 2]
|
||
-v, --verbose Output additional information
|
||
-g, --debug Show debug information on stderr
|
||
--version Display the version number
|
||
--help Show this message
|
||
|
||
Requirements:
|
||
all platforms liquidctl, including the Python APIs (pip install liquidctl)
|
||
Linux/FreeBSD psutil (pip install psutil)
|
||
macOS iStats (gem install iStats)
|
||
Windows none, system sensors not yet supported
|
||
|
||
Changelog:
|
||
0.0.3 Fix casing of log and error messages
|
||
0.0.2 MacOS support for iStats; breaking refresh of the CLI and sensor names
|
||
0.0.1 Initial proof-of-concept; system sensors only supported on Linux
|
||
|
||
Copyright (C) 2018–2021 Jonas Malaco
|
||
SPDX-License-Identifier: GPL-3.0-or-later
|
||
"""
|
||
|
||
import ast
|
||
import logging
|
||
import sys
|
||
import time
|
||
|
||
from docopt import docopt
|
||
from liquidctl.driver.kraken_two import KrakenTwoDriver
|
||
from liquidctl.util import normalize_profile, interpolate_profile
|
||
|
||
if sys.platform == 'darwin':
|
||
import re
|
||
import subprocess
|
||
elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
|
||
import psutil
|
||
|
||
VERSION = '0.0.3'
|
||
LOGGER = logging.getLogger(__name__)
|
||
|
||
LIQUID_SENSOR = 'kraken.coolant'
|
||
|
||
|
||
def read_sensors(cooler):
|
||
sensors = {}
|
||
if cooler:
|
||
data = {k: v for k, v, u in cooler.get_status()}
|
||
sensors[LIQUID_SENSOR] = data['Liquid temperature']
|
||
if sys.platform == 'darwin':
|
||
istats_stdout = subprocess.check_output(['istats']).decode('utf-8')
|
||
for line in istats_stdout.split('\n'):
|
||
if line.startswith('CPU'):
|
||
cpu_temp = float(re.search(r'\d+\.\d+', line).group(0))
|
||
sensors['istats.cpu'] = cpu_temp
|
||
break
|
||
elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
|
||
for m, li in psutil.sensors_temperatures().items():
|
||
for label, current, _, _ in li:
|
||
sensors['{}.{}'.format(m, label.lower().replace(' ', '_'))] = current
|
||
return sensors
|
||
|
||
|
||
def show_sensors(cooler):
|
||
print('{:<60} {:>18}'.format('Sensor identifier', 'Temperature'))
|
||
print('-' * 80)
|
||
sensors = read_sensors(cooler)
|
||
for k, v in sensors.items():
|
||
if k == LIQUID_SENSOR:
|
||
k = k + ' [default]'
|
||
print('{:<70} {:>6}{}'.format(k, v, '°C'))
|
||
|
||
|
||
def parse_profile(arg, mintemp, maxtemp, minduty=0, maxduty=100):
|
||
"""Parse, validate and normalize a temperature–duty profile.
|
||
|
||
>>> parse_profile('(20,30),(30,50),(34,80),(40,90)', 0, 60, 25, 100)
|
||
[(20, 30), (30, 50), (34, 80), (40, 90), (60, 100)]
|
||
>>> parse_profile('35', 0, 60, 25, 100)
|
||
[(0, 35), (59, 35), (60, 100)]
|
||
|
||
The profile is validated in structure and acceptable ranges. Duty is
|
||
checked against `minduty` and `maxduty`. Temperature must be between
|
||
`mintemp` and `maxtemp`.
|
||
|
||
>>> parse_profile('(20,30),(50,100', 0, 60, 25, 100)
|
||
Traceback (most recent call last):
|
||
...
|
||
ValueError: Profile must be comma-separated (temperature, duty) tuples
|
||
>>> parse_profile('(20,30),(50,100,2)', 0, 60, 25, 100)
|
||
Traceback (most recent call last):
|
||
...
|
||
ValueError: Profile must be comma-separated (temperature, duty) tuples
|
||
>>> parse_profile('(20,30),(50,97.6)', 0, 60, 25, 100)
|
||
Traceback (most recent call last):
|
||
...
|
||
ValueError: Duty must be integer number between 25 and 100
|
||
>>> parse_profile('(20,15),(50,100)', 0, 60, 25, 100)
|
||
Traceback (most recent call last):
|
||
...
|
||
ValueError: Duty must be integer number between 25 and 100
|
||
>>> parse_profile('(20,30),(70,100)', 0, 60, 25, 100)
|
||
Traceback (most recent call last):
|
||
...
|
||
ValueError: Temperature must be integer number between 0 and 60
|
||
|
||
"""
|
||
try:
|
||
val = ast.literal_eval('[' + arg + ']')
|
||
if len(val) == 1 and isinstance(val[0], int):
|
||
# for arg == '<number>' set fixed duty between mintemp and maxtemp - 1
|
||
val = [(mintemp, val[0]), (maxtemp - 1, val[0])]
|
||
except:
|
||
raise ValueError('profile must be comma-separated (temperature, duty) tuples')
|
||
for step in val:
|
||
if not isinstance(step, tuple) or len(step) != 2:
|
||
raise ValueError('profile must be comma-separated (temperature, duty) tuples')
|
||
temp, duty = step
|
||
if not isinstance(temp, int) or temp < mintemp or temp > maxtemp:
|
||
raise ValueError('temperature must be integer between {} and {}'.format(mintemp, maxtemp))
|
||
if not isinstance(duty, int) or duty < minduty or duty > maxduty:
|
||
raise ValueError('duty must be integer between {} and {}'.format(minduty, maxduty))
|
||
return normalize_profile(val, critx=maxtemp)
|
||
|
||
|
||
def control(cooler, pump_profile, fan_profile, update_interval,
|
||
pump_sensor, fan_sensor):
|
||
LOGGER.info('pump following sensor %s and profile %s', pump_sensor, str(pump_profile))
|
||
LOGGER.info('fan following sensor %s and profile %s', fan_sensor, str(fan_profile))
|
||
while True:
|
||
sensors = read_sensors(cooler)
|
||
LOGGER.info('pump control (%s): %.1f°C, fan control (%s): %.1f°C',
|
||
pump_sensor, sensors[pump_sensor], fan_sensor, sensors[fan_sensor])
|
||
pump_duty = interpolate_profile(pump_profile, sensors[pump_sensor])
|
||
fan_duty = interpolate_profile(fan_profile, sensors[fan_sensor])
|
||
cooler.set_instantaneous_speed('pump', pump_duty)
|
||
cooler.set_instantaneous_speed('fan', fan_duty)
|
||
time.sleep(update_interval)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
if len(sys.argv) == 2 and sys.argv[1] == 'doctest':
|
||
import doctest
|
||
doctest.testmod(verbose=True)
|
||
sys.exit(0)
|
||
|
||
args = docopt(__doc__, version='krakencurve-poc v{}'.format(VERSION))
|
||
|
||
if args['--debug']:
|
||
args['--verbose'] = True
|
||
logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(name)s: %(message)s')
|
||
LOGGER.debug('krakencurve-poc v%s', VERSION)
|
||
import liquidctl.version
|
||
LOGGER.debug('liquidctl v%s', liquidctl.version.__version__)
|
||
elif args['--verbose']:
|
||
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
|
||
else:
|
||
logging.basicConfig(level=logging.WARNING, format='%(levelname)s: %(message)s')
|
||
sys.tracebacklimit = 0
|
||
|
||
device = KrakenTwoDriver.find_supported_devices()[0]
|
||
device.connect()
|
||
|
||
try:
|
||
if args['show-sensors']:
|
||
show_sensors(device)
|
||
elif args['control']:
|
||
pump_sensor = args['--pump-sensor'] or LIQUID_SENSOR
|
||
pump_max_temp = 100 if pump_sensor != LIQUID_SENSOR else 60
|
||
fan_sensor = args['--fan-sensor'] or LIQUID_SENSOR
|
||
fan_max_temp = 100 if fan_sensor != LIQUID_SENSOR else 60
|
||
|
||
pump_profile = parse_profile(args['--pump'], 0, pump_max_temp, minduty=50)
|
||
fan_profile = parse_profile(args['--fan'], 0, fan_max_temp, minduty=25)
|
||
|
||
control(device, pump_profile, fan_profile,
|
||
update_interval=int(args['--interval']),
|
||
pump_sensor=pump_sensor,
|
||
fan_sensor=fan_sensor)
|
||
else:
|
||
raise Exception('nothing to do')
|
||
except KeyboardInterrupt:
|
||
LOGGER.info('stopped by user')
|
||
finally:
|
||
device.disconnect()
|