写在文章开头
近期对一些比较老的项目进行代码走查,碰到一些极端的并发编程恶习,所以笔者就基于此文演示这类问题以及面对并发编程时我们应该需要了解一些常见套路。
Hi,我是sharkChili,是个不断在硬核技术上作死的java coder,是CSDN的博客专家,也是开源项目Java Guide的维护者之一,熟悉Java也会一点Go,偶尔也会在C源码边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号:写代码的SharkChili,实时获取笔者最新的技术推文同时还能和笔者进行深入交流。
提出一个需求
基于笔者近期走查的案例笔者以一个类似的需求进行演示,这个需求是通过一个定时的任务调度线程从任务表中获取任务项,通过这个任务项得到要到data
表查询对应任务的数据集并进行数据推送。
此时如果用户通过页面点击暂停,这些正在发送的数据在数据库中的状态就会被更新为暂停,完成后再将这个定时调度的线程暂停。
整体流程如下图所示,理想情况下,两个线程的工作过程为:
- 线程1从数据库找到任务,并通过这个任务找到数据表找到要发送的数据集,存入内存中。
- 线程1更新数据集状态为待发送,不断发送数据。
- 系统收到用户页面的暂停操作,创建一个线程2,从内存中找到要发送的数据,将这些数据集的状态更新为已暂停。
- 线程完成数据暂停后将线程1的执行打断。
问题复现
基于这个需求,笔者给出下面这样一个错误的例子,首先我们定义一下要发送的数据类,可以看到这个类包含id、数据和数据发送状态:
@Data
@AllArgsConstructor
public class SendData {private int id;private String data;/*** 0 未开始* 1 发送中* 2 已完成* 3 暂停*/private int status;
}
然后我们再给出任务的封装,如下所示,我们通过任务表可以查到任务的id和名称,通过id就可以到数据表定位到当前任务的数据集,并将其添加到sendDataLinkedList
中:
@Data
@AllArgsConstructor
public class TaskInfo {private int taskId;private String taskName;//数据集private LinkedList<SendData> sendDataLinkedList;//若sendDataLinkedList不为空则弹出第一个元素public SendData popSendData() {if (CollUtil.isNotEmpty(sendDataLinkedList)) {return sendDataLinkedList.pop();}return null;}//将数据添加到sendDataLinkedList中public void addSendData(SendData sendData) {sendDataLinkedList.add(sendData);}
}
然后我们给出模拟数据,可以看到笔者用taskInfoMap
模拟任务表中的数据,用mysqlSendDataList
模拟数据库中对应task
要发送的数据集:
private static List<SendData> mysqlSendDataList = new ArrayList<>();private static Map<Integer, TaskInfo> taskInfoMap = new HashMap<>();static {//模拟其他线程查到要执行的任务,并存入内存taskInfoMap.put(1, new TaskInfo(1, "任务1", new LinkedList<>()));//模拟任务1在mysql表中要发送的电话号码mysqlSendDataList.add(new SendData(1, "数据1", 0));mysqlSendDataList.add(new SendData(2, "数据2", 0));mysqlSendDataList.add(new SendData(3, "数据3", 0));mysqlSendDataList.add(new SendData(4, "数据4", 0));mysqlSendDataList.add(new SendData(5, "数据5", 0));mysqlSendDataList.add(new SendData(6, "数据6", 0));mysqlSendDataList.add(new SendData(7, "数据7", 0));mysqlSendDataList.add(new SendData(8, "数据8", 0));mysqlSendDataList.add(new SendData(9, "数据9", 0));mysqlSendDataList.add(new SendData(10, "数据10", 0));}
对应的线程代码如下,可以看到线程1会从数据库中读取数据并更新为发送中然后进行发送,并在完成后更新数据库状态。
而线程2则是模拟收到用户状态请求后,从内存中的任务集找到任务1,然后定位到正在发送的数据集将其数据库状态更新为暂停,然后将线程1暂停(这里用stop模拟打断定时任务)
。
public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {//模拟查任务TaskInfo taskInfo = taskInfoMap.get(1);//模拟从数据库中取出待发送的数据log.info("线程1更新状态为发送中");List<SendData> dataList = mysqlSendDataList.stream().filter(s -> s.getStatus() == 0).collect(Collectors.toList());//更新状态为发送中mysqlSendDataList.stream().forEach(d -> d.setStatus(1));//将数据存入链表中dataList.forEach(taskInfo::addSendData);while (true) {SendData sendData = taskInfo.popSendData();if (sendData == null) {break;}log.info("发送数据:{} 成功", JSONUtil.toJsonStr(sendData));}//更新状态为发送完成mysqlSendDataList.stream().forEach(d -> d.setStatus(2));});Thread t2 = new Thread(() -> {//模拟从内存中找到任务,然后从内存中找到正在发送的号码,并将其数据库状态更新为待发送TaskInfo taskInfo = taskInfoMap.get(1);for (SendData sendData : taskInfo.getSendDataLinkedList()) {SendData mysqlSendData = mysqlSendDataList.stream().filter(s -> s.getId() == sendData.getId()).findFirst().get();mysqlSendData.setStatus(3);log.info("暂停任务:{}", JSONUtil.toJsonStr(mysqlSendData));}//打断正在工作的线程try {t1.wait();t1.interrupt();} catch (InterruptedException e) {e.printStackTrace();}log.info("打断t1线程,暂停发送任务");});t1.setName("t1");t1.start();t2.setName("t2");t2.start();System.out.println("执行结束");}
正常情况下,这种代码因为多线程操作单一数据集进行动态迭代删除时是会抛出ConcurrentModificationException
的,但是笔者在走查类似上文这种例子时并为发现这个问题,经过对于流程和场景梳理时得出了答案。
笔者发现这个启动和暂停任务的场景执行的数据量非常大,因为庞大的数据量,被暂停了任务基本都会在排队或者刚刚完成数据集状态更新为发送中就被类似于线程2的代码完美暂停掉。
但是也不免出现一些比较极端的场景:
- 任务1正好被执行。
- 执行过程中收到暂停信号,线程2读取内存中任务1的数据集,更新数据库状态。
- 任务2正准备打断任务1,CPU又切回线程1,因为线程2暂停数据时并没有将内存中的数据集删除,导致这些在数据库中已经被暂停的数据集仍然被发送了。
最终很可能导致同样的一批数据被重复发送两次。
对应的现象也就像下面这段代码一样,
00:17:43.052 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 线程1更新状态为发送中
00:17:49.093 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":1,"data":"数据1","status":3}
00:17:49.716 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":2,"data":"数据2","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":3,"data":"数据3","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":4,"data":"数据4","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":5,"data":"数据5","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":6,"data":"数据6","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":7,"data":"数据7","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":8,"data":"数据8","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":9,"data":"数据9","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":10,"data":"数据10","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 打断t1线程,暂停发送任务
解决方案
对于此类并发问题的重构并解决的套路考虑的基本要考虑如下两个点:
- 保持原有的业务逻辑
- 线程互斥保持在一个维度。
- 选用合适的并发容器。
我们都知道重构代码对于测试的回归,逻辑的扭转变化都存在很大的风险点,所以笔者在对这段代码重构时非常明确的梳理的任务执行的数据流,明确了业务逻辑,这位作者意图是想在任务暂停时及时更新任务状态且让线程1不执行被暂停的任务,所以为了保证暂停的数据集不被线程1发送,首先就需要保证两个线程操作的集合处于一个维度,而不是像上面的代码一样线程1用pop
方法,线程2用get
加遍历的方式。
所以笔者改动的第一步,就是像容器安全化,将数据集存储容器改为ConcurrentLinkedDeque
,然后弹出元素的函数改为pollFirst
。
//数据集private ConcurrentLinkedDeque<SendData> sendDataLinkedList;//若sendDataLinkedList不为空则弹出第一个元素public SendData popSendData() {return sendDataLinkedList.pollFirst();}
这里我们也给出pollFirst
的源码,可以看到它进行元素弹出时会通过CAS
确定弹出的item
是否和操作直线得到的一致,只有compare and set
成功之后才能弹出。
public E pollFirst() {for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;//只有cas成功才能弹出元素if (item != null && p.casItem(item, null)) {unlink(p);return item;}}//若为空直接返回nullreturn null;}
其次为了保证两个线程操作处于一个维度,笔者将getter
容器方法私有化,确保两者操作都是用同一个pop
方法操作:
private ConcurrentLinkedDeque<SendData> getSendDataLinkedList() {return sendDataLinkedList;}
这样线程2的暂停逻辑就改为实时pop
出线程1正在发送的数据再暂停,保证了暂停的数据线程1不会发送:
Thread t2 = new Thread(() -> {//模拟从内存中找到任务,然后从内存中找到正在发送的号码,并将其数据库状态更新为待发送TaskInfo taskInfo = taskInfoMap.get(1);SendData sendData = null;while ((sendData = taskInfo.popSendData()) != null) {SendData finalSendData = sendData;SendData mysqlSendData = mysqlSendDataList.stream().filter(s -> s.getId() == finalSendData.getId()).findFirst().get();mysqlSendData.setStatus(3);log.info("暂停任务:{}", JSONUtil.toJsonStr(mysqlSendData));}//打断正在工作的线程try {log.info("打断t1线程,暂停发送任务");t1.stop();} catch (Exception e) {e.printStackTrace();}});
此时再看输出结果,可以看到线程1发送了一个数据之后,线程2暂停了其余的数据,调度回到线程1,线程1停止了发送,问题解决:
00:50:18.336 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 线程1更新状态为发送中
00:50:23.090 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 发送数据:{"id":1,"data":"数据1","status":1} 成功
00:50:26.242 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":2,"data":"数据2","status":3}
00:50:28.200 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":3,"data":"数据3","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":4,"data":"数据4","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":5,"data":"数据5","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":6,"data":"数据6","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":7,"data":"数据7","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":8,"data":"数据8","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":9,"data":"数据9","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":10,"data":"数据10","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 打断t1线程,暂停发送任务
小结
总的来说这是一段比较基础的并发编程问题,本篇文章更着重的是让读者了解并发编程时如何复现以及考虑问题的维度,不难看出笔者进行并发编程问题的解决思路就是三步:
- 理清数据流和并发代码逻辑。
- 确定合适的容器。
- 确保多线程操作互斥在同一个维度。
我是sharkchili,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号:
写代码的SharkChili,同时我的公众号也有我精心整理的并发编程、JVM、MySQL数据库个人专栏导航。