Ray core
这篇博客将介绍Ray core,这是一个强大的分布式计算框架,其中提供了一些比较重要的原语,比如tasks, actors, and objects来构建和scale分布式应用。
ray引入了针对高性能的workloads,尤其是多个GPU,实验性的api--compiled graph.
快速开始
使用pip install -U ray
即可安装。第一步是导入和初始化ray:
import ray
ray.init()
注意在最近的版本,ray.init()
会在ray remote api的第一次使用时自动调用。
运行一个task
@ray.remote
def square(x):return x * xfutures = [square.remote(i) for i in range(4)]print(ray.get(futures))
calling an actor
task是无状态的,ray actor可以使得我们创建有状态的workers,在方法调用时维护内部状态;当你启动一个ray actor时:
- ray会在集群上启动一个专门的worker进程
- actor的方法在那个特定的worker上运行,并且可以访问和改变它的状态
- actor顺序执行方法调用,来保持一致性
一个简单例子如下:
@ray.remote
class Counter:def __init__(self):self.i = 0def get(self):return self.idef incr(self, value):self.i += valuec = Counter.remote()for _ in range(10):c.incr.remote(1)print(ray.get(c.get.remote()))
传递对象
ray的分布式对象存储高效管理在集群中的数据;有三种主要的方式:
- 隐式创建: 当tasks和actors返回values时,它们将自动存储在ray的分布式对象存储上,返回之后可以被retrive的对象引用
- 显式创建: 使用
ray.put()
直接将对象放入store中 - 传递引用: 可以想其他tasks和actors传递对象引用,避免不必要的拷贝,并且能lazy exec.
import numpy as np@ray.remote
def sum_matrix(matrix):return np.sum(matrix)print(ray.get(sum_matrix.remote(np.ones((100, 100)))))matrix_ref = ray.put(np.ones((1000, 1000)))
print(ray.get(sum_matrix.remote(matrix_ref)))
核心概念
Tasks
类似函数
actors
类似class
objects
tasks和actors都在objects上操作;我们称这些objs为remote objs,它们可以存储在ray cluster上的任何地方,我们可以用obj ref来引用它们。远程对象将在ray的分布式 shared mem的object store上cache,并且集群中的每个节点都只有一个obj store. 在集群配置时,一个远程对象可以在一个或多个nodes上存在,这与谁持有这个对象的ref是无关的。
placement groups
这使得用户可以自动在多个节点维护资源的group。它们之后可以用来调度ray的tasks和actors,尽可能近来利用局部性(PACK)。
环境依赖
当ray在远程机器上执行任务和actors时,它们对于所要执行代码的环境依赖必须是可满足的,有两种方法解决: 1. 提前准备好依赖 2. 使用ray的运行时环境on-the-fly地安装它们。