1. 安装Kafka
-
下载Kafka:从Kafka官网下载最新版本的Kafka。
-
解压并启动:
-
解压Kafka文件后,进入
bin
目录。 -
启动ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
。 -
启动Kafka:
./kafka-server-start.sh ../config/server.properties
。 -
确认启动成功后,Kafka服务即可使用。
-
2. 创建Spring Boot项目
-
在Spring Initializr创建一个新项目,选择需要的依赖(如Spring Web和Spring Kafka)。
-
下载并解压项目,导入到IDE中。
3. 添加Kafka依赖
在pom.xml
中添加以下依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
这个依赖会自动配置Spring Kafka的相关组件。
4. 配置Kafka
在application.yml
中添加Kafka的配置:
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
这里配置了Kafka服务器地址、消费者组、序列化器等。
5. 创建Kafka生产者
-
创建生产者配置类:
@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
-
创建生产者服务类:
@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(success -> System.out.println("Message sent successfully: " + message),failure -> System.err.println("Failed to send message: " + failure.getMessage()));}
}
通过KafkaTemplate
发送消息。
6. 创建Kafka消费者
-
创建消费者服务类:
@Service
public class KafkaConsumerService {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consume(String message) {System.out.println("Received message: " + message);}
}
使用@KafkaListener
注解监听指定主题并接收消息。
7. 测试应用
-
创建一个控制器,用于发送消息:
@RestController
public class KafkaController {private final KafkaProducerService kafkaProducerService;public KafkaController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducerService.sendMessage("my-topic", message);return "Message sent";}
}
-
启动Spring Boot应用,通过访问
http://localhost:8080/send?message=HelloKafka
发送消息。
通过以上步骤,你可以在Spring Boot中成功集成并使用Kafka。