大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
使用WindowAll定义全局窗口
无
2024-04-13 13:59:17
30
0
0
bigdata
### 使用`WindowAll`定义全局窗口 > Written by 黄俊仁,2024/04/13 如下示例代码在一个全局窗口(不分组)的基础上计算每个窗口内接收到的单词总数。它首先从自定义数据源 `NBUSource` 中接收字符串流,然后利用 `Tokenizer` 类将每条字符串拆分成单词,并生成包含单词和计数(初始化为1)的元组。这些元组随后通过 `windowAll` 方法被收集进一个固定时间长度(5秒)的滚动窗口中,窗口内的所有元素计数被累加起来,以计算出该时间段内的单词总数。 ```java package org.nbubigdata.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.Iterator; public class StreamWindowAll { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.addSource(new NBUSource()); env.setParallelism(1); // 将字符串转换为(word, 1)元组 DataStream<Tuple2<String, Integer>> words = text .flatMap(new Tokenizer()); // 使用windowAll对所有数据应用窗口函数 DataStream<Long> wordCounts = words .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 每5秒钟一个窗口 .apply(new AllWindowFunction<Tuple2<String, Integer>, Long, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Long> out) { long count = 0; Iterator<Tuple2<String, Integer>> iterator = values.iterator(); while (iterator.hasNext()) { count += iterator.next().f1; } out.collect(count); } }); // 打印结果 wordCounts.print(); // 执行程序 env.execute("WindowAll Example"); } // 自定义FlatMapFunction,用于拆分输入的字符串,并为每个单词生成(word, 1)形式的元组 public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { for (String word : value.split("\\s")) { out.collect(new Tuple2<>(word, 1)); } } } } ``` 由于数据流的产生间隔为200毫秒,在每个5秒的时间窗口内一般会总计输出25个单词。运行上述程序,可以在控制台中观察到如下的输出信息,在第1个时间窗口内共输出了7个单词,在后续各个时间窗口内都正常输出了25个单词。与 `keyBy` 后的 `window` 操作不同,`windowAll` 不需要流先经过键控(keyed)分区就可以应用窗口逻辑。`windowAll` 适用于整个数据流,而不是分离的键控流,这意味着它将对数据流中的所有元素进行窗口操作,而不考虑它们的键值。 
上一篇:
使用Table API处理有界数据流
下一篇:
使用Window对无界数据流进行时间分窗
文档导航