| 
                         (4) 统计需求对应的SQL 
- SELECT o.currency, o.amount, r.rate 
 - o.amount * r.rate AS yen_amount 
 - FROM 
 - Orders AS o, 
 - LATERAL TABLE (Rates(o.rowtime)) AS r 
 - WHERE r.currency = o.currency 
 
  
(5) 预期结果 
  
4. Without connnector 实现代码 
- object TemporalTableJoinTest { 
 - def main(args: Array[String]): Unit = { 
 - val env = StreamExecutionEnvironment.getExecutionEnvironment 
 - val tEnv = TableEnvironment.getTableEnvironment(env) 
 - env.setParallelism(1) 
 - // 设置时间类型是 event-time env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
 - // 构造订单数据 
 - val ordersData = new mutable.MutableList[(Long, String, Timestamp)] 
 - ordersData.+=((2L, "Euro", new Timestamp(2L))) 
 - ordersData.+=((1L, "US Dollar", new Timestamp(3L))) 
 - ordersData.+=((50L, "Yen", new Timestamp(4L))) 
 - ordersData.+=((3L, "Euro", new Timestamp(5L))) 
 -  
 - //构造汇率数据 
 - val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)] 
 - ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L))) 
 - ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L))) 
 - ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L))) 
 - ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L))) 
 - ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L))) 
 -  
 - // 进行订单表 event-time 的提取 
 - val orders = env 
 - .fromCollection(ordersData) 
 - .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]()) 
 - .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime) 
 -  
 - // 进行汇率表 event-time 的提取 
 - val ratesHistory = env 
 - .fromCollection(ratesHistoryData) 
 - .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]()) 
 - .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime) 
 -  
 - // 注册订单表和汇率表 
 - tEnv.registerTable("Orders", orders) 
 - tEnv.registerTable("RatesHistory", ratesHistory) 
 - val tab = tEnv.scan("RatesHistory"); 
 - // 创建TemporalTableFunction 
 - val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency) 
 - //注册TemporalTableFunction 
 - tEnv.registerFunction("Rates",temporalTableFunction) 
 -  
 - val SQLQuery = 
 - """ 
 - |SELECT o.currency, o.amount, r.rate, 
 - | o.amount * r.rate AS yen_amount 
 - |FROM 
 - | Orders AS o, 
 - | LATERAL TABLE (Rates(o.rowtime)) AS r 
 - |WHERE r.currency = o.currency 
 - |""".stripMargin 
 -  
 - tEnv.registerTable("TemporalJoinResult", tEnv.SQLQuery(SQLQuery)) 
 -  
 - val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
 - // 打印查询结果 
 - result.print() 
 - env.execute() 
 - } 
 - } 
 
                          (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |