Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions src/google/adk/agents/config_agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from __future__ import annotations

import importlib
import logging
import re
import inspect
import os
from typing import Any
Expand All @@ -28,6 +30,80 @@
from .agent_config import AgentConfig
from .base_agent import BaseAgent
from .base_agent_config import BaseAgentConfig

logger = logging.getLogger("google_adk." + __name__)

# Modules that must never be loaded via YAML agent configuration.
# Importing these can lead to arbitrary code execution, file system
# access, or process spawning.
_BLOCKED_MODULE_PREFIXES: tuple[str, ...] = (
"os",
"sys",
"subprocess",
"shutil",
"socket",
"http",
"ctypes",
"multiprocessing",
"signal",
"importlib",
"pickle",
"shelve",
"marshal",
"code",
"codeop",
"compile",
"compileall",
"runpy",
"builtins",
"io",
"tempfile",
"glob",
"pathlib",
"webbrowser",
"antigravity",
)

_VALID_MODULE_PATH_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_.]*$")


def _validate_module_path(module_path: str) -> None:
"""Validates that a module path is safe to import.

Rejects module paths that reference dangerous standard library modules
or contain suspicious patterns.

Args:
module_path: Dotted Python module path (e.g., 'my_package.my_module').

Raises:
ValueError: If the module path is blocked or invalid.
"""
if not module_path:
raise ValueError("Module path must not be empty.")

if not _VALID_MODULE_PATH_RE.match(module_path):
raise ValueError(
f"Module path {module_path!r} contains invalid characters."
)

# Check for dunder/private module segments.
segments = module_path.split(".")
for segment in segments:
if segment.startswith("__") and segment.endswith("__"):
raise ValueError(
f"Module path {module_path!r} contains dunder segment"
f" {segment!r}."
)

# Block dangerous top-level modules.
top_level = segments[0]
if top_level in _BLOCKED_MODULE_PREFIXES:
raise ValueError(
f"Module path {module_path!r} references blocked module"
f" {top_level!r}. Importing arbitrary standard library modules"
" via YAML configuration is not permitted."
)
from .common_configs import AgentRefConfig
from .common_configs import CodeConfig

Expand Down Expand Up @@ -109,6 +185,7 @@ def _load_config_from_path(config_path: str) -> AgentConfig:
def resolve_fully_qualified_name(name: str) -> Any:
try:
module_path, obj_name = name.rsplit(".", 1)
_validate_module_path(module_path)
module = importlib.import_module(module_path)
return getattr(module, obj_name)
except Exception as e:
Expand Down Expand Up @@ -161,6 +238,7 @@ def _resolve_agent_code_reference(code: str) -> Any:
raise ValueError(f"Invalid code reference: {code}")

module_path, obj_name = code.rsplit(".", 1)
_validate_module_path(module_path)
module = importlib.import_module(module_path)
obj = getattr(module, obj_name)

Expand Down Expand Up @@ -190,6 +268,7 @@ def resolve_code_reference(code_config: CodeConfig) -> Any:
raise ValueError("Invalid CodeConfig.")

module_path, obj_name = code_config.name.rsplit(".", 1)
_validate_module_path(module_path)
module = importlib.import_module(module_path)
return getattr(module, obj_name)

Expand Down
99 changes: 99 additions & 0 deletions tests/unittests/agents/test_config_module_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for module path validation in YAML agent config resolution."""

import pytest

from google.adk.agents.config_agent_utils import _validate_module_path


class TestValidateModulePath:
"""Tests for _validate_module_path blocklist enforcement."""

def test_safe_module_passes(self):
"""User-defined modules should pass validation."""
_validate_module_path("my_app.agents.my_agent")
_validate_module_path("google.adk.agents")
_validate_module_path("my_package")

def test_os_module_blocked(self):
"""os module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("os")

def test_os_path_blocked(self):
"""os.path should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("os.path")

def test_subprocess_blocked(self):
"""subprocess module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("subprocess")

def test_sys_blocked(self):
"""sys module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("sys")

def test_shutil_blocked(self):
"""shutil module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("shutil")

def test_pickle_blocked(self):
"""pickle module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("pickle")

def test_importlib_blocked(self):
"""importlib module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("importlib")

def test_builtins_blocked(self):
"""builtins module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("builtins")

def test_socket_blocked(self):
"""socket module should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("socket")

def test_empty_path_blocked(self):
"""Empty module path should be rejected."""
with pytest.raises(ValueError, match="must not be empty"):
_validate_module_path("")

def test_invalid_characters_blocked(self):
"""Module paths with special characters should be rejected."""
with pytest.raises(ValueError, match="invalid characters"):
_validate_module_path("os;import sys")

def test_dunder_segment_blocked(self):
"""Module paths with __dunder__ segments should be rejected."""
with pytest.raises(ValueError, match="dunder segment"):
_validate_module_path("my_app.__builtins__.evil")

def test_google_adk_passes(self):
"""google.adk modules should pass (not blocked)."""
_validate_module_path("google.adk.tools.my_tool")
_validate_module_path("google.adk.agents.llm_agent")

def test_multiprocessing_blocked(self):
"""multiprocessing should be blocked."""
with pytest.raises(ValueError, match="blocked module"):
_validate_module_path("multiprocessing.pool")