| 
                         Delta Lake 所有的更新操作都是在事务中进行的,deltaLog.withNewTransaction  就是一个事务,withNewTransaction 的实现如下: 
- def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { 
 -   try { 
 -     // 更新当前表事务日志的快照 
 -     update() 
 -     // 初始化乐观事务锁对象 
 -     val txn = new OptimisticTransaction(this) 
 -     // 开启事务 
 -     OptimisticTransaction.setActive(txn) 
 -     // 执行写数据操作 
 -     thunk(txn) 
 -   } finally { 
 -     // 关闭事务 
 -     OptimisticTransaction.clearActive() 
 -   } 
 - } 
 
  
在开启事务之前,需要更新当前表事务的快照,因为在执行写数据之前,这张表可能已经被修改了,执行 update  操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 就是需要执行的事务操作,对应  deltaLog.withNewTransaction 里面的所有代码。 
我们回到上面的 run 方法。val actions = write(txn, sparkSession) 就是执行写数据的操作,它的实现如下: 
-   def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { 
 -     import sparkSession.implicits._ 
 -     // 如果不是第一次往表里面写数据,需要判断写数据的模式是否符合条件 
 -     if (txn.readVersion > -1) { 
 -       // This table already exists, check if the insert is valid. 
 -       if (mode == SaveMode.ErrorIfExists) { 
 -         throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath) 
 -       } else if (mode == SaveMode.Ignore) { 
 -         return Nil 
 -       } else if (mode == SaveMode.Overwrite) { 
 -         deltaLog.assertRemovable() 
 -       } 
 -     } 
 -   
 -     // 更新表的模式,比如是否覆盖现有的模式,是否和现有的模式进行 merge 
 -     updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation) 
 -   
 -     // 是否定义分区过滤条件 
 -     val replaceWhere = options.replaceWhere 
 -     val partitionFilters = if (replaceWhere.isDefined) { 
 -       val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get) 
 -       if (mode == SaveMode.Overwrite) { 
 -         verifyPartitionPredicates( 
 -           sparkSession, txn.metadata.partitionColumns, predicates) 
 -       } 
 -       Some(predicates) 
 -     } else { 
 -       None 
 -     } 
 -   
 -     // 第一次写数据初始化事务日志的目录 
 -     if (txn.readVersion < 0) { 
 -       // Initialize the log path 
 -       deltaLog.fs.mkdirs(deltaLog.logPath) 
 -     } 
 -   
 -     // 写数据到文件系统中 
 -     val newFiles = txn.writeFiles(data, Some(options)) 
 -       
 -     val deletedFiles = (mode, partitionFilters) match { 
 -        // 全量覆盖,直接拿出缓存在内存中最新事务日志快照里面的所有 AddFile 文件 
 -       case (SaveMode.Overwrite, None) => 
 -         txn.filterFiles().map(_.remove) 
 -       // 从事务日志快照中获取对应分区里面的所有 AddFile 文件 
 -       case (SaveMode.Overwrite, Some(predicates)) => 
 -         // Check to make sure the files we wrote out were actually valid. 
 -         val matchingFiles = DeltaLog.filterFileList( 
 -           txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect() 
 -         val invalidFiles = newFiles.toSet -- matchingFiles 
 -         if (invalidFiles.nonEmpty) { 
 -           val badPartitions = invalidFiles 
 -             .map(_.partitionValues) 
 -             .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") } 
 -             .mkString(", ") 
 -           throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions) 
 -         } 
 -   
 -         txn.filterFiles(predicates).map(_.remove) 
 -       case _ => Nil 
 -     } 
 -   
 -     newFiles ++ deletedFiles 
 -   } 
 - } 
 
  
如果 txn.readVersion == -1,说明是第一次写数据到 Delta Lake 表,所以当这个值大于 -1  的时候,需要判断一下写数据的操作是否合法。 
由于 Delta Lake 底层使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,这就是 updateMetadata  函数对应的操作。 
因为 Delta Lake 表支持分区,所以我们可能在写数据的时候指定某个分区进行覆盖。                         (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |