
Log format problem after adding a Kafka layer between Filebeat and Logstash

Logstash | Author: qwefdrt | Published: 2018-07-13 | Views: 3267

Previously, when Filebeat sent logs directly to Logstash, a document in Kibana looked like this:
{
  "_index": "it_mail_cas_iis_2018-07-12_v1",
  "_type": "type",
  "_id": "it_mail_cas_iis_0_4814375143_0",
  "_score": null,
  "_source": {
    "method": "POST",
    "offset": 6895796299,
    "input_type": "log",
    "source": "D:\\inetpub\\logs\\LogFiles\\W3SVC1\\u_ex180708.log",
    "type": "log",
    "message": "2018-07-08 11:51:34 172.20.1.40 POST /EWS/Exchange.asmx &CorrelationID=<empty>;&ClientId=WMKSDOSOREQI9RVPBBHQ&cafeReqId=a9f291bb-8b63-47f0-bb6f-ab24864cffad; 443 DIDICHUXING\\zhangyesummer 172.22.112.129 Mac+OS+X/10.12.6+(16G29);+ExchangeWebServices/7.2+(268); - 200 0 0 12",
    "tags": [
      "iislog",
      "beats_input_codec_plain_applied"
    ],
    "@timestamp": "2018-07-13T09:46:10.777Z",
    "@version": "1",
    "beat": {
      "hostname": "BJEXCAS002"
    },
    "host": "BJEXCAS002"
  },
  "fields": {
    "sinkTime": [
      1531475177480
    ],
    "@timestamp": [
      1531475170777
    ]
  },
  "sort": [
    1531475177480
  ]
}

Later, after inserting a Kafka layer between Filebeat and Logstash, the log format changed to this:
{
  "_index": "kafka-2018.07.13",
  "_type": "doc",
  "_id": "rckFk2QBD89thZF5A3f9",
  "_version": 1,
  "_score": null,
  "_source": {
    "@version": "1",
    "message": "{\"@timestamp\":\"2018-07-13T09:33:47.666Z\",\"beat\":{\"hostname\":\"BJEXCAS004\"},\"input_type\":\"log\",\"message\":\"2018-07-13 09:33:45 172.20.1.40 POST /EWS/Exchange.asmx \\u0026CorrelationID=\\u003cempty\\u003e;\\u0026ClientId=BCWHN0UJWTGNMJXZW\\u0026cafeReqId=271691f2-d3ce-4a97-924d-b5551eed643e; 443 DIDICHUXING\\\\lemonswujiawen_i 172.21.130.125 Mac+OS+X/10.12.6+(16G29);+ExchangeWebServices/7.2+(268); - 200 0 0 111\",\"offset\":10648672855,\"source\":\"D:\\\\inetpub\\\\logs\\\\LogFiles\\\\W3SVC1\\\\u_ex180713.log\",\"tags\":[\"iislog\"],\"type\":\"log\"}",
    "@timestamp": "2018-07-13T09:42:16.490Z",
    "type": "kafka"
  },
  "fields": {
    "@timestamp": [
      "2018-07-13T09:42:16.490Z"
    ]
  },
  "sort": [
    1531474936490
  ]
}
My question: how can I preserve the original log format when shipping logs through Kafka?
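To make the difference concrete: in the second document, the entire original Filebeat event has become one JSON-encoded string inside the `message` field, so a second decoding step is needed to get the fields back. A quick sketch in Python, using an abbreviated version of the `message` value from the post above:

```python
import json

# In the Kafka-fed index, "message" holds the whole original Filebeat
# event as a single JSON-encoded string (fields abbreviated from the post).
message = (
    '{"@timestamp":"2018-07-13T09:33:47.666Z",'
    '"beat":{"hostname":"BJEXCAS004"},'
    '"input_type":"log",'
    '"tags":["iislog"],'
    '"type":"log"}'
)

# One json.loads call recovers the original event structure.
event = json.loads(message)
print(event["beat"]["hostname"])  # BJEXCAS004
print(event["tags"])              # ['iislog']
```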

rockybean - Elastic Certified Engineer, ElasticStack Fans, WeChat public account: ElasticTalk

Upvote from: qwefdrt

When Logstash reads from Kafka it needs to parse the message as JSON, because Filebeat writes the whole event to Kafka as a JSON string. Set the codec on the kafka input:

codec: "json"
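A minimal sketch of where that setting goes in the Logstash pipeline config (the broker address and topic name here are placeholders, not taken from this thread):

```conf
input {
  kafka {
    bootstrap_servers => "kafka1:9092"   # placeholder broker address
    topics => ["filebeat-logs"]          # placeholder topic name
    codec => "json"                      # decode the Filebeat event instead of keeping it as a plain string
  }
}
```

With the json codec, the nested fields (`beat`, `source`, `tags`, the original `message`, etc.) are restored as top-level event fields, matching the pre-Kafka format.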

muou

Upvotes from:

Could you paste your conf so we can take a look?
