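Spark RDD (Resilient Distributed Dataset) is Spark's core data structure: an immutable collection of data that can be partitioned and computed on in parallel. The relationship between Spark RDD and DataFrame is shown below: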

[Figure: relationship between Spark RDD and DataFrame]
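
The properties mentioned above can be seen in a few lines of code. The following is only a minimal sketch: the object name `RddBasics`, the sample numbers, and the column names `n` and `square` are made up for illustration, and it assumes the Spark SQL module is on the classpath. It shows that a transformation returns a new RDD instead of changing the old one, that an RDD is split into partitions, and that an RDD can be turned into a DataFrame (rows plus named columns) and back.

```scala
import org.apache.spark.sql.SparkSession

object RddBasics {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only; the app name is arbitrary.
    val spark = SparkSession.builder()
      .appName("RddBasics")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._ // enables .toDF(...) on an RDD of tuples

    // Distribute a small collection across 2 partitions.
    val numbers = spark.sparkContext.parallelize(Seq(1, 2, 3, 4), numSlices = 2)

    // A transformation never modifies an RDD; it returns a new one.
    val doubled = numbers.map(_ * 2)
    println(numbers.collect().toList) // List(1, 2, 3, 4)
    println(doubled.collect().toList) // List(2, 4, 6, 8)
    println(s"partitions: ${numbers.getNumPartitions}") // partitions: 2

    // Giving the tuples column names turns the RDD into a DataFrame...
    val df = numbers.map(n => (n, n * n)).toDF("n", "square")
    df.printSchema()

    // ...and a DataFrame exposes its rows again as an RDD[Row].
    val squares = df.rdd.map(row => row.getInt(1)).collect().toList
    println(squares) // List(1, 4, 9, 16)

    spark.stop()
  }
}
```

The DataFrame adds a schema on top of the distributed rows, which is what makes column-based and SQL-style queries possible; the RDD API works on arbitrary Scala objects instead.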

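Below is an example of data operations with an RDD: first the input data, then the program. It is worth noting that if you are comfortable writing SQL, Spark will be easier to pick up.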

# Grade Name Gender Product Quantity
2 ZhangFei Male iPhone 199
2 ZhangFei Male iPad 280
2 ZhangFei Male macbook 75
2 GuanYu Male iPhone 390
2 GuanYu Male iPad 135
2 GuanYu Male macbook 42
3 XiaoQiao Female iPhone 200
3 XiaoQiao Female macbook 101
3 XiaoQiao Female iPad 290
3 Lusu Male iPhone 101
3 Lusu Male macbook 88
3 Lusu Male iPad 196
5 ZhangLiao Male macbook 61
5 ZhangLiao Male iPhone 155
5 ZhangLiao Male iPad 232

import org.apache.spark.{SparkConf, SparkContext}

// A case class to represent one line of the sales information
case class Sale (classId: Int, name: String, gender: String, product: String, quantity: Int)

object RddSales {
  val fileName = "src/main/resources/sales.txt"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RDD_Sales").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val sales = sc.textFile(fileName)
      .filter(line => line.trim.nonEmpty && !line.startsWith("#"))
      .map(line => {
        val arr = line.split("\\s+")
        Sale(arr(0).toInt, arr(1), arr(2), arr(3), arr(4).toInt)
      })
    //println(sales.collect.toList)

    // 1. Count the salespeople
    val salesPersonCount = sales.map(sale => sale.name).distinct.count()
    println(s"Total number of the sales is: $salesPersonCount")

    // 2. Count the male salespeople
    val maleSalesCount = sales
      .map(sale => (sale.name, sale.gender))
      .filter(x => x._2 == "Male")
      .distinct
      .count
    println(s"Total number of the sales (Gender Male) is: $maleSalesCount")

    // 3. Total number of iPhones sold by grade-3 salespeople
    val grade3Sales = sales
      .filter(sale => sale.classId == 3 && sale.product == "iPhone")
      .map(sale => sale.quantity)
      .sum

    println(s"Total count of the iPhone sales by Class 3 is: $grade3Sales")

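    // 4. Average number of iPhones sold per person
    val totaliPhoneSales = sales
      .filter(sale => sale.product == "iPhone")
      .map(sale => (sale.quantity, 1))
      // Reduce the (quantity, 1) pairs element-wise:
      // ._1 is the total quantity; ._2 is the number of iPhone records, which equals
      // the number of people only because each person has one iPhone row in this data
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2))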

    val averageiPhoneSales = totaliPhoneSales._1 * 1.0/ totaliPhoneSales._2
    println(s"Average iPhone sales per person is: $averageiPhoneSales")

    // 5. Find the salesperson who sold the most macbooks
    val maxMacbookSeller = sales
      .filter(sale => sale.product == "macbook")
      .sortBy(sale => sale.quantity, ascending = false)
      .take(1)
      .toList
    println("Max macbook sales seller: " + maxMacbookSeller.head)

    // Release the resources held by the SparkContext
    sc.stop()
  }
}

The output is as follows:

Total number of the sales is: 5
Total number of the sales (Gender Male) is: 4
Total count of the iPhone sales by Class 3 is: 301.0
Average iPhone sales per person is: 209.0
Max macbook sales seller: Sale(3,XiaoQiao,Female,macbook,101)
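
For readers coming from SQL, queries 3 and 5 could also be written against a DataFrame with Spark SQL. This is only a sketch under a few assumptions: the object name `SqlSales` and the view name `sales` are invented for illustration, the records are inlined instead of being read from `sales.txt`, the Spark SQL module is on the classpath, and the file lives in a separate project or package so that its `Sale` case class does not clash with the one in the RDD program.

```scala
import org.apache.spark.sql.SparkSession

// Same shape as the Sale case class in the RDD program.
case class Sale(classId: Int, name: String, gender: String, product: String, quantity: Int)

object SqlSales {
  // The sales records from the listing above, inlined so no input file is needed.
  val lines: Seq[String] = Seq(
    "2 ZhangFei Male iPhone 199", "2 ZhangFei Male iPad 280", "2 ZhangFei Male macbook 75",
    "2 GuanYu Male iPhone 390", "2 GuanYu Male iPad 135", "2 GuanYu Male macbook 42",
    "3 XiaoQiao Female iPhone 200", "3 XiaoQiao Female macbook 101", "3 XiaoQiao Female iPad 290",
    "3 Lusu Male iPhone 101", "3 Lusu Male macbook 88", "3 Lusu Male iPad 196",
    "5 ZhangLiao Male macbook 61", "5 ZhangLiao Male iPhone 155", "5 ZhangLiao Male iPad 232"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SqlSales").master("local").getOrCreate()
    import spark.implicits._ // enables .toDF() on an RDD of case classes

    val sales = spark.sparkContext.parallelize(lines).map { line =>
      val arr = line.split("\\s+")
      Sale(arr(0).toInt, arr(1), arr(2), arr(3), arr(4).toInt)
    }

    // Register the data as a temporary view that SQL statements can query.
    sales.toDF().createOrReplaceTempView("sales")

    // Query 3: SUM over an integer column yields a bigint, read here as a Long.
    val grade3iPhones = spark.sql(
      "SELECT SUM(quantity) FROM sales WHERE classId = 3 AND product = 'iPhone'"
    ).first().getLong(0)
    println(s"iPhones sold by grade 3: $grade3iPhones") // iPhones sold by grade 3: 301

    // Query 5: sort by quantity and keep the first row.
    val top = spark.sql(
      "SELECT name, quantity FROM sales WHERE product = 'macbook' ORDER BY quantity DESC LIMIT 1"
    ).first()
    println(s"Top macbook seller: ${top.getString(0)} (${top.getInt(1)})") // Top macbook seller: XiaoQiao (101)

    spark.stop()
  }
}
```

The SQL total prints as `301`, while the RDD program prints `301.0`, because `sum` on a numeric RDD returns a `Double`.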