相关推荐recommended
实现mysql和es数据同步的两大工具——Logstash和Canal
作者:mmseoamin日期:2024-04-01

 用途

在大型实战项目开发过程中,当数据量达到比较大的规模时,不可避免的要考虑使用ElasticSearch(es)等搜索引擎来解决大量数据的查询性能压力,因此,做好mysql的数据同步变得至关重要。我所了解,并且使用的是通过Logstash和Canal中间件,来实现将数据写入到ES等中。

一、实现同步原理

1.1  Logstash

Logstash提供了一个JDBC插件,它可以定期查询数据库并捕获变化。通过配置Logstash指定连接到mysql的哪个表和es的哪个索引库,并指定对应的查询语句。当MySQL中的数据发生变化时,Logstash的JDBC插件会定时的检测到这些变化,并且迅速捕获这些新数据或者以更新的数据(根据文档id),并将其同步到es中。

注意
  • 因为涉及到数据的更新,ES和Mysql表的id字段对应关系
  • 因为是根据时间实现增量同步,所以mysql表中必须有一个包含更新或插入时间的字段

    1.2  Canal

    Canal中间件则是通过模拟成mysql的从库,实时接收mysql的增量数据,然后通过RESTful API将数据写入到ES中。

    二、使用步骤(以es为7.14.0为例)

    2.1  Logstash

    1. 下载与es对应版本的logstash-7.14.0和使用的mysql jar包
    2. 在logstash-7.14.0安装目录下创建文件夹mysqltb,将jar包放在该目录下,新建mysql.conf文件,添加内容如下:
    3. 命令行执行 logstash ‐f ../mysqletc/mysql.conf

    input {

      jdbc {

          # 连接的数据库以及表

          jdbc_connection_string => ""

          # 数据库的账号和密码

          jdbc_user => ""

          jdbc_password => ""

          # mysql jar包的路径

          jdbc_driver_library => ""

          # the name of the driver class for mysql

          jdbc_driver_class => "com.mysql.jdbc.Driver"

          jdbc_paging_enabled => "true"

          jdbc_page_size => "50000"

          #以下对应着要执行的sql的绝对路径。

          #statement_filepath => ""

          statement => ""

          #定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

          schedule => "* * * * *"

           # 数据标签,用于标记不同的索引数据

          type => "user"

          # 加上jdbc时区, 要不然logstash的时间会不准确

          jdbc_default_timezone => "Asia/Shanghai"

          # 设置列名区分大小写, 默认全小写

          lowercase_column_names => "false"

      }

    }

    output {

      if[type] == "user"{

      elasticsearch {

          #ESIP地址与端口

          hosts => ["" ]

          #ES索引名称(自己定义的)

          index => ""

          #!!!自增ID编号,与查询语句中特有字段名一致

          document_id => "%{id}"

          # 这个必须要写不然不会同步

          document_type => "_doc"

      }}

      stdout {

          #以JSON格式输出

          codec => json_lines

      }

       实现mysql和es数据同步的两大工具——Logstash和Canal,第1张

    2.2  Canal

    下载canal1.1.5

    8.0以下版本均要开启mysql的binlog写入功能,在mysql中使用使用命令show variables like ‘%log_bin%’看是否修改成功

    #开启binlog模式

    log_bin=mysql-bin

    binlog-format=row

    #single DB binlog-ignore-db=mysql

    在Canal.deployer目录下找到配置文件conf/example/instance.properties,修改如下字段

    # 同步数据库地址

    canal.instance.master.address=你的mysql数据库地址

    # 同步数据库账号密码

    canal.instance.dbUsername=你的mysql账号

    canal.instance.dbPassword=你的mysql密码

    修改完成后,保存,启动bin目录下面的startup.bat(linux启动startup.sh),显示Listening for transport dt_socket at address: 9099则没错

    在canal.adapter目录下找到配置文件conf/application.yml,修改以下配置:

     srcDataSources:

        defaultDS:

          url: 你自己的mysql数据库地址

          username: mysql数据库账号

          password: mysql数据库密码

      canalAdapters:

      - instance: example # canal instance Name or mq topic name

        groups:

        - groupId: g1

          outerAdapters:

          - name: logger

          - name: es7 #ES同步适配器

            key: es7Key

            hosts: #ES连接地址,!!!!!!主要这里有的地方要加http:

            properties:

             mode: rest

    在conf/es7目录中,新建与sql映射表名一样的yml文件

    outerAdapterKey: es7Key # 这里和主配置canalAdapters.instance.groupId.outerAdapters.key下面的的key一样

    dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值

    destination: example  

    groupId: g1

    esMapping:

      _index: #ES的索引名称

      _id: _id  

      sql: ""        # sql映射

      etlCondition: "where a.c_time>={}"  

      commitBatch: 3000   #提交批大小

    实现mysql和es数据同步的两大工具——Logstash和Canal,第2张

    注意:

    如果报错,可以先看日志,如果日志没有报错,并且监听到了mysql数据的改变,但是没有同步到es,可能是由于es7.x的版本问题,解决办法如下:

    将canal 1.1.5.alpha-2中plugins目录下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar复制到1.1.5的plugins下

    更多报错参考:canalAdapter同步至es7.x踩坑指南_client-adapter.es7x-1.1.5-snapshot-jar-with-depend-CSDN博客

    三、总结

    logstash使用相对比较简单,易上手,可扩展插件生态系统,支持定时存储,但是它耗资源较大,并且不能与消息队列缓存。canal实时同步存储,查询更快、更迅速,对数据库无压力,但是相对而言难些。