主要內(nèi)容本篇主要從FlinkSQL實(shí)現(xiàn)的內(nèi)核與原理,工作流等的視角帶大家構(gòu)建一幅FlinkSQL全景圖(以Blink為主介紹),探知背后支撐的“男人們”(組件)。建議收藏,僅此一份。
主要內(nèi)容:
1. Table API 與 SQL
2. Apache Calcite
3. 元數(shù)據(jù)
4. SQL 函數(shù)
5. Flink Planner 與 Blink Planner
6. Blink SQL執(zhí)行過(guò)程
7. SQL優(yōu)化器
8. 總結(jié)
Table API 與 Table SQLTable API 和 Table SQL 集成在同一套 API 中。這套 API 的核心概念是Table,用作查詢(xún)的輸入和輸出。
Apache Flink 具有兩個(gè)關(guān)系型 API - Table API 和 Table SQL - 用于統(tǒng)一的流和批處理。Table API 是 Scala 和 Java 的語(yǔ)言集成查詢(xún) API,它允許用非常直觀的方式從關(guān)系運(yùn)算符(如選擇、過(guò)濾和連接)組成查詢(xún)。Flink 的 SQL 支持是基于 Apache Calcite,它實(shí)現(xiàn)了 SQL 標(biāo)準(zhǔn)。無(wú)論輸入是批處理輸入(DataSet)還是流輸入(DataStream),在任一接口中指定的查詢(xún)都具有相同的語(yǔ)義,并指定相同的結(jié)果。
Table API 和 SQL 接口與 Flink 的 DataStream 和 DataSet API 緊密集成。你可以很容易地在所有 API 和建立在 API 基礎(chǔ)上的庫(kù)之間切換。
Apache CalciteCalcite 是什么
Apache Calcite是一款開(kāi)源的動(dòng)態(tài)數(shù)據(jù)管理框架,它提供了標(biāo)準(zhǔn)的 SQL 語(yǔ)言、多種查詢(xún)優(yōu)化和連接各種數(shù)據(jù)源的能力,但不包括數(shù)據(jù)存儲(chǔ)、處理數(shù)據(jù)的算法和存儲(chǔ)元數(shù)據(jù)的存儲(chǔ)庫(kù)。
Calcite采用的是業(yè)界大數(shù)據(jù)查詢(xún)框架的一種通用思路,它的目標(biāo)是“one size fits all(一種方案適應(yīng)所有需求場(chǎng)景)”,希望能為不同計(jì)算平臺(tái)和數(shù)據(jù)源提供統(tǒng)一的查詢(xún)引擎。
Calcite作為一個(gè)強(qiáng)大的SQL計(jì)算引擎,在Flink內(nèi)部的SQL引擎模塊就是基于Calcite。
Calcite 的特點(diǎn)
支持標(biāo)準(zhǔn)SQL語(yǔ)言;
獨(dú)立于編程語(yǔ)言和數(shù)據(jù)源,可以支持不同的前端和后端;
支持關(guān)系代數(shù)、可定制的邏輯規(guī)則和基于成本模型優(yōu)化的查詢(xún)引擎;
支持物化視圖(materialized view)的管理(創(chuàng)建、丟棄、持久化和自動(dòng)識(shí)別);
基于物化視圖的Lattice和Tile機(jī)制,以應(yīng)用于OLAP分析;
支持對(duì)流數(shù)據(jù)的查詢(xún)。
Calcite 的功能

1. SQL 解析
Calcite 的SQL解析是通過(guò)JavaCC實(shí)現(xiàn)的,使用JavaCC編寫(xiě)SQL語(yǔ)法描述文件,將SQL解析成未經(jīng)校驗(yàn)的AST語(yǔ)法樹(shù)。
2. SQL 校驗(yàn)
無(wú)狀態(tài)的校驗(yàn):驗(yàn)證SQL語(yǔ)句是否符合規(guī)范。有狀態(tài)的校驗(yàn):通過(guò)與元數(shù)據(jù)結(jié)合驗(yàn)證SQL的Schema,F(xiàn)ield,F(xiàn)unction是否存在,輸入和輸出是否符合。
3. 查詢(xún)優(yōu)化
對(duì)RelNode和邏輯計(jì)劃樹(shù)進(jìn)行優(yōu)化,得到優(yōu)化后的生成物理執(zhí)行計(jì)劃。
4. SQL 生成器
將物理執(zhí)行計(jì)劃生成特定平臺(tái)的可執(zhí)行程序,比如Flink,Hive,不同規(guī)則的SQL查詢(xún)語(yǔ)句。
5. 執(zhí)行
通過(guò)各個(gè)執(zhí)行平臺(tái)在內(nèi)存中編譯,然后執(zhí)行查詢(xún)。
FlinkSQL 結(jié)合 Calcite

一條SQL從提交到Calcite解析,優(yōu)化,到最后的Flink執(zhí)行,一般分以下過(guò)程:
1. Sql Parser: 將sql語(yǔ)句通過(guò)java cc解析成AST(語(yǔ)法樹(shù)),在calcite中用SqlNode表示AST;
2. Sql Validator: 結(jié)合數(shù)字字典(catalog)去驗(yàn)證sql語(yǔ)法;
3. 生成Logical Plan: 將sqlNode表示的AST轉(zhuǎn)換成LogicalPlan, 用relNode表示;
4. 生成 optimized LogicalPlan: 先基于calcite rules 去優(yōu)化logical Plan,基于flink定制的一些優(yōu)化rules去優(yōu)化logical Plan;
5. 生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉(zhuǎn)成成Flink的物理執(zhí)行計(jì)劃;
6. 將物理執(zhí)行計(jì)劃轉(zhuǎn)成Flink ExecutionPlan: 就是調(diào)用相應(yīng)的tanslateToPlan方法轉(zhuǎn)換和利用CodeGen元編程成Flink的各種算子。
Table API 來(lái)提交任務(wù)的話,基本流程和運(yùn)行SQL類(lèi)似,稍微不同的是:table api parser: flink會(huì)把table api表達(dá)的計(jì)算邏輯也表示成一顆樹(shù),用treeNode去表式;在這棵樹(shù)上的每個(gè)節(jié)點(diǎn)的計(jì)算邏輯用Expression來(lái)表示。
簡(jiǎn)單說(shuō)一下SQL優(yōu)化:RBO(基于規(guī)則)
RBO主要是開(kāi)發(fā)人員在使用SQL的過(guò)程中,有些發(fā)現(xiàn)有些通用的規(guī)則,可以顯著提高SQL執(zhí)行的效率,比如最經(jīng)典的filter下推:

將Filter下推到Join之前執(zhí)行,這樣做的好處是減少了Join的數(shù)量,同時(shí)降低了CPU,內(nèi)存,網(wǎng)絡(luò)等方面的開(kāi)銷(xiāo),提高效率。
SQL優(yōu)化的發(fā)展,則可以分為兩個(gè)階段,即RBO(基于規(guī)則),和CBO(基于代價(jià))
RBO和CBO的區(qū)別大概在于: RBO只為應(yīng)用提供的rule,而CBO會(huì)根據(jù)給出的Cost信息,智能應(yīng)用rule,求出一個(gè)Cost最低的執(zhí)行計(jì)劃。需要糾正很多人誤區(qū)的一點(diǎn)是,CBO其實(shí)也是基于rule的,接觸到RBO和CBO這兩個(gè)概念的時(shí)候,很容易將他們對(duì)立起來(lái)。但實(shí)際上CBO,可以理解為就是加上Cost的RBO。
元數(shù)據(jù)Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫(kù)、表、分區(qū)、視圖以及數(shù)據(jù)庫(kù)或其他外部系統(tǒng)中存儲(chǔ)的函數(shù)和信息。
數(shù)據(jù)處理最關(guān)鍵的方面之一是管理元數(shù)據(jù)。元數(shù)據(jù)可以是臨時(shí)的,例如臨時(shí)表、或者通過(guò) TableEnvironment 注冊(cè)的 UDF。元數(shù)據(jù)也可以是持久化的,例如 Hive Metastore 中的元數(shù)據(jù)。Catalog 提供了一個(gè)統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從 Table API 和 SQL 查詢(xún)語(yǔ)句中來(lái)訪問(wèn)。
1. 目前支持的類(lèi)型

(1) GenericInMemoryCatalog
是基于內(nèi)存實(shí)現(xiàn)的 Catalog,所有元數(shù)據(jù)只在 session 的生命周期內(nèi)可用。
(2) JdbcCatalog
JdbcCatalog 使得用戶(hù)可以將 Flink 通過(guò) JDBC 協(xié)議連接到關(guān)系數(shù)據(jù)庫(kù)。PostgresCatalog 是當(dāng)前實(shí)現(xiàn)的唯一一種 JDBC Catalog。
(3) HiveCatalog
HiveCatalog 有兩個(gè)用途:作為原生 Flink 元數(shù)據(jù)的持久化存儲(chǔ),以及作為讀寫(xiě)現(xiàn)有 Hive 元數(shù)據(jù)的接口。
(4) 用戶(hù)自定義 Catalog
Catalog 是可擴(kuò)展的,用戶(hù)可以通過(guò)實(shí)現(xiàn) Catalog 接口來(lái)開(kāi)發(fā)自定義 Catalog。想要在 SQL CLI 中使用自定義 Catalog,用戶(hù)除了需要實(shí)現(xiàn)自定義的 Catalog 之外,還需要為這個(gè) Catalog 實(shí)現(xiàn)對(duì)應(yīng)的 CatalogFactory 接口。
CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動(dòng)時(shí)配置 Catalog。這組屬性集將傳遞給發(fā)現(xiàn)服務(wù),在該服務(wù)中,服務(wù)會(huì)嘗試將屬性關(guān)聯(lián)到 CatalogFactory 并初始化相應(yīng)的 Catalog 實(shí)例。
2. 元數(shù)據(jù)分類(lèi)
catalog定義主要有三種數(shù)據(jù)類(lèi)型接口,也就是常用到的數(shù)據(jù)庫(kù),表&視圖,函數(shù)。當(dāng)然還有最上層的Catalog容器。
(1) 數(shù)據(jù)庫(kù)
等同于數(shù)據(jù)庫(kù)中庫(kù)的實(shí)例,接口定義為CatalogDatabase,定義數(shù)據(jù)庫(kù)實(shí)例的元數(shù)據(jù),一個(gè)數(shù)據(jù)庫(kù)實(shí)例中包含表,視圖,函數(shù)等多種對(duì)象。
(2) 表&視圖
CatalogTable對(duì)應(yīng)數(shù)據(jù)庫(kù)中的表,CatalogView隊(duì)形數(shù)據(jù)庫(kù)中的視圖。
表是一種存儲(chǔ)的實(shí)體,包換了字段信息,表的分區(qū),屬性,描述信息。其實(shí)說(shuō)白了字段定義和之前印象的數(shù)據(jù)庫(kù)很是類(lèi)似。你可以對(duì)比過(guò)來(lái)。不同的是,拿flink來(lái)說(shuō),所有的表都是外部數(shù)據(jù)源,除了上面所說(shuō)的,還需要訪問(wèn)信息,比如IP端口,mater地址,connector連接類(lèi)等等。
視圖是一個(gè)虛擬概念,本質(zhì)上是一條SQL查詢(xún)語(yǔ)句,底層對(duì)應(yīng)一張表或者多張表。包含SQL查詢(xún)語(yǔ)句,視圖的字段信息,視圖的屬性等等的信息。
(3) 函數(shù)
CatalogFunction是函數(shù)元數(shù)據(jù)的接口。函數(shù)元數(shù)據(jù)包含了所在的類(lèi)信息和編程語(yǔ)言。
3. 數(shù)據(jù)訪問(wèn)
Flink的Table API和SQL程序可以連接到其他外部系統(tǒng),用于讀和寫(xiě)批處理表和流表。source table提供對(duì)存儲(chǔ)在外部系統(tǒng)(如數(shù)據(jù)庫(kù)、消息隊(duì)列或文件系統(tǒng))中的數(shù)據(jù)的訪問(wèn)。sink table 向外部存儲(chǔ)系統(tǒng)發(fā)送表。根據(jù)source和sink器的類(lèi)型,它們支持不同的格式,如CSV、Avro、Parquet或ORC。
(1) TableSchema
Table Source 和 Sink需要具備對(duì)外數(shù)據(jù)源的描述能力,所以Flink定義了TableSchema對(duì)象來(lái)定義字段名稱(chēng)和字段類(lèi)型,存儲(chǔ)格式等等信息
(2) 時(shí)間屬性
支持處理時(shí)間和時(shí)間時(shí)間
(3) Watermark
用來(lái)處理亂序的數(shù)據(jù)。
4. Table Source & Table Sink
Flink本地支持各種連接器,可以查看往期總結(jié)
Filesystem
Elasticsearch
Apache Kafka
Amazon Kinesis Data Streams
JDBC
Apache HBase
Apache Hive
幾個(gè)主要Table Source與Sink體系
(1) StreamTableSource
流數(shù)據(jù)抽象,區(qū)分了無(wú)界數(shù)據(jù)與有界數(shù)據(jù)。
(2) LookupableTableSource
按照J(rèn)oin條件中的字段進(jìn)行關(guān)聯(lián)。
(3) FilterableTableSource
過(guò)濾不符合條件的記錄。
(4) LimitableTableSource
限制記錄條數(shù)。
(5) ProjectableTableSource
過(guò)濾不會(huì)被使用的字段。
(6) AppendStreamTableSink
追加模式的TableSink 支持追加,不支持更新。
(7) RetractStreamTableSink
支持召回模式的TableSink,召回模式其實(shí)就是流上的update。
(8) UpsertStreamTableSink
有則更新,無(wú)則插入
SQL 函數(shù)臨時(shí)函數(shù)和持久化函數(shù)。臨時(shí)函數(shù)始終由用戶(hù)創(chuàng)建,它容易改變并且僅在會(huì)話的生命周期內(nèi)有效。持久化函數(shù)不是由系統(tǒng)提供,就是存儲(chǔ)在 Catalog 中,它在會(huì)話的整個(gè)生命周期內(nèi)都有效。
內(nèi)置函數(shù)
Table API和SQL為用戶(hù)提供了一組用于數(shù)據(jù)轉(zhuǎn)換的內(nèi)置函數(shù)。如果您需要的函數(shù)還不受支持,您可以實(shí)現(xiàn)用戶(hù)定義的函數(shù)
(1) Comparison Functions(比較型函數(shù))
eg:value1 = value2
(2) Logical Functions(邏輯函數(shù))
eg: boolean1 OR boolean2
(3) Arithmetic Functions(算術(shù)函數(shù))
eg: numeric1 + numeric2
(4) String Functions(字符串函數(shù))
UPPER(string)
(5) Temporal Functions(時(shí)間函數(shù))
YEAR(date)
(6) Conditional Functions(有條件的函數(shù))
IF(condition, true_value, false_value)
(7) Type Conversion Functions(類(lèi)型轉(zhuǎn)換函數(shù))
CAST(value AS type)
(8) Collection Functions(集合函數(shù))
array ‘[’ INT ‘]’
(9) Value Construction Functions , Value Access Functions,Grouping Functions,Hash Functions,Auxiliary Functions,Aggregate Functions,Column Functions (不一一列舉)
自定義函數(shù)
(1) 標(biāo)量函數(shù)(UDF)
標(biāo)量函數(shù) 將標(biāo)量值轉(zhuǎn)換成一個(gè)新標(biāo)量值,也就是對(duì)一行數(shù)據(jù)中的一個(gè)或者多個(gè)字段返回一個(gè)單值。
(2) 聚合函數(shù)(UDAGG)
自定義聚合函數(shù)(UDAGG)是把一個(gè)表(一行或者多行,每行可以有一列或者多列)聚合成一個(gè)標(biāo)量值。
(3) 表值函數(shù)(UDTF)
表值函數(shù) 將標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù)??梢越邮找粋€(gè)或者多個(gè)字段作為參數(shù),輸出多行列數(shù)據(jù)。
(4) 表值聚合函數(shù)(UDTAGG)
自定義表值聚合函數(shù)(UDTAGG)可以把一個(gè)表(一行或者多行,每行有一列或者多列)聚合成另一張表,結(jié)果中可以有多行多列。
(5) 異步表值函數(shù)
異步表值函數(shù) 是異步查詢(xún)外部數(shù)據(jù)系統(tǒng)的特殊函數(shù)。
Planner 與 Blink PlannerFlink Table/SQL體系中的Planner(即查詢(xún)處理器)是溝通Flink與Calcite的橋梁,為T(mén)able/SQL API提供完整的解析、優(yōu)化和執(zhí)行環(huán)境。
Flink Table 的新架構(gòu)實(shí)現(xiàn)了查詢(xún)處理器的插件化,項(xiàng)目完整保留原有 Flink Planner (Old Planner),同時(shí)又引入了新的 Blink Planner,用戶(hù)可以自行選擇使用 Old Planner 還是 Blink Planner。
主要區(qū)別:
Blink做到了真正的流批統(tǒng)一,即將批看做是特殊的流,把處理批的API和處理流的API做成了一樣的。也就是說(shuō)不管是批數(shù)據(jù)還是流數(shù)據(jù),底層統(tǒng)統(tǒng)都是DataStream。所以使用Blink作為table planner的程序,Table和DataSet是不能相互轉(zhuǎn)換的。
Blink planner是不支持BatchTableSource的,它只支持StreamTableSource。
Blink Planner和Old Planner的FilterableTableSource是不兼容的。Old - Planner會(huì)下推PlannerExpression到FilterableTableSource。而B(niǎo)link planner下推的是Expression。
基于String的鍵值對(duì)配置項(xiàng)只能用于Blink Planner
Blink Planner會(huì)優(yōu)化多個(gè)sink到同一個(gè)TableEnvironment和StreamTableEnvironment。而Old Planner會(huì)為不同的sink優(yōu)化到自己的DAG中,也就是說(shuō)有幾個(gè)sink就有幾個(gè)DAG。
Old Planner 不支持 catalog統(tǒng)計(jì),Blink支持。
Old Planner 不支持版本表(versioned Table)。版本表類(lèi)似HBASE中版本表的意思,每個(gè)key可以記住過(guò)去的幾個(gè)值。
Blink SQL執(zhí)行過(guò)程
SQL執(zhí)行過(guò)程分三個(gè)階段
(1) 從SQL到 Operation
(2) 從Operation 到 Transformation
(3) 環(huán)境的執(zhí)行階段
從SQL到 Operation
(1) 解析SQL轉(zhuǎn)換為QueryOperation;
(2) SQL解析為SqlNode;
(3) 校驗(yàn)SqlNode;
(4) 調(diào)用Calcite SQLToRelConvertrt將SqlNode轉(zhuǎn)化為RelNode邏輯樹(shù);
(5) RelNode轉(zhuǎn)化為Operation。
Operation 到 Transformation
(1) DQL(數(shù)據(jù)查詢(xún)語(yǔ)言)轉(zhuǎn)換,在flink中作為中間運(yùn)算;
(2) DML(數(shù)據(jù)操作語(yǔ)言),DQL轉(zhuǎn)換。
整個(gè)轉(zhuǎn)換從Operation開(kāi)始,先轉(zhuǎn)換為Calcite的邏輯計(jì)劃樹(shù),再轉(zhuǎn)化為Flink的邏輯計(jì)劃樹(shù),然后進(jìn)行優(yōu)化。優(yōu)化后的邏輯樹(shù)轉(zhuǎn)換為Flink的物理執(zhí)行,物理執(zhí)行生成一系列的算子,udf等等,包裝到Transformation中。
環(huán)境的執(zhí)行階段
有了Transformation后正式進(jìn)入到StreamGraph的過(guò)程中,最終交給Flink集群去運(yùn)行。
SQL優(yōu)化器查詢(xún)優(yōu)化器
再次提到兩個(gè)優(yōu)化器:RBO(基于規(guī)則的優(yōu)化器) 和 CBO(基于代價(jià)的優(yōu)化器)
(1) RBO(基于規(guī)則的優(yōu)化器)會(huì)將原有表達(dá)式裁剪掉,遍歷一系列規(guī)則(Rule),只要滿(mǎn)足條件就轉(zhuǎn)換,生成最終的執(zhí)行計(jì)劃。一些常見(jiàn)的規(guī)則包括分區(qū)裁剪(Partition Prune)、列裁剪、謂詞下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit下推、sort下推、常量折疊(Constant Folding)、子查詢(xún)內(nèi)聯(lián)轉(zhuǎn)join等。
(2) CBO(基于代價(jià)的優(yōu)化器)會(huì)將原有表達(dá)式保留,基于統(tǒng)計(jì)信息和代價(jià)模型,嘗試探索生成等價(jià)關(guān)系表達(dá)式,最終取代價(jià)最小的執(zhí)行計(jì)劃。CBO的實(shí)現(xiàn)有兩種模型,Volcano模型,Cascades模型。這兩種模型思想很是相似,不同點(diǎn)在于Cascades模型一邊遍歷SQL邏輯樹(shù),一邊優(yōu)化,從而進(jìn)一步裁剪掉一些執(zhí)行計(jì)劃。
目前各大數(shù)據(jù)庫(kù)和計(jì)算引擎傾向于CBO。
總結(jié)在目前情況下,在阿里對(duì)Flink社區(qū)的貢獻(xiàn)下,F(xiàn)link包含了Flink SQL 和 Blink SQL體系,F(xiàn)link Planner稱(chēng)之為 Old Planner,Blink Planner稱(chēng)之為 New Planner。從中可以發(fā)現(xiàn) Blink Planner是未來(lái),F(xiàn)link Planner將會(huì)被淘汰。
FlinkSQL依靠 Calcite提供了一套SQL驗(yàn)證,解析,優(yōu)化等等操作。同時(shí)FlinkSQL提供元數(shù)據(jù)管理,SQL函數(shù),數(shù)據(jù)源的建設(shè)。也自由化地提供了自定義函數(shù),自定義connector連接,豐富了場(chǎng)景的使用。
FlinkSQL你值得擁有!?。?/p>
編輯:jq
-
處理器
+關(guān)注
關(guān)注
68文章
20339瀏覽量
255355 -
數(shù)據(jù)管理
+關(guān)注
關(guān)注
1文章
347瀏覽量
20630 -
SQL
+關(guān)注
關(guān)注
1文章
807瀏覽量
46964 -
UDF
+關(guān)注
關(guān)注
0文章
4瀏覽量
6703
原文標(biāo)題:干貨:詳解 FlinkSQL 實(shí)現(xiàn)原理
文章出處:【微信號(hào):DBDevs,微信公眾號(hào):數(shù)據(jù)分析與開(kāi)發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
全方位解析FlinkSQL的實(shí)現(xiàn)原理
評(píng)論