MultiEnvEmployer is a library for safely executing code from different Python virtual environments as regular functions.
Execute functions from isolated environments with different Python versions and conflicting dependencies without import conflicts or version issues.
- Project Purpose
- Installation
- Quick Start
- Core Concepts
- API Reference
- Features
- Error Handling
- Process Management
- Advanced Usage
- Limitations
- License
MultiEnvEmployer addresses the following needs:
- Running Python code in isolated virtual environments
- Calling functions as if they were local
- Transferring data between processes
- Managing execution lifetime and timeouts
- Intercepting output (print)
- Handling errors as regular exceptions
The project is not a dev tool, debugging wrapper, or build system. It is used during program runtime as an infrastructure layer.
Via pip:
pip install multi-env-employer
Via repository:
git clone https://github.com/REYIL/MultiEnvEmployer.git
cd MultiEnvEmployer
pip install -e .
Run the demonstration:
python main.py
This demonstrates all library features, including:
- Employer initialization with custom settings
- Module connections (stateless, stateful, cached)
- Function introspection
- Basic function calls
- Print interception (terminal and logger)
- Generators and async functions
- Stateful behavior
- Result caching
- Various data types
- Large data streaming
- Timeout modes
- Error handling
- Cross-module imports
- Process management
- Context manager usage
Run tests:
python test.py
Tests verify all functionality and save results to logs/test_results_py{version}.log
Basic usage example:
from pathlib import Path
from MultiEnvEmployer import Employer, RemoteModule
# Initialize employer with target environment
emp = Employer(
    project_dir=Path("path/to/modules"),
    venv_path=Path("path/to/venv")
)
# Connect to remote module
module = RemoteModule(emp, "my_module")
# Call functions as if they were local
result = module.add(2, 3)
print(result)  # 5
# Use context manager for automatic cleanup
with Employer("path/to/modules", "path/to/venv") as emp:
    module = RemoteModule(emp, "my_module")
    result = module.process_data([1, 2, 3])
MultiEnvEmployer uses a process-based architecture:
- Main Process (your code) creates an Employer
- Employer spawns Worker Processes in target virtual environments
- Workers execute code and communicate via pickle protocol
- Results are returned to the main process
graph LR
A[Main Process] -->|spawn| B[Worker Process]
B -->|pickle messages| A
A -->|function call| B
B -->|result/yield/error| A
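The flow in the diagram can be approximated with the standard library alone. The sketch below is not MultiEnvEmployer's implementation. It assumes a hypothetical one-shot worker that reads one pickled request from stdin and writes one pickled reply to stdout; `call_in_worker`, `WORKER_SOURCE`, and the request layout are illustrative names. Passing the interpreter of another virtual environment instead of `sys.executable` is what would provide the isolation.

```python
import pickle
import subprocess
import sys

# Source of a hypothetical one-shot worker: read one request, run it, reply.
WORKER_SOURCE = r"""
import pickle, sys
request = pickle.load(sys.stdin.buffer)
functions = {"add": lambda a, b: a + b}  # stand-in for an imported module
try:
    reply = ("RESULT", functions[request["function"]](*request["args"]))
except Exception as exc:
    reply = ("ERROR", repr(exc))
pickle.dump(reply, sys.stdout.buffer, protocol=4)
sys.stdout.buffer.flush()
"""


def call_in_worker(python_executable, function, *args):
    """Run one call in a fresh interpreter and return (message_type, payload)."""
    request = pickle.dumps({"function": function, "args": args}, protocol=4)
    completed = subprocess.run(
        [python_executable, "-c", WORKER_SOURCE],
        input=request,
        stdout=subprocess.PIPE,
        check=True,
    )
    return pickle.loads(completed.stdout)


if __name__ == "__main__":
    # sys.executable stands in for the Python binary inside the target venv.
    print(call_in_worker(sys.executable, "add", 2, 3))
```

Starting a new interpreter per call keeps the example short; it corresponds roughly to the stateless mode described later, at the cost of process start-up time on every call.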
Communication uses typed messages:
| Type | Description |
|---|---|
| RESULT | Regular function return value |
| YIELD | Generator yield value |
| URESULT | Streamed large data chunk |
| OUTPUT | Intercepted print() |
| DONE | Execution completed |
| ERROR | Exception occurred |
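One way to carry typed messages like these over a pipe is to pickle a (type, payload) pair and prefix it with its length. The sketch below is an assumption about how such framing could look, not the library's wire format; `MessageType`, `write_message`, and `read_message` are hypothetical names, and only the six type names come from the table.

```python
import io
import pickle
import struct
from enum import Enum


class MessageType(Enum):
    RESULT = "RESULT"
    YIELD = "YIELD"
    URESULT = "URESULT"
    OUTPUT = "OUTPUT"
    DONE = "DONE"
    ERROR = "ERROR"


def write_message(stream, kind, payload=None):
    """Write one length-prefixed pickled message to a binary stream."""
    body = pickle.dumps((kind.value, payload), protocol=4)
    stream.write(struct.pack(">I", len(body)))  # 4-byte big-endian length
    stream.write(body)


def read_message(stream):
    """Read one message, or return None when the stream is exhausted."""
    header = stream.read(4)
    if len(header) < 4:
        return None
    (size,) = struct.unpack(">I", header)
    kind, payload = pickle.loads(stream.read(size))
    return MessageType(kind), payload


if __name__ == "__main__":
    pipe = io.BytesIO()  # stands in for the worker's output pipe
    write_message(pipe, MessageType.OUTPUT, "Hello from worker\n")
    write_message(pipe, MessageType.RESULT, 42)
    write_message(pipe, MessageType.DONE)
    pipe.seek(0)
    while (message := read_message(pipe)) is not None:
        print(message)
```

The length prefix lets the reader know where one message ends, so several messages (output, yields, the final result) can share one stream.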
Employer is the main class for managing worker processes.
Employer(
    project_dir: Path,
    venv_path: Path,
    cache_path: Path = None,
    pickle_protocol: int = 4
)
Parameters:
- project_dir - Directory containing Python modules to execute
- venv_path - Path to virtual environment for workers
- cache_path - Optional custom cache directory
- pickle_protocol - Pickle protocol version (default: 4)
Methods:
- cache_clear() - Clear result cache
- close(modules=None) - Terminate processes (all or specific)
- get_functions(module_name) - Get available functions in module
Context Manager:
with Employer(project_dir, venv_path) as emp:
    # Automatic cleanup on exit
    pass
RemoteModule is a proxy for remote module access.
RemoteModule(
    employer: Employer,
    module_name: str,
    print_output: str = "terminal",
    logger: logging.Logger = None,
    stateful: bool = False,
    caching: bool = False,
    timeout: TimeoutPolicy = None
)
Parameters:
- employer - Employer instance
- module_name - Module name (without .py)
- print_output - Output mode: "terminal", "logger", "terminal|logger", "none"
- logger - Logger instance (required if output mode includes "logger")
- stateful - Keep process alive between calls (default: False)
- caching - Enable result caching (default: False)
- timeout - Timeout policy (default: 60s progress mode)
Properties:
__remote__.functions- Dictionary of available functions with signatures
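A dictionary of functions with signatures, and a check that a call fits a signature, can both be built on the standard `inspect` module. The sketch below shows that general technique under stated assumptions: `describe_functions` and `arguments_fit` are hypothetical helpers, and only the `signature` key mirrors what this README shows. It does not claim to be how the library performs introspection.

```python
import inspect


def describe_functions(namespace):
    """Map each public function name to a dict holding its signature string."""
    return {
        name: {"signature": str(inspect.signature(obj))}
        for name, obj in namespace.items()
        if inspect.isfunction(obj) and not name.startswith("_")
    }


def arguments_fit(func, *args, **kwargs):
    """Return True if the call would bind to func's signature."""
    try:
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError:
        return False
    return True


def add(a: int, b: int) -> int:
    return a + b


if __name__ == "__main__":
    for name, info in describe_functions({"add": add}).items():
        print(f"{name}{info['signature']}")
    print(arguments_fit(add, 1, 2), arguments_fit(add, 1))
```

`Signature.bind` checks only the number and names of arguments, not their types, so a check of this kind can reject a wrong call before a worker process is involved but cannot catch a string passed where an int is expected.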
TimeoutPolicy holds the configuration for execution timeouts.
from MultiEnvEmployer import TimeoutPolicy
timeout = TimeoutPolicy(
    seconds=60,
    mode="progress"  # "none", "absolute", or "progress"
)
Modes:
- none - No timeout
- absolute - Hard limit from function start
- progress - Reset timer on any activity (print, yield, return)
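The difference between the three modes comes down to which moment the timer measures from. The sketch below models that with a hypothetical `Deadline` class; the class, its `activity` method, and the injectable `clock` are assumptions made for illustration and are not part of the library.

```python
import time


class Deadline:
    """Tracks whether a call has exceeded its time budget."""

    def __init__(self, seconds, mode="progress", clock=time.monotonic):
        if mode not in ("none", "absolute", "progress"):
            raise ValueError(f"unknown timeout mode: {mode!r}")
        self.seconds = seconds
        self.mode = mode
        self._clock = clock
        self._reference = clock()  # start time, or time of last activity

    def activity(self):
        """Record a print, yield, or return; only progress mode resets."""
        if self.mode == "progress":
            self._reference = self._clock()

    def expired(self):
        """Return True once the budget is used up; never in 'none' mode."""
        if self.mode == "none":
            return False
        return self._clock() - self._reference > self.seconds


if __name__ == "__main__":
    deadline = Deadline(seconds=10, mode="progress")
    deadline.activity()
    print(deadline.expired())
```

In absolute mode `activity` is a no-op, so the reference point stays at the start of the call; in progress mode each sign of life moves it forward.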
Stateless (default):
- New process per function call
- No shared state between calls
- Automatic cleanup after execution
module = RemoteModule(emp, "my_module", stateful=False)
module.func1() # Process A
module.func2()  # Process B
Stateful:
- Single process for all calls
- Shared module-level state
- Manual cleanup required
module = RemoteModule(emp, "my_module", stateful=True)
module.set_value(10) # Process A
module.get_value()  # Process A (same process)
All print() calls in remote modules are intercepted and redirected:
# Remote module
def my_function():
    print("Hello from worker")
    return 42
# Main process
module = RemoteModule(emp, "my_module", print_output="terminal")
result = module.my_function()
# Output: Hello from worker
Output modes:
- "terminal" - Print to stdout
- "logger" - Send to logger
- "terminal|logger" - Both
- "none" - Discard output
None:
timeout = TimeoutPolicy(seconds=60, mode="none")
# No timeout, function can run indefinitely
Absolute:
timeout = TimeoutPolicy(seconds=30, mode="absolute")
# Hard 30-second limit from start
Progress:
timeout = TimeoutPolicy(seconds=10, mode="progress")
# 10 seconds of inactivity allowed
# Timer resets on print/yield/return
Enable caching to store function results:
module = RemoteModule(emp, "my_module", caching=True)
result1 = module.expensive_function(x=10) # Executes
result2 = module.expensive_function(x=10)  # From cache
Notes:
- Only RESULT (return values) are cached
- Generators and yields are not cached
- Cache key includes module, function, args, and kwargs
- Cache is file-based and persistent
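A file-based cache keyed on module, function, args, and kwargs could look roughly like the sketch below. It is an illustration under assumptions, not the library's cache format: `cache_key` and `cached_call` are hypothetical helpers, the SHA-256-of-pickle key is one possible choice, and the key is only stable for arguments whose pickled form is deterministic.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path


def cache_key(module, function, args, kwargs):
    """Hash the call identity; kwargs are sorted so their order is irrelevant."""
    identity = (module, function, args, tuple(sorted(kwargs.items())))
    return hashlib.sha256(pickle.dumps(identity, protocol=4)).hexdigest()


def cached_call(cache_dir, module, function, func, *args, **kwargs):
    """Return a stored result if one exists, otherwise run func and store it."""
    entry = Path(cache_dir) / f"{cache_key(module, function, args, kwargs)}.pkl"
    if entry.exists():
        return pickle.loads(entry.read_bytes())
    result = func(*args, **kwargs)
    entry.write_bytes(pickle.dumps(result, protocol=4))
    return result


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as cache_dir:
        first = cached_call(cache_dir, "my_module", "square", lambda x: x * x, x=10)
        again = cached_call(cache_dir, "my_module", "square", lambda x: x * x, x=10)
        print(first, again)
```

Because entries live on disk, a cache like this survives process restarts, which also means stale results persist until the cache is cleared.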
Generators work transparently:
# Remote module
def count_to(n):
    for i in range(n):
        yield i
# Main process
module = RemoteModule(emp, "my_module")
for value in module.count_to(5):
    print(value)  # 0, 1, 2, 3, 4
Large return values are automatically streamed in chunks:
# Remote module
def get_large_list():
    return ["data"] * 10_000_000  # Automatically streamed
# Main process
module = RemoteModule(emp, "my_module")
result = module.get_large_list()  # Received in chunks
Supported types for streaming:
- str
- list
- tuple
- numpy.ndarray
Threshold: 1 MB (configurable in worker)
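The idea of chunked transfer can be shown with plain slicing. This sketch makes simplifying assumptions: it splits by item count rather than by byte size, it covers only str, list, and tuple, and `split_into_chunks` and `reassemble` are hypothetical helpers rather than library functions.

```python
def split_into_chunks(value, chunk_items):
    """Yield consecutive slices of a str, list, or tuple."""
    for start in range(0, len(value), chunk_items):
        yield value[start:start + chunk_items]


def reassemble(chunks, kind):
    """Rebuild the original value from its slices."""
    if kind is str:
        return "".join(chunks)
    combined = []
    for chunk in chunks:
        combined.extend(chunk)
    return kind(combined)


if __name__ == "__main__":
    data = ["data"] * 10
    chunks = list(split_into_chunks(data, 4))
    print([len(chunk) for chunk in chunks])
    print(reassemble(chunks, list) == data)
```

Sending each slice as its own message keeps any single pickle small and, in a progress-style timeout, gives the receiver regular evidence that the worker is still alive.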
All errors are converted to custom exceptions:
from MultiEnvEmployer import errors
try:
    result = module.failing_function()
except errors.RemoteExecutionError as e:
    print(f"Remote error: {e.error_type}")
    print(f"Message: {e.error_message}")
    print(f"Traceback:\n{e.remote_traceback}")
except errors.RemoteTimeoutError as e:
    print(f"Timeout after {e.timeout_seconds}s")
except errors.WrongArgumentsError as e:
    print(f"Invalid arguments: {e.details}")
Exception hierarchy:
MultiEnvEmployerError
├── RemoteError
│ ├── RemoteExecutionError
│ ├── RemoteTimeoutError
│ ├── RemoteCloseFunction
│ ├── RemoteCloseModule
│ ├── TypeMessageNotFound
│ ├── FailedIntrospectModule
│ └── RemoteFunctionNotFound
└── WrongArgumentsError
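An exception raised in a worker may belong to a class that exists only in the worker's environment, so a common approach is to send its description as plain strings and raise a wrapper locally. The sketch below illustrates that pattern; `RemoteExecutionErrorSketch`, `capture_error`, and `unwrap` are hypothetical stand-ins, and only the three attribute names are taken from the example above.

```python
import traceback


class RemoteExecutionErrorSketch(Exception):
    """Local stand-in carrying error_type, error_message, remote_traceback."""

    def __init__(self, error_type, error_message, remote_traceback):
        super().__init__(f"{error_type}: {error_message}")
        self.error_type = error_type
        self.error_message = error_message
        self.remote_traceback = remote_traceback


def capture_error(func, *args, **kwargs):
    """Worker side: run func and describe any exception as plain strings."""
    try:
        return ("RESULT", func(*args, **kwargs))
    except Exception as exc:
        return ("ERROR", {
            "error_type": type(exc).__name__,
            "error_message": str(exc),
            "remote_traceback": traceback.format_exc(),
        })


def unwrap(message):
    """Main-process side: return the result or raise the wrapped error."""
    kind, payload = message
    if kind == "ERROR":
        raise RemoteExecutionErrorSketch(**payload)
    return payload


if __name__ == "__main__":
    try:
        unwrap(capture_error(lambda: 1 / 0))
    except RemoteExecutionErrorSketch as error:
        print(error.error_type, "-", error.error_message)
```

Strings always survive pickling, whereas an exception instance from a package installed only in the worker's environment could fail to unpickle in the main process.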
Close specific module:
emp.close(module)
emp.close("module_name")
Close specific function (stateless):
emp.close("module_name.function_name")
Close all processes:
emp.close()
Automatic cleanup:
# Via context manager
with Employer(project_dir, venv_path) as emp:
    pass  # Automatic cleanup
# Via atexit (registered automatically)
emp = Employer(project_dir, venv_path)
# Cleanup on program exit
Async functions in remote modules are automatically handled:
# Remote module
import asyncio
async def async_operation(x):
    await asyncio.sleep(1)
    return x * 2
# Main process (synchronous call)
module = RemoteModule(emp, "my_module")
result = module.async_operation(5)  # Returns 10
Arguments are validated before execution:
# Remote module
def add(a: int, b: int) -> int:
    return a + b
# Main process
module = RemoteModule(emp, "my_module")
module.add(1, 2)  # OK
module.add(1)  # Raises WrongArgumentsError
module.add(1, 2, 3)  # Raises WrongArgumentsError
Get available functions:
module = RemoteModule(emp, "my_module")
functions = module.__remote__.functions
for name, info in functions.items():
    print(f"{name}{info['signature']}")
What the library does NOT do:
- Does not optimize user code
- Does not analyze algorithms
- Does not interfere with module logic
- Does not "fix" stalled functions
Security considerations:
⚠️ CRITICAL: This library uses pickle for inter-process communication. Never use it with untrusted data sources.
- Pickle can execute arbitrary code during deserialization
- Only use MultiEnvEmployer with code and data you control
- Not suitable for processing user-supplied data or external inputs
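The risk is easy to demonstrate with a harmless payload. In the sketch below, the hypothetical `Surprise` class tells pickle to call `len` when the data is loaded; an attacker who controls the bytes could name any importable callable instead.

```python
import pickle


class Surprise:
    """On unpickling, pickle calls the callable returned by __reduce__."""

    def __reduce__(self):
        # Harmless here: len("not a Surprise object") is evaluated on load.
        return (len, ("not a Surprise object",))


if __name__ == "__main__":
    restored = pickle.loads(pickle.dumps(Surprise()))
    # The loaded value is the callable's return value, not a Surprise.
    print(type(restored).__name__, restored)
```

The call happens inside `pickle.loads` itself, before the receiving code has any chance to inspect the result, which is why validating data after unpickling offers no protection.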
Known constraints:
- Pickle protocol limitations apply
- Functions must be pickle-serializable
- No shared memory between processes
- Overhead from process spawning and IPC
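Since arguments and results cross the process boundary as pickles, it can help to check a value before passing it to a remote call. The helper below is a hypothetical convenience, not part of the library, and it assumes the default protocol 4 mentioned earlier.

```python
import pickle


def is_picklable(value, protocol=4):
    """Return True if value survives pickle.dumps with the given protocol."""
    try:
        pickle.dumps(value, protocol=protocol)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    print(is_picklable({"numbers": [1, 2, 3]}))
    print(is_picklable(lambda x: x))
```

Lambdas, generator objects, open files, and locks are typical values that fail this check and would need to be replaced by plain data before crossing the boundary.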
The project is available under the MIT License, which permits free use, modification, and distribution.
For issues and questions:
- GitHub Issues: https://github.com/REYIL/MultiEnvEmployer/issues
- Telegram: @REYIL_DEV