一、引言
今天跟着 官方文档 基于docker玩一把Pulsar IO吧
二、概要
-
在用户能够轻松的将消息队列跟其他系统(数据库、其他消息系统)一起使用时,消息队列的作用才是最强大的。而Pulsar IO connectors可以让你很轻松的创建、部署以及管理这些跟外部系统的连接,例如mysql、kafka、cassandra等。
-
Pulsar connector分为Source和Sink两种,Source connector会将数据从外部系统喂给Pulsar,而Sink connector负责将数据从Pulsar喂给外部系统。
-
Pulsar connector是一种特殊的Function,只不过这个Function持有其他系统的客户端作为pulsar与其他系统的桥梁,它在处理保证上跟Function是一致的,分别是最多一次、至少一次、精准一次。处理保证不仅依靠Pulsar,还跟外部系统相关以及实现逻辑相关。
- 最多一次:发给connector的消息最多处理一次或者不做处理
- 至少一次:发给connector的消息处理一次或者多次
- 精准一次:发给connector的消息只处理一次
三、实战
1.安装connector
-
在 这里 下载对应的connector,先选择对应的版本,在点进 connectors 目录选择对应的source或者sink
-
将下载的nar文件放到pulsar安装地址的connectors 目录下(没有则需要创建)
-
启动Pulsar
-
通过指令查看服务connector信息,先输出下面这样的信息就说明connector已经注册到Pulsar上面了
curl -s http://localhost:8080/admin/v2/functions/connectors
2. 安装Cassandra
-
基于
brew install --cask --appdir=/Applications docker
安装docker(仅针对mac环境) -
基于docker运行 cassandra,成功运行后通过
docker ps
可以看到Cassandra服务已经起来了docker run -d --rm --name=cassandra -p 9042:9042 cassandra:3.11
-
通过
docker exec -ti cassandra cqlsh localhost
进入Cassandra服务的容器,并通过以下指令进行库表的初始化CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};USE pulsar_test_keyspace;CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
-
先查询该表确保没有数据
select * from pulsar_test_table;
3. 功能验证
-
写配置文件cassandra-sink.yml
configs:roots: "localhost:9042"keyspace: "pulsar_test_keyspace"columnFamily: "pulsar_test_table"keyname: "key"columnName: "col"
-
启动写Cassandra的sink,启动后通过指令查看显示sink已经正常启动
pulsar-admin sinks create \--tenant public \--namespace default \--name cassandra-test-sink \--sink-type cassandra \--sink-config-file examples/cassandra-sink.yml \--inputs test_cassandra
-
执行命令批量往pulsar中写入数据,看是否会正常输出到Cassandra中
for i in {0..9}; do pulsar-client produce -m "key-$i" -n 1 test_cassandra; done
-
由于上面的操作是有延迟的,所以不断的查询Cassandra的表是可以看到数据在逐步的增加,并最终写满十条数据
四、总结
纸上得来终觉浅,绝知此事要躬行。
学习不能仅仅停留在纸面上或者理论,脱离使用去探讨设计或者源码都是不切实际的。因此今天一起体验了一把Pulsar IO,除此之外Pulsar还提供了非常丰富的跟其他系统交互的Connector,详细可以看上面发的下载地址并尝试使用自己感兴趣的Connector感受下实操的快乐~