
585 lines
22 KiB

"""liquidctl drivers for Corsair Commander Pro devices.
Supported devices:
- Corsair Commander Pro
- Corsair Lighting Node Pro
Important notes:
- This device currently only has hardware control implemented but it also
supports a software control mode.
- Software control will be enabled at a future time.
Copyright Marshall Asch and contributors
SPDX-License-Identifier: GPL-3.0-or-later
import itertools
import logging
import re
from enum import Enum, unique
from liquidctl.driver.usb import UsbHidDriver
from liquidctl.error import NotSupportedByDevice
from liquidctl.keyval import RuntimeStorage
from liquidctl.util import clamp, fraction_of_byte, u16be_from, u16le_from, \
normalize_profile, check_unsafe, map_direction
_LOGGER = logging.getLogger(__name__)
_CMD_GET_TEMP = 0x11
_FAN_MODE_DC = 0x01
_FAN_MODE_PWM = 0x02
_MAX_FAN_RPM = 5000 # I have no idea if this is a good value or not
_MAX_LEDS = 204
_MODES = {
'off': 0x04, # this is a special case of fixed
'rainbow': 0x00,
'color_shift': 0x01,
'color_pulse': 0x02,
'color_wave': 0x03,
'fixed': 0x04,
# 'temperature': 0x05, # ignore this
'visor': 0x06,
'marquee': 0x07,
'blink': 0x08,
'sequential': 0x09,
'rainbow2': 0x0a,
'dc': _FAN_MODE_DC,
'pwm': _FAN_MODE_PWM,
def _prepare_profile(original, critcalTempature):
clamped = ((temp, clamp(duty, 0, _MAX_FAN_RPM)) for temp, duty in original)
normal = normalize_profile(clamped, critcalTempature, _MAX_FAN_RPM)
missing = _PROFILE_LENGTH - len(normal)
if missing < 0:
raise ValueError(f'too many points in profile (remove {-missing})')
if missing > 0:
normal += missing * [(critcalTempature, _MAX_FAN_RPM)]
return normal
def _quoted(*names):
return ', '.join(map(repr, names))
def _fan_mode_desc(mode):
"""This will convert the fan mode value to a descriptive name.
if mode == _FAN_MODE_DC:
return 'DC'
elif mode == _FAN_MODE_PWM:
return 'PWM'
_LOGGER.warning('unknown fan mode: {mode:#04x}')
return None
class CommanderPro(UsbHidDriver):
"""Corsair Commander Pro LED and fan hub"""
# support for hwmon: corsair-cpro, Linux 5.9
(0x1b1c, 0x0c10, 'Corsair Commander Pro',
{'fan_count': 6, 'temp_probs': 4, 'led_channels': 2}),
(0x1b1c, 0x0c0b, 'Corsair Lighting Node Pro',
{'fan_count': 0, 'temp_probs': 0, 'led_channels': 2}),
(0x1b1c, 0x0c1a, 'Corsair Lighting Node Core',
{'fan_count': 0, 'temp_probs': 0, 'led_channels': 1}),
(0x1b1c, 0x1d00, 'Corsair Obsidian 1000D',
{'fan_count': 6, 'temp_probs': 4, 'led_channels': 2}),
def __init__(self, device, description, fan_count, temp_probs, led_channels, **kwargs):
super().__init__(device, description, **kwargs)
# the following fields are only initialized in connect()
self._data = None
self._fan_names = [f'fan{i+1}' for i in range(fan_count)]
if led_channels == 1:
self._led_names = ['led']
self._led_names = [f'led{i+1}' for i in range(led_channels)]
self._temp_probs = temp_probs
self._fan_count = fan_count
def connect(self, runtime_storage=None, **kwargs):
"""Connect to the device."""
ret = super().connect(**kwargs)
if runtime_storage:
self._data = runtime_storage
ids = f'vid{self.vendor_id:04x}_pid{self.product_id:04x}'
# must use the HID path because there is no serial number; however,
# these can be quite long on Windows and macOS, so only take the
# numbers, since they are likely the only parts that vary between two
# devices of the same model
loc = 'loc' + '_'.join(re.findall(r'\d+', self.address))
self._data = RuntimeStorage(key_prefixes=[ids, loc])
return ret
def _initialize_directly(self, fan_modes: dict, **kwargs):
res = self._send_command(_CMD_GET_FIRMWARE)
fw_version = (res[1], res[2], res[3])
res = self._send_command(_CMD_GET_BOOTLOADER)
bootloader_version = (res[1], res[2]) # is it possible for there to be a third value?
status = [
('Firmware version', '{}.{}.{}'.format(*fw_version), ''),
('Bootloader version', '{}.{}'.format(*bootloader_version), ''),
if self._temp_probs > 0:
res = self._send_command(_CMD_GET_TEMP_CONFIG)
temp_connected = res[1:5]'temp_sensors_connected', temp_connected)
status += [
(f'Temperature probe {i + 1}', bool(temp_connected[i]), '')
for i in range(4)
if self._fan_count > 0:
for i, value in fan_modes.items():
fan_num = int(i) - 1
if value not in ['dc', 'pwm', 'off']:
raise ValueError(f"invalid fan mode: '{value}'")
self._send_command(_CMD_SET_FAN_MODE, [0x02, fan_num, _FAN_MODES.get(value)])
res = self._send_command(_CMD_GET_FAN_MODES)
fanModes = res[1:self._fan_count+1]'fan_modes', fanModes)
status += [
(f'Fan {i + 1} control mode', _fan_mode_desc(fanModes[i]), '')
for i in range(6)
return status
def _get_static_info_from_hwmon(self):
# firmware and bootloader versions are not available through hwmon, but
# we don't want to race with the kernel driver
_LOGGER.warning('some attributes cannot be read from %s kernel driver', self._hwmon.driver)
status = []
if self._temp_probs > 0:
# use ints to mimic how we normally handle the raw data
sensors = [int(self._hwmon.has_attribute(f'temp{n}_input')) for n in range(1, 5)]
_LOGGER.debug('%r', sensors)'temp_sensors_connected', sensors)
for n, connected in zip(range(1, 5), sensors):
status.append((f'Temperature probe {n}', bool(connected), ''))
if self._fan_count > 0:
def hwmon_fan_mode(hwmon, n):
attr = f'fan{n}_label'
if not hwmon.has_attribute(attr):
label = hwmon.get_string(attr)
if label.endswith('4pin'):
return _FAN_MODE_PWM
elif label.endswith('3pin'):
return _FAN_MODE_DC
assert label.endswith('other')
_LOGGER.warning('hwmon reported the fan mode as other')
return None
fans = [hwmon_fan_mode(self._hwmon, n) for n in range(1, 7)]
_LOGGER.debug('%r', fans)'fan_modes', fans)
for n, mode in zip(range(1, 7), fans):
status.append((f'Fan {n} control mode', _fan_mode_desc(mode), ''))
return status
def initialize(self, direct_access=False, fan_modes={}, **kwargs):
"""Initialize the device and the driver.
This method should be called every time the systems boots, resumes from
a suspended state, or if the device has just been (re)connected. In
those scenarios, no other method, except `connect()` or `disconnect()`,
should be called until the device and driver has been (re-)initialized.
Returns None or a list of `(property, value, unit)` tuples, similarly
to `get_status()`.
if self._hwmon and not direct_access:'bound to %s kernel driver, assuming it is already initialized',
if fan_modes:
# say "with" instead of "by" because eventually the driver may start supporting fan
# modes, and we're not actually checking for it at runtime
_LOGGER.warning('fan mode options not supported with %s kernel driver, ignoring',
return self._get_static_info_from_hwmon()
if self._hwmon:
_LOGGER.warning('forcing re-initialization despite %s kernel driver',
return self._initialize_directly(fan_modes)
def _get_status_directly(self):
temp_probes = self._data.load('temp_sensors_connected', default=[0]*self._temp_probs)
fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)
status = []
# get the temperature sensor values
for i, probe_enabled in enumerate(temp_probes):
if probe_enabled:
temp = self._get_temp(i)
status.append((f'Temperature {i + 1}', temp, '°C'))
# get fan RPMs of connected fans
for i, fan_mode in enumerate(fan_modes):
if fan_mode == _FAN_MODE_DC or fan_mode == _FAN_MODE_PWM:
speed = self._get_fan_rpm(i)
status.append((f'Fan {i + 1} speed', speed, 'rpm'))
# get the real power supply voltages
for i, rail in enumerate(["+12V", "+5V", "+3.3V"]):
raw = self._send_command(_CMD_GET_VOLTS, [i])
voltage = u16be_from(raw, offset=1) / 1000
status.append((f'{rail} rail', voltage, 'V'))
return status
def _get_status_from_hwmon(self):
temp_probes = self._data.load('temp_sensors_connected', default=[0]*self._temp_probs)
fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)
status = []
# get the temperature sensor values
for i, probe_enabled in enumerate(temp_probes):
if probe_enabled:
n = i + 1
temp = self._hwmon.read_int(f'temp{n}_input') * 1e-3
status.append((f'Temperature {n}', temp, '°C'))
# get fan RPMs of connected fans
for i, fan_mode in enumerate(fan_modes):
if fan_mode == _FAN_MODE_DC or fan_mode == _FAN_MODE_PWM:
n = i + 1
speed = self._hwmon.read_int(f'fan{n}_input')
status.append((f'Fan {n} speed', speed, 'rpm'))
# get the real power supply voltages
for i, rail in enumerate(["+12V", "+5V", "+3.3V"]):
voltage = self._hwmon.read_int(f'in{i}_input') * 1e-3
status.append((f'{rail} rail', voltage, 'V'))
return status
def get_status(self, direct_access=False, **kwargs):
"""Get a status report.
Returns a list of `(property, value, unit)` tuples.
if self._fan_count == 0 or self._temp_probs == 0:
_LOGGER.debug('only Commander Pro and Obsidian 1000D report status')
return []
if self._hwmon and not direct_access:'bound to %s kernel driver, reading status from hwmon', self._hwmon.driver)
return self._get_status_from_hwmon()
if self._hwmon:
_LOGGER.warning('directly reading the status despite %s kernel driver',
return self._get_status_directly()
def _get_temp(self, sensor_num):
"""This will get the temperature in degrees celsius for the specified temp sensor.
sensor number MUST be in range of 0-3
if self._temp_probs == 0:
raise ValueError('this device does not have a temperature sensor')
if sensor_num < 0 or sensor_num > 3:
raise ValueError(f'sensor_num {sensor_num} invalid, must be between 0 and 3')
res = self._send_command(_CMD_GET_TEMP, [sensor_num])
temp = u16be_from(res, offset=1) / 100
return temp
def _get_fan_rpm(self, fan_num):
"""This will get the rpm value of the fan.
fan number MUST be in range of 0-5
if self._fan_count == 0:
raise ValueError('this device does not have any fans')
if fan_num < 0 or fan_num > 5:
raise ValueError(f'fan_num {fan_num} invalid, must be between 0 and 5')
res = self._send_command(_CMD_GET_FAN_RPM, [fan_num])
speed = u16be_from(res, offset=1)
return speed
def _get_hw_fan_channels(self, channel):
"""This will get a list of all the fan channels that the command should be sent to
It will look up the name of the fan channel given and return a list of the real fan number
if channel == 'sync' or len(self._fan_names) == 1:
return list(range(len(self._fan_names)))
if channel in self._fan_names:
return [self._fan_names.index(channel)]
raise ValueError(f'unknown channel, should be one of: {_quoted("sync", *self._fan_names)}')
def _get_hw_led_channels(self, channel):
"""This will get a list of all the led channels that the command should be sent to
It will look up the name of the led channel given and return a list of the real led device number
if channel == 'sync' or len(self._led_names) == 1:
return list(range(len(self._led_names)))
if channel in self._led_names:
return [self._led_names.index(channel)]
raise ValueError(f'unknown channel, should be one of: {_quoted("sync", *self._led_names)}')
def set_fixed_speed(self, channel, duty, **kwargs):
"""Set fan or fans to a fixed speed duty.
Valid channel values are 'fanN', where N >= 1 is the fan number, and
'fan', to simultaneously configure all fans. Unconfigured fan channels
may default to 100% duty.
if self._fan_count == 0:
raise NotSupportedByDevice()
duty = clamp(duty, 0, 100)
fan_channels = self._get_hw_fan_channels(channel)
fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)
for fan in fan_channels:
mode = fan_modes[fan]
if mode == _FAN_MODE_DC or mode == _FAN_MODE_PWM:
self._send_command(_CMD_SET_FAN_DUTY, [fan, duty])
def set_speed_profile(self, channel, profile, temperature_sensor=1, **kwargs):
"""Set fan or fans to follow a speed profile _in rpm._
Unlike corresponding methods in other drivers, this function operates
with angular speeds in rpm, not duty values in percentage.
Valid channel values are 'fanN', where N >= 1 is the fan number, and
'fan', to simultaneously configure all fans. Unconfigured fan channels
may default to 100% duty.
Up to six (temperature, speed) pairs can be supplied in `profile`,
with temperatures in Celsius and speeds in rpm. The last point should
set the fan to 100% duty cycle, or be omitted; in the latter case the
fan will be heuristically set to max out at 60°C.
# send fan num, temp sensor, check to make sure it is actually enabled, and do not let the user send external sensor
# 6 2-byte big endian temps (celsius * 100), then 6 2-byte big endian rpms
# need to figure out how to find out what the max rpm is for the given fan
if self._fan_count == 0:
raise NotSupportedByDevice()
profile = list(profile)
criticalTemp = _CRITICAL_TEMPERATURE_HIGH if check_unsafe('high_temperature', **kwargs) else _CRITICAL_TEMPERATURE
profile = _prepare_profile(profile, criticalTemp)
# fan_type = kwargs['fan_type'] # need to make sure this is set
temp_sensor = clamp(temperature_sensor, 1, self._temp_probs)
sensors = self._data.load('temp_sensors_connected', default=[0]*self._temp_probs)
if sensors[temp_sensor-1] != 1:
raise ValueError('the specified temperature sensor is not connected')
buf = bytearray(26)
buf[1] = temp_sensor-1 # 0 # use temp sensor 1
for i, entry in enumerate(profile):
temp = entry[0]*100
rpm = entry[1]
# convert both values to 2 byte big endian values
buf[2 + i*2] = temp.to_bytes(2, byteorder='big')[0]
buf[3 + i*2] = temp.to_bytes(2, byteorder='big')[1]
buf[14 + i*2] = rpm.to_bytes(2, byteorder='big')[0]
buf[15 + i*2] = rpm.to_bytes(2, byteorder='big')[1]
fan_channels = self._get_hw_fan_channels(channel)
fan_modes = self._data.load('fan_modes', default=[0]*self._fan_count)
for fan in fan_channels:
mode = fan_modes[fan]
if mode == _FAN_MODE_DC or mode == _FAN_MODE_PWM:
buf[0] = fan
self._send_command(_CMD_SET_FAN_PROFILE, buf)
def set_color(self, channel, mode, colors, direction='forward',
speed='medium', start_led=1, maximum_leds=_MAX_LEDS, **kwargs):
"""Set the color of each LED.
The table bellow summarizes the available channels, modes, and their
associated maximum number of colors for each device family.
| Channel | Mode | Num colors |
| -------- | ----------- | ---------- |
| led | off | 0 |
| led | fixed | 1 |
| led | color_shift | 2 |
| led | color_pulse | 2 |
| led | color_wave | 2 |
| led | visor | 2 |
| led | blink | 2 |
| led | marquee | 1 |
| led | sequential | 1 |
| led | rainbow | 0 |
| led | rainbow2 | 0 |
# a special mode to clear the current led settings.
# this is usefull if the the user wants to use a led mode for multiple devices
if mode == 'clear':'saved_effects', None)
colors = list(colors)
expanded = colors[:3]
c = itertools.chain(*((r, g, b) for r, g, b in expanded))
colors = list(c)
direction = map_direction(direction, _LED_DIRECTION_FORWARD, _LED_DIRECTION_BACKWARD)
speed = _LED_SPEED_SLOW if speed == 'slow' else _LED_SPEED_FAST if speed == 'fast' else _LED_SPEED_MEDIUM
start_led = clamp(start_led, 1, _MAX_LEDS) - 1
num_leds = clamp(maximum_leds, 1, _MAX_LEDS - start_led)
random_colors = 0x00 if mode == 'off' or len(colors) != 0 else 0x01
mode_val = _MODES.get(mode, -1)
if mode_val == -1:
raise ValueError(f'mode "{mode}" is not valid')
# FIXME clears on 'off', while the docs only mention this behavior for 'clear'
saved_effects = [] if mode == 'off' else self._data.load('saved_effects', default=[])
for led_channel in self._get_hw_led_channels(channel):
lighting_effect = {
'channel': led_channel,
'start_led': start_led,
'num_leds': num_leds,
'mode': mode_val,
'speed': speed,
'direction': direction,
'random_colors': random_colors,
'colors': colors
saved_effects += [lighting_effect]
# check to make sure that too many LED effects are not being sent.
# the max seems to be 8 as found here
if len(saved_effects) > 8:
_LOGGER.warning(f'too many lighting effects. Run `liquidctl set {channel} color clear` to reset the effect')
# start sending the led commands
self._send_command(_CMD_RESET_LED_CHANNEL, [led_channel])
self._send_command(_CMD_BEGIN_LED_EFFECT, [led_channel])
self._send_command(_CMD_SET_LED_CHANNEL_STATE, [led_channel, 0x01])
# FIXME clears on 'off', while the docs only mention this behavior for 'clear''saved_effects', None if mode == 'off' else saved_effects)
for effect in saved_effects:
config = [effect.get('channel'),
] + effect.get('colors')
self._send_command(_CMD_LED_EFFECT, config)
self._send_command(_CMD_LED_COMMIT, [0xff])
def _send_command(self, command, data=None):
# self.device.write expects buf[0] to be the report number or 0 if not used
buf = bytearray(_REPORT_LENGTH + 1)
buf[1] = command
start_at = 2
if data:
data = data[:_REPORT_LENGTH-1]
buf[start_at: start_at + len(data)] = data
buf = bytes(
return buf
def set_screen(self, channel, mode, value, **kwargs):
"""Not supported by this device."""
raise NotSupportedByDevice()