共計 5617 個字符,預計需要花費 15 分鐘才能閱讀完成。
這篇“Argo Event 云原生的事件驅動架構怎么用”文章的知識點大部分人都不太理解,所以丸趣 TV 小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Argo Event 云原生的事件驅動架構怎么用”文章吧。
1、Argo Event 簡介
Event 事件大家都很熟悉,可以說 Kubernetes 就是完全由事件驅動的,不同的 controller manager 本質就是實現了不同的事件處理函數,比如所有 ReplicaSet 對象是由 ReplicaSetController 控制器管理,該控制器通過 Informer 監聽 ReplicaSet 以及其關聯的 Pod 的事件變化,從而維持運行狀態和我們聲明 spec 保持一致。
當然 Kubernetes 無論是什么 Controller,其監聽和處理的都是內部事件,而在應用層上我們也有很多外部事件,比如 CICD 事件、Webhook 事件、日志事件等等,如何處理這些事件呢,目前 Kubernetes 原生是無法實現的。
當然你可以自己實現一個 event handler 運行在 Kubernetes 平臺,不過實現難度也不小。而 Argo Event 組件完美解決了這個問題。
如圖是 Argo Event 官方提供的的流程圖:
首先事件源 EventSource 可以是 Webhook、S3、Github、SQS 等等,中間會經過一個叫 Gateway(新版本叫 EventBus) 的組件,更準確地說老版本原來 gateway 的配置功能已經合并到 EventSource 了,EventBus 是新引入的組件,后端默認基于高性能分布式消息中間件 NATS[1] 實現,當然其他中間件比如 Kafka 也是可以的。
這個 EventBus 可以看做是事件的一個消息隊列,消息生產者連接 EvenSource,EventSource 又連接到 Sensor。更詳細地說 EvenSource 把事件發送給 EvenBus,Sensor 會訂閱 EvenBus 的消息隊列,EvenBus 負責把事件轉發到已訂閱該事件的 Sensor 組件,EventSorce 在上圖中沒有體現,具體設計文檔可以參考 Argo-events Enhancement Proposals[2]。
有些人可能會說為什么 EventBus 不直接到 Trigger,中間引入一個 Sensor,這主要是兩個原因,一是為了使事件轉發和處理松耦合,二是為了實現 Trigger 事件的參數化,通過 Sensor 不僅可以實現事件的過濾,還可以實現事件的參數化,比如后面的 Trigger 是創建一個 Kubernetes Pod,那這個 Pod 的 metadata、env 等,都可以根據事件內容進行填充。
Sensor 組件注冊關聯了一個或者多個觸發器,這些觸發器可以觸發 AWS Lambda 事件、Argo Workflow 事件、Kubernetes Objects 等,通俗簡單地說,可以執行 Lambda 函數,可以動態地創建 Kubernetes 的對象或者創建前面的介紹的 Workflow。
還記得前面介紹的 Argo Rollout 嗎,我們演示了手動 promote 實現應用發布或者回滾,通過 Argo Event 就可以很完美地和測試平臺或者 CI/CD 事件結合起來,實現自動應用自動發布或者回滾。
2、一個簡單的 Webhook 例子
關于 Argo Event 的部署非常簡單,直接通過 kubecl apply 或者 helm 均可,可以參考文檔 Installation[3],這里不再贅述。
Argo Event 部署完成后注意還需要部署 EventBus,官方推薦使用 NATS 中間件,文檔中有部署 NATS stateful 的文檔。
接下來我們以一個最簡單的 Webhook 事件為例,從而了解 Argo Event 的幾個組件功能以及用法。
首先按照前面的介紹,我們需要先定義 EventSource:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: webhook
spec:
service:
ports:
- port: 12000
targetPort: 12000
webhook:
webhook_example:
port: 12000
endpoint: /webhook
method: POST
這個 EventSource 定義了一個 webhook webhook_example,端口為 12000,路徑為 /webhook,一般 Webhook 為 POST 方法,因此該 Webhhok 處理器我們配置只接收 POST 方法。
為了把這個 Webhook EventSource 暴露,我們還創建了一個 Service,端口也是 12000。
此時我們可以手動 curl 該 Service:
# kubectl get svc -l eventsource-name=webhook
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
webhook-eventsource-svc ClusterIP 10.96.93.24 none 12000/TCP 5m49s
# curl -X POST -d {} 10.96.93.24:12000/webhook
success
當然此時由于沒有注冊任何的 Sensor,因此什么都不會發生。
接下來我們定義 Sensor:
首先在 dependencies 中定義了訂閱的 EventSource 以及具體的 Webhook,由于一個 EventSource 可以定義多個 Webhook,因此必須同時指定 EventSource 和 Webhook 兩個參數。
在 Trigger 中我們定義了對應 Action 為 create 一個 workflow,這個 workflow 的 spec 定義在 resource 中配置。
最后的 parameters 部分定義了 workflow 的參數,這些參數值從 event 中獲取,這里我們會把整個 event 都當作 workflow 的 input。當然你可以通過 dataKey 只汲取 body 部分:dataKey: body.message。
此時我們再次 curl 這個 webhook 事件:
curl -X POST -d {message : HelloWorld!} 10.96.93.24:12000/webhook
此時我們獲取 argo workflow 列表發現新創建了一個實例:
# argo list
NAME STATUS AGE DURATION PRIORITY
webhook-8xt4s Succeeded 1m 18s 0
查看 workflow 輸出如下:
由于我們是把整個 event 作為 workflow input 發過去的,因此 data 內容部分是 base64 編碼,我們可以查看解碼后的內容如下:
{
header : {
Accept : [
*/*
],
Content-Length : [
26
],
Content-Type : [
application/x-www-form-urlencoded
],
User-Agent : [
curl/7.58.0
]
},
body : {
message : HelloWorld!
}
}
從這里我們也可以看出 Event 包含兩個部分,一個是 context,一個是 data,data 中又包含 header 部分以及 body 部分,在 parameters 中可以通過 Key 獲取任意部分內容。
如上的 webhook 觸發是通過手動 curl 的,你可以很容易地在 github 或者 bitbucket 上配置到 webhook 中,這樣一旦代碼有更新就能觸發這個事件了。
3、Kubernetes 觸發 AWS Lambda 函數
前面的例子中的 EventSource 使用了 Webhook,除了 Webhook,Argo Event 還支持很多的 EventSource,比如:
amqp
aws-sns
aws-sqs
github/gitlab
hdfs
kafka
redis
Kubernetes resource
…
Trigger 也同樣支持很多,比如:
aws lambda
amqp
kafka
…
如上官方都提供了非常豐富的例子,可以參考 argo events examples[4]。
這里以 Kubernetes resource 事件源為例,這個事件監聽 Kubernetes 的資源狀態,比如 Pod 創建、刪除等,這里以創建 Pod 為例:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: k8s-resource-demo
spec:
template:
serviceAccountName: argo-events-sa
resource:
pod_demo:
namespace: argo-events
version: v1
resource: pods
eventTypes:
- ADD
filter:
afterStart: true
labels:
- key: app
operation: ==
value: my-pod
如上例子監聽 Pods 的 ADD 事件,即創建 Pod,filter 中過濾只有包含 app=my-pod 標簽的 Pod,特別需要注意的是使用的 serviceaccount argo-events-sa 必須具有 Pod 的 list、watch 權限。
接下來我們使用 AWS Lambda 觸發器,Lambda 函數已經在 AWS 提前創建好:
這個 Lambda 函數很簡單,直接返回 event 本身。
創建 Sensor 如下:
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: aws-lambda-trigger-demo
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: test-dep
eventSourceName: k8s-resource-demo
eventName: pod_demo
triggers:
- template:
name: lambda-trigger
awsLambda:
functionName: hello
accessKey:
name: aws-secret
key: accesskey
secretKey:
name: aws-secret
key: secretkey
namespace: argo-events
region: cn-northwest-1
payload:
- src:
dependencyName: test-dep
dataKey: body.name
dest: name
如上 AWS access key 和 access secret 需要提前放到 aws-secret 中。
此時我們創建一個新的 Pod my-pod:
apiVersion: v1
kind: Pod
metadata:
labels:
app: my-pod
name: my-pod
spec:
containers:
- image: nginx
name: my-pod
dnsPolicy: ClusterFirst
restartPolicy: Always
當 Pod 啟動后,我們發現 AWS Lambda 函數被觸發執行:
4、event filter
前面的例子中 webhook 中所有的事件都會被 sensor 觸發,我們有時不需要處理所有的事件,Argo Event 支持基于 data 以及 context 過濾,比如我們只處理 message 為 hello 或者為 hey 的事件,其他消息忽略,只需要在原來的 dependencies 中 test-dep 增加 filter 即可:
dependencies:
- name: test-dep
eventSourceName: webhook
eventName: webhook_example
filters:
- name: data-filter
data:
- path: body.message
type: string
value:
- hello
- hey
filter 指定了基于 data 過濾,過濾的字段為 body.message,匹配的內容為 hello、hey。
5、trigger policy
trigger policy 主要用來判斷最后觸發器執行的結果是成功還是失敗,如果是創建 Kubernetes 資源比如 Workflow,可以根據 Workflow 最終狀態決定這個 Trigger 的執行結果,而如果是觸發一個 HTTP 或者 AWS Lambda,則需要自定義 policy status。
awsLambda:
functionName: hello
accessKey:
name: aws-secret
key: accesskey
secretKey:
name: aws-secret
key: secretkey
namespace: argo-events
region: us-east-1
payload:
- src:
dependencyName: test-dep
dataKey: body.message
dest: message
policy:
status:
allow:
- 200
- 201
如上表示當 AWS Lambda 返回 200 或者 201 時表示 Trigger 成功。
以上就是關于“Argo Event 云原生的事件驅動架構怎么用”這篇文章的內容,相信大家都有了一定的了解,希望丸趣 TV 小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注丸趣 TV 行業資訊頻道。