cockpit/test/verify/check-static-login

970 lines
42 KiB
Python
Executable File

#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)
# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.
import base64
import re
import subprocess
import time
from testlib import (
MachineCase,
nondestructive,
skipBrowser,
skipDistroPackage,
skipImage,
skipOstree,
test_main,
wait,
)
@nondestructive
@skipDistroPackage()
class TestLogin(MachineCase):
    """Integration tests for the Cockpit login page and login-related PAM behaviour.

    Every test drives the test machine through ``self.machine`` and the browser
    through ``self.browser``; both are provided by ``MachineCase``.
    """

    def check_shell(self):
        """Wait for the logged-in shell and check that it shows the ``admin`` user.

        The user name is checked in ``#current-username`` and, after switching to
        the mobile layout, in the current entry of the host selector. The layout
        is switched back to desktop before returning.
        """
        b = self.browser

        b.wait_visible("#content")
        b.wait_text('#current-username', 'admin')
        b.set_layout("mobile")
        b.click("#hosts-sel button")
        b.wait_in_text(".view-hosts .pf-m-current", "admin @")
        b.click("#hosts-sel button")
        b.set_layout("desktop")

    def testBasic(self):
        """Exercise the login page: banner, failed and denied logins, cookie login, login options."""
        m = self.machine
        b = self.browser

        # Setup users and passwords
        m.execute("useradd user -c 'Barney Bär'")
        m.execute("echo user:abcdefg | chpasswd")

        # two PAM lines for sed's "a" command; the trailing backslash continues the appended text
        admins_only_pam = """account sufficient pam_succeed_if.so uid = 0\\
account required pam_succeed_if.so user ingroup %s""" % m.get_admin_group()

        # Setup a special PAM config that disallows non-wheel users
        def deny_non_root(remote_filename):
            if m.image == "arch":
                self.sed_file(f'1 a {admins_only_pam}', remote_filename)
            else:
                self.sed_file(f'/nologin/a {admins_only_pam}', remote_filename)

        deny_non_root("/etc/pam.d/cockpit")
        deny_non_root("/etc/pam.d/sshd")

        m.start_cockpit()
        b.open("/system")

        # Test banner
        # Test that we don't show banner when not specified
        b.wait_visible("#login-user-input")
        b.wait_not_visible("#banner")

        if m.ostree_image:
            m.execute("podman exec ws printf '[Session]\nBanner = /host/etc/issue\n' > /etc/cockpit/cockpit.conf")
        else:
            m.execute("printf '[Session]\nBanner = /etc/issue\n' > /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.reload()
        b.wait_visible("#login-user-input")
        b.wait_visible("#banner")
        self.assertEqual(b.text("#banner-message"), m.execute("cat /etc/issue").rstrip())

        # Test non existent file
        m.execute("printf '[Session]\nBanner = /etc/non-existing-file\n' > /etc/cockpit/cockpit.conf")
        self.allow_journal_messages("error loading contents of banner: Failed to open file “/etc/non-existing-file”: No such file or directory")
        m.restart_cockpit()
        b.reload()
        b.wait_visible("#login-user-input")
        b.wait_not_visible("#banner")

        # Try to login as a non-existing user
        b.try_login("nonexisting", "blahblah")
        b.wait_text_not("#login-error-message", "")
        self.assertNotIn("web", m.execute("who"))

        # Try to login as user with a wrong password
        b.try_login("user", "gfedcba")
        b.wait_text_not("#login-error-message", "")
        self.assertNotIn("web", m.execute("who"))
        self.allow_journal_messages(".* user: Authentication failure.*")

        # Try to login as user with correct password
        b.try_login("user", "abcdefg")
        if m.ostree_image:
            b.wait_in_text("#login-error-message", "Server closed connection")
        else:
            b.wait_text("#login-error-message", "Permission denied")
        self.assertNotIn("web", m.execute("who"))

        # Try to login with disabled shell; this does not work on OSTree where
        # we log in through ssh
        if not m.ostree_image:
            m.execute("usermod --shell /bin/false admin; sync")
            b.reload()
            b.try_login("admin", "foobar")
            b.wait_text_not("#login-error-message", "")
            m.execute("usermod --shell /bin/bash admin; sync")

        # Login as admin
        b.open("/system")
        b.login_and_go()
        self.check_shell()
        if not m.ostree_image:  # logs in via ssh, not cockpit-session
            self.assertRegex(m.execute("who"), r"(^|\n)admin *web.*(\d+\.\d+|::)")

        # reload, which should log us in with the cookie
        b.reload()
        self.check_shell()
        if not m.ostree_image:  # logs in via ssh, not cockpit-session
            self.assertRegex(m.execute("who"), r"(^|\n)admin *web.*(\d+\.\d+|::)")

        b.go("/users#/admin")
        b.enter_page("/users")
        b.wait_text("#account-user-name", "admin")

        # No cockpit-session debug messages may be logged: the expected outcome
        # is that grep finds nothing and the command fails.
        try:
            m.execute("journalctl -p 7 SYSLOG_IDENTIFIER=cockpit-ws | grep 'cockpit-session: opening pam session'")
        except subprocess.CalledProcessError:
            pass
        else:
            self.fail("cockpit-session debug messages found")

        # Change login screen options
        b.logout()
        b.wait(lambda: "web console" not in m.execute("who"))
        b.wait_visible("#option-group")
        m.execute("printf '[WebService]\nLoginTo = false\n' > /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.open("/system")
        b.wait_visible("#login")
        b.wait_not_visible("#option-group")

        # LoginTo= also disables direct URL
        b.open("/=192.168.99.99/")
        b.wait_visible("#login")
        b.wait_not_visible("#option-group")
        # logging in does not go via cockpit-ssh (which would cause a connection failure)
        b.try_login()
        # this isn't the most helpful error message, but this is essentially hacking
        b.wait_text("#login-error-message", "Wrong user name or password")

        # Default options be to display these options
        m.execute("rm /etc/cockpit/cockpit.conf")
        m.restart_cockpit()
        b.open("/system")
        b.wait_visible("#option-group")

        # And now we remove cockpit-ssh which affects the default
        if not m.ostree_image:
            self.restore_dir(self.libexecdir)
            m.execute(f"rm {self.libexecdir}/cockpit-ssh")
            m.restart_cockpit()
            b.open("/system")
            b.wait_visible("#login")
            b.wait_not_visible("#option-group")

            # test login with tcsh
            # no tcsh in RHEL and in Arch Linux (TODO: available in [community])
            if not m.image.startswith(("rhel", "centos")) and m.image != "arch":
                try:
                    m.execute("sed -r -i.bak '/^admin:/ s_:[^:]+$_:/bin/tcsh_' /etc/passwd")
                    b.login_and_go()
                    b.enter_page('/system')
                    b.wait_visible('.system-information')
                    b.logout()
                finally:
                    m.execute("mv /etc/passwd.bak /etc/passwd")

            # login with user shell that prints some stdout/err noise
            # having stdout output in ~/.bashrc confuses docker, so don't run on OSTree
            m.execute("set -e; cd ~admin; cp -a .bashrc .bashrc.bak; [ ! -e .profile ] || cp -a .profile .profile.bak; "
                      "echo 'echo noise-rc-out; echo noise-rc-err >&2' >> .bashrc; "
                      "echo 'echo noise-profile-out; echo noise-profile-err >&2' >> .profile")
            self.addCleanup(m.execute, "set -e; cd ~admin; mv .bashrc.bak .bashrc; "
                                       "if [ -e .profile.bak ]; then mv .profile.bak .profile; else rm .profile; fi")
            b.login_and_go()

        self.allow_journal_messages("Returning error-response ... with reason .*",
                                    r"pam_unix\(cockpit:auth\): authentication failure; .*",
                                    r"pam_unix\(cockpit:auth\): check pass; user unknown",
                                    r"pam_succeed_if\(cockpit:auth\): requirement .* not met by user .*",
                                    "noise-rc-.*")

    @skipOstree("logs in via ssh, not cockpit-session")
    def testLogging(self):
        """Check the "last login" / "failed login" messages shown on the system page."""
        m = self.machine
        b = self.browser

        def assert_messages(has_last, n_fail):
            if has_last:
                b.wait_visible('#system_last_login')
                b.wait_in_text('#system_last_login', "Last successful login")
                b.wait_in_text('#system_last_login_from', "from")  # only present if IP was logged
            if n_fail:
                b.wait_visible('#system_last_login')
                b.wait_in_text('#system_last_login', f'{n_fail} failed login')
                b.wait_in_text('#system_last_login_from', "from")
                b.wait_in_text('#system_last_login_success', "Last successful login")
            else:
                self.assertFalse(b.is_present('#system_last_login_success'))

        def verify_correct(has_last, n_fail):
            b.login_and_go('/system')
            assert_messages(has_last, n_fail)
            # reload and make sure it's still there (or not)
            b.reload()
            b.enter_page('/system')
            assert_messages(has_last, n_fail)
            b.logout()

        m.start_cockpit()

        # Clean out the relevant logfiles
        m.execute("truncate -s0 /var/log/{[bw]tmp,lastlog} /var/run/utmp")

        if m.image == "arch":
            self.sed_file("s/# deny = 3/deny = 4/", "/etc/security/faillock.conf")

        # First login should have no messages
        verify_correct(False, 0)

        # Next login should see the last login
        verify_correct(True, 0)

        # Do some bogus login attempts
        b.try_login('admin', 'xyz')
        b.wait_text_not("#login-error-message", "")
        b.try_login('admin', 'xyz')
        b.wait_text_not("#login-error-message", "")
        b.try_login('admin', 'xyz')
        b.wait_text_not("#login-error-message", "")

        # We should see those bogus attempts now
        verify_correct(False, 3)

        # But after that login, they should be gone again
        verify_correct(True, 0)

    @skipImage("Arch Linux has no pwquality by default", "arch")
    def testExpired(self):
        """Log in with an expired password and walk through the password change conversation."""
        m = self.machine
        b = self.browser

        # On OSTree this happens over ssh
        if m.ostree_image:
            self.restore_dir("/etc/ssh", '( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service')
            m.execute("sed -i 's/.*ChallengeResponseAuthentication.*/ChallengeResponseAuthentication yes/' /etc/ssh/sshd_config /etc/ssh/sshd_config.d/*")
            m.execute("( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service")

        # test steps below assume a pam_pwquality config with retry > 1; on some images authselect drops that setting
        if not m.image.startswith(('debian', 'ubuntu', 'arch')):
            self.sed_file("/password.*requisite.*pam_pwquality/ s/$/ retry=3/", "/etc/pam.d/password-auth")

        m.execute("chage -d 0 admin")
        m.start_cockpit()
        b.open("/system")

        b.wait_visible("#login")
        b.wait_not_visible("#conversation-group")
        b.wait_visible("#password-group")
        b.wait_visible("#user-group")
        b.set_val('#login-user-input', "admin")
        b.set_val('#login-password-input', "foobar")
        b.click('#login-button')

        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        if m.ostree_image:
            b.wait_in_text("#conversation-prompt", "You are required to change your password")
        else:
            b.wait_in_text("#conversation-message", "You are required to change your password")
        b.set_val('#conversation-input', 'foobar')
        b.click('#login-button')

        # Type a bad password
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "New password")
        b.set_val('#conversation-input', 'admin')
        b.click('#login-button')

        # We should see a message
        if m.ostree_image:
            b.wait_in_text("#conversation-prompt", "BAD PASSWORD")
        else:
            b.wait_in_text("#conversation-message", "BAD PASSWORD")

        # Now choose a better password
        b.wait_not_present("#login-button:disabled")
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "New password")
        b.set_val('#conversation-input', '123foobar!@#')
        b.click('#login-button')

        # Retype the password wrong
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "Retype")
        b.set_val('#conversation-input', '123foobar!')  # wrong
        b.click('#login-button')

        # We should see a message
        if m.ostree_image:
            b.wait_in_text("#conversation-prompt", "passwords do not match")
        else:
            b.wait_in_text("#conversation-message", "passwords do not match")

        # Type the password again
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "New password")
        b.set_val('#conversation-input', '123foobar!@#')
        b.click('#login-button')

        # Now type it right
        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "Retype")
        b.set_val('#conversation-input', '123foobar!@#')
        b.click('#login-button')

        self.check_shell()

        self.allow_journal_messages('.*You are required to change your password immediately.*',
                                    '.*user account or password has expired.*',
                                    '.*BAD PASSWORD.*',
                                    '.*Sorry, passwords do not match.')
        self.allow_restart_journal_messages()

    def testConversation(self):
        """Log in through an extra PAM conversation prompt added by mock-pam-conv-mod.so."""
        m = self.machine
        b = self.browser

        conf = "/etc/pam.d/cockpit"
        if m.ostree_image:
            conf = "/etc/pam.d/sshd"
            self.restore_dir("/etc/ssh", '( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service')
            m.execute("sed -i 's/.*ChallengeResponseAuthentication.*/ChallengeResponseAuthentication yes/' /etc/ssh/sshd_config /etc/ssh/sshd_config.d/*")
            m.execute("( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service")

        # On Arch Linux the ordering matters due to an auth include for system-remote-login
        if self.machine.image == "arch":
            self.sed_file('1 a auth required mock-pam-conv-mod.so', conf)
        else:
            self.sed_file('5 a auth required mock-pam-conv-mod.so', conf)

        m.start_cockpit()
        b.open("/system")

        # Try to login as a non-existing user
        b.try_login("nonexisting", "blahblah")

        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "life the universe")
        b.set_val('#conversation-input', '43')
        b.click('#login-button')

        b.wait_text_not("#login-error-message", "")

        b.try_login("admin", "foobar")

        b.wait_visible("#conversation-group")
        b.wait_not_visible("#password-group")
        b.wait_not_visible("#user-group")
        b.wait_in_text("#conversation-prompt", "life the universe")
        b.set_val('#conversation-input', '42')
        b.click('#login-button')

        self.check_shell()

        self.allow_restart_journal_messages()

    @skipImage("No tlog", "debian-*", "ubuntu-*", "arch", "centos-8-stream")
    @skipOstree("No tlog")
    def testSessionRecordingShell(self):
        """Log in as a user whose login shell is tlog-rec-session."""
        m = self.machine
        b = self.browser

        m.execute("useradd user --shell /usr/bin/tlog-rec-session")
        m.execute("echo user:abcdefg | chpasswd")

        # this doesn't actually record anything, but logging into cockpit should work
        m.start_cockpit()
        b.login_and_go("/system", user="user", password="abcdefg")
        b.wait_visible(".pf-v5-c-alert:contains('Web console is running in limited access mode.')")
        b.logout()

        self.allow_journal_messages(".*value for the SHELL variable was not found the /etc/shells.*",
                                    "Locale charset is ANSI.*",
                                    "Assuming locale environment is lost.*",
                                    "ATTENTION! Your session is being recorded!")

    def curl_auth(self, url, userpass):
        """Fetch ``url`` from the machine's web port with HTTP Basic authentication.

        ``userpass`` is base64-encoded verbatim into the ``Authorization`` header.
        curl runs as a local subprocess; the response headers (``-D -``) followed
        by the body are returned as text.
        """
        header = "Authorization: Basic " + base64.b64encode(userpass.encode()).decode()
        return subprocess.check_output(['/usr/bin/curl', '-s', '-k', '-D', '-', '--header', header,
                                        f'http://{self.machine.web_address}:{self.machine.web_port}{url}'],
                                       universal_newlines=True)

    def curl_auth_code(self, url, userpass):
        """Return the numeric HTTP status code of a ``curl_auth()`` request.

        The code is parsed from the second token of the first response line;
        the test fails if there is no output or the line has fewer than three
        space-separated parts.
        """
        lines = self.curl_auth(url, userpass).splitlines()
        self.assertGreater(len(lines), 0)
        tokens = lines[0].split(' ', 2)
        self.assertEqual(len(tokens), 3)
        return int(tokens[1])

    def testRaw(self):
        """Send malformed, oversized and wrong Basic credentials straight to /cockpit/login."""
        self.machine.start_cockpit()
        time.sleep(0.5)
        self.assertEqual(self.curl_auth_code('/cockpit/login', ''), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:bar\n'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:bar:baz'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', ':\n\n'), 401)
        self.assertIn(self.curl_auth_code('/cockpit/login', 'admin:bar'), [401, 403])
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'foo:bar'), 401)
        self.assertIn(self.curl_auth_code('/cockpit/login', 'admin:' + 'x' * 4000), [401, 403])
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'x' * 4000 + ':bar'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'a' * 4000 + ':'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'a' * 4000 + ':b\nc'), 401)
        self.assertEqual(self.curl_auth_code('/cockpit/login', 'a' * 4000 + ':b\nc\n'), 401)

        self.allow_journal_messages("Returning error-response ... with reason .*",
                                    r"pam_unix\(cockpit:auth\): authentication failure; .*",
                                    r"pam_unix\(cockpit:auth\): check pass; user unknown",
                                    r"pam_succeed_if\(cockpit:auth\): requirement .* not met by user .*",
                                    "couldn't parse login input: Malformed input",
                                    "couldn't parse login input: Authentication failed")

    @skipImage("No SELinux", "debian-*", "ubuntu-*", "arch")
    @skipOstree("No semanage")
    def testSELinuxRestrictedUser(self):
        """Log in as users mapped to the SELinux ``user_u`` and ``sysadm_u`` logins."""
        m = self.machine
        b = self.browser

        # non-admin user_u
        m.execute("useradd unpriv; echo 'unpriv:foobar' | chpasswd; semanage login -a -s user_u unpriv")
        self.addCleanup(m.execute, "semanage login -d -s user_u unpriv")

        m.start_cockpit()
        b.login_and_go("/system", user="unpriv")
        # generate lastlog entry
        b.logout()
        b.login_and_go("/system", user="unpriv")
        # not an admin
        b.wait_visible(".pf-v5-c-alert:contains('Web console is running in limited access mode.')")
        b.wait_in_text('#system_last_login', "Last successful login")
        b.wait_in_text('#system_last_login_from', "web console")
        b.logout()

        # not allowed to restricted users
        self.allow_journal_messages('.*type=1400.*avc: denied { map }.*comm="cockpit-pcp".*')
        self.allow_journal_messages('.*type=1400.*avc: denied .* comm="sudo".*')
        self.allow_journal_messages('.*type=1400.*avc: denied .* comm="systemd".*')
        self.allow_journal_messages('.*type=1400.*avc: denied { watch } .* comm="cockpit-bridge".*')
        self.allow_journal_messages('.*sudo:.* setresuid(.*): Operation not permitted')
        self.allow_journal_messages('.*sudo: error initializing audit plugin sudoers_audit')
        # https://bugzilla.redhat.com/show_bug.cgi?id=1727887
        self.allow_journal_messages('.*type=1400.*avc: denied { connectto } .* path="/run/user/.*/bus" scontext=user_u:user_r:user_t:s0.*')

        # sysadm_u
        m.execute("useradd priv; echo 'priv:foobar' | chpasswd")
        m.execute("usermod -aG wheel priv")
        m.execute("semanage login -a -s sysadm_u priv")
        self.addCleanup(m.execute, "semanage login -d -s sysadm_u priv")

        b.login_and_go("/system", user="priv")
        # passing login info memfd should work
        b.logout()
        b.login_and_go("/system", user="priv")
        b.wait_in_text('#system_last_login', "Last successful login")
        b.wait_in_text('#system_last_login_from', "web console")

        b.go("/playground/test")
        b.enter_page("/playground/test")
        b.click(".super-channel button")
        b.wait_in_text(".super-channel span", 'result: ')
        if m.image.startswith("rhel-8") or m.image in ["centos-8-stream", "rhel-9-1"]:
            # https://bugzilla.redhat.com/show_bug.cgi?id=1814569
            self.assertIn('result: access-denied', b.text(".super-channel span"))
        else:
            self.assertIn('result: uid=0', b.text(".super-channel span"))

    def testNoAdminGroup(self):
        """A user outside the admin group but with a sudoers entry gets administrative access."""
        m = self.machine
        b = self.browser

        # Create a user that is not in wheel but can sudo
        m.execute("useradd user -s /bin/bash -c User")
        m.execute("echo user:foobar | chpasswd")
        self.write_file("/etc/sudoers.d/user", "user ALL=(ALL) ALL")

        self.password = "foobar"
        self.login_and_go("/system", user="user")

        # session is privileged
        b.switch_to_top()
        b.check_superuser_indicator("Administrative access")
        b.enter_page('/system')

        # shutdown button should be enabled and working
        b.click("#reboot-button")
        b.wait_popup("shutdown-dialog")
        b.wait_in_text(f"#shutdown-dialog button{self.danger_btn_class}", 'Reboot')
        b.click("#delay")
        b.click("button:contains('5 minutes')")
        b.wait_text("#delay .pf-v5-c-select__toggle-text", "5 minutes")
        b.click(f"#shutdown-dialog button{self.danger_btn_class}")

        # cancel reboot
        b.wait_in_text('#system-health-shutdown-status-text', "Scheduled reboot")
        b.click("#system-health-shutdown-status-cancel-btn")
        b.wait_not_present('#system-health-shutdown-status')

    def testUnsupportedBrowser(self):
        """The login page shows only the unsupported-browser notice when WebSocket is missing."""
        m = self.machine
        b = self.browser

        m.start_cockpit()

        # pretend browser doesn't support a required capability
        b.cdp.invoke("Page.addScriptToEvaluateOnNewDocument", source="window.WebSocket = undefined;")

        b.open("/system")
        b.wait_visible("#unsupported-browser")
        b.wait_not_visible("#login-fatal")
        b.wait_not_visible("#login")
        b.wait_not_visible("#login-details")

    def testBypassSoftRequirements(self):
        """With CSS.supports() always false, login is still possible via "Bypass browser check"."""
        m = self.machine
        b = self.browser

        m.start_cockpit()

        b.cdp.invoke("Page.addScriptToEvaluateOnNewDocument", source="""
window.CSS.supports = function() { return false; };
""")

        b.open("/system")
        b.wait_visible("#unsupported-browser")
        b.wait_in_text("#show-other-login-options-text", "Bypass browser check")
        b.click(".pf-v5-c-expandable-section")
        b.set_val('#login-user-input', "admin")
        b.set_val('#login-password-input', "foobar")
        b.click("#login-button")

        self.check_shell()

    @skipOstree("Starting on OSTree is weird")
    def testFailingWebsocket(self, safari=False, cacert=False):
        """Check the early-failure page shown when cockpit-ws rejects the WebSocket.

        ``safari`` makes the browser send a Safari user agent; ``cacert`` decides
        whether a (fake) self-signed CA file exists. ``#safari-cert-help`` is
        expected only when both are true.
        """
        m = self.machine
        b = self.browser

        # Cause cockpit-ws to reject WebSocket connections
        m.write("/etc/cockpit/cockpit.conf", "[WebService]\nOrigins=foo.bar.com\n")
        self.allow_journal_messages("received request from bad Origin: .*",
                                    "connection unexpectedly closed by peer",
                                    "Received invalid handshake request from the client")
        self.allow_browser_errors(".*")

        m.start_cockpit()

        # Really start Cockpit to make sure it has generated all its certificates.
        m.execute("systemctl start cockpit")

        if safari:
            b.set_user_agent("Safari/300")

        if cacert:
            m.write("/etc/cockpit/ws-certs.d/0-self-signed-ca.pem", "FAKE CERT FOR TESTING\n")
        else:
            m.execute("rm -f /etc/cockpit/ws-certs.d/0-self-signed-ca.pem")

        # Log in.
        b.open("/system")
        b.wait_visible("#login")
        b.set_val('#login-user-input', "admin")
        b.set_val('#login-password-input', "foobar")
        b.click('#login-button')

        b.wait_visible("#early-failure")
        if safari and cacert:
            b.wait_visible("#safari-cert-help")
        else:
            b.wait_not_present("#safari-cert-help")

    @skipBrowser("Enough when only chromium pretends to be a different browser", "firefox")
    @skipOstree("Starting on OSTree is weird")
    def testFailingWebsocketSafari(self):
        """Run testFailingWebsocket with a Safari user agent and a CA file present."""
        self.testFailingWebsocket(safari=True, cacert=True)

    @skipBrowser("Enough when only chromium pretends to be a different browser", "firefox")
    @skipOstree("Starting on OSTree is weird")
    def testFailingWebsocketSafariNoCA(self):
        """Run testFailingWebsocket with a Safari user agent and no CA file."""
        self.testFailingWebsocket(safari=True, cacert=False)

    def testFaillock(self):
        """Check failed-login counting, lockout after four failures and reset with pam_faillock."""
        m = self.machine
        b = self.browser

        module = 'faillock'

        def logfile(user):
            return '/var/run/faillock/' + user

        def expect_fail_count(expected, user="admin", must_exist=True):
            if must_exist:
                cksum = m.execute(f"cksum {logfile(user)}")

            output = m.execute(f"{module} --user {user}")
            n_lines = len(output.splitlines())
            if n_lines:
                n_lines -= 2  # remove the header, if it's there
            self.assertEqual(expected, n_lines)

            # make sure the above command didn't change the file
            if must_exist:
                self.assertEqual(cksum, m.execute(f"cksum {logfile(user)}"))

        def expect_failed_login(user, password, n_failed):
            b.try_login(user, password)
            b.wait_text_not("#login-error-message", "")
            expect_fail_count(n_failed, user=user)

        def expect_successful_login(user, password):
            b.login_and_go('/system', user=user, password=password)
            expect_fail_count(0, user=user)
            b.logout()

        self.enable_root_login()

        # ensure we have no module in our pam config already
        # arch has faillock enabled by default
        if m.image != "arch":
            m.execute(f"! grep -r '{module}' /etc/pam.d")

        # add it to the pam config
        if m.image.startswith(('debian-', 'ubuntu')):
            # enable pam_faillock. see example in pam_faillock(8)
            self.sed_file(r"/fallback if no module succeeds/ s/^/"
                          r"auth [default=die] pam_faillock.so authfail deny=4\n"
                          r"auth sufficient pam_faillock.so authsucc deny=4\n/",
                          "/etc/pam.d/common-auth")
        elif m.image == "arch":
            self.sed_file("s/# deny = 3/deny = 4/", "/etc/security/faillock.conf")
        else:
            # see https://access.redhat.com/solutions/62949
            self.sed_file("""/pam_unix/ {
s/sufficient/[success=1 default=ignore]/\n
aauth [default=die] pam_faillock.so authfail audit deny=4 unlock_time=600\n
aauth sufficient pam_faillock.so authsucc audit deny=4 unlock_time=600\n
}""", "/etc/pam.d/password-auth")

        m.execute(f"grep -r '{module}' /etc/pam.d")

        m.start_cockpit()

        # start from a clean slate
        clean_cmd = f"rm -f {logfile('admin')}"
        self.addCleanup(m.execute, clean_cmd)
        m.execute(clean_cmd)

        # make sure we can still login
        b.login_and_go("/system")
        b.logout()

        # and it should show zero fails
        expect_fail_count(0, must_exist=False)

        # try three bogus login attempts
        for n in [1, 2, 3]:
            expect_failed_login("admin", "bad", n)
        expect_fail_count(3)

        # make sure we can still login
        expect_successful_login("admin", "foobar")

        # after the success, try three more bogus login attempts
        for n in [1, 2, 3]:
            expect_failed_login("admin", "bad", n)
        expect_successful_login("admin", "foobar")

        # now try four, which should lock the account
        for n in [1, 2, 3, 4]:
            expect_failed_login("admin", "bad", n)

        # logging in should fail now (n is still 4 from the loop above)
        expect_failed_login("admin", "foobar", n)

        # having given the correct password the last time should not have helped things
        expect_failed_login("admin", "foobar", n)

        # but we can reset the lockout
        m.execute(module + " --reset --user admin")
        expect_fail_count(0)

        # and login again
        expect_successful_login("admin", "foobar")

        # ostree images log in via sshd, which forbids root logging in with a password
        if not m.ostree_image:
            # make sure root never gets locked out
            for n in range(1, 10):
                expect_failed_login("root", "bad", n)
            expect_successful_login("root", "foobar")

        self.allow_journal_messages(".*[aA]ccount .*locked due to .* failed logins.*")
        self.allow_journal_messages(".*minutes left to unlock.*")

    @skipOstree("root logins disabled by default with ssh")
    def testPamAccess(self):
        """Root login is refused by default, and by pam_access once root login is enabled."""
        b = self.browser
        m = self.machine

        m.start_cockpit()
        b.open("/system")

        # root login is disabled by default via /etc/cockpit/disallowed-users on everything except RHEL 8
        if not m.image.startswith(("rhel-8", "centos-8")):
            b.try_login("root", "foobar")
            b.wait_in_text("#login-error-message", "Wrong user name or password")

        # disable root login with pam_access
        self.enable_root_login()
        self.write_file("/etc/security/access.conf", "- : root : ALL\n", append=True)
        self.sed_file("1 aaccount required pam_access.so", "/etc/pam.d/cockpit")
        b.try_login(user="root")
        b.wait_in_text("#login-error-message", "Permission denied")

        self.allow_journal_messages(r"cockpit-session: user account access failed.*root.*")

    @skipOstree("sssd not available")
    def testClientCertAuthentication(self):
        """Check TLS client certificate login mapped through sssd, alongside password login."""
        m = self.machine

        if m.image.startswith(("debian", "ubuntu")):
            # on Debian/Ubuntu, an unconfigured sssd fails to start, so only restart it at the end if it was running before
            # also, sssd is split into several services
            self.restore_dir("/etc/sssd", post_restore_action="systemctl stop 'sssd*'")
        else:
            self.restore_dir("/etc/sssd", post_restore_action="systemctl try-restart sssd")

        m.execute("useradd alice; echo alice:foobar123 | chpasswd")

        m.upload(["alice.pem", "alice.key", "alice-expired.pem", "bob.pem", "bob.key"], self.vm_tmpdir,
                 relative_dir="src/tls/ca/")
        alice_cert = self.vm_tmpdir + "/alice.pem"
        alice_cert_expired = self.vm_tmpdir + "/alice-expired.pem"
        alice_key = self.vm_tmpdir + "/alice.key"
        alice_cert_key = ['--cert', alice_cert, '--key', alice_key]

        # set up local (NIS) sssd provider and certificate mapping
        self.write_file("/etc/sssd/sssd.conf", """
[sssd]
domains = local
[domain/local]
id_provider = files
[certmap/local/alice]
# WARNING: Requires sssd >= 2.6.1 and installing sssd_auth_ca_db.pem; with earlier sssd this is completely unsafe
matchrule = <SUBJECT>^DC=LAN,DC=COCKPIT,CN=alice$
""", perm="0600")
        m.execute("systemctl restart sssd")

        # ensure sssd certificate lookup without validation works
        user_obj = m.execute('busctl call org.freedesktop.sssd.infopipe /org/freedesktop/sssd/infopipe/Users '
                             'org.freedesktop.sssd.infopipe.Users FindByCertificate s -- '
                             '''"$(cat %s)" | sed 's/^o "//; s/"$//' ''' % alice_cert)
        self.assertEqual(m.execute('busctl get-property org.freedesktop.sssd.infopipe ' + user_obj.strip() +
                                   ' org.freedesktop.sssd.infopipe.Users.User name').strip(),
                         's "alice"')

        # sssd 2.6.1 introduces FindByValidCertificate() method which validates the given certificate
        # against the configured CA. Without this, certmap rules are completely unsafe!
        api = m.execute("busctl introspect org.freedesktop.sssd.infopipe /org/freedesktop/sssd/infopipe/Users")
        has_validate_api = 'FindByValidCertificate' in api

        # we don't want to completely pinpoint the OS list to avoid lockstep image refreshes+cockpit PRs
        # and spontaneous packit test failures as sssd 2.6.1 goes into more distros; but make sure we hit
        # this code path on some known-good ones
        if m.image.startswith("fedora") or m.image in ["debian-testing"]:
            self.assertTrue(has_validate_api)

        if has_validate_api:
            err = m.execute('! busctl call org.freedesktop.sssd.infopipe /org/freedesktop/sssd/infopipe/Users '
                            'org.freedesktop.sssd.infopipe.Users FindByValidCertificate s -- '
                            '''"$(cat %s)" 2>&1''' % alice_cert)
            # HACK: sssd 2.6.1 has a broken error message ("Invalid certificate provided"), accept that as well
            # until 2.6.2 is available everywhere. https://github.com/SSSD/sssd/issues/5911
            self.assertRegex(err, "Invalid certificate provided|Certificate authority file not found")

            # install our CA, so that sssd can validate
            with open("src/tls/ca/ca.pem") as f:
                m.write("/etc/sssd/pki/sssd_auth_ca_db.pem", f.read())
            u = m.execute('busctl call org.freedesktop.sssd.infopipe /org/freedesktop/sssd/infopipe/Users '
                          'org.freedesktop.sssd.infopipe.Users FindByCertificate s -- '
                          '''"$(cat %s)" | sed 's/^o "//; s/"$//' ''' % alice_cert)
            self.assertEqual(u, user_obj)

        # These tests have to be run with curl, as chromium-headless does not support selecting/handling client-side
        # certificates; it just rejects cert requests. For interactive tests, grab src/tls/ca/alice.p12 and import
        # it into the browser.
        def do_test(authopts, expected, not_expected=(), session_leader=None):
            # authopts: list of extra curl arguments; expected/not_expected: substrings that must
            # (not) occur in curl's output; session_leader: if given, a string that must appear in
            # the status of alice's active logind session
            m.start_cockpit(tls=True)
            output = m.execute(['curl', '-ksS', '-D-'] + authopts + ['https://localhost:9090/cockpit/login'])
            for s in expected:
                self.assertIn(s, output)
            for s in not_expected:
                self.assertNotIn(s, output)

            # sessions/users often hang around in State=closing for a long time, ignore these
            if session_leader:
                m.execute('until [ "$(loginctl show-user --property=State --value alice)" = "active" ]; do sleep 1; done')
                sessions = m.execute('loginctl show-user --property=Sessions --value alice').strip().split()
                self.assertGreaterEqual(len(sessions), 1)
                for session in sessions:
                    out = m.execute('loginctl session-status ' + session)
                    if "State: active" in out:  # skip closing sessions
                        self.assertIn(session_leader, out)
                        self.assertIn('cockpit-bridge', out)
                        self.assertIn('cockpit; type web', out)
                        break
                else:
                    self.fail("no active session for active user")
                # sessions time out after 10s, but let's not wait for that
                m.execute('loginctl terminate-session ' + sessions[0])

            # wait until the session is gone
            m.execute("while loginctl show-user alice | grep -q 'State=active'; do sleep 1; done")
            m.stop_cockpit()

        # from sssd
        self.allow_journal_messages("alice is not allowed to run sudo.*")

        # cert auth should not be enabled by default
        do_test(alice_cert_key, ["HTTP/1.1 401 Authentication failed"])
        # password auth should work
        do_test(['-u', 'alice:foobar123'],
                ['HTTP/1.1 200 OK', '"csrf-token"'],
                session_leader='cockpit-session')

        # enable cert based auth
        m.write("/etc/cockpit/cockpit.conf", '[WebService]\nClientCertAuthentication = true\n', append=True)
        # cert auth should work now
        do_test(alice_cert_key, ['HTTP/1.1 200 OK', '"csrf-token"'])
        # password auth, too
        do_test(['-u', 'alice:foobar123'],
                ['HTTP/1.1 200 OK', '"csrf-token"'],
                session_leader='cockpit-session')

        # another certificate gets rejected; a rejected login must not hand out a CSRF token
        self.allow_journal_messages("cockpit-session: No matching user for certificate")
        do_test(["--cert", self.vm_tmpdir + "/bob.pem", "--key", self.vm_tmpdir + "/bob.key"],
                ["HTTP/1.1 401 Authentication failed", '<h1>Authentication failed</h1>'],
                not_expected=["csrf-token"])

        # check expired certificate
        m.start_cockpit(tls=True)
        journal_cursor = self.machine.journal_cursor()
        m.execute(f'! curl -ksS --cert {alice_cert_expired} --key {alice_key} https://localhost:9090/cockpit/login')
        m.stop_cockpit()
        wait(lambda: re.search(r'.*Invalid TLS peer certificate.* expired',
                               m.execute(f"journalctl -ocat --cursor '{journal_cursor}' SYSLOG_IDENTIFIER=cockpit-tls")),
             tries=10)
        self.allow_journal_messages('.*Invalid TLS peer certificate.* expired')
        self.allow_journal_messages('.*TLS handshake failed: Error in the certificate verification.*')

        # disallow password auth
        m.write("/etc/cockpit/cockpit.conf", "[Basic]\naction = none\n", append=True)
        do_test(alice_cert_key, ['HTTP/1.1 200 OK', '"csrf-token"'])
        do_test(['-u', 'alice:foobar123'],
                ['HTTP/1.1 401 Authentication disabled', '<h1>Authentication disabled</h1>'],
                not_expected=["csrf-token"])

        # with validation API and without a CA, alice's cert fails
        if has_validate_api:
            m.execute("rm /etc/sssd/pki/sssd_auth_ca_db.pem")
            self.allow_journal_messages("cockpit-session: Failed to map .* Invalid certificate provided")
            self.allow_journal_messages("cockpit-session: Failed to map .* Certificate authority file not found")
            do_test(alice_cert_key, ['HTTP/1.1 401 Authentication failed'])

        # sssd-dbus not available
        self.allow_journal_messages("cockpit-session: Failed to map .* Could not activate remote peer.*")
        self.allow_journal_messages("cockpit-session: Failed to map .* Unit sssd-ifp.service is masked.")
        m.execute("systemctl mask sssd-ifp; systemctl stop sssd-ifp")
        do_test(alice_cert_key, ["HTTP/1.1 401 Authentication failed", '<h1>Authentication failed</h1>'],
                not_expected=["csrf-token"])
        m.execute("systemctl unmask sssd-ifp")

    def testServer(self):
        """Log in through the "Connect to" server field, using several spellings of localhost."""
        m = self.machine
        b = self.browser

        m.execute("useradd --create-home user")
        m.execute("echo user:foobar | chpasswd")

        m.start_cockpit()

        # the quick succession of login/logout is too much for packagekit's brain
        self.disable_preload("packagekit", "playground", "systemd")

        def check_server(server, expect_fp_ack):
            b.open('/')
            b.wait_visible("#login")
            # start with no known keys every time
            b.eval_js("""window.localStorage.setItem("known_hosts", "{ }")""")

            b.set_val('#login-user-input', "user")
            b.set_val('#login-password-input', "foobar")
            b.click("#show-other-login-options")
            b.set_val("#server-field", server)
            b.click("#login-button")
            if expect_fp_ack:
                b.wait_in_text("#hostkey-title", "New host")
                b.wait_in_text("#hostkey-message-1", "for the first time")
                b.click("#login-button")
            b.wait_visible('#content')
            b.enter_page('/system')
            b.wait_visible('.system-information')
            b.logout()

        # by name
        check_server("localhost", True)
        # by name and port
        check_server("localhost:22", True)
        # by IPv4 address; 127.0.0.1 is treated specially as ignore_hostkey fallback, and does not require FP
        check_server("127.0.0.1", False)
        # by IPv4 address and port
        check_server("127.0.0.1:22", False)
        # by IPv6 address
        check_server("::1", True)
        # by IPv6 address and port
        check_server("[::1]:22", True)
# Run the tests through testlib's entry point when executed as a script.
if __name__ == '__main__':
    test_main()