Beam程序从头到尾表达了一个数据处理流程。这里介绍在Beam SDK中使用类构建管道的机制。要使用Beam SDK中的类构建管道,您的程序将需要执行以下一般步骤:
- 创建一个Pipeline对象
- 使用Read或者Create转换去为管道数据创建一个或多个PCollections
- 应用Transforms到每个PCollection.Transforms可以对PCollection中的每个元素进行修改,过滤,分组,解析以及其他处理
- 最后使用Write或者其他输出方式,完成PCollection的转换
- 运行Pipeline
创建Pipeline对象
创建一个Pipeline,需要声明一个Pipeline对象,并通过它传递一些配置参数。一个beam程序通常从创建Pipeline对象开始
在Beam SDK中,一个Pipeline都以明确的类型对象表示Pipeline。每个Pipeline对象是一个独立的实体。它封装了Pipeline运行的数据和应用于该数据的转换。12345// Start by defining the options for the pipeline.PipelineOptions options = PipelineOptionsFactory.create();// Then create the pipeline.Pipeline p = Pipeline.create(options);
将数据读取进管道
创建Pipeline并初始化PCollection,应用root转换到pipeline对象。一个root转换从外部数据源或指定的某些本地数据创建一个PCollection。
Beam SDK包含两种root转换:Read和Create。Read转换从外部数据(例如文本文件或数据库表)源读取数据。Create转换从java.util.Collection内存创建一个PCollection
以下示例代码显示了如何apply的TextIO.Read root转换从文本文件中读取数据。该转换应用于一个Pipeline对象p,并返回一个类型为PCollection
应用转换处理Pipeline数据
这里我们可以使用Beam SDK提供的各种转换操作数据。通过PCollection的apply方法处理每一个PCollection并所需的转换对象作为参数进行传递,将转换应用于Pipeline。
以下的示例代码演示了如何应用一个转换到类型为string的PCollection上。
用户定义的自定义变换,它可以反转每个字符串的内容,并输出一个包含反转字符串的PCollection新变量
|
|
写入或输出最终管道数据
应用所有的转换之后,你需要将计算结果进行输出。输出Pipeline最终的PCollections,可以使用Write转换进行操作。Write转换可以输出每一个PCollection到一个外部data sink,例如数据库表。你可以在pipeline中通过Write输出PCollection,通常会在pipeline末端写出数据。
下面的示例代码演示了如何apply一个TextIO.Write转换将PCollection的String文本写入到文件:
运行Pipeline
构建完Pipeline后,可以采用run方法运行。
run方法是异步的。如果想要同步运行,可以在run方法后面添加waitUntilFinish方法
参考链接:
https://beam.apache.org/documentation/pipelines/create-your-pipeline/