大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
05-Kafka生产者示例
Kafka
Java
2022-12-05 23:06:00
35
0
0
bigdata
Kafka
Java
Java 客户端创建一个用于发布和使用消息的应用程序。 Kafka 生产者客户端包括以下 API。 # KafkaProducer API 让我们了解本节中最重要的一组 Kafka 生产者 API。 KafkaProducer API 的中心部分是 KafkaProducer 类。 KafkaProducer 类提供了一个选项,用于将其构造函数中的 Kafka 代理连接到以下方法。 * KafkaProducer 类提供 send 方法以异步方式将消息发送到主题。 send() 的签名如下 ```java producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback); ``` * **ProducerRecord** - 生产者管理等待发送的记录的缓冲区。 * **回调** - 当服务器确认记录时执行的用户提供的回调 (null 表示无回调)。 * KafkaProducer 类提供了一个 flush 方法,以确保所有先前发送的消息都已实际完成。 flush 方法的语法如下 - ```java public void flush() ``` * KafkaProducer 类提供了 partitionFor 方法,这有助于获取给定主题的分区元数据。 这可以用于自定义分区。 这种方法的签名如下 - ```java public Map metrics() ``` 它返回由生产者维护的内部度量的映射。 * public void close() - KafkaProducer 类提供关闭方法块,直到所有先前发送的请求完成。 # 生产者 API 生产者 API 的中心部分是生产者类。 生产者类提供了一个选项,通过以下方法在其构造函数中连接 Kafka 代理。 ## 生产者类 生产者类提供 send 方法以使用以下签名向单个或多个主题**发送**消息。 ```java public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,partitioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,"async") ProducerConfig config = new ProducerConfig(prop); ``` 有两种类型的生产者 - **同步**和**异步**。 相同的 API 配置也适用于同步生产者。 它们之间的区别是同步生成器直接发送消息,但在后台发送消息。 当您想要更高的吞吐量时,异步生产者是首选。 在以前的版本,如 0.8,一个异步生产者没有回调 send() 注册错误处理程序。 这仅在当前版本 0.9 中可用。 ## public void close() 生产者类提供**关闭**方法以关闭与所有 Kafka 代理的生产者池连接。 ## 配置设置 下表列出了 Producer API 的主要配置设置,以便更好地理解 - <table><tbody><tr><th trans_replace="0" trans="1">S.No</th><th trans_replace="0" trans="1">配置设置和说明</th></tr><tr><td trans="1">1</td><td trans="1"><p trans_replace="0" trans="1"><strong>client.id</strong></p><p trans_replace="0" trans="1">标识生产者应用程序</p></td></tr><tr><td trans="1">2</td><td trans="1"><p trans_replace="0" trans="1"><strong>producer.type</strong></p><p trans_replace="0" trans="1">同步或异步</p></td></tr><tr><td trans="1">3</td><td trans="1"><p trans_replace="0" trans="1"><strong>acks</strong></p><p trans_replace="0" trans="1">acks 配置控制生产者请求下的标准是完全的。</p></td></tr><tr><td trans="1">4</td><td trans="1"><p trans_replace="0" trans="1"><strong>重试</strong></p><p trans_replace="0" trans="1">如果生产者请求失败,则使用特定值自动重试。</p></td></tr><tr><td trans="1">5</td><td trans="1"><p trans_replace="0" trans="1">bootstrapping 代理列表。</p></td></tr><tr><td trans="1">6</td><td trans="1"><p trans_replace="0" trans="1"><strong>linger.ms</strong></p><p trans_replace="0" trans="1">如果你想减少请求的数量,你可以将 linger.ms 设置为大于某个值的东西。</p></td></tr><tr><td trans="1">7</td><td trans="1"><p trans_replace="0" trans="1"><strong>key.serializer</strong></p><p trans_replace="0" trans="1">序列化器接口的键。</p></td></tr><tr><td trans="1">8</td><td trans="1"><p trans_replace="0" trans="1"><strong>value.serializer</strong></p><p trans_replace="0" trans="1">值。</p></td></tr><tr><td trans="1">9</td><td trans="1"><p trans_replace="0" trans="1"><strong>batch.size</strong></p><p trans_replace="0" trans="1">缓冲区大小。</p></td></tr><tr><td trans="1">10</td><td trans="1"><p trans_replace="0" trans="1"><strong>buffer.memory</strong></p><p trans_replace="0" trans="1">控制生产者可用于缓冲的存储器的总量。</p></td></tr></tbody></table> ## ProducerRecord API ProducerRecord 是发送到 Kafka cluster.ProducerRecord 类构造函数的键 / 值对,用于使用以下签名创建具有分区,键和值对的记录。 ```java public ProducerRecord (string topic, int partition, k key, v value) ``` * **主题** - 将附加到记录的用户定义的主题名称。 * **分区** - 分区计数。 * **键** - 将包含在记录中的键。 * **值** - 记录内容。 ```java public ProducerRecord (string topic, k key, v value) ``` ProducerRecord 类构造函数用于创建带有键,值对和无分区的记录。 * **主题** - 创建主题以分配记录。 * **键** - 记录的键。 * **值** - 记录内容。 ```java public ProducerRecord (string topic, v value) ``` ProducerRecord 类创建一个没有分区和键的记录。 * **主题** - 创建主题。 * **值** - 记录内容。 ProducerRecord 类方法列在下表中 - <table><tbody><tr><th trans_replace="0" trans="1">S.No</th><th trans_replace="0" trans="1">类方法和描述</th></tr><tr><td trans="1">1</td><td trans="1"><p trans_replace="0" trans="1"><strong>public string topic()</strong></p><p trans_replace="0" trans="1">主题将附加到记录。</p></td></tr><tr><td trans="1">2</td><td trans="1"><p trans_replace="0" trans="1"><strong>public K key()</strong></p><p trans_replace="0" trans="1">将包括在记录中的键。 如果没有这样的键,null 将在这里重新打开。</p></td></tr><tr><td trans="1">3</td><td trans="1"><p trans_replace="0" trans="1"><strong>public V value()</strong></p><p trans_replace="0" trans="1">记录内容。</p></td></tr><tr><td trans="1">4</td><td trans="1"><p trans_replace="0" trans="1"><strong>partition()</strong></p><p trans_replace="0" trans="1">记录的分区计数</p></td></tr></tbody></table> # SimpleProducer 应用程序 在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建`nbu`的主题。 之后,创建一个名为 SimpleProducer.java 的 java 类。 ```bash #启动zookeeper nbu@ecs:~$ cd /usr/local/zookeeper/bin nbu@ecs:/usr/local/zookeeper/bin$ ./zkServer.sh start #启动kafka nbu@ecs:/usr/local/zookeeper/bin$ cd /usr/local/kafka/bin nbu@ecs:/usr/local/kafka/bin$ nohup ./kafka-server-start.sh ../config/server.properties & #创建nbu主题 nbu@ecs:/usr/local/kafka/bin$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic nbu #创建文件夹`~/bigdata/kafka/kafka-demo` nbu@ecs:~$ mkdir -p ~/bigdata/kafka/kafka-demo nbu@ecs:~$ cd ~/bigdata/kafka/kafka-demo/ #创建SimpleProducer.java文件 nbu@ecs:~/bigdata/kafka/kafka-demo$ vim SimpleProducer.java ``` 输入如下内容: ```java //import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named "SimpleProducer" public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println("Message sent successfully"); producer.close(); } } ``` **编译** - 可以使用以下命令编译应用程序。 ```bash nbu@ecs:~/bigdata/kafka/kafka-demo$ javac -cp "/usr/local/kafka/libs/*":. SimpleProducer.java ``` > 若在CLASSPATH中指定了`/usr/local/kafka/libs/*`,则不需要`-cp "/usr/local/kafka/libs/*":.`来指定kafka库文件位置 **执行** - 可以使用以下命令执行应用程序。 ```bash nbu@ecs:~/bigdata/kafka/kafka-demo$ java SimpleProducer nbu ``` **输出**  ```bash #查看nbu主题的内容 nbu@ecs:~$ cd /usr/local/kafka nbu@ecs:/usr/local/kafka$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 -topic nbu -from-beginning ``` 
上一篇:
05-InfluxDB采样和数据保留
下一篇:
05-MongoDB基本操作
文档导航