bigdata learning
02-Flink-WordCount Programming
Flink
Java
2023-05-17 14:15:10
**1. Create the WordCount project**

```bash
# Create the project folder ~/bigdata/flink/wordcount
nbu@ecs:~$ mkdir -p bigdata/flink/wordcount
nbu@ecs:~$ cd bigdata/flink/wordcount
# Create the directory for the package org.nbubigdata.flink
nbu@ecs:~/bigdata/flink/wordcount$ mkdir -p src/org/nbubigdata/flink
# Create the output folder classes, which holds the compiled .class files and the jar
nbu@ecs:~/bigdata/flink/wordcount$ mkdir classes
nbu@ecs:~/bigdata/flink/wordcount$ cd src/org/nbubigdata/flink
nbu@ecs:~/bigdata/flink/wordcount/src/org/nbubigdata/flink$ vim WordCount.java
```

Enter the following code:

```java
package org.nbubigdata.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Batch word count
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the input data from a file
        String inputPath = "/home/nbu/bigdata/flink/data/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        System.out.println("WordCount-begin");

        // Split each line on spaces, turn every word into a (word, 1) tuple, then count
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) // group by the word in field 0
                .sum(1);    // sum the counts in field 1

        // In the DataSet API, print() triggers execution of the job and prints the result
        resultSet.print();
        System.out.println("WordCount-end");
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = value.split(" ");
            // Wrap every word in a (word, 1) tuple and emit it
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```

Create the file `hello.txt` under `/home/nbu/bigdata/flink/data/` with the following content:

```
hello world
hello flink
hello nbu
hello nbu
```

**2. Compile with javac**

Configure CLASSPATH:

```bash
nbu@ecs:~$ vim ~/.bashrc
# Append the following lines at the end of the file
export FLINK_HOME=/usr/local/flink
# A classpath entry ending in /* is expanded by Java itself to every jar in lib/
export CLASSPATH=$CLASSPATH:$FLINK_HOME/lib/*
```
Then reload the file so the variables take effect:

```bash
nbu@ecs:~$ source ~/.bashrc
```

Compile with javac:

```bash
nbu@ecs:~/bigdata/flink/wordcount/src/org/nbubigdata/flink$ cd ~/bigdata/flink/wordcount
nbu@ecs:~/bigdata/flink/wordcount$ javac -d ./classes ./src/org/nbubigdata/flink/WordCount.java
```

After compilation, `~/bigdata/flink/wordcount/classes/org/nbubigdata/flink/` contains `WordCount.class`, plus `WordCount$MyFlatMapper.class` for the nested class.

**3. Run the program**

Run the compiled code with the `java` command.

```bash
# Start the Flink cluster
nbu@ecs:~/bigdata/flink/wordcount$ /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host ecs.
Starting taskexecutor daemon on host ecs.
# Change into the classes folder
nbu@ecs:~/bigdata/flink/wordcount$ cd classes/
# Run the class file with java to test it
nbu@ecs:~/bigdata/flink/wordcount/classes$ java org.nbubigdata.flink.WordCount
```

The program counts the words in `/home/nbu/bigdata/flink/data/hello.txt` and prints them between `WordCount-begin` and `WordCount-end` as tuples such as `(hello,4)`. When launched with plain `java`, `getExecutionEnvironment()` returns a local environment inside the `java` process, so this step does not actually use the cluster; the running cluster is needed from section 5 on.

**4. Package a runnable jar**

First, create a MANIFEST.mf file in the classes directory:

```bash
nbu@ecs:~/bigdata/flink/wordcount/classes$ vim MANIFEST.mf
```

Give it the content below. Note the ASCII space after every colon, and end every line, including the last one, with a newline; otherwise the jar tool ignores the last line (here `Main-Class`).

```
Manifest-Version: 1.0
Class-Path: 
Created-By: 1.8.0_312 (Sun Microsystems Inc.)
Main-Class: org.nbubigdata.flink.WordCount
```

Run the following command to create `WordCount.jar` in the classes folder:

```bash
nbu@ecs:~/bigdata/flink/wordcount/classes$ jar -cvfm WordCount.jar MANIFEST.mf org
added manifest
adding: org/(in = 0) (out= 0)(stored 0%)
adding: org/nbubigdata/(in = 0) (out= 0)(stored 0%)
adding: org/nbubigdata/flink/(in = 0) (out= 0)(stored 0%)
adding: org/nbubigdata/flink/WordCount.class(in = 1444) (out= 709)(deflated 50%)
adding: org/nbubigdata/flink/WordCount$MyFlatMapper.class(in = 1481) (out= 676)(deflated 54%)
nbu@ecs:~/bigdata/flink/wordcount/classes$ ll
total 20
drwxrwxr-x 3 nbu nbu 4096 Dec 8 22:49 ./
drwxrwxr-x 4 nbu nbu 4096 Dec 8 22:33 ../
-rw-rw-r-- 1 nbu nbu  125 Dec 8 22:49 MANIFEST.mf
drwxrwxr-x 3 nbu nbu 4096 Dec 8 22:40 org/
-rw-rw-r-- 1 nbu nbu 2442 Dec 8 22:49 WordCount.jar
```

> In `jar -cvfm`, the order of the `f` and `m` options must match the order of the `WordCount.jar` and `MANIFEST.mf` arguments.

**5. Submit the jar from the command line**

```bash
nbu@ecs:~/bigdata/flink/wordcount/classes$ /usr/local/flink/bin/flink run WordCount.jar
```

The word-count result is printed in the terminal.

Note that the Flink cluster must be running before you submit:

```bash
nbu@ecs:~/bigdata/flink/wordcount/classes$ /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host ecs.
Starting taskexecutor daemon on host ecs.
```

**6. Submit the jar through the Web UI**

Open 127.0.0.1:8081 in a browser, then click `Submit New Job` and `Add New`.
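Whichever way the job is launched (plain `java`, `flink run`, or the Web UI), the printed result should be the same. To check it by hand, here is a minimal plain-Java sketch, with no Flink dependency, that applies the same split-on-space, group-by-word, sum logic to the four lines of `hello.txt`. The class `WordCountCheck` and its `count` method are hypothetical names chosen for this illustration; the code only mirrors the Flink program's logic and uses no Flink API. It sticks to Java 8 APIs to match the JDK shown in the manifest.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java cross-check of the Flink job's logic (no Flink dependency):
// split each line on a single space, count each word once, sum per word.
public class WordCountCheck {

    // Mirrors MyFlatMapper + groupBy(0) + sum(1) from the Flink program.
    static Map<String, Integer> count(List<String> lines) {
        // TreeMap only keeps the printout sorted; Flink makes no ordering promise.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumption: hello.txt holds exactly these four lines, as listed in section 1.
        List<String> lines = Arrays.asList("hello world", "hello flink", "hello nbu", "hello nbu");
        // Print in the same "(word,count)" form that Flink's Tuple2.toString() uses.
        count(lines).forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

Running it prints `(flink,1)`, `(hello,4)`, `(nbu,2)` and `(world,1)`; the Flink job prints the same tuples, though not necessarily in this order. Because both versions use `split(" ")`, an empty line or two consecutive spaces would produce an empty-string "word"; splitting on `"\\s+"` and skipping empty tokens is one way to avoid that if it matters.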