大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
使用Table API处理有界数据流
无
2024-04-13 13:58:35
21
0
0
bigdata
### 使用`Table API`处理有界数据流 > Written by 黄俊仁,2024/04/13 首先创建一个Java的POJO类`NBUOrder`,如下所示: ```java package org.nbubigdata.flink.Pojo; public class NBUOrder { //定义类的属性 public Long id; public String product; public int amount; //无参构造方法 public NBUOrder() { } //有参构造方法 public NBUOrder(Long id, String product, int amount) { this.id = id; this.product = product; this.amount = amount; } @Override public String toString() { return "NBUOrder{" + "id=" + id + ", product='" + product + '\'' + ", amount=" + amount + '}'; } } ``` 如下示例代码使用Table API 实现了对一批订单数据的简单查询和过滤操作。首先,它创建了一个包含 `NBUOrder` 对象的 `DataSet`,每个对象都包含订单的ID、产品名称和数量。然后,程序将这个 `DataSet` 转换成 `Table`,使用 Table API 对其进行查询,筛选出数量(`amount`)大于或等于 8 的订单记录。最后,筛选后的结果被转换回 `DataSet<NBUOrder>` 。 ```java package org.nbubigdata.flink; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.nbubigdata.flink.Pojo.NBUOrder; import static org.apache.flink.table.api.Expressions.$; public class TableBatch { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 创建一个批处理表环境 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); // 使用env.fromElements方法创建一个DataSet<NBUOrder>。这里创建了一个NBUOrder对象的集合,每个对象包含了订单的ID、产品名称和数量 DataSet<NBUOrder> input = env.fromElements( new NBUOrder(1L,"蔚来", 7), new NBUOrder(2L,"小鹏", 8), new NBUOrder(2L,"理想", 8), new NBUOrder(3L,"小米", 20)); // 将DataSet转换为Table,这样可以利用Table API对数据进行更灵活的处理 Table table = tEnv.fromDataSet(input); // 对Table进行过滤操作,选择数量(amount)大于或等于8的记录 Table filtered = table.where($("amount").isGreaterOrEqual(8)); // 将过滤后的Table转换回DataSet<NBUOrder>,以便进行进一步处理或输出 // 这里指定了转换回的类型为NBUOrder类,这要求Table的schema与NBUOrder类的结构相匹配 DataSet<NBUOrder> result = tEnv.toDataSet(filtered, NBUOrder.class); result.print(); } } ``` 运行上述程序,可以在控制台中观察到如下的输出信息。 
上一篇:
使用Table API处理无界数据流
下一篇:
使用WindowAll定义全局窗口
文档导航