scala常见的操作

1. 为什么会出现Spark?

Hadoop(离线数据计算)

Hive(SQL)

2. SparkSQL的发展

对于非开发人员,也希望使用大数平台强劲的少量数据处理分析能力,因此产生以SQL语句的方法来对数据进行分析。

刚开始是没有SparkSQL的,最初发展的是Apache Shark由于它过度依赖Hive,导致性能瓶颈无法解决。

2014年重新开户Spark SQL的研发计划。

3. Spark SQL组成部分

DataFrame(Table, Excel)基于列名的数据结构

  1. Row(行操作)
    • 每一个Row对象,表示 DataFrame中的一行数据
  2. Cloumn(列操作)
    • 每一个Column对象,表示DataFrame中的某一列数据

4. SQL编程

RDD转DataFrame

  1. 反射机制转换

    case class Person(id: String, name: String, age: Int)
       
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("SparkSqlContext")
          .setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        val people = sc.textFile("file:///C:\\myjar\\spark-1.6.3-bin-hadoop2.6\\examples\\src\\main\\resources\\people2.txt")
          .map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt))
          .toDF()
        people.registerTempTable("people")
        val teenagers = sqlContext.sql("select * from people")
        teenagers.rdd
        val rdd = sc.parallelize(teenagers.collect())
        rdd.foreach(println)
      }
    

    注:case class放在函数外部

  2. 以编程方式动态指定元数据

    官网给的方式

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val people = sc.textFile("examples/src/main/resources/people.txt")
    val schemaString = "name age"
       
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.{StructType,StructField,StringType};
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    peopleDataFrame.registerTempTable("people")
    val results = sqlContext.sql("SELECT name FROM people")
    results.map(t => "Name: " + t(0)).collect().foreach(println)
    

    另一种方式

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val stuRDD = sc.textFile("...//user.txt")
    val rowRDD = stuRDD.map(_.split(","))
    	.map(u=>u(0).trim.toInt, u(1))
    val schema = StructType( Array(
    	StructField("id", IntegerType, ture),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
    ))
    // 将RDD转换成DataFrame
    val stuDF = sqlContext.createDataFrame(rowRDD, schema)
    stuDF.registerTempTable("student")
    val resultDF = sqlContext.sql("select id, name form student")
    resultDF.show()
    

5. 数据源

6. 性能优化

Caching Data In Memory

可以将tables缓存在内存中,通过sqlcontext.cacheTable("tableName") or dataFrame.cache()。然后SparkSQL将只扫描所需要的列并,并自动调整压力以缓解GC和内存的压力。也可以通过sqlContext.uncacheTable("table")将表移出内存。

默认参数如下

Property Name Default Meaning
spark.sql.inMemoryColumnarStorage.compressed true 自动启动压缩
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列缓存批次的大小。大小越大可以提高内存的利用率和压缩效率,可以会出现OOM异常。

更多SparkSQL优化参数

SparkSQL参考——官网