是时候用 ES 拯救发际线啦

Day 9 - 动手实现一个自定义beat

Advent | 作者 Xinglong | 发布于2018年12月09日 | | 阅读数:3927

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。 本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9

下面的工具会提供python本身需要依赖的一些native包

yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y

安装python

wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python

安装工具包 distribute, setuptools, pip

cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4  && python setup.py install
cd ~/python/distribute-0.7.3  && python setup.py install

安装cookiecutter

pip install --user cookiecutter

安装cookiecutter所依赖的工具

pip install backports.functools-lru-cache
pip install six
pip install virtualenv

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到

代码实现

需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat

  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup

到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping

1 _meta/beat.yml

这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑

[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."

2 config/config.go

这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
        Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
        Path:   ".",
}

3 beater/hdfsauditbeat.go

这里需要改动的地方是: 3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去 3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "fmt"
    "time"
        "os"
        "io"
        "bufio"
        "strings"
    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done: make(chan struct{}),
        config: config,
    }
    return bt, nil
}

func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

    timeEnd := strings.Index(line, ",")
        timeString := line[0 :timeEnd]
        tm, _ := time.Parse("2006-01-02 03:04:05", timeString)

        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart :ugiEnd]

        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:len(line)]
        cmdEnd := strings.Index(line, " ")
        cmd := line[0 : cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:len(line)]
        srcEnd := strings.Index(line, " ")
        src := line[0:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:len(line)]
        dstEnd := strings.Index(line, " ")
        dst := line[0:dstEnd]

        event := common.MapStr{
                "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
                "ugi":       ugi,
                "cmd":       cmd,
                "src":    src,
                "dst":   dst,
            }
            bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            } else {
                //fmt.Println("Read file error!", err)
                return
            }
        }
    }
}

4 _meat/fields.yml

[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword

测试

首先编译好项目

make update
make

然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。 修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true
#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

开始执行

[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}

结束

使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。


[尊重社区原创,转载请保留或注明出处]
本文地址:http://elasticsearch.cn/article/6182


0 个评论

要回复文章请先登录注册