Apache Beam IO - KafkaIO

I came across this example online a while ago:
https://github.com/wtanaka/streaming/blob/d2e63184f604ea42d9711dd59951af3f3f4200f9/beam/src/main/java/com/wtanaka/streaming/beam/KafkaToKafka.java
Judging from its design, the code depends on Flink's Kafka connector, and writing it that way defeats the purpose of Apache Beam in my view, since Beam pipelines are supposed to stay runner-agnostic. Below is a very simple example I wrote myself: it reads from the Kafka topic input-topic and writes to the Kafka topic output-topic.
Since this is only a quick test, I run Kafka, ZooKeeper, and Flink in single-node mode.

Common parameters

private static final long WINDOW_SIZE = 10;  // Default window duration in seconds
private static final long SLIDE_SIZE = 5;    // Default window slide in seconds
private static final String KAFKA_TOPIC = "input-topic";
private static final String KAFKA_OUTPUT_TOPIC = "output-topic";
private static final String KAFKA_BROKER = "localhost:9092";  // Default Kafka broker to contact
private static final String GROUP_ID = "beamGroup";           // Default groupId
private static final String ZOOKEEPER = "localhost:2181";

Creating the options

public interface KafkaOptions extends PipelineOptions {
    @Description("Sliding window duration, in seconds")
    @Default.Long(WINDOW_SIZE)
    Long getWindowSize();
    void setWindowSize(Long value);

    @Description("Window slide, in seconds")
    @Default.Long(SLIDE_SIZE)
    Long getSlide();
    void setSlide(Long value);

    @Description("The Kafka topic to read from")
    @Default.String(KAFKA_TOPIC)
    String getKafkaTopic();
    void setKafkaTopic(String value);

    @Description("The Kafka topic to write to")
    @Default.String(KAFKA_OUTPUT_TOPIC)
    String getOutputKafkaTopic();
    void setOutputKafkaTopic(String value);

    @Description("The Kafka broker to read from")
    @Default.String(KAFKA_BROKER)
    String getBroker();
    void setBroker(String value);

    @Description("The ZooKeeper server to connect to")
    @Default.String(ZOOKEEPER)
    String getZookeeper();
    void setZookeeper(String value);

    @Description("The groupId")
    @Default.String(GROUP_ID)
    String getGroup();
    void setGroup(String value);
}
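
Note that in the pipeline shown below, the Kafka settings are still taken from the hard-coded constants rather than from these options. As a minimal sketch (not part of the original code), the option getters could be wired into KafkaIO so that command-line flags such as --broker and --kafkaTopic actually take effect:

KafkaOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(KafkaOptions.class);

// Build the read transform from the parsed options instead of the constants.
KafkaIO.Read<Long, String> read = KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBroker())
        .withTopic(options.getKafkaTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);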

KafkaIO Read

KafkaIO.<Long, String>read()
        .withBootstrapServers(KAFKA_BROKER)
        .withTopic(KAFKA_TOPIC) // use withTopics(List<String>) to read from multiple topics
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata() // yields PCollection<KV<Long, String>>
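
The GROUP_ID constant only matters if it is actually handed to the underlying consumer. A minimal sketch (not in the original snippet, and assuming the updateConsumerProperties method from the KafkaIO 2.0.0 API) of passing it through, using com.google.common.collect.ImmutableMap:

KafkaIO.<Long, String>read()
        .withBootstrapServers(KAFKA_BROKER)
        .withTopic(KAFKA_TOPIC)
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Forward the consumer group id to the underlying KafkaConsumer.
        .updateConsumerProperties(ImmutableMap.<String, Object>of("group.id", GROUP_ID))
        .withoutMetadata();

Also worth knowing: if you drop withoutMetadata(), the read instead yields a PCollection<KafkaRecord<Long, String>> that carries topic, partition, and offset alongside each key/value pair.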

KafkaIO Write

KafkaIO.<Long, String>write()
        .withBootstrapServers(KAFKA_BROKER)
        .withTopic(KAFKA_OUTPUT_TOPIC)
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // You can further customize the KafkaProducer used to write the records
        // by adding more ProducerConfig settings, e.g. to enable compression:
        //.updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
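
If the upstream collection carries only values and no keys, KafkaIO also offers a values() variant of the write transform. A hedged sketch (not in the original post):

// Write a PCollection<String> directly; no key serializer is needed.
KafkaIO.<Void, String>write()
        .withBootstrapServers(KAFKA_BROKER)
        .withTopic(KAFKA_OUTPUT_TOPIC)
        .withValueSerializer(StringSerializer.class)
        .values()  // expects PCollection<String> instead of PCollection<KV<Void, String>>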

Complete code

package com.exmind.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class App {
    private static final long WINDOW_SIZE = 10;  // Default window duration in seconds
    private static final long SLIDE_SIZE = 5;    // Default window slide in seconds
    private static final String KAFKA_TOPIC = "input-topic";
    private static final String KAFKA_OUTPUT_TOPIC = "output-topic";
    private static final String KAFKA_BROKER = "localhost:9092";  // Default Kafka broker to contact
    private static final String GROUP_ID = "beamGroup";           // Default groupId
    private static final String ZOOKEEPER = "localhost:2181";

    public interface KafkaOptions extends PipelineOptions {
        @Description("Sliding window duration, in seconds")
        @Default.Long(WINDOW_SIZE)
        Long getWindowSize();
        void setWindowSize(Long value);

        @Description("Window slide, in seconds")
        @Default.Long(SLIDE_SIZE)
        Long getSlide();
        void setSlide(Long value);

        @Description("The Kafka topic to read from")
        @Default.String(KAFKA_TOPIC)
        String getKafkaTopic();
        void setKafkaTopic(String value);

        @Description("The Kafka topic to write to")
        @Default.String(KAFKA_OUTPUT_TOPIC)
        String getOutputKafkaTopic();
        void setOutputKafkaTopic(String value);

        @Description("The Kafka broker to read from")
        @Default.String(KAFKA_BROKER)
        String getBroker();
        void setBroker(String value);

        @Description("The ZooKeeper server to connect to")
        @Default.String(ZOOKEEPER)
        String getZookeeper();
        void setZookeeper(String value);

        @Description("The groupId")
        @Default.String(GROUP_ID)
        String getGroup();
        void setGroup(String value);
    }

    public static void main(String[] args) {
        KafkaOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaOptions.class);
        options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers(KAFKA_BROKER)
                .withTopic(KAFKA_TOPIC) // use withTopics(List<String>) to read from multiple topics
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata() // yields PCollection<KV<Long, String>>
        ).apply(KafkaIO.<Long, String>write()
                .withBootstrapServers(KAFKA_BROKER)
                .withTopic(KAFKA_OUTPUT_TOPIC)
                .withKeySerializer(LongSerializer.class)
                .withValueSerializer(StringSerializer.class)
                // You can further customize the KafkaProducer used to write the records
                // by adding more ProducerConfig settings, e.g. to enable compression:
                //.updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
        );
        pipeline.run();
    }
}
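
The WINDOW_SIZE and SLIDE_SIZE options are parsed here but never applied to the data. As a hedged sketch (not part of the original pipeline), a sliding window driven by those options could be inserted between the read and the write, with the extra imports org.apache.beam.sdk.transforms.windowing.SlidingWindows, org.apache.beam.sdk.transforms.windowing.Window, org.apache.beam.sdk.values.KV, and org.joda.time.Duration:

pipeline.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers(KAFKA_BROKER)
        .withTopic(KAFKA_TOPIC)
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    // Assign each element to overlapping windows of getWindowSize() seconds,
    // starting a new window every getSlide() seconds.
    .apply(Window.<KV<Long, String>>into(
        SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
            .every(Duration.standardSeconds(options.getSlide()))))
    .apply(KafkaIO.<Long, String>write()
        .withBootstrapServers(KAFKA_BROKER)
        .withTopic(KAFKA_OUTPUT_TOPIC)
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class));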

The updateProducerProperties call commented out above can be used to set whatever Kafka producer parameters the write step needs.
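
For example, a hedged sketch (an assumption building on the original comment) that enables gzip compression and stronger acknowledgements, using com.google.common.collect.ImmutableMap:

KafkaIO.<Long, String>write()
        .withBootstrapServers(KAFKA_BROKER)
        .withTopic(KAFKA_OUTPUT_TOPIC)
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .updateProducerProperties(ImmutableMap.<String, Object>of(
                "compression.type", "gzip",  // compress record batches before sending
                "acks", "all"));             // wait for the full ISR to acknowledge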

Running the job

mvn exec:java -Dexec.mainClass=com.exmind.beam.App \
-Pflink-runner \
-Dexec.args="--runner=FlinkRunner \
--flinkMaster=localhost:6123 \
--filesToStage=target/com.exmind.beam-0.0.1-SNAPSHOT.jar"

The -P flag activates a Maven profile; here the flink-runner profile pulls in the Flink runner dependency. Any option defined in KafkaOptions can be overridden the same way by appending it to -Dexec.args, e.g. --kafkaTopic=my-topic.

Verifying the flow

Check the Flink job status


Check Kafka input and output data


References:
https://beam.apache.org/documentation/sdks/javadoc/2.0.0/
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
