From 35b368abe8a69bd8228b737b1b3cfed8bf6b006a Mon Sep 17 00:00:00 2001 From: Minh Nguyen Cong Date: Mon, 19 Jan 2026 21:48:17 +0100 Subject: [PATCH 1/5] feat: shared network client and token storage --- boxsdk/client/client.py | 330 +++++++++++++++ boxsdk/util/token_storage_adapter.py | 82 ++++ docs/config-sharing-implementation.md | 276 +++++++++++++ .../boxsdk/integration/test_config_sharing.py | 332 +++++++++++++++ .../boxsdk/unit/client/test_config_sharing.py | 389 ++++++++++++++++++ .../unit/util/test_token_storage_adapter.py | 199 +++++++++ 6 files changed, 1608 insertions(+) create mode 100644 boxsdk/util/token_storage_adapter.py create mode 100644 docs/config-sharing-implementation.md create mode 100644 test/boxsdk/integration/test_config_sharing.py create mode 100644 test/boxsdk/unit/client/test_config_sharing.py create mode 100644 test/boxsdk/unit/util/test_token_storage_adapter.py diff --git a/boxsdk/client/client.py b/boxsdk/client/client.py index 67b4d4ed6..db2538bde 100644 --- a/boxsdk/client/client.py +++ b/boxsdk/client/client.py @@ -19,6 +19,21 @@ from ..util.datetime_formatter import normalize_date_to_rfc3339_format from ..util.shared_link import get_shared_link_header from ..util.deprecation_decorator import deprecated +from ...auth.developer_token_auth import DeveloperTokenAuth +from ...auth.jwt_auth import JWTAuth +from ...auth.ccg_auth import CCGAuth +from ...auth.oauth2 import OAuth2 as LegacyOAuth2 +from ...util.token_storage_adapter import LegacyTokenStorageAdapter + +from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth +from box_sdk_gen.box.oauth import BoxOAuth, OAuthConfig +from box_sdk_gen.box.jwt_auth import BoxJWTAuth, JWTConfig +from box_sdk_gen.box.ccg_auth import BoxCCGAuth, CCGConfig +from box_sdk_gen.client import BoxClient +from box_sdk_gen.networking.network import NetworkSession +from box_sdk_gen.networking.base_urls import BaseUrls +from box_sdk_gen.networking.retries import BoxRetryStrategy +from box_sdk_gen.schemas.access_token import AccessToken if TYPE_CHECKING: from boxsdk import OAuth2 @@ -2009,3 +2024,318 @@ def get_ai_agent_default_config( session=self._session, response_object=box_response.json(), ) + + def get_authentication(self, *, token_storage=None): + """ + Extract authentication configuration from this legacy client and convert it + to a generated SDK Authentication object. + + This method supports the following legacy authentication types: + - DeveloperTokenAuth -> BoxDeveloperTokenAuth + - OAuth2 -> BoxOAuth + - JWTAuth -> BoxJWTAuth + - CCGAuth -> BoxCCGAuth + + :param token_storage: + Optional TokenStorage instance for the generated SDK. + If not provided, an adapter will be created to bridge legacy token storage. + :return: + Authentication object compatible with the generated SDK (box_sdk_gen). + :raises ValueError: + If the authentication type is not supported or required credentials are missing. + :raises ValueError: + If the authentication type is not supported or required credentials are missing. + """ + oauth = self._oauth + + # Developer Token Authentication + if isinstance(oauth, DeveloperTokenAuth): + token = oauth.access_token + if not token: + raise ValueError("Developer token is not available") + return BoxDeveloperTokenAuth(token=token) + + # OAuth 2.0 Authentication + # Check if it's OAuth2 (but not DeveloperTokenAuth, JWTAuth, or CCGAuth) + if isinstance(oauth, LegacyOAuth2) and not isinstance(oauth, (DeveloperTokenAuth, JWTAuth, CCGAuth)): + # It's OAuth2 + client_id = getattr(oauth, '_client_id', None) + client_secret = getattr(oauth, '_client_secret', None) + + if not client_id or not client_secret: + raise ValueError("OAuth2 client_id and client_secret are required") + + # Create token storage adapter if not provided + if token_storage is None: + # Create adapter from legacy OAuth2's token storage + def get_tokens(): + return oauth._get_tokens() + + def store_tokens(access_token, refresh_token): + oauth._store_tokens(access_token, refresh_token) + + token_storage = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=store_tokens + ) + + config = OAuthConfig( + client_id=client_id, + client_secret=client_secret, + token_storage=token_storage + ) + + # Pre-populate with existing tokens if available + auth = BoxOAuth(config=config) + access_token, refresh_token = oauth._get_tokens() + if access_token: + existing_token = AccessToken( + access_token=access_token, + refresh_token=refresh_token, + expires_in=3600, # Default, actual expiry not available + token_type='bearer' + ) + token_storage.store(existing_token) + + return auth + + # JWT Authentication + if isinstance(oauth, JWTAuth): + client_id = getattr(oauth, '_client_id', None) + client_secret = getattr(oauth, '_client_secret', None) + jwt_key_id = getattr(oauth, '_jwt_key_id', None) + rsa_private_key = getattr(oauth, '_rsa_private_key', None) + enterprise_id = getattr(oauth, '_enterprise_id', None) + user_id = getattr(oauth, '_user_id', None) + + if not all([client_id, client_secret, jwt_key_id, rsa_private_key]): + raise ValueError("JWT authentication requires client_id, client_secret, jwt_key_id, and private key") + + # Convert RSA private key to string format + # Note: If the key was originally encrypted, we can't extract the passphrase + # from the normalized RSAPrivateKey object. We'll serialize it unencrypted. + from cryptography.hazmat.primitives import serialization + try: + # Serialize the key to PEM format (unencrypted) + # This works even if the original key was encrypted, as the + # normalized RSAPrivateKey object is already decrypted + private_key_pem = rsa_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ).decode('utf-8') + # Passphrase is not needed since we're serializing unencrypted + # The generated SDK will handle encryption if needed + passphrase = '' + except Exception as e: + raise ValueError( + f"Cannot serialize private key: {e}. " + "Please ensure the private key is valid." + ) from e + + # Create token storage adapter if not provided + if token_storage is None: + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + token_storage = InMemoryTokenStorage() + + config = JWTConfig( + client_id=client_id, + client_secret=client_secret, + jwt_key_id=jwt_key_id, + private_key=private_key_pem, + private_key_passphrase=passphrase, + enterprise_id=enterprise_id, + user_id=user_id, + token_storage=token_storage + ) + + auth = BoxJWTAuth(config=config) + + # Handle user vs enterprise scope + if user_id: + auth = auth.with_user_subject(user_id, token_storage=token_storage) + + return auth + + # CCG (Client Credentials Grant) Authentication + if isinstance(oauth, CCGAuth): + client_id = getattr(oauth, '_client_id', None) + client_secret = getattr(oauth, '_client_secret', None) + enterprise_id = getattr(oauth, '_enterprise_id', None) + user_id = getattr(oauth, '_user_id', None) + + if not client_id or not client_secret: + raise ValueError("CCG authentication requires client_id and client_secret") + + # Create token storage adapter if not provided + if token_storage is None: + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + token_storage = InMemoryTokenStorage() + + config = CCGConfig( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + user_id=user_id, + token_storage=token_storage + ) + + auth = BoxCCGAuth(config=config) + + # Handle user vs enterprise scope + if user_id: + auth = auth.with_user_subject(user_id, token_storage=token_storage) + + return auth + + raise ValueError( + f"Unsupported authentication type: {type(oauth).__name__}. " + "Supported types: DeveloperTokenAuth, OAuth2, JWTAuth, CCGAuth" + ) + + def get_network_session( + self, + *, + network_client=None, + retry_strategy=None, + data_sanitizer=None, + interceptors=None, + additional_headers=None + ): + """ + Extract network configuration from this legacy client and convert it + to a generated SDK NetworkSession object. + + :param network_client: + Optional NetworkClient instance for the generated SDK. + If not provided, a default will be created with proxy support if configured. + :param retry_strategy: + Optional RetryStrategy instance for the generated SDK. + If not provided, one will be created from legacy retry settings. + :param data_sanitizer: + Optional DataSanitizer instance for the generated SDK. + :param interceptors: + Optional list of request/response interceptors. + :param additional_headers: + Optional dictionary of additional HTTP headers to merge with legacy headers. + :return: + NetworkSession object compatible with the generated SDK (box_sdk_gen). + """ + session = self._session + api_config = session.api_config + proxy_config = session.proxy_config + + # Extract base URLs + base_url = getattr(api_config, 'BASE_API_URL', 'https://api.box.com/2.0') + # Remove version suffix if present + if base_url.endswith('/2.0'): + base_url = base_url[:-4] + elif base_url.endswith('/2'): + base_url = base_url[:-2] + + upload_url = getattr(api_config, 'UPLOAD_URL', 'https://upload.box.com/api/2.0') + # Remove version suffix if present + if upload_url.endswith('/2.0'): + upload_url = upload_url[:-4] + elif upload_url.endswith('/2'): + upload_url = upload_url[:-2] + + oauth2_url = getattr(api_config, 'OAUTH2_AUTHORIZE_URL', 'https://account.box.com/api/oauth2') + # Extract base OAuth URL + if '/authorize' in oauth2_url: + oauth2_url = oauth2_url[:oauth2_url.rindex('/authorize')] + + base_urls = BaseUrls( + base_url=base_url, + upload_url=upload_url, + oauth_2_url=oauth2_url + ) + + # Extract or create retry strategy + if retry_strategy is None: + max_retries = getattr(api_config, 'MAX_RETRY_ATTEMPTS', 5) + retry_base_interval = getattr(session, '_retry_base_interval', 1.0) + retry_strategy = BoxRetryStrategy( + max_attempts=max_retries, + retry_base_interval=retry_base_interval + ) + + # Handle proxy configuration + proxy_url = None + if proxy_config and hasattr(proxy_config, 'URL') and proxy_config.URL: + proxy_url = proxy_config.URL + # Handle authenticated proxy + if hasattr(proxy_config, 'AUTH') and proxy_config.AUTH: + auth = proxy_config.AUTH + if isinstance(auth, dict) and 'user' in auth and 'password' in auth: + # Extract host from URL + host = proxy_url.split('//')[1] if '//' in proxy_url else proxy_url + proxy_url = f"http://{auth['user']}:{auth['password']}@{host}" + + # Merge custom headers + headers = {} + if hasattr(session, '_default_headers'): + headers.update(session._default_headers.copy()) + if additional_headers: + headers.update(additional_headers) + + # Create network session + network_session = NetworkSession( + base_urls=base_urls, + network_client=network_client, + retry_strategy=retry_strategy, + additional_headers=headers if headers else None, + proxy_url=proxy_url, + data_sanitizer=data_sanitizer + ) + + return network_session + + def get_sdk_gen_client( + self, + *, + auth_options=None, + network_options=None + ): + """ + Create a fully configured generated SDK client from this legacy client. + + This method combines get_authentication() and get_network_session() to create + a BoxClient instance that shares authentication and network configuration + with this legacy client. + + :param auth_options: + Optional dictionary with authentication options: + - token_storage: Custom TokenStorage instance + :param network_options: + Optional dictionary with network options: + - network_client: Custom NetworkClient instance + - retry_strategy: Custom RetryStrategy instance + - data_sanitizer: Custom DataSanitizer instance + - interceptors: List of request/response interceptors + - additional_headers: Dictionary of additional HTTP headers + :return: + BoxClient instance from box_sdk_gen, fully configured with shared settings. + """ + # Extract authentication + token_storage = None + if auth_options and 'token_storage' in auth_options: + token_storage = auth_options['token_storage'] + + auth = self.get_authentication(token_storage=token_storage) + + # Extract network session + network_kwargs = {} + if network_options: + network_kwargs = { + 'network_client': network_options.get('network_client'), + 'retry_strategy': network_options.get('retry_strategy'), + 'data_sanitizer': network_options.get('data_sanitizer'), + 'interceptors': network_options.get('interceptors'), + 'additional_headers': network_options.get('additional_headers') + } + + network_session = self.get_network_session(**network_kwargs) + + # Create and return fully configured client + return BoxClient(auth=auth, network_session=network_session) diff --git a/boxsdk/util/token_storage_adapter.py b/boxsdk/util/token_storage_adapter.py new file mode 100644 index 000000000..dd7a26ef9 --- /dev/null +++ b/boxsdk/util/token_storage_adapter.py @@ -0,0 +1,82 @@ +""" +Token storage adapter to bridge legacy OAuth2 token storage to generated SDK TokenStorage. + +This module provides adapters that allow legacy SDK token storage mechanisms +(callbacks, in-memory storage) to work with the generated SDK's TokenStorage interface. +""" + +from typing import Optional, Callable, Tuple +from box_sdk_gen.box.token_storage import TokenStorage +from box_sdk_gen.schemas.access_token import AccessToken + + +class LegacyTokenStorageAdapter(TokenStorage): + """ + Adapter that bridges legacy OAuth2 token storage (callbacks) to generated SDK TokenStorage. + + This adapter wraps legacy token storage mechanisms (store_tokens callback and + _get_tokens method) to provide a TokenStorage interface for the generated SDK. + """ + + def __init__( + self, + get_tokens: Callable[[], Tuple[Optional[str], Optional[str]]], + store_tokens: Optional[Callable[[str, Optional[str]], None]] = None, + ): + """ + Initialize the adapter with legacy token storage callbacks. + + :param get_tokens: + Callable that returns (access_token, refresh_token) tuple. + This can be a method from OAuth2._get_tokens or a custom callback. + :param store_tokens: + Optional callable that stores tokens. Takes (access_token, refresh_token). + If None, tokens will only be read from get_tokens but not persisted. + """ + self._get_tokens = get_tokens + self._store_tokens = store_tokens + + def store(self, token: AccessToken) -> None: + """ + Store a token using the legacy storage mechanism. + + :param token: + AccessToken object from generated SDK. + """ + if self._store_tokens is not None: + refresh_token = token.refresh_token if hasattr(token, 'refresh_token') else None + self._store_tokens(token.access_token, refresh_token) + + def get(self) -> Optional[AccessToken]: + """ + Get the current token from legacy storage. + + :return: + AccessToken object if tokens are available, None otherwise. + """ + access_token, refresh_token = self._get_tokens() + + if access_token is None: + return None + + # Convert legacy token format to generated SDK AccessToken + # The generated SDK AccessToken has: access_token, refresh_token, expires_in, token_type + # We don't have expires_in from legacy, so we'll set a default or calculate if possible + expires_in = 3600 # Default to 1 hour if not available + + return AccessToken( + access_token=access_token, + refresh_token=refresh_token, + expires_in=expires_in, + token_type='bearer' + ) + + def clear(self) -> None: + """ + Clear stored tokens using the legacy storage mechanism. + + Note: This will call store_tokens with None values if the callback supports it. + """ + if self._store_tokens is not None: + self._store_tokens(None, None) + diff --git a/docs/config-sharing-implementation.md b/docs/config-sharing-implementation.md new file mode 100644 index 000000000..07e960cb2 --- /dev/null +++ b/docs/config-sharing-implementation.md @@ -0,0 +1,276 @@ +# Configuration Sharing Implementation + +This document describes the implementation of configuration sharing between the legacy `boxsdk` package and the new auto-generated `box_sdk_gen` package. + +## Overview + +The configuration sharing feature allows developers to seamlessly migrate from the legacy SDK to the generated SDK by automatically extracting and converting authentication and network configuration from a legacy client to a generated client. + +## Implementation Components + +### 1. Token Storage Adapter (`boxsdk/util/token_storage_adapter.py`) + +The `LegacyTokenStorageAdapter` class bridges the gap between legacy OAuth2 token storage mechanisms (callbacks) and the generated SDK's `TokenStorage` interface. + +**Key Features:** +- Converts legacy token format (access_token, refresh_token tuple) to generated SDK `AccessToken` objects +- Supports both read and write operations +- Handles token storage callbacks from legacy OAuth2 implementations + +### 2. Client Methods + +Three new methods have been added to the `Client` class in `boxsdk/client/client.py`: + +#### `get_authentication(token_storage=None)` + +Extracts authentication configuration from the legacy client and converts it to a generated SDK `Authentication` object. + +**Supported Authentication Types:** +- `DeveloperTokenAuth` → `BoxDeveloperTokenAuth` +- `OAuth2` → `BoxOAuth` +- `JWTAuth` → `BoxJWTAuth` +- `CCGAuth` → `BoxCCGAuth` + +**Parameters:** +- `token_storage` (optional): Custom `TokenStorage` instance. If not provided, an adapter will be created to bridge legacy token storage. + +**Returns:** +- `Authentication` object compatible with `box_sdk_gen` + +**Example:** +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +# Legacy client +legacy_auth = OAuth2(client_id="...", client_secret="...") +legacy_client = Client(legacy_auth) + +# Get generated SDK authentication +gen_auth = legacy_client.get_authentication() +``` + +#### `get_network_session(**options)` + +Extracts network configuration from the legacy client and converts it to a generated SDK `NetworkSession` object. + +**Parameters:** +- `network_client` (optional): Custom `NetworkClient` instance +- `retry_strategy` (optional): Custom `RetryStrategy` instance +- `data_sanitizer` (optional): Custom `DataSanitizer` instance +- `interceptors` (optional): List of request/response interceptors +- `additional_headers` (optional): Dictionary of additional HTTP headers + +**Returns:** +- `NetworkSession` object compatible with `box_sdk_gen` + +**Configuration Mapping:** +- Base URLs: Extracted from `API` config +- Proxy settings: Extracted from `Proxy` config +- Retry strategy: Extracted from `API.MAX_RETRY_ATTEMPTS` and session retry settings +- Custom headers: Merged from session default headers and additional headers + +**Example:** +```python +network_session = legacy_client.get_network_session( + additional_headers={"X-Custom-Header": "value"} +) +``` + +#### `get_sdk_gen_client(auth_options=None, network_options=None)` + +Creates a fully configured generated SDK client from the legacy client. This is the main convenience method that combines `get_authentication()` and `get_network_session()`. + +**Parameters:** +- `auth_options` (optional): Dictionary with authentication options + - `token_storage`: Custom `TokenStorage` instance +- `network_options` (optional): Dictionary with network options + - `network_client`: Custom `NetworkClient` instance + - `retry_strategy`: Custom `RetryStrategy` instance + - `data_sanitizer`: Custom `DataSanitizer` instance + - `interceptors`: List of request/response interceptors + - `additional_headers`: Dictionary of additional HTTP headers + +**Returns:** +- `BoxClient` instance from `box_sdk_gen`, fully configured with shared settings + +**Example:** +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +# Legacy client setup +legacy_auth = OAuth2(client_id="...", client_secret="...") +legacy_client = Client(legacy_auth) + +# Get generated SDK client with one line +gen_client = legacy_client.get_sdk_gen_client() + +# Use generated client for new features +user = gen_client.users.get_user_by_id("me") +folders = gen_client.folders.get_folder_items("0") + +# Legacy client still works for existing code +legacy_user = legacy_client.user().get() +``` + +## Usage Examples + +### Developer Token + +```python +from boxsdk import Client +from boxsdk.auth import DeveloperTokenAuth + +legacy_auth = DeveloperTokenAuth() +legacy_client = Client(legacy_auth) + +# Get generated client +gen_client = legacy_client.get_sdk_gen_client() +``` + +### OAuth 2.0 with Token Refresh + +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +legacy_auth = OAuth2( + client_id="...", + client_secret="...", + access_token="...", + refresh_token="..." +) +legacy_client = Client(legacy_auth) + +# Get generated client with shared token storage +gen_client = legacy_client.get_sdk_gen_client() + +# Both clients share the same token storage +# Token refresh by either updates both +``` + +### JWT Authentication + +```python +from boxsdk import Client +from boxsdk.auth import JWTAuth + +legacy_auth = JWTAuth( + client_id="...", + client_secret="...", + enterprise_id="...", + jwt_key_id="...", + rsa_private_key_file_sys_path="path/to/key.pem" +) +legacy_client = Client(legacy_auth) + +# Get generated client +gen_client = legacy_client.get_sdk_gen_client() + +# Handle user vs enterprise scope +if user_id: + gen_auth = legacy_client.get_authentication() + gen_auth = gen_auth.with_user_subject(user_id) + gen_client = BoxClient(auth=gen_auth, network_session=legacy_client.get_network_session()) +``` + +### CCG Authentication + +```python +from boxsdk import Client +from boxsdk.auth import CCGAuth + +legacy_auth = CCGAuth( + client_id="...", + client_secret="...", + enterprise_id="..." +) +legacy_client = Client(legacy_auth) + +# Get generated client +gen_client = legacy_client.get_sdk_gen_client() +``` + +### Custom Network Configuration + +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +legacy_client = Client(OAuth2(...)) + +# Get generated client with custom network options +gen_client = legacy_client.get_sdk_gen_client( + network_options={ + "additional_headers": {"X-Custom-Header": "value"}, + "retry_strategy": custom_retry_strategy + } +) +``` + +## Implementation Details + +### Token Storage Adapter + +The `LegacyTokenStorageAdapter` implements the `TokenStorage` interface from `box_sdk_gen`: + +- `store(token: AccessToken)`: Stores tokens using legacy storage mechanism +- `get() -> Optional[AccessToken]`: Retrieves tokens from legacy storage +- `clear()`: Clears stored tokens + +The adapter converts between: +- Legacy format: `(access_token: str, refresh_token: Optional[str])` +- Generated format: `AccessToken(access_token, refresh_token, expires_in, token_type)` + +### Authentication Conversion + +Each authentication type is handled specifically: + +1. **DeveloperTokenAuth**: Direct token extraction +2. **OAuth2**: Client ID/secret extraction + token storage adapter +3. **JWTAuth**: Full credential extraction including private key serialization +4. **CCGAuth**: Client ID/secret + enterprise/user ID extraction + +### Network Configuration Conversion + +Network settings are extracted from: +- `Session.api_config`: Base URLs, OAuth URLs +- `Session.proxy_config`: Proxy settings +- `Session._default_headers`: Custom headers +- `API.MAX_RETRY_ATTEMPTS`: Retry configuration + +## Limitations + +1. **JWT Private Key**: If the JWT private key was originally encrypted, the passphrase cannot be extracted from the normalized `RSAPrivateKey` object. The key will be serialized unencrypted. + +2. **Token Expiry**: The legacy SDK doesn't always track token expiry times, so the adapter uses a default value (3600 seconds) when converting to `AccessToken`. + +3. **Custom Token Storage**: If using custom token storage callbacks in the legacy SDK, ensure they're thread-safe if both clients will be used concurrently. + +## Error Handling + +The implementation includes comprehensive error handling: + +- `ValueError`: Raised for unsupported auth types or missing credentials +- `TypeError`: Raised for invalid parameter types + +Note: Since `boxsdk` and `box_sdk_gen` are always installed together, import errors for `box_sdk_gen` should not occur in practice. + +## Testing + +Unit tests should cover: +- Token storage adapter conversion +- Each authentication type conversion +- Network configuration extraction +- Error cases (missing credentials, unsupported types) +- Integration tests for `get_sdk_gen_client()` + +## Future Enhancements + +Potential improvements: +1. Support for additional authentication types +2. Better token expiry tracking +3. Support for encrypted JWT keys with passphrase extraction +4. More comprehensive network configuration mapping + diff --git a/test/boxsdk/integration/test_config_sharing.py b/test/boxsdk/integration/test_config_sharing.py new file mode 100644 index 000000000..10adfe8ef --- /dev/null +++ b/test/boxsdk/integration/test_config_sharing.py @@ -0,0 +1,332 @@ +""" +Integration tests for configuration sharing between legacy and generated SDKs. + +These tests verify that configuration sharing works end-to-end with real +authentication and network configurations. +""" + +from unittest.mock import Mock, MagicMock, patch +import pytest + +from boxsdk import Client +from boxsdk.auth.developer_token_auth import DeveloperTokenAuth +from boxsdk.auth.oauth2 import OAuth2 +from boxsdk.auth.jwt_auth import JWTAuth +from boxsdk.auth.ccg_auth import CCGAuth +from boxsdk.config import API, Proxy +from box_sdk_gen.client import BoxClient +from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth +from box_sdk_gen.box.oauth import BoxOAuth +from box_sdk_gen.box.jwt_auth import BoxJWTAuth +from box_sdk_gen.box.ccg_auth import BoxCCGAuth +from box_sdk_gen.networking.network import NetworkSession + + +class TestConfigSharingIntegration: + """Integration tests for configuration sharing.""" + + def test_developer_token_roundtrip(self, mock_box_session): + """Test that developer token auth works end-to-end.""" + token = 'test_dev_token_12345' + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + legacy_client = Client(legacy_auth, session=mock_box_session) + + # Get generated client + gen_client = legacy_client.get_sdk_gen_client() + + # Verify both clients are configured + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxDeveloperTokenAuth) + assert gen_client.auth.token == token + + # Verify network session is configured + assert isinstance(gen_client.network_session, NetworkSession) + + def test_oauth2_roundtrip(self, mock_box_session): + """Test that OAuth2 auth works end-to-end with token sharing.""" + client_id = 'test_client_id_123' + client_secret = 'test_client_secret_456' + access_token = 'test_access_token_789' + refresh_token = 'test_refresh_token_012' + + legacy_auth = OAuth2( + client_id=client_id, + client_secret=client_secret, + access_token=access_token, + refresh_token=refresh_token + ) + + legacy_client = Client(legacy_auth, session=mock_box_session) + + # Get generated client + gen_client = legacy_client.get_sdk_gen_client() + + # Verify authentication + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxOAuth) + assert gen_client.auth.config.client_id == client_id + assert gen_client.auth.config.client_secret == client_secret + + # Verify token storage is shared + stored_token = gen_client.auth.token_storage.get() + assert stored_token is not None + assert stored_token.access_token == access_token + assert stored_token.refresh_token == refresh_token + + def test_oauth2_token_synchronization(self, mock_box_session): + """Test that tokens are synchronized between legacy and generated clients.""" + client_id = 'test_client_id' + client_secret = 'test_client_secret' + + # Track token storage + stored_tokens = {'access': 'initial_token', 'refresh': 'initial_refresh'} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + legacy_auth = OAuth2( + client_id=client_id, + client_secret=client_secret, + store_tokens=store_tokens + ) + legacy_auth._get_tokens = get_tokens + legacy_auth._store_tokens = store_tokens + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Update token via generated client + from box_sdk_gen.schemas.access_token import AccessToken + new_token = AccessToken( + access_token='new_token', + refresh_token='new_refresh', + expires_in=3600, + token_type='bearer' + ) + gen_client.auth.token_storage.store(new_token) + + # Verify legacy client sees the update + access, refresh = legacy_auth._get_tokens() + assert access == 'new_token' + assert refresh == 'new_refresh' + + def test_network_configuration_preservation(self, mock_box_session): + """Test that network configuration is preserved.""" + # Set up custom API config + original_base_url = API.BASE_API_URL + original_upload_url = API.UPLOAD_URL + + API.BASE_API_URL = 'https://custom.api.box.com/2.0' + API.UPLOAD_URL = 'https://custom.upload.box.com/api/2.0' + + try: + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify base URLs are preserved + assert 'custom.api.box.com' in gen_client.network_session.base_urls.base_url + assert 'custom.upload.box.com' in gen_client.network_session.base_urls.upload_url + finally: + # Restore original values + API.BASE_API_URL = original_base_url + API.UPLOAD_URL = original_upload_url + + def test_proxy_configuration_preservation(self, mock_box_session): + """Test that proxy configuration is preserved.""" + # Set up proxy + original_proxy_url = Proxy.URL + original_proxy_auth = Proxy.AUTH + + Proxy.URL = 'http://proxy.example.com:8080' + Proxy.AUTH = {'user': 'proxy_user', 'password': 'proxy_pass'} + + try: + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify proxy is configured + assert gen_client.network_session.proxy_url is not None + assert 'proxy.example.com' in gen_client.network_session.proxy_url + assert 'proxy_user' in gen_client.network_session.proxy_url + finally: + # Restore original values + Proxy.URL = original_proxy_url + Proxy.AUTH = original_proxy_auth + + def test_custom_headers_preservation(self, mock_box_session): + """Test that custom headers are preserved.""" + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + # Add custom headers to session + mock_box_session._default_headers = {'X-Custom-Header': 'custom_value'} + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify headers are preserved + assert 'X-Custom-Header' in gen_client.network_session.additional_headers + assert gen_client.network_session.additional_headers['X-Custom-Header'] == 'custom_value' + + def test_retry_strategy_preservation(self, mock_box_session): + """Test that retry strategy is preserved.""" + # Modify API config + original_max_retries = API.MAX_RETRY_ATTEMPTS + API.MAX_RETRY_ATTEMPTS = 10 + + try: + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify retry strategy + assert gen_client.network_session.retry_strategy.max_attempts == 10 + finally: + API.MAX_RETRY_ATTEMPTS = original_max_retries + + def test_jwt_auth_roundtrip(self, mock_box_session): + """Test that JWT auth works end-to-end.""" + from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey + + # Create a mock RSA private key + mock_key = Mock(spec=RSAPrivateKey) + mock_key.private_bytes.return_value = b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + + client_id = 'test_jwt_client_id' + client_secret = 'test_jwt_client_secret' + jwt_key_id = 'test_jwt_key_id' + enterprise_id = 'test_enterprise_id' + + legacy_auth = JWTAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + jwt_key_id=jwt_key_id, + rsa_private_key_data=mock_key + ) + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Verify authentication + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxJWTAuth) + assert gen_client.auth.config.client_id == client_id + assert gen_client.auth.config.client_secret == client_secret + assert gen_client.auth.config.jwt_key_id == jwt_key_id + assert gen_client.auth.config.enterprise_id == enterprise_id + + def test_ccg_auth_roundtrip(self, mock_box_session): + """Test that CCG auth works end-to-end.""" + client_id = 'test_ccg_client_id' + client_secret = 'test_ccg_client_secret' + enterprise_id = 'test_ccg_enterprise_id' + + legacy_auth = CCGAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id + ) + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Verify authentication + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxCCGAuth) + assert gen_client.auth.config.client_id == client_id + assert gen_client.auth.config.client_secret == client_secret + assert gen_client.auth.config.enterprise_id == enterprise_id + + def test_parallel_client_usage(self, mock_box_session): + """Test that both legacy and generated clients can be used in parallel.""" + token = 'parallel_token' + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Both clients should be functional + assert legacy_client.auth.access_token == token + assert gen_client.auth.token == token + + # Both should have network sessions + assert legacy_client.session is not None + assert gen_client.network_session is not None + + def test_get_authentication_with_custom_token_storage(self, mock_box_session): + """Test get_authentication() with custom token storage.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + legacy_auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh' + ) + + custom_storage = InMemoryTokenStorage() + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_auth = legacy_client.get_authentication(token_storage=custom_storage) + + assert isinstance(gen_auth, BoxOAuth) + assert gen_auth.config.token_storage is custom_storage + + def test_get_network_session_with_custom_options(self, mock_box_session): + """Test get_network_session() with custom options.""" + from box_sdk_gen.networking.retries import BoxRetryStrategy + + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + custom_retry = BoxRetryStrategy(max_attempts=20, retry_base_interval=2.5) + custom_headers = {'X-Custom': 'value'} + + network_session = legacy_client.get_network_session( + retry_strategy=custom_retry, + additional_headers=custom_headers + ) + + assert network_session.retry_strategy.max_attempts == 20 + assert network_session.retry_strategy.retry_base_interval == 2.5 + assert network_session.additional_headers['X-Custom'] == 'value' + + def test_get_sdk_gen_client_with_all_options(self, mock_box_session): + """Test get_sdk_gen_client() with all options.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + from box_sdk_gen.networking.retries import BoxRetryStrategy + + legacy_auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh' + ) + + custom_storage = InMemoryTokenStorage() + custom_retry = BoxRetryStrategy(max_attempts=25) + custom_headers = {'X-Full-Test': 'full_value'} + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client( + auth_options={'token_storage': custom_storage}, + network_options={ + 'retry_strategy': custom_retry, + 'additional_headers': custom_headers + } + ) + + # Verify all options are applied + assert gen_client.auth.config.token_storage is custom_storage + assert gen_client.network_session.retry_strategy.max_attempts == 25 + assert gen_client.network_session.additional_headers['X-Full-Test'] == 'full_value' + diff --git a/test/boxsdk/unit/client/test_config_sharing.py b/test/boxsdk/unit/client/test_config_sharing.py new file mode 100644 index 000000000..a96c2f622 --- /dev/null +++ b/test/boxsdk/unit/client/test_config_sharing.py @@ -0,0 +1,389 @@ +""" +Unit tests for configuration sharing methods in Client class. + +Tests get_authentication(), get_network_session(), and get_sdk_gen_client() methods. +""" + +from unittest.mock import Mock, MagicMock, patch +import pytest + +from unittest.mock import Mock, MagicMock, patch +import pytest + +from boxsdk.client import Client +from boxsdk.auth.developer_token_auth import DeveloperTokenAuth +from boxsdk.auth.oauth2 import OAuth2 +from boxsdk.auth.jwt_auth import JWTAuth +from boxsdk.auth.ccg_auth import CCGAuth +from boxsdk.config import API, Proxy +from boxsdk.session.session import Session, AuthorizedSession +from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth +from box_sdk_gen.box.oauth import BoxOAuth +from box_sdk_gen.box.jwt_auth import BoxJWTAuth +from box_sdk_gen.box.ccg_auth import BoxCCGAuth +from box_sdk_gen.networking.network import NetworkSession +from box_sdk_gen.networking.base_urls import BaseUrls +from box_sdk_gen.client import BoxClient + + +class TestGetAuthentication: + """Test cases for get_authentication() method.""" + + def test_get_authentication_developer_token(self, mock_box_session): + """Test converting DeveloperTokenAuth to BoxDeveloperTokenAuth.""" + token = 'dev_token_123' + auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxDeveloperTokenAuth) + assert gen_auth.token == token + + def test_get_authentication_developer_token_missing_token(self, mock_box_session): + """Test that missing developer token raises ValueError.""" + # Create auth with callback that returns None + auth = DeveloperTokenAuth(get_new_token_callback=lambda: None) + auth._access_token = None + + client = Client(auth, session=mock_box_session) + + with pytest.raises(ValueError, match="Developer token is not available"): + client.get_authentication() + + def test_get_authentication_oauth2(self, mock_box_session): + """Test converting OAuth2 to BoxOAuth.""" + client_id = 'test_client_id' + client_secret = 'test_client_secret' + access_token = 'test_access_token' + refresh_token = 'test_refresh_token' + + auth = OAuth2( + client_id=client_id, + client_secret=client_secret, + access_token=access_token, + refresh_token=refresh_token + ) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxOAuth) + assert gen_auth.config.client_id == client_id + assert gen_auth.config.client_secret == client_secret + + def test_get_authentication_oauth2_missing_credentials(self, mock_box_session): + """Test that missing OAuth2 credentials raises ValueError.""" + auth = OAuth2(client_id=None, client_secret=None) + + client = Client(auth, session=mock_box_session) + + with pytest.raises(ValueError, match="OAuth2 client_id and client_secret are required"): + client.get_authentication() + + def test_get_authentication_oauth2_with_custom_token_storage(self, mock_box_session): + """Test OAuth2 conversion with custom token storage.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh' + ) + + custom_storage = InMemoryTokenStorage() + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication(token_storage=custom_storage) + + assert isinstance(gen_auth, BoxOAuth) + assert gen_auth.config.token_storage is custom_storage + + def test_get_authentication_jwt(self, mock_box_session): + """Test converting JWTAuth to BoxJWTAuth.""" + from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey + from cryptography.hazmat.primitives import serialization as crypto_serialization + + # Create a mock RSA private key + mock_key = Mock(spec=RSAPrivateKey) + mock_key.private_bytes.return_value = b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + + client_id = 'test_client_id' + client_secret = 'test_client_secret' + jwt_key_id = 'test_key_id' + enterprise_id = 'test_enterprise_id' + + auth = JWTAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + jwt_key_id=jwt_key_id, + rsa_private_key_data=mock_key + ) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxJWTAuth) + assert gen_auth.config.client_id == client_id + assert gen_auth.config.client_secret == client_secret + assert gen_auth.config.jwt_key_id == jwt_key_id + assert gen_auth.config.enterprise_id == enterprise_id + + def test_get_authentication_jwt_missing_credentials(self, mock_box_session): + """Test that missing JWT credentials raises ValueError.""" + from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey + + # Create a mock RSA private key + mock_key = Mock(spec=RSAPrivateKey) + mock_key.private_bytes.return_value = b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + + # Create auth with missing client_id + auth = JWTAuth( + client_id=None, # Missing required field + client_secret='test_secret', + enterprise_id='test_enterprise', + jwt_key_id='test_key_id', + rsa_private_key_data=mock_key + ) + + client = Client(auth, session=mock_box_session) + + with pytest.raises(ValueError, match="JWT authentication requires"): + client.get_authentication() + + def test_get_authentication_ccg(self, mock_box_session): + """Test converting CCGAuth to BoxCCGAuth.""" + client_id = 'test_client_id' + client_secret = 'test_client_secret' + enterprise_id = 'test_enterprise_id' + + auth = CCGAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id + ) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxCCGAuth) + assert gen_auth.config.client_id == client_id + assert gen_auth.config.client_secret == client_secret + assert gen_auth.config.enterprise_id == enterprise_id + + def test_get_authentication_ccg_missing_credentials(self, mock_box_session): + """Test that missing CCG credentials raises ValueError.""" + auth = CCGAuth(client_id=None, client_secret=None) + + client = Client(auth, session=mock_box_session) + + with pytest.raises(ValueError, match="CCG authentication requires"): + client.get_authentication() + + def test_get_authentication_unsupported_type(self, mock_box_session): + """Test that unsupported auth type raises ValueError.""" + # Create a mock auth that's not one of the supported types + mock_auth = Mock() + mock_auth.__class__.__name__ = 'UnsupportedAuth' + + client = Client(mock_auth, session=mock_box_session) + + with pytest.raises(ValueError, match="Unsupported authentication type"): + client.get_authentication() + + +class TestGetNetworkSession: + """Test cases for get_network_session() method.""" + + def test_get_network_session_default(self, mock_box_session): + """Test extracting network session with default settings.""" + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + assert isinstance(network_session, NetworkSession) + assert isinstance(network_session.base_urls, BaseUrls) + + def test_get_network_session_with_custom_headers(self, mock_box_session): + """Test network session with custom headers.""" + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + additional_headers = {'X-Custom-Header': 'custom_value'} + network_session = client.get_network_session(additional_headers=additional_headers) + + assert 'X-Custom-Header' in network_session.additional_headers + assert network_session.additional_headers['X-Custom-Header'] == 'custom_value' + + def test_get_network_session_with_proxy(self, mock_box_session): + """Test network session with proxy configuration.""" + # Set up proxy config + Proxy.URL = 'http://proxy.example.com:8080' + Proxy.AUTH = None + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + # Proxy URL should be set + assert network_session.proxy_url is not None + assert 'proxy.example.com' in network_session.proxy_url + + # Clean up + Proxy.URL = None + + def test_get_network_session_with_authenticated_proxy(self, mock_box_session): + """Test network session with authenticated proxy.""" + # Set up authenticated proxy config + Proxy.URL = 'http://proxy.example.com:8080' + Proxy.AUTH = {'user': 'proxy_user', 'password': 'proxy_pass'} + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + # Proxy URL should include authentication + assert network_session.proxy_url is not None + assert 'proxy_user' in network_session.proxy_url + assert 'proxy_pass' in network_session.proxy_url + + # Clean up + Proxy.URL = None + Proxy.AUTH = None + + def test_get_network_session_with_custom_retry_strategy(self, mock_box_session): + """Test network session with custom retry strategy.""" + from box_sdk_gen.networking.retries import BoxRetryStrategy + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + custom_retry = BoxRetryStrategy(max_attempts=10, retry_base_interval=2.0) + network_session = client.get_network_session(retry_strategy=custom_retry) + + assert network_session.retry_strategy is custom_retry + assert network_session.retry_strategy.max_attempts == 10 + + def test_get_network_session_base_urls(self, mock_box_session): + """Test that base URLs are correctly extracted.""" + # Modify API config + original_base_url = API.BASE_API_URL + original_upload_url = API.UPLOAD_URL + + API.BASE_API_URL = 'https://custom.api.box.com/2.0' + API.UPLOAD_URL = 'https://custom.upload.box.com/api/2.0' + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + # URLs should have version suffix removed + assert 'custom.api.box.com' in network_session.base_urls.base_url + assert 'custom.upload.box.com' in network_session.base_urls.upload_url + + # Restore + API.BASE_API_URL = original_base_url + API.UPLOAD_URL = original_upload_url + + +class TestGetSdkGenClient: + """Test cases for get_sdk_gen_client() method.""" + + def test_get_sdk_gen_client_basic(self, mock_box_session): + """Test basic usage of get_sdk_gen_client().""" + token = 'dev_token' + auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client() + + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxDeveloperTokenAuth) + assert gen_client.auth.token == token + + def test_get_sdk_gen_client_oauth2(self, mock_box_session): + """Test get_sdk_gen_client() with OAuth2.""" + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh' + ) + + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client() + + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxOAuth) + + def test_get_sdk_gen_client_with_auth_options(self, mock_box_session): + """Test get_sdk_gen_client() with custom auth options.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh' + ) + + custom_storage = InMemoryTokenStorage() + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client( + auth_options={'token_storage': custom_storage} + ) + + assert isinstance(gen_client, BoxClient) + assert gen_client.auth.config.token_storage is custom_storage + + def test_get_sdk_gen_client_with_network_options(self, mock_box_session): + """Test get_sdk_gen_client() with custom network options.""" + from box_sdk_gen.networking.retries import BoxRetryStrategy + + token = 'dev_token' + auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + custom_retry = BoxRetryStrategy(max_attempts=10) + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client( + network_options={'retry_strategy': custom_retry} + ) + + assert isinstance(gen_client, BoxClient) + assert gen_client.network_session.retry_strategy.max_attempts == 10 + + def test_get_sdk_gen_client_with_all_options(self, mock_box_session): + """Test get_sdk_gen_client() with both auth and network options.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + from box_sdk_gen.networking.retries import BoxRetryStrategy + + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh' + ) + + custom_storage = InMemoryTokenStorage() + custom_retry = BoxRetryStrategy(max_attempts=15) + additional_headers = {'X-Test': 'value'} + + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client( + auth_options={'token_storage': custom_storage}, + network_options={ + 'retry_strategy': custom_retry, + 'additional_headers': additional_headers + } + ) + + assert isinstance(gen_client, BoxClient) + assert gen_client.auth.config.token_storage is custom_storage + assert gen_client.network_session.retry_strategy.max_attempts == 15 + assert gen_client.network_session.additional_headers['X-Test'] == 'value' + diff --git a/test/boxsdk/unit/util/test_token_storage_adapter.py b/test/boxsdk/unit/util/test_token_storage_adapter.py new file mode 100644 index 000000000..dee06e993 --- /dev/null +++ b/test/boxsdk/unit/util/test_token_storage_adapter.py @@ -0,0 +1,199 @@ +""" +Unit tests for token storage adapter. + +Tests the LegacyTokenStorageAdapter that bridges legacy OAuth2 token storage +to generated SDK TokenStorage interface. +""" + +from unittest.mock import Mock, MagicMock +import pytest + +from boxsdk.util.token_storage_adapter import LegacyTokenStorageAdapter +from box_sdk_gen.schemas.access_token import AccessToken + + +class TestLegacyTokenStorageAdapter: + """Test cases for LegacyTokenStorageAdapter.""" + + def test_store_with_store_callback(self): + """Test storing tokens using the legacy store callback.""" + stored_tokens = {} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=store_tokens + ) + + token = AccessToken( + access_token='test_access_token', + refresh_token='test_refresh_token', + expires_in=3600, + token_type='bearer' + ) + + adapter.store(token) + + assert stored_tokens['access'] == 'test_access_token' + assert stored_tokens['refresh'] == 'test_refresh_token' + + def test_store_without_store_callback(self): + """Test that store works even without a store callback.""" + def get_tokens(): + return (None, None) + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=None + ) + + token = AccessToken( + access_token='test_access_token', + refresh_token='test_refresh_token', + expires_in=3600, + token_type='bearer' + ) + + # Should not raise an error + adapter.store(token) + + def test_get_with_tokens(self): + """Test retrieving tokens when they exist.""" + def get_tokens(): + return ('test_access_token', 'test_refresh_token') + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=None + ) + + token = adapter.get() + + assert token is not None + assert token.access_token == 'test_access_token' + assert token.refresh_token == 'test_refresh_token' + assert token.expires_in == 3600 # Default value + assert token.token_type == 'bearer' + + def test_get_without_tokens(self): + """Test retrieving tokens when they don't exist.""" + def get_tokens(): + return (None, None) + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=None + ) + + token = adapter.get() + + assert token is None + + def test_get_with_only_access_token(self): + """Test retrieving when only access token is available.""" + def get_tokens(): + return ('test_access_token', None) + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=None + ) + + token = adapter.get() + + assert token is not None + assert token.access_token == 'test_access_token' + assert token.refresh_token is None + + def test_clear_with_store_callback(self): + """Test clearing tokens using the legacy store callback.""" + stored_tokens = {'access': 'token', 'refresh': 'refresh_token'} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + if access_token is None: + stored_tokens.clear() + else: + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=store_tokens + ) + + adapter.clear() + + assert stored_tokens == {} + + def test_clear_without_store_callback(self): + """Test that clear works even without a store callback.""" + def get_tokens(): + return ('token', 'refresh_token') + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=None + ) + + # Should not raise an error + adapter.clear() + + def test_token_conversion_format(self): + """Test that tokens are converted to the correct AccessToken format.""" + def get_tokens(): + return ('access_123', 'refresh_456') + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=None + ) + + token = adapter.get() + + assert isinstance(token, AccessToken) + assert token.access_token == 'access_123' + assert token.refresh_token == 'refresh_456' + assert token.expires_in == 3600 + assert token.token_type == 'bearer' + + def test_store_and_get_roundtrip(self): + """Test storing and retrieving tokens in a roundtrip.""" + stored_tokens = {} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, + store_tokens=store_tokens + ) + + # Store a token + original_token = AccessToken( + access_token='stored_access', + refresh_token='stored_refresh', + expires_in=7200, + token_type='bearer' + ) + adapter.store(original_token) + + # Retrieve it + retrieved_token = adapter.get() + + assert retrieved_token is not None + assert retrieved_token.access_token == 'stored_access' + assert retrieved_token.refresh_token == 'stored_refresh' + From 0cc8d59a516a591a05e96f7a9b14cf36b082122f Mon Sep 17 00:00:00 2001 From: Minh Nguyen Cong Date: Mon, 19 Jan 2026 21:54:17 +0100 Subject: [PATCH 2/5] update test --- boxsdk/client/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/boxsdk/client/client.py b/boxsdk/client/client.py index db2538bde..d1468363d 100644 --- a/boxsdk/client/client.py +++ b/boxsdk/client/client.py @@ -19,11 +19,11 @@ from ..util.datetime_formatter import normalize_date_to_rfc3339_format from ..util.shared_link import get_shared_link_header from ..util.deprecation_decorator import deprecated -from ...auth.developer_token_auth import DeveloperTokenAuth -from ...auth.jwt_auth import JWTAuth -from ...auth.ccg_auth import CCGAuth -from ...auth.oauth2 import OAuth2 as LegacyOAuth2 -from ...util.token_storage_adapter import LegacyTokenStorageAdapter +from ..auth.developer_token_auth import DeveloperTokenAuth +from ..auth.jwt_auth import JWTAuth +from ..auth.ccg_auth import CCGAuth +from ..auth.oauth2 import OAuth2 as LegacyOAuth2 +from ..util.token_storage_adapter import LegacyTokenStorageAdapter from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth from box_sdk_gen.box.oauth import BoxOAuth, OAuthConfig From b140ddacbbd16242b486cea8802ea907baeaf542 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Cong Date: Mon, 19 Jan 2026 22:02:18 +0100 Subject: [PATCH 3/5] update code --- boxsdk/client/client.py | 140 +++++++-------- boxsdk/util/token_storage_adapter.py | 23 +-- .../boxsdk/integration/test_config_sharing.py | 147 ++++++++-------- .../boxsdk/unit/client/test_config_sharing.py | 165 +++++++++--------- .../unit/util/test_token_storage_adapter.py | 120 ++++++------- 5 files changed, 299 insertions(+), 296 deletions(-) diff --git a/boxsdk/client/client.py b/boxsdk/client/client.py index d1468363d..2f5bf1e78 100644 --- a/boxsdk/client/client.py +++ b/boxsdk/client/client.py @@ -2029,13 +2029,13 @@ def get_authentication(self, *, token_storage=None): """ Extract authentication configuration from this legacy client and convert it to a generated SDK Authentication object. - + This method supports the following legacy authentication types: - DeveloperTokenAuth -> BoxDeveloperTokenAuth - OAuth2 -> BoxOAuth - JWTAuth -> BoxJWTAuth - CCGAuth -> BoxCCGAuth - + :param token_storage: Optional TokenStorage instance for the generated SDK. If not provided, an adapter will be created to bridge legacy token storage. @@ -2047,44 +2047,45 @@ def get_authentication(self, *, token_storage=None): If the authentication type is not supported or required credentials are missing. """ oauth = self._oauth - + # Developer Token Authentication if isinstance(oauth, DeveloperTokenAuth): token = oauth.access_token if not token: raise ValueError("Developer token is not available") return BoxDeveloperTokenAuth(token=token) - + # OAuth 2.0 Authentication # Check if it's OAuth2 (but not DeveloperTokenAuth, JWTAuth, or CCGAuth) - if isinstance(oauth, LegacyOAuth2) and not isinstance(oauth, (DeveloperTokenAuth, JWTAuth, CCGAuth)): + if isinstance(oauth, LegacyOAuth2) and not isinstance( + oauth, (DeveloperTokenAuth, JWTAuth, CCGAuth) + ): # It's OAuth2 client_id = getattr(oauth, '_client_id', None) client_secret = getattr(oauth, '_client_secret', None) - + if not client_id or not client_secret: raise ValueError("OAuth2 client_id and client_secret are required") - + # Create token storage adapter if not provided if token_storage is None: # Create adapter from legacy OAuth2's token storage def get_tokens(): return oauth._get_tokens() - + def store_tokens(access_token, refresh_token): oauth._store_tokens(access_token, refresh_token) - + token_storage = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=store_tokens + get_tokens=get_tokens, store_tokens=store_tokens ) - + config = OAuthConfig( client_id=client_id, client_secret=client_secret, - token_storage=token_storage + token_storage=token_storage, ) - + # Pre-populate with existing tokens if available auth = BoxOAuth(config=config) access_token, refresh_token = oauth._get_tokens() @@ -2093,12 +2094,12 @@ def store_tokens(access_token, refresh_token): access_token=access_token, refresh_token=refresh_token, expires_in=3600, # Default, actual expiry not available - token_type='bearer' + token_type='bearer', ) token_storage.store(existing_token) - + return auth - + # JWT Authentication if isinstance(oauth, JWTAuth): client_id = getattr(oauth, '_client_id', None) @@ -2107,14 +2108,17 @@ def store_tokens(access_token, refresh_token): rsa_private_key = getattr(oauth, '_rsa_private_key', None) enterprise_id = getattr(oauth, '_enterprise_id', None) user_id = getattr(oauth, '_user_id', None) - + if not all([client_id, client_secret, jwt_key_id, rsa_private_key]): - raise ValueError("JWT authentication requires client_id, client_secret, jwt_key_id, and private key") - + raise ValueError( + "JWT authentication requires client_id, client_secret, jwt_key_id, and private key" + ) + # Convert RSA private key to string format # Note: If the key was originally encrypted, we can't extract the passphrase # from the normalized RSAPrivateKey object. We'll serialize it unencrypted. from cryptography.hazmat.primitives import serialization + try: # Serialize the key to PEM format (unencrypted) # This works even if the original key was encrypted, as the @@ -2122,7 +2126,7 @@ def store_tokens(access_token, refresh_token): private_key_pem = rsa_private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ).decode('utf-8') # Passphrase is not needed since we're serializing unencrypted # The generated SDK will handle encryption if needed @@ -2132,12 +2136,13 @@ def store_tokens(access_token, refresh_token): f"Cannot serialize private key: {e}. " "Please ensure the private key is valid." ) from e - + # Create token storage adapter if not provided if token_storage is None: from box_sdk_gen.box.token_storage import InMemoryTokenStorage + token_storage = InMemoryTokenStorage() - + config = JWTConfig( client_id=client_id, client_secret=client_secret, @@ -2146,48 +2151,51 @@ def store_tokens(access_token, refresh_token): private_key_passphrase=passphrase, enterprise_id=enterprise_id, user_id=user_id, - token_storage=token_storage + token_storage=token_storage, ) - + auth = BoxJWTAuth(config=config) - + # Handle user vs enterprise scope if user_id: auth = auth.with_user_subject(user_id, token_storage=token_storage) - + return auth - + # CCG (Client Credentials Grant) Authentication if isinstance(oauth, CCGAuth): client_id = getattr(oauth, '_client_id', None) client_secret = getattr(oauth, '_client_secret', None) enterprise_id = getattr(oauth, '_enterprise_id', None) user_id = getattr(oauth, '_user_id', None) - + if not client_id or not client_secret: - raise ValueError("CCG authentication requires client_id and client_secret") - + raise ValueError( + "CCG authentication requires client_id and client_secret" + ) + # Create token storage adapter if not provided if token_storage is None: from box_sdk_gen.box.token_storage import InMemoryTokenStorage + token_storage = InMemoryTokenStorage() - + config = CCGConfig( client_id=client_id, client_secret=client_secret, enterprise_id=enterprise_id, user_id=user_id, - token_storage=token_storage + token_storage=token_storage, ) - + auth = BoxCCGAuth(config=config) - + # Handle user vs enterprise scope if user_id: auth = auth.with_user_subject(user_id, token_storage=token_storage) - + return auth - + raise ValueError( f"Unsupported authentication type: {type(oauth).__name__}. " "Supported types: DeveloperTokenAuth, OAuth2, JWTAuth, CCGAuth" @@ -2200,12 +2208,12 @@ def get_network_session( retry_strategy=None, data_sanitizer=None, interceptors=None, - additional_headers=None + additional_headers=None, ): """ Extract network configuration from this legacy client and convert it to a generated SDK NetworkSession object. - + :param network_client: Optional NetworkClient instance for the generated SDK. If not provided, a default will be created with proxy support if configured. @@ -2224,7 +2232,7 @@ def get_network_session( session = self._session api_config = session.api_config proxy_config = session.proxy_config - + # Extract base URLs base_url = getattr(api_config, 'BASE_API_URL', 'https://api.box.com/2.0') # Remove version suffix if present @@ -2232,34 +2240,33 @@ def get_network_session( base_url = base_url[:-4] elif base_url.endswith('/2'): base_url = base_url[:-2] - + upload_url = getattr(api_config, 'UPLOAD_URL', 'https://upload.box.com/api/2.0') # Remove version suffix if present if upload_url.endswith('/2.0'): upload_url = upload_url[:-4] elif upload_url.endswith('/2'): upload_url = upload_url[:-2] - - oauth2_url = getattr(api_config, 'OAUTH2_AUTHORIZE_URL', 'https://account.box.com/api/oauth2') + + oauth2_url = getattr( + api_config, 'OAUTH2_AUTHORIZE_URL', 'https://account.box.com/api/oauth2' + ) # Extract base OAuth URL if '/authorize' in oauth2_url: - oauth2_url = oauth2_url[:oauth2_url.rindex('/authorize')] - + oauth2_url = oauth2_url[: oauth2_url.rindex('/authorize')] + base_urls = BaseUrls( - base_url=base_url, - upload_url=upload_url, - oauth_2_url=oauth2_url + base_url=base_url, upload_url=upload_url, oauth_2_url=oauth2_url ) - + # Extract or create retry strategy if retry_strategy is None: max_retries = getattr(api_config, 'MAX_RETRY_ATTEMPTS', 5) retry_base_interval = getattr(session, '_retry_base_interval', 1.0) retry_strategy = BoxRetryStrategy( - max_attempts=max_retries, - retry_base_interval=retry_base_interval + max_attempts=max_retries, retry_base_interval=retry_base_interval ) - + # Handle proxy configuration proxy_url = None if proxy_config and hasattr(proxy_config, 'URL') and proxy_config.URL: @@ -2271,14 +2278,14 @@ def get_network_session( # Extract host from URL host = proxy_url.split('//')[1] if '//' in proxy_url else proxy_url proxy_url = f"http://{auth['user']}:{auth['password']}@{host}" - + # Merge custom headers headers = {} if hasattr(session, '_default_headers'): headers.update(session._default_headers.copy()) if additional_headers: headers.update(additional_headers) - + # Create network session network_session = NetworkSession( base_urls=base_urls, @@ -2286,24 +2293,19 @@ def get_network_session( retry_strategy=retry_strategy, additional_headers=headers if headers else None, proxy_url=proxy_url, - data_sanitizer=data_sanitizer + data_sanitizer=data_sanitizer, ) - + return network_session - def get_sdk_gen_client( - self, - *, - auth_options=None, - network_options=None - ): + def get_sdk_gen_client(self, *, auth_options=None, network_options=None): """ Create a fully configured generated SDK client from this legacy client. - + This method combines get_authentication() and get_network_session() to create a BoxClient instance that shares authentication and network configuration with this legacy client. - + :param auth_options: Optional dictionary with authentication options: - token_storage: Custom TokenStorage instance @@ -2321,9 +2323,9 @@ def get_sdk_gen_client( token_storage = None if auth_options and 'token_storage' in auth_options: token_storage = auth_options['token_storage'] - + auth = self.get_authentication(token_storage=token_storage) - + # Extract network session network_kwargs = {} if network_options: @@ -2332,10 +2334,10 @@ def get_sdk_gen_client( 'retry_strategy': network_options.get('retry_strategy'), 'data_sanitizer': network_options.get('data_sanitizer'), 'interceptors': network_options.get('interceptors'), - 'additional_headers': network_options.get('additional_headers') + 'additional_headers': network_options.get('additional_headers'), } - + network_session = self.get_network_session(**network_kwargs) - + # Create and return fully configured client return BoxClient(auth=auth, network_session=network_session) diff --git a/boxsdk/util/token_storage_adapter.py b/boxsdk/util/token_storage_adapter.py index dd7a26ef9..65c6efbe4 100644 --- a/boxsdk/util/token_storage_adapter.py +++ b/boxsdk/util/token_storage_adapter.py @@ -13,7 +13,7 @@ class LegacyTokenStorageAdapter(TokenStorage): """ Adapter that bridges legacy OAuth2 token storage (callbacks) to generated SDK TokenStorage. - + This adapter wraps legacy token storage mechanisms (store_tokens callback and _get_tokens method) to provide a TokenStorage interface for the generated SDK. """ @@ -25,7 +25,7 @@ def __init__( ): """ Initialize the adapter with legacy token storage callbacks. - + :param get_tokens: Callable that returns (access_token, refresh_token) tuple. This can be a method from OAuth2._get_tokens or a custom callback. @@ -39,44 +39,45 @@ def __init__( def store(self, token: AccessToken) -> None: """ Store a token using the legacy storage mechanism. - + :param token: AccessToken object from generated SDK. """ if self._store_tokens is not None: - refresh_token = token.refresh_token if hasattr(token, 'refresh_token') else None + refresh_token = ( + token.refresh_token if hasattr(token, 'refresh_token') else None + ) self._store_tokens(token.access_token, refresh_token) def get(self) -> Optional[AccessToken]: """ Get the current token from legacy storage. - + :return: AccessToken object if tokens are available, None otherwise. """ access_token, refresh_token = self._get_tokens() - + if access_token is None: return None - + # Convert legacy token format to generated SDK AccessToken # The generated SDK AccessToken has: access_token, refresh_token, expires_in, token_type # We don't have expires_in from legacy, so we'll set a default or calculate if possible expires_in = 3600 # Default to 1 hour if not available - + return AccessToken( access_token=access_token, refresh_token=refresh_token, expires_in=expires_in, - token_type='bearer' + token_type='bearer', ) def clear(self) -> None: """ Clear stored tokens using the legacy storage mechanism. - + Note: This will call store_tokens with None values if the callback supports it. """ if self._store_tokens is not None: self._store_tokens(None, None) - diff --git a/test/boxsdk/integration/test_config_sharing.py b/test/boxsdk/integration/test_config_sharing.py index 10adfe8ef..0414d0bb6 100644 --- a/test/boxsdk/integration/test_config_sharing.py +++ b/test/boxsdk/integration/test_config_sharing.py @@ -29,17 +29,17 @@ def test_developer_token_roundtrip(self, mock_box_session): """Test that developer token auth works end-to-end.""" token = 'test_dev_token_12345' legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) - + legacy_client = Client(legacy_auth, session=mock_box_session) - + # Get generated client gen_client = legacy_client.get_sdk_gen_client() - + # Verify both clients are configured assert isinstance(gen_client, BoxClient) assert isinstance(gen_client.auth, BoxDeveloperTokenAuth) assert gen_client.auth.token == token - + # Verify network session is configured assert isinstance(gen_client.network_session, NetworkSession) @@ -49,25 +49,25 @@ def test_oauth2_roundtrip(self, mock_box_session): client_secret = 'test_client_secret_456' access_token = 'test_access_token_789' refresh_token = 'test_refresh_token_012' - + legacy_auth = OAuth2( client_id=client_id, client_secret=client_secret, access_token=access_token, - refresh_token=refresh_token + refresh_token=refresh_token, ) - + legacy_client = Client(legacy_auth, session=mock_box_session) - + # Get generated client gen_client = legacy_client.get_sdk_gen_client() - + # Verify authentication assert isinstance(gen_client, BoxClient) assert isinstance(gen_client.auth, BoxOAuth) assert gen_client.auth.config.client_id == client_id assert gen_client.auth.config.client_secret == client_secret - + # Verify token storage is shared stored_token = gen_client.auth.token_storage.get() assert stored_token is not None @@ -78,38 +78,37 @@ def test_oauth2_token_synchronization(self, mock_box_session): """Test that tokens are synchronized between legacy and generated clients.""" client_id = 'test_client_id' client_secret = 'test_client_secret' - + # Track token storage stored_tokens = {'access': 'initial_token', 'refresh': 'initial_refresh'} - + def get_tokens(): return (stored_tokens.get('access'), stored_tokens.get('refresh')) - + def store_tokens(access_token, refresh_token): stored_tokens['access'] = access_token stored_tokens['refresh'] = refresh_token - + legacy_auth = OAuth2( - client_id=client_id, - client_secret=client_secret, - store_tokens=store_tokens + client_id=client_id, client_secret=client_secret, store_tokens=store_tokens ) legacy_auth._get_tokens = get_tokens legacy_auth._store_tokens = store_tokens - + legacy_client = Client(legacy_auth, session=mock_box_session) gen_client = legacy_client.get_sdk_gen_client() - + # Update token via generated client from box_sdk_gen.schemas.access_token import AccessToken + new_token = AccessToken( access_token='new_token', refresh_token='new_refresh', expires_in=3600, - token_type='bearer' + token_type='bearer', ) gen_client.auth.token_storage.store(new_token) - + # Verify legacy client sees the update access, refresh = legacy_auth._get_tokens() assert access == 'new_token' @@ -120,19 +119,22 @@ def test_network_configuration_preservation(self, mock_box_session): # Set up custom API config original_base_url = API.BASE_API_URL original_upload_url = API.UPLOAD_URL - + API.BASE_API_URL = 'https://custom.api.box.com/2.0' API.UPLOAD_URL = 'https://custom.upload.box.com/api/2.0' - + try: legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') legacy_client = Client(legacy_auth, session=mock_box_session) - + gen_client = legacy_client.get_sdk_gen_client() - + # Verify base URLs are preserved assert 'custom.api.box.com' in gen_client.network_session.base_urls.base_url - assert 'custom.upload.box.com' in gen_client.network_session.base_urls.upload_url + assert ( + 'custom.upload.box.com' + in gen_client.network_session.base_urls.upload_url + ) finally: # Restore original values API.BASE_API_URL = original_base_url @@ -143,16 +145,16 @@ def test_proxy_configuration_preservation(self, mock_box_session): # Set up proxy original_proxy_url = Proxy.URL original_proxy_auth = Proxy.AUTH - + Proxy.URL = 'http://proxy.example.com:8080' Proxy.AUTH = {'user': 'proxy_user', 'password': 'proxy_pass'} - + try: legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') legacy_client = Client(legacy_auth, session=mock_box_session) - + gen_client = legacy_client.get_sdk_gen_client() - + # Verify proxy is configured assert gen_client.network_session.proxy_url is not None assert 'proxy.example.com' in gen_client.network_session.proxy_url @@ -166,28 +168,31 @@ def test_custom_headers_preservation(self, mock_box_session): """Test that custom headers are preserved.""" legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') legacy_client = Client(legacy_auth, session=mock_box_session) - + # Add custom headers to session mock_box_session._default_headers = {'X-Custom-Header': 'custom_value'} - + gen_client = legacy_client.get_sdk_gen_client() - + # Verify headers are preserved assert 'X-Custom-Header' in gen_client.network_session.additional_headers - assert gen_client.network_session.additional_headers['X-Custom-Header'] == 'custom_value' + assert ( + gen_client.network_session.additional_headers['X-Custom-Header'] + == 'custom_value' + ) def test_retry_strategy_preservation(self, mock_box_session): """Test that retry strategy is preserved.""" # Modify API config original_max_retries = API.MAX_RETRY_ATTEMPTS API.MAX_RETRY_ATTEMPTS = 10 - + try: legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') legacy_client = Client(legacy_auth, session=mock_box_session) - + gen_client = legacy_client.get_sdk_gen_client() - + # Verify retry strategy assert gen_client.network_session.retry_strategy.max_attempts == 10 finally: @@ -196,27 +201,29 @@ def test_retry_strategy_preservation(self, mock_box_session): def test_jwt_auth_roundtrip(self, mock_box_session): """Test that JWT auth works end-to-end.""" from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey - + # Create a mock RSA private key mock_key = Mock(spec=RSAPrivateKey) - mock_key.private_bytes.return_value = b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' - + mock_key.private_bytes.return_value = ( + b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + ) + client_id = 'test_jwt_client_id' client_secret = 'test_jwt_client_secret' jwt_key_id = 'test_jwt_key_id' enterprise_id = 'test_enterprise_id' - + legacy_auth = JWTAuth( client_id=client_id, client_secret=client_secret, enterprise_id=enterprise_id, jwt_key_id=jwt_key_id, - rsa_private_key_data=mock_key + rsa_private_key_data=mock_key, ) - + legacy_client = Client(legacy_auth, session=mock_box_session) gen_client = legacy_client.get_sdk_gen_client() - + # Verify authentication assert isinstance(gen_client, BoxClient) assert isinstance(gen_client.auth, BoxJWTAuth) @@ -230,16 +237,16 @@ def test_ccg_auth_roundtrip(self, mock_box_session): client_id = 'test_ccg_client_id' client_secret = 'test_ccg_client_secret' enterprise_id = 'test_ccg_enterprise_id' - + legacy_auth = CCGAuth( client_id=client_id, client_secret=client_secret, - enterprise_id=enterprise_id + enterprise_id=enterprise_id, ) - + legacy_client = Client(legacy_auth, session=mock_box_session) gen_client = legacy_client.get_sdk_gen_client() - + # Verify authentication assert isinstance(gen_client, BoxClient) assert isinstance(gen_client.auth, BoxCCGAuth) @@ -251,14 +258,14 @@ def test_parallel_client_usage(self, mock_box_session): """Test that both legacy and generated clients can be used in parallel.""" token = 'parallel_token' legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) - + legacy_client = Client(legacy_auth, session=mock_box_session) gen_client = legacy_client.get_sdk_gen_client() - + # Both clients should be functional assert legacy_client.auth.access_token == token assert gen_client.auth.token == token - + # Both should have network sessions assert legacy_client.session is not None assert gen_client.network_session is not None @@ -266,36 +273,35 @@ def test_parallel_client_usage(self, mock_box_session): def test_get_authentication_with_custom_token_storage(self, mock_box_session): """Test get_authentication() with custom token storage.""" from box_sdk_gen.box.token_storage import InMemoryTokenStorage - + legacy_auth = OAuth2( client_id='test_id', client_secret='test_secret', access_token='token', - refresh_token='refresh' + refresh_token='refresh', ) - + custom_storage = InMemoryTokenStorage() legacy_client = Client(legacy_auth, session=mock_box_session) gen_auth = legacy_client.get_authentication(token_storage=custom_storage) - + assert isinstance(gen_auth, BoxOAuth) assert gen_auth.config.token_storage is custom_storage def test_get_network_session_with_custom_options(self, mock_box_session): """Test get_network_session() with custom options.""" from box_sdk_gen.networking.retries import BoxRetryStrategy - + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') legacy_client = Client(legacy_auth, session=mock_box_session) - + custom_retry = BoxRetryStrategy(max_attempts=20, retry_base_interval=2.5) custom_headers = {'X-Custom': 'value'} - + network_session = legacy_client.get_network_session( - retry_strategy=custom_retry, - additional_headers=custom_headers + retry_strategy=custom_retry, additional_headers=custom_headers ) - + assert network_session.retry_strategy.max_attempts == 20 assert network_session.retry_strategy.retry_base_interval == 2.5 assert network_session.additional_headers['X-Custom'] == 'value' @@ -304,29 +310,30 @@ def test_get_sdk_gen_client_with_all_options(self, mock_box_session): """Test get_sdk_gen_client() with all options.""" from box_sdk_gen.box.token_storage import InMemoryTokenStorage from box_sdk_gen.networking.retries import BoxRetryStrategy - + legacy_auth = OAuth2( client_id='test_id', client_secret='test_secret', access_token='token', - refresh_token='refresh' + refresh_token='refresh', ) - + custom_storage = InMemoryTokenStorage() custom_retry = BoxRetryStrategy(max_attempts=25) custom_headers = {'X-Full-Test': 'full_value'} - + legacy_client = Client(legacy_auth, session=mock_box_session) gen_client = legacy_client.get_sdk_gen_client( auth_options={'token_storage': custom_storage}, network_options={ 'retry_strategy': custom_retry, - 'additional_headers': custom_headers - } + 'additional_headers': custom_headers, + }, ) - + # Verify all options are applied assert gen_client.auth.config.token_storage is custom_storage assert gen_client.network_session.retry_strategy.max_attempts == 25 - assert gen_client.network_session.additional_headers['X-Full-Test'] == 'full_value' - + assert ( + gen_client.network_session.additional_headers['X-Full-Test'] == 'full_value' + ) diff --git a/test/boxsdk/unit/client/test_config_sharing.py b/test/boxsdk/unit/client/test_config_sharing.py index a96c2f622..b9b92a5e5 100644 --- a/test/boxsdk/unit/client/test_config_sharing.py +++ b/test/boxsdk/unit/client/test_config_sharing.py @@ -33,10 +33,10 @@ def test_get_authentication_developer_token(self, mock_box_session): """Test converting DeveloperTokenAuth to BoxDeveloperTokenAuth.""" token = 'dev_token_123' auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) - + client = Client(auth, session=mock_box_session) gen_auth = client.get_authentication() - + assert isinstance(gen_auth, BoxDeveloperTokenAuth) assert gen_auth.token == token @@ -45,9 +45,9 @@ def test_get_authentication_developer_token_missing_token(self, mock_box_session # Create auth with callback that returns None auth = DeveloperTokenAuth(get_new_token_callback=lambda: None) auth._access_token = None - + client = Client(auth, session=mock_box_session) - + with pytest.raises(ValueError, match="Developer token is not available"): client.get_authentication() @@ -57,17 +57,17 @@ def test_get_authentication_oauth2(self, mock_box_session): client_secret = 'test_client_secret' access_token = 'test_access_token' refresh_token = 'test_refresh_token' - + auth = OAuth2( client_id=client_id, client_secret=client_secret, access_token=access_token, - refresh_token=refresh_token + refresh_token=refresh_token, ) - + client = Client(auth, session=mock_box_session) gen_auth = client.get_authentication() - + assert isinstance(gen_auth, BoxOAuth) assert gen_auth.config.client_id == client_id assert gen_auth.config.client_secret == client_secret @@ -75,27 +75,31 @@ def test_get_authentication_oauth2(self, mock_box_session): def test_get_authentication_oauth2_missing_credentials(self, mock_box_session): """Test that missing OAuth2 credentials raises ValueError.""" auth = OAuth2(client_id=None, client_secret=None) - + client = Client(auth, session=mock_box_session) - - with pytest.raises(ValueError, match="OAuth2 client_id and client_secret are required"): + + with pytest.raises( + ValueError, match="OAuth2 client_id and client_secret are required" + ): client.get_authentication() - def test_get_authentication_oauth2_with_custom_token_storage(self, mock_box_session): + def test_get_authentication_oauth2_with_custom_token_storage( + self, mock_box_session + ): """Test OAuth2 conversion with custom token storage.""" from box_sdk_gen.box.token_storage import InMemoryTokenStorage - + auth = OAuth2( client_id='test_id', client_secret='test_secret', access_token='token', - refresh_token='refresh' + refresh_token='refresh', ) - + custom_storage = InMemoryTokenStorage() client = Client(auth, session=mock_box_session) gen_auth = client.get_authentication(token_storage=custom_storage) - + assert isinstance(gen_auth, BoxOAuth) assert gen_auth.config.token_storage is custom_storage @@ -103,27 +107,29 @@ def test_get_authentication_jwt(self, mock_box_session): """Test converting JWTAuth to BoxJWTAuth.""" from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.hazmat.primitives import serialization as crypto_serialization - + # Create a mock RSA private key mock_key = Mock(spec=RSAPrivateKey) - mock_key.private_bytes.return_value = b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' - + mock_key.private_bytes.return_value = ( + b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + ) + client_id = 'test_client_id' client_secret = 'test_client_secret' jwt_key_id = 'test_key_id' enterprise_id = 'test_enterprise_id' - + auth = JWTAuth( client_id=client_id, client_secret=client_secret, enterprise_id=enterprise_id, jwt_key_id=jwt_key_id, - rsa_private_key_data=mock_key + rsa_private_key_data=mock_key, ) - + client = Client(auth, session=mock_box_session) gen_auth = client.get_authentication() - + assert isinstance(gen_auth, BoxJWTAuth) assert gen_auth.config.client_id == client_id assert gen_auth.config.client_secret == client_secret @@ -133,22 +139,24 @@ def test_get_authentication_jwt(self, mock_box_session): def test_get_authentication_jwt_missing_credentials(self, mock_box_session): """Test that missing JWT credentials raises ValueError.""" from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey - + # Create a mock RSA private key mock_key = Mock(spec=RSAPrivateKey) - mock_key.private_bytes.return_value = b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' - + mock_key.private_bytes.return_value = ( + b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + ) + # Create auth with missing client_id auth = JWTAuth( client_id=None, # Missing required field client_secret='test_secret', enterprise_id='test_enterprise', jwt_key_id='test_key_id', - rsa_private_key_data=mock_key + rsa_private_key_data=mock_key, ) - + client = Client(auth, session=mock_box_session) - + with pytest.raises(ValueError, match="JWT authentication requires"): client.get_authentication() @@ -157,16 +165,16 @@ def test_get_authentication_ccg(self, mock_box_session): client_id = 'test_client_id' client_secret = 'test_client_secret' enterprise_id = 'test_enterprise_id' - + auth = CCGAuth( client_id=client_id, client_secret=client_secret, - enterprise_id=enterprise_id + enterprise_id=enterprise_id, ) - + client = Client(auth, session=mock_box_session) gen_auth = client.get_authentication() - + assert isinstance(gen_auth, BoxCCGAuth) assert gen_auth.config.client_id == client_id assert gen_auth.config.client_secret == client_secret @@ -175,9 +183,9 @@ def test_get_authentication_ccg(self, mock_box_session): def test_get_authentication_ccg_missing_credentials(self, mock_box_session): """Test that missing CCG credentials raises ValueError.""" auth = CCGAuth(client_id=None, client_secret=None) - + client = Client(auth, session=mock_box_session) - + with pytest.raises(ValueError, match="CCG authentication requires"): client.get_authentication() @@ -186,9 +194,9 @@ def test_get_authentication_unsupported_type(self, mock_box_session): # Create a mock auth that's not one of the supported types mock_auth = Mock() mock_auth.__class__.__name__ = 'UnsupportedAuth' - + client = Client(mock_auth, session=mock_box_session) - + with pytest.raises(ValueError, match="Unsupported authentication type"): client.get_authentication() @@ -200,9 +208,9 @@ def test_get_network_session_default(self, mock_box_session): """Test extracting network session with default settings.""" auth = Mock(OAuth2) client = Client(auth, session=mock_box_session) - + network_session = client.get_network_session() - + assert isinstance(network_session, NetworkSession) assert isinstance(network_session.base_urls, BaseUrls) @@ -210,10 +218,12 @@ def test_get_network_session_with_custom_headers(self, mock_box_session): """Test network session with custom headers.""" auth = Mock(OAuth2) client = Client(auth, session=mock_box_session) - + additional_headers = {'X-Custom-Header': 'custom_value'} - network_session = client.get_network_session(additional_headers=additional_headers) - + network_session = client.get_network_session( + additional_headers=additional_headers + ) + assert 'X-Custom-Header' in network_session.additional_headers assert network_session.additional_headers['X-Custom-Header'] == 'custom_value' @@ -222,16 +232,16 @@ def test_get_network_session_with_proxy(self, mock_box_session): # Set up proxy config Proxy.URL = 'http://proxy.example.com:8080' Proxy.AUTH = None - + auth = Mock(OAuth2) client = Client(auth, session=mock_box_session) - + network_session = client.get_network_session() - + # Proxy URL should be set assert network_session.proxy_url is not None assert 'proxy.example.com' in network_session.proxy_url - + # Clean up Proxy.URL = None @@ -240,17 +250,17 @@ def test_get_network_session_with_authenticated_proxy(self, mock_box_session): # Set up authenticated proxy config Proxy.URL = 'http://proxy.example.com:8080' Proxy.AUTH = {'user': 'proxy_user', 'password': 'proxy_pass'} - + auth = Mock(OAuth2) client = Client(auth, session=mock_box_session) - + network_session = client.get_network_session() - + # Proxy URL should include authentication assert network_session.proxy_url is not None assert 'proxy_user' in network_session.proxy_url assert 'proxy_pass' in network_session.proxy_url - + # Clean up Proxy.URL = None Proxy.AUTH = None @@ -258,13 +268,13 @@ def test_get_network_session_with_authenticated_proxy(self, mock_box_session): def test_get_network_session_with_custom_retry_strategy(self, mock_box_session): """Test network session with custom retry strategy.""" from box_sdk_gen.networking.retries import BoxRetryStrategy - + auth = Mock(OAuth2) client = Client(auth, session=mock_box_session) - + custom_retry = BoxRetryStrategy(max_attempts=10, retry_base_interval=2.0) network_session = client.get_network_session(retry_strategy=custom_retry) - + assert network_session.retry_strategy is custom_retry assert network_session.retry_strategy.max_attempts == 10 @@ -273,19 +283,19 @@ def test_get_network_session_base_urls(self, mock_box_session): # Modify API config original_base_url = API.BASE_API_URL original_upload_url = API.UPLOAD_URL - + API.BASE_API_URL = 'https://custom.api.box.com/2.0' API.UPLOAD_URL = 'https://custom.upload.box.com/api/2.0' - + auth = Mock(OAuth2) client = Client(auth, session=mock_box_session) - + network_session = client.get_network_session() - + # URLs should have version suffix removed assert 'custom.api.box.com' in network_session.base_urls.base_url assert 'custom.upload.box.com' in network_session.base_urls.upload_url - + # Restore API.BASE_API_URL = original_base_url API.UPLOAD_URL = original_upload_url @@ -298,10 +308,10 @@ def test_get_sdk_gen_client_basic(self, mock_box_session): """Test basic usage of get_sdk_gen_client().""" token = 'dev_token' auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) - + client = Client(auth, session=mock_box_session) gen_client = client.get_sdk_gen_client() - + assert isinstance(gen_client, BoxClient) assert isinstance(gen_client.auth, BoxDeveloperTokenAuth) assert gen_client.auth.token == token @@ -312,48 +322,48 @@ def test_get_sdk_gen_client_oauth2(self, mock_box_session): client_id='test_id', client_secret='test_secret', access_token='token', - refresh_token='refresh' + refresh_token='refresh', ) - + client = Client(auth, session=mock_box_session) gen_client = client.get_sdk_gen_client() - + assert isinstance(gen_client, BoxClient) assert isinstance(gen_client.auth, BoxOAuth) def test_get_sdk_gen_client_with_auth_options(self, mock_box_session): """Test get_sdk_gen_client() with custom auth options.""" from box_sdk_gen.box.token_storage import InMemoryTokenStorage - + auth = OAuth2( client_id='test_id', client_secret='test_secret', access_token='token', - refresh_token='refresh' + refresh_token='refresh', ) - + custom_storage = InMemoryTokenStorage() client = Client(auth, session=mock_box_session) gen_client = client.get_sdk_gen_client( auth_options={'token_storage': custom_storage} ) - + assert isinstance(gen_client, BoxClient) assert gen_client.auth.config.token_storage is custom_storage def test_get_sdk_gen_client_with_network_options(self, mock_box_session): """Test get_sdk_gen_client() with custom network options.""" from box_sdk_gen.networking.retries import BoxRetryStrategy - + token = 'dev_token' auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) - + custom_retry = BoxRetryStrategy(max_attempts=10) client = Client(auth, session=mock_box_session) gen_client = client.get_sdk_gen_client( network_options={'retry_strategy': custom_retry} ) - + assert isinstance(gen_client, BoxClient) assert gen_client.network_session.retry_strategy.max_attempts == 10 @@ -361,29 +371,28 @@ def test_get_sdk_gen_client_with_all_options(self, mock_box_session): """Test get_sdk_gen_client() with both auth and network options.""" from box_sdk_gen.box.token_storage import InMemoryTokenStorage from box_sdk_gen.networking.retries import BoxRetryStrategy - + auth = OAuth2( client_id='test_id', client_secret='test_secret', access_token='token', - refresh_token='refresh' + refresh_token='refresh', ) - + custom_storage = InMemoryTokenStorage() custom_retry = BoxRetryStrategy(max_attempts=15) additional_headers = {'X-Test': 'value'} - + client = Client(auth, session=mock_box_session) gen_client = client.get_sdk_gen_client( auth_options={'token_storage': custom_storage}, network_options={ 'retry_strategy': custom_retry, - 'additional_headers': additional_headers - } + 'additional_headers': additional_headers, + }, ) - + assert isinstance(gen_client, BoxClient) assert gen_client.auth.config.token_storage is custom_storage assert gen_client.network_session.retry_strategy.max_attempts == 15 assert gen_client.network_session.additional_headers['X-Test'] == 'value' - diff --git a/test/boxsdk/unit/util/test_token_storage_adapter.py b/test/boxsdk/unit/util/test_token_storage_adapter.py index dee06e993..71d4db611 100644 --- a/test/boxsdk/unit/util/test_token_storage_adapter.py +++ b/test/boxsdk/unit/util/test_token_storage_adapter.py @@ -18,63 +18,58 @@ class TestLegacyTokenStorageAdapter: def test_store_with_store_callback(self): """Test storing tokens using the legacy store callback.""" stored_tokens = {} - + def get_tokens(): return (stored_tokens.get('access'), stored_tokens.get('refresh')) - + def store_tokens(access_token, refresh_token): stored_tokens['access'] = access_token stored_tokens['refresh'] = refresh_token - + adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=store_tokens + get_tokens=get_tokens, store_tokens=store_tokens ) - + token = AccessToken( access_token='test_access_token', refresh_token='test_refresh_token', expires_in=3600, - token_type='bearer' + token_type='bearer', ) - + adapter.store(token) - + assert stored_tokens['access'] == 'test_access_token' assert stored_tokens['refresh'] == 'test_refresh_token' def test_store_without_store_callback(self): """Test that store works even without a store callback.""" + def get_tokens(): return (None, None) - - adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=None - ) - + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + token = AccessToken( access_token='test_access_token', refresh_token='test_refresh_token', expires_in=3600, - token_type='bearer' + token_type='bearer', ) - + # Should not raise an error adapter.store(token) def test_get_with_tokens(self): """Test retrieving tokens when they exist.""" + def get_tokens(): return ('test_access_token', 'test_refresh_token') - - adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=None - ) - + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + token = adapter.get() - + assert token is not None assert token.access_token == 'test_access_token' assert token.refresh_token == 'test_refresh_token' @@ -83,30 +78,26 @@ def get_tokens(): def test_get_without_tokens(self): """Test retrieving tokens when they don't exist.""" + def get_tokens(): return (None, None) - - adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=None - ) - + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + token = adapter.get() - + assert token is None def test_get_with_only_access_token(self): """Test retrieving when only access token is available.""" + def get_tokens(): return ('test_access_token', None) - - adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=None - ) - + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + token = adapter.get() - + assert token is not None assert token.access_token == 'test_access_token' assert token.refresh_token is None @@ -114,51 +105,46 @@ def get_tokens(): def test_clear_with_store_callback(self): """Test clearing tokens using the legacy store callback.""" stored_tokens = {'access': 'token', 'refresh': 'refresh_token'} - + def get_tokens(): return (stored_tokens.get('access'), stored_tokens.get('refresh')) - + def store_tokens(access_token, refresh_token): if access_token is None: stored_tokens.clear() else: stored_tokens['access'] = access_token stored_tokens['refresh'] = refresh_token - + adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=store_tokens + get_tokens=get_tokens, store_tokens=store_tokens ) - + adapter.clear() - + assert stored_tokens == {} def test_clear_without_store_callback(self): """Test that clear works even without a store callback.""" + def get_tokens(): return ('token', 'refresh_token') - - adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=None - ) - + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + # Should not raise an error adapter.clear() def test_token_conversion_format(self): """Test that tokens are converted to the correct AccessToken format.""" + def get_tokens(): return ('access_123', 'refresh_456') - - adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=None - ) - + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + token = adapter.get() - + assert isinstance(token, AccessToken) assert token.access_token == 'access_123' assert token.refresh_token == 'refresh_456' @@ -168,32 +154,30 @@ def get_tokens(): def test_store_and_get_roundtrip(self): """Test storing and retrieving tokens in a roundtrip.""" stored_tokens = {} - + def get_tokens(): return (stored_tokens.get('access'), stored_tokens.get('refresh')) - + def store_tokens(access_token, refresh_token): stored_tokens['access'] = access_token stored_tokens['refresh'] = refresh_token - + adapter = LegacyTokenStorageAdapter( - get_tokens=get_tokens, - store_tokens=store_tokens + get_tokens=get_tokens, store_tokens=store_tokens ) - + # Store a token original_token = AccessToken( access_token='stored_access', refresh_token='stored_refresh', expires_in=7200, - token_type='bearer' + token_type='bearer', ) adapter.store(original_token) - + # Retrieve it retrieved_token = adapter.get() - + assert retrieved_token is not None assert retrieved_token.access_token == 'stored_access' assert retrieved_token.refresh_token == 'stored_refresh' - From 8b00f8296b793676fb913175aad7398dc49af12f Mon Sep 17 00:00:00 2001 From: Minh Nguyen Cong Date: Tue, 20 Jan 2026 11:20:22 +0100 Subject: [PATCH 4/5] update code --- boxsdk/client/client.py | 14 ++++++-------- docs/config-sharing-implementation.md | 7 ++----- test/boxsdk/unit/client/test_config_sharing.py | 5 +---- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/boxsdk/client/client.py b/boxsdk/client/client.py index 2f5bf1e78..20af8e0e5 100644 --- a/boxsdk/client/client.py +++ b/boxsdk/client/client.py @@ -2043,8 +2043,6 @@ def get_authentication(self, *, token_storage=None): Authentication object compatible with the generated SDK (box_sdk_gen). :raises ValueError: If the authentication type is not supported or required credentials are missing. - :raises ValueError: - If the authentication type is not supported or required credentials are missing. """ oauth = self._oauth @@ -2207,7 +2205,6 @@ def get_network_session( network_client=None, retry_strategy=None, data_sanitizer=None, - interceptors=None, additional_headers=None, ): """ @@ -2222,8 +2219,6 @@ def get_network_session( If not provided, one will be created from legacy retry settings. :param data_sanitizer: Optional DataSanitizer instance for the generated SDK. - :param interceptors: - Optional list of request/response interceptors. :param additional_headers: Optional dictionary of additional HTTP headers to merge with legacy headers. :return: @@ -2275,9 +2270,14 @@ def get_network_session( if hasattr(proxy_config, 'AUTH') and proxy_config.AUTH: auth = proxy_config.AUTH if isinstance(auth, dict) and 'user' in auth and 'password' in auth: + scheme = ( + proxy_url.split('://', 1)[0] if '://' in proxy_url else 'http' + ) # Extract host from URL host = proxy_url.split('//')[1] if '//' in proxy_url else proxy_url - proxy_url = f"http://{auth['user']}:{auth['password']}@{host}" + proxy_url = ( + f"{scheme}://{auth['user']}:{auth['password']}@{host}" + ) # Merge custom headers headers = {} @@ -2314,7 +2314,6 @@ def get_sdk_gen_client(self, *, auth_options=None, network_options=None): - network_client: Custom NetworkClient instance - retry_strategy: Custom RetryStrategy instance - data_sanitizer: Custom DataSanitizer instance - - interceptors: List of request/response interceptors - additional_headers: Dictionary of additional HTTP headers :return: BoxClient instance from box_sdk_gen, fully configured with shared settings. @@ -2333,7 +2332,6 @@ def get_sdk_gen_client(self, *, auth_options=None, network_options=None): 'network_client': network_options.get('network_client'), 'retry_strategy': network_options.get('retry_strategy'), 'data_sanitizer': network_options.get('data_sanitizer'), - 'interceptors': network_options.get('interceptors'), 'additional_headers': network_options.get('additional_headers'), } diff --git a/docs/config-sharing-implementation.md b/docs/config-sharing-implementation.md index 07e960cb2..20e2d9947 100644 --- a/docs/config-sharing-implementation.md +++ b/docs/config-sharing-implementation.md @@ -58,7 +58,6 @@ Extracts network configuration from the legacy client and converts it to a gener - `network_client` (optional): Custom `NetworkClient` instance - `retry_strategy` (optional): Custom `RetryStrategy` instance - `data_sanitizer` (optional): Custom `DataSanitizer` instance -- `interceptors` (optional): List of request/response interceptors - `additional_headers` (optional): Dictionary of additional HTTP headers **Returns:** @@ -88,7 +87,6 @@ Creates a fully configured generated SDK client from the legacy client. This is - `network_client`: Custom `NetworkClient` instance - `retry_strategy`: Custom `RetryStrategy` instance - `data_sanitizer`: Custom `DataSanitizer` instance - - `interceptors`: List of request/response interceptors - `additional_headers`: Dictionary of additional HTTP headers **Returns:** @@ -250,10 +248,9 @@ Network settings are extracted from: ## Error Handling -The implementation includes comprehensive error handling: +The implementation raises: -- `ValueError`: Raised for unsupported auth types or missing credentials -- `TypeError`: Raised for invalid parameter types +- `ValueError`: for unsupported auth types or missing credentials Note: Since `boxsdk` and `box_sdk_gen` are always installed together, import errors for `box_sdk_gen` should not occur in practice. diff --git a/test/boxsdk/unit/client/test_config_sharing.py b/test/boxsdk/unit/client/test_config_sharing.py index b9b92a5e5..23a37c3fe 100644 --- a/test/boxsdk/unit/client/test_config_sharing.py +++ b/test/boxsdk/unit/client/test_config_sharing.py @@ -4,10 +4,7 @@ Tests get_authentication(), get_network_session(), and get_sdk_gen_client() methods. """ -from unittest.mock import Mock, MagicMock, patch -import pytest - -from unittest.mock import Mock, MagicMock, patch +from unittest.mock import Mock import pytest from boxsdk.client import Client From 0a156811840311ec42f9e3458a44f3afd9d30a04 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Cong Date: Tue, 20 Jan 2026 11:55:29 +0100 Subject: [PATCH 5/5] update code --- boxsdk/client/client.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/boxsdk/client/client.py b/boxsdk/client/client.py index 20af8e0e5..c09d082d8 100644 --- a/boxsdk/client/client.py +++ b/boxsdk/client/client.py @@ -2275,9 +2275,7 @@ def get_network_session( ) # Extract host from URL host = proxy_url.split('//')[1] if '//' in proxy_url else proxy_url - proxy_url = ( - f"{scheme}://{auth['user']}:{auth['password']}@{host}" - ) + proxy_url = f"{scheme}://{auth['user']}:{auth['password']}@{host}" # Merge custom headers headers = {}