业务中台 - 数据订阅通知方案
可选方案
1、基于 MySQL 数据库 binlog 的订阅通知
2、基于消息中间件(RabbitMQ、Redis)的广播通知
3、基于 Disruptor 本地队列 + API 调用的接口通知
技术实现
1、基于 binlog 日志的数据变更通知
基于 Canal 实现
1、业务系统,开发单独的数据订阅处理服务
2、订阅处理服务,集成 canal-spring-boot-starter 进行数据订阅
<!-- For Canal -->
<dependency>
    <groupId>com.github.hiwepy</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
</dependency>
3、配置 Canal 客户端
################################################################################################################
### Canal (CanalProperties、CanalPulsarProperties) 基本配置:
################################################################################################################
canal:
  # Canal Server 模式。默认为 simple
  mode: cluster
  async: false
  #filter: .*\\.ty_admin.*\\..*
  cluster:
    instances:
      - destination: simple
        # Canal Zookeeper 地址
        zk-servers: 192.168.3.64:2181
        # Canal Server 用户名
        username: canal
        # Canal Server 密码
        password: canal
        # Socket 连接超时时间,单位:毫秒。默认为 60000
        so-timeout: 6000
        # Socket 空闲超时时间,单位:毫秒。默认为 3600000
        idle-timeout: 3600000
        # 重试次数;设置-1时可以subscribe阻塞等待时优雅停机
        retry-times: 3
        # 重试间隔时间,单位:毫秒。默认为 1000
        retry-interval: 1000
  # Simple 客户端模式配置
  simple:
    instances:
      - destination: simple
        # Canal Server 主机地址
        host: 192.168.3.100
        # Canal Server 端口。默认为 11111
        port: 32673
        # Canal Server 用户名
        username: canal
        # Canal Server 密码
        password: canal
        # Socket 连接超时时间,单位:毫秒。默认为 60000
        so-timeout: 6000
        # Socket 空闲超时时间,单位:毫秒。默认为 3600000
        idle-timeout: 3600000
        # 重试次数;设置-1时可以subscribe阻塞等待时优雅停机
        retry-times: 3
        # 重试间隔时间,单位:毫秒。默认为 1000
        retry-interval: 1000
4、编写 CanalEventHandler 处理数据变更事件
import com.alibaba.otter.canal.annotation.CanalEventHandler;
import com.alibaba.otter.canal.annotation.event.OnDeleteEvent;
import com.alibaba.otter.canal.annotation.event.OnInsertEvent;
import com.alibaba.otter.canal.annotation.event.OnUpdateEvent;
import com.alibaba.otter.canal.model.CanalModel;
import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
/**
 * Sample Canal event handler that logs insert, update and delete events.
 *
 * <p>All three handler methods are annotated with the same destination, schema and
 * table; their bodies only log the event and leave the real processing as a TODO.
 */
@Slf4j
@CanalEventHandler
public class DemoCanalEventHandler implements InitializingBean {

    /** Canal destination referenced by every handler annotation. */
    private static final String DESTINATION = "simple";

    /** Schema named in every handler annotation. */
    private static final String SCHEMA = "ty_evaluation";

    /** Table named in every handler annotation. */
    private static final String TABLE = "pj_v2_rule_item";

    /** No initialization work is performed once the bean's properties are set. */
    @Override
    public void afterPropertiesSet() throws Exception {
    }

    /**
     * Logs an insert event.
     *
     * @param model     source of the schema and table names written to the log
     * @param rowChange row change payload; not used yet
     */
    @OnInsertEvent(destination = DESTINATION, schema = SCHEMA, table = TABLE)
    public void onEventInsertData(CanalModel model, CanalEntry.RowChange rowChange) {
        log.info(" database {} table {} insert event", model.getSchema(), model.getTable());
        // TODO insert event
    }

    /**
     * Logs an update event.
     *
     * @param model     source of the schema and table names written to the log
     * @param rowChange row change payload; not used yet
     */
    @OnUpdateEvent(destination = DESTINATION, schema = SCHEMA, table = TABLE)
    public void onEventUpdateData(CanalModel model, CanalEntry.RowChange rowChange) {
        log.info(" database {} table {} update event", model.getSchema(), model.getTable());
        // TODO update event
    }

    /**
     * Logs a delete event.
     *
     * <p>Note: this handler takes the row change first, unlike the insert and
     * update handlers.
     *
     * @param rowChange row change payload; not used yet
     * @param model     source of the schema and table names written to the log
     */
    @OnDeleteEvent(destination = DESTINATION, schema = SCHEMA, table = TABLE)
    public void onEventDeleteData(CanalEntry.RowChange rowChange, CanalModel model) {
        log.info(" database {} table {} delete event", model.getSchema(), model.getTable());
        // TODO delete event
    }
}
基于 mysql-binlog-connector-java 实现
1、业务系统,开发单独的数据订阅处理服务
2、订阅处理服务,集成 mysql-binlog-connector-java 这类可以进行数据订阅的类库
<!-- For mysql-binlog-connector -->
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.30.1</version>
</dependency>
3、本地binlog日志文件处理,可参考 BinaryLogFileReaderTest
import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.util.Collections;
/**
 * Demo that scans a local MySQL binlog file and reports selected events.
 *
 * <p>Query events are kept only when their database equals {@link #TARGET_DATABASE}
 * and their SQL text contains both {@link #TARGET_TABLE} and {@link #TARGET_KEYWORD};
 * each kept statement is printed and written to {@link #FILE_PATH}. Delete, update,
 * rows-query and table-map events only produce a console message.
 */
public class BinaryLogFileReaderTest {

    /** Output file that receives the matching SQL statements. */
    private static final String FILE_PATH = "D:/tmp/binglog-000028-2.sql";

    /** Binlog file that is read. */
    private static final String BINLOG_PATH = "E:\\mysql-bin.000028";

    /** Database a query event must belong to in order to be reported. */
    private static final String TARGET_DATABASE = "ty_portfolio_new";

    /** Text (a table name) that the SQL of a reported query event must contain. */
    private static final String TARGET_TABLE = "growth_template_page_user";

    /** Second text fragment that the SQL of a reported query event must contain. */
    private static final String TARGET_KEYWORD = "1873614200041103404";

    /**
     * Reads every event of the binlog file and handles it by event-data type.
     *
     * @param args unused
     * @throws Exception if the binlog file cannot be read or the output file cannot be written
     */
    public static void main(String[] args) throws Exception {
        File binlogFile = new File(BINLOG_PATH);
        EventDeserializer eventDeserializer = new EventDeserializer();
        // Set the compatibility mode.
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
        try (BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer)) {
            for (Event event; (event = reader.readEvent()) != null; ) {
                EventData data = event.getData();
                // Dispatch on the type of the event data.
                /*
                 * Disabled sample: dump the rows of insert events. To enable it, uncomment
                 * the branch and place it in front of the chain below (it additionally
                 * needs java.io.Serializable and java.util.List).
                 *
                 * if (data instanceof WriteRowsEventData) {
                 *     WriteRowsEventData ed = (WriteRowsEventData) data;
                 *     System.out.printf("新增事件:%s%n", ed.getIncludedColumns().toString());
                 *     List<Serializable[]> rows = ed.getRows();
                 *     rows.forEach(row -> {
                 *         for (Serializable s : row) {
                 *             if (s instanceof byte[]) {
                 *                 byte[] bs = (byte[]) s;
                 *                 System.err.print(new String(bs) + "\t");
                 *             } else {
                 *                 System.err.print(s + "\t");
                 *             }
                 *         }
                 *         System.out.println();
                 *     });
                 * } else
                 */
                if (data instanceof QueryEventData) {
                    QueryEventData ed = (QueryEventData) data;
                    String sql = ed.getSql();
                    if (!isTargetStatement(ed.getDatabase(), sql)) {
                        continue;
                    }
                    System.out.printf("查询事件:%s%n", sql);
                    // The trailing 'true' is passed as in the commons-io append overload.
                    FileUtils.writeLines(new File(FILE_PATH), Collections.singletonList(sql), true);
                } else if (data instanceof DeleteRowsEventData) {
                    System.err.println("删除事件");
                } else if (data instanceof UpdateRowsEventData) {
                    System.err.println("更新事件");
                } else if (data instanceof RowsQueryEventData) {
                    System.err.println("行查询事件");
                } else if (data instanceof TableMapEventData) {
                    TableMapEventData ed = (TableMapEventData) data;
                    System.out.printf("数据库: %s, 表名: %s%n", ed.getDatabase(), ed.getTable());
                }
            }
        }
    }

    /**
     * Tells whether a query event should be reported.
     *
     * @param database database name of the query event, may be {@code null}
     * @param sql      SQL text of the query event, may be {@code null}
     * @return {@code true} when the database matches and the SQL contains both filter fragments
     */
    private static boolean isTargetStatement(String database, String sql) {
        // Constant-first equals and the null check keep the filter free of NPEs.
        return TARGET_DATABASE.equals(database)
                && sql != null
                && sql.contains(TARGET_TABLE)
                && sql.contains(TARGET_KEYWORD);
    }
}
4、远程binlog日志文件处理,可参考 BinaryLogClientTest
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import java.io.Serializable;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
/**
 * Demo that connects to a MySQL server as a binlog client and prints row changes.
 *
 * <p>Insert, update and delete row events have their rows printed column by column;
 * table-map events print the affected database and table. All other events are ignored.
 */
public class BinaryLogClientTest {

    /**
     * Registers a printing event listener and connects to the server.
     *
     * @param args unused
     * @throws Exception if connecting to the server fails
     */
    public static void main(String[] args) throws Exception {
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "test", "root", "123456");
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(new BinaryLogClient.EventListener() {

            /** Prints the content of row-change and table-map events. */
            @Override
            public void onEvent(Event event) {
                EventHeader header = event.getHeader();
                switch (header.getEventType()) {
                    // The non-EXT variants are handled as well so that row events
                    // using those type codes are not silently dropped.
                    case WRITE_ROWS:
                    case EXT_WRITE_ROWS: {
                        WriteRowsEventData writeData = event.getData();
                        for (Serializable[] row : writeData.getRows()) {
                            printRow(row);
                        }
                        break;
                    }
                    case UPDATE_ROWS:
                    case EXT_UPDATE_ROWS: {
                        UpdateRowsEventData updateData = event.getData();
                        BitSet columns = updateData.getIncludedColumns();
                        System.err.printf("更新列: %s%n", columns);
                        List<Map.Entry<Serializable[], Serializable[]>> updateRows = updateData.getRows();
                        for (Map.Entry<Serializable[], Serializable[]> entry : updateRows) {
                            // Each label is printed after the row image it refers to.
                            printRow(entry.getKey());
                            System.out.println(">>>>>>>>>>>>>>>>>>>>>before");
                            printRow(entry.getValue());
                            System.out.println(">>>>>>>>>>>>>>>>>>>>>after");
                        }
                        break;
                    }
                    case DELETE_ROWS:
                    case EXT_DELETE_ROWS: {
                        DeleteRowsEventData deleteData = event.getData();
                        for (Serializable[] row : deleteData.getRows()) {
                            printRow(row);
                        }
                        break;
                    }
                    case TABLE_MAP: {
                        TableMapEventData data = event.getData();
                        System.out.printf("变更表: %s.%s%n", data.getDatabase(), data.getTable());
                        break;
                    }
                    default:
                        break;
                }
            }

            /**
             * Prints one row as tab-separated column values followed by a line break.
             *
             * @param row column values of the row; individual values may be {@code null}
             */
            private void printRow(Serializable[] row) {
                for (Serializable value : row) {
                    // instanceof is null-safe, so a null column value is printed as
                    // "null" instead of raising a NullPointerException.
                    if (value instanceof byte[]) {
                        // NOTE(review): assumes the text columns are UTF-8 encoded
                        // (utf8/utf8mb4) - adjust the charset if the schema differs.
                        String text = new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.print(text + "\t");
                    } else {
                        System.out.print(value + "\t");
                    }
                }
                System.out.println();
            }
        });
        // NOTE(review): connect() is expected to block while events are streamed -
        // confirm against the library documentation.
        client.connect();
    }
}
2、基于消息中间件(RabbitMQ、Redis)的广播通知
业务中台在逻辑代码中增加
3、基于 Disruptor 本地队列 + API 调用的接口通知
方案对比
| 方案 | ||
|---|---|---|
| 基于 MySQL 数据库 binlog 的订阅通知 | ||
| 基于消息中间件(RabbitMQ、Redis)的广播通知 | ||
| 基于 Disruptor 本地队列 + API 调用的接口通知 | ||
作者:杭州天音 创建时间:2025-01-22 10:30
最后编辑:杭州天音 更新时间:2025-06-09 11:42