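Filebeat ships logs to Logstash, and Logstash processes them and sends them on to ES. The current architecture is 2 Filebeat nodes sending data to a single Logstash instance, which writes to an ES cluster of 3 servers.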
 
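The problem: when Filebeat sends a relatively small volume to Logstash, Logstash has no queue backlog. Data flows from Filebeat to Logstash and on to ES normally, and there is no duplicate data. That volume is roughly 20 events/s.
When I raise the log volume to 100 events/s, Logstash clearly builds up a backlog of about 3000 events. After it runs for a while, queries in ES show duplicate data. I compared the duplicates against the log file: a line that appears once in the log shows up 7 times in ES, and most of the data is duplicated.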
 
Here are the Logstash event stats:
[root@elk2 ~]# curl -s localhost:9600/_node/stats/events?pretty=true
{
"host" : "elk2",
"version" : "6.2.2",
"http_address" : "127.0.0.1:9600",
"id" : "e25829e3-db96-4b7a-ae0e-5ea0c3be8c83",
"name" : "elk2",
"events" : {
"in" : 1505442,
"filtered" : 1503441,
"out" : 1502441,
"duration_in_millis" : 371851898,
"queue_push_duration_in_millis" : 41948714
}
}[root@elk2 ~]#
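The counters above can be turned into a backlog figure directly. The following is a minimal sketch, assuming that the difference between `in` and `out` approximates the events still inside the pipeline; the helper name `backlog` is made up for illustration. The total comes to 3001, which lines up with the backlog of about 3000 events described above.

```python
import json

# Counters copied from the curl output above.
stats_json = """
{
  "events": {
    "in": 1505442,
    "filtered": 1503441,
    "out": 1502441,
    "duration_in_millis": 371851898,
    "queue_push_duration_in_millis": 41948714
  }
}
"""


def backlog(events: dict) -> dict:
    """Return how many events are still waiting at each pipeline stage."""
    return {
        "awaiting_filter": events["in"] - events["filtered"],
        "awaiting_output": events["filtered"] - events["out"],
        "in_flight_total": events["in"] - events["out"],
    }


events = json.loads(stats_json)["events"]
result = backlog(events)
print(result)
# prints {'awaiting_filter': 2001, 'awaiting_output': 1000, 'in_flight_total': 3001}
```

Polling the same endpoint every few seconds and watching whether `in_flight_total` keeps growing would show whether the output side is falling behind.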
 
The Filebeat configuration is as follows:
#=========================== Filebeat prospectors =============================
filebeat.prospectors:
# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.
# Business log for check target 2
- type: log
# Change to true to enable this prospector configuration.
enabled: true
paths:
- /note/__logs/ess/ess.log
# How often the prospector checks for new files in the specified paths
# without making Filebeat scan too frequently. Default: 10s.
scan_frequency: 10s
# Buffer size each harvester uses when fetching a file
harvester_buffer_size: 16384
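# Maximum number of bytes a single log event can have.
# All bytes after max_bytes are discarded and not sent. Default: 10MB.
# This is especially useful for multiline log messages, which can get large.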
max_bytes: 10485760
fields :
myhost : 10.14.125.11
appname : errchy
document_type: businesslog
    
fields_under_root : true
  
drop_fields:
fields: ["beat", "offset"]
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
include_lines: ['^{']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']
#============================= Filebeat modules ===============================
filebeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
# Period on which files under path should be checked for changes
#reload.period: 10s
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
hosts: ["10.14.125.4:5044"]
Logstash configuration file:
# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
# pipeline.batch.size: 125
# pipeline.batch.delay: 5
#
# ------------ Node identity ------------
#
# Use a descriptive name for the node:
#
node.name: n-elk4
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
path.data: /var/lib/logstash
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
pipeline.workers: 4
#
# How many events to retrieve from inputs before sending to filters+workers
#
pipeline.batch.size: 500
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#
pipeline.unsafe_shutdown: false
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ Module Settings ---------------
# Define modules here. Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
#   modules:
#     - name: MODULE_NAME
#       var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#       var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#       var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#       var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
queue.max_bytes: 4gb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false
# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
# * fatal
# * error
# * warn
# * info (default)
# * debug
# * trace
#
# log.level: info
path.logs: /var/log/logstash
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins:
 
Please help me figure out what is causing the duplicate data.
 
 
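Addendum: the behaviour above suggests that at low volume ES accepts and processes every request. At high volume, some of Logstash's requests may be rejected by ES, and Logstash resends after a rejection. Could it be that Logstash sent a request to ES, ES timed out, and Logstash treated that as a failure and resent it? I'm not sure whether my guess is right. If it is, is there any way to fix it?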
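If resends are indeed the cause, one common way to make them harmless is to give every event a deterministic document ID, so that a resent event overwrites the earlier copy instead of creating a new document. Below is a minimal sketch of the idea in plain Python. `FakeIndex` is an in-memory stand-in and not an Elasticsearch client, and `doc_id`, the sample log line, and the choice of fields to hash are all assumptions made for illustration.

```python
import hashlib


def doc_id(source_host: str, message: str) -> str:
    """Derive a stable ID from fields that identify one log line."""
    payload = f"{source_host}\n{message}".encode("utf-8")
    return hashlib.sha1(payload).hexdigest()


class FakeIndex:
    """In-memory stand-in for an index; not a real Elasticsearch client."""

    def __init__(self):
        self.docs = {}
        self._auto = 0

    def index(self, body, _id=None):
        # Without an explicit ID, every call creates a new document.
        if _id is None:
            self._auto += 1
            _id = f"auto-{self._auto}"
        # With an explicit ID, a repeat call overwrites the same document.
        self.docs[_id] = body


line = '{"level":"ERROR","msg":"example"}'  # hypothetical log line
host = "10.14.125.11"

auto_ids = FakeIndex()
stable_ids = FakeIndex()
for _ in range(7):  # the same event delivered 7 times
    auto_ids.index({"message": line})
    stable_ids.index({"message": line}, _id=doc_id(host, line))

print(len(auto_ids.docs), len(stable_ids.docs))
# prints 7 1
```

In a Logstash pipeline the same idea is usually expressed with the fingerprint filter plus the `document_id` option of the elasticsearch output. Note that hashing only the host and message would also collapse two genuinely distinct but identical log lines, so the hash input should probably include something unique per line, such as the file path and offset (the Filebeat config above currently drops `offset`).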
 
 
															

 
	
5 replies
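rockybean - Elastic Certified Engineer, ElasticStack Fans, WeChat official account: ElasticTalk
Upvoted by: muou
You didn't mention the specs of your test machines. You should also list the ES metrics, for example whether the bulk queue is backing up and what GC looks like. My guess is that ES performance can't keep up. 400 events/s is not a heavy load.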
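One way to check for bulk queue buildup on ES 6.2 is `GET _cat/thread_pool/bulk?v&h=node_name,name,active,queue,rejected`. The sketch below parses that tabular output and flags nodes whose bulk pool has queued or rejected requests. The sample text, the node names `es-1` to `es-3`, and both helper functions are hypothetical; the numbers are not measurements from this cluster.

```python
# Hypothetical sample of the _cat/thread_pool/bulk output; not real data.
sample = """\
node_name name active queue rejected
es-1      bulk       4   200     1532
es-2      bulk       0     0        0
es-3      bulk       4   187        0
"""


def parse_cat_thread_pool(text: str) -> list:
    """Turn the whitespace-separated _cat table into a list of dicts."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    header = lines[0].split()
    rows = []
    for ln in lines[1:]:
        row = dict(zip(header, ln.split()))
        for key in ("active", "queue", "rejected"):
            row[key] = int(row[key])
        rows.append(row)
    return rows


def nodes_under_pressure(rows: list) -> list:
    """Names of nodes with queued or rejected bulk requests."""
    return [r["node_name"] for r in rows if r["queue"] > 0 or r["rejected"] > 0]


rows = parse_cat_thread_pool(sample)
pressured = nodes_under_pressure(rows)
print(pressured)
```

A `rejected` counter that keeps rising between two samples would support the guess that ES cannot keep up with the indexing rate.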
wajika
Upvoted by: muou
muou
Upvoted by:
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:35:00.922+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
2018-06-28T11:35:00.923+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
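These Filebeat errors matter for the duplicates: Filebeat delivers at least once, so a batch that is not acknowledged before the timeout is sent again even if Logstash had already accepted it. The sketch below counts the timeouts per Logstash endpoint from the four log lines above. The regular expression is an assumption based only on the format of these lines.

```python
import re
from collections import Counter

# The four Filebeat log lines quoted above.
log_text = """\
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:34:27.838+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:55212->10.14.125.4:5044: i/o timeout
2018-06-28T11:35:00.922+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
2018-06-28T11:35:00.923+0800 ERROR logstash/async.go:235 Failed to publish events caused by: read tcp 10.14.125.11:56237->10.14.125.3:5044: i/o timeout
"""

# Capture the remote (Logstash) address of each timed-out connection.
pattern = re.compile(
    r"Failed to publish events caused by: read tcp \S+->(\S+?): i/o timeout"
)

timeouts = Counter(pattern.findall(log_text))
for endpoint, count in sorted(timeouts.items()):
    print(endpoint, count)
```

The log shows timeouts against both 10.14.125.4:5044 and 10.14.125.3:5044, while the Filebeat config posted above lists only 10.14.125.4:5044, so it may be worth confirming which `hosts` list is actually in use on this node.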
muou
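Upvoted by:
One more thing: besides the duplicate data, I found that data is also being lost. The problem appears when logs are produced at 400 events/s; at 100 events/s everything is fine.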
niumore
Upvoted by: