大數(shù)據(jù)Spark電影評分?jǐn)?shù)據(jù)分析

| 2022-09-21 admin

目錄

  • ??1 數(shù)據(jù) ETL??
  • ??2 使用 SQL 分析??
  • ??3 使用 DSL 分析??
  • ??4 保存結(jié)果數(shù)據(jù)??
  • ??5 案例完整代碼??
  • ??6 Shuffle 分區(qū)數(shù)目問題??

    1 數(shù)據(jù) ETL

    使用電影評分?jǐn)?shù)據(jù)進(jìn)行數(shù)據(jù)分析,分別使用DSL編程和SQL編程,熟悉數(shù)據(jù)處理函數(shù)及SQL使用,業(yè)務(wù)需求說明:對電影評分?jǐn)?shù)據(jù)進(jìn)行統(tǒng)分析,獲取Top10電影(電影評分平均值最高,并且每個(gè)電影被評分的次數(shù)大于2000)。數(shù)據(jù)集ratings.dat總共100萬條數(shù)據(jù),數(shù)據(jù)格式如下每行數(shù)據(jù)各個(gè)字段之間使用雙冒號分開:

    大數(shù)據(jù)Spark電影評分?jǐn)?shù)據(jù)分析_sql

    數(shù)據(jù)處理分析步驟如下:

    1. 第一步、讀取電影評分?jǐn)?shù)據(jù),從本地文件系統(tǒng)讀取
    2. 第二步、轉(zhuǎn)換數(shù)據(jù),指定Schema信息,封裝到DataFrame
    3. 第三步、基于SQL方式分析
    4. 第四步、基于DSL方式分析

    讀取電影評分?jǐn)?shù)據(jù),將其轉(zhuǎn)換為DataFrame,使用指定列名方式定義Schema信息,代碼如下:

    // 構(gòu)建SparkSession實(shí)例對象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .getOrCreate()
    // 導(dǎo)入隱式轉(zhuǎn)換
    
    import spark.implicits._
    
    // 1. 讀取電影評分?jǐn)?shù)據(jù),從本地文件系統(tǒng)讀取
    val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
    // 2. 轉(zhuǎn)換數(shù)據(jù)
    val ratingsDF: DataFrame = rawRatingsDS
      // 過濾數(shù)據(jù).
      .filter(line => null != line && line.trim.split("::").length == 4)
      // 提取轉(zhuǎn)換數(shù)據(jù)
      .mapPartitions { iter =>
        iter.map { line =>
          // 按照分割符分割,拆箱到變量中
          val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
          // 返回四元組
          (userId, movieId, rating.toDouble, timestamp.toLong)
        }
      }
      // 指定列名添加Schema
      .toDF("userId", "movieId", "rating", "timestamp")
    /*
    root
    |-- userId: string (nullable = true)
    |-- movieId: string (nullable = true)
    |-- rating: double (nullable = false)
    |-- timestamp: long (nullable = false)
    */
    //ratingsDF.printSchema()
    /*
    +------+-------+------+---------+
    |userId|movieId|rating|timestamp|
    +------+-------+------+---------+
    | 1| 1193| 5.0|978300760|
    | 1| 661| 3.0|978302109|
    | 1| 594| 4.0|978302268|
    | 1| 919| 4.0|978301368|
    +------+-------+------+---------+
    */
    //ratingsDF.show(4)

    2 使用 SQL 分析

    首先將DataFrame注冊為臨時(shí)視圖,再編寫SQL語句,最后使用SparkSession執(zhí)行,代碼如下:

    // TODO: 基于SQL方式分析
    // 第一步、注冊DataFrame為臨時(shí)視圖
    ratingsDF.createOrReplaceTempView("view_temp_ratings")
    // 第二步、編寫SQL
    val top10MovieDF: DataFrame = spark.sql(
      """
        |SELECT
        | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
        |FROM
        | view_temp_ratings
        |GROUP BY
        | movieId
        |HAVING
        | cnt_rating > 2000
        |ORDER BY
        | avg_rating DESC, cnt_rating DESC
        |LIMIT
        | 10
    """.stripMargin)
    //top10MovieDF.printSchema()
    top10MovieDF.show(10, truncate = false)

    應(yīng)用scala的stripMargin方法,在scala中stripMargin默認(rèn)是“|”作為出來連接符,在多行換行的行頭前面加一個(gè)“|”符號即可。

    代碼實(shí)例:

    val speech = “”"abc

    |def""".stripMargin

    運(yùn)行的結(jié)果為:

    abc

    ldef

    運(yùn)行程序結(jié)果如下:

    大數(shù)據(jù)Spark電影評分?jǐn)?shù)據(jù)分析_big data_02

    3 使用 DSL 分析

    調(diào)用Dataset中函數(shù),采用鏈?zhǔn)骄幊谭治鰯?shù)據(jù),核心代碼如下:

    // TODO: 基于DSL=Domain Special Language(特定領(lǐng)域語言) 分析
    
    import org.apache.spark.sql.functions._
    
    val resultDF: DataFrame = ratingsDF
      // 選取字段
      .select($"movieId", $"rating")
      // 分組:按照電影ID,獲取平均評分和評分次數(shù)
      .groupBy($"movieId")
      .agg( //
        round(avg($"rating"), 2).as("avg_rating"), //
        count($"movieId").as("cnt_rating") //
      )
      // 過濾:評分次數(shù)大于2000
      .filter($"cnt_rating" > 2000)
      // 排序:先按照評分降序,再按照次數(shù)降序
      .orderBy($"avg_rating".desc, $"cnt_rating".desc)
      // 獲取前10
      .limit(10)
    //resultDF.printSchema()
    resultDF.show(10)

    Round函數(shù)返回一個(gè)數(shù)值,該數(shù)值是按照指定的小數(shù)位數(shù)進(jìn)行四舍五入運(yùn)算的結(jié)果。除數(shù)值外,也可對日期進(jìn)行舍入運(yùn)算。

    round(3.19, 1) 將 3.19 四舍五入到一個(gè)小數(shù)位 (3.2)

    round(2.649, 1) 將 2.649 四舍五入到一個(gè)小數(shù)位 (2.6)

    round(-5.574, 2) 將 -5.574 四舍五入到兩小數(shù)位 (-5.57)

    其中使用SparkSQL中自帶函數(shù)庫functions,在org.apache.spark.sql.functions中,包含常用函

    數(shù),有些與Hive中函數(shù)庫類似,但是名稱不一樣。

    大數(shù)據(jù)Spark電影評分?jǐn)?shù)據(jù)分析_數(shù)據(jù)_03

    使用需要導(dǎo)入函數(shù)庫:import org.apache.spark.sql.functions._

    4 保存結(jié)果數(shù)據(jù)

    將分析結(jié)果數(shù)據(jù)保存到外部存儲系統(tǒng)中,比如保存到MySQL數(shù)據(jù)庫表中或者CSV文件中。

    // TODO: 將分析的結(jié)果數(shù)據(jù)保存MySQL數(shù)據(jù)庫和CSV文件
    // 結(jié)果DataFrame被使用多次,緩存
    resultDF.persist(StorageLevel.MEMORY_AND_DISK)
    // 1. 保存MySQL數(shù)據(jù)庫表匯總
    resultDF
      .coalesce(1) // 考慮降低分區(qū)數(shù)目
      .write
      .mode("overwrite")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .jdbc(
        "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
          ode = true",
          "db_test.tb_top10_movies",
          new Properties ()
          )
          // 2. 保存CSV文件:每行數(shù)據(jù)中個(gè)字段之間使用逗號隔開
          resultDF
          .coalesce (1)
          .write.mode ("overwrite")
          .csv ("datas/top10-movies")
          // 釋放緩存數(shù)據(jù)
          resultDF.unpersist ()

    查看數(shù)據(jù)庫中結(jié)果表的數(shù)據(jù):

    大數(shù)據(jù)Spark電影評分?jǐn)?shù)據(jù)分析_數(shù)據(jù)_04

    5 案例完整代碼

    電影評分?jǐn)?shù)據(jù)分析,經(jīng)過數(shù)據(jù)ETL、數(shù)據(jù)分析(SQL分析和DSL分析)及最終保存結(jié)果,整套

    數(shù)據(jù)處理分析流程,其中涉及到很多數(shù)據(jù)細(xì)節(jié),完整代碼如下

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.apache.spark.storage.StorageLevel
    
    /**
     * 需求:對電影評分?jǐn)?shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,獲取Top10電影(電影評分平均值最高,并且每個(gè)電影被評分的次數(shù)大于2000)
     */
    object SparkTop10Movie {
      def main(args: Array[String]): Unit = {
        // 構(gòu)建SparkSession實(shí)例對象
        val spark: SparkSession = SparkSession.builder()
          .master("local[4]")
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          // TODO: 設(shè)置shuffle時(shí)分區(qū)數(shù)目
          .config("spark.sql.shuffle.partitions", "4")
          .getOrCreate()
        // 導(dǎo)入隱式轉(zhuǎn)換
        import spark.implicits._
        // 1. 讀取電影評分?jǐn)?shù)據(jù),從本地文件系統(tǒng)讀取
        val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
        // 2. 轉(zhuǎn)換數(shù)據(jù)
        val ratingsDF: DataFrame = rawRatingsDS
          // 過濾數(shù)據(jù)
          .filter(line => null != line && line.trim.split("::").length == 4)
          // 提取轉(zhuǎn)換數(shù)據(jù)
          .mapPartitions { iter =>
            iter.map { line =>
              // 按照分割符分割,拆箱到變量中
              val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
              // 返回四元組
              (userId, movieId, rating.toDouble, timestamp.toLong)
            }
          }
          // 指定列名添加Schema
          .toDF("userId", "movieId", "rating", "timestamp")
        /*
        root
        |-- userId: string (nullable = true)
        |-- movieId: string (nullable = true)
        |-- rating: double (nullable = false)
        |-- timestamp: long (nullable = false)
        */
        //ratingsDF.printSchema()
        /*
        +------+-------+------+---------+
        |userId|movieId|rating|timestamp|
        +------+-------+------+---------+
        | 1| 1193| 5.0|978300760|
        | 1| 661| 3.0|978302109|
        | 1| 594| 4.0|978302268|
        | 1| 919| 4.0|978301368|
        +------+-------+------+---------+
        */
        //ratingsDF.show(4)
        // TODO: 基于SQL方式分析
        // 第一步、注冊DataFrame為臨時(shí)視圖
        ratingsDF.createOrReplaceTempView("view_temp_ratings")
        // 第二步、編寫SQL
        val top10MovieDF: DataFrame = spark.sql(
          """
            |SELECT
            | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
            |FROM
            | view_temp_ratings
            |GROUP BY
            | movieId
            |HAVING
            | cnt_rating > 2000
            |ORDER BY
            | avg_rating DESC, cnt_rating DESC
            |LIMIT
            | 10
    """.stripMargin)
        //top10MovieDF.printSchema()
        top10MovieDF.show(10, truncate = false)
        println("===============================================================")
        // TODO: 基于DSL=Domain Special Language(特定領(lǐng)域語言) 分析
        import org.apache.spark.sql.functions._
        val resultDF: DataFrame = ratingsDF
          // 選取字段
          .select($"movieId", $"rating")
          // 分組:按照電影ID,獲取平均評分和評分次數(shù)
          .groupBy($"movieId")
          .agg( //
            round(avg($"rating"), 2).as("avg_rating"), //
            count($"movieId").as("cnt_rating") //
          )
          // 過濾:評分次數(shù)大于2000
          .filter($"cnt_rating" > 2000)
          // 排序:先按照評分降序,再按照次數(shù)降序
          .orderBy($"avg_rating".desc, $"cnt_rating".desc)
          // 獲取前10
          .limit(10)
        //resultDF.printSchema()
        resultDF.show(10)
        // TODO: 將分析的結(jié)果數(shù)據(jù)保存MySQL數(shù)據(jù)庫和CSV文件
        // 結(jié)果DataFrame被使用多次,緩存
        resultDF.persist(StorageLevel.MEMORY_AND_DISK)
        // 1. 保存MySQL數(shù)據(jù)庫表匯總
        resultDF
          .coalesce(1) // 考慮降低分區(qū)數(shù)目
          .write
          .mode("overwrite")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("user", "root")
          .option("password", "123456")
          .jdbc(
            "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
              ode = true",
              "db_test.tb_top10_movies",
              new Properties ()
              )
              // 2. 保存CSV文件:每行數(shù)據(jù)中個(gè)字段之間使用逗號隔開
              resultDF
              .coalesce (1)
              .write.mode ("overwrite")
              .csv ("datas/top10-movies")
              // 釋放緩存數(shù)據(jù)
              resultDF.unpersist ()
              // 應(yīng)用結(jié)束,關(guān)閉資源
              Thread.sleep (10000000)
              spark.stop ()
              }
              }

    6 Shuffle 分區(qū)數(shù)目問題

    運(yùn)行上述程序時(shí),查看WEB UI監(jiān)控頁面發(fā)現(xiàn),某個(gè)Stage中有200個(gè)Task任務(wù),也就是說RDD有200分區(qū)Partition。大數(shù)據(jù)Spark電影評分?jǐn)?shù)據(jù)分析_big data_05

    原因:在SparkSQL中當(dāng)Job中產(chǎn)生Shuffle時(shí),默認(rèn)的分區(qū)數(shù)(spark.sql.shuffle.partitions )為

    200,在實(shí)際項(xiàng)目中要合理的設(shè)置。在構(gòu)建SparkSession實(shí)例對象時(shí),設(shè)置參數(shù)的值:

    // 構(gòu)建SparkSession實(shí)例對象
    val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName(this.getClass.getSimpleName.stripSuffix("$"))
    // TODO: 設(shè)置shuffle時(shí)分區(qū)數(shù)目
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
    // 導(dǎo)入隱式轉(zhuǎn)換
    import spark.implicits._