nzxt-kraken3: add support for expanded hwmon driver capabilities (#529)

* Initial sensor reading support from hwmon for X53 and Z53

* Remove superflous get_status from KrakenZ3

* KrakenX3: read pump duty from hwmon, if available

* Initial support for setting fixed duty through hwmon and directly

* Refactor hwmon_ctrl_mapping info outside of classes

Will fix tests later

* Implement speed profile setting through hwmon; store only hwmon channel no in hwmon_ctrl_mapping

* Initial test fixing

* Test Z3 reading from hwmon, todo reading directly

* Separate X3 and Z3 status reports; add Z3 direct access status report testing

* Fix 1.4.x backward compatibility test

* Add changed in note to kraken 3 guide docs

* Mark Kraken Z3 with h in README

* Note Z3 in guide for hwmon as well

* Add tests for setting fixed duty directly for Kraken X3 and Z3

* Add tests for setting fixed duty through hwmon for Kraken X3 and Z3

* Add test for setting Kraken X3 speed profile directly

* Add test for setting Kraken Z3 speed profile directly

* Check pwmX_enable availability instead for setting speed profiles through hwmon

* Refactor PWM output of test curve; add test for setting Kraken X3 speed profile through hwmon

* Add test for setting speed profiles for Kraken Z3 through hwmon

* Fan has a min duty of 0% on Kraken Z3, update the guide

* Test fan report as well

* Add explanation of why we wait before setting pwmX_enable to 2
This commit is contained in:
Aleksa Savić 2022-11-25 22:21:57 +01:00 committed by GitHub
parent c095479bac
commit 25447b7464
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 459 additions and 37 deletions

View File

@ -125,7 +125,7 @@ subjective "from more to less liquid control-ly" order.
| AIO liquid cooler | [NZXT Kraken X31, X41, X61](docs/asetek-690lc-guide.md) | USB | <sup>_LZ_</sup> | 1.9.1 |
| AIO liquid cooler | [NZXT Kraken X42, X52, X62, X72](docs/kraken-x2-m2-guide.md) | USB HID | <sup>_h_</sup> | 1.11.1 |
| AIO liquid cooler | [NZXT Kraken X53, X63, X73](docs/kraken-x3-z3-guide.md) | USB HID | <sup>_h_</sup> | 1.11.1 |
| AIO liquid cooler | [NZXT Kraken Z53, Z63, Z73](docs/kraken-x3-z3-guide.md) | USB & USB HID | <sup>_e_</sup> | 1.11.1 |
| AIO liquid cooler | [NZXT Kraken Z53, Z63, Z73](docs/kraken-x3-z3-guide.md) | USB & USB HID | <sup>_he_</sup> | 1.11.1 |
| Pump controller | [Aquacomputer D5 Next](docs/aquacomputer-d5next-guide.md) | USB HID | <sup>_ehp_</sup> | 1.11.1 |
| Fan/LED controller | [Aquacomputer Octo](docs/aquacomputer-octo-guide.md) | USB HID | <sup>_ehp_</sup> | 1.11.1 |
| Fan/LED controller | [Aquacomputer Quadro](docs/aquacomputer-quadro-guide.md) | USB HID | <sup>_ehp_</sup> | 1.11.1 |

View File

@ -74,9 +74,9 @@ Fixed speeds can be set by specifying the desired channel and duty value.
```
| Channel | Minimum duty | Maximum duty | X models | Z models |
| --- | --- | --- | :---: | :---: |
| --- | -- | --- | :---: | :---: |
| `pump` | 20% | 100% | ✓ | ✓ |
| `fan` | 20% | 100% | | ✓ |
| `fan` | 0% | 100% | | ✓ |
For profiles, one or more temperatureduty pairs are supplied instead of single value.
@ -174,16 +174,14 @@ Images and GiFs are automatically resized and rotated to match the device orient
[Linux hwmon]: #interaction-with-linux-hwmon-drivers
_New in 1.9.0._<br>
_Changed in git: expanded support for reading and writing through hwmon._<br>
Kraken X3 devices feature incomplete support by the [liquidtux] `nzxt-kraken3`
driver, and partial status data is provided through a standard hwmon sysfs
interface.
_As of February 2022, the driver is too limited for liquidctl to use; still..._
Kraken X3 and Z3 devices feature support by the [liquidtux] `nzxt-kraken3` driver,
and status data is provided through a standard hwmon sysfs interface.
Starting with version 1.9.0, liquidctl automatically detects when a kernel
driver is bound to the device and, whenever possible, uses it instead of
directly accessing the device. Alternatively, direct access to the device can
directly accessing the device. Alternatively, direct access to the device can
be forced with `--direct-access`.
[liquidtux]: https://github.com/liquidctl/liquidtux

View File

@ -15,6 +15,7 @@ import io
import math
import logging
import sys
import time
from PIL import Image, ImageSequence
@ -73,6 +74,10 @@ _COLOR_CHANNELS_KRAKENZ = {
"external": 0b001,
}
_HWMON_CTRL_MAPPING_KRAKENX = {"pump": 1}
_HWMON_CTRL_MAPPING_KRAKENZ = {"pump": 1, "fan": 2}
# Available LED channel modes/animations
# name -> (mode, size/variant, speed scale, min colors, max colors)
# FIXME any point in a one-color *alternating* or tai-chi animations?
@ -176,6 +181,7 @@ class KrakenX3(UsbHidDriver):
{
"speed_channels": _SPEED_CHANNELS_KRAKENX,
"color_channels": _COLOR_CHANNELS_KRAKENX,
"hwmon_ctrl_mapping": _HWMON_CTRL_MAPPING_KRAKENX,
},
),
(
@ -185,14 +191,18 @@ class KrakenX3(UsbHidDriver):
{
"speed_channels": _SPEED_CHANNELS_KRAKENX,
"color_channels": _COLOR_CHANNELS_KRAKENX,
"hwmon_ctrl_mapping": _HWMON_CTRL_MAPPING_KRAKENX,
},
),
]
def __init__(self, device, description, speed_channels, color_channels, **kwargs):
def __init__(
self, device, description, speed_channels, color_channels, hwmon_ctrl_mapping, **kwargs
):
super().__init__(device, description)
self._speed_channels = speed_channels
self._color_channels = color_channels
self._hwmon_ctrl_mapping = hwmon_ctrl_mapping
def initialize(self, direct_access=False, **kwargs):
"""Initialize the device and the driver.
@ -272,30 +282,34 @@ class KrakenX3(UsbHidDriver):
]
def _get_status_from_hwmon(self):
return [
status_readings = [
(_STATUS_TEMPERATURE, self._hwmon.read_int("temp1_input") * 1e-3, "°C"),
(_STATUS_PUMP_SPEED, self._hwmon.read_int("fan1_input"), "rpm"),
(_STATUS_PUMP_DUTY, self._hwmon.read_int("pwm1") * 100.0 / 255, "%"),
]
if self._hwmon.has_attribute("pwm1"):
status_readings.append(
(_STATUS_PUMP_DUTY, self._hwmon.read_int("pwm1") * 100.0 / 255, "%")
)
else:
# An older version of the kernel driver only exposed coolant temp and pump speed
_LOGGER.warning("pump duty cannot be read from %s kernel driver", self._hwmon.driver)
return status_readings
def get_status(self, direct_access=False, **kwargs):
"""Get a status report.
Returns a list of `(property, value, unit)` tuples.
"""
# no driver currently supports pwm1, so silently fallback to
# direct mode if it isn't available; for the same reason, also don't
# yet issue a warning when directly accessing the device
if self._hwmon and not direct_access and self._hwmon.has_attribute("pwm1"):
if self._hwmon and not direct_access:
_LOGGER.info("bound to %s kernel driver, reading status from hwmon", self._hwmon.driver)
return self._get_status_from_hwmon()
if self._hwmon:
level = logging.WARNING if direct_access else logging.INFO
_LOGGER.log(
level, "directly reading the status despite %s kernel driver", self._hwmon.driver
_LOGGER.warning(
"directly reading the status despite %s kernel driver", self._hwmon.driver
)
return self._get_status_directly()
@ -324,7 +338,26 @@ class KrakenX3(UsbHidDriver):
sval = _ANIMATION_SPEEDS[speed]
self._write_colors(cid, mode, colors, sval, direction)
def set_speed_profile(self, channel, profile, **kwargs):
def _set_speed_profile_hwmon(self, channel, interp):
hwmon_ctrl_channel = self._hwmon_ctrl_mapping[channel]
# Write duty curve for channel
for idx, duty in enumerate(interp):
pwm_duty = duty * 255 // 100
self._hwmon.write_int(f"temp{hwmon_ctrl_channel}_auto_point{idx + 1}_pwm", pwm_duty)
# The device can get confused when hammered with HID reports, which can happen when
# we set all curve points (done above) through the kernel driver, when the device
# is in curve mode. In that case, the driver sends a report for each point value change
# to update it. We send the whole curve to the device again by setting pwmX_enable to 2,
# regardless of what it was, to ensure that the curve is properly applied. Wait just for
# a bit to ensure that goes through
time.sleep(0.2)
# Set channel to curve mode
self._hwmon.write_int(f"pwm{hwmon_ctrl_channel}_enable", 2)
def set_speed_profile(self, channel, profile, direct_access=False, **kwargs):
"""Set channel to use a speed duty profile."""
cid, dmin, dmax = self._speed_channels[channel]
@ -336,12 +369,78 @@ class KrakenX3(UsbHidDriver):
_LOGGER.info(
"setting %s PWM duty to %d%% for liquid temperature >= %d°C", channel, duty, temp
)
self._write(header + interp)
def set_fixed_speed(self, channel, duty, **kwargs):
if self._hwmon:
hwmon_pwm_enable_name = f"pwm{self._hwmon_ctrl_mapping[channel]}_enable"
# Check if the required attribute is present
if self._hwmon.has_attribute(hwmon_pwm_enable_name):
# It is, and if we have to use direct access, warn that we are sidestepping the kernel driver
if direct_access:
_LOGGER.warning(
"directly writing duty curve despite %s kernel driver having support",
self._hwmon.driver,
)
return self._write(header + interp)
_LOGGER.info(
"bound to %s kernel driver, writing duty curve to hwmon", self._hwmon.driver
)
return self._set_speed_profile_hwmon(channel, interp)
elif not direct_access:
_LOGGER.warning(
"required duty curve functionality is not available in %s kernel driver, falling back to direct access",
self._hwmon.driver,
)
return self._write(header + interp)
def _set_fixed_speed_directly(self, channel, duty):
self.set_speed_profile(channel, [(0, duty), (_CRITICAL_TEMPERATURE - 1, duty)], True)
def _set_fixed_speed_hwmon(self, channel, duty):
hwmon_pwm_name = f"pwm{self._hwmon_ctrl_mapping[channel]}"
hwmon_pwm_enable_name = f"{hwmon_pwm_name}_enable"
# Convert duty from percent to PWM range (0-255)
pwm_duty = duty * 255 // 100
# Write duty to hwmon
self._hwmon.write_int(hwmon_pwm_name, pwm_duty)
# Set channel to direct percent mode
self._hwmon.write_int(hwmon_pwm_enable_name, 1)
def set_fixed_speed(self, channel, duty, direct_access=False, **kwargs):
"""Set channel to a fixed speed duty."""
self.set_speed_profile(channel, [(0, duty), (_CRITICAL_TEMPERATURE - 1, duty)])
if self._hwmon:
_, dmin, dmax = self._speed_channels[channel]
duty = clamp(duty, dmin, dmax)
hwmon_pwm_name = f"pwm{self._hwmon_ctrl_mapping[channel]}"
# Check if the required attribute is present
if self._hwmon.has_attribute(hwmon_pwm_name):
# It is, and if we have to use direct access, warn that we are sidestepping the kernel driver
if direct_access:
_LOGGER.warning(
"directly writing fixed speed despite %s kernel driver having support",
self._hwmon.driver,
)
return self._set_fixed_speed_directly(channel, duty)
_LOGGER.info(
"bound to %s kernel driver, writing fixed speed to hwmon", self._hwmon.driver
)
return self._set_fixed_speed_hwmon(channel, duty)
elif not direct_access:
_LOGGER.warning(
"required PWM functionality is not available in %s kernel driver, falling back to direct access",
self._hwmon.driver,
)
return self._set_fixed_speed_directly(channel, duty)
def _read(self):
data = self.device.read(_READ_LENGTH)
@ -450,6 +549,7 @@ class KrakenZ3(KrakenX3):
{
"speed_channels": _SPEED_CHANNELS_KRAKENZ,
"color_channels": _COLOR_CHANNELS_KRAKENZ,
"hwmon_ctrl_mapping": _HWMON_CTRL_MAPPING_KRAKENZ,
},
)
]
@ -543,15 +643,13 @@ class KrakenZ3(KrakenX3):
self._status.append(("LCD Brightness", self.brightness, "%"))
self._status.append(("LCD Orientation", self.orientation * 90, "°"))
def get_status(self, **kwargs):
"""Get a status report.
Returns a list of `(property, value, unit)` tuples.
"""
def _get_status_directly(self):
self.device.clear_enqueued_reports()
self._write([0x74, 0x01])
msg = self._read()
if msg[15:17] == [0xFF, 0xFF]:
_LOGGER.warning("unexpected temperature reading, possible firmware fault;")
_LOGGER.warning("try resetting the device or updating the firmware")
return [
(_STATUS_TEMPERATURE, msg[15] + msg[16] / 10, "°C"),
(_STATUS_PUMP_SPEED, msg[18] << 8 | msg[17], "rpm"),
@ -560,6 +658,15 @@ class KrakenZ3(KrakenX3):
(_STATUS_FAN_DUTY, msg[25], "%"),
]
def _get_status_from_hwmon(self):
return [
(_STATUS_TEMPERATURE, self._hwmon.read_int("temp1_input") * 1e-3, "°C"),
(_STATUS_PUMP_SPEED, self._hwmon.read_int("fan1_input"), "rpm"),
(_STATUS_PUMP_DUTY, self._hwmon.read_int("pwm1") * 100.0 / 255, "%"),
(_STATUS_FAN_SPEED, self._hwmon.read_int("fan2_input"), "rpm"),
(_STATUS_FAN_DUTY, self._hwmon.read_int("pwm2") * 100.0 / 255, "%"),
]
def _read_until_first_match(self, parsers):
for _ in range(_MAX_READ_ATTEMPTS):
msg = self._read()

View File

@ -50,16 +50,19 @@ def test_kraken3_backwards_modes_are_deprecated(caplog):
from liquidctl.driver.kraken3 import KrakenX3
from liquidctl.driver.kraken3 import _COLOR_CHANNELS_KRAKENX
from liquidctl.driver.kraken3 import _SPEED_CHANNELS_KRAKENX
from liquidctl.driver.kraken3 import _HWMON_CTRL_MAPPING_KRAKENX
for mode in modes:
base_mode = mode.replace('backwards-', '')
old = KrakenX3(MockHidapiDevice(), 'Mock X63',
speed_channels=_SPEED_CHANNELS_KRAKENX,
color_channels=_COLOR_CHANNELS_KRAKENX)
color_channels=_COLOR_CHANNELS_KRAKENX,
hwmon_ctrl_mapping=_HWMON_CTRL_MAPPING_KRAKENX)
new = KrakenX3(MockHidapiDevice(), 'Mock X63',
speed_channels=_SPEED_CHANNELS_KRAKENX,
color_channels=_COLOR_CHANNELS_KRAKENX)
color_channels=_COLOR_CHANNELS_KRAKENX,
hwmon_ctrl_mapping=_HWMON_CTRL_MAPPING_KRAKENX)
colors = [RADICAL_RED, MOUNTAIN_MEADOW]

View File

@ -12,6 +12,8 @@ from liquidctl.driver.kraken3 import (
_SPEED_CHANNELS_KRAKENX,
_COLOR_CHANNELS_KRAKENZ,
_SPEED_CHANNELS_KRAKENZ,
_HWMON_CTRL_MAPPING_KRAKENX,
_HWMON_CTRL_MAPPING_KRAKENZ,
)
from test_krakenz3_response import krakenz3_response
@ -20,16 +22,64 @@ from liquidctl.util import Hue2Accessory
# https://github.com/liquidctl/liquidctl/issues/160#issuecomment-664044103
SAMPLE_STATUS = bytes.fromhex(
X3_SAMPLE_STATUS = bytes.fromhex(
"7502200036000B51535834353320012101A80635350000000000000000000000"
"0000000000000000000000000000000000000000000000000000000000000000"
)
# https://github.com/liquidctl/liquidctl/issues/160#issue-665781804
FAULTY_STATUS = bytes.fromhex(
X3_FAULTY_STATUS = bytes.fromhex(
"7502200036000B5153583435332001FFFFCC0A64640000000000000000000000"
"0000000000000000000000000000000000000000000000000000000000000000"
)
Z3_SAMPLE_STATUS = bytes.fromhex(
"75012E0018001051393434363731011803690314140102000000000000000000"
"0000000000000000000000000000000000000000000000000000000000000000"
)
test_curve_final_pwm = [
30,
32,
34,
36,
38,
40,
42,
44,
46,
48,
50,
58,
65,
72,
80,
82,
83,
85,
87,
88,
90,
91,
92,
93,
94,
95,
96,
97,
98,
99,
100,
100,
100,
100,
100,
100,
100,
100,
100,
100,
]
@pytest.fixture
def mock_krakenx3():
@ -39,6 +89,7 @@ def mock_krakenx3():
"Mock Kraken X73",
speed_channels=_SPEED_CHANNELS_KRAKENX,
color_channels=_COLOR_CHANNELS_KRAKENX,
hwmon_ctrl_mapping=_HWMON_CTRL_MAPPING_KRAKENX,
)
dev.connect()
@ -53,6 +104,7 @@ def mock_krakenz3():
"Mock Kraken Z73",
speed_channels=_SPEED_CHANNELS_KRAKENZ,
color_channels=_COLOR_CHANNELS_KRAKENZ,
hwmon_ctrl_mapping=_HWMON_CTRL_MAPPING_KRAKENZ,
)
dev.connect()
@ -162,7 +214,7 @@ def test_krakenx3_reads_status_directly(mock_krakenx3, has_hwmon, direct_access)
if has_hwmon:
mock_krakenx3._hwmon = HwmonDevice(None, None)
mock_krakenx3.device.preload_read(Report(0, SAMPLE_STATUS))
mock_krakenx3.device.preload_read(Report(0, X3_SAMPLE_STATUS))
temperature, pump_speed, pump_duty = mock_krakenx3.get_status(direct_access=direct_access)
@ -184,12 +236,110 @@ def test_krakenx3_reads_status_from_hwmon(mock_krakenx3, tmp_path):
assert pump_duty == ("Pump duty", pytest.approx(53, rel=1.0 / 255), "%")
@pytest.mark.parametrize("has_hwmon,direct_access", [(False, False), (True, True)])
def test_krakenx3_set_fixed_speeds_directly(mock_krakenx3, has_hwmon, direct_access, tmp_path):
"""For both test cases only direct access should be used"""
if has_hwmon:
mock_krakenx3._hwmon = HwmonDevice("mock_module", tmp_path)
(tmp_path / "pwm1").write_text("0")
(tmp_path / "pwm1_enable").write_text("0")
mock_krakenx3.set_fixed_speed("pump", 84, direct_access=direct_access)
pump_report = mock_krakenx3.device.sent[0]
assert pump_report.number == 0x72
assert pump_report.data[3:43] == [84 for i in range(0, 39)] + [100]
# Assert that hwmon wasn't touched
if has_hwmon:
assert (tmp_path / "pwm1_enable").read_text() == "0"
assert (tmp_path / "pwm1").read_text() == "0"
@pytest.mark.parametrize("has_support", [False, True])
def test_krakenx3_set_fixed_speeds_hwmon(mock_krakenx3, has_support, tmp_path):
mock_krakenx3._hwmon = HwmonDevice("mock_module", tmp_path)
if has_support:
(tmp_path / "pwm1").write_text("0\n")
(tmp_path / "pwm1_enable").write_text("0\n")
mock_krakenx3.set_fixed_speed("pump", 84)
if has_support:
assert (tmp_path / "pwm1_enable").read_text() == "1"
assert (tmp_path / "pwm1").read_text() == "214"
else:
# Assert fallback to direct access
pump_report = mock_krakenx3.device.sent[0]
assert pump_report.number == 0x72
assert pump_report.data[3:43] == [84 for i in range(0, 39)] + [100]
@pytest.mark.parametrize("has_hwmon,direct_access", [(False, False), (True, True)])
def test_krakenx3_set_speed_profile_directly(mock_krakenx3, has_hwmon, direct_access, tmp_path):
"""For both test cases only direct access should be used"""
if has_hwmon:
mock_krakenx3._hwmon = HwmonDevice("mock_module", tmp_path)
(tmp_path / "pwm1").write_text("0")
(tmp_path / "pwm1_enable").write_text("0")
for i in range(1, 40 + 1):
(tmp_path / f"temp1_auto_point{i}_pwm").write_text("0")
curve_profile = zip([20, 30, 34, 40, 50], [30, 50, 80, 90, 100])
mock_krakenx3.set_speed_profile("pump", curve_profile, direct_access=direct_access)
pump_report = mock_krakenx3.device.sent[0]
assert pump_report.number == 0x72
assert pump_report.data[3:43] == test_curve_final_pwm
# Assert that hwmon wasn't touched
if has_hwmon:
assert (tmp_path / "pwm1_enable").read_text() == "0"
assert (tmp_path / "pwm1").read_text() == "0"
for i in range(1, 40):
assert (tmp_path / f"temp1_auto_point{i}_pwm").read_text() == "0"
@pytest.mark.parametrize("has_support", [False, True])
def test_krakenx3_set_speed_profile_hwmon(mock_krakenx3, has_support, tmp_path):
mock_krakenx3._hwmon = HwmonDevice("mock_module", tmp_path)
if has_support:
(tmp_path / "pwm1_enable").write_text("0\n")
for i in range(1, 40 + 1):
(tmp_path / f"temp1_auto_point{i}_pwm").write_text("0")
curve_profile = zip([20, 30, 34, 40, 50], [30, 50, 80, 90, 100])
mock_krakenx3.set_speed_profile("pump", curve_profile)
if has_support:
assert (tmp_path / "pwm1_enable").read_text() == "2"
for i in range(1, 40 + 1):
assert int((tmp_path / f"temp1_auto_point{i}_pwm").read_text()) == (
test_curve_final_pwm[i - 1] * 255 // 100
)
else:
# Assert fallback to direct access
pump_report = mock_krakenx3.device.sent[0]
assert pump_report.number == 0x72
assert pump_report.data[3:43] == test_curve_final_pwm
@pytest.mark.parametrize("has_hwmon,direct_access", [(False, False), (True, True)])
def test_krakenx3_warns_on_faulty_temperature(mock_krakenx3, has_hwmon, direct_access, caplog):
if has_hwmon:
mock_krakenx3._hwmon = HwmonDevice(None, None)
mock_krakenx3.device.preload_read(Report(0, FAULTY_STATUS))
mock_krakenx3.device.preload_read(Report(0, X3_FAULTY_STATUS))
_ = mock_krakenx3.get_status(direct_access=direct_access)
assert "unexpected temperature reading" in caplog.text
@ -203,10 +353,174 @@ def test_krakenx3_not_totally_broken(mock_krakenx3):
mock_krakenx3.set_fixed_speed(channel="pump", duty=50)
@pytest.mark.parametrize("has_hwmon,direct_access", [(False, False), (True, True)])
def test_krakenz3_reads_status_directly(mock_krakenz3, has_hwmon, direct_access):
if has_hwmon:
mock_krakenz3._hwmon = HwmonDevice(None, None)
mock_krakenz3.device.preload_read(Report(0, Z3_SAMPLE_STATUS))
temperature, pump_speed, pump_duty, fan_speed, fan_duty = mock_krakenz3.get_status(
direct_access=direct_access
)
assert temperature == ("Liquid temperature", 24.3, "°C")
assert pump_speed == ("Pump speed", 873, "rpm")
assert pump_duty == ("Pump duty", 20, "%")
assert fan_speed == ("Fan speed", 0, "rpm")
assert fan_duty == ("Fan duty", 0, "%")
def test_krakenz3_reads_status_from_hwmon(mock_krakenz3, tmp_path):
mock_krakenz3._hwmon = HwmonDevice("mock_module", tmp_path)
(tmp_path / "temp1_input").write_text("33100\n")
(tmp_path / "fan1_input").write_text("1704\n")
(tmp_path / "pwm1").write_text("135\n")
(tmp_path / "fan2_input").write_text("1704\n")
(tmp_path / "pwm2").write_text("135\n")
temperature, pump_speed, pump_duty, fan_speed, fan_duty = mock_krakenz3.get_status()
assert temperature == ("Liquid temperature", 33.1, "°C")
assert pump_speed == ("Pump speed", 1704, "rpm")
assert pump_duty == ("Pump duty", pytest.approx(53, rel=1.0 / 255), "%")
assert fan_speed == ("Fan speed", 1704, "rpm")
assert fan_duty == ("Fan duty", pytest.approx(53, rel=1.0 / 255), "%")
@pytest.mark.parametrize("has_hwmon,direct_access", [(False, False), (True, True)])
def test_krakenz3_set_fixed_speeds_directly(mock_krakenz3, has_hwmon, direct_access, tmp_path):
"""For both test cases only direct access should be used"""
if has_hwmon:
mock_krakenz3._hwmon = HwmonDevice("mock_module", tmp_path)
(tmp_path / "pwm1").write_text("0")
(tmp_path / "pwm1_enable").write_text("0")
(tmp_path / "pwm2").write_text("0")
(tmp_path / "pwm2_enable").write_text("0")
mock_krakenz3.set_fixed_speed("pump", 84, direct_access=direct_access)
mock_krakenz3.set_fixed_speed("fan", 50, direct_access=direct_access)
pump_report, fan_report = mock_krakenz3.device.sent
assert pump_report.number == 0x72
assert pump_report.data[3:43] == [84 for i in range(0, 39)] + [100]
assert fan_report.number == 0x72
assert fan_report.data[3:43] == [50 for i in range(0, 39)] + [100]
# Assert that hwmon wasn't touched
if has_hwmon:
assert (tmp_path / "pwm1_enable").read_text() == "0"
assert (tmp_path / "pwm1").read_text() == "0"
assert (tmp_path / "pwm2_enable").read_text() == "0"
assert (tmp_path / "pwm2").read_text() == "0"
@pytest.mark.parametrize("has_support", [False, True])
def test_krakenz3_set_fixed_speeds_hwmon(mock_krakenz3, has_support, tmp_path):
mock_krakenz3._hwmon = HwmonDevice("mock_module", tmp_path)
if has_support:
(tmp_path / "pwm1").write_text("0\n")
(tmp_path / "pwm1_enable").write_text("0\n")
(tmp_path / "pwm2").write_text("0\n")
(tmp_path / "pwm2_enable").write_text("0\n")
mock_krakenz3.set_fixed_speed("pump", 84)
mock_krakenz3.set_fixed_speed("fan", 50)
if has_support:
assert (tmp_path / "pwm1_enable").read_text() == "1"
assert (tmp_path / "pwm1").read_text() == "214"
assert (tmp_path / "pwm2_enable").read_text() == "1"
assert (tmp_path / "pwm2").read_text() == "127"
else:
# Assert fallback to direct access
pump_report, fan_report = mock_krakenz3.device.sent
assert pump_report.number == 0x72
assert pump_report.data[3:43] == [84 for i in range(0, 39)] + [100]
assert fan_report.number == 0x72
assert fan_report.data[3:43] == [50 for i in range(0, 39)] + [100]
@pytest.mark.parametrize("has_hwmon,direct_access", [(False, False), (True, True)])
def test_krakenz3_set_speed_profile_directly(mock_krakenz3, has_hwmon, direct_access, tmp_path):
"""For both test cases only direct access should be used"""
if has_hwmon:
mock_krakenz3._hwmon = HwmonDevice("mock_module", tmp_path)
(tmp_path / "pwm1").write_text("0")
(tmp_path / "pwm1_enable").write_text("0")
(tmp_path / "pwm2").write_text("0")
(tmp_path / "pwm2_enable").write_text("0")
for i in range(1, 40 + 1):
(tmp_path / f"temp1_auto_point{i}_pwm").write_text("0")
(tmp_path / f"temp2_auto_point{i}_pwm").write_text("0")
mock_krakenz3.set_speed_profile(
"pump", zip([20, 30, 34, 40, 50], [30, 50, 80, 90, 100]), direct_access=direct_access
)
mock_krakenz3.set_speed_profile(
"fan", zip([20, 30, 34, 40, 50], [30, 50, 80, 90, 100]), direct_access=direct_access
)
pump_report, fan_report = mock_krakenz3.device.sent
assert pump_report.number == 0x72
assert pump_report.data[3:43] == test_curve_final_pwm
assert fan_report.number == 0x72
assert fan_report.data[3:43] == test_curve_final_pwm
# Assert that hwmon wasn't touched
if has_hwmon:
assert (tmp_path / "pwm1_enable").read_text() == "0"
assert (tmp_path / "pwm1").read_text() == "0"
assert (tmp_path / "pwm2_enable").read_text() == "0"
assert (tmp_path / "pwm2").read_text() == "0"
for i in range(1, 40):
assert (tmp_path / f"temp1_auto_point{i}_pwm").read_text() == "0"
assert (tmp_path / f"temp2_auto_point{i}_pwm").read_text() == "0"
@pytest.mark.parametrize("has_support", [False, True])
def test_krakenz3_set_speed_profile_hwmon(mock_krakenz3, has_support, tmp_path):
mock_krakenz3._hwmon = HwmonDevice("mock_module", tmp_path)
if has_support:
(tmp_path / "pwm1_enable").write_text("0\n")
(tmp_path / "pwm2_enable").write_text("0\n")
for i in range(1, 40 + 1):
(tmp_path / f"temp1_auto_point{i}_pwm").write_text("0")
(tmp_path / f"temp2_auto_point{i}_pwm").write_text("0")
mock_krakenz3.set_speed_profile("pump", zip([20, 30, 34, 40, 50], [30, 50, 80, 90, 100]))
mock_krakenz3.set_speed_profile("fan", zip([20, 30, 34, 40, 50], [30, 50, 80, 90, 100]))
if has_support:
assert (tmp_path / "pwm1_enable").read_text() == "2"
for i in range(1, 40 + 1):
assert int((tmp_path / f"temp1_auto_point{i}_pwm").read_text()) == (
test_curve_final_pwm[i - 1] * 255 // 100
)
assert int((tmp_path / f"temp2_auto_point{i}_pwm").read_text()) == (
test_curve_final_pwm[i - 1] * 255 // 100
)
else:
# Assert fallback to direct access
pump_report, fan_report = mock_krakenz3.device.sent
assert pump_report.number == 0x72
assert pump_report.data[3:43] == test_curve_final_pwm
assert fan_report.number == 0x72
assert fan_report.data[3:43] == test_curve_final_pwm
def test_krakenz3_not_totally_broken(mock_krakenz3):
"""Reasonable example calls to untested APIs do not raise exceptions."""
mock_krakenz3.initialize()
mock_krakenz3.device.preload_read(Report(0, SAMPLE_STATUS))
mock_krakenz3.device.preload_read(Report(0, Z3_SAMPLE_STATUS))
_ = mock_krakenz3.get_status()
mock_krakenz3.set_speed_profile(channel="fan", profile=iter([(20, 20), (30, 50), (40, 100)]))
mock_krakenz3.set_fixed_speed(channel="pump", duty=50)