Kafka 的部署(单机和集群)和 SpringBoot 访问

news/2025/1/30 13:19:25/文章来源:https://www.cnblogs.com/studyjobs/p/18695134

Kafka 由 Scala 和 Java 编写,最初由 LinkedIn 开发,后来成为 Apache 顶级项目,是一种高吞吐量的分布式发布/订阅消息系统。

Kafka 不仅仅是一个消息队列,还支持实时数据处理,其高吞吐、可扩展和持久化特性使其在大数据领域广泛应用。

本篇博客不详细介绍 Kafka,主要聚焦于 Kafka 的快速搭建和使用,在本篇博客最后会提供源代码的下载。

kafka 官网地址:https://kafka.apache.org


一、单机搭建

我的虚拟机地址是 192.168.136.128 ,已经安装好了 docker 和 docker-compose

创建 /data/kafka_single 目录,在其下面创建两个子目录 data 和 ui ,并设置 777 权限,如下所示:

image

编写 docker-compose.yml 文件内容如下:

version: "3.2"
services:kafka:image: bitnami/kafka:latestcontainer_name: kafkarestart: alwaysports:- "9092:9092"volumes:- ./data:/bitnami/kafka- /etc/localtime:/etc/localtimeenvironment:# 节点id,通常设置值都是数字- KAFKA_CFG_NODE_ID=0# 节点角色,因为是单机部署,所以同时承担 controller 和 broker 两个角色- KAFKA_CFG_PROCESS_ROLES=controller,broker# 用来进行选举的 Controller 节点,如果有多个 Controller 能够参与选举,则都需要写上- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093# 监听的端口:9092 是提供数据服务的端口,9093 是多个 Controller 之间选举的端口- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093# 注意,这里要配置 docker 所在的服务器 ip 地址,不是 docker 内部的 ip 地址- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.136.128:9092# 监听器名称和安全协议之间的映射- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT# 控制器使用的侦听器名称- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER# 用于 broker 之间通信的侦听器名称- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXTkafka-ui:container_name: kafka-uiimage: provectuslabs/kafka-ui:latestrestart: alwaysports:- 9090:8080environment:# 启用动态配置,可通过向导界面配置kafka的连接信息- DYNAMIC_CONFIG_ENABLED=truevolumes:- ./ui:/etc/kafkaui

然后在 docker-compose.yml 文件所在目录运行 docker-compose up -d 启动服务即可。

我们部署了 KafkaUI 的 Web 可视化界面,根据上面的 yml 配置,访问 http://192.168.136.128:9090 即可。

对于 Kafka 的参数配置,可以参考官网介绍:https://kafka.apache.org/documentation/#brokerconfigs

对于 Broker 的配置项,比如从官网上看到一个配置 auto.create.topics.enable,将点(.)修改为下划线(_),然后在其前面加上 kafka_cfg 前缀,最后全部改为大写字母即可作为 docker 的环境变量进行配置,即 KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true,该配置项表示在使用 Kafka 收发消息时,如果 Topics 不存在,是否自动创建 Topics,官方默认配置就是 true。

对于 KafkaUI 来说,使用起来非常简单,这里就不做详细介绍,其官网地址是:https://docs.kafka-ui.provectus.io

这里也推荐一款客户端工具 OffsetExplorer(之前名称是 KafkaTool),使用起来也很方便,其官网地址是:https://www.kafkatool.com


二、集群部署

这里在一台虚拟机上,使用不同的端口部署 3 个 Kafka 节点,作为集群部署的演示。

创建 /data/kafka_cluster 目录,在其下面创建相关的子目录 ,并设置 777 权限,如下所示:

image

编写 docker-compose.yml 文件内容如下:

version: "3.2"
services:kafka-1:container_name: kafka-1image: bitnami/kafka:latestrestart: alwaysports:- "9091:9091"environment:# KRaft settings- KAFKA_CFG_NODE_ID=0- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-1:9099,1@kafka-2:9099,2@kafka-3:9099- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv# Listeners- KAFKA_CFG_LISTENERS=PLAINTEXT://:9091,CONTROLLER://:9099# 注意:这里需要配置成 docker 所在的服务器的 ip 地址- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.136.128:9091- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXTvolumes:- ./data1:/bitnami/kafka- /etc/localtime:/etc/localtimekafka-2:container_name: kafka-2image: bitnami/kafka:latestrestart: alwaysports:- "9092:9092"environment:# KRaft settings- KAFKA_CFG_NODE_ID=1- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-1:9099,1@kafka-2:9099,2@kafka-3:9099- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv# Listeners- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9099- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.136.128:9092- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXTvolumes:- ./data2:/bitnami/kafka- /etc/localtime:/etc/localtimekafka-3:container_name: kafka-3image: bitnami/kafka:latestrestart: alwaysports:- "9093:9093"environment:# KRaft settings- KAFKA_CFG_NODE_ID=2- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-1:9099,1@kafka-2:9099,2@kafka-3:9099- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv# Listeners- KAFKA_CFG_LISTENERS=PLAINTEXT://:9093,CONTROLLER://:9099- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.136.128:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXTvolumes:- ./data3:/bitnami/kafka- /etc/localtime:/etc/localtimekafka-ui:container_name: kafka-uiimage: provectuslabs/kafka-ui:latestrestart: alwaysports:- 9090:8080environment:- DYNAMIC_CONFIG_ENABLED=truevolumes:- ./ui:/etc/kafkaui

然后在 docker-compose.yml 文件所在目录运行 docker-compose up -d 启动服务即可。


三、SpringBoot 收发消息

新建一个名称为 springboot_kafka 的 springboot 工程,里面包含 2 个子工程:kafka_producer 和 kafka_consumer

image

kafka_producer 是生产者,向 kafka 发送消息

kafka_consumer 是消费者,从 kafka 拉取消息进行处理

两个子工程的 pom 文件没有引用依赖包,全部继承使用父工程 pom 文件的依赖包,父工程 pom 文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.jobs</groupId><artifactId>springboot_kafka</artifactId><version>1.0</version><modules><module>kafka_producer</module><module>kafka_consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.5</version></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency><!--springboot 自带 kafka 依赖,引入该依赖包即可--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>
</project>

Kafka_producer 生产者发送消息

kafka_producer 生产者,其 application.yml 配置文件内容如下:

spring:kafka:# kafka 服务器连接地址和端口,多个服务器之间使用英文逗号分隔bootstrap-servers: 192.168.136.128:9092producer:# key 和 value 使用 string 序列化和反序列化key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 当遇到网络等相关问题时,最多重试 1 次retries: 1# 事务前缀,主要启动事务,则 ack 必须为 -1 或 all# 如果启用了事务,则其它相关方法上面,需要增加 @Transactional 注解#transaction-id-prefix: ktx_# acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应# acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应# acks= -1 或 all 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应# 开启事务时,必须设置为 -1 或 all#acks: -1# 有关 kafka 的相关配置内容,详见官网:https://kafka.apache.org/documentation/#configuration

kafka_producer 发送消息的代码在 ProcducerTest 测试类中编写,代码如下:

package com.jobs;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;@Slf4j
@SpringBootTest
public class ProducerTest {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;/*** kafka 默认都是异步发送消息* 如果向不存在的 topic 发送消息,默认会创建该 topic*/@Testvoid test1() {kafkaTemplate.send("Topic1", "hello kafka");}/*** 带有回调函数的异步消息发送(可以监听消息发送是否成功)*/@Testvoid asyncSendMessageWithCallBack() throws Exception {kafkaTemplate.send("Topic1", "我的中文消息异步发送").addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {RecordMetadata record = result.getRecordMetadata();log.info("发送消息成功:" +record.topic() + "-" + record.partition() + "-" + record.offset());}});//为了能够收到回调消息,这里等待 1 秒钟再结束程序Thread.sleep(1000);}/*** 同步发送消息*/@Testvoid syncSendMessage() {try {SendResult<String, Object> result = kafkaTemplate.send("Topic1", "hello kafka").get();RecordMetadata record = result.getRecordMetadata();log.info("发送的 toppic 为:" + record.topic() + ",发送的分区为:" + record.partition() +",发送的消息所在的 offset 为:" + record.offset());} catch (Exception e) {log.error(e.getMessage());}}/*** 发送多条消息给 Topic3 进行处理*/@Testvoid test2() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("Topic2", "hello kafka " + i);}}/*** 采用事务发送消息,代码中如果报错,则已经发送成功的消息将被撤回,消费者无法获取到消息。* 注意:需要在 producer 程序的 application.yml 中,启用 transaction-id-prefix 配置,而且 acks 必须为 -1* 其它的没有使用事务发送消息的方法,必须标注*/@Testvoid test3() {kafkaTemplate.executeInTransaction(operations -> {operations.send("Topic3", "采用事务发送的消息");//如果抛出异常,上面发出去的消息会撤回,消费者无法获取到消息throw new RuntimeException("fail");//return null;});}
}

对于 ProcducerConfig 配置类,里面主要是配置了监听器,对所有发送消息的状态进行监控,在实际项目中很少使用,这里只是展示一下:

package com.jobs.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ProducerConfig {@AutowiredProducerFactory producerFactory;/*** 为 KafkaTemplate 设置一个监听器,监测消息发送的状态*/@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {log.info("发送成功: " + producerRecord.value());}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {log.error("发送失败: " + producerRecord.value() + " ---> exception=" + exception.getMessage());}});return kafkaTemplate;}
}

Kafka_consumer 消费者拉取和处理消息

kafka_consumer 消费者,其 application.yml 配置文件内容如下:

server:port: 8010
spring:kafka:# kafka 服务器连接地址和端口,多个服务器之间使用英文逗号分隔bootstrap-servers: 192.168.136.128:9092consumer:# 每个 consumer 必须被分配到一个组中# 对于同一个 topic 来说,相同组内的多个 consumer 一定会拿到不同的消息# 可以在 application.yml 中配置,也可以在消费者处理程序的 @KafkaListener 注解中配置group-id: test# 是否启用自动提交 offset# 如果消息相对比较重要的话,建议自己手动提交。如果是不太重要的信息(如日志信息),建议使用自动提交enable-auto-commit: false# 当 kafka 中没有初始 offset 或 offset 超出范围时将自动重置 offset# earliest: 重置为分区中最小的 offset;# latest: 重置为分区中最新的 offset (从监听的时刻开始,当消费分区中有新数据时,才会进行消费);# none: 只要有一个分区不存在已提交的 offset,就抛出异常;auto-offset-reset: latest# key 和 value 使用 string 序列化和反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# record 当每一条记录被消费者监听器处理之后提交# batch 当每一批 poll() 的数据被消费者监听器处理之后提交# time 当每一批 poll() 的数据被消费者监听器处理之后,距离上次提交时间大于 time 时提交# count 当每一批 poll() 的数据被消费者监听器处理之后,被处理 record 数量大于等于 count 时提交# count_time  两种条件:time 或者 count 有一个条件满足时提交# manual 当每一批 poll() 的数据被消费者监听器处理之后, 手动调用 Acknowledgment.acknowledge() 后提交# manual_immediate 手动调用 Acknowledgment.acknowledge() 后立即提交ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 如果设置为 single,消费处理程序使用 ConsumerRecord 接收,表示单条消息# 如果设置为 batch,消费处理程序使用 ConsumerRecords 接收,表示多条消息type: single# 有关 kafka 的相关配置内容,详见官网:https://kafka.apache.org/documentation/#configuration

kafka_consumer 拉取和处理消息的代码在 ConsumerListener 类中编写,代码如下:

package com.jobs.listener;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConsumerListener {//逐个处理 Topic1 中的每条消息,使用 ConsumerRecord 表示获取到一条消息@KafkaListener(topics = {"Topic1"}, errorHandler = "consumerErrorHandler")public void listener1(ConsumerRecord<String, Object> record, Acknowledgment ack) {log.info("toppic 名称:" + record.topic() + ",分区为:" + record.partition() +",消息内容为:" + record.value());//如果想要测试异常处理,可以取消以下代码注释//Integer result = 1 / 0;//因为在 application.yml 配置的 enable-auto-commit 为 false,因此需要手动提交确认 offsetack.acknowledge();}//批量处理 Topic2 中的消息,使用 ConsumerRecords 表示获取到的多条消息//注意:需要将 application.yml 中的 listener.type 修改为 batch@KafkaListener(topics = {"Topic2"}, errorHandler = "consumerErrorHandler")public void listener2(ConsumerRecords<String, Object> records, Acknowledgment ack) {for (ConsumerRecord<?, ?> record : records) {log.info("toppic 名称:" + record.topic() + ",分区为:" + record.partition() +",消息内容为:" + record.value());}ack.acknowledge();}//接收 producer 事务提交的消息//如果 producer 发送过程中报错,已经发送成功的消息,将被撤销,consumer 将获取不到消息@KafkaListener(topics = {"Topic3"}, errorHandler = "consumerErrorHandler")public void listener3(ConsumerRecord<String, Object> record, Acknowledgment ack) {log.info("toppic 名称:" + record.topic() + ",分区为:" + record.partition() +",消息内容为:" + record.value());ack.acknowledge();}
}

ConsumerConfig 配置类,配置了异常处理的方法(在消费消息的方法上面通过 @KafkaListener 注解的 errorHandler 属性进行配置),代码如下:

package com.jobs.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConsumerConfig {//消费异常处理方法,可以配置在所有的消费者监听程序上//在 @KafkaListener 注解的 errorHandler 属性@Beanpublic ConsumerAwareListenerErrorHandler consumerErrorHandler() {return (message, exception, consumer) -> {log.error("触发了消息异常处理程序,消息内容:" + message.getPayload() +",消费异常信息:" + exception.getMessage());return null;};}
}

以上就是 Kafka 的简单使用介绍,能够确保快速运用到实际项目中。有关 Kafka 的深入学习和运用请参考官网或其它资料。

经过多个项目的实践经验来看,我个人认为:如果【重要业务】需要使用到消息队列的话,推荐使用 RabbitMQ 或 RocketMQ。对于【非核心业务】如实时日志采集、实时数据统计展示、大数据场景,推荐使用 kafka。毕竟每种技术都有自己最适合的领域,这里不做过多的讨论。

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/springboot_kafka.zip

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/877034.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DeepSeek火爆全网,官网宕机?本地部署一个随便玩「LLM探索」

前言 最近 DeepSeek 狠狠刷了一波屏,国产大模型真的越来越厉害了👍,官方的服务器已经爆满了,以至于频繁出现反应迟缓甚至宕机的情况,和两年多之前 ChatGPT 的遭遇颇为相似。 我已经好久没有本地部署模型了(现在各厂商的模型都便宜量大),这次正好来试试 DeepSeek 开源模…

CF1000

A link首先,对于一个数(比如说\(x\)),它和它加一一定互质(也就是\(x\)和\(x+1\)一定互质),那么它和它加一组成的区间(\([x,x+1]\))一定是好区间,也一定是最小好区间,因为除了本身\([x,x+1]\)、两个数\([x,x]\),\([x+1,x+1]\)和空集不包含其他区间了,而相等两个数一…

大模型

目录大模型的演变大模型的使用与训练大模型的特点与分类大模型的工作流程大模型的应用 大模型的演变机器学习:深度学习:大模型的使用与训练 大模型的特点与分类 大模型的工作流程 大模型的应用本文来自博客园,作者:chuangzhou,转载请注明原文链接:https://www.cnblogs.co…

“星门计划对AI未来的意义——以及谁将掌控它”

“星门计划对AI未来的意义——以及谁将掌控它”图片由DALL-E 3生成就在几天前,唐纳德特朗普宣布了“星门计划”,OpenAI随即跟进,分享了更多细节。他们明确表示,计划在未来四年内投资5000亿美元,在美国为OpenAI构建一个全新的AI基础设施。这让我颇感意外,尤其是考虑到埃隆…

A Critique of ANSI SQL Isolation Levels.18695069

原文:A critique of ANSI SQL isolation levels摘要:ANSI SQL-92[MS, ANSI]使用脏读、不可重复读以及幻读现象(phenomena)定义了隔离级别,本论文展示了这些现象,以及ANSI SQL定义并无法合适的描述众多流行的隔离级别,包括(ANSI标准)所涵盖的级别的标准锁实现。我们还介…

HTML, CSS

什么是 HTML、CSS HTML (HyperText Markup Language): 超文本标记语言. 超文本: 超越了普通文本的限制, 比普通文本更加强大. 除了文字信息, 还可以定义图片、音频、视频等内容. 标记语言: 由标签构成的语言. HTML 标签都是预定义好的。例如: 使用 <a> 展示超链接,使用 &…

Cisco NX-OS System Software - ACI 16.1(2f) - 适用于 ACI 模式下的 Nexus 9000 系列交换机系统软件

Cisco NX-OS System Software - ACI 16.1(2f) - 适用于 ACI 模式下的 Nexus 9000 系列交换机系统软件Cisco NX-OS System Software - ACI 16.1(2f) 适用于 ACI 模式下的 Cisco Nexus 9000 系列交换机系统软件 请访问原文链接:https://sysin.org/blog/cisco-aci-16/ 查看最新版…

Cisco APIC 6.1(2f)F - 应用策略基础设施控制器

Cisco APIC 6.1(2f)F - 应用策略基础设施控制器Cisco APIC 6.1(2f)F - 应用策略基础设施控制器 Application Policy Infrastructure Controller (APIC) 请访问原文链接:https://sysin.org/blog/cisco-apic-6/ 查看最新版。原创作品,转载请保留出处。 作者主页:sysin.org思科…

[搬运自 qq 空间] 19 北大冬令营小结

PKU 冬令营19北大冬令营小结 北大冬令营刚刚结束 , 以下是这两天以来笔者的经历。 Day1 比赛日 上午开营仪式 , 整个过程大概就是讲了一下北大计算机学科有哪些优势 , 比较无趣。 12 : 40的时候来到机房准备考试 , 1 : 00钟时 , 比赛正式开始。 首先浏览了一下A题 , 是吉…

X3ctf 比赛 Write Up

X3ctf Write Up 1. Misc p11n-trophy(签到题): 题目描述:我们首先会得到这样一份证书:第一题签到题的答案就是证书下面正中间的“This certificate does not grant the rank of Master"。 trophy-plus + trophy-plus64: 这两道目描述一模一样其中一个flag是藏在certif…

python--用户意见

https://www.python.org/about/quotes/