目錄
- ??1 數(shù)據(jù) ETL??
- ??2 使用 SQL 分析??
- ??3 使用 DSL 分析??
- ??4 保存結(jié)果數(shù)據(jù)??
- ??5 案例完整代碼??
- ??6 Shuffle 分區(qū)數(shù)目問題??
1 數(shù)據(jù) ETL
使用電影評分?jǐn)?shù)據(jù)進(jìn)行數(shù)據(jù)分析,分別使用DSL編程和SQL編程,熟悉數(shù)據(jù)處理函數(shù)及SQL使用,業(yè)務(wù)需求說明:對電影評分?jǐn)?shù)據(jù)進(jìn)行統(tǒng)分析,獲取Top10電影(電影評分平均值最高,并且每個(gè)電影被評分的次數(shù)大于2000)。數(shù)據(jù)集ratings.dat總共100萬條數(shù)據(jù),數(shù)據(jù)格式如下每行數(shù)據(jù)各個(gè)字段之間使用雙冒號分開:
數(shù)據(jù)處理分析步驟如下:
1. 第一步、讀取電影評分?jǐn)?shù)據(jù),從本地文件系統(tǒng)讀取 2. 第二步、轉(zhuǎn)換數(shù)據(jù),指定Schema信息,封裝到DataFrame 3. 第三步、基于SQL方式分析 4. 第四步、基于DSL方式分析
讀取電影評分?jǐn)?shù)據(jù),將其轉(zhuǎn)換為DataFrame,使用指定列名方式定義Schema信息,代碼如下:
// 構(gòu)建SparkSession實(shí)例對象 val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getSimpleName.stripSuffix("$")) .getOrCreate() // 導(dǎo)入隱式轉(zhuǎn)換 import spark.implicits._ // 1. 讀取電影評分?jǐn)?shù)據(jù),從本地文件系統(tǒng)讀取 val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat") // 2. 轉(zhuǎn)換數(shù)據(jù) val ratingsDF: DataFrame = rawRatingsDS // 過濾數(shù)據(jù). .filter(line => null != line && line.trim.split("::").length == 4) // 提取轉(zhuǎn)換數(shù)據(jù) .mapPartitions { iter => iter.map { line => // 按照分割符分割,拆箱到變量中 val Array(userId, movieId, rating, timestamp) = line.trim.split("::") // 返回四元組 (userId, movieId, rating.toDouble, timestamp.toLong) } } // 指定列名添加Schema .toDF("userId", "movieId", "rating", "timestamp") /* root |-- userId: string (nullable = true) |-- movieId: string (nullable = true) |-- rating: double (nullable = false) |-- timestamp: long (nullable = false) */ //ratingsDF.printSchema() /* +------+-------+------+---------+ |userId|movieId|rating|timestamp| +------+-------+------+---------+ | 1| 1193| 5.0|978300760| | 1| 661| 3.0|978302109| | 1| 594| 4.0|978302268| | 1| 919| 4.0|978301368| +------+-------+------+---------+ */ //ratingsDF.show(4)
2 使用 SQL 分析
首先將DataFrame注冊為臨時(shí)視圖,再編寫SQL語句,最后使用SparkSession執(zhí)行,代碼如下:
// TODO: 基于SQL方式分析 // 第一步、注冊DataFrame為臨時(shí)視圖 ratingsDF.createOrReplaceTempView("view_temp_ratings") // 第二步、編寫SQL val top10MovieDF: DataFrame = spark.sql( """ |SELECT | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating |FROM | view_temp_ratings |GROUP BY | movieId |HAVING | cnt_rating > 2000 |ORDER BY | avg_rating DESC, cnt_rating DESC |LIMIT | 10 """.stripMargin) //top10MovieDF.printSchema() top10MovieDF.show(10, truncate = false)
應(yīng)用scala的stripMargin方法,在scala中stripMargin默認(rèn)是“|”作為出來連接符,在多行換行的行頭前面加一個(gè)“|”符號即可。
代碼實(shí)例:
val speech = “”"abc
|def""".stripMargin
運(yùn)行的結(jié)果為:
abc
ldef
運(yùn)行程序結(jié)果如下:
3 使用 DSL 分析
調(diào)用Dataset中函數(shù),采用鏈?zhǔn)骄幊谭治鰯?shù)據(jù),核心代碼如下:
// TODO: 基于DSL=Domain Special Language(特定領(lǐng)域語言) 分析 import org.apache.spark.sql.functions._ val resultDF: DataFrame = ratingsDF // 選取字段 .select($"movieId", $"rating") // 分組:按照電影ID,獲取平均評分和評分次數(shù) .groupBy($"movieId") .agg( // round(avg($"rating"), 2).as("avg_rating"), // count($"movieId").as("cnt_rating") // ) // 過濾:評分次數(shù)大于2000 .filter($"cnt_rating" > 2000) // 排序:先按照評分降序,再按照次數(shù)降序 .orderBy($"avg_rating".desc, $"cnt_rating".desc) // 獲取前10 .limit(10) //resultDF.printSchema() resultDF.show(10)
Round函數(shù)返回一個(gè)數(shù)值,該數(shù)值是按照指定的小數(shù)位數(shù)進(jìn)行四舍五入運(yùn)算的結(jié)果。除數(shù)值外,也可對日期進(jìn)行舍入運(yùn)算。
round(3.19, 1) 將 3.19 四舍五入到一個(gè)小數(shù)位 (3.2)
round(2.649, 1) 將 2.649 四舍五入到一個(gè)小數(shù)位 (2.6)
round(-5.574, 2) 將 -5.574 四舍五入到兩小數(shù)位 (-5.57)
其中使用SparkSQL中自帶函數(shù)庫functions,在org.apache.spark.sql.functions中,包含常用函
數(shù),有些與Hive中函數(shù)庫類似,但是名稱不一樣。
使用需要導(dǎo)入函數(shù)庫:import org.apache.spark.sql.functions._
4 保存結(jié)果數(shù)據(jù)
將分析結(jié)果數(shù)據(jù)保存到外部存儲系統(tǒng)中,比如保存到MySQL數(shù)據(jù)庫表中或者CSV文件中。
// TODO: 將分析的結(jié)果數(shù)據(jù)保存MySQL數(shù)據(jù)庫和CSV文件 // 結(jié)果DataFrame被使用多次,緩存 resultDF.persist(StorageLevel.MEMORY_AND_DISK) // 1. 保存MySQL數(shù)據(jù)庫表匯總 resultDF .coalesce(1) // 考慮降低分區(qū)數(shù)目 .write .mode("overwrite") .option("driver", "com.mysql.cj.jdbc.Driver") .option("user", "root") .option("password", "123456") .jdbc( "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic ode = true", "db_test.tb_top10_movies", new Properties () ) // 2. 保存CSV文件:每行數(shù)據(jù)中個(gè)字段之間使用逗號隔開 resultDF .coalesce (1) .write.mode ("overwrite") .csv ("datas/top10-movies") // 釋放緩存數(shù)據(jù) resultDF.unpersist ()
查看數(shù)據(jù)庫中結(jié)果表的數(shù)據(jù):
5 案例完整代碼
電影評分?jǐn)?shù)據(jù)分析,經(jīng)過數(shù)據(jù)ETL、數(shù)據(jù)分析(SQL分析和DSL分析)及最終保存結(jié)果,整套
數(shù)據(jù)處理分析流程,其中涉及到很多數(shù)據(jù)細(xì)節(jié),完整代碼如下
import java.util.Properties import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.storage.StorageLevel /** * 需求:對電影評分?jǐn)?shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,獲取Top10電影(電影評分平均值最高,并且每個(gè)電影被評分的次數(shù)大于2000) */ object SparkTop10Movie { def main(args: Array[String]): Unit = { // 構(gòu)建SparkSession實(shí)例對象 val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getSimpleName.stripSuffix("$")) // TODO: 設(shè)置shuffle時(shí)分區(qū)數(shù)目 .config("spark.sql.shuffle.partitions", "4") .getOrCreate() // 導(dǎo)入隱式轉(zhuǎn)換 import spark.implicits._ // 1. 讀取電影評分?jǐn)?shù)據(jù),從本地文件系統(tǒng)讀取 val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat") // 2. 轉(zhuǎn)換數(shù)據(jù) val ratingsDF: DataFrame = rawRatingsDS // 過濾數(shù)據(jù) .filter(line => null != line && line.trim.split("::").length == 4) // 提取轉(zhuǎn)換數(shù)據(jù) .mapPartitions { iter => iter.map { line => // 按照分割符分割,拆箱到變量中 val Array(userId, movieId, rating, timestamp) = line.trim.split("::") // 返回四元組 (userId, movieId, rating.toDouble, timestamp.toLong) } } // 指定列名添加Schema .toDF("userId", "movieId", "rating", "timestamp") /* root |-- userId: string (nullable = true) |-- movieId: string (nullable = true) |-- rating: double (nullable = false) |-- timestamp: long (nullable = false) */ //ratingsDF.printSchema() /* +------+-------+------+---------+ |userId|movieId|rating|timestamp| +------+-------+------+---------+ | 1| 1193| 5.0|978300760| | 1| 661| 3.0|978302109| | 1| 594| 4.0|978302268| | 1| 919| 4.0|978301368| +------+-------+------+---------+ */ //ratingsDF.show(4) // TODO: 基于SQL方式分析 // 第一步、注冊DataFrame為臨時(shí)視圖 ratingsDF.createOrReplaceTempView("view_temp_ratings") // 第二步、編寫SQL val top10MovieDF: DataFrame = spark.sql( """ |SELECT | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating |FROM | view_temp_ratings |GROUP BY | movieId |HAVING | cnt_rating > 2000 |ORDER BY | avg_rating DESC, cnt_rating DESC |LIMIT | 10 """.stripMargin) //top10MovieDF.printSchema() top10MovieDF.show(10, truncate = false) println("===============================================================") // TODO: 基于DSL=Domain Special Language(特定領(lǐng)域語言) 分析 import org.apache.spark.sql.functions._ val resultDF: DataFrame = ratingsDF // 選取字段 .select($"movieId", $"rating") // 分組:按照電影ID,獲取平均評分和評分次數(shù) .groupBy($"movieId") .agg( // round(avg($"rating"), 2).as("avg_rating"), // count($"movieId").as("cnt_rating") // ) // 過濾:評分次數(shù)大于2000 .filter($"cnt_rating" > 2000) // 排序:先按照評分降序,再按照次數(shù)降序 .orderBy($"avg_rating".desc, $"cnt_rating".desc) // 獲取前10 .limit(10) //resultDF.printSchema() resultDF.show(10) // TODO: 將分析的結(jié)果數(shù)據(jù)保存MySQL數(shù)據(jù)庫和CSV文件 // 結(jié)果DataFrame被使用多次,緩存 resultDF.persist(StorageLevel.MEMORY_AND_DISK) // 1. 保存MySQL數(shù)據(jù)庫表匯總 resultDF .coalesce(1) // 考慮降低分區(qū)數(shù)目 .write .mode("overwrite") .option("driver", "com.mysql.cj.jdbc.Driver") .option("user", "root") .option("password", "123456") .jdbc( "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic ode = true", "db_test.tb_top10_movies", new Properties () ) // 2. 保存CSV文件:每行數(shù)據(jù)中個(gè)字段之間使用逗號隔開 resultDF .coalesce (1) .write.mode ("overwrite") .csv ("datas/top10-movies") // 釋放緩存數(shù)據(jù) resultDF.unpersist () // 應(yīng)用結(jié)束,關(guān)閉資源 Thread.sleep (10000000) spark.stop () } }
6 Shuffle 分區(qū)數(shù)目問題
運(yùn)行上述程序時(shí),查看WEB UI監(jiān)控頁面發(fā)現(xiàn),某個(gè)Stage中有200個(gè)Task任務(wù),也就是說RDD有200分區(qū)Partition。
原因:在SparkSQL中當(dāng)Job中產(chǎn)生Shuffle時(shí),默認(rèn)的分區(qū)數(shù)(spark.sql.shuffle.partitions )為
200,在實(shí)際項(xiàng)目中要合理的設(shè)置。在構(gòu)建SparkSession實(shí)例對象時(shí),設(shè)置參數(shù)的值:
// 構(gòu)建SparkSession實(shí)例對象 val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getSimpleName.stripSuffix("$")) // TODO: 設(shè)置shuffle時(shí)分區(qū)數(shù)目 .config("spark.sql.shuffle.partitions", "4") .getOrCreate() // 導(dǎo)入隱式轉(zhuǎn)換 import spark.implicits._