| 
                         执行之后当前表信息会更新并在历史表里面产生一条历史信息,如下: 
  
注意当前表的SysStartTime意见发生了变化,历史表产生了一条记录,SyStartTIme是原当前表记录的SysStartTime,SysEndTime是当前表记录的SystemStartTime。我们再更新一次: 
- UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10 
 
  
  
到这里我们了解到SQLServer里面关于Temporal  Table的逻辑是有当前表和历史表来存储数据,并且数据库内部以StartTime和EndTime的方式管理数据的版本。 
(3) SELECT 
- SELECT [DeptID], [DeptName], [SysStartTime],[SysEndTime] 
 - FROM [dbo].[Department] 
 - FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000' ; 
 
  
  
SELECT语句查询的是Department的表,实际返回的数据是从历史表里面查询出来的,查询的底层逻辑就是 SysStartTime <=  '2018-06-06 05:50:21.0000000' and SysEndTime > '2018-06-06 05:50:21.0000000'  。 
四、Apache Flink Temporal Table 
我们不止一次的提到Apache Flink遵循ANSI-SQL标准,Apache Flink中Temporal  Table的概念也源于ANSI-2011的标准语义,但目前的实现在语法层面和ANSI-SQL略有差别,上面看到ANSI-2011中使用FOR  SYSTEM_TIME AS OF的语法,目前Apache Flink中使用 LATERAL  TABLE(TemporalTableFunction)的语法。这一点后续需要推动社区进行改进。 
1. 为啥需要 Temporal Table 
我们以具体的查询示例来说明为啥需要Temporal Table,假设我们有一张实时变化的汇率表(RatesHistory),如下: 
  
RatesHistory代表了Yen汇率(Yen汇率为1),是不断变化的Append  only的汇率表。例如,Euro兑Yen汇率从09:00至10:45的汇率为114。从10点45分到11点15分是116。 
假设我们想在10:58输出所有当前汇率,我们需要以下SQL查询来计算结果表: 
- SELECT * 
 - FROM RatesHistory AS r 
 - WHERE r.rowtime = ( 
 - SELECT MAX(rowtime) 
 - FROM RatesHistory AS r2 
 - WHERE rr2.currency = r.currency 
 - AND r2.rowtime <= '10:58'); 
 
  
相应Flink代码如下: 
    - 定义数据源-genRatesHistorySource
 
     - def genRatesHistorySource: CsvTableSource = { 
 -  
 - val csvRecords = Seq( 
 - "rowtime ,currency ,rate", 
 - "09:00:00 ,US Dollar , 102", 
 - "09:00:00 ,Euro , 114", 
 - "09:00:00 ,Yen , 1", 
 - "10:45:00 ,Euro , 116", 
 - "11:15:00 ,Euro , 119", 
 - "11:49:00 ,Pounds , 108" 
 - ) 
 - // 测试数据写入临时文件 
 - val tempFilePath = 
 - writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
 -  
 - // 创建Source connector 
 - new CsvTableSource( 
 - tempFilePath, 
 - Array("rowtime","currency","rate"), 
 - Array( 
 - Types.STRING,Types.STRING,Types.STRING 
 - ), 
 - fieldDelim = ",", 
 - rowDelim = "$", 
 - ignoreFirstLine = true, 
 - ignoreComments = "%" 
 - ) 
 - } 
 - def writeToTempFile( 
 - contents: String, 
 - filePrefix: String, 
 - fileSuffix: String, 
 - charset: String = "UTF-8"): String = { 
 - val tempFile = File.createTempFile(filePrefix, fileSuffix) 
 - val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset) 
 - tmpWriter.write(contents) 
 - tmpWriter.close() 
 - tempFile.getAbsolutePath} 
 
  
    主程序代码
    - def main(args: Array[String]): Unit = { 
 - // Streaming 环境 
 - val env = StreamExecutionEnvironment.getExecutionEnvironment 
 - val tEnv = TableEnvironment.getTableEnvironment(env) 
 -  
 - //方便我们查出输出数据 
 - env.setParallelism(1) 
 -  
 - val sourceTableName = "RatesHistory" 
 - // 创建CSV source数据结构 
 - val tableSource = CsvTableSourceUtils.genRatesHistorySource 
 - // 注册source 
 - tEnv.registerTableSource(sourceTableName, tableSource) 
 -  
 - // 注册retract sink 
 - val sinkTableName = "retractSink" 
 - val fieldNames = Array("rowtime", "currency", "rate") 
 - val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING) 
 -  
 - tEnv.registerTableSink( 
 - sinkTableName, 
 - fieldNames, 
 - fieldTypes, 
 - new MemoryRetractSink) 
 -  
 - val SQL = 
 - """ 
 - |SELECT * 
 - |FROM RatesHistory AS r 
 - |WHERE r.rowtime = ( 
 - | SELECT MAX(rowtime) 
 - | FROM RatesHistory AS r2 
 - | WHERE rr2.currency = r.currency 
 - | AND r2.rowtime <= '10:58:00' ) 
 - """.stripMargin 
 -  
 - // 执行查询 
 - val result = tEnv.SQLQuery(SQL) 
 -  
 - // 将结果插入sink 
 - result.insertInto(sinkTableName) 
 - env.execute() 
 - } 
 
  
    执行结果如下图:
      
                        (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |