1. Create the WordCount project
# Create the project folder ~/bigdata/flink/wordcount
nbu@ecs:~$ mkdir -p bigdata/flink/wordcount
nbu@ecs:~$ cd bigdata/flink/wordcount
# Create the source directory for the package org.nbubigdata.flink
nbu@ecs:~/bigdata/flink/wordcount$ mkdir -p src/org/nbubigdata/flink
# Create an output folder "classes" to hold the compiled class files and the jar
nbu@ecs:~/bigdata/flink/wordcount$ mkdir classes
nbu@ecs:~/bigdata/flink/wordcount$ cd src/org/nbubigdata/flink
nbu@ecs:~/bigdata/flink/wordcount/src/org/nbubigdata/flink$ vim StreamWordCount.java
Enter the following content:
package org.nbubigdata.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the text stream from a socket
        DataStreamSource<String> lineDSS = env.socketTextStream("localhost", 7777);
        // 3. Convert each line into (word, 1) pairs.
        //    The returns(...) hints are needed because Java lambdas lose their
        //    generic type information to type erasure.
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. Print the result
        result.print();
        // 7. Execute the job
        env.execute();
    }
}
2. Compile with the javac command
Configure the CLASSPATH
nbu@ecs:~$ vim ~/.bashrc
# Append the following lines at the end of the file
export FLINK_HOME=/usr/local/flink
export CLASSPATH=$CLASSPATH:$FLINK_HOME/lib/*
nbu@ecs:~$ source ~/.bashrc
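As an optional sanity check, you can confirm that FLINK_HOME points at your installation and that the dependency jars javac will compile against are present under $FLINK_HOME/lib (the exact jar names depend on your Flink version):
nbu@ecs:~$ echo $FLINK_HOME
nbu@ecs:~$ ls $FLINK_HOME/lib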
Compile with javac
nbu@ecs:~/bigdata/flink/wordcount/src/org/nbubigdata/flink$ cd ~/bigdata/flink/wordcount
nbu@ecs:~/bigdata/flink/wordcount$ javac -d ./classes ./src/org/nbubigdata/flink/StreamWordCount.java
After the compilation finishes, a new StreamWordCount.class file appears under ~/bigdata/flink/wordcount/classes/org/nbubigdata/flink/.
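An optional way to confirm this from the shell:
nbu@ecs:~/bigdata/flink/wordcount$ ls classes/org/nbubigdata/flink/
The listing should show StreamWordCount.class.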
3. Package into a runnable jar
After the steps above, first create a MANIFEST.mf file in the classes directory:
nbu@ecs:~/bigdata/flink/wordcount/classes$ vim MANIFEST.mf
The file content is shown below. Note the ASCII space after each colon, and make sure every line, including the last one, ends with a newline.
Manifest-Version: 1.0
Class-Path:
Created-By: 1.8.0_312 (Sun Microsystems Inc.)
Main-Class: org.nbubigdata.flink.StreamWordCount
Run the following command to generate the StreamWordCount.jar package in the classes folder:
nbu@ecs:~/bigdata/flink/wordcount/classes$ jar -cvfm StreamWordCount.jar MANIFEST.mf org
added manifest
adding: org/(in = 0) (out= 0)(stored 0%)
adding: org/nbubigdata/(in = 0) (out= 0)(stored 0%)
adding: org/nbubigdata/flink/(in = 0) (out= 0)(stored 0%)
adding: org/nbubigdata/flink/StreamWordCount.class(in = 5541) (out= 2159)(deflated 61%)
In jar -cvfm, the order of the f and m flags must match the order of the arguments that follow them: f corresponds to the output jar StreamWordCount.jar, and m to MANIFEST.mf.
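To double-check that the manifest was picked up, you can inspect the finished jar: jar -tf lists the packaged files, and unzip -p (assuming unzip is available on your system) prints the packaged META-INF/MANIFEST.MF, which should contain the Main-Class line from above.
nbu@ecs:~/bigdata/flink/wordcount/classes$ jar -tf StreamWordCount.jar
nbu@ecs:~/bigdata/flink/wordcount/classes$ unzip -p StreamWordCount.jar META-INF/MANIFEST.MF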
4. Submit the jar from the command line
# Start the Flink cluster
nbu@ecs:~$ /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host ecs.
Starting taskexecutor daemon on host ecs.
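To confirm that the cluster is really up, jps should show the two daemons (named StandaloneSessionClusterEntrypoint and TaskManagerRunner in recent Flink releases), and the web UI is normally reachable at http://localhost:8081.
nbu@ecs:~$ jps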
Open another terminal for netcat and run the following command:
nbu@ecs:~$ nc -lk 7777
Back in the original terminal, submit StreamWordCount.jar:
nbu@ecs:~/bigdata/flink/wordcount/classes$ /usr/local/flink/bin/flink run StreamWordCount.jar
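The streaming job keeps running until it is cancelled. In another terminal, flink list can be used to confirm that it is running and to obtain its JobID (needed later if you want to stop it with flink cancel):
nbu@ecs:~$ /usr/local/flink/bin/flink list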
In the netcat terminal, type:
hello world
hello flink
hello nbu
hello world
hello flink
hello nbu
The results can be seen in /usr/local/flink/log/flink-nbu-taskexecutor-0-ecs.out:
nbu@ecs:~/bigdata/flink/wordcount/classes$ tail -f /usr/local/flink/log/flink-*-taskexecutor-*.out
The output is as follows: