Logstash插件
Logstash 插件
input
一些经常被公用的属性介绍:
- file:从文件中读取数据,类似UNIX的
tail-0f
- syslog:监听514端口获取syslog的消息,并且解析格式化成 RFC3164.
- redis:从redis服务器读取数据,同时使用redis通道和redis列表。Redis通常用作中央Logstash安装中的“代理”,它将Logstash事件从远程Logstash“发件人”中进行排队。
- beats:处理Beats发送的事件。
jdbc
1 | # jdbc相关文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html |
beats
beats有以下种类:
- Filebeat,
- Metricbeat,
- Packetbeat,
- Winlogbeat,
- Auditbeat,
- Heartbeat,
- Functionbeat。
介绍
这个输入插件允许Logstash从Elastic Beats框架接收事件。如下:
1 | input { |
属性
| 参数 | 类型 | 必要 | 备注 |
| :———————————————————————————————————————————————————– | :—————————————————————————————————— | :— | :—————————————————————————————————————————————————— | — | —————- |
| add_hostname
| boolean | No | 是否自动读取beat的hostname值到host字段,(废弃)。默认false |
| cipher_suites
| array | No | 密码组合配置列表,参数可参考:查看参数 |
| client_inactivity_timeout
| number | No | 关闭空闲的客户端在设定的时间之后。默认是60s |
| host
| string | No | 如果配置add_hostname为true,那么则自动抓取beat的值 |
| include_codec_tag
| boolean | No | 默认true, |
| port
| number | Yes | 端口 |
| ssl
| boolean | No | 消息传递默认是以文本的形式。可以通过开启ssl认证加密。 |
| ssl_certificate
| 路径 | No | 结合ssl |
| ssl_certificate_authorities
| array | No | |
| ssl_handshake_timeout
| number | No | |
| ssl_key
| 路径 | No | |
| ssl_key_passphrase
| password | No | |
| ssl_verify_mode
| string | No | 其中之一: ["none", "peer", "force_peer"]
|
| ssl_peer_metadata
| boolean | No | 默认false,[ssl_verify_mode](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats-ssl_verify_mode)
为peer | | force_peer才有效 |
| tls_max_version
| number | No | 默认1.2,对应的TLS版本 |
| tls_min_version
| number | No | 默认1,对应TLS版本 |
公共属性
参数 | 类型 | 必要 |
---|---|---|
add_field |
hash | No |
codec |
codec | No |
enable_metric |
boolean | No |
id |
string | No |
tags |
array | No |
type |
string | No |
filter
filter过滤器,可以将input读取的数据进行过滤从而得到你最需要的数据。
filter插件中获取input的值的形式有哪些呢?
“%{my_field}” 获取某个字段的值
“%{[object][my_field]}” 获取某个数组对象的字段值
值得注意的是:如果数据经过input模块之后是不区分大小写的,那么在filter模块获取数据的时候必须用小写(input模块输出的字段)。否则可能会无法取到值。
一些常用的过滤器有:
grok:Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询数据的最佳方法。Logstash内置了120个模式,您很可能会找到一个满足您需求的模式!
mutate:对事件字段执行一般转换。您可以重命名、删除、替换和修改事件中的字段。
drop:完全删除某一个事件,例如 debug事件。
clone:备份事件,可能是添加或者删除字段。
geoip:添加关于IP地址地理位置的信息(在Kibana还显示惊人的图表!)
date filter
详情点击:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
日期筛选器用于解析字段中的日期,然后使用该日期或时间戳作为事件的logstash时间戳。
参数选项
locale:默认为空字符。类型:string
使用IETF-BCP47或POSIX语言标记指定要用于日期解析的语言环境。简单的例子有en,en- us表示BCP47, en_US表示POSIX。
对于解析月份名称(带有MMM的模式)和工作日名称(带有EEE的模式),通常需要设置语言环境。
如果没有指定,将使用平台默认值,但是对于非英语平台默认值,还将使用英语解析器作为回退机制。match:类型:array
eg:
1 | match => [ "logdate", "MMM dd yyyy HH:mm:ss", |
那么会自动匹配logdate的这么多的格式。
ISO8601:可以匹配任何ISO8601的格式,比如:2011-04-19T03:44:01.103Z
UNIX:可以匹配float或者int型的以秒为单位的日期格式,例如:1326149001
UNIX_MS:匹配以ms为单位的日期格式,比如:1366125117000
TAI64N:可以匹配tai64n格式的日期。
那么y M d HH mm ss又是什么意思呢?
y year
yyyy 完整格式的年,比如:2018
yy 2位数格式的年,比如18
M month
M 1位数格式,比如:1表示1月,12表示12月。
MM 2位数格式月份,01表示1月,12表示12月。
MMM 比如:
Jan
for JanuaryMMMM 比如:January
d day
d 1位数格式,比如:1表示1号,而不用01表示。
dd 2位数格式,比如:01表示1号。
H hour
H 1位数格式,比如 1表示凌晨1点
HH 2位数格式,比如 01表示凌晨1点
m minute
m 1位数表示,比如 : 1
mm 2位数表示,比如:01
s second
s 1位数表示,比如:1
ss 2位数表示,比如:01
S 每秒最大精度的分数是毫秒(SSS)。除此之外,还附加了0。
S 十分之一秒,0表示毫秒数:012 。怎么算的呢?
SS 百分之一秒,01表示 01毫秒。
SSS 千分之一秒,012表示012毫秒。
Z time zone
Z
ZZ
ZZZ
z time zone names,无法解析
w week of the year
w:1位数表示
ww:2位数表示
D day of the year
e day of the week(number)
E day of the week(text)
E,EE,EEE
EEEE
tag_on_failure:类型:array,默认值:[“_dateparsefailure”]
如果匹配失败则追加值到tags字段target:类型string,默认:”@timestamp”
存储匹配的值到这个配置的字段中,默认是更新@timestamp 这个字段timezone:类型:string,无默认值 相关的配置项查看
公共配置项
add_field
add_tag
enable_metric
id
periodic_flush
remove_field
remove_tag
mutate filter
详情查看:https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
介绍:可以帮助你重命名、移除,替换或者修改字段。
属性
coerce
类型hash,无默认值。给字段设置一个默认值如果这个字段存在但是为null
1 | filter { |
rename
类型hash,无默认值。重命名一个或者多个字段。
1 | filter { |
update
类型hash,无默认值。更新一个已有的字段值,如果这个字段不存在不操作。
1 | filter { |
replace
类型hash,无默认值。替换一个字段的值。可以用%{foo}获取其他的值。
1 | filter { |
convert
类型:hash,无默认值,数据类型转换。如果这个字段是一个数组,那么它的所有成员都会被转换。如果是一个hash,那么不会变。
类型有:
integer:
字符串:eg “1000”=>1000, “1000.01”=>1000 小数点会被舍去
浮点数:eg 3.99=>3 , -2.7=>-2
boolean:true=>1, false=>0integer_eu:和integer相同,除了字符串支持点分隔符和小数。(e.g.,
"1.000"
produces an integer with value of one thousand)float:
integer:转成浮点型
字符串:”1000.5”=>1000.5
boolean:true=>1.0, false=>0.0float_eu:和float相同,除了字符串支持点分隔符和小数。(e.g.,
"1.000,5"
produces an integer with value of one thousand and one half)string:所有类型都可以转成编码格式为utf-8的字符串。
boolean:
字符串:”true”, “t”, “yes”, “y”, “1” 都被转成true。”false”,”f”,”no”,”n”,”0” 都被转成false
空字符串转成false
其他的一概不被转换,保留原值且会输出警告日志。
gsub
类型:array,无默认值。根据字段值匹配正则表达式,并用替换字符串替换所有匹配。只支持字符串或字符串数组字段。对于其他类型的领域将不采取行动。
1 | #此配置接受每个字段/替换包含3个元素的数组。 |
uppercase
类型array,无默认值。将字符串转成大写。
1 | filter { |
capitalize
类型array,无默认值。将字符串转换为大写的等效字符串。
1 | filter { |
lowercase
类型array,无默认值。将字符串转成小写
1 | filter { |
strip
类型array,无默认值。去除头部和尾部的空格。
1 | filter { |
remove
在官方文档里面的枚举中有,但是却没有详细的记录。在线上配置中也提示不存在remove命令。可以用公用配置remove_field 替代。
split
类型hash,无默认值。将一个字符串切割成数组。只对字符串起作用。
1 | filter { |
join
类型hash,无默认值。用分隔符连接数组。对非数组字段不做任何操作。
1 | filter { |
merge
类型hash,无默认值。合并2个数组字段或者hash字段。2个字符串会自动传承两个元素的数组。
1 | #`array` + `string` will work |
copy
类型hash,无默认值。复制一个已有字段到另外一个字段。旧的字段会被重写。
1 | filter { |
示例:
1 | filter { |
公用属性
Setting | Input type | Required |
---|---|---|
add_field | hash | No |
add_tag | array | No |
enable_metric | boolean | No |
id | string | No |
periodic_flush | boolean | No |
remove_field | array | No |
remove_tag | array | No |
jdbc_streaming filter
介绍
这个过滤器执行一个SQL查询,并将结果集存储在指定为target的字段中。它将结果集存储在本地的LRU缓存中。
1 | filter { |
属性
Setting | Input type | Required | 默认值 | 介绍 |
---|---|---|---|---|
cache_expiration | number | No | 5.0s | 缓存过期时间 |
cache_size | number | No | 500 | 缓存中最大的存储数量,最久未使用的将被移除 |
default_hash | hash | No | {} | 定义一个默认对象,当查找不能返回匹配的行时使用。确保该对象的键名与语句中的列匹配 |
jdbc_connection_string | string | Yes | 无 | jdbc的连接句柄:jdbc:mysql://localhost:3306/sports_data |
jdbc_driver_class | string | Yes | 无 | jdbc驱动:com.mysql.jdbc.Driver |
jdbc_driver_library | url | No | 无 | jdbc的jar包:~/mysql-connector-java-5.0.8-bin.jar |
jdbc_password | password | No | 无 | 数据库密码 |
jdbc_user | string | No | 无 | 数据库用户名 |
jdbc_validate_connection | boolean | No | false | 连接池分配连接之前校验 |
jdbc_validation_timeout | number | No | 3600s | 多久检验一次数据库连接 |
parameters | hash | No | {} | { “id” => “id_field” } |
statement | string | Yes | 无 | sql语句 |
tag_on_default_use | array | No | [“_jdbcstreamingdefaultsused”] | 如果没有查找到数据并且启用了默认的值,则追加输出到tags |
tag_on_failure | array | No | [“_jdbcstreamingfailure”] | sql查询错误输出到tags |
target | string | Yes | 无 | 定义新的字段或者重写旧的字段 |
use_cache | boolean | No | true | 是否启用缓存 |
公用属性
Setting | Input type | Required |
---|---|---|
add_field | hash | No |
add_tag | array | No |
enable_metric | boolean | No |
id | string | No |
periodic_flush | boolean | No |
remove_field | array | No |
remove_tag | array | No |
用例
1 | jdbc_streaming { |
要点:
在statement中的查询sql中需要传入一个变量lottery_id, 而这个lottery_id 是从input模块中获取的leaguetype字段的值。需要注意的是这里的赋值方式是: { “lid” => “[leaguetype]”}
获取sql查询的结果值,这里可以看到是用的:%{[league][0][name]}。league是从sql查询的结果缓存到这个字段,并且是一个对象数组。因此通过%{[league][0][name]}这种写法获取到league这个对象数组中的第一个对象的name属性。
target属性的用法:在很多的模块中都可以看到这个属性,通过这个例子也可以知道target的用法。它可以结合公用属性add_field给事件添加一些字段或者删除一些不必要的字段。
output
logstash的最后一个阶段,一个事件可以经过多个output,如果所有的output都执行结束,那么这个事件则结束。
一些常用的插件
elasticsearch:将数据写入到elasticsearch
file:将数据写入到文件
graphite:将数据用图像显示
statsd:侦听通过UDP发送的统计信息,如计数器和计时器,并将聚合发送到一个或多个可插入后端服务
Elasticsearch
参数配置
1 | # elasticsearch 模块:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html |
死信队列
死信队列特性目前只支持用于elasticsearch输出。此外,死信队列仅在响应代码为400或404的情况下使用,这两种情况都表示无法重试事件。对额外输出的支持将在Logstash插件的未来版本中提供。在配置logstash以使用此特性之前,请参阅输出插件文档,以验证插件是否支持死信队列特性。
启用私信队列:
1 | dead_letter_queue.enable: true |
codec
codec是一个最基础的流过滤器,可以最为input和output的一部分。可以很简易的将input或者output的数据输出格式化成其他的格式,如:json,msgpack,plain。
- json:格式化成json格式
- multiline:将多行文本事件(如java exception和stacktrace消息)合并到单个事件中。
input-filter-output
- 从twitter获取数据并且输出到文件
1 | input { |
数据转换
核心操作
因为filter的插件有200多个,所以选择哪一个最适合是一件头疼的事情。因此这里会介绍几种常用的操作。
1、date filter
1 | filter { |
2、drop filter
1 | filter { |
3、fingerprint filter
1 | filter { |
4、mutate filter
1 | filter { |
5、ruby filter
1 | filter { |
数据反序列化
1、从kafka中反序列化
1 | input { |
2、csv filter
1 | filter { |
3、fluent codec
1 | input { |
4、json codec
1 | input { |
5、protobuf codec
1 | input |
6、xml filter
1 | filter { |
取出字段并且解析
1、dissert filter
使用分隔符将非结构化事件数据提取到字段中。dissect过滤器不使用正则表达式,而且非常快。但是,如果数据的结构因行而异,则grok过滤器更适合。
1 | #message |
2、kv filter
1 | #message |
3、grok filter
将非结构化事件数据解析为字段。该工具适用于syslog日志、Apache和其他webserver日志、MySQL日志,一般来说,适用于任何为人类而不是计算机使用而编写的日志格式。Grok通过将文本模式组合成与日志相匹配的内容来工作。
1 | #message |
丰富数据
1、dns filter
1 | filter { |
2、elasticsearch filter
将Elasticsearch中以前日志事件的字段复制到当前事件。
下面的配置显示了如何使用这个过滤器的完整示例。每当Logstash接收到“end”事件时,它都会使用这个Elasticsearch过滤器根据某个操作标识符查找匹配的“start”事件。然后,它将@timestamp字段从“开始”事件复制到“结束”事件的新字段中。最后,使用date过滤器和ruby过滤器的组合,示例中的代码计算两个事件之间的时间间隔(以小时为单位)。
1 | if [type] == "end" { |
3、geoip filter
1 | filter { |
4、jdbc_static filter
使用预先从远程数据库加载的数据丰富事件。
下面的示例从远程数据库中获取数据,将其缓存到本地数据库中,并使用查找来用本地数据库中缓存的数据丰富事件。
1 | filter { |
5、jdbc_streaming filter
将执行sql的结果存储在country_details 这个字段。
1 | filter { |
6、translate filter
根据哈希或文件中指定的替换值替换字段内容。目前支持这些文件类型:YAML、JSON和CSV。
下面的示例获取response_code字段的值,根据字典中指定的值将其转换为描述,然后从事件中删除
(可以理解成枚举吗?)
response_code字段:
1 | filter { |
7、useragent filter
下面的示例获取代理字段中的agent字符串,将其解析为agent字段,并将用户代理字段添加到名为user_agent的新字段。它还删除了原来的agent字段:
1 | filter { |