哎,各种各样杂七杂八的事情。。。好久没有看代码了,其实要搞明白一个与IO相关的框架,最好的办法就是把它的I/0的读写两个过程搞清楚。。。例如在netty中,如果能将eventLoop的运行原理搞清楚,然后摸清楚整个I/O读写两个过程,那么也就差不太多了。。。。
这次来看看ZeroMQ(java)中如何来处理I/O的,先来看看一个类型的定义,IOObject类型,这个类型应该扮演的是工具类的形象,前面看过在ZeroMQ中所谓的IO线程的定义,那么IOObject就是用于直接与IO线程交互的,或者说的更直接的一点就是它是与IO线程里的poller对象交互的。。。
那么先来看看IOObject的类图吧:
这张图应该将IOObject与IOThread以及Poller之间的关系表现的很清楚了吧。。。。IOObject实现了IPollEvents接口,那么也就代表它可以响应IO事件。。。不过其实它并不直接实现这些IO事件,而是将其委托给内部的一个IPollEvents对象。。只不过是做了一层代理而已。。。
好了,接下来来看看IOObject的代码吧,先来看看它的属性申明:
[java] view plain copy private Poller poller; //poller对象 private IPollEvents handler; //用于执行事件回调的handler这个poller就是从IO线程里面获取过来的,handler就是刚刚提到的事件回调的处理对象。。。IOObject不过是对其进行了一层包装而已。。。
那么接下来来看看重要的方法定义:
[java] view plain copy //在将一个IO对象加入到一个IO线程的时候,要注意确定当前IO对象之前没有加入到任何IO线程或者已经从别的IO线程上面退下来了 //将当前这个IO对象加入到IO线程上面去,说白了主要是获取这个IO线程的poller对象 public void plug(IOThread io_thread_) { assert (io_thread_ != null); assert (poller == null); poller = io_thread_.get_poller (); //获取这个线程的poller对象 }这个方法用于将当前这个IO对象加入到一个IO线程,其实主要的是要获取这个IO线程的Poller对象。。好了,接下来再来看看如何注册channel以及事件吧:
[java] view plain copy //在poller里面移除channel public final void rm_fd(SelectableChannel handle) { poller.rm_fd(handle); } //给这个channel注册读取的事件 public final void set_pollin (SelectableChannel handle_) { poller.set_pollin (handle_); } //在这个channel上面注册写事件 public final void set_pollout (SelectableChannel handle_) { poller.set_pollout (handle_); } //注册链接事件 public final void set_pollconnect(SelectableChannel handle) { poller.set_pollconnect(handle); } //注册accept事件 public final void set_pollaccept(SelectableChannel handle) { poller.set_pollaccept(handle); } //取消读取事件的注册 public final void reset_pollin(SelectableChannel handle) { poller.reset_pollin (handle); } //取消写事件的注册 public final void reset_pollout(SelectableChannel handle) { poller.reset_pollout (handle); }这部分代码应该很简单吧,而且应该对IOObject的用处比较的清楚了,然后至于说IOObject对象如何响应in_event什么的,前面已经说过了,其实是委托给了handler对象来处理。。。好啦,IOObject的分析就算差不多了。。接下来来看看StreamEngine类型的实现吧,还是先来看看它初略的类图吧:
其实觉得看一个类的类图,基本上就能看出这个类的很多情况,好了,不说闲话了,来看看它的属性的定义吧:
[java] view plain copy private static final int GREETING_SIZE = 12; //问候msg的大小,12个字节 (10字节的头,1字节的版本,1字节的socket类型) // True iff we are registered with an I/O poller. private boolean io_enabled; //如果是true的话,表示当前已经注册到了poller上面去 private SocketChannel handle; //真正底层用于通信的socketChannel private ByteBuffer inbuf; //接收数据的buf private int insize; //记录接收的数据的大小 private DecoderBase decoder; //decoder private Transfer outbuf; //outbuf private int outsize; //outbuf的大小 private EncoderBase encoder; //encoder // When true, we are still trying to determine whether // the peer is using versioned protocol, and if so, which // version. When false, normal message flow has started. private boolean handshaking; //是否是在握手中,当值为false的时候代表握手已经完成了 // The receive buffer holding the greeting message // that we are receiving from the peer. private final ByteBuffer greeting; //用于接收问候msg的buf // The send buffer holding the greeting message // that we are sending to the peer. private final ByteBuffer greeting_output_buffer; //用于发送问候msg的buf private SessionBase session; //所属的session private Options options; //选项配置 // String representation of endpoint private String endpoint; //这里一般是地址信息 private boolean plugged; //是否已经加入了 private boolean terminating; //是否已经停止了 // Socket private SocketBase socket; //所属的socket private IOObject io_object; //拥有的IO对象这里面有很多重要的属性,例如handler是SocketChannel类型的,可以知道它才是实际上底层用于通信的,然后又inbuf以及outbuf,这两个东西是干嘛用的应该一眼就看出来了吧,然后还有encoder和decoder,呵呵,可以猜到,读取到的数据先要经过decoder的处理才提交给上层,发送出去的数据也会通过encoder处理成二进制再发送出去。。。然后还有一个io_objcet对象。。。
接下来来看看构造方法吧:
[java] view plain copy //构造函数,第一个参数是底层的channel, public StreamEngine (SocketChannel fd_, final Options options_, final String endpoint_) { handle = fd_; inbuf = null; insize = 0; io_enabled = false; outbuf = null; outsize = 0; handshaking = true; //初始化为ture,表示还没有完成握手 session = null; options = options_; plugged = false; terminating = false; endpoint = endpoint_; socket = null; greeting = ByteBuffer.allocate (GREETING_SIZE); //创建用于接收问候msg的buf greeting_output_buffer = ByteBuffer.allocate (GREETING_SIZE); //创建用于发送握手信息的buf encoder = null; decoder = null; try { Utils.unblock_socket (handle); //将底层的channel设置为非阻塞的 if (options.sndbuf != 0) { //设置底层的socket的发送缓冲大小 handle.socket().setSendBufferSize((int)options.sndbuf); } if (options.rcvbuf != 0) { //设置底层的socket的接收缓冲大小 handle.socket().setReceiveBufferSize((int)options.rcvbuf); } } catch (IOException e) { throw new ZError.IOException(e); } }这个比较有意思的就是将channel设置为了非阻塞的模式,然后设置了底层socket的发送以及接受缓冲的大小。。其余的就没啥意思了。。。
[java] view plain copy //将当前engine加入到IO线程以及session,其实这里最主要的事情是将channel注册到poller上面去 public void plug (IOThread io_thread_, SessionBase session_) { assert (!plugged); plugged = true; //标志位 // Connect to session object. assert (session == null); assert (session_ != null); session = session_; //当前所属的session socket = session.get_soket (); //获取所属的scoekt,这个是ZMQ的socket io_object = new IOObject(null); //创建IO对象, io_object.set_handler(this); //设置IO对象的事件回调 // Connect to I/O threads poller object. io_object.plug (io_thread_); // 将IO对象搞到这个IO线程上面去,其实最主要的就是获取这个IO线程的poller对象 io_object.add_fd (handle); //将底层的channel加入 io_enabled = true; //表示已经加入了 // Send the 'length' and 'flags' fields of the identity message. // The 'length' field is encoded in the long format. //设置发送的问候msg的信息 greeting_output_buffer.put ((byte) 0xff); greeting_output_buffer.putLong (options.identity_size + 1); greeting_output_buffer.put ((byte) 0x7f); io_object.set_pollin (handle); //注册当前channel的读事件 // When there's a raw custom encoder, we don't send 10 bytes frame boolean custom = false; try { custom = options.encoder != null && options.encoder.getDeclaredField ("RAW_ENCODER") != null; } catch (SecurityException e) { } catch (NoSuchFieldException e) { } if (!custom) { outsize = greeting_output_buffer.position (); outbuf = new Transfer.ByteBufferTransfer ((ByteBuffer) greeting_output_buffer.flip ()); //设置需要发送的buf,将问候信息发送出去 io_object.set_pollout (handle); } // Flush all the data that may have been already received downstream. in_event (); //看是否有数据读取了 }这个方法用于将当前IO对象注册到IO线程上面去,并且还要管理session,可以看到这里主要是利用IOObject对象,用于在poller对象上面注册channel,以及读写事件。。。另外还有对握手信息的处理。。。好了,握手这部分的内容,因为现在还没有看,不知道具体的流程是啥样的,就先放一下。。。再来看两个重要的方法定义吧:
[java] view plain copy //当底层的chanel有数据可以读取的时候的回调方法 public void in_event () { if (handshaking) if (!handshake ()) return; assert (decoder != null); boolean disconnection = false; // If there's no data to process in the buffer... if (insize == 0) { //如果inbuf里面没有数据需要处理 // Retrieve the buffer and read as much data as possible. // Note that buffer can be arbitrarily large. However, we assume // the underlying TCP layer has fixed buffer size and thus the // number of bytes read will be always limited. inbuf = decoder.get_buffer (); //从解码器里面获取buf,用于写入读取的数据,因为在已经设置了底层socket的TCP接收缓冲区的大小 insize = read (inbuf); //用于将发送过来的数据写到buf中去,并记录大小 inbuf.flip(); //这里准备从buf里面读取数据了 // Check whether the peer has closed the connection. if (insize == -1) { //如果是-1的话,表示底层的socket连接已经出现了问题 insize = 0; disconnection = true; } } // Push the data to the decoder. int processed = decoder.process_buffer (inbuf, insize); //解析这些读取到的数据 if (processed == -1) { disconnection = true; } else { // Stop polling for input if we got stuck. if (processed < insize) //如果处理的数据居然还没有读到的数据多,那么取消读取事件的注册 io_object.reset_pollin (handle); // Adjust the buffer. insize -= processed; //还剩下没有处理的数据的大小 } // Flush all messages the decoder may have produced. session.flush (); //将decoder解析出来的数据交给session // An input error has occurred. If the last decoded message // has already been accepted, we terminate the engine immediately. // Otherwise, we stop waiting for socket events and postpone // the termination until after the message is accepted. if (disconnection) { //表示已经断开了连接,那么需要处理一下 if (decoder.stalled ()) { io_object.rm_fd (handle); io_enabled = false; } else error (); } } //表示可以写数据了 public void out_event () { // If write buffer is empty, try to read new data from the encoder. if (outsize == 0) { //需要写的数据量为0 // Even when we stop polling as soon as there is no // data to send, the poller may invoke out_event one // more time due to 'speculative write' optimisation. if (encoder == null) { assert (handshaking); return; } outbuf = encoder.get_data (null); //从encoder里面获取数据 outsize = outbuf.remaining(); // If there is no data to send, stop polling for output. if (outbuf.remaining() == 0) { //如果确实没有数据要写,那么取消写事件的注册 io_object.reset_pollout (handle); // when we use custom encoder, we might want to close if (encoder.is_error()) { error(); } return; } } // If there are any data to write in write buffer, write as much as // possible to the socket. Note that amount of data to write can be // arbitratily large. However, we assume that underlying TCP layer has // limited transmission buffer and thus the actual number of bytes // written should be reasonably modest. int nbytes = write (outbuf); //写数据 // IO error has occurred. We stop waiting for output events. // The engine is not terminated until we detect input error; // this is necessary to prevent losing incomming messages. if (nbytes == -1) { //如果-1,那么表示底层用到的socket其实已经出现了问题 io_object.reset_pollout (handle); //取消写事件的注册 if (terminating) terminate (); return; } outsize -= nbytes; //这里更新需要写的数据的数量 // If we are still handshaking and there are no data // to send, stop polling for output. if (handshaking) if (outsize == 0) io_object.reset_pollout (handle); // when we use custom encoder, we might want to close after sending a response if (outsize == 0) { if (encoder != null && encoder.is_error ()) { error(); return; } if (terminating) terminate (); } }这两个方法是用于相应IO事件的,前面提到的IOObject将IO事件其实委托给了内部的handler来处理,其实这个handler对象就是SteamEngine对象,也就是底层的channel有数据可以读写的时候,将会用上面的两个方法来处理。这里就可以看到读写事件最原始的处理流程了,而且也看到了encoder以及decoder的用处。。。这里代码应该还算是比较的简单,由于这部分还涉及到与上层的session对象之间的交互,这个还要等到以后来分析。。。
好了,那么到这里ZeroMQ中IO的处理流程也就算是有了基本的了解了。。。。
顶转载于:https://www.cnblogs.com/jym-sunshine/p/5441805.html
相关资源:ZeroMQ及java绑定