Java IO编制程序全解(四)——NIO编制程序

发布时间:2019-08-10  栏目:Python  评论:0 Comments

Java IO编程全解(四)——NIO编制程序,ionio

  转发请注脚出处:http://www.cnblogs.com/Joanna-Yan/p/7793964.html 

  前面讲到:Java IO编程全解(三)——伪异步IO编制程序

  NIO,即New
I/O,那是法定叫法,因为它相对于事先的I/O类库是新增添的。可是,由于从前老的I/O类库是阻塞I/O,New
I/O类库的靶子正是要让Java扶助非阻塞I/O,所以,越来越多的人欣赏称之为非阻塞I/O(Non-block
I/O),由于非阻塞I/O更能够显示NIO的特色,所以那边运用的NIO都是指非阻塞I/O。

  与Socket类和ServerSocket类绝对应,NIO也提供了SocketChannel和ServerSocketChannel二种不一样的套接字通道达成。那二种新增加的大路都援助阻塞和非阻塞两种形式。阻塞方式应用特别轻便,可是质量和可信赖性都不佳,非阻塞则刚好相反。开采人士一般能够依据本身的急需来选拔适当的情势,一般的话,低负载、低产出的应用程序能够选用同步阻塞I/O以减低编制程序复杂度,可是对于高负载、高并发的网络利用,须求选拔NIO的非阻塞方式举行支付。

  转载请表明出处:http://www.cnblogs.com/Joanna-Yan/p/7793964.html 

1.NIO类库简单介绍

  新的输入/输出(NIO)库是在JDK1.4中引入的。NIO弥补了原本一块阻塞I/O的不足,它在标准Java代码中提供了连忙的、面向块的I/O。通过定义包蕴数据的类,以及通过以块的样式管理那几个多少,NIO不选取本机代码就能够动用低档优化,那是原先的I/O包所不能够达成的。上面临NIO的有个别概念和功力做下简介,以便大家能够高效地打听NIO类库和有关概念。

  1.缓冲区Buffer

  Buffer是三个对象,它含有部分要写入可能要读出的多寡。在NIO类库中投入Buffer对象,呈现了新库与原I/O的二个主要差别。在面向流的I/O中,可以将数据直接写入恐怕将数据间接读到Stream对象中。

  在NIO库中,全部数据都以用缓冲区管理的。在读取数据时,它是一向读到缓冲区中的;在写入数据时,写入到缓冲区中。任何时候访谈NIO中的数据,都以通过缓冲区进行操作。

  缓冲区实质上是一个数组。日常它是多少个字节数组(ByteBuffer),也足以选择其余门类的数组。可是缓冲区不仅是多个数组,缓冲区提供了对数码的结构化访谈以及吝惜读写地方(limit)等音讯。

  最常用的缓冲区是ByteBuffer,多少个ByteBuffer提供了一组作用用于操作byte数组。除了ByteBuffer,还会有任何的部分缓冲区,事实上,每一项Java基本项目(除了Boolean类型)都对应当一种缓冲区,具体如下:

  • ByteBuffer:字节缓冲区
  • CharBuffer:字符缓冲区
  • ShortBuffer:短整型缓冲区
  • IntBuffer:整型缓冲区
  • LongBuffer:长整型缓冲区
  • FloatBuffer:浮点型缓冲区
  • DoubleBuffer:双精度浮点型缓冲区

   每二个Buffer类都以Buffer接口的二个子实例。除了ByteBuffer,每三个Buffer类都有完全平等的操作,只是它们所拍卖的数据类型不平等。因为大多数标准I/O操作都以应用ByteBuffer,所以它除了具备相似缓冲区的操作之外还提供部分故意的操作,方便互连网读写。

  2.通道Channel

  Channel是三个通路,能够由此它读取和写入数据,它就好像自来水管同样,互连网数据经过Channel读取和写入。通道与流的差别之处在于通道是双向的,流只是在一个侧向上运动(多少个流必须是InputStream或然OutputStream的子类),而且通道能够用来读、写大概同不平日候读写。因为Channel是全双工的,所以它能够比流更加好地照耀底层操作系统的API。

  3.多路复用器Selector

  多路复用器Selector是Java
NIO编制程序的根底,熟谙地领会Selector对于驾驭NIO编制程序至关心重视要。多路复用器提供选取已经就绪的天职的力量。简来讲之,Selector会不断地轮询注册在其上的Channel,假使有些Channel上边有新的TCP连接接入、读和写事件,这么些Channel就处在就绪状态,会被Selector轮询出来,然后通过SelectionKey能够收获就绪Channel的联谊,进行持续的I/O操作。

  二个多路复用器Selector能够同时轮询两个Channel,由于JDK使用了epoll()代替古板的select实现,所以它并不曾最地拉那接句柄1024/2048的范围。那也就表示只需求二个线程担负Selector的轮询,就可以接入点不清的客户端,那的确是个十二分巨大的升华。

  后面讲到:Java
IO编制程序全解(三)——伪异步IO编制程序

2.NIO服务端连串图

  NIO服务端通讯连串图如下图所示:

ServerSocketChannel acceptorSvr=ServerSocketChannel.open();

  步骤二:绑定监听端口,设置连接为非阻塞情势。

acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
acceptorSvr.configureBlocking(false);

  步骤三:创制Reactor线程,创立多路复用器并运转线程。

Selector selector=Selector.open();
New Thread(new ReactorTask()).start();

  步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件。

SelectionKey key=acceptorSvr.register(selector,SelectionKey.OP_ACCEPT,ioHandler);

  步骤五:多路复用器在线程run方法的无线循环体内轮询准备妥善的Key。

int num=selector.select();
Set selectedKeys=selector.selectedKeys();
Iterator it=selectedKeys.iterator();
while(it.hasNext()){
  SelectionKey key=(SelectionKey )it.next();
  //...deal with I/O event...
}

  步骤六:多路复用器监听到有新的客户端连接,管理新的连结央浼,完结TCP一遍握手,创建物理链路。

SocketChannel channel=svrChannel.accpet();

  步骤七:设置客户端链路为非阻塞情势。

channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
......

  步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器,监听读操作,用来读取客户端发送的网络消息。

SelectionKey key=socketChannel.register(selector,SelectionKey.OP_READ,ioHandler);

  步骤九:异步读取客户端诉求音信到缓冲区。

int readNumber=channel.read(receivedBuffer);

  步骤十:对ByteBuffer举办编解码,假使有半包音讯指针reset,继续读取后续的报文,将解码成功的音信封装成Task,投递到业务线程池中,实行专门的学业逻辑编排。

Object message=null;
while(buffer.hasRemain()){
  byteBuffer.mark();
  Object message=decode(byteBuffer);
  if(message==null){
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}
if(!byteBuffer.hasRemain()){
  byteBuffer.clear();
}else{
  byteBuffer.compact();
}
if(messageList!=null& !messageList.isEmpty()){
  for(Object messageE: messageList){
    handlerTask(messageE);
  }
}

  步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将新闻异步发送给客户端。

socketChannel.write(buffer);

  注意:若是发送区TCP缓冲区满,会促成写半包,此时,须求登记监听写操作位,循环写,直到整包音信写入TCP缓冲区。

  当大家询问创设NIO服务端的大旨步骤之后,上面大家将前方的光阴服务器程序通过NIO重写叁遍,让我们能够学习到全部版的NIO服务端创制。

  NIO,即New
I/O,那是官方叫法,因为它相对于事先的I/O类库是新添的。但是,由于以前老的I/O类库是阻塞I/O,New
I/O类库的靶子正是要让Java支持非阻塞I/O,所以,越来越多的人兴奋称之为非阻塞I/O(Non-block
I/O),由于非阻塞I/O更能够浮现NIO的特点,所以那边运用的NIO都以指非阻塞I/O。

3.NIO创办的TimeServer源码深入分析

package joanna.yan.nio;

public class TimeServer {

    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 采用默认值
            }
        }

        MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

package joanna.yan.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
/**
 * 多路复用类
 * 它是一个独立的线程,负责轮询多路复器Selector,可以处理多个客户端的并发接入。
 * @author Joanna.Yan
 * @date 2017年11月6日下午3:51:41
 */
public class MultiplexerTimeServer implements Runnable{

        private Selector selector;//多路复用器
        private ServerSocketChannel servChannel;
        private volatile boolean stop;

        /**
         * 初始化多路复用器、绑定监听端口
         * @param port
         */
        public MultiplexerTimeServer(int port){
            try {
                selector=Selector.open();
                servChannel=ServerSocketChannel.open();
                servChannel.configureBlocking(false);
                servChannel.socket().bind(new InetSocketAddress(port), 1024);
                servChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("The time server is start in port: "+port);

            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    public void stop(){
            this.stop=true;
      }

    @Override
       public void run() {
            while(!stop){
                try {
                    //设置selector的休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次。
                    selector.select(1000);
                    //当有处于就绪状态的Channel时,selector就返回就绪状态的Channel的SelectionKey集合。
                    Set<SelectionKey> selectedKeys=selector.selectedKeys();
                    Iterator<SelectionKey> it=selectedKeys.iterator();
                    SelectionKey key=null;
                    //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作。
                    while(it.hasNext()){
                        key=it.next();
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if(key!=null){
                                key.cancel();
                                if(key.channel()!=null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
       /*
             * 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源。
             */
            if(selector!=null){
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                //处理新接入的请求消息
                //通过SelectionKey的操作位进行判断即可获知网络事件类型
                if(key.isAcceptable()){
                    //Accept the new connection
                    ServerSocketChannel ssc=(ServerSocketChannel) key.channel();
                    SocketChannel sc=ssc.accept();
                    //-----以上操作相当于完成了TCP的三次握手,TCP物理链路正式建立------

                    //将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等。
                    sc.configureBlocking(false);
                    //Add the new connection to the selector
                    sc.register(selector, SelectionKey.OP_READ);
                }
         if(key.isReadable()){
                    //Read the data
                    SocketChannel sc=(SocketChannel) key.channel();
                    //由于实现我们得知客户端发送的码流大小,作为例程,我们开辟一个1K的缓冲区
                    ByteBuffer readBuffer=ByteBuffer.allocate(1024);
                    //由于已经设置SocketChannel为异步非阻塞模式,因此它的read是非阻塞的。
                    int readBytes=sc.read(readBuffer);
                    /*
                     * readBytes>0  读到了字节,对字节进行编解码;
                     * readBytes=0  没有读取到字节,属于正常场景,忽略;
                     * readByte=-1 链路已经关闭,需要关闭SocketChannel,释放资源
                     */
            if(readBytes>0){
                        //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。
                        readBuffer.flip();
                        //根据缓冲区可读的字节个数创建字节数组
                        byte[] bytes=new byte[readBuffer.remaining()];
                        //调用ByteBuffer的get操作将缓冲区可读的字节数组复制到新创建爱你的字节数组中
                        readBuffer.get(bytes);
                        String body=new String(bytes, "UTF-8");
                        System.out.println("The time server receive order: "+body);
                        //如果请求指令是"QUERY TIME ORDER"则把服务器的当前时间编码后返回给客户端
                        String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(
                                System.currentTimeMillis()).toString() : "BAD ORDER";

                        doWrite(sc,currentTime);
                    }else if(readBytes<0){

                //对端链路关闭
                        key.cancel();
                        sc.close();
                    }else{
                        //读到0字节,忽略
                    }
                }
            }
        }

    private void doWrite(SocketChannel channel,String response) throws IOException{
            if(response!=null&& response.trim().length()>0){
                byte[] bytes=response.getBytes();
                ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
                //调用ByteBuffer的put操作将字节数组复制到缓冲区
                writeBuffer.put(bytes);
                writeBuffer.flip();
                channel.write(writeBuffer);

                /*
                 * 需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次性能够把需要发送的字节数组发送完,
                 * 此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector,将没有发送完毕的ByteBuffer发送完毕,
                 * 可以通过ByteBuffer的hasRemaining()方法判断消息是否发送完成。
                 * 此处仅仅是各简单的入门级例程,没有演示如何处理“写半包”场景,后面会说到。
                 */
            }
        }
}

  与Socket类和ServerSocket类相呼应,NIO也提供了SocketChannel和ServerSocketChannel两种不相同的套接字通道落成。那三种新扩充的大路都匡助阻塞和非阻塞两种情势。阻塞方式选择非常轻松,不过品质和可信性都不佳,非阻塞则刚好相反。开采人士一般能够依赖本身的内需来挑选适宜的情势,一般的话,低负载、低产出的应用程序可以挑选同步阻塞I/O以减少编制程序复杂度,但是对于高负载、高并发的互连网选用,供给选拔NIO的非阻塞方式举办支付。

4.NIO客户端种类图

  NIO客户端创制体系图如图所示。

SocketChannel clientChannel=SocketChannel.open();

  步骤二:设置SocketChannel为非阻塞方式,同一时候设置客户端连接的TCP参数。

clientChannel.configureBlocking(false);
socket.setReuseAddress(true);
socket.setReceiveBufferSize(BUFFER_SIZE);
socket.setSendBufferSize(BUFFER_SIZE);

  步骤三:异步连接服务端。

boolean connected=clientChannel.connect(new InetSocketAddress("ip",port));

  步骤四:判别是或不是连接成功,要是延续成功,则直接登记读状态位到多路复用器中,假设当前并没有连接成功(异步连接,重临false,表明客户端已经发送sync包,服务端未有回来ack包,物理链路还尚无创造)。

if(connected){
  clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
}else{
  clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);
}

  步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP
ACK应答。

clientChannel.register(selector,SelectionKay.OP_CONNECT,ioHandler);

  步骤六:创造Reactor线程,成立多路复用器并运行线程。

Selector selector=Selector.open();
new Thread(new ReactorTask()).start();

  步骤七:多路复用器在线程run方法的非常循环体内轮询计划妥帖的key。

int num=selector.select();
Set selectedKeys=selector.selectedKeys();
Iterator it=selectedKeys.iterator();
while(it.hasNext()){
  SelectionKey key=(SelectionKey)it.next();
  //...deal with I/O event...
}

  步骤八:接收connect事件进展管理。

if(key.isConnectable()){
  //handlerConnect();
}

  步骤九:剖断连接结果,借使连接成功,注册读事件到多路复用器。

if(channel.finishConnect()){
  registerRead();
}

  步骤十:注册读事件到多路复用器。

clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);

  步骤十一:异步读客户端伏乞新闻到缓冲区。

int readNumber=channel.read(receivedBuffer);

  步骤十二:对ByteBuffer实行编解码,倘诺有半包音讯接收缓冲区Reset,继续读取后续的报文,将解码成功的音讯封装成Task,投递到业务线程池中,举办作业逻辑编排。

Object message=null;

while(buffer.hasRemain()){
  byteBuffer.mark();
  Object message=decode(byteBuffer);
  if(message==null){
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}

if(!byteBuffer.hasRemain()){
  byteBuffer.clear();
}else{
  byteBuffer.compact();
}

if(messageList!=null & !messageList.isEmpty()){
  for(Object messageE:messageList){
    handlerTask(messageE);
  }
}

  步骤十三:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将音讯异步发送给客户端。

socketChannel.wirte(buffer);

1.NIO类库简要介绍

  新的输入/输出(NIO)库是在JDK1.4中引进的。NIO弥补了原来一块阻塞I/O的难认为继,它在规范Java代码中提供了便捷的、面向块的I/O。通过定义富含数据的类,以及因而以块的花样管理那几个数量,NIO不接纳本机代码就可以动用低等优化,那是原先的I/O包所不可能做到的。上面前蒙受NIO的部分定义和效能做下简介,以便大家能够飞快地询问NIO类库和连锁概念。

  1.缓冲区Buffer

  Buffer是二个目的,它富含部分要写入恐怕要读出的数额。在NIO类库中加入Buffer对象,体现了新库与原I/O的叁个至关心体贴要分化。在面向流的I/O中,能够将数据直接写入或许将数据直接读到Stream对象中。

  在NIO库中,全数数据都以用缓冲区管理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中。任何时候访谈NIO中的数据,都是通过缓冲区实行操作。

  缓冲区实质上是二个数组。常常它是三个字节数组(ByteBuffer),也足以选用任何类别的数组。然而缓冲区不止是二个数组,缓冲区提供了对数码的结构化访谈以及维护读写地点(limit)等音信。

  最常用的缓冲区是ByteBuffer,五个ByteBuffer提供了一组效能用于操作byte数组。除了ByteBuffer,还有任何的局地缓冲区,事实上,每一项Java基本类型(除了Boolean类型)都对应当一种缓冲区,具体如下:

  • ByteBuffer:字节缓冲区
  • CharBuffer:字符缓冲区
  • ShortBuffer:短整型缓冲区
  • IntBuffer:整型缓冲区
  • LongBuffer:长整型缓冲区
  • FloatBuffer:浮点型缓冲区
  • DoubleBuffer:双精度浮点型缓冲区

   每一个Buffer类都是Buffer接口的四个子实例。除了ByteBuffer,每叁个Buffer类都有一起一致的操作,只是它们所拍卖的数据类型不相同样。因为大多标准I/O操作都以采取ByteBuffer,所以它除了富有相似缓冲区的操作之外还提供一些特有的操作,方便互连网读写。

  2.通道Channel

  Channel是贰个大路,能够经过它读取和写入数据,它就好像自来水管一样,互联网数据通过Channel读取和写入。通道与流的差别之处在于通道是双向的,流只是在三个偏侧上移动(二个流必须是InputStream或然OutputStream的子类),并且通道能够用于读、写或然同期读写。因为Channel是全双工的,所以它能够比流越来越好地照耀底层操作系统的API。

  3.多路复用器Selector

  多路复用器Selector是Java
NIO编制程序的基础,纯熟地通晓Selector对于精通NIO编制程序至关心注重要。多路复用器提供选拔已经就绪的天职的力量。简单的说,Selector会不断地轮询注册在其上的Channel,即使某些Channel上边有新的TCP连接接入、读和写事件,那么些Channel就处在就绪状态,会被Selector轮询出来,然后经过SelectionKey能够拿走就绪Channel的群集,举行一连的I/O操作。

  多少个多路复用器Selector能够而且轮询三个Channel,由于JDK使用了epoll()代替古板的select完成,所以它并从未最亚松森接句柄1024/2048的限制。那也就象征只须求二个线程负担Selector的轮询,就能够接入数不完的客户端,那真的是个拾分了不起的进步。

5.NIO创设的TimeClient源码分析

package joanna.yan.nio;

public class TimeClient {
    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 采用默认值
            }
        }

        new Thread(new TimeClientHandle("127.0.0.1", port),"TimClient-001").start();
    }
}

package joanna.yan.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * 处理异步连接和读写操作
 * @author Joanna.Yan
 * @date 2017年11月6日下午4:33:14
 */
public class TimeClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    /**
     * 初始化NIO的多路复用器和SocketChannel对象
     * @param host
     * @param port
     */
        public TimeClientHandle(String host,int port){
        this.host=host==null ? "127.0.0.1" : host;
        this.port=port;
        try {
            selector=Selector.open();
            socketChannel=SocketChannel.open();
            //设置为异步非阻塞模式,同时还可以设置SocketChannel的TCP参数。例如接收和发送的TCP缓冲区大小
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while(!stop){
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys=selector.selectedKeys();
                Iterator<SelectionKey> it=selectedKeys.iterator();
                SelectionKey key=null;
                while(it.hasNext()){//轮询多路复用器Selector,当有就绪的Channel时
                    key=it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if(key!=null){
                            key.cancel();
                            if(key.channel()!=null){
                                key.channel().close();
                            }
                        }
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
            //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动注册并关闭,所以不需要重复释放资源。
        /*
         * 由于多路复用器上可能注册成千上万的Channel或者pipe,如果一一对这些资源进行释放显然不合适。
         * 因此,JDK底层会自动释放所有跟此多路复用器关联的资源。
         */

        if(selector!=null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

        //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动注册并关闭,所以不需要重复释放资源。
        /*
         * 由于多路复用器上可能注册成千上万的Channel或者pipe,如果一一对这些资源进行释放显然不合适。
         * 因此,JDK底层会自动释放所有跟此多路复用器关联的资源。
         */

        if(selector!=null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
        private void handleInput(SelectionKey key) throws ClosedChannelException, IOException {
        if(key.isValid()){
            //判断是否连接成功
            SocketChannel sc=(SocketChannel) key.channel();
            if(key.isConnectable()){//处于连接状态,说明服务器已经返回ACK应答消息
                if(sc.finishConnect()){//对连接结果进行判断
                    /*
                     * 将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,
                     * 监听网络读操作,然后发送请求消息给服务端。
                     */
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                }else{
                    System.exit(1);//连接失败,进程退出
                }
            }
                        if(key.isReadable()){
                //开辟缓冲区
                ByteBuffer readBuffer=ByteBuffer.allocate(1024);
                //异步读取
                int readBytes=sc.read(readBuffer);
                if(readBytes>0){
                    readBuffer.flip();
                    byte[] bytes=new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body=new String(bytes, "UTF-8");
                    System.out.println("Now is: "+body);
                    this.stop=true;
                }else if(readBytes<0){
                    //对端链路关闭
                    key.cancel();
                    sc.close();
                }else{
                    //读到0字节,忽略
                }
            }
        }
    }
        private void doConnect() throws IOException {
        //如果直接连接成功,则将SocketChannel注册到多路复用器Selector上,发送请求消息,读应答
        if(socketChannel.connect(new InetSocketAddress(host, port))){
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        }else{
            /*
             * 如果没有直接连接成功,则说明服务端没有返回TCP握手应答信息,但这并不代表连接失败,
             * 我们需要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT,
             * 当服务端返回TCP syn-ack消息后,Selector就能轮询到整个SocketChannel处于连接就绪状态。
             */
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

        private void doWrite(SocketChannel sc) throws IOException {
        byte[] req="QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
        //写入到发送缓冲区中
        writeBuffer.put(req);
        writeBuffer.flip();
        //由于发送是异步的,所以会存在"半包写"问题,此处不赘述
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining()){//如果缓冲区中的消息全部发送完成
            System.out.println("Send order 2 server succeed.");
        }
    }
}

  通过源码相比解析发掘,NIO编制程序难度确实比同步阻塞BIO大相当多,此处大家的NIO例程并未设想“半包读”和“半包写”,假使加上那些,代码会愈发头眼昏花。NIO代码既然那样复杂,为何它的使用却更是广阔呢,使用NIO编制程序的长处计算如下:

  JDK1.7升任了NIO类库,升级后的NIO类库被称之为NIO
2.0。引进注指标是,Java正式提供了异步文件I/O操作,同有时间提供了与UNIX网络编制程序事件驱动I/O对应的AIO。

Java IO编制程序全解(五)——AIO编制程序

一旦此文对你有帮衬,微信打赏笔者瞬间啊~

图片 1

http://www.bkjia.com/Javabc/1232131.htmlwww.bkjia.comtruehttp://www.bkjia.com/Javabc/1232131.htmlTechArticleJava IO编制程序全解(四)——NIO编制程序,ionio
转发请证明出处:http://www.cnblogs.com/Joanna-Yan/p/7793964.html
后边讲到:Java IO编制程序全解(三)伪异步IO编…

2.NIO服务端体系图

  NIO服务端通讯系列图如下图所示:

图片 2

  下边,大家对NIO服务端的关键制程进展解说和说明,作为NIO的基本功入门,大家将忽略掉一部分在生产条件中配置所须求的部分天性和职能。

  步骤一:展开ServerSocketChannel,用于监听客户端的连年,它是负有客户端连接的父管道。

ServerSocketChannel acceptorSvr=ServerSocketChannel.open();

  步骤二:绑定监听端口,设置连接为非阻塞形式。

acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
acceptorSvr.configureBlocking(false);

  步骤三:创制Reactor线程,成立多路复用器并运营线程。

Selector selector=Selector.open();
New Thread(new ReactorTask()).start();

  步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件。

SelectionKey key=acceptorSvr.register(selector,SelectionKey.OP_ACCEPT,ioHandler);

  步骤五:多路复用器在线程run方法的有线循环体内轮询计划稳妥的Key。

int num=selector.select();
Set selectedKeys=selector.selectedKeys();
Iterator it=selectedKeys.iterator();
while(it.hasNext()){
  SelectionKey key=(SelectionKey )it.next();
  //...deal with I/O event...
}

  步骤六:多路复用器监听到有新的客户端连着,管理新的过渡恳求,完结TCP叁回握手,建构物理链路。

SocketChannel channel=svrChannel.accpet();

  步骤七:设置客户端链路为非阻塞格局。

channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
......

  步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器,监听读操作,用来读取客户端发送的网络音信。

SelectionKey key=socketChannel.register(selector,SelectionKey.OP_READ,ioHandler);

  步骤九:异步读取客户端央求音信到缓冲区。

int readNumber=channel.read(receivedBuffer);

  步骤十:对ByteBuffer进行编解码,假如有半包音讯指针reset,继续读取后续的报文,将解码成功的音信封装成Task,投递到业务线程池中,实行作业逻辑编排。

Object message=null;
while(buffer.hasRemain()){
  byteBuffer.mark();
  Object message=decode(byteBuffer);
  if(message==null){
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}
if(!byteBuffer.hasRemain()){
  byteBuffer.clear();
}else{
  byteBuffer.compact();
}
if(messageList!=null& !messageList.isEmpty()){
  for(Object messageE: messageList){
    handlerTask(messageE);
  }
}

  步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将新闻异步发送给客户端。

socketChannel.write(buffer);

  注意:如若发送区TCP缓冲区满,会促成写半包,此时,须要注册监听写操作位,循环写,直到整包音讯写入TCP缓冲区。

  当大家领悟创立NIO服务端的着力步骤之后,上边大家将眼前的岁月服务器程序通过NIO重写二回,让大家能够学习到总体版的NIO服务端创立。

3.NIO创制的TimeServer源码分析

package joanna.yan.nio;

public class TimeServer {

    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 采用默认值
            }
        }

        MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

package joanna.yan.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
/**
 * 多路复用类
 * 它是一个独立的线程,负责轮询多路复器Selector,可以处理多个客户端的并发接入。
 * @author Joanna.Yan
 * @date 2017年11月6日下午3:51:41
 */
public class MultiplexerTimeServer implements Runnable{

        private Selector selector;//多路复用器
        private ServerSocketChannel servChannel;
        private volatile boolean stop;

        /**
         * 初始化多路复用器、绑定监听端口
         * @param port
         */
        public MultiplexerTimeServer(int port){
            try {
                selector=Selector.open();
                servChannel=ServerSocketChannel.open();
                servChannel.configureBlocking(false);
                servChannel.socket().bind(new InetSocketAddress(port), 1024);
                servChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("The time server is start in port: "+port);

            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    public void stop(){
            this.stop=true;
      }

    @Override
       public void run() {
            while(!stop){
                try {
                    //设置selector的休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次。
                    selector.select(1000);
                    //当有处于就绪状态的Channel时,selector就返回就绪状态的Channel的SelectionKey集合。
                    Set<SelectionKey> selectedKeys=selector.selectedKeys();
                    Iterator<SelectionKey> it=selectedKeys.iterator();
                    SelectionKey key=null;
                    //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作。
                    while(it.hasNext()){
                        key=it.next();
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if(key!=null){
                                key.cancel();
                                if(key.channel()!=null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
       /*
             * 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源。
             */
            if(selector!=null){
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                //处理新接入的请求消息
                //通过SelectionKey的操作位进行判断即可获知网络事件类型
                if(key.isAcceptable()){
                    //Accept the new connection
                    ServerSocketChannel ssc=(ServerSocketChannel) key.channel();
                    SocketChannel sc=ssc.accept();
                    //-----以上操作相当于完成了TCP的三次握手,TCP物理链路正式建立------

                    //将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等。
                    sc.configureBlocking(false);
                    //Add the new connection to the selector
                    sc.register(selector, SelectionKey.OP_READ);
                }
         if(key.isReadable()){
                    //Read the data
                    SocketChannel sc=(SocketChannel) key.channel();
                    //由于实现我们得知客户端发送的码流大小,作为例程,我们开辟一个1K的缓冲区
                    ByteBuffer readBuffer=ByteBuffer.allocate(1024);
                    //由于已经设置SocketChannel为异步非阻塞模式,因此它的read是非阻塞的。
                    int readBytes=sc.read(readBuffer);
                    /*
                     * readBytes>0  读到了字节,对字节进行编解码;
                     * readBytes=0  没有读取到字节,属于正常场景,忽略;
                     * readByte=-1 链路已经关闭,需要关闭SocketChannel,释放资源
                     */
            if(readBytes>0){
                        //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。
                        readBuffer.flip();
                        //根据缓冲区可读的字节个数创建字节数组
                        byte[] bytes=new byte[readBuffer.remaining()];
                        //调用ByteBuffer的get操作将缓冲区可读的字节数组复制到新创建爱你的字节数组中
                        readBuffer.get(bytes);
                        String body=new String(bytes, "UTF-8");
                        System.out.println("The time server receive order: "+body);
                        //如果请求指令是"QUERY TIME ORDER"则把服务器的当前时间编码后返回给客户端
                        String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(
                                System.currentTimeMillis()).toString() : "BAD ORDER";

                        doWrite(sc,currentTime);
                    }else if(readBytes<0){

                //对端链路关闭
                        key.cancel();
                        sc.close();
                    }else{
                        //读到0字节,忽略
                    }
                }
            }
        }

    private void doWrite(SocketChannel channel,String response) throws IOException{
            if(response!=null&& response.trim().length()>0){
                byte[] bytes=response.getBytes();
                ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
                //调用ByteBuffer的put操作将字节数组复制到缓冲区
                writeBuffer.put(bytes);
                writeBuffer.flip();
                channel.write(writeBuffer);

                /*
                 * 需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次性能够把需要发送的字节数组发送完,
                 * 此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector,将没有发送完毕的ByteBuffer发送完毕,
                 * 可以通过ByteBuffer的hasRemaining()方法判断消息是否发送完成。
                 * 此处仅仅是各简单的入门级例程,没有演示如何处理“写半包”场景,后面会说到。
                 */
            }
        }
}

4.NIO客户端体系图

  NIO客户端创立种类图如图所示。

图片 3

  步骤一:展开SocketChannel,绑定客户端本地地址(可选,私下认可系统会轻巧分配三个可用的本土地址)

SocketChannel clientChannel=SocketChannel.open();

  步骤二:设置SocketChannel为非阻塞情势,同有的时候间设置客户端连接的TCP参数。

clientChannel.configureBlocking(false);
socket.setReuseAddress(true);
socket.setReceiveBufferSize(BUFFER_SIZE);
socket.setSendBufferSize(BUFFER_SIZE);

  步骤三:异步连接服务端。

boolean connected=clientChannel.connect(new InetSocketAddress("ip",port));

  步骤四:判别是或不是连接成功,若是连接成功,则一贯登记读状态位到多路复用器中,纵然当前从不连接成功(异步连接,重回false,表明客户端已经发送sync包,服务端未有回到ack包,物理链路还不曾树立)。

if(connected){
  clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
}else{
  clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);
}

  步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP
ACK应答。

clientChannel.register(selector,SelectionKay.OP_CONNECT,ioHandler);

  步骤六:成立Reactor线程,创制多路复用器并运营线程。

Selector selector=Selector.open();
new Thread(new ReactorTask()).start();

  步骤七:多路复用器在线程run方法的最为循环体内轮询希图伏贴的key。

int num=selector.select();
Set selectedKeys=selector.selectedKeys();
Iterator it=selectedKeys.iterator();
while(it.hasNext()){
  SelectionKey key=(SelectionKey)it.next();
  //...deal with I/O event...
}

  步骤八:接收connect事件进展管理。

if(key.isConnectable()){
  //handlerConnect();
}

  步骤九:剖断连接结果,若是总是成功,注册读事件到多路复用器。

if(channel.finishConnect()){
  registerRead();
}

  步骤十:注册读事件到多路复用器。

clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);

  步骤十一:异步读客户端乞求音讯到缓冲区。

int readNumber=channel.read(receivedBuffer);

  步骤十二:对ByteBuffer举行编解码,若是有半包信息接收缓冲区Reset,继续读取后续的报文,将解码成功的音讯封装成Task,投递到业务线程池中,进行工作逻辑编排。

Object message=null;

while(buffer.hasRemain()){
  byteBuffer.mark();
  Object message=decode(byteBuffer);
  if(message==null){
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}

if(!byteBuffer.hasRemain()){
  byteBuffer.clear();
}else{
  byteBuffer.compact();
}

if(messageList!=null & !messageList.isEmpty()){
  for(Object messageE:messageList){
    handlerTask(messageE);
  }
}

  步骤十三:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将音讯异步发送给客户端。

socketChannel.wirte(buffer);

5.NIO创造的TimeClient源码深入分析

package joanna.yan.nio;

public class TimeClient {
    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 采用默认值
            }
        }

        new Thread(new TimeClientHandle("127.0.0.1", port),"TimClient-001").start();
    }
}

package joanna.yan.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * 处理异步连接和读写操作
 * @author Joanna.Yan
 * @date 2017年11月6日下午4:33:14
 */
public class TimeClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    /**
     * 初始化NIO的多路复用器和SocketChannel对象
     * @param host
     * @param port
     */
        public TimeClientHandle(String host,int port){
        this.host=host==null ? "127.0.0.1" : host;
        this.port=port;
        try {
            selector=Selector.open();
            socketChannel=SocketChannel.open();
            //设置为异步非阻塞模式,同时还可以设置SocketChannel的TCP参数。例如接收和发送的TCP缓冲区大小
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while(!stop){
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys=selector.selectedKeys();
                Iterator<SelectionKey> it=selectedKeys.iterator();
                SelectionKey key=null;
                while(it.hasNext()){//轮询多路复用器Selector,当有就绪的Channel时
                    key=it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if(key!=null){
                            key.cancel();
                            if(key.channel()!=null){
                                key.channel().close();
                            }
                        }
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
            //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动注册并关闭,所以不需要重复释放资源。
        /*
         * 由于多路复用器上可能注册成千上万的Channel或者pipe,如果一一对这些资源进行释放显然不合适。
         * 因此,JDK底层会自动释放所有跟此多路复用器关联的资源。
         */

        if(selector!=null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

        //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动注册并关闭,所以不需要重复释放资源。
        /*
         * 由于多路复用器上可能注册成千上万的Channel或者pipe,如果一一对这些资源进行释放显然不合适。
         * 因此,JDK底层会自动释放所有跟此多路复用器关联的资源。
         */

        if(selector!=null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
        private void handleInput(SelectionKey key) throws ClosedChannelException, IOException {
        if(key.isValid()){
            //判断是否连接成功
            SocketChannel sc=(SocketChannel) key.channel();
            if(key.isConnectable()){//处于连接状态,说明服务器已经返回ACK应答消息
                if(sc.finishConnect()){//对连接结果进行判断
                    /*
                     * 将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,
                     * 监听网络读操作,然后发送请求消息给服务端。
                     */
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                }else{
                    System.exit(1);//连接失败,进程退出
                }
            }
                        if(key.isReadable()){
                //开辟缓冲区
                ByteBuffer readBuffer=ByteBuffer.allocate(1024);
                //异步读取
                int readBytes=sc.read(readBuffer);
                if(readBytes>0){
                    readBuffer.flip();
                    byte[] bytes=new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body=new String(bytes, "UTF-8");
                    System.out.println("Now is: "+body);
                    this.stop=true;
                }else if(readBytes<0){
                    //对端链路关闭
                    key.cancel();
                    sc.close();
                }else{
                    //读到0字节,忽略
                }
            }
        }
    }
        private void doConnect() throws IOException {
        //如果直接连接成功,则将SocketChannel注册到多路复用器Selector上,发送请求消息,读应答
        if(socketChannel.connect(new InetSocketAddress(host, port))){
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        }else{
            /*
             * 如果没有直接连接成功,则说明服务端没有返回TCP握手应答信息,但这并不代表连接失败,
             * 我们需要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT,
             * 当服务端返回TCP syn-ack消息后,Selector就能轮询到整个SocketChannel处于连接就绪状态。
             */
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

        private void doWrite(SocketChannel sc) throws IOException {
        byte[] req="QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
        //写入到发送缓冲区中
        writeBuffer.put(req);
        writeBuffer.flip();
        //由于发送是异步的,所以会存在"半包写"问题,此处不赘述
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining()){//如果缓冲区中的消息全部发送完成
            System.out.println("Send order 2 server succeed.");
        }
    }
}

  通过源码相比较深入分析发掘,NIO编制程序难度的确比同步阻塞BIO大相当多,此处大家的NIO例程并未思虑“半包读”和“半包写”,要是加上那么些,代码会愈加错综相连。NIO代码既然那样复杂,为何它的应用却越发遍布呢,使用NIO编制程序的长处计算如下:

  1. 客户端发起的连天操作是异步的,可以通过多路复用器注册OP_CONNECT等待后续结果,无需像从前的客户端那样被一块阻塞。
  2. SocketChannel的读写操作都以异步的,若无可读写的数量它不会联合等待,间接重临,那样I/O通讯线程就足以拍卖别的的链路,无需一同等待那些链路可用。
  3. 线程模型的优化:由于JDK的Selector在Linux等主流操作系统上经过epoll达成,它未有连接句柄数的范围(只受限于操作系统的最大句柄数或许对单个进度的句柄限制),这表示三个Selector线程能够况且管理数不完个客户端连接,并且质量不会趁着客户端的扩展而线性下落,因而,它非常适合做高质量、高负载的互连网服务器。

  JDK1.7进级了NIO类库,进级后的NIO类库被称作NIO
2.0。引进注目标是,Java正式提供了异步文件I/O操作,同期提供了与UNIX互连网编制程序事件驱动I/O对应的AIO。

Java
IO编制程序全解(五)——AIO编制程序

假如此文对你有赞助,微信打赏笔者刹那间吗~

图片 4

留下评论

网站地图xml地图