网站首页 > 技术教程 正文
在当今数据驱动的互联网时代,数据的高效处理和实时同步至关重要。对于互联网软件开发人员而言,如何在不同的数据存储系统之间实现精准、高效的数据同步是一个常见且具有挑战性的任务。本文将深入探讨如何在 Spring Boot3 框架下,借助阿里的 Canal 工具,实现 MySQL 数据库与 ElasticSearch 之间的数据同步,为大家提供一套完整且实用的解决方案。
Canal 简介
Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的中间件,其核心作用是将 MySQL 的 binlog(二进制日志)解析为结构化的数据,并提供给下游系统进行消费。Canal 通过模拟 MySQL 从库的交互协议,伪装成 MySQL 从库向主库发送 dump 协议请求,MySQL 主库在收到请求后,会推送 binary log 给 Canal。随后,Canal 将这些原始的二进制日志解析为易于理解和处理的结构化数据,例如 JSON 格式。这一过程实现了对 MySQL 数据库增量变更数据(包括插入、更新、删除等操作)的高效抓取,为数据同步和其他相关应用场景奠定了基础。
Canal 具备诸多显著优势,使其在数据同步领域脱颖而出。首先,它拥有出色的实时性,基于 MySQL 的 binlog 机制,能够在毫秒级内完成数据同步,确保数据的及时性和一致性。其次,Canal 支持批量获取数据库变更数据,大大减少了网络开销和处理时间,提高了数据处理效率。此外,它还具备多线程处理能力,可以配置多个线程来并行处理不同的数据变更事件,进一步提升整体吞吐量。
同时,Canal 支持断点续传功能,在数据同步过程中,若因各种原因导致中断,它能够从断点处继续消费数据,有效避免数据丢失。另外,Canal 还可以将消费进度持久化到 ZooKeeper 中,即使在出现故障后恢复,也能依据持久化的进度信息继续正常工作,内置的多种容错机制,如重试策略和自动恢复功能,极大地提高了系统的可靠性。
此外,Canal 使用标准化的 binlog 协议,方便与其他系统进行集成,并且支持灵活的过滤规则,可以根据实际需求选择性地订阅特定的数据库和表,还允许动态配置,方便开发者根据业务场景的变化随时调整监控范围和处理逻辑。不仅如此,Canal 还为开发者提供了自定义处理器的接口,开发者可以编写自己的代码来实现复杂的数据处理逻辑,满足多样化的业务需求。最后,Canal 能够精确地捕获和同步数据库的每一行变更,确保数据的一致性,并且能够处理复杂的事务场景,保证事务的原子性和完整性,同时还提供了多种冲突解决策略,有效避免数据同步过程中的冲突问题。
环境准备
(一)MySQL 配置
开启 Binlog 并设置模式:要使用 Canal 实现 MySQL 与 ElasticSearch 的数据同步,首先需要在 MySQL 中开启 Binlog 功能,并将其格式设置为 ROW 模式。这可以通过修改 MySQL 的配置文件(在 Linux 系统中通常是 my.cnf,在 Windows 系统中是 my.ini)来实现。在配置文件中添加或修改以下配置项:
log-bin=mysql-bin
binlog-format=ROW
server-id=1
添加或修改完成后,需要重启 MySQL 服务,使配置生效。开启 Binlog 并设置为 ROW 模式是确保 Canal 能够准确解析·数据变更的关键步骤,因为 ROW 模式会记录每一行数据的具体变更情况,为 Canal 提供详细的同步依据。
创建 Canal 用户并授权:为了让 Canal 能够访问 MySQL 数据库,需要创建一个专门的 Canal 用户,并为其赋予相应的权限。在 MySQL 的命令行或客户端工具中执行以下 SQL 语句:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
上述语句创建了一个名为 “canal” 的用户,并设置其密码为 “canal”,同时赋予该用户在所有数据库和表上的 SELECT、REPLICATION SLAVE 以及 REPLICATION CLIENT 权限。这些权限是 Canal 能够正常连接 MySQL 并获取 binlog 日志所必需的。执行完上述语句后,记得使用FLUSH PRIVILEGES;语句刷新权限,使新的权限设置生效。
(二)Spring Boot 项目配置
添加依赖:在 Spring Boot 项目的 pom.xml 文件中,需要添加 Canal 客户端依赖以及 Elasticsearch 相关依赖。确保 Canal 版本与本地或服务器上启动的 Canal 版本一致,以避免出现兼容性问题。以下是相关依赖的示例代码:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.2</version>
</dependency>
通过添加这些依赖,项目可以引入 Canal 客户端来连接 Canal 服务器并获取数据变更信息,同时引入 Elasticsearch 的高级 REST 客户端,用于将同步的数据写入到 Elasticsearch 中。
新建监听类:创建一个专门的监听类,用于监听 Canal 通道中的 binlog 日志信息,实时捕捉数据库的数据变化。以下是一个示例代码:
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
public class CanalListener {
public void listen() {
// 直接连接Canal
com.alibaba.otter.canal.client.CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("localhost", 11111), "example", "", "");
try {
connector.connect();
connector.subscribe(".*\\..*");// 订阅所有数据库表,可根据需求修改
while (true) {
Message message = connector.get(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
processEntries(message.getEntries());
}
connector.ack(batchId); // 确认消息,小于等于此batchId的Message都会被确认
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void processEntries(java.util.List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry.toString(), e);
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 在这里处理数据变更,将数据同步到Elasticsearch
if (rowData.getIsDdl()) {
// DDL操作处理
} else {
// DML操作处理,如INSERT、UPDATE、DELETE
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
// 获取列名和值,用于构建Elasticsearch文档
}
}
}
}
}
}
在上述代码中,首先通过
CanalConnectors.newSingleConnector方法创建一个 Canal 连接器,连接到本地的 Canal 服务器(地址为 “localhost”,端口为 “11111”),并订阅所有数据库表的变更信息。然后,在一个无限循环中,通过connector.get(100)方法获取一批(最多 100 条)数据变更消息。
如果获取到的消息 ID 为 - 1 或消息列表为空,表示没有新的变更数据,程序会暂停 1 秒后再次尝试获取。如果有新的变更数据,则调用processEntries方法对这些数据进行处理。在processEntries方法中,会遍历每一个数据变更条目,对于非事务开始和结束的条目,解析其RowChange对象,根据是否为 DDL 操作或 DML 操作分别进行处理。
对于 DML 操作,进一步遍历变更后的列数据,获取列名和值,为后续同步到 Elasticsearch 做准备。最后,通过connector.ack(batchId)方法确认已处理的消息批次,确保 Canal 服务器知道哪些消息已经被成功处理。
启动类集成接口:在 Spring Boot 的启动类上集成CommandLineRunner接口,并重写run方法,在run方法中启动 Canal 监听。这样,当 Spring Boot 项目启动时,就会自动开始监听 Canal 通道中的数据变更。以下是启动类的示例代码:
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class YourApplication implements CommandLineRunner {
private final CanalListener canalListener;
public YourApplication(CanalListener canalListener) {
this.canalListener = canalListener;
}
public static void main(String[] args) {
SpringApplication.run(YourApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
canalListener.listen();
}
}
在上述代码中,启动类YourApplication实现了CommandLineRunner接口,在构造函数中注入了CanalListener实例。在run方法中,调用canalListener.listen()方法启动 Canal 监听,开始捕获 MySQL 数据库的变更数据。
同步到 Elasticsearch 的关键要点
(一)异常处理
在数据同步过程中,由于网络波动、系统故障等原因,可能会出现各种异常情况。为了确保数据的一致性,需要增加重试机制或记录详细的错误日志。例如,可以使用 Spring 的RetryTemplate来实现重试逻辑。在捕获到异常时,RetryTemplate会按照预设的重试策略进行多次重试,直到操作成功或达到最大重试次数。同时,记录详细的错误日志,包括异常信息、发生时间、涉及的数据等,便于后续排查问题。这样,即使在同步过程中遇到临时的异常,也能最大程度保证数据的完整性和一致性。
(二)性能优化
为了提高数据同步的性能,减少对系统资源的消耗,可以采用批量处理 Canal 消息的方式,减少对 Redis/ES 的频繁写入。可以设置一个合适的批量处理大小,当收集到一定数量的变更消息后,一次性将这些消息写入到 Elasticsearch 中。这样可以减少网络请求次数,提高写入效率。同时,合理配置 Elasticsearch 的索引设置,如分片数量、副本数量等,根据数据量和查询负载进行优化,以提升整体的读写性能。
(三)数据结构
确保 Elasticsearch 的索引 Mapping 与 MySQL 表结构兼容是实现数据准确同步的重要环节。在创建 Elasticsearch 索引时,需要根据 MySQL 表的字段类型、索引情况等,定义合适的 Mapping。例如,对于 MySQL 中的字符串类型字段,在 Elasticsearch 中可能需要根据字段用途选择合适的文本类型或关键字类型,并设置是否进行分词等。对于数字类型、日期类型等字段,也需要在 Elasticsearch 中进行正确的映射配置,以保证数据在同步后能够正确存储和检索。
(四)事务管理
如果业务对数据一致性有较高要求,需要强一致性保证,可以结合本地事务表或消息队列(如 RocketMQ)做可靠投递。当 MySQL 中发生数据变更时,首先将变更信息记录到本地事务表中,并发送一条包含变更信息的消息到消息队列。Canal 在处理数据变更时,从消息队列中获取消息,并将数据同步到 Elasticsearch。同时,在本地事务表中标记该变更已处理。如果在同步过程中出现异常,可以通过查询本地事务表,重新发送未成功同步的消息,确保数据最终一致性。通过这种方式,能够在复杂的分布式环境下,保障 MySQL 与 Elasticsearch 之间的数据一致性。
总结
通过上述步骤,我们详细介绍了在 Spring Boot3 中使用阿里 Canal 实现 MySQL 数据库与 ElasticSearch 数据同步的方法和关键要点。从 Canal 的原理和优势,到 MySQL 和 Spring Boot 项目的配置,再到同步过程中的异常处理、性能优化、数据结构适配以及事务管理等方面,为互联网软件开发人员提供了一套完整的技术解决方案。在实际应用中,开发者可以根据具体的业务需求和场景,对这些技术进行灵活调整和优化,以构建高效、可靠的数据同步系统,满足不断变化的业务需求。希望本文能够对大家在相关技术领域的实践和探索有所帮助,助力大家在互联网软件开发的道路上取得更好的成果。
猜你喜欢
- 2025-07-15 Stellar Repair for MySQL:受损 MySQL 数据库的专业恢复工具
- 2025-07-15 在CentOS7系统源码安装Nginx+MySQL+PHP+Go
- 2025-07-15 linux通过yum安装nginx和mysql(linux在线安装nginx)
- 2025-07-15 Chat to MySQL 最佳实践:MCP Server 服务调用
- 2025-07-15 安装mysql-8.0.33-linux-glibc2.17-x86_64-minimal.tar.xz
- 2025-07-15 阿里云 RDS MySQL物理备份文件恢复到自建数据库
- 2025-07-15 MySQL 安装全攻略(Windows/Linux/macOS)
- 2025-07-15 使用docker备份mysql数据库(docker 备份 文件夹)
- 2025-07-15 Linux系统安装SQL Server数据库(linux如何安装数据库)
- 2025-07-15 MySQL 8.0——创建并使用数据库、获得数据库和表的信息
你 发表评论:
欢迎- 07-15Rocky Linux Nginx 自动更新免费 SSL,全流程实战
- 07-15nginx-1.22.1在linux服务器上的安装
- 07-15Rocky Linux 9 系统下安装Nginx(在linux中安装nginx)
- 07-15Rocky Linux 9.x 从零安装 Nginx 全流程:源码编译 + dnf 安装方案详解
- 07-15Linux-Nginx-反向代理篇-02(nginx反向代理apache)
- 07-15RockyLinux 9快速部署Nginx+HTTPS(基于DNF安装)
- 07-15Stellar Repair for MySQL:受损 MySQL 数据库的专业恢复工具
- 07-15在CentOS7系统源码安装Nginx+MySQL+PHP+Go
- 最近发表
-
- Rocky Linux Nginx 自动更新免费 SSL,全流程实战
- nginx-1.22.1在linux服务器上的安装
- Rocky Linux 9 系统下安装Nginx(在linux中安装nginx)
- Rocky Linux 9.x 从零安装 Nginx 全流程:源码编译 + dnf 安装方案详解
- Linux-Nginx-反向代理篇-02(nginx反向代理apache)
- RockyLinux 9快速部署Nginx+HTTPS(基于DNF安装)
- Stellar Repair for MySQL:受损 MySQL 数据库的专业恢复工具
- 在CentOS7系统源码安装Nginx+MySQL+PHP+Go
- linux通过yum安装nginx和mysql(linux在线安装nginx)
- Chat to MySQL 最佳实践:MCP Server 服务调用
- 标签列表
-
- 下划线是什么 (87)
- 精美网站 (58)
- qq登录界面 (90)
- nginx 命令 (82)
- nginx .http (73)
- nginx lua (70)
- nginx 重定向 (68)
- Nginx超时 (65)
- nginx 监控 (57)
- odbc (59)
- rar密码破解工具 (62)
- annotation (71)
- 红黑树 (57)
- 智力题 (62)
- php空间申请 (61)
- 按键精灵 注册码 (69)
- 软件测试报告 (59)
- ntcreatefile (64)
- 闪动文字 (56)
- guid (66)
- abap (63)
- mpeg 2 (65)
- column (63)
- dreamweaver教程 (57)
- excel行列转换 (56)
本文暂时没有评论,来添加一个吧(●'◡'●)