使用Flink Runner先决条件及步骤
检查apache beam对应版本所需要的flink版本
方式一:(推荐)
github上查找beam项目,找到指定版本的flink runner的pom.xml文件,检查flink.version版本
https://github.com/apache/beam/blob/v2.0.0/runners/flink/pom.xml
方式二:
官方所说的通过mvn命令查询支持的flink版本
下载,安装flink
下载链接:
flink-1.2.1-bin-hadoop24-scala_2.10.tgz
安装
flink安装可以参考官方的QuickStart文档进行安装(测试只需要安装local模式)。
引入maven依赖
使用java作为开发语言,需要在项目的pom.xml文件中加入此依赖
使用flink runner执行pipeline
apache beam使用flink runner通过-P指定
下面是官方给的一个例子
注:
–flinkMaster:指定flink jobmanager访问地址,本地模式为localhost:6123;
–filesToStage:指定运行的jar包
以下两个参数为pipeline options提供给flink runner的内置参数,更多内置参数请点击这里查阅
遇到的问题
依赖包
将以下以来包放入flink lib目录下
beam-runners-flink_2.10-2.0.0.jar
beam-sdks-java-core-2.0.0.jar
beam-sdks-java-io-kafka-2.0.0.jar
kafka-clients-0.10.1.0.jar
kafka_2.10-0.8.2.2.jar
beam-runners-core-java-2.0.0.jar
spring-core-4.3.6.RELEASE.jar
spring-expression-4.3.6.RELEASE.jar
问题:apache beam 2.0.0与apache flink1.3.0版本问题
apache beam 2.0.0的runner flink是基于flink 1.2.1版本编译的,而flink1.3.0版本删除了JobSnapshottingSettings类,导致用flink1.3.0运行的时候抛ClassNotFoundException异常。
解决方法:
- 如果想使用最新的flink1.3.0,建议直接从github下载源码自行编译,目前apache beam的master分支已经是采用flink1.3.0版本。
- 采用flink1.2.1版本运行程序
参考链接:
https://beam.apache.org/documentation/runners/flink/
https://github.com/apache/beam/blob/v2.0.0/runners/flink/pom.xml