大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
使用DataStream API处理无界数据流
无
2024-04-13 13:52:22
20
0
0
bigdata
### 使用DataStream API处理无界数据流 > Written by 黄俊仁,2024/04/13 首先需要自定义一个无界数据源。在Flink中可以通过继承`SourceFunction`接口,对`run()`方法和`cancel()`方法进行重写来实现自定义数据源。 ```java package org.nbubigdata.flink; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.ArrayList; import java.util.List; import java.util.Random; public class NBUSource implements SourceFunction<String> { private boolean isRunning = true; @Override public void run(SourceContext<String> sourceContext) throws Exception { while(isRunning) { //单词流 List<String> stringList = new ArrayList<>(); stringList.add("hello"); stringList.add("world"); stringList.add("NBU"); stringList.add("bigdata"); stringList.add("flink"); stringList.add("set"); stringList.add("stream"); stringList.add("batch"); stringList.add("table"); stringList.add("SQL"); int size = stringList.size(); int i = new Random().nextInt(size); sourceContext.collect(stringList.get(i)); //产生数据时间间隔200ms Thread.sleep(200); } } @Override //取消执行 public void cancel() { isRunning = false; } } ``` 如下程序是一个使用 Apache Flink 实现的实时流处理应用,目标是进行单词计数。它首先从自定义数据源 `NBUSource` 接收字符串流,然后使用 `Splitter` 类的 `flatMap` 方法将每个字符串拆分成单词,并为每个单词创建一个包含单词本身和数字 1 的元组。接着,程序通过 `keyBy` 方法按单词进行分组,并使用 `sum` 聚合操作来累加每个单词出现的次数。最后,聚合结果被打印到控制台。 ```java package org.nbubigdata.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class StreamApi { public static void main(String[] args) throws Exception { //获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); //获取自定义的数据流 DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new NBUSource()) .flatMap(new Splitter()) .keyBy(0) .sum(1); //打印数据到控制台 //System.out.println(env.getExecutionPlan()); dataStream.print(); //执行任务操作。因为flink是懒加载的,所以必须调用execute方法才会执行 env.execute("WordCount"); } //使用FlatMapFunction函数分割字符串 public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word : sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } } ``` 运行上述程序,可以在控制台中观察到如下的输出信息。由于运行程序的机器的CPU核心数为4,Flink作业会利用多个核心进行并行处理任务。 
上一篇:
使用 DataSet API 处理有界数据流
下一篇:
使用ProcessFunction实现计数器
文档导航