The Starting Point: A Robust Central Manager
Any distributed system needs a way to share state. Our initial design, `MwManager`, was created to provide robust, namespaced access to two shared Ray actors across a cluster: `MwStore` (for frequency counting and miss tracking) and `SharedCache` (a simple key/value store).
Key features of this initial version included:
- Robust Ray Bootstrap: Ensured Ray was initialized safely with the correct namespace.
- Resilient Actor Management: Handled actor lookups with retries and exponential backoff, and even allowed auto-creation of missing actors (see the sketch after this list).
- Async Compatibility: Used a threadpool to prevent `ray.get()` from blocking asynchronous functions.
- Centralized Logic: `MwStore` tracked both item frequency and misses, eliminating the need for a separate `MissTracker`.
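To make "resilient actor management" concrete, here is a minimal sketch of a lookup-with-backoff helper. The namespace constant, helper name, and retry parameters are illustrative assumptions, not the exact `MwManager` implementation:

```python
# Minimal sketch of a resilient named-actor lookup with exponential backoff.
# NAMESPACE and _get_or_create_actor are illustrative names, not MwManager's API.
import time
import ray

NAMESPACE = "mw"

def _get_or_create_actor(name, actor_cls, retries: int = 5, base_delay: float = 0.2):
    """Look up a named actor, retrying with backoff; create it if it is missing."""
    for attempt in range(retries):
        try:
            return ray.get_actor(name, namespace=NAMESPACE)
        except ValueError:
            # Actor not registered yet: try to create it (get_if_exists avoids races).
            try:
                return actor_cls.options(
                    name=name,
                    namespace=NAMESPACE,
                    lifetime="detached",
                    get_if_exists=True,
                ).remote()
            except Exception:
                time.sleep(base_delay * (2 ** attempt))  # exponential backoff
    raise RuntimeError(f"Could not resolve actor {name!r}")
```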
In plain English, this was a smart, fault-tolerant cluster memory layer. It was a solid foundation, but as we looked towards massive scale—from 1,000 to 1,000,000 nodes—a critical bottleneck became obvious.
The Scaling Bottleneck: The Hotspot Problem
A single, named `SharedCache` actor will inevitably become a performance hotspot long before you reach 1,000 nodes, let alone a million. Every agent in the cluster trying to read from or write to the same actor creates a massive queue. Ray's Global Control Service (GCS) would melt under the pressure of actor-handle lookups, and the actor itself would be overwhelmed.
The challenge wasn't just about storing data; it was about distributing the *load* of accessing that data.
A Battle-Tested Architecture for Massive Scale
To solve the hotspot problem, we redesigned the cache with a battle-tested, hierarchical, and sharded approach. The core principle is simple: no single point of failure or contention.
1. Hierarchical Caching (L0, L1, L2)
We introduced multiple layers of caching to serve requests as close to the client as possible:
- L0 (Process): The existing in-memory dictionary (`MwManager._cache`) for the fastest possible access within a single process.
- L1 (Per-Node): A new `NodeCache` actor pinned to each Ray node using `NodeAffinitySchedulingStrategy`. This intercepts requests on the local node, avoiding network hops for frequently accessed data (see the pinning sketch after this list).
- L2 (Global, Sharded): The single `SharedCache` is replaced by an array of `SharedCacheShard` actors. There is no central router; clients determine the correct shard directly.
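Below is a sketch of how the per-node `NodeCache` actors might be created, one per alive node, using Ray's `NodeAffinitySchedulingStrategy`. The actor body and helper names are illustrative; the real `NodeCache` carries TTL and eviction logic not shown here:

```python
# Sketch: one NodeCache actor pinned to each alive Ray node.
# NodeCache, ensure_node_caches, and the naming scheme are assumptions.
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

@ray.remote(num_cpus=0)
class NodeCache:
    def __init__(self):
        self._store = {}  # key -> ObjectRef

    async def get(self, key):
        return self._store.get(key)

    async def set_ref(self, key, ref):
        self._store[key] = ref

def ensure_node_caches(namespace: str = "mw"):
    """Create (or reuse) a detached NodeCache on every alive node."""
    handles = {}
    for node in ray.nodes():
        if not node["Alive"]:
            continue
        node_id = node["NodeID"]
        handles[node_id] = NodeCache.options(
            name=f"node_cache_{node_id}",
            namespace=namespace,
            lifetime="detached",
            get_if_exists=True,
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=node_id, soft=False  # hard-pin to this node
            ),
        ).remote()
    return handles
```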
2. Client-Side Sharding with Consistent Hashing
To avoid a router bottleneck, the client is responsible for choosing the correct L2 shard. We use a consistent hashing ring (`_HashRing`) to map a given key to a specific shard actor. This distributes keys evenly and allows for dynamic scaling of the shard pool.
```python
# Client determines the correct shard handle directly;
# this avoids any central routing bottleneck.
from typing import Any

import ray

_RING, _SHARDS = _get_shard_handles()

def _shard_for(key: str):
    nm = _RING.node_for(key)
    return _SHARDS[nm]

# Usage in a write operation:
def set_global_item(self, item_id: str, value: Any):
    ref = ray.put(value)
    shard = _shard_for(item_id)  # client-side hashing
    shard.set_ref.remote(self._global_key(item_id), ref)
```
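For reference, a consistent-hash ring like `_HashRing` fits in a couple dozen lines. The sketch below makes assumed choices (MD5 hashing, 100 virtual nodes per shard); the production version may differ:

```python
# Minimal consistent-hash ring sketch. MD5 and 100 virtual nodes per shard
# are illustrative choices, not the exact _HashRing implementation.
import bisect
import hashlib

class _HashRing:
    def __init__(self, shard_names, vnodes: int = 100):
        self._ring = []  # sorted list of (hash, shard_name)
        for name in shard_names:
            for v in range(vnodes):
                self._ring.append((self._hash(f"{name}#{v}"), name))
        self._ring.sort()
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(s: str) -> int:
        return int(hashlib.md5(s.encode()).hexdigest(), 16)

    def node_for(self, key: str) -> str:
        # Walk clockwise to the first virtual node at or after the key's hash.
        idx = bisect.bisect(self._keys, self._hash(key)) % len(self._ring)
        return self._ring[idx][1]
```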
3. High-Concurrency Async Actors and ObjectRefs
The cache actors (`NodeCache`, `SharedCacheShard`) are designed to be highly concurrent and non-blocking.
- Async Methods: All actor methods like `get` and `set` are `async def`.
- High Concurrency: Actors are decorated with `@ray.remote(max_concurrency=2000)` to handle thousands of parallel requests.
- Storing ObjectRefs: Instead of serializing large data blobs into the cache, we `ray.put(value)` and store the resulting `ObjectRef`. This allows callers to fetch the data via Ray's zero-copy read paths, dramatically reducing serialization overhead.
```python
import time
from collections import OrderedDict

import ray

@ray.remote(num_cpus=0, max_concurrency=2000)
class SharedCacheShard:
    def __init__(self, max_bytes: int = 2_000_000_000, default_ttl_s: int = 3600):
        self._map = {}            # key -> (ObjectRef, expire_ts, size)
        self._lru = OrderedDict()
        self._bytes = 0
        self._max_bytes = max_bytes
        self._default_ttl_s = default_ttl_s
        # ... LRU and TTL eviction logic ...

    async def get(self, key: str):
        rec = self._map.get(key)
        if not rec:
            return None
        ref, exp, size = rec
        # ... TTL check ...
        return ref  # return the ObjectRef, not the value

    async def set_ref(self, key: str, ref, ttl_s: int = None, size: int = None):
        exp = time.time() + (ttl_s or self._default_ttl_s)
        # ... store the ObjectRef with metadata ...
        self._map[key] = (ref, exp, size or 0)
        await self._evict_if_needed()
```
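Putting the layers together, a read falls through L0 → L1 → L2 and resolves the stored `ObjectRef` at the end. The sketch below is a simplified, blocking version of that path; attribute names such as `_node_cache` are assumptions about the manager's internals:

```python
# Simplified, blocking read path (the real methods are async). Attribute
# names like _node_cache and _cache follow the text but are assumptions.
import ray

class MwManager:
    def get_global_item(self, item_id: str):
        key = self._global_key(item_id)

        # L0: process-local dict, no network hop at all.
        if key in self._cache:
            return self._cache[key]

        # L1: per-node cache actor. ray.get() dereferences only one level,
        # so we get back the stored ObjectRef (or None).
        ref = ray.get(self._node_cache.get.remote(key))
        if ref is None:
            # L2: consistent-hash directly to the owning shard.
            ref = ray.get(_shard_for(item_id).get.remote(key))
            if ref is None:
                return None                              # true miss
            self._node_cache.set_ref.remote(key, ref)    # warm L1

        value = ray.get(ref)     # resolve the ObjectRef via Ray's zero-copy path
        self._cache[key] = value # warm L0
        return value
```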
4. Observability is Key
At scale, you can't fly blind. This architecture is designed for observability with metrics for hits/misses, latency, memory usage per shard, and error rates, all exportable to Prometheus.
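As a sketch of what that wiring might look like, Ray's `ray.util.metrics` API (Counter, Gauge, Histogram) exports application metrics through the Ray metrics agent, which Prometheus can scrape. The metric names and bucket boundaries below are assumptions:

```python
# Sketch of shard-level metrics via ray.util.metrics; Ray's metrics agent
# exposes these for Prometheus scraping. Names and buckets are assumptions.
from ray.util import metrics

class ShardMetrics:
    def __init__(self, shard_name: str):
        self._tags = {"shard": shard_name}
        self._hits = metrics.Counter("mw_cache_hits", tag_keys=("shard",))
        self._misses = metrics.Counter("mw_cache_misses", tag_keys=("shard",))
        self._bytes = metrics.Gauge("mw_cache_bytes", tag_keys=("shard",))
        self._get_latency = metrics.Histogram(
            "mw_cache_get_latency_ms",
            boundaries=[1, 5, 10, 50, 100, 500],
            tag_keys=("shard",),
        )

    def record_get(self, hit: bool, latency_ms: float, bytes_used: int):
        (self._hits if hit else self._misses).inc(tags=self._tags)
        self._get_latency.observe(latency_ms, tags=self._tags)
        self._bytes.set(bytes_used, tags=self._tags)
```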
The Next Frontier: Semantic Caching
Our sharded cache is now incredibly fast and scalable for literal key lookups. But what if the keys aren't identical, but mean the same thing?
An agent that sets a key like "I would like a cup of iced/hot coffee" should produce a cache hit for another agent looking up "I would like a cup of coffee."
This requires a semantic layer. Instead of matching strings, we need to match intent. The standard approach is to use vector embeddings. Each string is converted into a vector, and lookups are performed via an Approximate Nearest Neighbor (ANN) search to find semantically similar entries.
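Conceptually, the lookup becomes a nearest-neighbor search rather than an exact match. The brute-force sketch below only illustrates the idea; a real deployment would use an ANN index, and `embed` stands in for whatever embedding model is chosen:

```python
# Conceptual semantic lookup: embed the query and return the closest cached
# entry above a similarity threshold. embed() and the threshold are stand-ins;
# production systems replace this brute-force scan with an ANN index.
import numpy as np

def semantic_lookup(query: str, entries: dict, embed, threshold: float = 0.85):
    """entries maps cached key -> (embedding_vector, value)."""
    q = embed(query)
    q = q / np.linalg.norm(q)
    best_key, best_sim = None, threshold
    for key, (vec, value) in entries.items():
        sim = float(np.dot(q, vec / np.linalg.norm(vec)))  # cosine similarity
        if sim > best_sim:
            best_key, best_sim = key, sim
    return entries[best_key][1] if best_key else None
```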
The Final Architecture: A Hybrid Approach
We faced a classic "build vs. buy" decision. Do we build our own ANN indexing inside each cache shard, or integrate a specialized tool? We decided on a hybrid approach that leverages the best of both worlds.
Build the Infrastructure, Integrate the Intelligence
The optimal solution is to use our custom, high-performance Ray cache for what it does best—scale and speed—while integrating a dedicated AI memory service like Mem0 for semantic understanding.
- Cluster Cache (Our Ray Actors): The L0/L1/L2 hierarchical cache remains the fast path for all exact-match lookups. It's optimized for massive scale, low latency, and efficient Ray integration.
- Semantic Memory (Mem0): Mem0 acts as the intelligent, persistent memory layer. It handles embedding generation, semantic retrieval, memory consolidation, and reasoning.
- The Glue: When a request flows through our cache and results in a final L2 miss, we make a fallback call to Mem0's semantic search API. If a relevant memory is found, we populate it back into our L2 cache for fast, repeated access (see the glue sketch after this list).
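In pseudocode, the glue is a thin wrapper around the exact-match path. The Mem0 call and its response shape below are assumptions for illustration; consult Mem0's client documentation for the actual API:

```python
# Hypothetical L2-miss fallback. semantic_memory.search(...) and the
# results[0]["memory"] field are assumed shapes, not Mem0's documented API.
def get_with_semantic_fallback(manager, item_id: str, semantic_memory):
    value = manager.get_global_item(item_id)             # L0 -> L1 -> L2 exact match
    if value is not None:
        return value

    results = semantic_memory.search(item_id, limit=1)   # semantic search on miss
    if not results:
        return None

    value = results[0]["memory"]                         # assumed response field
    manager.set_global_item(item_id, value)              # repopulate the fast path
    return value
```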
This layered model gives us:
- Ray-Native Performance: The vast majority of hits are served by our hyper-optimized, distributed cache.
- Rich Semantic Intelligence: We get powerful semantic matching and memory persistence from Mem0 without having to reinvent the wheel.
- Clean Separation of Concerns: Our infrastructure team focuses on scaling Ray, while the agent logic team leverages a powerful memory API.
Final Takeaway
Building a memory system for a million-node AI platform is a journey. It starts with a robust but centralized manager, evolves into a decentralized, sharded system to handle load, and finally incorporates a specialized semantic layer to provide true intelligence. By building a high-performance cache in Ray and integrating it with a service like Mem0, you get the best of both worlds: unparalleled scale and sophisticated semantic understanding.