前语
最近在收拾面试笔记,两三年前在大众号写过两篇文章,由于现在很少写了怕有一天忘掉登录就看不到之前写的东西了,所以决议挪过来趁便捋一遍。
OkHttp源码剖析需求了解的几件工作
- okhttp恳求流程
- 使命调度
- 阻拦器
- 缓存策略
- 失利重连
- okhttp的复用衔接池
这么说看或许没啥动力,下面是我收集的关于okhttp的面试题,结合问题来看或许更有感觉一些
面试或许会问到的问题
- 简略说一下okhttp
- okhttp的中心类有哪些?
- okhttp关于网络恳求做了哪些优化,如何完成的?
- okhttp架构顶用到了哪些规划形式?
- okhttp阻拦器的履行次序
okhttp恳求进程
这种图四okhttp恳求的全部进程
先看看正常代码里怎样运用
/**
* 异步get恳求
*/
public static void get() {
//1.创立OkHttpClient目标
OkHttpClient okHttpClient =new OkHttpClient.Builder().retryOnConnectionFailure(true).connectTimeout(3000, TimeUnit.SECONDS).build();
//2.创立Request目标,设置一个url地址,设置恳求办法。
Request request = new Request.Builder().url("http://xxxx")
.method("GET", null)
.build();
//3.创立一个call目标,参数便是Request恳求目标
Call call = okHttpClient.newCall(request);
//4.恳求参加调度,重写回调办法
call.enqueue(new Callback() {
//恳求失利履行的办法
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: 失利===》" + e.getMessage());
}
//恳求成功履行的办法
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, "onResponse: " + response.body().string());
}
});
}
无论什么恳求都需求用到okhttpclient,咱们能够经过new的办法也能够经过Builder(制作者形式)的办法获取。显然这个办法也是用来初始化一些配置参数的。为了不占空间就不贴源码了。 接着okhttpClient会调用newCall办法,这个办法把恳求的参数传入。下面开端时源码
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
回来一个RealCall 这个类把okhttpclient和恳求参数做了封装,履行execute或许enqueue真实开端履行恳求。
enqueque()
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
先看看咱们用的比较多的异步恳求enqueue,实践上在enqueque办法中把恳求交给了okhttpClient里边封装的dispatch(使命调度器) 的enqueue。所以看看Dispatch这个类
Dispatch 使命调度器
成员变量如下.
/**最大并发数**/
private int maxRequests = 64;
/**每个主机最大恳求数**/
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** 线程池 Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/**正在等待的异步双端行列 Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在运转的异步双端行列 Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在运转的同步双端行列 Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
再看看构造办法
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
假如有配置线程池就用配置的,假如没有就用默许的。这儿感觉跟系统的Asynctask
差不多。默许的适合履行很多耗时比较少的操作,供给自界说线程池一般看需求,不过一般用默许的就够了。回到Dispatch
的enqueue
办法
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
/**
* Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
* them on the executor service. Must not be called with synchronization because executing calls
* can call into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
这儿的代码跟之前的版别不相同了,看了篇博客曾经的代码是这样的。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
为什么改成这样,咱们来问问ChatGPT
不得不说,ChatGPT真的很强,非常推荐运用
简略的说手动变变成自动了,总归enqueue
这个办法的判别是没有变的,最终都调用到了asyncCall.executeOn(executorService());
最终调用的也是executorService().execute(call);
再来看看的aysncCalls源码
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
其实咱们还有一个当地没有看,同步恳求,结合一同看或许会更好了解
@Override public Response execute() throws IOException {
if (originalRequest.body instanceof DuplexRequestBody) {
DuplexRequestBody duplexRequestBody = (DuplexRequestBody) originalRequest.body;
return duplexRequestBody.awaitExecute();
}
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
同步履行的excute办法是RellCall里边的办法,和AsyncCall里边的excute都句要害的代码。
Response response = getResponseWithInterceptorChain()
这儿的代码先不看,这个是同步和异步后续要履行的动作。同样的同步履行的excute也会经过调度器,可是经过源码发现,这儿仅仅单纯的增加到容器和从容器中删除。这样做目的是为了方便一致取消恳求以及需求记载恳求数量(异步+同步)。
来到这儿 能够发现,Dispatcher的效果跟它的命名相同是用来调度的,假如是异步恳求会在这儿创立线程池、把异步同步的使命分配到asyncCalls履行。假如是同步把恳求增加到双端行列中。而RellCall则是封装了okhttpclient以及恳求request(恳求参数)的恳求的建议类。
接着看
Response getResponseWithInterceptorChain() throws IOException {
// 增加一系列阻拦器,留意增加的次序
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
// 桥阻拦器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 缓存阻拦器:从缓存中拿数据
interceptors.add(new CacheInterceptor(client.internalCache()));
// 网络衔接阻拦器:建立网络衔接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
// 服务器恳求阻拦器:向服务器建议恳求获取数据
interceptors.add(new CallServerInterceptor(forWebSocket));
// 构建一条职责链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 处理职责链
return chain.proceed(originalRequest);
}
这个办法增加了一堆阻拦器,而且能够看到有Chain,这儿是一个职责链形式。
再看看proceed办法究竟做了什么
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
return response;
}
会发现其实调用了proceed时由于index每次+1会经过下一个阻拦器,当下一个阻拦器内部调用proceed时会以此类推的往下履行。回想一下咱们自界说阻拦器时,最终必然会回来的便是proceed办法。
return chain.proceed(builder.build());
假如不再深化阻拦器的话到这儿其完成已能够结束了,由于经过阻拦器咱们现已拿到了要回来的Response。下面是最初调用的例子
responseCallback.onResponse(RealCall.this, response);
剖析到这儿的大概流程
阻拦器
他人博客的一张图,对应源码其实能够知道,一开端增加的是自界说intercept的list。而随后顺次增加的阻拦器顺次的效果是:重连重定向–>构建恳求参数–>缓存(假如有则工作在这儿耗费)–>网络恳求
失利重连:RetryAndFollowUpInterceptor
@Override public Response intercept(Chain chain) throws IOException {
// ...
// 留意这儿咱们初始化了一个 StreamAllocation 并赋值给全局变量,它的效果咱们后面会说到
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
// 用来记载重定向的次数
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
// 这儿从当时的职责链开端履行一遍职责链,是一种重试的逻辑
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 调用 recover 办法从失利中进行康复,假如能够康复就回来true,否则回来false
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// 重试与服务器进行衔接
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// 假如 releaseConnection 为 true 则标明中心呈现了异常,需求释放资源
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// 运用之前的呼应 priorResponse 构建一个呼应,这种呼应的呼应体 body 为空
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder().body(null).build())
.build();
}
// 依据得到的呼应进行处理,或许会增加一些认证信息、重定向或许处理超时恳求
// 假如该恳求无法持续被处理或许呈现的过错不需求持续处理,将会回来 null
Request followUp = followUpRequest(response, streamAllocation.route());
// 无法重定向,直接回来之前的呼应
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
// 封闭资源
closeQuietly(response.body());
// 达到了重定向的最大次数,就抛出一个异常
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
// 这儿判别新的恳求是否能够复用之前的衔接,假如无法复用,则创立一个新的衔接
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
intercept
办法是每个阻拦器的中心办法,重连首要做的工作是,重连超越默许次数MAX_FOLLOW_UPS
(20)抛出异常,衔接成功则将恳求和重连成果一并传给下一个阻拦器。
桥接阻拦器: BridgeInterceptor
...
requestBuilder.header("Content-Type", contentType.toString())
requestBuilder.header("Content-Length", contentLength.toString())
...
桥接阻拦器首要做的工作是构建恳求参数,曾经咱们会有个疑问便是都说okhttp是依据socket的而不是依据http的,答案就在这儿,它手动构建了恳求。socket仅仅一个通讯的通道类,能够了解为公路,而http跟udp这些能够了解为国道、省道、高速公路,他们有自己的规则,而这儿构建的便是http的规则。
运用缓存:CacheInterceptor
public final class CacheInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;
long now = System.currentTimeMillis();
// 依据恳求和缓存的呼应中的信息来判别是否存在缓存可用
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest; // 假如该恳求没有运用网络就为空
Response cacheResponse = strategy.cacheResponse; // 假如该恳求没有运用缓存就为空
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}
// 恳求不运用网络而且不运用缓存,相当于在这儿就阻拦了,没必要交给下一级(网络恳求阻拦器)来履行
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 该恳求运用缓存,可是不运用网络:从缓存中拿成果,没必要交给下一级(网络恳求阻拦器)履行
if (networkRequest == null) {
return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
}
Response networkResponse = null;
try {
// 这儿调用了履行链的处理办法,实践便是交给自己的下一级来履行了
networkResponse = chain.proceed(networkRequest);
} finally {
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// 这儿当拿到了网络恳求之后调用,下一级履行结束会交给它持续履行,假如运用了缓存就把恳求成果更新到缓存里
if (cacheResponse != null) {
// 服务器回来的成果是304,回来缓存中的成果
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
cache.trackConditionalCacheHit();
// 更新缓存
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
// 把恳求的成果放进缓存里
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
}
缓存阻拦器会依据恳求的信息和缓存的呼应的信息来判别是否存在缓存可用,假如有能够运用的缓存,那么就回来该缓存给用户,否则就持续运用职责链形式来从服务器中获取呼应。当获取到呼应的时分,又会把呼应缓存到磁盘上面。
缓存均依据map来缓存,key是恳求中url的md5,value是文件中查询到的缓存,页面依据LRU算法。cacheCandidate
是一个能够读取到Header
和Response
的类。总结做了以下几件工作
- cache若不为空则赋给cacheCandidate目标
- 获取缓存策略能够自己界说,默许为
Cachecontrol.FORCE_NETWORK
强制运用网络,也能够设置为FORCE_CACHE
强制运用本地缓存,假如没有缓存可用回来504 - 在不为空且有缓存策略时,若回来304则直接呼应缓存创立Response回来
- 若无网无缓存回来504
- 在有缓存策略但没有缓存时调用cache的put办法进行缓存
衔接:ConnectInterceptor
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
衔接阻拦器首要做了两件工作
- 加上url生成一个真实的恳求的流
RealConnection
- 将流和httpCode一并交给下一层阻拦器进行恳求,并回来Response 所以这儿仅仅向服务器建议衔接,而且真实的衔接仅仅在RealConnection里边。
恳求: CallServerInterceptor
public final class CallServerInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// 获取 ConnectInterceptor 中初始化的 HttpCodec
HttpCodec httpCodec = realChain.httpStream();
// 获取 RetryAndFollowUpInterceptor 中初始化的 StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();
// 获取 ConnectInterceptor 中初始化的 RealConnection
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
// 在这儿写入恳求头
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
// 在这儿写入恳求体
if (responseBuilder == null) {
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// 写入恳求体
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
// 读取呼应头
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
// 读取呼应体
int code = response.code();
if (code == 100) {
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener().responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
// ...
return response;
}
}
首要做的工作1.建议恳求2.完结读写3.依据回来码处理成果4.封闭衔接。
职责链最终一个阻拦器,拿到恳求成果后回来给上一级。
复用衔接池
private final ConnectionPool connectionPool;
里边有个复用衔接池,其实便是类似于线程池。便是咱们想知道的衔接复用的最中心的当地了。
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = () -> {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
/** Returns the number of idle connections in the pool. */
public synchronized int idleConnectionCount() {
int total = 0;
for (RealConnection connection : connections) {
if (connection.allocations.isEmpty()) total++;
}
return total;
}
/** Returns total number of connections in the pool. */
public synchronized int connectionCount() {
return connections.size();
}
/**
* Acquires a recycled connection to {@code address} for {@code streamAllocation}. If non-null
* {@code route} is the resolved route for a connection.
*/
void acquire(Address address, StreamAllocation streamAllocation, @Nullable Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return;
}
}
}
/**
* Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
* This recovers when multiple multiplexed connections are created concurrently.
*/
@Nullable Socket deduplicate(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, null)
&& connection.isMultiplexed()
&& connection != streamAllocation.connection()) {
return streamAllocation.releaseAndAcquire(connection);
}
}
return null;
}
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
/**
* Notify this pool that {@code connection} has become idle. Returns true if the connection has
* been removed from the pool and should be closed.
*/
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
if (connection.noNewStreams || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}
/** Close and remove all idle connections in the pool. */
public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.allocations.isEmpty()) {
connection.noNewStreams = true;
evictedConnections.add(connection);
i.remove();
}
}
}
for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
}
- 中心参数:
executor线程池、Deque双向行列保护RealConnect。也便是Socket的包装、RounteDatabase记载衔接失利时的的路线名单
- 构造办法中的参数:
最大衔接数默许为5个、保活时刻为5分钟
- 判别当时的衔接是否能够运用:流是否现已被封闭,而且现已被约束创立新的流;
- 假如当时的衔接无法运用,就从衔接池中获取一个衔接;
- 衔接池中也没有发现可用的衔接,创立一个新的衔接,并进行握手,然后将其放到衔接池中。
总结
okhttp的中心类有哪些?
Dispatch
- Dispatch经过保护一个线程池,来保护、办理、履行okhttp的恳求。整体能够看成是生成者与顾客模型
- Dispatch保护着三个行列分别是同步恳求行列
runningSyncCalls
、异步恳求行列runningAsyn
,异步缓存行列readyAsynCalls
和一个线程池executorService
2.Intercept
- 阻拦器是okhttp一种强壮的机制,它能够完成网络监听、恳求、以及呼应重写、恳求失利重试等功能。比如开发进程中咱们打印log、增加公共恳求头都是用阻拦器做的
- 不算自界说的阻拦器一共有5个,分别履行的次序是
1. 重连重定向阻拦器 2. 桥接阻拦器(封装恳求头,让socket支持http) 3. 缓存阻拦器(判别有没有缓存,有缓存能够设置不往下走) 4. 衔接阻拦器(判别没有缓存就要衔接服务器) 5. 网络阻拦器(真实建议恳求)
okhttp关于网络恳求做了哪些优化,如何完成的?
- 经过衔接池来削减求延时(有5分钟保活的长衔接)
- 缓存呼应削减重复的网络恳求
okhttp架构顶用到了哪些规划形式?
制作者形式、工厂形式、单例形式、职责链形式 前面三个基本上一切结构或多或少都有 真实比较有特点的是阻拦器里边的职责链形式
-
职责链形式的界说:使多个目标都有时机处理恳求,从而防止恳求的发送者和接受者之间的耦合联系, 将这个目标连成一条链,并沿着这条链传递该恳求,直到有一个目标处理他为止。
-
职责链形式应用场景:
1、对多个目标都能够处理同一恳求,但详细由哪一个处理则在运转时决议。
2、在恳求处理者不明确的情况下向多个目标中的一个提交一个恳求
3、需求动态指定一组目标处理恳求时