Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions src/authorization/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@

@lru_cache(maxsize=1)
def get_authorization_resolvers() -> Tuple[RolesResolver, AccessResolver]:
"""Get authorization resolvers from configuration (cached)."""
"""Get authorization resolvers from configuration (cached).

Return the configured RolesResolver and AccessResolver based on
authentication and authorization settings.

The selection mirrors configuration: returns noop resolvers for
NOOP/K8S/NOOP_WITH_TOKEN or when JWT role rules or authorization access
rules are not set; returns JwtRolesResolver and GenericAccessResolver when
JWK_TOKEN configuration provides role and access rules. The result is
cached to avoid recomputing resolvers.

Returns:
tuple[RolesResolver, AccessResolver]: (roles_resolver, access_resolver)
appropriate for the current configuration.
"""
authorization_cfg = configuration.authorization_configuration
authentication_config = configuration.authentication_configuration

Expand Down Expand Up @@ -83,7 +97,29 @@ def get_authorization_resolvers() -> Tuple[RolesResolver, AccessResolver]:
async def _perform_authorization_check(
action: Action, args: tuple[Any, ...], kwargs: dict[str, Any]
) -> None:
"""Perform authorization check - common logic for all decorators."""
"""Perform authorization check - common logic for all decorators.

Performs role resolution and access verification for the supplied `action`
using configured resolvers. Expects `kwargs` to contain an `auth` value
from the authentication dependency; if a Request is present in `args` or
`kwargs` its `state.authorized_actions` will be set to the set of actions
the resolved roles are authorized to perform.

Parameters:
action (Action): The action to authorize.
args (tuple[Any, ...]): Positional arguments passed to the endpoint;
used to locate a Request instance if present.
kwargs (dict[str, Any]): Keyword arguments passed to the endpoint; must
include `auth` (authentication info) and may include `request`.

Returns:
none

Raises:
HTTPException: with 500 Internal Server Error if `auth` is missing from `kwargs`.
HTTPException: with 403 Forbidden if the resolved roles are not
permitted to perform `action`.
"""
role_resolver, access_resolver = get_authorization_resolvers()

try:
Expand Down Expand Up @@ -120,9 +156,31 @@ async def _perform_authorization_check(


def authorize(action: Action) -> Callable:
"""Check authorization for an endpoint (async version)."""
"""Check authorization for an endpoint (async version).

Create a decorator that enforces the specified authorization action on an endpoint.

Parameters:
action (Action): The action that the decorated endpoint must be
authorized to perform.

Returns:
Callable: A decorator which, when applied to an endpoint function,
performs the authorization check for the given action before invoking
the function.
"""

def decorator(func: Callable) -> Callable:
"""
Wrap an endpoint function to perform an authorization check before invoking original one.

Parameters:
func (Callable): The function to wrap.

Returns:
Callable: A wrapper that performs authorization then calls `func`.
"""

@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
await _perform_authorization_check(action, args, kwargs)
Expand Down
183 changes: 169 additions & 14 deletions src/authorization/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,32 @@ class RolesResolver(ABC): # pylint: disable=too-few-public-methods

@abstractmethod
async def resolve_roles(self, auth: AuthTuple) -> UserRoles:
"""Given an auth tuple, return the list of user roles."""
"""Given an auth tuple, return the list of user roles.

Resolve and return the set of user roles extracted from the provided authentication tuple.

Parameters:
auth (AuthTuple): Authentication tuple (for example, a token and
associated metadata) used to determine roles.

Returns:
UserRoles: A set of role names associated with the authenticated subject.
"""


class NoopRolesResolver(RolesResolver): # pylint: disable=too-few-public-methods
"""No-op roles resolver that does not perform any role resolution."""

async def resolve_roles(self, auth: AuthTuple) -> UserRoles:
"""Return an empty list of roles."""
"""Return an empty list of roles.

Produce an empty set of user roles; no role resolution is performed.

The provided `auth` tuple is accepted but ignored.

Returns:
An empty set of role names.
"""
_ = auth # Unused
return set()

Expand All @@ -44,6 +62,9 @@ def unsafe_get_claims(token: str) -> dict[str, Any]:

A somewhat hacky way to get JWT claims without verifying the signature.
We assume verification has already been done during authentication.

Returns:
dict[str, Any]: Claims dictionary parsed from the JWT payload.
"""
payload = token.split(".")[1]
padded = payload + "=" * (-len(payload) % 4)
Expand All @@ -54,11 +75,28 @@ class JwtRolesResolver(RolesResolver): # pylint: disable=too-few-public-methods
"""Processes JWT claims with the given JSONPath rules to get roles."""

def __init__(self, role_rules: list[JwtRoleRule]):
"""Initialize the resolver with rules."""
"""Initialize the resolver with rules.

Create a JwtRolesResolver configured with JWT-to-role extraction rules.

Parameters:
role_rules (list[JwtRoleRule]): Ordered list of rules that map JWT
claim matches to roles. Each rule specifies a JSONPath to evaluate,
an operator to apply to matches, the roles to grant when the rule
matches, and optional negation or regex matching.
"""
self.role_rules = role_rules

async def resolve_roles(self, auth: AuthTuple) -> UserRoles:
"""Extract roles from JWT claims using configured rules."""
"""Extract roles from JWT claims using configured rules.

Determine user roles by evaluating configured JwtRoleRule objects
against JWT claims extracted from the provided AuthTuple.

Returns:
roles (UserRoles): Set of role names derived from all configured
rules that match the token's claims.
"""
jwt_claims = self._get_claims(auth)
return {
role
Expand All @@ -68,7 +106,20 @@ async def resolve_roles(self, auth: AuthTuple) -> UserRoles:

@staticmethod
def evaluate_role_rules(rule: JwtRoleRule, jwt_claims: dict[str, Any]) -> UserRoles:
"""Get roles from a JWT role rule if it matches the claims."""
"""Get roles from a JWT role rule if it matches the claims.

Determine which roles from a JwtRoleRule apply to the provided JWT claims.

Parameters:
rule (JwtRoleRule): Rule containing a JSONPath expression,
operator, and associated roles to grant when matched.
jwt_claims (dict[str, Any]): Decoded JWT claims to evaluate against
the rule's JSONPath.

Returns:
roles (set[str]): The set of roles from `rule.roles` if the rule
matches `jwt_claims`, otherwise an empty set.
"""
return (
set(rule.roles)
if JwtRolesResolver._evaluate_operator(
Expand All @@ -80,7 +131,18 @@ def evaluate_role_rules(rule: JwtRoleRule, jwt_claims: dict[str, Any]) -> UserRo

@staticmethod
def _get_claims(auth: AuthTuple) -> dict[str, Any]:
"""Get the JWT claims from the auth tuple."""
"""Get the JWT claims from the auth tuple.

Extract JWT claims from an AuthTuple.

Parameters:
auth (AuthTuple): Authentication tuple where the fourth element is the JWT token.

Returns:
dict[str, Any]: Decoded JWT claims as a dictionary. Returns an
empty dict when the token equals constants.NO_USER_TOKEN (guest).
The token payload is decoded without validating the JWT signature.
"""
_, _, _, token = auth
if token == constants.NO_USER_TOKEN:
# No claims for guests
Expand All @@ -93,7 +155,28 @@ def _get_claims(auth: AuthTuple) -> dict[str, Any]:
def _evaluate_operator(
rule: JwtRoleRule, match: Any
) -> bool: # pylint: disable=too-many-branches
"""Evaluate an operator against a match and rule."""
"""Evaluate an operator against a match and rule.

Determine whether a single JSONPath rule condition matches the provided value.

Evaluates the rule's operator against the given match and applies rule.negate if set.
Supported operators:
- EQUALS: match equals rule.value.
- CONTAINS: rule.value is contained in match.
- IN: match is contained in rule.value.
- MATCH: any string item in match matches rule.compiled_regex (if
compiled_regex is provided).

Parameters:
rule (JwtRoleRule): The role rule containing operator, value,
negate flag, and optionally compiled_regex.
match (Any): The value(s) produced by evaluating the JSONPath; may
be a single value or an iterable of values for MATCH.

Returns:
bool: `true` if the operator evaluation (after applying negation
when set) succeeds, `false` otherwise.
"""
result = False
match rule.operator:
case JsonPathOperator.EQUALS:
Expand Down Expand Up @@ -121,24 +204,62 @@ class AccessResolver(ABC): # pylint: disable=too-few-public-methods

@abstractmethod
def check_access(self, action: Action, user_roles: UserRoles) -> bool:
"""Check if the user has access to the specified action based on their roles."""
"""Check if the user has access to the specified action based on their roles.

Determine whether any of the given user roles permit performing the specified action.

Parameters:
action (Action): The action to authorize.
user_roles (UserRoles): Set of role names assigned to the user.

Returns:
bool: `true` if at least one role in `user_roles` grants the
requested `action`, `false` otherwise.
"""

@abstractmethod
def get_actions(self, user_roles: UserRoles) -> set[Action]:
"""Get the actions that the user can perform based on their roles."""
"""Get the actions that the user can perform based on their roles.

Compute the set of actions permitted for the provided user roles.

Parameters:
user_roles (UserRoles): Set of role names to evaluate.

Returns:
set[Action]: The aggregated set of allowed actions for the given
roles. If `ADMIN` is included in the aggregated actions, returns
all available non-`ADMIN` actions.
"""


class NoopAccessResolver(AccessResolver): # pylint: disable=too-few-public-methods
"""No-op access resolver that does not perform any access checks."""

def check_access(self, action: Action, user_roles: UserRoles) -> bool:
"""Return True always, indicating access is granted."""
"""Return True always, indicating access is granted.

Grant all access unconditionally.

Parameters:
action (Action): Ignored.
user_roles (UserRoles): Ignored.

Returns:
`true` always (access is always granted).
"""
_ = action # We're noop, it doesn't matter, everyone is allowed
_ = user_roles # We're noop, it doesn't matter, everyone is allowed
return True

def get_actions(self, user_roles: UserRoles) -> set[Action]:
"""Return an empty set of actions, indicating no specific actions are allowed."""
"""Return an empty set of actions, indicating no specific actions are allowed.

Determine the set of actions permitted for any user under the noop access resolver.

Returns:
allowed_actions (set[Action]): All defined `Action` values except `Action.ADMIN`.
"""
Comment on lines +256 to +262
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Docstring contradicts behavior in get_actions.
It says “empty set” but the method returns all actions except ADMIN.

Proposed fix
-        """Return an empty set of actions, indicating no specific actions are allowed.
+        """Return all actions except the ADMIN override.
 
-        Determine the set of actions permitted for any user under the noop access resolver.
+        Determine the set of actions permitted for any user under the noop access resolver
+        (all actions except ADMIN).
 
         Returns:
-            allowed_actions (set[Action]): All defined `Action` values except `Action.ADMIN`.
+            allowed_actions (set[Action]): All defined `Action` values except `Action.ADMIN`.
         """
🤖 Prompt for AI Agents
In `@src/authorization/resolvers.py` around lines 256 - 262, The docstring for
get_actions contradicts its implementation: it says "empty set" but the function
returns all actions except Action.ADMIN; update the docstring to accurately
describe behavior (that get_actions returns the set of all Action values
excluding Action.ADMIN) and mention the returned variable allowed_actions and
the exclusion of Action.ADMIN so callers understand the resolver's semantics.

_ = user_roles # We're noop, it doesn't matter, everyone is allowed
return set(Action) - {Action.ADMIN}

Expand All @@ -151,7 +272,19 @@ class GenericAccessResolver(AccessResolver): # pylint: disable=too-few-public-m
"""

def __init__(self, access_rules: list[AccessRule]):
"""Initialize the access resolver with access rules."""
"""Initialize the access resolver with access rules.

Create a GenericAccessResolver and build an internal mapping of roles to allowed actions.

Parameters:
access_rules (list[AccessRule]): List of access rules used to
populate the resolver. Each rule's `role` is mapped to the union of
its `actions`.

Raises:
ValueError: If any rule contains the `Action.ADMIN` action together
with other actions.
"""
for rule in access_rules:
# Since this is nonsensical, it might be a mistake, so hard fail
if Action.ADMIN in rule.actions and len(rule.actions) > 1:
Expand All @@ -169,7 +302,21 @@ def __init__(self, access_rules: list[AccessRule]):
self._access_lookup[rule.role].update(rule.actions)

def check_access(self, action: Action, user_roles: UserRoles) -> bool:
"""Check if the user has access to the specified action based on their roles."""
"""Check if the user has access to the specified action based on their roles.

Determine whether the provided roles permit performing the specified action.

If any role grants the ADMIN action, that role permits all non-ADMIN
actions (ADMIN acts as a full override).

Parameters:
action (Action): The action to check.
user_roles (UserRoles): The set of roles assigned to the user.

Returns:
true if at least one role permits the action or ADMIN override
applies, false otherwise.
"""
if action != Action.ADMIN and self.check_access(Action.ADMIN, user_roles):
# Recurse to check if the roles allow the user to perform the admin action,
# if they do, then we allow any action
Expand All @@ -188,7 +335,15 @@ def check_access(self, action: Action, user_roles: UserRoles) -> bool:
return False

def get_actions(self, user_roles: UserRoles) -> set[Action]:
"""Get the actions that the user can perform based on their roles."""
"""Get the actions that the user can perform based on their roles.

Determine which actions are permitted for the given user roles.

Returns:
allowed_actions (set[Action]): Set of actions the user may perform.
If any role grants Action.ADMIN, returns every Action except
Action.ADMIN.
"""
actions = {
action
for role in user_roles
Expand Down
Loading
Loading