13161216443

您所在位置: 首頁> 學習課程> 大數據培訓 | Apache Spark 中編寫可伸縮代碼的4個技巧

大數據培訓 | Apache Spark 中編寫可伸縮代碼的4個技巧

發布百知教育 來源:學習課程 2019-12-05

在本文中,我將分享一些關于如何編寫可伸縮的 Apache Spark 代碼的技巧。本文提供的示例代碼實際上是基于我在現實世界中遇到的。因此,通過分享這些技巧,我希望能夠幫助新手在不增加集群資源的情況下編寫高性能 Spark 代碼。 

背景

我最近接手了一個 notebook ,它主要用來跟蹤我們的 AB 測試結果,以評估我們的推薦引擎的性能。這個 notebook 里面的代碼運行起來非常的慢,以下代碼片段(片段1)是運行時間最長的:


val df = bucketPeriod.map{date =>
   val Array(year, month, day) = date.split("-")
   getAnalytics(BucketRecSysTracking, Brand, year, month, day)
   .select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15")
   .filter($"c3".isin(18, 37))
   .distinct
}.filter(_.count > 0)
.reduce(_ union _)
.cache


在我們的 AB 測試實驗中,用于跟蹤數據的文件按年、月和日劃分到不同文件夾中,文中中每一行都是一個 JSON 字符串,每天可能有幾百個 JSON 文件。如果上面代碼中的 bucketPeriod 代表需要查詢的天列表,那么對于每天的數據會調用 getAnalytics 函數去遍歷每天對應的文件夾下面的 json 文件,程序得到了每天的統計數,然后通過 reduce( union ) 合并成一個 DataFrame,并且刪掉了 c3 不滿足 isin(18, 37) 的數據。

運行上面的代碼來遍歷三天的數據居然要花費 17.72 分鐘,而運行 df.count 居然花費了 26.71 分鐘!現在讓我們來看看是否不通過增加機器的資源來加快運行速度。

在我們深入討論之前,下來看一下 getAnalytics 函數的實現(片段2)。其主要作用就是讀取某個目錄下的所有 JSON 文件,然后根據一些字段的情況添加一些東西而已。


def getAnalytics(bucketName: String, brand: String, bucketYear: String, bucketMonth: String,
     bucketDay: String, candidate_field: String=candidateField, groups: String=Groups): DataFrame = {
 var rankingOrderedIds = Window.partitionBy("c12").orderBy("id")
 val s3PathAnalytics = getS3Path(bucketName, brand, bucketFolder, year=bucketYear, month=bucketMonth, day=bucketDay)
 readJSON(s3PathAnalytics)
   .distinct
   .withColumn("x", explode($"payload"))
   // a few more calls to withColumn to create columns
   .withColumn("c10", explode(when(size(col("x1")) > 0, col("x1")).otherwise(array(lit(null).cast("string")))))
   // a few more calls to withColumn to create columns
   .withColumn("id", monotonically_increasing_id)
   // a few more calls to withColumn to create columns
   .withColumn("c12", concat_ws("X", $"x2", $"x3", $"c3"))
   .withColumn("c13", rank().over(rankingOrderedIds))
   .distinct
}


技巧一:盡可能給 Spark 函數更多的輸入路徑 

最上面的代碼片段每次調用 spark.read.json 的時候只輸入一個目錄,這樣的寫法非常的低效,因為 spark.read.json 可以接受文件名列表,然后 driver 只需要調度一次就可以獲取到這些文件列表里面的數據,而不是像上面一樣每個路徑調用一次。

所以如果你需要讀取分散在不同文件夾里面的文件,你需要將類似于下面的代碼(片段3)


getDays("2019-08-01", "2019-08-31")
   .map{date =>
       val Array(year, month, day) = date.split("-")
       val s3PathAnalytics = getS3Path(bucketName, brand, bucketFolder, bucketYear, bucketMonth, bucketDay)
       readJSON(s3PathAnalytics)
   }


修改成以下的代碼邏輯(片段4)


val s3Files = getDays("2019-08-01", "2019-08-31")
               .map(_.split("-"))
               .map{
                   case Array(year, month, day) => getS3Path(bucketName, brand, bucketFolder, bucketYear, bucketMonth, bucketDay)
               }

spark.read.json(s3Files: _*)


這樣的寫法比之前的寫法速度是要快的,而且如果輸入的目錄越多,速度也要更快。

技巧二:盡可能跳過模式推斷

根據 spark.read.json 文檔的描述:spark.read.json 函數會先遍歷一下輸入的目錄,以便確定輸入數據的 schema。所以如果你事先知道輸入文件的 schema,最好事先指定。

因此,我們的代碼可以修改成以下樣子(片段5):


val s3Files = getDays("2019-08-01", "2019-08-31")
               .map(_.split("-"))
               .map{
                   case Array(year, month, day) => getS3Path(bucketName, brand, bucketFolder, bucketYear, bucketMonth, bucketDay)
               }

val jsonString = spark.read.json(s3Files(0)).schema.json
val newSchema = DataType.fromJson(jsonString).asInstanceOf[StructType]

spark.read.schema(newSchema).json(s3Files: _*)


上面代碼片段我們先掃描了第一個文件,然后從文件中獲取了數據的 schema,然后把獲取到的 schema 傳遞給 spark.read,上面的代碼片段一共運行了 29.82。

技巧三:盡可能避免 Shuffle

現在我們已經提高了文件 I/O 的速度,讓我們來看看能不能提升 getAnalytics 函數的運行速度。

getAnalytics 函數里面一開始就調用了 distinct,這個是會導致 shuffle 的算子。那我們如何快速識別出哪些操作會導致 shuffle 呢?答案是調用 RDD 函數上面的 toDebugString 函數,即可判斷,具體如下:


df1.distinct.rdd.toDebugString

res20: String =
(200) MapPartitionsRDD[951] at rdd at command-1:1 []
 |   MapPartitionsRDD[950] at rdd at command-1:1 []
 |   MapPartitionsRDD[949] at rdd at command-1:1 []
 |   ShuffledRowRDD[948] at rdd at command-1:1 []
 +-(350) MapPartitionsRDD[947] at rdd at command-1:1 []
     |   MapPartitionsRDD[946] at rdd at command-1:1 []
     |   FileScanRDD[945] at rdd at command-1:1 []


使用同樣的方法,我們也可以快速找到 rank().over(rankingOrderedIds) 方法也會導致 shuffle。

在這種情況下,讀取文件后立即觸發一次 shuffle 是一個壞主意,因為整個數據集是非常大的,而且我們可能還讀出不需要的列。因此,我們不必在集群中移動這些大文件。我們要盡可能將 shuffle 的發生時間推到最后,而且最好先過濾一部分數據。

根據這些思路,我們可以將 getAnalytics 函數的實現重寫如下(片段6):


val df2 = df1
   .withColumn("x", explode($"payload"))
   // a few more calls to withColumn to create columns
   .withColumn("c10", explode(when(size(col("x1")) > 0, col("x1")).otherwise(array(lit(null).cast("string")))))
   // a few more calls to withColumn to create columns
   .withColumn("id", monotonically_increasing_id)
   // a few more calls to withColumn to create columns
   .withColumn("c12", concat_ws("X", $"x2", $"x3", $"c3"))
   .withColumn("c13", rank().over(rankingOrderedIds))
   .select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9",
           "c10", "c11", "c12", "c13", "c14", "c15")
   .filter($"c3".isin(18, 37))
   .distinct


為了清楚起見,我將片段1的相關代碼部分合并到片段6中,以便得到相同的結果。注意,最開始的寫法需要調用三次 distinct (一次在片段1,二次在片段2中),而最新的寫法只需要調用一次 distinct,并得到同樣的結果。當發生 shuffle 時(片段6中的第9行),物理計劃顯示只有 select 調用中指定的列被移動,這很好,因為它只是原始數據集中所有列的一個小子集。

片段6中的代碼執行只花了0.90秒,count 只需要2.58分鐘。這個例子表明,認真考慮我們編寫好的查詢是有好處的,這樣可以避免執行一些不必要的 shuffle 操作。

技巧四:不要過度依賴 Catalyst Optimizer 

我們比較以下兩個代碼片段:


val df2 = df1
   .withColumn("x", explode($"payload"))
   // a few more calls to withColumn to create columns
   .withColumn("c10", explode(when(size(col("x1")) > 0, col("x1")).otherwise(array(lit(null).cast("string")))))
   // a few more calls to withColumn to create columns
   .withColumn("id", monotonically_increasing_id)
   // a few more calls to withColumn to create columns
   .withColumn("c12", concat_ws("X", $"x2", $"x3", $"c3"))
   .withColumn("c13", rank().over(rankingOrderedIds))
   .select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9",
           "c10", "c11", "c12", "c13", "c14", "c15")
   .filter($"c3".isin(18, 37))
   .distinct


以及


val df2 = df1
   .withColumn("x", explode($"payload"))
   // a few more calls to withColumn to create columns
   .withColumn("c10", explode(when(size(col("x1")) > 0, col("x1")).otherwise(array(lit(null).cast("string")))))
   // a few more calls to withColumn to create columns
   .withColumn("id", monotonically_increasing_id)
   // a few more calls to withColumn to create columns
   .withColumn("c12", concat_ws("X", $"x2", $"x3", $"c3"))
   .filter($"c3".isin(18, 37))
   .withColumn("c13", rank().over(rankingOrderedIds))
   .select("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9",
           "c10", "c11", "c12", "c13", "c14", "c15")
   .distinct


代碼片段7和片段8功能是相同的,除了執行 fliter 的順序不一樣(分別是第14行和第9行)。但是這兩個代碼片段最后的輸出結果是一樣的,因為 rank 窗口函數并不用于 c3 這列,所以在執行 rank 之前或之后執行 filter 并不重要。但是,執行 df2.count 的時間有顯著差異:片段7代碼執行 df2.count 的時間為 2.58分鐘,而片段8代碼執行 df2.count 的時間為45.88秒。 這兩個查詢的物理計劃解釋了原因:


大數據培訓班


上圖顯示,在調用 rank 窗口函數之前進行過濾的效果是將第一階段的行數從 8.02 億減少到僅僅700萬。因此,當發生 shuffle 時,只需要在集群中移動 2.9 GB的數據。相反,在調用 rank 窗口函數之前沒有進行過濾會導致 304.9 GB的數據在集群中移動。

這個例子告訴我們,如果您的查詢中有 shuffle 操作,請仔細分析能不能先執行一些 filter 操作,以減少需要傳輸的數據量。另外,我們也要學會從物理計劃中尋找一些優化點,而不能完全依賴于 Catalyst Optimizer,因為 Catalyst Optimizer 不是萬能的。


大數據培訓班:http://www.onhairsalon.com/bigdata2019


注釋:本文內容來自公眾號過往記憶大數據

上一篇:大數據培訓班 | 關于商業智能、數據可視化內容

下一篇:應屆生去公司找個Java程序員的職位需要什么技能?

相關推薦

www.onhairsalon.com

有位老師想和您聊一聊

關閉

立即申請