| 
                         除了看日志,我们可以用命令显示的查询我们是否成功的创建了flink-topic,如下: 
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181 
 -  
 - flink-tipic 
 
  
如果输出flink-tipic那么说明我们的Topic成功创建了。 
那么Topic是保存在哪里?Kafka是怎样进行消息的发布和订阅的呢?为了直观,我们看如下Kafka架构示意图简单理解一下: 
  
简单介绍一下,Kafka利用ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka  Server 实例,Kafka  Server叫做Broker,我们创建的Topic可以在一个或多个Broker中。Kafka利用Push模式发送消息,利用Pull方式拉取消息。 
3. 发送消息 
如何向已经存在的Topic中发送消息呢,当然我们可以API的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下: 
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic 
 - >Kafka test msg 
 - >Kafka connector 
 
  
上面我们发送了两条消息Kafka test msg 和 Kafka connector 到 flink-topic Topic中。 
4. 读取消息 
如果读取指定Topic的消息呢?同样可以API和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下: 
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning 
 - Kafka test msg 
 - Kafka connector 
 
  
其中--from-beginning 描述了我们从Topic开始位置读取消息。 
三、Flink Kafka Connector 
前面我们以最简单的方式安装了Kafka环境,那么我们以上面的环境介绍Flink Kafka Connector的使用。Flink  Connector相关的基础知识会在《Apache Flink 漫谈系列(14) - Connectors》中介绍,这里我们直接介绍与Kafka  Connector相关的内容。 
Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例进行介绍。 
1. mvn 依赖 
要使用Kakfa Connector需要在我们的pom中增加对Kafka Connector的依赖,如下: 
- <dependency> 
 - <groupId>org.apache.flink</groupId> 
 - <artifactId>flink-connector-kafka_2.11</artifactId> 
 - <version>1.7.0</version> 
 - </dependency> 
 
  
Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。  DeserializationSchema允许用户指定这样的模式。 为每个Kafka消息调用 T deserialize(byte []  message)方法,从Kafka传递值。 
2. Examples 
我们示例读取Kafka的数据,再将数据做简单处理之后写入到Kafka中。我们需要再创建一个用于写入的Topic,如下: 
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output 
 
  
所以示例中我们Source利用flink-topic, Sink用slink-topic-output。 
(1) Simple ETL 
我们假设Kafka中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serialize和deserialize的实现,也就是我们要定义一个实现DeserializationSchema和SerializationSchema  的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写Flink主程序。 
- import org.apache.flink.api.common.serialization.DeserializationSchema; 
 - import org.apache.flink.api.common.serialization.SerializationSchema; 
 - import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
 - import org.apache.flink.api.common.typeinfo.TypeInformation; 
 - import org.apache.flink.util.Preconditions; 
 -  
 - import java.io.IOException; 
 - import java.io.ObjectInputStream; 
 - import java.io.ObjectOutputStream; 
 - import java.nio.charset.Charset; 
 -  
 - public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> { 
 -     private static final long serialVersionUID = 1L; 
 -     private transient Charset charset; 
 -  
 -     public KafkaMsgSchema() { 
 - // 默认UTF-8编码 
 -         this(Charset.forName("UTF-8")); 
 -     } 
 -  
 -     public KafkaMsgSchema(Charset charset) { 
 -         this.charset = Preconditions.checkNotNull(charset); 
 -     } 
 -  
 -     public Charset getCharset() { 
 -         return this.charset; 
 -     } 
 -  
 -     public String deserialize(byte[] message) { 
 - // 将Kafka的消息反序列化为java对象 
 -         return new String(message, charset); 
 -     } 
 -  
 -     public boolean isEndOfStream(String nextElement) { 
 - // 流永远不结束 
 -         return false; 
 -     } 
 -  
 -     public byte[] serialize(String element) { 
 - // 将java对象序列化为Kafka的消息 
 -         return element.getBytes(this.charset); 
 -     } 
 -  
 -     public TypeInformation<String> getProducedType() { 
 - // 定义产生的数据Typeinfo 
 -         return BasicTypeInfo.STRING_TYPE_INFO; 
 -     } 
 -  
 -     private void writeObject(ObjectOutputStream out) throws IOException { 
 -         out.defaultWriteObject(); 
 -         out.writeUTF(this.charset.name()); 
 -     } 
 -  
 -     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 
 -         in.defaultReadObject(); 
 -         String charsetName = in.readUTF(); 
 -         this.charset = Charset.forName(charsetName); 
 -     } 
 - } 
 
  
    主程序 - 完整代码
    - import org.apache.flink.api.common.functions.MapFunction; 
 - import org.apache.flink.api.java.utils.ParameterTool; 
 - import org.apache.flink.streaming.api.datastream.DataStream; 
 - import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
 - import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 
 - import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; 
 - import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; 
 -  
 - import java.util.Properties; 
 -  
 - public class KafkaExample { 
 -     public static void main(String[] args) throws Exception { 
 -         // 用户参数获取 
 -         final ParameterTool parameterTool = ParameterTool.fromArgs(args); 
 -         // Stream 环境 
 -         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
 -  
 -         // Source的topic 
 -         String sourceTopic = "flink-topic"; 
 -         // Sink的topic 
 -         String sinkTopic = "flink-topic-output"; 
 -         // broker 地址 
 -         String broker = "localhost:9092"; 
 -  
 -         // 属性参数 - 实际投产可以在命令行传入 
 -         Properties p = parameterTool.getProperties(); 
 -         p.putAll(parameterTool.getProperties()); 
 -         p.put("bootstrap.servers", broker); 
 -  
 -         env.getConfig().setGlobalJobParameters(parameterTool); 
 -  
 -         // 创建消费者 
 -         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>( 
 -                 sourceTopic, 
 -                 new KafkaMsgSchema(), 
 -                 p); 
 -         // 设置读取最早的数据 
 - //        consumer.setStartFromEarliest(); 
 -  
 -         // 读取Kafka消息 
 -         DataStream<String> input = env.addSource(consumer); 
 -  
 -  
 -         // 数据处理 
 -         DataStream<String> result = input.map(new MapFunction<String, String>() { 
 -             public String map(String s) throws Exception { 
 -                 String msg = "Flink study ".concat(s); 
 -                 System.out.println(msg); 
 -                 return msg; 
 -             } 
 -         }); 
 -  
 -         // 创建生产者 
 -         FlinkKafkaProducer producer = new FlinkKafkaProducer<String>( 
 -                 sinkTopic, 
 -                 new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()), 
 -                 p, 
 -                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); 
 -  
 -         // 将数据写入Kafka指定Topic中 
 -         result.addSink(producer); 
 -  
 -         // 执行job 
 -         env.execute("Kafka Example"); 
 -     } 
 - } 
 
  
                        (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |