Since Ray processes do not share memory space, data transferred between workers and nodes must be serialized and deserialized. Ray uses the Plasma object store to efficiently transfer objects across different processes and different nodes. Numpy arrays in the object store are shared between workers on the same node (zero-copy deserialization).
Plasma Object Store
Plasma is an in-memory object store that is being developed as part of Apache Arrow. Ray uses Plasma to efficiently transfer objects across different processes and different nodes. All objects in the Plasma object store are immutable and held in shared memory so that they can be accessed efficiently by many workers on the same node.
Each node has its own object store. When data is put into the object store, it is not automatically broadcast to other nodes. Data remains local to the writer until it is requested by a task or actor on another node.
Objects that are serialized for transfer among Ray processes go through three stages:
1. Serialize using pyarrow: Below is the set of Python objects that Ray can serialize using pyarrow:
- Primitive types: ints, floats, longs, bools, strings, unicode, and numpy arrays.
- Any list, dictionary, or tuple whose elements can be serialized by Ray.
2. __dict__ serialization: If a direct usage of PyArrow is not possible, Ray will recursively extract the object’s __dict__ and serialize that using pyarrow. This behavior is not correct in all cases.
3. Cloudpickle: Ray falls back to cloudpickle as a final attempt for serialization. This may be slow.
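The three stages above can be pictured as a chain of fallbacks. The following is a minimal sketch of that decision order only, not Ray's implementation: `is_natively_serializable` and `choose_strategy` are hypothetical names, the standard-library `pickle` stands in for cloudpickle, numpy arrays are left out so the block needs no third-party packages, and the `__dict__` check looks one level deep rather than recursing.

```python
import pickle

# Assumption: a stdlib-only approximation of the "primitive types" in stage 1.
PRIMITIVES = (int, float, bool, str, bytes, type(None))


def is_natively_serializable(obj):
    """Stage 1 check: primitives, or containers made only of such values."""
    if isinstance(obj, PRIMITIVES):
        return True
    if isinstance(obj, (list, tuple)):
        return all(is_natively_serializable(item) for item in obj)
    if isinstance(obj, dict):
        return all(
            is_natively_serializable(k) and is_natively_serializable(v)
            for k, v in obj.items()
        )
    return False


def choose_strategy(obj):
    """Return the name of the first stage that could handle obj."""
    if is_natively_serializable(obj):
        return "native"
    # Stage 2: fall back to the instance __dict__ if its contents qualify.
    if hasattr(obj, "__dict__") and is_natively_serializable(vars(obj)):
        return "dict"
    # Stage 3: last resort, a general-purpose pickler (raises if it fails).
    pickle.dumps(obj)
    return "pickle"


class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y


print(choose_strategy([1, 2.5, {"a": "b"}]))  # native
print(choose_strategy(Point(1, 2)))           # dict
print(choose_strategy({1, 2, 3}))             # pickle (sets are not in the stage-1 list)
```

The ordering matters because each later stage is more general but, per the notes above, either less reliable (`__dict__`) or slower (cloudpickle).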
Ray optimizes for numpy arrays by using the Apache Arrow data format. The numpy array is stored as a read-only object, and all Ray workers on the same node can read the numpy array in the object store without copying (zero-copy reads). Each numpy array object in the worker process holds a pointer to the relevant array held in shared memory. Any writes to the read-only object will require the user to first copy it into the local process memory.
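The copy-before-write rule can be tried without a running Ray cluster. In this sketch the read-only array is simulated by clearing NumPy's `writeable` flag by hand; that stand-in is an assumption made for illustration, and the variable names are hypothetical.

```python
import numpy as np

# Stand-in for an array obtained from the object store: mark it read-only.
shared = np.arange(5)
shared.flags.writeable = False

try:
    shared[0] = 42
except ValueError as err:
    # NumPy refuses to write into a read-only array.
    write_error = str(err)

# Copy into process-local memory first, then mutate the copy.
local = shared.copy()
local[0] = 42
```

The copy is private to the process that made it, so the original array stays unchanged for every other reader.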
You can often avoid serialization issues by using only native types (e.g., numpy arrays or lists/dicts of numpy arrays and other primitive types), or by using Actors to hold objects that cannot be serialized.
Serialization notes and limitations
Ray currently handles certain patterns incorrectly, according to Python semantics. For example, a list that contains two copies of the same list will be serialized as if the two lists were distinct.
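    l1 = [0]
    l2 = [l1, l1]
    l3 = ray.get(ray.put(l2))

    # The original list holds the same object twice...
    assert l2[0] is l2[1]
    # ...but after a round trip the two elements are distinct objects.
    assert not l3[0] is l3[1]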
For reasons similar to the above example, we also do not currently handle objects that recursively contain themselves (this may be common in graph-like data structures).
    l = []
    l.append(l)

    # Try to put this list that recursively contains itself in the object store.
    ray.put(l)
This will throw an exception with a message like the following.
This object exceeds the maximum recursion depth. It may contain itself recursively.
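One possible workaround for graph-like data is to replace object references with integer indices before storing, so that the stored value contains only lists, tuples, strings, and ints. The sketch below is an illustration under that assumption: `Node`, `flatten`, and `rebuild` are hypothetical names, and `flatten` expects every neighbor to appear in the list it is given.

```python
class Node:
    """Hypothetical graph node that points at other Node objects."""

    def __init__(self, name):
        self.name = name
        self.neighbors = []


def flatten(nodes):
    """Encode a graph as plain lists: names plus (src, dst) index pairs."""
    index = {id(node): i for i, node in enumerate(nodes)}
    names = [node.name for node in nodes]
    edges = [
        (index[id(node)], index[id(nb)])
        for node in nodes
        for nb in node.neighbors
    ]
    return names, edges


def rebuild(names, edges):
    """Recreate Node objects, including cycles, from the flat encoding."""
    nodes = [Node(name) for name in names]
    for src, dst in edges:
        nodes[src].neighbors.append(nodes[dst])
    return nodes


a, b = Node("a"), Node("b")
a.neighbors.append(b)
b.neighbors.append(a)  # cycle: a -> b -> a
a.neighbors.append(a)  # self-loop

# names and edges contain no self-references, unlike the Node objects.
names, edges = flatten([a, b])
restored = rebuild(names, edges)
```

Here `names` and `edges` are the values that would be handed to `ray.put`, and `rebuild` would run on the result of `ray.get`.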
Whenever possible, use numpy arrays or Python collections of numpy arrays for maximum performance.
Last resort: Custom Serialization
If none of these options work, you can try registering a custom serializer.
register_custom_serializer(cls, serializer=None, deserializer=None, use_pickle=False, use_dict=False, local=None, job_id=None, class_id=None)
Registers custom functions for efficient object serialization.
The serializer and deserializer are used when transferring objects of cls across processes and nodes. This can be significantly faster than the Ray default fallbacks. Wraps register_custom_serializer underneath.
use_pickle tells Ray to automatically use cloudpickle for serialization, and use_dict automatically uses cls.__dict__.
When calling this function, you can only provide one of the following:
- serializer and deserializer
- use_pickle
- use_dict
Parameters:
- cls (type) – The class that ray should use this custom serializer for.
- serializer – The custom serializer that takes in a cls instance and outputs a serialized representation. use_pickle and use_dict must be False if provided.
- deserializer – The custom deserializer that takes in a serialized representation of the cls and outputs a cls instance. use_pickle and use_dict must be False if provided.
- use_pickle (bool) – If true, objects of this class will be serialized using pickle. Must be False if use_dict is true.
- use_dict (bool) – If true, objects of this class will be serialized by turning their __dict__ fields into a dictionary. Must be False if use_pickle is true.
- local – Deprecated.
- job_id – Deprecated.
- class_id (str) – Unique ID of the class. Autogenerated if None.
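The constraints between these parameters can be summarized as a small validation function. This is a sketch of the rules as described here, not Ray's own code: `check_serializer_options` is a hypothetical helper, and treating "none of the options given" as an error is an assumption.

```python
def check_serializer_options(serializer=None, deserializer=None,
                             use_pickle=False, use_dict=False):
    """Hypothetical helper: return the chosen mode or raise ValueError."""
    # The custom functions only make sense as a pair.
    if (serializer is None) != (deserializer is None):
        raise ValueError("serializer and deserializer must be given together")
    modes = []
    if serializer is not None:
        modes.append("custom")
    if use_pickle:
        modes.append("pickle")
    if use_dict:
        modes.append("dict")
    # Assumption: exactly one of the three options must be selected.
    if len(modes) != 1:
        raise ValueError("choose exactly one option, got: %r" % (modes,))
    return modes[0]


print(check_serializer_options(use_pickle=True))  # pickle
```

Running such a check before registration can surface a conflicting combination, such as `use_pickle=True` together with `use_dict=True`, earlier and with a clearer message.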
Below is an example of using ray.register_custom_serializer:
    import ray

    ray.init()

    class Foo(object):
        def __init__(self, value):
            self.value = value

    def custom_serializer(obj):
        # Reduce a Foo to the one value needed to rebuild it.
        return obj.value

    def custom_deserializer(value):
        # Rebuild a Foo from the serialized value.
        return Foo(value)

    ray.register_custom_serializer(
        Foo, serializer=custom_serializer, deserializer=custom_deserializer)

    object_id = ray.put(Foo(100))
    assert ray.get(object_id).value == 100
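Before registering a serializer and deserializer pair, it can help to confirm locally that they invert each other. The following sketch does that without Ray; `round_trips` is a hypothetical helper, and comparing `vars()` of the two objects is an assumption that suits simple classes like `Foo`.

```python
class Foo(object):
    def __init__(self, value):
        self.value = value


def custom_serializer(obj):
    return obj.value


def custom_deserializer(value):
    return Foo(value)


def round_trips(obj, serializer, deserializer):
    """Hypothetical check: does deserialize(serialize(obj)) keep the state?"""
    restored = deserializer(serializer(obj))
    return type(restored) is type(obj) and vars(restored) == vars(obj)


assert round_trips(Foo(100), custom_serializer, custom_deserializer)
```

A pair that fails this check would also return altered objects from the object store, so it is cheaper to catch the mismatch here.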
If you find cases where Ray serialization doesn’t work or does something unexpected, please let us know so we can fix it.