Logstash (Filebeat) pushes duplicate data

Author: muou | Published: 2018-06-26 | Views: 1006

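Filebeat ships logs to Logstash, which processes them and forwards them to ES. The current setup is two Filebeat nodes sending to a single Logstash instance, which writes to a three-server ES cluster.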
 
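The problem: when Filebeat sends a relatively small volume to Logstash, about 20 events/s, Logstash has no queue backlog. Data flows from Filebeat to Logstash and on to ES normally, with no duplicates.
When I raise the log volume to 100 events/s, Logstash clearly builds up a backlog of about 3000 queued events. After it has run for a while, queries against ES return duplicate data. I compared against the log file: an entry that appears once in the log shows up 7 times in ES, and most entries are duplicated.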
 
Below are the Logstash event stats:
[root@elk2 ~]# curl -s localhost:9600/_node/stats/events?pretty=true
{
  "host" : "elk2",
  "version" : "6.2.2",
  "http_address" : "127.0.0.1:9600",
  "id" : "e25829e3-db96-4b7a-ae0e-5ea0c3be8c83",
  "name" : "elk2",
  "events" : {
    "in" : 1505442,
    "filtered" : 1503441,
    "out" : 1502441,
    "duration_in_millis" : 371851898,
    "queue_push_duration_in_millis" : 41948714
  }
}
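The counters above already show the backlog: `in` minus `out` is the number of events Logstash has accepted but not yet emitted. A minimal Python sketch of that arithmetic, using only the `events` object from the output above; the helper name `in_flight` is made up for illustration.

```python
import json

# The "events" object copied from the node stats output above.
STATS = json.loads("""
{
  "events": {
    "in": 1505442,
    "filtered": 1503441,
    "out": 1502441,
    "duration_in_millis": 371851898,
    "queue_push_duration_in_millis": 41948714
  }
}
""")


def in_flight(events):
    """Count events that are still inside the pipeline, split by stage."""
    return {
        "not_yet_filtered": events["in"] - events["filtered"],
        "filtered_not_yet_output": events["filtered"] - events["out"],
        "total": events["in"] - events["out"],
    }


backlog = in_flight(STATS["events"])
print(backlog)
```

For this snapshot the total is 3001, which matches the backlog of about 3000 events described in the question.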
 
Filebeat configuration:
#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

# Business log of monitored target 2
- type: log
  # Change to true to enable this prospector configuration.
  enabled: true  
  paths:
    - /note/__logs/ess/ess.log

  # How often the prospector checks for new files in the specified paths,
  # without making Filebeat scan too frequently. Default: 10s.
  scan_frequency: 10s
  # Buffer size each harvester uses when fetching a file
  harvester_buffer_size: 16384

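  # Maximum number of bytes a single log event can have.
  # All bytes after max_bytes are discarded and not sent. The default is 10MB.
  # This is especially useful for multiline log messages, which can get large.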
  max_bytes: 10485760

  fields :
    myhost : 10.14.125.11
    appname : errchy
    document_type: businesslog
    
  fields_under_root : true
  
  drop_fields:
    fields: ["beat", "offset"]


  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  include_lines: ['^{']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: ['.gz$']

#============================= Filebeat modules ===============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["10.14.125.4:5044"]


Logstash configuration file:
# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
node.name: n-elk4
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
path.data: /var/lib/logstash
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
pipeline.workers: 4
#
# How many events to retrieve from inputs before sending to filters+workers
#
pipeline.batch.size: 500
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#
pipeline.unsafe_shutdown: false
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and 
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of 
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
queue.max_bytes: 4gb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
path.logs: /var/log/logstash
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins:

 
Please help me figure out what is causing the duplicate data.
 
 
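Update: the behavior above suggests that when the volume is low, ES accepts and handles every request. When the volume is high, ES may reject some of Logstash's requests, and Logstash resends whatever was rejected. Could it be that Logstash sent a request to ES, ES timed out, and Logstash treated that as a failure and resent it? I'm not sure whether my guess is right. If it is, is there a way to fix it?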
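The retry theory can be illustrated without any ELK component. The sketch below is a toy model in Python, not Logstash or Elasticsearch code: a sender resends a batch whenever the acknowledgement is lost, the store writes on every delivery, and duplicates result. Giving each event a deterministic ID (here a SHA-1 of the raw line, chosen only for illustration) makes a resend overwrite instead of append. All names (`ToyStore`, `send_with_retry`) are hypothetical.

```python
import hashlib


class ToyStore:
    """Stand-in for an index; this is not a real Elasticsearch client."""

    def __init__(self):
        self.auto_id_docs = []  # append-only, like auto-generated IDs
        self.keyed_docs = {}    # keyed by an ID derived from the content

    def index_batch(self, lines):
        for line in lines:
            self.auto_id_docs.append(line)
            doc_id = hashlib.sha1(line.encode("utf-8")).hexdigest()
            self.keyed_docs[doc_id] = line  # same ID overwrites


def send_with_retry(store, lines, lost_acks):
    """Deliver a batch and resend it once per lost acknowledgement.

    The store writes the batch on every delivery, but the sender cannot
    know that, so it resends the whole batch (at-least-once delivery).
    """
    attempts = 0
    while True:
        store.index_batch(lines)
        attempts += 1
        if attempts > lost_acks:
            return attempts


store = ToyStore()
batch = ['{"msg": "a"}', '{"msg": "b"}']
attempts = send_with_retry(store, batch, lost_acks=6)

print(attempts)                 # 7 deliveries of the same batch
print(len(store.auto_id_docs))  # 14 documents: each line stored 7 times
print(len(store.keyed_docs))    # 2 documents: one per distinct line
```

In a real pipeline the same idea is usually applied by deriving the Elasticsearch document ID from the event content, for example with Logstash's fingerprint filter feeding the elasticsearch output's `document_id` option. The trade-off is that two genuinely identical log lines would then collapse into one document.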
 
 

rockybean - Elastic Certified Engineer, ElasticStack Fans, WeChat official account: ElasticTalk

Upvoted by: muou

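With filebeat -> logstash -> es, you need to check each stage in turn to find the bottleneck. Filebeat usually isn't it.
You haven't mentioned the specs of your test machines. You should also list your ES metrics, for example whether the bulk queue is backing up, how GC is behaving, and so on. My guess is that ES can't keep up. 400 events/s is not a heavy load.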
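One way to check for bulk queue build-up is the `_cat/thread_pool` API. The Python sketch below makes two assumptions that are not in the thread: ES is reachable at `http://localhost:9200`, and the pool of interest is named `bulk` (its name in 6.2) or `write` (its later name). The sample text is made up to show the shape of the output and does not come from the asker's cluster.

```python
from urllib.request import urlopen

# Assumed address; the thread does not say where ES listens.
ES_URL = "http://localhost:9200"
CAT_PATH = "/_cat/thread_pool?h=node_name,name,active,queue,rejected"


def parse_thread_pool(text, pools=("bulk", "write")):
    """Parse _cat/thread_pool text output into dicts for the given pools."""
    rows = []
    for line in text.splitlines():
        parts = line.split()
        if len(parts) != 5 or parts[1] not in pools:
            continue
        node, name, active, queue, rejected = parts
        rows.append({
            "node": node,
            "pool": name,
            "active": int(active),
            "queue": int(queue),
            "rejected": int(rejected),
        })
    return rows


def fetch_thread_pool(base_url=ES_URL):
    """Fetch live values; needs network access to the cluster."""
    with urlopen(base_url + CAT_PATH, timeout=10) as resp:
        return parse_thread_pool(resp.read().decode("utf-8"))


# Made-up sample in the shape of the API's output, for illustration only.
SAMPLE = """\
node-1 bulk   4 200 1532
node-1 search 0   0    0
node-2 bulk   4 180  977
"""

rows = parse_thread_pool(SAMPLE)
for row in rows:
    print(row)
```

A `rejected` count that keeps growing between calls means ES is refusing bulk requests, which Logstash then retries.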

wajika

Upvoted by: muou

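I've also run into Logstash being overwhelmed. Our setup was logstash > redis > logstash *1 > es. The middle Logstash couldn't keep up, so we added a few more instances, and things improved noticeably.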

muou


OP here. One more detail: the Filebeat log reports this error.
 
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:35:00.922+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
2018-06-28T11:35:00.923+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
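
To see which Logstash endpoints are timing out, the error lines can be tallied by destination. A small Python sketch using the four lines above as input; the regular expression is written against this exact message shape, and whether other Filebeat errors follow the same shape is an assumption.

```python
import re
from collections import Counter

# The four error lines quoted above.
LOG = """\
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:35:00.922+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
2018-06-28T11:35:00.923+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
"""

# Group 1 is the local (Filebeat) address, group 2 the remote host:port.
PATTERN = re.compile(r"read tcp (\S+?):\d+->(\S+?:\d+): i/o timeout")

timeouts = Counter(m.group(2) for m in PATTERN.finditer(LOG))
for destination, count in sorted(timeouts.items()):
    print(destination, count)
```

Both 10.14.125.4:5044 and 10.14.125.3:5044 appear, although the Filebeat configuration quoted in the question lists only 10.14.125.4:5044, so these logs may come from a different configuration than the one posted. Filebeat's delivery is at-least-once, so a batch whose acknowledgement timed out is sent again, which is one plausible source of the duplicates.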
 

muou


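OP here.
Update: it's not just duplicate data. I've also found data being lost. The problem appears when the log rate reaches 400 events/s; at 100 events/s everything is fine.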

niumore


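Use Kafka. Our collection volume is also large: a dozen or so agents collecting at the same time, with 3 Logstash instances, and so far we haven't seen any data loss.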
