Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/connectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ connectors:
sa_json_secret: GOOGLE_DRIVE_SA_JSON
scopes:
- https://www.googleapis.com/auth/drive
# OIDC / per-user Drive (ToolHive upstream bearer passthrough):
# auth:
# provider: upstream_bearer
# Or: GOOGLE_DRIVE_AUTH_PROVIDER=upstream_bearer (overrides yaml when set)

fhir_epic:
enabled: true
Expand Down
30 changes: 30 additions & 0 deletions docs/google_drive_connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,36 @@ For **MCP** (e.g. ToolHive), tools are named `google_drive.<action>` from the co

---

## User OAuth (OIDC / upstream bearer)

For **per-user Google Drive access** (each caller uses their own Drive), set:

```yaml
google_drive:
auth:
provider: upstream_bearer
```

Or set the environment variable (overrides `connectors.yaml` when present):

```env
GOOGLE_DRIVE_AUTH_PROVIDER=upstream_bearer
```

Allowed values: `service_account` (default), `upstream_bearer`.

Run the **google-drive-only** MCP server (`python -m agents.google_drive_mcp`) with `NW_MCP_TRANSPORT=streamable-http`. The `Authorization: Bearer` token on each MCP request must be the Google access token (typically issued via ToolHive embedded OIDC). Do **not** set `NW_MCP_API_KEY`, `NW_MCP_JWT_SECRET`, or `GOOGLE_DRIVE_SA_JSON` for this profile.

**ToolHive OIDC manifests:** copy and adapt from [mcp-builder `out/google-drive-mcp/deploy/`](https://github.com/stacklok/mcp-builder/tree/main/out/google-drive-mcp/deploy) (`mcpexternalauthconfig.yaml`, `mcpoidcconfig.yaml`, `mcpserver.yaml`) — use image/entrypoint `nw-google-drive` / `python -m agents.google_drive_mcp`.

**Ponytail:** Passthrough MCP auth applies only when this server exposes `google_drive` alone with `upstream_bearer`. The unified `mcp_entrypoint` with multiple connectors keeps API-key/JWT MCP auth.

With `NW_MCP_SCOPE_POLICY_DEFAULT=deny` (recommended for production), the google-drive MCP server auto-grants the per-action MCP scopes (`mcp:google_drive.<action>`) from its manifest to upstream bearer callers so `tools/list` is not empty. Google OAuth on the `Authorization: Bearer` token remains the boundary for Drive API access—refresh that access token when Drive calls fail with auth errors.

For **shared-folder automation** (single service identity), keep the [service account setup](#google-drive-service-account-setup) below.

---

## Google Drive service account setup

This guide walks you through creating a Google Cloud service account and connecting it to Node Wire. A service account is a special type of Google account used by applications (rather than humans) to authenticate with Google APIs.
Expand Down
3 changes: 3 additions & 0 deletions sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ CERNER_SCOPES="system/Patient.read system/Encounter.read system/DocumentReferenc
# Google Drive
GOOGLE_DRIVE_SA_JSON=/absolute/path/to/service-account.json
GOOGLE_DRIVE_FOLDER_ID=your-google-drive-folder-id
# Auth profile: service_account (default) or upstream_bearer (MCP OIDC per-user Drive)
# GOOGLE_DRIVE_AUTH_PROVIDER=service_account
# GOOGLE_DRIVE_AUTH_PROVIDER=upstream_bearer

# SMTP
SMTP_HOST=smtp.gmail.com
Expand Down
47 changes: 46 additions & 1 deletion src/bindings/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@
_PLATFORM_ROOT = Path(__file__).resolve().parent.parent.parent
_DEFAULT_CONFIG_PATH = _PLATFORM_ROOT / "config" / "connectors.yaml"

_GOOGLE_DRIVE_AUTH_PROVIDER_ENV = "GOOGLE_DRIVE_AUTH_PROVIDER"
_GOOGLE_DRIVE_AUTH_PROVIDERS = frozenset({"service_account", "upstream_bearer"})


def _resolve_google_drive_auth(auth_cfg: dict[str, Any]) -> dict[str, Any]:
"""Apply GOOGLE_DRIVE_AUTH_PROVIDER env override when set (wins over connectors.yaml)."""
override = os.environ.get(_GOOGLE_DRIVE_AUTH_PROVIDER_ENV, "").strip()
if not override:
return auth_cfg
if override not in _GOOGLE_DRIVE_AUTH_PROVIDERS:
raise ValueError(
f"{_GOOGLE_DRIVE_AUTH_PROVIDER_ENV} must be one of "
f"{sorted(_GOOGLE_DRIVE_AUTH_PROVIDERS)!r}, got {override!r}"
)
merged = dict(auth_cfg)
merged["provider"] = override
return merged


def _resolve_env_vars(data: Any) -> Any:
if isinstance(data, dict):
Expand Down Expand Up @@ -174,12 +192,15 @@ def load(self) -> None:
for connector_id, cfg in connectors_cfg.items():
enabled = bool(cfg.get("enabled", False))
exposed_via = list(cfg.get("exposed_via", []))
cfg_raw: Dict[str, Any] = dict(cfg)
if connector_id == "google_drive":
cfg_raw["auth"] = _resolve_google_drive_auth(cfg_raw.get("auth") or {})

self._configs[connector_id] = ConnectorConfig(
id=connector_id,
enabled=enabled,
exposed_via=exposed_via,
raw=cfg,
raw=cfg_raw,
)

if not enabled:
Expand Down Expand Up @@ -215,6 +236,8 @@ def _build_auth_provider(self, connector_id: str, cfg: dict) -> Any:
)

auth_cfg = cfg.get("auth") or {}
if connector_id == "google_drive":
auth_cfg = _resolve_google_drive_auth(auth_cfg)
provider_type = auth_cfg.get("provider", "none")

if provider_type in ("none", ""):
Expand Down Expand Up @@ -254,6 +277,28 @@ def _build_auth_provider(self, connector_id: str, cfg: dict) -> Any:
scopes=auth_cfg.get("scopes"),
)

if provider_type == "upstream_bearer":
from node_wire_runtime.auth.base import AuthProvider, get_upstream_bearer

class _UpstreamBearerProvider(AuthProvider): # type: ignore[misc]
per_request_credentials = True

async def get_headers(self) -> dict:
token = get_upstream_bearer()
if not token:
raise RuntimeError("Upstream bearer token required")
return {"Authorization": f"Bearer {token}"}

async def get_client_credentials(self): # type: ignore[override]
from google.oauth2.credentials import Credentials # type: ignore[import]

token = get_upstream_bearer()
if not token:
return None
return Credentials(token=token)

return _UpstreamBearerProvider()

if provider_type == "static_credentials":
# SMTP-style: returns (username, password) tuple via get_client_credentials().
# We use a lightweight wrapper around StaticTokenAuthProvider pair.
Expand Down
43 changes: 42 additions & 1 deletion src/bindings/mcp_server/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#
from __future__ import annotations

import contextvars
import os
import logging
from pathlib import Path
Expand All @@ -19,8 +20,13 @@
decode_binding_jwt,
parse_api_key_scopes_from_env,
)
from node_wire_runtime.auth.base import reset_upstream_bearer, set_upstream_bearer

logger = logging.getLogger("bindings.mcp_server.auth")
logger = logging.getLogger(__name__)

_upstream_reset_ctx: contextvars.ContextVar[contextvars.Token | None] = contextvars.ContextVar(
"_mcp_upstream_reset", default=None
)

# Back-compat: callers may still import ``McpIdentity`` / ``build_identity`` from MCP auth.
McpIdentity = CallerIdentity
Expand Down Expand Up @@ -227,7 +233,34 @@ def authenticate_mcp_request(
*,
headers: Mapping[str, Any] | None = None,
meta: Mapping[str, Any] | None = None,
upstream_passthrough: bool = False,
upstream_granted_scopes: tuple[str, ...] = (),
) -> CallerIdentity | None:
if upstream_passthrough:
token = extract_token(headers=headers, meta=meta)
if not token:
if mcp_auth_disabled():
return None
raise McpAuthRequiredError()
_upstream_reset_ctx.set(set_upstream_bearer(token))
if mcp_auth_disabled():
return None
# Ponytail: MCP scopes gate tool visibility on this server; the Google OAuth
# access token on the request is the upstream authz boundary for Drive API.
identity = build_caller_identity(
{
"sub": "upstream-bearer",
"tenant_id": None,
"scopes": list(upstream_granted_scopes),
},
"upstream_bearer",
)
logger.info(
"MCP upstream passthrough accepted",
extra={"auth_type": identity.auth_type, "principal": identity.principal},
)
return identity

logger.info(
"MCP auth gate status",
extra={
Expand Down Expand Up @@ -259,3 +292,11 @@ def authenticate_mcp_request(
},
)
return identity


def reset_upstream_passthrough_context() -> None:
"""Clear upstream bearer set during passthrough auth (call in middleware finally)."""
reset_tok = _upstream_reset_ctx.get()
if reset_tok is not None:
reset_upstream_bearer(reset_tok)
_upstream_reset_ctx.set(None)
61 changes: 60 additions & 1 deletion src/bindings/mcp_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
from bindings.mcp_server.auth import (
McpAuthError,
authenticate_mcp_request,
reset_upstream_passthrough_context,
log_effective_mcp_auth_state,
)
from node_wire_runtime.caller_identity import CallerIdentity
from node_wire_runtime.policies.mcp_scope_policy import (
action_allowed_for_identity_scopes,
load_scope_map_from_env,
load_scope_policy_default_from_env,
resolve_required_scope_for_action,
)
from node_wire_runtime.connector_registry import auto_register
from node_wire_runtime.manifest import MCP_MANIFEST_CONTRACT_VERSION, build_manifest
Expand Down Expand Up @@ -115,6 +117,45 @@ def _process_response_payload(data: Any, max_items: int) -> Tuple[Any, bool, int
return data, False, 0, next_page_token


def _resolve_upstream_passthrough(
factory: ConnectorFactory,
connector_ids: frozenset[str] | None,
) -> bool:
"""Enable when google_drive-only MCP server uses upstream_bearer auth."""
if connector_ids != frozenset({"google_drive"}):
return False
cfg = factory._configs.get("google_drive")
if cfg is None:
return False
auth = cfg.raw.get("auth") or {}
return auth.get("provider") == "upstream_bearer"


def _upstream_passthrough_scopes(
factory: ConnectorFactory,
connector_ids: frozenset[str] | None,
) -> tuple[str, ...]:
if connector_ids is None:
return ()
scope_map = load_scope_map_from_env()
default_mode = load_scope_policy_default_from_env()
manifest = build_manifest(factory.list_for_protocol("mcp"))
scopes: set[str] = set()
for entry in manifest:
cid = entry["connector_id"]
if cid not in connector_ids:
continue
required = resolve_required_scope_for_action(
connector_id=cid,
action=str(entry["action"]),
action_scope_map=scope_map,
default_mode=default_mode,
)
if required:
scopes.add(required)
return tuple(sorted(scopes))


class McpServer:
"""
Manifest-driven MCP server: tools come from connector metadata; execution
Expand All @@ -137,6 +178,14 @@ def __init__(
auto_register()
self._factory = ConnectorFactory()
self._factory.load()
self._upstream_passthrough = _resolve_upstream_passthrough(
self._factory, self._connector_ids
)
self._upstream_passthrough_scopes = (
_upstream_passthrough_scopes(self._factory, self._connector_ids)
if self._upstream_passthrough
else ()
)
try:
from importlib.metadata import version as pkg_version

Expand Down Expand Up @@ -225,6 +274,8 @@ def _ensure_identity(
return authenticate_mcp_request(
headers=_http_request_headers.get(),
meta=meta,
upstream_passthrough=self._upstream_passthrough,
upstream_granted_scopes=self._upstream_passthrough_scopes,
)

def _request_meta_from_context(self) -> Mapping[str, Any] | None:
Expand Down Expand Up @@ -495,6 +546,9 @@ def _build_streamable_http_app(self, *, session_manager: Any, path: str) -> Any:
from starlette.responses import JSONResponse
from starlette.routing import Route

upstream_passthrough = self._upstream_passthrough
upstream_granted_scopes = self._upstream_passthrough_scopes

@asynccontextmanager
async def lifespan(app: Starlette):
async with session_manager.run():
Expand All @@ -505,7 +559,11 @@ async def dispatch(self, request: Request, call_next): # type: ignore[override]
if request.url.path != path:
return await call_next(request)
try:
identity = authenticate_mcp_request(headers=request.headers)
identity = authenticate_mcp_request(
headers=request.headers,
upstream_passthrough=upstream_passthrough,
upstream_granted_scopes=upstream_granted_scopes,
)
except McpAuthError as exc:
headers: Dict[str, str] = {}
if exc.www_authenticate:
Expand All @@ -522,6 +580,7 @@ async def dispatch(self, request: Request, call_next): # type: ignore[override]
return await call_next(request)
finally:
_streamable_http_identity_ctx.reset(token)
reset_upstream_passthrough_context()

# Use a wrapper class to ensure Starlette treats this as an ASGI app
# without the automatic redirection logic of Mount().
Expand Down
37 changes: 22 additions & 15 deletions src/node_wire_google_drive/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,21 @@ class GoogleDriveConnector(BaseConnector):
def build_client(self) -> Any:
import asyncio

# get_client_credentials() is async; run it synchronously here since
# build_client() is called from the synchronous get_client() accessor.
async def _fetch_creds() -> Any:
return await self._auth_provider.get_client_credentials()

try:
loop = asyncio.get_event_loop()
except RuntimeError:
creds = asyncio.run(_fetch_creds())
else:
if loop.is_running():
# In an async context, we can't use run_until_complete.
# Instead, fetch credentials synchronously via the underlying
# ServiceAccountAuthProvider._build_credentials() pattern.
# This code path is reached during connector initialisation
# inside an async frame (e.g. in tests with pytest-asyncio).
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
creds = pool.submit(
lambda: asyncio.run(self._auth_provider.get_client_credentials())
).result()
creds = pool.submit(asyncio.run, _fetch_creds()).result()
else:
creds = loop.run_until_complete(self._auth_provider.get_client_credentials())
except RuntimeError:
creds = asyncio.run(self._auth_provider.get_client_credentials())

creds = loop.run_until_complete(_fetch_creds())
if creds is None:
# Fallback for NoAuthProvider or unconfigured provider —
# attempt direct secret resolution for backward compatibility.
Expand Down Expand Up @@ -111,6 +105,13 @@ def build_client(self) -> Any:

return build("drive", "v3", credentials=creds)

def get_client(self) -> Any:
if getattr(self._auth_provider, "per_request_credentials", False):
return self.build_client()
if self._client is None:
self._client = self.build_client()
return self._client

def _translate_and_raise_http_error(self, exc: HttpError) -> None:
status = exc.resp.status
content_str = str(getattr(exc, "content", "") or "")
Expand Down Expand Up @@ -140,7 +141,13 @@ async def _execute_action_spec(
spec = GOOGLE_DRIVE_ACTION_SPECS.get(action_name)
if spec is None:
raise ValueError(f"No action spec registered for {action_name!r}")
drive = self.get_client()
if getattr(self._auth_provider, "per_request_credentials", False):
creds = await self._auth_provider.get_client_credentials()
if creds is None:
raise GoogleDriveAuthError("Upstream bearer token required")
drive = build("drive", "v3", credentials=creds)
else:
drive = self.get_client()
extra = {"trace_id": trace_id, **(log_extra or {})}
logger.info("Google Drive %s", action_name, extra=extra)
try:
Expand Down
Loading
Loading