大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
11-Flume案例-自定义source
Flume
2022-09-27 17:22:47
19
0
0
bigdata
Flume
# Flume案例-自定义Source Source 是负责接收数据到 Flume Agent 的组件。 Source 组件可以处理各种类型、各种格式的日志数据, 包括 avro、 thrift、 exec、 jms、 spooling directory、 netcat、 sequencegenerator、 syslog、 http、 legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。 官方也提供了自定义 source 的接口: https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。 实现相应方法: getBackOffSleepIncrement()//暂不用 getMaxBackOffSleepInterval()//暂不用 configure(Context context)//初始化 context(读取配置文件内容)process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用。 使用场景:读取 MySQL 数据或者其他文件系统。 ## 1)案例需求: 使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。 ## 2)需求分析:   ## 3)实现步骤: ### 1.导入pom依赖 ```xml <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> </dependencies> ``` ### 2.编写代码 ```java package org.nbubigdata.flume; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { //定义配置文件将来要读取的字段 private Long delay; private String field; //初始化配置信息 @Override public void configure(Context context) { delay = context.getLong("delay"); field = context.getString("field", "Hello!"); } @Override public Status process() throws EventDeliveryException { try { //创建事件头信息 HashMap<String, String> hearderMap = new HashMap<>(); //创建事件 SimpleEvent event = new SimpleEvent(); //循环封装事件 for (int i = 0; i < 5; i++) { //给事件设置头信息 event.setHeaders(hearderMap); //给事件设置内容 event.setBody((field + i).getBytes()); //将事件写入 channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (Exception e) { e.printStackTrace(); return Status.BACKOFF; } return Status.READY; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } } ``` ### 3.测试 用maven对程序进行打包:  生成的jar包如下:  将打包好的jar包,放到 flume 的 lib 目录(/usr/local/flume/lib)下。 创建配置文件/usr/local/flume/job/mysource.conf 内容如下: ```bash # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = org.nbubigdata.flume.MySource a1.sources.r1.delay = 1000 #a1.sources.r1.field = nbu # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ``` 开启任务: ```bash nbu@ecs:~$ cd /usr/local/flume/ nbu@ecs:/usr/local/flume$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console ``` 结果展示: 
上一篇:
10-Zookeeper-watcher事件
下一篇:
11-InfluxDB数学计算
文档导航