Spark Core——RDD何以替代Hadoop MapReduce?
導(dǎo)讀
繼續(xù)前期依次推文PySpark入門和SQL DataFrame簡介的基礎(chǔ)上,今日對Spark中最重要的一個概念——RDD進(jìn)行介紹。雖然在Spark中,基于RDD的其他4大組件更為常用,但作為Spark core中的核心數(shù)據(jù)抽象,RDD是必須深刻理解的基礎(chǔ)概念。

RDD(Resilient Distributed Dataset),彈性分布式數(shù)據(jù)集,是Spark core中的核心數(shù)據(jù)抽象,其他4大組件都或多或少依賴于RDD。簡單理解,RDD就是一種特殊的數(shù)據(jù)結(jié)構(gòu),是為了適應(yīng)大數(shù)據(jù)分布式計算的特殊場景(此時傳統(tǒng)的數(shù)據(jù)集合無法滿足分布式、容錯性等需求)而設(shè)計的一種數(shù)據(jù)形式,其三個核心關(guān)鍵詞是:
彈性:主要包含4層含義:即數(shù)據(jù)大小可變、分區(qū)數(shù)可變、計算可容錯、內(nèi)存硬盤存儲位置可變
分布式:大數(shù)據(jù)一般都是分布式的,意味著多硬件依賴、多核心并行計算
數(shù)據(jù)集:說明這是一組數(shù)據(jù)的集合,或者說數(shù)據(jù)結(jié)構(gòu)

RDD在Spark中占據(jù)"core"的地位
看一個人,可以看看他的對手;了解一個產(chǎn)品,也可以看看他的競品。Spark是為了解決Hadoop中 MapReduce計算框架效率低下而產(chǎn)生的大數(shù)據(jù)計算引擎,所以Spark起初的競爭對手就是MapReduce。
MapReduce之所以計算效率低,主要原因在于每次計算都涉及從硬盤的數(shù)據(jù)讀寫問題,而Spark設(shè)計之初就考慮盡可能避免硬盤讀寫,所以Spark的第一大特點是數(shù)據(jù)優(yōu)先存儲于內(nèi)存中(除非內(nèi)存存儲不夠才放到硬盤中)。同時,為了盡可能優(yōu)化RDD在內(nèi)存中的計算流程,Spark還引入了lazy特性。lazy特性其實質(zhì)就是直至"真正碰上事了"才計算,否則就一直"推托下去",頗有不見兔子不撒鷹的味道。
這實際上又涉及到了RDD的兩類算子:transformation和action,前者只是建立邏輯轉(zhuǎn)換流程,后者才真正落地執(zhí)行。transformation的結(jié)果是從一個RDD轉(zhuǎn)換到另一個RDD,而action則是從一個RDD轉(zhuǎn)換到一個非RDD,因此從執(zhí)行結(jié)果是否仍然是RDD也可推斷出該操作是transformation抑或action。進(jìn)一步地,在transformation過程中,Spark內(nèi)部調(diào)度RDD的計算過程是一個有向無環(huán)圖(Directed Acyclic Graph,DAG ),意味著所有RDD的轉(zhuǎn)換都帶有方向性(一個產(chǎn)生另一個,即血緣關(guān)系),且不存在循環(huán)依賴的,這對Spark的容錯性帶來了有效保證:當(dāng)一個環(huán)節(jié)出現(xiàn)問題時僅需按照方向關(guān)系追溯到相應(yīng)的父RDD即可,而無需從頭開始全流程計算。
Spark中關(guān)于寬窄依賴的經(jīng)典圖例(圖片選自網(wǎng)絡(luò))
上圖給出了寬窄依賴的一個圖例。實際上,這里的寬窄依賴是針對RDD的每個partition而言的,分析子RDD的每個partition來源就容易理解其依賴為寬或窄:
窄依賴:子RDD和父RDD中的各partition是一一對應(yīng)關(guān)系,由于僅單個依賴,所以是窄的,也無需等待其他父RDD中的partition
寬依賴:子RDD和父RDD中partition存在一對多的關(guān)系,也就是說生成子RDD中的某個partition不僅需要這個父RDD中的一個partition,還需要其他partition或其他父RDD的partition,由于依賴多個partition,所以是寬的,在實際執(zhí)行過程中要等到所有partition就位后方可執(zhí)行
也正因如此,對于整個DAG而言,依據(jù)依賴類型可將Spark執(zhí)行過程劃分為多個階段,同一階段內(nèi)部Spark還會進(jìn)行相應(yīng)的調(diào)度和優(yōu)化??梢哉f,內(nèi)存計算+DAG兩大特性共同保證了Spark執(zhí)行的高效性。
RDD的創(chuàng)建主要有3類形式:
從Python中的其他數(shù)據(jù)結(jié)構(gòu)創(chuàng)建,用到的方法為parallelize(),接收一個本地Python集合對象,返回一個RDD對象,一般適用于較小的數(shù)據(jù)集
從本地或HDFS文件中創(chuàng)建RDD對象,適用于大數(shù)據(jù)集,也是生產(chǎn)部署中較為常用的方式
從一個已有RDD中生成另一個RDD,所有transformation類算子其實都是執(zhí)行這一過程
from?pyspark?import?SparkContext??#?SparkContext是spark?core的入口
sc?=?SparkContext()??#?sc是一個單例
rdd1?=?sc.parallelize(['Tom',?'John',?'Joy'])??#?從本地已有Python集合創(chuàng)建
rdd2?=?sc.textFile('test.txt')??#?從本地文件序列化一個RDD
rdd3?=?rdd1.map(lambda?x:(x,?1))??#?從一個RDD轉(zhuǎn)換為另一個RDD
需要指出的是,RDD作為分布式的數(shù)據(jù)集合,其本身是不可變對象(immutable),所以所有的transformation算子都是從一個RDD轉(zhuǎn)換生成了一個新的RDD,這也印證了DAG中無環(huán)的概念。
至于說轉(zhuǎn)換過程中仍然可以使用相同的變量名,這是由Python的特性所決定的,類似于字符串是不可變數(shù)據(jù)類型,但也可以由一個字符串生成另一個同名字符串一樣。
Spark中的算子,其實就是一類操作,或者更具體說是一個函數(shù)!
前面提到,Spark在執(zhí)行過程中,依據(jù)從一個RDD是生成另一個RDD還是其他數(shù)據(jù)類型,可將操作分為兩類:transformation和action。這實際上也是最為常用的RDD操作,甚至說Spark core編程模式就是先經(jīng)歷一系列的transformation,然后在action提取相應(yīng)的結(jié)果。
然而,在系列transformation過程中,由于其lazy特性,當(dāng)且僅當(dāng)遇到action操作時才真正從頭至尾的完整執(zhí)行,所以就不得不面對一個問題:假如有RDD6是由前面系列的RDD1-5轉(zhuǎn)換生成,而RDD6既是RDD7的父RDD,也是RDD8的父RDD,所以在獨立執(zhí)行RDD7和RDD8時,實際上會將RDD1=>RDD6的轉(zhuǎn)換操作執(zhí)行兩遍,存在資源和效率上的浪費。當(dāng)存在2遍計算重復(fù)或許尚可接受,但若存在更多重復(fù)轉(zhuǎn)換時,這種模式或許不是一個明智之舉,為此Spark還為RDD設(shè)計了第三類算子:持久化操作persistence。
至此,RDD的三類常用算子介紹如下:
1. transformation算子
map,接收一個函數(shù)作為參數(shù),實現(xiàn)將RDD中的每個元素一對一映射生成另一個RDD,其實與Python中的原生map函數(shù)功能類似
filter,接收一個函數(shù)作為參數(shù),實現(xiàn)將RDD中每個元素判斷條件是否滿足,進(jìn)行執(zhí)行過濾,與Python中的原生filter函數(shù)類似
flatMap,實際上包含了兩個步驟,首先執(zhí)行map功能,將RDD中的每個元素執(zhí)行一個映射轉(zhuǎn)換,當(dāng)轉(zhuǎn)換結(jié)果是多個元素時(例如轉(zhuǎn)換為列表),再將其各個元素展平,實現(xiàn)一對多映射
groupByKey,適用于RDD中每個元素是一個包含兩個元素的元組格式,例如(key, value)形式,進(jìn)而將相同key對應(yīng)的value構(gòu)成一個特殊的集合對象,實質(zhì)與SQL或者pandas中g(shù)roupby操作類似,一般還需與其他聚合函數(shù)配合操作
reduceByKey,實際上groupByKey只執(zhí)行了一半的聚合動作,即只有"聚"的過程,而缺少實質(zhì)性的"合"的操作。reduceByKey則是在groupby之后加入了reduce的函數(shù),實現(xiàn)真正聚合。換句話說,reduceByKey = groupByKey + aggFunction
sortByKey,也比較簡單,即根據(jù)key值進(jìn)行排序的過程
另外,針對以上函數(shù)還有一些功能相近的函數(shù),不再列出。
2. action算子
action算子Spark中真正執(zhí)行的操作,當(dāng)一個算子的執(zhí)行結(jié)果不再是RDD時,那么它就是一個action算子,此時Spark意識到不能再簡單的進(jìn)行邏輯運(yùn)算標(biāo)記,而需要實質(zhì)性的執(zhí)行計算。常用的action算子包括如下:
collect,可能是日常功能調(diào)試中最為常用的算子,用于將RDD實際執(zhí)行并返回所有元素的列表格式,在功能調(diào)試或者數(shù)據(jù)集較小時較為常用,若是面對大數(shù)據(jù)集或者線上部署時切忌使用,因為有可能造成內(nèi)存溢出
take,接收整數(shù)n,返回特定記錄條數(shù)
first,返回第一條記錄,相當(dāng)于take(1)
count,返回RDD記錄條數(shù)
reduce,對RDD的所有元素執(zhí)行聚合操作,與Python中的原生reduce功能類似,返回一個標(biāo)量
foreach,對RDD中每個元素執(zhí)行特定的操作,功能上類似map,但會實際執(zhí)行并返回結(jié)果
3. persistence算子
持久化的目的是為了短期內(nèi)將某一RDD存儲于內(nèi)存或硬盤中,使其可復(fù)用。主要操作有兩類:
persist,接收參數(shù)可以指定持久化級別,例如MEMORY_ONLY和MEMORY_AND_DISK,其中前者表示僅存儲于內(nèi)存中;后者表示優(yōu)先放于內(nèi)存,內(nèi)存不足再放硬盤中
cache,緩存,即僅將RDD存于內(nèi)存中,相當(dāng)于持久化級別為MEMORY_ONLY的persist操作
另外,還有checkpoint也屬于持久化操作。對于一個已經(jīng)持久化的對象,當(dāng)無需繼續(xù)使用時,可使用unpersist完成取消持久化。
需知,持久化操作是為了便于多次重復(fù)調(diào)用同一RDD時,防止發(fā)生重復(fù)計算而設(shè)計的操作,但其本身仍然是偏lazy的模式,即執(zhí)行了persist或者cache操作后,僅僅是將其標(biāo)記為需要持久化,而直至第一次遇到action觸發(fā)其執(zhí)行時才會真正的完成持久化。
最后,舉一個Spark中hello world級別的WordCount例子,實戰(zhàn)一下各類算子的應(yīng)用:
texts?=?['this?is?spark',?'this?is?RDD']
rdd?=?sc.parallelize(texts)??#?從已有集合創(chuàng)建RDD對象
#?rdd?=?['this?is?spark',?'this?is?RDD']
rdd1?=?rdd.flatMap(lambda?x:x.split('?'))??#?flatMap將原來的句子用空格分割,并展平至單個詞
#?rdd1?=?['this',?'is',?'spark',?'this',?'is',?'RDD']
rdd2?=?rdd1.map(lambda?x:(x,?1))??#?將每個單詞映射為(單詞,1)的(key?value)對象格式
#?rdd2?=?[('this',?1),?('is',?1),?('spark',?1),?('this',?1),?('is',?1),?('RDD',?1)]
rdd3?=?rdd2.reduceByKey(lambda?a,?b:a+b)??#?依據(jù)單詞相同進(jìn)行聚合
#?rdd3?=?[('spark',?1),?('RDD',?1),?('this',?2),?('is',?2)]
rdd3.collect()??#?遇到action算子,將上述rdd=>rdd1=>rdd2=>rdd3有向無環(huán)圖真正執(zhí)行,并返回列表

相關(guān)閱讀:
