业务中台 - 数据订阅通知方案

可选方案

1、基于 MySQL 数据库 binlog 的订阅通知

2、基于消息中间件(RabbitMQ、Redis)的广播通知

3、基于 Disruptor 本地队列 + API 调用的接口通知

技术实现

1、基于 binlog 日志的数据变更通知

基于 Canal 实现

1、业务系统,开发 单独的数据订阅处理服务
2、订阅处理服务,集成 canal-spring-boot-starter 进行数据订阅

<!-- For Canal -->
<dependency>
    <groupId>com.github.hiwepy</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
</dependency>

3、配置 Canal 客户端

################################################################################################################
###Canal (CanalProperties、CanalPulsarProperties、) 基本配置:
################################################################################################################
canal:
  # Canal Server 模式。默认为 simple
  mode: cluster
  async: false
  #filter: .*\\.ty_admin.*\\..*
  cluster:
    instances:
      - destination: simple
        # Canal Zookeeper 地址
        zk-servers: 192.168.3.64:2181
        # Canal Server 用户名
        username: canal
        # Canal Server 密码
        password: canal
        # Socket 连接超时时间,单位:毫秒。默认为 60000
        so-timeout: 6000
        # Socket 空闲超时时间,单位:毫秒。默认为 3600000
        idle-timeout: 3600000
        # 重试次数;设置-1时可以subscribe阻塞等待时优雅停机
        retry-times: 3
        # 重试间隔时间,单位:毫秒。默认为 1000
        retry-interval: 1000
  # Simple 客户端模式配置
  simple:
    instances:
      - destination: simple
        # Canal Server 主机地址
        host: 192.168.3.100
        # Canal Server 端口。默认为 11111
        port: 32673
        # Canal Server 用户名
        username: canal
        # Canal Server 密码
        password: canal
        # Socket 连接超时时间,单位:毫秒。默认为 60000
        so-timeout: 6000
        # Socket 空闲超时时间,单位:毫秒。默认为 3600000
        idle-timeout: 3600000
        # 重试次数;设置-1时可以subscribe阻塞等待时优雅停机
        retry-times: 3
        # 重试间隔时间,单位:毫秒。默认为 1000
        retry-interval: 1000

4、编写 CanalEventHandler 处理数据变更实践

import com.alibaba.otter.canal.annotation.CanalEventHandler;
import com.alibaba.otter.canal.annotation.event.OnDeleteEvent;
import com.alibaba.otter.canal.annotation.event.OnInsertEvent;
import com.alibaba.otter.canal.annotation.event.OnUpdateEvent;
import com.alibaba.otter.canal.model.CanalModel;
import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;

@CanalEventHandler
@Slf4j
public class DemoCanalEventHandler implements InitializingBean {

    @Override
    public void afterPropertiesSet() throws Exception {
    }

    @OnInsertEvent(destination = "simple", schema = "ty_evaluation", table = "pj_v2_rule_item")
    public void onEventInsertData(CanalModel model, CanalEntry.RowChange rowChange) {
        log.info(" database {} table {} insert event", model.getSchema(), model.getTable());
        // TODO insert event
    }

    @OnUpdateEvent(destination = "simple", schema = "ty_evaluation", table = "pj_v2_rule_item")
    public void onEventUpdateData(CanalModel model, CanalEntry.RowChange rowChange) {
        log.info(" database {} table {} update event", model.getSchema(), model.getTable());
        // TODO update event
    }

    @OnDeleteEvent(destination = "simple", schema = "ty_evaluation", table = "pj_v2_rule_item")
    public void onEventDeleteData(CanalEntry.RowChange rowChange, CanalModel model) {
        log.info(" database {} table {} delete event", model.getSchema(), model.getTable());
        // TODO delete event
    }
}
基于 mysql-binlog-connector-java 实现

1、业务系统,开发单独的数据订阅处理服务
2、订阅处理服务,集成 mysql-binlog-connector-java 此类可以进行数据订阅的

<!-- For mysql-binlog-connector -->
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.30.1</version>
</dependency>

3、本地binlog日志文件处理,可参考 BinaryLogFileReaderTest

import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.util.Collections;

public class BinaryLogFileReaderTest {

    private static final String FILE_PATH = "D:/tmp/binglog-000028-2.sql";


    public static void main(String[] args) throws Exception {

        File binlogFile = new File("E:\\mysql-bin.000028") ;
        EventDeserializer eventDeserializer = new EventDeserializer() ;
        // 设置兼容性模式
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
        try (BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer)) {
            for (Event event; (event = reader.readEvent()) != null; ) {
                EventData data = event.getData();
                // 判断事件的类型
                /*if (data instanceof WriteRowsEventData) {
                    WriteRowsEventData ed = (WriteRowsEventData) data;
                    System.out.printf("新增事件:%s%n", ed.getIncludedColumns().toString());
                    List<Serializable[]> rows = ed.getRows();
                    rows.forEach(row -> {
                        for (Serializable s : row) {
                            if (s instanceof byte[]) {
                                byte[] bs = (byte[]) s;
                                System.err.print(new String(bs) + "\t");
                            } else {
                                System.err.print(s + "\t");
                            }
                        }
                        System.out.println();
                    });
                } else */if (data instanceof QueryEventData) {
                    QueryEventData ed = (QueryEventData) data;
                    if (!ed.getDatabase().equals("ty_portfolio_new")) {
                        continue;
                    }
                    if (!ed.getSql().contains("growth_template_page_user")) {
                        continue;
                    }
                    if (!ed.getSql().contains("1873614200041103404")) {
                        continue;
                    }
                    System.out.printf("查询事件:%s%n", ed.getSql());
                    FileUtils.writeLines(new File(FILE_PATH), Collections.singletonList(ed.getSql()), true);
                } else if (data instanceof DeleteRowsEventData) {
                    DeleteRowsEventData ed = (DeleteRowsEventData) data;
                    System.err.println("删除事件");
                } else if (data instanceof UpdateRowsEventData) {
                    UpdateRowsEventData ed = (UpdateRowsEventData) data;
                    System.err.println("更新事件");
                } else if (data instanceof RowsQueryEventData) {
                    RowsQueryEventData ed = (RowsQueryEventData) data;
                    System.err.println("行查询事件");
                } else if (data instanceof TableMapEventData) {
                    TableMapEventData ed = (TableMapEventData) data;
                    String database = ed.getDatabase();
                    String table = ed.getTable();
                    System.out.printf("数据库: %s, 表名: %s%n", database, table);
                }
            }
        }
    }
}

4、远程binlog日志文件处理,可参考 BinaryLogClientTest

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import java.io.Serializable;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

public class BinaryLogClientTest {

    public static void main(String[] args) throws Exception {

        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "test", "root", "123456");
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(new BinaryLogClient.EventListener() {
            @Override
            public void onEvent(Event event) {
                EventHeader header = event.getHeader() ;
                switch(header.getEventType()) {
                    case EXT_WRITE_ROWS:
                        WriteRowsEventData writeData = event.getData() ;
                        List<Serializable[]> rows = writeData.getRows() ;
                        for (Serializable row : rows) {
                            if (row.getClass().isArray()) {
                                printRow(row);
                            }
                        }
                        break ;
                    case EXT_UPDATE_ROWS:
                        UpdateRowsEventData updateData = event.getData() ;
                        BitSet columns = updateData.getIncludedColumns() ;
                        System.err.printf("更新列: %s%n", columns) ;
                        List<Map.Entry<Serializable[], Serializable[]>> updateRows = updateData.getRows() ;
                        for (Map.Entry<Serializable[], Serializable[]> entry : updateRows) {
                            printRow(entry.getKey()) ;
                            System.out.println(">>>>>>>>>>>>>>>>>>>>>before") ;
                            printRow(entry.getValue()) ;
                            System.out.println(">>>>>>>>>>>>>>>>>>>>>after") ;
                        }
                        break ;
                    case EXT_DELETE_ROWS:
                        DeleteRowsEventData deleteData = event.getData() ;
                        List<Serializable[]> deleteRow = deleteData.getRows() ;
                        for (Serializable row : deleteRow) {
                            if (row.getClass().isArray()) {
                                printRow(row);
                            }
                        }
                        break ;
                    case TABLE_MAP:
                        TableMapEventData data = event.getData() ;
                        System.out.printf("变更表: %s.%s%n", data.getDatabase(), data.getTable()) ;
                        break ;
                    default:
                        break ;
                }
            }
            private void printRow(Serializable row) {
                Serializable[] ss = (Serializable[]) row ;
                for (Serializable s : ss) {
                    if (s.getClass().isArray()) {
                        System.out.print(new String((byte[])s) + "\t") ;
                    } else {
                        System.out.print(s + "\t") ;
                    }
                }
                System.out.println() ;
            }
        });
        client.connect();
    }
}

2、基于消息中间件(RabbitMQ、Redis)的广播通知

业务中台在逻辑代码中增加

3、基于 Disruptor 本地队列 + API 调用的接口通知

方案对比

方案
基于 MySQL 数据库 binlog 的订阅通知
基于消息中间件(RabbitMQ、Redis)的广播通知
基于 Disruptor 本地队列 + API 调用的接口通知
作者:杭州天音  创建时间:2025-01-22 10:30
最后编辑:杭州天音  更新时间:2025-06-09 11:42