跳转至

消息实时推送方案

需求:客户端及时收到服务端的消息通知

  • 对于 Gitee 等网站,其消息都是界面刷新时才通知,注意实时的性价比

短轮询

短轮询:指定的时间间隔,由浏览器向服务器发出HTTP请求,服务器实时返回未读消息数据给客户端,浏览器再做渲染显示。

  • 实现:JS定时器;
  • 缺点:推送数据并不会频繁变更,无论后端此时是否有新的消息产生,客户端都会进行请求,势必会对服务端造成很大压力,浪费带宽和服务器资源。

长轮询(Comet)

示例代码见:LongPullingCode

客户端主动拉pull模型,应用长轮询(Long Polling

  • 由服务端控制响应客户端请求的返回时间,来减少客户端无效请求的一种优化手段;
  • 客户端发起请求后,服务端不会立即返回请求结果,而是将请求挂起等待一段时间,如果此段时间内服务端数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间后响应请求,客户端重新发起长链接

长轮询

实现

  • DeferredResultservelet3.0后经过Spring封装提供的一种异步请求机制
  • 允许容器线程快速释放占用的资源,不阻塞请求线程,以此接受更多的请求提升系统的吞吐量。
  • 启动异步工作线程处理真正的业务逻辑,处理完成调用DeferredResult.setResult(200)提交响应结果。
  • 一个ID可能会被多个长轮询请求监听,使用guava包的Multimap结构,一个key对应多个value。一旦监听到key变化,对应的所有长轮询都会响应;
  • 请求超过设置的超时时间,会抛出AsyncRequestTimeoutException异常,全局捕获统一返回,前端获取约定好的状态码后再次发起长轮询请求。
  • 前端得到非请求超时的状态码,知晓数据变更,主动查询未读消息数接口,更新页面数据;

注意

  • 竞态分析:如果在前端重新建立连接时,后端接收到新消息,此时前端还没有建立连接,该消息会丢失,需要根据业务设计相应方案;

缺点

长轮询相比于短轮询在性能上提升了很多,但依然会产生较多的请求,在响应之后,会引起请求突然激增。

SSE

不应该设置 WriteTimeOut,示例代码见:SSE 代码

服务器发送事件(Server-sent events),简称SSE

SSE在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。

注:SSE不支持IE浏览器,对其他主流浏览器兼容性做的还不错。

SSE与Websocket的区别

WebSocket SSE
全双工,可以同时发送和接收消息 单工,只能服务端单向发送消息
独立的协议 基于HTTP协议
需要代理服务器单独支持 代理服务器直接支持
协议相对复杂 协议相对简单,易于理解和使用
默认不支持断线重连 默认支持断线重连
默认支持传送二进制数据 一般只用来传送文本,二进制数据需要编码后传送
不支持自定义发送的数据类型 支持自定义发送的数据类型
支持CORS 不支持CORS,协议和端口都必须相同
  • 以1次/秒或者更快的频率向服务端传输数据,那应该用WebSocket;
  • 客户端和服务端脚本之间具有网络服务器情况时,一个SSE连接不仅使用一个套接字,还会占用一个Apache线程或进程;

实现

客户端:通过 EventSource建立连接;

  • 会自动断线重连(如超时重连),并且会发送Last-Event-ID header

服务端:Springboot 通过 SseEmitter实现;

  • SSE消息的格式为field:value\nfield:value\n\n, field 支持如下
: 即以:开头表示注释可以理解为服务端向客户端发送的心跳确保连接不中断
data数据
event: 事件默认值为 message如果是其它事件前端可以通过 addEventListener 监听不同的事件
id: 数据标识符用 id 字段表示相当于每一条数据的编号
retry: 重连时间

Nginx 转发 SSE

  • nginx upstream 超时连接关闭(upstream timeout)时,前端会自动进行重连;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_http_version 1.1;

# Nginx 默认 OFF
proxy_cache off;

# 应该由 server 端发送 X-Accel-Buffering: no; 
# 针对 proxy_pass 的后端配置,好像不需要配置,即使开启前端也会实时响应
# proxy_buffering off;

# 该配置对于SSE不需要
# chunked_transfer_encoding off;

MQTT

MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网Internet of Thing)中的一个标准传输协议。

将消息的发布者(publisher)与订阅者(subscriber)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似。

实现

https://mp.weixin.qq.com/s/U-fUGr9i1MVa4PoVyiDFCg

WebSocket

TCP连接上进行全双工通信的协议

SOCKJS

当今主流浏览器都支持 Websocket,因此 SOCKJS 不再需要

  • 除非反向代理(如Nginx)的配置不支持WebSocket.

使用 SOCKJS (模拟 Websocket 协议)

  • sockjs-client 提供浏览器兼容性,优先使用原生的WebSocket,如果某个浏览器不支持WebSocket,SockJS会自动降级为SSE 或者 长轮询;

心跳

  • SockJS 协议要求 servers 发送心跳消息,排除代理导致的连接挂起。(Spring SockJS 默认 heartbeatTime 为 25s);
  • When using STOMP over WebSocket and SockJS, if the STOMP client and server negotiate heartbeats to be exchanged, the SockJS heartbeats are disabled.

STOMP

使用 STOMP(定义Websocket的数据格式),消息的订阅和发送。

  • STOMP即Simple Text Orientated Messaging Protocol,简单文本定向消息协议;
  • stompjs 客户端,1.1 版本提供心跳机制(默认10s,但会跟 broker 连接时协调出最终的in/out心跳);
  • 连接稳定性,支持自动重连;

事务支持

// start the transaction
const tx = client.begin();
// send the message in a transaction
client.publish({
  destination: '/queue/test',
  headers: { transaction: tx.id },
  body: 'message in a transaction',
});
// commit the transaction to effectively send the message
tx.commit();

Spring

https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-server

On the Servlet stack, the Spring Framework provides both server (and also client) support for the SockJS protocol;

  • 跨域问题,the default behavior for WebSocket and SockJS is to accept only same-origin requests. ;
  • nginx 配置 proxy_set_header Host $host;
  • 或者 spring 配置 setAllowedOriginPatterns("*")

  • 事件问题:

  • SessionDisconnectEvent需要幂等处理,一个WS连接(Session)可能会被发送多次;

Simple Broker

The built-in simple message broker handles subscription requests from clients, stores them in memory, and broadcasts messages to connected clients that have matching destinations.

纯 WebSocket 的心跳问题(不使用 SockJS):

外部 Broker

支持 RabbitMQ, ActiveMQ 等

认证

通过configureClientInboundChannel()注册消息处理,进行验证。

nginx 转发 Websocket

proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_http_version 1.1;

proxied server 端需要定时发送 PING 消息保证 nginx 不会关闭连接。