自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Netty的常用編解碼器與使用

開發(fā) 前端
我們發(fā)現(xiàn),真正跑起來(lái),卻并沒(méi)有按照我們預(yù)期那樣逐行打印,而是好幾行連在一起打印,而且有些字符還出現(xiàn)了亂碼,這是為什么呢?

[[414980]]

本文轉(zhuǎn)載自微信公眾號(hào)「源碼學(xué)徒」,作者皇甫嗷嗷叫。轉(zhuǎn)載本文請(qǐng)聯(lián)系源碼學(xué)徒公眾號(hào)。

我們本章節(jié)將了解基本的編解碼器以及自定義編解碼器的使用,在了解之前,我們先看一段代碼:

一、開發(fā)服務(wù)端

1.開發(fā)服務(wù)端的Handler

  1. /** 
  2.  * ********************************************************************* 
  3.  * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】 
  4.  * ********************************************************************* 
  5.  * 
  6.  * @author huangfu 
  7.  * @date 2021/5/6 21:22 
  8.  */ 
  9. public class CodecServerHandler extends ChannelInboundHandlerAdapter { 
  10.     @Override 
  11.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  12.         //開啟一個(gè)定時(shí)任務(wù) 
  13.         ctx.channel().eventLoop().scheduleAtFixedRate(() -> { 
  14.             ByteBufAllocator aDefault = ByteBufAllocator.DEFAULT
  15.             ByteBuf byteBuf = aDefault.directBuffer(); 
  16.             //向客戶端寫一句話 
  17.             byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8)); 
  18.             ctx.writeAndFlush(byteBuf); 
  19.         }, 10, 10, TimeUnit.MILLISECONDS); 
  20.     } 
  21.  
  22.     @Override 
  23.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  24.         cause.printStackTrace(); 
  25.         super.exceptionCaught(ctx, cause); 
  26.     } 

2. 開發(fā)服務(wù)端的Server

  1. /** 
  2.  * ********************************************************************* 
  3.  * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】 
  4.  * ********************************************************************* 
  5.  * 
  6.  * @author huangfu 
  7.  * @date 2021/5/6 21:20 
  8.  */ 
  9. public class CodecServer { 
  10.     public static void main(String[] args) throws InterruptedException { 
  11.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  12.         EventLoopGroup worker = new NioEventLoopGroup(); 
  13.  
  14.         try { 
  15.             ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  16.             serverBootstrap.group(boss, worker) 
  17.                     .channel(NioServerSocketChannel.class) 
  18.                     .localAddress(8989) 
  19.                     .childHandler(new ChannelInitializer<SocketChannel>() { 
  20.                         @Override 
  21.                         protected void initChannel(SocketChannel ch) throws Exception { 
  22.                             ch.pipeline().addLast("codecHandler", new CodecHandler()); 
  23.                         } 
  24.                     }); 
  25.             ChannelFuture channelFuture = serverBootstrap.bind().sync(); 
  26.             channelFuture.channel().closeFuture().sync(); 
  27.         } finally { 
  28.             boss.shutdownGracefully(); 
  29.             worker.shutdownGracefully(); 
  30.         } 
  31.     } 

二、開發(fā)客戶端

1.開發(fā)客戶端的Handler

  1. /** 
  2.  * ********************************************************************* 
  3.  * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】 
  4.  * ********************************************************************* 
  5.  * 
  6.  * @author huangfu 
  7.  * @date 2021/5/6 21:31 
  8.  */ 
  9. public class CodecClientHandler extends ChannelInboundHandlerAdapter { 
  10.     @Override 
  11.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  12.         System.out.println("連接成功"); 
  13.     } 
  14.  
  15.     @Override 
  16.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  17.         ByteBuf byteBuf = (ByteBuf) msg; 
  18.         System.out.println(byteBuf.toString(StandardCharsets.UTF_8)); 
  19.         super.channelRead(ctx, msg); 
  20.     } 

2.開發(fā)客戶端

  1. /** 
  2.  * ********************************************************************* 
  3.  * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】 
  4.  * ********************************************************************* 
  5.  * 
  6.  * @author huangfu 
  7.  * @date 2021/5/6 21:29 
  8.  */ 
  9. public class CodecClient { 
  10.     public static void main(String[] args) throws InterruptedException { 
  11.         EventLoopGroup worker = new NioEventLoopGroup(); 
  12.  
  13.         try { 
  14.             Bootstrap bootstrap = new Bootstrap(); 
  15.             bootstrap.group(worker) 
  16.                     .remoteAddress(new InetSocketAddress("127.0.0.1",8989)) 
  17.                     .channel(NioSocketChannel.class) 
  18.                     .handler(new ChannelInitializer<SocketChannel>() { 
  19.                         @Override 
  20.                         protected void initChannel(SocketChannel ch) throws Exception { 
  21.                             ch.pipeline().addLast("codecClientHandler",new CodecClientHandler()); 
  22.                         } 
  23.                     }); 
  24.  
  25.             ChannelFuture channelFuture = bootstrap.connect().sync(); 
  26.             channelFuture.channel().closeFuture().sync(); 
  27.  
  28.         }finally { 
  29.             worker.shutdownGracefully(); 
  30.         } 
  31.     } 

三、結(jié)果演示

上述的代碼相信大家都極其熟悉,就是開發(fā)一個(gè)服務(wù)端和客戶端,當(dāng)客戶端連接到服務(wù)端之后,服務(wù)端每隔10毫秒向客戶端輸出一句話,客戶端收到之后打印出來(lái)!

預(yù)期結(jié)果:

實(shí)際結(jié)果:

我們發(fā)現(xiàn),真正跑起來(lái),卻并沒(méi)有按照我們預(yù)期那樣逐行打印,而是好幾行連在一起打印,而且有些字符還出現(xiàn)了亂碼,這是為什么呢?

了解過(guò)網(wǎng)絡(luò)傳輸?shù)耐瑢W(xué)大概都明白,Socket其實(shí)也是TCP的一種,底層通過(guò)流的方式傳輸,由服務(wù)端發(fā)送的數(shù)據(jù)到客戶端,客戶端的Netty需要重新拼裝為一個(gè)完整的包:

  • 當(dāng)傳輸?shù)臄?shù)據(jù)量過(guò)大的時(shí)候,Netty就 分多從拼裝,這就造成了亂碼的現(xiàn)象! 這種現(xiàn)象,術(shù)語(yǔ)叫做半包
  • 當(dāng)Netty讀取的時(shí)候,一次讀取了兩個(gè)數(shù)據(jù)包,那就會(huì)自動(dòng)將兩個(gè)數(shù)據(jù)包合為一個(gè)數(shù)據(jù)包,從而完成封裝為一個(gè)數(shù)據(jù)包,這就是造成好幾行連著打印的問(wèn)題! 這種現(xiàn)象 術(shù)語(yǔ)叫做粘包

 

四、常用的編解碼器

為什么會(huì)發(fā)生粘包、半包!Netty在解析底層數(shù)據(jù)流轉(zhuǎn)換成ByteBuf,但是當(dāng)請(qǐng)求過(guò)于頻繁的時(shí)候,兩次的請(qǐng)求數(shù)據(jù)可能會(huì)被合并為一個(gè),甚至,一次數(shù)據(jù)合并一個(gè)半的數(shù)據(jù)流,此時(shí)因?yàn)閿?shù)據(jù)流字節(jié)的不完全接收,會(huì)導(dǎo)致讀取數(shù)據(jù)不正確或者亂碼等問(wèn)題!

假設(shè),我們預(yù)先知道了這個(gè)數(shù)據(jù)包的一個(gè)規(guī)則,當(dāng)數(shù)據(jù)包規(guī)則不滿足的情況下等待,超過(guò)數(shù)據(jù)規(guī)則限制的時(shí)候進(jìn)行切分,那么是不是就能夠有效的區(qū)分?jǐn)?shù)據(jù)包的界限,從根本上上解決粘包半包的問(wèn)題?

1. 基于換行符的解碼器

LineBasedFrameDecoder

該代碼將以\n或者\(yùn)r\n 作為區(qū)分?jǐn)?shù)據(jù)包的依據(jù),程序在進(jìn)行數(shù)據(jù)解碼的時(shí)候,會(huì)判斷該當(dāng)前的數(shù)據(jù)包內(nèi)是否存在\n或者\(yùn)r\n,當(dāng)存在的時(shí)候會(huì)截取以\n或者\(yùn)r\n的一段字符,作為一個(gè)完整的數(shù)據(jù)包!

客戶端增加解碼器:

CodecClient:

  1. .handler(new ChannelInitializer<SocketChannel>() { 
  2.     @Override 
  3.     protected void initChannel(SocketChannel ch) throws Exception { 
  4.         //增加數(shù)據(jù)包解碼器基于換行符的解碼器 
  5.         ch.pipeline().addLast("lineBasedFrameDecoder", new LineBasedFrameDecoder(Integer.MAX_VALUE)); 
  6.         ch.pipeline().addLast("codecClientHandler", new CodecClientHandler()); 
  7.     } 
  8. }); 

服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:

CodecServerHandler:

  1. ByteBuf byteBuf = aDefault.directBuffer(); 
  2. //增加一個(gè)換行符 
  3. byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!\n".getBytes(StandardCharsets.UTF_8)); 
  4. ctx.writeAndFlush(byteBuf); 

效果圖:

2. 基于自定義換行符的解碼器

DelimiterBasedFrameDecoder

該代碼將以自定義符號(hào)作為區(qū)分?jǐn)?shù)據(jù)包的依據(jù),程序在進(jìn)行數(shù)據(jù)解碼的時(shí)候,會(huì)判斷該當(dāng)前的數(shù)據(jù)包內(nèi)是否存在指定的自定義的符號(hào),當(dāng)存在的時(shí)候會(huì)截取以自定義符號(hào)為結(jié)尾的一段字符,作為一個(gè)完整的數(shù)據(jù)包!

客戶端增加解碼器:

CodecClient:

  1. .handler(new ChannelInitializer<SocketChannel>() { 
  2.     @Override 
  3.     protected void initChannel(SocketChannel ch) throws Exception { 
  4.         ByteBuf byteBuf = Unpooled.copiedBuffer("|".getBytes(StandardCharsets.UTF_8)); 
  5.         ch.pipeline().addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, byteBuf)); 
  6.         ch.pipeline().addLast("codecClientHandler", new CodecClientHandler()); 
  7.     } 
  8. }); 

服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:

CodecServerHandler:

  1. ByteBuf byteBuf = aDefault.directBuffer(); 
  2. //末尾增加一個(gè)指定的字符 
  3. byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!|".getBytes(StandardCharsets.UTF_8)); 
  4. ctx.writeAndFlush(byteBuf); 

效果圖:

3. 基于固定長(zhǎng)度的解碼器

FixedLengthFrameDecoder

定長(zhǎng)數(shù)據(jù)解碼器適用于每次發(fā)送的數(shù)據(jù)包是一個(gè)固定長(zhǎng)度的場(chǎng)景,指定每次讀取的數(shù)據(jù)包的數(shù)據(jù)長(zhǎng)度來(lái)進(jìn)行解碼操作!

我們查看我們的數(shù)據(jù)總共長(zhǎng)度是多少:

  1. 無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受! 

經(jīng)過(guò)計(jì)算為213各字符,我們假設(shè)以后的數(shù)據(jù)都是這個(gè),我們就可以使用固定字符串,作為區(qū)分一個(gè)完整數(shù)據(jù)包的依據(jù):

客戶端增加解碼器:

CodecClient:

  1. .handler(new ChannelInitializer<SocketChannel>() { 
  2.     @Override 
  3.     protected void initChannel(SocketChannel ch) throws Exception { 
  4.         //指定一個(gè)完整數(shù)據(jù)包的長(zhǎng)度為213個(gè) 
  5.         ch.pipeline().addLast("fixedLengthFrameDecoder", new FixedLengthFrameDecoder(213)); 
  6.         ch.pipeline().addLast("codecClientHandler", new CodecClientHandler()); 
  7.     } 
  8. }); 

服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:

CodecServerHandler:

  1. ByteBuf byteBuf = aDefault.directBuffer(); 
  2. //發(fā)送原數(shù)據(jù) 不做任何更改 
  3. byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8)); 
  4. ctx.writeAndFlush(byteBuf); 

效果圖:

4. 基于不定長(zhǎng)的解碼器

LengthFieldBasedFrameDecoder

不定長(zhǎng)長(zhǎng)度域解碼器的使用是用在我們不確定數(shù)據(jù)包的大小的場(chǎng)景下,這也是比較常用的一個(gè)解碼器

客戶端增加解碼器:

CodecClient:

  1. .handler(new ChannelInitializer<SocketChannel>() { 
  2.     @Override 
  3.     protected void initChannel(SocketChannel ch) throws Exception { 
  4.         ch.pipeline().addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); 
  5.         ch.pipeline().addLast("codecClientHandler", new CodecClientHandler()); 
  6.     } 
  7. }); 

服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:

CodecServerHandler:

  1. ByteBuf byteBuf = aDefault.directBuffer(); 
  2. byte[] bytes = "無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8); 
  3. byteBuf.writeInt(bytes.length); 
  4. byteBuf.writeBytes(bytes); 
  5. ctx.writeAndFlush(byteBuf); 

他的參數(shù)比較多,我們做幾個(gè)基本的認(rèn)識(shí):

maxFrameLength:本次能接收的最大的數(shù)據(jù)長(zhǎng)度

lengthFieldOffset:設(shè)置的長(zhǎng)度域的偏移量,長(zhǎng)度域在數(shù)據(jù)包的起始位置,所以偏移量為0

lengthFieldLength:長(zhǎng)度域的長(zhǎng)度,例子使用的是Int占4位 所以參數(shù)為4

lengthAdjustment:數(shù)據(jù)包的偏移量,計(jì)算方式=數(shù)據(jù)長(zhǎng)度 +lengthAdjustment=數(shù)據(jù)總長(zhǎng)度 這里數(shù)據(jù)包的總長(zhǎng)度=lengthFieldLength ,所以不需要補(bǔ)充,所以參數(shù)為0

initialBytesToStrip:需要跳過(guò)的字節(jié)數(shù),這里我們只關(guān)注真正的數(shù)據(jù),不關(guān)注數(shù)據(jù)包的長(zhǎng)度,所以我們把長(zhǎng)度域跳過(guò)去,長(zhǎng)度域?yàn)?,所以跳過(guò)4

效果圖:

5. 自定義編解碼器

I. ByteToMessageDecoder

需求:我們需要在解碼器中就將ByteBuf解碼,并轉(zhuǎn)成字符串,后面直接打印

開發(fā)一個(gè)自定義的解碼器:

  1. /** 
  2.  * ********************************************************************* 
  3.  * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】 
  4.  * 自定義一個(gè)基于固定長(zhǎng)度的解碼器,當(dāng)解碼成功后,將數(shù)據(jù)轉(zhuǎn)成字符串 
  5.  * ********************************************************************* 
  6.  * 
  7.  * @author huangfu 
  8.  * @date 2021/5/7 22:43 
  9.  */ 
  10. public class MyByteToMessageDecoder extends ByteToMessageDecoder { 
  11.     private Integer length; 
  12.  
  13.     public MessageEqualDecoder(Integer length) { 
  14.         this.length = length; 
  15.     } 
  16.  
  17.     @Override 
  18.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
  19.         //當(dāng)前的可讀字節(jié)數(shù) 
  20.         int readableBytes = in.readableBytes(); 
  21.         //當(dāng)可讀字節(jié)數(shù)超過(guò)預(yù)設(shè)數(shù)量的時(shí)候 
  22.         if(readableBytes >= length) { 
  23.             byte[] bytes = new byte[length]; 
  24.             //讀取出來(lái) 
  25.             in.readBytes(bytes); 
  26.             //轉(zhuǎn)換成字符串 并添加進(jìn)集合中 
  27.             out.add(new String(bytes, StandardCharsets.UTF_8)); 
  28.         } 
  29.     } 

客戶端處理器開發(fā):

CodecClientHandler

  1. /** 
  2.  * ********************************************************************* 
  3.  * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】 
  4.  * ********************************************************************* 
  5.  * 
  6.  * @author huangfu 
  7.  * @date 2021/5/6 21:31 
  8.  */ 
  9. public class CodecClientHandler extends ChannelInboundHandlerAdapter { 
  10.     @Override 
  11.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  12.         System.out.println("連接成功"); 
  13.     } 
  14.  
  15.     @Override 
  16.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  17.         //解碼器已經(jīng)將數(shù)據(jù)轉(zhuǎn)換成字符串了,這里直接強(qiáng)壯為字符串使用 
  18.         String msgStr = (String) msg; 
  19.         System.out.println(msgStr); 
  20.         super.channelRead(ctx, msg); 
  21.     } 

客戶端開發(fā):

CodecClient

  1. public class CodecClient { 
  2.     public static void main(String[] args) throws InterruptedException { 
  3.         EventLoopGroup worker = new NioEventLoopGroup(); 
  4.  
  5.         try { 
  6.             Bootstrap bootstrap = new Bootstrap(); 
  7.             bootstrap.group(worker) 
  8.                     .remoteAddress(new InetSocketAddress("127.0.0.1", 8989)) 
  9.                     .channel(NioSocketChannel.class) 
  10.                     .handler(new ChannelInitializer<SocketChannel>() { 
  11.                         @Override 
  12.                         protected void initChannel(SocketChannel ch) throws Exception { 
  13.                             //添加自定義的解碼器 
  14.                             ch.pipeline().addLast("messageEqualDecoder", new MyByteToMessageDecoder(213)); 
  15.                             ch.pipeline().addLast("codecClientHandler", new CodecClientHandler()); 
  16.                         } 
  17.                     }); 
  18.  
  19.             ChannelFuture channelFuture = bootstrap.connect().sync(); 
  20.             channelFuture.channel().closeFuture().sync(); 
  21.  
  22.         } finally { 
  23.             worker.shutdownGracefully(); 
  24.         } 
  25.     } 

效果圖:

II. MessageToMessageDecoder

需求:我們?cè)偕厦孀远x的解碼器的基礎(chǔ)上增加一個(gè)需求,要求上一個(gè)解碼器解碼出來(lái)的數(shù)據(jù),在傳播到客戶端的時(shí)候,需用[]包裹住。

開發(fā)自定義的消息轉(zhuǎn)換器(泛型為String的原因是 上一個(gè)解碼器已經(jīng)將其轉(zhuǎn)換為了String):

  1. /** 
  2.  * 將消息用[]包裹起來(lái) 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年5月8日08:25:21 
  6.  */ 
  7. public class MyMessageToMessageDecoder extends MessageToMessageDecoder<String> { 
  8.     @Override 
  9.     protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { 
  10.         if(!StringUtil.isNullOrEmpty(msg)){ 
  11.             out.add(String.format("[%s]", msg)); 
  12.         } 
  13.     } 

客戶端開發(fā):

CodecClient

  1. /** 
  2.  * ********************************************************************* 
  3.  * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】 
  4.  * ********************************************************************* 
  5.  * 
  6.  * @author huangfu 
  7.  * @date 2021/5/6 21:29 
  8.  */ 
  9. public class CodecClient { 
  10.     public static void main(String[] args) throws InterruptedException { 
  11.         EventLoopGroup worker = new NioEventLoopGroup(); 
  12.  
  13.         try { 
  14.             Bootstrap bootstrap = new Bootstrap(); 
  15.             bootstrap.group(worker) 
  16.                     .remoteAddress(new InetSocketAddress("127.0.0.1",8989)) 
  17.                     .channel(NioSocketChannel.class) 
  18.                     .handler(new ChannelInitializer<SocketChannel>() { 
  19.                         @Override 
  20.                         protected void initChannel(SocketChannel ch) throws Exception { 
  21.                             //添加自定義的解碼器 
  22.                             ch.pipeline().addLast("messageEqualDecoder", new MyByteToMessageDecoder(213)); 
  23.                             ch.pipeline().addLast("myMessageToMessageDecoder", new MyMessageToMessageDecoder()); 
  24.                             ch.pipeline().addLast("codecClientHandler", new CodecClientHandler()); 
  25.                         } 
  26.                     }); 
  27.  
  28.             ChannelFuture channelFuture = bootstrap.connect().sync(); 
  29.             channelFuture.channel().closeFuture().sync(); 
  30.  
  31.         }finally { 
  32.             worker.shutdownGracefully(); 
  33.         } 
  34.     } 

效果圖:

6. 心跳檢測(cè)

我們現(xiàn)在假設(shè)有一個(gè)客戶端與服務(wù)端,客戶端與服務(wù)端進(jìn)行數(shù)據(jù)交互,服務(wù)端探測(cè)到客戶端5秒沒(méi)有發(fā)送數(shù)據(jù) 3次以上關(guān)閉連接!

開發(fā)一個(gè)心跳服務(wù)端處理器

  1. /** 
  2.  * 心跳處理的Handler 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年5月8日09:03:46 
  6.  */ 
  7. public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { 
  8.  
  9.     /** 
  10.      * 讀空閑次數(shù) 
  11.      */ 
  12.     private int readIdleTimes = 0; 
  13.  
  14.     @Override 
  15.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  16.         System.out.println("客戶端連接:"+ ctx.channel().remoteAddress()); 
  17.         super.channelActive(ctx); 
  18.     } 
  19.  
  20.     @Override 
  21.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  22.         ByteBuf byteBuf = (ByteBuf) msg; 
  23.         String string = byteBuf.toString(StandardCharsets.UTF_8); 
  24.         System.out.println(string); 
  25.         //有數(shù)據(jù)  次數(shù)歸0 
  26.         readIdleTimes = 0; 
  27.         super.channelRead(ctx, msg); 
  28.     } 
  29.  
  30.     @Override 
  31.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
  32.         if (evt instanceof IdleStateEvent) { 
  33.             IdleStateEvent idleStateEvent = (IdleStateEvent) evt; 
  34.             if (idleStateEvent.state() == IdleState.READER_IDLE) { 
  35.                 System.out.println("發(fā)生讀空閑"); 
  36.                 readIdleTimes++; 
  37.             } 
  38.             //3次讀空閑之后,關(guān)閉客戶端連接 
  39.             if (readIdleTimes > 3) { 
  40.                 //關(guān)閉客戶端連接 
  41.                 System.out.println("客戶端連接被關(guān)閉:"+ ctx.channel().remoteAddress()); 
  42.                 ctx.close(); 
  43.             } 
  44.         } 
  45.     } 

開發(fā)一個(gè)心跳服務(wù)端

  1. /** 
  2.  * 心跳服務(wù)器 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年5月8日08:52:56 
  6.  */ 
  7. public class HeartBeatServer { 
  8.     public static void main(String[] args) { 
  9.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  10.         EventLoopGroup worker = new NioEventLoopGroup(); 
  11.  
  12.         try { 
  13.             ServerBootstrap bootstrap = new ServerBootstrap(); 
  14.             bootstrap.group(boss,worker) 
  15.                     .channel(NioServerSocketChannel.class) 
  16.                     .localAddress(8989) 
  17.                     .childHandler(new ChannelInitializer<SocketChannel>() { 
  18.                         @Override 
  19.                         protected void initChannel(SocketChannel ch) throws Exception { 
  20.                             //心跳觸發(fā)器  讀空閑  寫空閑  讀寫空閑5秒的均會(huì)觸發(fā)心跳事件 
  21.                             ch.pipeline().addLast(new IdleStateHandler(5,5,5, TimeUnit.SECONDS)); 
  22.                             ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4)); 
  23.                             //定義處理器 
  24.                             ch.pipeline().addLast(new HeartBeatServerHandler()); 
  25.                         } 
  26.                     }); 
  27.             ChannelFuture channelFuture = bootstrap.bind().sync(); 
  28.             channelFuture.channel().closeFuture().sync(); 
  29.         } catch (InterruptedException e) { 
  30.             e.printStackTrace(); 
  31.         } finally { 
  32.             boss.shutdownGracefully(); 
  33.             worker.shutdownGracefully(); 
  34.         } 
  35.  
  36.     } 

開發(fā)一個(gè)心跳客戶端處理器

  1. /** 
  2.  * 客戶端心跳處理 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年5月8日09:29:05 
  6.  */ 
  7. public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter { 
  8.  
  9.     @Override 
  10.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  11.         System.out.println("通道被激活"); 
  12.         super.channelActive(ctx); 
  13.     } 
  14.  
  15.     @Override 
  16.     public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
  17.         System.out.println("通道被銷毀"); 
  18.         super.channelInactive(ctx); 
  19.     } 

開發(fā)一個(gè)心跳客戶端

  1. /** 
  2.  * 心跳消息服務(wù) 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年5月8日09:37:07 
  6.  */ 
  7. public class HeartBeatClient { 
  8.     private static Channel channel = null
  9.     private static Scanner sc = new Scanner(System.in); 
  10.     public static void main(String[] args) { 
  11.         EventLoopGroup worker = new NioEventLoopGroup(); 
  12.         try { 
  13.             Bootstrap bootstrap = new Bootstrap(); 
  14.             bootstrap.group(worker) 
  15.                     .channel(NioSocketChannel.class) 
  16.                     .remoteAddress("127.0.0.1",8989) 
  17.                     .handler(new ChannelInitializer<SocketChannel>() { 
  18.                         @Override 
  19.                         protected void initChannel(SocketChannel ch) throws Exception { 
  20.                             //長(zhǎng)度解碼器 
  21.                             ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4)); 
  22.                             ch.pipeline().addLast(new HeartBeatClientHandler()); 
  23.                         } 
  24.                     }); 
  25.             //連接服務(wù)端 
  26.             ChannelFuture channelFuture = bootstrap.connect().sync(); 
  27.             channel = channelFuture.channel(); 
  28.             Thread thread = new Thread(HeartBeatClient::writeStr); 
  29.             thread.setDaemon(true); 
  30.             thread.start(); 
  31.             channel.closeFuture().sync(); 
  32.         } catch (InterruptedException e) { 
  33.             e.printStackTrace(); 
  34.         } finally { 
  35.             worker.shutdownGracefully(); 
  36.         } 
  37.     } 
  38.  
  39.     /** 
  40.      * 向服務(wù)端寫入數(shù)據(jù) 
  41.      */ 
  42.     public static void writeStr(){ 
  43.         while (true) { 
  44.             System.out.print("請(qǐng)輸入要發(fā)送的數(shù)據(jù):"); 
  45.             //從鍵盤讀入數(shù)據(jù) 
  46.             String line = sc.nextLine(); 
  47.             ByteBuf buffer = Unpooled.buffer(); 
  48.             buffer.writeInt(line.length()); 
  49.             buffer.writeBytes(line.getBytes(StandardCharsets.UTF_8)); 
  50.             //發(fā)送數(shù)據(jù) 
  51.             channel.writeAndFlush(buffer).addListener(future -> { 
  52.                 if (future.isSuccess()) { 
  53.                     System.out.println("發(fā)送成功"); 
  54.                 } 
  55.             }); 
  56.         } 
  57.  
  58.     } 

 

責(zé)任編輯:武曉燕 來(lái)源: 源碼學(xué)徒
相關(guān)推薦

2021-10-08 09:38:57

NettyChannelHand架構(gòu)

2021-04-07 13:52:57

GoogleLyra編譯器

2024-07-05 08:27:07

2025-04-10 10:15:30

2022-10-10 10:38:22

FedoraopenSUSE視頻編解碼

2020-02-19 19:15:27

UbuntuLinux媒體編解碼器

2020-12-22 07:58:46

Netty編碼器解碼器

2022-02-15 21:42:23

嵌入式系統(tǒng)音頻編解碼器開發(fā)

2021-12-25 16:20:38

微軟WindowsWindows 10

2023-06-20 08:34:33

SVT-AV1開源

2021-08-18 10:41:24

GoogleSoundStream神經(jīng)網(wǎng)絡(luò)

2023-07-26 16:31:09

Windows 10Windows 11微軟

2021-04-22 11:21:03

Windows 10Windows微軟

2024-02-28 08:22:07

2020-10-10 15:22:33

Windows 功能系統(tǒng)

2021-10-08 10:50:33

AI 編碼器人工智能

2023-07-05 11:16:59

2021-08-20 13:12:18

Google 開源技術(shù)

2021-08-11 10:03:07

iOS 14.8蘋果iOS 15
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)