SpringBoot+Netty+Websocket实现后台向前端推送信息
学过 Netty 的都知道，Netty 对 NIO 进行了很好的封装：API 简单，开源社区庞大，深受广大程序员喜爱。基于此，本文分享 Netty 的基础用法，并实战制作一个 Netty + WebSocket 消息推送的小例子。
netty服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| @Component public class NettyServer {
static final Logger log = LoggerFactory.getLogger(NettyServer.class);
@Value("${webSocket.netty.port:8888}") int port;
EventLoopGroup bossGroup; EventLoopGroup workGroup;
@Autowired ProjectInitializer nettyInitializer;
@PostConstruct public void start() throws InterruptedException { new Thread(() -> { bossGroup = new NioEventLoopGroup(); workGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.localAddress(new InetSocketAddress(port)); bootstrap.childHandler(nettyInitializer);
ChannelFuture channelFuture = null; try { channelFuture = bootstrap.bind().sync(); log.info("Server started and listen on:{}", channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
@PreDestroy public void destroy() throws InterruptedException { if (bossGroup != null) { bossGroup.shutdownGracefully().sync(); } if (workGroup != null) { workGroup.shutdownGracefully().sync(); } } }
|
Netty配置
管理全局Channel以及用户对应的channel(推送消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class NettyConfig {
private static volatile ChannelGroup channelGroup = null;
private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
private static final Object lock1 = new Object(); private static final Object lock2 = new Object();
public static ChannelGroup getChannelGroup() { if (null == channelGroup) { synchronized (lock1) { if (null == channelGroup) { channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } } } return channelGroup; }
public static ConcurrentHashMap<String, Channel> getChannelMap() { if (null == channelMap) { synchronized (lock2) { if (null == channelMap) { channelMap = new ConcurrentHashMap<>(); } } } return channelMap; }
public static Channel getChannel(String userId) { if (null == channelMap) { return getChannelMap().get(userId); } return channelMap.get(userId); } }
|
管道配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Component public class ProjectInitializer extends ChannelInitializer<SocketChannel> {
static final String WEBSOCKET_PROTOCOL = "WebSocket";
@Value("${webSocket.netty.path:/webSocket}") String webSocketPath; @Autowired WebSocketHandler webSocketHandler;
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(8192)); pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10)); pipeline.addLast(webSocketHandler); } }
|
自定义handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @Component @ChannelHandler.Sharable public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText()); NettyConfig.getChannelGroup().add(ctx.channel()); }
@Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("服务器收到消息:{}", msg.text());
JSONObject jsonObject = JSONUtil.parseObj(msg.text()); String uid = jsonObject.getStr("uid"); NettyConfig.getChannelMap().put(uid, ctx.channel());
AttributeKey<String> key = AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(uid);
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦")); }
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("用户下线了:{}", ctx.channel().id().asLongText()); NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("异常:{}", cause.getMessage()); NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); ctx.close(); }
private void removeUserId(ChannelHandlerContext ctx) { AttributeKey<String> key = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(key).get(); NettyConfig.getChannelMap().remove(userId); } }
|
推送消息接口及实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public interface PushMsgService {
void pushMsgToOne(String userId, String msg);
void pushMsgToAll(String msg);
} @Service public class PushMsgServiceImpl implements PushMsgService {
@Override public void pushMsgToOne(String userId, String msg) { Channel channel = NettyConfig.getChannel(userId); if (Objects.isNull(channel)) { throw new RuntimeException("未连接socket服务器"); }
channel.writeAndFlush(new TextWebSocketFrame(msg)); }
@Override public void pushMsgToAll(String msg) { NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg)); } }
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.netty.controller;
import com.netty.service.PushMsgService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
/** REST controller for manually triggering a push to one user. */
@RestController
@RequestMapping("/push")
public class TestController {

    /** Fixed payload sent by {@link #pushOne(String)}. */
    private static final String GREETING = "hello";

    @Autowired
    PushMsgService pushMsgService;

    /**
     * Handles {@code GET /push/{uid}} by pushing the fixed greeting to that user.
     *
     * @param uid id of the user to push to, taken from the URL path
     */
    @GetMapping("/{uid}")
    public void pushOne(@PathVariable String uid) {
        pushMsgService.pushMsgToOne(uid, GREETING);
    }
}
|