以前filebeat直接到logstash的在Kibana看日志格式是这样的:
后来在filebeat和logstash之间加入了一层kafka后日志格式变成这样了:
{
"_index": "it_mail_cas_iis_2018-07-12_v1",
"_type": "type",
"_id": "it_mail_cas_iis_0_4814375143_0",
"_score": null,
"_source": {
"method": "POST",
"offset": 6895796299,
"input_type": "log",
"source": "D:\\inetpub\\logs\\LogFiles\\W3SVC1\\u_ex180708.log",
"type": "log",
"message": "2018-07-08 11:51:34 172.20.1.40 POST /EWS/Exchange.asmx &CorrelationID=<empty>;&ClientId=WMKSDOSOREQI9RVPBBHQ&cafeReqId=a9f291bb-8b63-47f0-bb6f-ab24864cffad; 443 DIDICHUXING\\zhangyesummer 172.22.112.129 Mac+OS+X/10.12.6+(16G29);+ExchangeWebServices/7.2+(268); - 200 0 0 12",
"tags": [
"iislog",
"beats_input_codec_plain_applied"
],
"@timestamp": "2018-07-13T09:46:10.777Z",
"@version": "1",
"beat": {
"hostname": "BJEXCAS002"
},
"host": "BJEXCAS002",
},
"fields": {
"sinkTime": [
1531475177480
],
"@timestamp": [
1531475170777
]
},
"sort": [
1531475177480
]
}
后来在filebeat和logstash之间加入了一层kafka后日志格式变成这样了:
{
"_index": "kafka-2018.07.13",
"_type": "doc",
"_id": "rckFk2QBD89thZF5A3f9",
"_version": 1,
"_score": null,
"_source": {
"@version": "1",
"message": "{\"@timestamp\":\"2018-07-13T09:33:47.666Z\",\"beat\":{\"hostname\":\"BJEXCAS004\"},\"input_type\":\"log\",\"message\":\"2018-07-13 09:33:45 172.20.1.40 POST /EWS/Exchange.asmx \\u0026CorrelationID=\\u003cempty\\u003e;\\u0026ClientId=BCWHN0UJWTGNMJXZW\\u0026cafeReqId=271691f2-d3ce-4a97-924d-b5551eed643e; 443 DIDICHUXING\\\\lemonswujiawen_i 172.21.130.125 Mac+OS+X/10.12.6+(16G29);+ExchangeWebServices/7.2+(268); - 200 0 0 111\",\"offset\":10648672855,\"source\":\"D:\\\\inetpub\\\\logs\\\\LogFiles\\\\W3SVC1\\\\u_ex180713.log\",\"tags\":[\"iislog\"],\"type\":\"log\"}",
"@timestamp": "2018-07-13T09:42:16.490Z",
"type": "kafka"
},
"fields": {
"@timestamp": [
"2018-07-13T09:42:16.490Z"
]
},
"sort": [
1531474936490
]
}
请问,Kafka传输日志的时候如何保留原格式?
2 个回复
rockybean - Elastic Certified Engineer, ElasticStack Fans,公众号:ElasticTalk
赞同来自: qwefdrt
codec: "json"
muou
赞同来自: