Merged
18 changes: 9 additions & 9 deletions src/runpod_flash/cli/commands/build_utils/scanner.py
@@ -74,11 +74,11 @@ def discover_remote_functions(self) -> List[RemoteFunctionMetadata]:
                 tree = ast.parse(content)
                 self._extract_resource_configs(tree, py_file)
             except UnicodeDecodeError:
-                logger.debug(f"Skipping non-UTF-8 file: {py_file}")
+                pass
             except SyntaxError as e:
                 logger.warning(f"Syntax error in {py_file}: {e}")
-            except Exception as e:
-                logger.debug(f"Failed to parse {py_file}: {e}")
+            except Exception:
+                pass
 
         # Second pass: extract @remote decorated functions
         for py_file in self.py_files:
@@ -87,11 +87,11 @@ def discover_remote_functions(self) -> List[RemoteFunctionMetadata]:
                 tree = ast.parse(content)
                 functions.extend(self._extract_remote_functions(tree, py_file))
             except UnicodeDecodeError:
-                logger.debug(f"Skipping non-UTF-8 file: {py_file}")
+                pass
             except SyntaxError as e:
                 logger.warning(f"Syntax error in {py_file}: {e}")
-            except Exception as e:
-                logger.debug(f"Failed to parse {py_file}: {e}")
+            except Exception:
+                pass
 
         # Third pass: analyze function call graphs
         remote_function_names = {f.function_name for f in functions}
@@ -115,11 +115,11 @@ def discover_remote_functions(self) -> List[RemoteFunctionMetadata]:
                             node, func_meta, remote_function_names
                         )
             except UnicodeDecodeError:
-                logger.debug(f"Skipping non-UTF-8 file: {py_file}")
+                pass
             except SyntaxError as e:
                 logger.warning(f"Syntax error in {py_file}: {e}")
-            except Exception as e:
-                logger.debug(f"Failed to parse {py_file}: {e}")
+            except Exception:
+                pass
 
         return functions
 
1 change: 0 additions & 1 deletion src/runpod_flash/cli/utils/ignore.py
@@ -128,7 +128,6 @@ def get_file_tree(
     for item in directory.iterdir():
         # Check if should ignore
         if should_ignore(item, spec, base_dir):
-            log.debug(f"Ignoring: {item.relative_to(base_dir)}")
             continue
 
         if item.is_file():
15 changes: 0 additions & 15 deletions src/runpod_flash/client.py
@@ -25,27 +25,16 @@ def _should_execute_locally(func_name: str) -> bool:
     # Check if we're in a deployed environment
     runpod_endpoint_id = os.getenv("RUNPOD_ENDPOINT_ID")
     runpod_pod_id = os.getenv("RUNPOD_POD_ID")
-    flash_resource_name = os.getenv("FLASH_RESOURCE_NAME")
-
-    log.debug(
-        f"@remote decorator for {func_name}: "
-        f"RUNPOD_ENDPOINT_ID={runpod_endpoint_id}, "
-        f"FLASH_RESOURCE_NAME={flash_resource_name}"
-    )
 
     if not runpod_endpoint_id and not runpod_pod_id:
         # Local development - create stub for remote execution via ResourceManager
-        log.debug(f"@remote {func_name}: local dev mode, creating stub")
         return False
 
     # In deployed environment - check build-time generated configuration
     try:
         from .runtime._flash_resource_config import is_local_function
 
         result = is_local_function(func_name)
-        log.debug(
-            f"@remote {func_name}: deployed mode, is_local_function returned {result}"
-        )
         return result
     except ImportError as e:
         # Configuration not generated (shouldn't happen in deployed env)
@@ -186,14 +175,10 @@ def decorator(func_or_class):
 
         if should_execute_local:
             # This function belongs to our resource - execute locally
-            log.debug(
-                f"@remote {func_name}: returning original function (local execution)"
-            )
             func_or_class.__remote_config__ = routing_config
             return func_or_class
 
         # Remote execution mode - create stub for calling other endpoints
-        log.debug(f"@remote {func_name}: creating wrapper for remote execution")
 
         if inspect.isclass(func_or_class):
             # Handle class decoration
34 changes: 14 additions & 20 deletions src/runpod_flash/core/api/runpod.py
@@ -3,7 +3,7 @@
 Bypasses the outdated runpod-python SDK limitations.
 """
 
-import json
+import json  # noqa: F401 - used in commented debug logs
 import logging
 import os
 from typing import Any, Dict, Optional, List
@@ -92,19 +92,19 @@ async def _execute_graphql(
 
         payload = {"query": query, "variables": variables or {}}
 
-        log.debug(f"GraphQL Query: {query}")
-        sanitized_vars = _sanitize_for_logging(variables)
-        log.debug(f"GraphQL Variables: {json.dumps(sanitized_vars, indent=2)}")
+        # log.debug(f"GraphQL Query: {query}")
+        # sanitized_vars = _sanitize_for_logging(variables)
+        # log.debug(f"GraphQL Variables: {json.dumps(sanitized_vars, indent=2)}")
 
         try:
             async with session.post(self.GRAPHQL_URL, json=payload) as response:
                 response_data = await response.json()
 
-                log.debug(f"GraphQL Response Status: {response.status}")
-                sanitized_response = _sanitize_for_logging(response_data)
-                log.debug(
-                    f"GraphQL Response: {json.dumps(sanitized_response, indent=2)}"
-                )
+                # log.debug(f"GraphQL Response Status: {response.status}")
+                # sanitized_response = _sanitize_for_logging(response_data)
+                # log.debug(
+                #     f"GraphQL Response: {json.dumps(sanitized_response, indent=2)}"
+                # )
 
                 if response.status >= 400:
                     sanitized_err = _sanitize_for_logging(response_data)
@@ -156,7 +156,7 @@ async def update_template(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
             raise Exception("Unexpected GraphQL response structure")
 
         template_data = result["saveTemplate"]
-        log.info(
+        log.debug(
             f"Updated template: {template_data.get('id', 'unknown')} - {template_data.get('name', 'unnamed')}"
         )
 
@@ -354,8 +354,6 @@ async def finalize_artifact_upload(
         """
         variables = {"input": input_data}
 
-        log.debug(f"finalizing upload for flash app: {input_data}")
-
         result = await self._execute_graphql(mutation, variables)
         return result["finalizeFlashArtifactUpload"]
 
@@ -407,7 +405,6 @@ async def get_flash_app_by_name(self, app_name: str) -> Dict[str, Any]:
         """
         variables = {"flashAppName": app_name}
 
-        log.debug(f"Fetching flash app by name for input: {app_name}")
         result = await self._execute_graphql(query, variables)
         return result["flashAppByName"]
 
@@ -460,7 +457,6 @@ async def get_flash_environment_by_name(
         """
         variables = {"input": input_data}
 
-        log.debug(f"Fetching flash environment by name for input: {variables}")
         result = await self._execute_graphql(query, variables)
 
         return result["flashEnvironmentByName"]
@@ -513,8 +509,6 @@ async def deploy_build_to_environment(
 
         variables = {"input": input_data}
 
-        log.debug(f"Deploying flash environment with vars: {input_data}")
-
         result = await self._execute_graphql(mutation, variables)
         return result["deployBuildToEnvironment"]
 
@@ -834,15 +828,15 @@ async def _execute_rest(
         """Execute a REST API request."""
         session = await self._get_session()
 
-        log.debug(f"REST Request: {method} {url}")
-        log.debug(f"REST Data: {json.dumps(data, indent=2) if data else 'None'}")
+        # log.debug(f"REST Request: {method} {url}")
+        # log.debug(f"REST Data: {json.dumps(data, indent=2) if data else 'None'}")
 
         try:
             async with session.request(method, url, json=data) as response:
                 response_data = await response.json()
 
-                log.debug(f"REST Response Status: {response.status}")
-                log.debug(f"REST Response: {json.dumps(response_data, indent=2)}")
+                # log.debug(f"REST Response Status: {response.status}")
+                # log.debug(f"REST Response: {json.dumps(response_data, indent=2)}")
 
                 if response.status >= 400:
                     raise Exception(
7 changes: 0 additions & 7 deletions src/runpod_flash/core/discovery.py
@@ -52,9 +52,6 @@ def discover(self) -> List[DeployableResource]:
                 resource = self._resolve_resource_variable(module, var_name)
                 if resource:
                     resources.append(resource)
-                    log.debug(
-                        f"Discovered resource: {var_name} -> {resource.__class__.__name__}"
-                    )
         else:
             log.warning(f"Failed to import {self.entry_point}")
 
@@ -405,10 +402,6 @@ def _scan_project_directory(self) -> List[DeployableResource]:
                     resource = self._resolve_resource_variable(module, var_name)
                     if resource:
                         resources.append(resource)
-                        log.debug(
-                            f"Discovered resource in {file_path.relative_to(project_root)}: "
-                            f"{var_name} -> {resource.__class__.__name__}"
-                        )
 
             except Exception as e:
                 log.debug(f"Failed to scan {file_path}: {e}")
2 changes: 0 additions & 2 deletions src/runpod_flash/core/resources/app.py
@@ -185,10 +185,8 @@ async def _hydrate(self) -> None:
         """
         async with self._hydrate_lock:
             if self._hydrated:
-                log.debug("App is already hydrated while calling hydrate. Returning")
                 return
 
-            log.debug("Hydrating app")
             async with RunpodGraphQLClient() as client:
                 try:
                     result = await client.get_flash_app_by_name(self.name)
33 changes: 1 addition & 32 deletions src/runpod_flash/core/resources/resource_manager.py
@@ -152,7 +152,6 @@ def _save_resources(self) -> None:
                 data = (self._resources, self._resource_configs)
                 cloudpickle.dump(data, f)
                 f.flush()  # Ensure data is written to disk
-            log.debug(f"Saved resources in {RESOURCE_STATE_FILE}")
         except (FileLockError, Exception) as e:
             log.error(f"Failed to save resources to {RESOURCE_STATE_FILE}: {e}")
             raise
@@ -224,15 +223,6 @@ async def get_or_deploy_resource(
         resource_key = config.get_resource_key()
         new_config_hash = config.config_hash
 
-        log.debug(
-            f"get_or_deploy_resource called:\n"
-            f"  Config type: {type(config).__name__}\n"
-            f"  Config name: {getattr(config, 'name', 'N/A')}\n"
-            f"  Resource key: {resource_key}\n"
-            f"  New config hash: {new_config_hash[:16]}...\n"
-            f"  Available keys in cache: {list(self._resources.keys())}"
-        )
-
         # Ensure global lock is initialized
         assert ResourceManager._global_lock is not None, "Global lock not initialized"
 
@@ -247,7 +237,6 @@
             existing = self._resources.get(resource_key)
 
             if existing:
-                log.debug(f"Resource found in cache: {resource_key}")
                 # Resource exists - check if still valid
                 if not existing.is_deployed():
                     log.warning(f"{existing} is no longer valid, redeploying.")
@@ -273,21 +262,6 @@
                 stored_config_hash = self._resource_configs.get(resource_key, "")
 
                 if stored_config_hash != new_config_hash:
-                    # Detailed drift debugging
-                    log.debug(
-                        f"DRIFT DEBUG for '{config.name}':\n"
-                        f"  Stored hash: {stored_config_hash}\n"
-                        f"  New hash: {new_config_hash}\n"
-                        f"  Stored resource type: {type(existing).__name__}\n"
-                        f"  New resource type: {type(config).__name__}\n"
-                        f"  Existing config fields: {existing.model_dump(exclude_none=True, exclude={'id'}) if hasattr(existing, 'model_dump') else 'N/A'}\n"
-                        f"  New config fields: {config.model_dump(exclude_none=True, exclude={'id'}) if hasattr(config, 'model_dump') else 'N/A'}"
-                    )
-                    log.debug(
-                        f"Config drift detected for '{config.name}': "
-                        f"Automatically updating endpoint"
-                    )
-
                     # Attempt update (will redeploy if structural changes detected)
                     if hasattr(existing, "update"):
                         updated_resource = await existing.update(config)
@@ -318,15 +292,10 @@
                         raise
 
             # Config unchanged, reuse existing
-            log.debug(f"{existing} exists, reusing (config unchanged)")
-            log.debug(f"URL: {existing.url}")
+            log.info(f"URL: {existing.url}")
             return existing
 
         # No existing resource, deploy new one
-        log.debug(
-            f"Resource NOT found in cache, deploying new: {resource_key}\n"
-            f"  Searched in keys: {list(self._resources.keys())}"
-        )
         try:
             deployed_resource = await self._deploy_with_error_context(config)
             log.debug(f"URL: {deployed_resource.url}")
15 changes: 4 additions & 11 deletions src/runpod_flash/core/resources/serverless.py
@@ -646,14 +646,9 @@ async def update(self, new_config: "ServerlessResource") -> "ServerlessResource":
 
         try:
             resolved_template_id = self.templateId or new_config.templateId
-            # Log if version-triggering changes detected (informational only)
-            if self._has_structural_changes(new_config):
-                log.debug(
-                    f"{self.name}: Version-triggering changes detected. "
-                    "Server will increment version and recreate workers."
-                )
-            else:
-                log.debug(f"Updating endpoint '{self.name}' (ID: {self.id})")
+            # Check for version-triggering changes
+            if not self._has_structural_changes(new_config):
+                log.info(f"Updating endpoint '{self.name}' (ID: {self.id})")
 
             # Ensure network volume is deployed if specified
             await new_config._ensure_network_volume_deployed()
@@ -678,7 +673,7 @@ async def update(self, new_config: "ServerlessResource") -> "ServerlessResource":
                         new_config.template, resolved_template_id
                     )
                     await client.update_template(template_payload)
-                    log.info(
+                    log.debug(
                         f"Updated template '{resolved_template_id}' for endpoint '{self.name}'"
                     )
                 else:
@@ -752,11 +747,9 @@ def _has_structural_changes(self, new_config: "ServerlessResource") -> bool:
             # Handle list comparison
             if isinstance(old_val, list) and isinstance(new_val, list):
                 if sorted(str(v) for v in old_val) != sorted(str(v) for v in new_val):
-                    log.debug(f"Structural change in '{field}': {old_val} → {new_val}")
                     return True
             # Handle other types
             elif old_val != new_val:
-                log.debug(f"Structural change in '{field}': {old_val} → {new_val}")
                 return True
 
         return False
3 changes: 0 additions & 3 deletions src/runpod_flash/core/utils/file_lock.py
@@ -102,7 +102,6 @@ def file_lock(
             _acquire_fallback_lock(file_handle, exclusive, timeout)
 
         lock_acquired = True
-        log.debug(f"File lock acquired (exclusive={exclusive})")
 
     except (OSError, IOError, FileLockError) as e:
         # Check timeout
@@ -128,8 +127,6 @@
             else:
                 _release_fallback_lock(file_handle)
 
-            log.debug("File lock released")
-
         except Exception as e:
             log.error(f"Error releasing file lock: {e}")
             # Don't raise - we're in cleanup
9 changes: 0 additions & 9 deletions src/runpod_flash/execute_class.py
@@ -57,8 +57,6 @@ def get_or_cache_class_data(
             },
         )
 
-        log.debug(f"Cached class data for {cls.__name__} with key: {cache_key}")
-
     except (TypeError, AttributeError, OSError, SerializationError) as e:
         log.warning(
             f"Could not serialize constructor arguments for {cls.__name__}: {e}"
@@ -81,9 +79,6 @@ def get_or_cache_class_data(
     else:
         # Cache hit - retrieve cached data
         cached_data = _SERIALIZED_CLASS_CACHE.get(cache_key)
-        log.debug(
-            f"Retrieved cached class data for {cls.__name__} with key: {cache_key}"
-        )
         return cached_data["class_code"]
 
 
@@ -121,7 +116,6 @@ def extract_class_code_simple(cls: Type) -> str:
         # Validate the code by trying to compile it
         compile(class_code, "<string>", "exec")
 
-        log.debug(f"Successfully extracted class code for {cls.__name__}")
         return class_code
 
     except Exception as e:
@@ -182,7 +176,6 @@ def get_class_cache_key(
         # Combine hashes for final cache key
         cache_key = f"{cls.__name__}_{class_hash[:HASH_TRUNCATE_LENGTH]}_{args_hash[:HASH_TRUNCATE_LENGTH]}"
 
-        log.debug(f"Generated cache key for {cls.__name__}: {cache_key}")
         return cache_key
 
     except (TypeError, AttributeError, OSError) as e:
@@ -229,8 +222,6 @@ def __init__(self, *args, **kwargs):
                 cls, args, kwargs, self._cache_key
             )
 
-            log.debug(f"Created remote class wrapper for {cls.__name__}")
-
         async def _ensure_initialized(self):
             """Ensure the remote instance is created."""
             if self._initialized: