<kbd id="5sdj3"></kbd>
<th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>

    分布式機(jī)器學(xué)習(xí)原理及實戰(zhàn)(Pyspark)

    共 6473字,需瀏覽 13分鐘

     ·

    2021-06-12 17:14

    一、大數(shù)據(jù)框架及Spark介紹

    1.1 大數(shù)據(jù)框架

    大數(shù)據(jù)(Big Data)是指無法在一定時間內(nèi)用常規(guī)軟件工具對其內(nèi)容進(jìn)行抓取、管理和處理的數(shù)據(jù)集合。大數(shù)據(jù)技術(shù),是指從各種各樣類型的數(shù)據(jù)中,快速獲得有價值信息的能力。

    自2003年Google公布了3篇大數(shù)據(jù)奠基性論文,為大數(shù)據(jù)存儲及分布式處理的核心問題提供了思路:非結(jié)構(gòu)化文件分布式存儲(GFS)、分布式計算(MapReduce)及結(jié)構(gòu)化數(shù)據(jù)存儲(BigTable),并奠定了現(xiàn)代大數(shù)據(jù)技術(shù)的理論基礎(chǔ),而后大數(shù)據(jù)技術(shù)便快速發(fā)展,誕生了很多日新月異的技術(shù)。歸納現(xiàn)有大數(shù)據(jù)框架解決的核心問題及相關(guān)技術(shù)主要為:

    • 分布式存儲的問題:有GFS,HDFS等,使得大量的數(shù)據(jù)能橫跨成百上千臺機(jī)器;
    • 大數(shù)據(jù)計算的問題:有MapReduce、Spark批處理、Flink流處理等,可以分配計算任務(wù)給各個計算節(jié)點(機(jī)器);
    • 結(jié)構(gòu)化數(shù)據(jù)存儲及查詢的問題:有Hbase、Bigtable等,可以快速獲取/存儲結(jié)構(gòu)化的鍵值數(shù)據(jù);
    • 大數(shù)據(jù)挖掘的問題:有Hadoop的mahout,spark的ml等,可以使用分布式機(jī)器學(xué)習(xí)算法挖掘信息;

    1.2 Spark的介紹

    Spark是一個分布式內(nèi)存批計算處理框架,Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node組成。對于每個Spark應(yīng)用程序,Worker Node上存在一個Executor進(jìn)程,Executor進(jìn)程中包括多個Task線程。在執(zhí)行具體的程序時,Spark會將程序拆解成一個任務(wù)DAG(有向無環(huán)圖),再根據(jù)DAG決定程序各步驟執(zhí)行的方法。該程序先分別從textFile和HadoopFile讀取文件,經(jīng)過一些列操作后再進(jìn)行join,最終得到處理結(jié)果。PySpark是Spark的Python API,通過Pyspark可以方便地使用 Python編寫 Spark 應(yīng)用程序, 其支持 了Spark 的大部分功能,例如 Spark SQL、DataFrame、Streaming、MLLIB(ML)和 Spark Core。

    二、PySpark分布式機(jī)器學(xué)習(xí)

    2.1 PySpark機(jī)器學(xué)習(xí)庫

    Pyspark中支持兩個機(jī)器學(xué)習(xí)庫:mllib及ml,區(qū)別在于ml主要操作的是DataFrame,而mllib操作的是RDD,即二者面向的數(shù)據(jù)集不一樣。相比于mllib在RDD提供的基礎(chǔ)操作,ml在DataFrame上的抽象級別更高,數(shù)據(jù)和操作耦合度更低。

    注:mllib在后面的版本中可能被廢棄,本文示例使用的是ml庫。

    pyspark.ml訓(xùn)練機(jī)器學(xué)習(xí)庫有三個主要的抽象類:Transformer、Estimator、Pipeline。

    • Transformer主要對應(yīng)feature子模塊,實現(xiàn)了算法訓(xùn)練前的一系列的特征預(yù)處理工作,例如MinMaxScaler、word2vec、onehotencoder等,對應(yīng)操作為transform;
    # 舉例:特征加工
    from pyspark.ml.feature import VectorAssembler
    featuresCreator = VectorAssembler(
        inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
        outputCol='features'
    )

    • Estimator對應(yīng)各種機(jī)器學(xué)習(xí)算法,主要為分類、回歸、聚類和推薦算法4大類,具體可選算法大多在sklearn中均有對應(yīng),對應(yīng)操作為fit;
    # 舉例:分類模型
    from pyspark.ml.classification import LogisticRegression

    logistic = LogisticRegression(featuresCol=featuresCreator.getOutputCol(),
                                    labelCol='INFANT_ALIVE_AT_REPORT')
    • Pipeline可將一些列轉(zhuǎn)換和訓(xùn)練過程串聯(lián)形成流水線。
    # 舉例:創(chuàng)建流水線
    from pyspark.ml import Pipeline

    pipeline = Pipeline(stages=[encoder, featuresCreator, logistic]) # 特征編碼,特征加工,載入LR模型
    # 擬合模型
    train, test = data.randomSplit([0.7,0.3],seed=123)
    model = pipeline.fit(train)

    2.2 PySpark分布式機(jī)器學(xué)習(xí)原理

    在分布式訓(xùn)練中,用于訓(xùn)練模型的工作負(fù)載會在多個微型處理器之間進(jìn)行拆分和共享,這些處理器稱為工作器節(jié)點,通過這些工作器節(jié)點并行工作以加速模型訓(xùn)練。分布式訓(xùn)練可用于傳統(tǒng)的 ML 模型,但更適用于計算和時間密集型任務(wù),如用于訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)。分布式訓(xùn)練有兩種主要類型:數(shù)據(jù)并行及模型并行,主要代表有Spark ML,Parameter Server和TensorFlow。

    spark的分布式訓(xùn)練的實現(xiàn)為數(shù)據(jù)并行:按行對數(shù)據(jù)進(jìn)行分區(qū),從而可以對數(shù)百萬甚至數(shù)十億個實例進(jìn)行分布式訓(xùn)練。以其核心的梯度下降算法為例: 

    1、首先對數(shù)據(jù)劃分至各計算節(jié)點; 

    2、把當(dāng)前的模型參數(shù)廣播到各個計算節(jié)點(當(dāng)模型參數(shù)量較大時會比較耗帶寬資源);

    3、各計算節(jié)點進(jìn)行數(shù)據(jù)抽樣得到mini batch的數(shù)據(jù),分別計算梯度,再通過treeAggregate操作匯總梯度,得到最終梯度gradientSum; 

    4、利用gradientSum更新模型權(quán)重(這里采用的阻斷式的梯度下降方式,當(dāng)各節(jié)點有數(shù)據(jù)傾斜時,每輪的時間取決于最慢的節(jié)點。這是Spark并行訓(xùn)練效率較低的主要原因)。

    PySpark項目實戰(zhàn)

    注:單純拿Pyspark練練手,可無需配置Pyspark集群,直接本地配置下單機(jī)Pyspark,也可以使用線上spark集群(如: community.cloud.databricks.com)。

    本項目通過PySpark實現(xiàn)機(jī)器學(xué)習(xí)建模全流程:包括數(shù)據(jù)的載入,數(shù)據(jù)分析,特征加工,二分類模型訓(xùn)練及評估。

    #!/usr/bin/env python
    # coding: utf-8


    #  初始化SparkSession
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("Python Spark RF example").config("spark.some.config.option""some-value").getOrCreate()

    # 加載數(shù)據(jù)
    df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data.csv",header=True)

    from pyspark.sql.functions import *
    # 數(shù)據(jù)基本信息分析
    df.dtypes # Return df column names and data types
    df.show()  #Display the content of df
    df.head()  #Return first n rows
    df.first()  #Return first row 
    df.take(2)  #Return the first n rows
    df.schema   # Return the schema of df
    df.columns # Return the columns of df
    df.count()  #Count the number of rows in df
    df.distinct().count()  #Count the number of distinct rows in df
    df.printSchema()  #Print the schema of df
    df.explain()  #Print the (logical and physical)  plans
    df.describe().show()  #Compute summary statistics 
    df.groupBy('Survived').agg(avg("Age"),avg("Fare")).show()  # 聚合分析
    df.select(df.Sex, df.Survived==1).show()  # 帶條件查詢 
    df.sort("Age", ascending=False).collect() # 排序
    # 特征加工
    df = df.dropDuplicates()   # 刪除重復(fù)值
    df = df.na.fill(value=0)  # 缺失填充值
    df = df.na.drop()        # 或者刪除缺失值
    df = df.withColumn('isMale', when(df['Sex']=='male',1).otherwise(0)) # 新增列:性別0 1
    df = df.drop('_c0','Name','Sex'# 刪除姓名、性別、索引列

    # 設(shè)定特征/標(biāo)簽列
    from pyspark.ml.feature import VectorAssembler
    ignore=['Survived']
    vectorAssembler = VectorAssembler(inputCols=[x for x in df.columns  
                      if x not in ignore], outputCol = 'features')
    new_df = vectorAssembler.transform(df)
    new_df = new_df.select(['features''Survived'])

    # 劃分測試集訓(xùn)練集
    train, test = new_df.randomSplit([0.75, 0.25], seed = 12345)

    # 模型訓(xùn)練
    from pyspark.ml.classification import LogisticRegression

    lr = LogisticRegression(featuresCol = 'features'
                             labelCol='Survived')
    lr_model = lr.fit(test)

    # 模型評估
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    predictions = lr_model.transform(test)
    auc = BinaryClassificationEvaluator().setLabelCol('Survived')
    print('AUC of the model:' + str(auc.evaluate(predictions)))
    print('features weights', lr_model.coefficientMatrix)

    文章首發(fā)于算法進(jìn)階,公眾號閱讀原文可訪問GitHub項目源碼


    瀏覽 103
    點贊
    評論
    收藏
    分享

    手機(jī)掃一掃分享

    分享
    舉報
    評論
    圖片
    表情
    推薦
    點贊
    評論
    收藏
    分享

    手機(jī)掃一掃分享

    分享
    舉報

    <kbd id="5sdj3"></kbd>
    <th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>
    99精品在这里 | 免费成人黄色电影网站 | 欧洲久久网 | 亚洲高清在线 | 西西444www大胆高清图片 |