1.加依赖
<!-- Spring for Apache Kafka. No <version> is declared here; presumably it is managed by the
     Spring Boot parent/BOM of the project (confirm in the enclosing pom.xml). -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.加配置
# Spring Boot Kafka configuration (YAML). Indentation is significant: every key below
# must be nested under spring.kafka for Spring Boot to bind it.
spring:
  kafka:
    # Address of the Kafka broker.
    bootstrap-servers: 192.168.101.129:9092
    producer:
      # Keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      # With no committed offset for the group, start reading from the beginning of the topic.
      auto-offset-reset: earliest
      # Keys and values are read back as plain strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.启动类加注解@EnableKafka(Spring Boot 自动配置下可以省略,但建议显式加上)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * Entry point of the Spring Boot demo application.
 *
 * <p>The class is annotated with both {@code @SpringBootApplication} and
 * {@code @EnableKafka}.
 */
@EnableKafka
@SpringBootApplication
public class Springbootdemo3Application {

    /**
     * Starts the application by handing this class and the command-line
     * arguments to {@link SpringApplication#run(Class, String...)}.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Springbootdemo3Application.class, args);
    }
}
4. 编写生产者和消费者,可以测测看
4.1 生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

/**
 * REST controller mapped under {@code /kafka-test} that publishes messages to Kafka
 * through the injected {@link KafkaTemplate}.
 */
@RestController
@RequestMapping("/kafka-test")
public class KafkaTestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Handles {@code POST /kafka-test/send}: sends {@code param} to the
     * {@code test-kafka} topic and blocks until the send future completes.
     *
     * @param param the message payload (presumably bound from a request parameter
     *              of the same name; confirm against the caller)
     * @return the string form of the {@link SendResult}
     * @throws ExecutionException   if the send future completes exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @PostMapping("/send")
    public String sendMessage(String param) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> pending =
                kafkaTemplate.send("test-kafka", param);

        // Wait synchronously so the HTTP response can carry the send result.
        SendResult<String, String> outcome = pending.completable().get();

        String summary = outcome.toString();
        System.out.println("消息发送成功: " + summary);
        return summary;
    }
}
4.2 消费者
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Spring component holding a Kafka listener for the {@code test-kafka} topic.
 */
@Component
public class KafkaConsumer {

    /**
     * Receives a message from the {@code test-kafka} topic as a member of
     * consumer group {@code my-group} and prints it to standard output.
     *
     * @param message the received message payload
     */
    @KafkaListener(groupId = "my-group", topics = "test-kafka")
    public void listen(String message) {
        final String line = "kafak接收消息 " + message;
        System.out.println(line);
    }
}
5.测试成功
