| 
                         TextFile DataFrame 
- import.org.apache.spark.sql._ 
 - //定义数据的列名称和类型 
 - valdt=StructType(List(id:String,name:String,gender:String,age:Int)) 
 -  
 - //导入user_info.csv文件并指定分隔符 
 - vallines = sc.textFile("/path/user_info.csv").map(_.split(",")) 
 -  
 - //将表结构和数据关联起来,把读入的数据user.csv映射成行,构成数据集 
 - valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt)) 
 -  
 - //通过SparkSession.createDataFrame()创建表,并且数据表表头 
 - val df= spark.createDataFrame(rowRDD, dt) 
 
  
读取规则数据文件作为DataFrame 
- SparkSession.Builder builder = SparkSession.builder() 
 - Builder.setMaster("local").setAppName("TestSparkSQLApp") 
 - SparkSession spark = builder.getOrCreate(); 
 - SQLContext sqlContext = spark.sqlContext(); 
 -  
 - # 读取 JSON 数据,path 可为文件或者目录 
 - valdf=sqlContext.read().json(path); 
 -  
 - # 读取 HadoopParquet 文件 
 - vardf=sqlContext.read().parquet(path); 
 -  
 - # 读取 HadoopORC 文件 
 - vardf=sqlContext.read().orc(path); 
 
  
JSON 文件为每行一个 JSON 对象的文件类型,行尾无须逗号。文件头也无须[]指定为数组;SparkSQL 读取是只是按照每行一条 JSON  Record序列化; 
Parquet文件 
- Configurationconfig = new Configuration(); 
 - ParquetFileReaderreader = ParquetFileReader.open( 
 -  HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf)); 
 - Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData(); 
 - String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata"); 
 
  
allFiedls 的值就是各字段的名称和具体的类型,整体是一个json格式进行展示。 
读取 Hive 表作为 DataFrame 
Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式创建 SparkContext。  Builder.getOrCreate() 用于创建 SparkSession,SparkSession 是 SparkContext 的封装。 
在Spark1.6中有两个核心组件SQLcontext和HiveContext。SQLContext 用于处理在 SparkSQL  中动态注册的表,HiveContext 用于处理 Hive 中的表。 
从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql  即可执行 Hive 中的表,也可执行内部注册的表; 
在需要执行 Hive 表时,只需要在 SparkSession.Builder 中开启 Hive  支持即可(enableHiveSupport())。 
- SparkSession.Builder builder = SparkSession.builder().enableHiveSupport(); 
 - SparkSession spark = builder.getOrCreate(); 
 - SQLContext sqlContext = spark.sqlContext(); 
 
  
// db 指 Hive 库中的数据库名,如果不写默认为 default 
// tableName 指 hive 库的数据表名 
- sqlContext.sql(“select * from db.tableName”) 
 
  
SparkSQL ThriftServer 
//首先打开 Hive 的 Metastore服务 
- hive$bin/hive –-service metastore –p 8093 
 
  
//把 Spark 的相关 jar 上传到hadoophdfs指定目录,用于指定sparkonyarn的依赖 jar 
- spark$hadoop fs –put jars/*.jar /lib/spark2 
 
  
// 启动 spark thriftserver 服务 
- spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf  
 - spark.yarn.jars=hdfs:///lib/spark2/*.jar 
 
  
当hdfs 上传了spark 依赖 jar 时,通过spark.yarn.jars 可看到日志 spark 无须每个job  都上传jar,可节省启动时间 
- 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar 
 - 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar 
 
  
                        (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |