Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish refactor of DAG resource name helper #15511

Merged
merged 1 commit into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,17 +582,17 @@ def _sync_perm_for_dag(self, dag, session: Optional[Session] = None):
"""Sync DAG specific permissions, if necessary"""
from flask_appbuilder.security.sqla import models as sqla_models

from airflow.security.permissions import DAG_PERMS, permission_name_for_dag
from airflow.security.permissions import DAG_PERMS, resource_name_for_dag

def needs_perm_views(dag_id: str) -> bool:
view_menu_name = permission_name_for_dag(dag_id)
dag_resource_name = resource_name_for_dag(dag_id)
for permission_name in DAG_PERMS:
if not (
session.query(sqla_models.PermissionView)
.join(sqla_models.Permission)
.join(sqla_models.ViewMenu)
.filter(sqla_models.Permission.name == permission_name)
.filter(sqla_models.ViewMenu.name == view_menu_name)
.filter(sqla_models.ViewMenu.name == dag_resource_name)
.one_or_none()
):
return True
Expand Down
4 changes: 2 additions & 2 deletions airflow/security/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
DAG_PERMS = {ACTION_CAN_READ, ACTION_CAN_EDIT}


def permission_name_for_dag(dag_id):
"""Returns the permission name for a DAG id."""
def resource_name_for_dag(dag_id):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the right name for this, pretty sure. Input @jhtimmins?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this. Yup!

"""Returns the resource name for a DAG id."""
if dag_id == RESOURCE_DAG:
return dag_id

Expand Down
40 changes: 23 additions & 17 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def get_accessible_dags(self, user_actions, user, session=None):
def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool:
"""Checks if user has read or write access to some dags."""
if dag_id and dag_id != '~':
return self.has_access(action, self.prefixed_dag_id(dag_id))
return self.has_access(action, permissions.resource_name_for_dag(dag_id))

user = g.user
if action == permissions.ACTION_CAN_READ:
Expand All @@ -345,24 +345,30 @@ def can_read_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG read access."""
if not user:
user = g.user
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)
return self._has_view_access(
user, permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG
) or self._has_view_access(user, permissions.ACTION_CAN_READ, prefixed_dag_id)
) or self._has_view_access(user, permissions.ACTION_CAN_READ, dag_resource_name)

def can_edit_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG edit access."""
if not user:
user = g.user
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)

return self._has_view_access(
user, permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG
) or self._has_view_access(user, permissions.ACTION_CAN_EDIT, prefixed_dag_id)
) or self._has_view_access(user, permissions.ACTION_CAN_EDIT, dag_resource_name)

def prefixed_dag_id(self, dag_id):
"""Returns the permission name for a DAG id."""
return permissions.permission_name_for_dag(dag_id)
warnings.warn(
"`prefixed_dag_id` has been deprecated. "
"Please use `airflow.security.permissions.resource_name_for_dag` instead.",
DeprecationWarning,
stacklevel=2,
)
return permissions.resource_name_for_dag(dag_id)

def is_dag_resource(self, resource_name):
"""Determines if a permission belongs to a DAG or all DAGs."""
Expand Down Expand Up @@ -548,7 +554,7 @@ def create_dag_specific_permissions(self) -> None:
dags = dagbag.dags.values()

for dag in dags:
dag_resource_name = self.prefixed_dag_id(dag.dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
for perm_name in self.DAG_PERMS:
if (perm_name, dag_resource_name) not in perms:
self._merge_perm(perm_name, dag_resource_name)
Expand Down Expand Up @@ -624,12 +630,12 @@ def sync_perm_for_dag(self, dag_id, access_control=None):
:type access_control: dict
:return:
"""
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)
for dag_perm in self.DAG_PERMS:
self.add_permission_view_menu(dag_perm, prefixed_dag_id)
self.add_permission_view_menu(dag_perm, dag_resource_name)

if access_control:
self._sync_dag_view_permissions(prefixed_dag_id, access_control)
self._sync_dag_view_permissions(dag_resource_name, access_control)

def _sync_dag_view_permissions(self, dag_id, access_control):
"""Set the access policy on the given DAG's ViewModel.
Expand All @@ -641,13 +647,13 @@ def _sync_dag_view_permissions(self, dag_id, access_control):
{'can_read'}
:type access_control: dict
"""
prefixed_dag_id = self.prefixed_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag_id)

def _get_or_create_dag_permission(perm_name):
dag_perm = self.find_permission_view_menu(perm_name, prefixed_dag_id)
dag_perm = self.find_permission_view_menu(perm_name, dag_resource_name)
if not dag_perm:
self.log.info("Creating new permission '%s' on view '%s'", perm_name, prefixed_dag_id)
dag_perm = self.add_permission_view_menu(perm_name, prefixed_dag_id)
self.log.info("Creating new permission '%s' on view '%s'", perm_name, dag_resource_name)
dag_perm = self.add_permission_view_menu(perm_name, dag_resource_name)

return dag_perm

Expand All @@ -661,12 +667,12 @@ def _revoke_stale_permissions(dag_view):
self.log.info(
"Revoking '%s' on DAG '%s' for role '%s'",
perm.permission,
prefixed_dag_id,
dag_resource_name,
role.name,
)
self.del_permission_role(role, perm)

dag_view = self.find_view_menu(prefixed_dag_id)
dag_view = self.find_view_menu(dag_resource_name)
if dag_view:
_revoke_stale_permissions(dag_view)

Expand All @@ -684,7 +690,7 @@ def _revoke_stale_permissions(dag_view):
raise AirflowException(
"The access_control map for DAG '{}' includes the following "
"invalid permissions: {}; The set of valid permissions "
"is: {}".format(prefixed_dag_id, invalid_perms, self.DAG_PERMS)
"is: {}".format(dag_resource_name, invalid_perms, self.DAG_PERMS)
)

for perm_name in perms:
Expand Down
26 changes: 18 additions & 8 deletions tests/www/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
def _has_dag_perm(self, perm, dag_id, user):
# if not user:
# user = self.user
return self.security_manager.has_access(perm, self.security_manager.prefixed_dag_id(dag_id), user)
return self.security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)

def _create_dag(self, dag_id):
dag_model = DagModel(dag_id=dag_id)
Expand Down Expand Up @@ -581,24 +581,24 @@ def test_create_dag_specific_permissions(self, dagbag_mock):
self.security_manager._sync_dag_view_permissions = mock.Mock()

for dag in dags:
prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
all_perms = self.security_manager.get_all_permissions()
assert ('can_read', prefixed_dag_id) not in all_perms
assert ('can_edit', prefixed_dag_id) not in all_perms
assert ('can_read', dag_resource_name) not in all_perms
assert ('can_edit', dag_resource_name) not in all_perms

self.security_manager.create_dag_specific_permissions()

dagbag_mock.assert_called_once_with(read_dags_from_db=True)
collect_dags_from_db_mock.assert_called_once_with()

for dag in dags:
prefixed_dag_id = self.security_manager.prefixed_dag_id(dag.dag_id)
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
all_perms = self.security_manager.get_all_permissions()
assert ('can_read', prefixed_dag_id) in all_perms
assert ('can_edit', prefixed_dag_id) in all_perms
assert ('can_read', dag_resource_name) in all_perms
assert ('can_edit', dag_resource_name) in all_perms

self.security_manager._sync_dag_view_permissions.assert_called_once_with(
self.security_manager.prefixed_dag_id('has_access_control'), access_control
permissions.resource_name_for_dag('has_access_control'), access_control
)

del dagbag.dags["has_access_control"]
Expand Down Expand Up @@ -638,3 +638,13 @@ def test_get_all_roles_with_permissions(self):
assert isinstance(role, self.security_manager.role_model)

assert 'Admin' in roles

def test_prefixed_dag_id_is_deprecated(self):
with pytest.warns(
DeprecationWarning,
match=(
"`prefixed_dag_id` has been deprecated. "
"Please use `airflow.security.permissions.resource_name_for_dag` instead."
),
):
self.security_manager.prefixed_dag_id("hello")