GPDB-内核特性-gp_interconnect_fc_method参数
gp_interconnect_fc_method参数控制使用哪种流量控制方式:capacity根据接收方窗口来控制发送;loss(默认)根据丢包情况控制发送速度。Loss是基于capacity,还会根据丢包情况调整发送速度。那么针对这个参数怎么解根据接收方窗口来控制发送呢?
1、接收端主线程
接收端主线程通过processIncomingChunks函数从接收队列中拿数据包构建tuple链表。如上图所示,prepareRxConnForRead中,可以看到从conn->pkt_q[]队列的头conn->pkt_q_head拿一个ICBuffer。然后通过RecvTupleChunk函数从ICBuffer中构建chunk链表chunkSorterEntry->chunk_list中并组装到chunkSorterEntry->ready_tuples。此时,该ICBuffer就可以释放掉了。
通过MlPutRxBufferIFC函数释放:可以在上图蓝框中看到其流程,会将该包的seq作为ack的extraSeq,当前已接收最大包的seq:conn->conn_info.seq-1作为ack的seq发送给发送端(setAckSendParam设置ack,sendAckWithParam发送ack)。
2、发送端
发送端发送函数SendChunkUDPIFC,先通过sendBuffers进行发送。可以看到当conn->capacity大于0时才能发送。该值在SetupUDPIFCInterconnect_Internal中初始化为gp_interconnect_queue_depth(默认值4),即接收端接收队列大小,也就是接收窗口。初始表示接收队列空闲这4个空间。若发送成功,该值会减1。那么什么时候,该值会增加呢?
3、接收端接收线程
接收端接收线程接收到数据包后,回ack的函数是handleDataPacket。会收到三种数据包:正常包、乱序包、重复包。
4、接收线程收到正常包
因为接收队列总是根据包的seq号来决定放到哪个槽位。发送端发送一个包,会将当前连接的序列号conn_info.seq作为包的seq,并将序列号+1;若是重复包则不会加。因此正常包的seq总是递增的,反应到槽位上也是依次从前向后然后循环到头从新开始放。队列中每放一个正常包,队列tail向后移动一个槽位,并且接收端的conn_info.seq+1。因此判定正常包的条件:pos == tail
正常包时,会将当前接收队列的接收的包的seq作为ack的seq,将当前已处理包的序列号extraSeq作为ack的extraSeq回给发送端。
handleAcks处理接收端发来的ack中会更新该值。当回的是正常ack时:
pkt->extrsSeq > pkt->consumedSeq(初始0),正常情况下该条件会满足,接收端正常处理了包:pkt->extraSeq - ackConn->consumedSeq表示处理了包的个数。
注:不是每处理一个包就立即返回的。1)seq每隔2个,会回一次ack,反应到extraSeq上:已处理了该seq,但每次都会更新extraSeq;2)接收线程每接收一个包,并放到了队列中,也会回一次ack。当然这里的extraSeq是主线程处理一个包更新后的值。
举例:ackConn->consumedSeq开始是0,返回来的pkt->extraSeq为2。pkt->extraSeq - ackConn->consumedSeq表示接收端接收队列空出了2个,此时更新下consumedSeq值为2。再收到回复的ack时,pkt->extraSeq为3,接收端上次处理到了2,这次处理到了3,正好空闲出了1个,所以这样对应到发送端的黄框更新capacity上。
5、接收线程收到乱序的包
1)发送端后发送的先到达了,也会导致后接收到的包落到了接收队列前面.seq2先到,此时pos!=tail,即seq2是个乱序包。
2)发送端发送时丢包了,接收端没接收到,但后续的包接收到了。丢的包再次发送时,它势必会在接收队列的前面。Seq1中途丢了,那么seq2到了后,pos!=tail,即seq2是乱序包。
handleDisorderPacket-->sendDisorderAck(conn, pkt->seq, conn->conn_info.seq - 1, lostPktCnt);
针对乱序包,回的ack:包的seq作为ack的seq,当前接收端已接收的包seq作为extraSeq。
例1)中,seq2先到,回的ack(seq,extraSeq)=(2,0);seq1到了后,会执行下面循环:更新到seq2的位置,即tail会移动到3,seq号加2。然后回ack(1,0)
乱序的包,如何导致extraSeq变小?
1)接收端已处理了seq1,发送ack,更新了发送端capacity和consumedSeq为1
2)接收端陆续又接收了seq2、seq3,因是正常包,回ack的extraSeq为接收队列已处理的seq,此时是1,所以发送端pkt->extraSeq(1) > ackConn->consumedSeq(1)条件不满足,不会更新capacity和consumedSeq
3)seq4和seq5发送乱序,seq5先到,此时回的ack的extraSeq为3,发送端pkt->extraSeq(3) > ackConn->consumedSeq(1)条件满足,更新capacity(+2 -> 3)和consumedSeq(3)。
4)当接收端再次拿走seq2处理后,回ack的extraSeq为2,此时就发送extraSeq变小。当然,发送端也不会更新capacity和consumedSeq。
也就是,实际上接收端还每空闲出槽位,发送端就提前更新说接收端空闲出来了。导致发送端激进发送(当然这不会发生,因为还会和loss和capacity机制结合,当接收端发送队列都每接收到ack或达到拥塞窗口时,就不再发送)。
另外,发生乱序时,实际上接收端已消耗了一个槽位,但是仍旧不会向发送端报告说又消耗了一个,只有当丢的包或者慢的包收到时,才会将槽位一下子更新到之前接收到乱序包的位置,然后才告诉发送端:丢失包和乱序包占用的槽位都消耗了。
6、接收线程收到重复包的情况
发送端发送的seq2在接收端接收到了,正常占用槽位,tail正常推进。接收队列还没处理,所以发送端的capacity和consumedSeq不能更新变大。Seq2接收到后,回的ack丢失了,那么发送端会超时重发,因此seq2在接收端已接收过,所以认定seq2是重复包。针对重复包seq2回的ack(seq,extraSeq)=(2,5),发送端会将capacity进行更新为5,consumedSeq更新5。
此时,接收端拿走seq1处理,回ack(1,1)。这种就和乱序包一样的情况了,不会继续更新capacity了。
针对重复包,接收队列还没空出槽位,同样会提前通知发送端说:我有空闲槽位了。
7、总结
主要介绍gp_interconnect_fc_method流量控制,如何理解根据接收端窗口来控制发送。针对正常包、乱序包、重复包、已处理包回的ack,来更新发送端capacity(标记接收窗口)。仅当capacity大于0时,才会进入是否发送数据包的判断流程。