java阻塞队列/kafka/spring整合kafka

queue增加删除元素

  • 增加元素
    • add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
    • put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素
    • offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false
  • 删除元素
    • poll: 若队列为空,返回null。
    • remove:若队列为空,抛出NoSuchElementException异常。
    • take:若队列为空,发生阻塞,等待有元素

BlockingQueue:

  • 解决线程通信的问题
  • 阻塞方法:put、take

其他实现类:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue/ SynchronousQueue/ DelayQueue

BlockingQueue实例

package com.nowcoder.mycommunity;import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class BlockingQueueTests {public static void main(String[] args) {BlockingQueue queue = new ArrayBlockingQueue(10);new Thread(new Producer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();}
}class Producer implements Runnable{private BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue){this.queue = queue;}@Overridepublic void run() {try {for(int i = 0; i < 100; ++ i){queue.put(i);Thread.sleep(20);System.out.println(Thread.currentThread().getName() + "   producer" + queue.size());}}catch (Exception e){e.printStackTrace();}}
}class Consumer implements Runnable{public BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue){this.queue = queue;}@Overridepublic void run() {try {while (true) {queue.take();Thread.sleep(new Random().nextInt(1000));System.out.println(Thread.currentThread().getName() + "   consuer" + queue.size());}} catch (Exception e) {e.printStackTrace();}}
}

kafka

  • kafka是一个分布式的流媒体平台
  • 主要应用:消息系统、日志收集、用户行为追踪、流式处理
  • 特点:高吞吐量、消息持久化(存放在磁盘上,btw,磁盘顺序读写速度并不慢)、高可靠性、高扩展性

Broker

kafka的服务器,每一台服务器称为一个Broker

Zookeeper

管理其他集群,包括kafka的集群。可以单独下载

Topic/ Partition/ Offset

消息队列可能是一对多的形式,生产者将一条消息放在多个队列中,然后消费者从各自的队列中取消息。
下图为一个Topic,Topic中可能会含有很多Partition,Offset为Partition的索引
在这里插入图片描述

Leader Replica/ Follower Replica

kafka的数据不止存储一份,他会存为多份,即使某一个分区坏了还可以有备份。
leader Replica(祖副本):当尝试从分区获取数据时,祖副本可以处理请求,返回数据
Follower Replica(随从副本):只能备份,不能响应请求
如果祖副本挂掉,集群会从Follower Replica中选一个作为新的leader

kafka命令

官方文档

配置

进入到configure目录下,修改consumer.properties

使用

进入到kafka的目录中

// 启动zookeeper
> ./bin/zookeeper-server-start.sh config/zookeeper.properties // 启动kafka
> ./bin/kafka-server-start.sh config/server.properties // --create:创建主题
// --bootstrap-server localhost:9092:在哪个服务器创建主题,kafka默认端口为9092
// --replication-factor 1:副本为1
// --partitions 1:分区为1
// --topic test:主题的名字
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.// 查看该服务器上的主题
> ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092            
test// 创建生产者向某个服务器的某个主题中发消息
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test        
>hello
>world// 创建一个消费者,读取某个服务器上某个主题下的消息队列,从头开始读取
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

Spring整合Kafka

引入依赖

pom.xml

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.0.7</version>
</dependency>

配置Kafka

  • 配置server
  • 配置consumer
# Kafka Properties
# 服务器地址
spring.kafka.bootstrap-servers==localhost:9092
#消费者id,可以在consumer.properties查看
spring.kafka.consumer.group.id=mycommunity-consumer-group
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=true
# 自动提交的时间间隔,单位毫秒
spring.kafka.consumer.auto-commit-interval=3000

访问Kafka

  • producer
  • consumer

Spring整合Kafka的例子

package com.nowcoder.mycommunity;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = MyCommunityApplication.class)
public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka(){kafkaProducer.sendMessage("test", "hello");kafkaProducer.sendMessage("test", "world");try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}}
}@Component
class KafkaProducer{@Autowiredpublic KafkaTemplate kafkaTemplate;public void sendMessage(String topic, String content){kafkaTemplate.send(topic, content);}
}@Component
class KafkaConsumer{// 加上listener注解,Spring会自动注入@KafkaListener(topics = {"test"})public void handleMessage(ConsumerRecord record){System.out.println(record.value());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/8278.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习算法基础学习 # 集成学习之随机森林

随机森林(Random Forests) 是集成学习算法的一种。集成学习是通过组合多个学习器来完成学习任务。随机森林是结合多颗决策树来对样本进行训练和预测。随机森林通过随机扰动而令所有的树去相关。 随机森林可以使用巨量的预测器&#xff0c;甚至预测器的数量比观察样本的数量还多…

解决PyInstaller打包selenium脚本时弹出driver终端窗口

解决PyInstaller打包selenium脚本时弹出driver终端窗口 找到service.py C:\Users\XXX\AppData\Roaming\Python\Python39\site-packages\selenium\webdriver\common\service.py添加creationflags 在第77行添加: creationflags134217728使用PyInstaller打包 pyinstaller -F -w -…

Springboot集成magic-api

目录 1、前言 2、springboot集成magic-api 2.1、添加maven依赖 2.2、application.yml配置 2.3、编写测试接口 2.4、启动程序&#xff0c;访问接口 2.5、magic-api脚本 3、magic-api其他语法 4、注意事项 1、前言 今天项目中遇到一个问题&#xff0c;springboot后端项目…

找出一个List中每个元素出现的次数

文章目录 一、需求&#xff1a;找出一个list中&#xff0c;每个元素出现的次数1. 普通实现&#xff08;hashmap&#xff09;&#xff1a;1.1 代码实现&#xff1a;1.2运行结果&#xff1a;1.3 案例分析&#xff1a; 2. 普通实现&#xff08;HashSet#Collections.frequency&…

springboot实现后端防重复提交(AOP+redis分布式锁)单机情况下

文章目录 0、依赖1、自定义接口2、实现redis分布式锁3、统一返回值ReturnT4、CookieUtil5、自定义AOP6、测试 为什么要实现这个功能呢&#xff0c;可能用户在提交一份数据后&#xff0c;可能因为网络的原因、处理数据的速度慢等原因导致页面没有及时将用户刚提交数据的后台处理…

【经典题目分析】数组分割问题

文章目录 698. 划分为k个相等的子集416. 分割等和数组 698. 划分为k个相等的子集 把一个数组&#xff0c;拆分成K个大小一样的子数组。方法可以是状态枚举&#xff0c;或者dfs class Solution { public:bool canPartitionKSubsets(vector<int>& nums, int k) {// 从…

Gateway网关

网关的作用 对用户请求作身份认证、权限校验将用户请求路由到微服务&#xff0c;并实现负载均衡对用户请求作限流 引入依赖 <!--nacos服务注册发现依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter…

pdf如何导出为图片?分享三个方法PDF转图片!

将PDF文件转换为图片是在许多场景下都非常有用的操作&#xff0c;不仅能够保留原始文档的内容&#xff0c;还方便在各种平台上共享和展示。在本文中&#xff0c;我们将介绍三种简便的方法&#xff0c;帮助您将PDF文件快速转换为图片格式。 方法一&#xff1a;使用记灵在线工具…

Linux环境搭建(三)— 搭建数据库服务器

linux &#xff08;ubuntu&#xff09;安装mysql 和环境配置 一、安装MySql二、配置环境三、外网访问四、重置密码五、卸载 写在前面&#xff1a; 本文默认你的Linux系统已经安装vim&#xff0c;yum等&#xff0c;如你使用的是一个全新的操作系统&#xff0c;移步上一篇开始配置…

webpack相关面试题

webpack面试题 1.webpack和vite区别2.如何优化webpack打包速度&#xff1f;3.说说webpack中常见的Plugin&#xff1f;解决了什么问题4.说说如何借助webpack来优化前端性能&#xff1f;如何优化JS代码压缩CSS代码压缩Html文件代码压缩文件大小压缩图片压缩Tree ShakingusedExpor…

rsync 远程同步

rsync 远程同步 一、概念 rsync&#xff08;Remote Sync&#xff0c;远程同步&#xff09; 是一个开源的快速备份工具&#xff0c;可以在不同主机之间镜像同步整个目录&#xff0c;支持增量备份&#xff0c;并保持链接和权限&#xff0c;且采用优化的同步算法&#xff0c;传输…

爬虫入门指南(7):使用Selenium和BeautifulSoup爬取豆瓣电影Top250实例讲解【爬虫小白必看】

文章目录 介绍技术要点SeleniumBeautifulSoupOpenpyxl 实现步骤&#xff1a;导入所需库设置网页URL和驱动路径创建 ChromeDriver 服务配置 ChromeDriver创建 Excel 文件爬取数据关闭浏览器保存 Excel 文件 完整代码导出的excel 效果图未完待续.... 介绍 在本篇博客中&#xff…