| 
                         我们构造一个只包含一个data字段的用户表,用户表数据如下: 
  
查询的需求是将data字段flatten成为name和age两个字段的表,期望得到: 
  
我们以ITCase方式完成如上查询需求,完整代码如下: 
- @Test 
 - def testLateralTVF(): Unit = { 
 - val env = StreamExecutionEnvironment.getExecutionEnvironment 
 - val tEnv = TableEnvironment.getTableEnvironment(env) 
 - env.setStateBackend(getStateBackend) 
 - StreamITCase.clear 
 -  
 - val userData = new mutable.MutableList[(String)] 
 - userData.+=(("Sunny#8")) 
 - userData.+=(("Kevin#36")) 
 - userData.+=(("Panpan#36")) 
 -  
 - val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)" 
 -  
 - val users = env.fromCollection(userData).toTable(tEnv, 'data) 
 -  
 - val tvf = new SplitTVF() 
 - tEnv.registerTable("userTab", users) 
 - tEnv.registerFunction("splitTVF", tvf) 
 -  
 - val result = tEnv.SQLQuery(SQLQuery).toAppendStream[Row] 
 - result.addSink(new StreamITCase.StringSink[Row]) 
 - env.execute() 
 - StreamITCase.testResults.foreach(println(_)) 
 - } 
 
  
运行结果: 
  
上面的核心语句是: 
- val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)" 
 
  
如果大家想运行上面的示例,请查阅《Apache Flink 漫谈系列 - SQL概览》中 源码方式 搭建测试环境。 
六、小结 
本篇重点向大家介绍了一种新的JOIN类型 - JOIN LATERAL。并向大家介绍了SQL Server中对LATERAL的支持方式,详细分析了JOIN  LATERAL和INNER JOIN的区别与联系,最后切入到Apache Flink中,以UDTF示例说明了Apache Flink中对JOIN  LATERAL的支持,后续篇章会介绍Apache Flink中另一种使用LATERAL的场景,就是Temporal JION,Temporal  JION也是一种新的JOIN类型,我们下一篇再见! 
关于点赞和评论 
本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家! 
作者:孙金城,花名 金竹,目前就职于阿里巴巴,自2015年以来一直投入于基于Apache Flink的阿里巴巴计算平台Blink的设计研发工作。 
【本文为51CTO专栏作者“金竹”原创稿件,转载请联系原作者】 
【编辑推荐】 
    - Apache Flink 漫谈系列 - Fault Tolerance
 
    - Apache Flink 漫谈系列 - 流表对偶(duality)性
 
    - Apache Flink 漫谈系列 - 持续查询(Continuous Queries)
 
    - Apache Flink 漫谈系列 - SQL概览
 
    - Apache Flink 漫谈系列 - JOIN 算子
 
 
【责任编辑:赵宁宁 TEL:(010)68476606】 
			点赞 0                        (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |