Java Io

Java IO

从Java的继承结构上看,IO流分为两大类

字节流,InputStream、OutputStream,从源代码注释中也可以看出,它们的最小单位一定是字节

Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255. If no byte is available because the end of the stream has been reached, the value -1 is returned. This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.
Writes the specified byte to this output stream. The general contract for write is that one byte is written to the output stream. The byte to be written is the eight low-order bits of the argument b. The 24 high-order bits of b are ignored.

字符流,Reader、Writer,再看一下源码注释

Reads a single character. This method will block until a character is available, an I/O error occurs, or the end of the stream is reached.
...
Returns:
The character read, as an integer in the range 0 to 65535 (0x00-0xffff), or -1 if the end of the stream has been reached
Writes a single character. The character to be written is contained in the 16 low-order bits of the given integer value; the 16 high-order bits are ignored.

这里要插一下,Unicode

Unicode的编码空间从U+0000到U+10FFFF,共有1,112,064个码位(code point)可用来映射字符。

可以理解为,UTF-16等编码,是Unicode的实现方式,Unicode是一种逻辑抽象。具体还可见 wikipedia

但是这里很奇怪,Reader、Writer抽象类规定了返回值、参数的范围为0~65535,是不是意味着某些Unicode大于65535的字符会"溢出"?

package me.liuweiqiang;

import java.io.*;

public class IODemo {

    public static void main(String[] args) throws Exception {
        PipedReader reader = new PipedReader();
        PipedWriter writer = new PipedWriter(reader);
        writer.write(0x10040);
        writer.flush();
        System.out.printf("%x", reader.read());
        System.out.println();
        reader.close();
        writer.close();
    }
}

输出为40。

IO类型

Linux IO模型

一个IO操作,会有两个阶段

对这两个阶段的不同,可以分为5中不同的模型

  1. 阻塞 阻塞 应用程序开始前调用内核,阻塞至二阶段结束后返回。
  2. 非阻塞 非阻塞 应用程序开始前调用内核,数据未准备好时不阻塞,准备好二阶段复制时阻塞。但应用程序为了获取到数据,需要不停地轮询。
  3. IO多路复用 IO多路复用 涉及到多个IO,一阶段阻塞,在多个IO中一个IO准备好时返回,然后二阶段阻塞式复制。
  4. 信号驱动IO 信号驱动IO 与非阻塞IO比较像,只不过一阶段不是轮询而是通知。内核发送SIGIO通知信号处理程序进行处理,信号处理程序与"主线程"共享进程空间。 信号处理程序可以在自己的"线程"上从内核复制数据,这样的话就和异步IO一样了,但因为并发安全的问题一般不建议这么做。
  5. 异步IO 异步IO

对比

以上Java IO章节都是阻塞的,而且是面向流的。

阻塞的,就是说,为了获取一个单位的数据,应用程序自始至终都只调用一个函数,只调用一次。

面向流的,意思是,一次IO只获取一个基本单位的数据,比如一个字节、一个字符。然而,有一点比较疑惑的是,InputStream不是有个read(byte b[], int off, int len)?

翻一下这个方法的源码

    public int read(byte b[], int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int c = read();
        if (c == -1) {
            return -1;
        }
        b[off] = (byte)c;

        int i = 1;
        try {
            for (; i < len ; i++) {
                c = read();
                if (c == -1) {
                    break;
                }
                b[off + i] = (byte)c;
            }
        } catch (IOException ee) {
        }
        return i;
    }

可见,这个方法也是通过read()一个字节一个字节地把数据复制到用户态数据结构(变量c),然后又复制到到应用buffer(byte b[])。

Java NIO

Java的NIO包,N是New的意思,而不仅仅是非阻塞IO。事实上,NIO包下的IO类(Channel),甚至可以是阻塞的,与Linux中的IO模型对应关系不太容易被弄清。

以下是文件的Channel

    public static void main(String[] args) throws Exception {
        FileOutputStream fileOutputStream = new FileOutputStream("../../test");
        FileChannel fileChannel = fileOutputStream.getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("test".getBytes(StandardCharsets.UTF_16));
        buffer.flip();
        fileChannel.write(buffer);
        fileOutputStream.close();
    }

这里的用户态数据结构是ByteBuffer buffer。

对于异步IO模型,Java NIO中定义了AsynchronousChannel接口。以下依旧是文件的Channel

    public static void main(String[] args) throws Exception {
        AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel
                .open(Paths.get("../../test"), StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(16);
        asynchronousFileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                System.out.println("result: " + result);

                attachment.flip();
                byte[] data = new byte[attachment.limit()];
                attachment.get(data);
                System.out.println(new String(data, StandardCharsets.UTF_16));
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {

            }
        });
        System.in.read();
        asynchronousFileChannel.close();
    }

这里的用户态数据是ByteBuffer buffer。

对于多路复用:

    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(9999));

        int count = 3;
        while (count > 0) {
            int finalCount = count;
            new Thread(() -> {
                try {
                    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(9999));
                    ByteBuffer buffer = ByteBuffer.allocate(12);
                    buffer.put(("test" + finalCount).getBytes(StandardCharsets.UTF_16));
                    buffer.flip(); // 下面socketChannel.write(buffer)时需要读取buffer,这里要用flip切换为读取模式
                    if (finalCount == 3) {
                        TimeUnit.SECONDS.sleep(3);
                    }
                    socketChannel.write(buffer);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
            count--;
        }

        count = 3;
        while (count > 0) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println(socketChannel.getLocalAddress() + " " + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            count--;
        }
        TimeUnit.SECONDS.sleep(1);
        read(selector);
        TimeUnit.SECONDS.sleep(2);
        read(selector);
        serverSocketChannel.close();
    }

    private static void read(Selector selector) throws Exception {
        System.out.println(selector.select(2000));
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        selectionKeys.forEach(selectionKey -> {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(12);
                ((SocketChannel) selectionKey.channel()).read(buffer);
                buffer.flip();
                byte[] data = new byte[buffer.limit()];
                buffer.get(data);
                System.out.println(new String(data, StandardCharsets.UTF_16));
                selectionKey.channel().close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

PS:从读取再次切换到写入模式时使用compact():JAVA BYTEBUFFER EXAMPLE: HOW TO USE FLIP() AND COMPACT()

PPS:后缀带R的ByteBuffer是只读的,如HeapByteBufferR。

java.nio定义了SelectableChannel和Selector(AbstractSelector)。 SelectableChannel声明了register方法:

    public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;

而在实现中(AbstractSelectableChannel)又调用了AbstractSelector的register方法。

在SelectableChannel源码的注释中,可以看出Channel可以是阻塞又可以是非阻塞的:

A selectable channel is either in blocking mode or in
non-blocking mode.

The blocking mode of a selectable channel may
be determined by invoking its isBlocking method.

如果AbstractSelectableChannel对象工作在阻塞模式的话,又会在运行时抛出IllegalBlockingModeException。 关于IO多路复用搭配非阻塞IO的讨论,可以见:为什么 IO 多路复用要搭配非阻塞 IO?

IO多路复用API对比:

- select poll epoll kqueue
平台 POSIX POSIX Linux Unix
FD限制 FD_SETSIZE
时间复杂度 O(n)(n为监听FD数) O(n) O(k)(k为事件数,k小于n) O(k)
数据复制 -

核心API:

       int select(int nfds, fd_set *restrict readfds,
                  fd_set *restrict writefds, fd_set *restrict exceptfds,
                  struct timeval *restrict timeout);

       int poll(struct pollfd *fds, nfds_t nfds, int timeout);

       int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

Windows版JVM尚未实现高效的多路复用。

NIO与IO相比,除去以上,还支持一个Channel对象的双向访问,面向块等。

还存在不是上面模型的Channel,所谓的零拷贝如内存映射、sendfile等。这些零拷贝技术,大多都省去了二阶段的拷贝操作(涉及到用户态数据的拷贝)。

内存映射:

    public static void main(String[] args) throws Exception {
        FileChannel fileChannel = FileChannel.open(Paths.get("../../test"));
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
        byte[] data = new byte[mappedByteBuffer.limit()];
        mappedByteBuffer.get(data);
        System.out.println(new String(data, StandardCharsets.UTF_16));
        fileChannel.close();
    }

由于MappedByteBuffer的特殊性,释放需要按

    public static void clean(final MappedByteBuffer buffer) {
        AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
            try {
                Method getCleanerMethod = buffer.getClass().getMethod("cleaner");
                getCleanerMethod.setAccessible(true);
                Cleaner cleaner = (Cleaner) getCleanerMethod.invoke(buffer);
                cleaner.clean();
            } catch(Exception e) {
                e.printStackTrace();
            }
            return null;
        });
    }

sendfile:

    public static void main(String[] args) throws Exception {
        FileChannel fileChannel = FileChannel.open(Paths.get("../../test"));
        FileOutputStream fileOutputStream = new FileOutputStream("../../testCopy");
        FileChannel toChannel = fileOutputStream.getChannel();
        fileChannel.transferTo(0, fileChannel.size(), toChannel);
        fileOutputStream.close();
        fileChannel.close();
    }

这里直接省去了用户态数据的拷贝,就完成了输出。

sendfile(SG-DMA)

上下文切换

sendfile比mmap+write少了内核空间到内核空间的复制,同时少了两次用户态内核台上下文切换的开销。

mmap+write及sendfile(no SG-DMA)

ByteBuffer

上面用到了MappedByteBuffer,它是通过FileChannel.map()得到的。

还有另外一种高效实现IO的对象——DirectByteBuffer,它是通过ByteBuffer.allocateDirect()得到的。

理论上来说,它们是两种关注点在不同维度的对象,MappedByteBuffer是指内存映射的ByteBuffer,DirectByteBuffer是指非堆的ByteBuffer。 实现上却是DirectByteBuffer继承了MappedByteBuffer。 在MappedByteBuffer的源码注释中:

This is a little bit backwards: By rights MappedByteBuffer should be a subclass of DirectByteBuffer, but to keep the spec clear and simple, and for optimization purposes, it's easier to do it the other way around. This works because DirectByteBuffer is a package-private class.

同时,DirectByteBuffer实现了DirectBuffer接口,但allocateDirect返回的却是ByteBuffer,这是因为DirectBuffer完整的类名是sun.nio.ch.DirectBuffer。 对于sun包:Why Developers Should Not Write Programs That Call 'sun' Packages

所以ByteBuffer提供了运行时判断Buffer是否Direct的方法:ByteBuffer.isDirect()。

epoll

https://juejin.cn/post/6844904006301843464

int epoll_create(int size);

在内核中创建epoll实例并返回一个epoll文件描述符。 在最初的实现中,调用者通过size参数告知内核需要监听的文件描述符数量。 而现在 size 已经没有这种语义了,但是调用者调用时size依然必须大于0,以保证后向兼容性。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

向epfd对应的内核epoll实例添加、修改或删除对fd上事件event的监听。 epoll实例内部使用红黑树保存fd(The interest list),实现对fd的高效搜索(避免重复)、增加、删除。

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

           typedef union epoll_data {
               void    *ptr;
               int      fd;
               uint32_t u32;
               uint64_t u64;
           } epoll_data_t;

           struct epoll_event {
               uint32_t     events;    /* Epoll events */
               epoll_data_t data;      /* User data variable */
           };

当timeout为0时,epoll_wait永远会立即返回。 而timeout为-1时,epoll_wait会一直阻塞直到任一已注册的事件变为就绪。 当timeout为一正整数时,epoll会阻塞直到计时timeout毫秒终了或已注册的事件变为就绪。

当已注册事件变为就绪时,中断程序向epoll实例中的就绪队列(The ready list,双向链表)添加就绪事件。 中断程序一方面修改就绪队列,另一方面唤醒epoll实例对应的进程。 用户通过epoll_wait返回值和events进行事件的处理。

https://cdn.xiaolincoding.com/gh/xiaolincoder/ImageHost4@main/%E6%93%8D%E4%BD%9C%E7%B3%BB%E7%BB%9F/%E5%A4%9A%E8%B7%AF%E5%A4%8D%E7%94%A8/epoll.png

Reactor & Proactor

Reactor模式见Doug Lea大师的《 Scalable IO in Java 》,里面有源码说明。

目前没有Proactor相关的Java实现。