安全的關(guān)鍵.png)
GraphQL API滲透測(cè)試指南
Flink ML API 指的是提供給用戶使用算法的接口。通過(guò)把所有算法打包為統(tǒng)一的 API 提供給用戶,讓所有使用者的體驗(yàn)保持一致,也能降低學(xué)習(xí)和理解算法的成本,此外算法之間也可以更好地交互和兼容。
舉個(gè)例子,在 Flink ML API 中提供一些基礎(chǔ)的功能類,通過(guò)使用這些功能類可以把不同算子連接組合成一個(gè)高級(jí)的算子,可以大大提高了算法的開(kāi)發(fā)效率。同時(shí),通過(guò)使用統(tǒng)一的 Table API,讓所有的數(shù)據(jù)都以 Table 格式進(jìn)行傳輸,可以使得不同公司開(kāi)發(fā)的算法能夠互相兼容,降低不同公司重復(fù)開(kāi)發(fā)的算子的成本,提升算法合作的效率。
之前版本的 Flink ML API 還是存在不少痛點(diǎn)。
首先是表達(dá)能力方面。之前的 API 的輸入只支持單個(gè) Table 的形式,無(wú)法表達(dá)一些常見(jiàn)的算法邏輯。比如有些訓(xùn)練算法的輸入表達(dá)是一張圖,把數(shù)據(jù)通過(guò)不同的 Table 傳進(jìn)來(lái),這種情況下單個(gè) Table 輸入的接口就不適用了。再比如有些數(shù)據(jù)預(yù)處理的邏輯需要將多個(gè)輸入得到的數(shù)據(jù)進(jìn)行融合,用單個(gè) Table 輸入的 API 也不適合。因此我們計(jì)劃把算法接口擴(kuò)展為支持多輸入多輸出。
其次是實(shí)時(shí)訓(xùn)練方面。之前的 API 無(wú)法原生支持實(shí)時(shí)機(jī)器學(xué)習(xí)場(chǎng)景。在實(shí)時(shí)機(jī)器學(xué)習(xí)中,我們希望訓(xùn)練算法可以實(shí)時(shí)產(chǎn)生模型數(shù)據(jù),并將模型數(shù)據(jù)以流的方式實(shí)時(shí)傳輸?shù)蕉鄠€(gè)前端服務(wù)器中。但是現(xiàn)有的接口只有一次性的訓(xùn)練和一次性的推理 API,無(wú)法表達(dá)這種邏輯。
最后是易用性方面。之前采用 toJson() 和 fromJson() 來(lái)導(dǎo)出和加載模型數(shù)據(jù),并且允許用戶儲(chǔ)存這些數(shù)據(jù)。但是有些模型的數(shù)據(jù)量高達(dá)幾個(gè) G,在這種情況下將模型數(shù)據(jù)以 string 的方式進(jìn)行儲(chǔ)存,效率會(huì)非常低,甚至可能無(wú)法實(shí)現(xiàn)。當(dāng)然,存在一些 hacky 方法,可以把模型數(shù)據(jù)儲(chǔ)存到一個(gè)遠(yuǎn)程終端,再把相關(guān)的 url 通過(guò) toJson() 方法傳導(dǎo)出來(lái)。但是這種情況下會(huì)存在易用性的問(wèn)題,算法使用者需要自己去解析 URL,并從遠(yuǎn)程獲取這些數(shù)據(jù)。
受限于以上幾個(gè)方面的因素,我們決定對(duì) Flink ML API 進(jìn)行擴(kuò)展。
在經(jīng)過(guò)大量討論以及思考之后,我們對(duì)新的 API 賦予了以下特性,解決了上面所有問(wèn)題。
上圖是最新的 API 構(gòu)架圖。最上層有一個(gè) WithParams interface,它提供了存取參數(shù)的 API。我們對(duì)這個(gè)接口做了改進(jìn),用戶不再需要表達(dá) isOptional 之類的 field。這個(gè)接口之下是一個(gè) stage 接口,它包含了所有算法模塊,并提供了存取模塊的 API,即 save() 和 load()。save() 負(fù)責(zé)把模型數(shù)據(jù)和參數(shù)儲(chǔ)存下來(lái),load() 負(fù)責(zé)把模型數(shù)據(jù)和參數(shù)讀取出來(lái),還原原先的 stage 實(shí)例。用戶不用考慮參數(shù)存取的復(fù)雜度。
Stage 下分為兩塊,一塊是表達(dá)訓(xùn)練邏輯的 Estimator,另一塊是表達(dá)推理邏輯的 AlgoOperator 和 Model 類。Estimator 的核心 API 是 fit()。與之前不同的是現(xiàn)在支持多個(gè) Table 的輸入,可以用來(lái)表達(dá)需要多個(gè) Table 輸入邏輯,比如特征的拼接。Estimator::fit() 輸出的是一個(gè) Model。Model 屬于 AlgoOperator。AlgoOperator 表達(dá)的是計(jì)算邏輯,支持多個(gè) Table 作為輸入和輸出,每個(gè) Table 都是一個(gè)數(shù)據(jù)源,可以用來(lái)表達(dá)通用的計(jì)算邏輯。
AlgoOperator 之下是 Transformer,可以表達(dá)對(duì)數(shù)據(jù)做轉(zhuǎn)換的邏輯。它與 AlgoOperator 具有相同的 API 格式,但是它們的抽象概念卻有所不同。Transformer 是一個(gè)具有模型語(yǔ)義的數(shù)據(jù)轉(zhuǎn)換邏輯。在計(jì)算中,比如數(shù)據(jù)預(yù)處理,存在一些更通用的將不同數(shù)據(jù)進(jìn)行拼接轉(zhuǎn)換的操作,例如對(duì)數(shù)據(jù)進(jìn)行過(guò)濾,在通用的概念下可能并不適用于 Transformer。因此我們特意增加了 AlgoOperator 類,方便用戶的理解和使用。
Transformer 之下是 model 類。我們?cè)黾恿?setModelData() 和 getModelData() API。這兩個(gè) API 是為實(shí)時(shí)機(jī)器學(xué)習(xí)專門設(shè)計(jì)的,可以讓用戶把模型數(shù)據(jù)實(shí)時(shí)導(dǎo)出到多個(gè)遠(yuǎn)程終端做在線的推理。
上圖是一個(gè)比較簡(jiǎn)化但經(jīng)典的實(shí)時(shí)機(jī)器學(xué)習(xí)場(chǎng)景。
這里的數(shù)據(jù)來(lái)源主要有兩個(gè),靜態(tài)數(shù)據(jù)來(lái)自于 HDFS,動(dòng)態(tài)數(shù)據(jù)來(lái)自于 Kafka。由 AlgoOperator 讀取來(lái)自以上兩個(gè)數(shù)據(jù)源的數(shù)據(jù),將它們拼接之后形成一個(gè) Table 輸入到 Estimator 邏輯。Estimator 讀取剛才拼接得到的數(shù)據(jù)并產(chǎn)生一個(gè) Model,然后可以通過(guò) getModelData() 拿到代表模型數(shù)據(jù)的 Table。再通過(guò) sink() API 將這些數(shù)據(jù)傳輸?shù)?Kafka topic。最后在多個(gè)前端服務(wù)器上面運(yùn)行程序,這些程序可以直接創(chuàng)建一個(gè) Model 實(shí)例,從 Kafka 中讀出模型數(shù)據(jù)形成一個(gè) Table,再通過(guò) setModelData() 把這些數(shù)據(jù)傳遞給 Model,使用得到的 Model 做在線推理。
在支持在線訓(xùn)練和在線推理之后,我們進(jìn)一步提供了一些基礎(chǔ)組件,方便用戶通過(guò)簡(jiǎn)單的算子構(gòu)建更復(fù)雜算子,這個(gè)組件便是 FLIP-175 提供的 GraphBuilder。
假設(shè)用戶的輸入也是與上文一致的兩個(gè)數(shù)據(jù)源,最終輸出到一個(gè)數(shù)據(jù)源。用戶的核心計(jì)算邏輯可以分為兩塊,第一塊是數(shù)據(jù)預(yù)處理,比如特征拼接,把兩個(gè)數(shù)據(jù)源的數(shù)據(jù)讀進(jìn)來(lái)之后做整合,以 Table 的形式輸出到 Estimator,執(zhí)行第二塊的訓(xùn)練邏輯。我們希望先執(zhí)行訓(xùn)練算子,得到一個(gè) Model。然后將預(yù)處理算子和 Model 連接,表達(dá)在線推理邏輯。
用戶需要做的只是通過(guò) GraphBuilder API 將上述步驟連接進(jìn)行,不需要專門為在線推理邏輯再寫(xiě)一遍連接邏輯。GraphBuilder 會(huì)自動(dòng)從前面一個(gè)圖生成,并與后面圖中的算子形成一一對(duì)應(yīng)的關(guān)系。AlgoOperator 在訓(xùn)練圖中的形式是直接轉(zhuǎn)換為推理圖中的算子,而 Estimator 在訓(xùn)練圖中得到的 Model 會(huì)成為推理圖中對(duì)應(yīng)的節(jié)點(diǎn),通過(guò)將這些節(jié)點(diǎn)相連,便得到了最后的 ModelA,最終用作在線推理。
Flink 是一個(gè)基于 Dag 描述執(zhí)行邏輯的流批一體的處理引擎,但是在許多場(chǎng)景下,尤其是機(jī)器學(xué)習(xí)\圖計(jì)算類型的應(yīng)用中,用戶還需要數(shù)據(jù)迭代處理的能力。例如,一些算法的離線訓(xùn)練、在線訓(xùn)練以及模型部署后根據(jù)結(jié)果動(dòng)態(tài)調(diào)整模型參數(shù)的場(chǎng)景,都需要數(shù)據(jù)迭代處理。
由于實(shí)際的場(chǎng)景同時(shí)會(huì)涵蓋離線和在線處理的案例,因此需要在迭代這一層能夠同時(shí)支持離線和在線處理。
前文提到的三個(gè)場(chǎng)景的處理邏輯既存在區(qū)別,也存在共性。
對(duì)于離線訓(xùn)練,以邏輯回歸為例,在迭代中可以使用一個(gè)節(jié)點(diǎn)來(lái)緩存整個(gè)模型,這個(gè)節(jié)點(diǎn)會(huì)將最新的模型發(fā)送給訓(xùn)練節(jié)點(diǎn),而訓(xùn)練節(jié)點(diǎn)會(huì)在迭代開(kāi)始前預(yù)先讀取整個(gè)數(shù)據(jù)集,隨后在每次收到最新的模型后,從數(shù)據(jù)集中選擇一個(gè)mini-batch數(shù)據(jù)對(duì)模型進(jìn)行更新,并把結(jié)果發(fā)送回模型緩存節(jié)點(diǎn)。
對(duì)于在線計(jì)算,由于訓(xùn)練數(shù)據(jù)是從外部源源不斷到達(dá)的,無(wú)法預(yù)先讀取所有訓(xùn)練數(shù)據(jù),一般的做法是動(dòng)態(tài)讀取一個(gè) mini-batch 的數(shù)據(jù),計(jì)算完模型的更新后將其發(fā)送給模型緩存的節(jié)點(diǎn),等模型的緩存節(jié)點(diǎn)進(jìn)一步發(fā)送更新后的模型以后,再讀取下一個(gè) mini-batch 數(shù)據(jù),這也就要求訓(xùn)練節(jié)點(diǎn)必須采用優(yōu)先級(jí)讀的方式對(duì)數(shù)據(jù)進(jìn)行讀取,從而最終實(shí)現(xiàn)逐個(gè)處理 mini-batch 的能力。
在這兩種場(chǎng)景下的訓(xùn)練都存在同步和異步方式,具體取決于模型緩存節(jié)點(diǎn)是否要收集到所有更新后再開(kāi)始下一輪訓(xùn)練。此外還存在一些模型,在預(yù)測(cè)的時(shí)候會(huì)對(duì)參數(shù)進(jìn)行動(dòng)態(tài)的更新,處理完每一條數(shù)據(jù)之后都要立刻評(píng)估是否會(huì)進(jìn)行參數(shù)更新,如果需要就再發(fā)起更新。
這幾種場(chǎng)景下的計(jì)算邏輯存在一定的共性,首先都需要在作業(yè)圖中引入迭代的結(jié)構(gòu)來(lái)支持?jǐn)?shù)據(jù)的循環(huán)處理,并且在數(shù)據(jù)循環(huán)處理之后需要進(jìn)行是否終止迭代的判斷。另一方面,計(jì)算過(guò)程中還需要在每一輪數(shù)據(jù)接收完成后,接收到相應(yīng)的通知,觸發(fā)特定的計(jì)算,比如離線訓(xùn)練中,接觸完整個(gè)模型后就要開(kāi)始下一輪的計(jì)算。
這里其實(shí)存在兩個(gè)選擇,一個(gè)是在迭代這一層,直接提供將數(shù)據(jù)集劃分為多個(gè) mini-batch,并且對(duì)每個(gè) mini-batch 賦予迭代的能力。它在邏輯上可以接受所有類型的迭代,但是如果想要同時(shí)支持逐個(gè) mini-batch 處理與多個(gè) mini-batch 并行處理的邏輯,就必須引入一套新的基于 mini-batch 的流處理接口,并且在實(shí)踐層支持這兩種處理邏輯。
另外在線訓(xùn)練和離線訓(xùn)練的 mini-batch 產(chǎn)生的方式也不一樣,離線 mini-batch 是通過(guò)本地預(yù)先讀取所有數(shù)據(jù),然后在每一輪處理中進(jìn)行選取來(lái)實(shí)現(xiàn)。而在線 mini-batch 通過(guò)從外部數(shù)據(jù)中讀取特定數(shù)據(jù)量的數(shù)據(jù)來(lái)實(shí)現(xiàn)的。因此如果想要兼容這兩種情況,會(huì)進(jìn)一步增加接口和實(shí)現(xiàn)的復(fù)雜度。
最后如果要兼容單個(gè) per-record 的處理,還必須引入無(wú)限大的 mini-batch,或?qū)⒚織l記錄看作一個(gè)單獨(dú)的 mini-batch,前者會(huì)進(jìn)一步增加接口的復(fù)雜度,而后者需要每一記錄對(duì)算子進(jìn)行一次通知,會(huì)增加運(yùn)行時(shí)的開(kāi)銷。
考慮到上述情況,我們?cè)诘兄惶峁┝苏麄€(gè)數(shù)據(jù)集級(jí)別的通知,而將劃分 mini-batch 的功能放在了迭代之上。在離線訓(xùn)練中,用戶可以通過(guò)從整個(gè)數(shù)據(jù)集中選取一部分?jǐn)?shù)據(jù)來(lái)高效實(shí)現(xiàn) mini-batch 選擇的功能。而在在線計(jì)算中,用戶可以通過(guò) Flink 自帶的運(yùn)行集輸入算子,實(shí)現(xiàn)逐個(gè) mini-batch 處理的功能。
基于上述思路,整個(gè)迭代的設(shè)計(jì)如上圖所示,主要由 4 部分組成。初始模型這類有回邊的輸入、數(shù)據(jù)集這類無(wú)回邊的只讀輸入、迭代回邊的位置以及最終輸出。其中回邊對(duì)應(yīng)的數(shù)據(jù)集與有回邊的輸入是一一對(duì)應(yīng)的,從回邊返回的數(shù)據(jù)與初始數(shù)據(jù)進(jìn)行 union 之后,實(shí)現(xiàn)數(shù)據(jù)的迭代處理。
迭代內(nèi)部為用戶提供了數(shù)據(jù)集處理完成的通知功能,即進(jìn)度追蹤的能力。用戶基于這一能力可以實(shí)現(xiàn)處理完成數(shù)據(jù)集的某一輪之后執(zhí)行特定操作。比如在離線訓(xùn)練的時(shí)候,用戶可以在某個(gè)算子收到模型的更新數(shù)據(jù)之后,計(jì)算模型的下一輪更新。
此外對(duì)于沒(méi)有回邊的輸入數(shù)據(jù),允許用戶指定每一輪是否進(jìn)行重放。對(duì)算子也提供了每輪新建一個(gè)算子以及通過(guò)一個(gè)算子的實(shí)例處理所有輪次數(shù)據(jù)的能力。通過(guò)這種方式,用戶無(wú)須重建算子實(shí)例就能實(shí)現(xiàn)在輪次之間緩存數(shù)據(jù)的能力。用戶也可以通過(guò)回放輸入數(shù)據(jù)并重建算子來(lái)復(fù)用迭代外的算子,比如 Reduce、Join 這種常用算子,輸入數(shù)據(jù)進(jìn)行重放并且算子會(huì)在某一輪進(jìn)行重建,這種情況下用戶可以直接復(fù)用這些迭代外的算子。
同時(shí),我們提供了兩種終止判斷邏輯,一種是當(dāng)整個(gè)迭代中已經(jīng)沒(méi)有數(shù)據(jù)在處理的時(shí)候,會(huì)自然終止迭代。另外一種是在有限流的情況下,也允許用戶指定特定數(shù)據(jù)集,如果這一數(shù)據(jù)集在某輪沒(méi)有數(shù)據(jù)產(chǎn)生,用戶可以提前終止整個(gè)迭代。
DataStream<double[]> initParameters = …
DataStream<Tuple2<double[], Double>> dataset = …
DataStreamList resultStreams =
Iterations.iterate(
DataStreamList.of(initParameters),
ReplayableDataStreamList.notReplay(dataset),
IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build();
(variableStreams, dataStreams) -> {
DataStream<double[]> modelUpdate = variableStreams.get(0);
DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);
DataStream<double[]> newModelUpdate = …
DataStream<double[]> modelOutput = …
return new IterationBodyResult(
DataStreamList.of(newModelUpdate),
DataStreamList.of(modelOutput)
});
DataStream<double[]> finalModel = resultStreams.get("final_model");
上圖是使用迭代 API 來(lái)構(gòu)建迭代的例子。用戶需要指定有回邊和無(wú)回邊的輸入列表、算子是否需要每輪重建以及迭代體的計(jì)算邏輯的等。對(duì)于迭代體,用戶需要返回回邊對(duì)應(yīng)的數(shù)據(jù)集以及迭代的最終輸出。
public static class ModelCacheFunction extends ProcessFunction<double[], double[]>
implements IterationListener<double[]> {
private final double[] parameters = new double[N_DIM];
public void processElement(double[] update, Context ctx, Collector<O> output) {
// Suppose we have a util to add the second array to the first.
ArrayUtils.addWith(parameters, update);
}
void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector) {
if (epochWatermark < N_EPOCH * N_BATCH_PER_EPOCH) {
collector.collect(parameters);
}
}
public void onIterationEnd(int[] round, Context context) {
context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
}
}
對(duì)于迭代內(nèi)的算子,如果它實(shí)現(xiàn)了 IterationListener 接口,就會(huì)在所有數(shù)據(jù)集處理完某一輪之后,通知迭代的算子。如果整個(gè)迭代都處理終止則會(huì)通過(guò) onIterationTerminated 對(duì)算子進(jìn)行進(jìn)一步通知,用戶可以在這兩個(gè)回調(diào)中實(shí)現(xiàn)需要的計(jì)算邏輯。
在迭代的實(shí)現(xiàn)中,對(duì)于用戶通過(guò)代碼來(lái)創(chuàng)建的迭代處理結(jié)構(gòu),會(huì)增加一部分迭代內(nèi)部的節(jié)點(diǎn),并對(duì)用戶所有的處理節(jié)點(diǎn)進(jìn)行 wrap 操作,從而達(dá)到管理算子生命周期并對(duì)數(shù)據(jù)類型進(jìn)行轉(zhuǎn)換的目的。最后, 迭代基于 Colocation 與本地內(nèi)存實(shí)現(xiàn)了回邊,這樣在調(diào)度器看來(lái)整個(gè)作業(yè)邏輯仍然是一個(gè) DAG,從而可以直接復(fù)用現(xiàn)在的調(diào)度邏輯。
在迭代中插入的專用算子主要包括 input、output、head 與 tail 算子,其中 input 和 output 負(fù)責(zé)數(shù)據(jù)類型的轉(zhuǎn)換,外部數(shù)據(jù)進(jìn)入迭代內(nèi)時(shí)會(huì)為每一條記錄增加一個(gè)迭代頭,里面記錄了該 record 處理的輪次,每個(gè)算子的 wrap 會(huì)將迭代頭去掉后交給用戶原始的算子處理。
head 和 tail 算子負(fù)責(zé)實(shí)現(xiàn)回邊及計(jì)算某一輪迭代是否已經(jīng)全部處理完成。head 算子從 input 讀取完整個(gè)輸入,并在最后插入一條特殊的 EpochWatermark 事件,來(lái)標(biāo)記第零輪迭代的終止。由于 head 算子可能會(huì)存在多個(gè)并發(fā),所以必須等 head 算子的所有并發(fā)都讀取完輸入后,才能發(fā)送第 0 輪終止的事件。
head 算子通過(guò) Operator Coordinator 來(lái)同步所有并發(fā)。Operator Coordinator 是一個(gè)位于 JobManager 中的全局組件,它可以與所有 head task 進(jìn)行雙向通信,所有 head 算子并發(fā)都收到每一輪處理完成的通知后,就會(huì)發(fā)送全局廣播給所有 head task,告訴他們這一輪的處理已經(jīng)全部結(jié)束。head 發(fā)送 EpochWaterMark 之后,所有迭代中的算子都會(huì)與這一消息進(jìn)行對(duì)齊。算子從所有輸入中都讀取到特定輪次的 EpochWatermark 之后,就會(huì)認(rèn)為這一輪處理完成,并調(diào)用這一輪結(jié)束的回調(diào)。當(dāng) tail 節(jié)點(diǎn)收到某一輪數(shù)據(jù)或 EpochWatermark 之后,會(huì)將輪次加 1,然后通過(guò)回邊再次發(fā)送給 head,從而實(shí)現(xiàn)數(shù)據(jù)循環(huán)處理。
最后我們也支持了有迭代情況下的 checkpoint 功能。由于 Flink 默認(rèn)的 checkpoint 機(jī)制無(wú)法支持帶環(huán)的計(jì)算圖,因此我們對(duì) Flink 的 checkpoint 機(jī)制進(jìn)行了擴(kuò)展,實(shí)現(xiàn)了帶環(huán)的 Chandy-Lamport 算法,會(huì)同時(shí)緩存來(lái)自回邊的消息。另外 head 算子在對(duì)齊的時(shí)候,除了要讀取正常輸入的 barrier 之外,也會(huì)等待來(lái)自 Operator Coordinator 的特殊的 barrier。每一輪全局結(jié)束的消息也是來(lái)自 Operator Coordinator 廣播,可以保證每個(gè) checkpoint 中所有迭代內(nèi)的算子都處在同一輪,從而簡(jiǎn)化算子后續(xù)進(jìn)行并發(fā)修改的操作。
另外還有一個(gè)開(kāi)發(fā)中的優(yōu)化,Operator Coordinator 會(huì)將收到的 barrier 延遲到下一個(gè)全局對(duì)齊消息之后,再發(fā)送通知,從而使得整個(gè)迭代內(nèi)的算子的 state 都是恰好處于讀取完某一輪數(shù)據(jù)之后。許多同步算法都是先將緩存收到的數(shù)據(jù)存儲(chǔ)在算子中,直到讀取完一輪所有數(shù)據(jù)之后再進(jìn)行統(tǒng)一處理。這一優(yōu)化可以保證在進(jìn)行 snapshot 操作的時(shí)候,正好清空所有緩存,從而使整個(gè) checkpoint 中緩存的數(shù)據(jù)量最小。
以上是關(guān)于 Flink 流批一體迭代引擎的介紹,這些引擎可以同時(shí)在在線和離線場(chǎng)景中使用,并且支持 exactly-once 的容錯(cuò)。未來(lái)我們將進(jìn)一步支持 batch 的執(zhí)行模式,并提供更多的上層開(kāi)發(fā)工具。
最近我們已經(jīng)將 Flink ML 相關(guān)代碼從 Flink 核心代碼庫(kù)中移入一個(gè)單獨(dú)的 Flink ML 代碼庫(kù)。這樣做的首先是為了方便 Flink ML 的快速迭代,其次也希望通過(guò)這個(gè)手段減少 Flink 核心代碼庫(kù)的復(fù)雜度,避免 Flink 核心代碼庫(kù)過(guò)于臃腫。
另外,我們?cè)?Github 上建立了一個(gè)中立的組織 Flink-extended,能夠?yàn)樗?Flink 社區(qū)的開(kāi)發(fā)者提供平臺(tái)來(lái)貢獻(xiàn)一些他們希望開(kāi)源的項(xiàng)目。方便大家分享不帶有特定公司的名字的代碼,使不同公司的開(kāi)發(fā)人員可以把代碼貢獻(xiàn)出來(lái),方便 Flink 社區(qū)來(lái)使用和共建。我們希望借此促進(jìn) Flink 生態(tài)的繁榮發(fā)展。
目前中立項(xiàng)目中已經(jīng)有一些比較重要的項(xiàng)目,比如 Deep Learning on Flink,它是由阿里大數(shù)據(jù)團(tuán)隊(duì)主要開(kāi)發(fā)的一個(gè)開(kāi)源項(xiàng)目,核心作用是可以把 Tensorflow 打包成 Java 算子在 Flink 中運(yùn)行,方便將 Flink 的預(yù)處理程序與 Tensorflow 深度學(xué)習(xí)的訓(xùn)練算法相結(jié)合,形成端到端的訓(xùn)練以及推理。
最近我們已經(jīng)在 Flink ML 中新增了若干常見(jiàn)算法,之后還會(huì)繼續(xù)提供更多開(kāi)箱可用的算法。
上圖是我們目前正在進(jìn)行中的重要工作,其中最核心的工作是將現(xiàn)有的阿里巴巴開(kāi)源的 Alink 代碼庫(kù)進(jìn)行改造,使其中的算法能夠適配新設(shè)計(jì)的 Flink ML API,并將改造后的算法貢獻(xiàn)到 Apache 項(xiàng)目,方便 Flink 用戶得到更多開(kāi)箱可用的算法。
此外,我們還與 360 一起合作共建 Clink 項(xiàng)目,核心目標(biāo)是在離線計(jì)算中用 Java 去運(yùn)行某些算子,得到訓(xùn)練結(jié)果。另一方面,這些算子需要能夠以非常低的延遲做在線推理。然而低延遲在線推理很難用 Java 實(shí)現(xiàn),通常需要用 C++ 來(lái)實(shí)現(xiàn)。為了使開(kāi)發(fā)者只寫(xiě)一遍算法就能同時(shí)應(yīng)用于 Java 和 C++ 環(huán)境,Clink 提供了一些打包的基礎(chǔ)類的功能,方便算法開(kāi)發(fā)者寫(xiě)好 C++ 算子之后,能夠使用 JNI 打包成 Java 算子,并在 Flink 中使用這些算子。
最后,我們計(jì)劃在 Flink ML 中開(kāi)發(fā)對(duì)于 Python 的支持,其中包括允許算法使用者通過(guò)寫(xiě) Python 程序?qū)?Flink ML 中的 Java 算子進(jìn)行連接和組合使用,希望能提高機(jī)器學(xué)習(xí)開(kāi)發(fā)者的效率和使用體驗(yàn)。
以上工作基本都已經(jīng)進(jìn)入開(kāi)源項(xiàng)目,其中算法 API 的設(shè)計(jì)在 FLIP-173 中 ,迭代引擎的設(shè)計(jì)主要在 FLIP-176 中 ,F(xiàn)LIP-174 和 FLIP-175 分別提供了算法參數(shù)的 API 以及 GraphBuilder 的 API。Clink 和 Deep Learning on Flink 等項(xiàng)目也已經(jīng)在 Flink-extended 的組織上,歡迎大家使用。
文章轉(zhuǎn)自微信公眾號(hào)@Apache Flink
GraphQL API滲透測(cè)試指南
Python + BaiduTransAPI :快速檢索千篇英文文獻(xiàn)(附源碼)
掌握ChatGPT API集成的方便指南
node.js + express + docker + mysql + jwt 實(shí)現(xiàn)用戶管理restful api
nodejs + mongodb 編寫(xiě) restful 風(fēng)格博客 api
表格插件wpDataTables-將 WordPress 表與 Google Sheets API 連接
手把手教你用Python和Flask創(chuàng)建REST API
使用 Django 和 Django REST 框架構(gòu)建 RESTful API:實(shí)現(xiàn) CRUD 操作
ASP.NET Web API快速入門介紹
對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力
一鍵對(duì)比試用API 限時(shí)免費(fèi)