之前在网上看到这样一个例子
https://github.com/wtanaka/streaming/blob/d2e63184f604ea42d9711dd59951af3f3f4200f9/beam/src/main/java/com/wtanaka/streaming/beam/KafkaToKafka.java
但是从代码的设计上看对flink kafka存在依赖,而这样编写代码我觉得违背了apache beam的初衷,下面我自己写了一个很简单的例子,从kafka input-topic读取数据,写入kafka output-topic。
这里我只是做一个简单测试,所以使用的都是单机模式的kafka、zookeeper和flink
公共参数
|
|
创建option
|
|
KafkaIO Read
|
|
KafkaIO Write
|
|
完整代码
|
|
这里注释掉的updateProducerProperties方法,可以用于设置kafka写入时所需的参数
运行任务
|
|
使用-P指定runner,这里指定flink-runner作为运行器
流程验证
flink运行状态检查
kafka数据输入输出检查
参考链接:
https://beam.apache.org/documentation/sdks/javadoc/2.0.0/
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java