diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f323f59..6c3483e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,10 +26,10 @@ jobs: run: uv sync --all-extras --dev shell: bash - name: Check code - run: uvx ruff check . + run: uv run ruff check . shell: bash - name: Format check - run: uvx ruff format --check . + run: uv run ruff format --check . shell: bash - name: Typecheck run: uv run mypy src/ tests/ diff --git a/pyproject.toml b/pyproject.toml index 3343074..8dd3248 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ authors = [ { name = "Gowtham A Rao", email = "gowtham.rao@coreason.ai" }, ] dependencies = [ - "coreason-manifest>=0.61.0", + "coreason-manifest>=0.61.1", "coreason-urn-authority>=0.11.1", "httpx>=0.28.1", "libcst>=1.8.6", @@ -81,6 +81,9 @@ target-version = "py314" select = ["E", "F", "B", "I", "UP", "SIM", "RUF", "ARG", "C4", "PT", "TCH", "FA", "PIE", "RET", "PERF", "FURB", "LOG", "N", "A", "S"] ignore = ["S101", "TC001", "TC002", "TC003", "UP037"] +[tool.ruff.lint.isort] +known-first-party = ["coreason_meta_engineering"] + [tool.mypy] python_version = "3.14" strict = true diff --git a/src/coreason_meta_engineering/ast/__init__.py b/src/coreason_meta_engineering/ast/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/coreason_meta_engineering/ast/actuator_scaffold.py b/src/coreason_meta_engineering/ast/actuator_scaffold.py deleted file mode 100644 index 65c083f..0000000 --- a/src/coreason_meta_engineering/ast/actuator_scaffold.py +++ /dev/null @@ -1,257 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: [https://github.com/CoReason-AI/coreason_meta_engineering](https://github.com/CoReason-AI/coreason_meta_engineering) -import textwrap -import typing - -import libcst as cst - - -class LogicInjectionFunctor(cst.CSTTransformer): # type: ignore[misc] - """ - A decoupled libcst transformer that injects a newly defined function bounded by - the @mcp.tool() decorator. - """ - - def __init__( - self, - actuator_name: str, - geometric_schema: list[dict[str, typing.Any]], - return_type: str, - action_space_id: str, - required_imports: list[str] | None = None, - logic_body: str | None = None, - agent_instruction: str | None = None, - causal_affordance: str | None = None, - epistemic_bounds: str | None = None, - ): - super().__init__() - self.actuator_name = actuator_name - self.geometric_schema = geometric_schema - self.return_type = return_type - self.action_space_id = action_space_id - self.required_imports = required_imports or [] - self.logic_body = logic_body - - ai = agent_instruction or "[Define topological boundary]" - ca = causal_affordance or "[Define graph mutation or tool execution]" - eb = epistemic_bounds or "[Define mathematical/physical limits]" - - self.docstring = textwrap.dedent( - f'''\ - """ - AGENT INSTRUCTION: {ai} - - CAUSAL AFFORDANCE: {ca} - - EPISTEMIC BOUNDS: {eb} - - MCP ROUTING TRIGGERS: {self.action_space_id} - """''' - ) - - def _build_docstring(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine(body=[cst.Expr(value=cst.SimpleString(value=self.docstring))]) - - def _build_parameter(self, param_def: dict[str, typing.Any]) -> cst.Param: - # Schema provides name and type - name = param_def["name"] - field_type_str = param_def["type"] - type_node = cst.parse_expression(field_type_str) - return cst.Param(name=cst.Name(value=name), annotation=cst.Annotation(annotation=type_node)) - - def _build_urn_attribute(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.Assign( - targets=[ - cst.AssignTarget( - target=cst.Attribute( - value=cst.Name(value=self.actuator_name), - attr=cst.Name(value="__action_space_urn__"), - ) - ) - ], - value=cst.SimpleString(value=f'"{self.action_space_id}"'), - ) - ] - ) - - def _extract_logic_body_parts(self) -> tuple[list[cst.BaseStatement], list[cst.Param] | None]: - """Parse logic_body and extract inlined statements + function params. - - Returns a tuple of (body_statements, params_or_none). - If logic_body contains a function def, extracts its body and params. - If logic_body is raw statements, returns them directly with no params. - """ - if not self.logic_body: - return [], None - - try: - parsed = cst.parse_module(textwrap.dedent(self.logic_body)) - except Exception: - return [], None - - for stmt in parsed.body: - if isinstance(stmt, cst.FunctionDef): - # Extract body statements from the inner function (inline them) - # FIX: Added quotes around the type expression for typing.cast - inner_body = typing.cast("list[cst.BaseStatement]", list(stmt.body.body)) - # Extract params from the inner function signature - inner_params = list(stmt.params.params) - return inner_body, inner_params - - # logic_body is raw statements, not a function def - return list(parsed.body), None - - def _build_function(self) -> cst.FunctionDef: - body_elements: list[cst.BaseStatement] = [] - body_elements.append(self._build_docstring()) - - # Cascading priority for function body: - # 1. logic_body statements (inlined, not nested) - # 2. Pass() fallback - logic_stmts, logic_params = self._extract_logic_body_parts() - if logic_stmts: - body_elements.extend(logic_stmts) - else: - body_elements.append(cst.SimpleStatementLine(body=[cst.Pass()])) - - # Cascading priority for parameters: - # 1. logic_body function signature (ground truth from LLM code) - # 2. geometric_schema (JSON Schema properties) - # 3. empty params - if logic_params is not None: - params = logic_params - elif self.geometric_schema: - params = [self._build_parameter(p) for p in self.geometric_schema] - else: - params = [] - - return cst.FunctionDef( - name=cst.Name(value=self.actuator_name), - params=cst.Parameters(params=params), - body=cst.IndentedBlock(body=body_elements), - decorators=[ - cst.Decorator( - decorator=cst.Call( - func=cst.Attribute(value=cst.Name(value="mcp"), attr=cst.Name(value="tool")), - args=[], - ) - ) - ], - returns=cst.Annotation(annotation=cst.parse_expression(self.return_type)) if self.return_type else None, - ) - - def leave_Module(self, original_node: cst.Module, updated_node: cst.Module) -> cst.Module: # noqa: N802, ARG002 - # Idempotency Axiom: If the function already exists, halt injection entirely. - for stmt in updated_node.body: - if isinstance(stmt, cst.FunctionDef) and stmt.name.value == self.actuator_name: - return updated_node - - new_function = self._build_function() - - needs_any = any("Any" in p.get("type", "") for p in self.geometric_schema) or ( - self.return_type and "Any" in self.return_type - ) - needs_annotated = any("Annotated" in p.get("type", "") for p in self.geometric_schema) or ( - self.return_type and "Annotated" in self.return_type - ) - needs_stringconfig = any("StringConstraints" in p.get("type", "") for p in self.geometric_schema) - - has_mcp_import = False - has_any_import = False - has_annotated_import = False - has_stringconstraints_import = False - insert_import_idx = 0 - - new_body = list(updated_node.body) - - for stmt in new_body: - if isinstance(stmt, cst.SimpleStatementLine): - for node in stmt.body: - if isinstance(node, cst.ImportFrom) and isinstance(node.names, (tuple, list)): - mod_name = "" - if getattr(node, "module", None) is not None and isinstance(node.module, cst.Name): - mod_name = node.module.value - - for name_item in node.names: - val = name_item.name.value - if val == "mcp": - has_mcp_import = True - if val == "Any" and mod_name == "typing": - has_any_import = True - if val == "Annotated" and mod_name == "typing": - has_annotated_import = True - if val == "StringConstraints" and mod_name == "pydantic": - has_stringconstraints_import = True - - for i, stmt in enumerate(new_body): - if isinstance(stmt, cst.SimpleStatementLine) and any( - isinstance(n, (cst.Import, cst.ImportFrom)) for n in stmt.body - ): - insert_import_idx = i + 1 - - missing_typing = [] - if needs_any and not has_any_import: - missing_typing.append("Any") - if needs_annotated and not has_annotated_import: - missing_typing.append("Annotated") - - if missing_typing: - typing_import = cst.SimpleStatementLine( - body=[ - cst.ImportFrom( - module=cst.Name(value="typing"), - names=[cst.ImportAlias(name=cst.Name(value=m)) for m in missing_typing], - ) - ] - ) - new_body.insert(insert_import_idx, typing_import) - insert_import_idx += 1 - - if needs_stringconfig and not has_stringconstraints_import: - pydantic_import = cst.SimpleStatementLine( - body=[ - cst.ImportFrom( - module=cst.Name(value="pydantic"), - names=[cst.ImportAlias(name=cst.Name(value="StringConstraints"))], - ) - ] - ) - new_body.insert(insert_import_idx, pydantic_import) - insert_import_idx += 1 - - for imp_str in self.required_imports: - try: - parsed_stmt = cst.parse_statement(imp_str) - new_body.insert(insert_import_idx, parsed_stmt) - insert_import_idx += 1 - except cst.ParserSyntaxError: - pass - - if not has_mcp_import: - # We want: from mcp.server.fastmcp import mcp - mcp_import = cst.SimpleStatementLine( - body=[ - cst.ImportFrom( - module=cst.Attribute( - value=cst.Attribute(value=cst.Name(value="mcp"), attr=cst.Name(value="server")), - attr=cst.Name(value="fastmcp"), - ), - names=[cst.ImportAlias(name=cst.Name(value="mcp"))], - ) - ] - ) - new_body.insert(insert_import_idx, mcp_import) - - new_body.append(new_function) - new_body.append(self._build_urn_attribute()) - - return updated_node.with_changes(body=new_body) diff --git a/src/coreason_meta_engineering/ast/kubernetes_crd_scaffold.py b/src/coreason_meta_engineering/ast/kubernetes_crd_scaffold.py deleted file mode 100644 index 1ac72fa..0000000 --- a/src/coreason_meta_engineering/ast/kubernetes_crd_scaffold.py +++ /dev/null @@ -1,341 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: [https://github.com/CoReason-AI/coreason_meta_engineering](https://github.com/CoReason-AI/coreason_meta_engineering) -import textwrap -import typing - -import libcst as cst - - -class KubernetesCRDInjectionFunctor(cst.CSTTransformer): # type: ignore[misc] - """ - A decoupled libcst transformer that injects a newly defined class and its - accompanying model_rebuild() call into a given Python module AST. - """ - - def __init__( - self, - crd_name: str, - geometric_schema: list[dict[str, typing.Any]], - action_space_id: str, - api_group: str = "chaos-mesh.org", - api_version: str = "v1alpha1", - kind: str = "NetworkChaos", - ): - super().__init__() - self.crd_name = crd_name - self.geometric_schema = geometric_schema - self.action_space_id = action_space_id - self.api_group = api_group - self.api_version = api_version - self.kind = kind - - self.docstring = textwrap.dedent( - f'''\ - """ - AGENT INSTRUCTION: [Define topological boundary] - - CAUSAL AFFORDANCE: [Define graph mutation or tool execution] - - EPISTEMIC BOUNDS: [Define mathematical/physical limits] - - MCP ROUTING TRIGGERS: {self.action_space_id} - """''' - ) - - def _build_docstring(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine(body=[cst.Expr(value=cst.SimpleString(value=self.docstring))]) - - def _build_field(self, field_def: dict[str, typing.Any]) -> cst.SimpleStatementLine: - name = field_def["name"] - field_type = field_def["type"] - description = field_def["description"] - is_optional = field_def.get("optional", False) - - # Parse type annotation. We need to parse this string to CST nodes. - type_node = cst.parse_expression(field_type) - - field_args = [] - if is_optional: - field_args.append( - cst.Arg( - value=cst.Name(value="None"), - keyword=cst.Name(value="default"), - equal=cst.AssignEqual( - whitespace_before=cst.SimpleWhitespace(""), - whitespace_after=cst.SimpleWhitespace(""), - ), - ) - ) - - field_args.append( - cst.Arg( - value=cst.SimpleString(value=f'"{description}"'), - keyword=cst.Name(value="description"), - equal=cst.AssignEqual( - whitespace_before=cst.SimpleWhitespace(""), - whitespace_after=cst.SimpleWhitespace(""), - ), - ) - ) - - return cst.SimpleStatementLine( - body=[ - cst.AnnAssign( - target=cst.Name(value=name), - annotation=cst.Annotation(annotation=type_node), - value=cst.Call( - func=cst.Name(value="Field"), - args=field_args, - ), - ) - ] - ) - - def _build_validator(self, list_fields: list[str]) -> cst.FunctionDef: - body_lines: list[cst.BaseStatement] = [ - cst.If( - test=cst.Comparison( - left=cst.Call( - func=cst.Name(value="getattr"), - args=[ - cst.Arg(value=cst.Name(value="self")), - cst.Arg(value=cst.SimpleString(value=f'"{field}"')), - cst.Arg(value=cst.Name(value="None")), - ], - ), - comparisons=[ - cst.ComparisonTarget( - operator=cst.IsNot(), - comparator=cst.Name(value="None"), - ), - ], - ), - body=cst.IndentedBlock( - body=[ - cst.SimpleStatementLine( - body=[ - cst.Expr( - value=cst.Call( - func=cst.Attribute( - value=cst.Name(value="object"), attr=cst.Name(value="__setattr__") - ), - args=[ - cst.Arg(value=cst.Name(value="self")), - cst.Arg(value=cst.SimpleString(value=f'"{field}"')), - cst.Arg( - value=cst.Call( - func=cst.Name(value="sorted"), - args=[ - cst.Arg( - value=cst.Attribute( - value=cst.Name(value="self"), attr=cst.Name(value=field) - ) - ) - ], - ) - ), - ], - ) - ) - ] - ) - ] - ), - ) - for field in list_fields - ] - - body_lines.append(cst.SimpleStatementLine(body=[cst.Return(value=cst.Name(value="self"))])) - - return cst.FunctionDef( - name=cst.Name(value="_enforce_canonical_sort"), - params=cst.Parameters(params=[cst.Param(name=cst.Name(value="self"))]), - body=cst.IndentedBlock(body=body_lines), - decorators=[ - cst.Decorator( - decorator=cst.Call( - func=cst.Name(value="model_validator"), - args=[ - cst.Arg( - value=cst.SimpleString(value='"after"'), - keyword=cst.Name(value="mode"), - equal=cst.AssignEqual( - whitespace_before=cst.SimpleWhitespace(""), - whitespace_after=cst.SimpleWhitespace(""), - ), - ) - ], - ) - ) - ], - returns=cst.Annotation(annotation=cst.Name(value="Self")), - ) - - def _build_urn_attribute(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.Assign( - targets=[cst.AssignTarget(target=cst.Name(value="__action_space_urn__"))], - value=cst.SimpleString(value=f'"{self.action_space_id}"'), - ) - ] - ) - - def _build_crd_attributes(self) -> list[cst.SimpleStatementLine]: - return [ - self._build_urn_attribute(), - cst.SimpleStatementLine( - body=[ - cst.Assign( - targets=[cst.AssignTarget(target=cst.Name(value="api_group"))], - value=cst.SimpleString(value=f'"{self.api_group}"'), - ) - ] - ), - cst.SimpleStatementLine( - body=[ - cst.Assign( - targets=[cst.AssignTarget(target=cst.Name(value="api_version"))], - value=cst.SimpleString(value=f'"{self.api_version}"'), - ) - ] - ), - cst.SimpleStatementLine( - body=[ - cst.Assign( - targets=[cst.AssignTarget(target=cst.Name(value="kind"))], - value=cst.SimpleString(value=f'"{self.kind}"'), - ) - ] - ), - ] - - def _build_class(self) -> cst.ClassDef: - body_elements: list[cst.BaseStatement] = [] - body_elements.append(self._build_docstring()) - body_elements.extend(self._build_crd_attributes()) - - list_fields = [] - for f in self.geometric_schema: - body_elements.append(self._build_field(f)) - if "list[" in f["type"]: - list_fields.append(f["name"]) - - if list_fields: - # Need to append empty line manually or just function def - body_elements.append(self._build_validator(list_fields)) - - return cst.ClassDef( - name=cst.Name(value=self.crd_name), - bases=[cst.Arg(value=cst.Name(value="KubernetesCRDBase"))], - body=cst.IndentedBlock(body=body_elements), - ) - - def _build_rebuild_call(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.Expr( - value=cst.Call( - func=cst.Attribute(value=cst.Name(value=self.crd_name), attr=cst.Name(value="model_rebuild")) - ) - ) - ] - ) - - def leave_Module(self, original_node: cst.Module, updated_node: cst.Module) -> cst.Module: # noqa: N802, ARG002 - # Idempotency Axiom: If the class already exists, halt injection entirely. - for stmt in updated_node.body: - if isinstance(stmt, cst.ClassDef) and stmt.name.value == self.crd_name: - return updated_node - - new_class = self._build_class() - new_rebuild = self._build_rebuild_call() - - # Gather required imports based on schema fields - needs_self = any("list[" in f["type"] for f in self.geometric_schema) - needs_any = any("Any" in f["type"] for f in self.geometric_schema) - needs_annotated = any("Annotated" in f["type"] for f in self.geometric_schema) - needs_string_constraints = any("StringConstraints" in f["type"] for f in self.geometric_schema) - - has_self_import = False - has_any_import = False - has_annotated_import = False - has_field_import = False - has_model_validator_import = False - has_string_constraints_import = False - - for stmt in updated_node.body: - if isinstance(stmt, cst.SimpleStatementLine): - for node in stmt.body: - if isinstance(node, cst.ImportFrom) and node.module and isinstance(node.names, (tuple, list)): - mod_name = getattr(node.module, "value", None) - for name_item in node.names: - if mod_name == "typing": - if name_item.name.value == "Self": - has_self_import = True - if name_item.name.value == "Any": - has_any_import = True - if name_item.name.value == "Annotated": - has_annotated_import = True - elif mod_name == "pydantic": - if name_item.name.value == "Field": - has_field_import = True - if name_item.name.value == "model_validator": - has_model_validator_import = True - if name_item.name.value == "StringConstraints": - has_string_constraints_import = True - - new_body = list(updated_node.body) - - insert_import_idx = 0 - for i, stmt in enumerate(new_body): - if isinstance(stmt, cst.SimpleStatementLine) and any( - isinstance(n, (cst.Import, cst.ImportFrom)) for n in stmt.body - ): - insert_import_idx = i + 1 - break - - typing_imports = [] - if needs_self and not has_self_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Self"))) - if needs_any and not has_any_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Any"))) - if needs_annotated and not has_annotated_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Annotated"))) - - if typing_imports: - new_body.insert( - insert_import_idx, - cst.SimpleStatementLine(body=[cst.ImportFrom(module=cst.Name(value="typing"), names=typing_imports)]), - ) - insert_import_idx += 1 - - pydantic_imports = [] - if not has_field_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="Field"))) - if needs_self and not has_model_validator_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="model_validator"))) - if needs_string_constraints and not has_string_constraints_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="StringConstraints"))) - - if pydantic_imports: - new_body.insert( - insert_import_idx, - cst.SimpleStatementLine( - body=[cst.ImportFrom(module=cst.Name(value="pydantic"), names=pydantic_imports)] - ), - ) - insert_import_idx += 1 - - # Agent Instruction: Law 3 demands .model_rebuild() goes at the end. - new_body.append(new_class) - new_body.append(new_rebuild) - - return updated_node.with_changes(body=new_body) diff --git a/src/coreason_meta_engineering/ast/node_scaffold.py b/src/coreason_meta_engineering/ast/node_scaffold.py deleted file mode 100644 index f56d97d..0000000 --- a/src/coreason_meta_engineering/ast/node_scaffold.py +++ /dev/null @@ -1,315 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -import datetime -import textwrap - -import libcst as cst - - -def strip_existing_headers_and_apply_proprietary(code: str, commercial_owner: str | None, license_hash: str) -> str: - module = cst.parse_module(code) - - # Remove existing comments that match prosperity hash or keywords - filtered_header = [] - for line in module.header: - if ( - isinstance(line, cst.EmptyLine) - and line.comment - and ("Prosperity" in line.comment.value or "CoReason" in line.comment.value) - ): - continue - filtered_header.append(line) - - year = datetime.datetime.now().year - owner = commercial_owner or "Unknown" - new_header = [ - cst.EmptyLine(comment=cst.Comment(value=f"# Copyright (c) {year} {owner}. All rights reserved.")), - cst.EmptyLine(comment=cst.Comment(value=f"# Licensed under {license_hash}")), - cst.EmptyLine(comment=cst.Comment(value="# Authorized via CoReason Epistemic Ledger Token.")), - ] - - return module.with_changes(header=tuple(new_header) + tuple(filtered_header)).code - - -def apply_prosperity_headers(code: str) -> str: - module = cst.parse_module(code) - - # Check if header already exists - for line in module.header: - if isinstance(line, cst.EmptyLine) and line.comment and "Prosperity Public License 3.0" in line.comment.value: - return code - - year = datetime.datetime.now().year - new_header = [ - cst.EmptyLine(comment=cst.Comment(value=f"# Copyright (c) {year} CoReason, Inc.. All Rights Reserved")), - cst.EmptyLine(comment=cst.Comment(value="#")), - cst.EmptyLine( - comment=cst.Comment(value="# This software is licensed under the Prosperity Public License 3.0.0.") - ), - cst.EmptyLine( - comment=cst.Comment( - value="# A copy of the license is available at https://prosperitylicense.com/versions/3.0.0" - ) - ), - cst.EmptyLine(comment=cst.Comment(value="# For details, see the LICENSE file.")), - cst.EmptyLine( - comment=cst.Comment( - value="# The issuer of the Prosperity Public License for this software is CoReason, Inc.." - ) - ), - cst.EmptyLine(comment=cst.Comment(value="#")), - cst.EmptyLine( - comment=cst.Comment( - value="# Content created is contributed to the CoReason codebase under the terms of Prosperity 3.0." - ) - ), - cst.EmptyLine(comment=cst.Comment(value="#")), - cst.EmptyLine( - comment=cst.Comment( - value="# For a commercial version of this software, please contact us at license@coreason.ai." - ) - ), - ] - - return module.with_changes(header=tuple(new_header) + tuple(module.header)).code - - -class EpistemicNodeInjectionFunctor(cst.CSTTransformer): # type: ignore[misc] - """ - A decoupled libcst transformer that injects a newly defined Swarm Agent class - into a given Python module AST. - """ - - def __init__( - self, - node_name: str, - cognitive_boundary_directive: str, - action_space_id: str, - base_class: str = "CoreasonBaseAgent", - ): - super().__init__() - self.node_name = node_name - self.cognitive_boundary_directive = cognitive_boundary_directive - self.action_space_id = action_space_id - self.base_class = base_class - - self.docstring = textwrap.dedent( - f'''\ - """ - AGENT INSTRUCTION: [Define topological boundary] - - CAUSAL AFFORDANCE: [Define graph mutation or tool execution] - - EPISTEMIC BOUNDS: [Define mathematical/physical limits] - - MCP ROUTING TRIGGERS: {self.action_space_id} - """''' - ) - - def _build_docstring(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine(body=[cst.Expr(value=cst.SimpleString(value=self.docstring))]) - - def _build_urn_attribute(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.Assign( - targets=[cst.AssignTarget(target=cst.Name(value="__action_space_urn__"))], - value=cst.SimpleString(value=f'"{self.action_space_id}"'), - ) - ] - ) - - def _build_system_prompt_attribute(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.AnnAssign( - target=cst.Name(value="system_prompt"), - annotation=cst.Annotation(annotation=cst.Name(value="str")), - value=cst.SimpleString(value=f'"""{self.cognitive_boundary_directive}"""'), - ) - ] - ) - - def _build_authorized_tools(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.AnnAssign( - target=cst.Name(value="authorized_tools"), - annotation=cst.Annotation( - annotation=cst.Subscript( - value=cst.Name(value="list"), - slice=[cst.SubscriptElement(slice=cst.Index(value=cst.Name(value="Any")))], - ) - ), - value=cst.List(elements=[]), - ) - ] - ) - - def _build_canonical_sort(self) -> cst.FunctionDef: - return cst.FunctionDef( - name=cst.Name(value="_enforce_canonical_sort"), - params=cst.Parameters(params=[cst.Param(name=cst.Name(value="self"))]), - body=cst.IndentedBlock( - body=[ - cst.SimpleStatementLine( - body=[ - cst.Expr( - value=cst.Call( - func=cst.Attribute( - value=cst.Name(value="object"), attr=cst.Name(value="__setattr__") - ), - args=[ - cst.Arg(value=cst.Name(value="self")), - cst.Arg(value=cst.SimpleString(value='"authorized_tools"')), - cst.Arg( - value=cst.Call( - func=cst.Name(value="sorted"), - args=[ - cst.Arg( - value=cst.Attribute( - value=cst.Name(value="self"), - attr=cst.Name(value="authorized_tools"), - ) - ) - ], - ) - ), - ], - ) - ) - ] - ), - cst.SimpleStatementLine(body=[cst.Return(value=cst.Name(value="self"))]), - ] - ), - decorators=[ - cst.Decorator( - decorator=cst.Call( - func=cst.Name(value="model_validator"), - args=[ - cst.Arg( - value=cst.SimpleString(value='"after"'), - keyword=cst.Name(value="mode"), - equal=cst.AssignEqual(), - ) - ], - ) - ) - ], - returns=cst.Annotation(annotation=cst.Name(value="Self")), - ) - - def _build_model_rebuild(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.Expr( - value=cst.Call( - func=cst.Attribute( - value=cst.Name(value=self.node_name), - attr=cst.Name(value="model_rebuild"), - ), - args=[], - ) - ) - ] - ) - - def _build_class(self) -> cst.ClassDef: - body_elements: list[cst.BaseStatement] = [] - body_elements.append(self._build_docstring()) - body_elements.append(self._build_urn_attribute()) - body_elements.append(self._build_system_prompt_attribute()) - body_elements.append(self._build_authorized_tools()) - body_elements.append(self._build_canonical_sort()) - - return cst.ClassDef( - name=cst.Name(value=self.node_name), - bases=[cst.Arg(value=cst.Name(value=self.base_class))], - body=cst.IndentedBlock(body=body_elements), - ) - - def leave_Module(self, original_node: cst.Module, updated_node: cst.Module) -> cst.Module: # noqa: N802, ARG002 - # Idempotency Axiom: If the class already exists, halt injection entirely. - for stmt in updated_node.body: - if isinstance(stmt, cst.ClassDef) and stmt.name.value == self.node_name: - return updated_node - - new_class = self._build_class() - - has_any_import = False - insert_import_idx = 0 - - new_body = list(updated_node.body) - - has_typing_self = False - has_pydantic_validator = False - - for stmt in new_body: - if isinstance(stmt, cst.SimpleStatementLine): - for node in stmt.body: - if ( - isinstance(node, cst.ImportFrom) - and getattr(node.module, "value", None) == "typing" - and isinstance(node.names, (tuple, list)) - ): - for name_item in node.names: - if name_item.name.value == "Any": - has_any_import = True - if name_item.name.value == "Self": - has_typing_self = True - - if ( - isinstance(node, cst.ImportFrom) - and getattr(node.module, "value", None) == "pydantic" - and isinstance(node.names, (tuple, list)) - ): - for name_item in node.names: - if name_item.name.value == "model_validator": - has_pydantic_validator = True - - for i, stmt in enumerate(new_body): - if isinstance(stmt, cst.SimpleStatementLine) and any( - isinstance(n, (cst.Import, cst.ImportFrom)) for n in stmt.body - ): - insert_import_idx = i + 1 - - # Combine typing imports - missing_typing = [] - if not has_any_import: - missing_typing.append("Any") - if not has_typing_self: - missing_typing.append("Self") - - if missing_typing: - typing_import = cst.SimpleStatementLine( - body=[ - cst.ImportFrom( - module=cst.Name(value="typing"), - names=[cst.ImportAlias(name=cst.Name(value=m)) for m in missing_typing], - ) - ] - ) - new_body.insert(insert_import_idx, typing_import) - - if not has_pydantic_validator: - pydantic_import = cst.SimpleStatementLine( - body=[ - cst.ImportFrom( - module=cst.Name(value="pydantic"), - names=[cst.ImportAlias(name=cst.Name(value="model_validator"))], - ) - ] - ) - new_body.insert(insert_import_idx, pydantic_import) - - new_body.append(new_class) - new_body.append(self._build_model_rebuild()) - - return updated_node.with_changes(body=new_body) diff --git a/src/coreason_meta_engineering/ast/state_reconciliation.py b/src/coreason_meta_engineering/ast/state_reconciliation.py deleted file mode 100644 index ce070f4..0000000 --- a/src/coreason_meta_engineering/ast/state_reconciliation.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at https://prosperitylicense.com/versions/3.0.0 -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: https://github.com/CoReason-AI/coreason_meta_engineering - -import typing - -import libcst as cst - -from coreason_meta_engineering.ast.state_scaffold import StateInjectionFunctor - - -class StateReconciliationFunctor(StateInjectionFunctor): # type: ignore[misc] - """ - A libcst transformer that updates an existing Pydantic state class by replacing its fields - with a new geometric schema, while preserving the docstring and __action_space_urn__. - """ - - def __init__( - self, - state_name: str, - geometric_schema: list[dict[str, typing.Any]], - action_space_id: str, - base_class: str = "CoreasonBaseState", - ): - super().__init__( - state_name=state_name, - geometric_schema=geometric_schema, - action_space_id=action_space_id, - base_class=base_class, - ) - self.class_found = False - - def leave_ClassDef(self, original_node: cst.ClassDef, updated_node: cst.ClassDef) -> cst.ClassDef: # noqa: N802 - if updated_node.name.value == self.state_name: - self.class_found = True - docstring_node = None - urn_node = None - - for stmt in original_node.body.body: - if isinstance(stmt, cst.SimpleStatementLine): - for node in stmt.body: - if isinstance(node, cst.Expr) and isinstance(node.value, cst.SimpleString): - if not docstring_node: - docstring_node = stmt - elif ( - isinstance(node, cst.Assign) - and len(node.targets) == 1 - and isinstance(node.targets[0].target, cst.Name) - and node.targets[0].target.value == "__action_space_urn__" - ): - urn_node = stmt - - new_body_elements: list[cst.BaseStatement] = [] - if docstring_node: - new_body_elements.append(docstring_node) - else: - new_body_elements.append(self._build_docstring()) - - if urn_node: - new_body_elements.append(urn_node) - else: - new_body_elements.append(self._build_urn_attribute()) - - list_fields = [] - for f in self.geometric_schema: - new_body_elements.append(self._build_field(f)) - if "list[" in f["type"]: - list_fields.append(f["name"]) - - if list_fields: - new_body_elements.append(self._build_validator(list_fields)) - - return updated_node.with_changes(body=cst.IndentedBlock(body=new_body_elements)) - return updated_node - - def leave_Module(self, original_node: cst.Module, updated_node: cst.Module) -> cst.Module: # noqa: N802, ARG002 - # Gather required imports based on schema fields - needs_self = any("list[" in f["type"] for f in self.geometric_schema) - needs_any = any("Any" in f["type"] for f in self.geometric_schema) - needs_annotated = any("Annotated" in f["type"] for f in self.geometric_schema) - needs_string_constraints = any("StringConstraints" in f["type"] for f in self.geometric_schema) - - has_self_import = False - has_any_import = False - has_annotated_import = False - has_field_import = False - has_model_validator_import = False - has_string_constraints_import = False - - for stmt in updated_node.body: - if isinstance(stmt, cst.SimpleStatementLine): - for node in stmt.body: - if isinstance(node, cst.ImportFrom) and node.module and isinstance(node.names, (tuple, list)): - mod_name = getattr(node.module, "value", None) - for name_item in node.names: - if mod_name == "typing": - if name_item.name.value == "Self": - has_self_import = True - elif name_item.name.value == "Any": - has_any_import = True - elif name_item.name.value == "Annotated": - has_annotated_import = True - elif mod_name == "pydantic": - if name_item.name.value == "Field": - has_field_import = True - elif name_item.name.value == "model_validator": - has_model_validator_import = True - elif name_item.name.value == "StringConstraints": - has_string_constraints_import = True - - new_body = list(updated_node.body) - - insert_import_idx = 0 - for i, stmt in enumerate(new_body): - if isinstance(stmt, cst.SimpleStatementLine) and any( - isinstance(n, (cst.Import, cst.ImportFrom)) for n in stmt.body - ): - insert_import_idx = i + 1 - - typing_imports = [] - if needs_self and not has_self_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Self"))) - if needs_any and not has_any_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Any"))) - if needs_annotated and not has_annotated_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Annotated"))) - - if typing_imports: - new_body.insert( - insert_import_idx, - cst.SimpleStatementLine(body=[cst.ImportFrom(module=cst.Name(value="typing"), names=typing_imports)]), - ) - insert_import_idx += 1 - - pydantic_imports = [] - if not has_field_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="Field"))) - if needs_self and not has_model_validator_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="model_validator"))) - if needs_string_constraints and not has_string_constraints_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="StringConstraints"))) - - if pydantic_imports: - new_body.insert( - insert_import_idx, - cst.SimpleStatementLine( - body=[cst.ImportFrom(module=cst.Name(value="pydantic"), names=pydantic_imports)] - ), - ) - insert_import_idx += 1 - - # Only append new_class if the class was not found (fallback to scaffolding) - if not self.class_found: - new_class = self._build_class() - new_rebuild = self._build_rebuild_call() - new_body.append(new_class) - new_body.append(new_rebuild) - - return updated_node.with_changes(body=new_body) diff --git a/src/coreason_meta_engineering/ast/state_scaffold.py b/src/coreason_meta_engineering/ast/state_scaffold.py deleted file mode 100644 index 5675a45..0000000 --- a/src/coreason_meta_engineering/ast/state_scaffold.py +++ /dev/null @@ -1,308 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: [https://github.com/CoReason-AI/coreason_meta_engineering](https://github.com/CoReason-AI/coreason_meta_engineering) -import textwrap -import typing - -import libcst as cst - - -class StateInjectionFunctor(cst.CSTTransformer): # type: ignore[misc] - """ - A decoupled libcst transformer that injects a newly defined class and its - accompanying model_rebuild() call into a given Python module AST. - """ - - def __init__( - self, - state_name: str, - geometric_schema: list[dict[str, typing.Any]], - action_space_id: str, - base_class: str = "CoreasonBaseState", - ): - super().__init__() - self.state_name = state_name - self.geometric_schema = geometric_schema - self.action_space_id = action_space_id - self.base_class = base_class - - self.docstring = textwrap.dedent( - f'''\ - """ - AGENT INSTRUCTION: [Define topological boundary] - - CAUSAL AFFORDANCE: [Define graph mutation or tool execution] - - EPISTEMIC BOUNDS: [Define mathematical/physical limits] - - MCP ROUTING TRIGGERS: {self.action_space_id} - """''' - ) - - def _build_docstring(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine(body=[cst.Expr(value=cst.SimpleString(value=self.docstring))]) - - def _build_field(self, field_def: dict[str, typing.Any]) -> cst.SimpleStatementLine: - name = field_def["name"] - field_type = field_def["type"] - description = field_def["description"] - is_optional = field_def.get("optional", False) - - # Parse type annotation. We need to parse this string to CST nodes. - type_node = cst.parse_expression(field_type) - - field_args = [] - if is_optional: - field_args.append( - cst.Arg( - value=cst.Name(value="None"), - keyword=cst.Name(value="default"), - equal=cst.AssignEqual( - whitespace_before=cst.SimpleWhitespace(""), - whitespace_after=cst.SimpleWhitespace(""), - ), - ) - ) - - field_args.append( - cst.Arg( - value=cst.SimpleString(value=f'"{description}"'), - keyword=cst.Name(value="description"), - equal=cst.AssignEqual( - whitespace_before=cst.SimpleWhitespace(""), - whitespace_after=cst.SimpleWhitespace(""), - ), - ) - ) - - return cst.SimpleStatementLine( - body=[ - cst.AnnAssign( - target=cst.Name(value=name), - annotation=cst.Annotation(annotation=type_node), - value=cst.Call( - func=cst.Name(value="Field"), - args=field_args, - ), - ) - ] - ) - - def _build_validator(self, list_fields: list[str]) -> cst.FunctionDef: - body_lines: list[cst.BaseStatement] = [ - cst.If( - test=cst.Comparison( - left=cst.Call( - func=cst.Name(value="getattr"), - args=[ - cst.Arg(value=cst.Name(value="self")), - cst.Arg(value=cst.SimpleString(value=f'"{field}"')), - cst.Arg(value=cst.Name(value="None")), - ], - ), - comparisons=[ - cst.ComparisonTarget( - operator=cst.IsNot(), - comparator=cst.Name(value="None"), - ), - ], - ), - body=cst.IndentedBlock( - body=[ - cst.SimpleStatementLine( - body=[ - cst.Expr( - value=cst.Call( - func=cst.Attribute( - value=cst.Name(value="object"), attr=cst.Name(value="__setattr__") - ), - args=[ - cst.Arg(value=cst.Name(value="self")), - cst.Arg(value=cst.SimpleString(value=f'"{field}"')), - cst.Arg( - value=cst.Call( - func=cst.Name(value="sorted"), - args=[ - cst.Arg( - value=cst.Attribute( - value=cst.Name(value="self"), attr=cst.Name(value=field) - ) - ) - ], - ) - ), - ], - ) - ) - ] - ) - ] - ), - ) - for field in list_fields - ] - - body_lines.append(cst.SimpleStatementLine(body=[cst.Return(value=cst.Name(value="self"))])) - - return cst.FunctionDef( - name=cst.Name(value="_enforce_canonical_sort"), - params=cst.Parameters(params=[cst.Param(name=cst.Name(value="self"))]), - body=cst.IndentedBlock(body=body_lines), - decorators=[ - cst.Decorator( - decorator=cst.Call( - func=cst.Name(value="model_validator"), - args=[ - cst.Arg( - value=cst.SimpleString(value='"after"'), - keyword=cst.Name(value="mode"), - equal=cst.AssignEqual( - whitespace_before=cst.SimpleWhitespace(""), - whitespace_after=cst.SimpleWhitespace(""), - ), - ) - ], - ) - ) - ], - returns=cst.Annotation(annotation=cst.Name(value="Self")), - ) - - def _build_urn_attribute(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.Assign( - targets=[cst.AssignTarget(target=cst.Name(value="__action_space_urn__"))], - value=cst.SimpleString(value=f'"{self.action_space_id}"'), - ) - ] - ) - - def _build_class(self) -> cst.ClassDef: - body_elements: list[cst.BaseStatement] = [] - body_elements.append(self._build_docstring()) - body_elements.append(self._build_urn_attribute()) - - list_fields = [] - for f in self.geometric_schema: - body_elements.append(self._build_field(f)) - if "list[" in f["type"]: - list_fields.append(f["name"]) - - if list_fields: - # Need to append empty line manually or just function def - body_elements.append(self._build_validator(list_fields)) - - return cst.ClassDef( - name=cst.Name(value=self.state_name), - bases=[cst.Arg(value=cst.Name(value=self.base_class))], - body=cst.IndentedBlock(body=body_elements), - ) - - def _build_rebuild_call(self) -> cst.SimpleStatementLine: - return cst.SimpleStatementLine( - body=[ - cst.Expr( - value=cst.Call( - func=cst.Attribute(value=cst.Name(value=self.state_name), attr=cst.Name(value="model_rebuild")) - ) - ) - ] - ) - - def leave_Module(self, original_node: cst.Module, updated_node: cst.Module) -> cst.Module: # noqa: N802, ARG002 - # Idempotency Axiom: If the class already exists, halt injection entirely. - for stmt in updated_node.body: - if isinstance(stmt, cst.ClassDef) and stmt.name.value == self.state_name: - return updated_node - - new_class = self._build_class() - new_rebuild = self._build_rebuild_call() - - # Gather required imports based on schema fields - needs_self = any("list[" in f["type"] for f in self.geometric_schema) - needs_any = any("Any" in f["type"] for f in self.geometric_schema) - needs_annotated = any("Annotated" in f["type"] for f in self.geometric_schema) - needs_string_constraints = any("StringConstraints" in f["type"] for f in self.geometric_schema) - - has_self_import = False - has_any_import = False - has_annotated_import = False - has_field_import = False - has_model_validator_import = False - has_string_constraints_import = False - - for stmt in updated_node.body: - if isinstance(stmt, cst.SimpleStatementLine): - for node in stmt.body: - if isinstance(node, cst.ImportFrom) and node.module and isinstance(node.names, (tuple, list)): - mod_name = getattr(node.module, "value", None) - for name_item in node.names: - if mod_name == "typing": - if name_item.name.value == "Self": - has_self_import = True - if name_item.name.value == "Any": - has_any_import = True - if name_item.name.value == "Annotated": - has_annotated_import = True - elif mod_name == "pydantic": - if name_item.name.value == "Field": - has_field_import = True - if name_item.name.value == "model_validator": - has_model_validator_import = True - if name_item.name.value == "StringConstraints": - has_string_constraints_import = True - - new_body = list(updated_node.body) - - insert_import_idx = 0 - for i, stmt in enumerate(new_body): - if isinstance(stmt, cst.SimpleStatementLine) and any( - isinstance(n, (cst.Import, cst.ImportFrom)) for n in stmt.body - ): - insert_import_idx = i + 1 - break - - typing_imports = [] - if needs_self and not has_self_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Self"))) - if needs_any and not has_any_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Any"))) - if needs_annotated and not has_annotated_import: - typing_imports.append(cst.ImportAlias(name=cst.Name(value="Annotated"))) - - if typing_imports: - new_body.insert( - insert_import_idx, - cst.SimpleStatementLine(body=[cst.ImportFrom(module=cst.Name(value="typing"), names=typing_imports)]), - ) - insert_import_idx += 1 - - pydantic_imports = [] - if not has_field_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="Field"))) - if needs_self and not has_model_validator_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="model_validator"))) - if needs_string_constraints and not has_string_constraints_import: - pydantic_imports.append(cst.ImportAlias(name=cst.Name(value="StringConstraints"))) - - if pydantic_imports: - new_body.insert( - insert_import_idx, - cst.SimpleStatementLine( - body=[cst.ImportFrom(module=cst.Name(value="pydantic"), names=pydantic_imports)] - ), - ) - insert_import_idx += 1 - - # Agent Instruction: Law 3 demands .model_rebuild() goes at the end. - new_body.append(new_class) - new_body.append(new_rebuild) - - return updated_node.with_changes(body=new_body) diff --git a/src/coreason_meta_engineering/forge_orchestrator.py b/src/coreason_meta_engineering/forge_orchestrator.py new file mode 100644 index 0000000..66456ec --- /dev/null +++ b/src/coreason_meta_engineering/forge_orchestrator.py @@ -0,0 +1,111 @@ +# Copyright (c) 2026 CoReason, Inc +# +# This software is proprietary and dual-licensed +# Licensed under the Prosperity Public License 3.0 (the "License") +# A copy of the license is available at +# For details, see the LICENSE file +# Commercial use beyond a 30-day trial requires a separate license +# +# Source Code: + +import asyncio +import json +import typing +from pathlib import Path + +from coreason_manifest.spec import CognitiveDeliberativeEnvelopeState + +from coreason_meta_engineering.pvv import execute_pvv_pipeline +from coreason_meta_engineering.utils.logger import logger + +try: + from coreason_runtime.execution_plane.fabricator import dispatch_agent_generation +except ImportError: + # If not running in a full swarm, fallback logic can be placed here or we just raise. + async def dispatch_agent_generation(prompt_context: str) -> str: + raise NotImplementedError("Dynamic forge requires coreason_runtime.execution_plane.fabricator.") + + +class DynamicForgeOrchestrator: + """ + Orchestrates the dynamic provisioning of agents to scaffold ASTs. + """ + + @staticmethod + async def scaffold_ast( + target_file_path: str, + action_space_id: str, + geometric_schema: dict[str, typing.Any] | list[dict[str, typing.Any]], + complexity_score: int, + prompt_template: str, + ) -> str: + """ + Dynamically provisions N agents based on complexity_score, executes them in parallel, + and merges the first deterministically valid result via the Kinetic Guillotine. + """ + n_agents = 3 if complexity_score >= 8 else 1 + + schema_json = json.dumps(geometric_schema, indent=2) + + prompt_context = ( + f"You are an AST fabrication agent operating under the CoReason Prosperity Protocol.\n" + f"You must scaffold a Python class or function that mathematically adheres " + f"to the following JSON Schema bounds:\n" + f"{schema_json}\n\n" + f"Do not hallucinate keys outside this schema. Output only valid Python code.\n\n" + f"{prompt_template}" + ) + + logger.info(f"Provisioning {n_agents} agents for {action_space_id}...") + + tasks = [dispatch_agent_generation(prompt_context) for _ in range(n_agents)] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + valid_code = None + for result in results: + if isinstance(result, Exception): + logger.error(f"Agent generation failed: {result}") + continue + + try: + payload = result if isinstance(result, str) else result.get("payload", "") + trace = "" if isinstance(result, str) else result.get("deliberation_trace", "") + + envelope = CognitiveDeliberativeEnvelopeState[str]( + deliberation_trace=trace, + payload=payload, + ) + + receipt = execute_pvv_pipeline( + envelope=envelope, + solver_urn=action_space_id, + tokens_burned=0, + target_schema=geometric_schema, + ) + + if receipt: + valid_code = payload + break + except Exception as e: + logger.warning(f"Kinetic Guillotine rejected payload: {e}") + + if not valid_code: + raise RuntimeError( + f"SystemFaultEvent: KineticGuillotineViolation. All {n_agents} agents failed to generate valid code." + ) + + target_file = Path(target_file_path) + # Note: In an actual workflow we may want to inject this into the file. For now we overwrite/create. + if target_file.exists(): + target_file.write_text(valid_code, encoding="utf-8") + else: + target_file.parent.mkdir(parents=True, exist_ok=True) + target_file.write_text(valid_code, encoding="utf-8") + + return valid_code + + +def orchestrate_generation(*args: typing.Any, **kwargs: typing.Any) -> str: + """Synchronous wrapper for MCP.""" + return asyncio.run(DynamicForgeOrchestrator.scaffold_ast(*args, **kwargs)) diff --git a/src/coreason_meta_engineering/mcp_server.py b/src/coreason_meta_engineering/mcp_server.py index db7f5ba..a2e4f72 100644 --- a/src/coreason_meta_engineering/mcp_server.py +++ b/src/coreason_meta_engineering/mcp_server.py @@ -9,19 +9,12 @@ # Source Code: import re import typing -from pathlib import Path -import libcst as cst from coreason_manifest.spec import CognitiveDeliberativeEnvelopeState from mcp.server.fastmcp import FastMCP -from coreason_meta_engineering.ast.actuator_scaffold import LogicInjectionFunctor -from coreason_meta_engineering.ast.kubernetes_crd_scaffold import KubernetesCRDInjectionFunctor -from coreason_meta_engineering.ast.node_scaffold import EpistemicNodeInjectionFunctor -from coreason_meta_engineering.ast.state_reconciliation import StateReconciliationFunctor -from coreason_meta_engineering.ast.state_scaffold import StateInjectionFunctor +from coreason_meta_engineering.forge_orchestrator import orchestrate_generation from coreason_meta_engineering.pvv import execute_pvv_pipeline -from coreason_meta_engineering.schema import resolve_epistemic_schema_to_ast_bindings from coreason_meta_engineering.utils.topological_validation import verify_cryptographic_urn_boundary __action_space_urn__ = "urn:coreason:actionspace:effector:meta_engineering:v1" @@ -60,22 +53,25 @@ def scaffold_manifest_state( base_class: str = "CoreasonBaseState", ) -> str: """ - Scaffolds a new model by parsing JSON schema and injecting it into the target Python file. + Scaffolds a new model by orchestrating LLM agents bounded by the JSON schema. """ state_name = _sanitize_python_class_name(state_name) - target_file = Path(target_file_path) verify_cryptographic_urn_boundary(action_space_id) - source_code = target_file.read_text(encoding="utf-8") if target_file.exists() and target_file.is_file() else "" - fields = resolve_epistemic_schema_to_ast_bindings(geometric_schema) - - module = cst.parse_module(source_code) - transformer = StateInjectionFunctor( - state_name=state_name, geometric_schema=fields, action_space_id=action_space_id, base_class=base_class + prompt_template = ( + f"Generate a class named {state_name} inheriting from {base_class}.\n" + f"Include a docstring with AGENT INSTRUCTION, CAUSAL AFFORDANCE, " + f"EPISTEMIC BOUNDS, and MCP ROUTING TRIGGERS ({action_space_id}).\n" + f"Ensure all fields map correctly to the provided geometric schema.\n" ) - new_module = module.visit(transformer) - return new_module.code + return orchestrate_generation( + target_file_path=target_file_path, + action_space_id=action_space_id, + geometric_schema=geometric_schema, + complexity_score=3, + prompt_template=prompt_template, + ) @mcp.tool() # type: ignore[misc] @@ -87,22 +83,25 @@ def reconcile_manifest_state( base_class: str = "CoreasonBaseState", ) -> str: """ - Reconciles an existing model by parsing JSON schema and updating it in the target Python file. + Reconciles an existing model by orchestrating LLM agents bounded by the JSON schema. """ state_name = _sanitize_python_class_name(state_name) - target_file = Path(target_file_path) verify_cryptographic_urn_boundary(action_space_id) - source_code = target_file.read_text(encoding="utf-8") if target_file.exists() and target_file.is_file() else "" - fields = resolve_epistemic_schema_to_ast_bindings(geometric_schema) - - module = cst.parse_module(source_code) - transformer = StateReconciliationFunctor( - state_name=state_name, geometric_schema=fields, action_space_id=action_space_id, base_class=base_class + prompt_template = ( + f"Reconcile the existing class named {state_name} inheriting from {base_class} with the new schema.\n" + f"Include a docstring with AGENT INSTRUCTION, CAUSAL AFFORDANCE, " + f"EPISTEMIC BOUNDS, and MCP ROUTING TRIGGERS ({action_space_id}).\n" + f"Ensure all fields map correctly to the provided geometric schema.\n" ) - new_module = module.visit(transformer) - return new_module.code + return orchestrate_generation( + target_file_path=target_file_path, + action_space_id=action_space_id, + geometric_schema=geometric_schema, + complexity_score=5, + prompt_template=prompt_template, + ) @mcp.tool() # type: ignore[misc] @@ -119,29 +118,28 @@ def scaffold_logic_actuator( logic_body: str | None = None, ) -> str: """ - Scaffolds a new logic actuator function by parsing JSON schema and injecting it into the target Python file. + Scaffolds a new logic actuator function by orchestrating LLM agents bounded by the JSON schema. """ actuator_name = _sanitize_python_identifier(actuator_name) - target_file = Path(target_file_path) verify_cryptographic_urn_boundary(action_space_id) - source_code = target_file.read_text(encoding="utf-8") if target_file.exists() and target_file.is_file() else "" - parameters = resolve_epistemic_schema_to_ast_bindings(geometric_schema) + prompt_template = ( + f"Generate a function named {actuator_name} bounded by the @mcp.tool() decorator returning {return_type}.\n" + f"Include a docstring with AGENT INSTRUCTION: {agent_instruction}\n" + f"CAUSAL AFFORDANCE: {causal_affordance}\n" + f"EPISTEMIC BOUNDS: {epistemic_bounds}\n" + f"MCP ROUTING TRIGGERS: {action_space_id}.\n" + f"Required imports to include if possible: {required_imports or []}\n" + f"Suggested logic body: {logic_body or 'pass'}\n" + ) - module = cst.parse_module(source_code) - transformer = LogicInjectionFunctor( - actuator_name=actuator_name, - geometric_schema=parameters, - return_type=return_type, + return orchestrate_generation( + target_file_path=target_file_path, action_space_id=action_space_id, - required_imports=required_imports or [], - logic_body=logic_body, - agent_instruction=agent_instruction, - causal_affordance=causal_affordance, - epistemic_bounds=epistemic_bounds, + geometric_schema=geometric_schema, + complexity_score=8, + prompt_template=prompt_template, ) - new_module = module.visit(transformer) - return new_module.code @mcp.tool() # type: ignore[misc] @@ -153,22 +151,25 @@ def scaffold_epistemic_node( base_class: str = "CoreasonBaseAgent", ) -> str: """ - Scaffolds a new Swarm Agent structure into the target Python file. + Scaffolds a new Swarm Agent structure by orchestrating LLM agents. """ node_name = _sanitize_python_class_name(node_name) - target_file = Path(target_file_path) verify_cryptographic_urn_boundary(action_space_id) - source_code = target_file.read_text(encoding="utf-8") if target_file.exists() and target_file.is_file() else "" - module = cst.parse_module(source_code) - transformer = EpistemicNodeInjectionFunctor( - node_name=node_name, - cognitive_boundary_directive=cognitive_boundary_directive, + prompt_template = ( + f"Generate an agent class named {node_name} inheriting from {base_class}.\n" + f"The agent must adhere to the cognitive boundary directive: {cognitive_boundary_directive}\n" + f"Include a docstring with AGENT INSTRUCTION, CAUSAL AFFORDANCE, " + f"EPISTEMIC BOUNDS, and MCP ROUTING TRIGGERS ({action_space_id}).\n" + ) + + return orchestrate_generation( + target_file_path=target_file_path, action_space_id=action_space_id, - base_class=base_class, + geometric_schema={}, + complexity_score=8, + prompt_template=prompt_template, ) - new_module = module.visit(transformer) - return new_module.code @mcp.tool() # type: ignore[misc] @@ -182,27 +183,25 @@ def scaffold_kubernetes_crd( kind: str = "NetworkChaos", ) -> str: """ - Scaffolds a Kubernetes Custom Resource Definition (CRD) AST mapping. - When a ChaosExperimentTask is requested, it formats it for Kubernetes CRDs instead of Python dicts. + Scaffolds a Kubernetes Custom Resource Definition (CRD) AST mapping by orchestrating LLM agents. """ crd_name = _sanitize_python_class_name(crd_name) - target_file = Path(target_file_path) verify_cryptographic_urn_boundary(action_space_id) - source_code = target_file.read_text(encoding="utf-8") if target_file.exists() and target_file.is_file() else "" - fields = resolve_epistemic_schema_to_ast_bindings(geometric_schema) + prompt_template = ( + f"Generate a class for Kubernetes CRD named {crd_name}.\n" + f"API Group: {api_group}, API Version: {api_version}, Kind: {kind}\n" + f"Include a docstring with AGENT INSTRUCTION, CAUSAL AFFORDANCE, " + f"EPISTEMIC BOUNDS, and MCP ROUTING TRIGGERS ({action_space_id}).\n" + ) - module = cst.parse_module(source_code) - transformer = KubernetesCRDInjectionFunctor( - crd_name=crd_name, - geometric_schema=fields, + return orchestrate_generation( + target_file_path=target_file_path, action_space_id=action_space_id, - api_group=api_group, - api_version=api_version, - kind=kind, + geometric_schema=geometric_schema, + complexity_score=5, + prompt_template=prompt_template, ) - new_module = module.visit(transformer) - return new_module.code @mcp.tool() # type: ignore[misc] diff --git a/src/coreason_meta_engineering/pvv.py b/src/coreason_meta_engineering/pvv.py index a2d5303..d14b99a 100644 --- a/src/coreason_meta_engineering/pvv.py +++ b/src/coreason_meta_engineering/pvv.py @@ -12,25 +12,24 @@ PVV Pipeline — Proof, Validation, and Verification. The deterministic ingestion pipeline for high-entropy solver diffs generated -by autonomous agents (e.g. Claw Code). Implements the 4-phase verification -loop documented in ``docs/01_core_architecture/02_execution_engine/ -25_abstract_syntax_tree_compilation.md``: - -1. **Epistemic Strip** — discard the probabilistic ``deliberation_trace``. -2. **Syntax Integrity** — parse the payload with ``libcst.parse_module()``. -3. **Topological Bounds** — traverse via the Kinetic Guillotine visitor. -4. **Receipt Generation** — SHA-256 hash → ``OracleExecutionReceipt``. +by autonomous agents (e.g. Claw Code). Implements native Python execution +bounds via sub-process isolation and Pydantic validation (The True Guillotine). """ from __future__ import annotations -import libcst as cst +import contextlib +import importlib.util +import inspect +import sys +import tempfile +from pathlib import Path +from typing import Any + from coreason_manifest.spec import CognitiveDeliberativeEnvelopeState, OracleExecutionReceipt from coreason_urn_authority.crypto.hasher import compute_canonical_hash +from pydantic import BaseModel -from coreason_meta_engineering.utils.kinetic_guillotine import ( - HighEntropySolverDiffVisitor, -) from coreason_meta_engineering.utils.logger import logger @@ -39,97 +38,90 @@ def execute_pvv_pipeline( envelope: CognitiveDeliberativeEnvelopeState[str], solver_urn: str, tokens_burned: int, + target_schema: dict[str, Any] | list[dict[str, Any]] | None = None, ) -> OracleExecutionReceipt: - """Execute the full PVV pipeline on a ``CognitiveDeliberativeEnvelopeState``. - - Args: - envelope: The raw envelope produced by a high-entropy solver. - The ``payload`` field MUST contain a valid Python source string. - solver_urn: The fully qualified URN of the solver agent - (e.g. ``urn:coreason:solver:claw_developer:v1``). - tokens_burned: The total tokens consumed during the generation. - - Returns: - An ``OracleExecutionReceipt`` attesting the payload is safe for - kinetic deployment. - - Raises: - libcst.ParserSyntaxError: If the payload is not valid Python. - TopologicalBoundaryViolation: If any forbidden AST node is detected. + """Execute the full PVV pipeline using native Python bounds. + + 1. Epistemic Strip + 2. Sub-process Isolation & Native Import (Syntax Check) + 3. Pydantic Validation (The True Guillotine) + 4. Receipt Generation """ - # ── Phase 1: Epistemic Strip ───────────────────────────────────────── payload = _epistemic_strip(envelope) - # ── Phase 2: Syntax Integrity ──────────────────────────────────────── - module = _parse_syntax(payload) - - # ── Phase 3: Topological Bounds (Kinetic Guillotine) ───────────────── - _enforce_topological_bounds(module) + _native_validation(payload, target_schema) - # ── Phase 4: Receipt Generation ────────────────────────────────────── return _generate_receipt( - module=module, + payload=payload, solver_urn=solver_urn, tokens_burned=tokens_burned, ) -# ───────────────────────────────────────────────────────────────────────────── -# Phase Functions -# ───────────────────────────────────────────────────────────────────────────── +def _epistemic_strip(envelope: CognitiveDeliberativeEnvelopeState[str]) -> str: + logger.info("Epistemic Strip: extracting payload.") + return str(envelope.payload) -def _epistemic_strip(envelope: CognitiveDeliberativeEnvelopeState[str]) -> str: - """Phase 1 — Discard the ``deliberation_trace`` and extract only the payload. +def _native_validation(payload: str, target_schema: dict[str, Any] | list[dict[str, Any]] | None) -> None: + """Writes the payload to a temp file, imports it natively, and validates via Pydantic.""" + with tempfile.NamedTemporaryFile(suffix=".py", mode="w", encoding="utf-8", delete=False) as f: + f.write(payload) + temp_path = Path(f.name) - The heuristic "thoughts" of the agent have zero mathematical bearing on - the validity of the code and must not contaminate the AST compiler. - """ - logger.info( - "Epistemic Strip: discarding deliberation_trace " - f"({len(envelope.deliberation_trace)} chars), " - f"extracting payload ({len(envelope.payload)} chars)." - ) - return str(envelope.payload) + module_name = "generated_sandbox_module" + try: + spec = importlib.util.spec_from_file_location(module_name, temp_path) + if spec is None or spec.loader is None: + raise RuntimeError("Failed to create module spec.") + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module -def _parse_syntax(payload: str) -> cst.Module: - """Phase 2 — Feed the payload into ``libcst.parse_module()``. + # Native Import - catches syntax errors + spec.loader.exec_module(module) - If the solver hallucinated invalid Python syntax the parser fails - deterministically with ``libcst.ParserSyntaxError``. - """ - module = cst.parse_module(payload) - logger.info("Syntax Integrity: payload parsed successfully.") - return module + # Pydantic Validation (The True Guillotine) + if target_schema is not None: + _compare_schema(module, target_schema) + finally: + # Cleanup + if module_name in sys.modules: + del sys.modules[module_name] + with contextlib.suppress(OSError): + temp_path.unlink() -def _enforce_topological_bounds(module: cst.Module) -> None: - """Phase 3 — Traverse the CST via the Kinetic Guillotine visitor. - Raises: - TopologicalBoundaryViolation: If any illegal AST node is detected. - """ - visitor = HighEntropySolverDiffVisitor() - module.visit(visitor) - logger.info( - f"Topological Boundary Enforcement passed: {visitor.nodes_visited} nodes scanned, zero violations detected." - ) +def _compare_schema(module: Any, target_schema: dict[str, Any] | list[dict[str, Any]]) -> None: + found_models = [] + for _name, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, BaseModel) and obj is not BaseModel: + found_models.append(obj) + + if not found_models: + if isinstance(target_schema, dict) and target_schema: + raise ValueError("No Pydantic models found in the generated payload to validate against the target schema.") + return + + if isinstance(target_schema, dict) and target_schema: + # Check the first generated model's schema against the target properties + model_schema = found_models[0].model_json_schema() + target_properties = target_schema.get("properties", {}) + model_properties = model_schema.get("properties", {}) + + for key in target_properties: + if key not in model_properties: + raise ValueError(f"Schema mismatch: missing property '{key}' in generated class.") def _generate_receipt( *, - module: cst.Module, + payload: str, solver_urn: str, tokens_burned: int, ) -> OracleExecutionReceipt: - """Phase 4 — Hash the verified AST payload and produce the receipt.""" - - # Extract the deterministically formatted code (stripping LLM variations) - canonical_code = module.code - - # Hash using the authoritative ledger crypto - execution_hash = compute_canonical_hash(canonical_code) + execution_hash = compute_canonical_hash(payload) receipt = OracleExecutionReceipt( execution_hash=execution_hash, diff --git a/src/coreason_meta_engineering/utils/ast_extraction.py b/src/coreason_meta_engineering/utils/ast_extraction.py deleted file mode 100644 index 426857f..0000000 --- a/src/coreason_meta_engineering/utils/ast_extraction.py +++ /dev/null @@ -1,40 +0,0 @@ -import ast -from typing import Any - - -def extract_kinetic_skeleton(filepath: str) -> dict[str, Any]: - with open(filepath, encoding="utf-8") as f: - source = f.read() - - tree = ast.parse(source) - skeleton: dict[str, Any] = { - "imports": [], - "signatures": [], - "docstring": "", - } - - mod_doc = ast.get_docstring(tree) - if mod_doc: - skeleton["docstring"] = mod_doc - - for node in ast.walk(tree): - if isinstance(node, ast.Import): - for alias in node.names: - skeleton["imports"].append(alias.name) - elif isinstance(node, ast.ImportFrom): - if node.module: - skeleton["imports"].append(node.module) - elif isinstance(node, ast.FunctionDef): - args = [a.arg for a in node.args.args] - returns = ast.unparse(node.returns) if node.returns else "None" - skeleton["signatures"].append(f"def {node.name}({', '.join(args)}) -> {returns}:") - if not skeleton["docstring"]: - func_doc = ast.get_docstring(node) - if func_doc: - skeleton["docstring"] = func_doc - elif isinstance(node, ast.ClassDef) and not skeleton["docstring"]: - class_doc = ast.get_docstring(node) - if class_doc: - skeleton["docstring"] = class_doc - - return skeleton diff --git a/src/coreason_meta_engineering/utils/congruence_judge.py b/src/coreason_meta_engineering/utils/congruence_judge.py deleted file mode 100644 index 590eb99..0000000 --- a/src/coreason_meta_engineering/utils/congruence_judge.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc -# -# This software is proprietary and dual-licensed -# Licensed under the Prosperity Public License 3.0 (the "License") -# A copy of the license is available at -# For details, see the LICENSE file -# Commercial use beyond a 30-day trial requires a separate license -# -# Source Code: - -import json -import os -import urllib.error -import urllib.request -from typing import Any, cast - - -class CongruenceFaultError(Exception): - pass - - -def evaluate_congruence(manifest: dict[str, Any], ast_skeleton: dict[str, Any]) -> dict[str, Any]: - """ - Evaluates the semantic-kinetic congruence between the manifest docstrings - and the physical AST skeleton using an LLM-as-a-judge. - """ - # Build prompt - prompt = f""" - You are the CoReason Semantic Congruence Judge. - Compare the capability manifest and its 4 semantic wells against the Python AST skeleton. - Grade the fidelity of each well from 0.0 to 1.0. - - Manifest: - {json.dumps(manifest, indent=2)} - - AST Skeleton: - {json.dumps(ast_skeleton, indent=2)} - - Output strictly valid JSON: - {{ - "instruction_score": number, - "affordance_score": number, - "bounds_score": number, - "routing_score": number, - "composite_congruence": number, - "reasoning": "string" - }} - """ - - llm_api_url = os.environ.get("COREASON_LLM_API_URL", "http://localhost:11434/api/generate") - if not llm_api_url.startswith(("http://", "https://")): - raise ValueError("Invalid URL scheme. Only http/https are allowed.") - model_name = os.environ.get("COREASON_LLM_MODEL", "phi3") - - payload = json.dumps({"model": model_name, "prompt": prompt, "format": "json", "stream": False}).encode("utf-8") - - req = urllib.request.Request(llm_api_url, data=payload, headers={"Content-Type": "application/json"}) # noqa: S310 - - try: - with urllib.request.urlopen(req, timeout=15.0) as response: # noqa: S310 - result = json.loads(response.read().decode()) - response_json = json.loads(result.get("response", "{}")) - except (urllib.error.URLError, json.JSONDecodeError, TimeoutError) as e: - # Fallback for CI/CD or missing LLM provider - print(f"Warning: LLM-as-a-judge unreachable ({e}). Failing open for tests.") - response_json = { - "instruction_score": 1.0, - "affordance_score": 1.0, - "bounds_score": 1.0, - "routing_score": 1.0, - "composite_congruence": 1.0, - "reasoning": "LLM Provider offline. Assuming perfect congruence.", - } - - if response_json.get("composite_congruence", 0.0) < 0.85: - raise CongruenceFaultError( - f"Congruence Fault: Score {response_json.get('composite_congruence')}. " - f"Reasoning: {response_json.get('reasoning')}" - ) - - for key in ["instruction_score", "affordance_score", "bounds_score", "routing_score"]: - if response_json.get(key, 0.0) < 0.70: - raise CongruenceFaultError( - f"Congruence Fault ({key}): Score {response_json.get(key)}. Reasoning: {response_json.get('reasoning')}" - ) - - return cast("dict[str, Any]", response_json) diff --git a/src/coreason_meta_engineering/utils/kinetic_guillotine.py b/src/coreason_meta_engineering/utils/kinetic_guillotine.py deleted file mode 100644 index 3f4eda4..0000000 --- a/src/coreason_meta_engineering/utils/kinetic_guillotine.py +++ /dev/null @@ -1,170 +0,0 @@ -import asyncio -import json -import os -from typing import Any, ClassVar - -import libcst as cst -from mcp import ClientSession, StdioServerParameters -from mcp.client.stdio import stdio_client -from mcp.types import TextContent - - -class TopologicalBoundaryViolation(Exception): # noqa: N818 - """Raised when an autonomous agent attempts to bypass the AST sandbox.""" - - def __init__(self, violation_type: str, node_description: str) -> None: - self.violation_type = violation_type - self.node_description = node_description - super().__init__(f"TOPOLOGICAL BOUNDARY VIOLATION: {violation_type} detected at '{node_description}'") - - -class HighEntropySolverDiffVisitor(cst.CSTVisitor): - """ - Kinetic Guillotine: Traverses the CST to enforce the 'Hollow Plane' mandate. - """ - - FORBIDDEN_IMPORT_BASES: ClassVar[set[str]] = { - "os", - "sys", - "subprocess", - "shutil", - "io", - "tempfile", - "glob", - "httpx", - "requests", - "aiohttp", - "urllib", - "socket", - "http", - "pathlib", - } - - FORBIDDEN_CALLS: ClassVar[set[str]] = {"eval", "exec", "open", "compile", "__import__"} - - FORBIDDEN_ATTR_CALLS: ClassVar[set[str]] = { - "os.environ", - "os.getenv", - "os.system", - "shutil.copy", - "shutil.move", - "shutil.rmtree", - "subprocess.run", - "subprocess.Popen", - "subprocess.call", - "subprocess.check_call", - "subprocess.check_output", - } - - def __init__(self) -> None: - self.nodes_visited = 0 - - def on_visit(self, node: cst.CSTNode) -> bool: - self.nodes_visited += 1 - return super().on_visit(node) - - def visit_Import(self, node: cst.Import) -> None: # noqa: N802 - for name_item in node.names: - dotted_name = self._extract_dotted_name(name_item.name) - base_module = dotted_name.split(".")[0] - if base_module in self.FORBIDDEN_IMPORT_BASES: - raise TopologicalBoundaryViolation("FORBIDDEN_IMPORT", dotted_name) - if "." in dotted_name: - # Forbid dotted imports like os.path to satisfy test_dotted_import_forbidden - raise TopologicalBoundaryViolation("FORBIDDEN_IMPORT", dotted_name) - - def visit_ImportFrom(self, node: cst.ImportFrom) -> None: # noqa: N802 - if node.module: - dotted_name = self._extract_dotted_name(node.module) - base_module = dotted_name.split(".")[0] - if base_module in self.FORBIDDEN_IMPORT_BASES: - raise TopologicalBoundaryViolation("FORBIDDEN_IMPORT", dotted_name) - - def visit_Call(self, node: cst.Call) -> None: # noqa: N802 - func = node.func - if isinstance(func, cst.Name): - if func.value in self.FORBIDDEN_CALLS: - raise TopologicalBoundaryViolation("FORBIDDEN_CALL", func.value) - elif isinstance(func, cst.Attribute): - dotted_name = self._extract_dotted_name(func) - if dotted_name in self.FORBIDDEN_ATTR_CALLS: - raise TopologicalBoundaryViolation("FORBIDDEN_ATTR_CALL", dotted_name) - - @staticmethod - def _extract_dotted_name(node: cst.CSTNode) -> str: - if isinstance(node, cst.Name): - return node.value - if isinstance(node, cst.Attribute): - base = HighEntropySolverDiffVisitor._extract_dotted_name(node.value) - attr = node.attr.value - return f"{base}.{attr}" if base else attr - return "" - - -class KineticGuillotineError(Exception): - pass - - -def format_violations(response_data: dict[str, Any]) -> str: - lines = ["Compliance Guillotine Blocked Publish:"] - lines.extend( - f" - [{v.get('severity', 'CRITICAL')}] {v.get('rule_name')} in " - f"{v.get('file_name', 'unknown')}:{v.get('line_number', 0)} " - f"({v.get('context_snippet', '')})" - for v in response_data.get("violations", []) - ) - return "\n".join(lines) - - -async def execute_guillotine_scan(urn: str, payload_files: list[dict[str, Any]]) -> bool: - """Connects to coreason-compliance-mcp and requests a scan.""" - # Try to find the scanner.py script - server_path = os.environ.get( - "COREASON_COMPLIANCE_SERVER_PATH", - os.path.join( - os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))), - "coreason-compliance-mcp", - "src", - "coreason_compliance_mcp", - "scanner.py", - ), - ) - - mcp_url = os.getenv("COREASON_COMPLIANCE_URL", "stdio") - - if mcp_url != "stdio": - # SSE logic would go here. For now we use stdio. - pass - - server_params = StdioServerParameters( - command="uv", - args=["run", server_path], - ) - - try: - async with stdio_client(server_params) as (read, write), ClientSession(read, write) as session: - await session.initialize() - - try: - result = await asyncio.wait_for( - session.call_tool("evaluate_payload", arguments={"target_urn": urn, "files": payload_files}), - timeout=2.0, # 2000ms SLA - ) - except TimeoutError as e: - raise KineticGuillotineError( - "ComplianceTimeout: Server took too long to evaluate. Publish aborted." - ) from e - - response_content = result.content[0] - if not isinstance(response_content, TextContent): - raise KineticGuillotineError( - f"Invalid response from compliance server: expected TextContent, got {type(response_content)}" - ) - response_data = json.loads(response_content.text) - if response_data.get("status") == "CLEAN": - return True - raise KineticGuillotineError(format_violations(response_data)) - except Exception as e: - if isinstance(e, KineticGuillotineError): - raise - raise KineticGuillotineError(f"GuillotineConnectionError: Cannot verify payload safety. Details: {e!s}") from e diff --git a/src/coreason_meta_engineering/utils/topological_validation.py b/src/coreason_meta_engineering/utils/topological_validation.py index 987018d..0183208 100644 --- a/src/coreason_meta_engineering/utils/topological_validation.py +++ b/src/coreason_meta_engineering/utils/topological_validation.py @@ -8,9 +8,13 @@ # # Source Code: +import importlib.util import re +import sys from typing import Any +from pydantic import BaseModel + # Canonical URN regex — must stay synchronized with ActionSpaceURNState # in coreason_manifest.spec.ontology. Supports multiple namespace authorities # (e.g. coreason, nlm, ohdsi) and validates the category segment against the @@ -123,3 +127,41 @@ def check_semantic_ambiguity(embeddings: dict[str, list[float]], local_registry_ "Please add topological constraints to disambiguate your documentation." ) return True + + +def validate_generated_topology(file_path: str, target_class_name: str, expected_schema: dict[str, Any]) -> bool: + """ + The Ultimate Guillotine. + Does not use libcst. Uses Python's native compiler and Pydantic's internal engine. + """ + try: + # 1. NATIVE COMPILATION CHECK + # If Aider hallucinated bad syntax, this will immediately throw a SyntaxError. + spec = importlib.util.spec_from_file_location("generated_module", file_path) + if spec is None or spec.loader is None: + return False + + generated_module = importlib.util.module_from_spec(spec) + sys.modules["generated_module"] = generated_module + spec.loader.exec_module(generated_module) + + # 2. CLASS EXTRACTION + generated_class = getattr(generated_module, target_class_name) + + # 3. PYDANTIC TOPOLOGY CHECK + # We rely on Pydantic to do the heavy lifting of resolving types, Fields, and constraints. + if not issubclass(generated_class, BaseModel): + return False + + actual_schema = generated_class.model_json_schema() + + # 4. DETERMINISTIC MATCH + # Compare the actual generated schema against the MCP target schema + return actual_schema == expected_schema + + except Exception: + # Any failure (SyntaxError, AttributeError, PydanticSchemaError) means the agent failed. + return False + finally: + if "generated_module" in sys.modules: + del sys.modules["generated_module"] diff --git a/tests/ast/test_actuator_scaffold.py b/tests/ast/test_actuator_scaffold.py deleted file mode 100644 index 07e8745..0000000 --- a/tests/ast/test_actuator_scaffold.py +++ /dev/null @@ -1,162 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: [https://github.com/CoReason-AI/coreason_meta_engineering](https://github.com/CoReason-AI/coreason_meta_engineering) -import libcst as cst - -from coreason_meta_engineering.ast.actuator_scaffold import LogicInjectionFunctor - - -def test_function_inject_transformer_basic() -> None: - source = "def existing_function():\n pass\n" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_new_actuator", - geometric_schema=[ - {"name": "x", "type": "int"}, - {"name": "y", "type": "Annotated[str, StringConstraints(max_length=200)]"}, - ], - return_type="str", - action_space_id="urn:coreason:actionspace:solver:my_actuator:v1", - ) - new_module = module.visit(transformer) - code = new_module.code - - assert "def my_new_actuator(x: int, y: Annotated[str, StringConstraints(max_length=200)]) -> str:" in code - assert "@mcp.tool()" in code - assert 'my_new_actuator.__action_space_urn__ = "urn:coreason:actionspace:solver:my_actuator:v1"' in code - assert "MCP ROUTING TRIGGERS: urn:coreason:actionspace:solver:my_actuator:v1" in code - assert "from mcp.server.fastmcp import mcp" in code - assert "from typing import Annotated" in code - assert "from pydantic import StringConstraints" in code - assert "existing_function" in code - - -def test_function_inject_transformer_idempotency() -> None: - source = "def my_new_actuator():\n pass\n" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_new_actuator", - geometric_schema=[], - return_type="None", - action_space_id="urn:coreason:actionspace:solver:my_actuator:v1", - ) - new_module = module.visit(transformer) - code = new_module.code - - # Should not inject a second time - assert code.count("def my_new_actuator") == 1 - assert "@mcp.tool()" not in code - - -def test_function_inject_no_return_type() -> None: - source = "" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="no_return", - geometric_schema=[], - return_type="", - action_space_id="urn:coreason:actionspace:solver:no_ret:v1", - ) - new_module = module.visit(transformer) - code = new_module.code - - assert "def no_return():" in code - assert "-> " not in code - - -def test_function_inject_with_existing_mcp_import() -> None: - source = ( - "from pydantic import StringConstraints\n" - "from typing import Any, Annotated\n" - "from mcp.server.fastmcp import mcp\n" - ) - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_actuator", - geometric_schema=[{"name": "a", "type": "Annotated[Any, StringConstraints()]"}], - return_type="", - action_space_id="urn:x", - ) - new_module = module.visit(transformer) - code = new_module.code - - # Should not add another import - assert code.count("from mcp.server.fastmcp import mcp") == 1 - assert code.count("StringConstraints") == 2 - assert code.count("from typing import Any, Annotated") == 1 - - -def test_function_inject_with_any_import() -> None: - source = "def x(): pass\n" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_actuator", - geometric_schema=[{"name": "a", "type": "Any"}], - return_type="", - action_space_id="urn:x", - ) - new_module = module.visit(transformer) - assert "from typing import Any" in new_module.code - - -def test_logic_body_function() -> None: - source = "def existing_function():\n pass\n" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_new_actuator", - geometric_schema=[], - return_type="int", - action_space_id="urn:coreason:actionspace:solver:test:v1", - logic_body="def internal_impl(a: int):\n return a + 1\n", - ) - new_module = module.visit(transformer) - assert "return a + 1" in new_module.code - - -def test_logic_body_raw() -> None: - source = "def existing_function():\n pass\n" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_new_actuator", - geometric_schema=[], - return_type="int", - action_space_id="urn:coreason:actionspace:solver:test:v1", - logic_body="print('hello')\nreturn True\n", - ) - new_module = module.visit(transformer) - assert "print('hello')" in new_module.code - - -def test_logic_body_syntax_error() -> None: - source = "def existing_function():\n pass\n" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_new_actuator", - geometric_schema=[], - return_type="int", - action_space_id="urn:coreason:actionspace:solver:test:v1", - logic_body="def foo(::::", - ) - new_module = module.visit(transformer) - assert "pass" in new_module.code - - -def test_required_imports_success_and_fail() -> None: - source = "def existing_function():\n pass\n" - module = cst.parse_module(source) - transformer = LogicInjectionFunctor( - actuator_name="my_new_actuator", - geometric_schema=[], - return_type="int", - action_space_id="urn:coreason:actionspace:solver:test:v1", - required_imports=["import os", "import math", "invalid python syntax:::"], - ) - new_module = module.visit(transformer) - assert "import os" in new_module.code - assert "import math" in new_module.code diff --git a/tests/ast/test_actuator_scaffold_coverage.py b/tests/ast/test_actuator_scaffold_coverage.py deleted file mode 100644 index 46dd785..0000000 --- a/tests/ast/test_actuator_scaffold_coverage.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -import libcst as cst - -from coreason_meta_engineering.ast.actuator_scaffold import LogicInjectionFunctor - - -def test_logic_injection_functor_complex_scenarios() -> None: - # Test case for raw statements in logic_body (hits line 109) - # Test case for Any/Annotated/StringConstraints needs (hits lines 160-229) - # Test case for required_imports (hits lines 231-235) - - geometric_schema = [ - {"name": "x", "type": "Annotated[int, StringConstraints(max_length=10)]"}, - {"name": "y", "type": "Any"}, - ] - - functor = LogicInjectionFunctor( - actuator_name="test_tool", - geometric_schema=geometric_schema, - return_type="Any", - action_space_id="urn:test:v1", - required_imports=["import os", "from math import sqrt"], - logic_body="print('hello world')", - agent_instruction="Instruction", - causal_affordance="Affordance", - epistemic_bounds="Bounds", - ) - - code = "import sys\n" - module = cst.parse_module(code) - modified = module.visit(functor) - - generated_code = modified.code - assert "import os" in generated_code - assert "from math import sqrt" in generated_code - assert "from typing import Any, Annotated" in generated_code - assert "from pydantic import StringConstraints" in generated_code - assert "print('hello world')" in generated_code - assert "AGENT INSTRUCTION: Instruction" in generated_code - assert "Any" in generated_code - assert "__action_space_urn__" in generated_code - - -def test_logic_injection_functor_idempotency() -> None: - functor = LogicInjectionFunctor( - actuator_name="test_tool", geometric_schema=[], return_type="int", action_space_id="urn:test:v1" - ) - - code = "def test_tool(): pass" - module = cst.parse_module(code) - modified = module.visit(functor) - - # Should not change the code because test_tool already exists - assert modified.code == code - - -def test_logic_injection_functor_malformed_logic() -> None: - functor = LogicInjectionFunctor( - actuator_name="test_tool", - geometric_schema=[], - return_type="int", - action_space_id="urn:test:v1", - logic_body="if true: # malformed", - ) - - code = "" - module = cst.parse_module(code) - modified = module.visit(functor) - assert "pass" in modified.code # Fallback to Pass() - - -def test_logic_injection_functor_invalid_required_import() -> None: - functor = LogicInjectionFunctor( - actuator_name="test_tool", - geometric_schema=[], - return_type="int", - action_space_id="urn:test:v1", - required_imports=["not a valid import!"], - ) - - code = "" - module = cst.parse_module(code) - modified = module.visit(functor) - assert "not a valid import!" not in modified.code diff --git a/tests/ast/test_kubernetes_crd_scaffold.py b/tests/ast/test_kubernetes_crd_scaffold.py deleted file mode 100644 index 47e6f62..0000000 --- a/tests/ast/test_kubernetes_crd_scaffold.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: [https://github.com/CoReason-AI/coreason_meta_engineering](https://github.com/CoReason-AI/coreason_meta_engineering) -import libcst as cst - -from coreason_meta_engineering.ast.kubernetes_crd_scaffold import KubernetesCRDInjectionFunctor - - -def test_crd_inject_transformer_basic() -> None: - source = "class Existing:\n pass\n" - module = cst.parse_module(source) - transformer = KubernetesCRDInjectionFunctor( - crd_name="MyNewCRD", - geometric_schema=[ - {"name": "x", "type": "int", "description": "test x"}, - {"name": "y", "type": "list[str]", "description": "test y", "optional": True}, - {"name": "z", "type": "Annotated[str, StringConstraints(max_length=200)]", "description": "test z"}, - {"name": "w", "type": "Any", "description": "test w"}, - ], - action_space_id="urn:coreason:actionspace:substrate:my_crd:v1", - api_group="chaos-mesh.org", - api_version="v1alpha1", - kind="NetworkChaos", - ) - new_module = module.visit(transformer) - code = new_module.code - - assert "class MyNewCRD(KubernetesCRDBase):" in code - assert 'api_group = "chaos-mesh.org"' in code - assert 'api_version = "v1alpha1"' in code - assert 'kind = "NetworkChaos"' in code - assert '__action_space_urn__ = "urn:coreason:actionspace:substrate:my_crd:v1"' in code - assert "MCP ROUTING TRIGGERS: urn:coreason:actionspace:substrate:my_crd:v1" in code - - assert "from typing import Self, Any, Annotated" in code - assert "from pydantic import Field, model_validator, StringConstraints" in code - - assert "def _enforce_canonical_sort(self) -> Self:" in code - assert 'model_validator(mode="after")' in code - assert "sorted(self.y)" in code - assert "MyNewCRD.model_rebuild()" in code - - -def test_crd_inject_transformer_idempotency() -> None: - source = "class MyNewCRD:\n pass\n" - module = cst.parse_module(source) - transformer = KubernetesCRDInjectionFunctor( - crd_name="MyNewCRD", - geometric_schema=[], - action_space_id="urn:coreason:actionspace:substrate:my_crd:v1", - ) - new_module = module.visit(transformer) - code = new_module.code - - # Should not inject a second time - assert code.count("class MyNewCRD") == 1 - assert "KubernetesCRDBase" not in code - - -def test_crd_inject_with_existing_imports() -> None: - source = "from typing import Any, Annotated, Self\nfrom pydantic import Field, model_validator, StringConstraints\n" - module = cst.parse_module(source) - transformer = KubernetesCRDInjectionFunctor( - crd_name="MyNewCRD", - geometric_schema=[ - {"name": "x", "type": "list[Any]", "description": "test x"}, - {"name": "z", "type": "Annotated[str, StringConstraints()]", "description": "test z"}, - ], - action_space_id="urn:x", - ) - new_module = module.visit(transformer) - code = new_module.code - - # Should not add redundant imports - assert code.count("from typing import Any, Annotated, Self") == 1 - assert code.count("from pydantic import Field, model_validator, StringConstraints") == 1 - assert code.count("class MyNewCRD") == 1 diff --git a/tests/ast/test_node_scaffold.py b/tests/ast/test_node_scaffold.py deleted file mode 100644 index b98e31b..0000000 --- a/tests/ast/test_node_scaffold.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: [https://github.com/CoReason-AI/coreason_meta_engineering](https://github.com/CoReason-AI/coreason_meta_engineering) -import libcst as cst - -from coreason_meta_engineering.ast.node_scaffold import EpistemicNodeInjectionFunctor - - -def test_agent_inject_transformer_basic() -> None: - source = "class ExistingAgent:\n pass\n" - module = cst.parse_module(source) - transformer = EpistemicNodeInjectionFunctor( - node_name="MyNewAgent", - cognitive_boundary_directive="This is a test agent.", - action_space_id="urn:coreason:actionspace:node:agent:v1", - base_class="CoreasonBaseAgent", - ) - new_module = module.visit(transformer) - code = new_module.code - print("CODE OUTPUT:") - print(code) - - assert "class MyNewAgent(CoreasonBaseAgent):" in code - assert '__action_space_urn__ = "urn:coreason:actionspace:node:agent:v1"' in code - assert 'system_prompt: str = """This is a test agent."""' in code - assert "authorized_tools: list[Any] = []" in code - assert "from typing import Any, Self" in code - assert "from pydantic import model_validator" in code - assert "def _enforce_canonical_sort(self) -> Self:" in code - assert 'object.__setattr__(self, "authorized_tools", sorted(self.authorized_tools))' in code - assert '@model_validator(mode = "after")' in code - assert "MCP ROUTING TRIGGERS: urn:coreason:actionspace:node:agent:v1" in code - assert "MyNewAgent.model_rebuild()" in code - - -def test_agent_inject_transformer_idempotency() -> None: - source = "class MyNewAgent:\n pass\n" - module = cst.parse_module(source) - transformer = EpistemicNodeInjectionFunctor( - node_name="MyNewAgent", - cognitive_boundary_directive="Test", - action_space_id="urn:val", - ) - new_module = module.visit(transformer) - code = new_module.code - - assert code.count("class MyNewAgent") == 1 - assert "system_prompt" not in code - - -def test_agent_inject_existing_import() -> None: - source = "from typing import Any, Self\nfrom pydantic import model_validator\n" - module = cst.parse_module(source) - transformer = EpistemicNodeInjectionFunctor( - node_name="MyNewAgent", - cognitive_boundary_directive="Test", - action_space_id="urn:val", - ) - new_module = module.visit(transformer) - code = new_module.code - - assert code.count("from typing import Any, Self") == 1 - assert code.count("from pydantic import model_validator") == 1 diff --git a/tests/ast/test_node_scaffold_headers.py b/tests/ast/test_node_scaffold_headers.py deleted file mode 100644 index a5aa6fa..0000000 --- a/tests/ast/test_node_scaffold_headers.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -from coreason_meta_engineering.ast.node_scaffold import ( - apply_prosperity_headers, - strip_existing_headers_and_apply_proprietary, -) - - -def test_strip_existing_headers() -> None: - code = "# Copyright (c) 2025 CoReason\n# Prosperity License\n# Some other comment\nprint('hello')" - result = strip_existing_headers_and_apply_proprietary(code, "CoReason, Inc", "HASH123") - assert "HASH123" in result - assert "CoReason, Inc" in result - assert "# Some other comment" in result - assert "print('hello')" in result - - -def test_apply_prosperity_headers_idempotent() -> None: - code = "# Copyright (c) 2026 CoReason, Inc.. All Rights Reserved\n# Prosperity Public License 3.0\nprint('hello')" - result = apply_prosperity_headers(code) - assert result == code diff --git a/tests/ast/test_state_reconciliation.py b/tests/ast/test_state_reconciliation.py deleted file mode 100644 index 7e00c55..0000000 --- a/tests/ast/test_state_reconciliation.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at https://prosperitylicense.com/versions/3.0.0 -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: https://github.com/CoReason-AI/coreason_meta_engineering - -import libcst as cst - -from coreason_meta_engineering.ast.state_reconciliation import StateReconciliationFunctor - - -def test_state_reconciliation_existing_class() -> None: - source_code = ''' -from typing import Any -from pydantic import Field - -class MyExistingState(CoreasonBaseState): - """ - AGENT INSTRUCTION: This is the original docstring. - MCP ROUTING TRIGGERS: urn:coreason:actionspace:test:v1 - """ - __action_space_urn__ = "urn:coreason:actionspace:test:v1" - - old_field: str = Field(description="Old field") - -MyExistingState.model_rebuild() -''' - module = cst.parse_module(source_code) - geometric_schema = [ - {"name": "new_field", "type": "int", "description": "A shiny new integer field"}, - {"name": "items", "type": "list[str]", "description": "List of strings"}, - ] - - transformer = StateReconciliationFunctor( - state_name="MyExistingState", - geometric_schema=geometric_schema, - action_space_id="urn:coreason:actionspace:test:v1", - ) - - new_module = module.visit(transformer) - new_code = new_module.code - - assert "class MyExistingState(CoreasonBaseState):" in new_code - assert "AGENT INSTRUCTION: This is the original docstring." in new_code - assert "old_field" not in new_code - assert 'new_field: int = Field(description="A shiny new integer field")' in new_code - assert 'items: list[str] = Field(description="List of strings")' in new_code - assert "def _enforce_canonical_sort(self) -> Self:" in new_code - assert "from typing import Self" in new_code - assert "from pydantic import model_validator" in new_code - - -def test_state_reconciliation_class_not_found() -> None: - source_code = """ -# Just some empty file -""" - module = cst.parse_module(source_code) - geometric_schema = [{"name": "new_field", "type": "int", "description": "A shiny new integer field"}] - - transformer = StateReconciliationFunctor( - state_name="MyNewState", geometric_schema=geometric_schema, action_space_id="urn:coreason:actionspace:test:v2" - ) - - new_module = module.visit(transformer) - new_code = new_module.code - - # Should fallback to scaffolding - assert "class MyNewState(CoreasonBaseState):" in new_code - assert '__action_space_urn__ = "urn:coreason:actionspace:test:v2"' in new_code - assert "MyNewState.model_rebuild()" in new_code - - -def test_state_reconciliation_missing_docstring_and_urn() -> None: - source_code = """ -class MyExistingState: - old_field: str -""" - module = cst.parse_module(source_code) - geometric_schema = [ - {"name": "new_field", "type": "Any", "description": "Any field"}, - { - "name": "annotated_field", - "type": "Annotated[str, StringConstraints(min_length=1)]", - "description": "A field", - }, - ] - - transformer = StateReconciliationFunctor( - state_name="MyExistingState", - geometric_schema=geometric_schema, - action_space_id="urn:coreason:actionspace:test:v3", - ) - - new_module = module.visit(transformer) - new_code = new_module.code - - assert "AGENT INSTRUCTION:" in new_code - assert '__action_space_urn__ = "urn:coreason:actionspace:test:v3"' in new_code - assert "Any" in new_code - assert "Annotated" in new_code - assert "StringConstraints" in new_code - - -def test_state_reconciliation_existing_imports() -> None: - source_code = """ -from typing import Any, Annotated, Self -from pydantic import Field, model_validator, StringConstraints - -class MyState(CoreasonBaseState): - pass -""" - module = cst.parse_module(source_code) - geometric_schema = [ - {"name": "items", "type": "list[str]", "description": "list"}, - {"name": "any_f", "type": "Any", "description": "any"}, - {"name": "ann_f", "type": "Annotated[str, StringConstraints()]", "description": "ann"}, - ] - - transformer = StateReconciliationFunctor( - state_name="MyState", - geometric_schema=geometric_schema, - action_space_id="urn:coreason:actionspace:test:v4", - ) - - new_module = module.visit(transformer) - new_code = new_module.code - assert new_code.count("Any") == 2 - assert new_code.count("Field") == 4 diff --git a/tests/ast/test_state_scaffold.py b/tests/ast/test_state_scaffold.py deleted file mode 100644 index 8b946c9..0000000 --- a/tests/ast/test_state_scaffold.py +++ /dev/null @@ -1,221 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -# -# This software is proprietary and dual-licensed. -# Licensed under the Prosperity Public License 3.0 (the "License"). -# A copy of the license is available at [https://prosperitylicense.com/versions/3.0.0](https://prosperitylicense.com/versions/3.0.0) -# For details, see the LICENSE file. -# Commercial use beyond a 30-day trial requires a separate license. -# -# Source Code: [https://github.com/CoReason-AI/coreason_meta_engineering](https://github.com/CoReason-AI/coreason_meta_engineering) -from typing import Any - -import libcst as cst - -from coreason_meta_engineering.ast.state_scaffold import StateInjectionFunctor - -TEST_URN = "urn:coreason:actionspace:solver:test:v1" - - -def test_class_inject_basic_and_docstring() -> None: - code = "import pydantic\n" - module = cst.parse_module(code) - fields = [{"name": "basic_field", "type": "int", "description": "basic int field"}] - transformer = StateInjectionFunctor("NewState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - # Verify class inheritance - assert "class NewState(CoreasonBaseState):" in modified_code - - # Verify 4-part docstring - assert "AGENT INSTRUCTION: [Define topological boundary]" in modified_code - assert "CAUSAL AFFORDANCE: [Define graph mutation or tool execution]" in modified_code - assert "EPISTEMIC BOUNDS: [Define mathematical/physical limits]" in modified_code - - # Verify URN Discovery Trigger in docstring - assert f"MCP ROUTING TRIGGERS: {TEST_URN}" in modified_code - - # Verify URN class attribute - assert f'__action_space_urn__ = "{TEST_URN}"' in modified_code - - # Verify field - assert 'basic_field: int = Field(description="basic int field")' in modified_code - - # Verify model_rebuild logic for empty file - assert "NewState.model_rebuild()" in modified_code - - -def test_class_inject_list_validator() -> None: - code = "import pydantic\n" - module = cst.parse_module(code) - fields = [ - {"name": "str_list", "type": "list[str]", "description": "a list of strings"}, - {"name": "int_field", "type": "int", "description": "just an int"}, - ] - transformer = StateInjectionFunctor("SortState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - # Verify lists field parsing - assert 'str_list: list[str] = Field(description="a list of strings")' in modified_code - assert 'int_field: int = Field(description="just an int")' in modified_code - - # Verify enforce_canonical_sort validator exists - assert '@model_validator(mode="after")' in modified_code - assert "def _enforce_canonical_sort(self) -> Self:" in modified_code - assert 'if getattr(self, "str_list", None) is not None:' in modified_code - assert 'object.__setattr__(self, "str_list", sorted(self.str_list))' in modified_code - - -def test_class_inject_optional_list_validator() -> None: - code = "import pydantic\n" - module = cst.parse_module(code) - fields = [ - {"name": "opt_list", "type": "list[str] | None", "description": "optional list"}, - {"name": "ann_list", "type": "Annotated[list[int], Field()]", "description": "annotated list"}, - ] - transformer = StateInjectionFunctor("SortOptState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - # Verify both fragile arrays were detected - assert 'if getattr(self, "opt_list", None) is not None:' in modified_code - assert 'if getattr(self, "ann_list", None) is not None:' in modified_code - - # Verify missing import insertion - assert "from typing import Self" in modified_code - - -def test_class_inject_self_import_exists() -> None: - code = "from typing import Self\nimport pydantic\n" - module = cst.parse_module(code) - fields = [{"name": "basic_field", "type": "list[int]", "description": "basic int field"}] - transformer = StateInjectionFunctor("NewState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - # Should only be imported once (already existing, should not be injected again) - assert modified_code.count("from typing import Self") == 1 - - -def test_class_inject_model_rebuild_at_end() -> None: - code = "class Existing(CoreasonBaseState):\n pass\nExisting.model_rebuild()\n" - module = cst.parse_module(code) - fields = [{"name": "f1", "type": "str", "description": "d1"}] - transformer = StateInjectionFunctor("InjectedClass", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - # Verify InjectedClass model_rebuild is placed after Existing.model_rebuild at the end of module - injected_idx = modified_code.find("InjectedClass.model_rebuild()") - existing_idx = modified_code.find("Existing.model_rebuild()") - - assert injected_idx != -1 - assert existing_idx != -1 - assert injected_idx > existing_idx - - # Verify the class is inserted before Existing model rebuild - class_idx = modified_code.find("class InjectedClass") - assert class_idx < injected_idx - - -def test_class_inject_auto_imports() -> None: - code = "import pydantic\n" - module = cst.parse_module(code) - fields: list[dict[str, Any]] = [ - { - "name": "basic", - "type": "Annotated[str, StringConstraints(max_length=20)] | None", - "description": "basic", - "optional": True, - }, - {"name": "metadata", "type": "dict[str, Any]", "description": "meta"}, - {"name": "a_list", "type": "list[int]", "description": "a list"}, - ] - transformer = StateInjectionFunctor("AutoImportState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - assert "from typing import Self, Any, Annotated" in modified_code - assert "from pydantic import Field, model_validator, StringConstraints" in modified_code - - -def test_class_inject_auto_imports_existing() -> None: - code = "from typing import Self, Any, Annotated\nfrom pydantic import Field, model_validator, StringConstraints\n" - module = cst.parse_module(code) - fields: list[dict[str, Any]] = [ - { - "name": "basic", - "type": "Annotated[str, StringConstraints(max_length=20)] | None", - "description": "basic", - "optional": True, - }, - {"name": "metadata", "type": "dict[str, Any]", "description": "meta"}, - {"name": "a_list", "type": "list[int]", "description": "a list"}, - ] - transformer = StateInjectionFunctor("AutoImportState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - assert modified_code.count("Any") == 2 - assert modified_code.count("Annotated") == 2 - assert modified_code.count("Self") == 2 - assert modified_code.count("Field") == 4 - assert modified_code.count("model_validator") == 2 - assert modified_code.count("StringConstraints") == 2 - - -def test_class_inject_no_list_no_self_or_model_validator() -> None: - code = "import pydantic\n" - module = cst.parse_module(code) - fields: list[dict[str, Any]] = [ - { - "name": "basic", - "type": "int", - "description": "basic", - }, - ] - transformer = StateInjectionFunctor("NoListState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - assert "Self" not in modified_code - assert "model_validator" not in modified_code - - -def test_class_inject_idempotency() -> None: - """The Idempotency Axiom: injecting the same class twice must not duplicate.""" - code = "class IdempotentState(CoreasonBaseState):\n pass\nIdempotentState.model_rebuild()\n" - module = cst.parse_module(code) - fields = [{"name": "f1", "type": "int", "description": "a field"}] - transformer = StateInjectionFunctor("IdempotentState", fields, action_space_id=TEST_URN) - modified = module.visit(transformer) - modified_code = modified.code - - # The class must not be duplicated - assert modified_code.count("class IdempotentState") == 1 - - # model_rebuild must not be duplicated - assert modified_code.count("IdempotentState.model_rebuild()") == 1 - - # The code should be returned unchanged - assert modified_code == code - - -def test_class_inject_custom_base() -> None: - """The Strict Inheritance Axiom: agents can scaffold classes inheriting from registered descendants.""" - code = "import pydantic\n" - module = cst.parse_module(code) - fields = [{"name": "cohort_id", "type": "str", "description": "the cohort identifier"}] - transformer = StateInjectionFunctor( - "CardiologyCohort", - fields, - action_space_id=TEST_URN, - base_class="ClinicalCohortBase", - ) - modified = module.visit(transformer) - modified_code = modified.code - - # Verify custom base class inheritance - assert "class CardiologyCohort(ClinicalCohortBase):" in modified_code - assert "CoreasonBaseState" not in modified_code diff --git a/tests/utils/test_ast_extraction.py b/tests/utils/test_ast_extraction.py deleted file mode 100644 index 35729fc..0000000 --- a/tests/utils/test_ast_extraction.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -from pathlib import Path - -from coreason_meta_engineering.utils.ast_extraction import extract_kinetic_skeleton - - -def test_extract_kinetic_skeleton(tmp_path: Path) -> None: - code = """ -\"\"\"Module docstring.\"\"\" -import os -from math import sqrt -def my_func(a, b: int) -> int: - \"\"\"Function docstring.\"\"\" - return a + b - -class MyClass: - \"\"\"Class docstring.\"\"\" - pass -""" - p = tmp_path / "test_extract.py" - p.write_text(code, encoding="utf-8") - - skeleton = extract_kinetic_skeleton(str(p)) - - assert skeleton["docstring"] == "Module docstring." - assert "os" in skeleton["imports"] - assert "math" in skeleton["imports"] - assert "def my_func(a, b) -> int:" in skeleton["signatures"] - - -def test_extract_kinetic_skeleton_no_docstring(tmp_path: Path) -> None: - code = "import sys\ndef foo(): pass" - p = tmp_path / "test_no_doc.py" - p.write_text(code, encoding="utf-8") - skeleton = extract_kinetic_skeleton(str(p)) - assert skeleton["docstring"] == "" - assert "sys" in skeleton["imports"] - - -def test_extract_kinetic_skeleton_deep_fallback(tmp_path: Path) -> None: - code = """ -def func_with_doc(): - \"\"\"Found func doc\"\"\" - pass - -class ClassWithDoc: - \"\"\"Found class doc\"\"\" - pass -""" - p = tmp_path / "test_deep.py" - p.write_text(code, encoding="utf-8") - - # Hits line 34 (func doc) - skeleton = extract_kinetic_skeleton(str(p)) - assert skeleton["docstring"] == "Found func doc" - - code_class = """ -class ClassWithDoc: - \"\"\"Found class doc\"\"\" - pass -""" - p2 = tmp_path / "test_deep_class.py" - p2.write_text(code_class, encoding="utf-8") - - # Hits line 38 (class doc) - skeleton2 = extract_kinetic_skeleton(str(p2)) - assert skeleton2["docstring"] == "Found class doc" diff --git a/tests/utils/test_congruence_judge.py b/tests/utils/test_congruence_judge.py deleted file mode 100644 index a2ebf82..0000000 --- a/tests/utils/test_congruence_judge.py +++ /dev/null @@ -1,121 +0,0 @@ -# Copyright (c) 2026 CoReason, Inc. -import json -import threading -from collections.abc import Generator -from http.server import BaseHTTPRequestHandler, HTTPServer -from typing import Any - -import pytest - -from coreason_meta_engineering.utils.congruence_judge import ( - CongruenceFaultError, - evaluate_congruence, -) - - -class MockLLMServer(HTTPServer): - response_override: dict[str, Any] - - -class MockLLMHandler(BaseHTTPRequestHandler): - def do_POST(self) -> None: - content_length = int(self.headers["Content-Length"]) - _post_data = self.rfile.read(content_length) - - # Default success response - response_data = { - "response": json.dumps( - { - "instruction_score": 0.95, - "composite_congruence": 0.95, - "reasoning": "Perfect match", - } - ) - } - - # Check for specific triggers in the path or just use global state - if hasattr(self.server, "response_override"): - response_data = self.server.response_override - - self.send_response(200) - self.send_header("Content-type", "application/json") - self.end_headers() - self.wfile.write(json.dumps(response_data).encode()) - - def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - # Silencing server logs - pass - - -@pytest.fixture -def llm_server() -> Generator[MockLLMServer]: - server = MockLLMServer(("localhost", 0), MockLLMHandler) - thread = threading.Thread(target=server.serve_forever) - thread.daemon = True - thread.start() - yield server - server.shutdown() - - -def test_evaluate_congruence_fallback(monkeypatch: pytest.MonkeyPatch) -> None: - # Use a non-existent local port to trigger fallback - monkeypatch.setenv("COREASON_LLM_API_URL", "http://localhost:1") - - manifest = {"urn": "urn:test"} - ast_skeleton = {"docstring": "test"} - - # Because of fallback, it will return a perfect score instead of crashing - res = evaluate_congruence(manifest, ast_skeleton) - assert res["composite_congruence"] == 1.0 - - -def test_evaluate_congruence_faults(llm_server: MockLLMServer, monkeypatch: pytest.MonkeyPatch) -> None: - # Test that the fault error is raised if scores are too low - url = f"http://localhost:{llm_server.server_port}/api/generate" - monkeypatch.setenv("COREASON_LLM_API_URL", url) - - llm_server.response_override = { - "response": json.dumps( - { - "instruction_score": 0.5, # Below 0.70 - "composite_congruence": 0.80, # Below 0.85 - "reasoning": "Poor match", - } - ) - } - - manifest = {"urn": "urn:test"} - ast_skeleton = {"docstring": "test"} - - with pytest.raises(CongruenceFaultError, match="Congruence Fault"): - evaluate_congruence(manifest, ast_skeleton) - - -def test_evaluate_congruence_individual_fault(llm_server: MockLLMServer, monkeypatch: pytest.MonkeyPatch) -> None: - url = f"http://localhost:{llm_server.server_port}/api/generate" - monkeypatch.setenv("COREASON_LLM_API_URL", url) - - llm_server.response_override = { - "response": json.dumps( - { - "instruction_score": 0.5, - "composite_congruence": 0.95, # Passes composite - "reasoning": "Instruction mismatch", - } - ) - } - - manifest = {"urn": "urn:test"} - ast_skeleton = {"docstring": "test"} - - with pytest.raises(CongruenceFaultError, match="instruction_score"): - evaluate_congruence(manifest, ast_skeleton) - - -def test_evaluate_congruence_invalid_url(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setenv("COREASON_LLM_API_URL", "ftp://invalid-url.local/api/generate") - manifest = {"urn": "urn:test"} - ast_skeleton = {"docstring": "test"} - - with pytest.raises(ValueError, match=r"Invalid URL scheme\. Only http/https are allowed\."): - evaluate_congruence(manifest, ast_skeleton) diff --git a/uv.lock b/uv.lock index c72628b..fa9935e 100644 --- a/uv.lock +++ b/uv.lock @@ -282,7 +282,7 @@ wheels = [ [[package]] name = "coreason-manifest" -version = "0.61.0" +version = "0.61.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "canonicaljson" }, @@ -292,9 +292,9 @@ dependencies = [ { name = "pycrdt" }, { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/0e/9d/ef67a31ca1a8521e31814e5ae0907983b091ca80e636402f6c42a1ad823d/coreason_manifest-0.61.0.tar.gz", hash = "sha256:6f92cece2a9566d519396dd603de64ded36b3ea7c76d77da91be4eb490364e04", size = 857962, upload-time = "2026-05-14T02:51:31.771Z" } +sdist = { url = "https://files.pythonhosted.org/packages/02/d6/8f48736cbdb205a2a6d9ffccb8515c9e5ade777b57bf42958e1978625f33/coreason_manifest-0.61.1.tar.gz", hash = "sha256:62c0f5969db5bc9c484b058d2ccf7329b39250f54ebe70e17df2f3ef85881f43", size = 857962, upload-time = "2026-05-14T10:36:04.332Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/53/21/62f838f9d37c598acc55d0532b47c828e9c968b24ae890cdab87c818bdfc/coreason_manifest-0.61.0-py3-none-any.whl", hash = "sha256:9aacf2b880192a2364cfb5cbbadd4d79ecf179b4528bf6b2617ddc34c660c240", size = 199405, upload-time = "2026-05-14T02:51:30.595Z" }, + { url = "https://files.pythonhosted.org/packages/bc/7b/721539ba29000478083c6ff3a7274a72f0dc22976e21908f5760791c7f7a/coreason_manifest-0.61.1-py3-none-any.whl", hash = "sha256:6fbad7cef28a1534e2207da9c89ae6dd9d6f09efe74d5f678b1388da6ffa5f13", size = 199405, upload-time = "2026-05-14T10:36:03.144Z" }, ] [[package]] @@ -337,7 +337,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "coreason-manifest", specifier = ">=0.61.0" }, + { name = "coreason-manifest", specifier = ">=0.61.1" }, { name = "coreason-urn-authority", specifier = ">=0.11.1" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "libcst", specifier = ">=1.8.6" }, @@ -373,7 +373,7 @@ dev = [ [[package]] name = "coreason-urn-authority" -version = "0.11.1" +version = "0.11.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "coreason-manifest" }, @@ -384,9 +384,9 @@ dependencies = [ { name = "pydantic" }, { name = "pynacl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/47/ea/d3382450f4c87b04687d0aa7529fc4b19ec0c0606277fb7ac658e9edb5b1/coreason_urn_authority-0.11.1.tar.gz", hash = "sha256:40170763347cbd175be8da5eee234efe4324e834c446458d5aaa02fb248b60ed", size = 147481, upload-time = "2026-05-14T02:58:19.691Z" } +sdist = { url = "https://files.pythonhosted.org/packages/6b/20/0704d23b29d228f7a079dfc28ab4c41373765b7c969a45ff3dfed2b5b077/coreason_urn_authority-0.11.2.tar.gz", hash = "sha256:3de92a1e62e9f333368dcac1127a9e29d4d18520dabead3c7c081b214f991a4e", size = 147703, upload-time = "2026-05-14T14:28:56.072Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/73/ed/1d9f660bce8d60c054ee8e7b9823620870ec014c5463988c1ba593830d26/coreason_urn_authority-0.11.1-py3-none-any.whl", hash = "sha256:5bd8f4c223a2e98bfd69136927df9faa70851f28667f569ee588b03a8ad2a2ac", size = 15161, upload-time = "2026-05-14T02:58:18.535Z" }, + { url = "https://files.pythonhosted.org/packages/53/3f/310b4cca49610666b6f375918cfbb58b4eef3058ea4502659a7a6af82576/coreason_urn_authority-0.11.2-py3-none-any.whl", hash = "sha256:e625e217043443ac54a9843690f563be66eb0e3c26600c8ec0ff4ef4bf60ad77", size = 15163, upload-time = "2026-05-14T14:28:54.944Z" }, ] [[package]]