The Enhanced Monitoring System provides comprehensive, real-time monitoring of system resources, API usage, performance metrics, and rate limiting across the mindX framework. It integrates seamlessly with the MemoryAgent for persistent storage and structured alerting.
graph TB
A[Enhanced Monitoring System] --> B[Resource Monitor]
A --> C[Performance Monitor]
A --> D[API Usage Monitor]
A --> E[Rate Limiter Monitor]
B --> F[CPU Metrics]
B --> G[Memory Metrics]
B --> H[Disk/Network I/O]
C --> I[LLM Performance]
C --> J[Agent Performance]
D --> K[Token Usage]
D --> L[Cost Tracking]
D --> M[Provider Analytics]
E --> N[Rate Limit Metrics]
E --> O[Success Rates]
E --> P[Wait Times]
F --> Q[Memory Agent]
G --> Q
H --> Q
I --> Q
J --> Q
K --> Q
L --> Q
M --> Q
N --> Q
O --> Q
P --> Q
Q --> R[Structured Logs]
Q --> S[Export Files]
The TokenCalculatorTool serves as the cornerstone of the monitoring system's cost management capabilities, providing production-grade token cost calculation, usage tracking, and budget optimization for all LLM operations.
# Cost estimation with high precision
result = await token_calculator.execute(
"estimate_cost",
text="Analyze this code snippet",
model="gemini-1.5-flash",
operation_type="code_generation"
)
# Usage tracking with budget monitoring
result = await token_calculator.execute(
"track_usage",
agent_id="analyzer_agent",
operation="code_analysis",
model="gemini-1.5-flash",
input_tokens=150,
output_tokens=75,
cost_usd=0.000375
)
# Comprehensive system metrics
result = await token_calculator.execute("get_metrics")
# Automatic detailed resource collection every 30 seconds (configurable)
await monitoring_system.start_monitoring()
# Get current comprehensive metrics
metrics = monitoring_system.get_current_metrics()
print(f"CPU: {metrics['resource_metrics']['cpu_percent']}%")
print(f"Memory: {metrics['resource_metrics']['memory_used_gb']:.1f}GB")
print(f"Swap: {metrics['resource_metrics']['swap_percent']}%")
# Log comprehensive API token usage
await monitoring_system.log_api_token_usage(
model_name="gpt-4",
provider="openai",
prompt_tokens=150,
completion_tokens=200,
cost_usd=0.005,
success=True,
rate_limited=False,
metadata={"agent_id": "my_agent", "task": "code_generation"}
)
# Get API usage summary
api_summary = await monitoring_system.get_api_usage_summary(hours_back=24)
print(f"Total Cost: ${api_summary['summary']['total_cost_usd']:.3f}")
print(f"Total Tokens: {api_summary['summary']['total_tokens']:,}")
# Enhanced rate limiter with monitoring
def monitoring_callback(metrics):
    # Forward rate-limiter metrics to the monitoring system without
    # blocking the caller: schedule the log call as a background task.
    asyncio.create_task(
        monitoring_system.log_rate_limiter_metrics(
            provider="openai",
            model_name="gpt-4",
            rate_limiter_metrics=metrics,
        )
    )
rate_limiter = RateLimiter(
requests_per_minute=60,
max_retries=3,
monitoring_callback=monitoring_callback
)
# Get rate limiter health summary
limiter_summary = await monitoring_system.get_rate_limiter_summary()
print(f"Overall Health: {limiter_summary['overall_health']}")
# Enhanced LLM performance logging
await monitoring_system.log_llm_performance(
model_name="gpt-4",
task_type="code_generation",
agent_id="enhanced_simple_coder",
latency_ms=2500,
success=True,
prompt_tokens=200,
completion_tokens=150,
cost=0.008,
error_type=None,
metadata={"complexity": "high", "language": "python"}
)
# Agent execution performance
await monitoring_system.log_agent_performance(
agent_id="bdi_agent",
action_type="planning",
execution_time_ms=1200,
success=True,
metadata={"goal_complexity": "medium"}
)
{
"monitoring": {
"interval_seconds": 30.0,
"alert_cooldown_seconds": 300,
"memory_logging_enabled": true,
"log_performance_details": true
}
}
{
"monitoring": {
"thresholds": {
"cpu_critical": 90.0,
"cpu_warning": 70.0,
"memory_critical": 85.0,
"memory_warning": 70.0,
"disk_critical": 90.0,
"disk_warning": 80.0,
"swap_critical": 80.0,
"swap_warning": 60.0
}
}
}
{
"monitoring": {
"api": {
"daily_cost_threshold": 100.0,
"rate_limit_threshold": 10,
"efficiency_threshold": 0.1
}
}
}
{
"rate_limit_profiles": {
"default_rpm": 2,
"agint_rpm": 5,
"bdi_rpm": 15,
"high_volume_rpm": 60
}
}
# Export comprehensive metrics to JSON
export_path = await monitoring_system.export_metrics_to_file()
print(f"Metrics exported to: {export_path}")
# Generate detailed monitoring report
report = await monitoring_system.generate_monitoring_report(hours_back=24)
data/
├── monitoring/
│ └── logs/
│ └── metrics_export_YYYYMMDD_HHMMSS.json
└── memory/
└── stm/
└── enhanced_monitoring_system/
└── YYYYMMDD/
├── YYYY-MM-DDTHH-MM-SS.system_state.memory.json
├── YYYY-MM-DDTHH-MM-SS.performance.memory.json
└── YYYY-MM-DDTHH-MM-SS.api_usage.memory.json
# In LLM handlers, integrate token usage tracking
class GeminiHandler(LLMHandlerInterface):
    """Example LLM handler that reports token usage to the monitoring system."""

    async def generate_text(self, prompt, model, **kwargs):
        """Call the Gemini API and log token usage for every attempt.

        On success, logs token counts, cost, and observed call latency.
        On a rate-limit rejection, logs a zero-token / zero-cost record
        flagged as rate-limited, then re-raises for the caller to handle.
        """
        start_time = time.time()
        try:
            response = await self._api_call(prompt, model, **kwargs)
            # Log successful API usage. The original example captured
            # start_time but never used it; report the measured latency
            # through the metadata field log_api_token_usage accepts.
            await monitoring_system.log_api_token_usage(
                model_name=model,
                provider="gemini",
                prompt_tokens=response.usage.prompt_tokens,
                completion_tokens=response.usage.completion_tokens,
                cost_usd=self._calculate_cost(response.usage),
                success=True,
                rate_limited=False,
                metadata={"latency_ms": (time.time() - start_time) * 1000},
            )
            return response.text
        except RateLimitError as e:
            # Log rate-limited API usage: no tokens consumed, no cost incurred.
            await monitoring_system.log_api_token_usage(
                model_name=model,
                provider="gemini",
                prompt_tokens=0,
                completion_tokens=0,
                cost_usd=0.0,
                success=False,
                rate_limited=True,
                metadata={"latency_ms": (time.time() - start_time) * 1000},
            )
            raise  # Propagate so callers can apply their own backoff policy.
# In BDI agents, track performance metrics
class BDIAgent:
    """Example BDI agent that reports per-action execution metrics."""

    async def execute_action(self, action):
        """Execute *action*, logging timing and outcome to the monitoring system.

        Records a success entry with the action's complexity metadata, or a
        failure entry with the exception type, then re-raises the exception.
        """
        start_time = time.time()
        try:
            result = await self._perform_action(action)
            # Convert elapsed seconds to milliseconds.
            # (The original example was missing the `*` operator here.)
            execution_time = (time.time() - start_time) * 1000
            await monitoring_system.log_agent_performance(
                agent_id=self.agent_id,
                action_type=action.type,
                execution_time_ms=execution_time,
                success=True,
                metadata={"action_complexity": action.complexity},
            )
            return result
        except Exception as e:
            execution_time = (time.time() - start_time) * 1000
            await monitoring_system.log_agent_performance(
                agent_id=self.agent_id,
                action_type=action.type,
                execution_time_ms=execution_time,
                success=False,
                metadata={"error_type": type(e).__name__},
            )
            raise
# Get real-time system overview
async def get_system_overview():
    """Return a real-time snapshot of system, API, and rate-limiter health."""
    current = monitoring_system.get_current_metrics()
    api_summary = await monitoring_system.get_api_usage_summary()
    limiter_summary = await monitoring_system.get_rate_limiter_summary()

    # Build each section separately, then assemble the overview.
    resources = current['resource_metrics']
    totals = api_summary['summary']

    system_health = {
        "cpu_usage": f"{resources['cpu_percent']:.1f}%",
        "memory_usage": f"{resources['memory_percent']:.1f}%",
        "swap_usage": f"{resources['swap_percent']:.1f}%",
    }
    api_usage = {
        "total_cost": f"${totals['total_cost_usd']:.3f}",
        "total_calls": totals['total_calls'],
        "avg_cost_per_call": f"${totals['avg_cost_per_call']:.4f}",
    }
    rate_limiters = {
        "total_tracked": limiter_summary['total_limiters'],
        "overall_health": limiter_summary['overall_health'],
    }

    return {
        "system_health": system_health,
        "api_usage": api_usage,
        "rate_limiters": rate_limiters,
    }
# Analyze API costs and efficiency
async def analyze_api_efficiency():
    """Summarize per-model cost efficiency and rate-limit impact."""
    api_summary = await monitoring_system.get_api_usage_summary()
    # max(..., 1) guards against division by zero for models with no traffic.
    return {
        model_key: {
            "cost_per_token": stats['total_cost'] / max(stats['total_tokens'], 1),
            "efficiency_ratio": stats['efficiency'],
            "rate_limit_impact": stats['rate_limit_hits'] / max(stats['calls'], 1),
        }
        for model_key, stats in api_summary['by_model'].items()
    }
# Check overall system health
overview = await get_system_overview()
print(json.dumps(overview, indent=2))
# Analyze API efficiency
efficiency = await analyze_api_efficiency()
for model, metrics in efficiency.items():
print(f"{model}: {metrics['cost_per_token']:.6f}$/token, {metrics['efficiency_ratio']:.3f} efficiency")
# Export detailed metrics for analysis
export_path = await monitoring_system.export_metrics_to_file()
print(f"Detailed metrics exported to: {export_path}")
The Enhanced Monitoring System provides comprehensive, production-ready monitoring of all aspects of the mindX framework. With detailed resource tracking, API usage monitoring, rate limiter performance analysis, and intelligent alerting, it enables proactive system management and optimization.
Key benefits: