您的位置:首页 > 科技 > 能源 > 黄石网站设计_网站建设方案书模板下载_百度网盘官网_青岛网站推广系统

黄石网站设计_网站建设方案书模板下载_百度网盘官网_青岛网站推广系统

2024/12/23 5:11:00 来源:https://blog.csdn.net/qq_36774734/article/details/144061644  浏览:    关键词:黄石网站设计_网站建设方案书模板下载_百度网盘官网_青岛网站推广系统
黄石网站设计_网站建设方案书模板下载_百度网盘官网_青岛网站推广系统

1,

<dependencies>
    <!-- Spring Boot starter for Apache Kafka (spring-kafka) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot web starter (optional; only needed for other Spring Boot web features) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

2,application.properties(Spring Boot Kafka 配置)

# application.properties
# NOTE: Java .properties files do not support trailing inline comments —
# a "# ..." after a value becomes part of the value. Comments must be on
# their own lines, as below.

# Kafka broker address
spring.kafka.bootstrap-servers=127.0.0.1:9092
# Consumer group id
spring.kafka.consumer.group-id=myGroup

3,KafkaProducerTest.java

package cn.itcast.kafka;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** Kafka的生产者程序* 会将消息创建出来,并发送到Kafka集群中** 1. 创建用于连接Kafka的Properties配置* Properties props = new Properties();* props.put("bootstrap.servers", "192.168.88.100:9092");* props.put("acks", "all");* props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");* props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");** 2. 创建一个生产者对象KafkaProducer* 3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值* 4. 再调用一个Future.get()方法等待响应* 5. 关闭生产者*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建用于连接Kafka的Properties配置Properties props = new Properties();props.put("bootstrap.servers", "node1.itcast.cn:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建一个生产者对象KafkaProducerKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);// 3. 发送1-100的消息到指定的topic中for(int i = 0; i < 10000000; ++i) {// 一、使用同步等待的方式发送消息// // 构建一条消息,直接new ProducerRecord// ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");// Future<RecordMetadata> future = kafkaProducer.send(producerRecord);// // 调用Future的get方法等待响应// future.get();// System.out.println("第" + i + "条消息写入成功!");// 二、使用异步回调的方式发送消息ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test_1m", null, i + "");kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 1. 判断发送消息是否成功if(exception == null) {// 发送成功// 主题String topic = metadata.topic();// 分区idint partition = metadata.partition();// 偏移量long offset = metadata.offset();System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);}else {// 发送出现错误System.out.println("生产消息出现异常!");// 打印异常消息System.out.println(exception.getMessage());// 打印调用栈System.out.println(exception.getStackTrace());}}});}// 4.关闭生产者kafkaProducer.close();}
}

4,KafkaConsumerTest.java

package cn.itcast.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 消费者程序** 1.创建Kafka消费者配置* Properties props = new Properties();* props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");* props.setProperty("group.id", "test");* props.setProperty("enable.auto.commit", "true");* props.setProperty("auto.commit.interval.ms", "1000");* props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");* props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");** 2.创建Kafka消费者* 3.订阅要消费的主题* 4.使用一个while循环,不断从Kafka的topic中拉取消息* 5.将将记录(record)的offset、key、value都打印出来*/
public class KafkaConsumerTest {public static void main(String[] args) throws InterruptedException {// 1.创建Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092");// 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的props.setProperty("group.id", "test");// 自动提交offsetprops.setProperty("enable.auto.commit", "true");// 自动提交offset的时间间隔props.setProperty("auto.commit.interval.ms", "1000");// 拉取的key、value数据的props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2.创建Kafka消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);// 3. 订阅要消费的主题// 指定消费者从哪个topic中拉取数据kafkaConsumer.subscribe(Arrays.asList("test"));// 4.使用一个while循环,不断从Kafka的topic中拉取消息while(true) {// Kafka的消费者一次拉取一批的数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));// 5.将将记录(record)的offset、key、value都打印出来for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 主题String topic = consumerRecord.topic();// offset:这条消息处于Kafka分区中的哪个位置long offset = consumerRecord.offset();// key\valueString key = consumerRecord.key();String value = consumerRecord.value();System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);}Thread.sleep(1000);}}
}

5,异步回调方式(每条消息间隔1秒限速)发送消息KafkaProducerTest2.java

package cn.itcast.kafka;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Kafka的生产者程序* 会将消息创建出来,并发送到Kafka集群中** 1. 创建用于连接Kafka的Properties配置* Properties props = new Properties();* props.put("bootstrap.servers", "192.168.88.100:9092");* props.put("acks", "all");* props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");* props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");** 2. 创建一个生产者对象KafkaProducer* 3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值* 4. 再调用一个Future.get()方法等待响应* 5. 关闭生产者*/
public class KafkaProducerTest2 {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建用于连接Kafka的Properties配置Properties props = new Properties();props.put("bootstrap.servers", "node1.itcast.cn:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建一个生产者对象KafkaProducerKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);int MAX = 10000000;// 3. 发送1-100的消息到指定的topic中for(int i = 1000000; i < MAX; ++i) {// 一、使用同步等待的方式发送消息// // 构建一条消息,直接new ProducerRecord// ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");// Future<RecordMetadata> future = kafkaProducer.send(producerRecord);// // 调用Future的get方法等待响应// future.get();// System.out.println("第" + i + "条消息写入成功!");// 二、使用异步回调的方式发送消息ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test1", null, i + "");kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 1. 判断发送消息是否成功if(exception == null) {// 发送成功// 主题String topic = metadata.topic();// 分区idint partition = metadata.partition();// 偏移量long offset = metadata.offset();System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);}else {// 发送出现错误System.out.println("生产消息出现异常!");// 打印异常消息System.out.println(exception.getMessage());// 打印调用栈System.out.println(exception.getStackTrace());}}});Thread.sleep(1000);}// 4.关闭生产者kafkaProducer.close();}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com