Apache Flink 漫談系列(09) - JOIN 算子
聊什么
在《Apache Flink 漫談系列 - SQL概覽》中我們介紹了JOIN算子的語義和基本的使用方式,介紹過程中大家發(fā)現(xiàn)Apache Flink在語法語義上是遵循ANSI-SQL標(biāo)準(zhǔn)的,那么再深思一下傳統(tǒng)數(shù)據(jù)庫為啥需要有JOIN算子呢?在實(shí)現(xiàn)原理上面Apache Flink內(nèi)部實(shí)現(xiàn)和傳統(tǒng)數(shù)據(jù)庫有什么區(qū)別呢?本篇將詳盡的為大家介紹傳統(tǒng)數(shù)據(jù)庫為什么需要JOIN算子,以及JOIN算子在Apache Flink中的底層實(shí)現(xiàn)原理和在實(shí)際使用中的優(yōu)化!
什么是JOIN
在《Apache Flink 漫談系列 - SQL概覽》中我對JOIN算子有過簡單的介紹,這里我們以具體實(shí)例的方式讓大家對JOIN算子加深印象。JOIN的本質(zhì)是分別從N(N>=1)張表中獲取不同的字段,進(jìn)而得到最完整的記錄行。比如我們有一個(gè)查詢需求:在學(xué)生表(學(xué)號,姓名,性別),課程表(課程號,課程名,學(xué)分)和成績表(學(xué)號,課程號,分?jǐn)?shù))中查詢所有學(xué)生的姓名,課程名和考試分?jǐn)?shù)。如下:
為啥需要JOIN
JOIN的本質(zhì)是數(shù)據(jù)拼接,那么如果我們將所有數(shù)據(jù)列存儲(chǔ)在一張大表中,是不是就不需要JOIN了呢?如果真的能將所需的數(shù)據(jù)都在一張表存儲(chǔ),我想就真的不需要JOIN的算子了,但現(xiàn)實(shí)業(yè)務(wù)中真的能做到將所需數(shù)據(jù)放到同一張大表里面嗎?答案是否定的,核心原因有2個(gè):
(1)產(chǎn)生數(shù)據(jù)的源頭可能不是一個(gè)系統(tǒng);
(2)產(chǎn)生數(shù)據(jù)的源頭是同一個(gè)系統(tǒng),但是數(shù)據(jù)冗余的沉重代價(jià),迫使我們會(huì)遵循數(shù)據(jù)庫范式,進(jìn)行表的設(shè)計(jì)。簡說NF如下:
- 1NF - 列不可再分;
- 2NF - 符合1NF,并且非主鍵屬性全部依賴于主鍵屬性;
- 3NF - 符合2NF,并且消除傳遞依賴,即:任何字段不能由其他字段派生出來;
- BCNF - 符合3NF,并且主鍵屬性之間無依賴關(guān)系。
當(dāng)然還有 4NF,5NF,不過在實(shí)際的數(shù)據(jù)庫設(shè)計(jì)過程中做到BCNF已經(jīng)足夠了!(并非否定4NF,5NF存在的意義,只是個(gè)人還沒有遇到一定要用4NF,5NF的場景,設(shè)計(jì)往往會(huì)按存儲(chǔ)成本,查詢性能等綜合因素考量)
JOIN種類
JOIN 在傳統(tǒng)數(shù)據(jù)庫中有如下分類:
(1)CROSS JOIN - 交叉連接,計(jì)算笛卡兒積;
(2)INNER JOIN - 內(nèi)連接,返回滿足條件的記錄;
(3)OUTER JOIN
- LEFT - 返回左表所有行,右表不存在補(bǔ)NULL;
- RIGHT - 返回右表所有行,左邊不存在補(bǔ)NULL;
- FULL - 返回左表和右表的并集,不存在一邊補(bǔ)NULL;
(4)SELF JOIN - 自連接,將表查詢時(shí)候命名不同的別名。
JOIN語法
JOIN 在SQL89和SQL92中有不同的語法,以INNER JOIN為例說明:
- SQL89 - 表之間用“,”逗號分割,鏈接條件和過濾條件都在Where子句指定:
- SELECT
- a.colA,
- b.colA
- FROM
- tab1 AS a , tab2 AS b
- WHERE a.id = b.id and a.other > b.other
- SELECT
- a.colA,
- b.colA
- FROM
- tab1 AS a JOIN tab2 AS b ON a.id = b.id
- WHERE
- a.other > b.other
本篇中的后續(xù)示例將應(yīng)用SQL92語法進(jìn)行SQL的編寫,語法如下:
- tableExpression [ LEFT|RIGHT|FULL|INNER|SELF ] JOIN tableExpression [ ON joinCondition ] [WHERE filterCondition]
語義示例說明
在《Apache Flink 漫談系列 - SQL概覽》中對JOIN語義有過簡單介紹,這里會(huì)進(jìn)行展開介紹。 我們以開篇示例中的三張表:學(xué)生表(學(xué)號,姓名,性別),課程表(課程號,課程名,學(xué)分)和成績表(學(xué)號,課程號,分?jǐn)?shù))來介紹各種JOIN的語義。
1. CROSS JOIN
交叉連接會(huì)對兩個(gè)表進(jìn)行笛卡爾積,也就是LEFT表的每一行和RIGHT表的所有行進(jìn)行聯(lián)接,因此生成結(jié)果表的行數(shù)是兩個(gè)表行數(shù)的乘積,如student和course表的CROSS JOIN結(jié)果如下:
- mysql> SELECT * FROM student JOIN course;
- +------+-------+------+-----+-------+--------+
- | no | name | sex | no | name | credit |
- +------+-------+------+-----+-------+--------+
- | S001 | Sunny | M | C01 | Java | 2 |
- | S002 | Tom | F | C01 | Java | 2 |
- | S003 | Kevin | M | C01 | Java | 2 |
- | S001 | Sunny | M | C02 | Blink | 3 |
- | S002 | Tom | F | C02 | Blink | 3 |
- | S003 | Kevin | M | C02 | Blink | 3 |
- | S001 | Sunny | M | C03 | Spark | 3 |
- | S002 | Tom | F | C03 | Spark | 3 |
- | S003 | Kevin | M | C03 | Spark | 3 |
- +------+-------+------+-----+-------+--------+
- 9 rows in set (0.00 sec)
如上結(jié)果我們得到9行=student(3) x course(3)。交叉聯(lián)接一般會(huì)消耗較大的資源,也被很多用戶質(zhì)疑交叉聯(lián)接存在的意義?(任何時(shí)候我們都有質(zhì)疑的權(quán)利,同時(shí)也建議我們養(yǎng)成自己質(zhì)疑自己“質(zhì)疑”的習(xí)慣,就像小時(shí)候不理解父母的“廢話”一樣)。
我們以開篇的示例說明交叉聯(lián)接的巧妙之一,開篇中我們的查詢需求是:在學(xué)生表(學(xué)號,姓名,性別),課程表(課程號,課程名,學(xué)分)和成績表(學(xué)號,課程號,分?jǐn)?shù))中查詢所有學(xué)生的姓名,課程名和考試分?jǐn)?shù)。開篇中的SQL語句得到的結(jié)果如下:
- mysql> SELECT
- -> student.name, course.name, score
- -> FROM student JOIN score ON student.no = score.s_no
- -> JOIN course ON score.c_no = course.no;
- +-------+-------+-------+
- | name | name | score |
- +-------+-------+-------+
- | Sunny | Java | 80 |
- | Sunny | Blink | 98 |
- | Sunny | Spark | 76 |
- | Kevin | Java | 78 |
- | Kevin | Blink | 88 |
- | Kevin | Spark | 68 |
- +-------+-------+-------+
- 6 rows in set (0.00 sec)
如上INNER JOIN的結(jié)果我們發(fā)現(xiàn)少了Tom同學(xué)的成績,原因是Tom同學(xué)沒有參加考試,在score表中沒有Tom的成績,但是我們可能希望雖然Tom沒有參加考試但仍然希望Tom的成績能夠在查詢結(jié)果中顯示(成績 0 分),面對這樣的需求,我們怎么處理呢?交叉聯(lián)接可以幫助我們:
- ***步 student和course 進(jìn)行交叉聯(lián)接:
- mysql> SELECT
- -> stu.no, c.no, stu.name, c.name
- -> FROM student stu JOIN course c 笛卡爾積
- -> ORDER BY stu.no; -- 排序只是方便大家查看:)
- +------+-----+-------+-------+
- | no | no | name | name |
- +------+-----+-------+-------+
- | S001 | C03 | Sunny | Spark |
- | S001 | C01 | Sunny | Java |
- | S001 | C02 | Sunny | Blink |
- | S002 | C03 | Tom | Spark |
- | S002 | C01 | Tom | Java |
- | S002 | C02 | Tom | Blink |
- | S003 | C02 | Kevin | Blink |
- | S003 | C03 | Kevin | Spark |
- | S003 | C01 | Kevin | Java |
- +------+-----+-------+-------+
- 9 rows in set (0.00 sec)
- mysql> SELECT
- -> stu.no, c.no, stu.name, c.name,
- -> CASE
- -> WHEN s.score IS NULL THEN 0
- -> ELSE s.score
- -> END AS score
- -> FROM student stu JOIN course c -- 迪卡爾積
- -> LEFT JOIN score s ON sstu.no = s.s_no and c.no = s.c_no -- LEFT OUTER JOIN
- -> ORDER BY stu.no; -- 排序只是為了大家好看一點(diǎn):)
- +------+-----+-------+-------+-------+
- | no | no | name | name | score |
- +------+-----+-------+-------+-------+
- | S001 | C03 | Sunny | Spark | 76 |
- | S001 | C01 | Sunny | Java | 80 |
- | S001 | C02 | Sunny | Blink | 98 |
- | S002 | C02 | Tom | Blink | 0 | -- TOM 雖然沒有參加考試,但是仍然看到他的信息
- | S002 | C03 | Tom | Spark | 0 |
- | S002 | C01 | Tom | Java | 0 |
- | S003 | C02 | Kevin | Blink | 88 |
- | S003 | C03 | Kevin | Spark | 68 |
- | S003 | C01 | Kevin | Java | 78 |
- +------+-----+-------+-------+-------+
- 9 rows in set (0.00 sec)
經(jīng)過CROSS JOIN幫我們將Tom的信息也查詢出來了!(TOM 雖然沒有參加考試,但是仍然看到他的信息)
2. INNER JOIN
內(nèi)聯(lián)接在SQL92中 ON 表示聯(lián)接添加,可選的WHERE子句表示過濾條件,如開篇的示例就是一個(gè)多表的內(nèi)聯(lián)接,我們在看一個(gè)簡單的示例: 查詢成績大于80分的學(xué)生學(xué)號,學(xué)生姓名和成績:
- mysql> SELECT
- -> stu.no, stu.name , s.score
- -> FROM student stu JOIN score s ON sstu.no = s.s_no
- -> WHERE s.score > 80;
- +------+-------+-------+
- | no | name | score |
- +------+-------+-------+
- | S001 | Sunny | 98 |
- | S003 | Kevin | 88 |
- +------+-------+-------+
- 2 rows in set (0.00 sec)
上面按語義的邏輯是:
- ***步:先進(jìn)行student和score的內(nèi)連接,如下:
- mysql> SELECT
- -> stu.no, stu.name , s.score
- -> FROM student stu JOIN score s ON sstu.no = s.s_no ;
- +------+-------+-------+
- | no | name | score |
- +------+-------+-------+
- | S001 | Sunny | 80 |
- | S001 | Sunny | 98 |
- | S001 | Sunny | 76 |
- | S003 | Kevin | 78 |
- | S003 | Kevin | 88 |
- | S003 | Kevin | 68 |
- +------+-------+-------+
- 6 rows in set (0.00 sec)
- -> WHERE s.score > 80;
- +------+-------+-------+
- | no | name | score |
- +------+-------+-------+
- | S001 | Sunny | 98 |
- | S003 | Kevin | 88 |
- +------+-------+-------+
- 2 rows in set (0.00 sec)
上面的查詢過程符合語義,但是如果在filter條件能過濾很多數(shù)據(jù)的時(shí)候,先進(jìn)行數(shù)據(jù)的過濾,在進(jìn)行內(nèi)聯(lián)接會(huì)獲取更好的性能,比如我們手工寫一下:
- mysql> SELECT
- -> no, name , score
- -> FROM student stu JOIN ( SELECT s_no, score FROM score s WHERE s.score >80) as sc ON no = s_no;
- +------+-------+-------+
- | no | name | score |
- +------+-------+-------+
- | S001 | Sunny | 98 |
- | S003 | Kevin | 88 |
- +------+-------+-------+
- 2 rows in set (0.00 sec)
上面寫法語義和***種寫法語義一致,得到相同的查詢結(jié)果,上面查詢過程是:
- ***步:執(zhí)行過濾子查詢
- mysql> SELECT s_no, score FROM score s WHERE s.score >80;
- +------+-------+
- | s_no | score |
- +------+-------+
- | S001 | 98 |
- | S003 | 88 |
- +------+-------+
- 2 rows in set (0.00 sec)
- -> ON no = s_no;
- +------+-------+-------+
- | no | name | score |
- +------+-------+-------+
- | S001 | Sunny | 98 |
- | S003 | Kevin | 88 |
- +------+-------+-------+
- 2 rows in set (0.00 sec)
如上兩種寫法在語義上一致,但查詢性能在數(shù)量很大的情況下會(huì)有很大差距。上面為了和大家演示相同的查詢語義,可以有不同的查詢方式,不同的執(zhí)行計(jì)劃。實(shí)際上數(shù)據(jù)庫本身的優(yōu)化器會(huì)自動(dòng)進(jìn)行查詢優(yōu)化,在內(nèi)聯(lián)接中ON的聯(lián)接條件和WHERE的過濾條件具有相同的優(yōu)先級,具體的執(zhí)行順序可以由數(shù)據(jù)庫的優(yōu)化器根據(jù)性能消耗決定。也就是說物理執(zhí)行計(jì)劃可以先執(zhí)行過濾條件進(jìn)行查詢優(yōu)化,如果細(xì)心的讀者可能發(fā)現(xiàn),在第二個(gè)寫法中,子查詢我們不但有行的過濾,也進(jìn)行了列的裁剪(去除了對查詢結(jié)果沒有用的c_no列),這兩個(gè)變化實(shí)際上對應(yīng)了數(shù)據(jù)庫中兩個(gè)優(yōu)化規(guī)則:
- filter push down
- project push down
如上優(yōu)化規(guī)則以filter push down 為例,示意優(yōu)化器對執(zhí)行plan的優(yōu)化變動(dòng):
3. LEFT OUTER JOIN
左外聯(lián)接語義是返回左表所有行,右表不存在補(bǔ)NULL,為了演示作用,我們查詢沒有參加考試的所有學(xué)生的成績單:
- mysql> SELECT
- -> no, name , s.c_no, s.score
- -> FROM student stu LEFT JOIN score s ON sstu.no = s.s_no
- -> WHERE s.score is NULL;
- +------+------+------+-------+
- | no | name | c_no | score |
- +------+------+------+-------+
- | S002 | Tom | NULL | NULL |
- +------+------+------+-------+
- 1 row in set (0.00 sec)
上面查詢的執(zhí)行邏輯上也是分成兩步:
- ***步:左外聯(lián)接查詢
- mysql> SELECT
- -> no, name , s.c_no, s.score
- -> FROM student stu LEFT JOIN score s ON sstu.no = s.s_no;
- +------+-------+------+-------+
- | no | name | c_no | score |
- +------+-------+------+-------+
- | S001 | Sunny | C01 | 80 |
- | S001 | Sunny | C02 | 98 |
- | S001 | Sunny | C03 | 76 |
- | S002 | Tom | NULL | NULL | -- 右表不存在的補(bǔ)NULL
- | S003 | Kevin | C01 | 78 |
- | S003 | Kevin | C02 | 88 |
- | S003 | Kevin | C03 | 68 |
- +------+-------+------+-------+
- 7 rows in set (0.00 sec)
- mysql> SELECT
- -> no, name , s.c_no, s.score
- -> FROM student stu LEFT JOIN score s ON sstu.no = s.s_no
- -> WHERE s.score is NULL;
- +------+------+------+-------+
- | no | name | c_no | score |
- +------+------+------+-------+
- | S002 | Tom | NULL | NULL |
- +------+------+------+-------+
- 1 row in set (0.00 sec)
這兩個(gè)過程和上面分析的INNER JOIN一樣,但是這時(shí)候能否利用上面說的 filter push down的優(yōu)化呢?根據(jù)LEFT OUTER JOIN的語義來講,答案是否定的。我們手工操作看一下:
- ***步:先進(jìn)行過濾查詢(獲得一個(gè)空表)
- mysql> SELECT * FROM score s WHERE s.score is NULL;
- Empty set (0.00 sec)
- mysql> SELECT
- -> no, name , s.c_no, s.score
- -> FROM student stu LEFT JOIN (SELECT * FROM score s WHERE s.score is NULL) AS s ON sstu.no = s.s_no;
- +------+-------+------+-------+
- | no | name | c_no | score |
- +------+-------+------+-------+
- | S001 | Sunny | NULL | NULL |
- | S002 | Tom | NULL | NULL |
- | S003 | Kevin | NULL | NULL |
- +------+-------+------+-------+
- 3 rows in set (0.00 sec)
我們發(fā)現(xiàn)兩種寫法的結(jié)果不一致,***種寫法只返回Tom沒有參加考試,是我們預(yù)期的。第二種寫法返回了Sunny,Tom和Kevin三名同學(xué)都沒有參加考試,這明顯是非預(yù)期的查詢結(jié)果。所有LEFT OUTER JOIN不能利用INNER JOIN的 filter push down優(yōu)化。
4. RIGHT OUTER JOIN
右外鏈接語義是返回右表所有行,左邊不存在補(bǔ)NULL,如下:
- mysql> SELECT
- -> s.c_no, s.score, no, name
- -> FROM score s RIGHT JOIN student stu ON sstu.no = s.s_no;
- +------+-------+------+-------+
- | c_no | score | no | name |
- +------+-------+------+-------+
- | C01 | 80 | S001 | Sunny |
- | C02 | 98 | S001 | Sunny |
- | C03 | 76 | S001 | Sunny |
- | NULL | NULL | S002 | Tom | -- 左邊沒有的進(jìn)行補(bǔ) NULL
- | C01 | 78 | S003 | Kevin |
- | C02 | 88 | S003 | Kevin |
- | C03 | 68 | S003 | Kevin |
- +------+-------+------+-------+
- 7 rows in set (0.00 sec)
上面右外鏈接我只是將上面左外鏈接查詢的左右表交換了一下:)。
5. FULL OUTER JOIN
全外鏈接語義返回左表和右表的并集,不存在一邊補(bǔ)NULL,用于演示的MySQL數(shù)據(jù)庫不支持FULL OUTER JOIN。這里不做演示了。
6. SELF JOIN
上面介紹的INNER JOIN、OUTER JOIN都是不同表之間的聯(lián)接查詢,自聯(lián)接是一張表以不同的別名做為左右兩個(gè)表,可以進(jìn)行如上的INNER JOIN和OUTER JOIN。如下看一個(gè)INNER 自聯(lián)接:
- mysql> SELECT * FROM student l JOIN student r where l.no = r.no;
- +------+-------+------+------+-------+------+
- | no | name | sex | no | name | sex |
- +------+-------+------+------+-------+------+
- | S001 | Sunny | M | S001 | Sunny | M |
- | S002 | Tom | F | S002 | Tom | F |
- | S003 | Kevin | M | S003 | Kevin | M |
- +------+-------+------+------+-------+------+
- 3 rows in set (0.00 sec)
7. 不等值聯(lián)接
這里說的不等值聯(lián)接是SQL92語法里面的ON子句里面只有不等值聯(lián)接,比如:
- mysql> SELECT
- -> s.c_no, s.score, no, name
- -> FROM score s RIGHT JOIN student stu ON stu.no != s.c_no;
- +------+-------+------+-------+
- | c_no | score | no | name |
- +------+-------+------+-------+
- | C01 | 80 | S001 | Sunny |
- | C01 | 80 | S002 | Tom |
- | C01 | 80 | S003 | Kevin |
- | C02 | 98 | S001 | Sunny |
- | C02 | 98 | S002 | Tom |
- | C02 | 98 | S003 | Kevin |
- | C03 | 76 | S001 | Sunny |
- | C03 | 76 | S002 | Tom |
- | C03 | 76 | S003 | Kevin |
- | C01 | 78 | S001 | Sunny |
- | C01 | 78 | S002 | Tom |
- | C01 | 78 | S003 | Kevin |
- | C02 | 88 | S001 | Sunny |
- | C02 | 88 | S002 | Tom |
- | C02 | 88 | S003 | Kevin |
- | C03 | 68 | S001 | Sunny |
- | C03 | 68 | S002 | Tom |
- | C03 | 68 | S003 | Kevin |
- +------+-------+------+-------+
- 18 rows in set (0.00 sec)
上面這示例,其實(shí)沒有什么實(shí)際業(yè)務(wù)價(jià)值,在實(shí)際的使用場景中,不等值聯(lián)接往往是結(jié)合等值聯(lián)接,將不等值條件在WHERE子句指定,即, 帶有WHERE子句的等值聯(lián)接。
Apache Flink雙流JOIN
Apache Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF 可以轉(zhuǎn)換為普通的INNER和OUTER)。在語義上面Apache Flink嚴(yán)格遵守標(biāo)準(zhǔn)SQL的語義,與上面演示的語義一致。下面我重點(diǎn)介紹Apache Flink中JOIN的實(shí)現(xiàn)原理。
1. 雙流JOIN與傳統(tǒng)數(shù)據(jù)庫表JOIN的區(qū)別
傳統(tǒng)數(shù)據(jù)庫表的JOIN是兩張靜態(tài)表的數(shù)據(jù)聯(lián)接,在流上面是 動(dòng)態(tài)表(關(guān)于流與動(dòng)態(tài)表的關(guān)系請查閱 《Apache Flink 漫談系列 - 流表對偶(duality)性)》,雙流JOIN的數(shù)據(jù)不斷流入與傳統(tǒng)數(shù)據(jù)庫表的JOIN有如下3個(gè)核心區(qū)別:
- 左右兩邊的數(shù)據(jù)集合無窮 - 傳統(tǒng)數(shù)據(jù)庫左右兩個(gè)表的數(shù)據(jù)集合是有限的,雙流JOIN的數(shù)據(jù)會(huì)源源不斷的流入;
- JOIN的結(jié)果不斷產(chǎn)生/更新 - 傳統(tǒng)數(shù)據(jù)庫表JOIN是一次執(zhí)行產(chǎn)生最終結(jié)果后退出,雙流JOIN會(huì)持續(xù)不斷的產(chǎn)生新的結(jié)果。在 《Apache Flink 漫談系列 - 持續(xù)查詢(Continuous Queries)》篇也有相關(guān)介紹。
- 查詢計(jì)算的雙邊驅(qū)動(dòng) - 雙流JOIN由于左右兩邊的流的速度不一樣,會(huì)導(dǎo)致左邊數(shù)據(jù)到來的時(shí)候右邊數(shù)據(jù)還沒有到來,或者右邊數(shù)據(jù)到來的時(shí)候左邊數(shù)據(jù)沒有到來,所以在實(shí)現(xiàn)中要將左右兩邊的流數(shù)據(jù)進(jìn)行保存,以保證JOIN的語義。在Blink中會(huì)以State的方式進(jìn)行數(shù)據(jù)的存儲(chǔ)。State相關(guān)請查看《Apache Flink 漫談系列 - State》篇。
(1) 數(shù)據(jù)Shuffle
分布式流計(jì)算所有數(shù)據(jù)會(huì)進(jìn)行Shuffle,怎么才能保障左右兩邊流的要JOIN的數(shù)據(jù)會(huì)在相同的節(jié)點(diǎn)進(jìn)行處理呢?在雙流JOIN的場景,我們會(huì)利用JOIN中ON的聯(lián)接key進(jìn)行partition,確保兩個(gè)流相同的聯(lián)接key會(huì)在同一個(gè)節(jié)點(diǎn)處理。
(2) 數(shù)據(jù)的保存
不論是INNER JOIN還是OUTER JOIN 都需要對左右兩邊的流的數(shù)據(jù)進(jìn)行保存,JOIN算子會(huì)開辟左右兩個(gè)State進(jìn)行數(shù)據(jù)存儲(chǔ),左右兩邊的數(shù)據(jù)到來時(shí)候,進(jìn)行如下操作:
- LeftEvent到來存儲(chǔ)到LState,RightEvent到來的時(shí)候存儲(chǔ)到RState;
- LeftEvent會(huì)去RightState進(jìn)行JOIN,并發(fā)出所有JOIN之后的Event到下游;
- RightEvent會(huì)去LeftState進(jìn)行JOIN,并發(fā)出所有JOIN之后的Event到下游。
2. 簡單場景介紹實(shí)現(xiàn)原理
(1) INNER JOIN 實(shí)現(xiàn)
JOIN有很多復(fù)雜的場景,我們先以最簡單的場景進(jìn)行實(shí)現(xiàn)原理的介紹,比如:最直接的兩個(gè)進(jìn)行INNER JOIN,比如查詢產(chǎn)品庫存和訂單數(shù)量,庫存變化事件流和訂單事件流進(jìn)行INNER JOIN,JION條件是產(chǎn)品ID,具體如下:
雙流JOIN兩邊事件都會(huì)存儲(chǔ)到State里面,如上,事件流按照標(biāo)號先后流入到j(luò)oin節(jié)點(diǎn),我們假設(shè)右邊流比較快,先流入了3個(gè)事件,3個(gè)事件會(huì)存儲(chǔ)到state中,但因?yàn)樽筮呥€沒有數(shù)據(jù),所有右邊前3個(gè)事件流入時(shí)候,沒有join結(jié)果流出,當(dāng)左邊***個(gè)事件序號為4的流入時(shí)候,先存儲(chǔ)左邊state,再與右邊已經(jīng)流入的3個(gè)事件進(jìn)行join,join的結(jié)果如圖 三行結(jié)果會(huì)流入到下游節(jié)點(diǎn)sink。當(dāng)?shù)?號事件流入時(shí)候,也會(huì)和左邊第4號事件進(jìn)行join,流出一條jion結(jié)果到下游節(jié)點(diǎn)。這里關(guān)于INNER JOIN的語義和大家強(qiáng)調(diào)兩點(diǎn):
- INNER JOIN只有符合JOIN條件時(shí)候才會(huì)有JOIN結(jié)果流出到下游,比如右邊***來的1,2,3個(gè)事件,流入時(shí)候沒有任何輸出,因?yàn)樽筮呥€沒有可以JOIN的事件;
- INNER JOIN兩邊的數(shù)據(jù)不論如何亂序,都能夠保證和傳統(tǒng)數(shù)據(jù)庫語義一致,因?yàn)槲覀儽4媪俗笥覂蓚€(gè)流的所有事件到state中。
(2) LEFT OUTER JOIN 實(shí)現(xiàn)
LEFT OUTER JOIN 可以簡寫 LEFT JOIN,語義上和INNER JOIN的區(qū)別是不論右流是否有JOIN的事件,左流的事件都需要流入下游節(jié)點(diǎn),但右流沒有可以JION的事件時(shí)候,右邊的事件補(bǔ)NULL。同樣我們以最簡單的場景說明LEFT JOIN的實(shí)現(xiàn),比如查詢產(chǎn)品庫存和訂單數(shù)量,庫存變化事件流和訂單事件流進(jìn)行LEFT JOIN,JION條件是產(chǎn)品ID,具體如下:
下圖也是表達(dá)LEFT JOIN的語義,只是展現(xiàn)方式不同:
上圖主要關(guān)注點(diǎn)是當(dāng)左邊先流入1,2事件時(shí)候,右邊沒有可以join的事件時(shí)候會(huì)向下游發(fā)送左邊事件并補(bǔ)NULL向下游發(fā)出,當(dāng)右邊***個(gè)相同的Join key到來的時(shí)候會(huì)將左邊先來的事件發(fā)出的帶有NULL的事件撤回(對應(yīng)上面command的-記錄,+代表正向記錄,-代表撤回記錄)。這里強(qiáng)調(diào)三點(diǎn):
- 左流的事件當(dāng)右邊沒有JOIN的事件時(shí)候,將右邊事件列補(bǔ)NULL后流向下游;* 當(dāng)右邊事件流入發(fā)現(xiàn)左邊已經(jīng)有可以JOIN的key的時(shí)候,并且是***個(gè)可以JOIN上的右邊事件(比如上面的3事件是***個(gè)可以和左邊JOIN key P001進(jìn)行JOIN的事件)需要撤回左邊下發(fā)的NULL記錄,并下發(fā)JOIN完整(帶有右邊事件列)的事件到下游。后續(xù)來的4,5,6,8等待后續(xù)P001的事件是不會(huì)產(chǎn)生撤回記錄的。
- 在Apache Flink系統(tǒng)內(nèi)部事件類型分為正向事件標(biāo)記為“+”和撤回事件標(biāo)記為“-”。
3. RIGHT OUTER JOIN 和 FULL OUTER JOIN
RIGHT JOIN內(nèi)部實(shí)現(xiàn)與LEFT JOIN類似, FULL JOIN和LEFT JOIN的區(qū)別是左右兩邊都會(huì)產(chǎn)生補(bǔ)NULL和撤回的操作。對于State的使用都是相似的,這里不再重復(fù)說明了。
4. 復(fù)雜場景介紹State結(jié)構(gòu)
上面我們介紹了雙流JOIN會(huì)使用State記錄左右兩邊流的事件,同時(shí)我們示例數(shù)據(jù)的場景也是比較簡單,比如流上沒有更新事件(沒有撤回事件),同時(shí)流上沒有重復(fù)行事件。那么我們嘗試思考下面的事件流在雙流JOIN時(shí)候是怎么處理的?
上圖示例是連續(xù)產(chǎn)生了2筆銷售數(shù)量一樣的訂單,同時(shí)在產(chǎn)生一筆銷售數(shù)量為5的訂單之后,又將該訂單取消了(或者退貨了),這樣在事件流上面就會(huì)是上圖的示意,這種情況Blink內(nèi)部如何支撐呢?
根據(jù)JOIN的語義以INNER JOIN為例,右邊有兩條相同的訂單流入,我們就應(yīng)該向下游輸出兩條JOIN結(jié)果,當(dāng)有撤回的事件流入時(shí)候,我們也需要將已經(jīng)下發(fā)下游的JOIN事件撤回,如下:
上面的場景以及LEFT JOIN部分介紹的撤回情況,Apache Flink內(nèi)部需要處理如下幾個(gè)核心點(diǎn):
- 記錄重復(fù)記錄(完整記錄重復(fù)記錄或者記錄相同記錄的個(gè)數(shù))
- 記錄正向記錄和撤回記錄(完整記錄正向和撤回記錄或者記錄個(gè)數(shù))
- 記錄哪一條事件是***個(gè)可以與左邊事件進(jìn)行JOIN的事件
(1) 雙流JOIN的State數(shù)據(jù)結(jié)構(gòu)
在Apache Flink內(nèi)部對不同的場景有特殊的數(shù)據(jù)結(jié)構(gòu)優(yōu)化,本篇我們只針對上面說的情況(通用設(shè)計(jì))介紹一下雙流JOIN的State的數(shù)據(jù)結(jié)構(gòu)和用途:
數(shù)據(jù)結(jié)構(gòu)
- Map<JoinKey, Map<rowData, count>>;
- ***級MAP的key是Join key,比如示例中的P001, value是流上面的所有完整事件;
- 第二級MAP的key是行數(shù)據(jù),比如示例中的P001, 2,value是相同事件值的個(gè)數(shù)
數(shù)據(jù)結(jié)構(gòu)的利用
- 記錄重復(fù)記錄 - 利用第二級MAP的value記錄重復(fù)記錄的個(gè)數(shù),這樣大大減少存儲(chǔ)和讀取
- 正向記錄和撤回記錄 - 利用第二級MAP的value記錄,當(dāng)count=0時(shí)候刪除該元素
- 判斷右邊是否產(chǎn)生撤回記錄 - 根據(jù)***級MAP的value的size來判斷是否產(chǎn)生撤回,只有size由0變成1的時(shí)候(***條和左可以JOIN的事件)才產(chǎn)生撤回
雙流JOIN的應(yīng)用優(yōu)化
1. 構(gòu)造更新流
我們在 《Apache Flink 漫談系列 - 持續(xù)查詢(Continuous Queries)》篇中以雙流JOIN為例介紹了如何構(gòu)造業(yè)務(wù)上的PK source,構(gòu)造PK source本質(zhì)上在保證業(yè)務(wù)語義的同時(shí)也是對雙流JOIN的一種優(yōu)化,比如多級LEFT JOIN會(huì)讓流上的數(shù)據(jù)不斷膨脹,造成JOIN節(jié)點(diǎn)性能較慢,JOIN之后的下游節(jié)點(diǎn)邊堵(數(shù)據(jù)量大導(dǎo)致,非熱點(diǎn))。那么嫌少流入JOIN的數(shù)據(jù),比如構(gòu)造PK source就會(huì)大大減少JOIN數(shù)據(jù)的膨脹。這里不再重復(fù)舉例,大家可以查閱 《Apache Flink 漫談系列 - 持續(xù)查詢(Continuous Queries)》 的雙流JOIN示例部分。
2. NULL造成的熱點(diǎn)
比如我們有A LEFT JOIN B ON A.aCol = B.bCol LEFT JOIN C ON B.cCol = C.cCol 的業(yè)務(wù),JOB的DAG如下:
假設(shè)在實(shí)際業(yè)務(wù)中有這樣的特點(diǎn),大部分時(shí)候當(dāng)A事件流入的時(shí)候,B還沒有可以JOIN的數(shù)據(jù),但是B來的時(shí)候,A已經(jīng)有可以JOIN的數(shù)據(jù)了,這特點(diǎn)就會(huì)導(dǎo)致,A LEFT JOIN B 會(huì)產(chǎn)生大量的 (A, NULL),其中包括B里面的 cCol 列也是NULL,這時(shí)候當(dāng)與C進(jìn)行LEFT JOIN的時(shí)候,首先Blink內(nèi)部會(huì)利用cCol對AB的JOIN產(chǎn)生的事件流進(jìn)行Shuffle, cCol是NULL進(jìn)而是下游節(jié)點(diǎn)大量的NULL事件流入,造成熱點(diǎn)。那么這問題如何解決呢?
我們可以改變JOIN的先后順序,來保證A LEFT JOIN B 不會(huì)產(chǎn)生NULL的熱點(diǎn)問題,如下:
3. JOIN ReOrder
對于JOIN算子的實(shí)現(xiàn)我們知道左右兩邊的事件都會(huì)存儲(chǔ)到State中,在流入事件時(shí)候在從另一邊讀取所有事件進(jìn)行JOIN計(jì)算,這樣的實(shí)現(xiàn)邏輯在數(shù)據(jù)量很大的場景會(huì)有一定的state操作瓶頸,我們某些場景可以通過業(yè)務(wù)角度調(diào)整JOIN的順序,來消除性能瓶頸,比如:A JOIN B ON A.acol = B.bcol JOIN C ON B.bcol = C.ccol. 這樣的場景,如果 A與B進(jìn)行JOIN產(chǎn)生數(shù)據(jù)量很大,但是B與C進(jìn)行JOIN產(chǎn)生的數(shù)據(jù)量很小,那么我們可以強(qiáng)制調(diào)整JOIN的聯(lián)接順序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 如下示意圖:
小結(jié)
本篇向大家介紹了數(shù)據(jù)庫設(shè)計(jì)范式的要求和實(shí)際業(yè)務(wù)的查詢需要是傳統(tǒng)數(shù)據(jù)庫JOIN算子存在的原因,并以具體示例的方式向大家介紹JOIN在數(shù)據(jù)庫的查詢過程,以及潛在的查詢優(yōu)化,再以實(shí)際的例子介紹Apache Flink上面的雙流JOIN的實(shí)現(xiàn)原理和State數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì),***向大家介紹兩個(gè)雙流JOIN的使用優(yōu)化。
# 關(guān)于點(diǎn)贊和評論
本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點(diǎn)贊鼓勵(lì),對有不足的篇章給予反饋和建議,先行感謝大家!
作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺Blink的設(shè)計(jì)研發(fā)工作。
【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請聯(lián)系原作者】