訂閱
糾錯(cuò)
加入自媒體

一文詳解Flink知識(shí)體系

2021-09-13 09:58
園陌
關(guān)注

本文目錄:

一、Flink簡(jiǎn)介

二、Flink 部署及啟動(dòng)

三、Flink 運(yùn)行架構(gòu)

四、Flink 算子大全

五、流處理中的 Time 與 Window

六、Flink 狀態(tài)管理

七、Flink 容錯(cuò)

八、Flink SQL

九、Flink CEP

十、Flink CDC

十一、基于 Flink 構(gòu)建全場(chǎng)景實(shí)時(shí)數(shù)倉(cāng)

十二、Flink 大廠面試題

Flink 涉及的知識(shí)點(diǎn)如下圖所示,本文將逐一講解:

本文檔參考了 Flink 的官網(wǎng)及其他眾多資料整理而成,為了整潔的排版及舒適的閱讀,對(duì)于模糊不清晰的圖片及黑白圖片進(jìn)行重新繪制成了高清彩圖。

一、Flink 簡(jiǎn)介1. Flink 發(fā)展

這幾年大數(shù)據(jù)的飛速發(fā)展,出現(xiàn)了很多熱門的開(kāi)源社區(qū),其中著名的有 Hadoop、Storm,以及后來(lái)的 Spark,他們都有著各自專注的應(yīng)用場(chǎng)景。Spark 掀開(kāi)了內(nèi)存計(jì)算的先河,也以內(nèi)存為賭注,贏得了內(nèi)存計(jì)算的飛速發(fā)展。Spark 的火熱或多或少的掩蓋了其他分布式計(jì)算的系統(tǒng)身影。就像 Flink,也就在這個(gè)時(shí)候默默的發(fā)展著。

在國(guó)外一些社區(qū),有很多人將大數(shù)據(jù)的計(jì)算引擎分成了 4 代,當(dāng)然,也有很多人不會(huì)認(rèn)同。我們先姑且這么認(rèn)為和討論。

首先第一代的計(jì)算引擎,無(wú)疑就是 Hadoop 承載的 MapReduce。這里大家應(yīng)該都不會(huì)對(duì) MapReduce 陌生,它將計(jì)算分為兩個(gè)階段,分別為 Map 和 Reduce。對(duì)于上層應(yīng)用來(lái)說(shuō),就不得不想方設(shè)法去拆分算法,甚至于不得不在上層應(yīng)用實(shí)現(xiàn)多個(gè) Job 的串聯(lián),以完成一個(gè)完整的算法,例如迭代計(jì)算。

由于這樣的弊端,催生了支持 DAG 框架的產(chǎn)生。因此,支持 DAG 的框架被劃分為第二代計(jì)算引擎。如 Tez 以及更上層的 Oozie。這里我們不去細(xì)究各種 DAG 實(shí)現(xiàn)之間的區(qū)別,不過(guò)對(duì)于當(dāng)時(shí)的 Tez 和 Oozie 來(lái)說(shuō),大多還是批處理的任務(wù)。

接下來(lái)就是以 Spark 為代表的第三代的計(jì)算引擎。第三代計(jì)算引擎的特點(diǎn)主要是 Job 內(nèi)部的 DAG 支持(不跨越 Job),以及強(qiáng)調(diào)的實(shí)時(shí)計(jì)算。在這里,很多人也會(huì)認(rèn)為第三代計(jì)算引擎也能夠很好的運(yùn)行批處理的 Job。

隨著第三代計(jì)算引擎的出現(xiàn),促進(jìn)了上層應(yīng)用快速發(fā)展,例如各種迭代計(jì)算的性能以及對(duì)流計(jì)算和 SQL 等的支持。Flink 的誕生就被歸在了第四代。這應(yīng)該主要表現(xiàn)在 Flink 對(duì)流計(jì)算的支持,以及更一步的實(shí)時(shí)性上面。當(dāng)然 Flink 也可以支持 Batch 的任務(wù),以及 DAG 的運(yùn)算。

總結(jié):

第 1 代:Hadoop MapReduc 批處理 Mapper、Reducer 2;

第 2 代:DAG 框架(Oozie 、Tez),Tez + MapReduce 批處理 1 個(gè) Tez = MR(1) + MR(2) + ... + MR(n) 相比 MR 效率有所提升;

第 3 代:Spark 批處理、流處理、SQL 高層 API 支持 自帶 DAG 內(nèi)存迭代計(jì)算、性能較之前大幅提;

第 4 代:Flink 批處理、流處理、SQL 高層 API 支持 自帶 DAG 流式計(jì)算性能更高、可靠性更高。

2. 什么是 Flink

Flink 起源于 Stratosphere 項(xiàng)目,Stratosphere 是在 2010~2014 年由 3 所地處柏林的大學(xué)和歐洲的一些其他的大學(xué)共同進(jìn)行的研究項(xiàng)目,2014 年 4 月 Stratosphere 的代碼被復(fù)制并捐贈(zèng)給了 Apache 軟件基金會(huì),參加這個(gè)孵化項(xiàng)目的初始成員是 Stratosphere 系統(tǒng)的核心開(kāi)發(fā)人員,2014 年 12 月,Flink 一躍成為 Apache 軟件基金會(huì)的頂級(jí)項(xiàng)目。

在德語(yǔ)中,Flink 一詞表示快速和靈巧,項(xiàng)目采用一只松鼠的彩色圖案作為 logo,這不僅是因?yàn)樗墒缶哂锌焖俸挽`巧的特點(diǎn),還因?yàn)榘亓值乃墒笥幸环N迷人的紅棕色,而 Flink 的松鼠 logo 擁有可愛(ài)的尾巴,尾巴的顏色與 Apache 軟件基金會(huì)的 logo 顏色相呼應(yīng),也就是說(shuō),這是一只 Apache 風(fēng)格的松鼠。

Flink 主頁(yè)在其頂部展示了該項(xiàng)目的理念:“Apache Flink 是為分布式、高性能、隨時(shí)可用以及準(zhǔn)確的流處理應(yīng)用程序打造的開(kāi)源流處理框架”。

Apache Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無(wú)界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。Flink 被設(shè)計(jì)在所有常見(jiàn)的集群環(huán)境中運(yùn)行,以內(nèi)存執(zhí)行速度和任意規(guī)模來(lái)執(zhí)行計(jì)算。

3. Flink 流處理特性

支持高吞吐、低延遲、高性能的流處理

支持帶有事件時(shí)間的窗口(Window)操作

支持有狀態(tài)計(jì)算的 Exactly-once 語(yǔ)義

支持高度靈活的窗口(Window)操作,支持基于 time、count、session,以及 data-driven 的窗口操作

支持具有 Backpressure 功能的持續(xù)流模型

支持基于輕量級(jí)分布式快照(Snapshot)實(shí)現(xiàn)的容錯(cuò)

一個(gè)運(yùn)行時(shí)同時(shí)支持 Batch on Streaming 處理和 Streaming 處理

Flink 在 JVM 內(nèi)部實(shí)現(xiàn)了自己的內(nèi)存管理

支持迭代計(jì)算

支持程序自動(dòng)優(yōu)化:避免特定情況下 Shuffle、排序等昂貴操作,中間結(jié)果有必要進(jìn)行緩存

4. Flink 基石

Flink 之所以能這么流行,離不開(kāi)它最重要的四個(gè)基石:Checkpoint、State、Time、Window。

首先是 Checkpoint 機(jī)制,這是 Flink 最重要的一個(gè)特性。Flink 基于Chandy-Lamport算法實(shí)現(xiàn)了一個(gè)分布式的一致性的快照,從而提供了一致性的語(yǔ)義。Chandy-Lamport 算法實(shí)際上在 1985 年的時(shí)候已經(jīng)被提出來(lái),但并沒(méi)有被很廣泛的應(yīng)用,而 Flink 則把這個(gè)算法發(fā)揚(yáng)光大了。

Spark 最近在實(shí)現(xiàn) Continue streaming,Continue streaming 的目的是為了降低它處理的延時(shí),其也需要提供這種一致性的語(yǔ)義,最終采用 Chandy-Lamport 這個(gè)算法,說(shuō)明 Chandy-Lamport 算法在業(yè)界得到了一定的肯定。

提供了一致性的語(yǔ)義之后,Flink 為了讓用戶在編程時(shí)能夠更輕松、更容易地去管理狀態(tài),還提供了一套非常簡(jiǎn)單明了的 State API,包括里面的有 ValueState、ListState、MapState,近期添加了 BroadcastState,使用 State API 能夠自動(dòng)享受到這種一致性的語(yǔ)義。

除此之外,Flink 還實(shí)現(xiàn)了 Watermark 的機(jī)制,能夠支持基于事件的時(shí)間的處理,或者說(shuō)基于系統(tǒng)時(shí)間的處理,能夠容忍數(shù)據(jù)的延時(shí)、容忍數(shù)據(jù)的遲到、容忍亂序的數(shù)據(jù)。

另外流計(jì)算中一般在對(duì)流數(shù)據(jù)進(jìn)行操作之前都會(huì)先進(jìn)行開(kāi)窗,即基于一個(gè)什么樣的窗口上做這個(gè)計(jì)算。Flink 提供了開(kāi)箱即用的各種窗口,比如滑動(dòng)窗口、滾動(dòng)窗口、會(huì)話窗口以及非常靈活的自定義的窗口。

5. 批處理與流處理

批處理的特點(diǎn)是有界、持久、大量,批處理非常適合需要訪問(wèn)全套記錄才能完成的計(jì)算工作,一般用于離線統(tǒng)計(jì)。流處理的特點(diǎn)是無(wú)界、實(shí)時(shí),流處理方式無(wú)需針對(duì)整個(gè)數(shù)據(jù)集執(zhí)行操作,而是對(duì)通過(guò)系統(tǒng)傳輸?shù)拿總(gè)數(shù)據(jù)項(xiàng)執(zhí)行操作,一般用于實(shí)時(shí)統(tǒng)計(jì)。

在 Spark 生態(tài)體系中,對(duì)于批處理和流處理采用了不同的技術(shù)框架,批處理由 SparkSQL 實(shí)現(xiàn),流處理由 Spark Streaming 實(shí)現(xiàn),這也是大部分框架采用的策略,使用獨(dú)立的處理器實(shí)現(xiàn)批處理和流處理,而 Flink 可以同時(shí)實(shí)現(xiàn)批處理和流處理。

Flink 是如何同時(shí)實(shí)現(xiàn)批處理與流處理的呢?答案是,Flink 將批處理(即處理有限的靜態(tài)數(shù)據(jù))視作一種特殊的流處理。

Flink 的核心計(jì)算架構(gòu)是下圖中的 Flink Runtime 執(zhí)行引擎,它是一個(gè)分布式系統(tǒng),能夠接受數(shù)據(jù)流程序并在一臺(tái)或多臺(tái)機(jī)器上以容錯(cuò)方式執(zhí)行。

Flink Runtime 執(zhí)行引擎可以作為 YARN(Yet Another Resource Negotiator)的應(yīng)用程序在集群上運(yùn)行,也可以在 Mesos 集群上運(yùn)行,還可以在單機(jī)上運(yùn)行(這對(duì)于調(diào)試 Flink 應(yīng)用程序來(lái)說(shuō)非常有用)。

上圖為 Flink 技術(shù)棧的核心組成部分,值得一提的是,Flink 分別提供了面向流式處理的接口(DataStream API)和面向批處理的接口(DataSet API)。因此,Flink 既可以完成流處理,也可以完成批處理。Flink 支持的拓展庫(kù)涉及機(jī)器學(xué)習(xí)(FlinkML)、復(fù)雜事件處理(CEP)、以及圖計(jì)算(Gelly),還有分別針對(duì)流處理和批處理的 Table API。

能被 Flink Runtime 執(zhí)行引擎接受的程序很強(qiáng)大,但是這樣的程序有著冗長(zhǎng)的代碼,編寫起來(lái)也很費(fèi)力,基于這個(gè)原因,Flink 提供了封裝在 Runtime 執(zhí)行引擎之上的 API,以幫助用戶方便地生成流式計(jì)算程序。Flink 提供了用于流處理的 DataStream API 和用于批處理的 DataSet API。值得注意的是,盡管 Flink Runtime 執(zhí)行引擎是基于流處理的,但是 DataSet API 先于 DataStream API 被開(kāi)發(fā)出來(lái),這是因?yàn)楣I(yè)界對(duì)無(wú)限流處理的需求在 Flink 誕生之初并不大。

DataStream API 可以流暢地分析無(wú)限數(shù)據(jù)流,并且可以用 Java 或者 Scala 等來(lái)實(shí)現(xiàn)。開(kāi)發(fā)人員需要基于一個(gè)叫 DataStream 的數(shù)據(jù)結(jié)構(gòu)來(lái)開(kāi)發(fā),這個(gè)數(shù)據(jù)結(jié)構(gòu)用于表示永不停止的分布式數(shù)據(jù)流。

Flink 的分布式特點(diǎn)體現(xiàn)在它能夠在成百上千臺(tái)機(jī)器上運(yùn)行,它將大型的計(jì)算任務(wù)分成許多小的部分,每個(gè)機(jī)器執(zhí)行一部分。Flink 能夠自動(dòng)地確保發(fā)生機(jī)器故障或者其他錯(cuò)誤時(shí)計(jì)算能夠持續(xù)進(jìn)行,或者在修復(fù) bug 或進(jìn)行版本升級(jí)后有計(jì)劃地再執(zhí)行一次。這種能力使得開(kāi)發(fā)人員不需要擔(dān)心運(yùn)行失敗。Flink 本質(zhì)上使用容錯(cuò)性數(shù)據(jù)流,這使得開(kāi)發(fā)人員可以分析持續(xù)生成且永遠(yuǎn)不結(jié)束的數(shù)據(jù)(即流處理)。

二、Flink 部署及啟動(dòng)

Flink 支持多種安裝模式:

local(本地)——單機(jī)模式,一般不使用;

standalone——獨(dú)立模式,Flink 自帶集群,開(kāi)發(fā)測(cè)試環(huán)境使用;

yarn——計(jì)算資源統(tǒng)一由 Hadoop YARN 管理,生產(chǎn)環(huán)境使用。

Flink 集群的安裝不屬于本文檔的范疇,如安裝 Flink,可自行搜索資料進(jìn)行安裝。

本節(jié)重點(diǎn)在 Flink 的 Yarn 部署模式。

在一個(gè)企業(yè)中,為了最大化的利用集群資源,一般都會(huì)在一個(gè)集群中同時(shí)運(yùn)行多種類型的 Workload,可以使用 YARN 來(lái)管理所有計(jì)算資源。

1. Flink 在 Yarn 上的部署架構(gòu)

從圖中可以看出,Yarn 的客戶端需要獲取 hadoop 的配置信息,連接 Yarn 的 ResourceManager。所以要設(shè)置 YARN_CONF_DIR 或者 HADOOP_CONF_DIR 或者 HADOOP_CONF_PATH,只要設(shè)置了其中一個(gè)環(huán)境變量,就會(huì)被讀取。如果讀取上述的變量失敗了,那么將會(huì)選擇 hadoop_home 的環(huán)境變量,會(huì)嘗試加載$HADOOP_HOME/etc/hadoop 的配置文件。

當(dāng)啟動(dòng)一個(gè) Flink Yarn 會(huì)話時(shí),客戶端首先會(huì)檢查本次請(qǐng)求的資源(存儲(chǔ)、計(jì)算)是否足夠。資源足夠?qū)?huì)上傳包含 HDFS 及 Flink 的配置信息和 Flink 的 jar 包到 HDFS;

客戶端向 RM 發(fā)起請(qǐng)求;

RM 向 NM 發(fā)請(qǐng)求指令,創(chuàng)建 container,并從 HDFS 中下載 jar 以及配置文件;

啟動(dòng) ApplicationMaster 和 jobmanager,將 jobmanager 的地址信息寫到配置文件中,再發(fā)到 hdfs 上;

同時(shí),AM 向 RM 發(fā)送心跳注冊(cè)自己,申請(qǐng)資源(cpu、內(nèi)存);

創(chuàng)建 TaskManager 容器,從 HDFS 中下載 jar 包及配置文件并啟動(dòng);

各 task 任務(wù)通過(guò) jobmanager 匯報(bào)自己的狀態(tài)和進(jìn)度,AM 和 jobmanager 在一個(gè)容器上,AM 就能掌握各任務(wù)的運(yùn)行狀態(tài),從而可以在任務(wù)失敗時(shí),重新啟動(dòng)任務(wù);

任務(wù)完成后,AM 向 RM 注銷并關(guān)閉自己;

2. 啟動(dòng)集群修改 hadoop 的配置參數(shù):vim etc/hadoop/yarn-site.xml

添加:

修改 Hadoop 的 yarn-site.xml,添加該配置表示內(nèi)存超過(guò)分配值,是否將任務(wù)殺掉。

默認(rèn)為 true。運(yùn)行 Flink 程序,很容易內(nèi)存超標(biāo),這個(gè)時(shí)候 yarn 會(huì)自動(dòng)殺掉 job。

修改全局變量 /etc/profile:

添加:export HADOOP_CONF_DIR=/export/servers/hadoop/etc/Hadoop

YARN_CONF_DIR 或者 HADOOP_CONF_DIR 必須將環(huán)境變量設(shè)置為讀取 YARN 和 HDFS 配置

啟動(dòng) HDFS、zookeeper(如果是外置 zookeeper)、YARN 集群;

使用 yarn-session 的模式提交作業(yè)。

Yarn Session 模式提交作業(yè)有兩種方式:yarn-session 和 yarn-cluster

3. 模式一: yarn-session

特點(diǎn):

使用 Flink 中的 yarn-session(yarn 客戶端),會(huì)啟動(dòng)兩個(gè)必要服務(wù) JobManager 和 TaskManagers;

客戶端通過(guò) yarn-session 提交作業(yè);

yarn-session 會(huì)一直啟動(dòng),不停地接收客戶端提交的任務(wù);

如果擁有有大量的小作業(yè),適合使用這種方式。

在 flink 目錄啟動(dòng) yarn-session:

bin/yarn-session.sh -n 2 -tm 800 -jm 800 -s 1 -d

-n 表示申請(qǐng) 2 個(gè)容器
-s 表示每個(gè)容器啟動(dòng)多少個(gè) slot 離模式,表示以后臺(tái)程
-tm 表示每個(gè) TaskManager 申請(qǐng) 800M 內(nèi)存
-d 分序方式運(yùn)行

使用 flink 提交任務(wù):

bin/flink run examples/batch/WordCount.jar

如果程序運(yùn)行完了,可以使用 yarn application -kill application_id 殺掉任務(wù):

yarn application -kill application_1554377097889_0002

bin/yarn-session.sh -n 2 -tm 800 -s 1 -d 意思是:

同時(shí)向 Yarn 申請(qǐng) 3 個(gè) container(即便只申請(qǐng)了兩個(gè),因?yàn)?ApplicationMaster 和 Job Manager 有一個(gè)額外的容器。一旦將 Flink 部署到 YARN 群集中,它就會(huì)顯示 Job Manager 的連接詳細(xì)信息),其中 2 個(gè) Container 啟動(dòng) TaskManager(-n 2),每個(gè) TaskManager 擁有兩個(gè) Task Slot(-s 1),并且向每個(gè) TaskManager 的 Container 申請(qǐng) 800M 的內(nèi)存,以及一個(gè) ApplicationMaster(Job Manager)。

4. 模式二: yarn-cluster

特點(diǎn):

直接提交任務(wù)給 YARN;

大作業(yè),適合使用這種方式;

會(huì)自動(dòng)關(guān)閉 session。

使用 flink 直接提交任務(wù):

bin/flink run -m yarn-cluster -yn 2 -yjm 800 -ytm 800 /export/servers/flink-1.6.0/examples/batch/WordCount.jar

-yn 表示 TaskManager 的個(gè)數(shù)

注意:

在創(chuàng)建集群的時(shí)候,集群的配置參數(shù)就寫好了,但是往往因?yàn)闃I(yè)務(wù)需要,要更改一些配置參數(shù),這個(gè)時(shí)候可以不必因?yàn)橐粋(gè)實(shí)例的提交而修改 conf/flink-conf.yaml;

可以通過(guò):-D

-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

如果使用的是 flink on yarn 方式,想切換回 standalone 模式的話,需要?jiǎng)h除:/tmp/.yarn-properties-root,因?yàn)槟J(rèn)查找當(dāng)前 yarn 集群中已有的 yarn-session 信息中的 jobmanager。三、Flink 運(yùn)行架構(gòu)1. Flink 程序結(jié)構(gòu)

Flink 程序的基本構(gòu)建塊是流和轉(zhuǎn)換(請(qǐng)注意,Flink 的 DataSet API 中使用的 DataSet 也是內(nèi)部流 )。從概念上講,流是(可能永無(wú)止境的)數(shù)據(jù)記錄流,而轉(zhuǎn)換是將一個(gè)或多個(gè)流作為一個(gè)或多個(gè)流的操作。輸入,并產(chǎn)生一個(gè)或多個(gè)輸出流。

Flink 應(yīng)用程序結(jié)構(gòu)就是如上圖所示:

Source: 數(shù)據(jù)源,Flink 在流處理和批處理上的 source 大概有 4 類:基于本地集合的 source、基于文件的 source、基于網(wǎng)絡(luò)套接字的 source、自定義的 source。自定義的 source 常見(jiàn)的有 Apache kafka、RabbitMQ 等,當(dāng)然你也可以定義自己的 source。

Transformation:數(shù)據(jù)轉(zhuǎn)換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select等,操作很多,可以將數(shù)據(jù)轉(zhuǎn)換計(jì)算成你想要的數(shù)據(jù)。

Sink:接收器,Flink 將轉(zhuǎn)換計(jì)算后的數(shù)據(jù)發(fā)送的地點(diǎn) ,你可能需要存儲(chǔ)下來(lái),Flink 常見(jiàn)的 Sink 大概有如下幾類:寫入文件、打印出來(lái)、寫入 socket 、自定義的 sink 。自定義的 sink 常見(jiàn)的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 sink。

2. Flink 并行數(shù)據(jù)流

Flink 程序在執(zhí)行的時(shí)候,會(huì)被映射成一個(gè) Streaming Dataflow,一個(gè) Streaming Dataflow 是由一組 Stream 和 Transformation Operator 組成的。在啟動(dòng)時(shí)從一個(gè)或多個(gè) Source Operator 開(kāi)始,結(jié)束于一個(gè)或多個(gè) Sink Operator。

Flink 程序本質(zhì)上是并行的和分布式的,在執(zhí)行過(guò)程中,一個(gè)流(stream)包含一個(gè)或多個(gè)流分區(qū),而每一個(gè) operator 包含一個(gè)或多個(gè) operator 子任務(wù)。操作子任務(wù)間彼此獨(dú)立,在不同的線程中執(zhí)行,甚至是在不同的機(jī)器或不同的容器上。operator 子任務(wù)的數(shù)量是這一特定 operator 的并行度。相同程序中的不同 operator 有不同級(jí)別的并行度。

一個(gè) Stream 可以被分成多個(gè) Stream 的分區(qū),也就是 Stream Partition。一個(gè) Operator 也可以被分為多個(gè) Operator Subtask。如上圖中,Source 被分成 Source1 和 Source2,它們分別為 Source 的 Operator Subtask。每一個(gè) Operator Subtask 都是在不同的線程當(dāng)中獨(dú)立執(zhí)行的。一個(gè) Operator 的并行度,就等于 Operator Subtask 的個(gè)數(shù)。上圖 Source 的并行度為 2。而一個(gè) Stream 的并行度就等于它生成的 Operator 的并行度。

數(shù)據(jù)在兩個(gè) operator 之間傳遞的時(shí)候有兩種模式:

One to One 模式:兩個(gè) operator 用此模式傳遞的時(shí)候,會(huì)保持?jǐn)?shù)據(jù)的分區(qū)數(shù)和數(shù)據(jù)的排序;如上圖中的 Source1 到 Map1,它就保留的 Source 的分區(qū)特性,以及分區(qū)元素處理的有序性。

Redistributing (重新分配)模式:這種模式會(huì)改變數(shù)據(jù)的分區(qū)數(shù);每個(gè)一個(gè) operator subtask 會(huì)根據(jù)選擇 transformation 把數(shù)據(jù)發(fā)送到不同的目標(biāo) subtasks,比如 keyBy()會(huì)通過(guò) hashcode 重新分區(qū),broadcast()和 rebalance()方法會(huì)隨機(jī)重新分區(qū);

3. Task 和 Operator chain

Flink的所有操作都稱之為Operator,客戶端在提交任務(wù)的時(shí)候會(huì)對(duì)Operator進(jìn)行優(yōu)化操作,能進(jìn)行合并的Operator會(huì)被合并為一個(gè)Operator,合并后的Operator稱為Operator chain,實(shí)際上就是一個(gè)執(zhí)行鏈,每個(gè)執(zhí)行鏈會(huì)在TaskManager上一個(gè)獨(dú)立的線程中執(zhí)行。

4. 任務(wù)調(diào)度與執(zhí)行

當(dāng)Flink執(zhí)行executor會(huì)自動(dòng)根據(jù)程序代碼生成DAG數(shù)據(jù)流圖;

ActorSystem創(chuàng)建Actor將數(shù)據(jù)流圖發(fā)送給JobManager中的Actor;

JobManager會(huì)不斷接收TaskManager的心跳消息,從而可以獲取到有效的TaskManager;

JobManager通過(guò)調(diào)度器在TaskManager中調(diào)度執(zhí)行Task(在Flink中,最小的調(diào)度單元就是task,對(duì)應(yīng)就是一個(gè)線程);

在程序運(yùn)行過(guò)程中,task與task之間是可以進(jìn)行數(shù)據(jù)傳輸?shù)摹?/p>

Job Client:

主要職責(zé)是提交任務(wù), 提交后可以結(jié)束進(jìn)程, 也可以等待結(jié)果返回;Job Client 不是 Flink 程序執(zhí)行的內(nèi)部部分,但它是任務(wù)執(zhí)行的起點(diǎn);Job Client 負(fù)責(zé)接受用戶的程序代碼,然后創(chuàng)建數(shù)據(jù)流,將數(shù)據(jù)流提交給 Job Manager 以便進(jìn)一步執(zhí)行。執(zhí)行完成后,Job Client 將結(jié)果返回給用戶。

JobManager:

主要職責(zé)是調(diào)度工作并協(xié)調(diào)任務(wù)做檢查點(diǎn);集群中至少要有一個(gè) master,master 負(fù)責(zé)調(diào)度 task,協(xié)調(diào)checkpoints 和容錯(cuò);高可用設(shè)置的話可以有多個(gè) master,但要保證一個(gè)是 leader, 其他是standby;Job Manager 包含 Actor System、Scheduler、CheckPoint三個(gè)重要的組件;JobManager從客戶端接收到任務(wù)以后, 首先生成優(yōu)化過(guò)的執(zhí)行計(jì)劃, 再調(diào)度到TaskManager中執(zhí)行。

TaskManager:

主要職責(zé)是從JobManager處接收任務(wù), 并部署和啟動(dòng)任務(wù), 接收上游的數(shù)據(jù)并處理;Task Manager 是在 JVM 中的一個(gè)或多個(gè)線程中執(zhí)行任務(wù)的工作節(jié)點(diǎn);TaskManager在創(chuàng)建之初就設(shè)置好了Slot, 每個(gè)Slot可以執(zhí)行一個(gè)任務(wù)。5. 任務(wù)槽和槽共享

每個(gè)TaskManager是一個(gè)JVM的進(jìn)程, 可以在不同的線程中執(zhí)行一個(gè)或多個(gè)子任務(wù)。為了控制一個(gè)worker能接收多少個(gè)task。worker通過(guò)task slot來(lái)進(jìn)行控制(一個(gè)worker至少有一個(gè)task slot)。

1) 任務(wù)槽

每個(gè)task slot表示TaskManager擁有資源的一個(gè)固定大小的子集。

flink將進(jìn)程的內(nèi)存進(jìn)行了劃分到多個(gè)slot中。

圖中有2個(gè)TaskManager,每個(gè)TaskManager有3個(gè)slot的,每個(gè)slot占有1/3的內(nèi)存。

內(nèi)存被劃分到不同的slot之后可以獲得如下好處:

TaskManager最多能同時(shí)并發(fā)執(zhí)行的任務(wù)是可以控制的,那就是3個(gè),因?yàn)椴荒艹^(guò)slot的數(shù)量。

slot有獨(dú)占的內(nèi)存空間,這樣在一個(gè)TaskManager中可以運(yùn)行多個(gè)不同的作業(yè),作業(yè)之間不受影響。

2) 槽共享

默認(rèn)情況下,Flink允許子任務(wù)共享插槽,即使它們是不同任務(wù)的子任務(wù),只要它們來(lái)自同一個(gè)作業(yè)。結(jié)果是一個(gè)槽可以保存作業(yè)的整個(gè)管道。允許插槽共享有兩個(gè)主要好處:

只需計(jì)算Job中最高并行度(parallelism)的task slot,只要這個(gè)滿足,其他的job也都能滿足。

資源分配更加公平,如果有比較空閑的slot可以將更多的任務(wù)分配給它。圖中若沒(méi)有任務(wù)槽共享,負(fù)載不高的Source/Map等subtask將會(huì)占據(jù)許多資源,而負(fù)載較高的窗口subtask則會(huì)缺乏資源。

有了任務(wù)槽共享,可以將基本并行度(base parallelism)從2提升到6.提高了分槽資源的利用率。同時(shí)它還可以保障TaskManager給subtask的分配的slot方案更加公平。

1  2  3  4  下一頁(yè)>  
聲明: 本文由入駐維科號(hào)的作者撰寫,觀點(diǎn)僅代表作者本人,不代表OFweek立場(chǎng)。如有侵權(quán)或其他問(wèn)題,請(qǐng)聯(lián)系舉報(bào)。

發(fā)表評(píng)論

0條評(píng)論,0人參與

請(qǐng)輸入評(píng)論內(nèi)容...

請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字

您提交的評(píng)論過(guò)于頻繁,請(qǐng)輸入驗(yàn)證碼繼續(xù)

  • 看不清,點(diǎn)擊換一張  刷新

暫無(wú)評(píng)論

暫無(wú)評(píng)論

    掃碼關(guān)注公眾號(hào)
    OFweek人工智能網(wǎng)
    獲取更多精彩內(nèi)容
    文章糾錯(cuò)
    x
    *文字標(biāo)題:
    *糾錯(cuò)內(nèi)容:
    聯(lián)系郵箱:
    *驗(yàn) 證 碼:

    粵公網(wǎng)安備 44030502002758號(hào)