demo_basegen_utils.md · 38.6 KB

Codebase Snapshot: utils

Generated by: base_gen_agent_v1.1 on 2025-06-25 05:39:51 Source Directory (resolved): /home/luvai/MINDXbuilds/production/mindX/utils

Directory Structure

utils/
  __init__.py
  config.py
  logging_config.py
  logic_engine.py
  yaml_config_loader.py

File Contents

__init__.py


config.py

# mindx/utils/config.py (Corrected)
import os
import json
from pathlib import Path
from typing import Dict, Any, Optional

--- THE FIX IS HERE ---

The package is installed as 'python-dotenv', but imported as 'dotenv'.

from dotenv import load_dotenv

from .logging_config import get_logger from .yaml_config_loader import load_yaml_file # IMPORT THE NEW LOADER

--- Constants ---

PROJECT_ROOT = Path(__file__).resolve().parent.parent logger = get_logger(__name__)

class Config: """ Handles loading and accessing hierarchical configuration for the MindX system. It now supports loading from multiple JSON files, a models directory with YAML files, and environment variables. """ _instance = None

def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super(Config, cls).__new__(cls) return cls._instance

def __init__(self, test_mode=False): if hasattr(self, '_initialized') and self._initialized and not test_mode: return

self.config_data: Dict[str, Any] = {} self._load_environment_variables() self._load_config_files() # ADDED: Load model capability configurations from the 'models' directory self._load_model_capability_configs()

self._initialized = True logger.info("Configuration loaded successfully.")

def _load_environment_variables(self): env_path = PROJECT_ROOT / '.env' if env_path.exists(): # Use the corrected import here load_dotenv(dotenv_path=env_path, override=True) logger.info(f"Loaded environment variables from {env_path}")

def _load_config_files(self): config_dir = PROJECT_ROOT / "data" / "config" if not config_dir.is_dir(): logger.warning(f"Configuration directory not found at {config_dir}, skipping JSON config load.") return

config_files = [f for f in config_dir.glob(".json")] for config_file in sorted(config_files): try: with config_file.open("r", encoding="utf-8") as f: data = json.load(f) self._deep_merge(self.config_data, data) logger.info(f"Loaded and merged config from {config_file.name}") except Exception as e: logger.error(f"Failed to load config file {config_file}: {e}")

def _load_model_capability_configs(self): """Loads all .yaml model capability files from the /models directory.""" models_dir = PROJECT_ROOT / "models" if not models_dir.is_dir(): logger.info("No 'models' directory found for YAML capabilities, skipping.") return

for yaml_file in models_dir.glob(".yaml"): provider_name = yaml_file.stem # e.g., 'gemini.yaml' -> 'gemini' model_data = load_yaml_file(yaml_file) if model_data: # Structure the data to be merged into the main config # Expected structure: llm.<provider>.models provider_config = { "llm": { provider_name: { "models": model_data } } } self._deep_merge(self.config_data, provider_config)

def _deep_merge(self, source, destination): """Recursively merges dictionary destination into source.""" for key, value in destination.items(): if isinstance(value, dict) and key in source and isinstance(source[key], dict): self._deep_merge(source[key], value) else: source[key] = value

def get(self, key: str, default: Any = None) -> Any: # Environment variables have the highest priority. # e.g., key 'llm.gemini.api_key' -> env var 'MINDX_LLM_GEMINI_API_KEY' env_var_key = "MINDX_" + key.upper().replace('.', '_') env_value = os.getenv(env_var_key) if env_value is not None: return env_value

# If not in env, check the loaded config data. keys = key.split('.') value = self.config_data try: for k in keys: value = value[k] # Handle placeholder substitution, e.g., "env:GEMINI_API_KEY" if isinstance(value, str) and value.lower().startswith("env:"): env_var = value[4:] return os.environ.get(env_var, default) return value except (KeyError, TypeError): return default

@classmethod def reset_instance(cls): cls._instance = None

logging_config.py

# mindx/utils/logging_config.py
import logging
import logging.handlers
import os
from pathlib import Path

Use a default project root if the one from config isn't available yet

try: from .config import PROJECT_ROOT except ImportError: PROJECT_ROOT = Path(__file__).resolve().parent.parent

--- Configuration ---

LOG_DIR = PROJECT_ROOT / "data" / "logs" LOG_FILENAME = "mindx_runtime.log" MAX_LOG_SIZE_BYTES = 10 1024 1024 # 10 MB BACKUP_COUNT = 5

_loggers = {} _is_configured = False

UPDATED: The function now accepts parameters for more flexible setup.

def setup_logging( log_level: str = "INFO", console: bool = True, log_file: bool = True ): """ Configures the root logger for the MindX application.

This function is idempotent and can be called multiple times without adverse effects.

Args: log_level (str): The minimum logging level (e.g., "DEBUG", "INFO", "WARNING"). console (bool): If True, logs will be output to the console. log_file (bool): If True, logs will be output to a rotating file. """ global _is_configured if _is_configured: return

# Convert string log level to logging constant level = getattr(logging, log_level.upper(), logging.INFO) # Get the root logger root_logger = logging.getLogger() root_logger.setLevel(level) # Clear existing handlers to prevent duplicate logs if root_logger.hasHandlers(): root_logger.handlers.clear()

# Define a standard formatter formatter = logging.Formatter( "[%(asctime)s] %(name)s - %(levelname)s - %(module)s.%(funcName)s:%(lineno)d - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" )

# Configure Console Handler if console: console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) root_logger.addHandler(console_handler)

# Configure File Handler if log_file: try: LOG_DIR.mkdir(parents=True, exist_ok=True) log_path = LOG_DIR / LOG_FILENAME # Use RotatingFileHandler to manage log file size file_handler = logging.handlers.RotatingFileHandler( log_path, maxBytes=MAX_LOG_SIZE_BYTES, backupCount=BACKUP_COUNT, encoding='utf-8' ) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) except Exception as e: # If file logging fails, log an error to the console (if available) # and continue without it. logging.basicConfig() # Basic config to ensure the next line prints logging.error(f"Failed to configure file logging at {LOG_DIR}: {e}", exc_info=True)

_is_configured = True # Use the root logger to announce configuration status root_logger.info(f"Logging configured. Root level: {log_level}. Console: {console}. File: {log_file}.")

def get_logger(name: str) -> logging.Logger: """ Retrieves a logger instance for a given module name.

If logging has not been configured yet, it will trigger the default setup. """ if not _is_configured: setup_logging() # Ensure default setup if not explicitly called

if name in _loggers: return _loggers[name] logger = logging.getLogger(name) _loggers[name] = logger return logger

logic_engine.py

# mindx/utils/logic_engine.py
"""
Enhanced Logic Engine for mindX Augmentic Intelligence 

Provides utilities for representing logical rules, evaluating conditions safely, performing basic forward-chaining inference, and conceptual Socratic questioning to enhance agent decision-making, especially when dealing with smaller LLMs or needing to validate choices. """ import logging import sys import ast import operator from typing import Dict, Any, Callable, List, Union, Set, Optional, Tuple import uuid import copy # For deepcopying context

from mindx.utils.logging_config import get_logger from mindx.core.belief_system import BeliefSystem, Belief, BeliefSource from mindx.llm.llm_factory import LLMHandler, create_llm_handler # For Socratic questioning from mindx.utils.config import Config # For configuring LLM for Socratic part

logger = get_logger(__name__)

--- Safe Expression Evaluation ---

ALLOWED_OPERATORS = { ast.And: operator.and_, ast.Or: operator.or_, ast.Not: operator.not_, ast.Eq: operator.eq, ast.NotEq: operator.ne, ast.Lt: operator.lt, ast.LtE: operator.le, ast.Gt: operator.gt, ast.GtE: operator.ge, ast.In: lambda x, y: x in y, ast.NotIn: lambda x, y: x not in y, ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv, ast.Mod: operator.mod, ast.Pow: operator.pow, ast.USub: operator.neg, ast.UAdd: operator.pos, ast.FloorDiv: operator.floordiv, } ALLOWED_NODE_TYPES_BASE = ( # Base types always allowed ast.Expression, ast.Name, ast.Constant, ast.Attribute, ast.BoolOp, ast.UnaryOp, ast.Compare, ast.BinOp, ast.Call, ast.IfExp, ast.List, ast.Tuple, ast.Dict, ast.Set, ast.Slice, ast.Subscript, ast.Load, ) ALLOWED_NODE_TYPES = ALLOWED_NODE_TYPES_BASE if sys.version_info < (3,8): # pragma: no cover ALLOWED_NODE_TYPES += (ast.NameConstant, ast.Num, ast.Str, ast.Bytes, ast.Ellipsis)

class SafeExpressionEvaluator: # pragma: no cover """Safely evaluates Python-like expressions by parsing to AST and allowing only whitelisted operations.""" def __init__(self, context_vars: Optional[Dict[str, Any]] = None, allowed_functions: Optional[Dict[str, Callable]] = None): self.context_vars = context_vars or {} self.allowed_functions = allowed_functions or {} self.allowed_builtins = { "len": len, "str": str, "int": int, "float": float, "bool": bool, "all": all, "any": any, "round": round, "abs": abs, "min": min, "max": max, "sum": sum, "sorted": sorted, "list": list, "dict": dict, "set": set, "tuple": tuple, "isinstance": isinstance, "type": type, # type() can be useful for checking "getattr": getattr, # Use getattr carefully, ensure objects in context are safe "hasattr": hasattr, }

def _eval_node(self, node: ast.AST) -> Any: node_type = type(node)

if node_type not in ALLOWED_NODE_TYPES: # pragma: no cover raise TypeError(f"SafeEvaluator: Disallowed AST node type: {node_type.__name__}")

if isinstance(node, ast.Constant): return node.value elif sys.version_info < (3,8) and isinstance(node, (ast.Num, ast.Str)): return node.n if isinstance(node, ast.Num) else node.s # type: ignore elif sys.version_info < (3,8) and isinstance(node, ast.NameConstant): return node.value # type: ignore elif isinstance(node, ast.Name): if node.id in self.context_vars: return self.context_vars[node.id] if node.id in self.allowed_builtins: return self.allowed_builtins[node.id] raise NameError(f"SafeEvaluator: Name '{node.id}' is not defined in allowed context or builtins.") elif isinstance(node, ast.UnaryOp): op_func = ALLOWED_OPERATORS.get(type(node.op)) if not op_func: raise TypeError(f"SafeEvaluator: Disallowed unary op: {type(node.op).__name__}") return op_func(self._eval_node(node.operand))

elif isinstance(node, ast.BoolOp): op_func = ALLOWED_OPERATORS[type(node.op)] if isinstance(node.op, ast.And): for value_node in node.values: if not self._eval_node(value_node): return False return True elif isinstance(node.op, ast.Or): for value_node in node.values: if self._eval_node(value_node): return True return False elif isinstance(node, ast.BinOp): op_func = ALLOWED_OPERATORS.get(type(node.op)) if not op_func: raise TypeError(f"SafeEvaluator: Disallowed binary op: {type(node.op).__name__}") return op_func(self._eval_node(node.left), self._eval_node(node.right))

elif isinstance(node, ast.Compare): left_val = self._eval_node(node.left) for i, op_node in enumerate(node.ops): op_type = type(op_node); op_func = ALLOWED_OPERATORS.get(op_type) if not op_func: raise TypeError(f"SafeEvaluator: Disallowed comparison op: {op_type.__name__}") right_val = self._eval_node(node.comparators[i]) if not op_func(left_val, right_val): return False left_val = right_val return True elif isinstance(node, ast.Call): func_to_call: Callable if isinstance(node.func, ast.Name): # Direct function call e.g. my_func() or len() func_name = node.func.id if func_name in self.allowed_functions: func_to_call = self.allowed_functions[func_name] elif func_name in self.allowed_builtins: func_to_call = self.allowed_builtins[func_name] # type: ignore else: raise NameError(f"SafeEvaluator: Function call to '{func_name}' not allowed.") elif isinstance(node.func, ast.Attribute): # Method call e.g. my_str.startswith('p') obj_val = self._eval_node(node.func.value) method_name = node.func.attr # Allow common, safe, side-effect-free methods on basic types safe_methods_on_types = { str: {"startswith", "endswith", "lower", "upper", "strip", "isdigit", "isalpha", "isalnum", "isspace", "find", "count", "replace", "split", "join", "format", "capitalize", "title"}, list: {"count", "index"}, # Read-only list methods dict: {"get", "keys", "values", "items"}, set: {"issubset", "issuperset", "union", "intersection", "difference", "isdisjoint"}, # int, float, bool don't have many callable methods that make sense here } obj_type = type(obj_val) if obj_type in safe_methods_on_types and method_name in safe_methods_on_types[obj_type]: func_to_call = getattr(obj_val, method_name) elif hasattr(obj_val, method_name) and callable(getattr(obj_val, method_name)) and not method_name.startswith("_"): # pragma: no cover # More general but riskier logger.warning(f"SafeEvaluator: Allowing call to method '{method_name}' on object of type '{obj_type.__name__}'. Ensure this is safe.") func_to_call = getattr(obj_val, method_name) else: raise NameError(f"SafeEvaluator: Method call '{method_name}' on type '{obj_type.__name__}' not allowed or doesn't exist.") else: raise TypeError(f"SafeEvaluator: Unsupported function call type: {type(node.func).__name__}")

args_val = [self._eval_node(arg) for arg in node.args] kwargs_val = {kw.arg: self._eval_node(kw.value) for kw in node.keywords if kw.arg is not None} return func_to_call(*args_val, **kwargs_val) elif isinstance(node, ast.IfExp): return self._eval_node(node.body) if self._eval_node(node.test) else self._eval_node(node.orelse) elif isinstance(node, ast.List): return [self._eval_node(e) for e in node.elts] elif isinstance(node, ast.Tuple): return tuple(self._eval_node(e) for e in node.elts) elif isinstance(node, ast.Set): return {self._eval_node(e) for e in node.elts} elif isinstance(node, ast.Dict): return { (self._eval_node(k) if k is not None else None) : self._eval_node(v) for k,v in zip(node.keys, node.values)} elif isinstance(node, ast.Subscript): return self._eval_node(node.value)[self._eval_node(node.slice)] elif isinstance(node, ast.Slice): return slice(self._eval_node(node.lower) if node.lower else None, self._eval_node(node.upper) if node.upper else None, self._eval_node(node.step) if node.step else None) elif isinstance(node, ast.Attribute): # obj.attr obj = self._eval_node(node.value) attr_name = node.attr # More controlled attribute access: only allow if obj is a dict and attr_name is a key, # or if obj is a simple data object and attr_name is not private. if isinstance(obj, dict): # Allow dict key access via attribute if it's a valid key if attr_name in obj: return obj[attr_name] else: raise AttributeError(f"SafeEvaluator: Dictionary object does not have key '{attr_name}'.") # For other objects, only allow non-private attributes if not attr_name.startswith('_'): if hasattr(obj, attr_name): return getattr(obj, attr_name) raise AttributeError(f"SafeEvaluator: Accessing attribute '{attr_name}' on object of type '{type(obj).__name__}' is restricted or attribute does not exist.") else: raise TypeError(f"SafeEvaluator: Unsupported AST node: {type(node).__name__}") # pragma: no cover

def evaluate(self, expression_string: str) -> Any: if not isinstance(expression_string, str) or not expression_string.strip(): logger.debug("SafeEvaluator: Empty/non-string expression -> False.") return False try: ast_tree = ast.parse(expression_string, mode='eval') if not isinstance(ast_tree, ast.Expression) or not hasattr(ast_tree, 'body'): # pragma: no cover raise ValueError("Expression must be a single valid Python expression.") return self._eval_node(ast_tree.body) except SyntaxError as e: logger.error(f"SafeEvaluator: Syntax error parsing '{expression_string}': {e}"); raise except (TypeError, NameError, AttributeError, IndexError, KeyError) as e_eval: # pragma: no cover logger.warning(f"SafeEvaluator: Error evaluating '{expression_string}': {type(e_eval).__name__} - {e_eval}. Context (keys): {list(self.context_vars.keys())}. Allowed funcs: {list(self.allowed_functions.keys())}") raise # Re-raise evaluation errors for LogicalRule to handle except Exception as e_unexp: # pragma: no cover logger.error(f"SafeEvaluator: Unexpected error for '{expression_string}': {e_unexp}", exc_info=True) raise RuntimeError(f"Unexpected evaluation error: {e_unexp}") from e_unexp

class LogicalRule: # pragma: no cover """Represents a logical rule (IF condition THEN effects) or a consistency constraint.""" def __init__(self, rule_id: str, condition_expr: str, description: Optional[str] = None, effects: Optional[List[Dict[str, Any]]] = None, is_constraint: bool = False, priority: int = 0): # Higher number = higher priority for firing self.id = rule_id; self.condition_expr = condition_expr; self.description = description self.effects = effects or []; self.is_constraint = is_constraint; self.priority = priority try: ast.parse(self.condition_expr, mode='eval') except SyntaxError as e: raise ValueError(f"Invalid syntax in rule '{rule_id}' condition: '{self.condition_expr}'. Error: {e}") from e

def evaluate_condition(self, context_vars: Dict[str, Any], allowed_functions: Optional[Dict[str, Callable]] = None) -> bool: evaluator = SafeExpressionEvaluator(context_vars=context_vars, allowed_functions=allowed_functions) try: result = evaluator.evaluate(self.condition_expr) if not isinstance(result, bool): logger.warning(f"Rule '{self.id}' non-boolean result: {type(result).__name__} '{result}'. Treating as False."); return False return result except Exception: return False # Default to False on any evaluation error, already logged by SafeEvaluator def __repr__(self): return f"<LogicalRule id='{self.id}' prio={self.priority} condition='{self.condition_expr}' constraint={self.is_constraint}>" def to_dict(self) -> Dict[str, Any]: return {"id": self.id, "condition_expr": self.condition_expr, "description": self.description, "effects": self.effects, "is_constraint": self.is_constraint, "priority": self.priority} @classmethod def from_dict(cls, data: Dict[str,Any]) -> 'LogicalRule': return cls(**data)

class LogicEngine: # pragma: no cover """ Provides foundational logic capabilities for MindX agents. Manages logical rules, default assumptions, performs forward chaining inference, checks consistency, and can use an LLM for Socratic questioning. """ def __init__(self, belief_system: BeliefSystem, llm_handler_for_socratic: Optional[LLMHandler] = None, # Specifically for Socratic part agent_id_namespace: str = "mindx_default_logic_engine"): self.belief_system = belief_system self.llm_handler_socratic = llm_handler_for_socratic self.agent_id_ns = agent_id_namespace # For namespacing beliefs about rules/defaults if persisted self.log_prefix = f"LogicEngine ({self.agent_id_ns}):" self.config = Config() self.rules: Dict[str, LogicalRule] = {} self.default_assumptions: Dict[str, Dict[str, Any]] = {} # key -> {"value": val, "confidence": float, "overridden_by_belief_key": Optional[str]} self.allowed_eval_functions: Dict[str, Callable] = { # These allow rules to query the provided context_vars during evaluation, # NOT to directly query the global BeliefSystem to maintain purity of evaluation. "context_has_key": lambda key, ctx: key in ctx, "context_get_value": lambda key, default_val, ctx: ctx.get(key, default_val), # Example for an agent to pass its own safe functions: # "is_weather_good": lambda weather_belief_val: weather_belief_val in ["sunny", "clear"] } logger.info(f"{self.log_prefix} Initialized. LLM for Socratic: {'Yes' if self.llm_handler_socratic else 'No'}")

async def load_rules_from_belief_system(self, rule_prefix: Optional[str] = None): """Loads rules stored as Belief objects into the engine.""" rule_belief_prefix = rule_prefix or f"logic_engine.{self.agent_id_ns}.rules." logger.info(f"{self.log_prefix} Loading rules from BeliefSystem with prefix: {rule_belief_prefix}") rule_beliefs = await self.belief_system.query_beliefs(partial_key=rule_belief_prefix) count = 0 for key, belief in rule_beliefs: try: if isinstance(belief.value, dict): rule = LogicalRule.from_dict(belief.value) self.rules[rule.id] = rule count += 1 else: logger.warning(f"{self.log_prefix} Belief {key} value is not a dict, cannot load as rule.") except Exception as e: logger.error(f"{self.log_prefix} Failed to load rule from belief {key}: {e}") logger.info(f"{self.log_prefix} Loaded {count} rules from BeliefSystem.")

async def add_rule(self, rule: LogicalRule, persist: bool = False): if not isinstance(rule, LogicalRule): raise TypeError("Rule must be LogicalRule instance.") self.rules[rule.id] = rule logger.info(f"{self.log_prefix} Added rule: {rule.id} - '{rule.condition_expr}'") if persist: await self.belief_system.add_belief(f"logic_engine.{self.agent_id_ns}.rules.{rule.id}", rule.to_dict(), 1.0, BeliefSource.EXTERNAL_INPUT)

async def add_default_assumption(self, key: str, value: Any, confidence: float = 0.5, persist: bool = False): self.default_assumptions[key] = {"value": value, "confidence": confidence, "overridden_by_belief_key": None} logger.info(f"{self.log_prefix} Added default assumption: {key} = {str(value)[:50]} (Conf: {confidence})") if persist: await self.belief_system.add_belief(f"logic_engine.{self.agent_id_ns}.defaults.{key}", self.default_assumptions[key], 1.0, BeliefSource.EXTERNAL_INPUT)

async def _get_evaluation_context(self, agent_belief_prefix: str, additional_context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Prepares context for SafeExpressionEvaluator. Merges agent's relevant beliefs from shared BeliefSystem with active defaults and additional context. """ eval_context: Dict[str, Any] = {} # 1. Fetch relevant beliefs from the shared BeliefSystem for the specific agent/domain agent_beliefs = await self.belief_system.query_beliefs(partial_key=agent_belief_prefix) for key, belief in agent_beliefs: # Use simplified key (suffix after prefix) in eval_context for easier rule writing simple_key = key.replace(agent_belief_prefix, "").lstrip(".") # Higher confidence beliefs from BeliefSystem override lower confidence ones if keys clash (rare with good prefixing) if simple_key not in eval_context or belief.confidence > (eval_context.get(f"{simple_key}_confidence", -1.0)): eval_context[simple_key] = belief.value eval_context[f"{simple_key}_confidence"] = belief.confidence # Store confidence too if needed by rules

# 2. Overlay active default assumptions # A belief from BeliefSystem with the same key as a default assumption overrides the default # if its confidence is higher than the default's confidence. for def_key, def_data in self.default_assumptions.items(): corresponding_belief_val = eval_context.get(def_key) corresponding_belief_conf = eval_context.get(f"{def_key}_confidence", -1.0)

if corresponding_belief_val is not None and corresponding_belief_conf >= def_data["confidence"]: def_data["overridden_by_belief_key"] = f"{agent_belief_prefix.rstrip('.')}.{def_key}" # Mark as overridden def_data["is_active"] = False # The value from BeliefSystem (already in eval_context) takes precedence. else: # Default is not overridden by a stronger belief def_data["is_active"] = True def_data["overridden_by_belief_key"] = None if def_key not in eval_context: # Add default if no belief existed eval_context[def_key] = def_data["value"] eval_context[f"{def_key}_confidence"] = def_data["confidence"] # 3. Overlay any additional explicit context (highest precedence for these vars) if additional_context: eval_context.update(additional_context) logger.debug(f"{self.log_prefix} Prepared evaluation context with {len(eval_context)} vars (keys snippet: {list(eval_context.keys())[:5]})") return eval_context async def forward_chain(self, agent_belief_prefix_for_context: str, max_iterations: int = 10, rule_priority_threshold: int = 0 # Minimum priority for rules to be considered ) -> Tuple[Dict[str, Any], List[Dict[str,Any]]]: """ Performs forward chaining. Reads from agent_belief_prefix, derives new facts. Returns a dictionary of ALL facts (initial + derived) in the snapshot, and a list of derived effect dicts. The calling agent is responsible for adding these derived facts to the shared BeliefSystem. """ logger.info(f"{self.log_prefix} Starting forward chaining. Context prefix: {agent_belief_prefix_for_context}") # Initial context for the first iteration current_eval_context = await self._get_evaluation_context(agent_belief_prefix_for_context) all_derived_effects_this_run: List[Dict[str,Any]] = [] # Sort rules by priority (descending) for ordered application sorted_rules = sorted([r for r in self.rules.values() if not r.is_constraint and r.priority >= rule_priority_threshold], key=lambda r: -r.priority)

for i in range(max_iterations): made_change_in_iteration = False logger.debug(f"{self.log_prefix} Inference iteration {i+1}/{max_iterations}") for rule in sorted_rules: # If all effects of this rule are already in current_eval_context with same values, skip. # This is a simple check; a more advanced one would compare values if already present. effects_already_met = True if rule.effects: for effect in rule.effects: key_to_check = effect.get("set_belief") or effect.get("update_belief") # This check can be complex if effect value is dynamic. # For now, if key exists, assume effect might re-evaluate it. # A simpler check: if all effect keys are already in current_eval_context, maybe skip. # For robust chaining, we re-evaluate rules if any part of context changed. pass # Let rules fire if their conditions are met based on current context.

if rule.evaluate_condition(context_data=current_eval_context, allowed_functions=self.allowed_eval_functions): logger.debug(f"{self.log_prefix} Rule '{rule.id}' (Prio:{rule.priority}) fired. Condition: '{rule.condition_expr}'") for effect_template in rule.effects: effect = copy.deepcopy(effect_template) # Work on a copy # Resolve any placeholders in effect values from current_eval_context if isinstance(effect.get("value"), str) and effect["value"].startswith("$"): placeholder_key = effect["value"][1:] # Remove '$' effect["value"] = current_eval_context.get(placeholder_key, None) # Resolve from context if effect["value"] is None: logger.warning(f"{self.log_prefix} Rule '{rule.id}' effect placeholder '{placeholder_key}' not found in context. Using None.")

if "set_belief" in effect or "update_belief" in effect: key = effect["set_belief"] if "set_belief" in effect else effect["update_belief"] value = effect["value"] if key not in current_eval_context or current_eval_context[key] != value: logger.info(f"{self.log_prefix} Rule '{rule.id}' derived: {key} = {str(value)[:50]}") current_eval_context[key] = value # Update working snapshot for next rules in this iteration # Store confidence if effect specifies it, otherwise default current_eval_context[f"{key}_confidence"] = effect.get("confidence", 0.8) derived_effect_log = { "key": key, "value": value, "confidence": effect.get("confidence", 0.8), "source": BeliefSource.INFERRED, # Mark as inferred by logic engine "metadata": {"derived_by_rule_id": rule.id, "engine_agent_id": self.agent_id, "iteration": i+1} } all_derived_effects_this_run.append(derived_effect_log) made_change_in_iteration = True if not made_change_in_iteration: logger.info(f"{self.log_prefix} Forward chaining stabilized in {i+1} iterations.") break else: # pragma: no cover # Only if loop completes all max_iterations logger.warning(f"{self.log_prefix} Forward chaining reached max {max_iterations} iterations.") return current_eval_context, all_derived_effects_this_run

async def check_consistency(self, agent_belief_prefix_for_context: str, constraint_rule_ids: Optional[List[str]] = None) -> List[LogicalRule]: # pragma: no cover """Checks beliefs (from prefix) against constraint rules. Returns violated rules.""" logger.info(f"{self.log_prefix} Checking consistency. Context prefix: {agent_belief_prefix_for_context}") violated_constraints: List[LogicalRule] = [] eval_context = await self._get_evaluation_context(agent_belief_prefix_for_context)

rules_to_check_list: List[LogicalRule] = [] if constraint_rule_ids: for r_id in constraint_rule_ids: rule = self.rules.get(r_id) if rule and rule.is_constraint: rules_to_check_list.append(rule) elif rule: logger.warning(f"{self.log_prefix} Rule '{r_id}' not a constraint, skipped.") else: logger.warning(f"{self.log_prefix} Rule '{r_id}' not found for consistency.") else: rules_to_check_list = [r for r in self.rules.values() if r.is_constraint]

if not rules_to_check_list: logger.info(f"{self.log_prefix} No constraint rules to check."); return [] for rule in rules_to_check_list: if not rule.evaluate_condition(eval_context, self.allowed_eval_functions): logger.warning(f"{self.log_prefix} VIOLATION: Constraint '{rule.id}' FALSE. Cond: '{rule.condition_expr}'") violated_constraints.append(rule) if not violated_constraints: logger.info(f"{self.log_prefix} Beliefs consistent with checked constraints.") return violated_constraints

async def generate_socratic_questions(self, topic_or_goal: str, agent_belief_prefix_for_context: str, num_questions: int = 3) -> List[str]: # pragma: no cover """(LLM-driven) Generates Socratic questions based on topic and current beliefs from prefix.""" if not self.llm_handler_socratic: return ["Error: LLMHandler for Socratic questioning not configured."] logger.info(f"{self.log_prefix} Generating Socratic Qs for: '{topic_or_goal}' using prefix '{agent_belief_prefix_for_context}'") relevant_beliefs = await self.belief_system.query_beliefs(partial_key=agent_belief_prefix_for_context, min_confidence=0.5) belief_summary = "\n".join([f"- {key.replace(agent_belief_prefix_for_context,'').lstrip('.')}: {str(belief.value)[:80]}" for key, belief in relevant_beliefs[:10]]) # Max 10 beliefs for prompt

prompt = ( f"You are the Socratic assistant for mindX Augmentic Intelligence. Current Topic/Goal: '{topic_or_goal}'.\n" f"Relevant Beliefs (summary):\n{belief_summary if belief_summary else 'No specific beliefs provided for this topic.'}\n\n" f"Generate {num_questions} insightful Socratic questions that challenge assumptions, explore alternatives, or probe for deeper understanding of the topic/goal and related beliefs. " f"Aim to stimulate critical thinking and identify potential flaws or unstated premises.\n" f"Respond ONLY with a JSON list of question strings. Example: [\"What if our primary assumption about X is incorrect?\", \"Is there an alternative approach to Y that we haven't considered?\"]" ) try: max_q_tokens = self.config.get(f"logic_engine.{self.agent_id_ns}.socratic.max_tokens", 120 num_questions) temp_q = self.config.get(f"logic_engine.{self.agent_id_ns}.socratic.temperature", 0.65) response_str = await self.llm_handler_socratic.generate_text(prompt, max_tokens=max_q_tokens, temperature=temp_q, json_mode=True) if not response_str or response_str.startswith("Error:"): raise ValueError(f"LLM Socratic Q gen error: {response_str}") questions: List[str] = [] # ... (Robust JSON parsing for list of strings) try: questions = json.loads(response_str) except json.JSONDecodeError: match = re.search(r"\[\s\"[\s\S]?\"\s\]", response_str, re.DOTALL); if match: questions = json.loads(match.group(0)) else: raise ValueError("LLM Socratic Qs not valid JSON list.") if not isinstance(questions, list) or not all(isinstance(q, str) for q in questions): raise ValueError("LLM Socratic Qs not list of strings.") logger.info(f"{self.log_prefix} Generated {len(questions)} Socratic questions."); return questions[:num_questions] except Exception as e: logger.error(f"{self.log_prefix} Failed to generate Socratic questions: {e}", exc_info=True); return [f"Error generating Socratic Qs: {e}"]

yaml_config_loader.py

# mindx/utils/yaml_config_loader.py
"""
Utility for loading and parsing YAML configuration files.
"""
import yaml
from pathlib import Path
from typing import Dict, Any, Optional
from .logging_config import get_logger

logger = get_logger(__name__)

def load_yaml_file(file_path: Path) -> Optional[Dict[str, Any]]: """ Loads a single YAML file and returns its content as a dictionary.

Args: file_path: The Path object pointing to the YAML file.

Returns: A dictionary with the parsed YAML content, or None if an error occurs. """ if not file_path.is_file(): logger.warning(f"YAML file not found at: {file_path}") return None try: with file_path.open("r", encoding="utf-8") as f: data = yaml.safe_load(f) logger.info(f"Successfully loaded YAML configuration from {file_path.name}") return data except yaml.YAMLError as e: logger.error(f"Error parsing YAML file {file_path}: {e}", exc_info=True) except Exception as e: logger.error(f"Failed to read or process YAML file {file_path}: {e}", exc_info=True) return None


All DocumentsDocument IndexThe Book of mindXImprovement JournalAPI Reference