您的位置:首页 > 文旅 > 美景 > 建设工程官方网站_建站工具帝国_郑州seo哪家专业_接单平台app

建设工程官方网站_建站工具帝国_郑州seo哪家专业_接单平台app

2024/12/23 1:34:04 来源:https://blog.csdn.net/qq_46637011/article/details/142227342  浏览:    关键词:建设工程官方网站_建站工具帝国_郑州seo哪家专业_接单平台app
建设工程官方网站_建站工具帝国_郑州seo哪家专业_接单平台app

上一篇通过配置Canal+MQ的数据同步环境,实现了Canal从数据库读取binlog并且将数据写入MQ。

下边编写程序监听MQ,收到消息后向ES创建索引。

1. 环境准备

  • RabbitMQ:已配置并运行,Canal 已将 MySQL 的 binlog 消息发送到 RabbitMQ 队列中。
  • Elasticsearch:已运行,并可以通过 REST API 访问。
  • Spring Boot 项目:通过 spring-boot-starter-amqpspring-boot-starter-data-elasticsearch 集成 RabbitMQ 和 Elasticsearch。

2. 添加依赖

在 Spring Boot 项目的 pom.xml 中,添加 RabbitMQ 和 Elasticsearch 的依赖:

<dependencies><!-- RabbitMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Elasticsearch Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency>
</dependencies>

3. 配置 application.yml

配置文件中需要包含 RabbitMQ 和 Elasticsearch 的相关信息:

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual   # 手动ACKelasticsearch:rest:uris: http://localhost:9200connection-timeout: 1000socket-timeout: 30000

4. 创建 Elasticsearch 索引

定义一个实体类来映射 Elasticsearch 中的索引:

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;@Document(indexName = "user_index")
public class User {@Idprivate Long id;private String name;private String email;// getters and setters
}

5. 编写 Elasticsearch 的 Repository

创建一个 Elasticsearch 的 Repository,用于保存数据到索引中:

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;public interface UserRepository extends ElasticsearchRepository<User, Long> {
}

6. 监听 RabbitMQ 队列并同步到 Elasticsearch

创建一个 RabbitMQ 消息监听器,收到消息后解析并将数据存储到 Elasticsearch:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class CanalToEsSyncService {@Autowiredprivate UserRepository userRepository;@Autowiredprivate RabbitTemplate rabbitTemplate;private ObjectMapper objectMapper = new ObjectMapper();// 监听 Canal 发送到 RabbitMQ 的队列消息@RabbitListener(queues = "example_queue")public void handleCanalMessage(String message) {try {// 假设 Canal 发送的是 JSON 格式的消息,可以用 Jackson 解析User user = objectMapper.readValue(message, User.class);// 将解析的 User 对象存入 ElasticsearchuserRepository.save(user);// 手动确认消息已成功消费System.out.println("Message processed and indexed to Elasticsearch: " + user);} catch (Exception e) {e.printStackTrace();// 处理失败,可以记录日志或根据需要重试}}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com