
Enhanced Monitoring System

Overview

The Enhanced Monitoring System monitors system resources, API usage, performance metrics, and rate limiting across the mindX framework in real time. It uses the MemoryAgent for persistent storage and structured alerting.

Architecture

graph TB
    A[Enhanced Monitoring System] --> B[Resource Monitor]
    A --> C[Performance Monitor]
    A --> D[API Usage Monitor]
    A --> E[Rate Limiter Monitor]
    
    B --> F[CPU Metrics]
    B --> G[Memory Metrics]
    B --> H[Disk/Network I/O]
    
    C --> I[LLM Performance]
    C --> J[Agent Performance]
    
    D --> K[Token Usage]
    D --> L[Cost Tracking]
    D --> M[Provider Analytics]
    
    E --> N[Rate Limit Metrics]
    E --> O[Success Rates]
    E --> P[Wait Times]
    
    F --> Q[Memory Agent]
    G --> Q
    H --> Q
    I --> Q
    J --> Q
    K --> Q
    L --> Q
    M --> Q
    N --> Q
    O --> Q
    P --> Q
    
    Q --> R[Structured Logs]
    Q --> S[Export Files]

Core Components

1. Token Calculator Tool (Production Cost Management)

The TokenCalculatorTool handles cost management for the monitoring system: token cost calculation, usage tracking, and budget optimization for all LLM operations.

Key Features:

Core Operations:

# Cost estimation with high precision
result = await token_calculator.execute(
    "estimate_cost",
    text="Analyze this code snippet",
    model="gemini-1.5-flash",
    operation_type="code_generation"
)

# Usage tracking with budget monitoring
result = await token_calculator.execute(
    "track_usage",
    agent_id="analyzer_agent",
    operation="code_analysis",
    model="gemini-1.5-flash",
    input_tokens=150,
    output_tokens=75,
    cost_usd=0.000375
)

# Comprehensive system metrics
result = await token_calculator.execute("get_metrics")
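
The `track_usage` call takes a precomputed `cost_usd`. As a minimal sketch of how such a figure might be derived from token counts, assuming prices are quoted per one million tokens: the `estimate_cost_usd` helper, the `example-model` name, and the rates below are hypothetical and are not taken from TokenCalculatorTool or any provider's price list.

```python
from decimal import Decimal

# Hypothetical prices in USD per one million tokens (illustration only).
HYPOTHETICAL_PRICES = {
    "example-model": {"input": Decimal("1.00"), "output": Decimal("3.00")},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> Decimal:
    """Estimate a call's cost; Decimal avoids float rounding drift on tiny amounts."""
    price = HYPOTHETICAL_PRICES[model]
    million = Decimal(1_000_000)
    return (price["input"] * input_tokens + price["output"] * output_tokens) / million


cost = estimate_cost_usd("example-model", input_tokens=150, output_tokens=75)
print(f"Estimated cost: ${cost}")
```

Decimal arithmetic is used here because per-call costs are often fractions of a cent, where accumulated float error can become visible in budget totals.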

Integration with Monitoring System:

The TokenCalculatorTool integrates seamlessly with the Enhanced Monitoring System to provide:

2. Enhanced Resource Monitoring

Detailed CPU Metrics:

Comprehensive Memory Metrics:

System Metrics:

3. API Token Usage Monitoring

Comprehensive Token Tracking:

Cost Analysis:

Provider Intelligence:

4. Rate Limiter Performance Monitoring

Advanced Rate Limiting Metrics:

Performance Analysis:

5. Intelligent Alerting System

Multi-level Alert Severity:

Resource Alerts:

API Usage Alerts:

Rate Limiter Alerts:

API Reference

Core Monitoring Methods

Enhanced Resource Collection

# Automatic detailed resource collection every 30 seconds (configurable)
await monitoring_system.start_monitoring()

# Get current comprehensive metrics
metrics = monitoring_system.get_current_metrics()
print(f"CPU: {metrics['resource_metrics']['cpu_percent']}%")
print(f"Memory: {metrics['resource_metrics']['memory_used_gb']:.1f}GB")
print(f"Swap: {metrics['resource_metrics']['swap_percent']}%")

API Token Usage Tracking

# Log comprehensive API token usage
await monitoring_system.log_api_token_usage(
    model_name="gpt-4",
    provider="openai",
    prompt_tokens=150,
    completion_tokens=200,
    cost_usd=0.005,
    success=True,
    rate_limited=False,
    metadata={"agent_id": "my_agent", "task": "code_generation"}
)

# Get API usage summary
api_summary = await monitoring_system.get_api_usage_summary(hours_back=24)
print(f"Total Cost: ${api_summary['summary']['total_cost_usd']:.3f}")
print(f"Total Tokens: {api_summary['summary']['total_tokens']:,}")
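
To illustrate what a usage summary of this shape could contain, here is a minimal sketch that aggregates a list of logged calls into the `summary` keys used in this document (`total_calls`, `total_tokens`, `total_cost_usd`, `avg_cost_per_call`). The `summarize_usage` function and the record layout are assumptions for illustration; the real aggregation inside `get_api_usage_summary` may differ.

```python
def summarize_usage(records: list[dict]) -> dict:
    """Aggregate per-call usage records into summary totals."""
    total_calls = len(records)
    total_cost = sum(r["cost_usd"] for r in records)
    total_tokens = sum(r["prompt_tokens"] + r["completion_tokens"] for r in records)
    return {
        "total_calls": total_calls,
        "total_tokens": total_tokens,
        "total_cost_usd": total_cost,
        # max(..., 1) guards against division by zero when nothing was logged
        "avg_cost_per_call": total_cost / max(total_calls, 1),
    }


# Hypothetical records mirroring the arguments of log_api_token_usage
records = [
    {"prompt_tokens": 150, "completion_tokens": 200, "cost_usd": 0.005},
    {"prompt_tokens": 50, "completion_tokens": 100, "cost_usd": 0.003},
]
summary = summarize_usage(records)
print(f"Total Tokens: {summary['total_tokens']:,}")
```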

Rate Limiter Integration

# Enhanced rate limiter with monitoring
def monitoring_callback(metrics):
    asyncio.create_task(monitoring_system.log_rate_limiter_metrics(
        provider="openai",
        model_name="gpt-4", 
        rate_limiter_metrics=metrics
    ))

rate_limiter = RateLimiter(
    requests_per_minute=60,
    max_retries=3,
    monitoring_callback=monitoring_callback
)

# Get rate limiter health summary
limiter_summary = await monitoring_system.get_rate_limiter_summary()
print(f"Overall Health: {limiter_summary['overall_health']}")

Performance Monitoring

LLM Performance Tracking

# Enhanced LLM performance logging
await monitoring_system.log_llm_performance(
    model_name="gpt-4",
    task_type="code_generation",
    agent_id="enhanced_simple_coder",
    latency_ms=2500,
    success=True,
    prompt_tokens=200,
    completion_tokens=150,
    cost=0.008,
    error_type=None,
    metadata={"complexity": "high", "language": "python"}
)

Agent Performance Tracking

# Agent execution performance
await monitoring_system.log_agent_performance(
    agent_id="bdi_agent",
    action_type="planning",
    execution_time_ms=1200,
    success=True,
    metadata={"goal_complexity": "medium"}
)

Configuration Options

Monitoring Intervals

{
  "monitoring": {
    "interval_seconds": 30.0,
    "alert_cooldown_seconds": 300,
    "memory_logging_enabled": true,
    "log_performance_details": true
  }
}
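
The `alert_cooldown_seconds` setting suggests that repeated alerts of the same kind are suppressed for a window after the first one fires. A minimal sketch of that idea follows, assuming alerts are keyed by a string; the `AlertCooldown` class is hypothetical and is not the system's actual implementation.

```python
import time


class AlertCooldown:
    """Suppress repeats of the same alert key within a cooldown window."""

    def __init__(self, cooldown_seconds: float = 300.0, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock  # injectable so the logic can be tested without sleeping
        self._last_sent = {}

    def should_send(self, alert_key: str) -> bool:
        now = self._clock()
        last = self._last_sent.get(alert_key)
        if last is not None and now - last < self.cooldown_seconds:
            return False  # still inside the cooldown window
        self._last_sent[alert_key] = now
        return True


cooldown = AlertCooldown(cooldown_seconds=300)
if cooldown.should_send("cpu_critical"):
    print("ALERT: cpu_critical")
```

A monotonic clock is used so that wall-clock adjustments cannot shorten or lengthen the window.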

Resource Thresholds

{
  "monitoring": {
    "thresholds": {
      "cpu_critical": 90.0,
      "cpu_warning": 70.0,
      "memory_critical": 85.0,
      "memory_warning": 70.0,
      "disk_critical": 90.0,
      "disk_warning": 80.0,
      "swap_critical": 80.0,
      "swap_warning": 60.0
    }
  }
}
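
As a sketch of how these thresholds could map a reading to an alert severity: the `classify` helper below is hypothetical, and it assumes a reading at or above a threshold triggers that level, which this document does not specify.

```python
THRESHOLDS = {
    "cpu_critical": 90.0, "cpu_warning": 70.0,
    "memory_critical": 85.0, "memory_warning": 70.0,
    "disk_critical": 90.0, "disk_warning": 80.0,
    "swap_critical": 80.0, "swap_warning": 60.0,
}


def classify(metric: str, percent: float, thresholds: dict = THRESHOLDS) -> str:
    """Return 'critical', 'warning', or 'ok' for a percentage reading."""
    # Check the critical level first so it takes precedence over warning.
    if percent >= thresholds[f"{metric}_critical"]:
        return "critical"
    if percent >= thresholds[f"{metric}_warning"]:
        return "warning"
    return "ok"


for metric, value in [("cpu", 95.0), ("memory", 75.0), ("disk", 50.0)]:
    print(f"{metric}: {value:.1f}% -> {classify(metric, value)}")
```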

API Usage Thresholds

{
  "monitoring": {
    "api": {
      "daily_cost_threshold": 100.0,
      "rate_limit_threshold": 10,
      "efficiency_threshold": 0.1
    }
  }
}
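
A rough sketch of how two of these settings might be checked, assuming `daily_cost_threshold` is a ceiling in USD on the day's spend and `rate_limit_threshold` is a count of rate-limit hits. Both interpretations and the `api_alerts` helper are assumptions; `efficiency_threshold` is left out because its definition is not given here.

```python
API_THRESHOLDS = {"daily_cost_threshold": 100.0, "rate_limit_threshold": 10}


def api_alerts(daily_cost_usd: float, rate_limit_hits: int,
               thresholds: dict = API_THRESHOLDS) -> list:
    """Return human-readable alerts for any threshold that was reached."""
    alerts = []
    if daily_cost_usd >= thresholds["daily_cost_threshold"]:
        alerts.append(f"daily cost ${daily_cost_usd:.2f} reached the configured limit")
    if rate_limit_hits >= thresholds["rate_limit_threshold"]:
        alerts.append(f"{rate_limit_hits} rate-limit hits reached the configured limit")
    return alerts


for message in api_alerts(daily_cost_usd=120.0, rate_limit_hits=3):
    print(message)
```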

Rate Limiter Configuration

{
  "rate_limit_profiles": {
    "default_rpm": 2,
    "agint_rpm": 5,
    "bdi_rpm": 15,
    "high_volume_rpm": 60
  }
}
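
To make the profile values concrete, the sketch below converts each requests-per-minute figure into the minimum spacing between requests. It assumes evenly spaced requests; the actual RateLimiter may use a different strategy (for example a token bucket or sliding window), and `min_interval_seconds` is a hypothetical helper.

```python
RATE_LIMIT_PROFILES = {
    "default_rpm": 2,
    "agint_rpm": 5,
    "bdi_rpm": 15,
    "high_volume_rpm": 60,
}


def min_interval_seconds(profile: str, profiles: dict = RATE_LIMIT_PROFILES) -> float:
    """Seconds to wait between requests if they are spaced evenly over a minute."""
    return 60.0 / profiles[profile]


for name in RATE_LIMIT_PROFILES:
    print(f"{name}: one request every {min_interval_seconds(name):.1f}s")
```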

Data Storage and Persistence

Memory Agent Integration

Export Functionality

# Export comprehensive metrics to JSON
export_path = await monitoring_system.export_metrics_to_file()
print(f"Metrics exported to: {export_path}")

# Generate detailed monitoring report
report = await monitoring_system.generate_monitoring_report(hours_back=24)

Storage Structure

data/
├── monitoring/
│   └── logs/
│       └── metrics_export_YYYYMMDD_HHMMSS.json
└── memory/
    └── stm/
        └── enhanced_monitoring_system/
            └── YYYYMMDD/
                ├── YYYY-MM-DDTHH-MM-SS.system_state.memory.json
                ├── YYYY-MM-DDTHH-MM-SS.performance.memory.json
                └── YYYY-MM-DDTHH-MM-SS.api_usage.memory.json
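
The naming pattern in this tree can be reproduced with `datetime.strftime`. The sketch below builds paths of the same shape; the helper names are hypothetical, and the way the system itself composes these paths is not shown in this document.

```python
from datetime import datetime
from pathlib import Path


def memory_record_path(base: Path, category: str, ts: datetime) -> Path:
    """Build data/memory/stm/enhanced_monitoring_system/YYYYMMDD/<stamp>.<category>.memory.json."""
    day_dir = ts.strftime("%Y%m%d")
    stamp = ts.strftime("%Y-%m-%dT%H-%M-%S")
    return (base / "memory" / "stm" / "enhanced_monitoring_system"
            / day_dir / f"{stamp}.{category}.memory.json")


def metrics_export_path(base: Path, ts: datetime) -> Path:
    """Build data/monitoring/logs/metrics_export_YYYYMMDD_HHMMSS.json."""
    return base / "monitoring" / "logs" / f"metrics_export_{ts.strftime('%Y%m%d_%H%M%S')}.json"


now = datetime.now()
print(memory_record_path(Path("data"), "api_usage", now))
print(metrics_export_path(Path("data"), now))
```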

Integration Examples

LLM Handler Integration

# In LLM handlers, integrate token usage tracking
class GeminiHandler(LLMHandlerInterface):
    async def generate_text(self, prompt, model, **kwargs):
        start_time = time.time()
        
        try:
            response = await self._api_call(prompt, model, **kwargs)
            
            # Log successful API usage
            await monitoring_system.log_api_token_usage(
                model_name=model,
                provider="gemini",
                prompt_tokens=response.usage.prompt_tokens,
                completion_tokens=response.usage.completion_tokens,
                cost_usd=self._calculate_cost(response.usage),
                success=True,
                rate_limited=False
            )
            
            return response.text
            
        except RateLimitError as e:
            # Log rate limited API usage
            await monitoring_system.log_api_token_usage(
                model_name=model,
                provider="gemini",
                prompt_tokens=0,
                completion_tokens=0,
                cost_usd=0.0,
                success=False,
                rate_limited=True
            )
            raise

Agent Integration

# In BDI agents, track performance metrics
class BDIAgent:
    async def execute_action(self, action):
        start_time = time.time()
        
        try:
            result = await self._perform_action(action)
            execution_time = (time.time() - start_time) * 1000
            
            await monitoring_system.log_agent_performance(
                agent_id=self.agent_id,
                action_type=action.type,
                execution_time_ms=execution_time,
                success=True,
                metadata={"action_complexity": action.complexity}
            )
            
            return result
            
        except Exception as e:
            execution_time = (time.time() - start_time) * 1000
            
            await monitoring_system.log_agent_performance(
                agent_id=self.agent_id,
                action_type=action.type,
                execution_time_ms=execution_time,
                success=False,
                metadata={"error_type": type(e).__name__}
            )
            raise
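
The success and failure branches above repeat the same timing and logging steps. One way to factor that out is an async context manager, sketched here under the assumption that the caller supplies a `report` coroutine (for instance a thin wrapper around `log_agent_performance`). The `timed` helper is hypothetical and not part of the monitoring system.

```python
import asyncio
import time
from contextlib import asynccontextmanager


@asynccontextmanager
async def timed(report):
    """Time the wrapped block, then await report(elapsed_ms, success, error_type)."""
    start = time.perf_counter()
    try:
        yield
    except Exception as exc:
        # Report the failure with its exception type, then let it propagate.
        await report((time.perf_counter() - start) * 1000, False, type(exc).__name__)
        raise
    else:
        await report((time.perf_counter() - start) * 1000, True, None)


async def demo():
    calls = []

    async def report(elapsed_ms, success, error_type):
        calls.append((success, error_type))

    async with timed(report):
        await asyncio.sleep(0)  # stands in for the real action

    try:
        async with timed(report):
            raise ValueError("boom")
    except ValueError:
        pass
    return calls


print(asyncio.run(demo()))
```

`time.perf_counter()` is used instead of `time.time()` because it is monotonic and intended for measuring elapsed intervals.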

Monitoring Dashboards and Analysis

Real-time Metrics Dashboard

# Get real-time system overview
async def get_system_overview():
    current = monitoring_system.get_current_metrics()
    api_summary = await monitoring_system.get_api_usage_summary()
    limiter_summary = await monitoring_system.get_rate_limiter_summary()
    
    return {
        "system_health": {
            "cpu_usage": f"{current['resource_metrics']['cpu_percent']:.1f}%",
            "memory_usage": f"{current['resource_metrics']['memory_percent']:.1f}%",
            "swap_usage": f"{current['resource_metrics']['swap_percent']:.1f}%",
        },
        "api_usage": {
            "total_cost": f"${api_summary['summary']['total_cost_usd']:.3f}",
            "total_calls": api_summary['summary']['total_calls'],
            "avg_cost_per_call": f"${api_summary['summary']['avg_cost_per_call']:.4f}"
        },
        "rate_limiters": {
            "total_tracked": limiter_summary['total_limiters'],
            "overall_health": limiter_summary['overall_health']
        }
    }

Cost Analysis

# Analyze API costs and efficiency
async def analyze_api_efficiency():
    api_summary = await monitoring_system.get_api_usage_summary()
    
    efficiency_report = {}
    for model_key, model_data in api_summary['by_model'].items():
        efficiency_report[model_key] = {
            "cost_per_token": model_data['total_cost'] / max(model_data['total_tokens'], 1),
            "efficiency_ratio": model_data['efficiency'],
            "rate_limit_impact": model_data['rate_limit_hits'] / max(model_data['calls'], 1)
        }
    
    return efficiency_report
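
To show the arithmetic on concrete numbers, the sketch below applies the same ratios to a hand-written `by_model` dictionary. The sample values and the `"provider/model"` key format are hypothetical, and `efficiency_from_summary` is a standalone illustration that does not call the monitoring system.

```python
def efficiency_from_summary(by_model: dict) -> dict:
    """Compute cost per token and rate-limit impact for each model entry."""
    report = {}
    for model_key, data in by_model.items():
        report[model_key] = {
            # max(..., 1) avoids division by zero for models with no traffic
            "cost_per_token": data["total_cost"] / max(data["total_tokens"], 1),
            "rate_limit_impact": data["rate_limit_hits"] / max(data["calls"], 1),
        }
    return report


# Hypothetical sample data
sample = {
    "openai/gpt-4": {"total_cost": 0.5, "total_tokens": 10_000, "rate_limit_hits": 2, "calls": 40},
    "idle/model": {"total_cost": 0.0, "total_tokens": 0, "rate_limit_hits": 0, "calls": 0},
}

for model, metrics in efficiency_from_summary(sample).items():
    print(f"{model}: ${metrics['cost_per_token']:.6f}/token, "
          f"{metrics['rate_limit_impact']:.1%} of calls rate limited")
```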

Best Practices

1. Resource Monitoring

2. API Usage Optimization

3. Rate Limiter Tuning

4. Alert Configuration

Troubleshooting

Common Issues

High Memory Usage

API Cost Overruns

Rate Limiter Issues

Diagnostic Commands

# Check overall system health
overview = await get_system_overview()
print(json.dumps(overview, indent=2))

# Analyze API efficiency
efficiency = await analyze_api_efficiency()
for model, metrics in efficiency.items():
    print(f"{model}: {metrics['cost_per_token']:.6f}$/token, {metrics['efficiency_ratio']:.3f} efficiency")

# Export detailed metrics for analysis
export_path = await monitoring_system.export_metrics_to_file()
print(f"Detailed metrics exported to: {export_path}")

Conclusion

The Enhanced Monitoring System provides comprehensive, production-ready monitoring of all aspects of the mindX framework. With detailed resource tracking, API usage monitoring, rate limiter performance analysis, and intelligent alerting, it enables proactive system management and optimization.

Key benefits:

