From 757a7bfe6b83da0093728722027f56d49edef0b4 Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Wed, 23 Aug 2023 14:31:15 -0400 Subject: [PATCH] pam: adding support to manage pam modules; pam_access and pam_faillock --- docs/guides/index.rst | 1 + docs/guides/pam.rst | 60 ++++++ sssd_test_framework/roles/base.py | 6 + sssd_test_framework/utils/pam.py | 290 +++++++++++++++++++++++++++++ sssd_test_framework/utils/sssd.py | 16 ++ sssd_test_framework/utils/tools.py | 13 ++ 6 files changed, 386 insertions(+) create mode 100644 docs/guides/pam.rst create mode 100644 sssd_test_framework/utils/pam.py diff --git a/docs/guides/index.rst b/docs/guides/index.rst index 80b489e2..169a0b05 100644 --- a/docs/guides/index.rst +++ b/docs/guides/index.rst @@ -7,6 +7,7 @@ How to guides check-sssd-functionality ssh-client sss_override + pam testing-authentication testing-autofs testing-identity diff --git a/docs/guides/pam.rst b/docs/guides/pam.rst new file mode 100644 index 00000000..0fbce792 --- /dev/null +++ b/docs/guides/pam.rst @@ -0,0 +1,60 @@ +Pluggable Authentication Modules / PAM +###################################### + +Class :class:`sssd_test_framework.utils.pam.PAMUtils` provides +an API to manage PAM module configuration; Currently pam_access and pam_faillock is supported. + +pam_access +========== +A module for logdaemon style login access control. This is managed by /etc/security/access.conf. + +.. code-block:: python + :caption: Example PAM Access usage + @pytest.mark.topology(KnownTopologyGroup.AnyProvider) + def test_example(client: Client, provider: GenericProvider): + # Add users + provider.user("user-1").add() + provider.user("user-2").add() + + # Add access rules + access = client.pam.access() + access.config_set([{"access": "+", "user": "user-1", "origin": "ALL"}, + {"access": "-", "user": "user-2", "origin": "ALL"}]) + access.config_write() + + client.sssd.authselect.enable_feature(["with-pamaccess"]) + client.sssd.domain["use_fully_qualified_names"] = "False" + client.sssd.start() + + assert client.auth.ssh.password("user-1", "Secret123") + assert not client.auth.ssh.password("user-2", "Secret123") + +pam_faillock +============ +A module that counts authentication failures. This is configured in /etc/security/faillock.conf. + +.. code-block:: python + :caption: Example PAM Faillock usage + + @pytest.mark.topology(KnownTopologyGroup.AnyProvider) + def test_example(client: Client, provider: GenericProvider): + # Add user + provider.user("user-1").add() + faillock = client.pam.faillock() + faillock.config_set({"deny": "3", "unlock_time": "300"}) + faillock.config_write() + + client.sssd.common.pam(["with-faillock"]) + client.sssd.start() + + assert client.auth.ssh.password("user-1", "Secret123") + + for i in range(3): + client.auth.ssh.password("user-1", "BadSecret123") + + assert not client.auth.ssh.password("user-1", "Secret123") + + # Reset user lockout + client.tools.faillock(["--user", "user-1", "--reset"]) + + assert client.auth.ssh.password("user-1", "Secret123") diff --git a/sssd_test_framework/roles/base.py b/sssd_test_framework/roles/base.py index 86acf97f..a457dcd7 100644 --- a/sssd_test_framework/roles/base.py +++ b/sssd_test_framework/roles/base.py @@ -15,6 +15,7 @@ from ..utils.authentication import AuthenticationUtils from ..utils.authselect import AuthselectUtils from ..utils.ldap import LDAPUtils +from ..utils.pam import PAMUtils from ..utils.tools import LinuxToolsUtils HostType = TypeVar("HostType", bound=BaseHost) @@ -139,6 +140,11 @@ def __init__(self, *args, **kwargs) -> None: Authentication helpers. """ + self.pam: PAMUtils = PAMUtils(self.host, self.fs) + """ + Configuring various PAM modules. + """ + class BaseLinuxLDAPRole(BaseLinuxRole[LDAPHostType]): """ diff --git a/sssd_test_framework/utils/pam.py b/sssd_test_framework/utils/pam.py new file mode 100644 index 00000000..bb48d79e --- /dev/null +++ b/sssd_test_framework/utils/pam.py @@ -0,0 +1,290 @@ +""""PAM Tools.""" + +from __future__ import annotations + +import re + +from pytest_mh import MultihostHost, MultihostUtility +from pytest_mh.utils.fs import LinuxFileSystem + +__all__ = [ + "PAMUtils", + "PAMAccess", + "PAMFaillock", +] + + +class PAMUtils(MultihostUtility[MultihostHost]): + """ + Configuring various PAM modules + """ + + def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None: + """ + :param host: Remote host instance + :type host: MultihostHost + :param fs: Linux file system instance + :type fs: LinuxFileSystem + """ + super().__init__(host) + + self.fs: LinuxFileSystem = fs + + def access(self, file: str = "/etc/security/access.conf") -> PAMAccess: + """ + :param file: PAM Access file name. + :type file: str + :return: PAM Access object + :rtype: PAMAccess + """ + return PAMAccess(self, file, self.host, self.fs) + + def faillock(self, file: str = "/etc/security/faillock.conf") -> PAMFaillock: + """ + :param file: PAM Faillock file name. + :type file: str + :return: PAM Faillock object + :rtype: PAMFaillock + """ + return PAMFaillock(self, file, self.host, self.fs) + + +class PAMAccess(PAMUtils): + """ + Management of PAM Access on the client host. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopologyGroup.AnyProvider) + def test_example(client: Client, provider: GenericProvider): + # Add users + provider.user("user-1").add() + provider.user("user-2").add() + + # Add rule to permit "user-1" and deny "user-2" + access = client.pam.access() + access.config_set([{"access": "+", "user": "user-1", "origin": "ALL"}, + {"access": "-", "user": "user-2", "origin": "ALL"}]) + access.config_write() + + client.sssd.authselect.enable_feature(["with-pamaccess"]) + client.sssd.start() + + # Check the results + assert client.auth.ssh.password("user-1", "Secret123") + assert not client.auth.ssh.password("user-2", "Secret123") + """ + + def __init__(self, util: PAMUtils, file: str, host: MultihostHost, fs: LinuxFileSystem) -> None: + """ + :param util: PAMUtils utility object + :type util: PAMUtils + :param file: Configuration file + :type file: str + :param host: Multihost object + :type host: MultihostHost + :param fs: LinuxFileSystem object + :type fs: LinuxFileSystem + :param file: File name of access file + :type file: str, optional + """ + super().__init__(host, fs) + self._changed: bool = False + + self.util: PAMUtils = util + self.file: str = file + self.path: str = "/files" + self.file + self.args: str = f'--noautoload --transform "Access.lns incl {self.file}"' + self.cmd: str = "" + + self._backup = f"{self.file}.bak.wfw" + + def setup_when_used(self): + super().setup_when_used() + self.logger.info(f"Creating backup at '{self._backup}' on {self.host.hostname})") + self.util.host.ssh.run(f"cp -rf {self.file} {self._backup}") + + def teardown_when_used(self): + super().teardown_when_used() + self.logger.info(f"Restoring backup at {self._backup} on {self.host.hostname}") + self.util.host.ssh.run(f"mv {self._backup} {self.file}") + + def config_write(self): + """ + Write access configuration. + :return: None + """ + self.util.logger.info(f"Saving Augeas changes to {self.file} on {self.util.host.hostname}") + self.util.host.ssh.run(f'echo -e "{self.cmd} save\n" | augtool --echo {self.args}') + + def config_read(self) -> str: + """ + Read access file as Augeas tree. + :return: PAM access configuration + :rtype: str + """ + self.util.logger.info(f"Reading Augeas tree {self.file} on {self.util.host.hostname}") + result = self.util.host.ssh.run(f"augtool {self.args} print {self.path}") + + return result.stdout + + def config_delete(self, value: list[dict[str, str]]) -> None: + """ + Delete access configuration. + :param value: Configuration. + :type value: list[dict[str, str]] + :return: None + """ + if value is None: + raise ValueError("No data!") + + index = 1 + for i in self.util.host.ssh.run(f"augtool {self.args} match {self.path}/*").stdout_lines: + node = re.sub("\\d", str(index), i.split("=")[0].strip()) + leaf = self.util.host.ssh.run(f"augtool {self.args} match {node}/*").stdout_lines + access = i.split("=")[1].strip() + user = leaf[0].split("=")[1].strip() + origin = leaf[1].split("=")[1].strip() + match = {"access": access, "user": user, "origin": origin} + for y in value: + if match == y: + self.util.logger.info(f"Deleting node in Augeas tree on {self.util.host.hostname}") + self.util.host.ssh.run(f"augtool {self.args} rm {node}") + else: + index = +index + + def config_set(self, value: list[dict[str, str]]) -> None: + """ + Configure access configuration file. + :param value: Access rule + :type value: list[list[str]] + :return: None + """ + if value is None: + raise ValueError("No data!") + + count = 1 + for i in value: + self.cmd = self.cmd + f"set {self.path}/access[{count}] " + i["access"] + "\n" + self.cmd = self.cmd + f"set {self.path}/access[{count}]/user " + i["user"] + "\n" + self.cmd = self.cmd + f"set {self.path}/access[{count}]/origin " + i["origin"] + "\n" + count = +count + + +class PAMFaillock(PAMUtils): + """ + Management of PAM Faillock on the client host. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopologyGroup.AnyProvider) + def test_example(client: Client, provider: GenericProvider): + # Add user + provider.user("user-1").add() + + # Setup faillock + faillock = client.pam.faillock() + faillock.config_set({"deny": "3", "unlock_time": "300"}) + faillock.config_write() + + client.sssd.common.pam(["with-faillock"]) + + # Start SSSD + client.sssd.start() + + # Check the results + assert client.auth.ssh.password("user-1", "Secret123") + + # Three failed login attempts + for i in range(3): + assert not client.auth.ssh.password("user-1", "bad_password") + + assert not client.auth.ssh.password("user-1", "Secret123") + + # Reset user lockout + client.pam.faillock("user-1").reset + assert client.auth.ssh.password("user-1", "Secret123") + """ + + def __init__(self, util: PAMUtils, file: str, host: MultihostHost, fs: LinuxFileSystem) -> None: + """ + :param util: PAMUtils object + :type util: PAMUtils + :param file: Configuration file + :type file: str + :param host: MultihostHost object + :type host: MultihostHost + :param fs: LinuxFileSystem object + :type fs: LinuxFileSystem + :param file: Faillock configuration file + :type file: str, optional + """ + super().__init__(host, fs) + self._changed: bool = False + + self.util: PAMUtils = util + self.file: str = file + self.path: str = "/files" + self.file + self.args: str = f'--noautoload --transform "Simplevars.lns incl {self.file}"' + self.cmd: str = "" + + self._backup = f"{self.file}.bak.wfw" + + def setup_when_used(self): + super().setup_when_used() + self.logger.info(f"Creating backup at '{self._backup}' on {self.host.hostname})") + self.util.host.ssh.run(f"cp -rf {self.file} {self._backup}") + + def teardown_when_used(self): + super().teardown_when_used() + self.logger.info(f"Restoring backup at {self._backup} on {self.host.hostname}") + self.util.host.ssh.run(f"mv {self._backup} {self.file}") + + def config_write(self) -> None: + """ + Write faillock configuration. + :return: Self + :rtype: PAMFaillock + """ + self.util.logger.info(f"Saving Augeas changes to {self.file} on {self.util.host.hostname}") + self.util.host.ssh.run(f'echo -e "{self.cmd}save\n" | augtool --echo {self.args}') + + def config_read(self) -> str: + """ + Read faillock configuration as augeas tree. + :return: PAM access configuration + :rtype: str + """ + self.util.logger.info(f"Reading Augeas tree {self.file} on {self.util.host.hostname}") + result = self.util.host.ssh.run(f"augtool {self.args} print {self.path}").stdout + + return result + + def config_delete(self, value: dict[str, str]) -> None: + """ + Delete faillock configuration. + :param value: Configuration. + :type value: dict[str, str] + :return: None + """ + if value is None: + raise ValueError("No data!") + + self.util.logger.info(f"Deleting node in Augeas tree on {self.util.host.hostname}") + for k, v in value.items(): + self.util.host.ssh.run(f"augtool {self.args} --autosave rm {self.path}/{k} {v}") + + def config_set(self, value: dict[str, str]) -> None: + """ + Set faillock configuration. + :param value: Configuration parameter(s) and value(s). + :type value: dict[str, str] + :return: None + """ + if value is None: + raise ValueError("No data!") + + for k, v in value.items(): + self.cmd = self.cmd + f"set {self.path}/{k} {v}\n" diff --git a/sssd_test_framework/utils/sssd.py b/sssd_test_framework/utils/sssd.py index e2a707d4..26afdd83 100644 --- a/sssd_test_framework/utils/sssd.py +++ b/sssd_test_framework/utils/sssd.py @@ -858,3 +858,19 @@ def proxy( self.sssd.dom(domain).clear() self.sssd.dom(domain).update(options) + + def pam(self, features: list[str] | None = None) -> None: + """ + Configure SSSD with pam. + + #. Select authselect sssd profile + #. Enable pam responder in sssd profile + + :param features: list of authselect features + :type features: list[str], optional + """ + if features is None: + features = [] + + self.sssd.authselect.select("sssd", features) + self.sssd.enable_responder("pam") diff --git a/sssd_test_framework/utils/tools.py b/sssd_test_framework/utils/tools.py index 177fe2e1..b4b6a7f5 100644 --- a/sssd_test_framework/utils/tools.py +++ b/sssd_test_framework/utils/tools.py @@ -574,6 +574,19 @@ def dnf(self, args: list[Any] | None = None) -> SSHProcessResult: return command + def faillock(self, args: list[Any]) -> SSHProcessResult: + """ + Execute faillock command. + :param args: Arguments to ``faillock``, defaults to None + :type args: list[Any] | None, optional + :return: SSH Process result + :rtype: SSHProcessResult + """ + if args is None: + args = [] + + return self.host.ssh.exec(["faillock", *args]) + def teardown(self): """ Revert all changes.