Python SDK
The official Python SDK for Recall provides a type-safe interface to its hybrid Redis and Mem0 memory system.
Installation
```bash
pip install recall-memory
```
Requirements
- Python 3.8 or higher
- Redis 6.0+ (local or cloud)
- Mem0 API key
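Before initializing the client, you can confirm that the interpreter and the local Redis server meet these requirements. This is a minimal sketch, assuming the redis-py package is available (it is also used in the Connection Pooling example later on this page):

```python
import sys
import redis  # redis-py, assumed available alongside the SDK

# Interpreter check
assert sys.version_info >= (3, 8), "Recall requires Python 3.8 or higher"

# Redis reachability and version check
r = redis.Redis.from_url("redis://localhost:6379")
print("Redis version:", r.info("server")["redis_version"])  # expect 6.0 or later
```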
Quick Start
```python
from recall import RecallClient

# Initialize the client
client = RecallClient(
    redis_url="redis://localhost:6379",
    mem0_api_key="your-api-key"
)

# Store a memory
memory = client.add(
    content="User prefers Python for data science",
    user_id="user_123",
    priority="high"
)

# Search memories
results = client.search(
    query="programming preferences",
    user_id="user_123"
)
```
Client Configuration
Environment Variables
The SDK automatically reads its configuration from environment variables:
```python
import os
from recall import RecallClient

# Set environment variables
os.environ["RECALL_REDIS_URL"] = "redis://localhost:6379"
os.environ["RECALL_MEM0_API_KEY"] = "your-api-key"
os.environ["RECALL_ENVIRONMENT"] = "production"

# Client auto-configures from environment
client = RecallClient()
```
Configuration File
Load configuration from YAML or JSON:
```python
from recall import RecallClient

# From YAML file
client = RecallClient.from_config("recall.yaml")

# From JSON file
client = RecallClient.from_config("recall.json")

# From dictionary
config = {
    "redis": {"url": "redis://localhost:6379"},
    "mem0": {"api_key": "your-api-key"},
    "cache": {"ttl": 3600}
}
client = RecallClient.from_dict(config)
```
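For reference, a recall.yaml with the same settings as the dictionary above might look like the following; the field layout is an assumption that mirrors the dictionary form accepted by from_dict:

```yaml
# recall.yaml - structure assumed to mirror RecallClient.from_dict
redis:
  url: redis://localhost:6379
mem0:
  api_key: your-api-key
cache:
  ttl: 3600
```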
Advanced Configuration
```python
from recall import RecallClient, CacheConfig, SyncConfig
from recall.serializers import MessagePackSerializer

client = RecallClient(
    redis_url="redis://localhost:6379",
    mem0_api_key="your-api-key",

    # Cache configuration
    cache_config=CacheConfig(
        ttl=3600,
        max_memory="1gb",
        eviction_policy="allkeys-lru",
        compression=True,
        serializer=MessagePackSerializer()
    ),

    # Sync configuration
    sync_config=SyncConfig(
        mode="lazy",
        batch_size=100,
        interval=60
    ),

    # Connection configuration
    redis_connection_pool_kwargs={
        "max_connections": 50,
        "socket_keepalive": True,
        "socket_keepalive_options": {
            1: 1,  # TCP_KEEPIDLE
            2: 1,  # TCP_KEEPINTVL
            3: 5,  # TCP_KEEPCNT
        }
    },

    # Performance options
    enable_pipelining=True,
    enable_lua_scripts=True,

    # Monitoring
    enable_metrics=True,
    metrics_port=9090
)
```
Type Safety
The SDK includes comprehensive type hints:
```python
from recall import RecallClient, Memory, Priority
from typing import List, Optional

def store_user_preference(
    client: RecallClient,
    user_id: str,
    preference: str,
    priority: Priority = Priority.MEDIUM
) -> Memory:
    """Store a user preference with type safety."""
    return client.add(
        content=preference,
        user_id=user_id,
        priority=priority
    )

def get_preferences(
    client: RecallClient,
    user_id: str,
    category: Optional[str] = None
) -> List[Memory]:
    """Retrieve user preferences with filtering."""
    filters = {"category": category} if category else None
    return client.search(
        query="preferences",
        user_id=user_id,
        filters=filters
    )
```
Async Support
AsyncRecallClient
```python
import asyncio
from recall import AsyncRecallClient

async def main():
    # Initialize async client
    client = AsyncRecallClient(
        redis_url="redis://localhost:6379",
        mem0_api_key="your-api-key"
    )

    # Async operations
    memory = await client.add(
        content="Async memory operation",
        user_id="user_123"
    )

    # Concurrent operations
    tasks = []
    for i in range(10):
        task = client.add(
            content=f"Memory {i}",
            user_id="user_123"
        )
        tasks.append(task)

    memories = await asyncio.gather(*tasks)
    print(f"Created {len(memories)} memories concurrently")

    # Async context manager
    async with AsyncRecallClient() as client:
        await client.add(content="Auto-cleanup", user_id="user_123")

asyncio.run(main())
```
Async Streaming
```python
import asyncio
from recall import AsyncRecallClient

async def stream_memories():
    client = AsyncRecallClient()

    # Stream search results
    async for memory in client.stream_search(
        query="user interactions",
        user_id="user_123",
        batch_size=10
    ):
        print(f"Processing: {memory.content}")
        # Process each memory as it arrives

    # Stream all memories
    async for batch in client.stream_all(
        user_id="user_123",
        batch_size=50
    ):
        print(f"Batch of {len(batch)} memories")
        # Process batch

asyncio.run(stream_memories())
```
Context Managers
Automatic Resource Management
```python
from recall import RecallClient

# Automatic cleanup with context manager
with RecallClient() as client:
    client.add(content="Memory", user_id="user_123")
    # Connection automatically closed on exit

# Transaction support (use a client that is still open)
client = RecallClient()
with client.transaction() as tx:
    tx.add(content="Memory 1", user_id="user_123")
    tx.add(content="Memory 2", user_id="user_123")
    # Atomic commit or rollback
```
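Assuming the transaction rolls back when the block raises (as the comment above suggests), a failed group of writes can be handled like this sketch:

```python
from recall import RecallClient

client = RecallClient()

try:
    with client.transaction() as tx:
        tx.add(content="Memory 1", user_id="user_123")
        tx.add(content="Memory 2", user_id="user_123")
        raise RuntimeError("simulated failure")  # aborts before commit
except RuntimeError:
    # Neither memory was committed; handle the error or retry as needed
    pass
```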
Decorators
Caching Decorator
```python
from recall.decorators import recall_cache
from recall import RecallClient

client = RecallClient()

@recall_cache(client, ttl=3600)
def expensive_computation(user_id: str, query: str):
    """This function's results will be cached."""
    # Expensive operation (complex_calculation is a stand-in for your own logic)
    return complex_calculation(user_id, query)

# First call: computes and caches
result = expensive_computation("user_123", "data")

# Second call: returns from cache
result = expensive_computation("user_123", "data")
```
Memory Decorator
```python
from recall.decorators import remember
from recall import RecallClient

client = RecallClient()

@remember(client, priority="high")
def user_action(user_id: str, action: str):
    """Automatically stores function calls as memories."""
    # Perform action
    return f"Completed {action}"

# Function call is automatically remembered
result = user_action("user_123", "changed settings")
# Memory stored: "user_123 performed: changed settings"
```
Data Models
Memory Model
```python
from recall.models import Memory, Priority
from datetime import datetime

# Create a memory object
memory = Memory(
    id="mem_123",
    content="User preference",
    user_id="user_123",
    priority=Priority.HIGH,
    created_at=datetime.now(),
    metadata={
        "category": "preferences",
        "source": "settings"
    }
)

# Access properties
print(memory.content)
print(memory.priority.value)  # "high"
print(memory.age_seconds)
print(memory.to_dict())
```
Batch Operations
```python
from recall.models import MemoryBatch
from recall.exceptions import ValidationError

# Create batch
batch = MemoryBatch()
batch.add(content="Memory 1", user_id="user_123")
batch.add(content="Memory 2", user_id="user_123")
batch.add(content="Memory 3", user_id="user_123")

# Execute batch
results = client.add_batch(batch)

# Batch with validation
batch = MemoryBatch(validate=True, max_size=100)
try:
    batch.add(content="", user_id="")  # Raises ValidationError
except ValidationError as e:
    print(f"Invalid memory: {e}")
```
Serialization
Custom Serializers
```python
from recall import RecallClient
from recall.serializers import (
    JSONSerializer,
    MessagePackSerializer,
    PickleSerializer,
    ProtobufSerializer
)

# JSON (default)
client = RecallClient(serializer=JSONSerializer())

# MessagePack (faster, smaller)
client = RecallClient(serializer=MessagePackSerializer())

# Pickle (Python objects)
client = RecallClient(serializer=PickleSerializer())

# Protocol Buffers
client = RecallClient(serializer=ProtobufSerializer())

# Custom serializer
class CustomSerializer:
    def serialize(self, obj):
        # Custom serialization logic
        return custom_encode(obj)

    def deserialize(self, data):
        # Custom deserialization logic
        return custom_decode(data)

client = RecallClient(serializer=CustomSerializer())
```
Middleware
Request Middleware
```python
from recall import RecallClient
from recall.middleware import Middleware

class LoggingMiddleware(Middleware):
    def before_request(self, method, *args, **kwargs):
        print(f"Calling {method} with args={args}, kwargs={kwargs}")

    def after_request(self, method, result):
        print(f"{method} returned {result}")
        return result

    def on_error(self, method, error):
        print(f"{method} failed with {error}")
        raise error

# Add middleware
client = RecallClient()
client.add_middleware(LoggingMiddleware())

# All requests now logged
client.add(content="Test", user_id="user_123")
```
Built-in Middleware
```python
from recall import RecallClient
from recall.middleware import (
    RetryMiddleware,
    RateLimitMiddleware,
    MetricsMiddleware,
    CacheMiddleware
)

client = RecallClient()

# Add retry logic
client.add_middleware(RetryMiddleware(max_retries=3))

# Add rate limiting
client.add_middleware(RateLimitMiddleware(
    max_requests=100,
    window_seconds=60
))

# Add metrics collection
client.add_middleware(MetricsMiddleware(
    prometheus_port=9090
))
```
Testing
Mock Client
```python
from recall.testing import MockRecallClient
import pytest

@pytest.fixture
def recall_client():
    """Fixture providing a mock client for tests."""
    return MockRecallClient()

def test_memory_storage(recall_client):
    # Add memory
    memory = recall_client.add(
        content="Test memory",
        user_id="test_user"
    )
    assert memory.id.startswith("mem_")

    # Verify storage
    assert recall_client.call_count("add") == 1
    assert recall_client.last_call("add").content == "Test memory"

    # Search
    results = recall_client.search(
        query="test",
        user_id="test_user"
    )
    assert len(results) == 1
```
Test Utilities
```python
from recall import RecallClient
from recall.testing import (
    create_test_memory,
    populate_test_data,
    assert_memory_equal
)

# Create test data
memory = create_test_memory(
    content="Test content",
    priority="high"
)

# Populate with sample data
client = RecallClient()
populate_test_data(client, user_id="test_user", count=100)

# Assert two Memory objects (memory1, memory2) are equal
assert_memory_equal(memory1, memory2)
```
Monitoring
Metrics Collection
```python
from recall import RecallClient
from recall.monitoring import MetricsCollector

# Enable metrics
client = RecallClient(enable_metrics=True)

# Access metrics
metrics = client.get_metrics()
print(f"Total operations: {metrics.total_operations}")
print(f"Cache hit rate: {metrics.cache_hit_rate:.2%}")
print(f"Average latency: {metrics.avg_latency_ms:.2f}ms")

# Export to Prometheus
client.export_metrics_prometheus(port=9090)

# Export to StatsD
client.export_metrics_statsd(
    host="localhost",
    port=8125,
    prefix="recall"
)
```
Logging
```python
import logging
from recall import RecallClient

# Configure logging
logging.basicConfig(level=logging.DEBUG)

# Client with debug logging
client = RecallClient(debug=True)

# Custom logger
logger = logging.getLogger("my_app")
client = RecallClient(logger=logger)

# Log levels
client.set_log_level(logging.WARNING)
```
Error Handling
Exception Hierarchy
```python
from recall.exceptions import (
    RecallError,
    ConnectionError,
    AuthenticationError,
    ValidationError,
    CacheError,
    SyncError,
    RateLimitError,
    TimeoutError
)

try:
    client.add(content="", user_id="")
except ValidationError as e:
    # Handle validation errors
    print(f"Invalid input: {e.field} - {e.message}")
except ConnectionError as e:
    # Handle connection issues
    print(f"Connection failed: {e.service} - {e.message}")
except RecallError as e:
    # Catch all Recall errors
    print(f"Error: {e}")
```
Retry Logic
```python
from recall import RecallClient
from recall.exceptions import ConnectionError, TimeoutError
from recall.retry import exponential_backoff

client = RecallClient(
    retry_config={
        "max_attempts": 3,
        "backoff": exponential_backoff(base=2, max_delay=30),
        "retry_on": [ConnectionError, TimeoutError]
    }
)

# Operations automatically retry on failure
memory = client.add(content="Important", user_id="user_123")
```
CLI Usage
The SDK includes a recall command-line tool:
```bash
# Check status
recall status

# Add memory
recall add "User preference" --user-id user_123 --priority high

# Search memories
recall search "preferences" --user-id user_123 --limit 10

# Get statistics
recall stats

# Clear cache
recall cache clear --user-id user_123

# Export data
recall export --format json --output memories.json

# Import data
recall import memories.json
```
Best Practices
Connection Pooling
```python
from recall import RecallClient
from redis.connection import ConnectionPool

# Share connection pool across clients
pool = ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50
)

client1 = RecallClient(redis_connection_pool=pool)
client2 = RecallClient(redis_connection_pool=pool)
```
Memory Management
```python
# Use appropriate priority levels
client.add(
    content="Critical user data",
    user_id="user_123",
    priority="critical"  # Never evicted
)

client.add(
    content="Temporary preference",
    user_id="user_123",
    priority="low",  # First to evict
    metadata={"expires_at": "2024-12-31"}
)
```
Performance Optimization
```python
# Batch operations for better performance
memories = [
    {"content": f"Memory {i}", "user_id": "user_123"}
    for i in range(1000)
]

# Slow: individual calls
for memory in memories:
    client.add(**memory)

# Fast: batch call
client.add_batch(memories)

# Pipeline for multiple operations
with client.pipeline() as pipe:
    pipe.add(content="Memory 1", user_id="user_123")
    pipe.add(content="Memory 2", user_id="user_123")
    pipe.search(query="test", user_id="user_123")
    results = pipe.execute()
```
Migration Guide
From Mem0
```python
# Before (Mem0)
from mem0 import Memory
m = Memory()
m.add("Memory content", user_id="user_123")

# After (Recall)
from recall import RecallClient
client = RecallClient()
client.add(content="Memory content", user_id="user_123")
```
From Redis
```python
# Before (Redis)
import redis
r = redis.Redis()
r.set("user:123:pref", "dark_mode")

# After (Recall)
from recall import RecallClient
client = RecallClient()
client.add(
    content="Prefers dark mode",
    user_id="user_123",
    metadata={"key": "user:123:pref"}
)
```
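For a one-off migration of existing keys, the two snippets above can be combined into a loop. The sketch below is illustrative only: the key pattern, the user-id parsing, and the content wording are assumptions to adapt to your own schema.

```python
import redis
from recall import RecallClient

r = redis.Redis(decode_responses=True)
client = RecallClient()

# Copy existing preference keys into Recall, keeping the original key in metadata
for key in r.scan_iter(match="user:*:pref"):   # hypothetical key pattern
    user_id = key.split(":")[1]                # "user:123:pref" -> "123"
    value = r.get(key)
    client.add(
        content=f"Preference: {value}",
        user_id=f"user_{user_id}",
        metadata={"key": key},
    )
```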
Next Steps
- Review TypeScript SDK for Node.js applications
- Explore API Reference for detailed method documentation
- Check Examples for real-world use cases