💐💐扫码关注公众号,回复 spark 关键字下载geekbang 原价 90 元 零基础入门 Spark 学习资料💐💐
join 实现机制
Join 有 3 种实现机制,分别是 NLJ(Nested Loop Join)、SMJ(Sort Merge Join)和 HJ(Hash Join)
NLJ:Nested Loop Join
对于参与关联的两张表,如 salaries 和 employees,按照它们在代码中出现的顺序,我们约定俗成地把 salaries 称作“左表”,而把 employees 称作“右表”。在探讨关联机制的时候,我们又常常把左表称作是“驱动表”,而把右表称为“基表”。一般来说,驱动表的体量往往较大,在实现关联的过程中,驱动表是主动扫描数据的那一方。而基表相对来说体量较小,它是被动参与数据扫描的那一方。
在 NLJ 的实现机制下,算法会使用外、内两个嵌套的 for 循环,来依次扫描驱动表与基表中的数据记录。在扫描的同时,还会判定关联条件是否成立,如内关联例子中的 salaries(“id”) === employees(“id”)。如果关联条件成立,就把两张表的记录拼接在一起,然后对外进行输出。
不难发现,假设驱动表有 M 行数据,而基表有 N 行数据,那么 NLJ 算法的计算复杂度是 O(M * N)。尽管 NLJ 的实现方式简单、直观、易懂,但它的执行效率显然很差。执行高效的 HJ 和 SMJ 只能用于等值关联,也就是说关联条件必须是等式,像 salaries(“id”) < employees(“id”) 这样的关联条件,HJ 和 SMJ 是无能为力的。相反,NLJ 既可以处理等值关联(Equi Join),也可以应付不等值关联(Non Equi Join),可以说是数据关联在实现机制上的最后一道防线。
SMJ:Sort Merge Join
SMJ 的实现思路是先排序、再归并。给定参与关联的两张表,SMJ 先把他们各自排序,然后再使用独立的游标,对排好序的两张表做归并关联。
具体计算过程是这样的:起初,驱动表与基表的游标都会先锚定在各自的第一条记录上,然后通过对比游标所在记录的 id 字段值,来决定下一步的走向。对比结果以及后续操作主要分为 3 种情况:
- 满足关联条件,两边的 id 值相等,那么此时把两边的数据记录拼接并输出,然后把驱动表的游标滑动到下一条记录;
- 不满足关联条件,驱动表 id 值小于基表的 id 值,此时把驱动表的游标滑动到下一条记录;
- 不满足关联条件,驱动表 id 值大于基表的 id 值,此时把基表的游标滑动到下一条记录。
基于这 3 种情况,SMJ 不停地向下滑动游标,直到某张表的游标滑到尽头,即宣告关联结束。对于驱动表的每一条记录,由于基表已按 id 字段排序,且扫描的起始位置为游标所在位置,因此,SMJ 算法的计算复杂度为 O(M + N)。然而,计算复杂度的降低,仰仗的其实是两张表已经事先排好了序。但是我们知道,排序本身就是一项很耗时的操作,更何况,为了完成归并关联,参与 Join 的两张表都需要排序。
Sort Merge Join 没有内存方面的限制。不论是排序、还是合并,SMJ 都可以利用磁盘来完成计算。所以,在稳定性这方面,SMJ 更胜一筹。
HJ:Hash Join
HJ 的设计初衷是以空间换时间,力图将基表扫描的计算复杂度降低至 O(1)。
具体来说,HJ 的计算分为两个阶段,分别是 Build 阶段和 Probe 阶段。在 Build 阶段,在基表之上,算法使用既定的哈希函数构建哈希表,如上图的步骤 1 所示。哈希表中的 Key 是 id 字段应用(Apply)哈希函数之后的哈希值,而哈希表的 Value 同时包含了原始的 Join Key(id 字段)和 Payload。
在 Probe 阶段,算法依次遍历驱动表的每一条数据记录。首先使用同样的哈希函数,以动态的方式计算 Join Key 的哈希值。然后,算法再用哈希值去查询刚刚在 Build 阶段创建好的哈希表。如果查询失败,则说明该条记录与基表中的数据不存在关联关系;相反,如果查询成功,则继续对比两边的 Join Key。如果 Join Key 一致,就把两边的记录进行拼接并输出,从而完成数据关联。
Hash Join 的执行效率最高,这主要得益于哈希表 O(1) 的查找效率。不过,在 Probe 阶段享受哈希表的“性能红利”之前,Build 阶段得先在内存中构建出哈希表才行。因此,Hash Join 这种算法对于内存的要求比较高,适用于内存能够容纳基表数据的计算场景。
join 数据分发
在大数据的应用场景中,数据的处理往往是在分布式的环境下进行的,在这种情况下,数据关联的计算还要考虑网络分发这个环节。在分布式环境中,Spark 支持两类数据分发模式。一类是Shuffle,Shuffle 通过中间文件来完成 Map 阶段与 Reduce 阶段的数据交换,因此它会引入大量的磁盘与网络开销。另一类是广播变量(Broadcast Variables),广播变量在 Driver 端创建,并由 Driver 分发到各个 Executors。因此,从数据分发模式的角度出发,数据关联又可以分为 Shuffle Join 和 Broadcast Join 这两大类。
在分布式环境中,两张表的数据各自散落在不同的计算节点与 Executors 进程。因此,要想完成数据关联,Spark SQL 就必须先要把 Join Keys 相同的数据,分发到同一个 Executors 中去才行。以 Join Keys 为基准,两张表的数据分布保持一致,是 Spark SQL 执行分布式数据关联的前提。而能满足这个前提的途径只有两个:Shuffle 与广播。
Shuffle Join
对于参与 Join 的两张数据表,Spark SQL 先是按照如下规则,来决定不同数据记录应当分发到哪个 Executors 中去:
- 根据 Join Keys 计算哈希值
- 将哈希值对并行度(Parallelism)取模
Spark SQL 在默认情况下一律采用 Shuffle Join,原因在于 Shuffle Join 的“万金油”属性。也就是说,在任何情况下,不论数据的体量是大是小、不管内存是否足够,Shuffle Join 在功能上都能够“不辱使命”,成功地完成数据关联的计算。
Broadcast Join
Spark 不仅可以在普通变量上创建广播变量,在分布式数据集(如 RDD、DataFrame)之上也可以创建广播变量。这样一来,对于参与 Join 的两张表,我们可以把其中较小的一个封装为广播变量,然后再让它们进行关联。
在 Broadcast Join 的执行过程中,Spark SQL 首先从各个 Executors 收集 employees 表所有的数据分片,然后在 Driver 端构建广播变量 bcEmployees,构建的过程如下图实线部分所示。
可以看到,散落在不同 Executors 内花花绿绿的矩形,代表的正是 employees 表的数据分片。这些数据分片聚集到一起,就构成了广播变量。接下来,如图中虚线部分所示,携带着 employees 表全量数据的广播变量 bcEmployees,被分发到了全网所有的 Executors 当中去。在这种情况下,体量较大的薪资表数据只要“待在原地、保持不动”,就可以轻松关联到跟它保持之一致的员工表数据了。通过这种方式,Spark SQL 成功地避开了 Shuffle 这种“劳师动众”的数据分发过程,转而用广播变量的分发取而代之。
尽管广播变量的创建与分发同样需要消耗网络带宽,但相比 Shuffle Join 中两张表的全网分发,因为仅仅通过分发体量较小的数据表来完成数据关联,Spark SQL 的执行性能显然要高效得多。这种小投入、大产出,用极小的成本去博取高额的性能收益,可以说是“四两拨千斤”!