-
Notifications
You must be signed in to change notification settings - Fork 69
ogr: add has_permission() for access-level checks #985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,25 @@ class ForgejoProject(BaseGitProject): | |
| None: "", | ||
| } | ||
|
|
||
| _FORGEJO_PERM_RANK: ClassVar[dict[str, int]] = { | ||
| "none": -1, | ||
| "read": 0, | ||
| "write": 1, | ||
| "admin": 2, | ||
| "owner": 3, | ||
| } | ||
|
|
||
| # Intentionally differs from access_dict: triage maps to "write" | ||
| # (not "read") because "read" would bypass the permission gate on | ||
| # public repos. | ||
| _ACCESS_LEVEL_TO_FORGEJO_PERM: ClassVar[dict[AccessLevel, str]] = { | ||
| AccessLevel.pull: "read", | ||
| AccessLevel.triage: "write", | ||
| AccessLevel.push: "write", | ||
| AccessLevel.admin: "admin", | ||
| AccessLevel.maintain: "owner", | ||
| } | ||
|
|
||
| def __init__( | ||
| self, | ||
| repo: str, | ||
|
|
@@ -288,6 +307,28 @@ def can_merge_pr(self, username: str) -> bool: | |
| collaborator=username, | ||
| ).permission in ("owner", "admin", "write") | ||
|
|
||
| def has_permission(self, username: str, access_level: AccessLevel) -> bool: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will very likely be an issue. Trying it out locally I have managed to get the following error message out of it: Unfortunately the related issue (to this PR) of service is not linked, but in my perspective this will be a big blocker for supporting Forgejo as an upstream forge in Packit Service, as this also goes against our least privilege “rule” for repos where we provide CI, e.g., on dist-git we do not possess any privileges. |
||
| try: | ||
| perm = self.api.repo_get_repo_permissions( | ||
| owner=self.namespace, | ||
| repo=self.repo, | ||
| collaborator=username, | ||
| ).permission | ||
| except NotFoundError: | ||
| return False | ||
| perm = perm.lower() if perm else "none" | ||
| if perm not in self._FORGEJO_PERM_RANK: | ||
| logger.warning( | ||
| "Unknown Forgejo permission %r for user %s on %s/%s; denying", | ||
| perm, | ||
| username, | ||
| self.namespace, | ||
| self.repo, | ||
| ) | ||
| return False | ||
| required = self._ACCESS_LEVEL_TO_FORGEJO_PERM[access_level] | ||
| return self._FORGEJO_PERM_RANK[perm] >= self._FORGEJO_PERM_RANK[required] | ||
|
|
||
| def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]: | ||
| access_levels_forgejo = [ | ||
| self.access_dict[access_level] for access_level in access_levels | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,23 @@ class GithubProject(BaseGitProject): | |
| # Permission levels that can merge PRs | ||
| CAN_MERGE_PERMS: ClassVar[set[str]] = {"admin", "write"} | ||
|
|
||
| _GITHUB_PERM_RANK: ClassVar[dict[str, int]] = { | ||
| "none": -1, | ||
| "read": 0, | ||
| "triage": 1, | ||
| "write": 2, | ||
| "maintain": 3, | ||
| "admin": 4, | ||
| } | ||
|
|
||
| _ACCESS_LEVEL_TO_GITHUB_PERM: ClassVar[dict[AccessLevel, str]] = { | ||
| AccessLevel.pull: "read", | ||
| AccessLevel.triage: "triage", | ||
| AccessLevel.push: "write", | ||
| AccessLevel.admin: "admin", | ||
| AccessLevel.maintain: "maintain", | ||
| } | ||
|
|
||
| def __init__( | ||
| self, | ||
| repo: str, | ||
|
|
@@ -251,6 +268,24 @@ def can_merge_pr(self, username) -> bool: | |
| in self.CAN_MERGE_PERMS | ||
| ) | ||
|
|
||
| def has_permission(self, username: str, access_level: AccessLevel) -> bool: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, based on the local testing, similar issue as with the Forgejo. Packit Service is OK here, since it is a GitHub App. However for general usage, with a classic PAT (personal access token) this endpoint yields The This should be definitely reflected in docs, if not checked manually and handled with a proper exception (IIUC only repo admins, same as with Forgejo, are allowed to query this endpoint). |
||
| try: | ||
| perm = self.github_repo.get_collaborator_permission(username) | ||
| except UnknownObjectException: | ||
| return False | ||
| perm = perm.lower() if perm else "none" | ||
| if perm not in self._GITHUB_PERM_RANK: | ||
| logger.warning( | ||
| "Unknown GitHub permission %r for user %s on %s/%s; denying", | ||
| perm, | ||
| username, | ||
| self.namespace, | ||
| self.repo, | ||
| ) | ||
| return False | ||
| required = self._ACCESS_LEVEL_TO_GITHUB_PERM[access_level] | ||
| return self._GITHUB_PERM_RANK[perm] >= self._GITHUB_PERM_RANK[required] | ||
|
|
||
| def _get_collaborators_with_permission(self) -> dict: | ||
| """ | ||
| Get all project collaborators in dictionary with permission association. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,7 +3,7 @@ | |||||||||||||||
|
|
||||||||||||||||
| import logging | ||||||||||||||||
| import os | ||||||||||||||||
| from typing import Any, Optional, Union | ||||||||||||||||
| from typing import Any, ClassVar, Optional, Union | ||||||||||||||||
|
|
||||||||||||||||
| import gitlab | ||||||||||||||||
| from gitlab.exceptions import GitlabGetError | ||||||||||||||||
|
|
@@ -38,6 +38,14 @@ | |||||||||||||||
| class GitlabProject(BaseGitProject): | ||||||||||||||||
| service: "ogr_gitlab.GitlabService" | ||||||||||||||||
|
|
||||||||||||||||
| _ACCESS_LEVEL_TO_GITLAB: ClassVar[dict[AccessLevel, int]] = { | ||||||||||||||||
| AccessLevel.pull: gitlab.const.GUEST_ACCESS, | ||||||||||||||||
| AccessLevel.triage: gitlab.const.REPORTER_ACCESS, | ||||||||||||||||
| AccessLevel.push: gitlab.const.DEVELOPER_ACCESS, | ||||||||||||||||
| AccessLevel.admin: gitlab.const.MAINTAINER_ACCESS, | ||||||||||||||||
| AccessLevel.maintain: gitlab.const.OWNER_ACCESS, | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| def __init__( | ||||||||||||||||
| self, | ||||||||||||||||
| repo: str, | ||||||||||||||||
|
|
@@ -193,6 +201,34 @@ def who_can_merge_pr(self) -> set[str]: | |||||||||||||||
| def can_merge_pr(self, username) -> bool: | ||||||||||||||||
| return username in self.who_can_merge_pr() | ||||||||||||||||
|
|
||||||||||||||||
| def has_permission(self, username: str, access_level: AccessLevel) -> bool: | ||||||||||||||||
| required_access = self._ACCESS_LEVEL_TO_GITLAB[access_level] | ||||||||||||||||
| try: | ||||||||||||||||
| users = self.service.gitlab_instance.users.list(username=username) | ||||||||||||||||
| user = next( | ||||||||||||||||
| (u for u in users if u.username.lower() == username.lower()), | ||||||||||||||||
| None, | ||||||||||||||||
| ) | ||||||||||||||||
|
Comment on lines
+207
to
+211
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Also, just out of interest, why is there conversion from a |
||||||||||||||||
| if user is None: | ||||||||||||||||
| return False | ||||||||||||||||
| member_manager = getattr( | ||||||||||||||||
| self.gitlab_repo, | ||||||||||||||||
| "members_all", | ||||||||||||||||
| self.gitlab_repo.members, | ||||||||||||||||
| ) | ||||||||||||||||
| member = member_manager.get(user.id) | ||||||||||||||||
| member_access = ( | ||||||||||||||||
| member["access_level"] | ||||||||||||||||
| if isinstance(member, dict) | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How can this happen? 👀 |
||||||||||||||||
| else member.access_level | ||||||||||||||||
| ) | ||||||||||||||||
| return member_access >= required_access | ||||||||||||||||
| except ( | ||||||||||||||||
| gitlab.exceptions.GitlabGetError, | ||||||||||||||||
| gitlab.exceptions.GitlabOperationError, | ||||||||||||||||
| ): | ||||||||||||||||
| return False | ||||||||||||||||
|
|
||||||||||||||||
| def delete(self) -> None: | ||||||||||||||||
| self.gitlab_repo.delete() | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,25 @@ class PagureProject(BaseGitProject): | |
| None: "", | ||
| } | ||
|
|
||
| _PAGURE_PERM_RANK: ClassVar[dict[str, int]] = { | ||
| "ticket": 0, | ||
| "commit": 1, | ||
| "admin": 2, | ||
| "owner": 3, | ||
| } | ||
|
|
||
| # Intentionally differs from access_dict: triage maps to "commit" | ||
| # (not "ticket") and admin maps to "admin" (not "commit") because | ||
| # has_permission uses "at least" threshold semantics rather than | ||
| # the assignment semantics of access_dict. | ||
| _ACCESS_LEVEL_TO_PAGURE_PERM: ClassVar[dict[AccessLevel, str]] = { | ||
| AccessLevel.pull: "ticket", | ||
| AccessLevel.triage: "commit", | ||
| AccessLevel.push: "commit", | ||
| AccessLevel.admin: "admin", | ||
| AccessLevel.maintain: "admin", | ||
| } | ||
|
|
||
|
Comment on lines
+50
to
+68
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO it would probably make sense to make a Pagure-only dataclass… oh i see… ew… never mind |
||
| def __init__( | ||
| self, | ||
| repo: str, | ||
|
|
@@ -255,6 +274,34 @@ def can_merge_pr(self, username) -> bool: | |
| ) | ||
| return username in accounts_that_can_merge_pr | ||
|
|
||
| def has_permission(self, username: str, access_level: AccessLevel) -> bool: | ||
| required = self._ACCESS_LEVEL_TO_PAGURE_PERM[access_level] | ||
| required_rank = self._PAGURE_PERM_RANK[required] | ||
| qualifying = [ | ||
| perm | ||
| for perm, rank in self._PAGURE_PERM_RANK.items() | ||
| if rank >= required_rank | ||
| ] | ||
| try: | ||
| # Check direct user access first to avoid O(G) group expansion | ||
| project = self.get_project_info() | ||
| access_users = project.get("access_users", {}) | ||
| for perm in qualifying: | ||
| if username in access_users.get(perm, []): | ||
| return True | ||
|
|
||
| # Fall back to group expansion | ||
| access_groups = project.get("access_groups", {}) | ||
| for perm in qualifying: | ||
| if perm == "owner": | ||
| continue | ||
| for group in access_groups.get(perm, []): | ||
| if username in self.service.get_group(group).members: | ||
| return True | ||
| return False | ||
| except PagureAPIException: | ||
| return False | ||
|
|
||
| def request_access(self): | ||
| raise OperationNotSupported("Not possible on Pagure") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| # Copyright Contributors to the Packit project. | ||
| # SPDX-License-Identifier: MIT | ||
|
|
||
| import pytest | ||
| from flexmock import flexmock | ||
| from pyforgejo import NotFoundError | ||
|
|
||
| from ogr.abstract import AccessLevel | ||
| from ogr.services.forgejo.project import ForgejoProject | ||
|
|
||
|
|
||
| class TestForgejoHasPermission: | ||
| @staticmethod | ||
| def _make_project(): | ||
| service = flexmock() | ||
| project = ForgejoProject( | ||
| repo="test_repo", | ||
| service=service, | ||
| namespace="test_namespace", | ||
| ) | ||
| mock_api = flexmock() | ||
| flexmock(project).should_receive("api").and_return(mock_api) | ||
| return project, mock_api | ||
|
|
||
| @pytest.mark.parametrize( | ||
| ("perm", "access_level", "expected"), | ||
| [ | ||
| ("write", AccessLevel.triage, True), | ||
| ("write", AccessLevel.push, True), | ||
| ("read", AccessLevel.triage, False), | ||
| ("read", AccessLevel.pull, True), | ||
| ("admin", AccessLevel.admin, True), | ||
| ("admin", AccessLevel.maintain, False), | ||
| ("owner", AccessLevel.maintain, True), | ||
| ("none", AccessLevel.pull, False), | ||
| ], | ||
| ) | ||
| def test_has_permission(self, perm, access_level, expected): | ||
| project, mock_api = self._make_project() | ||
|
|
||
| mock_api.should_receive("repo_get_repo_permissions").with_args( | ||
| owner="test_namespace", | ||
| repo="test_repo", | ||
| collaborator="testuser", | ||
| ).and_return(flexmock(permission=perm)) | ||
|
|
||
| assert project.has_permission("testuser", access_level) is expected | ||
|
|
||
| def test_has_permission_nonexistent_user(self): | ||
| project, mock_api = self._make_project() | ||
|
|
||
| mock_api.should_receive("repo_get_repo_permissions").with_args( | ||
| owner="test_namespace", | ||
| repo="test_repo", | ||
| collaborator="ghost", | ||
| ).and_raise(NotFoundError("Not Found")) | ||
|
|
||
| assert project.has_permission("ghost", AccessLevel.pull) is False | ||
|
|
||
| def test_has_permission_none_permission(self): | ||
| project, mock_api = self._make_project() | ||
|
|
||
| mock_api.should_receive("repo_get_repo_permissions").with_args( | ||
| owner="test_namespace", | ||
| repo="test_repo", | ||
| collaborator="testuser", | ||
| ).and_return(flexmock(permission=None)) | ||
|
|
||
| assert project.has_permission("testuser", AccessLevel.pull) is False | ||
|
|
||
| def test_has_permission_unknown_string(self): | ||
| project, mock_api = self._make_project() | ||
|
|
||
| mock_api.should_receive("repo_get_repo_permissions").with_args( | ||
| owner="test_namespace", | ||
| repo="test_repo", | ||
| collaborator="testuser", | ||
| ).and_return(flexmock(permission="custom_role")) | ||
|
|
||
| assert project.has_permission("testuser", AccessLevel.pull) is False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to add this, the table is used only when there are features that are missing in some forges, i.e., at least one forge doesn't support the feature.