Apache Beam Runner - Flink

检查apache beam对应版本所需要的flink版本

方式一:(推荐)
github上查找beam项目,找到指定版本的flink runner的pom.xml文件,检查flink.version版本
https://github.com/apache/beam/blob/v2.0.0/runners/flink/pom.xml

1
2
3
<properties>
<flink.version>1.2.1</flink.version>
</properties>

方式二:
官方所说的通过mvn命令查询支持的flink版本

1
2
3
4
$ mvn dependency:tree -Pflink-runner |grep flink
...
[INFO] | +- org.apache.flink:flink-streaming-java_2.10:jar:1.1.2:runtime
...

下载链接:

flink-1.2.1-bin-hadoop24-scala_2.10.tgz

安装

flink安装可以参考官方的QuickStart文档进行安装(测试只需要安装local模式)。

引入maven依赖

使用java作为开发语言,需要在项目的pom.xml文件中加入此依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>2.0.0</version>
<scope>runtime</scope>
</dependency>

apache beam使用flink runner通过-P指定

1
$ mvn package -Pflink-runner

下面是官方给的一个例子

1
2
3
4
5
6
7
$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Pflink-runner \
-Dexec.args="--runner=FlinkRunner \
--inputFile=/path/to/pom.xml \
--output=/path/to/counts \
--flinkMaster=<flink master url> \
--filesToStage=target/word-count-beam--bundled-0.1.jar"

注:
–flinkMaster:指定flink jobmanager访问地址,本地模式为localhost:6123;
–filesToStage:指定运行的jar包
以下两个参数为pipeline options提供给flink runner的内置参数,更多内置参数请点击这里查阅

遇到的问题

依赖包

将以下以来包放入flink lib目录下

beam-runners-flink_2.10-2.0.0.jar
beam-sdks-java-core-2.0.0.jar
beam-sdks-java-io-kafka-2.0.0.jar
kafka-clients-0.10.1.0.jar
kafka_2.10-0.8.2.2.jar
beam-runners-core-java-2.0.0.jar
spring-core-4.3.6.RELEASE.jar
spring-expression-4.3.6.RELEASE.jar

问题:apache beam 2.0.0与apache flink1.3.0版本问题

apache beam 2.0.0的runner flink是基于flink 1.2.1版本编译的,而flink1.3.0版本删除了JobSnapshottingSettings类,导致用flink1.3.0运行的时候抛ClassNotFoundException异常。

1
2
3
4
5
java.lang.ClassNotFoundException: org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
... (省略部分日志)

解决方法:

  • 如果想使用最新的flink1.3.0,建议直接从github下载源码自行编译,目前apache beam的master分支已经是采用flink1.3.0版本。
  • 采用flink1.2.1版本运行程序

参考链接:
https://beam.apache.org/documentation/runners/flink/
https://github.com/apache/beam/blob/v2.0.0/runners/flink/pom.xml

坚持原创技术分享,您的支持将鼓励为继续创作!