NIO编程

1、NIO介绍

  • Java NIO全称Java non-blocking IO,是指 JDK提供的新API。从JDK1.4开始,Java提供了一系列改进的输入/输出的新特性,被统称为NIO(即New IO),是同步非阻塞的。
  • NIO相关类都被放在java.nio包及子包下,并且对原java.io包中的很多类进行改写。
  • NIO有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)
    • 每个Channel都会对应一个Buffer。
    • Selector对应一个线程,一个线程对应多个Channel(连接)。
    • 该图反应了有三个Channel注册到该Selector。
    • 程序切换到哪个Channel是由事件决定的,Event就是一个重要的概念。
    • Selector会根据不同的事件,在各个通道上切换。
    • Buffer就是一个内存块,底层是有一个数组。
    • 数据的读取或写入是通过Buffer,这个和BIO是不同的,BIO中要么是输入流,或者是输出流,不能双向,但是NIO的Buffer是可以读也可以写,需要flip方法切换。
    • Channel是双向的,可以返回底层操作系统的情况,比如Linux,底层的操作系统通道就是双向的。
  • NIO是面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
  • Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
  • stream和channel比较。
    • stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区(更为底层)。
    • stream仅支持阻塞API,channel同时支持阻塞、非阻塞API,网络channel可配合selector实现多路复用。
    • 二者均为全双工,即读写可以同时进行。
  • NIO和BIO的比较。
    • BIO以流的方式处理数据,而NIO以块的方式处理数据,块I/O的效率比流I/O高很多。
    • BIO是阻塞的,NIO则是非阻塞的。
    • BIO基于字节流和字符流进行操作,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

1.1 缓冲区(Buffer)

1.1.1 ByteBuffer简介

  • 缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer,如图:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Slf4j
    public class TestByteBuffer {
    /**
    * 1.向buffer写入数据,例如调用channel.read(buffer)
    * 2.调用flip()切换至读模式
    * 3.从buffer读取数据,例如调用buffer.get()
    * 4.调用clear()或compact()切换至写模式
    * 5.重复1~4步骤
    */
    public static void main(String[] args) {
    // 通过输入输出流获取channel
    try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
    // 准备缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(10);
    while (true) {
    // 从channel读取数据,向buffer写入
    int len = channel.read(buffer);
    log.debug("读取到的字节数 {}", len);
    if (len == -1) {
    break;
    }
    // 切换至读模式
    buffer.flip();
    while (buffer.hasRemaining()) {
    // 一次读取一个字节
    byte b = buffer.get();
    log.debug("实际字节 {}", (char) b);
    }
    // 切换为写模式
    buffer.clear();
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    • 在项目路径下创建data.txt文件。

    • 运行程序后结果如下。

  • 在NIO中,Buffer是一个顶层父类,它是一个抽象类,类的层级关系图如下:

1.1.2 ByteBuffer原理

  • Buffer类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:

    1
    2
    3
    4
    5
    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
    • 一开始创建Buffer时。

    • 写模式下,position是写入位置,limit等于容量,下图表示写入了4个字节后的状态。

    • flip动作发生后,变成读模式,position切换为读取位置,limit切换为读取限制。

    • 读取4个字节后,状态如下。

    • clear动作发生后又切换为写模式。

    • compact方法也是切换到写模式,但是它是是把未读完的部分向前压缩,然后切换至写模式。

    • 可以利用以下调试工具类查看ByteBuffer内参数的信息:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      public class ByteBufferUtil {
      private static final char[] BYTE2CHAR = new char[256];
      private static final char[] HEXDUMP_TABLE = new char[256 * 4];
      private static final String[] HEXPADDING = new String[16];
      private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
      private static final String[] BYTE2HEX = new String[256];
      private static final String[] BYTEPADDING = new String[16];

      static {
      final char[] DIGITS = "0123456789abcdef".toCharArray();
      for (int i = 0; i < 256; i++) {
      HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
      HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
      }

      int i;

      // Generate the lookup table for hex dump paddings
      for (i = 0; i < HEXPADDING.length; i++) {
      int padding = HEXPADDING.length - i;
      StringBuilder buf = new StringBuilder(padding * 3);
      for (int j = 0; j < padding; j++) {
      buf.append(" ");
      }
      HEXPADDING[i] = buf.toString();
      }

      // Generate the lookup table for the start-offset header in each row (up to 64KiB).
      for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
      StringBuilder buf = new StringBuilder(12);
      buf.append(NEWLINE);
      buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
      buf.setCharAt(buf.length() - 9, '|');
      buf.append('|');
      HEXDUMP_ROWPREFIXES[i] = buf.toString();
      }

      // Generate the lookup table for byte-to-hex-dump conversion
      for (i = 0; i < BYTE2HEX.length; i++) {
      BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
      }

      // Generate the lookup table for byte dump paddings
      for (i = 0; i < BYTEPADDING.length; i++) {
      int padding = BYTEPADDING.length - i;
      StringBuilder buf = new StringBuilder(padding);
      for (int j = 0; j < padding; j++) {
      buf.append(' ');
      }
      BYTEPADDING[i] = buf.toString();
      }

      // Generate the lookup table for byte-to-char conversion
      for (i = 0; i < BYTE2CHAR.length; i++) {
      if (i <= 0x1f || i >= 0x7f) {
      BYTE2CHAR[i] = '.';
      } else {
      BYTE2CHAR[i] = (char) i;
      }
      }
      }

      /**
      * 打印所有内容
      * @param buffer
      */
      public static void debugAll(ByteBuffer buffer) {
      int oldlimit = buffer.limit();
      buffer.limit(buffer.capacity());
      StringBuilder origin = new StringBuilder(256);
      appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
      System.out.println("+--------+-------------------- all ------------------------+----------------+");
      System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
      System.out.println(origin);
      buffer.limit(oldlimit);
      }

      /**
      * 打印可读取内容
      * @param buffer
      */
      public static void debugRead(ByteBuffer buffer) {
      StringBuilder builder = new StringBuilder(256);
      appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
      System.out.println("+--------+-------------------- read -----------------------+----------------+");
      System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
      System.out.println(builder);
      }

      private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
      if (isOutOfBounds(offset, length, buf.capacity())) {
      throw new IndexOutOfBoundsException(
      "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
      + ") <= " + "buf.capacity(" + buf.capacity() + ')');
      }
      if (length == 0) {
      return;
      }
      dump.append(
      " +-------------------------------------------------+" +
      NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
      NEWLINE + "+--------+-------------------------------------------------+----------------+");

      final int startIndex = offset;
      final int fullRows = length >>> 4;
      final int remainder = length & 0xF;

      // Dump the rows which have 16 bytes.
      for (int row = 0; row < fullRows; row++) {
      int rowStartIndex = (row << 4) + startIndex;

      // Per-row prefix.
      appendHexDumpRowPrefix(dump, row, rowStartIndex);

      // Hex dump
      int rowEndIndex = rowStartIndex + 16;
      for (int j = rowStartIndex; j < rowEndIndex; j++) {
      dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
      }
      dump.append(" |");

      // ASCII dump
      for (int j = rowStartIndex; j < rowEndIndex; j++) {
      dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
      }
      dump.append('|');
      }

      // Dump the last row which has less than 16 bytes.
      if (remainder != 0) {
      int rowStartIndex = (fullRows << 4) + startIndex;
      appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

      // Hex dump
      int rowEndIndex = rowStartIndex + remainder;
      for (int j = rowStartIndex; j < rowEndIndex; j++) {
      dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
      }
      dump.append(HEXPADDING[remainder]);
      dump.append(" |");

      // Ascii dump
      for (int j = rowStartIndex; j < rowEndIndex; j++) {
      dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
      }
      dump.append(BYTEPADDING[remainder]);
      dump.append('|');
      }

      dump.append(NEWLINE +
      "+--------+-------------------------------------------------+----------------+");
      }

      private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
      if (row < HEXDUMP_ROWPREFIXES.length) {
      dump.append(HEXDUMP_ROWPREFIXES[row]);
      } else {
      dump.append(NEWLINE);
      dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
      dump.setCharAt(dump.length() - 9, '|');
      dump.append('|');
      }
      }

      public static short getUnsignedByte(ByteBuffer buffer, int index) {
      return (short) (buffer.get(index) & 0xFF);
      }
      }
      • 编写测试类:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        public class TestByteBufferReadWrite {
        public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte) 0x61); // 'a'
        debugAll(buffer);
        buffer.put(new byte[]{0x62, 0x63, 0x64}); // b c d
        debugAll(buffer);
        System.out.println(buffer.get());
        buffer.flip();
        System.out.println(buffer.get());
        debugAll(buffer);
        buffer.compact();
        debugAll(buffer);
        buffer.put(new byte[]{0x65, 0x6f});
        debugAll(buffer);
        }
        }

        测试结果如下:

1.1.3 ByteBuffer常用方法

  • Buffer类相关方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public abstract class Buffer{
    //JDK1.4时,引入的api
    public final int capacity()//返回此缓冲区的容量
    public final int position()//返回此缓冲区的位置
    public final Buffer position(int newPositio)//设置此缓冲区的位置
    public final int limit()//返回此缓冲区的限制
    public final Buffer limit(int newLimit)//设置此缓冲区的限制
    public final Buffer mark()//在此缓冲区的位置设置标记
    public final Buffer reset()//将此缓冲区的位置重置为以前标记的位置
    public final Buffer clear()//清除此缓冲区,即将各个标记恢复到初始状态,但是数据并没有真正擦除,后面操作会覆盖
    public final Buffer flip()//反转此缓冲区
    public final Buffer rewind()//重绕此缓冲区
    public final int remaining()//返回当前位置与限制之间的元素数
    public final boolean hasRemaining()//告知在当前位置和限制之间是否有元素
    public abstract boolean isReadOnly()//告知此缓冲区是否为只读缓冲区

    //JDK1.6时引入的api
    public abstract boolean hasArray()//告知此缓冲区是否具有可访问的底层实现数组
    public abstract Object array()//返回此缓冲区的底层实现数组
    public abstract int arrayOffset()//返回此缓冲区的底层实现数组中第一个缓冲区元素的偏移量
    public abstract boolean isDirect()//告知此缓冲区是否为直接缓冲区
    }
  • 从前面可以看出对于Java中的基本数据类型(boolean除外),都有一个Buffer类型与之相对应,最常用的自然是ByteBuffer类(二进制数据),该类的主要方法如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public abstract class ByteBuffer {
    //缓冲区创建相关api
    public static ByteBuffer allocateDirect(int capacity)//创建直接缓冲区public
    static ByteBuffer allocate(int capacity)//设置缓冲区的初始容量public
    static ByteBuffer wrap(byte[] array)//把一个数组放到缓冲区中使用
    //构造初始化位置offset和上界length的缓冲区
    public static ByteBuffer wrap(byte[] array,int offset,int length)
    //缓存区存取相关api
    public abstract byte get()//从当前位置position上get,get之后,position会自动+1
    public abstract byte get(int index)//从绝对位置get
    public abstract ByteBuffer put(byte b)//从当前位置上添加,put之后,position会自动+1
    public abstract ByteBuffer put(int index,byte b)//从绝对位置上put
    }
    • 常用方法1:ByteBuffer.allocate()为ByteBuffer分配空间,其它buffer类也有该方法。

      1
      2
      3
      4
      5
      6
      public class TestByteBufferAllocate {
      public static void main(String[] args) {
      System.out.println(ByteBuffer.allocate(16).getClass());// class java.nio.HeapByteBuffer,使用的是java堆内存,读写效率较低(GC时可能需要将数据进行内存搬迁),受到GC的影响
      System.out.println(ByteBuffer.allocateDirect(16).getClass());// class java.nio.DirectByteBuffer,使用的是直接内存,读写效率高(少一次拷贝),不会受GC影响,分配的效率低(因为需要调用操作系统函数)
      }
      }
    • 常用方法2:向buffer写入数据。

      • 调用channel的read方法。

        1
        int readBytes = channel.read(buf);// 从channel读取数据,向buffer写入
      • 调用buffer自己的put方法。

        1
        buf.put((byte)127);
    • 常用方法3:从buffer读取数据。

      • 调用channel的write方法。

        1
        int writeBytes = channel.write(buf); // 从buffer读取数据,向channel写入
      • 调用buffer自己的get方法。

        1
        byte b = buf.get();
        • get方法会让position读指针向后走,如果想重复读取数据。

          • 可以调用rewind方法将position重新置为0。

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            public class TestByteBufferRead {
            public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.put(new byte[]{'a', 'b', 'c', 'd'});
            buffer.flip();
            buffer.get(new byte[4]);
            debugAll(buffer);
            // rewind从头开始读
            buffer.rewind();
            System.out.println((char)buffer.get());
            }
            }
          • 或者调用get(int i)方法获取索引i的内容,它不会移动读指针。

            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            public class TestByteBufferRead {
            public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.put(new byte[]{'a', 'b', 'c', 'd'});
            buffer.flip();

            // get(i) 不会改变读索引的位置
            System.out.println((char) buffer.get(3));
            debugAll(buffer);
            }
            }

    • 常用方法4:mark和reset。mark是在读取时,做一个标记,即使position改变,只要调用reset就能回到mark的位置。rewind和flip都会清除mark位置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      public class TestByteBufferRead {

      public static void main(String[] args) {
      ByteBuffer buffer = ByteBuffer.allocate(10);
      buffer.put(new byte[]{'a', 'b', 'c', 'd'});
      buffer.flip();

      System.out.println((char) buffer.get());
      System.out.println((char) buffer.get());
      buffer.mark(); // 加标记,索引2的位置
      System.out.println((char) buffer.get());
      System.out.println((char) buffer.get());
      buffer.reset(); // 将position重置到索引2
      System.out.println((char) buffer.get());
      System.out.println((char) buffer.get());
      }
      }
    • 常用方法5:字符串与ByteBuffer互转。

      • 字符串转换成ByteBuffer。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        public class TestByteBufferString {
        public static void main(String[] args) {
        // 使用getBytes()
        ByteBuffer buffer1 = ByteBuffer.allocate(16);
        buffer1.put("hello".getBytes());
        debugAll(buffer1);

        // 使用字符集Charset,会自动切换为读模式
        ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");
        debugAll(buffer2);

        // 使用wrap()方法,会自动切换为读模式
        ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());
        debugAll(buffer3);
        }
        }
      • ByteBuffer转换成字符串。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        public class TestByteBufferString {
        public static void main(String[] args) {
        ByteBuffer buffer1 = ByteBuffer.allocate(16);
        buffer1.put("hello".getBytes());
        debugAll(buffer1);

        ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");
        debugAll(buffer2);

        // 由于buffer2已经自动切换成读模式,所以结果没问题
        String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();
        System.out.println(str1);

        // 需要先将写模式转换为读模式,否则读出来是个空串
        buffer1.flip();
        String str2 = StandardCharsets.UTF_8.decode(buffer1).toString();
        System.out.println(str2);
        }
        }

1.1.4 分散读与集中写

Scattering Reads(分散读)

  • 使用如下方式读取,可以将数据填充至多个buffer。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class TestScatteringReads {
    public static void main(String[] args) {
    try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {
    ByteBuffer b1 = ByteBuffer.allocate(3);
    ByteBuffer b2 = ByteBuffer.allocate(3);
    ByteBuffer b3 = ByteBuffer.allocate(5);
    channel.read(new ByteBuffer[]{b1, b2, b3});
    b1.flip();
    b2.flip();
    b3.flip();
    debugAll(b1);
    debugAll(b2);
    debugAll(b3);
    } catch (IOException e) {
    }
    }
    }

Gathering Writes(集中写)

  • 使用如下方式写入,可以将多个buffer的数据填充至channel。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class TestGatheringWrites {
    public static void main(String[] args) {
    ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
    ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
    ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");
    try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
    channel.write(new ByteBuffer[]{b1, b2, b3});
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }

1.1.5 黏包半包

黏包半包案例

  • 网络上有多条数据发送给服务端,数据之间使用\n进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为:

    • Hello,world\n
    • I’m zhangsan\n
    • How are you?\n
  • 变成了下面的两个 byteBuffer (黏包,半包):

    • Hello,world\nI’m zhangsan\nHo(黏包)
    • w are you?\n(半包)
  • 编写程序,将错乱的数据恢复成原始的按\n分隔的数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class TestByteBufferExam {
    public static void main(String[] args) {
    ByteBuffer source = ByteBuffer.allocate(32);
    source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
    split(source);
    source.put("w are you?\n".getBytes());
    split(source);
    }

    private static void split(ByteBuffer source) {
    source.flip();
    for (int i = 0; i < source.limit(); i++) {
    // 找到一条完整消息
    if (source.get(i) == '\n') {
    int length = i + 1 - source.position();
    // 把这条完整消息存入新的 ByteBuffer
    ByteBuffer target = ByteBuffer.allocate(length);
    // 从source读,向target写
    for (int j = 0; j < length; j++) {
    target.put(source.get());
    }
    debugAll(target);
    }
    }
    source.compact();
    }
    }

1.2 通道(Channel)

1.2.1 Channel简介

  • NIO的通道类似于流,但有些区别如下:

    • 通道可以同时进行读写,而流只能读或者只能写。
    • 通道可以实现异步读写数据。
    • 通道可以从缓冲读数据,也可以写数据到缓冲。
  • 常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。(ServerSocketChannel类似ServerSocket、SocketChannel类似Socket)

    • FileChannel用于文件的数据读写,DatagramChannel用于UDP的数据读写,ServerSocketChannel和SocketChannel用于TCP的数据读写。

1.2.2 FileChannel

1.2.2.1 常用方法

  • FileChannel主要用来对本地文件进行IO操作,且只能工作在阻塞模式下,常见的方法有:

    1
    2
    3
    4
    public int read(ByteBuffer dst)//从通道读取数据并放到缓冲区中
    public int write(ByteBuffer src)//把缓冲区的数据写到通道中
    public long transferFrom(ReadableByteChannel src, long position, long count)//从目标通道中复制数据到当前通道
    public long transferTo(long position, long count, WritableByteChannel target)//把数据从当前通道复制给目标通道
    • FileChannel的获取不能通过直接打开FileChannel,必须通过FileInputStream、FileOutputStream或者RandomAccessFile来获取FileChannel,它们都有getChannel()方法。

      • 通过FileInputStream获取的channel只能读。(读取)

        1
        2
        // 会从channel读取数据填充ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾
        int readBytes = channel.read(buffer);
      • 通过FileOutputStream获取的channel只能写。(写入)

        1
        2
        3
        4
        5
        6
        7
        ByteBuffer buffer = ...;
        buffer.put(...); // 存入数据
        buffer.flip(); // 切换读模式
        // 在while中调用channel.write是因为write方法并不能保证一次将buffer中的内容全部写入channel,对于socketChannel一定要这么做,但fileChannel可以不这么做
        while(buffer.hasRemaining()) {
        channel.write(buffer);
        }
      • 通过RandomAccessFile是否能读写根据构造RandomAccessFile时的读写模式决定。

    • channel使用完毕后必须关闭,不过调用了FileInputStream、FileOutputStream或者RandomAccessFile的close方法会间接地调用channel的close方法。(关闭)

    • 位置有关:

      • 获取当前位置:

        1
        long pos = channel.position();
      • 设置当前位置:

        • 设置当前位置时,如果设置为文件的末尾:
          • 这时读取会返回 -1。
          • 这时写入,会追加内容,但要注意如果position超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)。
        1
        2
        long newPos = ...;
        channel.position(newPos);
    • 可通过size()方法获取文件的大小

    • 操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用force(true)方法将文件内容和元数据(文件的权限等信息)强制立刻写入磁盘

1.2.2.2 两个Channel传输数据

测试使用transferTo方法在两个Channel中传输数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try {
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
// 效率高,底层会利用操作系统的零拷贝进行优化
long size = from.size();
// left 变量代表还剩余多少字节
for (long left = size; left > 0; ) {
System.out.println("position:" + (size - left) + " left:" + left);
// transferTo方法一次最多可传2g的数据
left -= from.transferTo((size - left), left, to);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

测试使用ByteBuffer(缓冲)和FileChannel(通道),将字符串写进文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TestFileChannel {
public static void main(String[] args) {
String str = null;
FileOutputStream fileOutputStream = null;
FileChannel fileChannel = null;
ByteBuffer byteBuffer = null;
try {
str = "hello java";
// 创建一个输出流
fileOutputStream = new FileOutputStream("d:\\file.txt");
// 通过fileOutputStream获取对应的FileChannel
// 这个fileChannel真实类型是FileChannelImpl
fileChannel = fileOutputStream.getChannel();
// 创建一个缓冲区ByteBuffer
byteBuffer = ByteBuffer.allocate(1024);
// 将str放入byteBuffer
byteBuffer.put(str.getBytes());
// 将byteBuffer切换成读模式
byteBuffer.flip();
// 将byteBuffer数据写入到fileChannel
fileChannel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(fileOutputStream!=null){
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

测试使用ByteBuffer(缓冲)和FileChannel(通道), 将文件中的数据读入到程序,并显示在控制台屏幕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestFileChannel {
public static void main(String[] args) {
File file = null;
FileInputStream fileInputStream = null;
FileChannel fileChannel = null;
ByteBuffer byteBuffer = null;
try {
// 创建文件的输入流
file = new File("d:\\file.txt");
fileInputStream = new FileInputStream(file);
// 通过fileInputStream获取对应的FileChannel,实际类型为FileChannelImpl
fileChannel = fileInputStream.getChannel();
// 创建缓冲区
byteBuffer = ByteBuffer.allocate((int) file.length());
// 将通道的数据读入到Buffer
fileChannel.read(byteBuffer);
//将 byteBuffer 的字节数据转成 String
System.out.println(new String(byteBuffer.array()));
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

测试使用FileChannel(通道)和方法read、write,完成文件的拷贝。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class TestFileChannel {
public static void main(String[] args) {
FileInputStream fileInputStream = null;
FileChannel fileChannelInput = null;

FileOutputStream fileOutputStream = null;
FileChannel fileChannelOutput = null;

ByteBuffer byteBuffer = null;
try {
fileInputStream = new FileInputStream("d:\\1.txt");
fileChannelInput = fileInputStream.getChannel();

fileOutputStream = new FileOutputStream("d:\\2.txt");
fileChannelOutput = fileOutputStream.getChannel();

byteBuffer = ByteBuffer.allocate(512);
while (true) { // 循环读取
byteBuffer.clear(); // 清空buffer
int read = fileChannelInput.read(byteBuffer);
System.out.println("read = " + read);
if (read == -1) { //表示读完
break;
}
// 将buffer中的数据写入到fileChannelOutput
byteBuffer.flip();
fileChannelOutput.write(byteBuffer);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

测试使用FileChannel(通道)和方法transferFrom,完成文件的拷贝。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class TestFileChannel {
public static void main(String[] args) {
FileInputStream fileInputStream = null;
FileChannel sourceCh = null;

FileOutputStream fileOutputStream = null;
FileChannel destCh = null;
try {
//创建相关流
fileInputStream = new FileInputStream("d:\\1.txt");
sourceCh = fileInputStream.getChannel();

//获取各个流对应的 FileChannel
fileOutputStream = new FileOutputStream("d:\\2.txt");
destCh = fileOutputStream.getChannel();

//使用 transferForm 完成拷贝
destCh.transferFrom(sourceCh, 0, sourceCh.size());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

1.2.2.3 Path

  • Path工具类(jdk7引入):

    • Path用来表示文件路径。
    • Paths是工具类,用来获取Path实例。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    Path source = Paths.get("1.txt"); // 相对路径使用user.dir环境变量来定位1.txt

    Path source = Paths.get("d:\\1.txt"); // 绝对路径代表了d:\1.txt

    Path source = Paths.get("d:/1.txt"); // 绝对路径同样代表了d:\1.txt

    Path projects = Paths.get("d:\\data", "projects"); // 代表了d:\data\projects

    // "."代表当前路径,".."代表上一级路径
    Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
    System.out.println(path);// d:\data\projects\a\..\b
    System.out.println(path.normalize()); // 正常化路径,d:\data\projects\b
  • 例如目录结构如下:

    1
    2
    3
    4
    5
    d:
    |- data
    |- projects
    |- a
    |- b

    代码:

    1
    2
    3
    Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
    System.out.println(path);
    System.out.println(path.normalize()); // 正常化路径

    会输出:

    1
    2
    d:\data\projects\a\..\b
    d:\data\projects\b

1.2.2.4 Files

  • Files工具类(jdk7引入):

    • 检查文件是否存在。

      1
      2
      Path path = Paths.get("helloword/data.txt");
      System.out.println(Files.exists(path));
    • 创建一级目录。

      1
      2
      Path path = Paths.get("helloword/d1");
      Files.createDirectory(path);
      • 如果目录已存在,会抛异常FileAlreadyExistsException。
      • 不能一次创建多级目录,否则会抛异常NoSuchFileException。
    • 创建多级目录。

      1
      2
      Path path = Paths.get("helloword/d1/d2");
      Files.createDirectories(path);
    • 拷贝文件。

      1
      2
      3
      Path source = Paths.get("helloword/data.txt");
      Path target = Paths.get("helloword/target.txt");
      Files.copy(source, target);
      • 如果文件已存在,会抛异常FileAlreadyExistsException。

      • 如果希望用source覆盖掉target,需要用StandardCopyOption来控制。

        1
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
    • 移动文件。

      1
      2
      3
      Path source = Paths.get("helloword/data.txt");
      Path target = Paths.get("helloword/data.txt");
      Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
      • StandardCopyOption.ATOMIC_MOVE保证文件移动的原子性。
    • 删除文件。

      1
      2
      Path target = Paths.get("helloword/target.txt");
      Files.delete(target);
      • 如果文件不存在,会抛异常NoSuchFileException。
    • 删除目录。

      1
      2
      Path target = Paths.get("helloword/d1");
      Files.delete(target);
      • 如果目录还有内容,会抛异常DirectoryNotEmptyException。
    • 遍历目录文件。

      • 进行遍历目录和文件的操作。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        public class TestFilesWalkFileTree {
        public static void main(String[] args) {
        AtomicInteger dirCount = new AtomicInteger();
        AtomicInteger fileCount = new AtomicInteger();
        // 典型的访问者模式
        Files.walkFileTree(Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91"), new SimpleFileVisitor<Path>(){
        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
        System.out.println("====>"+dir);
        dirCount.incrementAndGet();
        return super.preVisitDirectory(dir, attrs);
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        System.out.println(file);
        fileCount.incrementAndGet();
        return super.visitFile(file, attrs);
        }
        });
        System.out.println("dir count:" +dirCount);
        System.out.println("file count:" +fileCount);
        }
        }
      • 统计jar的数目。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        public class TestFilesWalkFileTree {
        public static void main(String[] args) {
        AtomicInteger jarCount = new AtomicInteger();
        Files.walkFileTree(Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91"), new SimpleFileVisitor<Path>(){
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        if (file.toString().endsWith(".jar")) {
        System.out.println(file);
        jarCount.incrementAndGet();
        }
        return super.visitFile(file, attrs);
        }
        });
        System.out.println("jar count:" +jarCount);
        }
        }
    • 删除多级目录。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      public class TestFilesWalkFileTree {
      public static void main(String[] args) throws IOException {
      Files.walkFileTree(Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91"), new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
      Files.delete(file);
      return super.visitFile(file, attrs);
      }
      // 退出文件夹时再删文件夹
      @Override
      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
      Files.delete(dir);
      return super.postVisitDirectory(dir, exc);
      }
      });
      }
      }
    • 拷贝多级目录。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      public class TestFilesCopy {
      public static void main(String[] args) throws IOException {
      long start = System.currentTimeMillis();
      String source = "C:\\Program Files\\Java\\jdk1.8.0_91";
      String target = "C:\\Program Files\\Java\\jdk1.8.0_9111";
      Files.walk(Paths.get(source)).forEach(path -> {
      try {
      String targetName = path.toString().replace(source, target);
      // 是目录
      if (Files.isDirectory(path)) {
      Files.createDirectory(Paths.get(targetName));
      }
      // 是普通文件
      else if (Files.isRegularFile(path)) {
      Files.copy(path, Paths.get(targetName));
      }
      } catch (IOException e) {
      e.printStackTrace();
      }
      });
      long end = System.currentTimeMillis();
      System.out.println(end - start);
      }
      }

1.2.3 SocketChannel

1.2.3.1 非阻塞和阻塞

  • 阻塞:

    • 阻塞模式下,相关方法都会导致线程暂停。
      • ServerSocketChannel.accept会在没有连接建立时让线程暂停。
      • SocketChannel.read会在没有数据可读时让线程暂停。
      • 阻塞的表现其实就是线程暂停了,暂停期间不会占用cpu,但线程相当于闲置。
    • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持。
    • 但多线程下,有新的问题,体现在以下方面。
      • 32位jvm一个线程320k,64位jvm一个线程1024k,如果连接数过多,必然导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低。
        • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Slf4j
    public class Server {
    public static void main(String[] args) throws IOException {
    // 使用nio来理解阻塞模式, 单线程
    // 1、创建ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(16);
    // 2、创建服务器
    ServerSocketChannel ssc = ServerSocketChannel.open();
    // 3、绑定监听端口
    ssc.bind(new InetSocketAddress(8080));
    // 4、创建连接集合
    List<SocketChannel> channels = new ArrayList<>();
    while (true) {
    // 5、accept建立与客户端连接,经历tcp三次握手,SocketChannel用来与客户端之间通信
    log.debug("connecting...");
    SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行,不会占用CPU资源
    log.debug("connected... {}", sc);
    channels.add(sc);
    for (SocketChannel channel : channels) {
    // 6、接收客户端发送的数据
    log.debug("before read... {}", channel);
    channel.read(buffer); // 阻塞方法,线程停止运行,不会占用CPU资源
    buffer.flip();
    debugRead(buffer);
    buffer.clear();
    log.debug("after read...{}", channel);
    }
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    public class Client {
    public static void main(String[] args) throws IOException {
    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("localhost", 8080));
    sc.write(ByteBuffer.wrap("hello".getBytes()));
    System.in.read();
    }
    }

    启动服务器端后启动客户端,服务器端控制台打印如下:

  • 非阻塞:

    • 非阻塞模式下,相关方法都会不会让线程暂停。
      • 在ServerSocketChannel.accept在没有连接建立时,会返回null,继续运行。
      • SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read或是去执行ServerSocketChannel.accept。
      • 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去。
    • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了cpu。
    • 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Slf4j
    public class Server {
    public static void main(String[] args) throws IOException {
    // 使用nio来理解非阻塞模式, 单线程
    // 1、创建ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(16);
    // 2、创建服务器
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false); // 设置成非阻塞模式,影响的是accept方法
    // 3、绑定监听端口
    ssc.bind(new InetSocketAddress(8080));
    // 4、创建连接集合
    List<SocketChannel> channels = new ArrayList<>();
    while (true) {
    // 5、accept建立与客户端连接, SocketChannel用来与客户端之间通信
    log.debug("connecting...");
    SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,则sc是null
    if (sc != null) {
    log.debug("connected... {}", sc);
    sc.configureBlocking(false); // 设置成非阻塞模式,影响的是read方法
    channels.add(sc);
    }
    for (SocketChannel channel : channels) {
    // 6、接收客户端发送的数据
    log.debug("before read... {}", channel);
    int read = channel.read(buffer); // 非阻塞,线程仍然会继续运行,如果没有读到数据,read返回0
    if (read > 0) {
    buffer.flip();
    debugRead(buffer);
    buffer.clear();
    log.debug("after read...{}", channel);
    }
    }
    }
    }
    }

1.2.3.2 SocketChannel方法

  • ServerSocketChannel在服务器端监听新的客户端Socket连接,相关方法如下:

    1
    2
    3
    4
    5
    6
    7
    public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel {
    public static ServerSocketChannel open();//得到一个ServerSocketChannel通道
    public final ServerSocketChannel bind(SocketAddress local);//设置服务器端端口号
    public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
    public SocketChannel accept();//接受一个连接,返回代表这个连接的通道对象
    public final SelectionKey register(Selector sel,int ops);//注册一个选择器并设置监听事件
    }
  • SocketChannel,网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。相关方法如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel,ScatteringByteChannel,GatheringByteChannel,NetworkChannel{
    public static SocketChannel open();//得到一个SocketChannel通道
    public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
    public boolean connect(SocketAddress remote);//连接服务器
    public boolean finishConnect();//如果上面的方法连接失败,接下来就要通过该方法完成连接操作
    public int write(ByteBuffer src);//往通道里写数据
    public int read(ByteBuffer dst);//从通道里读数据
    public final SelectionKey register(Selector sel,int ops,Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
    public final void close();//关闭通道
    }
  • 关于Buffer和Channel的注意事项和细节

    • ByteBuffer支持类型化的put和get,put放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有BufferUnderflowException异常。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      public class TestNIOByteBufferPutGet {
      public static void main(String[] args) {
      // 创建一个 Buffer
      ByteBuffer buffer = ByteBuffer.allocate(64);

      // 类型化方式放入数据
      buffer.putInt(100);
      buffer.putLong(9);
      buffer.putChar('尚');
      buffer.putShort((short) 4);

      // 转换成读模式
      buffer.flip();

      System.out.println();

      System.out.println(buffer.getInt());
      System.out.println(buffer.getLong());
      System.out.println(buffer.getChar());
      System.out.println(buffer.getShort());
      }
      }
    • 可以将一个普通Buffer转成只读Buffer。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      public class TestReadOnlyBuffer {
      public static void main(String[] args) {

      // 创建一个buffer
      ByteBuffer buffer = ByteBuffer.allocate(64);

      for (int i = 0; i < 64; i++) {
      buffer.put((byte) i);
      }

      // 读取
      buffer.flip();

      // 得到一个只读的 Buffer
      ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
      System.out.println(readOnlyBuffer.getClass());//class java.nio.HeapByteBufferR

      // 读取
      while (readOnlyBuffer.hasRemaining()) {
      System.out.println(readOnlyBuffer.get());
      }

      readOnlyBuffer.put((byte) 100); // ReadOnlyBufferException
      }
      }
    • NIO还提供了MappedByteBuffer,可以让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由NIO来完成。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      public class TestMappedByteBuffer {
      public static void main(String[] args) throws Exception {
      RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
      //获取对应的通道
      FileChannel channel = randomAccessFile.getChannel();

      /**
      * 参数 1:FileChannel.MapMode.READ_WRITE 使用的读写模式
      * 参数 2:0:可以直接修改的起始位置
      * 参数 3:5: 是映射到内存的大小(不是索引位置),即将 1.txt 的多少个字节映射到内存
      * 可以直接修改的范围就是 0-5
      * 实际类型 DirectByteBuffer
      */
      MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);

      mappedByteBuffer.put(0, (byte) 'H');
      mappedByteBuffer.put(3, (byte) '9');
      mappedByteBuffer.put(5, (byte) 'Y');//IndexOutOfBoundsException

      randomAccessFile.close();
      System.out.println("修改成功~~");
      }
      }

1.3 选择器(Selector)

  • 为同时处理多个socket的读写,传统的设计方式有多线程版本如下:

    • 但带来一些缺点:
      • 内存占用高。
      • 线程上下文切换成本高。
      • 只适合连接数少的场景。
  • 这时候出现了线程池版本

    • 但带来一些缺点:
      • 阻塞模式下,线程仅能处理一个socket连接。
      • 仅适合短连接场景。
    • Java的NIO,用非阻塞的IO方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)。

1.3.1 Selector简介

selector版设计

  • 单线程可以配合Selector完成对多个Channel可读写事件的监控,这称之为多路复用。

    • 多路复用仅针对网络IO、普通文件IO没法利用多路复用。
  • 如果不用Selector的非阻塞模式,线程大部分时间都在做无用功,而Selector能够保证:

    • 有可连接事件时才去连接。
    • 有可读事件才去读取。
    • 有可写事件才去写入。
      • 限于网络传输能力,Channel未必时时可写,一旦Channel可写,会触发Selector的可写事件。
  • selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上。适合连接数特别多,但流量低的场景(low traffic)。

    • 调用selector的select()会阻塞直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理,避免非阻塞模式下所做无用功,同时使线程能够被充分利用。
    • 只有在连接/通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。
    • 避免了多线程之间的上下文切换导致的开销。

1.3.2 使用Selector

  • Selector类是一个抽象类,其常用相关方法如下:

    1
    2
    3
    4
    5
    6
    7
    8
    public abstract class Selector implements Closeable {
    public static Selector open();//得到一个选择器对象
    public abstract int select() throws IOException;//阻塞直至channel中至少有一个事件发生
    public int select(long timeout);//监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,参数用来设置超时时间
    public Set<SelectionKey> selectedKeys();//从内部集合中得到所有的SelectionKey
    public abstract Selector wakeup();//唤醒 selector
    public abstract int selectNow() throws IOException;//不阻塞,立刻返回
    }
    • 创建。

      1
      Selector selector = Selector.open();
    • 绑定Channel事件。也称之为注册事件,绑定的事件selector才会关心。

      1
      2
      channel.configureBlocking(false);
      SelectionKey key = channel.register(selector, 绑定事件);
      • channel必须工作在非阻塞模式。
      • FileChannel没有非阻塞模式,因此不能配合selector一起使用
      • 绑定的事件类型可以有:
        • connect:客户端连接成功时触发,值为8。
        • accept:服务器端成功接受连接时触发,值为16。
        • read:数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况,值为1。
        • write:数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况,值为4。
    • 监听Channel事件。可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少channel发生了事件。

      • ①阻塞直到绑定事件发生。

        1
        int count = selector.select();
      • ②阻塞直到绑定事件发生,或是超时(时间单位为ms)。

        1
        int count = selector.select(long timeout);
      • ③不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件。

        1
        int count = selector.selectNow();
    • select何时不阻塞?

      • 事件发生时。
        • 客户端发起连接请求,会触发accept事件。
        • 客户端发送数据过来,客户端正常、异常关闭时,都会触发read事件,另外如果发送的数据大于buffer缓冲区,会触发多次读取事件。
        • channel可写,会触发write事件。
        • 在linux下nio bug发生时。
      • 调用selector.wakeup()。
      • 调用selector.close()。
      • selector所在线程interrupt。
  • NIO中的ServerSocketChannel功能类似ServerSocket、SocketChannel功能类似Socket。

  • Selector、Selectionkey、ServerScoketChannel和SocketChannel关系图如下:

    • ①当客户端连接时,会通过ServerSocketChannel得到SocketChannel。

    • ②将ServerSocketChannel注册到Selector(关心事件为OP_ACCEPT)。

      1
      2
      3
      4
      5
      6
      //SocketChannel类的父类SelectableChannel中
      public final SelectionKey register(Selector sel, int ops)
      throws ClosedChannelException
      {
      return register(sel, ops, null);
      }
    • ③Selector调用select方法进行监听,返回有事件发生的通道的个数。如果select方法返回的值>0,就获取到相关的selectionKey集合。

    • ④遍历selectionKey集合,如果有新的客户端连接(OP_ACCEPT),则给客户端生成一个SocketChannel(serverSocketChannel.accept())注册到Selector,一个Selector上可以注册多个SocketChannel。

      1
      2
      3
      //SocketChannel类的父类SelectableChannel中
      public abstract SelectionKey register(Selector sel, int ops, Object att)
      throws ClosedChannelException;
    • ⑤通过SelectionKey反向获取SocketChannel。

      1
      2
      //抽象类SelectionKey中
      public abstract SelectableChannel channel();
    • ⑥最后可以通过得到的channel,完成业务处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    @Slf4j
    public class Server {
    public static void main(String[] args) throws IOException {
    // 1、创建selector, 管理多个channel
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    // 2、建立selector和channel的联系(注册)
    // 即将channel注册到selector中
    // SelectionKey就是将来事件发生后,通过它可以知道是什么事件以及哪个channel发生的事件,ops=0表示不关注任何事件
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // sscKey只关注accept事件,即ops=16
    sscKey.interestOps(SelectionKey.OP_ACCEPT);
    log.debug("sscKey:{}", sscKey);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
    // 3、select方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
    // select在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理,否则下次该事件仍会触发,这是因为nio底层使用的是水平触发
    selector.select();
    // 4、处理事件, selectedKeys内部包含了所有发生的事件
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
    while (iter.hasNext()) {
    SelectionKey key = iter.next();
    // 处理key时,要从selectedKeys集合中删除,否则下次处理就会有问题
    iter.remove();
    log.debug("key: {}", key);
    // 5、区分事件类型
    if (key.isAcceptable()) { // 如果是accept
    // 这里也可以写成ServerSocketChannel channel = ssc.accept()
    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    // accept会将此事件标注为已处理
    SocketChannel sc = channel.accept();
    sc.configureBlocking(false);
    SelectionKey scKey = sc.register(selector, 0, null);
    // scKey只关注读事件
    scKey.interestOps(SelectionKey.OP_READ);
    log.debug("{}", sc);
    log.debug("scKey:{}", scKey);
    } else if (key.isReadable()) { // 如果是read
    try {
    SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
    ByteBuffer buffer = ByteBuffer.allocate(4);
    int read = channel.read(buffer); // 如果客户端调用sc.close()正常断开,read的方法的返回值是-1,注意正常断开时会产生一次读事件
    if (read == -1) {
    key.cancel();
    } else {
    buffer.flip();
    // debugAll(buffer);
    System.out.println(Charset.defaultCharset().decode(buffer));
    }
    } catch (IOException e) {
    e.printStackTrace();
    key.cancel(); // 因为客户端断开了,因此需要将key取消(从selector的keys集合中真正删除key),注意客户端异常断开时也会触发一次read事件(即关闭连接时会向服务器发送关闭连接报文)
    }
    }
    }
    }
    }
    }
    • 为何要iter.remove()?

      • 因为select在事件发生后,就会将相关的key放入selectedKeys集合,但不会在处理完后从selectedKeys集合中移除,需要我们自己编码删除。例如:
        • 第一次触发了ssckey上的accept事件,没有移除ssckey。
        • 第二次触发了sckey上的read事件,但这时selectedKeys中还有上次的ssckey,在处理时因为没有真正的serverSocket连上了,就会导致空指针异常。
    • cancel的作用?

      • cancel会取消注册在selector上的channel,并从keys集合中删除key后续不会再监听事件。

1.3.3 处理消息的边界

  • 不处理边界的问题。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class Server {
    public static void main(String[] args) throws IOException {
    ServerSocket ss=new ServerSocket(9000);
    while (true) {
    Socket s = ss.accept();
    InputStream in = s.getInputStream();
    // 这里这么写,有没有问题
    byte[] arr = new byte[4];
    while(true) {
    int read = in.read(arr);
    // 这里这么写,有没有问题
    if(read == -1) {
    break;
    }
    System.out.println(new String(arr, 0, read));
    }
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class Client {
    public static void main(String[] args) throws IOException {
    Socket max = new Socket("localhost", 9000);
    OutputStream out = max.getOutputStream();
    out.write("hello".getBytes());
    out.write("world".getBytes());
    out.write("你好".getBytes());
    max.close();
    }
    }

    输出有乱码:

    1
    2
    3
    4
    hell
    owor
    ld�
    �好
  • 处理消息的边界。

    • 当传输的数据和ByteBuffer的长度不一致时就会出现数据打印的乱码(即出现黏包或半包的情况),可能出现以下几种情况。

      1637589522975
    • 针对以上的情况,一般会有以下几种解决办法:

      • ①固定消息长度,服务器数据包(ByteBuffer)大小和客户端的一样,服务器按预定长度读取,缺点是浪费带宽。

      • ②按分隔符拆分,缺点是效率低。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        @Slf4j
        public class Server {
        private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
        // 找到一条完整消息
        if (source.get(i) == '\n') {
        int length = i + 1 - source.position();
        // 把这条完整消息存入新的 ByteBuffer
        ByteBuffer target = ByteBuffer.allocate(length);
        // 从source读,向target写
        for (int j = 0; j < length; j++) {
        target.put(source.get());
        }
        debugAll(target);
        }
        }
        source.compact();
        }

        public static void main(String[] args) throws IOException {
        // 1、创建selector, 管理多个channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        // 2、建立selector和channel的联系(注册)
        // SelectionKey就是将来事件发生后,通过它可以知道事件和哪个channel的事件,ops=0表示不关注任何事件
        SelectionKey sscKey = ssc.register(selector, 0, null);
        // sscKey只关注accept事件,即ops=16
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("sscKey:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));
        while (true) {
        // 3、select方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
        // select在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理,否则下次该事件仍会触发,这是因为nio底层使用的是水平触发
        selector.select();
        // 4、处理事件, selectedKeys内部包含了所有发生的事件
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
        while (iter.hasNext()) {
        SelectionKey key = iter.next();
        // 处理key时,要从selectedKeys集合中删除,否则下次处理就会有问题
        iter.remove();
        log.debug("key: {}", key);
        // 5、区分事件类型
        if (key.isAcceptable()) { // 如果是accept
        // 这里也可以写成ServerSocketChannel channel = ssc.accept()
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel sc = channel.accept();
        sc.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
        // 将一个byteBuffer作为附件关联到selectionKey上
        SelectionKey scKey = sc.register(selector, 0, buffer);
        // scKey只关注读事件
        scKey.interestOps(SelectionKey.OP_READ);
        log.debug("{}", sc);
        log.debug("scKey:{}", scKey);
        } else if (key.isReadable()) { // 如果是read
        try {
        SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
        // 获取selectionKey上关联的附件
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        int read = channel.read(buffer); // 如果客户端调用sc.close()正常断开,read的方法的返回值是-1,注意正常断开时会产生一次读事件
        if (read == -1) {
        key.cancel();
        } else {
        split(buffer);
        // 需要扩容
        if (buffer.position() == buffer.limit()) {
        ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
        // 切换成读模式
        buffer.flip();
        newBuffer.put(buffer); // 0123456789abcdef3333\n
        key.attach(newBuffer);
        }
        }
        } catch (IOException e) {
        e.printStackTrace();
        key.cancel(); // 因为客户端断开了,因此需要将key取消(从selector的keys集合中真正删除key),注意客户端异常断开时也会触发一次read事件
        }
        }
        }
        }
        }
        }
        1
        2
        3
        4
        5
        6
        7
        8
        public class Client {
        public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
        System.in.read();
        }
        }
      • ③TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则影响server吞吐量。

        • Http1.1是TLV格式。
        • Http2.0是LTV格式。
  • ByteBuffer大小分配。

    • 每个channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer。
    • ByteBuffer不能太大,比如一个ByteBuffer 1Mb的话,要支持百万连接就要1Tb内存,因此需要设计大小可变的ByteBuffer。
      • 一种思路是首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4k的buffer内容拷贝至8kbuffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。这是上方实现的方法。参考实现[http://tutorials.jenkov.com/java-performance/resizable-array.html](http://tutorials.jenkov.com/java-performance/resizable-array.html)
      • 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗。

1.3.4 处理write事件

  • 上方只是写了channel的accept和read事件,现在介绍write事件。

    • 非阻塞模式下,无法保证把buffer中所有数据都写入channel,因此需要追踪write方法的返回值(代表实际写入字节数)。
    • 用selector监听所有channel的可写事件,每个channel都需要一个key来跟踪buffer,但这样又会导致占用内存过多,就有两阶段策略。
      • 当消息处理器第一次写入消息时,才将channel注册到selector上。
      • selector检查channel上的可写事件,如果所有的数据写完了,就取消channel的注册。
      • 如果不取消,会每次可写均会触发write事件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public class WriteServer {
    public static void main(String[] args) throws IOException {
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    Selector selector = Selector.open();
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
    selector.select();
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    if (key.isAcceptable()) {
    SocketChannel sc = ssc.accept();
    sc.configureBlocking(false);
    SelectionKey sckey = sc.register(selector, 0, null);
    sckey.interestOps(SelectionKey.OP_READ);
    // 1、向客户端发送大量数据
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 5000000; i++) {
    sb.append("a");
    }
    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
    // 2、返回值代表实际写入的字节数
    int write = sc.write(buffer);
    System.out.println(write);
    // 3、判断是否有剩余内容
    if (buffer.hasRemaining()) {
    // 4、关注可写事件 1 4
    // 按位或等于十进制的加法操作
    sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
    // sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
    // 5、把未写完的数据挂到sckey上
    sckey.attach(buffer);
    }
    } else if (key.isWritable()) {
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    SocketChannel sc = (SocketChannel) key.channel();
    int write = sc.write(buffer);
    System.out.println(write);
    // 6. 清理操作
    if (!buffer.hasRemaining()) {
    key.attach(null); // 需要清除buffer
    key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);// 不需关注可写事件
    }
    }
    }
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class WriteClient {
    public static void main(String[] args) throws IOException {
    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("localhost", 8080));
    // 接收数据
    int count = 0;
    while (true) {
    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
    count += sc.read(buffer);
    System.out.println(count);
    buffer.clear();
    }
    }
    }
    • 为何要取消write事件?
      • 只要向channel发送数据时,socket缓冲可写,这个事件会频繁触发,因此应当只在socket缓冲区写不下时再关注可写事件,数据写完之后再取消关注。

1.3.5 利用多线程优化

  • 前面的代码只有一个选择器,没有充分利用多核cpu,于是应该改进两组选择器:

    • 单线程配一个选择器,专门处理accept事件。
    • 创建cpu核心数的线程,每个线程配一个选择器,轮流处理read事件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    @Slf4j
    public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
    Thread.currentThread().setName("boss");
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    Selector boss = Selector.open();
    SelectionKey bossKey = ssc.register(boss, 0, null);
    bossKey.interestOps(SelectionKey.OP_ACCEPT);
    ssc.bind(new InetSocketAddress(8080));
    // 1、创建固定数量的worker并初始化
    // Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理cpu个数,而不是容器申请时的个数
    // 这个问题直到jdk10才修复,使用jvm参数UseContainerSupport配置,默认开启
    Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
    for (int i = 0; i < workers.length; i++) {
    workers[i] = new Worker("worker-" + i);
    }
    AtomicInteger index = new AtomicInteger();
    while(true) {
    boss.select();
    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
    while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    if (key.isAcceptable()) {
    SocketChannel sc = ssc.accept();
    sc.configureBlocking(false);
    log.debug("connected...{}", sc.getRemoteAddress());
    // 2、socketChannel关联worker的selector
    log.debug("before register...{}", sc.getRemoteAddress());
    // round robin轮询
    // boss初始化workers的selector,启动worker
    workers[index.getAndIncrement() % workers.length].register(sc);
    log.debug("after register...{}", sc.getRemoteAddress());
    }
    }
    }
    }

    static class Worker implements Runnable{
    private Thread thread;
    private Selector selector;
    private String name;
    private volatile boolean start = false; // 还未初始化
    private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    public Worker(String name) {
    this.name = name;
    }

    // 初始化线程,和 selector
    public void register(SocketChannel sc) throws IOException {
    if(!start) {
    selector = Selector.open();
    thread = new Thread(this, name);
    thread.start();
    start = true;
    }
    // 唤醒select方法,此方法类似park和unPark的原理
    selector.wakeup();
    // 当selector.select()方法和sc.register(selector)方法使用同一个selector时,如果selector.select()先执行会导致sc.register(selector)方法阻塞,因为它们在竞争同一个锁Set<SelectionKey> publicKeys。
    // 这里要保证sc.register(selector)方法比selector.select()方法先执行。
    sc.register(selector, SelectionKey.OP_READ, null);
    }

    @Override
    public void run() {
    while(true) {
    try {
    selector.select();
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    if (key.isReadable()) {
    ByteBuffer buffer = ByteBuffer.allocate(16);
    SocketChannel channel = (SocketChannel) key.channel();
    log.debug("read...{}", channel.getRemoteAddress());
    channel.read(buffer);
    buffer.flip();
    debugAll(buffer);
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    public class Client {
    public static void main(String[] args) throws IOException {
    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("localhost", 8080));
    sc.write(ByteBuffer.wrap("abcd".getBytes()));
    System.in.read();
    }
    }

1.4 NIO群聊案例

  • 编写一个NIO群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)

    • 服务器端:可以监测用户上线,离线,并实现消息转发功能。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      public class GroupChatServer {
      //定义属性
      private Selector selector;
      private ServerSocketChannel listenChannel;

      private static final int PORT = 6667;

      //构造器
      //初始化工作
      public GroupChatServer() {
      try {
      //得到选择器
      selector = Selector.open();
      //ServerSocketChannel
      listenChannel = ServerSocketChannel.open();
      //绑定端口
      listenChannel.socket().bind(new InetSocketAddress(PORT));
      //设置非阻塞模式
      listenChannel.configureBlocking(false);
      //将该 listenChannel 注册到 selector
      listenChannel.register(selector, SelectionKey.OP_ACCEPT);
      } catch (IOException e) {
      e.printStackTrace();
      }
      }

      public void listen() {
      try {
      //循环处理
      while (true) {
      int count = selector.select();
      if (count > 0) { //有事件处理
      // 遍历得到 selectionKey 集合
      Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
      while (iterator.hasNext()) {
      //取出 selectionKey
      SelectionKey key = iterator.next();
      //监听到 accept
      if (key.isAcceptable()) {
      SocketChannel sc = listenChannel.accept();
      sc.configureBlocking(false);
      //将该 sc 注册到 selector
      sc.register(selector, SelectionKey.OP_READ);
      System.out.println(sc.getRemoteAddress().toString().substring(1) + " 上线 ");
      }
      if (key.isReadable()) {//通道发送read事件,即通道是可读的状态
      readData(key);
      }
      //当前的 key 删除,防止重复处理
      iterator.remove();
      }
      }
      }
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      }
      }

      //读取客户端消息
      public void readData(SelectionKey key) {
      SocketChannel channel = null;
      try {
      //得到 channel
      channel = (SocketChannel) key.channel();
      //创建 buffer
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      int count = channel.read(buffer);
      //根据 count 的值做处理
      if (count > 0) {
      //把缓存区的数据转成字符串
      String msg = new String(buffer.array());
      //输出该消息
      System.out.println("from客户端--> " + msg);
      //向其它的客户端转发消息(去掉自己)
      sendInfoToOtherClients(msg, channel);
      }
      } catch (IOException e) {
      try {
      System.out.println(channel.getRemoteAddress() + "离线了..");
      //取消注册
      key.cancel();
      //关闭通道
      channel.close();
      } catch (IOException e2) {
      e2.printStackTrace();
      }
      }
      }

      //转发消息给其它客户(通道)
      private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
      //遍历所有注册到 selector 上的 SocketChannel,并排除 self
      for (SelectionKey key : selector.keys()) {
      //通过 key 取出对应的 SocketChannel
      Channel targetChannel = key.channel();
      //排除自己
      if (targetChannel instanceof SocketChannel && targetChannel != self) {
      //转型
      SocketChannel dest = (SocketChannel) targetChannel;
      //将 msg 存储到 buffer
      ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
      //将 buffer 的数据写入通道
      dest.write(buffer);
      }
      }
      }

      public static void main(String[] args) {
      //创建服务器对象
      GroupChatServer groupChatServer = new GroupChatServer();
      groupChatServer.listen();
      }
      }
    • 客户端:通过Channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(由服务器转发得到)。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      public class GroupChatClient {
      //定义相关的属性
      private final String HOST = "127.0.0.1";//服务器的ip
      private final int PORT = 6667;//服务器端口
      private Selector selector;
      private SocketChannel socketChannel;
      private String username;

      //构造器,完成初始化工作
      public GroupChatClient() throws IOException {
      selector = Selector.open();
      //连接服务器
      socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
      //设置非阻塞
      socketChannel.configureBlocking(false);
      //将 channel 注册到selector
      socketChannel.register(selector, SelectionKey.OP_READ);
      //得到 username
      username = socketChannel.getLocalAddress().toString().substring(1);
      System.out.println(username + " is ok...");
      }

      //向服务器发送消息
      public void sendInfo(String info) {
      info = username + " 说:" + info;
      try {
      socketChannel.write(ByteBuffer.wrap(info.getBytes()));
      } catch (IOException e) {
      e.printStackTrace();
      }
      }

      //读取从服务器端回复的消息
      public void readInfo() {
      try {
      int readChannels = selector.select();
      if (readChannels > 0) {//有可以用的通道
      Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
      while (iterator.hasNext()) {
      SelectionKey key = iterator.next();
      if (key.isReadable()) {
      //得到相关的通道
      SocketChannel sc = (SocketChannel) key.channel();
      //得到一个 Buffer
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      //读取
      sc.read(buffer);
      //把读到的缓冲区的数据转成字符串
      String msg = new String(buffer.array());
      System.out.println(msg.trim());
      }
      iterator.remove(); //删除当前的 selectionKey,防止重复操作
      }
      }
      } catch (Exception e) {
      e.printStackTrace();
      }
      }

      public static void main(String[] args) throws Exception {
      //启动客户端
      GroupChatClient chatClient = new GroupChatClient();
      //启动一个线程,每个 3 秒,读取从服务器发送的数据
      new Thread() {
      public void run() {
      while (true) {
      chatClient.readInfo();
      try {
      Thread.sleep(3000);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }
      }.start();

      //发送数据给服务器端
      Scanner scanner = new Scanner(System.in);
      while (scanner.hasNextLine()) {
      String s = scanner.nextLine();
      chatClient.sendInfo(s);
      }
      }
      }
  • 另一种写法(客户端不创建selector)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    public class ChatServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private Set<SocketChannel> clients = new HashSet<>();
    private static final int PORT = 6666;

    public ChatServer() throws IOException {
    selector = Selector.open();
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("服务器启动成功");
    }

    public void start() {
    try {
    while (true) {
    int select = selector.select();
    if (select > 0) {
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    if (iterator.hasNext()) {
    SelectionKey next = iterator.next();
    if (next.isAcceptable()) {
    SocketChannel client = serverSocketChannel.accept();
    client.configureBlocking(false);
    client.register(selector, SelectionKey.OP_READ);
    clients.add(client);
    System.out.println(client.getRemoteAddress().toString().substring(1) + " 上线 ");
    }
    if (next.isReadable()) {
    readData(next);
    }
    iterator.remove();
    }
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    }
    }

    public void readData(SelectionKey key) {
    SocketChannel channel = null;
    try {
    channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int count = channel.read(buffer);
    if (count > 0) {
    String msg = new String(buffer.array());
    System.out.println("from客户端-->" + msg);
    sendInfoToOtherClients(msg, channel);
    }
    } catch (IOException e) {
    try {
    System.out.println(channel.getRemoteAddress() + "离线了..");
    clients.remove(channel);
    //取消注册
    key.cancel();
    //关闭通道
    channel.close();
    } catch (IOException e2) {
    e2.printStackTrace();
    }
    } finally {
    }
    }

    //转发消息给其它客户(通道)
    private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
    //遍历所有注册到 selector 上的 SocketChannel,并排除 self
    for (SelectionKey key : selector.keys()) {
    //通过 key 取出对应的 SocketChannel
    Channel targetChannel = key.channel();
    //排除自己
    if (targetChannel instanceof SocketChannel && targetChannel != self) {
    //转型
    SocketChannel dest = (SocketChannel) targetChannel;
    //将 msg 存储到 buffer
    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
    //将 buffer 的数据写入通道
    dest.write(buffer);
    }
    }
    }

    public static void main(String[] args) throws IOException {
    ChatServer chatServer = new ChatServer();
    chatServer.start();
    }
    }

    public class ChatClient {
    private final String HOST = "127.0.0.1";
    private final int PORT = 6666;
    private SocketChannel socketChannel;
    private String clientName;

    public ChatClient() throws IOException {
    socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
    socketChannel.configureBlocking(false);
    clientName = socketChannel.getLocalAddress().toString().substring(1);
    System.out.println(clientName + "连接服务器成功");
    }

    //向服务器发送消息
    public void sendInfo() {
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLine()) {
    String info = clientName + "说:" + scanner.nextLine();
    try {
    socketChannel.write(ByteBuffer.wrap(info.getBytes()));
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }

    //读取从服务器端回复的消息
    public void readInfo() {
    try {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = socketChannel.read(buffer);
    if (read > 0) {
    String msg = new String(buffer.array());
    System.out.println(msg.trim());
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public static void main(String[] args) throws IOException {
    ChatClient chatClient = new ChatClient();
    new Thread(() -> {
    while (true) {
    chatClient.readInfo();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }).start();
    chatClient.sendInfo();
    }
    }

3、IO模型

  • 当相应的设备准备好数据后,会将数据复制到内核态。数据从相应的设备到内核态的处理方式分为:阻塞和非阻塞

    • 阻塞:用户请求会等待数据准备好从操作系统调用相应的设备返回到内核态,如果没有返回处于阻塞状态。
    • 非阻塞:是操作系统接收到一组文件描述符,然后操作系统批量处理这些个文件描述符,然后不管有没有准备好数据都立即返回,如果没有对应的准备好的文件描述符,则继续轮询获取准备好数据的文件描述符。
  • 数据从内核态到用户态的复制数据的处理方式的不同可分为:同步和异步

    • 同步:用户请求等待数据从内核态向用户态复制数据,在此期间不做其他的事情,称为同步。即线程自己去获取结果(一个线程)。
    • 异步:在数据从内核态向用户态复制的过程中,用户请求不会一直处于等待状态而是做其他事情,称为异步。即线程自己不去获取结果,而是由其它线程送结果(至少两个线程)
  • 当调用一次channel.read或stream.read后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

    • 等待数据阶段。准备数据,kernel就要等待足够的数据到来,这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。

    • 复制数据阶段。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

      1637762611081

3.1 同步阻塞IO

  • 在这个IO模型中,用户空间的应用程序执行一个系统调用(recvform),这会导致应用程序阻塞,什么也不干,直到数据准备好,并且将数据从内核复制到用户进程,最后进程再处理数据,在等待数据到处理数据的两个阶段,整个进程都被阻塞。调用应用程序处于一种不再消费CPU而只是简单等待响应的状态。blocking IO的特点就是在IO执行的两个阶段都被block了。

    1637763137539

3.2 同步非阻塞IO

  • 非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有,也同时进行了多次用户态和内核态的切换,所以性能不是特别高。

    1637763349692

3.3 IO多路复用(同步)

  • I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。它有两个特别的系统调用select、poll、epoll函数,能实现同时对多个IO端口进行监听,当用户进程调用了select,那么整个进程会被阻塞,而同时kernel会”监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

  • 实际中,对于每一个socket,一般都设置成为non-blocking,但是整个用户的process其实是一直被block的,只不过process是被select这个函数block,而不是被socket IO给block。所以IO多路复用是阻塞在select,epoll这样的系统调用之上,而没有阻塞在真正的I/O系统调用如recvfrom之上。

  • 多路复用I/O比非阻塞I/O模型的效率高是因为在非阻塞I/O中,不断地询问socket状态时通过用户线程去进行的,而在多路复用I/O中,轮询每个socket状态是内核在进行的,这个效率要比用户线程要高的多。

    1637764293519
  • 阻塞IO和多路复用的区别:

    • 阻塞IO:

      1637764493938
    • 多路复用:

      1637764518986

3.3 异步非阻塞IO(asynchronous IO)

  • 相对于同步IO,异步IO不是顺序执行。用户进程进行aio_read系统调用之后,无论内核数据是否准备好,都会直接返回给用户进程,然后用户态进程可以去做别的事情。等到socket数据准备好了,内核直接复制数据给进程,然后从内核向进程发送通知。IO两个阶段,进程都是非阻塞的。

  • AIO用来解决数据复制阶段的阻塞问题。

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置。

  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果。

  • 异步模型需要底层操作系统(Kernel)提供支持。

    • Windows系统通过IOCP实现了真正的异步IO。
    • Linux系统异步IO在2.6版本引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势。
    1637766096321
  • 文件AIO:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Slf4j
    public class AioFileChannel {
    public static void main(String[] args) throws IOException {
    try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {
    // 参数1 ByteBuffer
    // 参数2 读取的起始位置
    // 参数3 附件
    // 参数4 回调对象CompletionHandler
    ByteBuffer buffer = ByteBuffer.allocate(16);
    log.debug("read begin...");
    channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    @Override // read成功
    public void completed(Integer result, ByteBuffer attachment) {
    log.debug("read completed...{}", result);
    attachment.flip();
    debugAll(attachment);
    }

    @Override // read失败
    public void failed(Throwable exc, ByteBuffer attachment) {
    exc.printStackTrace();
    }
    });
    log.debug("read end...");
    } catch (IOException e) {
    e.printStackTrace();
    }
    // 异步线程是守护线程,如果主线程提前结束,那么异步线程也会立马结束
    System.in.read();
    }
    }
  • 网络AIO:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    public class AioServer {
    public static void main(String[] args) throws IOException {
    AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
    ssc.bind(new InetSocketAddress(8080));
    ssc.accept(null, new AcceptHandler(ssc));
    System.in.read();
    }

    private static void closeChannel(AsynchronousSocketChannel sc) {
    try {
    System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
    sc.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel sc;

    public ReadHandler(AsynchronousSocketChannel sc) {
    this.sc = sc;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
    try {
    if (result == -1) {
    closeChannel(sc);
    return;
    }
    System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
    attachment.flip();
    System.out.println(Charset.defaultCharset().decode(attachment));
    attachment.clear();
    // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
    sc.read(attachment, attachment, this);
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
    closeChannel(sc);
    exc.printStackTrace();
    }
    }

    private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel sc;

    private WriteHandler(AsynchronousSocketChannel sc) {
    this.sc = sc;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
    // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
    if (attachment.hasRemaining()) {
    sc.write(attachment);
    }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
    exc.printStackTrace();
    closeChannel(sc);
    }
    }

    private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
    private final AsynchronousServerSocketChannel ssc;

    public AcceptHandler(AsynchronousServerSocketChannel ssc) {
    this.ssc = ssc;
    }

    @Override
    public void completed(AsynchronousSocketChannel sc, Object attachment) {
    try {
    System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
    } catch (IOException e) {
    e.printStackTrace();
    }
    ByteBuffer buffer = ByteBuffer.allocate(16);
    // 读事件由 ReadHandler 处理
    sc.read(buffer, buffer, new ReadHandler(sc));
    // 写事件由 WriteHandler 处理
    sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
    // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
    ssc.accept(null, this);
    }

    @Override
    public void failed(Throwable exc, Object attachment) {
    exc.printStackTrace();
    }
    }
    }

3、零拷贝

  • “零拷贝”是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式(即用户空间和内核空间没有拷贝操作,后面讲的sendFile属于零拷贝)。

  • 零拷贝实际的实现并没有真正的标准,取决于操作系统如何实现这一点。零拷贝完全依赖于操作系统。操作系统支持,就有;不支持,就没有。不依赖Java本身。

  • 零拷贝是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有kernel buffer有一份数据)。

  • 零拷贝带来的好处:

    • 减少甚至完全避免不必要的CPU拷贝,从而让CPU解脱出来去执行其他的任务。
    • 减少内存带宽的占用。
    • 通常零拷贝技术还能够减少用户空间和操作系统内核空间之间的上下文切换。
  • Java传统IO和网络编程的一段代码:

    1
    2
    3
    4
    5
    6
    7
    8
    File file = new File("test.txt");
    RandomAccessFile raf = new RandomAccessFile(file, "rw");

    byte[] arr = new byte[(int) file.length()];
    raf.read(arr);

    Socket socket = new ServerSocket(8080).accept();
    socket.getOutputStream().write(arr);
    • 其中经历了4次数据拷贝 ,3次用户态与内核态的切换,效率不高,示意图如下:

      • user context: 用户态

      • kernel context: 内核态

      • User space: 用户空间

      • Kernel space: 内核空间

      • Syscall read: 系统调用读取

      • Syscall write: 系统调用写入

      • Hard drive: 硬件驱动

      • kernel buffer: 内核态缓冲区

      • user buffer: 用户态缓冲区

      • socket buffer: 套接字缓存

      • protocol engine: 协议引擎

      • DMA: Direct Memory Access: 直接内存拷贝(不使用CPU)

        • ①java 本身并不具备IO读写能力,因此使用read()系统调用后系统**由用户态转换为内核态(第一次上线文切换),磁盘中的数据有DMA(Direct Memory Access)的方式读取到内核缓冲区(kernel buffer)**。DMA过程中CPU不需要参与数据的读写,而是DMA处理器直接将硬盘数据通过总线传输到内存中。
        • ②系统由内核态转换为用户态(第二次上下文切换),当程序要读取的数据已经完成写入内核缓冲区以后,程序会将数据**由内核缓存区,写入用户缓存区(即byte[] buf)**,这个过程需要CPU参与数据的读写。
        • ③程序使用write()系统调用。系统**由用户态切换到内核态(第三次上下文切换),数据从用户态缓冲区写入到socket缓冲区(Socket Buffer)**,这个过程需要CPU参与数据的读写。
        • ④接下来要向网卡写数据,这项能力java又不具备,于是将调用操作系统的写能力,使用DMA的方式将socket缓冲区的数据写入网卡,不会使用cpu。
  • 零拷贝的优点有:

    • 更少的用户态与内核态的切换。
    • 不利用cpu计算,减少cpu缓存伪共享。
    • 零拷贝适合小文件传输。

1.1 mmap优化(NIO优化)

  • 通过DirectByteBuf。

    • ByteBuffer.allocate(10):HeapByteBuffer,使用的还是java内存。
    • ByteBuffer.allocateDirect(10):DirectByteBuffer使用的是操作系统内存。
  • 大部分步骤与优化前相同,不再赘述。唯有一点:java可以使用DirectByteBuf将堆外内存映射到jvm内存中来直接访问使用。

    • 这块内存不受jvm垃圾回收的影响,因此内存地址固定,有助于IO读写。
    • java中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分成两步:
      • DirectByteBuf对象被垃圾回收,将虚引用加入引用队列。
      • 通过专门线程访问引用队列,根据虚引用释放堆外内存。
    • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少。
  • mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这种方式的I/O原理就是将用户缓冲区(user buffer)的内存地址和内核缓冲区(kernel buffer)的内存地址做一个映射,也就是说系统在用户态可以直接读取并操作内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数。如下图:

    1637840267318
    • 可以看到这种内存映射的方式减少了一次数据拷贝,但用户态与内核态的切换次数没有减少,同时需要注意在进行这种内存映射的时候,有可能会出现并发线程操作同一块内存区域而导致的严重的数据不一致问题,所以需要进行合理的并发编程来解决这些问题。
      • ①mmap()系统调用首先会使用DMA的方式将磁盘数据读取到内核缓冲区,然后通过内存映射的方式,使用户缓冲区和内核读缓冲区的内存地址为同一内存地址,也就是说不需要CPU再将数据从内核读缓冲区复制到用户缓冲区。
      • ②当使用write()系统调用的时候,CPU将用户缓冲区的数据直接写入到网络发送缓冲区(socket buffer),然后通过DMA的方式将数据传入到网卡驱动程序中准备发送。

1.2 sendFile优化

  • Linux2.1版本提供了sendFile函数,java中对应着两个channel调用transferTo/transferFrom方法(只在FileChannel中有)拷贝数据,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到SocketBuffer,同时,由于和用户态完全无关,就减少了2次上下文切换。如下图:

    1637841016843
    • sendfile()系统调用,依旧3次数据拷贝,1次用户态和内核态的切换操作,相比较于内存映射的方式有了很大的进步,但问题是程序不能对数据进行修改,而只是单纯地进行了一次数据的传输过程。
      • ①java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu。
      • ②数据从内核缓冲区传输到socket缓冲区,cpu会参与拷贝。
      • ③最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu。
  • Linux在2.4版本中,做了一些修改,避免了从内核缓冲区拷贝到Socketbuffer的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝。如下图:

    • ①java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu。
    • ②只会将一些offset和length信息从内核缓冲区拷入socket缓冲区,几乎无消耗。
    • ③使用DMA将内核缓冲区的数据写入网卡,不会使用cpu。
      • 可以看到经过优化后基本变成2次数据拷贝,1次状态切换(忽略较小消耗的CPU拷贝)这是真正意义上的零拷贝,因为其间CPU已经不参与数据的拷贝过程,也就是说完全通过其他硬件和中断的方式来实现数据的读写过程,但是这样的过程需要硬件的支持才能实现。
      • 系统调用sendfile()发起后,磁盘数据通过DMA方式读取到内核缓冲区,内核缓冲区中的数据通过DMA聚合网络缓冲区,然后一齐发送到网卡中。