Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions COMPATIBILITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ In case you find any error, please [create a new issue](https://github.com/packi
| `which_groups_can_merge_pr` | ✘ | ✘ | ✔ | ✘ |
| `get_pr_files_diff` | ✘ | ✘ | ✔ | ✘ |
| `get_users_with_given_access` | ✘ | ✘ | ✔ | ✔ |
| `has_permission` | ✔ | ✔ | ✔ | ✔ |
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add this, the table is used only when there are features that are missing in some forges, i.e., at least one forge doesn't support the feature.


## User

Expand Down
18 changes: 11 additions & 7 deletions ogr/abstract/access_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ class AccessLevel(IntEnum):
"""
Enumeration representing an access level to the repository.

| Value from enumeration | GitHub | GitLab | Pagure |
| ---------------------- | -------- | ----------------------- | ------ |
| `AccessLevel.pull` | pull | guest | ticket |
| `AccessLevel.triage` | triage | reporter | ticket |
| `AccessLevel.push` | push | developer | commit |
| `AccessLevel.admin` | admin | maintainer | commit |
| `AccessLevel.maintain` | maintain | owner (only for groups) | admin |
| Value from enumeration | GitHub | GitLab | Pagure | Forgejo |
| ---------------------- | -------- | ----------------------- | ------ | ------- |
| `AccessLevel.pull` | pull | guest | ticket | read |
| `AccessLevel.triage` | triage | reporter | ticket | read |
| `AccessLevel.push` | push | developer | commit | write |
| `AccessLevel.admin` | admin | maintainer | commit | admin |
| `AccessLevel.maintain` | maintain | owner (only for groups) | admin | owner |

Note: ``has_permission()`` uses conservative fallbacks for forges
that lack a triage equivalent — a triage check requires commit
on Pagure and write on Forgejo, not ticket/read.
"""

pull = 1
Expand Down
24 changes: 24 additions & 0 deletions ogr/abstract/git_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,30 @@ def can_merge_pr(self, username: str) -> bool:
"""
raise NotImplementedError()

def has_permission(self, username: str, access_level: AccessLevel) -> bool:
"""
Check if user has at least the given access level.

Not all forges support all access levels. When a forge lacks an
equivalent for a given level, the check falls back to the nearest
higher level (e.g., triage falls back to write/push on forges
without a triage concept). See the ``AccessLevel`` docstring for
the per-forge mapping table.

Implementations must not propagate forge-specific exceptions for
unknown or non-existent users — return ``False`` instead.

Args:
username: Username.
access_level: Minimum required access level.

Returns:
``True`` if user has at least the given access level,
``False`` otherwise. Returns ``False`` if the user does not
exist or is not a collaborator.
"""
raise NotImplementedError()

def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]:
"""
Args:
Expand Down
41 changes: 41 additions & 0 deletions ogr/services/forgejo/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ class ForgejoProject(BaseGitProject):
None: "",
}

_FORGEJO_PERM_RANK: ClassVar[dict[str, int]] = {
"none": -1,
"read": 0,
"write": 1,
"admin": 2,
"owner": 3,
}

# Intentionally differs from access_dict: triage maps to "write"
# (not "read") because "read" would bypass the permission gate on
# public repos.
_ACCESS_LEVEL_TO_FORGEJO_PERM: ClassVar[dict[AccessLevel, str]] = {
AccessLevel.pull: "read",
AccessLevel.triage: "write",
AccessLevel.push: "write",
AccessLevel.admin: "admin",
AccessLevel.maintain: "owner",
}

def __init__(
self,
repo: str,
Expand Down Expand Up @@ -288,6 +307,28 @@ def can_merge_pr(self, username: str) -> bool:
collaborator=username,
).permission in ("owner", "admin", "write")

def has_permission(self, username: str, access_level: AccessLevel) -> bool:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will very likely be an issue. Trying it out locally I have managed to get the following error message out of it:

status_code: 403
message: >
  Only admins can query all permissions, repo admins can query all repo
  permissions, collaborators can query only their own

Unfortunately the related issue (to this PR) of service is not linked, but in my perspective this will be a big blocker for supporting Forgejo as an upstream forge in Packit Service, as this also goes against our least privilege “rule” for repos where we provide CI, e.g., on dist-git we do not possess any privileges.

try:
perm = self.api.repo_get_repo_permissions(
owner=self.namespace,
repo=self.repo,
collaborator=username,
).permission
except NotFoundError:
return False
perm = perm.lower() if perm else "none"
if perm not in self._FORGEJO_PERM_RANK:
logger.warning(
"Unknown Forgejo permission %r for user %s on %s/%s; denying",
perm,
username,
self.namespace,
self.repo,
)
return False
required = self._ACCESS_LEVEL_TO_FORGEJO_PERM[access_level]
return self._FORGEJO_PERM_RANK[perm] >= self._FORGEJO_PERM_RANK[required]

def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]:
access_levels_forgejo = [
self.access_dict[access_level] for access_level in access_levels
Expand Down
35 changes: 35 additions & 0 deletions ogr/services/github/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ class GithubProject(BaseGitProject):
# Permission levels that can merge PRs
CAN_MERGE_PERMS: ClassVar[set[str]] = {"admin", "write"}

_GITHUB_PERM_RANK: ClassVar[dict[str, int]] = {
"none": -1,
"read": 0,
"triage": 1,
"write": 2,
"maintain": 3,
"admin": 4,
}

_ACCESS_LEVEL_TO_GITHUB_PERM: ClassVar[dict[AccessLevel, str]] = {
AccessLevel.pull: "read",
AccessLevel.triage: "triage",
AccessLevel.push: "write",
AccessLevel.admin: "admin",
AccessLevel.maintain: "maintain",
}

def __init__(
self,
repo: str,
Expand Down Expand Up @@ -251,6 +268,24 @@ def can_merge_pr(self, username) -> bool:
in self.CAN_MERGE_PERMS
)

def has_permission(self, username: str, access_level: AccessLevel) -> bool:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, based on the local testing, similar issue as with the Forgejo.

Packit Service is OK here, since it is a GitHub App. However for general usage, with a classic PAT (personal access token) this endpoint yields 403.

The 403 status is even bigger issue, because of the utterly poor design choices done by GitHub to return 403 even for rate-limiting which classifies this Forbidden status code as to be retried in ogr, basically resulting in an infinite loop…

This should be definitely reflected in docs, if not checked manually and handled with a proper exception (IIUC only repo admins, same as with Forgejo, are allowed to query this endpoint).

try:
perm = self.github_repo.get_collaborator_permission(username)
except UnknownObjectException:
return False
perm = perm.lower() if perm else "none"
if perm not in self._GITHUB_PERM_RANK:
logger.warning(
"Unknown GitHub permission %r for user %s on %s/%s; denying",
perm,
username,
self.namespace,
self.repo,
)
return False
required = self._ACCESS_LEVEL_TO_GITHUB_PERM[access_level]
return self._GITHUB_PERM_RANK[perm] >= self._GITHUB_PERM_RANK[required]

def _get_collaborators_with_permission(self) -> dict:
"""
Get all project collaborators in dictionary with permission association.
Expand Down
38 changes: 37 additions & 1 deletion ogr/services/gitlab/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import logging
import os
from typing import Any, Optional, Union
from typing import Any, ClassVar, Optional, Union

import gitlab
from gitlab.exceptions import GitlabGetError
Expand Down Expand Up @@ -38,6 +38,14 @@
class GitlabProject(BaseGitProject):
service: "ogr_gitlab.GitlabService"

_ACCESS_LEVEL_TO_GITLAB: ClassVar[dict[AccessLevel, int]] = {
AccessLevel.pull: gitlab.const.GUEST_ACCESS,
AccessLevel.triage: gitlab.const.REPORTER_ACCESS,
AccessLevel.push: gitlab.const.DEVELOPER_ACCESS,
AccessLevel.admin: gitlab.const.MAINTAINER_ACCESS,
AccessLevel.maintain: gitlab.const.OWNER_ACCESS,
}

def __init__(
self,
repo: str,
Expand Down Expand Up @@ -193,6 +201,34 @@ def who_can_merge_pr(self) -> set[str]:
def can_merge_pr(self, username) -> bool:
return username in self.who_can_merge_pr()

def has_permission(self, username: str, access_level: AccessLevel) -> bool:
required_access = self._ACCESS_LEVEL_TO_GITLAB[access_level]
try:
users = self.service.gitlab_instance.users.list(username=username)
user = next(
(u for u in users if u.username.lower() == username.lower()),
None,
)
Comment on lines +207 to +211
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
users = self.service.gitlab_instance.users.list(username=username)
user = next(
(u for u in users if u.username.lower() == username.lower()),
None,
)
users = self.service.gitlab_instance.users.list(username=username, iterator=True)
user = next(users, None)

Also, just out of interest, why is there conversion from a list to a generator including a filtering? I would've expected the .list(username=username) to handle the filtering?

if user is None:
return False
member_manager = getattr(
self.gitlab_repo,
"members_all",
self.gitlab_repo.members,
)
member = member_manager.get(user.id)
member_access = (
member["access_level"]
if isinstance(member, dict)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this happen? 👀

else member.access_level
)
return member_access >= required_access
except (
gitlab.exceptions.GitlabGetError,
gitlab.exceptions.GitlabOperationError,
):
return False

def delete(self) -> None:
self.gitlab_repo.delete()

Expand Down
47 changes: 47 additions & 0 deletions ogr/services/pagure/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ class PagureProject(BaseGitProject):
None: "",
}

_PAGURE_PERM_RANK: ClassVar[dict[str, int]] = {
"ticket": 0,
"commit": 1,
"admin": 2,
"owner": 3,
}

# Intentionally differs from access_dict: triage maps to "commit"
# (not "ticket") and admin maps to "admin" (not "commit") because
# has_permission uses "at least" threshold semantics rather than
# the assignment semantics of access_dict.
_ACCESS_LEVEL_TO_PAGURE_PERM: ClassVar[dict[AccessLevel, str]] = {
AccessLevel.pull: "ticket",
AccessLevel.triage: "commit",
AccessLevel.push: "commit",
AccessLevel.admin: "admin",
AccessLevel.maintain: "admin",
}

Comment on lines +50 to +68
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it would probably make sense to make a Pagure-only dataclass… oh i see… ew… never mind

def __init__(
self,
repo: str,
Expand Down Expand Up @@ -255,6 +274,34 @@ def can_merge_pr(self, username) -> bool:
)
return username in accounts_that_can_merge_pr

def has_permission(self, username: str, access_level: AccessLevel) -> bool:
required = self._ACCESS_LEVEL_TO_PAGURE_PERM[access_level]
required_rank = self._PAGURE_PERM_RANK[required]
qualifying = [
perm
for perm, rank in self._PAGURE_PERM_RANK.items()
if rank >= required_rank
]
try:
# Check direct user access first to avoid O(G) group expansion
project = self.get_project_info()
access_users = project.get("access_users", {})
for perm in qualifying:
if username in access_users.get(perm, []):
return True

# Fall back to group expansion
access_groups = project.get("access_groups", {})
for perm in qualifying:
if perm == "owner":
continue
for group in access_groups.get(perm, []):
if username in self.service.get_group(group).members:
return True
return False
except PagureAPIException:
return False

def request_access(self):
raise OperationNotSupported("Not possible on Pagure")

Expand Down
80 changes: 80 additions & 0 deletions tests/unit/test_forgejo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright Contributors to the Packit project.
# SPDX-License-Identifier: MIT

import pytest
from flexmock import flexmock
from pyforgejo import NotFoundError

from ogr.abstract import AccessLevel
from ogr.services.forgejo.project import ForgejoProject


class TestForgejoHasPermission:
@staticmethod
def _make_project():
service = flexmock()
project = ForgejoProject(
repo="test_repo",
service=service,
namespace="test_namespace",
)
mock_api = flexmock()
flexmock(project).should_receive("api").and_return(mock_api)
return project, mock_api

@pytest.mark.parametrize(
("perm", "access_level", "expected"),
[
("write", AccessLevel.triage, True),
("write", AccessLevel.push, True),
("read", AccessLevel.triage, False),
("read", AccessLevel.pull, True),
("admin", AccessLevel.admin, True),
("admin", AccessLevel.maintain, False),
("owner", AccessLevel.maintain, True),
("none", AccessLevel.pull, False),
],
)
def test_has_permission(self, perm, access_level, expected):
project, mock_api = self._make_project()

mock_api.should_receive("repo_get_repo_permissions").with_args(
owner="test_namespace",
repo="test_repo",
collaborator="testuser",
).and_return(flexmock(permission=perm))

assert project.has_permission("testuser", access_level) is expected

def test_has_permission_nonexistent_user(self):
project, mock_api = self._make_project()

mock_api.should_receive("repo_get_repo_permissions").with_args(
owner="test_namespace",
repo="test_repo",
collaborator="ghost",
).and_raise(NotFoundError("Not Found"))

assert project.has_permission("ghost", AccessLevel.pull) is False

def test_has_permission_none_permission(self):
project, mock_api = self._make_project()

mock_api.should_receive("repo_get_repo_permissions").with_args(
owner="test_namespace",
repo="test_repo",
collaborator="testuser",
).and_return(flexmock(permission=None))

assert project.has_permission("testuser", AccessLevel.pull) is False

def test_has_permission_unknown_string(self):
project, mock_api = self._make_project()

mock_api.should_receive("repo_get_repo_permissions").with_args(
owner="test_namespace",
repo="test_repo",
collaborator="testuser",
).and_return(flexmock(permission="custom_role"))

assert project.has_permission("testuser", AccessLevel.pull) is False
Loading
Loading