pyproject: enable 'preview' ruff rules

Now that we gate our ruff updates via the tasks container, we can afford
to be a bit more future-oriented here.

This brings in a nice raft of fix-ups, including my personal favourite:

  RUF003 Comment contains ambiguous `µ` (MICRO SIGN). Did you mean `μ` (GREEK SMALL LETTER MU)?

Sure... why not?

The majority of fixes here are from a single rule suggesting to rewrite

  a and b or c

as

  (a and b) or c

But in most (but not all) cases in our code we actually want to write
that as

  b if a else c

I've also added some type annotations to help me make the right choice
in places where it wasn't immediately clear.
This commit is contained in:
Allison Karlitskaya 2024-04-15 20:10:50 +02:00 committed by Jelle van der Waa
parent f1218af1ca
commit 5b6c1ce8ae
18 changed files with 52 additions and 52 deletions

View File

@ -61,7 +61,7 @@ class Inotify:
buf = os.read(self.fd, 4096)
pos = 0
while pos < len(buf):
(wd, mask, cookie, name_len) = struct.unpack('iIII', buf[pos:pos + 16])
(wd, mask, _cookie, name_len) = struct.unpack('iIII', buf[pos:pos + 16])
pos += 16
(name,) = struct.unpack('%ds' % name_len, buf[pos:pos + name_len])
pos += name_len

View File

@ -141,7 +141,7 @@ def monitor():
fstab_watcher = Watcher("/etc/fstab")
mtab_file = open("/proc/self/mounts", "r")
while True:
(r, w, x) = select.select([fstab_watcher.inotify.fd], [], [mtab_file])
r, _w, x = select.select([fstab_watcher.inotify.fd], [], [mtab_file])
if mtab_file in x:
process_mtab()
report()

View File

@ -32,6 +32,7 @@ exclude = [
"node_modules/",
]
line-length = 118
preview = true
src = []
[tool.ruff.lint]

View File

@ -303,7 +303,7 @@ async def run(args) -> None:
# only patch the packages line if we are in beiboot mode
if bridge.packages:
message['packages'] = {p: None for p in bridge.packages.packages}
message['packages'] = dict.fromkeys(bridge.packages.packages)
bridge.write_control(message)
bridge.ssh_peer.thaw_endpoint()

View File

@ -145,7 +145,7 @@ class Bridge(Router, PackagesListener):
}
if self.packages is not None:
init_args['packages'] = {p: None for p in self.packages.packages}
init_args['packages'] = dict.fromkeys(self.packages.packages)
self.write_control(init_args)

View File

@ -184,7 +184,7 @@ class DBusChannel(Channel):
self.send_json(owner=owner)
def handler(message):
name, old, new = message.get_body()
_name, _old, new = message.get_body()
send_owner(owner=new if new != "" else None)
self.add_signal_handler(handler,
sender='org.freedesktop.DBus',
@ -308,7 +308,7 @@ class DBusChannel(Channel):
timeout = message.get('timeout')
if timeout is not None:
# sd_bus timeout is µs, cockpit API timeout is ms
# sd_bus timeout is μs, cockpit API timeout is ms
timeout *= 1000
else:
# sd_bus has no "indefinite" timeout, so use MAX_UINT64
@ -399,7 +399,7 @@ class DBusChannel(Channel):
(path, interfaces) = message.get_body()
logger.debug('interfaces removed %s %s', path, interfaces)
async with self.watch_processing_lock:
notify = {path: {name: None for name in interfaces}}
notify = {path: dict.fromkeys(interfaces)}
self.send_json(notify=notify)
self.add_async_signal_handler(handler,

View File

@ -373,12 +373,12 @@ class FsInfoChannel(Channel, PathWatchListener):
if reset:
if set(self.current_value) & set(updates):
# if we have an overlap, we need to do a proper reset
self.send_json({name: None for name in self.current_value}, partial=True)
self.send_json(dict.fromkeys(self.current_value), partial=True)
self.current_value = {'partial': True}
updates.update(partial=None)
else:
# otherwise there's no overlap: we can just remove the old keys
updates.update({key: None for key in self.current_value})
updates.update(dict.fromkeys(self.current_value))
json_merge_and_filter_patch(self.current_value, updates)
if updates:

View File

@ -129,7 +129,7 @@ class SshPeer(Peer):
except ferny.SshAuthenticationError as exc:
logger.debug('authentication to host %s failed: %s', host, exc)
results = {method: 'not-provided' for method in exc.methods}
results = dict.fromkeys(exc.methods, "not-provided")
if 'password' in results and self.password is not None:
if responder.password_attempts == 0:
results['password'] = 'not-tried'

View File

@ -59,30 +59,28 @@ UNEXPECTED_MESSAGE = "FAIL: Test completed, but found unexpected "
PIXEL_TEST_MESSAGE = "Some pixel tests have failed"
__all__ = (
# Test definitions
'test_main',
'arg_parser',
'PIXEL_TEST_MESSAGE',
'TEST_DIR',
'UNEXPECTED_MESSAGE',
'Browser',
'Error',
'MachineCase',
'nondestructive',
'arg_parser',
'no_retry_when_changed',
'nondestructive',
'onlyImage',
'skipImage',
'skipDistroPackage',
'skipOstree',
'opts',
'sit',
'skipBrowser',
'skipDistroPackage',
'skipImage',
'skipOstree',
'test_main',
'timeout',
'todo',
'todoPybridge',
'todoPybridgeRHEL8',
'timeout',
'Error',
'sit',
'wait',
'opts',
'TEST_DIR',
'UNEXPECTED_MESSAGE',
'PIXEL_TEST_MESSAGE'
)
# Command line options
@ -237,7 +235,7 @@ class Browser:
Error: When a timeout occurs waiting for the page to load.
"""
if href.startswith("/"):
schema = tls and "https" or "http"
schema = "https" if tls else "http"
href = "%s://%s:%s%s" % (schema, self.address, self.port, href)
if not self.current_layout and os.environ.get("TEST_SHOW_BROWSER") in [None, "pixels"]:
@ -940,7 +938,7 @@ class Browser:
return int(m.group(1))
def ignore_ssl_certificate_errors(self, ignore: bool):
action = ignore and "continue" or "cancel"
action = "continue" if ignore else "cancel"
if opts.trace:
print("-> Setting SSL certificate error policy to %s" % action)
self.cdp.command(f"setSSLBadCertificateAction('{action}')")
@ -1318,7 +1316,7 @@ class MachineCase(unittest.TestCase):
return self.__class__.__name__ + '-' + self._testMethodName
def new_machine(self, image=None, forward=None, restrict=True, cleanup=True, inherit_machine_class=True, **kwargs):
machine_class = inherit_machine_class and self.machine_class or testvm.VirtMachine
machine_class = (inherit_machine_class and self.machine_class) or testvm.VirtMachine
if opts.address:
if forward:
@ -1860,7 +1858,7 @@ class MachineCase(unittest.TestCase):
"""Check for unexpected journal entries."""
machine = machine or self.machine
# on main machine, only consider journal entries since test case start
cursor = (machine == self.machine) and self.journal_start or None
cursor = self.journal_start if machine == self.machine else None
# Journald does not always set trusted fields like
# _SYSTEMD_UNIT or _EXE correctly for the last few messages of

View File

@ -283,4 +283,3 @@ def test_overlapping_minified(pkgdir):
assert document.data.read().decode() == 'min'
document = packages.load_path('/one/two.min.js', {})
assert document.data.read().decode() == 'min'

View File

@ -212,7 +212,7 @@ class TestStdio:
@pytest.mark.asyncio
async def test_terminal_write_eof(self):
# Make sure write_eof() fails
with self.create_terminal() as (ours, protocol, transport):
with self.create_terminal() as (ours, _protocol, transport):
assert not transport.can_write_eof()
with pytest.raises(RuntimeError):
transport.write_eof()
@ -221,7 +221,7 @@ class TestStdio:
@pytest.mark.asyncio
async def test_terminal_disconnect(self):
# Make sure disconnecting the session shows up as an EOF
with self.create_terminal() as (ours, protocol, transport):
with self.create_terminal() as (ours, protocol, _transport):
os.close(ours)
while not protocol.eof:
await asyncio.sleep(0.1)

View File

@ -593,27 +593,27 @@ class TestConnection(testlib.MachineCase):
self.sed_file('$ a\\\nPermitEmptyPasswords yes', '/etc/ssh/sshd_config',
self.restart_sshd)
def assertInOrNot(string, result, expected):
def assertInOrNot(string: str, result: str, *, expected: bool) -> None:
if expected:
self.assertIn(string, result)
else:
self.assertNotIn(string, result)
def checkMotdForUser(string, user, expected):
def checkMotdForUser(string: str, user: str, *, expected: bool) -> None:
result = m.execute(f"ssh -o StrictHostKeyChecking=no -n {user}@localhost")
assertInOrNot(string, result, expected)
assertInOrNot(string, result, expected=expected)
def checkMotdContent(string, expected=True):
def checkMotdContent(string: str, *, expected: bool = True) -> None:
# Needs https://github.com/linux-pam/linux-pam/pull/292 (or PAM 1.5.0)
old_pam = m.image.startswith('rhel-8-') or m.image in ['centos-8-stream', 'ubuntu-2204']
# check issue (should be exactly the same as motd)
assertInOrNot(string, m.execute("cat /etc/issue.d/cockpit.issue"), expected)
assertInOrNot(string, m.execute("cat /etc/issue.d/cockpit.issue"), expected=expected)
# check motd as 'root' (via cat) and 'admin' and 'user' (via ssh)
assertInOrNot(string, m.execute("cat /etc/motd.d/cockpit"), expected)
assertInOrNot(string, m.execute("cat /etc/motd.d/cockpit"), expected=expected)
checkMotdForUser(string, expected=expected, user='admin')
checkMotdForUser(string, expected=old_pam and expected or False, user='user')
checkMotdForUser(string, expected=old_pam and expected, user='user')
m.stop_cockpit()
checkMotdContent('systemctl')

View File

@ -37,7 +37,7 @@ class KdumpHelpers(testlib.MachineCase):
def assertActive(self, active, browser=None):
browser = browser or self.browser
browser.wait_visible(".pf-v5-c-switch__input" + (active and ":checked" or ":not(:checked)"))
browser.wait_visible(".pf-v5-c-switch__input" + (":checked" if active else ":not(:checked)"))
def enableKdump(self):
if self.machine.image.startswith("fedora"):

View File

@ -32,7 +32,7 @@ def getCompressedMinuteValue(test, g_type, saturation, hour, minute):
polygon_class = ".polygon-sat" if saturation else ".polygon-use"
sel = f"#metrics-hour-{hour} div.metrics-minute[data-minute='{minute}'] div.metrics-data-{g_type} .compressed{polygon_class}"
m = re.search(r"--%s:\s*([0-9.]+);" % (saturation and "saturation" or "utilization"), test.browser.attr(sel, "style"))
m = re.search(r"--%s:\s*([0-9.]+);" % ("saturation" if saturation else "utilization"), test.browser.attr(sel, "style"))
test.assertIsNotNone(m)
return float(m.group(1))
@ -696,7 +696,7 @@ class TestHistoryMetrics(testlib.MachineCase):
for _ in range(10):
b.click("#metrics-header-section button.pf-m-secondary")
b.wait_visible('#switch-pmproxy')
found = b.is_present("#switch-pmproxy" + (expected and ":checked" or ":not(:checked)"))
found = b.is_present("#switch-pmproxy" + (":checked" if expected else ":not(:checked)"))
b.click(f"{self.pcp_dialog_selector} button.btn-cancel")
b.wait_not_present(self.pcp_dialog_selector)

View File

@ -19,6 +19,7 @@
import os
import time
from collections.abc import Collection
import packagelib
import testlib
@ -68,8 +69,9 @@ class TestUpdates(NoSubManCase):
# make sure we don't have any extra ones
self.assertFalse(self.browser.is_present(selector.format(len(updates) + 1)))
def check_nth_update(self, index, pkgname, version, severity="bug",
num_issues=None, desc_matches=None, cves=None, bugs=None, arch=None):
def check_nth_update(self, index: int, pkgname: str, version: str, severity: str = "bug",
num_issues: 'int | None' = None, desc_matches: Collection[str] = (),
cves: Collection[str] = (), bugs: Collection[str] = (), arch: 'str | None' = None) -> str:
"""Check the contents of the package update table row at index
None properties will not be tested.
@ -93,7 +95,7 @@ class TestUpdates(NoSubManCase):
severity_to_aria = {"bug": "bug fix", "enhancement": "enhancement", "security": "security"}
b.wait_visible(f"{row} [data-label=Severity] .severity-icon[aria-label='{severity_to_aria[severity]}']")
self.assertEqual(b.text(row + "[data-label=Severity]").strip(),
num_issues is not None and str(num_issues) or "")
'' if num_issues is None else str(num_issues))
# should not be expanded by default
self.assertNotIn("pf-m-expanded", b.attr(row, "class"))
@ -104,7 +106,7 @@ class TestUpdates(NoSubManCase):
desc = b.text(row + "> tr.pf-m-expanded")
# details should contain all description bits, CVEs and bug numbers
for m in (desc_matches or []) + (cves or []) + (bugs or []):
for m in (*desc_matches, *cves, *bugs):
self.assertIn(m, desc)
return row

View File

@ -311,7 +311,7 @@ class TestSelinuxEnforcing(testlib.MachineCase):
def assertEnforce(active):
# polled every 10s
with b.wait_timeout(20):
b.wait_visible(".pf-v5-c-switch__input" + (active and ":checked" or ":not(:checked)"))
b.wait_visible(".pf-v5-c-switch__input" + (":checked" if active else ":not(:checked)"))
# SELinux should be enabled and enforcing at the beginning
assertEnforce(active=True)

View File

@ -497,7 +497,7 @@ machine : 8561
if expect_smt_state is not None:
b.wait_visible('#cpu-mitigations-dialog .nosmt-heading:contains(nosmt)')
b.wait_visible('#cpu-mitigations-dialog #nosmt-switch input' +
(expect_smt_state and ":checked" or ":not(:checked)"))
(":checked" if expect_smt_state else ":not(:checked)"))
b.logout()
finally:
@ -528,7 +528,7 @@ machine : 8561
b.click('#hwinfo button:contains(Mitigations)')
b.click('#cpu-mitigations-dialog #nosmt-switch input')
b.wait_visible('#cpu-mitigations-dialog #nosmt-switch input' +
(self.expect_smt_default and ':not(:checked)' or ':checked'))
(':not(:checked)' if self.expect_smt_default else ':checked'))
b.click('#cpu-mitigations-dialog Button:contains(Save and reboot)')
m.wait_reboot()
@ -571,10 +571,10 @@ fi
b.click('#hwinfo button:contains(Mitigations)')
b.wait_visible('#cpu-mitigations-dialog .nosmt-heading:contains(nosmt)')
b.wait_visible('#cpu-mitigations-dialog #nosmt-switch input' +
(self.expect_smt_default and ':not(:checked)' or ':checked'))
(':not(:checked)' if self.expect_smt_default else ':checked'))
b.click('#cpu-mitigations-dialog #nosmt-switch input')
b.wait_visible('#cpu-mitigations-dialog #nosmt-switch input' +
(self.expect_smt_default and ':checked' or ':not(:checked)'))
(':checked' if self.expect_smt_default else ':not(:checked)'))
b.click('#cpu-mitigations-dialog Button:contains(Save and reboot)')
m.wait_reboot()
if self.expect_smt_default:

View File

@ -104,7 +104,7 @@ for directory, _subdirs, files in os.walk(f'{BASE_DIR}/dist'):
raise SystemError('Can not determine licenses of:\n%s' % comment)
for license_id in licenses:
if license_id.lower() not in license_ids:
raise KeyError('License {license_id} not found in {TEMPLATE_FILE}')
raise KeyError(f'License {license_id} not found in {TEMPLATE_FILE}')
# All bundles also contain our own code
licenses.add("LGPL-2.1-or-later")