Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions nac_test/pyats_core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
# PyATS-specific file paths
AUTH_CACHE_DIR: str = os.path.join(tempfile.gettempdir(), "nac-test-auth-cache")

# PyATS config files written to output directory during test execution
PYATS_PLUGIN_CONFIG_FILENAME: str = ".pyats_plugin.yaml"
PYATS_CONFIG_FILENAME: str = ".pyats.conf"

# pushed to pyats device connection settings to speed up disconnects (default is 10s/1s)
PYATS_POST_DISCONNECT_WAIT_SECONDS: int = 0
PYATS_GRACEFUL_DISCONNECT_WAIT_SECONDS: int = 0
Expand Down
172 changes: 84 additions & 88 deletions nac_test/pyats_core/execution/subprocess_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import logging
import os
import sysconfig
import tempfile
import textwrap
import time
from collections.abc import Callable
Expand All @@ -19,34 +18,51 @@
from nac_test.pyats_core.constants import (
PIPE_DRAIN_DELAY_SECONDS,
PIPE_DRAIN_TIMEOUT_SECONDS,
PYATS_CONFIG_FILENAME,
PYATS_OUTPUT_BUFFER_LIMIT,
PYATS_PLUGIN_CONFIG_FILENAME,
)
from nac_test.utils.logging import DEFAULT_LOGLEVEL, LogLevel

logger = logging.getLogger(__name__)


# disable EnvironmentDebugPlugin to prevent sensitive environment vars
# from being logged by PyATS
PLUGIN_CONFIG = textwrap.dedent("""\
plugins:
ProgressReporterPlugin:
enabled: True
module: nac_test.pyats_core.progress.plugin
order: 1.0
EnvironmentDebugPlugin:
enabled: False
""")

PYATS_CONFIG = textwrap.dedent("""\
[report]
git_info = false
""")


class SubprocessRunner:
"""Executes PyATS jobs as subprocesses and handles their output."""

def __init__(
self,
output_dir: Path,
output_handler: Callable[[str], None],
plugin_config_path: Path | None = None,
loglevel: LogLevel = DEFAULT_LOGLEVEL,
):
"""Initialize the subprocess runner.

Args:
output_dir: Directory for test output
output_handler: Function to process each line of stdout
plugin_config_path: Path to the PyATS plugin configuration file
loglevel: Logging level to pass to PyATS CLI
"""
self.output_dir = output_dir
self.output_handler = output_handler
self.plugin_config_path = plugin_config_path
self.loglevel = loglevel

# Ensure pyats is in the same environment as nac-test
Expand All @@ -57,20 +73,68 @@ def __init__(
)
self.pyats_executable = str(pyats_path)

self._plugin_config_file: Path | None = None
self._pyats_config_file: Path | None = None
self._create_config_files()

def _create_config_files(self) -> None:
"""Create config files for PyATS execution in the output directory.

Raises:
RuntimeError: If file creation fails
"""
plugin_config_file = self.output_dir / PYATS_PLUGIN_CONFIG_FILENAME
pyats_config_file = self.output_dir / PYATS_CONFIG_FILENAME

try:
plugin_config_file.write_text(PLUGIN_CONFIG)
pyats_config_file.write_text(PYATS_CONFIG)
except OSError as e:
# Clean up any successfully written files before raising
plugin_config_file.unlink(missing_ok=True)
pyats_config_file.unlink(missing_ok=True)
raise RuntimeError(f"Failed to create PyATS config files: {e}") from e

self._plugin_config_file = plugin_config_file
self._pyats_config_file = pyats_config_file
logger.debug(f"Created plugin_config {self._plugin_config_file}")
logger.debug(f"Created pyats_config {self._pyats_config_file}")

def cleanup(self) -> None:
"""Remove config files created during initialization.

Called explicitly by the orchestrator after normal execution. Also called
opportunistically from __del__ for unexpected exits (best-effort only).
"""
for config_file in [self._plugin_config_file, self._pyats_config_file]:
if config_file is not None:
config_file.unlink(missing_ok=True)
logger.debug(f"Cleaned up config file: {config_file}")

def __del__(self) -> None:
"""Opportunistic cleanup on garbage collection.

Not guaranteed: CPython-specific, not called on SIGKILL or interpreter shutdown.
Handles unexpected exits without complicating call sites in the orchestrator.
A more robust cleanup mechanism will be implemented as part of #677 (which
primarily targets a different file, but config file cleanup will be included) —
until then, leaked config files are acceptable (contents not sensitive, files small).
"""
try:
self.cleanup()
except Exception:
pass # Best-effort: never raise from __del__

def _build_command(
self,
job_file_path: Path,
plugin_config_file: str,
pyats_config_file: str,
archive_name: str,
testbed_file_path: Path | None = None,
) -> list[str]:
"""Build the PyATS command with all arguments.

Args:
job_file_path: Path to the job file
plugin_config_file: Path to the plugin configuration file
pyats_config_file: Path to the PyATS configuration file
archive_name: Name for the archive file
testbed_file_path: Optional path to the testbed file (for D2D tests)

Expand All @@ -87,12 +151,20 @@ def _build_command(
if testbed_file_path is not None:
cmd.extend(["--testbed-file", str(testbed_file_path)])

# Unreachable in practice: _create_config_files() always sets these in __init__,
# or raises before __init__ completes. Guard exists for mypy type narrowing only.
if (
self._plugin_config_file is None or self._pyats_config_file is None
): # pragma: no cover
raise RuntimeError(
"Config files not initialized — this is a bug in SubprocessRunner."
)
cmd.extend(
[
"--configuration",
plugin_config_file,
str(self._plugin_config_file),
"--pyats-configuration",
pyats_config_file,
str(self._pyats_config_file),
"--archive-dir",
str(self.output_dir),
"--archive-name",
Expand Down Expand Up @@ -131,51 +203,12 @@ async def execute_job(
Returns:
Path to the archive file if successful, None otherwise
"""
# Create plugin configuration for progress reporting
plugin_config_file = None
pyats_config_file = None
try:
plugin_config = textwrap.dedent("""
plugins:
ProgressReporterPlugin:
enabled: True
module: nac_test.pyats_core.progress.plugin
order: 1.0
EnvironmentDebugPlugin:
enabled: False
""")

with tempfile.NamedTemporaryFile(
mode="w", suffix="_plugin_config.yaml", delete=False
) as f:
f.write(plugin_config)
plugin_config_file = f.name
logger.debug(
f"Created plugin_config {plugin_config_file} with content\n{plugin_config}"
)

# Create PyATS configuration to disable git_info collection
# This prevents fork() crashes on macOS with Python 3.12+ caused by
# CoreFoundation lock corruption in get_git_info()
pyats_config = "[report]\ngit_info = false\n"
with tempfile.NamedTemporaryFile(
mode="w", suffix="_pyats_config.conf", delete=False
) as f:
f.write(pyats_config)
pyats_config_file = f.name
logger.debug(f"Created pyats_config {pyats_config_file}")

except Exception as e:
logger.warning(f"Failed to create config files: {e}")
# If we can't create config files, we should probably fail
return None

# Generate archive name with timestamp
job_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
archive_name = f"nac_test_job_{job_timestamp}.zip"

cmd = self._build_command(
job_file_path, plugin_config_file, pyats_config_file, archive_name
job_file_path,
archive_name,
)

logger.info(f"Executing command: {' '.join(cmd)}")
Expand Down Expand Up @@ -241,49 +274,12 @@ async def execute_job_with_testbed(
Returns:
Path to the archive file if successful, None otherwise
"""
# Create plugin configuration for progress reporting
plugin_config_file = None
pyats_config_file = None
try:
plugin_config = textwrap.dedent("""
plugins:
ProgressReporterPlugin:
enabled: True
module: nac_test.pyats_core.progress.plugin
order: 1.0
EnvironmentDebugPlugin:
enabled: False
""")

with tempfile.NamedTemporaryFile(
mode="w", suffix="_plugin_config.yaml", delete=False
) as f:
f.write(plugin_config)
plugin_config_file = f.name

# Create PyATS configuration to disable git_info collection
# This prevents fork() crashes on macOS with Python 3.12+ caused by
# CoreFoundation lock corruption in get_git_info()
pyats_config = "[report]\ngit_info = false\n"
with tempfile.NamedTemporaryFile(
mode="w", suffix="_pyats_config.conf", delete=False
) as f:
f.write(pyats_config)
pyats_config_file = f.name

except Exception as e:
logger.warning(f"Failed to create config files: {e}")
# If we can't create config files, we should probably fail
return None

# Get device ID from environment for archive naming
hostname = env.get("HOSTNAME", "unknown")
archive_name = f"pyats_archive_device_{hostname}"

cmd = self._build_command(
job_file_path,
plugin_config_file,
pyats_config_file,
archive_name,
testbed_file_path=testbed_file_path,
)
Expand Down
Loading
Loading