使用canal实现mysql数据同步

MySQL主从复制原理:

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

一.canal-php 简介

canal-php 是阿里巴巴开源项目 Canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 php 客户端。为 php 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。

二.应用场景

canal-php 作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子:

  • 1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
  • 2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等
  • 3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis
  • 4.数据库异地备份、数据同步
  • 5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。
  • 6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

四.工作流程

  • 1.Canal连接到mysql数据库,模拟slave
  • 2.canal-php 与 Canal 建立连接
  • 3.数据库发生变更写入到binlog
  • 4.Canal向数据库发送dump请求,获取binlog并解析
  • 5.canal-php 向 Canal 请求数据库变更
  • 6.Canal 发送解析后的数据给canal-php
  • 7.canal-php收到数据,消费成功,发送回执。(可选)
  • 8.Canal记录消费位置。

五:开启binlog

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

查看binlog是否开启成功:

//value=ON表示开启
show variables like 'log_bin'; 

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

六:安装canal

docker pull canal/canal-server:v1.1.5

docker run -p 11111:11111 --name canal \
-e canal.destinations=qc \
-e canal.instance.master.address=120.79.xxx.xx:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=laravel6.articles \  
-d canal/canal-server:v1.1.5

注释:
-e canal.destinations=qc   #监听到的数据变更发送的队列
其中的ip地址是你的mysql地址
配置了访问数据库的账户和密码
-e canal.instance.filter.regex=laravel6.articles  
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2 
docker logs canal查看安装成功
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key: [  OK  ]
Starting sshd: [  OK  ]
Starting crond: [  OK  ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Starting sshd: [  OK  ]
Starting crond: [  OK  ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

七、构建canal php客户端

composer require xingwenge/canal_php

    public function test()
    {
        try {
            $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
            # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

            $client->connect("120.79.xxx.xx", 11111);
            $client->checkValid();
//            $client->subscribe("1001", "qc", ".*\\..*");
            $client->subscribe("1001", "qc", "laravel6.articles");
            # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤

            while (true) {
                $message = $client->get(100);
                if ($entries = $message->getEntries()) {
                    foreach ($entries as $entry) {
                        Fmt::println($entry);
                    }
                }
                sleep(1);
            }

            $client->disConnect();
        } catch (\Exception $e) {
            echo $e->getMessage(), PHP_EOL;
        }
    }

效果:

zed
请先登录后发表评论
  • latest comments
  • 总共0条评论