Apache beam - 创建Pipeline

Beam程序从头到尾表达了一个数据处理流程。这里介绍在Beam SDK中使用类构建管道的机制。要使用Beam SDK中的类构建管道,您的程序将需要执行以下一般步骤:

  • 创建一个Pipeline对象
  • 使用Read或者Create转换去为管道数据创建一个或多个PCollections
  • 应用Transforms到每个PCollection.Transforms可以对PCollection中的每个元素进行修改,过滤,分组,解析以及其他处理
  • 最后使用Write或者其他输出方式,完成PCollection的转换
  • 运行Pipeline

    创建Pipeline对象

    一个beam程序通常从创建Pipeline对象开始
    在Beam SDK中,一个Pipeline都以明确的类型对象表示Pipeline。每个Pipeline对象是一个独立的实体。它封装了Pipeline运行的数据和应用于该数据的转换。

    创建一个Pipeline,需要声明一个Pipeline对象,并通过它传递一些配置参数。
    1
    2
    3
    4
    5
    // Start by defining the options for the pipeline.
    PipelineOptions options = PipelineOptionsFactory.create();
    // Then create the pipeline.
    Pipeline p = Pipeline.create(options);

将数据读取进管道

创建Pipeline并初始化PCollection,应用root转换到pipeline对象。一个root转换从外部数据源或指定的某些本地数据创建一个PCollection。
Beam SDK包含两种root转换:Read和Create。Read转换从外部数据(例如文本文件或数据库表)源读取数据。Create转换从java.util.Collection内存创建一个PCollection

以下示例代码显示了如何apply的TextIO.Read root转换从文本文件中读取数据。该转换应用于一个Pipeline对象p,并返回一个类型为PCollection的流水线数据集

1
2
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from("gs://some/inputData.txt"));

应用转换处理Pipeline数据

这里我们可以使用Beam SDK提供的各种转换操作数据。通过PCollection的apply方法处理每一个PCollection并所需的转换对象作为参数进行传递,将转换应用于Pipeline。

以下的示例代码演示了如何应用一个转换到类型为string的PCollection上。
用户定义的自定义变换,它可以反转每个字符串的内容,并输出一个包含反转字符串的PCollection新变量

1
2
PCollection<String> words = ...;
PCollection<String> reversedWords = words.apply(new ReverseWords());

写入或输出最终管道数据

应用所有的转换之后,你需要将计算结果进行输出。输出Pipeline最终的PCollections,可以使用Write转换进行操作。Write转换可以输出每一个PCollection到一个外部data sink,例如数据库表。你可以在pipeline中通过Write输出PCollection,通常会在pipeline末端写出数据。

下面的示例代码演示了如何apply一个TextIO.Write转换将PCollection的String文本写入到文件:

1
2
3
PCollection<String> filteredWords = ...;
filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));

运行Pipeline

构建完Pipeline后,可以采用run方法运行。

1
p.run();

run方法是异步的。如果想要同步运行,可以在run方法后面添加waitUntilFinish方法

1
p.run().waitUntilFinish();

参考链接:
https://beam.apache.org/documentation/pipelines/create-your-pipeline/

坚持原创技术分享,您的支持将鼓励为继续创作!