订阅类A
package com.hdx.master.listener;import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;import java.util.List;@Slf4j
@Component
public class AMessageListener extends MessageListenerAdapter {@AutowiredRedisTemplate<String, Object> redisTemplate;@SneakyThrows@Overridepublic void onMessage(Message message, byte[] pattern) {List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) redisTemplate.getValueSerializer().deserialize(message.getBody());}
}
订阅类B
package com.hdx.master.listener;import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;import java.util.List;@Slf4j
@Component
public class BMessageListener extends MessageListenerAdapter {@AutowiredRedisTemplate<String, Object> redisTemplate;@SneakyThrows@Overridepublic void onMessage(Message message, byte[] pattern) {List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) redisTemplate.getValueSerializer().deserialize(message.getBody());}
}
订阅主题的配置类
package com.hdx.master.socket.client;import com.hdx.master.listener.AMessageListener;
import com.hdx.master.listener.BMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class OldDataHandle implements InitializingBean {@Autowiredprivate RedisMessageListenerContainer container;@Autowiredprivate AMessageListener aMessageListener;@Autowiredprivate BMessageListener bMessageListener;@Overridepublic void afterPropertiesSet() throws Exception {subscribe();}public void subscribe() {container.addMessageListener(aMessageListener, new PatternTopic("aTOPIC"));container.addMessageListener(bMessageListener, new PatternTopic("bTOPIC"));}
}
Redis 配置类
package com.hdx.master.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;@Configuration
@EnableRedisRepositories
public class RedisConfig {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Beanpublic ThreadPoolTaskExecutor threadPoolTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(16); executor.setMaxPoolSize(500); executor.setQueueCapacity(10000); executor.setThreadFactory(new CustomizableThreadFactory("RedisPubSub-exec-"));executor.initialize(); return executor;}@Beanpublic RedisMessageListenerContainer messageListenerContainer(Executor threadPoolTaskExecutor) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);container.setTaskExecutor(threadPoolTaskExecutor);return container;}@Bean@Primarypublic RedisTemplate<String, Object> redisTemplate() {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);redisTemplate.setKeySerializer(keySerializer());redisTemplate.setHashKeySerializer(keySerializer());redisTemplate.setValueSerializer(jacksonSerializer());redisTemplate.setHashValueSerializer(jacksonSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}public RedisSerializer<String> keySerializer() {return new StringRedisSerializer();}private Jackson2JsonRedisSerializer jacksonSerializer() {Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);return jackson2JsonRedisSerializer;}}
测试：通过 MVC 接口发送消息，验证发布订阅
package com.hdx.master.controller;import com.hdx.master.common.HttpResult;
import com.hdx.master.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
@RequestMapping("test")
public class PublishController {

    /** Channel that test messages are published to. */
    private static final String PUBLISH_CHANNEL = "aTOPIC";

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Publishes the given text to the test channel through the Redis template.
     *
     * @param message the text to publish
     * @return the result produced by {@code HttpResult.successMsg}
     */
    @PostMapping("/publish")
    public HttpResult publish(String message) {
        redisTemplate.convertAndSend(PUBLISH_CHANNEL, message);
        HttpResult result = HttpResult.successMsg("发布成功");
        return result;
    }
}