Ray: A Framework for Scaling Python Applications

Ray is a framework for scaling Python applications. Ray allows you to execute the same Python program that you would run on your laptop on a cluster of computers with minimum effort. All you need is to tell Ray what part of your program needs to be scalable by decorating your Python functions and classes, and Ray takes care of the rest for you. Ray has been used by several companies including OpenAI for building ChatGPT.

Ray provides both task-parallel and actor-based computations. The programming model is simple and intuitive; it feels like a natural extension of Python. You simply instantiate tasks/actors out of Python functions/classes. By passing around references to these tasks and actors, you can define dependencies between them. Ray provides high flexibility and allows the dynamic execution necessary for many ML problems.

There are a lot of educational materials on Ray. Other than the resources you can find on the Ray GitHub, you might want to check out Anyscale Academy, Ray Summit, and the book Scaling Python with Ray. In this post, I share a quick overview of the Ray programming model, some examples, and my understanding of Ray internals and how it achieves scalability and fault tolerance.

What is the deal with Ray?

Using Ray you can scale the same Python program you run on your laptop to run on a cluster of computers in no time; you don't need to worry about managing a cluster, scheduling your tasks, network communications, health checking, tolerating faults, etc. Instead, you simply decorate your Python functions/classes with @ray.remote, and Ray takes care of the rest for you. Considering that many data scientists or ML practitioners are not distributed systems experts, what Ray provides is very valuable. What distinguishes Ray from other data processing frameworks such as Spark is its flexibility, fine-grained control, and dynamic execution among other things. While Spark is very good for data processing and classical machine learning, Ray is better for deep learning and RL applications.

The Ray programming model has two main concepts: Tasks and Actors.

Tasks

Tasks are used to perform stateless computation. A task is an execution of a Python function decorated with @ray.remote.

For example, consider the following function that adds two arrays.

import numpy as np
import ray

@ray.remote
def add_array(arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
    return np.add(arr1, arr2)
We can call a remote function using .remote(). The requested task will be executed somewhere in the cluster, on a node picked by Ray. Thus, by simply decorating our function with @ray.remote and calling it with .remote(), we can benefit from the processing power of our cluster.

result1 = add_array.remote(arr1, arr2)
The above call is asynchronous, and result1 is a future. You can get the value of the object that this future refers to using ray.get():

arr_result = ray.get(result1)
You can pass the future to other tasks as a parameter, thereby defining dependencies between tasks. For example:

result2 = add_array.remote(arr3, result1)
In this example, the task that computes result2 depends on the one that computes result1. This ability to define dependencies between tasks using futures is one of the key advantages of Ray.

The .remote() asynchronous API lets us define parallelism. For example, suppose we are given a list of tuples of arrays and we want to compute the sum of two arrays of each tuple. With tasks, we can run this computation in parallel as follows.

sum_list = []
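for arr1, arr2 in list_of_tuples:
    # Each call returns a future immediately, so the tasks can run in parallel.
    sum_list.append(add_array.remote(arr1, arr2))
With the code above, the entries of sum_list are computed in parallel. Note that sum_list holds futures; calling ray.get(sum_list) returns the actual arrays.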

Actors

Actors are used to perform stateful computation. An actor is an instance of a Python class that is decorated with @ray.remote. You can execute the methods of an actor, just like tasks, somewhere in the cluster, except that they can access the state of the actor, i.e., its instance variables.

For example, consider the following class that multiplies the sum of two arrays by a number.

@ray.remote
class AddArray:
    def __init__(self, m: int):
        self.m = m

    def update_m(self, new_m: int):
        self.m = new_m

    def add_array(self, arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
        return np.multiply(np.add(arr1, arr2), self.m)
You can instantiate an actor using .remote() and call its methods.

add_array_actor = AddArray.remote(2)
result3 = add_array_actor.add_array.remote(arr1, arr2)
In add_array, the state of the actor (i.e., the variable m) is used to compute the result. We can mutate this state by calling the update_m method of the actor.

Supporting stateful distributed computation is another key feature of Ray.

Ray Clusters

After installing Ray on your laptop, you can start using it by simply calling ray.init() in your Python program (see examples below). To run your code on a cluster, however, you should first start a Ray cluster. Deploying a Ray cluster manually on-prem is very straightforward. A Ray cluster consists of a head node and a set of worker nodes. To create a cluster, you first start Ray on the head node (via ray start) and then start Ray on the worker nodes by giving them the address of the head node. The head node is the node that runs system processes (needed for Ray internals), and usually the driver processes, which run the user's Python code.

Starting the head node:

ray start --head --port=6379
Starting a worker node:

ray start --address=<head-node-address:port>
Ray also supports deployment on Kubernetes, AWS, and GCP. Read more on Ray clusters

Examples

Installing Ray on Apple M2

Ray has experimental support for machines running Apple Silicon (such as M1/M2). I was able to install and use Ray on an Apple M2 following these instructions. Note that multi-node Ray clusters are not officially supported on Windows and macOS.

This is how I installed Ray on M2:

mkdir myRayEnv

cd myRayEnv

wget https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-MacOSX-arm64.sh

bash Miniforge3-MacOSX-arm64.sh

rm Miniforge3-MacOSX-arm64.sh 

source ~/.bash_profile

conda create --name myRayEnv

conda activate myRayEnv

conda install grpcio=1.43.0 -c conda-forge

pip install ray

Parallel Computing

As the first example, let's see how Ray lets us easily speed up a computation by running tasks in parallel. Inside the myRayEnv folder, I created parallel_computing.py with this content:

import os
import time
import logging
import ray

ray.init(
    ignore_reinit_error=True,
    logging_level=logging.ERROR,
    include_dashboard=False
)

def some_work():
    time.sleep(1)
    return "some_result"

@ray.remote
def distributed_some_work():
    return some_work()


print(f'Number of CPUs: {os.cpu_count()}')


def run_local():
    results = [some_work() for _ in range(os.cpu_count())]
    return results

start_time = time.time()
print(f'result from local: {run_local()}')
end_time = time.time()
print(f'elapsed time (s): {(end_time - start_time)}')

def run_remote():
    results = ray.get([distributed_some_work.remote() for _ in range(os.cpu_count())])
    return results

start_time = time.time()
print(f'result from Ray: {run_remote()}')
end_time = time.time()
print(f'elapsed time (s): {(end_time - start_time)}')
Output:

(myRayEnv)$ python3 parallel_computing.py
Number of CPUs:  8
result from local: ['some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result']
elapsed time (s): 8.035311937332153
result from Ray: ['some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result']
elapsed time (s): 1.0391979217529297
As expected, the local execution is done serially, so running 8 jobs that each take 1s takes 8s. On the other hand, running them as Ray remote tasks uses 8 CPUs in parallel, so it finishes in about 1s.

Fault-tolerance

For the second example, let's see how Ray handles task failures. Note that I changed the logging level to warning to see the task failure warning messages.

import logging
import ray
import sys
import random

ray.init(
    ignore_reinit_error=True,
    logging_level=logging.WARNING,
    include_dashboard=False
)

@ray.remote
def some_work():
    if random.randint(0, 2) == 1:
        sys.exit(0)
    return "some_result"

print(f'result from Ray: {ray.get(some_work.remote())}')


Output (added new lines to the warning message to make it more readable):

(myRayEnv)$ python3 fault-tolerance.py
2023-05-06 21:49:33,121	WARNING worker.py:1986 -- A worker died or was killed while executing a task by an unexpected system error. 
To troubleshoot the problem, check the logs for the dead worker. 
RayTask ID: 1109b4db659eb718fd8ef13a3193fe2d6dfea77701000000 
Worker ID: 1312a49ddc8363414c1395a08ea3596a84c448f537135790228fd217 
Node ID: b854a53730bc9c8f6f546f922f665f1562697319d8cdd69cb97f1a7f 
Worker IP address: 127.0.0.1 
Worker port: 53391 
Worker PID: 5160 
Worker exit type: SYSTEM_ERROR 
Worker exit detail: The leased worker has unrecoverable failure. 
Worker is requested to be destroyed when it is returned.

result from Ray: some_result
So after the first failure of the task, Ray retried it, and the second attempt finished successfully.

Ray Internals

Ray has been under active development since the publication of [1] in 2018. It seems [2] is the most recent design. Despite the changes, Ray remains a distributed futures system [3] that utilizes the processing power of a cluster of computers. Ray achieves that goal by distributing the computation. Specifically, to execute a task, Ray picks a node in the cluster and sends the function definition to it to execute. Each node keeps the objects it works with in its memory. The node selected to execute a task may not have a copy of an object that the task needs. In that case, the object is copied to the memory of that node. The result of a remote task can be accessed through a distributed future, i.e., a future whose eventual value might be stored on a remote node.

Selecting a node to execute a task is the job of the scheduler. Maintaining the objects in the node memory is the job of the object store. Finally, maintaining the state is the job of the Global Control Store (GCS). Section 4.3 of [1] provides a good example that demonstrates how a remote task is executed on Ray. 
Figure 1. Ray Architecture [1]

Scheduling

By default, Ray tries to schedule a task locally, if the local node has all the objects needed by the task. However, the user can change the default scheduling strategy, for example, to spread the tasks for better performance. 

You can define resource requirements for your task or actor with the @ray.remote decorator. Note that these are hard requirements, so a node that does not meet the resource requirements of a task is never selected to run that task.

@ray.remote(num_cpus=2, num_gpus=2)
def func():
    pass  # task body omitted
Strategies
  • DEFAULT: 
    • First prefers locality: Ray picks the node that already holds the largest share of the task's arguments. So if a function needs a and b, and b is larger than a, the task will be scheduled on the node hosting b.
    • If there is no node with locality, Ray ranks nodes based on different aspects such as utilization and network latency [1]. It then picks a random node out of the top k nodes, where k is configurable.
  • SPREAD:
    • Ignores locality.
    • Tries to maximize the spreading of the workload throughout the cluster.
  • You can also reserve resources for certain tasks using Placement Groups (PGs)
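As a rough illustration of the strategies above, here is a toy sketch in plain Python. It is not Ray's scheduler: the Node class, the pick_node function, and the utilization numbers are all hypothetical, and SPREAD is approximated by simply choosing the least-utilized node.

```python
import random
from dataclasses import dataclass, field


@dataclass
class Node:
    name: str
    utilization: float  # fraction of resources in use, between 0.0 and 1.0
    objects: dict = field(default_factory=dict)  # object id -> size in bytes


def pick_node(nodes, arg_ids, strategy="DEFAULT", top_k=2, rng=random):
    """Pick a node for a task whose arguments are the objects in arg_ids."""
    if strategy == "DEFAULT":
        # Locality: count the bytes of the task's arguments held by each node.
        local_bytes = {
            n.name: sum(n.objects.get(obj_id, 0) for obj_id in arg_ids)
            for n in nodes
        }
        best = max(nodes, key=lambda n: local_bytes[n.name])
        if local_bytes[best.name] > 0:
            return best
        # No node holds any argument: choose randomly among the top_k
        # least-utilized nodes (utilization is the only ranking signal here).
        ranked = sorted(nodes, key=lambda n: n.utilization)
        return rng.choice(ranked[:top_k])
    if strategy == "SPREAD":
        # Assumption: ignore locality and favor the least-utilized node.
        return min(nodes, key=lambda n: n.utilization)
    raise ValueError(f"unknown strategy: {strategy}")


nodes = [
    Node("A", 0.9, {"a": 10}),
    Node("B", 0.5, {"b": 1000}),
    Node("C", 0.1),
]

# b is larger than a, so the task goes to the node hosting b.
print(pick_node(nodes, ["a", "b"]).name)
# Locality is ignored under SPREAD, so the idle node wins.
print(pick_node(nodes, ["a", "b"], strategy="SPREAD").name)
```

With an argument that no node holds (for example "c"), the DEFAULT branch falls back to a random choice between the two least-utilized nodes, C and B.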
The original Ray [1] used local and global schedulers. That design has changed, and now local schedulers directly schedule tasks on remote machines. The GCS now pulls information about resources on all nodes. The schedulers refer to GCS to access this information to schedule a task [2].

Scalability

The GCS is the only stateful component in Ray; the rest of the components (including the scheduler) are stateless, which makes it easy to scale them by simply adding more instances. The GCS itself is sharded, so to scale it, we add more shards.

What if we have a big object (like a large matrix) that does not fit into the memory of a single node? 
At least at the time of writing [1], distributed objects are not supported by Ray, so you would need to break down large objects in the application layer, i.e., in your Python program. For example, if you want to read a large file that does not fit into the memory of one node, you have to split it into smaller objects, say two objects hosted by two different nodes.
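The application-level splitting can be sketched as follows. This is plain Python so that it runs without a cluster; in a Ray program each chunk would become its own object (for example via ray.put) and each partial computation its own task. The helper names split_into_chunks and partial_sum are made up for this illustration.

```python
def split_into_chunks(data, num_chunks):
    """Split data into num_chunks contiguous pieces of nearly equal size."""
    size, extra = divmod(len(data), num_chunks)
    chunks, start = [], 0
    for i in range(num_chunks):
        # The first `extra` chunks get one additional element.
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def partial_sum(chunk):
    """Work done on one chunk; with Ray this would be a remote task."""
    return sum(chunk)


big_object = list(range(10))                 # stand-in for data too large for one node
chunks = split_into_chunks(big_object, 3)    # each chunk could live on a different node
partials = [partial_sum(c) for c in chunks]  # one small result per chunk
total = sum(partials)                        # combine the small partial results
print(chunks, partials, total)
```

Only the small partial results need to be gathered in one place, so no single node ever has to hold the whole object.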

Fault-tolerance

All components except GCS are stateless. Thus, to recover from their failure, we can simply spawn a new instance of them.

But what if we lose a node? After failure, the objects in its memory are gone. How does Ray tolerate node failures?
When an object is lost, Ray first tries to find a copy of it on other nodes. In cases where no copy is available, Ray relies on lineage-based fault tolerance. Specifically, the owner of each object, which is the worker process that created the object, keeps track of how the object was created, and upon failure, it reconstructs the object by re-executing the task that created it. If any of the parameters of that task is also lost, it is recursively recovered in the same way.
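To make the recursive recovery concrete, here is a minimal toy model in plain Python. It is only a sketch of the idea and not how Ray implements it: the LineageStore class and its methods are hypothetical, everything lives in one process, and there are no replicas to check before re-executing.

```python
class LineageStore:
    """Toy object store that remembers how each object was produced."""

    def __init__(self):
        self.values = {}     # object id -> value currently held in memory
        self.lineage = {}    # object id -> (function, ids of its arguments)
        self.executions = 0  # how many task executions have happened

    def submit(self, obj_id, func, *arg_ids):
        """Record the lineage of obj_id, then run the task that produces it."""
        self.lineage[obj_id] = (func, arg_ids)
        self.values[obj_id] = self._run(obj_id)
        return obj_id

    def _run(self, obj_id):
        func, arg_ids = self.lineage[obj_id]
        # Fetching an argument may itself trigger a reconstruction.
        args = [self.get(arg_id) for arg_id in arg_ids]
        self.executions += 1
        return func(*args)

    def get(self, obj_id):
        """Return the value, re-executing the creating task if it was lost."""
        if obj_id not in self.values:
            self.values[obj_id] = self._run(obj_id)
        return self.values[obj_id]

    def lose(self, *obj_ids):
        """Simulate a node failure that wipes these objects from memory."""
        for obj_id in obj_ids:
            self.values.pop(obj_id, None)


store = LineageStore()
store.submit("a", lambda: 2)
store.submit("b", lambda: 3)
store.submit("c", lambda x, y: x + y, "a", "b")

store.lose("a", "c")        # both c and one of its inputs are gone
recovered = store.get("c")  # re-runs the task for a, then the task for c
print(recovered, store.executions)
```

Recovering c costs two extra executions here (one for a, one for c), while b is reused because it was never lost.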

For GCS, the original paper [1] uses one Redis per shard and provides high availability for each shard using chain replication [4]. However, that seems not to be the case anymore [2]. GCS has been renamed to Global Control Service (instead of Store), and it is now kept in the memory of the head node. Read more on GCS changes here.

Figure 2. New Ray Architecture [2]

It seems you can still back your Ray cluster with a highly available Redis. By storing GCS data in Redis, upon failure of the current head node, the new head node can recover the state from Redis. To back Ray with Redis, simply provide the Redis cluster information to the Ray head node when starting it.

RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD
Read more here.

Consistency

You can imagine a situation where a single object is passed to more than one task running concurrently. If several tasks updated the object, they could observe inconsistent values, and we would have to deal with the usual consistency challenges. To avoid inconsistency altogether, Ray simply makes all objects immutable. Specifically, having a future (which is basically an object reference), you can read the value of the object using ray.get(future), but there is no ray.set(future) to change the value of the object, so once the object is created, you can only read it. Note that there is future = ray.put(object_value), which allows you to create a brand-new object out of a value and receive its reference/future.
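The put/get-only interface can be mimicked with a small toy store in plain Python. This is a sketch under simplifying assumptions, not Ray's object store: ImmutableObjectStore is a hypothetical class, references are plain strings, and immutability is emulated by copying values on the way in and out.

```python
import copy
import itertools


class ImmutableObjectStore:
    """Toy store exposing put and get but no way to overwrite an object."""

    def __init__(self):
        self._objects = {}
        self._ids = itertools.count()

    def put(self, value):
        """Store a snapshot of value and return a fresh reference to it."""
        ref = f"obj-{next(self._ids)}"
        self._objects[ref] = copy.deepcopy(value)
        return ref

    def get(self, ref):
        """Return a private copy, so callers cannot alter the stored object."""
        return copy.deepcopy(self._objects[ref])


store = ImmutableObjectStore()
ref = store.put([1, 2, 3])

local = store.get(ref)
local.append(4)              # changes only the caller's private copy
unchanged = store.get(ref)   # the stored object still holds the original value

new_ref = store.put(local)   # an "update" is really a brand-new object
print(unchanged, store.get(new_ref), ref != new_ref)
```

Because concurrent readers of ref can never see a changed value, there is nothing to synchronize between them.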

Inter-Process Communication

Each worker process has an in-process store that holds small objects (<100KB). Larger objects are stored in a distributed in-memory object store. To access an object owned by another process, the object is copied to the in-process store of the requester. Workers on the same node access larger objects from the object store through shared memory. To access a large remote object, the object store copies it from the object store of the owner process to the local object store. Ray relies on gRPC for non-local inter-process communication. Read more here.

Conclusion

Ray is a great tool for ML practitioners to scale their Python applications with minimum effort. With its intuitive and flexible programming model, ML engineers can focus on their core ML problems instead of dealing with the challenges of managing and utilizing a distributed system. Ray's design has been evolving since its initial introduction, but the overall goal, which is to be a distributed futures system, remains the same. What I love about Ray is its simplicity, its intuitive programming model, and its integration with Python.

References

[1] Moritz, P., Nishihara, R., Wang, S., Tumanov, A., Liaw, R., Liang, E., Elibol, M., Yang, Z., Paul, W., Jordan, M.I. and Stoica, I., 2018. Ray: A distributed framework for emerging AI applications. In 13th USENIX Symposium on Operating Systems Design and Implementation (OSDI 18) (pp. 561-577).


[3] Wang, S., Liang, E., Oakes, E., Hindman, B., Luan, F.S., Cheng, A. and Stoica, I., 2021, April. Ownership: A Distributed Futures System for Fine-Grained Tasks. In NSDI (pp. 671-686).

[4] Van Renesse, R. and Schneider, F.B., 2004, December. Chain Replication for Supporting High Throughput and Availability. In OSDI (Vol. 4, No. 91–104).
