您的位置:首页 > 健康 > 美食 > 创建公司多少钱_广告公司企业简介怎么写_汕头网站建设优化_软文营销代理

创建公司多少钱_广告公司企业简介怎么写_汕头网站建设优化_软文营销代理

2025/2/23 20:48:07 来源:https://blog.csdn.net/wangchange/article/details/145701142  浏览:    关键词:创建公司多少钱_广告公司企业简介怎么写_汕头网站建设优化_软文营销代理
创建公司多少钱_广告公司企业简介怎么写_汕头网站建设优化_软文营销代理

目录

一、docker安装和配置Kafka

1.拉取 Zookeeper 的 Docker 镜像

2.运行 Zookeeper 容器

3.拉取 Kafka 的 Docker 镜像

4.运行 Kafka 容器

5.下载 Kafdrop

6.运行 Kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

二、在Spring Boot项目中集成和使用Kafka

1. 添加依赖

2. 配置Kafka

3. 创建消息对象

4. 创建生产者

5. 创建消费者

6. 测试

三、web访问Kafdrop


一、docker安装和配置Kafka

1.拉取 Zookeeper 的 Docker 镜像

docker pull wurstmeister/zookeeper

2.运行 Zookeeper 容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

3.拉取 Kafka 的 Docker 镜像

docker pull wurstmeister/kafka

4.运行 Kafka 容器

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.7.46:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper:zookeeper wurstmeister/kafka

5.下载 Kafdrop

docker pull obsidiandynamics/kafdrop

6.运行 Kafdrop

docker run -p 9000:9000 -d --name kafdrop -e KAFKA_BROKERCONNECT=192.168.7.46:9092 -e JVM_OPTS="-Xms16M -Xmx48M" obsidiandynamics/kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

下载:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
(kafdrop是一个kafka的web图形管理界面)
docker pull obsidiandynamics/kafdrop
打包:
docker save -o ./zookeeper.tar wurstmeister/zookeeper
docker save -o ./kafka.tar wurstmeister/kafka
docker save -o ./kafdrop.tar obsidiandynamics/kafdrop
传输:
scp kafka.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可
scp zookeeper.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可
scp kafdrop.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可

目标机加载成docker镜像
docker load -i /usr/root/kafka/kafka.tar
docker load -i /usr/root/kafka/zookeeper.tar
docker load -i /usr/root/kafka/kafdrop.tar
查看镜像列表
docker images

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

docker exec -it kafka kafka-topics.sh --create --topic users --partitions 1 --replication-factor 1 --bootstrap-server 192.168.7.46:9092

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

docker exec -it kafka kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 192.168.7.46:9092

这个命令,会启动一个额外的 Kafka 消费者来监听 users 主题。这个消费者是通过 Kafka 自带的 kafka-console-consumer.sh 工具启动的,主要用于测试和验证目的。它会持续监听并打印出发送到 users 主题的所有消息。

二、在Spring Boot项目中集成和使用Kafka

1. 添加依赖

首先,在你的pom.xml文件中添加Kafka的依赖:

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

2. 配置Kafka

在application.properties或application.yml文件中配置Kafka的相关属性。这里以application.properties为例:

# Kafka broker地址

spring.kafka.bootstrap-servers=localhost:9092

# 生产者配置

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# 消费者配置

spring.kafka.consumer.group-id=my-group

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.trusted.packages=*

3. 创建消息对象

假设我们要发送和接收一个简单的KafkaMsgs 对象:

public class KafkaMsgs {

    private String id;

    private String msg;

    private Long date;

    // 构造函数、getter和setter省略

}

4. 创建生产者

创建一个生产者类来发送消息:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaProducer {

    @Autowired

    private KafkaTemplate<String, KafkaMsgs> kafkaTemplate;

    public void sendMessage(String topic, KafkaMsgs kafkaMsgs) {

        kafkaTemplate.send(topic, kafkaMsgs);

    }

}

5. 创建消费者

创建一个消费者类来接收消息:

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Service;

@Service

public class KafkaConsumer {

    @KafkaListener(topics = "users", groupId = "my-group")

    public void listen(KafkaMsgs kafkaMsgs) {

        System.out.println("Received message: " + kafkaMsgs);

    }

}

6. 测试

你可以创建一个简单的测试类来验证生产和消费是否正常工作:

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.esop.resurge.core.config.kafka.KafkaProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.airbubble.kingdom.army.reponse.FeedBack;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Api(tags="kafka数据控制器")
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {
    @Autowired
    KafkaProducer kafkaProducer;

    @ApiOperation(value = "测试发送数据到kafka", httpMethod = "GET")
    @GetMapping(value = "/sendKafkaData")
    public FeedBack<String> sendKafkaData(
            @ApiParam(value = "topic", required = true) @RequestParam(required = true,value = "topic") String topic,
            @ApiParam(value = "msg", required = true) @RequestParam(required = true,value = "msg") String msg
    ) throws Exception {
        kafkaProducer.sendMessage(topic, new com.esop.resurge.core.config.kafka.KafkaMsgs(
                IdUtil.fastUUID(),
                msg,
                Long.valueOf(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT).replace(" ", "").replace(":", "").replace("-", ""))
        ));
        return FeedBack.getInstance("发送成功");
    }

}

三、web访问Kafdrop

 打开浏览器,访问 http://192.168.7.46:9000,你应该能够看到 Kafdrop 的 Web 界面

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com