Merged
154 changes: 154 additions & 0 deletions .github/scripts/MatrixXx.py
@@ -0,0 +1,154 @@
import asyncio
import logging
from typing import Callable, Any, Dict, List
import nimpy
from clang import cindex
from langchain.llms import OpenAI
from langchain.chains import ConversationChain
from langchain.memory import InMemoryConversationMemory

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("AgenticMatrixLivingCodeAI")

# --- Living Environment ---
class LivingEnvironment:
    def __init__(self):
        self.state: Dict[str, Any] = {}
        self.subscribers: List[Callable[[str, Any], asyncio.Future]] = []

    def subscribe(self, callback: Callable[[str, Any], asyncio.Future]):
        self.subscribers.append(callback)

    async def update_state(self, key: str, value: Any):
        logger.debug(f"[Env] Update: {key} -> {value}")
        self.state[key] = value
        await asyncio.gather(*[cb(key, value) for cb in self.subscribers])
Contributor
issue (bug_risk): Handle exceptions from subscriber callbacks to prevent one failure from affecting others.

Using return_exceptions=True with asyncio.gather or wrapping each callback in a try/except block will ensure all subscribers are called, even if one fails.
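The pattern this comment suggests can be sketched as follows; the `notify_all` helper and the callback names are illustrative, not part of the PR:

```python
import asyncio
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("env")

async def notify_all(subscribers, key, value):
    # return_exceptions=True keeps one failing callback from cancelling
    # the others; failures come back as exception objects in the results.
    results = await asyncio.gather(
        *[cb(key, value) for cb in subscribers],
        return_exceptions=True,
    )
    for cb, res in zip(subscribers, results):
        if isinstance(res, Exception):
            logger.warning("subscriber %s failed: %s", cb.__name__, res)
    return results

async def ok(key, value):
    return f"{key}={value}"

async def boom(key, value):
    raise RuntimeError("subscriber crashed")

# The crashing subscriber is reported, but both healthy ones still run.
results = asyncio.run(notify_all([ok, boom, ok], "x", 1))
```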


    def get_state(self, key: str):
        return self.state.get(key)

# --- Hybrid Agent with Living Code and Multi-Language Execution ---
class HybridAgent:
    def __init__(self, name: str, env: LivingEnvironment, initial_prompt: str):
        self.name = name
        self.env = env
        self.prompt = initial_prompt
        self.memory = InMemoryConversationMemory()
        self.llm = None
        self.chain = None
        self.llm_ready = False
        try:
            self.llm = OpenAI(temperature=0.3)
            self.chain = ConversationChain(llm=self.llm, memory=self.memory)
            self.llm_ready = True
        except Exception as e:
            logger.warning(f'[{self.name}] OpenAI LLM not available: {e}')
Comment on lines +44 to +45
Contributor
medium

Catching Exception is too broad. It's better to catch specific exceptions. For OpenAI initialization, this could be ValueError (e.g., for a missing API key) or ImportError if a dependency is missing. This makes the error handling more precise and prevents masking unrelated errors.

Suggested change
-        except Exception as e:
-            logger.warning(f'[{self.name}] OpenAI LLM not available: {e}')
+        except (ValueError, ImportError) as e:
+            logger.warning(f'[{self.name}] OpenAI LLM not available: {e}')

        self.running = True
        self.env.subscribe(self.on_env_update)

        self.clang_index = cindex.Index.create()
Contributor
high

The creation of cindex.Index can fail if the libclang shared library is not found in the system's path, which would crash the agent during initialization. To make the agent more robust, this call should be wrapped in a try...except block, similar to how the nimpy module is handled. You can set a self.clang_ready flag and check it before attempting to parse C++ code.
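A sketch of that guard; the `ClangGuard` class and `clang_ready` flag names are hypothetical, mirroring the comment's suggestion rather than the PR's code:

```python
import logging

logger = logging.getLogger("agent")

class ClangGuard:
    """Sketch of the suggested guard, mirroring how nimpy is handled."""
    def __init__(self):
        self.clang_index = None
        self.clang_ready = False
        try:
            from clang import cindex  # may raise ImportError
            # Index.create() can fail if libclang is not on the library path;
            # a broad catch is kept here only to keep the sketch short.
            self.clang_index = cindex.Index.create()
            self.clang_ready = True
        except Exception as e:
            logger.warning("libclang not available: %s", e)

    def parse_cpp_code(self, code: str):
        if not self.clang_ready:
            return []  # degrade gracefully instead of crashing the agent
        tu = self.clang_index.parse(
            "dynamic_code.cpp", args=["-std=c++17"],
            unsaved_files=[("dynamic_code.cpp", code)],
        )
        return list(tu.diagnostics)

guard = ClangGuard()
diags = guard.parse_cpp_code("int f( { return 0; }")
```

Whether or not libclang is installed, the agent now constructs successfully and parsing simply returns an empty diagnostics list when the backend is missing.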

        try:
            self.nim_agent = nimpy.import_module("nim_agent")
            self.nim_ready = True
        except Exception as e:
            logger.warning(f"[{self.name}] Nim module not available: {e}")
Comment on lines +53 to +54
Contributor
medium

Using except Exception is too broad. For importing the Nim module, catching ImportError is more specific and safer, as it won't mask other unexpected runtime errors.

Suggested change
-        except Exception as e:
-            logger.warning(f"[{self.name}] Nim module not available: {e}")
+        except ImportError as e:
+            logger.warning(f"[{self.name}] Nim module not available: {e}")

            self.nim_ready = False

    async def on_env_update(self, key: str, value: Any):
        if key == f"prompt_update_{self.name}":
            logger.info(f"[{self.name}] Prompt update received")
            self.prompt = value
        if key == f"code_update_{self.name}":
Comment on lines +58 to +61
Contributor
medium

The keys for state updates like f"prompt_update_{self.name}" and f"code_update_{self.name}" are constructed as 'magic strings'. This practice is error-prone, as a typo in either the publisher (HostAgent) or subscriber (HybridAgent) would cause silent failures. Consider defining these event key patterns as constants or helper functions to ensure consistency and improve maintainability.
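One way the suggested constants could look; all names here are hypothetical illustrations, not identifiers from the PR:

```python
# Shared event-key templates, defined once for publisher and subscriber.
PROMPT_UPDATE = "prompt_update_{name}"
CODE_UPDATE = "code_update_{name}"

def prompt_key(name: str) -> str:
    return PROMPT_UPDATE.format(name=name)

def code_key(name: str) -> str:
    return CODE_UPDATE.format(name=name)

# A typo now raises a NameError at import time instead of causing a
# silent mismatch between HostAgent and HybridAgent.
k = prompt_key("AlphaBot")
```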

            logger.info(f"[{self.name}] Received C++ code update")
            diag = self.parse_cpp_code(value)
            for d in diag:
                logger.warning(f"[{self.name}][Clang] {d.spelling}")

    def parse_cpp_code(self, code: str):
        tu = self.clang_index.parse("dynamic_code.cpp", args=['-std=c++17'], unsaved_files=[('dynamic_code.cpp', code)])
        return list(tu.diagnostics)

    async def execute_nim_logic(self, data: str):
        if not self.nim_ready:
            logger.warning(f"[{self.name}] Skipping Nim execution (module unavailable)")
            return
        result = self.nim_agent.process_buffer(data)
        logger.debug(f"[{self.name}] Nim module result: {result}")

    async def run(self):
        iteration = 0
        while self.running:
            env_snapshot = str(self.env.state)
Contributor
high

Converting the entire environment state to a string via str(self.env.state) is not scalable. As the state grows, this will create excessively large prompts, increasing LLM costs, latency, and the risk of exceeding the context window. It could also leak sensitive information into the prompt. A better approach would be to create a summary of the state or select only the most relevant key-value pairs for the current task.
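A possible shape for such a summary helper; the `summarize_state` name and its parameters are assumptions for illustration, not code from the PR:

```python
def summarize_state(state: dict, relevant_keys=None, max_len: int = 200) -> str:
    """Select only relevant keys and truncate long values before prompting."""
    keys = relevant_keys if relevant_keys is not None else list(state)
    lines = []
    for k in keys:
        if k in state:
            v = str(state[k])
            if len(v) > max_len:
                v = v[:max_len] + "..."  # cap per-value prompt cost
            lines.append(f"{k}: {v}")
    return "\n".join(lines)

# Only the whitelisted key reaches the prompt; the sensitive entry is omitted
# and the oversized value is truncated.
snapshot = summarize_state(
    {"status_AlphaBot": "x" * 500, "secret_api_key": "sk-..."},
    relevant_keys=["status_AlphaBot"],
)
```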

            full_prompt = f"{self.prompt}\nEnv State:\n{env_snapshot}\nIteration: {iteration}"
            try:
                llm_resp = self.chain.invoke(full_prompt)
                logger.info(f"[{self.name}] LLM output: {llm_resp[:250]}...")
                await self.env.update_state(f"status_{self.name}", llm_resp)
Comment on lines +84 to +86
Contributor
high

The invoke method on a ConversationChain returns a dictionary, not a string. Attempting to slice it directly with llm_resp[:250] will cause a TypeError. You need to extract the response string from the dictionary, which is typically under the 'response' key.

Suggested change
-                llm_resp = self.chain.invoke(full_prompt)
+                llm_resp = self.chain.invoke(full_prompt).get("response", "")
                 logger.info(f"[{self.name}] LLM output: {llm_resp[:250]}...")
                 await self.env.update_state(f"status_{self.name}", llm_resp)

            except Exception as e:
                logger.error(f"[{self.name}] LLM call failed: {e}")
Comment on lines +87 to +88
Contributor
medium

Catching the base Exception class is too broad and can hide underlying issues, making debugging difficult. It's better to catch more specific exceptions related to the LLM call (e.g., API errors, network issues). You may need to import openai to catch its specific exception types like openai.APIError or openai.RateLimitError. This allows for more robust error handling, such as implementing specific retry logic for certain errors.
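A generic retry sketch along those lines; the local `RateLimitError` class is a stand-in for the provider-specific exception types the comment names (real code would import those instead), and `call_with_retry` is a hypothetical helper:

```python
import time
import logging

logger = logging.getLogger("agent")

# Placeholder for a provider-specific error such as openai.RateLimitError.
class RateLimitError(Exception):
    pass

def call_with_retry(fn, retries: int = 3, backoff: float = 0.01):
    for attempt in range(retries):
        try:
            return fn()
        except RateLimitError as e:
            # Retry only the errors we understand; anything else propagates
            # instead of being silently swallowed by a broad except.
            logger.warning("rate limited (attempt %d): %s", attempt + 1, e)
            time.sleep(backoff * (2 ** attempt))
    raise RuntimeError("LLM call failed after retries")

calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimitError("slow down")
    return "ok"

result = call_with_retry(flaky)  # succeeds on the third attempt
```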


            # Example: dynamic C++ snippet (could be realtime updated)
            cpp_code = """
            int add(int a, int b) { return a + b; }
            """
            self.parse_cpp_code(cpp_code)
Comment on lines +91 to +94
Contributor
medium

This static C++ code snippet is parsed on every iteration of the run loop, which is inefficient. Since the code does not change within the loop, this parsing operation should be moved outside of it, for example, to the __init__ method. This will prevent redundant work on each cycle.


            # Nim logic invocation with complex data string
            await self.execute_nim_logic(f"Nim data iteration {iteration} from {self.name}")

            await asyncio.sleep(6)
            iteration += 1

    def stop(self):
        logger.info(f"[{self.name}] Stopping execution.")
        self.running = False

# --- Host AI Manager ---
class HostAgent:
    def __init__(self):
        self.env = LivingEnvironment()
        self.agents: List[HybridAgent] = []
        self.running = True

    def register_agent(self, agent: HybridAgent):
        logger.info(f"[Host] Registering agent {agent.name}")
        self.agents.append(agent)

    async def update_loop(self):
        count = 0
        while self.running and count < 8:
            await asyncio.sleep(10)
            for agent in self.agents:
                prompt = f"Agent update cycle {count} for {agent.name}. Adapt behavior dynamically."
                await self.env.update_state(f"prompt_update_{agent.name}", prompt)

                # Optional dynamic new C++ source for compilation
                new_cpp_code = f"int dynamic_func() {{ return {count} * 42; }}"
                await self.env.update_state(f"code_update_{agent.name}", new_cpp_code)

            count += 1

        logger.info("[Host] All update cycles complete, stopping agents.")
        for agent in self.agents:
            agent.stop()

    async def run(self):
        tasks = [asyncio.create_task(agent.run()) for agent in self.agents]
        updater = asyncio.create_task(self.update_loop())
        await asyncio.gather(updater, *tasks)
        self.running = False
Contributor
medium

This line self.running = False appears to be redundant. The HostAgent's run method awaits gather, which will only complete after update_loop and all agent run loops have finished. The update_loop is what controls the lifecycle and stops the agents. Since this flag doesn't control any active loop for the HostAgent at this point, it can be removed to improve clarity.

Contributor
medium

This line has no effect and can be removed. The update_loop task, which is the only consumer of the self.running flag, will have already completed by the time this line is reached because asyncio.gather waits for all tasks to finish. The agents are also stopped within update_loop before it finishes.


# --- Main Entrypoint ---
async def main():
    host = HostAgent()
    agents = [
        HybridAgent("AlphaBot", host.env, "You are AlphaBot, leading agentic matrix coordination."),
        HybridAgent("BetaBot", host.env, "You are BetaBot, specialist in adaptive code evolution."),
        HybridAgent("GammaBot", host.env, "You are GammaBot, UI and UX genius mirroring living interfaces.")
    ]
    for ag in agents:
        host.register_agent(ag)
    await host.run()

if __name__ == "__main__":
    asyncio.run(main())