异源数据同步 → DataX 为什么要支持 kafka?

news/2024/9/21 3:27:25/文章来源:https://www.cnblogs.com/youzhibing/p/18378097

开心一刻

昨天发了一条朋友圈:酒吧有什么好去的,上个月在酒吧当服务员兼职,一位大姐看上了我,说一个月给我 10 万,要我陪她去上海,我没同意

朋友评论道:你没同意,为什么在上海?

我回复到:上个月没同意

嘴真硬

前情回顾

关于 DataX,官网有很详细的介绍,鄙人不才,也写过几篇文章

异构数据源同步之数据同步 → datax 改造,有点意思

异构数据源同步之数据同步 → datax 再改造,开始触及源码

异构数据源同步之数据同步 → DataX 使用细节

异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密

不了解的小伙伴可以按需去查看,所以了,DataX 就不做过多介绍了;官方提供了非常多的插件,囊括了绝大部分的数据源,基本可以满足我们日常需要,但数据源种类太多,DataX 插件不可能包含全部,比如 kafka,DataX 官方是没有提供读写插件的,大家知道为什么吗?你们如果对数据同步了解的比较多的话,一看到 kafka,第一反应往往想到的是 实时同步,而 DataX 针对的是 离线同步,所以 DataX 官方没提供 kafka 插件是不是也就能理解了?因为不合适嘛!

但如果客户非要离线同步也支持 kafka

人家要嘛

你能怎么办?直接怼过去:实现不了?

实现不了

所以没得选,那就只能给 DataX 开发一套 kafka 插件了;基于 DataX插件开发宝典,插件开发起来还是非常简单的

kafkawriter

  1. 编程接口

    自定义 Kafkawriter 继承 DataX 的 Writer,实现 job、task 对应的接口即可

    /*** @author 青石路*/
    public class KafkaWriter extends Writer {public static class Job extends Writer.Job {private Configuration conf = null;@Overridepublic List<Configuration> split(int mandatoryNumber) {List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);for (int i = 0; i < mandatoryNumber; i++) {configurations.add(this.conf.clone());}return configurations;}private void validateParameter() {this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);}@Overridepublic void init() {this.conf = super.getPluginJobConf();this.validateParameter();}@Overridepublic void destroy() {}}public static class Task extends Writer.Task {private static final Logger logger = LoggerFactory.getLogger(Task.class);private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");private Producer<String, String> producer;private Configuration conf;private Properties props;private String fieldDelimiter;private List<String> columns;private String writeType;@Overridepublic void init() {this.conf = super.getPluginJobConf();fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);columns = conf.getList(Key.COLUMN, String.class);writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);if (CollUtil.isEmpty(columns)) {throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,String.format("您提供配置文件有误,[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN));}props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));Configuration saslConf = conf.getConfiguration(Key.SASL);if (ObjUtil.isNotNull(saslConf)) {logger.info("配置启用了SASL认证");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));}producer = new KafkaProducer<String, String>(props);}@Overridepublic void prepare() {if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {ListTopicsResult topicsResult = AdminClient.create(props).listTopics();String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);try {if (!topicsResult.names().get().contains(topic)) {new NewTopic(topic,Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null)));List<NewTopic> newTopics = new ArrayList<NewTopic>();AdminClient.create(props).createTopics(newTopics);}} catch (Exception e) {throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());}}}@Overridepublic void startWrite(RecordReceiver lineReceiver) {logger.info("start to writer kafka");Record record = null;while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完//获取一行数据,按照指定分隔符 拼成字符串 发送出去if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),recordToString(record),recordToString(record)));} else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),recordToString(record),recordToKafkaJson(record)));}producer.flush();}}@Overridepublic void destroy() {logger.info("producer close");if (producer != null) {producer.close();}}/*** 数据格式化** @param record* @return*/private String recordToString(Record record) {int recordLength = record.getColumnNumber();if (0 == recordLength) {return NEWLINE_FLAG;}Column column;StringBuilder sb = new StringBuilder();for (int i = 0; i < recordLength; i++) {column = record.getColumn(i);sb.append(column.asString()).append(fieldDelimiter);}sb.setLength(sb.length() - 1);sb.append(NEWLINE_FLAG);return sb.toString();}private String recordToKafkaJson(Record record) {int recordLength = record.getColumnNumber();if (recordLength != columns.size()) {throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,String.format("您提供配置文件有误,列数不匹配[record columns=%d, writer columns=%d]", recordLength, columns.size()));}List<KafkaColumn> kafkaColumns = new ArrayList<>();for (int i = 0; i < recordLength; i++) {KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));kafkaColumns.add(column);}return JSONUtil.toJsonStr(kafkaColumns);}}
    }
    

    DataX 框架按照如下的顺序执行 Job 和 Task 的接口

    job_task 接口执行顺序

    重点看 Task 的接口实现

    • init:读取配置项,然后创建 Producer 实例

    • prepare:判断 Topic 是否存在,不存在则创建

    • startWrite:通过 RecordReceiver 从 Channel 获取 Record,然后写入 Topic

      支持两种写入格式:textjson,细节请看下文中的 kafkawriter.md

    • destroy:关闭 Producer 实例

    实现不难,相信大家都能看懂

  2. 插件定义

    resources 下新增 plugin.json

    {"name": "kafkawriter","class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter","description": "write data to kafka","developer": "qsl"
    }
    

    强调下 class,是 KafkaWriter 的全限定类名,如果你们没有完全拷贝我的,那么要改成你们自己的

  3. 配置文件

    resources 下新增 plugin_job_template.json

    {"name": "kafkawriter","parameter": {"bootstrapServers": "","topic": "","ack": "all","batchSize": 1000,"retries": 0,"fieldDelimiter": ",","writeType": "json","column": ["const_id","const_field","const_field_value"],"sasl": {"securityProtocol": "SASL_PLAINTEXT","mechanism": "PLAIN","username": "","password": ""}}
    }
    

    配置项说明:kafkawriter.md

  4. 打包发布

    可以参考官方的 assembly 配置,利用 assembly 来打包

至此,kafkawriter 就算完成了

kafkareader

  1. 编程接口

    自定义 Kafkareader 继承 DataX 的 Reader,实现 job、task 对应的接口即可

    /*** @author 青石路*/
    public class KafkaReader extends Reader {public static class Job extends Reader.Job {private Configuration originalConfig = null;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.validateParameter();}@Overridepublic void destroy() {}@Overridepublic List<Configuration> split(int adviceNumber) {List<Configuration> configurations = new ArrayList<>(adviceNumber);for (int i=0; i<adviceNumber; i++) {configurations.add(this.originalConfig.clone());}return configurations;}private void validateParameter() {this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);}}public static class Task extends Reader.Task {private static final Logger logger = LoggerFactory.getLogger(Task.class);private Consumer<String, String> consumer;private String topic;private Configuration conf;private int maxPollRecords;private String fieldDelimiter;private String readType;private List<Column.Type> columnTypes;@Overridepublic void destroy() {logger.info("consumer close");if (Objects.nonNull(consumer)) {consumer.close();}}@Overridepublic void init() {this.conf = super.getPluginJobConf();this.topic = conf.getString(Key.TOPIC);this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);if (!ReadType.JSON.name().equalsIgnoreCase(readType)&& !ReadType.TEXT.name().equalsIgnoreCase(readType)) {throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,String.format("您提供配置文件有误,不支持的readType[%s]", readType));}if (ReadType.JSON.name().equalsIgnoreCase(readType)) {List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);if (CollUtil.isEmpty(columnTypeList)) {throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,String.format("您提供配置文件有误,readType是JSON时[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN_TYPE));}convertColumnType(columnTypeList);}Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);Configuration saslConf = conf.getConfiguration(Key.SASL);if (ObjUtil.isNotNull(saslConf)) {logger.info("配置启用了SASL认证");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));}consumer = new KafkaConsumer<>(props);}@Overridepublic void startRead(RecordSender recordSender) {consumer.subscribe(CollUtil.newArrayList(topic));int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);int retries = conf.getInt(Key.RETRIES, 5);if (retries < 0) {logger.info("joinGroupSuccessRetries 配置有误[{}], 重置成默认值[5]", retries);retries = 5;}/*** consumer 每次都是新创建,第一次poll时会重新加入消费者组,加入过程会进行Rebalance,而 Rebalance 会导致同一 Group 内的所有消费者都不能工作* 所以 poll 拉取的过程中,即使topic中有数据也不一定能拉到,因为 consumer 正在加入消费者组中* kafka-clients 没有对应的API、事件机制来知道 consumer 成功加入消费者组的确切时间* 故增加重试*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));int i = 0;if (CollUtil.isEmpty(records)) {for (; i < retries; i++) {records = consumer.poll(Duration.ofMillis(pollTimeoutMs));logger.info("第 {} 次重试,获取消息记录数[{}]", i + 1, records.count());if (!CollUtil.isEmpty(records)) {break;}}}if (i >= retries) {logger.info("重试 {} 次后,仍未获取到消息,请确认是否有数据、配置是否正确", retries);return;}transferRecord(recordSender, records);do {records = consumer.poll(Duration.ofMillis(pollTimeoutMs));transferRecord(recordSender, records);} while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);}private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {if (CollUtil.isEmpty(records)) {return;}for (ConsumerRecord<String, String> record : records) {Record sendRecord = recordSender.createRecord();String msgValue = record.value();if (ReadType.JSON.name().equalsIgnoreCase(readType)) {transportJsonToRecord(sendRecord, msgValue);} else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {// readType = text,全当字符串类型处理String[] columnValues = msgValue.split(fieldDelimiter);for (String columnValue : columnValues) {sendRecord.addColumn(new StringColumn(columnValue));}}recordSender.sendToWriter(sendRecord);}consumer.commitAsync();}private void convertColumnType(List<String> columnTypeList) {columnTypes = new ArrayList<>();for (String columnType : columnTypeList) {switch (columnType.toUpperCase()) {case "STRING":columnTypes.add(Column.Type.STRING);break;case "LONG":columnTypes.add(Column.Type.LONG);break;case "DOUBLE":columnTypes.add(Column.Type.DOUBLE);case "DATE":columnTypes.add(Column.Type.DATE);break;case "BOOLEAN":columnTypes.add(Column.Type.BOOL);break;case "BYTES":columnTypes.add(Column.Type.BYTES);break;default:throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnType));}}}private void transportJsonToRecord(Record sendRecord, String msgValue) {List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);if (columnTypes.size() != kafkaColumns.size()) {throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,String.format("您提供的配置文件有误,readType是JSON时[%s列数=%d]与[json列数=%d]的数量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));}for (int i=0; i<columnTypes.size(); i++) {KafkaColumn kafkaColumn = kafkaColumns.get(i);switch (columnTypes.get(i)) {case STRING:sendRecord.setColumn(i, new StringColumn(kafkaColumn.getColumnValue()));break;case LONG:sendRecord.setColumn(i, new LongColumn(kafkaColumn.getColumnValue()));break;case DOUBLE:sendRecord.setColumn(i, new DoubleColumn(kafkaColumn.getColumnValue()));break;case DATE:// 暂只支持时间戳sendRecord.setColumn(i, new DateColumn(Long.parseLong(kafkaColumn.getColumnValue())));break;case BOOL:sendRecord.setColumn(i, new BoolColumn(kafkaColumn.getColumnValue()));break;case BYTES:sendRecord.setColumn(i, new BytesColumn(kafkaColumn.getColumnValue().getBytes(StandardCharsets.UTF_8)));break;default:throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnTypes.get(i)));}}}}
    }
    

    重点看 Task 的接口实现

    • init:读取配置项,然后创建 Consumer 实例

    • startWrite:从 Topic 拉取数据,通过 RecordSender 写入到 Channel 中

      这里有几个细节需要注意下

      1. Consumer 每次都是新创建的,拉取数据的时候,如果消费者还未加入到指定的消费者组中,那么它会先加入到消费者组中,加入过程会进行 Rebalance,而 Rebalance 会导致同一消费者组内的所有消费者都不能工作,此时即使 Topic 中有可拉取的消息,也拉取不到消息,所以引入了重试机制来尽量保证那一次同步任务拉取的时候,消费者能正常拉取消息
      2. 一旦 Consumer 拉取到消息,则会循环拉取消息,如果某一次的拉取数据量小于最大拉取量(maxPollRecords),说明 Topic 中的消息已经被拉取完了,那么循环终止;这与常规使用(Consumer 会一直主动拉取或被动接收)是有差别的
      3. 支持两种读取格式:textjson,细节请看下文的配置文件说明
      4. 为了保证写入 Channel 数据的完整,需要配置列的数据类型(DataX 的数据类型)
    • destroy:

      关闭 Consumer 实例

  2. 插件定义

    resources 下新增 plugin.json

    {"name": "kafkareader","class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader","description": "read data from kafka","developer": "qsl"
    }
    

    classKafkaReader 的全限定类名

  3. 配置文件

    resources 下新增 plugin_job_template.json

    {"name": "kafkareader","parameter": {"bootstrapServers": "","topic": "test-kafka","groupId": "test1","writeType": "json","pollTimeoutMs": 2000,"columnType": ["LONG","STRING","STRING"],"sasl": {"securityProtocol": "SASL_PLAINTEXT","mechanism": "PLAIN","username": "","password": "2"}}
    }
    

    配置项说明:kafkareader.md

  4. 打包发布

    可以参考官方的 assembly 配置,利用 assembly 来打包

至此,kafkareader 也完成了

总结

  1. 完整代码:qsl-datax
  2. kafkareader 重试机制只能降低拉取不到数据的概率,并不能杜绝;另外,如果上游一直往 Topic 中发消息,kafkareader 每次拉取的数据量都等于最大拉取量,那么同步任务会一直进行而不会停止,这还是离线同步吗?
  3. 离线同步,不推荐走 kafka,因为用 kafka 走实时同步更香

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/787374.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

互联工厂数据交换标准:IPC-CFX

本文我们一起了解下IPC-CFX标准产生的背景 和 用途,它是机器设备之间通信的“统一语言”,是大家都懂的“普通话”而不是“方言”。IPC-CFX使用AMQP v1.0传输协议实现安全的连接,使用JSON进行数据编码,提供了明确的消息结构和数据内容,确保即插即用,它或许是工业4.0应用的…

植物大战僵尸合集高速下载

植物大战僵尸合集高速下载

NetScaler Console Release 14.1 Build 29.63 (ESXi, Hyper-V, KVM, Xen) - 集中管理 NetScaler

NetScaler Console Release 14.1 Build 29.63 (ESXi, Hyper-V, KVM, Xen) - 集中管理 NetScalerNetScaler Console Release 14.1 Build 29.63 (ESXi, Hyper-V, KVM, Xen) - 集中管理 NetScaler Formerly known as NetScaler ADM - 集中管理 NetScaler 请访问原文链接:NetScale…

C#/.NET/.NET Core技术前沿周刊 | 第 2 期(2024年8.19-8.25)

前言 C#/.NET/.NET Core技术前沿周刊,你的每周技术指南针!记录、追踪C#/.NET/.NET Core领域、生态的每周最新、最实用、最有价值的技术文章、社区动态、优质项目和学习资源等。让你时刻站在技术前沿,助力技术成长与视野拓宽。欢迎投稿,推荐或自荐优质文章/项目/学习资源等。…

最全!嵌入式STM32单片机开发环境配置教学Win/Mac!!!

嵌入式STM32单片机开发环境配置教学Win/Mac 本教程支持Windows和Mac Windows可选的开发软件为Keil、Clion、STM32CubeMX,可自由选择开发方式 Mac的开发环境为(Clion+OpenOCD+STM32CubeMX),仅支持HAL库 Windows配置教程在Windows上面开发Stm32有多种工具组合,可以单纯使用Kei…

读软件开发安全之道:概念、设计与实施09安全设计

安全设计1. 安全设计 1.1. 过载、混乱和迷惑性并不是信息的属性,而是设计的失败。1.1.1. 爱德华塔夫特1.2. 不应该将系统的安全性留给审查员进行修补1.2.1. 审查员应该在对设计进行审查时,将威胁和缓解作为安全评估的附加1.2.2. 小型企业的运营通常不会那么正式,并且设计师和…

Qt数据库驱动编译(MySQL)

Qt数据库驱动编译(MySQL)哈喽!我是 Pro_er,一名热爱编程的小伙伴。在这里我会分享一些实用的开发技巧和经验心得。如果你也对编程充满热情,欢迎关注并一起交流学习!第一步:安装所需文件 /** * 注意 Qt和MySql的位数要相同都为64或者32位* Qt 安装的时候一定要安装源码…

黑神话:悟空 通关感言

最近这几天一直再肝这个游戏, 今天终于通关了。 没有做全收集,全成就。只做了全图鉴(丢了一个无量蝠),和 第二个不带紧箍咒的结局。花了大概35小时,因为有挂机时间 有时候挂一晚上,与图片上的时间有所出入,问题不大。 整体的难度还可以,对我而言,卡我的只有虎先锋,小…

Truffle框架

这两天上网冲浪的时候,偶然发现一个介绍Truffle这个开发编程语言框架的一个影片:视频比较有意思的就是使用了Graal VM提供的Truffle语言框架区开发了一个编程语言,实现了一些基本的功能,比如四则运算和函数定义。个人看了看,对于想要了解编译器的入门者还是比较有启发的。…

带你了解 WebAssembly 的发展、应用与开发

一、WebAssembly 是什么?“WebAssembly(缩写为 Wasm)是一种基于堆栈式虚拟机的二进制指令集。Wasm 被设计成为一种编程语言的可移植编译目标,并且可以通过将其部署在 Web 平台上,以便为客户端及服务端应用程序提供服务”以上是 wasm 官网给出的一段解释。它运行在虚拟机中…

Nuxt3读取markdown文件

背景 想在Nuxt3中读取markdown以渲染文章。 分析 静态文件一般是放在public中的,但是官方文档中写明:而且,在SSR阶段(服务器渲染),nuxt无法通过fetch来访问public里的内容(虽然不推荐,但是客户端的js是可以通过fetch直接请求到文件的)。 过程 nuxt提供了一个content模…

Easysearch 性能测试方法概要

INFINI Easysearch INFINI Easysearch 是一个分布式的近实时搜索与分析引擎,核心引擎基于开源的 Apache Lucene。Easysearch 衍生自基于开源协议 Apache 2.0 的 Elasticsearch 7.10 版本,完善和支持更多的企业级功能,优化搜索业务场景,以保证更佳的数据探索与分析体验。 Ea…