diff --git a/COMPATIBILITY.md b/COMPATIBILITY.md index 39ec5ec8..32a7f473 100644 --- a/COMPATIBILITY.md +++ b/COMPATIBILITY.md @@ -63,6 +63,7 @@ In case you find any error, please [create a new issue](https://github.com/packi | `which_groups_can_merge_pr` | ✘ | ✘ | ✔ | ✘ | | `get_pr_files_diff` | ✘ | ✘ | ✔ | ✘ | | `get_users_with_given_access` | ✘ | ✘ | ✔ | ✔ | +| `has_permission` | ✔ | ✔ | ✔ | ✔ | ## User diff --git a/ogr/abstract/access_level.py b/ogr/abstract/access_level.py index fee75f14..c58ad29d 100644 --- a/ogr/abstract/access_level.py +++ b/ogr/abstract/access_level.py @@ -8,13 +8,17 @@ class AccessLevel(IntEnum): """ Enumeration representing an access level to the repository. - | Value from enumeration | GitHub | GitLab | Pagure | - | ---------------------- | -------- | ----------------------- | ------ | - | `AccessLevel.pull` | pull | guest | ticket | - | `AccessLevel.triage` | triage | reporter | ticket | - | `AccessLevel.push` | push | developer | commit | - | `AccessLevel.admin` | admin | maintainer | commit | - | `AccessLevel.maintain` | maintain | owner (only for groups) | admin | + | Value from enumeration | GitHub | GitLab | Pagure | Forgejo | + | ---------------------- | -------- | ----------------------- | ------ | ------- | + | `AccessLevel.pull` | pull | guest | ticket | read | + | `AccessLevel.triage` | triage | reporter | ticket | read | + | `AccessLevel.push` | push | developer | commit | write | + | `AccessLevel.admin` | admin | maintainer | commit | admin | + | `AccessLevel.maintain` | maintain | owner (only for groups) | admin | owner | + + Note: ``has_permission()`` uses conservative fallbacks for forges + that lack a triage equivalent — a triage check requires commit + on Pagure and write on Forgejo, not ticket/read. """ pull = 1 diff --git a/ogr/abstract/git_project.py b/ogr/abstract/git_project.py index 0a324403..9003f292 100644 --- a/ogr/abstract/git_project.py +++ b/ogr/abstract/git_project.py @@ -186,6 +186,30 @@ def can_merge_pr(self, username: str) -> bool: """ raise NotImplementedError() + def has_permission(self, username: str, access_level: AccessLevel) -> bool: + """ + Check if user has at least the given access level. + + Not all forges support all access levels. When a forge lacks an + equivalent for a given level, the check falls back to the nearest + higher level (e.g., triage falls back to write/push on forges + without a triage concept). See the ``AccessLevel`` docstring for + the per-forge mapping table. + + Implementations must not propagate forge-specific exceptions for + unknown or non-existent users — return ``False`` instead. + + Args: + username: Username. + access_level: Minimum required access level. + + Returns: + ``True`` if user has at least the given access level, + ``False`` otherwise. Returns ``False`` if the user does not + exist or is not a collaborator. + """ + raise NotImplementedError() + def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]: """ Args: diff --git a/ogr/services/forgejo/project.py b/ogr/services/forgejo/project.py index ef4dd775..916fc7a1 100644 --- a/ogr/services/forgejo/project.py +++ b/ogr/services/forgejo/project.py @@ -49,6 +49,25 @@ class ForgejoProject(BaseGitProject): None: "", } + _FORGEJO_PERM_RANK: ClassVar[dict[str, int]] = { + "none": -1, + "read": 0, + "write": 1, + "admin": 2, + "owner": 3, + } + + # Intentionally differs from access_dict: triage maps to "write" + # (not "read") because "read" would bypass the permission gate on + # public repos. + _ACCESS_LEVEL_TO_FORGEJO_PERM: ClassVar[dict[AccessLevel, str]] = { + AccessLevel.pull: "read", + AccessLevel.triage: "write", + AccessLevel.push: "write", + AccessLevel.admin: "admin", + AccessLevel.maintain: "owner", + } + def __init__( self, repo: str, @@ -288,6 +307,28 @@ def can_merge_pr(self, username: str) -> bool: collaborator=username, ).permission in ("owner", "admin", "write") + def has_permission(self, username: str, access_level: AccessLevel) -> bool: + try: + perm = self.api.repo_get_repo_permissions( + owner=self.namespace, + repo=self.repo, + collaborator=username, + ).permission + except NotFoundError: + return False + perm = perm.lower() if perm else "none" + if perm not in self._FORGEJO_PERM_RANK: + logger.warning( + "Unknown Forgejo permission %r for user %s on %s/%s; denying", + perm, + username, + self.namespace, + self.repo, + ) + return False + required = self._ACCESS_LEVEL_TO_FORGEJO_PERM[access_level] + return self._FORGEJO_PERM_RANK[perm] >= self._FORGEJO_PERM_RANK[required] + def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]: access_levels_forgejo = [ self.access_dict[access_level] for access_level in access_levels diff --git a/ogr/services/github/project.py b/ogr/services/github/project.py index 378e7904..af21e89e 100644 --- a/ogr/services/github/project.py +++ b/ogr/services/github/project.py @@ -49,6 +49,23 @@ class GithubProject(BaseGitProject): # Permission levels that can merge PRs CAN_MERGE_PERMS: ClassVar[set[str]] = {"admin", "write"} + _GITHUB_PERM_RANK: ClassVar[dict[str, int]] = { + "none": -1, + "read": 0, + "triage": 1, + "write": 2, + "maintain": 3, + "admin": 4, + } + + _ACCESS_LEVEL_TO_GITHUB_PERM: ClassVar[dict[AccessLevel, str]] = { + AccessLevel.pull: "read", + AccessLevel.triage: "triage", + AccessLevel.push: "write", + AccessLevel.admin: "admin", + AccessLevel.maintain: "maintain", + } + def __init__( self, repo: str, @@ -251,6 +268,24 @@ def can_merge_pr(self, username) -> bool: in self.CAN_MERGE_PERMS ) + def has_permission(self, username: str, access_level: AccessLevel) -> bool: + try: + perm = self.github_repo.get_collaborator_permission(username) + except UnknownObjectException: + return False + perm = perm.lower() if perm else "none" + if perm not in self._GITHUB_PERM_RANK: + logger.warning( + "Unknown GitHub permission %r for user %s on %s/%s; denying", + perm, + username, + self.namespace, + self.repo, + ) + return False + required = self._ACCESS_LEVEL_TO_GITHUB_PERM[access_level] + return self._GITHUB_PERM_RANK[perm] >= self._GITHUB_PERM_RANK[required] + def _get_collaborators_with_permission(self) -> dict: """ Get all project collaborators in dictionary with permission association. diff --git a/ogr/services/gitlab/project.py b/ogr/services/gitlab/project.py index f8dede69..59c84d48 100644 --- a/ogr/services/gitlab/project.py +++ b/ogr/services/gitlab/project.py @@ -3,7 +3,7 @@ import logging import os -from typing import Any, Optional, Union +from typing import Any, ClassVar, Optional, Union import gitlab from gitlab.exceptions import GitlabGetError @@ -38,6 +38,14 @@ class GitlabProject(BaseGitProject): service: "ogr_gitlab.GitlabService" + _ACCESS_LEVEL_TO_GITLAB: ClassVar[dict[AccessLevel, int]] = { + AccessLevel.pull: gitlab.const.GUEST_ACCESS, + AccessLevel.triage: gitlab.const.REPORTER_ACCESS, + AccessLevel.push: gitlab.const.DEVELOPER_ACCESS, + AccessLevel.admin: gitlab.const.MAINTAINER_ACCESS, + AccessLevel.maintain: gitlab.const.OWNER_ACCESS, + } + def __init__( self, repo: str, @@ -193,6 +201,34 @@ def who_can_merge_pr(self) -> set[str]: def can_merge_pr(self, username) -> bool: return username in self.who_can_merge_pr() + def has_permission(self, username: str, access_level: AccessLevel) -> bool: + required_access = self._ACCESS_LEVEL_TO_GITLAB[access_level] + try: + users = self.service.gitlab_instance.users.list(username=username) + user = next( + (u for u in users if u.username.lower() == username.lower()), + None, + ) + if user is None: + return False + member_manager = getattr( + self.gitlab_repo, + "members_all", + self.gitlab_repo.members, + ) + member = member_manager.get(user.id) + member_access = ( + member["access_level"] + if isinstance(member, dict) + else member.access_level + ) + return member_access >= required_access + except ( + gitlab.exceptions.GitlabGetError, + gitlab.exceptions.GitlabOperationError, + ): + return False + def delete(self) -> None: self.gitlab_repo.delete() diff --git a/ogr/services/pagure/project.py b/ogr/services/pagure/project.py index cea9d424..e035da70 100644 --- a/ogr/services/pagure/project.py +++ b/ogr/services/pagure/project.py @@ -47,6 +47,25 @@ class PagureProject(BaseGitProject): None: "", } + _PAGURE_PERM_RANK: ClassVar[dict[str, int]] = { + "ticket": 0, + "commit": 1, + "admin": 2, + "owner": 3, + } + + # Intentionally differs from access_dict: triage maps to "commit" + # (not "ticket") and admin maps to "admin" (not "commit") because + # has_permission uses "at least" threshold semantics rather than + # the assignment semantics of access_dict. + _ACCESS_LEVEL_TO_PAGURE_PERM: ClassVar[dict[AccessLevel, str]] = { + AccessLevel.pull: "ticket", + AccessLevel.triage: "commit", + AccessLevel.push: "commit", + AccessLevel.admin: "admin", + AccessLevel.maintain: "admin", + } + def __init__( self, repo: str, @@ -255,6 +274,34 @@ def can_merge_pr(self, username) -> bool: ) return username in accounts_that_can_merge_pr + def has_permission(self, username: str, access_level: AccessLevel) -> bool: + required = self._ACCESS_LEVEL_TO_PAGURE_PERM[access_level] + required_rank = self._PAGURE_PERM_RANK[required] + qualifying = [ + perm + for perm, rank in self._PAGURE_PERM_RANK.items() + if rank >= required_rank + ] + try: + # Check direct user access first to avoid O(G) group expansion + project = self.get_project_info() + access_users = project.get("access_users", {}) + for perm in qualifying: + if username in access_users.get(perm, []): + return True + + # Fall back to group expansion + access_groups = project.get("access_groups", {}) + for perm in qualifying: + if perm == "owner": + continue + for group in access_groups.get(perm, []): + if username in self.service.get_group(group).members: + return True + return False + except PagureAPIException: + return False + def request_access(self): raise OperationNotSupported("Not possible on Pagure") diff --git a/tests/unit/test_forgejo.py b/tests/unit/test_forgejo.py new file mode 100644 index 00000000..d519e3d4 --- /dev/null +++ b/tests/unit/test_forgejo.py @@ -0,0 +1,80 @@ +# Copyright Contributors to the Packit project. +# SPDX-License-Identifier: MIT + +import pytest +from flexmock import flexmock +from pyforgejo import NotFoundError + +from ogr.abstract import AccessLevel +from ogr.services.forgejo.project import ForgejoProject + + +class TestForgejoHasPermission: + @staticmethod + def _make_project(): + service = flexmock() + project = ForgejoProject( + repo="test_repo", + service=service, + namespace="test_namespace", + ) + mock_api = flexmock() + flexmock(project).should_receive("api").and_return(mock_api) + return project, mock_api + + @pytest.mark.parametrize( + ("perm", "access_level", "expected"), + [ + ("write", AccessLevel.triage, True), + ("write", AccessLevel.push, True), + ("read", AccessLevel.triage, False), + ("read", AccessLevel.pull, True), + ("admin", AccessLevel.admin, True), + ("admin", AccessLevel.maintain, False), + ("owner", AccessLevel.maintain, True), + ("none", AccessLevel.pull, False), + ], + ) + def test_has_permission(self, perm, access_level, expected): + project, mock_api = self._make_project() + + mock_api.should_receive("repo_get_repo_permissions").with_args( + owner="test_namespace", + repo="test_repo", + collaborator="testuser", + ).and_return(flexmock(permission=perm)) + + assert project.has_permission("testuser", access_level) is expected + + def test_has_permission_nonexistent_user(self): + project, mock_api = self._make_project() + + mock_api.should_receive("repo_get_repo_permissions").with_args( + owner="test_namespace", + repo="test_repo", + collaborator="ghost", + ).and_raise(NotFoundError("Not Found")) + + assert project.has_permission("ghost", AccessLevel.pull) is False + + def test_has_permission_none_permission(self): + project, mock_api = self._make_project() + + mock_api.should_receive("repo_get_repo_permissions").with_args( + owner="test_namespace", + repo="test_repo", + collaborator="testuser", + ).and_return(flexmock(permission=None)) + + assert project.has_permission("testuser", AccessLevel.pull) is False + + def test_has_permission_unknown_string(self): + project, mock_api = self._make_project() + + mock_api.should_receive("repo_get_repo_permissions").with_args( + owner="test_namespace", + repo="test_repo", + collaborator="testuser", + ).and_return(flexmock(permission="custom_role")) + + assert project.has_permission("testuser", AccessLevel.pull) is False diff --git a/tests/unit/test_github.py b/tests/unit/test_github.py index 02537d1f..bf0453e5 100644 --- a/tests/unit/test_github.py +++ b/tests/unit/test_github.py @@ -6,9 +6,10 @@ import pytest from flexmock import flexmock +from github import UnknownObjectException from ogr import GithubService -from ogr.abstract import AuthMethod +from ogr.abstract import AccessLevel, AuthMethod from ogr.exceptions import GithubAPIException from ogr.services.github.auth_providers.token import TokenAuthentication from ogr.services.github.auth_providers.tokman import Tokman @@ -203,3 +204,80 @@ def test_no_set_reset_customized_auth_method(github_service_with_one_auth_method with pytest.raises(GithubAPIException): service.set_auth_method(AuthMethod.github_app) assert isinstance(service.authentication, Tokman) + + +class TestGithubHasPermission: + @pytest.mark.parametrize( + ("perm", "access_level", "expected"), + [ + ("admin", AccessLevel.triage, True), + ("write", AccessLevel.triage, True), + ("maintain", AccessLevel.triage, True), + ("triage", AccessLevel.triage, True), + ("triage", AccessLevel.push, False), + ("read", AccessLevel.triage, False), + ("read", AccessLevel.pull, True), + ("none", AccessLevel.triage, False), + ("none", AccessLevel.pull, False), + ("admin", AccessLevel.admin, True), + ("admin", AccessLevel.maintain, True), + ("maintain", AccessLevel.admin, False), + ("write", AccessLevel.push, True), + ("read", AccessLevel.push, False), + ], + ) + def test_has_permission(self, perm, access_level, expected): + project = GithubProject( + repo="test_repo", + service="test_service", + namespace="test_namespace", + ) + mock_repo = flexmock() + mock_repo.should_receive("get_collaborator_permission").with_args( + "testuser", + ).and_return(perm) + flexmock(project).should_receive("github_repo").and_return(mock_repo) + + assert project.has_permission("testuser", access_level) is expected + + def test_has_permission_nonexistent_user(self): + project = GithubProject( + repo="test_repo", + service="test_service", + namespace="test_namespace", + ) + mock_repo = flexmock() + mock_repo.should_receive("get_collaborator_permission").with_args( + "ghost", + ).and_raise(UnknownObjectException(404, data="Not Found")) + flexmock(project).should_receive("github_repo").and_return(mock_repo) + + assert project.has_permission("ghost", AccessLevel.pull) is False + + def test_has_permission_unknown_string(self): + project = GithubProject( + repo="test_repo", + service="test_service", + namespace="test_namespace", + ) + mock_repo = flexmock() + mock_repo.should_receive("get_collaborator_permission").with_args( + "testuser", + ).and_return("custom_role") + flexmock(project).should_receive("github_repo").and_return(mock_repo) + + assert project.has_permission("testuser", AccessLevel.pull) is False + + def test_has_permission_none_permission(self): + project = GithubProject( + repo="test_repo", + service="test_service", + namespace="test_namespace", + ) + mock_repo = flexmock() + mock_repo.should_receive("get_collaborator_permission").with_args( + "testuser", + ).and_return(None) + flexmock(project).should_receive("github_repo").and_return(mock_repo) + + assert project.has_permission("testuser", AccessLevel.pull) is False diff --git a/tests/unit/test_gitlab.py b/tests/unit/test_gitlab.py index 9823ff75..8eb306c5 100644 --- a/tests/unit/test_gitlab.py +++ b/tests/unit/test_gitlab.py @@ -3,7 +3,14 @@ from unittest import TestCase +import gitlab.const +import gitlab.exceptions +import pytest +from flexmock import flexmock + from ogr import GitlabService +from ogr.abstract import AccessLevel +from ogr.services.gitlab.project import GitlabProject class TestGitlabService(TestCase): @@ -13,3 +20,93 @@ def test_hostname(self): GitlabService(instance_url="https://gitlab.gnome.org").hostname == "gitlab.gnome.org" ) + + +class TestGitlabHasPermission: + @staticmethod + def _make_project(mock_users_list_result): + users_manager = flexmock() + users_manager.should_receive("list").and_return(mock_users_list_result) + + gitlab_instance = flexmock(users=users_manager) + service = flexmock(gitlab_instance=gitlab_instance) + + project = GitlabProject( + repo="test_repo", + service=service, + namespace="test_namespace", + ) + mock_gitlab_repo = flexmock(members_all=flexmock(), members=flexmock()) + flexmock(project).should_receive("gitlab_repo").and_return( + mock_gitlab_repo, + ) + return project, mock_gitlab_repo + + @pytest.mark.parametrize( + ("member_access", "access_level", "expected"), + [ + (gitlab.const.GUEST_ACCESS, AccessLevel.pull, True), + (gitlab.const.REPORTER_ACCESS, AccessLevel.triage, True), + (gitlab.const.REPORTER_ACCESS, AccessLevel.push, False), + (gitlab.const.DEVELOPER_ACCESS, AccessLevel.triage, True), + (gitlab.const.MAINTAINER_ACCESS, AccessLevel.admin, True), + (gitlab.const.MAINTAINER_ACCESS, AccessLevel.maintain, False), + (gitlab.const.OWNER_ACCESS, AccessLevel.maintain, True), + ], + ) + def test_has_permission(self, member_access, access_level, expected): + mock_user = flexmock(id=42, username="testuser") + project, mock_gitlab_repo = self._make_project([mock_user]) + + mock_member = flexmock(access_level=member_access) + mock_gitlab_repo.members_all.should_receive("get").with_args( + 42, + ).and_return(mock_member) + + assert project.has_permission("testuser", access_level) is expected + + def test_has_permission_nonexistent_user(self): + project, _ = self._make_project([]) + + assert project.has_permission("ghost", AccessLevel.pull) is False + + def test_has_permission_non_member(self): + mock_user = flexmock(id=42, username="outsider") + project, mock_gitlab_repo = self._make_project([mock_user]) + + mock_gitlab_repo.members_all.should_receive("get").with_args( + 42, + ).and_raise( + gitlab.exceptions.GitlabGetError("404 Not found"), + ) + + assert project.has_permission("outsider", AccessLevel.pull) is False + + def test_has_permission_username_mismatch(self): + mock_user = flexmock(id=99, username="alice_admin") + project, _ = self._make_project([mock_user]) + + assert project.has_permission("alice", AccessLevel.pull) is False + + def test_has_permission_case_insensitive(self): + mock_user = flexmock(id=42, username="TestUser") + project, mock_gitlab_repo = self._make_project([mock_user]) + + mock_member = flexmock(access_level=gitlab.const.DEVELOPER_ACCESS) + mock_gitlab_repo.members_all.should_receive("get").with_args( + 42, + ).and_return(mock_member) + + assert project.has_permission("testuser", AccessLevel.push) is True + + def test_has_permission_operation_error(self): + mock_user = flexmock(id=42, username="testuser") + project, mock_gitlab_repo = self._make_project([mock_user]) + + mock_gitlab_repo.members_all.should_receive("get").with_args( + 42, + ).and_raise( + gitlab.exceptions.GitlabOperationError("500 Internal Server Error"), + ) + + assert project.has_permission("testuser", AccessLevel.pull) is False diff --git a/tests/unit/test_pagure.py b/tests/unit/test_pagure.py index 7650a186..5d969541 100644 --- a/tests/unit/test_pagure.py +++ b/tests/unit/test_pagure.py @@ -3,10 +3,105 @@ from unittest import TestCase +import pytest +from flexmock import flexmock + from ogr import PagureService +from ogr.abstract import AccessLevel +from ogr.exceptions import PagureAPIException +from ogr.services.pagure.project import PagureProject class TestPagureService(TestCase): def test_hostname(self): assert PagureService().hostname == "src.fedoraproject.org" assert PagureService(instance_url="https://pagure.io").hostname == "pagure.io" + + +def _make_pagure_project(access_users, access_groups=None): + service = flexmock(instance_url="https://pagure.io", read_only=False) + project = PagureProject( + repo="test_repo", + namespace="test_namespace", + service=service, + ) + project_info = { + "access_users": access_users, + "access_groups": access_groups or {"admin": [], "commit": [], "ticket": []}, + } + flexmock(project).should_receive("get_project_info").and_return(project_info) + return project + + +class TestPagureHasPermission: + @pytest.mark.parametrize( + ("access_key", "access_level", "expected"), + [ + ("ticket", AccessLevel.pull, True), + ("commit", AccessLevel.triage, True), + ("ticket", AccessLevel.triage, False), + ("commit", AccessLevel.admin, False), + ("admin", AccessLevel.admin, True), + ("admin", AccessLevel.maintain, True), + ("commit", AccessLevel.maintain, False), + ], + ) + def test_has_permission(self, access_key, access_level, expected): + access_users = {"owner": [], "admin": [], "commit": [], "ticket": []} + access_users[access_key] = ["testuser"] + project = _make_pagure_project(access_users) + + assert project.has_permission("testuser", access_level) is expected + + def test_has_permission_owner_pull(self): + access_users = { + "owner": ["the-owner"], + "admin": [], + "commit": [], + "ticket": [], + } + project = _make_pagure_project(access_users) + + assert project.has_permission("the-owner", AccessLevel.pull) is True + + def test_has_permission_owner_maintain(self): + access_users = { + "owner": ["the-owner"], + "admin": [], + "commit": [], + "ticket": [], + } + project = _make_pagure_project(access_users) + + assert project.has_permission("the-owner", AccessLevel.maintain) is True + + def test_has_permission_via_group(self): + access_users = {"owner": [], "admin": [], "commit": [], "ticket": []} + access_groups = {"admin": [], "commit": ["dev-team"], "ticket": []} + project = _make_pagure_project(access_users, access_groups) + + mock_group = flexmock(members=["groupuser"]) + project.service.should_receive("get_group").with_args( + "dev-team", + ).and_return(mock_group) + + assert project.has_permission("groupuser", AccessLevel.push) is True + + def test_has_permission_unknown_user(self): + access_users = {"owner": [], "admin": [], "commit": [], "ticket": []} + project = _make_pagure_project(access_users) + + assert project.has_permission("nobody", AccessLevel.pull) is False + + def test_has_permission_api_error(self): + service = flexmock(instance_url="https://pagure.io", read_only=False) + project = PagureProject( + repo="test_repo", + namespace="test_namespace", + service=service, + ) + flexmock(project).should_receive("get_project_info").and_raise( + PagureAPIException("Server error", response_code=500), + ) + + assert project.has_permission("anyone", AccessLevel.push) is False