Common Scala Operations
1. Why did Spark emerge?
Hadoop (offline/batch computation)
- Slow, because Hadoop is based on disk I/O
- Computation has only two stages, Map and Reduce, so complex computations are not supported
- Using Hadoop requires knowing Java
Hive (SQL)
- Runs Hadoop MapReduce jobs by writing SQL statements
2. The evolution of Spark SQL
Non-developers also wanted to tap into the powerful data processing and analysis capabilities of a big data platform, which led to analyzing data through SQL statements.
At first there was no Spark SQL; the earliest project was Apache Shark, but its heavy dependence on Hive created performance bottlenecks that could not be resolved.
In 2014 a fresh Spark SQL development effort was launched in its place.
3. Components of Spark SQL
DataFrame (think of a table or an Excel sheet): a data structure organized by named columns (a short sketch of Row and Column usage follows the list)
- Row (row operations)
  - Each Row object represents one row of data in the DataFrame
- Column (column operations)
  - Each Column object represents one column of data in the DataFrame
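A minimal sketch of working with Row and Column objects, assuming a hypothetical DataFrame `people` with columns `name` and `age` has already been created:

```scala
import org.apache.spark.sql.Row

// Each record of a DataFrame is a Row; its fields can be read by position
val firstRow: Row = people.first()
val firstName = firstRow.getString(0)

// df("colName") returns a Column object that can be used in expressions
val ageCol = people("age")
people.select(people("name"), ageCol + 1).show()
```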
4. SQL programming
Converting an RDD to a DataFrame
- Conversion via the reflection mechanism (case class)
case class Person(id: String, name: String, age: Int)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("SparkSqlContext")
    .setMaster("local")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // Map each line of the text file onto the Person case class, then convert to a DataFrame
  val people = sc.textFile("file:///C:\\myjar\\spark-1.6.3-bin-hadoop2.6\\examples\\src\\main\\resources\\people2.txt")
    .map(_.split(","))
    .map(p => Person(p(0), p(1), p(2).trim.toInt))
    .toDF()
  people.registerTempTable("people")

  val teenagers = sqlContext.sql("select * from people")
  // teenagers.rdd also converts the DataFrame back to an RDD[Row]
  val rdd = sc.parallelize(teenagers.collect())
  rdd.foreach(println)
}
Note: the case class must be defined outside the method.
- Programmatically specifying the schema
The approach from the official documentation:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string of field names
val schemaString = "name age"

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Generate the schema from the field names
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert the records of the RDD to Rows, then apply the schema
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")

val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)
An alternative way:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val stuRDD = sc.textFile("...//user.txt")

// Each line must be turned into a Row whose fields match the schema below
val rowRDD = stuRDD.map(_.split(","))
  .map(u => Row(u(0).trim.toInt, u(1), u(2).trim.toInt))

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)
))

// Convert the RDD to a DataFrame
val stuDF = sqlContext.createDataFrame(rowRDD, schema)
stuDF.registerTempTable("student")
val resultDF = sqlContext.sql("select id, name from student")
resultDF.show()
5. Data sources
- Via the load/save functions. When reading a JSON file, each record must sit on a single line, with no line breaks or pretty-printed whitespace inside a record, as the sketch below illustrates.
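A minimal sketch of the one-record-per-line requirement; the file name `people.json` and its contents are made up for the example:

```scala
// people.json -- each JSON object occupies exactly one line:
//   {"name":"MAX","age":18}
//   {"name":"Mike","age":20}
// A pretty-printed, multi-line object would not be parsed correctly by this reader.
val peopleDF = sqlContext.read.json("people.json")
peopleDF.show()
```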
- Writing and reading data
dataFrame.write.format("json").save(hdfspath)
or sqlContext.read.format(...) for reading
- read.format / write.format
Supported formats: 1. parquet (default) 2. json 3. jdbc 4. csv (requires an additional jar)
import org.apache.spark.sql.SaveMode

def genericLoadAndSave(): Unit = {
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Load a JSON file, then save it in the default format (parquet)
  val df = sqlContext.read.json("....")
  // df.show()
  df.write.save("...")

  // Manually specify the format: read JSON, write Parquet
  val df1 = sqlContext.read
    .format("json")
    .load("...")
  df1.write
    .format("parquet")
    .mode(SaveMode.ErrorIfExists)
    .save("...")
}
- SaveMode: four write modes (see the usage sketch after the table)

Mode string | Constant | Meaning
---|---|---
"error" | SaveMode.ErrorIfExists | throw an error if the data already exists
"append" | SaveMode.Append | append to the existing data
"overwrite" | SaveMode.Overwrite | overwrite the existing data
"ignore" | SaveMode.Ignore | skip the write if the data already exists
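A small sketch of passing these modes to a DataFrame writer, assuming an existing DataFrame `df`; the output paths are placeholders:

```scala
import org.apache.spark.sql.SaveMode

// The mode can be given either as a string or as a SaveMode constant
df.write.mode("append").parquet(".../out_parquet")
df.write.mode(SaveMode.Overwrite).format("json").save(".../out_json")
// With SaveMode.Ignore the write silently does nothing if data already exists
df.write.mode(SaveMode.Ignore).save(".../out_default")
```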
- Run SQL on files directly (query a file directly with SQL)
val dfParquet = sqlContext
  .sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
val dfJson = sqlContext
  .sql("SELECT * FROM json.`examples/src/main/resources/users.json`")
- Loading Parquet data
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val parquetFile = sqlContext.read.parquet("users.parquet")
parquetFile.printSchema()
parquetFile.registerTempTable("t_user")

val resultDF = sqlContext.sql("select name, favorite_numbers from t_user")
resultDF.show()
- Parquet schema merging
import sqlContext.implicits._

// Create basic student info (name, age)
val stuInfo = Array(("MAX", 18), ("Mike", 20), ("Bob", 25))
val stuInfoDF = sc.parallelize(stuInfo).toDF("name", "age")
stuInfoDF.write
  .mode("append")
  .format("parquet")   // json / parquet / jdbc
  .save("...\\parquet2")

// Create student score info (name, score)
val stuScore = Array(("MAX", 90), ("Mike", 75), ("Bob", 88))
val stuScoreDF = sc.parallelize(stuScore).toDF("name", "score")
stuScoreDF.write
  .mode("append")
  .format("parquet")
  .save("...\\parquet2")

// Read the directory back with schema merging enabled
val stuMergeDF = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("...\\parquet2")
stuMergeDF.show()
- Parquet partition discovery
hdfs dfs -mkdir -p /user/root/parquet/gender=male/country=CN
hdfs dfs -put *.parquet /user/root/parquet/gender=male/country=CN

val parquetFile = sqlContext.read
  .parquet("hdfs://../gender=male/country=CN/users.parquet")
parquetFile.show()
- Handling a complex JSON data source format
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types._

val conf = new SparkConf()
  .setAppName("SparkSqlContext")
  .setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Define the nested schema explicitly: "data" is a map from string keys to a struct
val schema = new StructType()
  .add("type", StringType, true)
  .add("version", StringType, true)
  .add("data", MapType(StringType, new StructType()
    .add("title", StringType, true)
    .add("id", IntegerType, true)
    .add("key", StringType, true)
    .add("name", StringType, true)
  ))

val df = sqlContext.read.schema(schema).json("...\\champion_info_data.json")
// explode() flattens the map into one row per key/value pair
df.select(df("type"), df("version"), explode(df("data"))).show()
- CSV data source
// Reading CSV files requires commons-csv-1.6.jar and spark-csv_2.11-1.4.0.jar
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df1 = sqlContext.read
  .option("header", "true")        // the first line contains the column names
  .option("inferSchema", "true")   // infer column types automatically
  .format("com.databricks.spark.csv")
  .load("xx.csv")
- Hive data source
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(conf)
// Hive access requires a HiveContext, not a plain SQLContext
val hiveContext = new HiveContext(sc)
hiveContext.sql("CREATE TABLE IF NOT EXISTS work.src (key INT, value STRING)")
hiveContext.sql("LOAD DATA INPATH '/user/root/kv1.txt' INTO TABLE work.src")
hiveContext.sql("FROM work.src SELECT key, value").show()
- JDBC data source
import java.util.Properties

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Option 1: read.format("jdbc") with an options map
val jdbcDF = sqlContext.read
  .format("jdbc")
  .options(Map(
    "url"      -> "jdbc:mysql://localhost:3306/test",
    "driver"   -> "com.mysql.jdbc.Driver",
    "dbtable"  -> "salary",
    "user"     -> "root",
    "password" -> "root"
  ))
  .load()

// Option 2: read.jdbc(url, table, connection properties)
val pro = new Properties()
pro.setProperty("user", "root")
pro.setProperty("password", "root")
val jdbcDF2 = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "salary", pro)
6. Performance tuning
Caching Data In Memory
Tables can be cached in memory with sqlContext.cacheTable("tableName") or dataFrame.cache(). Spark SQL will then scan only the required columns and automatically tune compression to reduce memory usage and GC pressure. A table can be removed from memory with sqlContext.uncacheTable("tableName").
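A minimal sketch of caching and uncaching, assuming a temp table `people` registered as in the earlier examples:

```scala
// Cache the table in the in-memory columnar store
sqlContext.cacheTable("people")

// Queries against the cached table now scan only the needed columns
sqlContext.sql("select name from people").show()

// Release the cached data when it is no longer needed
sqlContext.uncacheTable("people")

// Alternatively, a DataFrame can be cached directly: df.cache()
```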
The default parameters are listed below; they can also be set programmatically, as sketched after the table.
Property Name | Default | Meaning
---|---|---
spark.sql.inMemoryColumnarStorage.compressed | true | Automatically enables compression for the in-memory columnar storage
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Controls the batch size for columnar caching. Larger batches improve memory utilization and compression, but may cause OOM errors when caching data.
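A small sketch of adjusting these properties at runtime; the values shown simply restate the defaults:

```scala
// SQLContext.setConf(key, value) sets a Spark SQL property for the current context
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
```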