自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

MySQL 二進(jìn)制日志 binlog 核心知識點(diǎn)小結(jié)

開發(fā)
本文將從MySQL二進(jìn)制日志bin.log基本概念、工作機(jī)制和一些常見的運(yùn)用場景角度展開探討,希望對你有幫助。

binlog的概念和基本作用

bin log實(shí)際上是一個(gè)物理日志,當(dāng)我們對某個(gè)數(shù)據(jù)頁進(jìn)行修改操作時(shí)我們就會將這個(gè)操作寫到bin log中,當(dāng)我們數(shù)據(jù)庫需要進(jìn)行主備、主從復(fù)制等操作時(shí),都可以基于bin log保證數(shù)據(jù)一致性。

binlog緩沖區(qū)

bin log緩沖區(qū)和我們的redo log和undo log緩沖區(qū)有所不同,redo log和undo log緩存都在存儲引擎的共享緩沖區(qū)緩沖區(qū)buffer pool中,而bin log則是為每個(gè)工作線程獨(dú)立分配一個(gè)內(nèi)存作為bin log緩沖區(qū):

bin log之所以是在每個(gè)線程中,原因有二:

  • 考慮到mysql-server要求所有存儲引擎對于bin.log都是兼容。
  • 將bin_log_buffer設(shè)置在每個(gè)線程/事務(wù)中還能保證并發(fā)操作的性能,避免為各個(gè)線程爭搶臨界緩沖區(qū)的沖突導(dǎo)致并發(fā)性能下降。

binlog對應(yīng)的三種記錄格式

row:這種格式主要用于保證數(shù)據(jù)實(shí)時(shí)性的,例如我們執(zhí)行下面這段SQL

update table set time=now() where id=1;

如果我們將其存到bin log之后很長一段時(shí)間才提交事務(wù),那么時(shí)間就會有所延遲,所以MySQL為了保證數(shù)據(jù)實(shí)時(shí)性,就會將寫入bin log中的SQL用row格式,如下圖所示,可以看到row格式的SQL語句時(shí)間是當(dāng)前時(shí)間的具體值,并且where條件寫死了當(dāng)前條件列,確保數(shù)據(jù)實(shí)時(shí)一致性:

當(dāng)然這樣做的缺點(diǎn)也很明顯,如果涉及大批量操作,那么針對每條數(shù)據(jù)對應(yīng)的都會生成對應(yīng)的row語句,那么對于內(nèi)存的占用就很高,進(jìn)行恢復(fù)和同步時(shí)的IO和SQL執(zhí)行時(shí)間也是非常不友好的。

stament:這種同步策略即執(zhí)行的SQL是什么,對應(yīng)傳輸過去的時(shí)對應(yīng)的語句就是什么樣的,這就會導(dǎo)致我們上文所說的一致性問題:

mixed:這種格式就是為了上述兩種方案的混合體,如果操作可能出現(xiàn)數(shù)據(jù)不一致問題則用row格式,反之使用stament格式。

binlog文件日志格式

我們可以通過下面這條SQL語句看到我們本地的bin log文件:

show binary logs;

輸出結(jié)果如下所示,可以看到bin log的格式基本都是mysql-bin.0000xxx:

mysql-bin.001606 440052 No
mysql-bin.001607 111520 No

binlog是如何完成寫入

當(dāng)我們開始事務(wù)時(shí),將修改寫入bin log cache中,一旦事務(wù)提交,就會將bin log通過write寫入到文件系統(tǒng)緩存的page cache中,然后根據(jù)我們配置的刷盤參數(shù)將cache內(nèi)容調(diào)用操作系統(tǒng)內(nèi)核方法fsync將結(jié)果寫入到bin log 物理文件中:

而調(diào)用系統(tǒng)函數(shù)fsync的實(shí)際是根據(jù)MySQL系統(tǒng)參數(shù)決定的,這個(gè)系統(tǒng)變量查詢SQL如下:

SHOW VARIABLES LIKE 'sync_binlog';

而sync_binlog值分別三種:

  • 0.當(dāng)配置為了0時(shí),每次事務(wù)提交都只會write,fsync調(diào)用時(shí)機(jī)是由系統(tǒng)決定的。
  • 1.當(dāng)配置設(shè)置為1時(shí),每次事務(wù)提交都會調(diào)用fsync。
  • N. 當(dāng)配置為N,代表提交了N個(gè)事務(wù)之后就會將page cache中的數(shù)據(jù)通過fsync進(jìn)行刷盤。

binlog和redolog的區(qū)別

這個(gè)問題我們可以從以下幾個(gè)場景來表述一下:

從使用場景來說:

  • bin log常用于數(shù)據(jù)災(zāi)備或數(shù)據(jù)同步到其他異構(gòu)程序中的場景。
  • redo log常用于故障恢復(fù)保證數(shù)據(jù)持久性。

從數(shù)據(jù)內(nèi)容來說:

  • redo log存儲的物理日志,即修改的數(shù)據(jù)內(nèi)容,對應(yīng)的redo block結(jié)構(gòu)體針對各種偏移量和修改涉及的頁都有及其復(fù)雜的涉及,這里就不多做贅述。
  • bin log則是記錄可以是statment語句也可以是原生修改的row,具體可以通過查看binlog_format知曉。

生成范圍:

  • bin log是MySQL server生成的事務(wù)日志,任何存儲引擎都可以使用
  • redo log只有innodb這個(gè)存儲引擎支持。

(實(shí)踐)基于flink cdc訂閱binlog同步數(shù)據(jù)

接下來我們就基于spring boot演示一下如何基于flink cdc訂閱bin.log完成db庫中的tb_1和tb_2的數(shù)據(jù)訂閱和同步:

之所以筆者使用flink cdc而不是canel大體有以下幾個(gè)原因:

  • flink cdc支持全量和增量同步以及斷點(diǎn)續(xù)傳等功能,尤其是斷點(diǎn)續(xù)傳這一點(diǎn)對于需要保證異構(gòu)數(shù)據(jù)庫的數(shù)據(jù)一致性是非常好的。
  • 性能表現(xiàn)更出色,按照阿里云的說法:

我們將全增量一體化框架與 Debezium 1.6 版本做 簡單的 TPC-DS 讀取測試對比,customer 單表數(shù)據(jù)量 6500 萬,在 Flink CDC 用 8 個(gè)并發(fā)的情況下,吞吐提升了 6.8 倍,耗時(shí)僅 13 分鐘,得益于并發(fā)讀取的支持,如果用戶需要更快的讀取速度,用戶可以增加并發(fā)實(shí)現(xiàn)。

話不說我們給出基礎(chǔ)的集成步驟,首先是引入flink cdc和MySQL的依賴,這里筆者為了文章的簡練只給出的flink cdc相關(guān)的pom依賴:

<properties>
        <flink.version>1.13.6</flink.version>
    </properties>


<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--mysql -cdc-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>

然后我們在yml或者properties文件中給出MySQL配置即可,然后我們聲明一個(gè)CdcInfo記錄從bin.log中同步的數(shù)據(jù):

@Data
publicclass CdcInfo {
    /**
     * 變更前數(shù)據(jù)
     */
    private JSONObject beforeData;
    /**
     * 變更后數(shù)據(jù)
     */
    private JSONObject afterData;

    private String operation;
    /**
     * binlog 文件名
     */
    private String binLogName;
    /**
     * binlog當(dāng)前讀取點(diǎn)位
     */
    private Integer filePos;
    /**
     * 數(shù)據(jù)庫名
     */
    private String dbName;
    /**
     * 表名
     */
    private String tbName;
    /**
     * 變更時(shí)間
     */
    private Long changeTime;

}

然后我們編寫一個(gè)關(guān)于bin.log通知事件的監(jiān)聽,針對flink cdc配置筆者都基于CommandLineRunner 這個(gè)拓展點(diǎn)完成配置,這里面涉及眾多的flink cdc配置參數(shù),可以看到筆者的程序同步模式配置的是initial即啟動后會進(jìn)行全量同步再進(jìn)行增量同步,同時(shí)通過表達(dá)式db.tb_[1-2]+指明僅僅處理tb_1和tb_2表的數(shù)據(jù)更新變化。

@Component
publicclass MysqlCdcEventListener implements CommandLineRunner {

    //數(shù)據(jù)接收器用于應(yīng)用架構(gòu)更改和將更改數(shù)據(jù)寫入外部系統(tǒng)
    privatefinal CdcSink cdcSink;

    public MysqlCdcEventListener(CdcSink cdcSink) {
        this.cdcSink = cdcSink;
    }


    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設(shè)置并行度
        env.setParallelism(Runtime.getRuntime().availableProcessors());


        DebeziumSourceFunction<CdcInfo> debeziumSource = buildDebeziumSource();


        DataStream<CdcInfo> streamSource = env
                .addSource(debeziumSource, "mysql-source")
                .setParallelism(1);
        //將流數(shù)據(jù)交給
        streamSource.addSink(cdcSink);
        env.execute("mysql-stream-cdc");
    }


    /**
     * 構(gòu)造變更數(shù)據(jù)源
     */
    private DebeziumSourceFunction<CdcInfo> buildDebeziumSource() {

        Properties debeziumProperties = new Properties();
        //設(shè)置快照為無鎖
        debeziumProperties.put("snapshot.locking.mode", "none");

        return MySqlSource.<CdcInfo>builder()
                .hostname("xxxx")
                .port(3306)
                .databaseList("db")
                //監(jiān)聽db庫中的[1-2]表
                .tableList("db.tb_[1-2]+")
                .username("xxxx")
                .password("xxxx")
                //設(shè)置為 initial:在第一次啟動時(shí)對受監(jiān)視的數(shù)據(jù)庫表執(zhí)行初始快照,并繼續(xù)讀取最新的 binlog
                .startupOptions(StartupOptions.initial())
                //設(shè)置序列化配置
                .deserializer(new MysqlDeserialization())
                .serverTimeZone("GMT+8")
                .debeziumProperties(debeziumProperties)
                .build();
    }


}
  • 關(guān)于這些配置的信息建議讀者移步官方文檔的說明:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/
  • 相應(yīng)的使用配置示例讀者也可以參考flink cdc對應(yīng)的GitHub上的說明:https://github.com/gunnarmorling/flink-cdc-connectors

上文代碼示例中給出一個(gè)涉及反序列化生產(chǎn)CdcInfo的操作,筆者指明了MysqlDeserialization 這里也給出對應(yīng)的源碼示例:

public class MysqlDeserialization implements DebeziumDeserializationSchema<CdcInfo> {

    publicstaticfinal String TS_MS = "ts_ms";
    publicstaticfinal String BIN_FILE = "file";
    publicstaticfinal String POS = "pos";
    publicstaticfinal String CREATE = "CREATE";
    publicstaticfinal String BEFORE = "before";
    publicstaticfinal String AFTER = "after";
    publicstaticfinal String SOURCE = "source";
    publicstaticfinal String UPDATE = "UPDATE";


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<CdcInfo> collector) {
        //獲取bin.log訂閱到的信息
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        CdcInfo tbCdcInfo = new CdcInfo();
        //獲取前后變化數(shù)據(jù)
        tbCdcInfo.setBeforeData(convert2JsonObj(struct, BEFORE));
        tbCdcInfo.setAfterData(convert2JsonObj(struct, AFTER));
        //5.獲取操作類型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();

        tbCdcInfo.setOperation(type);
        tbCdcInfo.setBinLogName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        tbCdcInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        tbCdcInfo.setDbName(database);
        tbCdcInfo.setTbName(tableName);
        tbCdcInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
        //7.輸出數(shù)據(jù)
        collector.collect(tbCdcInfo);
    }

    /**
     * 從原始數(shù)據(jù)獲取出變更之前或之后的數(shù)據(jù)
     */
    private JSONObject convert2JsonObj(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }


    @Override
    public TypeInformation<CdcInfo> getProducedType() {
        return TypeInformation.of(CdcInfo.class);
    }
}

此時(shí)我們啟動程序后針對數(shù)據(jù)表進(jìn)行修改操作就會收到數(shù)據(jù)消息的訂閱了:

訂閱到的數(shù)據(jù):CdcInfo(beforeData={"id":1,"name":"xiaoming"}, afterData={"id":1,"name":"xiaoming1"}, operatinotallow=UPDATE, binLogName=binlog.000156, filePos=1256, dbName=db, tbName=tb_2, changeTime=1734622269654)
責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2010-06-09 13:02:29

MySQL啟用二進(jìn)制日

2010-10-13 15:45:23

MySQL二進(jìn)制日志

2021-12-30 08:17:27

Springboot數(shù)據(jù)訪問DataSourceB

2025-01-07 14:10:46

SpringBoot開發(fā)Java

2020-11-06 00:50:16

JavaClassLoaderJVM

2021-01-15 08:35:49

Zookeeper

2020-10-26 10:40:31

Axios前端攔截器

2021-01-06 13:52:19

zookeeper開源分布式

2024-11-04 09:00:00

Java開發(fā)

2025-03-26 11:30:40

2021-04-13 08:25:12

測試開發(fā)Java注解Spring

2024-04-23 14:25:16

Python備忘清單

2020-05-19 14:40:08

Linux互聯(lián)網(wǎng)核心

2018-08-21 10:05:59

MySQLbinlog數(shù)據(jù)庫

2025-05-07 08:55:00

2022-10-29 08:55:19

頁面react

2014-08-06 10:10:52

MariaDB二進(jìn)制日志

2018-03-12 14:33:49

數(shù)據(jù)庫MySQL日志

2022-04-08 07:51:31

JavaJVM垃圾回收

2024-01-31 09:55:53

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號