01、Source API

Source API 已經(jīng)引入很多版本,從 1.12 開始我們有了 Source API 的第一個(gè)版本,到 Flink 1.14 開始逐漸達(dá)到一個(gè)穩(wěn)定的狀態(tài),并標(biāo)記成 Public。如果了解 Flink 的時(shí)間較長,我們之前還有 InputFormat 和 SourceFunction。請大家注意這些 API 在 2.0 都會(huì)被棄用掉,如果需要開發(fā)一個(gè)新的 Connector ,請關(guān)注最新的 Source API

從整體的設(shè)計(jì)來看,Source API 使用了主從的結(jié)構(gòu),和 Flink 的集群結(jié)構(gòu)是類似的,它分為了兩個(gè)部分,第一部分就是 SplitEnumerator ,它相當(dāng)于整個(gè) Source 的大腦,從名字上來看它的主要功能就是枚舉、分片。

分片是 Source 對(duì)外部系統(tǒng)的部分抽象,比如 Kafka 一個(gè)分片就是一個(gè) Topic 里的 Partition。如果是一個(gè)文件系統(tǒng)的 Source ,那一個(gè) Split 就是一個(gè)文件或者是一個(gè)文件夾。SplitEnumerator 的工作就是在外部系統(tǒng)當(dāng)中發(fā)現(xiàn)這些分片并把它做成任務(wù)分配給最終真正進(jìn)行干活的,我們稱為 SourceReader 。SplitEnumerator 是作業(yè)級(jí)別的,也就是每個(gè)作業(yè)或者說每個(gè) Source 是只有一個(gè)的, SourceReader 是 Subtask 級(jí)別的,整個(gè) Source 的并發(fā)是多少,那我們 SourceReader 的實(shí)例就有多少。SourceReader 和 Enumerator之間是通過 JM 和 TM 的 RPC 進(jìn)行通信的,我們在這之上也封裝了一些事件能夠讓 Enumerator 和 Reader 之間進(jìn)行溝通,我們稱之為 Source Event,這樣能夠更好地協(xié)調(diào)  Enumerator 和 Reader 之間的任務(wù)分配和全局管理的工作。

為了進(jìn)一步簡化用戶對(duì) Source 的開發(fā)過程,正如上圖所示,我們提供 SourceReaderBase 的基類,SourceReader 是一個(gè)相當(dāng)于比較底層的接口,為了簡化開發(fā)難度,我們也提供了一個(gè) SourceReaderBase ,它進(jìn)一步把 Source 和外部系統(tǒng)之間的溝通部分和與 Flink 進(jìn)行協(xié)作的部分做了一個(gè)拆分,這樣做的好處是 Source 開發(fā)者能夠更加關(guān)注和外部系統(tǒng)之間的交互,而不需要過多關(guān)注和 Flink 之間的 Checkpoint 處理、會(huì)不會(huì)影響 Flink 主線程的工作等問題。在 SorceReaderBase 的設(shè)計(jì)之下,我們又抽出了一個(gè)名為 SplitReader 的 API,它才是真正的從外部系統(tǒng)中拉取數(shù)據(jù),根據(jù) Enumerator 分配的分片到外部系統(tǒng)中讀取數(shù)據(jù)的部分。數(shù)據(jù)在讀取來之后,SplitReader 會(huì)把它放到 SourceReaderBase 中間的一個(gè) Element 隊(duì)列,實(shí)際上就是做了和外部系統(tǒng)以及和 Flink 之間的隔離。在 Flink 這一側(cè),我們真正運(yùn)轉(zhuǎn)的是 Flink Task 的主線程,這是一個(gè)沒有 Box 的模型,它會(huì)不斷地從中間拉取數(shù)據(jù),然后把數(shù)據(jù)經(jīng)過 RecordEmitter 的處理后發(fā)送到下游。RecordEmitter 的工作主要就是用來做反序列化,將外部系統(tǒng)當(dāng)中的數(shù)據(jù)格式轉(zhuǎn)換成下游當(dāng)中的一些數(shù)據(jù)格式。

在開發(fā) Source 的時(shí)候需要注意的問題有哪些,這個(gè)是我在檢查 Source 的實(shí)現(xiàn)和管理當(dāng)中總結(jié)出來的。第一點(diǎn)是開發(fā)的時(shí)候一定要注意把和外部系統(tǒng)交互的部分以及和 Flink 交互的部分區(qū)分開,比如剛剛講的 SourceReaderBase 為什么中間要插一個(gè)隊(duì)列,這樣就是盡量把和外部系統(tǒng)交互的部分和與 Flink 之間交互的部分區(qū)分開。之所以這樣做是因?yàn)?Flink 的主線程是一個(gè) mailbox 模型 ,包括 Checkpoint 和一些控制信息的傳遞都是通過這個(gè) mailbox 的線程來做的。如果是我們用它去和外部系統(tǒng)做 IO 的話,這樣有可能會(huì)對(duì)下游算子以及整個(gè) Task 的運(yùn)行產(chǎn)生一些影響,包括 Checkpoint 的運(yùn)行有可能也會(huì)受到一定程度的影響,所以在開發(fā)的時(shí)候一定要注意把做 IO 的線程和 Flink 的線程區(qū)分開。


第二點(diǎn)是我們 Source 當(dāng)中提供了很多工具或者說方法,比如 SplitEnumeratorContext 里面有一個(gè) callAsync 方法,很多人在開發(fā) Enumerator 時(shí)沒有注意到它,自己去起一個(gè)線程池或者去起一個(gè)線程,很費(fèi)勁地去處理各個(gè)線程之間的協(xié)調(diào)問題。那通過這個(gè) callAsync 我們已經(jīng)提供了一個(gè)能夠給外部系統(tǒng)做 IO 的一個(gè)線程的,叫 worker thread。大家就可以直接利用這個(gè)工具,我們會(huì)在 Flink 里面把一些線程之間的隔離問題處理好。盡量能去復(fù)用 SourceReaderBase 和 SplitReader 的時(shí)候就盡量去復(fù)用它,這樣能夠大大降低我們的開發(fā)難度,總的來說就是盡量自己少造輪子,可以復(fù)用現(xiàn)有的輪子。

在近幾個(gè)版本中我們對(duì) Source 的功能做了增強(qiáng),首先就是 Hybrid Source ,它有一種典型的用戶場景,一些線上用戶需要首先去讀取 HDFS 或者其他文件系統(tǒng)存儲(chǔ)里面的一些存量數(shù)據(jù),在讀取完已有的存量數(shù)據(jù)之后進(jìn)行切換,比如切到 Kafka 或者其他的消息隊(duì)列來讀在線數(shù)據(jù),那實(shí)際上是需要一個(gè)在不同 Source 之間進(jìn)行切換的能力。Hybrid Source 就是在現(xiàn)有 Source 的基礎(chǔ)上,封裝這么一層,提供了這樣一個(gè)多 Source 之間按序切換的能力。兩個(gè) Source 之間是有切換順序的,當(dāng)一個(gè) Source 比如 FileEnumerator 執(zhí)行完工作之后,會(huì)產(chǎn)生一個(gè) Switch context ,也就是說我會(huì)把當(dāng)前的進(jìn)度或狀態(tài)的信息通過這個(gè) SwitchContext 提供給接下來要運(yùn)行的 Source 。比如說剛提到的場景里面,F(xiàn)ileEnumerator 就要告訴 Kafka 的 Enumerator 我現(xiàn)在讀到哪, Kafka Enumerator 會(huì)根據(jù)當(dāng)前在 Switchcontext 里面提供的位點(diǎn)信息或者說時(shí)間信息來正確地啟動(dòng) Kafka Source 的讀取,平滑地遷移到存量階段或者在線數(shù)據(jù)的讀取當(dāng)中。從這個(gè)架構(gòu)圖中我們也可以觀察到,實(shí)際上 Hybrid Source 就是對(duì)我們現(xiàn)有的 Enumerator 還有 Reader 進(jìn)行了二次封裝,并提供了這樣一個(gè)工具類來幫助它們進(jìn)行切換。

第二個(gè)在 Source 上面我們支持的功能就是 Watermark Alignment。

先說一下這個(gè)問題的背景,不管大家的作業(yè)當(dāng)中有一個(gè) Source 還是多個(gè) Source ,經(jīng)常會(huì)遇到的情況是不同 Source 之間讀取的進(jìn)度差異會(huì)很大,比方如果是 Kafka, Source 中的某一個(gè) Partition 因?yàn)榫W(wǎng)絡(luò)或者其他原因,它的進(jìn)度遠(yuǎn)遠(yuǎn)落后于其他的 Partition。或者有兩個(gè) Source ,它們之間因?yàn)樽x取不同的外部系統(tǒng)導(dǎo)致產(chǎn)生不同的進(jìn)度,這樣就會(huì)導(dǎo)致一些下游的算子比如說我需要做一些 Join、我需要做 Aggregate,就需要等待所有并發(fā)的 Watermark 都前進(jìn)到同一個(gè)位置之后才能夠出發(fā)計(jì)算,只要有一個(gè)拖了后腿,那其他人的數(shù)據(jù)都要在狀態(tài)里面等,這樣就會(huì)導(dǎo)致后面某些需要用到的算子狀態(tài)會(huì)越來越大,這實(shí)際上就是讀取進(jìn)度的不同所導(dǎo)致的。

針對(duì)這個(gè)問題,我們提出了這樣一個(gè) Watermark Alignment 的機(jī)制,在實(shí)現(xiàn)的時(shí)候,如果是同一個(gè) Source 相對(duì)會(huì)簡單一點(diǎn),可以直接在這個(gè) Source 的 Coordinator 、或者說是 Enumerator 里面就把這個(gè)事情做了。如果跨 Source 之間要實(shí)現(xiàn)這個(gè)能力,我們是在中間引入了一個(gè)叫 CoordinatorStore 的一個(gè)組件。它能夠讓不同的 Source 之間來交換一些信息,在這里面我們需要交換的就是 Watermark 信息, Source Operator 這邊會(huì)周期性的給自己的 Coordinator 匯報(bào)當(dāng)前處理的進(jìn)度怎么樣,然后 Source Coordinator 會(huì)周期性的檢查當(dāng)前進(jìn)度的最小值,如果發(fā)現(xiàn)某些 Operator 讀的太快了,有落在后面的并發(fā)或者說落在后面的 Source ,會(huì)讓它先等一等,降低一下讀取速度,等大家都追齊之后再往前讀。這就是 Watermark Alignment 實(shí)現(xiàn)的一個(gè)細(xì)節(jié)。

02、Sink?API

介紹完 Source 之后,我們再為大家介紹一下 Sink API 。Sink API 也是經(jīng)過了很多版本的迭代,最開始我們有 OutputFormat 和 SinkFunction,同樣還要提醒大家這兩個(gè) API 在 2.0 里面是被廢棄了的。在引入 Sink 之后我們也因?yàn)槟承┬枨鬀]法滿足,所以推出了兩個(gè)版本:Sink V1 、Sink V2 ,在這里主要介紹 Sink V2。Sink API 本身的設(shè)計(jì)相對(duì)來講沒有那么復(fù)雜,它不涉及到主從結(jié)構(gòu)或者說不涉及到協(xié)調(diào)能力,Sink 本身只是一個(gè)工廠類,是來構(gòu)建整個(gè) Sink 的拓?fù)浠蛘哒f各個(gè)組件的。其中最核心的組件就是 SinkWriter ,因?yàn)?Sink 本身需要往外寫數(shù)據(jù),所以不管是什么 Sink ,SinkWriter 一定是必不可少的,它的功能就是把上游的數(shù)據(jù)進(jìn)行序列化,然后對(duì)應(yīng)的寫出到外部系統(tǒng)。如果說 Sink想要實(shí)現(xiàn) Exactly-once 或者說第二階段提交的能力,那在此基礎(chǔ)上需要提供一個(gè)可選的 SinkCommitter 的組件。它們兩個(gè)之間協(xié)調(diào)的方式就是在每個(gè) Checkpoint 的時(shí)候,SinkWriter 會(huì)生成一個(gè)叫 Committable 的特殊的消息。

一般來講數(shù)據(jù)庫可能就是一個(gè) Transaction,當(dāng) Checkpoint 觸發(fā)的時(shí)候會(huì)產(chǎn)生這樣一個(gè) Committable,留給下面的 SinkCommitter,當(dāng)所有的并發(fā)的 Checkpoint 都完成之后,我們會(huì)通過 SinkCommitter 將 Committable 提交到外部系統(tǒng)當(dāng)中去,從而實(shí)現(xiàn)這樣一個(gè)第二階段提交的過程。有了這兩個(gè)組件之后我們還是發(fā)現(xiàn)有一些需求很難滿足,比如說像 Iceberg、Hive 這些 Sink ,它可能會(huì)涉及到 Checkpoint 之后再做一些小文件合并等額外的邏輯。為了更大程度地豐富 Sink 可以適用的場景,我們在此基礎(chǔ)上又提供了三個(gè)部分,分別是 PreWrite 、 PreCommit 、PostCommit。實(shí)際上就是允許 Sink 的開發(fā)者在 SinkWriter 和 Committer 之間可以插入任意的拓?fù)溥壿嫛N铱梢栽谠诶锩娲?lián)很多的Operator也好或者說我可以給它們設(shè)計(jì)不同的并發(fā),從而實(shí)現(xiàn)我 Sink 里面的特殊功能。但其實(shí)對(duì)于絕大多數(shù)的 Sink 來講,這些功能可能用到的機(jī)會(huì)很少,但是如果你發(fā)現(xiàn)我們現(xiàn)有的 Writer和Committer 沒有辦法滿足需求的時(shí)候,那就可以直接考慮用這三個(gè)自定義組件來實(shí)現(xiàn)自己的邏輯。

類似于剛剛介紹的 SourceReaderBase, 為了簡化 Sink 的開發(fā),我們提供 Async Sink 的基類,它提供的能力是對(duì)一些通用的、異步輸出數(shù)據(jù)邏輯,通過這些場景來提供一個(gè)基本的抽象。在這里面涉及的概念比如 ElementConverter 會(huì)將我們上游的數(shù)據(jù)轉(zhuǎn)換成能夠?qū)ν獠肯到y(tǒng)進(jìn)行的真正的請求。Async Sink 本身會(huì)提供攢批的能力,用戶可以通過設(shè)置攢批的條件比如當(dāng)數(shù)據(jù)達(dá)到一定的大小,然后攢批的時(shí)間是多少之后,然后將這一批請求的批量提交到外部系統(tǒng)當(dāng)中去。同樣這里面也提供了內(nèi)置的異常重試邏輯,如果是某次提交失敗了,那么在下一次提交的時(shí)候再一次嘗試把這些數(shù)據(jù)進(jìn)行重試提交。基于這些邏輯我們可以看到實(shí)際上它只是實(shí)現(xiàn)了一個(gè) at-least-once 語義的。但實(shí)際上我們生產(chǎn)當(dāng)中,絕大部分的 Sink 都是 at-least-once ,因?yàn)閷?shí)現(xiàn)一個(gè) Exactly-once 的成本會(huì)很大,有一些 Sink 會(huì)覺得費(fèi)了半天勁實(shí)現(xiàn)一個(gè) Exactly-once 的 Sink 但是真正應(yīng)用的人很少,那不如退而求其次,能夠把 at-least-once 語義做到極致就可以了。如果你的 Sink 只需要實(shí)現(xiàn) at-least-once 語義,不妨嘗試 Async Sink ,能夠大大降低大家的開發(fā)難度。

03、集成至Table?/?SQL?API

介紹完下面兩層之后,我們再來說一下怎么把 Source Sink 集成至 Table / SQL API 上面去。

Table SQL API 對(duì)于 connector 提供的接口主要是一個(gè)層次關(guān)系, Source 最基礎(chǔ)的接口叫做 DynamicTableSource,它下面有兩種集成:ScanTableSource 和 LookupTableSource 。Scan 顧名思義就是對(duì)原表的掃描, Lookup 就是我們常說的對(duì)維表的典查的邏輯。這里我列了一個(gè)樣例的kafka,那 ScanTableSource 可能就是我從 Kafka 中讀取數(shù)據(jù),再讀取完之后通過 Redis 維表提供的LookupTableSource 去 Redis 上面進(jìn)行點(diǎn)查,最終會(huì)寫到 Sink 當(dāng)中。比如說我們是 Hive ,會(huì)把這個(gè)結(jié)果通過這個(gè) Sink 對(duì)應(yīng)的接口叫做 DynamicTableSource 來寫到外部系統(tǒng)當(dāng)中去。實(shí)際上這三個(gè)接口都是對(duì) Source 和 Sink 或者說我們下面會(huì)介紹的 LookupFunction 的一個(gè)工廠或者說是一個(gè)構(gòu)造器,那真正在下面干活的,就是我們剛剛說的 DataStream API 、Source 和 Sink 。

那么我們先看 ScanTableSource 的接口長什么樣。很簡單也很好理解,它有兩個(gè)方法,第一個(gè)叫 getChangelogMode,因?yàn)槲覀僃link Source整體是支持三種數(shù)據(jù)類型的,比如說像 INSERT / UPDATE / DELETE ,如果你的 Source 是一個(gè)三種能力都支持的 Source ,比如說我是一個(gè)滿 MySQL CDC Source ,我能夠讀取原表里的插入、更新和刪除等等,我就需要在 ChangelogMode 里面指定我是支持三個(gè)能力的。如果你是一個(gè)消息隊(duì)列的 Source ,比如說 Kafka ,那它就只支持一種 INSERT ,那這里面返回一個(gè)支持 INSERT 就可以了。這個(gè)方法會(huì)被 Planner 拿去用來對(duì)一些下游算子的校驗(yàn)等等,包括你的一些邏輯,整體寫出來之后看 INSERT / UPDATE / DELETE 下游的算子能不能接受。
第二個(gè)方法就是我們怎么真正去從 Table 構(gòu)建出來 Source 在底層運(yùn)行的 Source API 的構(gòu)造器,從 Context 里面我們是可以拿到用戶在 Source 在 CREATE TABLE 語句里面的所有配置的。我會(huì)根據(jù)這些配置,創(chuàng)建出對(duì)應(yīng)的 Source Provider ,把最終運(yùn)行在里面的 Source 構(gòu)建出來。

再介紹一下 LookupTableSource ,這個(gè)剛剛說是實(shí)現(xiàn)了點(diǎn)查的邏輯,但是在 DataStream API 我們沒有提供統(tǒng)一的抽象的接口,就是能夠提供這樣一個(gè)典查的能力,那么在這里面我們在 Table 階段用的是叫 LookupFunction 。
它有兩個(gè)版本,一個(gè)是 LookupFunction ,一個(gè)是 AsyncLookupFunction ,分別對(duì)應(yīng)的是同步的實(shí)現(xiàn)和異步的實(shí)現(xiàn)。我們在 1.16 版本里面也為維表提供了一些輔助的能力,比如說維表的 Cache 能夠快速地幫你構(gòu)建出來一個(gè) LookupFunction 這樣一些工具類,這樣大家在實(shí)現(xiàn)維表的時(shí)候也可以去考慮使用它。LookupTableSource 的接口比 Source 的還要簡單,它不需要提供 ChangelogMode,因?yàn)樗墓ぷ骶褪墙右粭l數(shù)據(jù)然后將對(duì)應(yīng)的字段到外部系統(tǒng)查一下就可以了。唯一提供的就是 LookupRuntimeProvider 怎么根據(jù)用戶的的配置來構(gòu)建出來在 Runtime 當(dāng)中使用的 LookupFunction 。

然后是 Sink ,Sink 和 Source 相似,兩個(gè)接口我是不是支持寫出 INSERT / UPDATE / DELETE ,給 Planner 做校驗(yàn),下面就是我們怎么根據(jù)用戶的配置構(gòu)建出來 Sink ,這個(gè)跟 Source 基本上是完全對(duì)稱的。

除此之外我們有一些 Source 是支持高級(jí)輔助能力的,比如說我可以提供給 Planner 這個(gè)信息, Planner 可以把 Projection Pushdown 到這個(gè) Source 里面,把 Filter Pushdown 到另一個(gè) Source 里面,我們在這里面統(tǒng)一地去定義了一些接口比如說 Supports 后面是能力的名字。比如說我們有一個(gè)很厲害的 Table Source ,那它是支持 FilterPushdown 和 ProjectionPushdown 的。很簡單,我們只需要在這個(gè)類上面去實(shí)現(xiàn)這兩個(gè)接口就可以了,根據(jù)這些接口里面提供的方法來提供對(duì)應(yīng)的信息,比如怎么把真正的 Pushdown 推到 Source 里面。大家可以在代碼里面查看這些所有支持的能力,選擇自己能夠支持的然后進(jìn)行對(duì)應(yīng)的實(shí)現(xiàn)。

04、Catalog?API

在介紹完 Table 層后就是最后一部分 Catalog API

在這里我舉個(gè)簡單的例子,寫過 SQL 的人都知道,很頭疼的一件事就是去寫 CREATE TABLE ,我們需要給每一個(gè)字段去定義它的類型和名字,比如說我在讀取它的上游表,這個(gè)表里面定義的字段上百個(gè),這是非常常見的情況,需要一一把它映射到 Flink 的數(shù)據(jù)類型里面,然后把它羅列在列定義里。除此之外,我們還需要為 Table 寫 With 參數(shù),指定 Connector ,配置這個(gè) Connector。某些 Connector 的配置非常復(fù)雜,比如說連接一個(gè)開啟了 SSL 的 Kafka 集群,可能需要寫很多的參數(shù)才可以把這個(gè) Table 創(chuàng)建出來,這就是我們遇到的第一個(gè)問題:CREATE TABLE 語句太冗長了。

第二個(gè)問題是配置問題很難復(fù)用,比如說我今天為這個(gè)集群配置了這個(gè)表,寫了一堆參數(shù),明天我還需要用這個(gè)集群,另一個(gè)表又寫了一遍參數(shù),這個(gè)感覺就很冗余。另外還有一個(gè)問題就是剛剛提到的我要給每個(gè)字段處理它對(duì)應(yīng)的類型映射,這個(gè)也很麻煩。

Catalog 的誕生就是為了解決這個(gè)問題。Catalog 實(shí)際上是一個(gè)能夠提供外部系統(tǒng)原信息的一個(gè)組件,我們在 Catalog 這個(gè) API 里面是提供了一個(gè)統(tǒng)一的抽象,和 Flink、Source里面的概念相對(duì)應(yīng)。比如說像 Database 、 Table 、 Partition 、 View 、Function 提供這些統(tǒng)一的抽象概念,我們在開發(fā) Catalog 的時(shí)候只需要把外部系統(tǒng)對(duì)應(yīng)的概念和它們進(jìn)行一一映射就可以了。這個(gè)也不是必須的,比如說我外部系統(tǒng)沒有 Partition 、Function 這些的話就可以不實(shí)現(xiàn)它。Catalog 的其他能力還有能夠?qū)υ畔⑦M(jìn)行一個(gè)持久化的存儲(chǔ),對(duì)于 Hive 的話我們可以對(duì)接到 Hive 的 Catalog 里面,把一些表的信息存儲(chǔ)到里面進(jìn)行一個(gè)持久化,方便后面進(jìn)行復(fù)用。

除此之外 Catalog 提供了一個(gè)統(tǒng)一的 API 能夠?qū)ν獠肯到y(tǒng)進(jìn)行一個(gè)統(tǒng)一的管理。當(dāng)我們提供了 catalog 之后就可以大幅簡化用戶的配置成本。舉個(gè)例子,外部系統(tǒng)各種各樣,可能有各種各樣的數(shù)據(jù)庫類型,它們對(duì)自己管理數(shù)據(jù)庫的概念又不一樣,可能有的有 Schema、有的沒有 Schema,有的叫 Database 、有的叫 Namespace,經(jīng)過我們統(tǒng)一的 Catalog API 的這層翻譯之后,能夠把它們對(duì)應(yīng)的概念一一映射到 Flink 里面的概念當(dāng)中,用戶在使用的時(shí)候接觸到的就只有一種概念,我們在 Flink 里面定義的這些頂層邏輯,直接通過 Catalog 里面選擇這個(gè)表就可以把這個(gè)表的數(shù)據(jù)拉出來了。

在這里舉個(gè)例子,大家可能對(duì) MySQL 這些數(shù)據(jù)庫可能都比較好映射,它里面有 Database 、 Table 。那不典型的也不是這種結(jié)構(gòu)化存儲(chǔ)的比如說 kafka ,它能不能支持 Catalog 呢?當(dāng)然是可以的。在這里面我們把一個(gè) Kafka Catalog 映射到一個(gè) Kafka 集群上面,一個(gè) Table 對(duì) kafka 來說就是一個(gè) Topic ,在這里面 kafka 可能沒有那么多層級(jí)的概念,可能沒有一個(gè) Database,那我就不映射 Database,給一個(gè)默認(rèn)的就可以了,在實(shí)現(xiàn) Kafka Cluster 的時(shí)候可以讓用戶配置這個(gè) Topic 里面讀取的數(shù)據(jù)類型。在這里面我舉個(gè)例子,Kafka 里面存儲(chǔ)的是一個(gè) JSON 類型,那 Catalog 本身就可以對(duì)每一種字段的類型根據(jù)它 JOSN 的內(nèi)容進(jìn)行一個(gè)推導(dǎo),把每一條數(shù)據(jù)映射到表里面的每一個(gè)行,這樣就完成了對(duì) Kafka 統(tǒng)一的抽象,對(duì)終端用戶來講如果使用這個(gè) Kafka catalog ,就沒有必要反復(fù)去配置這個(gè) Kafka 集群的一些信息,想要哪一個(gè) Topic 的數(shù)據(jù),一個(gè) SELECT 語句就可以直接拿出來,這樣能夠大大降低用戶的使用門檻。

有了 Catalog 之后,基于 Catalog 可以做一些更豐富的事情,比如血緣信息管理。在 1.18 和 1.19(還未發(fā)布)兩個(gè)版本當(dāng)中 Flink 也會(huì)對(duì)血緣信息做一些支持。現(xiàn)在已經(jīng)實(shí)現(xiàn)的部分 FLIP-294、Catalog Modification Listener ,也就是我們可以在 Catalog 上面注冊一個(gè)監(jiān)聽器,如果 Catalog 有任何的變更,比如說加表、刪表,這些信息都會(huì)通過 Listener 匯報(bào)到對(duì)應(yīng)的外部組件里面。在血緣信息管理當(dāng)中,它匯報(bào)的對(duì)象就是一個(gè) MetadataPlatform ,比如說像 Atlas、Datahub 這些原信息管理系統(tǒng),相對(duì)應(yīng)的如果有建表我會(huì)在原信息管理平臺(tái)上面創(chuàng)建對(duì)應(yīng)的數(shù)據(jù)節(jié)點(diǎn),刪表之后會(huì)將其進(jìn)行移除。

在未來 1.19 版本里面我們預(yù)計(jì)要實(shí)現(xiàn)的就是這個(gè)對(duì)作業(yè)血緣的監(jiān)聽,剛剛我們通過對(duì) Katalog 的監(jiān)聽是對(duì) MetadataPlatform 的一些數(shù)據(jù)節(jié)點(diǎn)進(jìn)行一個(gè)創(chuàng)建,怎么把點(diǎn)之間的線連接起來呢?通過 FLIP-314 之間定義的一些接口它會(huì)對(duì)作業(yè)的啟動(dòng)、停止、暫停等基礎(chǔ)的信息進(jìn)行監(jiān)聽。如果一個(gè)作業(yè)啟動(dòng)之后,可以通過拿到它的 Source 和 Sink 把兩個(gè)數(shù)據(jù)節(jié)點(diǎn)之間的線連接起來,這樣就能夠完整的獲得這個(gè) Flink 集群上面運(yùn)行的數(shù)據(jù)血緣或者說節(jié)點(diǎn)之間的一個(gè)上下游邏輯,方便用戶對(duì)自己的數(shù)據(jù)流向有更充分的管理。

再回到這個(gè)圖,剛剛的介紹順序是由底向上的,當(dāng)大家想要實(shí)現(xiàn)自己的 Connector 時(shí),首先需要考慮要接觸到的一定是這個(gè) Source 和 Sink ,它們是在 DataStream 層上面的實(shí)現(xiàn),也是上面這些 API 的基石,想實(shí)現(xiàn)一個(gè) Connector 一定要實(shí)現(xiàn)這兩個(gè)接口。如果說 Connector 想支持 SQL、還有 Table 豐富的生態(tài),我們需要在它的基礎(chǔ)上實(shí)現(xiàn) DynamicTableSource 和 DynamicTableSink , 它們可以理解成下面 Source 和 Sink 的構(gòu)造器。如果想進(jìn)一步簡化用戶使用 Connector 的成本,不要每次都寫一堆很冗長的 Table ,我們對(duì)它進(jìn)行一個(gè)復(fù)用,然后就可以去對(duì)接到 Catalog API 上面,把自己外部系統(tǒng)的一些概念抽象到 Flink 上面去,這樣用戶可以直接從你的外部系統(tǒng) Catalog 里面 Select 數(shù)據(jù)出來, 不需要反復(fù)的去定義字段、定義配置等等,能夠降低用戶的使用門檻。提供了 Catalog 之后我們就能夠天生的獲得一些血緣管理或者說原信息管理的能力。

本文章轉(zhuǎn)載微信公眾號(hào)@Apache Flink

上一篇:

用php的post方法接入百度翻譯API(免費(fèi)版)

下一篇:

開源機(jī)器學(xué)習(xí)框架:Scikit-learn API簡介
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊

多API并行試用

數(shù)據(jù)驅(qū)動(dòng)選型,提升決策效率

查看全部API→
??

熱門場景實(shí)測,選對(duì)API

#AI文本生成大模型API

對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)

#AI深度推理大模型API

對(duì)比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)