Memory Agent

Overview

The FinAgent Memory System is a sophisticated, production-ready memory management architecture designed specifically for financial trading agents. It provides intelligent memory storage, retrieval, and relationship modeling capabilities through a unified backend with multiple protocol interfaces.

Key Features:

  • Multi-Protocol Support: HTTP REST, Model Context Protocol (MCP), and Agent-to-Agent (A2A) communication

  • Graph-Based Storage: Neo4j database with intelligent relationship modeling

  • Semantic Search: AI-powered memory retrieval with similarity scoring

  • Real-Time Processing: Event-driven architecture with streaming capabilities

  • Production Ready: 100% test coverage with comprehensive monitoring

System Architecture

Layered Architecture Design

The system implements a modular, layered architecture that separates protocol concerns from business logic:

┌─────────────────────────────────────────────────────────────┐
│                 FINAGENT MEMORY SYSTEM                     │
├─────────────────────────────────────────────────────────────┤
│  Memory Server   │   MCP Server    │    A2A Server         │
│  (Port 8000)     │   (Port 8001)   │   (Port 8002)         │
│  HTTP/REST API   │   MCP Protocol  │   A2A Protocol        │
├─────────────────────────────────────────────────────────────┤
│              UNIFIED INTERFACE MANAGER                     │
│          Protocol-agnostic tool orchestration             │
├─────────────────────────────────────────────────────────────┤
│              UNIFIED DATABASE MANAGER                      │
│          Neo4j graph database operations                  │
├─────────────────────────────────────────────────────────────┤
│                CONFIGURATION LAYER                        │
│          Environment-specific configurations              │
└─────────────────────────────────────────────────────────────┘

Core Components

1. Memory Server (Port 8000)
  • Purpose: Primary HTTP REST API interface

  • Technology: FastAPI with asynchronous processing

  • Features: Health monitoring, documentation endpoint

  • Use Case: Web applications and HTTP-based integrations

2. MCP Server (Port 8001)
  • Purpose: Model Context Protocol implementation for LLM integration

  • Technology: FastMCP with JSON-RPC 2.0 compliance

  • Features: Standardized tool definitions, Server-Sent Events (SSE)

  • Use Case: AI model integration and agent communication

3. A2A Server (Port 8002)
  • Purpose: Agent-to-Agent communication protocol

  • Technology: A2A SDK with streaming capabilities

  • Features: Inter-agent messaging, batch operations

  • Use Case: Multi-agent system coordination

4. Unified Interface Manager
  • Purpose: Protocol-agnostic tool execution engine

  • Features: Dynamic tool registration, context management

  • Benefits: Code reuse across protocols, centralized business logic

5. Unified Database Manager
  • Purpose: Centralized Neo4j operations

  • Features: Graph relationships, full-text search, intelligent indexing

  • Optimizations: Connection pooling, batch operations, caching

Interface Design

REST API Interface (Memory Server)

Base URL: http://localhost:8000

Health Check Endpoint

GET /health HTTP/1.1
Host: localhost:8000

Response:
{
  "status": "healthy",
  "timestamp": "2025-07-24T15:30:00Z",
  "service": "FinAgent Memory Server",
  "details": {
    "database": "connected",
    "components": {...}
  }
}

Documentation Endpoint

GET /docs HTTP/1.1
Host: localhost:8000

Response: HTML documentation page with API specifications

Model Context Protocol Interface (MCP Server)

Base URL: http://localhost:8001/mcp/ Protocol: JSON-RPC 2.0 with Server-Sent Events

Available Tools

store_memory

Store intelligent memory with automatic linking

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "store_memory",
    "arguments": {
      "query": "Trading signal analysis",
      "keywords": ["trading", "signal", "analysis"],
      "summary": "Analysis of market signals",
      "agent_id": "trading_agent_001"
    }
  },
  "id": 1
}
retrieve_memory

Retrieve memories with semantic search

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "retrieve_memory",
    "arguments": {
      "search_query": "market analysis",
      "limit": 5
    }
  },
  "id": 2
}
semantic_search

AI-powered semantic search

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "semantic_search",
    "arguments": {
      "query": "risk management strategies",
      "limit": 10,
      "similarity_threshold": 0.3
    }
  },
  "id": 3
}
get_statistics

Comprehensive system statistics

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "get_statistics",
    "arguments": {}
  },
  "id": 4
}
health_check

System health monitoring

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "health_check",
    "arguments": {}
  },
  "id": 5
}
create_relationship

Create intelligent memory relationships

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "create_relationship",
    "arguments": {
      "source_memory_id": "uuid-1",
      "target_memory_id": "uuid-2",
      "relationship_type": "RELATED_TO"
    }
  },
  "id": 6
}

Agent-to-Agent Protocol Interface (A2A Server)

Base URL: http://localhost:8002/ Protocol: A2A JSON-RPC with streaming support

Message Format

{
  "jsonrpc": "2.0",
  "method": "message/send",
  "params": {
    "message": {
      "messageId": "msg_001",
      "role": "user",
      "parts": [
        {
          "text": "{\"action\": \"store\", \"key\": \"market_data\", \"value\": \"...\"}"
        }
      ]
    }
  },
  "id": 1
}

Supported Operations

  • store: Store memory objects

  • retrieve: Retrieve memory objects

  • search: Search memories

  • health: Health check

  • list: List memories

Database Schema

Neo4j Graph Model

Node Types

  • Memory: Core memory storage nodes

  • Agent: Agent identity nodes

Relationship Types

  • SIMILAR_TO: Semantic similarity relationships

  • CREATED_BY: Agent ownership relationships

  • RELATED_TO: General content relationships

Memory Node Properties

Memory {
  memory_id: String (UNIQUE)
  agent_id: String (INDEXED)
  memory_type: String (INDEXED)
  content: String (JSON)
  content_text: String (FULLTEXT INDEXED)
  summary: String (FULLTEXT INDEXED)
  keywords: List<String>
  timestamp: DateTime (INDEXED)
  event_type: String
  log_level: String
  session_id: String
  correlation_id: String
  lookup_count: Integer
  created_at: DateTime
}

Indexes

-- Unique constraint
CREATE CONSTRAINT memory_id_unique FOR (m:Memory) REQUIRE m.memory_id IS UNIQUE;

-- Full-text search
CREATE FULLTEXT INDEX memory_content_index FOR (m:Memory) ON EACH [m.content_text, m.summary];

-- Range indexes
CREATE INDEX memory_timestamp_idx FOR (m:Memory) ON (m.timestamp);
CREATE INDEX memory_agent_idx FOR (m:Memory) ON (m.agent_id);
CREATE INDEX memory_type_idx FOR (m:Memory) ON (m.memory_type);

Installation and Setup

Prerequisites

System Requirements:

  • Python 3.12+

  • Neo4j 5.x

  • Available ports: 8000, 8001, 8002

Environment Setup:

# Create conda environment
conda create -n agent python=3.12
conda activate agent

# Install dependencies
pip install -r requirements.txt

Database Configuration:

# Neo4j connection settings
URI: bolt://localhost:7687
Username: neo4j
Password: finagent123
Database: neo4j

Quick Start

One-Command Startup:

cd /path/to/FinAgent-Orchestration/FinAgents/memory
./start_memory_system.sh start

Service Management:

# Start all services
./start_memory_system.sh start

# Start individual services
./start_memory_system.sh memory    # Memory Server only
./start_memory_system.sh mcp       # MCP Server only
./start_memory_system.sh a2a       # A2A Server only

# Check status
./start_memory_system.sh status

# View logs
./start_memory_system.sh logs

# Stop services
./start_memory_system.sh stop

# Run tests
./start_memory_system.sh test

Manual Startup:

# Terminal 1 - Memory Server
python memory_server.py

# Terminal 2 - MCP Server
uvicorn mcp_server:app --host 0.0.0.0 --port 8001

# Terminal 3 - A2A Server
python a2a_server.py

Testing Framework

Comprehensive Test Suite

The system includes a comprehensive testing framework with 20 test cases covering all components:

Test Categories:

  • Port Connectivity Tests (3 tests)

  • Database Operations Tests (2 tests)

  • Memory Server Tests (2 tests)

  • MCP Server Tests (7 tests)

  • A2A Server Tests (5 tests)

  • Performance Tests (1 test)

Test Execution:

# Integrated testing via startup script
./start_memory_system.sh test

# Direct test execution
python -m FinAgents.memory.tests.memory_test

# Test system script
cd tests && ./test_system.sh

Expected Results:

📊 Final Test Results:
   📝 Total Tests: 20
   ✅ Passed: 20
   ❌ Failed: 0
   📈 Success Rate: 100.0%
   ⏱️  Duration: ~0.5s

Test Coverage Details

MCP Protocol Tests:

  • MCP Tools List: Verifies 6 available tools

  • MCP Health Check Tool: Tests health monitoring

  • MCP Statistics Tool: Tests system analytics

  • MCP Store Memory Tool: Tests memory storage with success indicators

  • MCP Retrieve Memory Tool: Tests memory retrieval functionality

A2A Protocol Tests:

  • Simple Message: Basic communication test

  • Store Operation: Memory storage via A2A protocol

  • Retrieve Operation: Memory retrieval via A2A protocol

  • Health Check: A2A health monitoring

  • Performance Test: 50+ operations/second throughput

Error Handling:

All tests include comprehensive error handling with:

  • Graceful fallback mechanisms

  • Detailed error reporting

  • Timeout management

  • Connection retry logic

Performance Metrics

System Performance

Throughput Benchmarks:

  • A2A Operations: 50+ operations/second

  • MCP Tool Calls: <200ms average response time

  • Database Queries: <150ms for memory retrieval

  • Memory Storage: <100ms per operation

Scalability Metrics:

  • Concurrent Connections: Multiple simultaneous clients supported

  • Memory Indexing: Real-time with intelligent similarity linking

  • Database Operations: Connection pooling for optimal performance

Resource Usage:

  • Memory Footprint: Optimized for concurrent operations

  • CPU Usage: Efficient asynchronous processing

  • Network I/O: Minimal latency with streaming support

Monitoring and Logging

Log Management

Log Files Location:

logs/
├── memory_server.log       # Memory Server operations
├── mcp_server.log         # MCP Server operations
├── a2a_server.log         # A2A Server operations
└── system.log             # System-wide events

Log Levels:

  • INFO: Normal operations and successful requests

  • WARNING: Non-critical issues and deprecation notices

  • ERROR: Error conditions that don’t stop service

  • CRITICAL: Fatal errors requiring immediate attention

Real-Time Monitoring:

# Monitor all logs
tail -f logs/*.log

# Monitor specific service
tail -f logs/mcp_server.log

# Search for errors
grep -i error logs/*.log

Health Monitoring

Service Health Checks:

# Memory Server health
curl http://localhost:8000/health

# System status via script
./start_memory_system.sh status

Component Status Monitoring:

  • Database connectivity status

  • Service responsiveness metrics

  • Memory usage statistics

  • Performance benchmarks

Troubleshooting Guide

Common Issues

Port Conflicts

# Check port availability
lsof -i :8000,8001,8002

# Kill processes on specific ports
sudo lsof -ti:8000 | xargs kill -9

Database Connection Issues

# Check Neo4j status
systemctl status neo4j

# Test database connection
cypher-shell -u neo4j -p finagent123

Service Startup Problems

# Check conda environment
conda info --envs | grep agent

# Verify Python dependencies
pip check

# Check service logs
grep -E "(ERROR|CRITICAL)" logs/*.log

Test Failures

# Run diagnostic tests
python tests/memory_test.py --verbose

# Check individual service health
curl -s http://localhost:8000/health | jq .
curl -s http://localhost:8001/ | head -5
curl -s http://localhost:8002/ | head -5

Performance Issues

Memory Usage

# Monitor memory usage
ps aux | grep -E "(memory_server|mcp_server|a2a_server)"

# Check database memory
cypher-shell -u neo4j -p finagent123 "CALL dbms.listMemoryPools()"

Response Time Optimization

# Enable debug logging
export LOG_LEVEL=DEBUG

# Monitor response times
curl -w "%{time_total}\n" -s http://localhost:8000/health

Diagnostic Commands

System Validation:

# Complete system check
./start_memory_system.sh test

# Service connectivity test
./start_memory_system.sh status

# Configuration validation
python -c "import unified_database_manager; print('✅ Database manager OK')"
python -c "import unified_interface_manager; print('✅ Interface manager OK')"

Log Analysis:

# Recent errors
tail -100 logs/*.log | grep -i error

# Service startup events
grep -E "(Starting|Started|Initialized)" logs/*.log

# Performance metrics
grep -E "(operations/second|ms|performance)" logs/*.log

Advanced Configuration

Environment Variables

# Database configuration
export NEO4J_URI="bolt://localhost:7687"
export NEO4J_USER="neo4j"
export NEO4J_PASSWORD="finagent123"

# Service configuration
export MEMORY_SERVER_PORT=8000
export MCP_SERVER_PORT=8001
export A2A_SERVER_PORT=8002

# Logging configuration
export LOG_LEVEL=INFO
export LOG_FORMAT=detailed

Custom Tool Development

Adding New MCP Tools:

@mcp_server.tool(
    name="custom_analysis",
    description="Custom financial analysis tool"
)
async def custom_analysis(data: str, parameters: Dict[str, Any]) -> str:
    """Custom analysis implementation."""
    # Tool implementation
    return json.dumps({"result": "analysis complete"})

Extending Database Schema:

-- Add custom node types
CREATE (:CustomAnalysis {
  analysis_id: 'unique_id',
  analysis_type: 'technical',
  parameters: 'json_data',
  created_at: datetime()
});

-- Add custom relationships
CREATE (m:Memory)-[:ANALYZED_BY]->(a:CustomAnalysis);

API Reference

Complete API Documentation

For comprehensive API documentation including all endpoints, parameters, and response formats, visit:

  • Memory Server: http://localhost:8000/docs

  • MCP Tools: Available via tools/list JSON-RPC call

  • A2A Protocol: Standard A2A SDK documentation

Example Usage Patterns:

# Python client example for MCP
import requests

def call_mcp_tool(tool_name, arguments):
    response = requests.post(
        "http://localhost:8001/mcp/",
        json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments},
            "id": 1
        },
        headers={"Accept": "application/json, text/event-stream"}
    )
    return response.json()

Contributing

Development Setup

# Fork and clone repository
git clone https://github.com/Open-Finance-Lab/FinAgent-Orchestration.git
cd FinAgent-Orchestration/FinAgents/memory

# Setup development environment
conda create -n finagent-dev python=3.12
conda activate finagent-dev
pip install -r requirements.txt
pip install -r requirements-dev.txt

Code Style:

  • Follow PEP 8 guidelines

  • Use type hints for all functions

  • Include comprehensive docstrings

  • Add unit tests for new features

Testing Requirements:

  • All new features must include tests

  • Maintain 100% test pass rate

  • Include integration tests for new protocols

  • Performance benchmarks for new operations

License and Support

License: OpenMDW (see LICENSE file)

Support Channels:

Contributing: Please see CONTRIBUTING.md for detailed guidelines.

Acknowledgments: Built on FastAPI, Neo4j, FastMCP, and A2A Protocol standards.

Note

This documentation reflects the current stable release. For the latest development features and API updates, please refer to the GitHub repository main branch.