Logstash 插件

input

一些经常被公用的属性介绍:

  1. file:从文件中读取数据,类似UNIX的 tail-0f
  2. syslog:监听514端口获取syslog的消息,并且解析格式化成 RFC3164.
  3. redis:从redis服务器读取数据,同时使用redis通道和redis列表。Redis通常用作中央Logstash安装中的“代理”,它将Logstash事件从远程Logstash“发件人”中进行排队。
  4. beats:处理Beats发送的事件。

jdbc

jdbc的相关配置说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# jdbc相关文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
jdbc {
#sql_last_value 如果启用了tracking_column,那么这个变量存储的值则是这个字段
# 数据库 必须
jdbc_connection_string => "jdbc:mysql://localhost:3306/sports_data"
# 用户名密码
jdbc_user => "root"
jdbc_password => "123456"
#jdbc_password_filepath
# jar包的位置
jdbc_driver_library => "/Users/pzr/Downloads/logstash/mysql-connector-java-5.0.8-bin.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#statement_filepath => "config-mysql/test02.sql"
statement => "select `id`,`Title`,`Team1`,`Team2`,`MatchDate`,`update_time`,`LeagueType` from livecast_match where `update_time`>:sql_last_value "
schedule => "*/1 * * * *"
#索引的类型
type => "doc"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "/data0/log/syncpoint_table"

#clean_run 默认false,是否先前的执行需要被保护
#columns_charset 默认{} ,columns_charset => { "column0" => "ISO-8859-1" } 设置某个字段的编码
#connection_retry_attempts 默认1,db的重试连接次数
#connection_retry_attempts_wait_time 默认0.5,重试的间隔时间
#jdbc_connection_string 时区,使你的sql自动转换时间
#jdbc_fetch_size #JDBC fetch size. if not provided, respective driver’s default will be used
#jdbc_pool_timeout 默认5,从连接池获取连接的过期时间
#jdbc_validate_connection 连接jdbc之前是否需要检查,默认false
#jdbc_validate_timeout 默认3600,检查连接的频度
#last_run_metadata_path 默认:$HOME/.logstash_jdbc_last_run
#lowercase_column_names: 默认true
#parameters hash类型,eg { "target_id" => "321" }
#record_last_run 默认true,Whether to save state or not in last_run_metadata_path
#sequel_opts
#sql_log_level 默认info,可选:fatal, error, warn, info, debug
#statement eg:"SELECT * FROM MYTABLE WHERE id = :target_id"
#statement_filepath
#tracking_column #The column whose value is to be tracked if use_column_value is set to true
#tracking_column_type #Type of tracking column. Currently only "numeric" and "timestamp"
#use_column_value 默认false,
#jdbc_default_timezone => 'ASIA/BEIJING'

#公用字段
#add_field
#codec
#enable_metric
#id
#tags 默认[],
#type

}

beats

beats有以下种类:

  • Filebeat,
  • Metricbeat,
  • Packetbeat,
  • Winlogbeat,
  • Auditbeat,
  • Heartbeat,
  • Functionbeat。

介绍

这个输入插件允许Logstash从Elastic Beats框架接收事件。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
input {
beats {
port => 5044
}
}

output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#document_type => "%{[@metadata][type]}" 从6.0以后将被移除
}
}

属性

| 参数 | 类型 | 必要 | 备注 |
| :———————————————————————————————————————————————————– | :—————————————————————————————————— | :— | :—————————————————————————————————————————————————— | — | —————- |
| add_hostname | boolean | No | 是否自动读取beat的hostname值到host字段,(废弃)。默认false |
| cipher_suites | array | No | 密码组合配置列表,参数可参考:查看参数 |
| client_inactivity_timeout | number | No | 关闭空闲的客户端在设定的时间之后。默认是60s |
| host | string | No | 如果配置add_hostname为true,那么则自动抓取beat的值 |
| include_codec_tag | boolean | No | 默认true, |
| port | number | Yes | 端口 |
| ssl | boolean | No | 消息传递默认是以文本的形式。可以通过开启ssl认证加密。 |
| ssl_certificate | 路径 | No | 结合ssl |
| ssl_certificate_authorities | array | No | |
| ssl_handshake_timeout | number | No | |
| ssl_key | 路径 | No | |
| ssl_key_passphrase | password | No | |
| ssl_verify_mode | string | No | 其中之一: ["none", "peer", "force_peer"] |
| ssl_peer_metadata | boolean | No | 默认false,[ssl_verify_mode](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats-ssl_verify_mode)为peer | | force_peer才有效 |
| tls_max_version | number | No | 默认1.2,对应的TLS版本 |
| tls_min_version | number | No | 默认1,对应TLS版本 |

公共属性

参数 类型 必要
add_field hash No
codec codec No
enable_metric boolean No
id string No
tags array No
type string No

filter

filter过滤器,可以将input读取的数据进行过滤从而得到你最需要的数据。

filter插件中获取input的值的形式有哪些呢?

  1. “%{my_field}” 获取某个字段的值

  2. “%{[object][my_field]}” 获取某个数组对象的字段值

值得注意的是:如果数据经过input模块之后是不区分大小写的,那么在filter模块获取数据的时候必须用小写(input模块输出的字段)。否则可能会无法取到值。

一些常用的过滤器有:

  1. grok:Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询数据的最佳方法。Logstash内置了120个模式,您很可能会找到一个满足您需求的模式!

  2. mutate:对事件字段执行一般转换。您可以重命名、删除、替换和修改事件中的字段。

  3. drop:完全删除某一个事件,例如 debug事件。

  4. clone:备份事件,可能是添加或者删除字段。

  5. geoip:添加关于IP地址地理位置的信息(在Kibana还显示惊人的图表!)

date filter

详情点击:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html

日期筛选器用于解析字段中的日期,然后使用该日期或时间戳作为事件的logstash时间戳。

参数选项

  1. locale:默认为空字符。类型:string
    使用IETF-BCP47或POSIX语言标记指定要用于日期解析的语言环境。简单的例子有en,en- us表示BCP47, en_US表示POSIX。
    对于解析月份名称(带有MMM的模式)和工作日名称(带有EEE的模式),通常需要设置语言环境。
    如果没有指定,将使用平台默认值,但是对于非英语平台默认值,还将使用英语解析器作为回退机制。

  2. match:类型:array
    eg:

1
2
match => [ "logdate", "MMM dd yyyy HH:mm:ss",
"MMM d yyyy HH:mm:ss", "ISO8601" ]

那么会自动匹配logdate的这么多的格式。

  • ISO8601:可以匹配任何ISO8601的格式,比如:2011-04-19T03:44:01.103Z

  • UNIX:可以匹配float或者int型的以秒为单位的日期格式,例如:1326149001

  • UNIX_MS:匹配以ms为单位的日期格式,比如:1366125117000

  • TAI64N:可以匹配tai64n格式的日期。

那么y M d HH mm ss又是什么意思呢?
y year

  • yyyy 完整格式的年,比如:2018

  • yy 2位数格式的年,比如18

M month

  • M 1位数格式,比如:1表示1月,12表示12月。

  • MM 2位数格式月份,01表示1月,12表示12月。

  • MMM 比如:Jan for January

  • MMMM 比如:January

d day

  • d 1位数格式,比如:1表示1号,而不用01表示。

  • dd 2位数格式,比如:01表示1号。

H hour

  • H 1位数格式,比如 1表示凌晨1点

  • HH 2位数格式,比如 01表示凌晨1点

m minute

  • m 1位数表示,比如 : 1

  • mm 2位数表示,比如:01

s second

  • s 1位数表示,比如:1

  • ss 2位数表示,比如:01

S 每秒最大精度的分数是毫秒(SSS)。除此之外,还附加了0。

  • S 十分之一秒,0表示毫秒数:012 。怎么算的呢?

  • SS 百分之一秒,01表示 01毫秒。

  • SSS 千分之一秒,012表示012毫秒。

Z time zone

  • Z

  • ZZ

  • ZZZ

z time zone names,无法解析
w week of the year

  • w:1位数表示

  • ww:2位数表示

D day of the year
e day of the week(number)
E day of the week(text)

  • E,EE,EEE

  • EEEE

  1. tag_on_failure:类型:array,默认值:[“_dateparsefailure”]
    如果匹配失败则追加值到tags字段

  2. target:类型string,默认:”@timestamp”
    存储匹配的值到这个配置的字段中,默认是更新@timestamp 这个字段

  3. timezone:类型:string,无默认值 相关的配置项查看

公共配置项

  1. add_field

  2. add_tag

  3. enable_metric

  4. id

  5. periodic_flush

  6. remove_field

  7. remove_tag

mutate filter

详情查看:https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

介绍:可以帮助你重命名、移除,替换或者修改字段。

属性

coerce

类型hash,无默认值。给字段设置一个默认值如果这个字段存在但是为null

1
2
3
4
5
6
filter {
mutate {
# Sets the default value of the 'field1' field to 'default_value'
coerce => { "field1" => "default_value" }
}
}

rename

类型hash,无默认值。重命名一个或者多个字段。

1
2
3
4
5
6
filter {
mutate {
# Renames the 'HOSTORIP' field to 'client_ip'
rename => { "HOSTORIP" => "client_ip" }
}
}

update

类型hash,无默认值。更新一个已有的字段值,如果这个字段不存在不操作。

1
2
3
4
5
filter {
mutate {
update => { "sample" => "My new message" }
}
}

replace

类型hash,无默认值。替换一个字段的值。可以用%{foo}获取其他的值。

1
2
3
4
5
filter {
mutate {
replace => { "message" => "%{source_host}: My new message" }
}
}

convert

类型:hash,无默认值,数据类型转换。如果这个字段是一个数组,那么它的所有成员都会被转换。如果是一个hash,那么不会变。
类型有:

  • integer:
    字符串:eg “1000”=>1000, “1000.01”=>1000 小数点会被舍去
    浮点数:eg 3.99=>3 , -2.7=>-2
    boolean:true=>1, false=>0

  • integer_eu:和integer相同,除了字符串支持点分隔符和小数。(e.g., "1.000" produces an integer with value of one thousand)

  • float:
    integer:转成浮点型
    字符串:”1000.5”=>1000.5
    boolean:true=>1.0, false=>0.0

  • float_eu:和float相同,除了字符串支持点分隔符和小数。(e.g., "1.000,5" produces an integer with value of one thousand and one half)

  • string:所有类型都可以转成编码格式为utf-8的字符串。

  • boolean:
    字符串:”true”, “t”, “yes”, “y”, “1” 都被转成true。”false”,”f”,”no”,”n”,”0” 都被转成false
    空字符串转成false
    其他的一概不被转换,保留原值且会输出警告日志。

gsub

类型:array,无默认值。根据字段值匹配正则表达式,并用替换字符串替换所有匹配。只支持字符串或字符串数组字段。对于其他类型的领域将不采取行动。

1
2
3
4
5
6
7
8
9
10
11
12
13
#此配置接受每个字段/替换包含3个元素的数组。
#注意要转义配置文件中的任何反斜杠。
filter {
mutate {
gsub => [
# replace all forward slashes with underscore
"fieldname", "/", "_",
# replace backslashes, question marks, hashes, and minuses
# with a dot "."
"fieldname2", "[\\?#-]", "."
]
}
}

uppercase

类型array,无默认值。将字符串转成大写。

1
2
3
4
5
filter {
mutate {
uppercase => [ "fieldname" ]
}
}

capitalize

类型array,无默认值。将字符串转换为大写的等效字符串。

1
2
3
4
5
filter {
mutate {
capitalize => [ "fieldname" ]
}
}

lowercase

类型array,无默认值。将字符串转成小写

1
2
3
4
5
filter {
mutate {
lowercase => [ "fieldname" ]
}
}

strip

类型array,无默认值。去除头部和尾部的空格。

1
2
3
4
5
filter {
mutate {
strip => ["field1", "field2"]
}
}

remove

在官方文档里面的枚举中有,但是却没有详细的记录。在线上配置中也提示不存在remove命令。可以用公用配置remove_field 替代。

split

类型hash,无默认值。将一个字符串切割成数组。只对字符串起作用。

1
2
3
4
5
filter {
mutate {
split => { "fieldname" => "," }
}
}

join

类型hash,无默认值。用分隔符连接数组。对非数组字段不做任何操作。

1
2
3
4
5
filter {
mutate {
join => { "fieldname" => "," }
}
}

merge

类型hash,无默认值。合并2个数组字段或者hash字段。2个字符串会自动传承两个元素的数组。

1
2
3
4
5
6
7
8
9
#`array` + `string` will work
#`string` + `string` will result in an 2 entry array in `dest_field`
#`array` and `hash` will not work

filter {
mutate {
merge => { "dest_field" => "added_field" }
}
}

copy

类型hash,无默认值。复制一个已有字段到另外一个字段。旧的字段会被重写。

1
2
3
4
5
filter {
mutate {
copy => { "source_field" => "dest_field" }
}
}

示例:

1
2
3
4
5
6
7
8
9
10
filter {
mutate {
split => ["hostname", "."]
add_field => { "shortHostname" => "%{hostname[0]}" }
}

mutate {
rename => ["shortHostname", "hostname" ]
}
}

公用属性

Setting Input type Required
add_field hash No
add_tag array No
enable_metric boolean No
id string No
periodic_flush boolean No
remove_field array No
remove_tag array No

jdbc_streaming filter

介绍

这个过滤器执行一个SQL查询,并将结果集存储在指定为target的字段中。它将结果集存储在本地的LRU缓存中。

1
2
3
4
5
6
7
8
9
10
11
12
filter {
jdbc_streaming {
jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => ""jdbc:mysql://localhost:3306/mydatabase"
jdbc_user => "me"
jdbc_password => "secret"
statement => "select * from WORLD.COUNTRY WHERE Code = :code"
parameters => { "code" => "country_code"}
target => "country_details"
}
}

属性

Setting Input type Required 默认值 介绍
cache_expiration number No 5.0s 缓存过期时间
cache_size number No 500 缓存中最大的存储数量,最久未使用的将被移除
default_hash hash No {} 定义一个默认对象,当查找不能返回匹配的行时使用。确保该对象的键名与语句中的列匹配
jdbc_connection_string string Yes jdbc的连接句柄:jdbc:mysql://localhost:3306/sports_data
jdbc_driver_class string Yes jdbc驱动:com.mysql.jdbc.Driver
jdbc_driver_library url No jdbc的jar包:~/mysql-connector-java-5.0.8-bin.jar
jdbc_password password No 数据库密码
jdbc_user string No 数据库用户名
jdbc_validate_connection boolean No false 连接池分配连接之前校验
jdbc_validation_timeout number No 3600s 多久检验一次数据库连接
parameters hash No {} { “id” => “id_field” }
statement string Yes sql语句
tag_on_default_use array No [“_jdbcstreamingdefaultsused”] 如果没有查找到数据并且启用了默认的值,则追加输出到tags
tag_on_failure array No [“_jdbcstreamingfailure”] sql查询错误输出到tags
target string Yes 定义新的字段或者重写旧的字段
use_cache boolean No true 是否启用缓存

公用属性

Setting Input type Required
add_field hash No
add_tag array No
enable_metric boolean No
id string No
periodic_flush boolean No
remove_field array No
remove_tag array No

用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
jdbc_streaming {
cache_expiration => 3600
jdbc_driver_library => "/Users/pzr/Downloads/logstash/mysql-connector-java-5.0.8-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/sports_data"
jdbc_user => "root"
jdbc_password => "123456"
statement => "select `name`,`name_en` from lottery_league WHERE `lottery_id` = :lid"
parameters => { "lid" => "[leaguetype]"}
target => "league"
add_field => {
"ln" => "%{[league][0][name]}"
"ln_en" => "%{[league][0][name_en]}"
}
remove_field => ["league"]
}

要点:

  1. 在statement中的查询sql中需要传入一个变量lottery_id, 而这个lottery_id 是从input模块中获取的leaguetype字段的值。需要注意的是这里的赋值方式是: { “lid” => “[leaguetype]”}

  2. 获取sql查询的结果值,这里可以看到是用的:%{[league][0][name]}。league是从sql查询的结果缓存到这个字段,并且是一个对象数组。因此通过%{[league][0][name]}这种写法获取到league这个对象数组中的第一个对象的name属性。

  3. target属性的用法:在很多的模块中都可以看到这个属性,通过这个例子也可以知道target的用法。它可以结合公用属性add_field给事件添加一些字段或者删除一些不必要的字段。

output

logstash的最后一个阶段,一个事件可以经过多个output,如果所有的output都执行结束,那么这个事件则结束。

一些常用的插件

  1. elasticsearch:将数据写入到elasticsearch

  2. file:将数据写入到文件

  3. graphite:将数据用图像显示

  4. statsd:侦听通过UDP发送的统计信息,如计数器和计时器,并将聚合发送到一个或多个可插入后端服务

Elasticsearch

参数配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# elasticsearch 模块:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
# 将input的值,输出到elasticsearch模块配置中
elasticsearch {
#action 默认index,可选:delete,create,update,还可以用传参的形式如: %{[foo]}
#hosts => [//127.0.0.1] 可以:["127.0.0.1:9200","127.0.0.2:9200"] 负载均衡
hosts => "127.0.0.1:9200"
# index名
index => "livecast_match2"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
#document_type 已经废弃
# 如果不存在就插入,否则更新
doc_as_upsert => true
# 过滤一些错误码不作提示
failure_type_logging_whitelist => [409]
#healthcheck_path 检测服务是否正常响应,但是不知道填什么值
#http_compression => false 是否可以压缩
#keystore 证书
#keystore_password 证书密码
#manage_template => true 默认是true,理解:索引选用的模板,如果自己定义了index并且以field创建,那么建议设置为false。
#parameters hash类型。 以键值对的形式配置查询,并且作用于hosts的配置。如果hosts其中已有配置,那么追加条件。
#parent 默认值是:nil,
#password 对于一些需要检验的elasticsearch来说,需要密码。
#pileline 选择你想执行事件的管道 可以配置值为:"%{INGEST_PIPELINE}"
#pool_max 数值类型,默认是1000. 最大可以开启的output模块的连接数。 如果设置太小那么将会导致output连接快速关闭||创建。
#pool_max_per_route 默认是100,每个终端可设置的最大output连接数
#proxy uri类型,设置http代理
#resurrect_delay 默认是5,短时间内可以尝试的连接次数
#retry_initial_interval 默认2, 设置首次短时间内可以重连的次数, 不超过retry_max_interval 的双倍
#retry_max_interval 默认是64,设置短时间内可以重连的次数
#retry_on_conflict 默认1,索引的更改一般都是:删除,创建,同步。但是我们也可以update,但其实内部也是做了delete和insert。然后在update的时候如果有其他进程也在修改,则可能产生冲突。
#routing string类型,可以作用到所有进程。
#script 类型string,选用脚本在update 模式
#script_lang 设置script的语言。在ES5.0默认是:painless,如果在6或者更高版本,用了indexed script那么必须设置为空“”
#.....省略部分参数
#timeout 默认60s,短时间内请求ES,如果超时那么重试
#公用的参数
#codec 默认plain,格式化输出数据
#enable_metric 默认是true,记录日志?
#id 强烈建议设置id,默认是空,如果为空那么系统会自动设置一个值。如果在多个plugins时就很有必要了。
}

死信队列

死信队列特性目前只支持用于elasticsearch输出。此外,死信队列仅在响应代码为400或404的情况下使用,这两种情况都表示无法重试事件。对额外输出的支持将在Logstash插件的未来版本中提供。在配置logstash以使用此特性之前,请参阅输出插件文档,以验证插件是否支持死信队列特性。
启用私信队列:

1
2
dead_letter_queue.enable: true
path.dead_letter_queue: "path/to/data/dead_letter_queue"

codec

codec是一个最基础的流过滤器,可以最为input和output的一部分。可以很简易的将input或者output的数据输出格式化成其他的格式,如:json,msgpack,plain。

  1. json:格式化成json格式
  2. multiline:将多行文本事件(如java exception和stacktrace消息)合并到单个事件中。

input-filter-output

  1. 从twitter获取数据并且输出到文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
twitter {
consumer_key => "enter_your_consumer_key_here"
consumer_secret => "enter_your_secret_here"
keywords => ["cloud"]
oauth_token => "enter_your_access_token_here"
oauth_token_secret => "enter_your_access_token_secret_here"
}
beats {
port => "5044"
}
}
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
file {
path => "/path/to/target/file"
}
}

数据转换

核心操作

因为filter的插件有200多个,所以选择哪一个最适合是一件头疼的事情。因此这里会介绍几种常用的操作。

1、date filter

1
2
3
4
5
filter {
date {
match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
}
}

2、drop filter

1
2
3
4
5
filter {
if [loglevel] == "debug" {
drop { }
}
}

3、fingerprint filter

1
2
3
4
5
6
7
8
filter {
fingerprint {
source => ["IP", "@timestamp", "message"]
method => "SHA1"
key => "0123"
target => "[@metadata][generated_id]"
}
}

4、mutate filter

1
2
3
4
5
6
7
8
9
10
11
filter {
mutate { #你可以重命名、移除、替换、修改字段
rename => { "HOSTORIP" => "client_ip" }
}
}

filter {
mutate { #去除空格
strip => ["field1", "field2"]
}
}

5、ruby filter

1
2
3
4
5
filter {
ruby {
code => "event.cancel if rand <= 0.90"
}
}

数据反序列化

数据反序列化

1、从kafka中反序列化

1
2
3
4
5
6
7
8
9
10
input {
kafka {
codec => {
avro => {
schema_uri => "/tmp/schema.avsc"
}
}
}
}
...

2、csv filter

1
2
3
4
5
6
filter {
csv {
separator => ","
columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ]
}
}

3、fluent codec

1
2
3
4
5
6
input {
tcp { #解析 fluent-logger-ruby 的日志
codec => fluent
port => 4000
}
}

4、json codec

1
2
3
4
5
input {
file {
path => "/path/to/myfile.json"
codec =>"json"
}

5、protobuf codec

1
2
3
4
5
6
7
8
9
10
input
kafka {
zk_connect => "127.0.0.1"
topic_id => "your_topic_goes_here"
codec => protobuf {
class_name => "Animal::Unicorn"
include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
}
}
}

6、xml filter

1
2
3
4
5
filter {
xml {
source => "message"
}
}

取出字段并且解析

1、dissert filter
使用分隔符将非结构化事件数据提取到字段中。dissect过滤器不使用正则表达式,而且非常快。但是,如果数据的结构因行而异,则grok过滤器更适合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#message
Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...

#filter
filter {
dissect {
mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
}
}

#result
{
"msg" => "Starting system activity accounting tool...",
"@timestamp" => 2017-04-26T19:33:39.257Z,
"src" => "localhost",
"@version" => "1",
"host" => "localhost.localdomain",
"pid" => "1",
"message" => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...",
"type" => "stdin",
"prog" => "systemd",
"ts" => "Apr 26 12:20:02"
}

2、kv filter

1
2
3
4
5
6
7
8
9
10
11
#message
ip=1.2.3.4 error=REFUSED

#filter
filter {
kv { }
}

#result
ip: 1.2.3.4
error: REFUSED

3、grok filter
将非结构化事件数据解析为字段。该工具适用于syslog日志、Apache和其他webserver日志、MySQL日志,一般来说,适用于任何为人类而不是计算机使用而编写的日志格式。Grok通过将文本模式组合成与日志相匹配的内容来工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#message
55.3.244.1 GET /index.html 15824 0.043

#filter
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}

#result
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

Grok Debugger

丰富数据

1、dns filter

1
2
3
4
5
6
filter {
dns { #将IP替换成为域名
reverse => [ "source_host" ]
action => "replace"
}
}

2、elasticsearch filter
将Elasticsearch中以前日志事件的字段复制到当前事件。
下面的配置显示了如何使用这个过滤器的完整示例。每当Logstash接收到“end”事件时,它都会使用这个Elasticsearch过滤器根据某个操作标识符查找匹配的“start”事件。然后,它将@timestamp字段从“开始”事件复制到“结束”事件的新字段中。最后,使用date过滤器和ruby过滤器的组合,示例中的代码计算两个事件之间的时间间隔(以小时为单位)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if [type] == "end" {
elasticsearch {
hosts => ["es-server"]
query => "type:start AND operation:%{[opid]}"
fields => { "@timestamp" => "started" }
}
date {
match => ["[started]", "ISO8601"]
target => "[started]"
}
ruby {
code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil'
}
}

3、geoip filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
filter {
geoip {
source => "clientip"
}
}

#result
"geoip" => {
"timezone" => "Europe/Moscow",
"ip" => "83.149.9.216",
"latitude" => 55.7522,
"continent_code" => "EU",
"city_name" => "Moscow",
"country_code2" => "RU",
"country_name" => "Russia",
"dma_code" => nil,
"country_code3" => "RU",
"region_name" => "Moscow",
"location" => [
[0] 37.6156,
[1] 55.7522
],
"postal_code" => "101194",
"longitude" => 37.6156,
"region_code" => "MOW"
}

4、jdbc_static filter
使用预先从远程数据库加载的数据丰富事件。
下面的示例从远程数据库中获取数据,将其缓存到本地数据库中,并使用查找来用本地数据库中缓存的数据丰富事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
filter {
jdbc_static {
loaders => [ #将远程的DB数据通过select获取并且缓存到本地库表servers
{
id => "remote-servers"
query => "select ip, descr from ref.local_ips order by ip"
local_table => "servers"
},
{
id => "remote-users"
query => "select firstname, lastname, userid from ref.local_users order by userid"
local_table => "users"
}
]
local_db_objects => [ #定义用于构建本地数据库结构的列、类型和索引。列名和类型应该与外部数据库匹配。
{
name => "servers"
index_columns => ["ip"]
columns => [
["ip", "varchar(15)"],
["descr", "varchar(255)"]
]
},
{
name => "users"
index_columns => ["userid"]
columns => [
["firstname", "varchar(255)"],
["lastname", "varchar(255)"],
["userid", "int"]
]
}
]
local_lookups => [ #定义lookup查询语句
{
id => "local-servers"
query => "select descr as description from servers WHERE ip = :ip"
parameters => {ip => "[from_ip]"}
target => "server"
},
{
id => "local-users"
query => "select firstname, lastname from users WHERE userid = :id"
parameters => {id => "[loggedin_userid]"}
target => "user" #定义存储的字段,如果返回的是多列那么存储的格式是json。
}
]
# using add_field here to add & rename values to the event root
#从JSON对象中获取数据,并将其存储在顶级事件字段中,以便在Kibana中进行更简单的分析。
add_field => { server_name => "%{[server][0][description]}" }
add_field => { user_firstname => "%{[user][0][firstname]}" }
add_field => { user_lastname => "%{[user][0][lastname]}" }
remove_field => ["server", "user"]
jdbc_user => "logstash"
jdbc_password => "example"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
}
}

5、jdbc_streaming filter
将执行sql的结果存储在country_details 这个字段。

1
2
3
4
5
6
7
8
9
10
11
12
filter {
jdbc_streaming {
jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
jdbc_user => "me"
jdbc_password => "secret"
statement => "select * from WORLD.COUNTRY WHERE Code = :code"
parameters => { "code" => "country_code"}
target => "country_details"
}
}

6、translate filter
根据哈希或文件中指定的替换值替换字段内容。目前支持这些文件类型:YAML、JSON和CSV。
下面的示例获取response_code字段的值,根据字典中指定的值将其转换为描述,然后从事件中删除
(可以理解成枚举吗?)
response_code字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
filter {
translate {
field => "response_code"
destination => "http_response"
dictionary => {
"200" => "OK"
"403" => "Forbidden"
"404" => "Not Found"
"408" => "Request Timeout"
}
remove_field => "response_code"
}
}

7、useragent filter
下面的示例获取代理字段中的agent字符串,将其解析为agent字段,并将用户代理字段添加到名为user_agent的新字段。它还删除了原来的agent字段:

1
2
3
4
5
6
7
filter {
useragent {
source => "agent"
target => "user_agent"
remove_field => "agent"
}
}