# Canal

# 安装(单机)

# 下载解压

官方 (opens new window)下载并解压

mkdir /opt/canal
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
1
2

# 修改 conf/canal.properties 的配置

这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111

hdp1-➜  /opt vim /opt/canal/conf/canal.properties
1

修改 canal 的输出 model,默认 tcp,改为输出到 kafka

# tcp 就是输出到 canal 客户端,通过编写 Java 代码处理
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
1
2
3

修改 Kafka 集群的地址

##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = hdp1:9092,hdp2:9092,hdp3:9092
1
2
3
4

如果创建多个实例

通过 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/ 下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实 例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example, 并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

#################################################
#########               destinations            #############
#################################################
canal.destinations = example
1
2
3
4

# 修改 instance.properties

我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下

hdp1-➜  /opt vim /opt/canal/conf/example/instance.properties
1

配置 MySQL 服务器地址


 





# position info
canal.instance.master.address=k8s-master:30054
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
1
2
3
4
5
6

创建数据库账号

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
1

配置连接 MySQL 的用户名和密码


 
 


# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
1
2
3
4

修改输出到 Kafka 的主题以及分区数


 








# mq config
canal.mq.topic=ods_base_db_c
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
1
2
3
4
5
6
7
8
9

默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 canal.mq.partitionHash 属性

# 单机 canal 测试

启动 canal

hdp1-➜  ~ /opt/canal/bin/startup.sh
hdp1-➜  ~ jps
9635 CanalLauncher
1
2
3
更新时间: 1/5/2022, 2:36:04 PM