VUE代码
// 初始化EventSourceinitEventSource(url) {const token = getAccessToken();const eventSource = new EventSourcePolyfill(url, {headers: {'Authorization': `Bearer ${token}`,'tenant-id': getTenantId(),}});eventSource.onerror = (e) => {console.log("SSE连接错误", e);if (e.readyState === EventSource.CLOSED || eventSource.readyState === EventSource.CONNECTING) {console.log("SSE连接已关闭或正在重连");} else {// 当发生错误时,尝试重新连接if (reconnectAttempts < maxReconnectAttempts) {console.log(`尝试第${reconnectAttempts + 1}次重连`);reconnectAttempts++;setTimeout(() => {eventSource.close(); // 关闭当前连接this.initEventSource(url); // 重新初始化EventSource}, reconnectDelay * reconnectAttempts);} else {console.error("达到最大重连次数,不再尝试重连");}}};eventSource.addEventListener("message", res => {const data = JSON.parse(res.data)if (data.type == 2) {this.unreadCount = data.content;}if (data.type == 1) {this.createNotify(data)}})},
后端使用 Redis 的发布/订阅(Pub/Sub)作为消息通道,因此可以兼容分布式(多实例)部署的服务。
JAVA 监听接口
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamEvents() {Long loginUserId = SecurityFrameworkUtils.getLoginUserId();USER_IDS.add(loginUserId);SseMessageVO heartbeat = new SseMessageVO().setType(SseNotifyTypeEnum.HEARTBEAT.getType()).setUserId(loginUserId).setContent("Heartbeat");return reactiveRedisOperations.listenToChannel(SseService.getDestination(loginUserId)).map(data -> sendMsg(loginUserId,data.getMessage(),heartbeat)).publishOn(Schedulers.boundedElastic()).doOnSubscribe(subscription -> {// 订阅时发送一次心跳,确认连接heartbeat(heartbeat);});}private String sendMsg(Long loginUserId,String sseMessage,SseMessageVO heartbeat){SseMessageVO sseMessageVO = JSONUtil.toBean(sseMessage, SseMessageVO.class);if (null != sseMessageVO && Objects.equals(sseMessageVO.getUserId(), loginUserId)){return JSONUtil.toJsonStr(sseMessageVO);}return JSONUtil.toJsonStr(heartbeat);}/*** 登录时心跳*/private void heartbeat(SseMessageVO heartbeat) {ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();executorService.schedule(() -> {sseService.publishEventToChannel(heartbeat).subscribe();}, 1, TimeUnit.SECONDS);executorService.shutdown();}/*** 保活*/@PostConstructpublic void heartbeatTimer() {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {if (CollectionUtil.isNotEmpty(USER_IDS)){for (Long userId : USER_IDS) {String message = "Heartbeat at " + LocalDateTime.now();SseMessageVO heartbeat = new SseMessageVO().setType(SseNotifyTypeEnum.HEARTBEAT.getType()).setUserId(userId).setContent(message);sseService.publishEventToChannel(heartbeat).subscribe();}}}, 0, 10, TimeUnit.SECONDS);}
JAVA 提交数据服务
/**
 * Publishes SSE messages to per-user Redis channels.
 */
@Component
public class SseService {

    /** Prefix of every per-user channel name; the user id is appended to it. */
    private static final String CHANNEL_PREFIX = "event-channel-user:";

    @Resource
    private ReactiveRedisOperations<String, String> reactiveRedisOperations;

    /**
     * Builds the channel name for a user.
     *
     * @param userId id of the user
     * @return the channel prefix followed by the user id
     */
    public static String getDestination(Long userId) {
        return CHANNEL_PREFIX.concat(String.valueOf(userId));
    }

    /**
     * Publishes an event to the channel of the user the message is addressed to.
     *
     * @param sseMessageVO message to publish; its user id selects the channel
     * @return the result of {@code convertAndSend} for the serialized message
     */
    public Mono<Long> publishEventToChannel(SseMessageVO sseMessageVO) {
        String channel = getDestination(sseMessageVO.getUserId());
        String payload = JSONUtil.toJsonStr(sseMessageVO);
        return reactiveRedisOperations.convertAndSend(channel, payload);
    }
}