准备知识

通常咱们进行HTTP衔接网络的时候咱们会进行TCP的三次握手,然后传输数据,然后再释放衔接。 大量的衔接每次衔接关闭都要三次握手四次分手的很显然会形成功用低下, 因而http有一种叫做keep-alive connections的机制(HTTP1.1今后默认敞开),它能够在传输数据后仍然保持衔接, 当客户端需求再次获取数据时,直接运用刚刚闲暇下来的衔接而不需求再次握手。

OkHttp源码分析之连接池复用

OkHttp的复用衔接池便是为了复用这些没有断开衔接的TCP衔接的。

衔接池概述

okhttp衔接的树立首要是环绕ConnectInterceptor来的,流程为首要经过RealCall的initExchange(chain)创立一个Exchange方针,其中会翻开与方针服务器的链接, 并调用 Chain.proceed()办法进入下一个拦截器。 initExchange()办法中会先经过 ExchangeFinder 尝试去 RealConnectionPool 中寻觅已存在的衔接,未找到则会重新创立一个RealConnection 并开始衔接, 然后将其存入RealConnectionPool,现在现已准备好了RealConnection 方针,然后经过恳求协议创立不同的ExchangeCodec 并回来,回来的ExchangeCodec正是创立Exchange方针的一个参数。 ConnectInterceptor的代码很简单,首要的功用便是初始化RealCall的Exchange。这个Exchange的功用便是基于RealConnection+ExchangeCodec进行数据交换。

咱们首要重视衔接的这几个方面,怎么放入?怎么取出?复用条件?怎么整理收回?

核心类介绍

RealConnection

RealConnection 完结了 Connection接口,其中运用 Socket树立HTTP/HTTPS衔接,而且获取 I/O 流,内部持有输入和输出流。

假如拥有了一个RealConnection就代表了咱们现已跟服务器有了一条通讯链路,而且经过RealConnection代表是衔接socket链路,RealConnection方针意味着咱们现已跟服务端有了一条通讯链路了,同一个 Connection 可能会承载多个 HTTP 的恳求与响应。

类构造与要害特点:

<!--
class RealConnection(
  val connectionPool: RealConnectionPool,
  private val route: Route
) : Http2Connection.Listener(), Connection {
  //和服务器直接通讯的socket实例和用于数据读写的输入输出流
  private var rawSocket: Socket? = null
  private var source: BufferedSource? = null
  private var sink: BufferedSink? = null
  ...
  //引证计数法记录本衔接被多少个恳求持有,用于收回管理中判别该衔接是否闲暇
  val calls = mutableListOf<Reference<RealCall>>()
  ...
}
-->

RealConnectionPool

这是用来存储 RealConnection 的池子,内部运用一个双端行列来进行存储。

在 OkHttp 中,一个衔接(RealConnection)用完后不会立马被关闭并释放掉,而且是会存储到衔接池(RealConnectionPool)中。 除了缓存衔接外,缓存池还担任定期整理过期的衔接,在 RealConnection 中会维护一个用来描述该衔接闲暇时刻的字段,每增加一个新的衔接到衔接池中时都会进行一次检测,遍历一切的衔接,找出当时未被运用且闲暇时刻最长的那个衔接,假如该衔接闲暇时长超出阈值,或许衔接池已满,将会关闭该衔接。

类构造与要害特点:

<!--
class RealConnectionPool(
	taskRunner: TaskRunner,
	//每个闲暇Socket的最大衔接数,默以为5
	private val maxIdleConnections: Int,
	keepAliveDuration: Long,
	timeUnit: TimeUnit
) {
	//衔接保活时刻,默认5分钟
	private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
	private val cleanupQueue: TaskQueue = taskRunner.newQueue()
	private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
		override fun runOnce() = cleanup(System.nanoTime())
	}
	//RealConnection是Socket方针的包装类,connections也便是对这些衔接的缓存
	private val connections = ArrayDeque<RealConnection>()
	...
}
-->

ExchangeCodec

ExchangeCodec 的功用便是对http报文的编解码,担任对Request 编码及解码 Response,也便是写入恳求及读取响应,咱们的恳求及响应数据都经过它来读写。

其完结类有两个:Http1ExchangeCodec 及 Http2ExchangeCodec,别离对应两种协议版别。

Exchange

功用相似 ExchangeCodec,但它是对应的是单个恳求,其在 ExchangeCodec 基础上背负了一些衔接管理及事件分发的效果。 详细而言,Exchange 与 Request 以及ExchangeCodec 一一对应,新建一个恳求时就会创立一个 Exchange,该 Exchange 担任将这个恳求发送出去并读取到响应数据,而发送与接收数据运用的是 ExchangeCodec。

要害流程解析

怎么从衔接池取出和放入衔接?

调用链

--ConnectInterceptor.intercept
	--RealCall.initExchange
		--ExchangeFinder.find
			--ExchangeFinder.findHealthyConnection
				--ExchangeFinder.findConnection
				       --RealConnectionPool.callAcquirePooledConnection
						...
				--RealConnection.newCodec

经过ExchangeFinder找到或许新建衔接

假如看过okHttp曾经的源码的话,这儿ExchangeFinder.find的功用其实和3.x版别的StreamAllocation.newStream相似:

<!--
class ExchangeFinder(
 ...
) {
	...
	fun find(client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec {
		...
		val resultConnection = findHealthyConnection(...)
		return resultConnection.newCodec(client, chain)
		...
	}
	private fun findHealthyConnection(...): RealConnection {
		...
		val candidate = findConnection(...)
		...
	}
	private fun findConnection(...): RealConnection {
		...
	// 从衔接池里边查找可用衔接
		if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
			val result = call.connection!!
			eventListener.connectionAcquired(call, result)
			return result
		}
		...
	// 找不到的话创立新的衔接
		val newConnection = RealConnection(connectionPool, route)
		...
	// 衔接socket
		newConnection.connect(...)
		...
	// 将新衔接丢到衔接池
		connectionPool.put(newConnection)
	// 绑定RealCall和衔接
		call.acquireConnectionNoEvents(newConnection)
		...
		return newConnection
	}
	...
}
class RealCall(...) : Call {
	fun acquireConnectionNoEvents(connection: RealConnection) {
		...
		this.connection = connection
		connection.calls.add(CallReference(this, callStackTrace))
	}
}
-->

咱们能够看到这儿首要干了:

1.从衔接池里边查找可用衔接

2.找不到的话创立新的衔接RealConnection,衔接socket

3.将新衔接丢到衔接池

4.绑定RealCall和衔接RealConnection,也便是将RealConnection内部的引证计数+1

能够用图来描述便是:

OkHttp源码分析之连接池复用

从衔接池里边查找可用衔接

咱们详细看一下是怎么从衔接池获取符合条件的connection并弄清楚衔接复用的条件。

<!--RealConnectionPoll.kt
//
fun callAcquirePooledConnection(
	...
): Boolean {
	...
	for (connection in connections) {
		//当需求进行多路复用且当时的衔接不是 HTTP/2 衔接时,则抛弃当时衔接
		if (requireMultiplexed && !connection.isMultiplexed) continue
		//当时衔接不能用于此次恳求传入的address 分配 stream,则抛弃当时衔接。
		if (!connection.isEligible(address, routes)) continue
		call.acquireConnectionNoEvents(connection)	//将call方针作为弱引证参加connection的calls衔接行列中
		return true
	}
	return false
}
-->
<!--RealConnection.kt
//判别衔接池已有的Connection是否满意此次新恳求的复用条件
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
	assertThreadHoldsLock()
	//衔接次数是否已满,在HTTP 1.X的状况下allocationLimit总是为1,即线头堵塞,
	//前一个恳求彻底完毕后后一个恳求才能复用,不然只能新开Connection
	if (calls.size >= allocationLimit || noNewExchanges) return false
	//非Host的地址部分是否相等,内部会比较dns、protocols、proxy、sslSocketFactory、port等
	if (!this.route.address.equalsNonHost(address)) return false
	//host是否相同
	if (address.url.host == this.route().address.url.host) {
	  return true // This connection is a perfect match.host相同就直接可复用
	}
	if (http2Connection == null) return false
	...
	return true // The caller's address can be carried by this connection.
  }
-->

咱们能够得到衔接池复用的条件为:

1.当时衔接可用恳求次数已满不行复用,在HTTP 1.X的状况下allocationLimit总是为1,即线头堵塞,前一个恳求彻底完毕后后一个恳求才能复用此衔接,不然只能新开Connection

2.非Host的地址部分不相等不行复用,内部会比较dns、protocols、proxy、sslSocketFactory、port等

3.前面满意的前提下,host假如相同可复用

4.假如host不相等,但当时http协议是http2那就需求持续判别其他条件决议是否可复用

这儿要特别注意

allocationLimit 在HTTP 1.X的状况下allocationLimit总是为1,保证了HTTP 1.X的状况下每次只能跑一个恳求, 也便是说有必要一个将上次Request的Response彻底读取之后才能发送下一次Request。

但http2在多路复用+二进制帧的加持下是答应一个衔接同时被多个恳求运用的,答应连续发送多个Request 。

Connection引证计数

创立或许复用Connection的时候都会调用到RealCall.acquireConnectionNoEvents,将RealCall的弱引证丢到connection.calls里边,于是就完结了恳求对Connection引证计数+1;

<!--RealCall.kt
class RealCall(...) : Call {
	fun acquireConnectionNoEvents(connection: RealConnection) {
		...
		this.connection = connection
		connection.calls.add(CallReference(this, callStackTrace))
	}
}
-->

有add就有remove,恳求完结后,会调用Exchange.complete办法,最终调到RealCall.releaseConnectionNoEvents将引证从connection.calls里边删掉,于是就完结了恳求对Connection引证计数-1;

<!--RealCall.kt
  internal fun releaseConnectionNoEvents(): Socket? {
	...
	val calls = connection.calls
	val index = calls.indexOfFirst { it.get() == this@RealCall }//找到当时恳求RealCall对应的Connecton.calls列表中的RealCall
	check(index != -1)
	calls.removeAt(index)
	this.connection = null
	...
	return null
  }	
-->

Sockt衔接的树立

衔接的真正树立其实也是经过Socket来完结的,咱们能够简单看一下:

<!--RealConnection.kt
  fun connect(
	connectTimeout: Int,
	readTimeout: Int,
	writeTimeout: Int,
	pingIntervalMillis: Int,
	connectionRetryEnabled: Boolean,
	call: Call,
	eventListener: EventListener
  ) {
	...//检查ssl
	while (true) {
	  try {
		if (route.requiresTunnel()) {
		  connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
		  if (rawSocket == null) {
			// We were unable to connect the tunnel but properly closed down our resources.
			break
		  }
		} else {
		  connectSocket(connectTimeout, readTimeout, call, eventListener)//socket衔接树立
		}
		establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)//协议树立
		...
	  }
	}
	...
  }
	private fun connectSocket(
	connectTimeout: Int,
	readTimeout: Int,
	call: Call,
	eventListener: EventListener
  ) {
	...
	val rawSocket = when (proxy.type()) {
	  Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
	  else -> Socket(proxy)
	}
	...
	rawSocket.soTimeout = readTimeout
	try {
	  Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
	} catch (e: ConnectException) {
	  throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
		initCause(e)
	  }
	}
	...
	try {//拿到输入输出流
	  source = rawSocket.source().buffer()
	  sink = rawSocket.sink().buffer()
   }
   ...
  }
-->

Sockt衔接的断开

因为咱们需求复用衔接,因而Socket衔接并不是在恳求完结后就断开的,假如闲暇衔接数在答应范围内(默认5个),他会保持闲暇存活keep-alive的时刻后(默认5分钟)由okhttp的自动整理机制进行整理并关闭。详细分析见下面的闲暇衔接整理部分。

怎么整理闲暇衔接?

从上面的复用机制咱们看到,socket衔接在上一次恳求完结之后是不会断开的,等候下次恳求复用。 假如一向不去断开的话,就会有一个资源占用的问题。

那么OkHttp是在什么时候断开衔接的呢? 其实RealConnectionPool内部会有个cleanupTask专门用于衔接的整理,它会在RealConnectionPool的put(参加新衔接)、connectionBecameIdle(有衔接闲暇)里边被调用。

<!--RealConnectionPool.kt
  private val cleanupQueue: TaskQueue = taskRunner.newQueue()
  private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
    override fun runOnce() = cleanup(System.nanoTime())
  }
  //参加新衔接
  fun put(connection: RealConnection) {
    ...
    cleanupQueue.schedule(cleanupTask)
  }
  //有衔接闲暇,对应引证计数的-1,也便是RealCall.releaseConnectionNoEvents
  fun connectionBecameIdle(connection: RealConnection): Boolean {
    connection.assertThreadHoldsLock()
    return if (connection.noNewExchanges || maxIdleConnections == 0) {
      connection.noNewExchanges = true
      connections.remove(connection)
      if (connections.isEmpty()) cleanupQueue.cancelAll()
      true
    } else {
      cleanupQueue.schedule(cleanupTask)
      false
    }
  }  
-->

cleanupQueue会依据 Task.runOnce的回来值等候一段时刻再次调用runOnce, 这样设计是为了在本次履行整理后,拿到最近一个需求整理的衔接到期剩余的时刻,便利第一时刻将过期的衔接整理掉。 这儿的runOnce实际便是cleanup办法,这儿面会查找闲暇过久的衔接,然后关闭它的socket:

<!--RealConnectionPool.kt
fun cleanup(now: Long): Long {
    var inUseConnectionCount = 0
    var idleConnectionCount = 0
    var longestIdleConnection: RealConnection? = null
    var longestIdleDurationNs = Long.MIN_VALUE
    // 找到下一次闲暇衔接超时的时刻
    for (connection in connections) {
      synchronized(connection) {
        // 假如这个connection还在运用(Response还没有读完),就计数然后持续搜索
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++
        } else {
          idleConnectionCount++
          // 这个衔接现已闲暇,核算它闲暇了多久,而且保存闲暇了最久的衔接
          val idleDurationNs = now - connection.idleAtNs
          if (idleDurationNs > longestIdleDurationNs) {
            longestIdleDurationNs = idleDurationNs
            longestIdleConnection = connection
          } else {
            Unit
          }
        }
      }
    }
    when {
      longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections -> {
        // 假如闲暇最久的衔接比keepAliveDurationNs这个值要大就收回
        val connection = longestIdleConnection!!
        ...
        // 关闭socket
        connection.socket().closeQuietly()
        if (connections.isEmpty()) cleanupQueue.cancelAll()
        // 咱们只收回了闲暇超时最久的衔接,可能还会有其他衔接也超时了,回来0让它立马进行下一次整理
        return 0L
      }
      idleConnectionCount > 0 -> {
        // 假如有闲暇衔接,就核算最近的一次闲暇超时的时刻,去等候
        return keepAliveDurationNs - longestIdleDurationNs
      }
      inUseConnectionCount > 0 -> {
        // 假如一切衔接都在运用,就等候这个超时时刻去重新检查整理
        return keepAliveDurationNs
      }
      else -> {
        // 假如没有衔接,就不需求再检查了
        return -1
      }
    }
}
-->

这儿面首要是干了这几个工作:

1.遍历缓存池中一切的Connection,依据pruneAndGetAllocationCount核算每个Connection被多少个恳求引证决议该Connection是否要进入收回判别逻辑, 假如需求被收回,得到闲暇时刻最长的Connection和时刻

2.对比方才收集到的最长时刻的Connection和keepAlive的时刻

3.对比当时闲暇的数量和衔接池答应的最大的闲暇数量

4.对满意23条件的Connection进行Socket断开操作,而且回来0立刻进行下一次cleanup收回,因为咱们只收回了闲暇时刻最久的衔接

5.假如有闲暇衔接,可是还没到最大闲暇时刻,那就回来时刻差值,等候这个时刻后再次履行cleanup收回

6.假如没有闲暇衔接,就等候keepAlive时刻后再次进行检查

7.假如没有衔接,就回来-1不检查了

pruneAndGetAllocationCount回来的是正在占用的恳求数,用于检测衔接是否闲暇,prune有修剪的意思, 除了核算被引证的次数外,内部遍历RealConnection的allocations弱引证列表,修剪并移除掉RealConnection.calls引证计数列表中现已内存收回的RealCall对应的弱引证自身 Refrence,这儿奇妙利用了弱引证的原理,相似WeakedHashMap:

<!--RealConnectionPool.kt
  private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
    connection.assertThreadHoldsLock()
    val references = connection.calls
    var i = 0
    while (i < references.size) {
      val reference = references[i]
      if (reference.get() != null) {//假如得到的为null,阐明弱引证指向的方针自身现已产生了内存走漏
        i++
        continue
      }
      // We've discovered a leaked call. This is an application bug.
      val callReference = reference as CallReference
      val message = "A connection to ${connection.route().address.url} was leaked. " +
          "Did you forget to close a response body?"
      Platform.get().logCloseableLeak(message, callReference.callStackTrace)
      references.removeAt(i)
      connection.noNewExchanges = true
      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNs = now - keepAliveDurationNs
        return 0
      }
    }
    return references.size
  }
-->

什么状况下会产生reference.get()==null的状况呢?

既然前面提到RealCall.releaseConnectionNoEvents中会自动对引证计数进行remove,那什么时候才会产生走漏的状况呢?比方得到一个Response之后一向不去读取的话实际上它会一向占中这个RealConnection,详细可能是下面的姿态:

<!--
client.newCall(getRequest()).enqueue(new Callback() {
  @Override
  public void onFailure(@NotNull Call call, @NotNull IOException e) {
  }
  @Override
  public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
    // 啥都不干
  }
});
-->

onResponse传入的response没有人去读取数据,就会一向占用衔接,可是因为它在后边又没有人引证就会被GC收回导致这条衔接再也不能断开。 pruneAndGetAllocationCount里边就经过弱引证get回来null的方式去检查到这样的异常,进行整理动作。

咱们也能够联想到OkHttp常常需求注意的两个问题的原因:

1.Response.string只能调用一次

因为Response.string读取完结之后这次恳求其实就现已完毕了,而且OkHttp并没有对这个成果做缓存, 所以下次再读取就会呈现java.lang.IllegalStateException: closed异常

2.Response有必要被及时读取

假如咱们得到一个Response之后一向不去读取的话实际上它会一向占中这这个Connect,下次HTTP 1.X的恳求就不能复用这套链接,要新建一条Connection

遗留问题

:因为allocationLimit限制了同一个RealConnection只能有一个恳求同时运用, 那便是说Http1.1的管道化piepling其实在okhttp中没有应用?