Flink On Standalone 即Flink任務運行在Standalone集群中,Standlone集群部署時采用Session模式來構建集群,即:首先構建一個Flink集群,Flink集群資源就固定了,所有提交到該集群的Flink作業都運行在這一個集群中,如果集群中提交的任務多資源不夠時,需要手動增加節點,所以Flink 基于Standalone運行任務一般用在開發測試或者企業實時業務較少的場景下。
Flink On Standalone 任務提交支持Session會話模式和Application應用模式,不支持Per-Job單作業模式。下面介紹基于Standalone 的Session會話模式和Application應用模式任務提交命令和原理,演示兩類任務提交模式的代碼還是以上一章節中讀取Socket 數據進行實時WordCount統計代碼為例,代碼如下:
package com.lanson.flinkjava.code.chapter4;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * 讀取Socket數據進行實時WordCount統計 */public class SocketWordCount { public static void main(String[] args) throws Exception { //1.準備環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.讀取Socket數據 DataStreamSource ds = env.socketTextStream("node3", 9999); //3.準備K,V格式數據 SingleOutputStreamOperator> tupleDS = ds.flatMap((String line, Collector> out) -> { String[] words = line.split(","); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }).returns(Types.TUPLE(Types.STRING, Types.INT)); //4.聚合打印結果 tupleDS.keyBy(tp -> tp.f0).sum(1).print(); //5.execute觸發執行 env.execute(); }}
將以上代碼進行打包,名稱為"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node3節點上啟動socket服務(nc -lk 9999)。
(資料圖)
在Standalone集群搭建完成后,基于Standalone集群提交Flink任務方式就是使用的Session模式,提交任務之前首先啟動Standalone集群($FLINK_HOME/bin/start-cluster.sh),然后再提交任務,Standalone Session模式提交任務命令如下:
[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./flink run -m node1:8081 -d -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
以上提交任務的參數解釋如下:
參數 | 解釋 |
---|---|
-m | --jobmanager,指定提交任務連接的JobManager地址。 |
-c | --class,指定運行的class主類。 |
-d | --detached,任務提交后在后臺獨立運行,退出客戶端,也可不指定。 |
-p | --parallelism,執行程序的并行度。 |
以上任務提交完成后,我們可以登錄Flink WebUI(https://node1:8081)查看啟動一個任務:
再次按照以上命令提交Flink任務可以看到集群中會有2個任務,說明Standalone Session模式下提交的所有Flink任務共享集群資源,如下:
以上提交Flink流任務的名稱默認為"Flink Streaming Job",也可以通過參數"pipeline.name"來自定義指定Job 名稱,提交命令如下:
./flink run -m node1:8081 -d -Dpipeline.name=socket-wc1 -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
提交之后,可以看到頁面中有三個任務,最后一個任務提交的名稱改成了自定義任務名稱。
Standalone Session模式提交任務中首先需要創建Flink集群,集群創建啟動的同時Dispatcher、JobMaster、ResourceManager對象一并創建、TaskManager也一并啟動,TaskManager會向集群ResourceManager匯報Slot信息,Flink集群資源也就確定了。Standalone Session模式提交任務流程如下:
在客戶端提交Flink任務,客戶端會將任務轉換成JobGraph提交給JobManager。Dispatcher將提交任務提交給JobMaster。JobMaster向ResourceManager申請Slot資源。ResourceManager會在對應的TaskManager上劃分Slot資源。TaskManager向JobMaster offer Slot資源。JobMaster將任務對應的task發送到TaskManager上執行。Standalone Application模式中不會預先創建Flink集群,在提交Flink 任務的同時會創建JobManager,啟動Flink集群,然后需要手動啟動TaskManager連接該Flink集群,啟動的TaskManager會根據$FLINK_HOME/conf/flink-conf.yaml配置文件中的"jobmanager.rpc.address"配置找JobManager,所以這里選擇在node1節點上提交任務并啟動JobManager,方便后續其他節點啟動TaskManager后連接該節點。Standalone Appliction模式提交任務步驟和命令如下:
1.1、準備Flink jar包
在node1節點上將Flink 打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包放在 $FLINK_HOME/lib目錄下。
1.2、提交任務,在node1 節點上啟動 JobManager
cd /software/flink-1.16.0/bin/
#執行如下命令,啟動JobManager ./standalone-job.sh start --job-classname com.lanson.flinkjava.code.chapter4.SocketWordCount
執行以上命令后會自動從$FLINK_HOME/lib中掃描所有jar包,執行指定的入口類。命令執行后可以訪問對應的Flink WebUI:https://node1:8081,可以看到提交的任務,但是由于還沒有執行TaskManager任務無法執行。
1.3、啟動TaskManager
在node1、node2、node3任意一臺節點上啟動taskManager,根據$FLINK_HOME/conf/flink-conf.yaml配置文件中"jobmanager.rpc.address"配置項會找到對應node1 JobManager。
#在node1節點上啟動TaskManager[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./taskmanager.sh start#在node2節點上啟動TaskManager[root@node2 ~]# cd /software/flink-1.16.0/bin/[root@node2 bin]# ./taskmanager.sh start
啟動兩個TaskManager后可以看到Flink WebUI中對應的有2個TaskManager,可以根據自己任務使用資源的情況,手動啟動多個TaskManager。
1.4、停止集群
#停止啟動的JobManager[root@node1 bin]# ./standalone-job.sh stop#停止啟動的TaskManager[root@node1 bin]# ./taskmanager.sh stop[root@node2 bin]# ./taskmanager.sh stop
我們可以以同樣的方式在其他節點上以Standalone Application模式提交先的Flink任務,但是每次提交都是當前提交任務獨享集群資源。
Standalone Application模式提交任務中提交任務的同時會啟動JobManager創建Flink集群,但是需要手動啟動TaskManager,這樣提交的任務才能正常運行,如果提交的任務使用資源多,還可以啟動多個TaskManager。Standalone Application模式提交任務流程如下:
在客戶端提交Flink任務的同時啟動JobManager,客戶端會將任務轉換成JobGraph提交給JobManager。Dispatcher會啟動JobMaster,Dispatcher將提交任務提交給JobMaster。JobMaster向ResourceManager申請Slot資源。手動啟動TaskManager,TaskManager會向ResourceManager注冊Slot資源ResourceManager會在對應的TaskManager上劃分Slot資源。TaskManager向JobMaster offer Slot資源。JobMaster將任務對應的task發送到TaskManager上執行。Standalone Application模式任務提交流程和Standalone Session模式類似,兩者區別主要是Standalone Session模式中啟動Flink集群時JobManager、TaskManager、JobMaster會預先啟動;Standalone Application模式中提交任務時同時啟動集群JobManager、JobMaster,需要手動啟動TaskManager。
?
責任編輯: