久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

heka從kalka中讀取數據的示例分析

160次閱讀
沒有評論

共計 3846 個字符,預計需要花費 10 分鐘才能閱讀完成。

這期內容當中丸趣 TV 小編將會給大家帶來有關 heka 從 kalka 中讀取數據的示例分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

heka 從 kalka 中讀取數據。

配置:

[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092][RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder

上述配置只有從 kalfka 中讀取數據并顯示到 console,寫到 kalfka 中數據,

結果

:Timestamp: 2016-07-21 09:39:46.342093657 +0000 UTC
:Type: heka.kafka
:Hostname: master
:Pid: 0
:Uuid: 501b0a0e-63a9-4eee-b9ca-ab572c17d273
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running1 , userid : 12 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T    20:40:05.509926234Z , severity :1}
:EnvVersion: 
:Severity: 7
:Fields:
    | name: Key type:bytes value:
    | name: Topic type:string value: test
    | name: Partition type:integer value:0
    | name: Offset type:integer value:8

讀取出來的數據放到了 payload 中,而 fileds 中存放了讀取 kalkfa 中的一些信息。那么可以使用 jsondecoder 進行解析。

[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua

        [JsonDecoder.config]
        type = artemis
        payload_keep = true
        map_fields = true
        Severity = severity

[RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder

結果如下:

:Timestamp: 2016-07-21 09:42:34 +0000 UTC
:Type: artemis
:Hostname: master
:Pid: 0
:Uuid: 3965285c-70ac-4069-a1a3-a9bcf518d3e8
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running2 , userid : 11 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T    20:40:05.509926234Z , severity :1}
:EnvVersion: 
:Severity: 1
:Fields:
    | name: time type:string value: 2015-05-06T    20:40:05.509926234Z
    | name: msg type:string value: Start Request
    | name: userid type:string value: 11
    | name: event type:string value: artemis.web.ensure-running2
    | name: extra.workspace-id type:string value: cN907xLngi

經過 decoder 解析之后,fileds 發生了改變,但是我們可以看到 Logger 顯示的還是 KafkaInputExample,說明數據不是 decoder 產生,而是 Input 產生,只不過使用了 decoder 進行了解析,重寫改寫了 fields 而已。

接下來,把數據錄入都 es 中吧。
[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua

        [JsonDecoder.config]
        type = artemis
        payload_keep = true
        map_fields = true
        Severity = severity

[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
    [ESJsonEncoder.field_mappings]
    Timestamp = @timestamp
    Severity = level

[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1

導入到 es 中,也需要 json,所以使用 ESJsonEncoder,同時指定索引名字和類型。執行結果如下,

可以看到,除了 heka 中元數據 field 之外,還有 JsonDecoder 生成 field 啊,其實是截取 JsonDecoder 中的 fields 屬性中拿出。注意,Payload 不解析。

:Fields:
    | name: time type:string value: 2015-05-06T    20:40:05.509926234Z
    | name: msg type:string value: Start Request
    | name: userid type:string value: 11
    | name: event type:string value: artemis.web.ensure-running2
    | name: extra.workspace-id type:string value: cN907xLngi

這些 field 當然隨著數據不同而不同,那么稱之為 dynamic fileds。

入 es 的時候,可以指定提取哪些 dynamic fields,

fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]

只要使用 dynamic_fileds,就必須要在 fields 中指定 DynamicFields。

如果沒有 dynamic_fileds,那么 fields 只能列舉幾個固定的屬性,參照官方文檔即可。

完成的列子:

[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua

        [JsonDecoder.config]
        type = artemis
        payload_keep = true
        map_fields = true
        Severity = severity

[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]

raw_bytes_fields=[Payload]
    [ESJsonEncoder.field_mappings]
    Timestamp = @timestamp
    Severity = level

[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1

結果如下,

上述就是丸趣 TV 小編為大家分享的 heka 從 kalka 中讀取數據的示例分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計3846字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 新蔡县| 马公市| 平舆县| 赤城县| 宝应县| 榆林市| 南木林县| 新平| 西昌市| 绥化市| 白朗县| 怀化市| 东丰县| 喀什市| 中方县| 衡水市| 齐河县| 循化| 收藏| 蒙阴县| 贺州市| 日喀则市| 霍城县| 固阳县| 桓台县| 灵山县| 天水市| 湾仔区| 二手房| 广宗县| 吴堡县| 炎陵县| 青浦区| 黄浦区| 郯城县| 新源县| 新建县| 灵台县| 灵璧县| 得荣县| 武安市|