大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
使用SQL处理有界数据流
无
2024-04-13 13:56:41
24
0
0
bigdata
### 使用SQL处理有界数据流 > Written by 黄俊仁,2024/04/13 如下的程序展示了如何在 Apache Flink 中使用批处理环境结合表和 SQL API 来执行数据处理任务。具体地,它首先创建了一个 `DataSet<NBUOrder>`,包含了订单信息,然后将这个数据集注册为一个临时视图,以便可以使用 SQL 对其进行查询。程序执行的 SQL 查询是对订单按产品名称进行分组并计算每个产品的总数量。最后,查询结果通过转换为 `DataSet<Row>` 打印输出。 ```java import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.apache.flink.types.Row; import org.nbubigdata.flink.Pojo.NBUOrder; import static org.apache.flink.table.api.Expressions.$; public class SQLBatch { public static void main(String[] args) throws Exception { // 获取Flink批处理作业的执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 创建一个BatchTableEnvironment。BatchTableEnvironment是用于Flink批处理中的表和SQL API的环境 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); // 使用env.fromElements创建一个DataSet<NBUOrder>。这里创建了一个NBUOrder对象的集合,每个对象都包含了订单的ID、产品名称和数量 DataSet<NBUOrder> input = env.fromElements( new NBUOrder(1L,"蔚来", 1), new NBUOrder(2L,"小鹏", 8), new NBUOrder(2L,"理想", 8), new NBUOrder(3L,"小米", 20)); // 将上面创建的DataSet注册为一个临时视图,这样就可以在SQL查询中使用它。这里指定了视图的列名为"id"、"product"和"amount" tEnv.createTemporaryView("NBUOrder", input,$("id"),$("product"), $("amount")); // 执行一个SQL查询,该查询选择产品名称和产品总数量,按产品名称分组 Table table = tEnv.sqlQuery( "SELECT product,SUM(amount) as amount FROM NBUOrder GROUP BY product"); // 将查询结果(Table)转换为DataSet<Row>并打印出来。这一步是作业的输出部分 tEnv.toDataSet(table, Row.class).print(); } } ``` 运行上述程序,可以在控制台中观察到如下的输出信息。 
上一篇:
使用SQL处理无界数据流
下一篇:
使用Table API处理无界数据流
文档导航