MySQL同步ES的六種方案!
引言
在分布式架構中,MySQL與Elasticsearch(ES)的協(xié)同已成為解決高并發(fā)查詢與復雜檢索的標配組合。
然而,如何實現(xiàn)兩者間的高效數(shù)據(jù)同步,是架構設計中繞不開的難題。
這篇文章跟大家一起聊聊MySQL同步ES的6種主流方案,結合代碼示例與場景案例,幫助開發(fā)者避開常見陷阱,做出最優(yōu)技術選型。
方案一:同步雙寫
場景:適用于對數(shù)據(jù)實時性要求極高,且業(yè)務邏輯簡單的場景,如金融交易記錄同步。
在業(yè)務代碼中同時寫入MySQL與ES。
代碼如下:
@Transactional
public void createOrder(Order order) {
// 寫入MySQL
orderMapper.insert(order);
// 同步寫入ES
IndexRequest request = new IndexRequest("orders")
.id(order.getId())
.source(JSON.toJSONString(order), XContentType.JSON);
client.index(request, RequestOptions.DEFAULT);
}
痛點:
- 硬編碼侵入:所有涉及寫操作的地方均需添加ES寫入邏輯。
- 性能瓶頸:雙寫操作導致事務時間延長,TPS下降30%以上。
- 數(shù)據(jù)一致性風險:若ES寫入失敗,需引入補償機制(如本地事務表+定時重試)。
方案二:異步雙寫
場景:電商訂單狀態(tài)更新后需同步至ES供客服系統(tǒng)檢索。
我們可以使用MQ進行解耦。
架構圖如下:
代碼示例如下:
// 生產(chǎn)者端
public void updateProduct(Product product) {
productMapper.update(product);
kafkaTemplate.send("product-update", product.getId());
}
// 消費者端
@KafkaListener(topics = "product-update")
public void syncToEs(String productId) {
Product product = productMapper.selectById(productId);
esClient.index(product);
}
優(yōu)勢:
- 吞吐量提升:通過MQ削峰填谷,可承載萬級QPS。
- 故障隔離:ES宕機不影響主業(yè)務鏈路。
缺陷:
- 消息堆積:突發(fā)流量可能導致消費延遲(需監(jiān)控Lag值)。
- 順序性問題:需通過分區(qū)鍵保證同一數(shù)據(jù)的順序消費。
方案三:Logstash定時拉取
場景:用戶行為日志的T+1分析場景。
該方案低侵入但高延遲。
配置示例如下:
input {
jdbc{
jdbc_driver=>"com.mysql.jdbc.Driver"
jdbc_url=>"jdbc:mysql://localhost:3306/log_db"
schedule=>"*/5 * * * *"# 每5分鐘執(zhí)行
statement=>"SELECT * FROM user_log WHERE update_time > :sql_last_value"
}
}
output{
elasticsearch{
hosts=>["es-host:9200"]
index=>"user_logs"
}
}
適用性分析:
- 優(yōu)點:零代碼改造,適合歷史數(shù)據(jù)遷移。
- 致命傷:
- 分鐘級延遲(無法滿足實時搜索)
- 全表掃描壓力大(需優(yōu)化增量字段索引)
方案四:Canal監(jiān)聽Binlog
場景:社交平臺動態(tài)實時搜索(如微博熱搜更新)。技術棧:Canal + RocketMQ + ES
該方案高實時,并且低侵入。
架構流程如下:
關鍵配置:
# canal.properties
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=canal.es.sync
避坑指南:
- 數(shù)據(jù)漂移:需處理DDL變更(通過Schema Registry管理映射)。
- 冪等消費:通過
_id
唯一鍵避免重復寫入。
方案五:DataX批量同步
場景:將歷史訂單數(shù)據(jù)從分庫分表MySQL遷移至ES。
該方案是大數(shù)據(jù)遷移的首選。
配置文件如下:
{
"job":{
"content":[{
"reader":{
"name":"mysqlreader",
"parameter":{"splitPk":"id","querySql":"SELECT * FROM orders"}
},
"writer":{
"name":"elasticsearchwriter",
"parameter":{"endpoint":"http://es-host:9200","index":"orders"}
}
}]
}
}
性能調優(yōu):
- 調整
channel
數(shù)提升并發(fā)(建議與分片數(shù)對齊) - 啟用
limit
分批查詢避免OOM
方案六:Flink流處理
場景:商品價格變更時,需關聯(lián)用戶畫像計算實時推薦評分。
該方案適合于復雜的ETL場景。
代碼片段如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CanalSource())
.map(record -> parseToPriceEvent(record))
.keyBy(event -> event.getProductId())
.connect(userProfileBroadcastStream)
.process(new PriceRecommendationProcess())
.addSink(new ElasticsearchSink());
優(yōu)勢:
- 狀態(tài)管理:精準處理亂序事件(Watermark機制)
- 維表關聯(lián):通過Broadcast State實現(xiàn)實時畫像關聯(lián)
總結:
對于文章上面給出的這6種技術方案,我們在實際工作中,該如何做選型呢?
下面用一張表格做對比:
方案 | 實時性 | 侵入性 | 復雜度 | 適用階段 |
同步雙寫 | 秒級 | 高 | 低 | 小型單體項目 |
MQ異步 | 秒級 | 中 | 中 | 中型分布式系統(tǒng) |
Logstash | 分鐘級 | 無 | 低 | 離線分析 |
Canal | 毫秒級 | 無 | 高 | 高并發(fā)生產(chǎn)環(huán)境 |
DataX | 小時級 | 無 | 中 | 歷史數(shù)據(jù)遷移 |
Flink | 毫秒級 | 低 | 極高 | 實時數(shù)倉 |
建議:
- 若團隊無運維中間件能力 → 選擇Logstash或同步雙寫
- 需秒級延遲且允許改造 → MQ異步 + 本地事務表
- 追求極致實時且資源充足 → Canal + Flink雙保險