Skip to content

Commit

Permalink
Update user write permissions
Browse files Browse the repository at this point in the history
1. Users with 'edit' permissions gets the superset Gamma role

2. If HQ user don't have view permissions, don't allow access
  • Loading branch information
Charl1996 committed Oct 28, 2024
1 parent 5efceb6 commit cb57fa6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 118 deletions.
14 changes: 0 additions & 14 deletions hq_superset/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@
CAN_ADD_PERMISSION = "can_add"
CAN_DELETE_PERMISSIONS = "can_delete"

WRITE_PERMISSIONS = [
CAN_WRITE_PERMISSION,
CAN_EDIT_PERMISSION,
CAN_ADD_PERMISSION,
CAN_DELETE_PERMISSIONS,
]

READ_ONLY_MENU_PERMISSIONS = {
"Chart": READ_ONLY_PERMISSIONS,
Expand All @@ -58,11 +52,3 @@
"Datasets": [MENU_ACCESS_PERMISSION],
"ExploreFormDataRestApi": [CAN_READ_PERMISSION]
}

WRITE_MENU_PERMISSIONS = {
"Chart": WRITE_PERMISSIONS,
"Dataset": WRITE_PERMISSIONS,
"Dashboard": WRITE_PERMISSIONS,
"Datasource": WRITE_PERMISSIONS,
"ExploreFormDataRestApi": [CAN_WRITE_PERMISSION],
}
94 changes: 20 additions & 74 deletions hq_superset/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from unittest.mock import patch

from hq_superset.utils import get_column_dtypes, DomainSyncUtil
from hq_superset.const import READ_ONLY_MENU_PERMISSIONS, WRITE_MENU_PERMISSIONS
from .base_test import SupersetTestCase
from .const import TEST_DATASOURCE

Expand Down Expand Up @@ -31,87 +30,36 @@ def test_doctests():
class TestDomainSyncUtil(SupersetTestCase):
PLATFORM_ROLE_NAMES = ["Gamma", "sql_lab", "dataset_editor"]

@patch.object(DomainSyncUtil, "_domain_user_role_name")
def test_get_user_domain_role_for_permissions_read_only(self, domain_user_role_name_mock):
role_name = "test-domain_user_1"
domain_user_role_name_mock.return_value = role_name
security_manager = self.app.appbuilder.sm

role = DomainSyncUtil(security_manager)._get_user_domain_role_for_permissions(
"test-domain", {"can_read": True, "can_write": False}
)
assert role.name == role_name

expected_permissions_count = 0
for view_menu_name, permissions_names in READ_ONLY_MENU_PERMISSIONS.items():
for permission_name in permissions_names:
expected_permissions_count += 1
pv = security_manager.find_permission_view_menu(permission_name, view_menu_name)
assert pv in role.permissions

assert len(role.permissions) == expected_permissions_count

@patch.object(DomainSyncUtil, "_domain_user_role_name")
def test_get_user_domain_role_for_permissions_write_only(self, domain_user_role_name_mock):
role_name = "test-domain_user_1"
domain_user_role_name_mock.return_value = role_name
security_manager = self.app.appbuilder.sm

role = DomainSyncUtil(security_manager)._get_user_domain_role_for_permissions(
"test-domain", {"can_read": False, "can_write": True}
)
assert role.name == role_name

expected_permissions_count = 0
for view_menu_name, permissions_names in WRITE_MENU_PERMISSIONS.items():
for permission_name in permissions_names:
expected_permissions_count += 1
pv = security_manager.find_permission_view_menu(permission_name, view_menu_name)
assert pv in role.permissions

assert len(role.permissions) == expected_permissions_count

@patch.object(DomainSyncUtil, "_domain_user_role_name")
@patch.object(DomainSyncUtil, "_get_domain_access")
def test_admin_additional_domain_roles(self, get_domain_access_mock, domain_user_role_name_mock):
domain_user_role_name = "test-domain_user_1"
domain_user_role_name_mock.return_value = domain_user_role_name

def test_gamma_role_assigned_for_edit_permissions(self, get_domain_access_mock):
security_manager = self.app.appbuilder.sm
self._ensure_platform_roles_exist(security_manager)

get_domain_access_mock.return_value = self._to_permissions_response(
can_write=True,
can_read=True,
roles=["Gamma", "sql_lab", "dataset_editor"],
roles=[],
)
domain_user_role, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")

assert domain_user_role.name == "test-domain_user_1"
assert sorted([role.name for role in platform_roles]) == sorted(self.PLATFORM_ROLE_NAMES)
additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")
assert len(additional_roles) == 1
assert additional_roles[0].name == "Gamma"

@patch.object(DomainSyncUtil, "_domain_user_role_name")
@patch.object(DomainSyncUtil, "_get_domain_access")
def test_limited_additional_domain_roles(self, get_domain_access_mock, domain_user_role_name_mock):
domain_user_role_name = "test-domain_user_1"
domain_user_role_name_mock.return_value = domain_user_role_name

def test_no_roles_assigned_without_at_least_read_permission(self, get_domain_access_mock):
security_manager = self.app.appbuilder.sm
self._ensure_platform_roles_exist(security_manager)

get_domain_access_mock.return_value = self._to_permissions_response(
can_write=False,
can_read=True,
roles=["sql_lab"],
can_read=False,
roles=["sql_lab", "dataset_editor"],
)
_, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")

assert len(platform_roles) == 1
assert platform_roles[0].name == "sql_lab"
additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")
assert not additional_roles

@patch.object(DomainSyncUtil, "_domain_user_role_name")
@patch.object(DomainSyncUtil, "_get_domain_access")
def test_no_access_domain_roles(self, get_domain_access_mock, domain_user_role_name_mock):
def test_read_permission_gives_custom_domain_role(self, get_domain_access_mock, domain_user_role_name_mock):
domain_user_role_name = "test-domain_user_1"
domain_user_role_name_mock.return_value = domain_user_role_name

Expand All @@ -120,12 +68,12 @@ def test_no_access_domain_roles(self, get_domain_access_mock, domain_user_role_n

get_domain_access_mock.return_value = self._to_permissions_response(
can_write=False,
can_read=False,
can_read=True,
roles=[],
)
domain_user_role, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")
assert domain_user_role is None
assert platform_roles == []
additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")
assert len(additional_roles) == 1
assert additional_roles[0].name == domain_user_role_name

@patch.object(DomainSyncUtil, "_domain_user_role_name")
@patch.object(DomainSyncUtil, "_get_domain_access")
Expand All @@ -140,21 +88,19 @@ def test_permissions_change_updates_user_role(self, get_domain_access_mock, doma
get_domain_access_mock.return_value = self._to_permissions_response(
can_write=False,
can_read=True,
roles=["sql_lab"],
roles=[],
)
_, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")

assert platform_roles[0].name == "sql_lab"
additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")
assert additional_roles[0].name == domain_user_role_name

# user has no access
get_domain_access_mock.return_value = self._to_permissions_response(
can_write=False,
can_read=False,
roles=[],
)
domain_user_role, platform_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")
assert domain_user_role is None
assert platform_roles == []
additional_roles = DomainSyncUtil(security_manager)._get_additional_user_roles("test-domain")
assert not additional_roles

def _ensure_platform_roles_exist(self, sm):
for role_name in self.PLATFORM_ROLE_NAMES:
Expand Down
55 changes: 25 additions & 30 deletions hq_superset/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
CAN_READ_PERMISSION,
CAN_WRITE_PERMISSION,
READ_ONLY_MENU_PERMISSIONS,
WRITE_MENU_PERMISSIONS,
)
from .exceptions import DatabaseMissing

Expand Down Expand Up @@ -144,11 +143,11 @@ def sync_domain_role(self, domain):
hq_user_role = self._ensure_hq_user_role()
domain_schema_role = self._create_domain_role(domain)

domain_user_role, platform_roles = self._get_additional_user_roles(domain)
if not domain_user_role and not platform_roles:
additional_roles = self._get_additional_user_roles(domain)
if not additional_roles:
return False

current_user.roles = [hq_user_role, domain_schema_role, domain_user_role] + platform_roles
current_user.roles = [hq_user_role, domain_schema_role] + additional_roles

self.sm.get_session.add(current_user)
self.sm.get_session.commit()
Expand Down Expand Up @@ -203,11 +202,25 @@ def _ensure_domain_role_created(self, domain):

def _get_additional_user_roles(self, domain):
domain_permissions, platform_roles_names = self._get_domain_access(domain)
if self._user_has_no_access(domain_permissions, platform_roles_names):
return None, []
if self._user_has_no_access(domain_permissions):
return []

domain_user_role = self._get_user_domain_role_for_permissions(domain, domain_permissions)
return domain_user_role, self._get_platform_roles(platform_roles_names)
additional_roles = []
if domain_permissions[CAN_WRITE_PERMISSION]:
platform_roles_names.append(
'gamma',
)
elif domain_permissions[CAN_READ_PERMISSION]:
additional_roles.append(
self._get_user_domain_read_only_role(domain)
)

if platform_roles_names:
additional_roles.extend(
self._get_platform_roles(platform_roles_names)
)

return additional_roles

@staticmethod
def _get_domain_access(domain):
Expand All @@ -232,8 +245,8 @@ def _get_domain_access(domain):
return permissions, roles

@staticmethod
def _user_has_no_access(permissions: dict, platform_roles: list):
user_has_access = any([permissions[p] for p in permissions]) or platform_roles
def _user_has_no_access(permissions: dict):
user_has_access = any([permissions[p] for p in permissions])
return not user_has_access

def _get_platform_roles(self, roles_names):
Expand All @@ -245,21 +258,9 @@ def _get_platform_roles(self, roles_names):
platform_roles.append(role)
return platform_roles

def _get_user_domain_role_for_permissions(self, domain, permissions):
def _get_user_domain_read_only_role(self, domain):
role = self._get_domain_user_role(domain, current_user)
role_permissions = []

if permissions[CAN_READ_PERMISSION]:
role_permissions.extend(
self._read_permissions_for_user
)

if permissions[CAN_WRITE_PERMISSION]:
role_permissions.extend(
self._write_permissions_for_user
)

self.sm.set_role_permissions(role, role_permissions)
self.sm.set_role_permissions(role, self._read_permissions_for_user)
return role

def _get_domain_user_role(self, domain, user):
Expand All @@ -275,12 +276,6 @@ def _read_permissions_for_user(self):
menu_permissions=READ_ONLY_MENU_PERMISSIONS
)

@property
def _write_permissions_for_user(self):
return self._get_view_menu_permissions(
menu_permissions=WRITE_MENU_PERMISSIONS
)

def _get_view_menu_permissions(self, menu_permissions):
"""
This method returns combinations for all view menus and permissions
Expand Down

0 comments on commit cb57fa6

Please sign in to comment.