Since Ray processes do not share memory space, data transferred between workers and nodes will need to serialized and deserialized. Ray uses the Plasma object store to efficiently transfer objects across different processes and different nodes. Numpy arrays in the object store are shared between workers on the same node (zero-copy deserialization).
Plasma Object Store¶
Plasma is an in-memory object store that is being developed as part of Apache Arrow. Ray uses Plasma to efficiently transfer objects across different processes and different nodes. All objects in Plasma object store are immutable and held in shared memory. This is so that they can be accessed efficiently by many workers on the same node.
Each node has its own object store. When data is put into the object store, it does not get automatically broadcasted to other nodes. Data remains local to the writer until requested by another task or actor on another node.
Objects that are serialized for transfer among Ray processes go through three stages:
1. Serialize using pyarrow: Below is the set of Python objects that Ray can serialize using
- Primitive types: ints, floats, longs, bools, strings, unicode, and numpy arrays.
- Any list, dictionary, or tuple whose elements can be serialized by Ray.
2. ``__dict__`` serialization: If a direct usage of PyArrow is not possible, Ray will recursively extract the object’s
__dict__ and serialize that using pyarrow. This behavior is not correct in all cases.
3. Cloudpickle: Ray falls back to
cloudpickle as a final attempt for serialization. This may be slow.
Ray optimizes for numpy arrays by using the Apache Arrow data format. The numpy array is stored as a read-only object, and all Ray workers on the same node can read the numpy array in the object store without copying (zero-copy reads). Each numpy array object in the worker process holds a pointer to the relevant array held in shared memory. Any writes to the read-only object will require the user to first copy it into the local process memory.
You can often avoid serialization issues by using only native types (e.g., numpy arrays or lists/dicts of numpy arrays and other primitive types), or by using Actors hold objects that cannot be serialized.
Serialization notes and limitations¶
Ray currently handles certain patterns incorrectly, according to Python semantics. For example, a list that contains two copies of the same list will be serialized as if the two lists were distinct.
l1 =  l2 = [l1, l1] l3 = ray.get(ray.put(l2)) assert l2 is l2 assert not l3 is l3
For reasons similar to the above example, we also do not currently handle objects that recursively contain themselves (this may be common in graph-like data structures).
l =  l.append(l) # Try to put this list that recursively contains itself in the object store. ray.put(l)
This will throw an exception with a message like the following.
This object exceeds the maximum recursion depth. It may contain itself recursively.
Whenever possible, use numpy arrays or Python collections of numpy arrays for maximum performance.
Last resort: Custom Serialization¶
If none of these options work, you can try registering a custom serializer.
register_custom_serializer(cls, use_pickle=False, use_dict=False, serializer=None, deserializer=None, local=False, job_id=None, class_id=None)
Enable serialization and deserialization for a particular class.
This method runs the register_class function defined below on every worker, which will enable ray to properly serialize and deserialize objects of this class.
- cls (type) – The class that ray should use this custom serializer for.
- use_pickle (bool) – If true, then objects of this class will be serialized using pickle.
- use_dict – If true, then objects of this class be serialized turning their __dict__ fields into a dictionary. Must be False if use_pickle is true.
- serializer – The custom serializer to use. This should be provided if and only if use_pickle and use_dict are False.
- deserializer – The custom deserializer to use. This should be provided if and only if use_pickle and use_dict are False.
- local – True if the serializers should only be registered on the current worker. This should usually be False.
- job_id – ID of the job that we want to register the class for.
- class_id – ID of the class that we are registering. If this is not specified, we will calculate a new one inside the function.
Exception– An exception is raised if pickle=False and the class cannot be efficiently serialized by Ray. This can also raise an exception if use_dict is true and cls is not pickleable.
Below is an example of using
import ray ray.init() class Foo(object): def __init__(self, value): self.value = value def custom_serializer(obj): return obj.value def custom_deserializer(value): object = Foo() object.value = value return object ray.register_custom_serializer( Foo, serializer=custom_serializer, deserializer=custom_deserializer) object_id = ray.put(Foo(100)) assert ray.get(object_id).value == 100
If you find cases where Ray serialization doesn’t work or does something unexpected, please let us know so we can fix it.