博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
es 导入数据logstash
阅读量:6693 次
发布时间:2019-06-25

本文共 1760 字,大约阅读时间需要 5 分钟。

hot3.png

1.1 logstash-jdbc-input
配置文件,需要ES已经安装好logstash-jdbc-input 插件

/etc/logstash/conf.d /mysql-new.conf

input {

    jdbc {

    # 数据库地址  端口  数据库名

      jdbc_connection_string => "jdbc:mysql://192.168.16.220:3306/goods"

    # 数据库用户名      

          jdbc_user => "user"

    # 数据库密码

      jdbc_pwd => "pwd"

    # mysql java驱动地址

      jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.44-bin.jar"

      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"

      jdbc_page_size => "50000"

      # sql 语句文件

      statement_filepath =>"/etc/logstash/conf.d/mysql-goods.sql"

      schedule => "0 * * * * "

          lowercase_column_names => false

   add_field => {

                        stock=> 1

                                       }

    }

}

output {

        stdout {

        codec => json_lines

        }

        elasticsearch {

        hosts => "192.168.160.160:9200"

        index => "goods"

                document_type => "goods"

                document_id => "%{id}"

    }

}

其中的schedule代表定时执行cron,可以让他不断读取mysql数据更新,但数据大的话过程较慢,无法实时。

Sql文件:/etc/logstash/conf.d/mysql-goods.sql

select * from table 

执行命令:

  nohup  /usr/share/logstash/bin/logstash  –f  /etc/logstash/conf.d /mysql-new.conf

 >xxxx.log  &

1.2 logstash 读取kafka消息生成到es,动态更改数据。

配置文件

/etc/logstash/conf.d /kafka.conf

input {

        kafka{

                bootstrap_servers => "192.168.161.128:9092"

                group_id => "logstash-goods-group"

                topics => ["goods"]

                consumer_threads => 5

                decorate_events => true

                codec => "json"

        }

}

output {

  elasticsearch {

                hosts => ["192.168.160.160:9200"]

                index => "goods"

                document_type => "goods"

                document_id => "%{id}"

        }

  stdout { codec => rubydebug }

}

这里的配置就是让logstash 接收到kafka对应topic的数据,然后倒入到ES指定的索引库里。

bootstrap_servers:kafka  server

group_id: kafka消息的group_id

topics:topic

index:es的索引名字

document_type: es 索引对应的type

document_id : es数据的id字段

执行命令:

nohup  /usr/share/logstash/bin/logstash  –f  /etc/logstash/conf.d /kafka.conf

 >xxxx.log  &

执行命令成功后,其他地方生产kafka消息后,就能推入到ES里。

 
 

转载于:https://my.oschina.net/xiaominmin/blog/1785436

你可能感兴趣的文章
【iCore4 双核心板_FPGA】例程九:锁相环实验——锁相环使用
查看>>
SQL Server 审计
查看>>
Java并发编程(一)学习大纲
查看>>
centos 基础修改文件权限
查看>>
05Hadoop-左外连接
查看>>
BBS论坛(四)
查看>>
轮询、长轮询、长连接、socket连接、WebSocket
查看>>
python3 识别图片文字
查看>>
aspx->cs->dll :在部署后就让所有的aspx处于已经编译成dll的状态
查看>>
存储过程介绍及asp存储过程的使用
查看>>
hibernate---->多对一关联映射
查看>>
学习动态性能表(5)--v$session
查看>>
文字在div中水平和垂直居中的的css样式
查看>>
spring与hibernate整合
查看>>
cocos creator protobuf实践
查看>>
OEA 2.11 支持单机版数据库 - SQLite与SQLCE对比
查看>>
Android基本界面控件(转)
查看>>
jdbc 处理Clob
查看>>
【转】【IE大叔的嘴歪眼斜】之—— 由hasLayout引发的临床CSS Bug表
查看>>
Eclipse4.0修改为传统的界面
查看>>