Logstash用例

注意事项

日期格式

logstash在同步MySQL数据时,在日期格式化上面,废了不少功夫。以此记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create table `pao_fiancial_stat` (
`id` int(11) unsigned not null auto_increment,
`date` date not null DEFAULT '0000-00-00' comment '统计日期',
`newMemberNum` int(4) not null DEFAULT 0 comment '今日注册用户',
`memberPayNum` int(4) not null DEFAULT 0 comment '今日消费用户',
`todayInCome` DECIMAL (9,2) not NULL DEFAULT 0.0 comment '今日消费金额',
`newMemberPayNum` int(4) not null DEFAULT 0 comment '新用户消费数',
`newMemberInCome` DECIMAL (9,2) not NULL DEFAULT 0.0 comment '新用户消费金额',
`oldMemberPayNum` int(4) not null DEFAULT 0 comment '老用户消费数',
`oldMemberInCome` DECIMAL (9,2) not NULL DEFAULT 0.0 comment '老用户消费金额',
`todayArup` DECIMAL (9,2) not NULL DEFAULT 0.0 comment '今日ARUP值',
`update_time` TIMESTAMP not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `unq_date` (`date`)
)engine=innodb DEFAULT charset=utf8 auto_increment=1;

在ES索引中,date字段的type为 date 类型。logstash在同步的过程中,date字段每次都会被自动转成UTF时间格式,如;2019-01-04T02:43:29.000Z ,而我想要的格式是:2019-01-01 。最后查询文档最后也是只能将MySQL的date字段的类型改成 varchar 类型。

检索MySQL最新数据

在配置同步MySQL的数据到ES时,有用到这个配置:

1
last_run_metadata_path => "/data0/log/syncpoint_table"

那么这个配置的意思就是:将最新的更新时间保存到指定的文件。问题就出在这个文件下。
如果syncpoint_table这个文件是我在/data0/log/ 目录下手动创建的,那么还好,但是如果是logstash执行的时候自动创建的,那么问题就来了。一旦停止logstash,这个文件也会被删除。当下次再次执行的时候,logstash又会检索所有的MySQL数据。因此记得一定要手动创建这个文件。

同步Mysql数据到ES

读取本地库的数据,并且用update_time获取最新改动的数据,最后同步到ES。

备忘

起初阶段因为copy别人的用例而导致了一些奇怪的问题,现在将记录一下防止忘记。
1、stdin和stdout两个模块是否特别眼熟,在我们第一步开始安装Logstash的时候就有见过,如:

1
bin/logstash -e 'input { stdin { } } output { stdout {} }'

然后我们就可以在终端上面敲入随意的字符并且会输出一些信息以检验Logstash安装成功。

然而,在配置livecastMatch.conf的时候,由于配置了stdin和stdout,导致:

  • –config.reload.automatic 配置项失效。即使配置了config.reload.automatic: true 也不起作用。

  • 在dynamic为true的情况下,在终端输入任何字符(包括换行符)都会导致写入一些信息到ES。

  • 后来开启了stdout,关闭stdin,还是可以自动加载配置文件。(只有stdin会影响?)

同步MySql可能遇到的问题

  1. 数据库里的日期格式需要转换

  2. 数据库里只有外键ID,没有描述

  3. 新增一些其他的字段

  4. 更新已有字段的值

Logstash配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
# stdin{
# }

# jdbc相关文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
jdbc {
#sql_last_value 如果启用了tracking_column,那么这个变量存储的值则是这个字段
# 数据库 必须
jdbc_connection_string => "jdbc:mysql://localhost:3306/sports_data"
# 用户名密码
jdbc_user => "root"
jdbc_password => "123456"
#jdbc_password_filepath
# jar包的位置
jdbc_driver_library => "/Users/pzr/Downloads/logstash/mysql-connector-java-5.0.8-bin.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#statement_filepath => "config-mysql/test02.sql"
statement => "select `id`,`Title`,`Team1`,`Team2`,`MatchDate`,`update_time`,`LeagueType`,`MatchTime` from livecast_match where `update_time`>:sql_last_value "
schedule => "*/1 * * * *"
#索引的类型
type => "doc"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "/data0/log/syncpoint_table"

#clean_run 默认false,是否先前的执行需要被保护
#columns_charset 默认{} ,columns_charset => { "column0" => "ISO-8859-1" } 设置某个字段的编码
#connection_retry_attempts 默认1,db的重试连接次数
#connection_retry_attempts_wait_time 默认0.5,重试的间隔时间
#jdbc_connection_string 时区,使你的sql自动转换时间
#jdbc_fetch_size #JDBC fetch size. if not provided, respective driver’s default will be used
#jdbc_pool_timeout 默认5,从连接池获取连接的过期时间
#jdbc_validate_connection 连接jdbc之前是否需要检查,默认false
#jdbc_validate_timeout 默认3600,检查连接的频度
#last_run_metadata_path 默认:$HOME/.logstash_jdbc_last_run
#lowercase_column_names: 默认true
#parameters hash类型,eg { "target_id" => "321" }
#record_last_run 默认true,Whether to save state or not in last_run_metadata_path
#sequel_opts
#sql_log_level 默认info,可选:fatal, error, warn, info, debug
#statement eg:"SELECT * FROM MYTABLE WHERE id = :target_id"
#statement_filepath
#tracking_column #The column whose value is to be tracked if use_column_value is set to true
#tracking_column_type #Type of tracking column. Currently only "numeric" and "timestamp"
#use_column_value 默认false,

jdbc_default_timezone => 'Asia/Shanghai'

#公用字段
#add_field
codec => "rubydebug"
#enable_metric
#id
#tags 默认[],
#type

}
}

filter {
mutate {
add_field => {
"match_date" => "%{matchdate} %{matchtime}"
}
remove_field => ["matchdate", "matchtime"]
}

jdbc_streaming {
cache_expiration => 3600
jdbc_driver_library => "/Users/pzr/Downloads/logstash/mysql-connector-java-5.0.8-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/sports_data"
jdbc_user => "root"
jdbc_password => "123456"
statement => "select `name`,`name_en` from lottery_league WHERE `lottery_id` = :lid"
parameters => { "lid" => "[leaguetype]"}
target => "league"
add_field => {
"ln" => "%{[league][0][name]}"
"ln_en" => "%{[league][0][name_en]}"
}
remove_field => ["league"]
}


# mutate {
# remove_field => ["matchdate", "matchtime"]
# }

date {
match => ["update_time", "ISO8601"]
# timezone => "Asia/Shanghai"
target => "updatetime"
}
}

output {

# elasticsearch 模块:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
# 将input的值,输出到elasticsearch模块配置中
elasticsearch {
#action 默认index,可选:delete,create,update,还可以用传参的形式如: %{[foo]}
#hosts => [//127.0.0.1] 可以:["127.0.0.1:9200","127.0.0.2:9200"] 负载均衡
hosts => "127.0.0.1:9200"
# index名
index => "livecast_match2"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
#document_type 已经废弃
# 如果不存在就插入,否则更新
#doc_as_upsert => true
# 过滤一些错误码不作提示
failure_type_logging_whitelist => [409]
#healthcheck_path 检测服务是否正常响应,但是不知道填什么值
#http_compression => false 是否可以压缩
#keystore 证书
#keystore_password 证书密码
#manage_template => true 默认是true,理解:索引选用的模板,如果自己定义了index并且以field创建,那么建议设置为false。
#parameters hash类型。 以键值对的形式配置查询,并且作用于hosts的配置。如果hosts其中已有配置,那么追加条件。
#parent 默认值是:nil,
#password 对于一些需要检验的elasticsearch来说,需要密码。
#pileline 选择你想执行事件的管道 可以配置值为:"%{INGEST_PIPELINE}"
#pool_max 数值类型,默认是1000. 最大可以开启的output模块的连接数。 如果设置太小那么将会导致output连接快速关闭||创建。
#pool_max_per_route 默认是100,每个终端可设置的最大output连接数
#proxy uri类型,设置http代理
#resurrect_delay 默认是5,短时间内可以尝试的连接次数
#retry_initial_interval 默认2, 设置首次短时间内可以重连的次数, 不超过retry_max_interval 的双倍
#retry_max_interval 默认是64,设置短时间内可以重连的次数
#retry_on_conflict 默认1,索引的更改一般都是:删除,创建,同步。但是我们也可以update,但其实内部也是做了delete和insert。然后在update的时候如果有其他进程也在修改,则可能产生冲突。
#routing string类型,可以作用到所有进程。
#script 类型string,选用脚本在update 模式
#script_lang 设置script的语言。在ES5.0默认是:painless,如果在6或者更高版本,用了indexed script那么必须设置为空“”
#.....省略部分参数
#timeout 默认60s,短时间内请求ES,如果超时那么重试
#公用的参数
#codec 默认plain,格式化输出数据
#enable_metric 默认是true,记录日志?
#id 强烈建议设置id,默认是空,如果为空那么系统会自动设置一个值。如果在多个plugins时就很有必要了。

codec => 'rubydebug'
}
stdout {
codec => "rubydebug"
}
}