277 lines
8.2 KiB
Python
277 lines
8.2 KiB
Python
import os
|
|
from collections import deque, namedtuple
|
|
from datetime import timedelta
|
|
from enum import Enum, unique
|
|
from tempfile import mkdtemp
|
|
|
|
from liquidctl.driver.base import *
|
|
from liquidctl.keyval import RuntimeStorage, _FilesystemBackend
|
|
from liquidctl.driver.usb import _DEFAULT_TIMEOUT_MS
|
|
|
|
Report = namedtuple('Report', ['number', 'data'])
|
|
|
|
|
|
def noop(*args, **kwargs):
|
|
return None
|
|
|
|
|
|
class MockRuntimeStorage(RuntimeStorage):
|
|
def __init__(self, key_prefixes, backend=None):
|
|
if not backend:
|
|
run_dir = mkdtemp('run_dir')
|
|
backend = _FilesystemBackend(key_prefixes, runtime_dirs=[run_dir])
|
|
super().__init__(key_prefixes, backend)
|
|
|
|
|
|
class MockHidapiDevice:
|
|
def __init__(self, vendor_id=None, product_id=None, release_number=None,
|
|
serial_number=None, bus=None, address=None, path=None):
|
|
self.vendor_id = vendor_id
|
|
self.product_id = product_id
|
|
self.release_number = release_number
|
|
self.serial_number = serial_number
|
|
self.bus = bus
|
|
self.address = address
|
|
self.path = path or b'<placeholder path>'
|
|
self.port = None
|
|
|
|
self.open = noop
|
|
self.close = noop
|
|
self.clear_enqueued_reports = noop
|
|
|
|
self._read = deque()
|
|
self.sent = list()
|
|
|
|
def preload_read(self, report):
|
|
self._read.append(report)
|
|
|
|
def read(self, length, *, timeout=_DEFAULT_TIMEOUT_MS):
|
|
if self._read:
|
|
number, data = self._read.popleft()
|
|
if number:
|
|
return [number] + list(data)[:length]
|
|
else:
|
|
return list(data)[:length]
|
|
return None
|
|
|
|
def write(self, data):
|
|
data = bytes(data) # ensure data is convertible to bytes
|
|
self.sent.append(Report(data[0], list(data[1:])))
|
|
return len(data)
|
|
|
|
def get_feature_report(self, report_id, length):
|
|
if self._read:
|
|
try:
|
|
report = next(filter(lambda x: x.number == report_id, self._read))
|
|
number, data = report
|
|
self._read.remove(report)
|
|
except StopIteration:
|
|
return None
|
|
# length dictates the size of the buffer, and if it's not large
|
|
# enough "ioctl (GFEATURE): Value too large for defined data type"
|
|
# may happen on Linux; see:
|
|
# https://github.com/liquidctl/liquidctl/issues/151#issuecomment-665119675
|
|
assert length >= len(data) + 1, 'buffer not large enough for received report'
|
|
return [number] + list(data)[:length]
|
|
return None
|
|
|
|
def send_feature_report(self, data):
|
|
return self.write(data)
|
|
|
|
|
|
class MockPyusbDevice():
|
|
def __init__(self, vendor_id=None, product_id=None, release_number=None,
|
|
serial_number=None, bus=None, address=None, port=None):
|
|
self.vendor_id = vendor_id
|
|
self.product_id = product_id
|
|
self.release_number = release_number
|
|
self.serial_number = serial_number
|
|
self.bus = bus
|
|
self.address = address
|
|
self.port = port
|
|
|
|
self.open = noop
|
|
self.claim = noop
|
|
self.release = noop
|
|
self.close = noop
|
|
|
|
self._reset_sent()
|
|
|
|
def read(self, endpoint, length, *, timeout=_DEFAULT_TIMEOUT_MS):
|
|
if len(self._responses):
|
|
return self._responses.popleft()
|
|
return [0] * length
|
|
|
|
def write(self, endpoint, data, *, timeout=_DEFAULT_TIMEOUT_MS):
|
|
self._sent_xfers.append(('write', endpoint, data))
|
|
|
|
def ctrl_transfer(self, bmRequestType, bRequest, wValue=0, wIndex=0,
|
|
data_or_wLength=None, timeout=_DEFAULT_TIMEOUT_MS):
|
|
self._sent_xfers.append(('ctrl_transfer', bmRequestType, bRequest,
|
|
wValue, wIndex, data_or_wLength))
|
|
|
|
def _reset_sent(self):
|
|
self._sent_xfers = list()
|
|
self._responses = deque()
|
|
|
|
|
|
VirtualEeprom = namedtuple('VirtualEeprom', ['name', 'data'])
|
|
|
|
|
|
class VirtualSmbus:
|
|
def __init__(self, address_count=256, register_count=256, name='i2c-99',
|
|
description='Virtual', parent_vendor=0xff01, parent_device=0xff02,
|
|
parent_subsystem_vendor=0xff10, parent_subsystem_device=0xff20,
|
|
parent_driver='virtual'):
|
|
|
|
self._open = False
|
|
self._data = [[0] * register_count for _ in range(address_count)]
|
|
|
|
self.name = name
|
|
self.description = description
|
|
self.parent_vendor = parent_vendor
|
|
self.parent_device = parent_device
|
|
self.parent_subsystem_vendor = parent_subsystem_vendor
|
|
self.parent_subsystem_device = parent_subsystem_device
|
|
self.parent_driver = parent_driver
|
|
|
|
def open(self):
|
|
self._open = True
|
|
|
|
def read_byte(self, address):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
return self._data[address][0]
|
|
|
|
def read_byte_data(self, address, register):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
return self._data[address][register]
|
|
|
|
def read_word_data(self, address, register):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
return self._data[address][register]
|
|
|
|
def read_block_data(self, address, register):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
return self._data[address][register]
|
|
|
|
def write_byte(self, address, value):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
self._data[address][0] = value
|
|
|
|
def write_byte_data(self, address, register, value):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
self._data[address][register] = value
|
|
|
|
def write_word_data(self, address, register, value):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
self._data[address][register] = value
|
|
|
|
def write_block_data(self, address, register, data):
|
|
if not self._open:
|
|
raise OSError('closed')
|
|
self._data[address][register] = data
|
|
|
|
def close(self):
|
|
self._open = False
|
|
|
|
def emulate_eeprom_at(self, address, name, data):
|
|
self._data[address] = VirtualEeprom(name, data) # hack
|
|
|
|
def load_eeprom(self, address):
|
|
return self._data[address] # hack
|
|
|
|
|
|
@unique
|
|
class VirtualControlMode(Enum):
|
|
QUIET = 0x0
|
|
BALANCED = 0x1
|
|
EXTREME = 0x2
|
|
|
|
|
|
CallArgs = namedtuple('CallArgs', ['args', 'kwargs'])
|
|
|
|
|
|
class VirtualBusDevice(BaseDriver):
|
|
def __init__(self, *args, **kwargs):
|
|
self.call_args = dict()
|
|
self.call_args['__init__'] = CallArgs(args, kwargs)
|
|
self.connected = False
|
|
|
|
def connect(self, *args, **kwargs):
|
|
self.call_args['connect'] = CallArgs(args, kwargs)
|
|
self.connected = True
|
|
return self
|
|
|
|
def disconnect(self, *args, **kwargs):
|
|
self.call_args['disconnect'] = CallArgs(args, kwargs)
|
|
self.connected = False
|
|
|
|
def initialize(self, *args, **kwargs):
|
|
self.call_args['initialize'] = CallArgs(args, kwargs)
|
|
return [
|
|
('Firmware version', '3.14.16', ''),
|
|
]
|
|
|
|
def get_status(self, *args, **kwargs):
|
|
self.call_args['status'] = CallArgs(args, kwargs)
|
|
return [
|
|
('Temperature', 30.4, '°C'),
|
|
('Fan control mode', VirtualControlMode.QUIET, ''),
|
|
('Animation', None, ''),
|
|
('Uptime', timedelta(hours=18, minutes=23, seconds=12), ''),
|
|
('Hardware mode', True, ''),
|
|
]
|
|
|
|
def set_fixed_speed(self, *args, **kwargs):
|
|
self.call_args['set_fixed_speed'] = CallArgs(args, kwargs)
|
|
|
|
def set_speed_profile(self, *args, **kwargs):
|
|
self.call_args['set_speed_profile'] = CallArgs(args, kwargs)
|
|
|
|
def set_color(self, *args, **kwargs):
|
|
self.call_args['set_color'] = CallArgs(args, kwargs)
|
|
|
|
@property
|
|
def description(self):
|
|
return 'Virtual Bus Device (experimental)'
|
|
|
|
@property
|
|
def vendor_id(self):
|
|
return 0x1234
|
|
|
|
@property
|
|
def product_id(self):
|
|
return 0xabcd
|
|
|
|
@property
|
|
def release_number(self):
|
|
None
|
|
|
|
@property
|
|
def serial_number(self):
|
|
raise OSError()
|
|
|
|
@property
|
|
def bus(self):
|
|
return 'virtual'
|
|
|
|
@property
|
|
def address(self):
|
|
return 'virtual_address'
|
|
|
|
@property
|
|
def port(self):
|
|
return None
|
|
|
|
|
|
class VirtualBus(BaseBus):
|
|
def find_devices(self, **kwargs):
|
|
yield from [VirtualBusDevice()]
|