
如何快速實(shí)現(xiàn)REST API集成以優(yōu)化業(yè)務(wù)流程
Flink編程API中,我們開發(fā)的數(shù)據(jù)處理Job程序,都是始于一個(gè)Source,對(duì)應(yīng)的輸入數(shù)據(jù)源,終于一個(gè)Sink,對(duì)應(yīng)輸出的存儲(chǔ)系統(tǒng),中間是各種豐富的處理算子。但是對(duì)于批式和流式編程API,從代碼層面對(duì)應(yīng)的抽象基本上是名稱不同,具體邏輯功能比較一致:批式編程API對(duì)批式Job DAG中每個(gè)節(jié)點(diǎn)的抽象使用的是DataSet,而流式編程API中對(duì)應(yīng)的是DataStream。
對(duì)于批式Job DAG中,DataSet的類設(shè)計(jì)體系,如下圖所示:
相關(guān)的類都在包org.apache.flink.api.java.operators下面,通過上圖可以看出,主要分為4類:DataSource、DataSink、SingleInputOperator、TwoInputOperator。其中,DataSink并沒有繼承自DataSet,但是作為批式Job DAG的輸出節(jié)點(diǎn)抽象,也還是與上圖中各個(gè)Operator有直接或間接的關(guān)系。 在編寫批式Job過程中,輸入是一個(gè)DataSource(實(shí)際也是DataSet),處理中每經(jīng)過一個(gè)轉(zhuǎn)換操作(Transformation),都會(huì)生成一個(gè)新的類型的DataSet,這在API上看是統(tǒng)一的,實(shí)際底層稍微有一點(diǎn)不同。比如,經(jīng)過groupBy操作后,返回的是一個(gè)UnsortedGrouping,它不是一個(gè)DataSet實(shí)現(xiàn),而是一種中間結(jié)構(gòu),封裝了很多有用的信息以供翻譯過程中使用,通過使用這種中間結(jié)構(gòu)能夠更好地處理復(fù)雜的轉(zhuǎn)換操作,通過下面代碼來(lái)看可能更直觀一些,在DataSet中查看groupBy()方法代碼,如下所示:
public UnsortedGrouping<T> groupBy(int... fields) {
return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}
上面代碼中UnsortedGrouping并不是一個(gè)DataSet實(shí)現(xiàn),而是一個(gè)用來(lái)處理groupBy操作的中間結(jié)構(gòu),它繼承自Grouping抽象類,類定義如下所示:
@Public
public abstract class Grouping<T> {
protected final DataSet<T> inputDataSet;
protected final Keys<T> keys;
protected Partitioner<?> customPartitioner;
public Grouping(DataSet<T> set, Keys<T> keys) {
if (set == null || keys == null) {
throw new NullPointerException();
}
if (keys.isEmpty()) {
throw new InvalidProgramException("The grouping keys must not be empty.");
}
this.inputDataSet = set;
this.keys = keys;
}
@Internal
public DataSet<T> getInputDataSet() {
return this.inputDataSet;
}
@Internal
public Keys<T> getKeys() {
return this.keys;
}
@Internal
public Partitioner<?> getCustomPartitioner() {
return this.customPartitioner;
}
}
對(duì)于流式Job DAG中,類設(shè)計(jì)方面稍有不同,Flink使用了DataStream和StreamOperator這兩個(gè)類設(shè)計(jì)體系。我們先看DataStream類設(shè)計(jì)體系,如下圖所示:
DataStream表示在流式Job DAG中每一步轉(zhuǎn)換操作之前與之后,都對(duì)應(yīng)著一個(gè)DataStream的數(shù)據(jù)結(jié)構(gòu),它內(nèi)部封裝了與轉(zhuǎn)換操作相關(guān)的處理邏輯,其實(shí)就是StreamOperator。對(duì)應(yīng)上圖中,我們舉幾個(gè)編寫流式處理程序的例子說明:調(diào)用StreamExecutionEnvironment.readTextFile()時(shí)會(huì)生成一個(gè)DataStreamSource,調(diào)用keyBy()時(shí)會(huì)生成一個(gè)KeyedStream,調(diào)用split()時(shí)會(huì)生成一個(gè)SplitStream,調(diào)用iterate()時(shí)會(huì)生成一個(gè)IterativeStream。
下面看下StreamOperator類的設(shè)計(jì)體系,如下圖所示:
編寫批式Job程序,使用執(zhí)行上線文環(huán)境對(duì)象ExecutionEnvironment,而流式使用的是StreamExecutionEnvironment。通過用戶編程API構(gòu)建好DAG Job后,都是通過調(diào)用執(zhí)行上線文環(huán)境對(duì)象的execute()方法提交Job去運(yùn)行。無(wú)論是批式Job還是流式Job,它們?cè)谔峤粓?zhí)行過程中,有相同的流程,也有不同的流程,通過識(shí)別這個(gè)過程中涉及相同/不同的API對(duì)象,我們抽象出如下流程概念圖:
上圖中,左側(cè)是批式Job通過API構(gòu)建并提交到計(jì)算集群,基于DataSet進(jìn)行編程實(shí)現(xiàn),始于DataSource,終于DataSink;右側(cè)是流式Job通過API構(gòu)建并提交到計(jì)算集群,基于DataStream進(jìn)行編程實(shí)現(xiàn),始于DataStreamSource,終于DataStreamSink。中間部分,跨兩個(gè)不同環(huán)境上下文對(duì)象是在提交Job過程中公共的抽象。
對(duì)于批式Job程序提交,核心代碼如下所示:
final Plan plan = createProgramPlan(jobName);
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(plan, configuration);
對(duì)于流式Job程序提交運(yùn)行的核心代碼,如下所示:
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
… …
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
? ? ? ? ?上面代碼中的Plan和StreamGraph,都是Pipeline接口的具體實(shí)現(xiàn),這兩個(gè)實(shí)現(xiàn)分別用來(lái)表示批式和流式Job程序的拓?fù)洌瑑?nèi)部封裝了用于構(gòu)建生成JobGraph所需要的全部信息。
? ? ? ?上面代碼是構(gòu)建生成JobGraph的主要邏輯,先是通過getExecutor()獲取到一個(gè)PipelineExecutor,然后調(diào)用PipelineExecutor的execute()來(lái)構(gòu)建并提交JobGraph。這里,PipelineExecutor表示執(zhí)行Flink Job的方式,比如,本地執(zhí)行使用LocalExecutor,或提交到Y(jié)ARN集群上執(zhí)行使用YarnJobClusterExecutor,或提交到Kubernetes集群上執(zhí)行使用KubernetesSessionClusterExecutor,等等。
? ? ? ?無(wú)論是提交批式還是流式Job,最終都被轉(zhuǎn)換成JobGraph對(duì)象,構(gòu)建JobGraph的處理邏輯是完全統(tǒng)一的。構(gòu)建JobGraph的代碼邏輯,如下代碼所示:
public static JobGraph getJobGraph(
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism) {
FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
return pipelineTranslator.translateToJobGraph(pipeline,
optimizerConfiguration,
defaultParallelism);
}
? ? ?上面,輸入的Pipeline對(duì)象,批式編程對(duì)應(yīng)具體實(shí)現(xiàn)Plan,流式編程對(duì)應(yīng)具體實(shí)現(xiàn)StreamGraph。由于用戶編程API的不同,這里面選擇了不同的FlinkPipelineTranslator來(lái)對(duì)輸入的Pipeline對(duì)象進(jìn)行翻譯,其中批式對(duì)應(yīng)的是PlanTranslator,流式對(duì)應(yīng)的是StreamGraphTranslator,最終通過翻譯,都生成一個(gè)統(tǒng)一的JobGraph。當(dāng)然,調(diào)用translateToJobGraph()方法進(jìn)行翻譯處理的邏輯有很大的不同。
文章轉(zhuǎn)自微信公眾號(hào)@架構(gòu)師
對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力
一鍵對(duì)比試用API 限時(shí)免費(fèi)