什么是RisingWave
RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库。RisingWave 让用户使用操作传统数据库的方式来处理流数据。通过创建实时物化视图,RisingWave 可以让用户轻松编写流计算逻辑,并通过访问物化视图来对流计算结果进行及时、一致的查询。
安装与启动
Docker 环境
docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest playground
通过DataGrip连接
创建表格与物化视图
create table t(v1 int, v2 int);
insert into t values(1,10),(2,20),(3,30);
create materialized view mv as select sum(v1) from t;
查询试图
select * from mv;
导入数据
通过datagen生成数据
CREATE TABLE t1 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
CREATE SOURCE s1 (w1 int, w2 int)
WITH (
connector = 'datagen',
fields.w1.kind = 'sequence',
fields.w1.start = '1',
fields.w2.kind = 'random',
fields.w2.min = '-10',
fields.w2.max = '10',
fields.w2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
查询创建的表
show tables;
查询source
show sources;
查询结果
Source不支持直接查询
进行流计算
create materialized view mv_t1 as select count(*) from t1;
create materialized view mv_s1 as select count(*) from s1;
从kafka topic中创建source数据
-- 创建kafka source CREATE SOURCE from_kafka2 (name2 string,id2 string,table2 string ) INCLUDE offsetinclude timestampinclude partition WITH (connector = 'kafka',topic = 'kafka-send2',properties.bootstrap.server = '192.168.5.54:9092' ) FORMAT PLAIN ENCODE json;
kafka中数据格式
查询结果