diff --git a/hq_superset/const.py b/hq_superset/const.py index e72bade..0bea95f 100644 --- a/hq_superset/const.py +++ b/hq_superset/const.py @@ -36,12 +36,6 @@ CAN_ADD_PERMISSION = "can_add" CAN_DELETE_PERMISSIONS = "can_delete" -WRITE_PERMISSIONS = [ - CAN_WRITE_PERMISSION, - CAN_EDIT_PERMISSION, - CAN_ADD_PERMISSION, - CAN_DELETE_PERMISSIONS, -] READ_ONLY_MENU_PERMISSIONS = { "Chart": READ_ONLY_PERMISSIONS, @@ -58,11 +52,3 @@ "Datasets": [MENU_ACCESS_PERMISSION], "ExploreFormDataRestApi": [CAN_READ_PERMISSION] } - -WRITE_MENU_PERMISSIONS = { - "Chart": WRITE_PERMISSIONS, - "Dataset": WRITE_PERMISSIONS, - "Dashboard": WRITE_PERMISSIONS, - "Datasource": WRITE_PERMISSIONS, - "ExploreFormDataRestApi": [CAN_WRITE_PERMISSION], -} diff --git a/hq_superset/tests/test_utils.py b/hq_superset/tests/test_utils.py index f94438b..c5fb9ae 100644 --- a/hq_superset/tests/test_utils.py +++ b/hq_superset/tests/test_utils.py @@ -2,7 +2,6 @@ from unittest.mock import patch from hq_superset.utils import get_column_dtypes, DomainSyncUtil -from hq_superset.const import READ_ONLY_MENU_PERMISSIONS, WRITE_MENU_PERMISSIONS from .base_test import SupersetTestCase from .const import TEST_DATASOURCE @@ -31,87 +30,36 @@ def test_doctests(): class TestDomainSyncUtil(SupersetTestCase): PLATFORM_ROLE_NAMES = ["Gamma", "sql_lab", "dataset_editor"] - @patch.object(DomainSyncUtil, "_domain_user_role_name") - def test_get_user_domain_role_for_permissions_read_only(self, domain_user_role_name_mock): - role_name = "test-domain_user_1" - domain_user_role_name_mock.return_value = role_name - security_manager = self.app.appbuilder.sm - - role = DomainSyncUtil(security_manager)._get_user_domain_role_for_permissions( - "test-domain", {"can_read": True, "can_write": False} - ) - assert role.name == role_name - - expected_permissions_count = 0 - for view_menu_name, permissions_names in READ_ONLY_MENU_PERMISSIONS.items(): - for permission_name in permissions_names: - expected_permissions_count += 1 - pv = security_manager.find_permission_view_menu(permission_name, view_menu_name) - assert pv in role.permissions - - assert len(role.permissions) == expected_permissions_count - - @patch.object(DomainSyncUtil, "_domain_user_role_name") - def test_get_user_domain_role_for_permissions_write_only(self, domain_user_role_name_mock): - role_name = "test-domain_user_1" - domain_user_role_name_mock.return_value = role_name - security_manager = self.app.appbuilder.sm - - role = DomainSyncUtil(security_manager)._get_user_domain_role_for_permissions( - "test-domain", {"can_read": False, "can_write": True} - ) - assert role.name == role_name - - expected_permissions_count = 0 - for view_menu_name, permissions_names in WRITE_MENU_PERMISSIONS.items(): - for permission_name in permissions_names: - expected_permissions_count += 1 - pv = security_manager.find_permission_view_menu(permission_name, view_menu_name) - assert pv in role.permissions - - assert len(role.permissions) == expected_permissions_count - - @patch.object(DomainSyncUtil, "_domain_user_role_name") @patch.object(DomainSyncUtil, "_get_domain_access") - def test_admin_additional_domain_roles(self, get_domain_access_mock, domain_user_role_name_mock): - domain_user_role_name = "test-domain_user_1" - domain_user_role_name_mock.return_value = domain_user_role_name - + def test_gamma_role_assigned_for_edit_permissions(self, get_domain_access_mock): security_manager = self.app.appbuilder.sm self._ensure_platform_roles_exist(security_manager) get_domain_access_mock.return_value = self._to_permissions_response( can_write=True, can_read=True, - roles=["Gamma", "sql_lab", "dataset_editor"], + roles=[], ) - domain_user_role, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") - - assert domain_user_role.name == "test-domain_user_1" - assert sorted([role.name for role in platform_roles]) == sorted(self.PLATFORM_ROLE_NAMES) + additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") + assert len(additional_roles) == 1 + assert additional_roles[0].name == "Gamma" - @patch.object(DomainSyncUtil, "_domain_user_role_name") @patch.object(DomainSyncUtil, "_get_domain_access") - def test_limited_additional_domain_roles(self, get_domain_access_mock, domain_user_role_name_mock): - domain_user_role_name = "test-domain_user_1" - domain_user_role_name_mock.return_value = domain_user_role_name - + def test_no_roles_assigned_without_at_least_read_permission(self, get_domain_access_mock): security_manager = self.app.appbuilder.sm self._ensure_platform_roles_exist(security_manager) get_domain_access_mock.return_value = self._to_permissions_response( can_write=False, - can_read=True, - roles=["sql_lab"], + can_read=False, + roles=["sql_lab", "dataset_editor"], ) - _, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") - - assert len(platform_roles) == 1 - assert platform_roles[0].name == "sql_lab" + additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") + assert not additional_roles @patch.object(DomainSyncUtil, "_domain_user_role_name") @patch.object(DomainSyncUtil, "_get_domain_access") - def test_no_access_domain_roles(self, get_domain_access_mock, domain_user_role_name_mock): + def test_read_permission_gives_custom_domain_role(self, get_domain_access_mock, domain_user_role_name_mock): domain_user_role_name = "test-domain_user_1" domain_user_role_name_mock.return_value = domain_user_role_name @@ -120,12 +68,12 @@ def test_no_access_domain_roles(self, get_domain_access_mock, domain_user_role_n get_domain_access_mock.return_value = self._to_permissions_response( can_write=False, - can_read=False, + can_read=True, roles=[], ) - domain_user_role, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") - assert domain_user_role is None - assert platform_roles == [] + additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") + assert len(additional_roles) == 1 + assert additional_roles[0].name == domain_user_role_name @patch.object(DomainSyncUtil, "_domain_user_role_name") @patch.object(DomainSyncUtil, "_get_domain_access") @@ -140,11 +88,10 @@ def test_permissions_change_updates_user_role(self, get_domain_access_mock, doma get_domain_access_mock.return_value = self._to_permissions_response( can_write=False, can_read=True, - roles=["sql_lab"], + roles=[], ) - _, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") - - assert platform_roles[0].name == "sql_lab" + additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") + assert additional_roles[0].name == domain_user_role_name # user has no access get_domain_access_mock.return_value = self._to_permissions_response( @@ -152,9 +99,8 @@ def test_permissions_change_updates_user_role(self, get_domain_access_mock, doma can_read=False, roles=[], ) - domain_user_role, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") - assert domain_user_role is None - assert platform_roles == [] + additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain") + assert not additional_roles def _ensure_platform_roles_exist(self, sm): for role_name in self.PLATFORM_ROLE_NAMES: diff --git a/hq_superset/utils.py b/hq_superset/utils.py index dc52734..808f5c9 100644 --- a/hq_superset/utils.py +++ b/hq_superset/utils.py @@ -24,7 +24,6 @@ CAN_READ_PERMISSION, CAN_WRITE_PERMISSION, READ_ONLY_MENU_PERMISSIONS, - WRITE_MENU_PERMISSIONS, ) from .exceptions import DatabaseMissing @@ -144,11 +143,11 @@ def sync_domain_role(self, domain): hq_user_role = self._ensure_hq_user_role() domain_schema_role = self._create_domain_role(domain) - domain_user_role, platform_roles = self._get_additional_user_roles(domain) - if not domain_user_role and not platform_roles: + additional_roles = self._get_additional_user_roles(domain) + if not additional_roles: return False - current_user.roles = [hq_user_role, domain_schema_role, domain_user_role] + platform_roles + current_user.roles = [hq_user_role, domain_schema_role] + additional_roles self.sm.get_session.add(current_user) self.sm.get_session.commit() @@ -203,11 +202,25 @@ def _ensure_domain_role_created(self, domain): def _get_additional_user_roles(self, domain): domain_permissions, platform_roles_names = self._get_domain_access(domain) - if self._user_has_no_access(domain_permissions, platform_roles_names): - return None, [] + if self._user_has_no_access(domain_permissions): + return [] - domain_user_role = self._get_user_domain_role_for_permissions(domain, domain_permissions) - return domain_user_role, self._get_platform_roles(platform_roles_names) + additional_roles = [] + if domain_permissions[CAN_WRITE_PERMISSION]: + platform_roles_names.append( + 'gamma', + ) + elif domain_permissions[CAN_READ_PERMISSION]: + additional_roles.append( + self._get_user_domain_read_only_role(domain) + ) + + if platform_roles_names: + additional_roles.extend( + self._get_platform_roles(platform_roles_names) + ) + + return additional_roles @staticmethod def _get_domain_access(domain): @@ -232,8 +245,8 @@ def _get_domain_access(domain): return permissions, roles @staticmethod - def _user_has_no_access(permissions: dict, platform_roles: list): - user_has_access = any([permissions[p] for p in permissions]) or platform_roles + def _user_has_no_access(permissions: dict): + user_has_access = any([permissions[p] for p in permissions]) return not user_has_access def _get_platform_roles(self, roles_names): @@ -245,21 +258,9 @@ def _get_platform_roles(self, roles_names): platform_roles.append(role) return platform_roles - def _get_user_domain_role_for_permissions(self, domain, permissions): + def _get_user_domain_read_only_role(self, domain): role = self._get_domain_user_role(domain, current_user) - role_permissions = [] - - if permissions[CAN_READ_PERMISSION]: - role_permissions.extend( - self._read_permissions_for_user - ) - - if permissions[CAN_WRITE_PERMISSION]: - role_permissions.extend( - self._write_permissions_for_user - ) - - self.sm.set_role_permissions(role, role_permissions) + self.sm.set_role_permissions(role, self._read_permissions_for_user) return role def _get_domain_user_role(self, domain, user): @@ -275,12 +276,6 @@ def _read_permissions_for_user(self): menu_permissions=READ_ONLY_MENU_PERMISSIONS ) - @property - def _write_permissions_for_user(self): - return self._get_view_menu_permissions( - menu_permissions=WRITE_MENU_PERMISSIONS - ) - def _get_view_menu_permissions(self, menu_permissions): """ This method returns combinations for all view menus and permissions