深入Jetty源码之Connection

简介:

概述

当Jetty中的Connector收到一个客户端的连接时(ServerSocket或ServerSocketChannel的accept()方法返回),Connector会首先创建一个ConnectedEndPoint用于和连接的底层(Socket、Channel)打交道(读写数据),在创建的ConnectedEndPoint时会同时使用该EndPoint创建相应类型的Connection,然后会创建一个Task仍给线程池,最终线程池会启动一个线程启动这个Task,而在这个Task中调用Connection中的handle()方法,以处理当前的连接请求,在这个Task中,它会持续的调用Connection中的handle()方法直到连接关闭或Connector停止。在ConnectorEndPoint中,它自己就实现了Runnable接口,因而可以将它自己丢给线程池,而在SelectChannelEndPoint中则交给SelectorManager来管理客户端连接过来的Channel,并调用Connection的handle方法。 

Connection接口定义

Connection的定义如下: 
public  interface Connection {
     //  Connection中的核心逻辑,在HttpConnection中,它使用HttpParser解析请求数据,HttpParser采用事件相应机制,可以通过注册HttpParser.EventHandler(RequestHandler)填充HttpConnection中的需要从请求消息中获取的信息,最后在messageComplete()事件相应方法中调用handleRequest()方法将解析后的Request请求交由Server实例的handle()方法处理。在Jetty中,Server是HandlerWrapper子类,它存储了所有注册的Handler,从而将最终的处理流程传导给所有注册的Handler。在所有注册的Handler处理完成后,Connection中的handle()方法会继续执行,使用HttpGenerator、NestedGenerator将缓存的数据刷新到EndPoint中。如果当前请求的相应状态是101(Switching Protocols),则handle方法返回的Connection实例是从注册的以"org.eclipse.jetty.io.Connection"为key的实例,也正是因为这个相应状态码的存在,这个handle方法的返回值是一个Connection。
    Connection handle()  throws IOException;

     //  除了handle方法,Connection中还有一些提供了一些包含Connection状态的方法:

    
//  返回Connection创建的时间戳。
     long getTimeStamp();
     //  当前Connection是否处于Idle状态,如HttpParser、HttpGenerator都处于Idle状态。
     boolean isIdle();
     //  当前Connection是否处于Suspended状态,用于Continuation机制。
     boolean isSuspended();
     //  当Connection关闭时会调用这个方法。
     void closed();
     //  当连接的Idle时间超时后调用该方法,在HttpConnection中,该方法会关闭EndPoiont。
     void idleExpired();
}

Connection类图



HttpConnection实现

HttpConnection是Jetty中对Connection的主要实现,它表示Http客户端和服务器的一次连接,用于将Request、Response、EndPoint联系在一起。同时HttpConnection也是在避免使用pooling的方式下重用Request、Response、HttpParser、HttpGenerator、HttpFields(requestFields、responseFields)、Buffer、HttpURI等(因为Jetty保证了每一次连接只创建一个HttpConnection实例,这是一个可以学习的点,不用pooling方式的重用,以进一步提升性能)。另外,HttpConnection还有对Connector和Server实例的引用,并且用request字段记录了该Connection总共处理的请求数(在headerComplete回调函数中自增)。

如果请求包含Expect头,并且其值是100-continue,表示客户端希望在请求被正真处理前发送一个响应以表示是否能处理该请求,因而在第一次调用getInputStream时表示服务器已经准备好开始处理请求消息体了,此时在返回ServletInputStream之前,服务器要发送100 Continue响应消息给客户端(通过调用HttpGenerator中的send1xx()方法)。在Jetty中,HttpInput类继承自ServletInputStream,它从HttpParser中读取请求消息体数据。
如果请求头包含Expect头,并且它的值是102-processing,此时服务器可能会发送102状态码的响应,表示请求正在被处理,之后会发送最终的响应。在Jetty中,可以通过Response中的sendError()方法,传入102的状态码以发送102状态码的响应(使用HttpGenerator中的send1xx()方法)。

handle()方法是HttpConnection中的核心方法,在每一个连接到来时,Connector会创建一个Runnable实例,将该Runnable实例扔到线程池中,在该Runnable的run()方法实现中不断的调用Connection的handle()方法直到当前连接或Connector关闭。在该方法的实现中:
  1. 它首先设置_handling字段,表示当前正在处理,并且将当前HttpConnection实例设置到__currentConnection的ThreadLocal变量中。
  2. 循环处理请求消息直到EndPoint关闭或者在more_in_buffer为true(初始值为true)。
  3. 如果当前Request处于Async状态,并且还Async状态还没有结束,直接调用handleRequest()方法,如果Async状态结束了,但是HttpParser还没有结束,则继续使用HttpParser解析,在解析过程中,可能会在headerComplete()、content()、messageComplete()回调函数中调用handleRequest()方法(具体参见HttpParser的实现);而后在HttpGenerator已经Commit(所有的响应头已经准备好,并已经写入到EndPoint中),但是还没有完成的情况下,将HttpGenerator中的数据Flush到EndPoint中;此时如果EndPoint还存在输出缓存,则将其Flush到底层链路中。
  4. 如果当前Request没有处于Async状态,如果HttpPaser还没有结束,使用HttpParser解析,在解析过程中,可能会在headerComplete()、content()、messageComplete()回调函数中调用handleRequest()方法(具体参见HttpParser的实现);而后在HttpGenerator已经Commit(所有的响应头已经准备好,并已经写入到EndPoint中),但是还没有完成的情况下,持续的将HttpGenerator中的数据Flush到EndPoint中,如果EndPoint还存在输出缓存,则将其Flush到底层链路中;如果HttpGenerator已经处于完成状态,但是EndPoint中还有输出缓存数据,此时将这些数据Flush到底层链路,如果写完缓存中的数据,将progress设置为true,表示handle方法需要继续处理。
  5. 在这些过成中如果出现任何HttpException,则使用HttpGenerator发送错误响应码给客户端(使用sendError()方法,并关闭EndPoint)。
  6. 如果HttpParser中还有数据未处理或者EndPoint中还有输入数据未处理,则循环继续。
  7. 如果此时HttpParser已经处理完成,HttpGenerator已经处理完成,并且EndPoint中的输出缓存中已经没有任何数据:1. 如果响应状态码时101 Switching Protocols,且在Request存在org.eclipse.jetty.io.Connection的Connection实例,则新的Connection从Request的该Attribute中获取,并重置HttpParser和HttpGenerator;2. 如果Request不存在该Attribute的Connection,HttpGenerator非persisent状态或EndPoint的InputStream已经关闭,则重置HttpParser,关闭EndPoint,设置more_in_buffer为false,重置当前HttpConnection。
  8. 如果HttpParser处于idle状态,并且EndPoint的InputStream已经关闭,则关闭当前EndPoint,并设置more_in_buffer为false。
  9. 如果Request的Async处于启动状态,则设置more_in_buffer为false。
  10. 如果EndPoint是AsyncEndPoint,Generator已经Commit,但是还未Complete,则该EndPoint schedule一个write操作。
  11. 最后,清理_handle字段和__currentConnection的ThreadLocal字段。

handleRequest()是HttpConnection在对请求消息头解析完成后执行的真正处理逻辑方法:
  1. 对任何Request还没有处理完成,并且Server不为null且处于Running状态,循环处理。
  2. 设置Request的handled为false,以及PathInfo字段(如果pathInfo为null,又不是Connect请求,则为400 Error)。
  3. 如果_out字段不为null,reopen it。
  4. 如果Request处于initial状态,设置Request的DispatcherType为REQUEST,使用当前的EndPoint和Request实例配置Connector,并调用使用当前HttpConnection作为参数调用Server的handle()方法;否则,设置Request的DispatcherType为ASYNC,调用Server的handleAsync()方法(传入当前HttpConnection做为参数)。
  5. 对任何非ContinuationThrowable异常,设置Request的handled为true,error为true,对HttpException使用Response发送响应状态码给客户端,而对Throwable,使用HttpGenerator发送400或500状态码给客户端。
  6. 如果此时Request处于为完成状态,调用AsyncContinuation.doComplete()方法;如果100 Continue响应没有发送给客户端,则清除该状态,但是如果此时Response还没有Commit,则设置HttpGenerator的persistent为false,表示客户端并没有发送数据过来,我们可以关闭该连接了;如果EndPoint关闭了,则调用Response的complete方法;如果EndPoint没有关闭并且有error,直接关闭EndPoint;如果EndPoint没有关闭,也没有error,但是HttpGenerator没有Commit,Request也没有被handle,则使用resonse发送404 Resource Not Found响应消息,之后调用Response的complete方法;最后设置Request的handled为true。

commitResponse()方法,用于控制HttpGenerator的执行流程:
  1. 在HttpGenerator还没有Commit之前(即响应状态行和响应消息头还没写入到EndPoint中)时,先调用HttpGenerator的setResponse()方法设置状态行。
  2. 然后调用HttpGenerator的completeHeader()方法将响应消息头写入到EndPoint中。
  3. 最后调用HttpGenerator的complete方法,不断的将HttpGenerator中的缓存写入到EndPoint中。

flushResponse()方法只是调用了commitResponse方法。 

HttpOutput时Jetty中继承自ServletOutputStream的类,它使用AbstractorGenerator向底层EndPoint中写入数据。

Output时HttpConnection中的内部类,它继承自HttpOutput,它在调用close/flush时会先调用commitResponse/flushReponse方法,保证响应消息先写状态行,然后是响应消息头,最后才是响应消息体。该类还实现了sendContent方法,其参数可以是HttpContent类型或Resource类型,该方法是一个Util方法,它会自动设置Content-Type、Content-Length、Last-Modified等头,并将HttpContent或Resource对应的数据写入到EndPoint中。

相关文章
|
缓存 分布式计算 API
Spark Netty与Jetty (源码阅读十一)
  spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。   创建了一个线程工厂,生成的线程都给定一个前缀名。      像一般的netty框架一样,创建Netty的EventLoopGroup:      在常用...
1068 0