您现在的位置是:亿华云 > 数据库

Server-Sent Events (SSE) 技术解析与实战

亿华云2025-10-02 16:33:38【数据库】9人已围观

简介前言在当今互联网应用中,实时数据交互已成为关键需求。从AI聊天机器人到实时通知系统,从股票行情更新到协作编辑工具,都需要服务器能够主动向客户端推送数据。Server-Sent Events作为一种基于

前言

在当今互联网应用中,术解实时数据交互已成为关键需求。析实从AI聊天机器人到实时通知系统,术解从股票行情更新到协作编辑工具,析实都需要服务器能够主动向客户端推送数据。术解Server-Sent Events作为一种基于HTTP的析实单向服务器推送技术,在这些场景中展现出独特的术解优势。

SSE特别适合以下场景:

AI聊天机器人:服务器逐步返回AI生成的析实回答实时通知系统:新消息、提醒等通知推送日志监控:实时展示系统日志输出数据更新:股票价格、术解天气信息等实时更新效果图

图片

基础概念

Server-Sent Events是析实一种允许服务器向客户端推送实时事件的技术。与传统的术解HTTP请求 - 响应模式不同,SSE建立的析实是一个持久的HTTP连接,服务器可以在任何时候通过这个连接向客户端推送数据。术解

SSE具有以下特点:

基于标准HTTP协议,析实无需额外协议支持单向通信:仅服务器向客户端推送数据轻量级:相比WebSocket,云服务器提供商术解实现更简单支持自动重连:连接断开时客户端可自动尝试重新连接支持事件类型:可以推送不同类型的事件工作原理

SSE的工作流程如下:

复制客户端 服务端 | | |-- GET /events -------->| 建立连接 |<-- 200 OK -------------| 返回事件流 |<-- data: message1 -----| 推送消息1 |<-- data: message2 -----| 推送消息2 |<-- ... ---| 持续推送1.2.3.4.5.6.7. 客户端通过EventSource API向服务器发起请求服务器建立连接并保持该连接打开服务器可以随时通过该连接向客户端推送事件数据当连接断开时,客户端会自动尝试重新连接服务器可以通过特定响应关闭连接

SSE数据格式采用简单的文本格式,使用data:前缀表示数据内容,event:前缀表示事件类型,例如:

复制event: message data: Hello, this is a server-sent event! data: This is another message without event type1.2.3.4. SSE 与其他实时技术的对比

技术

连接类型

双向通信

实现复杂度

浏览器支持

适用场景

SSE

单向 HTTP

良好

服务器主动推送,单向数据流

WebSocket

双向 TCP

良好

双向实时通信,高交互场景

轮询

多次 HTTP

良好

简单场景,对实时性要求不高

长轮询

单次 HTTP

良好

服务器主动推送,中等实时性要求

实现 SSE

创建 SSE 控制器

以下使用预设的中文回复示例,实际应用中这里会调用AI API

复制@RestController @RequestMapping("/api/sse") public class SseController { // 存储所有活跃的SSE连接 private final Map<String, SseEmitter> clients = new HashMap<>(); // 用于处理消息生成的线程池 private final ExecutorService executor = Executors.newFixedThreadPool(10); // 消息计数器,用于生成唯一消息ID private final AtomicInteger messageCounter = new AtomicInteger(0); // 超时时间设置为30分钟,避免频繁断开重连 private static final long TIMEOUT = 30 * 60 * 1000L; @GetMapping("/connect/{ clientId}") public SseEmitter connect(@PathVariable String clientId) { // 检查是否已有相同客户端ID的连接 if (clients.containsKey(clientId)) { clients.get(clientId).complete(); } // 创建新的SseEmitter实例 SseEmitter emitter = new SseEmitter(TIMEOUT); // 添加心跳机制,亿华云计算每20秒发送一次心跳消息 executor.submit(() -> { try { while (true) { Thread.sleep(20000); if (emitter != null && clients.containsKey(clientId)) { emitter.send(SseEmitter.event().name("heartbeat").data("ping")); } else { break; } } } catch (Exception e) { emitter.completeWithError(e); } }); emitter.onTimeout(() -> { System.out.println("客户端连接超时: " + clientId); clients.remove(clientId); emitter.complete(); }); emitter.onError(e -> { System.out.println("客户端连接错误: " + clientId + ", 错误: " + e.getMessage()); clients.remove(clientId); emitter.completeWithError(e); }); // 注册完成事件处理器 emitter.onCompletion(() -> { System.out.println("SSE处理完成: " + clientId); clients.remove(clientId); }); clients.put(clientId, emitter); sendSystemMessage(emitter, "连接已建立,您可以开始与AI助手聊天了。"); return emitter; } @GetMapping("/chat/{ clientId}/{ message}") public void sendChatMessage(@PathVariable String clientId, @PathVariable String message) { SseEmitter emitter = clients.get(clientId); if (emitter == null) { return; } // 在单独线程中处理消息,避免阻塞主线程 executor.submit(() -> { try { String response = generateAiResponse(message); int messageId = messageCounter.incrementAndGet(); // 发送消息开始事件 emitter.send(SseEmitter.event() .name("messageStart") .data(MapUtil.builder() .put("id", messageId) .put("clientId", clientId) .build())); // 模拟AI逐步生成响应 String[] parts = response.split("(?<=。|!|?|\\.|!|\\?)"); for (String part : parts) { if (part.trim().isEmpty()) continue; // 发送消息片段 emitter.send(SseEmitter.event() .name("messageFragment") .data(MapUtil.builder() .put("id", messageId) .put("content", part) .put("isLast", false) .build())); // 模拟思考时间 Thread.sleep(500 + (long)(Math.random() * 1000)); } // 发送消息结束事件 emitter.send(SseEmitter.event() .name("messageEnd") .data(MapUtil.builder() .put("id", messageId) .put("clientId", clientId) .build())); } catch (Exception e) { try { emitter.send(SseEmitter.event() .name("error") .data("生成AI回复时出错: " + e.getMessage())); } catch (IOException ex) { ex.printStackTrace(); } } }); } @PostMapping("/ack/{ clientId}/{ messageId}") public void acknowledgeMessage( @PathVariable String clientId, @PathVariable int messageId ) { System.out.println("收到消息确认: " + messageId + " 来自客户端: " + clientId); // 这里可以记录消息已确认,进行相应处理 } @GetMapping("/disconnect/{ clientId}") public void disconnect(@PathVariable String clientId) { SseEmitter emitter = clients.get(clientId); if (emitter != null) { clients.remove(clientId); emitter.complete(); } } private void sendSystemMessage(SseEmitter emitter, String content) { try { emitter.send(SseEmitter.event() .name("systemMessage") .data(MapUtil.of("content", content))); } catch (IOException e) { e.printStackTrace(); } } private String generateAiResponse(String prompt) { // 实际应用中这里会调用AI API // 这里使用预设的中文回复示例 String lowerCasePrompt = prompt.toLowerCase(); if (lowerCasePrompt.contains("你好") || lowerCasePrompt.contains("hi") || lowerCasePrompt.contains("hello")) { return"你好!我是AI助手,很高兴为你服务。"; } elseif (lowerCasePrompt.contains("一安") || lowerCasePrompt.contains("一安未来")) { return"一安未来公众号致力于Java,大数据;心得交流,技术分享;"; } else { return"很有意思的问题!让我思考一下... " + prompt + " 是一个很好的话题。我可以从多个角度为你分析和解答,你是否想了解更多相关信息?"; } } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.135.136.137.138.139.140.141.142.143.144.145.146.147.148.149.150.151.152.153.154.155.156.157.158.159.160.161.162.163. 配置 CORS 复制@Configuration public class CorsConfig { @Bean public CorsFilter corsFilter() { CorsConfiguration config = new CorsConfiguration(); config.addAllowedOriginPattern("*"); // 允许所有域名进行跨域调用 config.addAllowedHeader("*"); // 允许任何请求头 config.addAllowedMethod("*"); // 允许任何方法(POST、GET等) config.setAllowCredentials(true); // 允许携带凭证 UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(); source.registerCorsConfiguration("/**", config); // 对所有接口都有效 return new CorsFilter(source); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.

很赞哦!(5229)