PySpark 的背后原理
Spark主要是由Scala語(yǔ)言開(kāi)發(fā),為了方便和其他系統(tǒng)集成而不引入scala相關(guān)依賴,部分實(shí)現(xiàn)使用Java語(yǔ)言開(kāi)發(fā),例如External Shuffle Service等??傮w來(lái)說(shuō),Spark是由JVM語(yǔ)言實(shí)現(xiàn),會(huì)運(yùn)行在JVM中。然而,Spark除了提供Scala/Java開(kāi)發(fā)接口外,還提供了Python、R等語(yǔ)言的開(kāi)發(fā)接口,為了保證Spark核心實(shí)現(xiàn)的獨(dú)立性,Spark僅在外圍做包裝,實(shí)現(xiàn)對(duì)不同語(yǔ)言的開(kāi)發(fā)支持,本文主要介紹Python Spark的實(shí)現(xiàn)原理,剖析pyspark應(yīng)用程序是如何運(yùn)行起來(lái)的。
Spark運(yùn)行時(shí)架構(gòu)
首先我們先回顧下Spark的基本運(yùn)行時(shí)架構(gòu),如下圖所示,其中橙色部分表示為JVM,Spark應(yīng)用程序運(yùn)行時(shí)主要分為Driver和Executor,Driver負(fù)載總體調(diào)度及UI展示,Executor負(fù)責(zé)Task運(yùn)行,Spark可以部署在多種資源管理系統(tǒng)中,例如Yarn、Mesos等,同時(shí)Spark自身也實(shí)現(xiàn)了一種簡(jiǎn)單的Standalone(獨(dú)立部署)資源管理系統(tǒng),可以不用借助其他資源管理系統(tǒng)即可運(yùn)行。

用戶的Spark應(yīng)用程序運(yùn)行在Driver上(某種程度上說(shuō),用戶的程序就是Spark Driver程序),經(jīng)過(guò)Spark調(diào)度封裝成一個(gè)個(gè)Task,再將這些Task信息發(fā)給Executor執(zhí)行,Task信息包括代碼邏輯以及數(shù)據(jù)信息,Executor不直接運(yùn)行用戶的代碼。
PySpark運(yùn)行時(shí)架構(gòu)
為了不破壞Spark已有的運(yùn)行時(shí)架構(gòu),Spark在外圍包裝一層Python API,借助Py4j實(shí)現(xiàn)Python和Java的交互,進(jìn)而實(shí)現(xiàn)通過(guò)Python編寫Spark應(yīng)用程序,其運(yùn)行時(shí)架構(gòu)如下圖所示。

其中白色部分是新增的Python進(jìn)程,在Driver端,通過(guò)Py4j實(shí)現(xiàn)在Python中調(diào)用Java的方法,即將用戶寫的PySpark程序”映射”到JVM中,例如,用戶在PySpark中實(shí)例化一個(gè)Python的SparkContext對(duì)象,最終會(huì)在JVM中實(shí)例化Scala的SparkContext對(duì)象;在Executor端,則不需要借助Py4j,因?yàn)镋xecutor端運(yùn)行的Task邏輯是由Driver發(fā)過(guò)來(lái)的,那是序列化后的字節(jié)碼,雖然里面可能包含有用戶定義的Python函數(shù)或Lambda表達(dá)式,Py4j并不能實(shí)現(xiàn)在Java里調(diào)用Python的方法,為了能在Executor端運(yùn)行用戶定義的Python函數(shù)或Lambda表達(dá)式,則需要為每個(gè)Task單獨(dú)啟一個(gè)Python進(jìn)程,通過(guò)socket通信方式將Python函數(shù)或Lambda表達(dá)式發(fā)給Python進(jìn)程執(zhí)行。語(yǔ)言層面的交互總體流程如下圖所示,實(shí)線表示方法調(diào)用,虛線表示結(jié)果返回。

下面分別詳細(xì)剖析PySpark的Driver是如何運(yùn)行起來(lái)的以及Executor是如何運(yùn)行Task的。
Driver端運(yùn)行原理
當(dāng)我們通過(guò)spark-submmit提交pyspark程序,首先會(huì)上傳python腳本及依賴,并申請(qǐng)Driver資源,當(dāng)申請(qǐng)到Driver資源后,會(huì)通過(guò)PythonRunner(其中有main方法)拉起JVM,如下圖所示。

PythonRunner入口main函數(shù)里主要做兩件事:
開(kāi)啟Py4j GatewayServer
通過(guò)Java Process方式運(yùn)行用戶上傳的Python腳本
用戶Python腳本起來(lái)后,首先會(huì)實(shí)例化Python版的SparkContext對(duì)象,在實(shí)例化過(guò)程中會(huì)做兩件事:
實(shí)例化Py4j GatewayClient,連接JVM中的Py4j GatewayServer,后續(xù)在Python中調(diào)用Java的方法都是借助這個(gè)Py4j Gateway
通過(guò)Py4j Gateway在JVM中實(shí)例化SparkContext對(duì)象
經(jīng)過(guò)上面兩步后,SparkContext對(duì)象初始化完畢,Driver已經(jīng)起來(lái)了,開(kāi)始申請(qǐng)Executor資源,同時(shí)開(kāi)始調(diào)度任務(wù)。用戶Python腳本中定義的一系列處理邏輯最終遇到action方法后會(huì)觸發(fā)Job的提交,提交Job時(shí)是直接通過(guò)Py4j調(diào)用Java的PythonRDD.runJob方法完成,映射到JVM中,會(huì)轉(zhuǎn)給sparkContext.runJob方法,Job運(yùn)行完成后,JVM中會(huì)開(kāi)啟一個(gè)本地Socket等待Python進(jìn)程拉取,對(duì)應(yīng)地,Python進(jìn)程在調(diào)用PythonRDD.runJob后就會(huì)通過(guò)Socket去拉取結(jié)果。
把前面運(yùn)行時(shí)架構(gòu)圖中Driver部分單獨(dú)拉出來(lái),如下圖所示,通過(guò)PythonRunner入口main函數(shù)拉起JVM和Python進(jìn)程,JVM進(jìn)程對(duì)應(yīng)下圖橙色部分,Python進(jìn)程對(duì)應(yīng)下圖白色部分。Python進(jìn)程通過(guò)Py4j調(diào)用Java方法提交Job,Job運(yùn)行結(jié)果通過(guò)本地Socket被拉取到Python進(jìn)程。還有一點(diǎn)是,對(duì)于大數(shù)據(jù)量,例如廣播變量等,Python進(jìn)程和JVM進(jìn)程是通過(guò)本地文件系統(tǒng)來(lái)交互,以減少進(jìn)程間的數(shù)據(jù)傳輸。

Executor端運(yùn)行原理
為了方便闡述,以Spark On Yarn為例,當(dāng)Driver申請(qǐng)到Executor資源時(shí),會(huì)通過(guò)CoarseGrainedExecutorBackend(其中有main方法)拉起JVM,啟動(dòng)一些必要的服務(wù)后等待Driver的Task下發(fā),在還沒(méi)有Task下發(fā)過(guò)來(lái)時(shí),Executor端是沒(méi)有Python進(jìn)程的。當(dāng)收到Driver下發(fā)過(guò)來(lái)的Task后,Executor的內(nèi)部運(yùn)行過(guò)程如下圖所示。

Executor端收到Task后,會(huì)通過(guò)launchTask運(yùn)行Task,最后會(huì)調(diào)用到PythonRDD的compute方法,來(lái)處理一個(gè)分區(qū)的數(shù)據(jù),PythonRDD的compute方法的計(jì)算流程大致分三步走:
如果不存在pyspark.deamon后臺(tái)Python進(jìn)程,那么通過(guò)Java Process的方式啟動(dòng)pyspark.deamon后臺(tái)進(jìn)程,注意每個(gè)Executor上只會(huì)有一個(gè)pyspark.deamon后臺(tái)進(jìn)程,否則,直接通過(guò)Socket連接pyspark.deamon,請(qǐng)求開(kāi)啟一個(gè)pyspark.worker進(jìn)程運(yùn)行用戶定義的Python函數(shù)或Lambda表達(dá)式。pyspark.deamon是一個(gè)典型的多進(jìn)程服務(wù)器,來(lái)一個(gè)Socket請(qǐng)求,fork一個(gè)pyspark.worker進(jìn)程處理,一個(gè)Executor上同時(shí)運(yùn)行多少個(gè)Task,就會(huì)有多少個(gè)對(duì)應(yīng)的pyspark.worker進(jìn)程。
緊接著會(huì)單獨(dú)開(kāi)一個(gè)線程,給pyspark.worker進(jìn)程喂數(shù)據(jù),pyspark.worker則會(huì)調(diào)用用戶定義的Python函數(shù)或Lambda表達(dá)式處理計(jì)算。
在一邊喂數(shù)據(jù)的過(guò)程中,另一邊則通過(guò)Socket去拉取pyspark.worker的計(jì)算結(jié)果。
把前面運(yùn)行時(shí)架構(gòu)圖中Executor部分單獨(dú)拉出來(lái),如下圖所示,橙色部分為JVM進(jìn)程,白色部分為Python進(jìn)程,每個(gè)Executor上有一個(gè)公共的pyspark.deamon進(jìn)程,負(fù)責(zé)接收Task請(qǐng)求,并fork pyspark.worker進(jìn)程單獨(dú)處理每個(gè)Task,實(shí)際數(shù)據(jù)處理過(guò)程中,pyspark.worker進(jìn)程和JVM Task會(huì)較頻繁地進(jìn)行本地Socket數(shù)據(jù)通信。

總結(jié)
總體上來(lái)說(shuō),PySpark是借助Py4j實(shí)現(xiàn)Python調(diào)用Java,來(lái)驅(qū)動(dòng)Spark應(yīng)用程序,本質(zhì)上主要還是JVM runtime,Java到Python的結(jié)果返回是通過(guò)本地Socket完成。雖然這種架構(gòu)保證了Spark核心代碼的獨(dú)立性,但是在大數(shù)據(jù)場(chǎng)景下,JVM和Python進(jìn)程間頻繁的數(shù)據(jù)通信導(dǎo)致其性能損耗較多,惡劣時(shí)還可能會(huì)直接卡死,所以建議對(duì)于大規(guī)模機(jī)器學(xué)習(xí)或者Streaming應(yīng)用場(chǎng)景還是慎用PySpark,盡量使用原生的Scala/Java編寫應(yīng)用程序,對(duì)于中小規(guī)模數(shù)據(jù)量下的簡(jiǎn)單離線任務(wù),可以使用PySpark快速部署提交。
