
Python SDK

The official Python SDK for Recall provides a type-safe interface to the hybrid memory system, which pairs a Redis cache layer with Mem0-backed long-term storage.

Installation

Bash
pip install recall-memory

Requirements

  • Python 3.8 or higher
  • Redis 6.0+ (local or cloud)
  • Mem0 API key
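
A quick way to confirm the requirements before installing (standard commands; assumes a local Redis instance):

Bash
python --version   # should report 3.8 or higher
redis-cli ping     # a running Redis 6.0+ replies with PONG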

Quick Start

Python
from recall import RecallClient

# Initialize the client
client = RecallClient(
    redis_url="redis://localhost:6379",
    mem0_api_key="your-api-key"
)

# Store a memory
memory = client.add(
    content="User prefers Python for data science",
    user_id="user_123",
    priority="high"
)

# Search memories
results = client.search(
    query="programming preferences",
    user_id="user_123"
)
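
Assuming search returns a list of Memory objects (the model described under Data Models below), the results can be consumed directly:

Python
# Inspect the matches
for memory in results:
    print(memory.content, memory.priority)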

Client Configuration

Environment Variables

The SDK automatically reads from environment variables:

Python
import os
from recall import RecallClient

# Set environment variables
os.environ["RECALL_REDIS_URL"] = "redis://localhost:6379"
os.environ["RECALL_MEM0_API_KEY"] = "your-api-key"
os.environ["RECALL_ENVIRONMENT"] = "production"

# Client auto-configures from environment
client = RecallClient()
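
In deployment these variables are usually exported from the shell (or an .env file) rather than set from Python; the names are the same as above:

Bash
export RECALL_REDIS_URL="redis://localhost:6379"
export RECALL_MEM0_API_KEY="your-api-key"
export RECALL_ENVIRONMENT="production"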

Configuration File

Load configuration from YAML or JSON:

Python
from recall import RecallClient

# From YAML file
client = RecallClient.from_config("recall.yaml")

# From JSON file
client = RecallClient.from_config("recall.json")

# From dictionary
config = {
    "redis": {"url": "redis://localhost:6379"},
    "mem0": {"api_key": "your-api-key"},
    "cache": {"ttl": 3600}
}
client = RecallClient.from_dict(config)
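
The exact file schema is not spelled out here; assuming it mirrors the dictionary form above, a minimal recall.yaml might look like this:

YAML
redis:
  url: redis://localhost:6379
mem0:
  api_key: your-api-key
cache:
  ttl: 3600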

Advanced Configuration

Python
import socket

from recall import RecallClient, CacheConfig, SyncConfig
from recall.serializers import MessagePackSerializer

client = RecallClient(
    redis_url="redis://localhost:6379",
    mem0_api_key="your-api-key",

    # Cache configuration
    cache_config=CacheConfig(
        ttl=3600,
        max_memory="1gb",
        eviction_policy="allkeys-lru",
        compression=True,
        serializer=MessagePackSerializer()
    ),

    # Sync configuration
    sync_config=SyncConfig(
        mode="lazy",
        batch_size=100,
        interval=60
    ),

    # Connection configuration
    redis_connection_pool_kwargs={
        "max_connections": 50,
        "socket_keepalive": True,
        # TCP keepalive tuning (these socket constants are available on Linux)
        "socket_keepalive_options": {
            socket.TCP_KEEPIDLE: 1,   # seconds idle before the first keepalive probe
            socket.TCP_KEEPINTVL: 1,  # seconds between probes
            socket.TCP_KEEPCNT: 5,    # failed probes before the connection is dropped
        }
    },

    # Performance options
    enable_pipelining=True,
    enable_lua_scripts=True,

    # Monitoring
    enable_metrics=True,
    metrics_port=9090
)

Type Safety

The SDK includes comprehensive type hints:

Python
from recall import RecallClient, Memory, Priority
from typing import List, Optional

def store_user_preference(
    client: RecallClient,
    user_id: str,
    preference: str,
    priority: Priority = Priority.MEDIUM
) -> Memory:
    """Store a user preference with type safety."""
    return client.add(
        content=preference,
        user_id=user_id,
        priority=priority
    )

def get_preferences(
    client: RecallClient,
    user_id: str,
    category: Optional[str] = None
) -> List[Memory]:
    """Retrieve user preferences with filtering."""
    filters = {"category": category} if category else None
    return client.search(
        query="preferences",
        user_id=user_id,
        filters=filters
    )
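
Because the API is fully annotated, a static type checker can catch mismatched arguments before runtime. For example, assuming the functions above live in a file named preferences.py:

Bash
pip install mypy
mypy preferences.py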

Async Support

AsyncRecallClient

Python
import asyncio
from recall import AsyncRecallClient

async def main():
    # Initialize async client
    client = AsyncRecallClient(
        redis_url="redis://localhost:6379",
        mem0_api_key="your-api-key"
    )

    # Async operations
    memory = await client.add(
        content="Async memory operation",
        user_id="user_123"
    )

    # Concurrent operations
    tasks = []
    for i in range(10):
        task = client.add(
            content=f"Memory {i}",
            user_id="user_123"
        )
        tasks.append(task)

    memories = await asyncio.gather(*tasks)
    print(f"Created {len(memories)} memories concurrently")

    # Async context manager
    async with AsyncRecallClient() as scoped_client:
        await scoped_client.add(content="Auto-cleanup", user_id="user_123")

asyncio.run(main())

Async Streaming

Python
import asyncio
from recall import AsyncRecallClient

async def stream_memories():
    client = AsyncRecallClient()

    # Stream search results
    async for memory in client.stream_search(
        query="user interactions",
        user_id="user_123",
        batch_size=10
    ):
        print(f"Processing: {memory.content}")
        # Process each memory as it arrives

    # Stream all memories
    async for batch in client.stream_all(
        user_id="user_123",
        batch_size=50
    ):
        print(f"Batch of {len(batch)} memories")
        # Process batch

asyncio.run(stream_memories())

Context Managers

Automatic Resource Management

Python
from recall import RecallClient

# Automatic cleanup with context manager
with RecallClient() as client:
    client.add(content="Memory", user_id="user_123")
# Connection automatically closed when the block exits

# Transaction support (requires an open client)
client = RecallClient()
with client.transaction() as tx:
    tx.add(content="Memory 1", user_id="user_123")
    tx.add(content="Memory 2", user_id="user_123")
# Commits atomically on success, rolls back on error

Decorators

Caching Decorator

Python
from recall.decorators import recall_cache
from recall import RecallClient

client = RecallClient()

@recall_cache(client, ttl=3600)
def expensive_computation(user_id: str, query: str):
    """This function's results will be cached."""
    # Stand-in for an expensive operation
    return f"{user_id}: computed result for {query!r}"

# First call: computes and caches the result
result = expensive_computation("user_123", "data")

# Second call with the same arguments: served from the cache
result = expensive_computation("user_123", "data")

Memory Decorator

Python
from recall.decorators import remember
from recall import RecallClient

client = RecallClient()

@remember(client, priority="high")
def user_action(user_id: str, action: str):
    """Automatically stores function calls as memories."""
    # Perform the action
    return f"Completed {action}"

# The call is automatically remembered
result = user_action("user_123", "changed settings")
# Memory stored: "user_123 performed: changed settings"

Data Models

Memory Model

Python
from recall.models import Memory, Priority
from datetime import datetime

# Create a memory object
memory = Memory(
    id="mem_123",
    content="User preference",
    user_id="user_123",
    priority=Priority.HIGH,
    created_at=datetime.now(),
    metadata={
        "category": "preferences",
        "source": "settings"
    }
)

# Access properties
print(memory.content)
print(memory.priority.value)  # "high"
print(memory.age_seconds)
print(memory.to_dict())

Batch Operations

Python
from recall import RecallClient
from recall.models import MemoryBatch
from recall.exceptions import ValidationError

client = RecallClient()

# Create batch
batch = MemoryBatch()
batch.add(content="Memory 1", user_id="user_123")
batch.add(content="Memory 2", user_id="user_123")
batch.add(content="Memory 3", user_id="user_123")

# Execute batch
results = client.add_batch(batch)

# Batch with validation
batch = MemoryBatch(validate=True, max_size=100)
try:
    batch.add(content="", user_id="")  # Raises ValidationError
except ValidationError as e:
    print(f"Invalid memory: {e}")

Serialization

Custom Serializers

Python
import json
import zlib

from recall import RecallClient
from recall.serializers import (
    JSONSerializer,
    MessagePackSerializer,
    PickleSerializer,
    ProtobufSerializer
)

# JSON (default)
client = RecallClient(serializer=JSONSerializer())

# MessagePack (faster, smaller)
client = RecallClient(serializer=MessagePackSerializer())

# Pickle (arbitrary Python objects; trusted data only)
client = RecallClient(serializer=PickleSerializer())

# Protocol Buffers
client = RecallClient(serializer=ProtobufSerializer())

# Custom serializer (here: zlib-compressed JSON as an example)
class CustomSerializer:
    def serialize(self, obj):
        # Encode to JSON, then compress
        return zlib.compress(json.dumps(obj).encode("utf-8"))

    def deserialize(self, data):
        # Decompress, then decode back from JSON
        return json.loads(zlib.decompress(data).decode("utf-8"))

client = RecallClient(serializer=CustomSerializer())

Middleware

Request Middleware

Python
from recall import RecallClient
from recall.middleware import Middleware

class LoggingMiddleware(Middleware):
    def before_request(self, method, *args, **kwargs):
        print(f"Calling {method} with args={args}, kwargs={kwargs}")

    def after_request(self, method, result):
        print(f"{method} returned {result}")
        return result

    def on_error(self, method, error):
        print(f"{method} failed with {error}")
        raise error

# Add middleware
client = RecallClient()
client.add_middleware(LoggingMiddleware())

# All requests are now logged
client.add(content="Test", user_id="user_123")

Built-in Middleware

Python
from recall import RecallClient
from recall.middleware import (
    RetryMiddleware,
    RateLimitMiddleware,
    MetricsMiddleware,
    CacheMiddleware
)

client = RecallClient()

# Add retry logic
client.add_middleware(RetryMiddleware(max_retries=3))

# Add rate limiting
client.add_middleware(RateLimitMiddleware(
    max_requests=100,
    window_seconds=60
))

# Add metrics collection
client.add_middleware(MetricsMiddleware(
    prometheus_port=9090
))

Testing

Mock Client

Python
import pytest

from recall.testing import MockRecallClient

@pytest.fixture
def recall_client():
    """Fixture providing a mock client for tests."""
    return MockRecallClient()

def test_memory_storage(recall_client):
    # Add memory
    memory = recall_client.add(
        content="Test memory",
        user_id="test_user"
    )
    assert memory.id.startswith("mem_")

    # Verify storage
    assert recall_client.call_count("add") == 1
    assert recall_client.last_call("add").content == "Test memory"

    # Search
    results = recall_client.search(
        query="test",
        user_id="test_user"
    )
    assert len(results) == 1
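
Assuming the test above is saved as test_memories.py, it runs under a standard pytest invocation:

Bash
pip install pytest
pytest test_memories.py -v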

Test Utilities

Python
from recall import RecallClient
from recall.testing import (
    create_test_memory,
    populate_test_data,
    assert_memory_equal
)

# Create test data
memory = create_test_memory(
    content="Test content",
    priority="high"
)

# Populate a client with sample data
client = RecallClient()
populate_test_data(client, user_id="test_user", count=100)

# Assert that two memories are equal
other = create_test_memory(content="Test content", priority="high")
assert_memory_equal(memory, other)

Monitoring

Metrics Collection

Python
from recall import RecallClient

# Enable metrics
client = RecallClient(enable_metrics=True)

# Access metrics
metrics = client.get_metrics()
print(f"Total operations: {metrics.total_operations}")
print(f"Cache hit rate: {metrics.cache_hit_rate:.2%}")
print(f"Average latency: {metrics.avg_latency_ms:.2f}ms")

# Export to Prometheus
client.export_metrics_prometheus(port=9090)

# Export to StatsD
client.export_metrics_statsd(
    host="localhost",
    port=8125,
    prefix="recall"
)

Logging

Python
import logging
from recall import RecallClient

# Configure logging
logging.basicConfig(level=logging.DEBUG)

# Client with debug logging
client = RecallClient(debug=True)

# Custom logger
logger = logging.getLogger("my_app")
client = RecallClient(logger=logger)

# Log levels
client.set_log_level(logging.WARNING)
Error Handling

Exception Hierarchy

Python
from recall import RecallClient
from recall.exceptions import (
    RecallError,
    ConnectionError,
    AuthenticationError,
    ValidationError,
    CacheError,
    SyncError,
    RateLimitError,
    TimeoutError
)

client = RecallClient()

try:
    client.add(content="", user_id="")
except ValidationError as e:
    # Handle validation errors
    print(f"Invalid input: {e.field} - {e.message}")
except ConnectionError as e:
    # Handle connection issues
    print(f"Connection failed: {e.service} - {e.message}")
except RecallError as e:
    # Catch all other Recall errors
    print(f"Error: {e}")

Retry Logic

Python
from recall import RecallClient
from recall.exceptions import ConnectionError, TimeoutError
from recall.retry import exponential_backoff

client = RecallClient(
    retry_config={
        "max_attempts": 3,
        "backoff": exponential_backoff(base=2, max_delay=30),
        "retry_on": [ConnectionError, TimeoutError]
    }
)

# Operations automatically retry on transient failures
memory = client.add(content="Important", user_id="user_123")

CLI Usage

The SDK includes a CLI tool:

Bash
# Check status
recall status

# Add memory
recall add "User preference" --user-id user_123 --priority high

# Search memories
recall search "preferences" --user-id user_123 --limit 10

# Get statistics
recall stats

# Clear cache
recall cache clear --user-id user_123

# Export data
recall export --format json --output memories.json

# Import data
recall import memories.json

Best Practices

Connection Pooling

Python
from recall import RecallClient
from redis.connection import ConnectionPool

# Share a connection pool across clients
pool = ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50
)

client1 = RecallClient(redis_connection_pool=pool)
client2 = RecallClient(redis_connection_pool=pool)

Memory Management

Python
# Use appropriate priority levels
client.add(
    content="Critical user data",
    user_id="user_123",
    priority="critical"  # Never evicted
)

client.add(
    content="Temporary preference",
    user_id="user_123",
    priority="low",  # First to be evicted
    metadata={"expires_at": "2024-12-31"}
)

Performance Optimization

Python
# Batch operations for better performance
memories = [
    {"content": f"Memory {i}", "user_id": "user_123"}
    for i in range(1000)
]

# Slow: individual calls
for memory in memories:
    client.add(**memory)

# Fast: a single batch call
client.add_batch(memories)

# Pipeline multiple operations
with client.pipeline() as pipe:
    pipe.add(content="Memory 1", user_id="user_123")
    pipe.add(content="Memory 2", user_id="user_123")
    pipe.search(query="test", user_id="user_123")
    results = pipe.execute()

Migration Guide

From Mem0

Python
# Before (Mem0)
from mem0 import Memory
m = Memory()
m.add("Memory content", user_id="user_123")

# After (Recall)
from recall import RecallClient
client = RecallClient()
client.add(content="Memory content", user_id="user_123")

From Redis

Python
# Before (Redis)
import redis
r = redis.Redis()
r.set("user:123:pref", "dark_mode")

# After (Recall)
from recall import RecallClient
client = RecallClient()
client.add(
    content="Prefers dark mode",
    user_id="user_123",
    metadata={"key": "user:123:pref"}
)

Next Steps