Configuration file: /etc/logstash/conf.d/mysql-new.conf
input {
  jdbc {
    # Database host, port, and database name
    jdbc_connection_string => "jdbc:mysql://192.168.16.220:3306/goods"
    # Database user
    jdbc_user => "user"
    # Database password
    jdbc_password => "pwd"
    # Path to the MySQL JDBC driver jar
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.44-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # File containing the SQL statement to run
    statement_filepath => "/etc/logstash/conf.d/mysql-goods.sql"
    schedule => "0 * * * *"
    lowercase_column_names => false
    add_field => {
      "stock" => "1"
    }
  }
}
output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => ["192.168.160.160:9200"]
    index => "goods"
    document_type => "goods"
    document_id => "%{id}"
  }
}
The schedule option is a cron expression that runs the query on a timer, so Logstash keeps re-reading MySQL and updating the index. With a large table, however, each run is slow and the sync cannot be real-time.
SQL file: /etc/logstash/conf.d/mysql-goods.sql
select * from table
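If a full table scan per run is too slow, the jdbc input also supports incremental queries: it substitutes :sql_last_value (by default the time the previous query ran) into the statement. A minimal sketch, assuming the goods table has an update_time column (a hypothetical column name):
-- only fetch rows changed since the previous run
SELECT * FROM goods WHERE update_time > :sql_last_value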
Run:
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/mysql-new.conf > xxxx.log &
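After Logstash has completed a schedule cycle, a quick way to confirm the import (a sketch using the ES address from the config above) is to ask ES how many documents the goods index holds:
curl http://192.168.160.160:9200/goods/_count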
1.2 Logstash reads Kafka messages into ES for dynamic data updates
Configuration file: /etc/logstash/conf.d/kafka.conf
input {
  kafka {
    bootstrap_servers => "192.168.161.128:9092"
    group_id => "logstash-goods-group"
    topics => ["goods"]
    consumer_threads => 5
    decorate_events => true
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.160.160:9200"]
    index => "goods"
    document_type => "goods"
    document_id => "%{id}"
  }
  stdout { codec => rubydebug }
}
This configuration has Logstash consume data from the given Kafka topic and write it into the specified ES index.
bootstrap_servers: Kafka broker address
group_id: Kafka consumer group ID
topics: Kafka topic(s) to subscribe to
index: name of the ES index
document_type: type under the ES index
document_id: field used as the ES document ID
Run:
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf > xxxx.log &
Once the process is running, messages produced to the Kafka topic from anywhere else are pushed into ES.
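As a quick end-to-end test (a sketch assuming the Kafka command-line tools are installed, here under a hypothetical /opt/kafka path, and using made-up fields), produce one JSON message to the goods topic; because the codec is json and document_id is %{id}, the message's id field becomes the ES document _id:
# send one test message to the goods topic
echo '{"id": 1001, "name": "test", "stock": 5}' | \
  /opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.161.128:9092 --topic goods
# then fetch it back from ES by that id
curl http://192.168.160.160:9200/goods/goods/1001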