| 
                         要使用内置的Schemas需要添加如下依赖: 
- <dependency> 
 - <groupId>org.apache.flink</groupId> 
 - <artifactId>flink-avro</artifactId> 
 - <version>1.7.0</version> 
 - </dependency> 
 
  
(3) 读取位置配置 
我们在消费Kafka数据时候,可能需要指定消费的位置,Apache Flink  的FlinkKafkaConsumer提供很多便利的位置设置,如下: 
    - consumer.setStartFromEarliest() - 从最早的记录开始;
 
    - consumer.setStartFromLatest() - 从最新记录开始;
 
    - consumer.setStartFromTimestamp(...); // 从指定的epoch时间戳(毫秒)开始;
 
    - consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。
 
 
上面的位置指定可以精确到每个分区,比如如下代码: 
- Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); 
 - specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始 
 - specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始 
 - specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始 
 -  
 - consumer.setStartFromSpecificOffsets(specificStartOffsets); 
 
  
对于没有指定的分区还是默认的setStartFromGroupOffsets方式。 
(4) Topic发现 
Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer,比如: 
- // 创建消费者 
 - FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")), 
 - new KafkaMsgSchema(), 
 - p); 
 
  
在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以sourceTopic的值开头并以单个数字结尾)。 
3. 定义Watermark(Window) 
对Kafka  Connector的应用不仅限于上面的简单数据提取,我们更多时候是期望对Kafka数据进行Event-time的窗口操作,那么就需要在Flink Kafka  Source中定义Watermark。 
要定义Event-time,首先是Kafka数据里面携带时间属性,假设我们数据是String#Long的格式,如only for  test#1000。那么我们将Long作为时间列。 
    - KafkaWithTsMsgSchema - 完整代码
 
 
要想解析上面的Kafka的数据格式,我们需要开发一个自定义的Schema,比如叫KafkaWithTsMsgSchema,将String#Long解析为一个Java的Tuple2 
- import org.apache.flink.api.common.serialization.DeserializationSchema; 
 - import org.apache.flink.api.common.serialization.SerializationSchema; 
 - import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
 - import org.apache.flink.api.common.typeinfo.TypeInformation; 
 - import org.apache.flink.api.java.tuple.Tuple2; 
 - import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
 - import org.apache.flink.util.Preconditions; 
 -  
 - import java.io.IOException; 
 - import java.io.ObjectInputStream; 
 - import java.io.ObjectOutputStream; 
 - import java.nio.charset.Charset; 
 -  
 - public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> { 
 -     private static final long serialVersionUID = 1L; 
 -     private transient Charset charset; 
 -  
 -     public KafkaWithTsMsgSchema() { 
 -         this(Charset.forName("UTF-8")); 
 -     } 
 -  
 -     public KafkaWithTsMsgSchema(Charset charset) { 
 -         this.charset = Preconditions.checkNotNull(charset); 
 -     } 
 -  
 -     public Charset getCharset() { 
 -         return this.charset; 
 -     } 
 -  
 -     public Tuple2<String, Long> deserialize(byte[] message) { 
 -         String msg = new String(message, charset); 
 -         String[] dataAndTs = msg.split("#"); 
 -         if(dataAndTs.length == 2){ 
 -             return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim())); 
 -         }else{ 
 -             // 实际生产上需要抛出runtime异常 
 -             System.out.println("Fail due to invalid msg format.. ["+msg+"]"); 
 -             return new Tuple2<String, Long>(msg, 0L); 
 -         } 
 -     } 
 -  
 -     @Override 
 -     public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) { 
 -         return false; 
 -     } 
 -  
 -     public byte[] serialize(Tuple2<String, Long> element) { 
 -         return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset); 
 -     } 
 -  
 -     private void writeObject(ObjectOutputStream out) throws IOException { 
 -         out.defaultWriteObject(); 
 -         out.writeUTF(this.charset.name()); 
 -     } 
 -  
 -     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 
 -         in.defaultReadObject(); 
 -         String charsetName = in.readUTF(); 
 -         this.charset = Charset.forName(charsetName); 
 -     } 
 -  
 -     @Override 
 -     public TypeInformation<Tuple2<String, Long>> getProducedType() { 
 -         return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); 
 -     }} 
 
  
                        (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |