我的编程空间,编程开发者的网络收藏夹
学习永远不晚

JAVA NIO下I/O的阻塞与非阻塞实现

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

JAVA NIO下I/O的阻塞与非阻塞实现

本篇内容介绍了“JAVA NIO下I/O的阻塞与非阻塞实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

当前环境

  1. jdk == 1.8

代码地址

git 地址:https://github.com/jasonGeng88/java-network-programming

知识点

  • nio 下 I/O 阻塞与非阻塞实现

  • SocketChannel 介绍

  • I/O 多路复用的原理

  • 事件选择器与 SocketChannel 的关系

  • 事件监听类型

  • 字节缓冲 ByteBuffer 数据结构

nio 的阻塞实现

关于什么是 nio,从字面上理解为 New IO,就是为了弥补原本 I/O 上的不足,而在 JDK 1.4 中引入的一种新的 I/O 实现方式。简单理解,就是它提供了 I/O 的阻塞与非阻塞的两种实现方式(当然,默认实现方式是阻塞的。)。

下面,我们先来看下 nio 以阻塞方式是如何处理的。

建立连接

有了上一篇 socket 的经验,我们的第一步一定也是建立 socket 连接。只不过,这里不是采用 new socket() 的方式,而是引入了一个新的概念 SocketChannel。它可以看作是 socket 的一个完善类,除了提供 Socket 的相关功能外,还提供了许多其他特性,如后面要讲到的向选择器注册的功能。

类图如下: JAVA NIO下I/O的阻塞与非阻塞实现

建立连接代码实现:

// 初始化 socket,建立 socket 与 channel 的绑定关系SocketChannel socketChannel = SocketChannel.open();// 初始化远程连接地址SocketAddress remote = new InetSocketAddress(this.host, port);// I/O 处理设置阻塞,这也是默认的方式,可不设置socketChannel.configureBlocking(true);// 建立连接socketChannel.connect(remote);

获取 socket 连接

因为是同样是 I/O 阻塞的实现,所以后面的关于 socket 输入输出流的处理,和上一篇的基本相同。唯一差别是,这里需要通过 channel 来获取 socket 连接。

  • 获取 socket 连接

Socket socket = socketChannel.socket();
  • 处理输入输出流

PrintWriter pw = getWriter(socketChannel.socket());BufferedReader br = getReader(socketChannel.socket());

完整示例

package com.jason.network.mode.nio;import com.jason.network.constant.HttpConstant;import com.jason.network.util.HttpUtil;import java.io.*;import java.net.InetSocketAddress;import java.net.Socket;import java.net.SocketAddress;import java.nio.channels.SocketChannel;public class NioBlockingHttpClient {    private SocketChannel socketChannel;    private String host;    public static void main(String[] args) throws IOException {        for (String host: HttpConstant.HOSTS) {            NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);            client.request();        }    }    public NioBlockingHttpClient(String host, int port) throws IOException {        this.host = host;        socketChannel = SocketChannel.open();        socketChannel.socket().setSoTimeout(5000);        SocketAddress remote = new InetSocketAddress(this.host, port);        this.socketChannel.connect(remote);    }    public void request() throws IOException {        PrintWriter pw = getWriter(socketChannel.socket());        BufferedReader br = getReader(socketChannel.socket());        pw.write(HttpUtil.compositeRequest(host));        pw.flush();        String msg;        while ((msg = br.readLine()) != null){            System.out.println(msg);        }    }    private PrintWriter getWriter(Socket socket) throws IOException {        OutputStream out = socket.getOutputStream();        return new PrintWriter(out);    }    private BufferedReader getReader(Socket socket) throws IOException {        InputStream in = socket.getInputStream();        return new BufferedReader(new InputStreamReader(in));    }}

nio 的非阻塞实现

原理分析

nio 的阻塞实现,基本与使用原生的 socket 类似,没有什么特别大的差别。

下面我们来看看它真正强大的地方。到目前为止,我们将的都是阻塞 I/O。何为阻塞 I/O,看下图:

JAVA NIO下I/O的阻塞与非阻塞实现

我们主要观察图中的前三种 I/O 模型,关于异步 I/O,一般需要依靠操作系统的支持,这里不讨论。

从图中可以发现,阻塞过程主要发生在两个阶段上:

  • 第一阶段:等待数据就绪;

  • 第二阶段:将已就绪的数据从内核缓冲区拷贝到用户空间;

这里产生了一个从内核到用户空间的拷贝,主要是为了系统的性能优化考虑。假设,从网卡读到的数据直接返回给用户空间,那势必会造成频繁的系统中断,因为从网卡读到的数据不一定是完整的,可能断断续续的过来。通过内核缓冲区作为缓冲,等待缓冲区有足够的数据,或者读取完结后,进行一次的系统中断,将数据返回给用户,这样就能避免频繁的中断产生。

了解了 I/O 阻塞的两个阶段,下面我们进入正题。看看一个线程是如何实现同时处理多个 I/O 调用的。从上图中的非阻塞 I/O 可以看出,仅仅只有第二阶段需要阻塞,第一阶段的数据等待过程,我们是不需要关心的。不过该模型是频繁地去检查是否就绪,造成了 CPU 无效的处理,反而效果不好。如果有一种类似的好莱坞原则— “不要给我们打电话,我们会打给你” 。这样一个线程可以同时发起多个 I/O 调用,并且不需要同步等待数据就绪。在数据就绪完成的时候,会以事件的机制,来通知我们。这样不就实现了单线程同时处理多个 IO 调用的问题了吗?即所说的“I/O 多路复用模型”。


废话讲了一大堆,下面就来实际操刀一下。

创建选择器

由上面分析可以,我们得有一个选择器,它能监听所有的 I/O 操作,并且以事件的方式通知我们哪些 I/O 已经就绪了。

代码如下:

import java.nio.channels.Selector;...private static Selector selector;static {    try {        selector = Selector.open();    } catch (IOException e) {        e.printStackTrace();    }}

创建非阻塞 I/O

下面,我们来创建一个非阻塞的 SocketChannel,代码与阻塞实现类型,唯一不同是socketChannel.configureBlocking(false)

注意:只有在socketChannel.configureBlocking(false)之后的代码,才是非阻塞的,如果socketChannel.connect()在设置非阻塞模式之前,那么连接操作依旧是阻塞调用的。

SocketChannel socketChannel = SocketChannel.open();SocketAddress remote = new InetSocketAddress(host, port);// 设置非阻塞模式socketChannel.configureBlocking(false);socketChannel.connect(remote);

建立选择器与 socket 的关联

选择器与 socket 都创建好了,下一步就是将两者进行关联,好让选择器和监听到 Socket 的变化。这里采用了以 SocketChannel 主动注册到选择器的方式进行关联绑定,这也就解释了,为什么不直接new Socket(),而是以SocketChannel的方式来创建 socket。

代码如下:

socketChannel.register(selector,                        SelectionKey.OP_CONNECT                        | SelectionKey.OP_READ                        | SelectionKey.OP_WRITE);

上面代码,我们将 socketChannel 注册到了选择器中,并且对它的连接、可读、可写事件进行了监听。

具体的事件监听类型如下:

操作类型描述所属对象
OP_READ1 << 0读操作SocketChannel
OP_WRITE1 << 2写操作SocketChannel
OP_CONNECT1 << 3连接socket操作SocketChannel
OP_ACCEPT1 << 4接受socket操作ServerSocketChannel

选择器监听 socket 变化

现在,选择器已经与我们关心的 socket 进行了关联。下面就是感知事件的变化,然后调用相应的处理机制。

这里与 Linux 下的 selector 有点不同,nio 下的 selecotr 不会去遍历所有关联的 socket。我们在注册时设置了我们关心的事件类型,每次从选择器中获取的,只会是那些符合事件类型,并且完成就绪操作的 socket,减少了大量无效的遍历操作。

public void select() throws IOException {    // 获取就绪的 socket 个数    while (selector.select() > 0){        // 获取符合的 socket 在选择器中对应的事件句柄 key        Set keys = selector.selectedKeys();        // 遍历所有的key        Iterator it = keys.iterator();        while (it.hasNext()){            // 获取对应的 key,并从已选择的集合中移除            SelectionKey key = (SelectionKey)it.next();            it.remove();            if (key.isConnectable()){                // 进行连接操作                connect(key);            }            else if (key.isWritable()){                // 进行写操作                write(key);            }            else if (key.isReadable()){                // 进行读操作                receive(key);            }        }    }}

注意:这里的selector.select()是同步阻塞的,等待有事件发生后,才会被唤醒。这就防止了 CPU 空转的产生。当然,我们也可以给它设置超时时间,selector.select(long timeout)来结束阻塞过程。

处理连接就绪事件

下面,我们分别来看下,一个 socket 是如何来处理连接、写入数据和读取数据的(这些操作都是阻塞的过程,只是我们将等待就绪的过程变成了非阻塞的了)。

处理连接代码:

// SelectionKey 代表 SocketChannel 在选择器中注册的事件句柄private void connect(SelectionKey key) throws IOException {    // 获取事件句柄对应的 SocketChannel    SocketChannel channel = (SocketChannel) key.channel();   // 真正的完成 socket 连接    channel.finishConnect();   // 打印连接信息    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();    String host = remote.getHostName();    int port = remote.getPort();    System.out.println(String.format("访问地址: %s:%s 连接成功!", host, port));}

处理写入就绪事件

// 字符集处理类private Charset charset = Charset.forName("utf8");private void write(SelectionKey key) throws IOException {    SocketChannel channel = (SocketChannel) key.channel();    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();    String host = remote.getHostName();    // 获取 HTTP 请求,同上一篇    String request = HttpUtil.compositeRequest(host);    // 向 SocketChannel 写入事件     channel.write(charset.encode(request));    // 修改 SocketChannel 所关心的事件    key.interestOps(SelectionKey.OP_READ);}

这里有两个地方需要注意:

  • 第一个是使用 channel.write(charset.encode(request)); 进行数据写入。有人会说,为什么不能像上面同步阻塞那样,通过PrintWriter包装类进行操作。因为PrintWriterwrite() 方法是阻塞的,也就是说要等数据真正从 socket 发送出去后才返回。

这与我们这里所讲的阻塞是不一致的,这里的操作虽然也是阻塞的,但它发生的过程是在数据从用户空间到内核缓冲区拷贝过程。至于系统将缓冲区的数据通过 socket 发送出去,这不在阻塞范围内。也解释了为什么要用 Charset 对写入内容进行编码了,因为缓冲区接收的格式是ByteBuffer

  • 第二,选择器用来监听事件变化的两个参数是 interestOpsreadyOps

    • interestOps:表示 SocketChannel 所关心的事件类型,也就是告诉选择器,当有这几种事件发生时,才来通知我。这里通过key.interestOps(SelectionKey.OP_READ);告诉选择器,之后我只关心“读就绪”事件,其他的不用通知我了。

    • readyOps:表示 SocketChannel 当前就绪的事件类型。以key.isReadable()为例,判断依据就是:return (readyOps() & OP_READ) != 0;

处理读取就绪事件

private void receive(SelectionKey key) throws IOException {    SocketChannel channel = (SocketChannel) key.channel();    ByteBuffer buffer = ByteBuffer.allocate(1024);    channel.read(buffer);    buffer.flip();    String receiveData = charset.decode(buffer).toString();    // 当再没有数据可读时,取消在选择器中的关联,并关闭 socket 连接    if ("".equals(receiveData)) {        key.cancel();        channel.close();        return;    }    System.out.println(receiveData);}

这里的处理基本与写入一致,唯一要注意的是,这里我们需要自行处理去缓冲区读取数据的操作。首先会分配一个固定大小的缓冲区,然后从内核缓冲区中,拷贝数据至我们刚分配固定缓冲区上。这里存在两种情况:

  • 我们分配的缓冲区过大,那多余的部分以0补充(初始化时,其实会自动补0)。

  • 我们分配的缓冲去过小,因为选择器会不停的遍历。只要 SocketChannel 处理读就绪状态,那下一次会继续读取。当然,分配过小,会增加遍历次数。

最后,将一下 ByteBuffer 的结构,它主要有 position, limit,capacity 以及 mark 属性。以 buffer.flip(); 为例,讲下各属性的作用(mark 主要是用来标记之前 position 的位置,是在当前 postion 无法满足的情况下使用的,这里不作讨论)。

JAVA NIO下I/O的阻塞与非阻塞实现

从图中看出,

  • 容量(capacity):表示缓冲区可以保存的数据容量;

  • 极限(limit):表示缓冲区的当前终点,即写入、读取都不可超过该重点;

  • 位置(position):表示缓冲区下一个读写单元的位置;

完整代码

package com.jason.network.mode.nio;import com.jason.network.constant.HttpConstant;import com.jason.network.util.HttpUtil;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;import java.util.Set;public class NioNonBlockingHttpClient {    private static Selector selector;    private Charset charset = Charset.forName("utf8");    static {        try {            selector = Selector.open();        } catch (IOException e) {            e.printStackTrace();        }    }    public static void main(String[] args) throws IOException {        NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();        for (String host: HttpConstant.HOSTS) {            client.request(host, HttpConstant.PORT);        }        client.select();    }    public void request(String host, int port) throws IOException {        SocketChannel socketChannel = SocketChannel.open();        socketChannel.socket().setSoTimeout(5000);        SocketAddress remote = new InetSocketAddress(host, port);        socketChannel.configureBlocking(false);        socketChannel.connect(remote);        socketChannel.register(selector,                        SelectionKey.OP_CONNECT                        | SelectionKey.OP_READ                        | SelectionKey.OP_WRITE);    }    public void select() throws IOException {        while (selector.select(500) > 0){            Set keys = selector.selectedKeys();            Iterator it = keys.iterator();            while (it.hasNext()){                SelectionKey key = (SelectionKey)it.next();                it.remove();                if (key.isConnectable()){                    connect(key);                }                else if (key.isWritable()){                    write(key);                }                else if (key.isReadable()){                    receive(key);                }            }        }    }    private void connect(SelectionKey key) throws IOException {        SocketChannel channel = (SocketChannel) key.channel();        channel.finishConnect();        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();        String host = remote.getHostName();        int port = remote.getPort();        System.out.println(String.format("访问地址: %s:%s 连接成功!", host, port));    }    private void write(SelectionKey key) throws IOException {        SocketChannel channel = (SocketChannel) key.channel();        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();        String host = remote.getHostName();        String request = HttpUtil.compositeRequest(host);        System.out.println(request);        channel.write(charset.encode(request));        key.interestOps(SelectionKey.OP_READ);    }    private void receive(SelectionKey key) throws IOException {        SocketChannel channel = (SocketChannel) key.channel();        ByteBuffer buffer = ByteBuffer.allocate(1024);        channel.read(buffer);        buffer.flip();        String receiveData = charset.decode(buffer).toString();        if ("".equals(receiveData)) {            key.cancel();            channel.close();            return;        }        System.out.println(receiveData);    }}

示例效果

JAVA NIO下I/O的阻塞与非阻塞实现

“JAVA NIO下I/O的阻塞与非阻塞实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

JAVA NIO下I/O的阻塞与非阻塞实现

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

JAVA NIO下I/O的阻塞与非阻塞实现

本篇内容介绍了“JAVA NIO下I/O的阻塞与非阻塞实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!当前环境jdk == 1.8代码地址
2023-06-02

Java 非阻塞I/O使用方法

绝大部分知识与实例来自O'REILLY的《Java网络编程》(Java Network Programming,Fourth Edition,by Elliotte Rusty Harold(O'REILLY))。非阻塞I/O简介非阻塞I/
2023-05-31

怎么在java中实现阻塞和非阻塞

这篇文章给大家介绍怎么在java中实现阻塞和非阻塞,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Java是什么Java是一门面向对象编程语言,可以编写桌面应用程序、Web应用程序、分布式系统和嵌入式系统应用程序。1.概
2023-06-14

Node.js的非阻塞I/O、异步与事件驱动实例分析

今天小编给大家分享一下Node.js的非阻塞I/O、异步与事件驱动实例分析的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。1、
2023-07-02

C++ 函数在网络编程中如何实现非阻塞 I/O?

c++++中使用非阻塞i/o 进行网络编程可以显著提升应用程序的响应能力和吞吐量。原理:使用异步i/o操作,应用程序在发出i/o请求后继续执行,内核完成操作后生成事件通知应用程序。实现:可使用boost.asio库,它提供了实现异步i/o所
C++ 函数在网络编程中如何实现非阻塞 I/O?
2024-04-27

java ArrayBlockingQueue阻塞队列的实现示例

JavaArrayBlockingQueue阻塞队列实现ArrayBlockingQueue是Java并发包中的有界、线程安全的阻塞队列,基于环形数组实现,遵循先进先出(FIFO)原则。它提供插入(put、offer)、移除(take、poll)和其他方法(peek、size、remainingCapacity)。示例代码展示了如何创建队列、插入和移除元素。ArrayBlockingQueue适用于多线程环境,具有线程安全、高性能的优点,但也有容量限制和FIFO顺序的缺点。
java ArrayBlockingQueue阻塞队列的实现示例
2024-04-02

详解Java阻塞队列(BlockingQueue)的实现原理

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取
2023-05-31

Java阻塞队列的实现原理是什么

本篇文章给大家分享的是有关Java阻塞队列的实现原理是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。BlockingQueue接口提供了3个添加元素方法:add:添加元素到
2023-06-17

利用Python中SocketServer 实现客户端与服务器间非阻塞通信

利用SocketServer模块来实现网络客户端与服务器并发连接非阻塞通信。 首先,先了解下SocketServer模块中可供使用的类: BaseServer:包含服务器的核心功能与混合(mix-in)类挂钩;这个类只用于派生,所以不会生成
2022-06-04

如何在Java项目中实现多线程的阻塞与唤醒

如何在Java项目中实现多线程的阻塞与唤醒?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。java线程的阻塞及唤醒 1. sleep() 方法: sleep(…毫
2023-05-31

轻松创建nodejs服务器(8):非阻塞是如何实现的

这节我们来了解一下nodejs实现非阻塞操作的方法。 我们先来修改一下 start的处理程序:var exec = require("child_process").exec;function start() {console.log("R
2022-06-04

Python的Tornado框架实现异步非阻塞访问数据库的示例

tornado即是一个http非阻塞服务器, 就要用起来, 我们将用到tornado框架 ,mongodb数据库 以及motor(mongodb的异步驱动).来简单实现tornado的非阻塞功能. 其他环境支持的下载与安装 1.安装mong
2022-06-04

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录