本文主要对Flink的Java客户端和Flink SQL的使用做大致介绍,具体用法请查看官方文档页面。
java client使用
文档
Apache Flink Documentation | Apache Flink
数据处理模型
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink examples: DataStream API plus Kafka and JDBC connectors, packaged as a fat jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Single source of truth so that Flink core and all Flink connectors stay on the same version. -->
        <flink.version>1.15.4</flink.version>
    </properties>

    <dependencies>
        <!-- Apache Flink core / DataStream API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Apache Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!-- JSON parsing -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20210307</version>
        </dependency>

        <!-- Fixes "No ExecutorFactory found to execute the application" when running locally -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- JDBC connector and MySQL driver -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Build a fat jar (jar-with-dependencies) during the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.KafkaDataProcessor</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Encoding>UTF-8</Encoding>
                        </manifestEntries>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Compile for Java 8 and surface unchecked/deprecation warnings -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerArgs>
                        <arg>-Xlint:unchecked</arg>
                        <arg>-Xlint:deprecation</arg>
                    </compilerArgs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
代码样例
读取kafka并打印结果
KafkaFlinkExample
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
public class KafkaFlinkExample {public static void main(String[] args) throws Exception {// 设置 Flink 程序的执行环境// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.10.153:9092");props.setProperty("group.id", "test");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);env.addSource(consumer).map(data -> "Received: " + data).print();env.execute("Kafka Flink Example");}
}
处理kafka数据并保存结果入新的topic
KafkaDataProcessor
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.JSONObject;import java.util.Properties;public class KafkaDataProcessor {public static void main(String[] args) throws Exception {// 设置 Flink 程序的执行环境
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建一个本地流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 设置 Kafka 的配置信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.10.153:9092");properties.setProperty("group.id", "flink-consumer-group");// 创建 Kafka 消费者,并从指定的 topic 中读取数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);// 将 JSON 数据解析并添加性别字段DataStream<String> processedDataStream = kafkaDataStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {// 解析 JSON 数据JSONObject jsonObject = new JSONObject(value);String name = jsonObject.getString("name");int id = jsonObject.getInt("id");int age = jsonObject.getInt("age");// 根据姓名判断性别String gender;if (name.equals("jack")) {gender = "male_xxx";} else {gender = "female_xxx";}// 构造新的 JSON 数据JSONObject newJsonObject = new JSONObject();newJsonObject.put("name", name);newJsonObject.put("id", id);newJsonObject.put("age", age);newJsonObject.put("gender", gender);return newJsonObject.toString();}});// 创建 Kafka 生产者,并将新的数据写入指定的 topicFlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties);processedDataStream.addSink(kafkaProducer);// 执行程序env.execute("Kafka Data Processor");}
}
设置执行并行度
LocalWebUI
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class LocalWebUI {public static void main(String[] args) throws Exception {//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Configuration configuration = new Configuration();//创建一个带webUI的本地执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism = env.getParallelism();Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.10.153:9092");props.setProperty("group.id", "test");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);System.out.println("执行环境的并行度:" + parallelism);
// DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);DataStreamSource<String> lines = env.addSource(consumer);int parallelism1 = lines.getParallelism();System.out.println("socketTextStream创建的DataStreamSource的并行度:" + parallelism1);SingleOutputStreamOperator<String> uppered = lines.map(line -> line.toUpperCase());int parallelism2 = uppered.getParallelism();System.out.println("调用完map方法得到的DataStream的并行度:" + parallelism2);DataStreamSink<String> print = uppered.print();int parallelism3 = print.getTransformation().getParallelism();System.out.println("调用完print方法得到的DataStreamSink的并行度:" + parallelism3);env.execute();}
}
本地执行
与Spark类似,Flink在开发过程中也可以在本地临时执行,需要满足两个条件:
1.需要引入flink-clients依赖,否则会报No ExecutorFactory found to execute the application
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.15.4</version> </dependency>
2.设置flink的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
集群执行
打包
mvn clean package
提交任务
flink run -c com.KafkaDataProcessor /root/flink_test-1.0-SNAPSHOT-jar-with-dependencies.jar
观察任务状态
Job---->>Running Jobs
结束任务
Job---->>Running Jobs--->>点击任务---->>Cancel Job
flink-sql
文档
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/common/
需求案例
汇总kafka数据,将结果保存入mysql中
依赖准备
mysql版本是8.0.25,flink版本是1.15.4,connector的版本一定要和flink版本保持一致。所有集群节点lib目录下的jar包必须保持一致,放置完成后需要重启集群。
依赖下载位置:Central Repository:
mysql-connector-java-8.0.25.jar
flink-connector-jdbc-1.15.4.jar
kafka-clients-2.8.1.jar
flink-connector-kafka-1.15.4.jar
启动客户端
cd /root/flink-1.15.4/bin
./sql-client.sh
准备结果表
-- MySQL result table: one row per day_time with its total_gmv.
CREATE TABLE sync_test_1 (
    `day_time`  VARCHAR(64) NOT NULL,
    `total_gmv` BIGINT(11)  DEFAULT NULL,
    PRIMARY KEY (`day_time`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
sql配置
-- Source table: JSON records read from the Kafka topic flink_test.
CREATE TABLE flink_test_1 (
    id       BIGINT,
    day_time VARCHAR,
    amnount  BIGINT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_test',
    'properties.bootstrap.servers' = '192.168.10.153:9092',
    'properties.group.id' = 'flink_gp_test1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.zookeeper.connect' = '192.168.10.153:2181/kafka'
);

-- Sink table: mapped through the JDBC connector onto the MySQL table sync_test_1.
CREATE TABLE sync_test_1 (
    day_time  STRING,
    total_gmv BIGINT,
    PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.10.151:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'sync_test_1',
    'username' = 'root',
    'password' = '123456'
);

-- Sum amnount per day_time and write the totals into the sink table.
INSERT INTO sync_test_1
SELECT
    day_time,
    SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;
测试数据
./bin/kafka-console-producer.sh --bootstrap-server 192.168.10.153:9092 --topic flink_test
{"day_time": "20201009","id": 7,"amnount":20}
查看数据结果
来源:
docs/sql_demo/demo_1.md · 无情(朱慧培)/flink-streaming-platform-web - Gitee.com
flink-sql大量使用案例_flink sql使用_第一片心意的博客-CSDN博客