说明
很久没有写了,总是写一半就没空往下写。这次正好有个单独的主题,可以写一下。
内容
1 块的分配
数据应该怎么切分和管理?这没有一个固定的答案,在我的实践中,我觉得一个块(Block)一万条记录是比较合理的。然后按照片(Shard)、区(Partition)进行拆分和管理。
我稍微重新定义了片区块的概念。
片 Shard: 可以理解一个片就是一个物理存储位置,A主机存一个片,B主机存一个片。片是集群管理的单位。
区 Partition: 区对应于表级的概念。一个项目或主题有一个库名,下面会分为若干表,也就是分区。库名是项目名称。
块 Block: 对应于表内的一个索引字段,按照顺序进行编号。块有两种类型,一种是时间型的,按时隙进行顺序编号;另一种是空间型的,只是按照入库的顺序进行编号。
片的意义是,当数据真的非常大,需要集群来进行分担时采用的存储单元。当然,有的时候是因为并发太大,将记录均匀映射到多个分片,从而让集群中的每台机器来均匀的处理请求。
区的意义是,当进行查询时,仅需对相对较小的部分进行查询。
块的意义是,既满足并行处理的效率需求,也满足存储,特别是内存的限制。效率问题主要是指:当我们逐条处理数据时,CPU的性能大部分是处在停等状态的。显然我们可以批量执行来提高处理效率,以目前我接触到的处理来看,一万条应该是一个比较合适的量级:吞吐量是一条的约一万倍,同时对于大部分数据而言,内存都不会占用太多。
2 块的编号
按照上面设计的层级,一个块可以由 project_name
、shard_id
、part_id
和block_id
构成。按照物理的级别,实际上层级的顺序应该是 shard_id
~ 物理机、project_name
~ 库名、part_id
~ 表名 和block_id
~ 筛选字段。block_id
又可以分为时间和空间两部分id。
一个样例名称是: myproject.0.0.0.0,其意义为:
myproject : 项目名称
第一个0 : shard, 0~999, 0 通常也意味着没有启动集群
第二个0 : part, 0~999 ,0 也意味着初始表,如果一个project只有一个sub_project, 那么一个part容纳1000个block,也就是一千万条记录
第三个0: 0 时隙,意味着时隙无关数据,时隙顺着统一时间轴编号,这个可以是默认的轴。
第四个0: block, 0~999 ,这个是通过sub_project的rec_id计算出来的,每个sub_project的rec_id是唯一的。
因此,假设project只有一个sub_project时,每个数据库服务(对应某块硬盘)的存储的数据大约是10万(1000*1000)个block,也就是10亿条数据。要扩充这个设计上线只要改变编号数就可以,我是觉得这个尺寸刚好。因为数据库服务是微服务,如果主机够强,可以支撑多个shard(1000),那么就是1万亿条数据。我可以假定一个项目不会超过这个数据上限(至少目前没有碰到如此夸张的数据)。
3 Worker
worker是实现逻辑的核心单元,通过player进行调度,每个worker每次处理的数据单位就是block。 worker会按照块编号提出自己的数据请求。
在某一次处理中,worker将锁定某一个时空的block(通过编号),发起数据请求。当数据请求被满足时,worker将执行任务。这里有几方面内容:从时间序列的角度来说,先取时间块,再取空间块。例如,某个worker要处理某个标的的数据,先获取时间块(例如对应2020年1月数据),再从时间块中获取某个标的的数据(空间块)。有时候,问题可能会简化为时间无关的空间块需求(例如对全量进行处理)
每时每刻都会有大量的worker分别在完成不同的任务,这会带来很多挑战。
首先,是计算资源,如CPU或者GPU。这个一般可以通过worker的数量进行简单的控制:每个worker处理的基础单位是block(n>=1),因此在设计时就天然的限制了内存的使用,以及CPU/GPU的使用(毕竟数据少,计算时间短)。所以,在多个worker下,每个项目总有机会得到轮替执行。由于分布式网络的存在,以及现在很流行的算力租用,启动更多的worker变的容易。所以计算资源暂时可以认为是容易解决的。
其次,是存储资源,或者说是IO资源。在极端情况下,假设只有一个worker,那么显然,worker可以直接从数据库调取数据;当有多个worker,这些worker可能会反复的对某些块发出需求,此时数据库就会濒临崩溃。由于内外存的数据存储速度还是存在巨大的差异,并且反复执行数据库查询操作会占用大量的CPU时间,连接数等,这会导致服务器的能力急剧下降。关键是从道理上来说,每个worker都没错,但是从整体上来看,这些独立的worker造成了大量的浪费。比如1000个worker,就对某个块读取了1000次。特别是,当算力拓展到租用机时,如何确保这些租用机能获取到相应的数据?
4 Block Manager
所以,Worker不能直接从数据库(Mongo)取数,而是要从内存(Redis)里取数;另外,如何对于数据减少反复读取,以及分发到扩展主机上,需要有一个管理者。这就是Block Manager。
worker在一次运行中(通常是半个时隙,或者说一个节拍)会执行元数据计算,或者是数据计算。
在元数据计算阶段,主要需要计算状态,以及对应的数据请求。状态会决定是否可计算,以及具体的计算方法。并根据据此提供所需求的数据需求(数据块名称)。
在计算阶段,worker尝试读取数据块进行工作,并将结果(无论是顺利执行,数据缺失或者逻辑错误)写入元数据,然后结束。
因此,Block Manager需要接受来自不同worker的块请求,去数据库取数,存在内存里。其管理作用体现在:
- 1 预取数 Pre-Fetch。
- 2 动态删除数据 Dynamic-Delete。
- 3 避免重复取数 Read Once
我觉得chat归纳的比我清晰。
4.1 预取数
worker有两种方式:一种空间型的,通过player可以做全盘的规划,然后并行执行;还有一种是时间型的,下一次worker的执行必须依赖本次worker执行的结果。
空间型的预取数主要是根据计划表按“通道”来执行提前取数。首先player会生成一张待执行表,表里有一个通道字段,供不同的worker进行并行。一般任务数不会超过10万个(每个任务的block是一万)。worker所在的服务器在执行任务时,会被分配一个任务通道,然后按顺序执行任务。这个顺序会按通道删选,可能还会按照优先级排序(可动态),然后再按照所使用的block顺序排序。
worker的元信息中会包含所属任务表,这样Block Manager就知道当前worker的需求,同时可以较为准确的知道接下来worker需要哪些数据。这是属于通过任务(计划)表进行的预取数,还有一种是通过模型/算法进行取数预测,这是后话。
取数分为两种,历史的和最新的。如果是历史取数,那么只要block存在,就不必再读取。如果是最新(次新)的block,就需要反复读取。预取数可以提前准备好n个block,这样如果worker处理的时间足够长,那么在下一次时隙到来时,其运行总是不用停等的。由于BM将数据取到了内存,worker只是在内存中取数。
4.2 动态删除数据
BM每次获取block时都会设置失效时间,一般是一天。这种被动式的删除数据方法显然太慢,如果worker请求的block很多,那么内存将很快被撑爆。
BM应当接受一些参数来采取更主动的数据管理,这些参数包括:最大的块限制、最大的内存使用限制。
这要求BM对存储块保持跟踪对每个块的追踪(block_tracking_table),包括某个块的载入次数,大小等,这样就可以接受参数来进行控制。
4.3 避免重复取数
通常worker会发送task_table的信息给到BM,BM会读取并比较这些task_table是否是取相同的数据源等。
5 Class BlockManager
用一些粗糙的方法来展示BM的一些功能
class BlockManager:state = 'I' # 初始化def __init__(self,name):passdef _set_max_blocks(): # 最大的块passdef _set_max_mem(): # 最大内存passdef _mongo_connect(): # 数据库连接passdef _redis_connect(): # redis连接passdef pre_fetch(): # 预取数passdef flip(): # 元数据计算passdef flop(): # 数据计算passdef gc(): # 内存回收passdef ikeep(): # 适当的保存数据避免重复读取passdef _merge_task_table(): # 读取新的任务表,来进行块的估计pass
先写到这吧,假期快结束了。
一句话总结:数据的IO与数据的流转是两个完全不同的领域。之前的ADBS已经完成了流转部分功能,在IO方面需要BM及规范来确保大量的计算是自由的。
最后再补充一点,片、区、块的设计是UCS的一部分,在拓展计算(租用算力上),可以通过rsync方式将块以文件方式同步到算力机上,这样就实现了数据的全局同步。