1 Knative Event Flow
-
Importer
- 连接至期望使用的第3方消息系统
- 基于HTTP协议POST CloudEvents到Channel、Broker、Sequence/Parallel或Service/KService
-
Channel
- 支持多路订阅
- 为订阅者“持久化”消息数据
-
Service
- 接收CloudEvents
- (可选)回复处理后的数据
-
Sequence
- Kubernetes CRD资源类型
- 串联多个Processor
- 由多个有序的Step组成,每个step定义一个Subscriber
- Step间的channel,由channeltemplate定义
-
Parallel:根据不同的过滤条件对事件进行选择处理
- Kubernetes CRD资源类型
- 由多个条件式的并行Branch组成,每个Branch由一对Filter及Subscriber组成
- 每个Filter即为一个Processor
- Channel由由ChannelTemplate定义
2 Sequence Flow示例
-
Sequence资源的期望状态(spec)主要包括三个字段
- steps
- 定义序列中的目的地(processor)列表
- 按顺序调用
- 每个step中还可以定义投递属性,主要定义event投递失败时的处理方案
- channelTemplate
- 指定要使用的ChannelCRD
- 未指定时,将使用当前的namespace或者cluster中默认的Channel CRD
- reply
- 序列中最后一个subscriber处理后的信息所要发往的目的地
- steps
-
示例环境说明
- Curl命令负责生成event
- Event由Sequence中的各Step顺次处理
- 各Step都运行一个appender应用
- 分别向收到的数据尾部附加自定义的专有数据项
- 最终结果发往ksvc/event-display,进行展示
-
准备实践环境
-
定义Sequence里,三个step中的knative service
kn service apply sq-appender-01 --image ikubernetes/appender --env MESSAGE=" - Handled by SQ-01"kn service apply sq-appender-02 --image ikubernetes/appender --env MESSAGE=" - Handled by SQ-02"$ kn service apply sq-appender-03 --image ikubernetes/appender --env MESSAGE=" - Handled by SQ-03"
apiVersion: serving.knative.dev/v1 kind: Service metadata:name: sq-appender-01 spec:template:spec:containers:- image: ikubernetes/appenderenv:- name: MESSAGEvalue: " - Handled by SQ-01" --- apiVersion: serving.knative.dev/v1 kind: Service metadata:name: sq-appender-02 spec:template:spec:containers:- image: ikubernetes/appenderenv:- name: MESSAGEvalue: " - Handled by SQ-02" --- apiVersion: serving.knative.dev/v1 kind: Service metadata:name: sq-appender-03 spec:template:spec:containers:- image: ikubernetes/appenderenv:- name: MESSAGEvalue: " - Handled by SQ-03" ---
-
负责最后接收事件的Kservice/event-display
kn service apply event-display --image=ikubernetes/event_display --port 8080 --scale-min 1 -n event-demo
--- apiVersion: serving.knative.dev/v1 kind: Service metadata:name: event-display spec:template:metadata:annotations:autoscaling.knative.dev/min-scale: "1"spec:containers:- image: ikubernetes/event_displayports:- containerPort: 8080
-
创建Sequence资源
apiVersion: flows.knative.dev/v1 kind: Sequence metadata:name: sq-demo spec:channelTemplate:apiVersion: messaging.knative.dev/v1kind: InMemoryChannelsteps:- ref:apiVersion: serving.knative.dev/v1kind: Servicename: sq-appender-01- ref:apiVersion: serving.knative.dev/v1kind: Servicename: sq-appender-02- ref:apiVersion: serving.knative.dev/v1kind: Servicename: sq-appender-03reply:ref:kind: ServiceapiVersion: serving.knative.dev/v1name: event-display
查看sequence资源:
kubectl get sequences # URL中的地址,即为当前sequence的调用接口
-
测试
-
创建一个客户端Pod,使用curl命令基于HTTP协议推送event
kubectl run client-$RANDOM --image=ikubernetes/admin-box:v1.2 --restart=Never --rm -it --command -- /bin/bash
-
在启动的客户端Pod中,向Sequence/sq-demo的URL发起事件推送事件即可启动测试
curl -v "http://sq-demo-kn-sequence-0-kn-channel.default.svc.cluster.local" -X POST -H "Content-Type: application/cloudevents+json" \ -d '{"id": "0", "specversion": "1.0", "type": "com.icloud2native.sayhi", "source": "Curl", "data": {"message":"Hello Knative Eventing Flow"}}'
-
在最后一个event-display的日志中查看事件
-
-
3 Parallel Flow 示例
-
Parallel资源的期望状态(spec)主要包括三个字段
- branches
- 每个分支由一个filter和一个subscriber组成
- filter负责定义过滤器,将符合条件的事件发给相应的subscriber
- subscriber负责定义processor,将filter过滤的事件做相应处理
- 结果发往reply或全局的reply
- channelTemplate
- 指定要使用的Channel CRD
- 未指定时,将使用当前namespace或者Cluster中默认的Channel CRD
- reply
- 为所有branch处理后的信息定义一个全局目的地
- branches
-
示例环境说明
- Curl命令负责生成event
- Parallel中有两个Branch
- 使用kservice/image-filter作为Filter,筛选类型为“com.icloud2native.file.image”的事件,相应的Subscriber为ksvc/paraappender-image,负责将该类事件信息标识为Image;
- 使用kservice/text-filter作为Filter,筛选类型为“com.icloud2native.file.text”的事件,相应的Subscriber为ksvc/para-appender-text ,负责将该类事件信息标识为Text;
- 所有分支的最终结果均发往ksvc/event-display,内容格式化CloudEvents存储入日志
-
具体步骤
-
创建两个filter
kn service apply image-filter --image villardl/filter-nodejs:0.1 --env FILTER='event.type == "com.icloud2native.file.image"' kn service apply text-filter --image villardl/filter-nodejs:0.1 --env FILTER='event.type == "com.icloud2native.file.text"'
apiVersion: serving.knative.dev/v1 kind: Service metadata:name: image-filter spec:template:spec:containers:- image: villardl/filter-nodejs:0.1env:- name: FILTERvalue: |event.type == "com.icloud2native.file.image" --- apiVersion: serving.knative.dev/v1 kind: Service metadata:name: text-filter spec:template:spec:containers:- image: villardl/filter-nodejs:0.1env:- name: FILTERvalue: |event.type == "com.icloud2native.file.text" ---
-
创建两个subscriber
kn service apply para-appender-image --image ikubernetes/appender --env MESSAGE=" - filetype/Image" kn service apply para-appender-text --image ikubernetes/appender --env MESSAGE=" - filetype/Text"
--- apiVersion: serving.knative.dev/v1 kind: Service metadata:name: para-appender-image spec:template:spec:containers:- image: ikubernetes/appenderenv:- name: MESSAGEvalue: " - filetype/Image" --- apiVersion: serving.knative.dev/v1 kind: Service metadata:name: para-appender-text spec:template:spec:containers:- image: ikubernetes/appenderenv:- name: MESSAGEvalue: " - filetype/Text" ---
-
负责最后接收事件的Kservice/event-display
kn service apply event-display --image=ikubernetes/event_display --port 8080 --scale-min 1
-
创建并查看Parallel资源
--- apiVersion: flows.knative.dev/v1 kind: Parallel metadata:name: filetype-parallel spec:channelTemplate:apiVersion: messaging.knative.dev/v1kind: InMemoryChannelbranches:- filter:ref:apiVersion: serving.knative.dev/v1kind: Servicename: image-filtersubscriber:ref:apiVersion: serving.knative.dev/v1kind: Servicename: para-appender-image- filter:ref:apiVersion: serving.knative.dev/v1kind: Servicename: text-filtersubscriber:ref:apiVersion: serving.knative.dev/v1kind: Servicename: para-appender-textreply:ref:apiVersion: serving.knative.dev/v1kind: Servicename: event-display
查看parallel资源
kubectl get parallels# URL中的地址,即为当前Parallel资源的调用接
-
测试
-
创建一个客户端Pod,使用curl命令基于HTTP协议推送event
kubectl run client-$RANDOM --image=ikubernetes/admin-box:v1.2 --restart=Never --rm -it --command -- /bin/bash
-
在启动的客户端Pod中,向Parallel/filetype-parallel资源的URL发起事件推送事件即可启动测试
# 首先推送一个类型为com.icloud2native.file.image的事件curl -v "http://filetype-parallel-kn-parallel-kn-channel.default.svc.cluster.local" -X POST \ -H "Ce-Id: 0" \ -H "Ce-Specversion: 1.0" \ -H "Ce-Type: com.icloud2native.file.image" \ -H "Ce-Source: Curl" \ -H "Content-Type: application/json" \ -d '{"message": "A uploaded file by a user"}'# 然后再推送一个类型为com.icloud2native.file.image的事件curl -v "http://filetype-parallel-kn-parallel-kn-channel.default.svc.cluster.local" -X POST \ -H "Ce-Id: 0" \ -H "Ce-Specversion: 1.0" \ -H "Ce-Type: com.icloud2native.file.text" \ -H "Ce-Source: Curl" \ -H "Content-Type: application/json" \ -d '{"message": "A uploaded file by a user"}'
-
在ksvc/event-display的日志中查看事件数据,验证事件是否被不同的branch做了不同的处理
-
-