| 
                         在运行上面代码之前需要注意上面代码中对EventTime时间提取的过程,也就是说Apache  Flink的TimeCharacteristic.EventTime  模式,需要调用assignTimestampsAndWatermarks方法设置EventTime的生成方式,这种方式也非常灵活,用户可以控制业务数据的EventTime的值和WaterMark的产生,WaterMark相关内容可以查阅《Apache  Flink 漫谈系列(03) - Watermark》。 在本示例中提取EventTime的完整代码如下: 
- import java.SQL.Timestamp 
 -  
 - import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
 - import org.apache.flink.streaming.api.windowing.time.Time 
 -  
 - class OrderTimestampExtractor[T1, T2] 
 - extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
 - override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
 - element._3.getTime 
 - } 
 - } 
 
  
查看运行结果: 
  
5. With CSVConnector 实现代码 
在实际的生产开发中,都需要实际的Connector的定义,下面我们以CSV格式的Connector定义来开发Temporal Table JOIN  Demo。 
(1) genEventRatesHistorySource 
- def genEventRatesHistorySource: CsvTableSource = { 
 -  
 - val csvRecords = Seq( 
 - "ts#currency#rate", 
 - "1#US Dollar#102", 
 - "1#Euro#114", 
 - "1#Yen#1", 
 - "3#Euro#116", 
 - "5#Euro#119", 
 - "7#Pounds#108" 
 - ) 
 - // 测试数据写入临时文件 
 - val tempFilePath = 
 - FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp") 
 -  
 - // 创建Source connector 
 - new CsvTableSource( 
 - tempFilePath, 
 - Array("ts","currency","rate"), 
 - Array( 
 - Types.LONG,Types.STRING,Types.LONG 
 - ), 
 - fieldDelim = "#", 
 - rowDelim = CommonUtils.line, 
 - ignoreFirstLine = true, 
 - ignoreComments = "%" 
 - )} 
 
  
(2) genRatesOrderSource 
- def genRatesOrderSource: CsvTableSource = { 
 -  
 - val csvRecords = Seq( 
 - "ts#currency#amount", 
 - "2#Euro#10", 
 - "4#Euro#10" 
 - ) 
 - // 测试数据写入临时文件 
 - val tempFilePath = 
 - FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp") 
 -  
 - // 创建Source connector 
 - new CsvTableSource( 
 - tempFilePath, 
 - Array("ts","currency", "amount"), 
 - Array( 
 - Types.LONG,Types.STRING,Types.LONG 
 - ), 
 - fieldDelim = "#", 
 - rowDelim = CommonUtils.line, 
 - ignoreFirstLine = true, 
 - ignoreComments = "%" 
 - ) 
 - } 
 
  
                        (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |