MindX's enhanced memory and logging system provides sophisticated self-awareness and context management capabilities designed to scale from single-agent deployments to enterprise-level systems supporting thousands of agents. The system maintains real-time performance while offering both programmatic and human-readable access to memory insights.
## Hierarchical Organization

### Memory Types & Importance Levels
```text
# Memory Types
INTERACTION    # User/agent conversations
CONTEXT        # System state snapshots
LEARNING       # Knowledge and pattern discoveries
SYSTEM_STATE   # Infrastructure status
PERFORMANCE    # Metrics and benchmarks
ERROR          # Failures and recovery attempts
GOAL           # Agent objectives and targets
BELIEF         # Agent world model updates
PLAN           # Decision-making processes

# Importance Levels
CRITICAL   # System failures, security events
HIGH       # Performance alerts, goal completions
MEDIUM     # Regular interactions, state changes
LOW        # Debug info, routine operations
```
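As a rough sketch of how these categories might be modeled in code (the `MemoryType` and `Importance` enums and the `should_retain_hot` helper are illustrative, not part of the MindX API):

```python
from enum import Enum

class MemoryType(Enum):
    INTERACTION = "INTERACTION"
    CONTEXT = "CONTEXT"
    LEARNING = "LEARNING"
    SYSTEM_STATE = "SYSTEM_STATE"
    PERFORMANCE = "PERFORMANCE"
    ERROR = "ERROR"
    GOAL = "GOAL"
    BELIEF = "BELIEF"
    PLAN = "PLAN"

class Importance(Enum):
    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1

def should_retain_hot(memory_type: MemoryType, importance: Importance) -> bool:
    """One possible retention rule: keep HIGH/CRITICAL memories in hot
    storage, plus all errors regardless of importance."""
    return importance.value >= Importance.HIGH.value or memory_type is MemoryType.ERROR

print(should_retain_hot(MemoryType.ERROR, Importance.LOW))  # True
```

Ordering the importance levels numerically makes threshold comparisons (for alerting, tiering, retention) a single integer check.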
```text
data/
├── memory/
│   ├── timestamped/                  # Hot storage (recent memories)
│   │   ├── shards/                   # Distributed sharding for scale
│   │   │   ├── shard_000/            # First 100 agents (agents 0-99)
│   │   │   ├── shard_001/            # Next 100 agents (agents 100-199)
│   │   │   └── shard_nnn/            # Auto-scaling shards
│   │   └── {shard_id}/
│   │       ├── {agent_id}/
│   │       │   ├── {YYYYMMDD}/
│   │       │   │   ├── interactions/
│   │       │   │   │   ├── {timestamp}.interaction.json
│   │       │   │   │   └── {timestamp}.response.json
│   │       │   │   ├── performance/
│   │       │   │   │   └── {timestamp}.perf.json
│   │       │   │   ├── errors/
│   │       │   │   │   └── {timestamp}.error.json
│   │       │   │   └── index.json    # Daily index for fast lookup
│   │       │   └── metadata.json     # Agent config and stats
│   ├── compressed/                   # Cold storage (archived memories)
│   │   ├── {agent_id}/
│   │   │   ├── {YYYYMM}.gz           # Monthly compressed archives
│   │   │   └── index.json            # Archive index
│   ├── analytics/                    # Aggregated insights
│   │   ├── global/                   # System-wide analytics
│   │   │   ├── daily/                # Daily aggregations
│   │   │   ├── weekly/               # Weekly trends
│   │   │   └── monthly/              # Long-term patterns
│   │   ├── agent_clusters/           # Agent group analytics
│   │   └── performance/              # Performance baselines
│   └── cache/                        # Fast lookup caches
│       ├── recent_memories/          # LRU cache for hot data
│       ├── pattern_cache/            # Compiled pattern insights
│       └── search_indices/           # Full-text search indices
└── logs/
    ├── runtime/                      # System execution logs
    │   ├── {YYYYMMDD}/
    │   │   ├── mindx_runtime_{shard}.log
    │   │   └── error_summary.json
    ├── terminal/                     # User interaction logs
    │   ├── {YYYYMMDD}/
    │   │   └── terminal_sessions.log
    ├── process_traces/               # Detailed execution traces
    │   ├── {agent_id}/
    │   │   └── {YYYYMMDD}/
    │   │       └── trace_{timestamp}.json
    └── audit/                        # Security and compliance logs
        ├── {YYYYMMDD}/
        │   ├── access.log
        │   ├── auth.log
        │   └── data_changes.log
```
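To make the layout concrete, a small helper can build the hot-storage path for one interaction file. This is an illustrative sketch: the `interaction_path` helper and its `HHMMSS` timestamp encoding are assumptions, not the actual MindX implementation.

```python
from datetime import datetime, timezone
from pathlib import Path

def interaction_path(data_root: str, shard_id: str, agent_id: str,
                     ts: datetime) -> Path:
    """Build the hot-storage path for one interaction file, mirroring
    data/memory/timestamped/{shard_id}/{agent_id}/{YYYYMMDD}/interactions/."""
    day = ts.strftime("%Y%m%d")
    stamp = ts.strftime("%H%M%S")   # assumed timestamp encoding
    return (Path(data_root) / "memory" / "timestamped" / shard_id / agent_id
            / day / "interactions" / f"{stamp}.interaction.json")

p = interaction_path("data", "shard_001", "bdi_agent_001",
                     datetime(2025, 1, 15, 14, 30, 22, tzinfo=timezone.utc))
print(p)
```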
### timestampmemory.json Files

Every input/response interaction between users and agents is stored as an individual timestampmemory.json file. Each file contains the full record for one exchange:

```json
{
  "memory_id": "agent123_20250115_143022_001",
  "timestamp_utc": "2025-01-15T14:30:22.123456Z",
  "timestamp_local": "2025-01-15T14:30:22.123456-05:00",
  "agent_id": "bdi_agent_001",
  "shard_id": "shard_001",
  "memory_type": "INTERACTION",
  "importance": "MEDIUM",
  "input": {
    "content": "Please analyze the market trends",
    "source": "user_interface",
    "context_id": "session_abc123"
  },
  "response": {
    "content": "Based on recent data analysis...",
    "success": true,
    "confidence": 0.87,
    "processing_time_ms": 1247
  },
  "context": {
    "session_id": "session_abc123",
    "user_id": "user_456",
    "task_type": "analysis",
    "model_used": "gpt-4",
    "system_load": 0.65
  },
  "tags": ["market_analysis", "user_request", "successful"],
  "relationships": {
    "parent_memory_id": "agent123_20250115_143015_999",
    "related_memories": ["agent123_20250115_143000_888"]
  },
  "metadata": {
    "memory_version": "2.0",
    "compression_eligible": false,
    "retention_days": 30,
    "access_count": 0,
    "last_accessed": null
  }
}
```
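A minimal loader/validator for such records might look like this (the required-field set is inferred from the example record, not a published schema):

```python
import json

REQUIRED_FIELDS = {"memory_id", "timestamp_utc", "agent_id", "shard_id",
                   "memory_type", "importance"}

def load_memory_record(raw: str) -> dict:
    """Parse one timestampmemory.json payload and check required top-level fields."""
    record = json.loads(raw)
    missing = REQUIRED_FIELDS - record.keys()
    if missing:
        raise ValueError(f"memory record missing fields: {sorted(missing)}")
    return record

record = load_memory_record(json.dumps({
    "memory_id": "agent123_20250115_143022_001",
    "timestamp_utc": "2025-01-15T14:30:22.123456Z",
    "agent_id": "bdi_agent_001",
    "shard_id": "shard_001",
    "memory_type": "INTERACTION",
    "importance": "MEDIUM",
}))
print(record["memory_type"])  # INTERACTION
```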
## Storage Layers for Scale
### Runtime Logs

```text
logs/runtime/{YYYYMMDD}/
├── mindx_runtime_shard_000.log   # Agent operations for shard 0
├── mindx_runtime_shard_001.log   # Agent operations for shard 1
├── error_summary.json            # Aggregated error patterns
└── performance_metrics.json      # System performance data
```
### Process Traces

```text
logs/process_traces/{agent_id}/{YYYYMMDD}/
├── trace_143022_user_request.json   # Individual request trace
├── trace_143025_model_call.json     # LLM interaction trace
└── trace_143028_memory_store.json   # Memory operation trace
```
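The filenames above follow a `trace_{timestamp}_{label}.json` pattern; a helper that assumes an `HHMMSS` timestamp encoding (an assumption based on the examples, not a documented convention):

```python
from datetime import datetime, timezone

def trace_filename(ts: datetime, label: str) -> str:
    """Name a per-event trace file as trace_{HHMMSS}_{label}.json."""
    return f"trace_{ts.strftime('%H%M%S')}_{label}.json"

name = trace_filename(datetime(2025, 1, 15, 14, 30, 22, tzinfo=timezone.utc),
                      "user_request")
print(name)  # trace_143022_user_request.json
```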
### Audit and Compliance Logs

```text
logs/audit/{YYYYMMDD}/
├── access.log         # All system access attempts
├── auth.log           # Authentication and authorization events
├── data_changes.log   # Memory modification tracking
└── compliance.json    # Regulatory compliance data export
```
### Dynamic Shard Management

The system automatically distributes agents across shards to prevent any single storage location from becoming a bottleneck:
```python
import hashlib

class ShardManager:
    def __init__(self):
        self.agents_per_shard = 100   # Configurable based on workload
        self.max_shard_size_gb = 10   # Auto-split threshold
        self.replication_factor = 2   # For reliability and read scaling

    def get_shard_id(self, agent_id: str) -> str:
        # A stable digest ensures the same agent always maps to the same
        # shard (Python's built-in hash() is salted per process, so it
        # would not survive restarts).
        digest = hashlib.sha256(agent_id.encode("utf-8")).digest()
        hash_value = int.from_bytes(digest[:4], "big") % 1_000_000
        shard_num = hash_value // (1_000_000 // self.agents_per_shard)
        return f"shard_{shard_num:03d}"

    def auto_scale_shards(self):
        # Monitor shard sizes and split when thresholds are exceeded,
        # redistribute agents for optimal load balancing, and merge
        # shards when agents are decommissioned.
        ...
```
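The mapping logic can be exercised standalone. This sketch (`shard_for` is a hypothetical free function, not the MindX API) uses a SHA-256 digest precisely because a digest, unlike Python's per-process-salted built-in `hash()`, gives a mapping that is stable across processes and restarts:

```python
import hashlib

def shard_for(agent_id: str, agents_per_shard: int = 100) -> str:
    """Deterministic agent-to-shard mapping via a stable SHA-256 digest."""
    digest = hashlib.sha256(agent_id.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest[:4], "big") % 1_000_000
    shard_num = hash_value // (1_000_000 // agents_per_shard)
    return f"shard_{shard_num:03d}"

# The same agent maps to the same shard in every process and after restarts.
print(shard_for("bdi_agent_001") == shard_for("bdi_agent_001"))  # True
```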
### Load Balancing Features

### Intelligent Tiering Strategy
```python
class MemoryLifecycleManager:
    def __init__(self):
        self.hot_days = 7                 # Recent memories in fast storage
        self.warm_days = 30               # Compressed but indexed memories
        self.cold_retention_months = 12   # Long-term archives

    async def auto_tier_memories(self):
        # Daily background process that moves memories between storage
        # tiers based on access patterns, memory importance, and agent
        # activity, maintaining performance while optimizing storage costs.
        ...
```
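Under these thresholds, tier selection reduces to an age check. The sketch below (a hypothetical `storage_tier` helper) deliberately ignores the access-pattern and importance signals the real process would also weigh:

```python
from datetime import datetime, timedelta, timezone

HOT_DAYS, WARM_DAYS = 7, 30   # thresholds from MemoryLifecycleManager above

def storage_tier(created_at: datetime, now: datetime) -> str:
    """Map a memory's age onto the hot / warm / cold tiers."""
    age = now - created_at
    if age <= timedelta(days=HOT_DAYS):
        return "hot"
    if age <= timedelta(days=WARM_DAYS):
        return "warm"
    return "cold"

now = datetime(2025, 1, 15, tzinfo=timezone.utc)
print(storage_tier(now - timedelta(days=3), now))   # hot
print(storage_tier(now - timedelta(days=20), now))  # warm
print(storage_tier(now - timedelta(days=90), now))  # cold
```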
### Compression and Deduplication

### Multi-Level Caching Strategy
```python
from cachetools import LRUCache  # third-party: pip install cachetools

class MemoryCache:
    def __init__(self):
        self.recent_cache = LRUCache(maxsize=10_000)      # Last 10k memories
        self.pattern_cache = LRUCache(maxsize=1_000)      # Compiled pattern insights
        self.agent_stats_cache = LRUCache(maxsize=5_000)  # Agent performance summaries

    def smart_prefetch(self, agent_id: str) -> None:
        # Predict and preload likely-needed memories based on agent behavior
        # patterns, time of day, and historical access; reduces cache misses
        # and improves response times.
        ...
```
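The LRU caches above can be backed by any LRU implementation; for reference, a minimal dependency-free sketch of the eviction behavior (`MiniLRU` is illustrative, not the MindX cache type):

```python
from collections import OrderedDict

class MiniLRU:
    """Minimal LRU cache: recently used entries survive, the least
    recently used entry is evicted once maxsize is exceeded."""
    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self._data: OrderedDict = OrderedDict()

    def get(self, key, default=None):
        if key in self._data:
            self._data.move_to_end(key)      # mark as most recently used
            return self._data[key]
        return default

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.maxsize:
            self._data.popitem(last=False)   # evict least recently used

cache = MiniLRU(maxsize=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")       # touching "a" makes "b" the eviction candidate
cache.put("c", 3)    # evicts "b"
print(cache.get("b"))  # None
print(cache.get("a"))  # 1
```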
### Database Optimization
```python
from agents.enhanced_memory_agent import EnhancedMemoryAgent

# Initialize with scalability configuration
memory_agent = EnhancedMemoryAgent(
    shard_config={
        "agents_per_shard": 100,
        "auto_scale": True,
        "replication_factor": 2
    },
    storage_config={
        "hot_storage_days": 7,
        "compression_enabled": True,
        "cache_size_mb": 500
    }
)

# Save interaction with automatic sharding and optimization
await memory_agent.save_interaction_memory(
    agent_id="bdi_agent_0157",
    input_content="Analyze customer behavior patterns",
    response_content="Identified 3 key behavior clusters based on purchase history...",
    context={
        "task_complexity": "high",
        "processing_time_ms": 2340,
        "confidence": 0.91,
        "model_used": "gpt-4",
        "tokens_used": 1250
    },
    importance="HIGH",
    tags=["customer_analysis", "ml_insights", "business_intelligence"]
)

# Cross-agent pattern analysis for swarm intelligence
cluster_analysis = await memory_agent.analyze_agent_cluster_patterns(
    agent_group="customer_service_bots",
    pattern_types=["performance", "errors", "learning", "collaboration"],
    time_range_days=7,
    min_interactions=100
)

# System-wide health and scaling insights
system_health = await memory_agent.generate_system_health_report(
    include_predictions=True,
    detail_level="executive_summary",
    forecast_days=30
)

# Predict when the system will need more resources
scaling_prediction = await memory_agent.predict_scaling_needs(
    forecast_days=30,
    confidence_threshold=0.8,
    growth_scenarios=["conservative", "aggressive", "exponential"],
    consider_seasonal_patterns=True
)

# Identify optimization opportunities across thousands of agents
optimization_report = await memory_agent.generate_optimization_recommendations(
    target_metrics=["response_time", "memory_usage", "error_rate", "cost"],
    agent_sample_size=1000,  # Analyze top 1000 most active agents
    optimization_horizon_days=7,
    include_cost_analysis=True
)

# Cross-agent collaboration and swarm behavior analysis
collaboration_patterns = await memory_agent.analyze_collaboration_patterns(
    time_window_hours=24,
    min_interaction_threshold=5,
    network_analysis=True,  # Generate agent interaction networks
    detect_emergent_behaviors=True
)
```
### Small Scale (1-50 agents)

```json
{
  "memory_system": {
    "storage": {
      "sharding": {
        "enabled": false,
        "single_shard_mode": true
      },
      "backend": "sqlite",
      "cache_size_mb": 100,
      "compression": {"enabled": false}
    },
    "monitoring": {
      "health_check_interval_seconds": 300,
      "metrics_retention_days": 30
    }
  }
}
```
### Medium Scale (50-500 agents)

```json
{
  "memory_system": {
    "storage": {
      "sharding": {
        "enabled": true,
        "agents_per_shard": 50,
        "max_shard_size_gb": 5,
        "auto_scale": true
      },
      "backend": "postgresql",
      "cache_size_mb": 1000,
      "compression": {"enabled": true, "level": 3}
    },
    "performance": {
      "max_concurrent_operations": 500,
      "batch_size": 100
    }
  }
}
```
### Large Scale (500-5000+ agents)

```json
{
  "memory_system": {
    "storage": {
      "sharding": {
        "enabled": true,
        "agents_per_shard": 100,
        "max_shard_size_gb": 10,
        "auto_scale": true,
        "replication_factor": 3
      },
      "backend": "distributed_postgresql",
      "cache_size_mb": 5000,
      "compression": {
        "enabled": true,
        "algorithm": "zstd",
        "level": 6
      }
    },
    "performance": {
      "max_concurrent_operations": 10000,
      "batch_size": 1000,
      "async_processing": true,
      "background_optimization": true
    },
    "monitoring": {
      "real_time_analytics": true,
      "health_check_interval_seconds": 30,
      "predictive_scaling": true
    }
  }
}
```
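Whichever scale profile is used, it helps to sanity-check the sharding block before startup. A loader sketch (the `load_memory_config` helper and its validation rules are illustrative assumptions, not MindX requirements):

```python
import json

def load_memory_config(raw: str) -> dict:
    """Parse a scale-profile JSON and sanity-check its sharding settings."""
    cfg = json.loads(raw)["memory_system"]
    sharding = cfg["storage"]["sharding"]
    if sharding.get("enabled"):
        if sharding["agents_per_shard"] <= 0:
            raise ValueError("agents_per_shard must be positive")
        if sharding["max_shard_size_gb"] <= 0:
            raise ValueError("max_shard_size_gb must be positive")
    return cfg

cfg = load_memory_config("""{
  "memory_system": {
    "storage": {
      "sharding": {"enabled": true, "agents_per_shard": 100,
                   "max_shard_size_gb": 10, "auto_scale": true},
      "backend": "distributed_postgresql",
      "cache_size_mb": 5000
    }
  }
}""")
print(cfg["storage"]["backend"])  # distributed_postgresql
```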
- Memory Operations per Second
- Query Response Times (99th percentile)
- Storage Efficiency
- When to Scale Up
- Auto-scaling Triggers
```python
class LegacyMigrator:
    async def migrate_existing_memories(self):
        # Phase 1: Analyze existing memory format and volume
        existing_data = await self.analyze_legacy_format()

        # Phase 2: Create migration plan with optimal sharding strategy
        migration_plan = await self.create_migration_plan(existing_data)

        # Phase 3: Implement dual-write during transition period
        await self.enable_dual_write_mode()

        # Phase 4: Migrate in batches to avoid service interruption
        for batch in migration_plan.batches:
            await self.migrate_batch(batch)
            await self.validate_batch_integrity(batch)

        # Phase 5: Gradual cutover with rollback capability
        await self.perform_gradual_cutover()

        # Phase 6: Cleanup and optimization
        await self.cleanup_legacy_data()
        await self.optimize_new_system()
```
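Phase 4's batch-wise migration is, at its core, fixed-size chunking of the workload so each chunk can be migrated and validated independently; a sketch (`batches` is a hypothetical helper, not a `LegacyMigrator` method):

```python
from typing import Iterator, Sequence

def batches(items: Sequence[str], batch_size: int) -> Iterator[Sequence[str]]:
    """Yield fixed-size chunks of a migration workload (last chunk may be short)."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

memory_ids = [f"mem_{n:04d}" for n in range(250)]
plan = list(batches(memory_ids, batch_size=100))
print(len(plan), len(plan[-1]))  # 3 50
```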
- Infrastructure Validation
- Performance Validation
- Operational Readiness
- Automated Health Checks
- Performance Optimization
- Capacity Planning
- Data Quality
This enhanced memory and logging system provides MindX with enterprise-grade scalability while maintaining the simplicity and power needed for both individual agents and massive agent swarms. The architecture is designed to grow seamlessly from prototype to production scale, supporting thousands of agents while maintaining sub-second response times and comprehensive observability.