| 
                         真正写数据的操作是 txn.writeFiles 函数执行的,具体实现如下: 
- def writeFiles( 
 -       data: Dataset[_], 
 -       writeOptions: Option[DeltaOptions], 
 -       isOptimize: Boolean): Seq[AddFile] = { 
 -     hasWritten = true 
 -   
 -     val spark = data.sparkSession 
 -     val partitionSchema = metadata.partitionSchema 
 -     val outputPath = deltaLog.dataPath 
 -   
 -     val (queryExecution, output) = normalizeData(data, metadata.partitionColumns) 
 -     val partitioningColumns = 
 -       getPartitioningColumns(partitionSchema, output, output.length < data.schema.size) 
 -   
 -     // 获取 DelayedCommitProtocol,里面可以设置写文件的名字, 
 -     // commitTask 和 commitJob 等做一些事情 
 -     val committer = getCommitter(outputPath) 
 -   
 -     val invariants = Invariants.getFromSchema(metadata.schema, spark) 
 -   
 -     SQLExecution.withNewExecutionId(spark, queryExecution) { 
 -       val outputSpec = FileFormatWriter.OutputSpec( 
 -         outputPath.toString, 
 -         Map.empty, 
 -         output) 
 -   
 -       val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants) 
 -   
 -       FileFormatWriter.write( 
 -         sparkSession = spark, 
 -         plan = physicalPlan, 
 -         fileFormat = snapshot.fileFormat, 
 -         committer = committer, 
 -         outputSpec = outputSpec, 
 -         hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration), 
 -         partitionColumns = partitioningColumns, 
 -         bucketSpec = None, 
 -         statsTrackers = Nil, 
 -         options = Map.empty) 
 -     } 
 -   
 -     // 返回新增的文件 
 -     committer.addedStatuses 
 - } 
 
  
Delta Lake 写操作最终调用 Spark 的 FileFormatWriter.write 方法进行的,通过这个方法的复用将我们真正的数据写入到  Delta Lake 表里面去了。                         (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |