Netty的常用編解碼器與使用
本文轉(zhuǎn)載自微信公眾號(hào)「源碼學(xué)徒」,作者皇甫嗷嗷叫。轉(zhuǎn)載本文請(qǐng)聯(lián)系源碼學(xué)徒公眾號(hào)。
我們本章節(jié)將了解基本的編解碼器以及自定義編解碼器的使用,在了解之前,我們先看一段代碼:
一、開發(fā)服務(wù)端
1.開發(fā)服務(wù)端的Handler
- /**
- * *********************************************************************
- * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
- * *********************************************************************
- *
- * @author huangfu
- * @date 2021/5/6 21:22
- */
- public class CodecServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- //開啟一個(gè)定時(shí)任務(wù)
- ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
- ByteBufAllocator aDefault = ByteBufAllocator.DEFAULT;
- ByteBuf byteBuf = aDefault.directBuffer();
- //向客戶端寫一句話
- byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8));
- ctx.writeAndFlush(byteBuf);
- }, 10, 10, TimeUnit.MILLISECONDS);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- super.exceptionCaught(ctx, cause);
- }
- }
2. 開發(fā)服務(wù)端的Server
- /**
- * *********************************************************************
- * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
- * *********************************************************************
- *
- * @author huangfu
- * @date 2021/5/6 21:20
- */
- public class CodecServer {
- public static void main(String[] args) throws InterruptedException {
- EventLoopGroup boss = new NioEventLoopGroup(1);
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(boss, worker)
- .channel(NioServerSocketChannel.class)
- .localAddress(8989)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("codecHandler", new CodecHandler());
- }
- });
- ChannelFuture channelFuture = serverBootstrap.bind().sync();
- channelFuture.channel().closeFuture().sync();
- } finally {
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
二、開發(fā)客戶端
1.開發(fā)客戶端的Handler
- /**
- * *********************************************************************
- * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
- * *********************************************************************
- *
- * @author huangfu
- * @date 2021/5/6 21:31
- */
- public class CodecClientHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("連接成功");
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
- super.channelRead(ctx, msg);
- }
- }
2.開發(fā)客戶端
- /**
- * *********************************************************************
- * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
- * *********************************************************************
- *
- * @author huangfu
- * @date 2021/5/6 21:29
- */
- public class CodecClient {
- public static void main(String[] args) throws InterruptedException {
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker)
- .remoteAddress(new InetSocketAddress("127.0.0.1",8989))
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("codecClientHandler",new CodecClientHandler());
- }
- });
- ChannelFuture channelFuture = bootstrap.connect().sync();
- channelFuture.channel().closeFuture().sync();
- }finally {
- worker.shutdownGracefully();
- }
- }
- }
三、結(jié)果演示
上述的代碼相信大家都極其熟悉,就是開發(fā)一個(gè)服務(wù)端和客戶端,當(dāng)客戶端連接到服務(wù)端之后,服務(wù)端每隔10毫秒向客戶端輸出一句話,客戶端收到之后打印出來(lái)!
預(yù)期結(jié)果:
實(shí)際結(jié)果:
我們發(fā)現(xiàn),真正跑起來(lái),卻并沒(méi)有按照我們預(yù)期那樣逐行打印,而是好幾行連在一起打印,而且有些字符還出現(xiàn)了亂碼,這是為什么呢?
了解過(guò)網(wǎng)絡(luò)傳輸?shù)耐瑢W(xué)大概都明白,Socket其實(shí)也是TCP的一種,底層通過(guò)流的方式傳輸,由服務(wù)端發(fā)送的數(shù)據(jù)到客戶端,客戶端的Netty需要重新拼裝為一個(gè)完整的包:
- 當(dāng)傳輸?shù)臄?shù)據(jù)量過(guò)大的時(shí)候,Netty就 分多從拼裝,這就造成了亂碼的現(xiàn)象! 這種現(xiàn)象,術(shù)語(yǔ)叫做半包
- 當(dāng)Netty讀取的時(shí)候,一次讀取了兩個(gè)數(shù)據(jù)包,那就會(huì)自動(dòng)將兩個(gè)數(shù)據(jù)包合為一個(gè)數(shù)據(jù)包,從而完成封裝為一個(gè)數(shù)據(jù)包,這就是造成好幾行連著打印的問(wèn)題! 這種現(xiàn)象 術(shù)語(yǔ)叫做粘包
四、常用的編解碼器
為什么會(huì)發(fā)生粘包、半包!Netty在解析底層數(shù)據(jù)流轉(zhuǎn)換成ByteBuf,但是當(dāng)請(qǐng)求過(guò)于頻繁的時(shí)候,兩次的請(qǐng)求數(shù)據(jù)可能會(huì)被合并為一個(gè),甚至,一次數(shù)據(jù)合并一個(gè)半的數(shù)據(jù)流,此時(shí)因?yàn)閿?shù)據(jù)流字節(jié)的不完全接收,會(huì)導(dǎo)致讀取數(shù)據(jù)不正確或者亂碼等問(wèn)題!
假設(shè),我們預(yù)先知道了這個(gè)數(shù)據(jù)包的一個(gè)規(guī)則,當(dāng)數(shù)據(jù)包規(guī)則不滿足的情況下等待,超過(guò)數(shù)據(jù)規(guī)則限制的時(shí)候進(jìn)行切分,那么是不是就能夠有效的區(qū)分?jǐn)?shù)據(jù)包的界限,從根本上上解決粘包半包的問(wèn)題?
1. 基于換行符的解碼器
LineBasedFrameDecoder
該代碼將以\n或者\(yùn)r\n 作為區(qū)分?jǐn)?shù)據(jù)包的依據(jù),程序在進(jìn)行數(shù)據(jù)解碼的時(shí)候,會(huì)判斷該當(dāng)前的數(shù)據(jù)包內(nèi)是否存在\n或者\(yùn)r\n,當(dāng)存在的時(shí)候會(huì)截取以\n或者\(yùn)r\n的一段字符,作為一個(gè)完整的數(shù)據(jù)包!
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //增加數(shù)據(jù)包解碼器基于換行符的解碼器
- ch.pipeline().addLast("lineBasedFrameDecoder", new LineBasedFrameDecoder(Integer.MAX_VALUE));
- ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
- }
- });
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
- //增加一個(gè)換行符
- byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!\n".getBytes(StandardCharsets.UTF_8));
- ctx.writeAndFlush(byteBuf);
效果圖:
2. 基于自定義換行符的解碼器
DelimiterBasedFrameDecoder
該代碼將以自定義符號(hào)作為區(qū)分?jǐn)?shù)據(jù)包的依據(jù),程序在進(jìn)行數(shù)據(jù)解碼的時(shí)候,會(huì)判斷該當(dāng)前的數(shù)據(jù)包內(nèi)是否存在指定的自定義的符號(hào),當(dāng)存在的時(shí)候會(huì)截取以自定義符號(hào)為結(jié)尾的一段字符,作為一個(gè)完整的數(shù)據(jù)包!
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ByteBuf byteBuf = Unpooled.copiedBuffer("|".getBytes(StandardCharsets.UTF_8));
- ch.pipeline().addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, byteBuf));
- ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
- }
- });
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
- //末尾增加一個(gè)指定的字符
- byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!|".getBytes(StandardCharsets.UTF_8));
- ctx.writeAndFlush(byteBuf);
效果圖:
3. 基于固定長(zhǎng)度的解碼器
FixedLengthFrameDecoder
定長(zhǎng)數(shù)據(jù)解碼器適用于每次發(fā)送的數(shù)據(jù)包是一個(gè)固定長(zhǎng)度的場(chǎng)景,指定每次讀取的數(shù)據(jù)包的數(shù)據(jù)長(zhǎng)度來(lái)進(jìn)行解碼操作!
我們查看我們的數(shù)據(jù)總共長(zhǎng)度是多少:
- 無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!
經(jīng)過(guò)計(jì)算為213各字符,我們假設(shè)以后的數(shù)據(jù)都是這個(gè),我們就可以使用固定字符串,作為區(qū)分一個(gè)完整數(shù)據(jù)包的依據(jù):
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //指定一個(gè)完整數(shù)據(jù)包的長(zhǎng)度為213個(gè)
- ch.pipeline().addLast("fixedLengthFrameDecoder", new FixedLengthFrameDecoder(213));
- ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
- }
- });
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
- //發(fā)送原數(shù)據(jù) 不做任何更改
- byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8));
- ctx.writeAndFlush(byteBuf);
效果圖:
4. 基于不定長(zhǎng)的解碼器
LengthFieldBasedFrameDecoder
不定長(zhǎng)長(zhǎng)度域解碼器的使用是用在我們不確定數(shù)據(jù)包的大小的場(chǎng)景下,這也是比較常用的一個(gè)解碼器
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
- ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
- }
- });
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
- byte[] bytes = "無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8);
- byteBuf.writeInt(bytes.length);
- byteBuf.writeBytes(bytes);
- ctx.writeAndFlush(byteBuf);
他的參數(shù)比較多,我們做幾個(gè)基本的認(rèn)識(shí):
maxFrameLength:本次能接收的最大的數(shù)據(jù)長(zhǎng)度
lengthFieldOffset:設(shè)置的長(zhǎng)度域的偏移量,長(zhǎng)度域在數(shù)據(jù)包的起始位置,所以偏移量為0
lengthFieldLength:長(zhǎng)度域的長(zhǎng)度,例子使用的是Int占4位 所以參數(shù)為4
lengthAdjustment:數(shù)據(jù)包的偏移量,計(jì)算方式=數(shù)據(jù)長(zhǎng)度 +lengthAdjustment=數(shù)據(jù)總長(zhǎng)度 這里數(shù)據(jù)包的總長(zhǎng)度=lengthFieldLength ,所以不需要補(bǔ)充,所以參數(shù)為0
initialBytesToStrip:需要跳過(guò)的字節(jié)數(shù),這里我們只關(guān)注真正的數(shù)據(jù),不關(guān)注數(shù)據(jù)包的長(zhǎng)度,所以我們把長(zhǎng)度域跳過(guò)去,長(zhǎng)度域?yàn)?,所以跳過(guò)4
效果圖:
5. 自定義編解碼器
I. ByteToMessageDecoder
需求:我們需要在解碼器中就將ByteBuf解碼,并轉(zhuǎn)成字符串,后面直接打印
開發(fā)一個(gè)自定義的解碼器:
- /**
- * *********************************************************************
- * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
- * 自定義一個(gè)基于固定長(zhǎng)度的解碼器,當(dāng)解碼成功后,將數(shù)據(jù)轉(zhuǎn)成字符串
- * *********************************************************************
- *
- * @author huangfu
- * @date 2021/5/7 22:43
- */
- public class MyByteToMessageDecoder extends ByteToMessageDecoder {
- private Integer length;
- public MessageEqualDecoder(Integer length) {
- this.length = length;
- }
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- //當(dāng)前的可讀字節(jié)數(shù)
- int readableBytes = in.readableBytes();
- //當(dāng)可讀字節(jié)數(shù)超過(guò)預(yù)設(shè)數(shù)量的時(shí)候
- if(readableBytes >= length) {
- byte[] bytes = new byte[length];
- //讀取出來(lái)
- in.readBytes(bytes);
- //轉(zhuǎn)換成字符串 并添加進(jìn)集合中
- out.add(new String(bytes, StandardCharsets.UTF_8));
- }
- }
- }
客戶端處理器開發(fā):
CodecClientHandler
- /**
- * *********************************************************************
- * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
- * *********************************************************************
- *
- * @author huangfu
- * @date 2021/5/6 21:31
- */
- public class CodecClientHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("連接成功");
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- //解碼器已經(jīng)將數(shù)據(jù)轉(zhuǎn)換成字符串了,這里直接強(qiáng)壯為字符串使用
- String msgStr = (String) msg;
- System.out.println(msgStr);
- super.channelRead(ctx, msg);
- }
- }
客戶端開發(fā):
CodecClient
- public class CodecClient {
- public static void main(String[] args) throws InterruptedException {
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker)
- .remoteAddress(new InetSocketAddress("127.0.0.1", 8989))
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //添加自定義的解碼器
- ch.pipeline().addLast("messageEqualDecoder", new MyByteToMessageDecoder(213));
- ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
- }
- });
- ChannelFuture channelFuture = bootstrap.connect().sync();
- channelFuture.channel().closeFuture().sync();
- } finally {
- worker.shutdownGracefully();
- }
- }
- }
效果圖:
II. MessageToMessageDecoder
需求:我們?cè)偕厦孀远x的解碼器的基礎(chǔ)上增加一個(gè)需求,要求上一個(gè)解碼器解碼出來(lái)的數(shù)據(jù),在傳播到客戶端的時(shí)候,需用[]包裹住。
開發(fā)自定義的消息轉(zhuǎn)換器(泛型為String的原因是 上一個(gè)解碼器已經(jīng)將其轉(zhuǎn)換為了String):
- /**
- * 將消息用[]包裹起來(lái)
- *
- * @author huangfu
- * @date 2021年5月8日08:25:21
- */
- public class MyMessageToMessageDecoder extends MessageToMessageDecoder<String> {
- @Override
- protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
- if(!StringUtil.isNullOrEmpty(msg)){
- out.add(String.format("[%s]", msg));
- }
- }
- }
客戶端開發(fā):
CodecClient
- /**
- * *********************************************************************
- * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
- * *********************************************************************
- *
- * @author huangfu
- * @date 2021/5/6 21:29
- */
- public class CodecClient {
- public static void main(String[] args) throws InterruptedException {
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker)
- .remoteAddress(new InetSocketAddress("127.0.0.1",8989))
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //添加自定義的解碼器
- ch.pipeline().addLast("messageEqualDecoder", new MyByteToMessageDecoder(213));
- ch.pipeline().addLast("myMessageToMessageDecoder", new MyMessageToMessageDecoder());
- ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
- }
- });
- ChannelFuture channelFuture = bootstrap.connect().sync();
- channelFuture.channel().closeFuture().sync();
- }finally {
- worker.shutdownGracefully();
- }
- }
- }
效果圖:
6. 心跳檢測(cè)
我們現(xiàn)在假設(shè)有一個(gè)客戶端與服務(wù)端,客戶端與服務(wù)端進(jìn)行數(shù)據(jù)交互,服務(wù)端探測(cè)到客戶端5秒沒(méi)有發(fā)送數(shù)據(jù) 3次以上關(guān)閉連接!
開發(fā)一個(gè)心跳服務(wù)端處理器
- /**
- * 心跳處理的Handler
- *
- * @author huangfu
- * @date 2021年5月8日09:03:46
- */
- public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
- /**
- * 讀空閑次數(shù)
- */
- private int readIdleTimes = 0;
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("客戶端連接:"+ ctx.channel().remoteAddress());
- super.channelActive(ctx);
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- String string = byteBuf.toString(StandardCharsets.UTF_8);
- System.out.println(string);
- //有數(shù)據(jù) 次數(shù)歸0
- readIdleTimes = 0;
- super.channelRead(ctx, msg);
- }
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
- if (idleStateEvent.state() == IdleState.READER_IDLE) {
- System.out.println("發(fā)生讀空閑");
- readIdleTimes++;
- }
- //3次讀空閑之后,關(guān)閉客戶端連接
- if (readIdleTimes > 3) {
- //關(guān)閉客戶端連接
- System.out.println("客戶端連接被關(guān)閉:"+ ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- }
- }
開發(fā)一個(gè)心跳服務(wù)端
- /**
- * 心跳服務(wù)器
- *
- * @author huangfu
- * @date 2021年5月8日08:52:56
- */
- public class HeartBeatServer {
- public static void main(String[] args) {
- EventLoopGroup boss = new NioEventLoopGroup(1);
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(boss,worker)
- .channel(NioServerSocketChannel.class)
- .localAddress(8989)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //心跳觸發(fā)器 讀空閑 寫空閑 讀寫空閑5秒的均會(huì)觸發(fā)心跳事件
- ch.pipeline().addLast(new IdleStateHandler(5,5,5, TimeUnit.SECONDS));
- ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4));
- //定義處理器
- ch.pipeline().addLast(new HeartBeatServerHandler());
- }
- });
- ChannelFuture channelFuture = bootstrap.bind().sync();
- channelFuture.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
開發(fā)一個(gè)心跳客戶端處理器
- /**
- * 客戶端心跳處理
- *
- * @author huangfu
- * @date 2021年5月8日09:29:05
- */
- public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("通道被激活");
- super.channelActive(ctx);
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("通道被銷毀");
- super.channelInactive(ctx);
- }
- }
開發(fā)一個(gè)心跳客戶端
- /**
- * 心跳消息服務(wù)
- *
- * @author huangfu
- * @date 2021年5月8日09:37:07
- */
- public class HeartBeatClient {
- private static Channel channel = null;
- private static Scanner sc = new Scanner(System.in);
- public static void main(String[] args) {
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker)
- .channel(NioSocketChannel.class)
- .remoteAddress("127.0.0.1",8989)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //長(zhǎng)度解碼器
- ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4));
- ch.pipeline().addLast(new HeartBeatClientHandler());
- }
- });
- //連接服務(wù)端
- ChannelFuture channelFuture = bootstrap.connect().sync();
- channel = channelFuture.channel();
- Thread thread = new Thread(HeartBeatClient::writeStr);
- thread.setDaemon(true);
- thread.start();
- channel.closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- worker.shutdownGracefully();
- }
- }
- /**
- * 向服務(wù)端寫入數(shù)據(jù)
- */
- public static void writeStr(){
- while (true) {
- System.out.print("請(qǐng)輸入要發(fā)送的數(shù)據(jù):");
- //從鍵盤讀入數(shù)據(jù)
- String line = sc.nextLine();
- ByteBuf buffer = Unpooled.buffer();
- buffer.writeInt(line.length());
- buffer.writeBytes(line.getBytes(StandardCharsets.UTF_8));
- //發(fā)送數(shù)據(jù)
- channel.writeAndFlush(buffer).addListener(future -> {
- if (future.isSuccess()) {
- System.out.println("發(fā)送成功");
- }
- });
- }
- }
- }