您的位置:首页 > 科技 > 能源 > 辽宁建设执业信息网官网_手机优化怎么关闭_月嫂免费政府培训中心_百度网址收录入口

辽宁建设执业信息网官网_手机优化怎么关闭_月嫂免费政府培训中心_百度网址收录入口

2025/2/28 4:49:41 来源:https://blog.csdn.net/weixin_43783284/article/details/145589756  浏览:    关键词:辽宁建设执业信息网官网_手机优化怎么关闭_月嫂免费政府培训中心_百度网址收录入口
辽宁建设执业信息网官网_手机优化怎么关闭_月嫂免费政府培训中心_百度网址收录入口

需求说明:

  • 创建 Kafka 生产者,采用异步的方式生产者将数据发送到 Kafka Broker

一、消息的异步发送API:

1.1.异步发送含义:

  • 1.所谓的异步发送是指 将外部的数据发送到队列中,不管队列中的数据有没有发送到kafka集群,main线程会把数据一批的一批的发送到队列中
    在这里插入图片描述

1.2.编码实现异步发送:

a.创建kafka工程

在这里插入图片描述
在这里插入图片描述

b.导入kafka依赖:

	<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies>

c.实现无回调功能的异步发送:

  • 1.实现生产者发送消息
package com.qun.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {//0.属性配置Properties properties = new Properties();//连接到Kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,hadoop104:9092,hadoop105:9092");//指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//1.创建生产者对象KafkaProducer<String, String> KafkaProducer = new KafkaProducer<>(properties);//first是代表的发送到的主题//2.发送数据for (int i = 0; i < 5; i++) {KafkaProducer.send(new ProducerRecord<>("firstopic", "jianqun" + i));//参数1:数据发送到哪个主题;  参数2:发送的数据内容}//3.关闭资源KafkaProducer.close();}
}
  • 2.代码测试:
    • 在服务器上q启动kafka的消费者,并指定消费的主题:bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic firstopic
      在这里插入图片描述
    • 执行生产者代码:
    • 查看消费者消费的消息:
      在这里插入图片描述

d.带回调函数的异步发送流程:

  • 1.什么叫回调函数:
    • 带回调函数的异步发送就是指生产者发送数据后,对列会返回数据所在的队列、分区等数据
    • 回调函数会在生产者收到 ack 时被调用,为异步调用,回调函数方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败
      在这里插入图片描述
  • 2.编码实现生产者发送消息:
package com.qun.kafka.producer;import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop103:9092,hadoop104:9092,hadoop105:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new  KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i), new Callback() {//该方法在Producer收到ack时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " +   metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2);}// 5. 关闭资源kafkaProducer.close();}}
  • 3.测试:
    • 启动kafka消费者:
      在这里插入图片描述
    • 执行生产者代码,可以看到在控制台上输出的内容:
      在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试


二、同步发送API:

2.1.同步发送概念:

  • 1.同步发送就是发送到对列中的数据,必须全部发送到Broke后,生产者才可以继续发送数据到队列中

2.2.代码实现:

  • 1.只需在异步发送的基础上,再调用一下 get()方法即可
package com.qun.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerTongBu {public static void main(String[] args) throws InterruptedException, ExecutionException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,hadoop104:9092,hadoop105:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<>("firstopic", "jianqun +++ " + i)).get();// 延迟一会会看到数据发往不同分区Thread.sleep(2);}// 5. 关闭资源kafkaProducer.close();}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com