多流处理
无    2024-04-13 14:01:29    31    0    0
bigdata

多流处理

Written by 黄俊仁,2024/04/13

这个程序展示了如何在 Apache Flink 中使用 connect 操作符将两个不同的数据流连接起来,并通过 CoMapFunction 分别对这两个流的元素进行处理。每个流的数据通过 NBUSource 产生,并分别加上前缀 “Stream1: ” 和 “Stream2: ” 来区分来源,最后将处理过的数据统一输出到控制台。这个示例演示了 Flink 处理多源数据流并在单个作业中应用不同逻辑的能力。

  1. package org.nbubigdata.flink;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.functions.co.CoMapFunction;
  6. public class MultiStreamTransformation {
  7. public static void main(String[] args) throws Exception {
  8. // 设置Flink流执行环境
  9. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. // 创建两个使用NBUSource的数据流
  11. DataStream<String> stream1 = env.addSource(new NBUSource());
  12. DataStream<String> stream2 = env.addSource(new NBUSource());
  13. // 使用connect操作符连接两个流
  14. ConnectedStreams<String, String> connectedStreams = stream1.connect(stream2);
  15. // 使用CoMapFunction对连接后的流进行处理
  16. DataStream<String> resultStream = connectedStreams.map(new CoMapFunction<String, String, String>() {
  17. // 第一个流中的元素使用这个方法处理
  18. @Override
  19. public String map1(String value) {
  20. return "Stream1: " + value;
  21. }
  22. // 第二个流中的元素使用这个方法处理
  23. @Override
  24. public String map2(String value) {
  25. return "Stream2: " + value;
  26. }
  27. });
  28. // 输出处理后的结果
  29. resultStream.print();
  30. // 执行Flink程序
  31. env.execute("MultiStream Transformation Example");
  32. }
  33. }

运行上述程序,可以在控制台中观察到如下的输出信息。

image-20240408213620468

可以观察到,两个使用 NBUSource 的数据流被连接(connect)起来。 CoMapFunction 为来自第一个数据流的每个字符串添加前缀 "Stream1: ",并为来自第二个数据流的每个字符串添加前缀 "Stream2: "

上一篇: 使用keyBy对无界数据流进行分区

下一篇: 实验01-数据定义

文档导航