From 66c2c6933c7720a6e46dd6c0b74aae47e593f819 Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Wed, 10 Apr 2024 23:21:32 -0400 Subject: [PATCH] roles: adding gpo management to samba role --- requirements.txt | 7 +- sssd_test_framework/roles/ad.py | 38 ++- sssd_test_framework/roles/generic.py | 71 ++++++ sssd_test_framework/roles/samba.py | 348 ++++++++++++++++++++++++++- 4 files changed, 444 insertions(+), 20 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9095ac9a..0f0bcb9f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,7 @@ -jc -pytest +jc~=1.25.1 +pytest~=8.0.1 python-ldap pytest-mh >= 1.0.17 + +PyYAML~=6.0.1 +Sphinx~=7.2.6 \ No newline at end of file diff --git a/sssd_test_framework/roles/ad.py b/sssd_test_framework/roles/ad.py index ebd30120..685161dd 100644 --- a/sssd_test_framework/roles/ad.py +++ b/sssd_test_framework/roles/ad.py @@ -1670,13 +1670,13 @@ def __init__(self, role: AD, name: str) -> None: self._search_base: str = f"cn=policies,cn=system,{self.role.host.naming_context}" """Group policy search base.""" - self._dn = self.get("DistinguishedName") + self._dn = self._get("DistinguishedName") """Group policy dn.""" - self._cn = self.get("CN") + self._cn = self._get("CN") """Group policy cn.""" - def get(self, key: str) -> str | None: + def _get(self, key: str) -> str | None: """ Get group policy attributes. @@ -1728,8 +1728,8 @@ def add(self) -> GPO: """ self.role.host.ssh.run(f'New-GPO -name "{self.name}"') - self._cn = self.get("CN") - self._dn = self.get("DistinguishedName") + self._cn = self._get("CN") + self._dn = self._get("DistinguishedName") self.role.host.ssh.run( rf""" @@ -1750,13 +1750,15 @@ def link( self, op: str | None = "New", target: str | None = None, - args: list[str] | str | None = None, + enforced: bool | None = False, + disabled: bool | None = False, + order: int | None = 0, ) -> GPO: """ - Link the group policy to the a target object inside the directory, a site, domain or an ou. + Link the group policy to the target object inside the directory, a site, domain or an ou. ..Note:: - The New and Set cmdlets are identical. To modify an an existing link, + The New and Set cmdlets are identical. To modify an existing link, change the $op parameter to "Set", i.e. to disable 'Enforced' ou_policy.link("Set", args=["-Enforced No"]) @@ -1765,13 +1767,23 @@ def link( :type op: str, optional :param target: Group policy target :type target: str, optional - :param args: Additional arguments - :type args: list[str] | None, optional + :param enforced: Enforced the policy + :type enforced: bool, optional + :param disabled: Disable the policy + :type disabled: bool, optional + :param order: Order number + :type order: int, optional :return: Group policy object :rtype: GPO """ - if args is None: - args = [] + args = [] + + if enforced is True: + args.extend("-Enforce Yes") + if disabled is True: + args.extend("-LinkEnabled No") + if order != 0: + args.extend(f"-Order {str(order)}") if isinstance(args, list): args = " ".join(args) @@ -1851,7 +1863,7 @@ def policy(self, logon_rights: dict[str, list[ADObject]], cfg: dict[str, Any] | This method does the remaining configuration of the group policy. It updates 'GptTmpl.inf' with security logon right keys with the SIDs of users and groups - objects. The *Remote* keys can be omitted, in which the corresponding keys values + objects. The *Remote* keys can be omitted, in which the corresponding keys value will then be used. To add users and groups to the policy, the SID must be used for the values. The diff --git a/sssd_test_framework/roles/generic.py b/sssd_test_framework/roles/generic.py index b1892f63..baa05838 100644 --- a/sssd_test_framework/roles/generic.py +++ b/sssd_test_framework/roles/generic.py @@ -24,6 +24,7 @@ "GenericAutomount", "GenericAutomountMap", "GenericAutomountKey", + "GenericGPO", ] @@ -288,6 +289,14 @@ def fqn(self, name: str) -> str: def firewall(self) -> Firewall: pass + @property + @abstractmethod + def gpo(self) -> GenericGPO: + """ + Generic GPO management. + """ + pass + class GenericUser(ABC, BaseObject): """ @@ -961,3 +970,65 @@ def dump(self) -> str: @abstractmethod def __str__(self) -> str: pass + + +class GenericGPO(ABC, object): + """ + Generic GPO management. + """ + + @abstractmethod + def get(self, key: str) -> str | None: + """ + Get GPO attribute. + + :param key: GPO key value. + :type key: str + :return: GPO key value. + :rtype: str | None + """ + pass + + @abstractmethod + def delete(self) -> None: + """ + Delete GPO. + """ + pass + + @abstractmethod + def add(self) -> GenericGPO: + """ + Add GPO. + """ + pass + + @abstractmethod + def link( + self, op: str | None = "New", target: str | None = None, args: list[str] | str | None = None + ) -> GenericGPO: + """ + Link GPO. + """ + pass + + @abstractmethod + def unlink(self) -> None: + """ + Unlink GPO. + """ + pass + + @abstractmethod + def permissions(self, target: str, permission_level: str, target_type: str | None = "Group") -> GenericGPO: + """ + Configure GPO permissions. + """ + pass + + @abstractmethod + def policy(self, logon_rights: dict[str, list[GenericUser]], cfg: dict[str, Any] | None = None) -> GenericGPO: + """ + GPO configuration. + """ + pass diff --git a/sssd_test_framework/roles/samba.py b/sssd_test_framework/roles/samba.py index 4333e366..ba3ade21 100644 --- a/sssd_test_framework/roles/samba.py +++ b/sssd_test_framework/roles/samba.py @@ -2,27 +2,30 @@ from __future__ import annotations +import configparser +import os.path from typing import Any, TypeAlias import ldap.modlist from pytest_mh.cli import CLIBuilderArgs from pytest_mh.ssh import SSHProcessResult -from sssd_test_framework.utils.ldap import LDAPRecordAttributes - from ..hosts.samba import SambaHost -from ..misc import attrs_parse, to_list_of_strings +from ..misc import attrs_parse, attrs_to_hash, to_list_of_strings +from ..utils.ldap import LDAPRecordAttributes from .base import BaseLinuxLDAPRole, BaseObject, DeleteAttribute from .ldap import LDAPAutomount, LDAPNetgroup, LDAPNetgroupMember, LDAPObject, LDAPOrganizationalUnit, LDAPSudoRule __all__ = [ "Samba", "SambaObject", + "SambaComputer", "SambaUser", "SambaGroup", "SambaOrganizationalUnit", "SambaAutomount", "SambaSudoRule", + "SambaGPO", ] @@ -143,7 +146,7 @@ def test_example(client: Client, samba: Samba): assert result.user.name == 'user-1' assert result.group.name == 'domain users' - :param name: User name. + :param name: Username. :type name: str :return: New user object. :rtype: SambaUser @@ -225,6 +228,63 @@ def test_example_netgroup(client: Client, samba: Samba): """ return SambaNetgroup(self, name, basedn) + def computer(self, name: str) -> SambaComputer: + """ + Get computer object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.AD) + def test_example(client: Client, samba: Samba): + # Create OU + ou = ad.ou("test").add().dn + # Move computer object + ad.computer(client.host.hostname.split(".")[0]).move(ou) + + client.sssd.start() + + :param name: Computer name. + :type name: str + :return: New computer object. + :rtype: ADComputer + """ + return SambaComputer(self, name) + + def gpo(self, name: str) -> SambaGPO: + """ + Get group policy object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.AD) + def test_ad__gpo_is_set_to_enforcing(client: Client, samba: Samba): + user = ad.user("user").add() + allow_user = ad.user("allow_user").add() + deny_user = ad.user("deny_user").add() + + ad.gpo("test policy").add().policy( + { + "SeInteractiveLogonRight": [allow_user, ad.group("Domain Admins")], + "SeRemoteInteractiveLogonRight": [allow_user, ad.group("Domain Admins")], + "SeDenyInteractiveLogonRight": [deny_user], + "SeDenyRemoteInteractiveLogonRight": [deny_user], + } + ).link() + + client.sssd.domain["ad_gpo_access_control"] = "enforcing" + client.sssd.start() + + assert client.auth.ssh.password(username="allow_user", password="Secret123") + assert not client.auth.ssh.password(username="user", password="Secret123") + assert not client.auth.ssh.password(username="deny_user", password="Secret123") + + :param name: Name of the GPO. + :type name: str + """ + return SambaGPO(self, name) + def ou(self, name: str, basedn: LDAPObject | str | None = None) -> SambaOrganizationalUnit: """ Get organizational unit object. @@ -313,7 +373,11 @@ def __init__(self, role: Samba, command: str, name: str) -> None: self.__dn: str | None = None - def _exec(self, op: str, args: list[str] | None = None, **kwargs) -> SSHProcessResult: + self.__sid: str | None = None + + self.__cn: str | None = None + + def _exec(self, op: str, args: list[str] | None = None, **kwargs: object) -> SSHProcessResult: """ Execute samba-tool command. @@ -412,6 +476,30 @@ def dn(self) -> str: self.__dn = obj.pop("dn")[0] return self.__dn + @property + def cn(self) -> str: + """ + Object's distinguished name. + """ + if self.__cn is not None: + return self.__cn + + obj = self.get(["cn"]) + self.__cn = obj.pop("cn")[0] + return self.__cn + + @property + def sid(self) -> str: + """ + Object's security identifier. + """ + if self.__sid is not None: + return self.__sid + + obj = self.get(["objectSid"]) + self.__sid = obj.pop("objectSid")[0] + return self.__sid + class SambaUser(SambaObject): """ @@ -679,6 +767,256 @@ def __get_member_args(self, members: list[SambaUser | SambaGroup]) -> list[str]: return [",".join([x.name for x in members])] +class SambaComputer(SambaObject): + """ + AD computer management. + """ + + def __init__(self, role: Samba, name: str) -> None: + """ + :param role: AD role object. + :type role: AD + :param name: Computer name. + :type name: str + """ + super().__init__(role, "Computer", name) + + def move(self, target: str) -> SambaComputer: + """ + Move a computer object. + + :param target: Target path. + :type target: str + :return: Self. + :rtype: SambaComputer + """ + if self.name.startswith("cn"): + _name = self.name.split(",")[0].split("=")[1] + self._exec("Move", [self.name, target]) + + return self + + +class SambaGPO(SambaObject): + """ + Group policy object management. + """ + + def __init__(self, role: Samba, name: str) -> None: + """ + :param name: GPO name, defaults to 'Domain Test Policy' + :type name: str, optional + """ + super().__init__(role, "gpo", name) + + self.target: str | None = None + """Group policy target.""" + + self.search_base: str = f"cn=policies,cn=system,{self.role.host.naming_context}" + """Group policy search base.""" + + self.credentials: CLIBuilderArgs = { + "username": (self.cli.option.VALUE, "Administrator"), + "password": (self.cli.option.VALUE, self.host.ssh_password), + } + + def _get(self, key: str) -> str | None: + """ + Get group policy key. + Output contains 'GENSEC' strings, which are removed + + :param key: Attribute. + :type key: str + :return: Key value. + :rtype: str + """ + result = [] + if self.name is not None: + for i in self.host.ssh.exec(["samba-tool", "gpo", "listall"]).stdout_lines: + if "GENSEC" not in i: + result.append(i) + + _result = attrs_parse(result, [key]) + + for k, v in _result.items(): + if k == key: + return v[0] + + return None + + def add(self) -> SambaGPO: + """ + Add a group policy object. + + :return: Samba group policy object + :rtype: SambaGPO + """ + + self._exec("create", self.cli.args(self.credentials)) + + self.__cn = self._get("GPO") + self.__dn = self._get("dn") + + return self + + def delete(self) -> None: + """ + Delete group policy object. + """ + self._exec("del", self.cli.args(self.credentials)) + + def link( + self, + target: str | None = None, + enforced: bool | None = False, + disabled: bool | None = False, + ) -> SambaGPO: + """ + Link the group policy to the target object inside the directory, a site, domain or an ou. + + ..Note:: + The New and Set cmdlets are identical. To modify an existing link, + change the $op parameter to "Set", i.e. to disable 'Enforced' + + ou_policy.link("Set", args=["-Enforced No"]) + + :param target: Group policy target, defaults to 'Default-First-Site-Name' + :type target: str, optional + :param enforced: Enforced the policy + :type enforced: bool, optional + :param disabled: Disable the policy + :type disabled: bool, optional + :return: Samba group policy object + :rtype: SambaGPO + """ + args = [] + + if enforced is True: + args.extend("--enforce") + if disabled is True: + args.extend("--disabled") + + if isinstance(args, list): + args = " ".join(args) + elif args is None: + args = "" + + if target is None and self.target is None: + self.target = "Default-First-Site-Name" + + if target is not None and self.target is None: + self.target = target + + self._exec("setlink", [self.target, [args], self.cli.args(self.credentials)]) + + return self + + def unlink(self) -> SambaGPO: + """ + Unlink the group policy from the target. + + :return: Samba group policy object + :rtype: SambaGPO + """ + self._exec("dellink", self.cli.args(self.credentials)) + + return self + + def permissions(self, level: str, target_type: str | None = "User") -> SambaGPO: + """ + Configure group policy object permissions. + + :param level: Permission level + :type level: str, values should be 'GpoRead | GpoApply | GpoEdit | GpoEditDeleteModifySecurity | None' + :param target_type: Target type, defaults to 'user' + :type target_type: str, optional, values should be 'user | group | computer' + :return: Samba group policy object + :rtype: SambaGPO + :TODO: Figure out dsacl and what permissions can we set on the GPO object + """ + self.role.host.ssh.run( + # f'Set-GPPermission -Guid "{self._cn}" -PermissionLevel {level} -Type "{target_type}" -Replace $True' + f"samba-tool dsacl" + ) + + return self + + def policy(self, logon_rights: dict[str, list[SambaObject]]) -> SambaGPO: + """ + Group policy configuration. + + This method does the remaining configuration of the group policy. It updates + 'GptTmpl.inf' with security logon right keys with the SIDs of users and groups + objects. The *Remote* keys can be omitted, in which the corresponding keys values + will then be used. + + To add users and groups to the policy, the SID must be used for the values. The + values need to be prefixed with an '*' and use a comma for a de-limiter, i.e. + `*SID1-2-3-4,*SID-5-6-7-8` + + Additionally, gPCMachineExtensionNames need to be updated in the directory so + the GPO is readable to the client. The value is a list of Client Side + Extensions (CSEs), that is an index of what part of the policy is pushed and + processed by the client. + + :param logon_rights: List of logon rights. + :type logon_rights: dict[str, list[SambaObject]] + :return: Samba Group policy object + :rtype: SambaGPO + """ + _path: str = os.path.join( + "var", + "lib", + "samba", + "sysvol", + self.role.domain, + "Policies", + self.__cn, + "MACHINE", + "Microsoft", + "Windows\\ NT", + "SecEdit", + "GptTmpl.inf", + ) + + _keys: list[str] = [ + "SeInteractiveLogonRight", + "SeRemoteInteractiveLogonRight", + "SeDenyInteractiveLogonRight", + "SeDenyRemoteInteractiveLogonRight", + ] + + for i in _keys: + if i not in logon_rights.keys() and i == "SeRemoteInteractiveLogonRight": + logon_rights[i] = logon_rights["SeInteractiveLogonRight"] + if i not in logon_rights.keys() and i == "SeDenyRemoteInteractiveLogonRight": + logon_rights[i] = logon_rights["SeDenyInteractiveLogonRight"] + + for i in _keys: + if i not in logon_rights.keys(): + raise KeyError(f"Expected {i} but got {logon_rights.keys()}") + + _logon_rights: dict[str, Any] = {} + for k, v in logon_rights.items(): + sids: list[str] = [] + for j in v: + sids.append(f"*{j.sid}") + _logon_rights = {**_logon_rights, **{k: ",".join(sids)}} + + config = configparser.ConfigParser(interpolation=None) + config["Unicode"] = {} + config["Unicode"]["Unicode"] = "yes" + config["Version"] = {} + config["Version"]["signature"] = "$CHICAGO$" + config["Version"]["Revision"] = "1" + config["Privilege Rights"] = {} + for k, v in _logon_rights.items(): + config["Privilege Rights"][k.strip()] = v.strip() + + self.host.fs.write(_path, config) + return self + + SambaOrganizationalUnit: TypeAlias = LDAPOrganizationalUnit[SambaHost, Samba] SambaAutomount: TypeAlias = LDAPAutomount[SambaHost, Samba] SambaSudoRule: TypeAlias = LDAPSudoRule[SambaHost, Samba, SambaUser, SambaGroup]