liquidctl/tests/test_commander_core.py

301 lines
11 KiB
Python

import pytest
from collections import deque
from liquidctl import ExpectationNotMet
from liquidctl.driver.commander_core import CommanderCore
from _testutils import noop
from liquidctl.util import u16le_from
def int_to_le(num, length=2, byteorder='little', signed=False):
return int(num).to_bytes(length=length, byteorder=byteorder, signed=signed)
class MockCommanderCoreDevice:
def __init__(self):
self.vendor_id = 0x1b1c
self.product_id = 0x0c1c
self.address = 'addr'
self.path = b'path'
self.release_number = None
self.serial_number = None
self.bus = None
self.port = None
self.open = noop
self.close = noop
self.clear_enqueued_reports = noop
self._read = deque()
self.sent = list()
self._last_write = bytes()
self._modes = {}
self._awake = False
self.response_prefix = ()
self.firmware_version = (0x00, 0x00, 0x00)
self.led_counts = (None, None, None, None, None, None, None)
self.speeds_mode = (0, 0, 0, 0, 0, 0, 0)
self.speeds = (None, None, None, None, None, None, None)
self.fixed_speeds = (0, 0, 0, 0, 0, 0, 0)
self.temperatures = (None, None)
def read(self, length):
data = bytearray([0x00, self._last_write[2], 0x00])
data.extend(self.response_prefix)
if self._last_write[2] == 0x02: # Firmware version
for i in range(0, 3):
data.append(self.firmware_version[i])
if self._awake:
if self._last_write[2] == 0x08: # Get data
channel = self._last_write[3]
mode = self._modes.get(channel)
if mode[1] == 0x00:
if mode[0] == 0x17: # Get speeds
data.extend([0x06, 0x00])
data.append(len(self.speeds))
for i in self.speeds:
if i is None:
data.extend([0x00, 0x00])
else:
data.extend(int_to_le(i))
elif mode[0] == 0x1a: # Speed devices connected
data.extend([0x09, 0x00])
data.append(len(self.speeds))
for i in self.speeds:
data.extend([0x01 if i is None else 0x07])
elif mode[0] == 0x20: # LED detect
data.extend([0x0f, 0x00])
data.append(len(self.led_counts))
for i in self.led_counts:
if i is None:
data.extend(int_to_le(3)+int_to_le(0))
else:
data.extend(int_to_le(2))
data.extend(int_to_le(i))
elif mode[0] == 0x21: # Get temperatures
data.extend([0x10, 0x00])
data.append(len(self.temperatures))
for i in self.temperatures:
if i is None:
data.append(1)
data.extend(int_to_le(0))
else:
data.append(0)
data.extend(int_to_le(int(i*10)))
else:
raise NotImplementedError(f'Read for {mode.hex(":")}')
elif mode[1] == 0x6d:
if mode[0] == 0x60:
data.extend([0x03, 0x00])
data.append(len(self.speeds_mode))
for i in self.speeds_mode:
data.append(i)
elif mode[0] == 0x61:
data.extend([0x04, 0x00])
data.append(len(self.fixed_speeds))
for i in self.fixed_speeds:
data.extend(int_to_le(i))
else:
raise NotImplementedError(f'Read for {mode.hex(":")}')
else:
raise NotImplementedError(f'Read for {mode.hex(":")}')
return list(data)[:length]
def write(self, data):
data = bytes(data) # ensure data is convertible to bytes
self._last_write = data
if data[0] != 0x00 or data[1] != 0x08:
raise ValueError('Start of packets going out should be 00:08')
if data[2] == 0x0d:
channel = data[3]
if self._modes.get(channel) is None:
self._modes[channel] = data[4:6]
else:
raise ExpectationNotMet('Previous channel was not reset')
elif data[2] == 0x05 and data[3] == 0x01:
self._modes[data[4]] = None
elif data[2] == 0x01 and data[3] == 0x03 and data[4] == 0x00:
self._awake = data[5] == 0x02
elif self._awake:
if data[2] == 0x06: # Write command
channel = data[3]
mode = self._modes.get(channel)
length = u16le_from(data[4:6])
data_type = data[8:10]
written_data = data[10:8+length]
if mode[1] == 0x6d:
if mode[0] == 0x60 and list(data_type) == [0x03, 0x00]:
self.speeds_mode = tuple(written_data[i+1] for i in range(0, written_data[0]))
elif mode[0] == 0x61 and list(data_type) == [0x04, 0x00]:
self.fixed_speeds = tuple(u16le_from(written_data[i*2+1:i*2+3]) for i in range(0, written_data[0]))
else:
raise NotImplementedError('Invalid Write command')
else:
raise NotImplementedError('Invalid Write command')
return len(data)
@pytest.fixture
def commander_core_device():
device = MockCommanderCoreDevice()
core = CommanderCore(device, 'Corsair Commander Core (experimental)', True)
core.connect()
return core
def test_initialize_commander_core(commander_core_device):
commander_core_device.device.firmware_version = (0x01, 0x02, 0x21)
commander_core_device.device.speeds = (None, 104, None, None, None, None, 918)
commander_core_device.device.led_counts = (27, None, 1, 2, 4, 8, 16)
commander_core_device.device.temperatures = (None, 45.6)
res = commander_core_device.initialize()
assert len(res) == 17
assert res[0][1] == '1.2.33' # Firmware
# LED counts
assert res[1][1] == 27
assert res[2][1] is None
assert res[3][1] == 1
assert res[4][1] == 2
assert res[5][1] == 4
assert res[6][1] == 8
assert res[7][1] == 16
# Speed devices connected
assert not res[8][1]
assert res[9][1]
assert not res[10][1]
assert not res[11][1]
assert not res[12][1]
assert not res[13][1]
assert res[14][1]
# Temperature sensors connected
assert not res[15][1]
assert res[16][1]
# Ensure device is asleep at end
assert not commander_core_device.device._awake
def test_initialize_error_commander_core(commander_core_device):
"""This tests sends invalid data to ensure the device gets set back to hardware mode on error"""
commander_core_device.device.response_prefix = (0x00, 0x00)
with pytest.raises(ExpectationNotMet):
commander_core_device.initialize()
# Ensure device is asleep at end
assert not commander_core_device.device._awake
def test_status_commander_core(commander_core_device):
commander_core_device.device.speeds = (2357, 918, 903, 501, 1104, 1824, 104)
commander_core_device.device.temperatures = (12.3, 45.6)
res = commander_core_device.get_status()
assert len(res) == 9
# Speeds of pump and fans
assert res[0][1] == 2357
assert res[1][1] == 918
assert res[2][1] == 903
assert res[3][1] == 501
assert res[4][1] == 1104
assert res[5][1] == 1824
assert res[6][1] == 104
# Temperatures
assert res[7][1] == 12.3
assert res[8][1] == 45.6
# Ensure device is asleep at end
assert not commander_core_device.device._awake
def test_status_error_commander_core(commander_core_device):
"""This tests sends invalid data to ensure the device gets set back to hardware mode on error"""
commander_core_device.device.response_prefix = (0x00, 0x00)
with pytest.raises(ExpectationNotMet):
commander_core_device.initialize()
# Ensure device is asleep at end
assert not commander_core_device.device._awake
def test_set_fixed_speed_fan2_commander_core(commander_core_device):
"""This tests setting the speed of a single channel"""
commander_core_device.device.speeds_mode = (1, 2, 3, 4, 5, 6, 7)
commander_core_device.device.fixed_speeds = (8, 9, 10, 11, 12, 13, 14)
commander_core_device.set_fixed_speed('fan2', 95)
assert commander_core_device.device.speeds_mode == (1, 2, 0, 4, 5, 6, 7)
assert commander_core_device.device.fixed_speeds == (8, 9, 95, 11, 12, 13, 14)
# Ensure device is asleep at end
assert not commander_core_device.device._awake
def test_set_fixed_speed_fans_commander_core(commander_core_device):
"""This tests setting the speed of all the fans"""
commander_core_device.device.speeds_mode = (1, 2, 3, 4, 5, 6, 7)
commander_core_device.device.fixed_speeds = (8, 9, 10, 11, 12, 13, 14)
commander_core_device.set_fixed_speed('fans', 61)
assert commander_core_device.device.speeds_mode == (1, 0, 0, 0, 0, 0, 0)
assert commander_core_device.device.fixed_speeds == (8, 61, 61, 61, 61, 61, 61)
# Ensure device is asleep at end
assert not commander_core_device.device._awake
def test_set_fixed_speed_error_commander_core(commander_core_device):
"""This tests sends invalid data to ensure the device gets set back to hardware mode on error"""
commander_core_device.device.response_prefix = (0x00, 0x00)
with pytest.raises(ExpectationNotMet):
commander_core_device.set_fixed_speed('fan1', 95)
# Ensure device is asleep at end
assert not commander_core_device.device._awake
def test_parse_channels_commander_core():
"""This test will go through and thoroughly test CommanderCore._parse_channels so we don't have to in other tests"""
core = CommanderCore(MockCommanderCoreDevice(), 'Corsair Commander Core (experimental)', True)
tests = [
('pump', [0]), ('fans', [1, 2, 3, 4, 5, 6]),
('fan1', [1]), ('fan2', [2]), ('fan3', [3]), ('fan4', [4]), ('fan5', [5]), ('fan6', [6])
]
for (val, answer) in tests:
assert list(core._parse_channels(val)) == answer
def test_parse_channels_commander_core_xt():
"""Test Core XT-specific channel layout"""
corext = CommanderCore(MockCommanderCoreDevice(), 'Corsair Commander Core (experimental)', False)
tests = [
('fans', [0, 1, 2, 3, 4, 5]),
('fan1', [0]), ('fan2', [1]), ('fan3', [2]), ('fan4', [3]), ('fan5', [4]), ('fan6', [5])
]
for (val, answer) in tests:
assert list(corext._parse_channels(val)) == answer
def test_parse_channels_error_commander_core():
"""This tests to make sure we get an error with an invalid channel"""
core = CommanderCore(MockCommanderCoreDevice(), 'Corsair Commander Core (experimental)', True)
with pytest.raises(ValueError):
core._parse_channels('fan')