我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Netty、MINA、Twisted一起学系列02:TCP消息边界问题及按行分割消息

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Netty、MINA、Twisted一起学系列02:TCP消息边界问题及按行分割消息

Netty、MINA、Twisted一起学系列02:TCP消息边界问题及按行分割消息

文章已获得作者授权

点击文末左下角“阅读原文”即可跳转到原文地址

在TCP连接开始到结束连接,之间可能会多次传输数据,也就是服务器和客户端之间可能会在连接过程中互相传输多条消息。理想状况是一方每发送一条消息,另一方就立即接收到一条,也就是一次write对应一次read。但是,现实不总是按照剧本来走。

MINA官方文档节选:

TCP guarantess delivery of all packets in the correct order. But there is no guarantee that one write operation on the sender-side will result in one read event on the receiving side. One call of IoSession.write(Object message) by the sender can result in multiple messageReceived(IoSession session, Object message) events on the receiver; and multiple calls of IoSession.write(Object message) can lead to a single messageReceived event.

Netty官方文档节选:

In a stream-based transport such as TCP/IP, received data is stored into a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. It means, even if you sent two messages as two independent packets, an operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote.

上面两段话表达的意思相同:TCP是基于字节流的协议,它只能保证一方发送和另一方接收到的数据的字节顺序一致,但是,并不能保证一方每发送一条消息,另一方就能完整的接收到一条信息。有可能发送了两条对方将其合并成一条,也有可能发送了一条对方将其拆分成两条。所以在上一篇文章(Netty、MINA、Twisted一起学系列01:实现简单的TCP服务器)中的Demo,可以说是一个错误的示范。不过服务器和客户端在同一台机器上或者在局域网等网速很好的情况下,这种问题还是很难测试出来。

举个简单了例子(这个例子来源于Netty官方文档):消息发送方发送了三个字符串:

Netty、MINA、Twisted一起学系列02:TCP消息边界问题及按行分割消息

但是接收方收到的可能是这样的:

Netty、MINA、Twisted一起学系列02:TCP消息边界问题及按行分割消息

那么问题就很严重了,接收方没法分开这三条信息了,也就没法解析了。对此,MINA的官方文档提供了以下几种解决方案:

use fixed length messages

使用固定长度的消息。比如每个长度4字节,那么接收的时候按每条4字节拆分就可以了。

use a fixed length header that indicates the length of the body

使用固定长度的Header,Header中指定Body的长度(字节数),将信息的内容放在Body中。例如Header中指定的Body长度是100字节,那么Header之后的100字节就是Body,也就是信息的内容,100字节的Body后面就是下一条信息的Header了。

using a delimiter; for example many text-based protocols append a newline (or CR LF pair) after every message

使用分隔符。例如许多文本内容的协议会在每条消息后面加上换行符(CR LF,即”\r\n”),也就是一行一条消息。当然也可以用其他特殊符号作为分隔符,例如逗号、分号等等。

当然除了上面说到的3种方案,还有其他方案。有的协议也可能会同时用到上面多种方案。例如HTTP协议,Header部分用的是CR LF换行来区分每一条Header,而Header中用Content-Length来指定Body字节数。

下面,分别用MINA、Netty、Twisted自带的相关API实现按换行符CR LF来分割消息。

MINA

MINA可以使用ProtocolCodecFilter来对发送和接收的二进制数据进行加工,如何加工取决于ProtocolCodecFactory或ProtocolEncoder、ProtocolDecoder,加工后在IoHandler中messageReceived事件函数获取的message就不再是IoBuffer了,而是你想要的其他类型,可以是字符串,Java对象。这里可以使用TextLineCodecFactory(ProtocolCodecFactory的一个实现类)实现CR LF分割消息。

public class TcpServer {

 public static void main(String[] args) throws IOException {
   IoAcceptor acceptor = new NioSocketAcceptor();

   // 添加一个Filter,用于接收、发送的内容按照"\r\n"分割
   acceptor.getFilterChain().addLast("codec",
       new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));

   acceptor.setHandler(new TcpServerHandle());
   acceptor.bind(new InetSocketAddress(8080));
 }

}

class TcpServerHandle extends IoHandlerAdapter {

 @Override
 public void exceptionCaught(IoSession session, Throwable cause)
     throws Exception {
   cause.printStackTrace();
 }

 // 接收到新的数据
 @Override
 public void messageReceived(IoSession session, Object message)
     throws Exception {

   // 接收客户端的数据,这里接收到的不再是IoBuffer类型,而是字符串
   String line = (String) message;
   System.out.println("messageReceived:" + line);

 }

 @Override
 public void sessionCreated(IoSession session) throws Exception {
   System.out.println("sessionCreated");
 }

 @Override
 public void sessionClosed(IoSession session) throws Exception {
   System.out.println("sessionClosed");
 }
}

Netty

Netty设计上和MINA类似,需要在ChannelPipeline加上一些ChannelHandler用来对原始数据进行处理。这里用LineBasedFrameDecoder将接收到的数据按行分割,StringDecoder再将数据由字节码转成字符串。同样,接收到的数据进过加工后,在channelRead事件函数中,msg参数不再是ByteBuf而是String。

public class TcpServer {

 public static void main(String[] args) throws InterruptedException {
   EventLoopGroup bossGroup = new NioEventLoopGroup();
   EventLoopGroup workerGroup = new NioEventLoopGroup();
   try {
     ServerBootstrap b = new ServerBootstrap();
     b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch)
               throws Exception {
             ChannelPipeline pipeline = ch.pipeline();

             // LineBasedFrameDecoder按行分割消息
             pipeline.addLast(new LineBasedFrameDecoder(80));
             // 再按UTF-8编码转成字符串
             pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

             pipeline.addLast(new TcpServerHandler());
           }
         });
     ChannelFuture f = b.bind(8080).sync();
     f.channel().closeFuture().sync();
   } finally {
     workerGroup.shutdownGracefully();
     bossGroup.shutdownGracefully();
   }
 }

}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

 // 接收到新的数据
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {

   // msg经过StringDecoder后类型不再是ByteBuf而是String
   String line = (String) msg;
   System.out.println("channelRead:" + line);
 }

 @Override
 public void channelActive(ChannelHandlerContext ctx) {
   System.out.println("channelActive");
 }

 @Override
 public void channelInactive(ChannelHandlerContext ctx) {
   System.out.println("channelInactive");
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
   cause.printStackTrace();
   ctx.close();
 }
}

Twisted

Twisted的设计和上面两者的设计不太一样,所以实现消息分割也不太一样。处理事件的类TcpServerHandle不再继承Protocol,而是继承Protocol的子类LineOnlyReceiver。接收到新数据的事件方法也不再是dataReceived,而是LineOnlyReceiver提供的lineReceived。看Twisted源码的话可以发现LineOnlyReceiver的内部实际上自己实现了dataReceived,然后将其按行分割,有新的一行数据就调用lineReceived。

# -*- coding:utf-8 –*-

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(LineOnlyReceiver):

   # 新的连接建立
   def connectionMade(self):
       print 'connectionMade'

   # 连接断开
   def connectionLost(self, reason):
       print 'connectionLost'

   # 接收到新的一行数据
   def lineReceived(self, data):
       print 'lineReceived:' + data

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

下面用一个Java客户端对三个服务器进行测试。

public class TcpClient {

 public static void main(String[] args) throws IOException {

   Socket socket = null;
   OutputStream out = null;

   try {

     socket = new Socket("localhost", 8080);
     out = socket.getOutputStream();

     // 请求服务器
     String lines = "床前明月光\r\n疑是地上霜\r\n举头望明月\r\n低头思故乡\r\n";
     byte[] outputBytes = lines.getBytes("UTF-8");
     out.write(outputBytes);
     out.flush();

   } finally {
     // 关闭连接
     out.close();
     socket.close();
   }

 }

}

MINA服务器输出结果:

sessionCreated

messageReceived:床前明月光

messageReceived:疑是地上霜

messageReceived:举头望明月

messageReceived:低头思故乡

sessionClosed

Netty服务器输出结果:

channelActive

channelRead:床前明月光

channelRead:疑是地上霜

channelRead:举头望明月

channelRead:低头思故乡

channelInactive

Twisted服务器输出结果:

connectionMade

lineReceived:床前明月光

lineReceived:疑是地上霜

lineReceived:举头望明月

lineReceived:低头思故乡

connectionLost

当然,测试的时候也可以将发送的数据模拟成不按规则分割的情况,下面用一个更变态的客户端来测试。

public class TcpClient {

 public static void main(String[] args) throws IOException, InterruptedException {


   Socket socket = null;
   OutputStream out = null;

   try{

     socket = new Socket("localhost", 8080);  
     out = socket.getOutputStream();

     String lines = "床前";
     byte[] outputBytes = lines.getBytes("UTF-8");
     out.write(outputBytes);
     out.flush();

     Thread.sleep(1000);

     lines = "明月";
     outputBytes = lines.getBytes("UTF-8");
     out.write(outputBytes);
     out.flush();

     Thread.sleep(1000);

     lines = "光\r\n疑是地上霜\r\n举头";
     outputBytes = lines.getBytes("UTF-8");
     out.write(outputBytes);
     out.flush();

     Thread.sleep(1000);

     lines = "望明月\r\n低头思故乡\r\n";
     outputBytes = lines.getBytes("UTF-8");
     out.write(outputBytes);
     out.flush();

   } finally {
     // 关闭连接
     out.close();
     socket.close();
   }

 }

}

再次分别测试上面三个服务器,结果和上面的输出结果一样,没有任何问题。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Netty、MINA、Twisted一起学系列02:TCP消息边界问题及按行分割消息

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Netty、MINA、Twisted一起学系列02:TCP消息边界问题及按行分割消息

文章已获得作者授权点击文末左下角“阅读原文”即可跳转到原文地址在TCP连接开始到结束连接,之间可能会多次传输数据,也就是服务器和客户端之间可能会在连接过程中互相传输多条消息。理想状况是一方每发送一条消息,另一方就立即接收到一条,也就是一次w
2023-06-03

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录