共計 3846 個字符,預計需要花費 10 分鐘才能閱讀完成。
這期內容當中丸趣 TV 小編將會給大家帶來有關 heka 從 kalka 中讀取數據的示例分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
heka 從 kalka 中讀取數據。
配置:
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092][RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder
上述配置只有從 kalfka 中讀取數據并顯示到 console,寫到 kalfka 中數據,
結果
:Timestamp: 2016-07-21 09:39:46.342093657 +0000 UTC
:Type: heka.kafka
:Hostname: master
:Pid: 0
:Uuid: 501b0a0e-63a9-4eee-b9ca-ab572c17d273
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running1 , userid : 12 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T 20:40:05.509926234Z , severity :1}
:EnvVersion:
:Severity: 7
:Fields:
| name: Key type:bytes value:
| name: Topic type:string value: test
| name: Partition type:integer value:0
| name: Offset type:integer value:8
讀取出來的數據放到了 payload 中,而 fileds 中存放了讀取 kalkfa 中的一些信息。那么可以使用 jsondecoder 進行解析。
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua
[JsonDecoder.config]
type = artemis
payload_keep = true
map_fields = true
Severity = severity
[RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder
結果如下:
:Timestamp: 2016-07-21 09:42:34 +0000 UTC
:Type: artemis
:Hostname: master
:Pid: 0
:Uuid: 3965285c-70ac-4069-a1a3-a9bcf518d3e8
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running2 , userid : 11 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T 20:40:05.509926234Z , severity :1}
:EnvVersion:
:Severity: 1
:Fields:
| name: time type:string value: 2015-05-06T 20:40:05.509926234Z
| name: msg type:string value: Start Request
| name: userid type:string value: 11
| name: event type:string value: artemis.web.ensure-running2
| name: extra.workspace-id type:string value: cN907xLngi
經過 decoder 解析之后,fileds 發生了改變,但是我們可以看到 Logger 顯示的還是 KafkaInputExample,說明數據不是 decoder 產生,而是 Input 產生,只不過使用了 decoder 進行了解析,重寫改寫了 fields 而已。
接下來,把數據錄入都 es 中吧。
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua
[JsonDecoder.config]
type = artemis
payload_keep = true
map_fields = true
Severity = severity
[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
[ESJsonEncoder.field_mappings]
Timestamp = @timestamp
Severity = level
[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1
導入到 es 中,也需要 json,所以使用 ESJsonEncoder,同時指定索引名字和類型。執行結果如下,
可以看到,除了 heka 中元數據 field 之外,還有 JsonDecoder 生成 field 啊,其實是截取 JsonDecoder 中的 fields 屬性中拿出。注意,Payload 不解析。
:Fields:
| name: time type:string value: 2015-05-06T 20:40:05.509926234Z
| name: msg type:string value: Start Request
| name: userid type:string value: 11
| name: event type:string value: artemis.web.ensure-running2
| name: extra.workspace-id type:string value: cN907xLngi
這些 field 當然隨著數據不同而不同,那么稱之為 dynamic fileds。
入 es 的時候,可以指定提取哪些 dynamic fields,
fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]
只要使用 dynamic_fileds,就必須要在 fields 中指定 DynamicFields。
如果沒有 dynamic_fileds,那么 fields 只能列舉幾個固定的屬性,參照官方文檔即可。
完成的列子:
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua
[JsonDecoder.config]
type = artemis
payload_keep = true
map_fields = true
Severity = severity
[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]
raw_bytes_fields=[Payload]
[ESJsonEncoder.field_mappings]
Timestamp = @timestamp
Severity = level
[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1
結果如下,
上述就是丸趣 TV 小編為大家分享的 heka 從 kalka 中讀取數據的示例分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注丸趣 TV 行業資訊頻道。