大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
使用Table API处理无界数据流
无
2024-04-13 13:57:37
18
0
0
bigdata
### 使用`Table API`处理无界数据流 > Written by 黄俊仁,2024/04/13 如下示例代码从自定义数据源 `NBUSource` 接收实时字符串流,并将这个流转换成表(`Table`)。它使用 Table API 执行一个查询,筛选出包含字母 't' 的所有字符串。最后,筛选后的结果被转换回 `DataStream<Row>` 。 ```java package org.nbubigdata.flink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; public class TableStream { public static void main(String[] args) throws Exception { StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置环境配置,使用Blink Planner并启用流模式 EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() // 使用Blink Planner .inStreamingMode() // 设置为流处理模式 .build(); // 根据流执行环境和设置创建一个StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings); // 添加自定义的数据源 DataStream<String> dataStream = sEnv.addSource(new NBUSource()); // 将数据流转换为Table,并定义列名为"word" Table table1 = tEnv.fromDataStream(dataStream,$("word")); // 使用Table API对Table进行查询操作,筛选出包含字母't'的word列 Table table = table1.where($("word").like("%t%")); // 使用explain方法获取Table查询的执行计划,这对于理解Flink如何执行查询很有帮助 //String explanation_old = tEnv.explain(table); // 打印查询执行计划 System.out.println(explanation_old); // 将查询结果转换回DataStream<Row>并打印,"table"是前缀标签 tEnv.toAppendStream(table, Row.class).print("table"); // 启动Flink作业执行 sEnv.execute(); } } ``` 运行上述程序,可以在控制台中观察到如下的输出信息。 
上一篇:
使用SQL处理有界数据流
下一篇:
使用Table API处理有界数据流
文档导航