C++高品质分布式执行框架——Ray

Ray是UC Berkeley途乐ISELab新生产的高品质分布式执行框架,它拔取了和价值观分布式计算系统分歧的架构和对分布式总结的用空想来安慰自己方式,具有比斯Parker更可以的统计品质。

Ray近日还地处实验室阶段,最新版本为0.2.2版本。即便Ray自称是面向AI应用的分布式计算框架,不过它的架构具有通用的分布式总括抽象。本文对Ray举办简短的牵线,支持我们更快地精晓Ray是哪些,如有描述不当的地方,欢迎不吝指正。

一、不难起先

先是来看一下最简易的Ray程序是何许编写的。

# 导入ray,并初始化执行环境
import ray
ray.init()

# 定义ray remote函数
@ray.remote
def hello():
    return "Hello world !"

# 异步执行remote函数,返回结果id
object_id = hello.remote()

# 同步获取计算结果
hello = ray.get(object_id)

# 输出计算结果
print hello

在Ray里,通过Python注解@ray.remote定义remote函数。使用此注明申明的函数都会自带七个暗中认同的不二法门remote,通过此措施发起的函数调用都以以提交分布式任务的办法异步执行的,函数的再次来到值是二个对象id,使用ray.get停放操作可以联手获取该id对应的靶子。熟谙Java里的Future机制的话对此应当并不不熟悉,可能会有人困惑那和经常的异步函数调用没什么大的区分,可是此间最大的出入是,函数hello是分布式异步执行的。

remote函数是Ray分布式统计抽象中的宗旨概念,通过它开发者拥有了动态定制总结正视(职责DAG)的能力。比如:

@ray.remote
def A():
    return "A"

@ray.remote
def B():
    return "B"

@ray.remote
def C(a, b):
    return "C"

a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)

事例代码中,对函数A、B的调用是一心并行执行的,可是对函数C的调用依赖于A、B函数的归来结果。Ray可以确保函数C须求等待A、B函数的结果的确统计出来后才会执行。要是将函数A、B、C类比为DAG的节点的话,那么DAG的边就是函数C参数对函数A、B统计结果的依靠,自由的函数调用形式允许Ray可以轻易地定制DAG的社团和计算倚重关系。其余,提及一点的是Python的函数可以定义函数具有三个再次来到值,那也使得Python的函数更自然具备了DAG节点多入和多出的特色。

C++ 1

二、系统架构

Ray是利用什么的架构对分布式计算做出如上抽象的呢,一下付出了Ray的系统架构(来自Ray杂谈,参考文献1)。

C++ 2

作为分布式统计系统,Ray照旧依据了赞叹不己的Master-Slave的统筹:Master负责全局协调和情况维护,Slave执行分布式统计义务。可是和传统的分布式统计系统不相同的是,Ray使用了错落职务调度的笔触。在集群安插情势下,Ray运维了以下重点零部件:

  1. GlobalScheduler:Master上运转了二个大局调度器,用于吸纳本地调度器提交的职务,并将义务分发给方便的当地义务调度器执行。
  2. RedisServer:Master上运行了一到多少个RedisServer用于保存分布式任务的处境新闻(ControlState),包含对象机器的映照、任务描述、职责debug消息等。
  3. LocalScheduler:每种Slave上运维了1个本地调度器,用于提交职分到全局调度器,以及分配职务给当下机械的Worker进程。
  4. Worker:各个Slave上能够运行五个Worker进度执行分布式义务,并将总计结果存储到ObjectStore。
  5. ObjectStore:各种Slave上运维了1个ObjectStore存储只读数据对象,Worker可以经过共享内存的方法访问那几个目的数据,那样可以有效地减小内存拷贝和对象系列化开支。ObjectStore底层由Apache
    Arrow完成。
  6. Plasma:每一种Slave上的ObjectStore都由3个名为Plasma的目的管理器进行管制,它可以在Worker访问本地ObjectStore上不设有的长距离数据对象时,主动拉取其他Slave上的对象数据到日前机械。

亟待验证的是,Ray的舆论中提及,全局调度器可以运行一到三个,而如今Ray的落成文档里商讨的内容都以依据2个大局调度器的图景。作者估量大概是Ray尚在建设中,一些体制还未周密,后续读者可以小心此处的底细变化。

Ray的任务也是透过类似Spark中Driver的定义的艺术开展付出的,有所差其他是:

  1. 斯Parker的Driver提交的是天职DAG,一旦付出则不足更改。
  2. 而Ray提交的是更细粒度的remote
    function,任务DAG器重关系由函数看重关系自由定制。

杂谈给出的架构图里没有画出Driver的定义,由此笔者在其基础上做了有的改动和扩大。

C++ 3

Ray的Driver节点和和Slave节点运维的零部件大概一致,不过却有以下分别:

  1. Driver上的做事进度DriverProcess一般唯有3个,即用户运行的PythonShell。Slave可以依照须要创建七个WorkerProcess。
  2. Driver只可以交给职责,却不可以吸纳来自全局调度器分配的义务。Slave可以交到职务,也得以接到全局调度器分配的天职。
  3. Driver可以积极绕过全局调度器给Slave发送Actor调用义务(此处设计是或不是创设尚不琢磨)。Slave只好收取全局调度器分配的持筹握算义务。

三、主旨操作

依照以上架构,大家大致研商一下Ray中第壹的操作和流程。

1. ray.init()

在PythonShell中,使用ray.init()能够在本土运行ray,包蕴Driver、HeadNode(Master)和多少Slave。

import ray
ray.init()

假即便直连已有个别Ray集群,只须求指定RedisServer的地址即可。

ray.init(redis_address="<redis-address>")

地点运营Ray得到的输出如下:

>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>> 

本地运转Ray时,能够看来Ray的WebUI的拜访地址。

2. ray.put()

使用ray.put()可以将Python对象存入本地ObjectStore,并且异步重返三个唯一的ObjectID。通过该ID,Ray可以访问集群中任一个节点上的靶子(远程对象通过翻看Master的对象表拿到)。

目的一旦存入ObjectStore便不可更改,Ray的remote函数可以将一贯将该目的的ID作为参数传入。使用ObjectID作为remote函数参数,能够使得地缩减函数参数的写ObjectStore的次数。

@ray.remote
def f(x):
    pass

x = "hello"

# 对象x往ObjectStore拷贝里10次
[f.remote(x) for _ in range(10)]

# 对象x仅往ObjectStore拷贝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通过ObjectID获取ObjectStore内的目的并将之转换为Python对象。对于数组类型的靶子,Ray使用共享内存机制裁减数量的正片花费。而对此其他对象则要求将数据从ObjectStore拷贝到进度的堆内存中。

设若调用ray.get()操作时,对象尚未创立好,则get操作会阻塞,直到对象创设完结后回去。get操作的机要流程如下:

  1. Driver可能Worker进程首先到ObjectStore内请求ObjectID对应的目的数据。
  2. 一经当地ObjectStore没有对号入座的靶子数据,本地对象管理器Plasma会检讨Master上的对象表查看对象是或不是存储其它节点的ObjectStore。
  3. 设若目的数据在别的节点的ObjectStore内,Plasma会发送互联网请求将对象数据拉到本地ObjectStore。
  4. 如果目标数据还并未成立好,Master会在对象成立完毕后布告请求的Plasma读取。
  5. 一旦目的数据已经被抱有的ObjectStore移除(被LRU策略删除),本地调度器会依照义务血缘关系执行对象的重复创制工作。
  6. 即使指标数据在该地ObjectStore可用,Driver或然Worker进程会通过共享内存的不二法门一贯将对象内存区域映射到温馨的进程地址空间中,并反连串化为Python对象。

另外,ray.get()可以五回性读取三个目的的数码:

result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中应用注脚@ray.remote可以声Bellamy(Bellamy)个remote
function。remote函数时Ray的宗旨任务调度单元,remote函数定义后会立刻被种类化存储到RedisServer中,并且分配了2个唯一的ID,那样就确保了集群的兼具节点都可以看来这一个函数的定义。

C++,但是,那样对remote函数定义有了一个秘密的需求,即remote函数内倘使调用了其余的用户函数,则必须提前定义,否则remote函数无法找到呼应的函数定义内容。

remote函数内也得以调用别的的remote函数,Driver和Slave每一遍调用remote函数时,其实皆以向集群提交了一个计量职务,从这里也得以见见Ray的分布式总括的自由性。

Ray中调用remote函数的紧要流程如下:

  1. 调用remote函数时,首先会创制壹个义务目的,它包罗了函数的ID、参数的ID大概值(Python的主导对象直接传值,复杂对象会先通过ray.put()操作存入ObjectStore然后再次回到ObjectID)、函数重临值对象的ID。
  2. 义务目的被发送到本地调度器。
  3. 本土调度器决定职分目的是在本地调度依然发送给全局调度器。如若义务目标的借助(参数)在当地的ObejctStore已经存在且地面的CPU和GPU总结财富丰盛,那么当地调度器将职责分配给本地的WorkerProcess执行。否则,职分目的被发送给全局调度器并储存到任务表(TaskTable)中,全局调度器依据近期的义务状态新闻决定将任务发给集群中的某三个地面调度器。
  4. 地面调度器收到义务目标后(来自当地的职分照旧全局调度分配的任务),会将其放入二个职务队列中,等待总括财富和地点正视满足后分配给WorkerProcess执行。
  5. Worker收到任务目的后举行该任务,并将函数重临值存入ObjectStore,并创新Master的目的表(ObjectTable)音讯。

@ray.remote诠释有3个参数num_return_vals用于注解remote函数的再次回到值个数,基于此落成remote函数的多再次来到值机制。

@ray.remote(num_return_vals=2)
def f():
    return 1, 2

x_id, y_id = f.remote()
ray.get(x_id)  # 1
ray.get(y_id)  # 2

@ray.remote表明的另三个参数num_gpus可以为天职指定GPU的财富。使用内置函数ray.get_gpu_ids()可以拿到当前职责可以利用的GPU音讯。

@ray.remote(num_gpus=1)
def gpu_method():
    return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操作扶助批量的任务等待,基于此可以达成几回性取得三个ObjectID对应的数据。

# 启动5个remote函数调用任务
results = [f.remote(i) for i in range(5)]
# 阻塞等待4个任务完成,超时时间为2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包罗了五个ObjectID,使用ray.wait操作可以一贯等待有5个任务成功后赶回,并将不辱义务的数目对象放在第二个list类型重返值内,未成功的ObjectID放在第1个list重返值内。固然设置了晚点时间,那么在逾期时间甘休后仍未等到预期的重返值个数,则已逾期落成时的重临值为准。

6. ray.error_info()

使用ray.error_info()可以赢得职责执行时发出的错误新闻。

>>> import time
>>> @ray.remote
>>> def f():
>>>     time.sleep(5)
>>>     raise Exception("This task failed!!")
>>> f.remote()
Remote function __main__.f failed with:

Traceback (most recent call last):
  File "<stdin>", line 4, in f
Exception: This task failed!!


  You can inspect errors by running

      ray.error_info()

  If this driver is hanging, start a new one with

      ray.init(redis_address="127.0.0.1:65452")
>>> ray.error_info()
[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函数只可以处理无状态的测算须要,有情状的计算要求须要利用Ray的Actor完结。在Python的class定义前使用@ray.remote可以注脚Actor。

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

接纳如下形式开创Actor对象。

a1 = Counter.remote()
a2 = Counter.remote()

Ray创设Actor的流程为:

  1. Master接纳二个Slave,并将Actor创设任务分发给它的地方调度器。
  2. 创办Actor对象,并举行它的构造函数。

从流水线可以见见,Actor对象的制造时相互的。

因而调用Actor对象的办法应用Actor。

a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1

调用Actor对象的主意的流水线为:

  1. 首先创立七个义务。
  2. 该职责被Driver直接分配到成立该Actor对应的地点执行器执行,这几个操作绕开了大局调度器(Worker是不是也得以动用Actor直接分配职务尚存疑问)。
  3. 回来Actor方法调用结果的ObjectID。

为了保险Actor状态的一致性,对同一个Actor的措施调用是串行执行的。

四、安装Ray

若果只是利用Ray,可以采用如下命令直接设置。

pip intall ray

假设须要编译Ray的新型源码进行安装,根据如下步骤进行(马克斯OS):

# 更新编译依赖包
brew update
brew install cmake pkg-config automake autoconf libtool boost wget
pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
# 下载源码编译安装
git clone https://github.com/ray-project/ray.git
cd ray/python
python setup.py install
# 测试
python test/runtest.py

# 安装WebUI需要的库[可选]
pip install jupyter ipywidgets bokeh

# 编译Ray文档[可选]
cd ray/doc
pip install -r requirements-doc.txt
make html
open _build/html/index.html

自家在MacOS上安装jupyter时,遇到了Python的setuptools库不或许升级的地方,原因是MacOS的安全性设置难题,可以使用如下格局缓解:

  1. 重启电脑,运营时按住Command+R进去Mac珍贵方式。
  2. 打开命令行,输入指令csrutils disable关闭系统安全策略。
  3. 重启电脑,继续安装jupyter。
  4. 安装到位后,重复如上的方法执行csrutils enable,再度重启即可。

进入PythonShell,输入代码本地运转Ray:

import ray
ray.init()

浏览器内开辟WebUI界面如下:

C++ 4

参考资料

  1. Ray论文:Real-Time Machine Learning: The Missing
    Pieces
  2. Ray开发手册:http://ray.readthedocs.io/en/latest/index.html
  3. Ray源代码:https://github.com/ray-project/ray