分布式异步任务处理组件(七)

分布式异步任务处理组件底层网络通信模型的设计--如图:

  1. 使用Java原生NIO来实现TCP通信模型
  2. 普通节点维护一个网络IO线程,负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写,上层会有另一个线程负责网络通信协议的实现;---也就是说维护一个selector线程,负责处理socketchannel的IO事件;
  3. Leader节点网络通信层有多个线程--一个selector线程负责接受其他节点的连接请求,然后为每个连接建立一个线程并分配单独的selector来处理各自连接上的IO事件--如此设计的原因是各节点的状态严格依赖与主节点的心跳和其他通信,防止主节点线程阻塞导致心跳失败;从而引发节点下线带来的大量同步工作--后续会聊到;
  4. 各节点网络通信线程之上会有一个线程专门负责组件的网络通信协议,就是将网络传输的字节流解码成组件的通信协议包,因为NIO的buffer是数据块,所以首先通过读写队列将字节转化为字节流,通过协议转化为网络通信命令包,同时解决粘包半包等问题;
  5. 网络通信线程和协议实现线程之间通过读写两个队列来实现(网络IO线程的读队列就是协议线程的写队列,反过来一样,所以这里读写队列是相对的;),为了保证性能,避免重复创建对象和对象回收,设计了ByteBuffer缓存机制和异步读写队列数据结构--详细结构如图--
  6. 说一下三个队列--读写队列和缓存队列,用来实现IO通信线程和协议通信线程之间的数据通信--两个线程基本上会轮训处理网络IO事件,和上层协议事件,基本过程如下--
    1. 从网络IO线程角度出发--
      1. 当产生可读事件时,网络IO线程会从缓存队列中获取一个空的ByteBuffer,这里设计为当没有可用的缓存Buffer对象时会新建一个--具体在队列实现里讲,可能会产生写扩张现象,后期性能优化时考虑加入回收机制;
      2. 将socket缓冲区中的网络数据read进Buffer中,然后将Buffer对象入队到IO写队列中;
      3. 然后检查IO读队列不为空时,对IO读队列出队,获取要发送的数据Buffer对象,发送到其他节点中;
  7. 异步多线程队列,支持两个线程同时出队入队操作;原理和代码贴下来,基本实现:
package org.example.web.buffer;import org.example.web.api.SocketBufferQueue;import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {//异步读写队列实现原理;/** 当队列中的元素个数>1时,读线程和写线程可以同时进行,因为这时候不涉及操作共享变量*当队列中的元素个数<=1时,读写队列中只能有一个线程操作读或者写,因为此时会涉及队列头尾指针的操作;* 实现原理,写线程在获取写锁时可以正常做写操作:此时有两种情况--*     1,获取写锁之后队列为空,此时不会有读线程做读操作,只有获得写锁的该线程可以put,put完成之后将头尾指针同时指向改为以元素即可;此时队列元素个数为1;*     2,获取写锁之后队列中只有一个元素,这时也可以保证只有该线程在做写入,因为只有一个元素的情况下,读线程要读取该元素必须同时获得读锁和写锁;此时队列元素个数为2;*     3,读线程获取读锁之后有三种情况;size>1;size=1;size=0;*     4, 重点是保证不能多个线程同时进入队列元素为零的状态;就是读线程消费了最后一个元素,正好此时写线程在队列为空的时候写入,读写线程会同时操作头尾指针,造成错乱,所以在元素数量为1* 的时候就要进行同步操作;原理:*           1.读线程获取读锁之后如果size=1,此时不会先消费,而是试图获取写锁,防止此时有写线程同时操作,获取写锁之后再判断size是否为1,如果为1则做出队操作,然后释放写锁,如果为2则直接释放写锁--再进行出队操作;*           2,这里读线程获取读锁之后判断size=1,再获取读锁成功之后有两种情况--*                   1,有写线程在读线程之前获取到了写锁,则读线程获取到写锁的时候size>=2了(可能不止一个),*                   2,判断size=1之后直接获取到了写锁,此时就应该阻塞其他写线程做入队操作,等待自己完成出队操作之后再释放写锁;*     5,再说一下size怎么保证同步,*           1,在size<=1的时候严格保证线程同步操作,保证size;*           2,在size>1的时候,此时可以理解为队列同时在出队和入队,size在两个线程操作的时候先出队-1还是先入队+1其实是没有关系的,因为原子操作保证了最后结果是没有问题的就行;* */private AtomicInteger size;protected T head;protected T tail;private Object readLock;private Object writeLock;//这里考虑使用cas还是SynchronizedAsynchronousQueue(){this.writeLock=new Object();this.readLock=new Object();}AsynchronousQueue(int initSize){this();this.size=new AtomicInteger(initSize);}//空队列初始化要创建一个nodeAsynchronousQueue(T node){this(1);this.head=node;this.tail=this.head;}public boolean offerFirstOne(T node){synchronized (this.writeLock){if(this.size.get()>0){return false;}this.head=this.tail=node;return this.size.compareAndSet(0,1);}}public boolean offer(T node){preOfferElement(node);synchronized (this.writeLock){if(this.size.get()==0){return this.offerFirstOne(node);}else{T temp=this.head;node.next=temp;temp.pre=node;this.head=node;}return this.size.incrementAndGet() > 1;}}private void preOfferElement(T bufferNode){bufferNode.next=null;bufferNode.pre=null;}public T pollLastOne(){return this.size.compareAndSet(1,0)?this.tail:null;}public T poll(){synchronized (this.readLock){if(this.size.get()==0){return null;}if(this.size.get()==1){synchronized (this.writeLock){if(this.size()>1){return this.getTailElement();}if(this.size()==1){this.pollLastOne();}}}return this.getTailElement();}}private T getTailElement(){if(this.size()>1){this.tail= (T) this.tail.pre;this.size.decrementAndGet();return (T) this.tail.next;}return null;}public int size(){return this.size.get();}public int increamentSize(){return this.size.incrementAndGet();}public int decrementSize(){return this.size.decrementAndGet();}private class BufferNode{private ByteBuffer buffer;private BufferNode pre;private BufferNode next;BufferNode(ByteBuffer byteBuffer){this.buffer=byteBuffer;}BufferNode(){}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/53142.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式该往哪个方向发展?

1. 你所在的城市嵌入式Linux岗位多吗&#xff1f;我觉得这是影响你做决定的另一个大问题。我们学嵌入式Linux这门技术&#xff0c;绝大部分人是为了从事相关的工作&#xff0c;而不是陶冶情操。但是根据火哥统计来看&#xff0c;嵌入式Linux的普遍薪资虽然高于单片机&#xff0…

把几个Cad图纸 合并到一个Cad文件。但是不同图纸比例不一致,怎么调成一样大--推荐

把几个Cad图纸 合并到一个Cad文件。但是不同图纸比例不一致&#xff0c;怎么调成一样大; 一、需求&#xff1a; 最近在做cad画图纸的过程中&#xff0c;需要在不同的图上获取“框图”&#xff0c;但是复制到当前的cad中后&#xff0c;大小&#xff0c;比例都是变了&#xff0c…

Day11-Webpack前端工程化开发

Webpack 一 webpack基本概念 遇到问题 开发中希望将文件分开来编写,比如CSS代码,可以分为头部尾部内容,公共的样式。 JS代码也希望拆分为多个文件,分别引入,以后代码比较好维护。 本地图片,希望可以实现小图片不用访问后端,保存在前端代码中就可以了 运行程序时我…

Windows下安装Spark(亲测成功安装)

Windows下安装Spark 一、Spark安装前提1.1、JDK安装&#xff08;version&#xff1a;1.8&#xff09;1.1.1、JDK官网下载1.1.2、JDK网盘下载1.1.3、JDK安装 1.2、Scala安装&#xff08;version&#xff1a;2.11.12&#xff09;1.2.1、Scala官网下载1.2.2、Scala网盘下载1.2.3、…

LLVM笔记1

参考&#xff1a;https://www.bilibili.com/video/BV1D84y1y73v/?share_sourcecopy_web&vd_sourcefc187607fc6ec6bbd2c74a3d0d7484cf 文章目录 零、入门名词解释1. Compiler & Interpreter2. AOT静态编译和JIT动态解释的编译方式3. Pass4. Intermediate Representatio…

AirLink 101 Wireless N 150 PCI Adapter驱动和管理软件

从光盘里拷出来的&#xff0c;界面比较复古&#xff0c;实际功能聊胜于无 链接&#xff1a;https://pan.baidu.com/s/1clUcp7QsF8QMWdGoZkc_dQ?pwdmkra 提取码&#xff1a;mkra

开放式耳机怎么样?值得入手的开放式耳机推荐

​蓝牙耳机是现代音乐爱好者不可或缺的装备&#xff0c;近几年热门的开放式耳机似乎更受大家热爱&#xff0c;它们能够带来更加清晰、自然的声音体验&#xff0c;同时还能让你在听音乐时保持一定的舒适度。那么&#xff0c;都有哪些好用的开放式耳机呢&#xff1f;希望通过今天…

记录--基于css3写出的流光登录(注释超详细!)

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 完整效果 对基本的表单样式进行设置 这里设置了基本的表单样式&#xff0c;外层用了div进行包裹&#xff0c;重点是运用了两个i元素在后期通过css样式勾画出一条线没在聚焦文本框的时候线会过度成一个…

【前端】对前端小白极为友好的JS DOM入门文章

在现代web开发中&#xff0c;JavaScript (JS) 是不可或缺的一部分&#xff0c;它使我们能够为网页赋予交互性和动态性。其中&#xff0c;DOM&#xff08;文档对象模型&#xff09;技术在前端开发中起着至关重要的作用。本篇博客将带领前端初学者深入理解JavaScript DOM技术&…

【数据结构】快速排序

快速排序是一种高效的排序算法&#xff0c;其基本思想是分治法。它将一个大问题分解成若干个小问题进行解决&#xff0c;最后将这些解合并得到最终结果。 快速排序的主要思路如下&#xff1a; 选择一个基准元素&#xff1a;从待排序的数组中选择一个元素作为基准&#xff08;…

PAT 1002 A+B for Polynomials

个人学习记录&#xff0c;代码难免不尽人意 Output Specification: For each test case you should output the sum of A and B in one line, with the same format as the input. Notice that there must be NO extra space at the end of each line. Please be accurate to 1…

炼钢工艺流程(2)

1. 轧制单元 更换前后两个工作辊之间的轧制对象称为轧制单元&#xff0c;对应一个轧制计划。两个 支撑辊之间的轧制对象是由多个轧制单元组成&#xff0c;称为轧制单元组&#xff0c;对应多个轧制计 划。 轧制单元的结构 每个计划开始的部分板坯按照宽度非减的方向排列来加热轧…