volley源码解析

前言

最近看完okhttp3的源码,想起之前看过《Android开发进阶:从小工到专家》这本书,里边依据volley写入一个简略的网络恳求结构,当时关于泛型都不会用的我来说,真的挺震慑的。又开发了几年,感觉可以运用这个机会学习下这个库,还能趁okhttp3的常识还没忘记,正好对比下,看看都有哪些优缺点。

生活总要给自己找一些事做,否则浑浑噩噩,什么都不想干,恍恍惚惚一天又一天的,白白就过去了。

整体结构

首先贴上volley官网的阐明,英文的,不过也能看了:

官网阐明链接

再一个看一个库的源码,最好了解下源码的整体结构,网上尽管有许多解析volley的文章,大多都是用的官网的这张图:

volley源码解析

可是我觉得这图还不如《Android开发进阶:从小工到专家》里边的结构明晰,于是我拿书里的图改了改,希望能让读者了解起来轻松点:

volley源码解析

图改的不怎么样,大致便是这样一个流程吧。

基本运用

和okhttp相同,这儿先看下简略运用,再一步一步终究完结原理.

        // 1,创立RequestQueue,默许运用BasicNetwork、HurlStack
        val requestQueue = Volley.newRequestQueue(context)
        val url = "https://www.baidu.com"
        // 2,结构一个 request(恳求)
        val request = StringRequest(Request.Method.GET, url, {
            Log.d("TAG", "useVolley result: $it")
        }, {
            Log.d("TAG", "useVolley error: ${it.message}")
        })
        // 3,把request添加进恳求行列RequestQueue里边
        requestQueue.add(request)

好了,记住这个运用,接下来咱们就进入源码剖析环节。

源码剖析

创立RequestQueue

首先来看第一步,这儿经过Volley类创立一个RequestQueue目标,点开Volley类,可以发现三个创立RequestQueue的办法,一个抛弃了,咱们看下剩下的两个办法:

public static RequestQueue newRequestQueue(Context context)
public static RequestQueue newRequestQueue(Context context, BaseHttpStack stack)

疏忽SDK_INT < 9的状况,代码可以简化到下面方式:

    public static RequestQueue newRequestQueue(Context context, BaseHttpStack stack) {
        BasicNetwork network;
        if (stack == null) {
            // 默许状况
            network = new BasicNetwork(new HurlStack());
        } else {
            network = new BasicNetwork(stack);
        }
        return newRequestQueue(context, network);
    }
    private static RequestQueue newRequestQueue(Context context, Network network) {
        final Context appContext = context.getApplicationContext();
        // Use a lazy supplier for the cache directory so that newRequestQueue() can be called on
        // main thread without causing strict mode violation.
        DiskBasedCache.FileSupplier cacheSupplier =
                new DiskBasedCache.FileSupplier() {
                    private File cacheDir = null;
                    @Override
                    public File get() {
                        // 运用届时才创立文件
                        if (cacheDir == null) {
                            cacheDir = new File(appContext.getCacheDir(), DEFAULT_CACHE_DIR);
                        }
                        return cacheDir;
                    }
                };
        // 创立queue,并发动
        RequestQueue queue = new RequestQueue(new DiskBasedCache(cacheSupplier), network);
        queue.start();
        return queue;
    }

代码很明晰,便是创立HttpStack、Network以及RequestQueue,HttpStack和Network默许完结是HurlStack及BasicNetwork。

下面咱们先简略看下HttpStack、Network、RequestQueue三个类。

HttpStack

HttpStack是volley中实践建议恳求的接口,输出HttpResponse(body还在流中),详细完结类是HurlStack,它依据HttpURLConnection来网络交互。

由于HttpClient被安卓抛弃了,而原有接口HttpStack用到了,所以volley后边运用BaseHttpStack替代了它,并用executeRequest替代了原有的performRequest办法。

public abstract class BaseHttpStack implements HttpStack {
    // 实践运用办法
    public abstract HttpResponse executeRequest(
            Request<?> request, Map<String, String> additionalHeaders)
            throws IOException, AuthFailureError;
    @Deprecated
    @Override
    public final org.apache.http.HttpResponse performRequest(
            Request<?> request, Map<String, String> additionalHeaders)
            throws IOException, AuthFailureError {...}
}

BaseHttpStack有两个完结,AsyncHttpStack和HurlStack,疏忽AdaptedHttpStack(适配HttpClient的),这个后边运用届时再详解。

Network

Network是volley中的网络层,详细完结类是BasicNetwork,输出NetworkResponse,与BaseHttpStack不同的是会读取body流到byte数组,并会对header进行些处理。中心代码如下:

    @Override
    public NetworkResponse performRequest(Request<?> request) throws VolleyError {
        long requestStart = SystemClock.elapsedRealtime();
        // 留意这个循环,重试、重定向可能用到
        while (true) {
            HttpResponse httpResponse = null;
            byte[] responseContents = null;
            List<Header> responseHeaders = Collections.emptyList();
            try {
                // 额定的header信息
                Map<String, String> additionalRequestHeaders = ...
                httpResponse = mBaseHttpStack.executeRequest(request, additionalRequestHeaders);
                // 状况码及返回的header
                int statusCode = httpResponse.getStatusCode();
                responseHeaders = httpResponse.getHeaders();
                // 更新缓存header
                if (statusCode == HttpURLConnection.HTTP_NOT_MODIFIED) {...}
                // 处理body
                InputStream inputStream = httpResponse.getContent();
                if (inputStream != null) {...} else {...}
                // 反常状况码
                if (statusCode < 200 || statusCode > 299) {throw new IOException();}
                return new NetworkResponse(...);
            } catch (IOException e) {
                // 重试,下一个循环
                RetryInfo retryInfo = ...
                NetworkUtility.attemptRetryOnException(request, retryInfo);
            }
        }
    }

Network还有另一个承继类AsyncNetwork,而BasicAsyncNetwork又承继了AsyncNetwork,这个咱们后边也会讲解,先了解下。

RequestQueue

RequestQueue与其说它是一个恳求行列,倒不如说它是Request的一个manager,它担任办理request,并整合了CacheDispatcher和NetworkDispatcher。

public void start()
public void stop()
public void cancelAll(RequestFilter filter)
public void cancelAll(final Object tag)
public <T> Request<T> add(Request<T> request)

RequestQueue并不是一个queue,可是它里边有两个优先级堵塞行列: mCacheQueue和mNetworkQueue,RequestQueue中start办法发动后,CacheDispatcher和NetworkDispatcher就会从这两个堵塞行列中取出request去履行。

    // 在Volley的newRequestQueue办法中调用
    public void start() {
        stop(); // Make sure any currently running dispatchers are stopped.
        // Create the cache dispatcher and start it.
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
        mCacheDispatcher.start();
        // Create network dispatchers (and corresponding threads) up to the pool size.
        for (int i = 0; i < mDispatchers.length; i++) {
            NetworkDispatcher networkDispatcher =
                    new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery);
            mDispatchers[i] = networkDispatcher;
            networkDispatcher.start();
        }
    }

关于CacheDispatcher和NetworkDispatcher的内容后边详解,这儿知道他们是处理缓存和网络的线程就行,便是要留意下CacheDispatcher只需一个,而NetworkDispatcher有默许四个,而且并未用到线程池。

    public <T> Request<T> add(Request<T> request) {
        // ...
        synchronized (mCurrentRequests) {
            // 便于一致操作request
            mCurrentRequests.add(request);
        }
        // ...
        beginRequest(request);
        return request;
    }
    <T> void beginRequest(Request<T> request) {
        // If the request is uncacheable, skip the cache queue and go straight to the network.
        if (!request.shouldCache()) {
            sendRequestOverNetwork(request);
        } else {
            mCacheQueue.add(request);
        }
    }

调用RequestQueue的add办法传入request,就会将request放入mCacheQueue或许mNetworkQueue,由是request的mShouldCache决定。

创立Request

RequestQueue的运用需求一个request目标,Request是一个笼统类,需求承继它并重写parseNetworkResponse和deliverResponse办法:

public class StringRequest extends Request<String> {
    // 将response目标以何种办法传递给用户代码
    @Override
    protected void deliverResponse(String response) {
        Response.Listener<String> listener;
        synchronized (mLock) {
            listener = mListener;
        }
        if (listener != null) {
            listener.onResponse(response);
        }
    }
    // 将数据解析成详细的数据类
    @Override
    protected Response<String> parseNetworkResponse(NetworkResponse response) {
        String parsed;
        try {
            parsed = new String(response.data, HttpHeaderParser.parseCharset(response.headers));
        } catch (UnsupportedEncodingException e) {
            parsed = new String(response.data);
        }
        return Response.success(parsed, HttpHeaderParser.parseCacheHeaders(response));
    }
}

Request类并不仅仅一个数据类,它还持有了许多引证,如RequestQueue、Cache.Entry、NetworkRequestCompleteListener等,还听供给了一些对request处理的办法:

void finish(final String tag)
public void cancel()

经过parseNetworkResponse和deliverResponse办法后,用户代码应该就能拿到response目标了,或许收到一个VolleyError。

添加Request使命

其实前面都是开胃菜,为什么RequestQueue中添加(add) request后,咱们就能能拿到response了呢?RequestQueue的add办法咱们前面已经铁过代码了,下面咱们再贴一遍:

    public <T> Request<T> add(Request<T> request) {
        // ...
        synchronized (mCurrentRequests) {
            // 便于一致操作request
            mCurrentRequests.add(request);
        }
        // ...
        beginRequest(request);
        return request;
    }
    <T> void beginRequest(Request<T> request) {
        // If the request is uncacheable, skip the cache queue and go straight to the network.
        if (!request.shouldCache()) {
            sendRequestOverNetwork(request);
        } else {
            mCacheQueue.add(request);
        }
    }
    <T> void sendRequestOverNetwork(Request<T> request) {
        mNetworkQueue.add(request);
    }

其实仅仅将request加到了mCacheQueue和mNetworkQueue中,那为什么request会被履行并拿到response呢?实践原理是在RequestQueue的start办法上

    // 在Volley的newRequestQueue办法中调用
    public void start() {
        stop(); // Make sure any currently running dispatchers are stopped.
        // Create the cache dispatcher and start it.
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
        mCacheDispatcher.start();
        // Create network dispatchers (and corresponding threads) up to the pool size.
        for (int i = 0; i < mDispatchers.length; i++) {
            NetworkDispatcher networkDispatcher =
                    new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery);
            mDispatchers[i] = networkDispatcher;
            networkDispatcher.start();
        }
    }

Volley的newRequestQueue办法发动了RequestQueue,还记得说CacheDispatcher和NetworkDispatcher是线程吗?是的,他们发动后就在死循环,等request添加到mCacheQueue和mNetworkQueue中,然后供它们take。

    private static RequestQueue newRequestQueue(Context context, Network network) {
        // ...
        RequestQueue queue = new RequestQueue(new DiskBasedCache(cacheSupplier), network);
        queue.start();
        return queue;
    }

网络流程

下面就依据NetworkDispatcher来讲解下网络恳求的流程。

概述

NetworkDispatcher比较简略,它会从mNetworkQueue中取出request,再经过mNetwork拿到networkResponse,随后经过request.parseNetworkResponse拿到详细的response,然后由mDelivery传递出去。

    private final BlockingQueue<Request<?>> mQueue;
    // 结构办法
    public NetworkDispatcher(
            BlockingQueue<Request<?>> queue,
            Network network,
            Cache cache,
            ResponseDelivery delivery) {
        mQueue = queue;
        mNetwork = network;
        mCache = cache;
        mDelivery = delivery;
    }

线程run

    @Override
    public void run() {
        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
        while (true) {
            try {
                processRequest();
            } catch (InterruptedException e) {
                // We may have been interrupted because it was time to quit.
                if (mQuit) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

尽管NetworkDispatcher比较简略,可是首要学东西嘛,这儿有个线程的interrupt有意思,可以看下。这个mQuit设置来源于quit办法:

    public void quit() {
        mQuit = true;
        interrupt();
    }
    // RequestQueue
    public void stop() {
        if (mCacheDispatcher != null) {
            mCacheDispatcher.quit();
        }
        for (final NetworkDispatcher mDispatcher : mDispatchers) {
            if (mDispatcher != null) {
                mDispatcher.quit();
            }
        }
    }

RequestQueue调用stop办法,NetworkDispatcher被interrupt,线程会中止,可是request被撤销并不会影响线程,也不会interrupt:

// NetworkDispatcher的processRequest办法中
if (request.isCanceled()) {
    request.finish("network-discard-cancelled");
    request.notifyListenerResponseNotUsable();
    return;
}

中心办法processRequest剖析

    private void processRequest() throws InterruptedException {
        // Take a request from the queue. 四个NetworkDispatcher轮番拿取request
        Request<?> request = mQueue.take();
        processRequest(request);
    }
    void processRequest(Request<?> request) {
        long startTimeMs = SystemClock.elapsedRealtime();
        request.sendEvent(RequestQueue.RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED);
        try {
            request.addMarker("network-queue-take");
            // If the request was cancelled already, do not perform the
            // network request.
            if (request.isCanceled()) {
                request.finish("network-discard-cancelled");
                request.notifyListenerResponseNotUsable();
                return;
            }
            addTrafficStatsTag(request);
            // Perform the network request. 实践建议恳求
            NetworkResponse networkResponse = mNetwork.performRequest(request);
            request.addMarker("network-http-complete");
            // If the server returned 304 AND we delivered a response already,
            // we're done -- don't deliver a second identical response.
            if (networkResponse.notModified && request.hasHadResponseDelivered()) {
                request.finish("not-modified");
                request.notifyListenerResponseNotUsable();
                return;
            }
            // Parse the response here on the worker thread. 将networkResponse转化成详细的Response
            Response<?> response = request.parseNetworkResponse(networkResponse);
            request.addMarker("network-parse-complete");
            // Write to cache if applicable. 更新缓存
            if (request.shouldCache() && response.cacheEntry != null) {
                mCache.put(request.getCacheKey(), response.cacheEntry);
                request.addMarker("network-cache-written");
            }
            // Post the response back.
            request.markDelivered();
            // 由mDelivery从线程中转化到主线程,投递response
            mDelivery.postResponse(request, response);
            // 网络完结的成功回调
            request.notifyListenerResponseReceived(response);
        } catch (VolleyError volleyError) {
            volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs);
            // 里边会parseNetworkError,并由mDelivery投递error
            parseAndDeliverNetworkError(request, volleyError);
            // 网络完结的失利回调
            request.notifyListenerResponseNotUsable();
        } catch (Exception e) {
            VolleyLog.e(e, "Unhandled exception %s", e.toString());
            VolleyError volleyError = new VolleyError(e);
            volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs);
            mDelivery.postError(request, volleyError);
            request.notifyListenerResponseNotUsable();
        } finally {
            request.sendEvent(RequestQueue.RequestEvent.REQUEST_NETWORK_DISPATCH_FINISHED);
        }
    }

这儿需求留意的是,只需运用了NetworkDispatcher那便是异步恳求,而且volley内是四个NetworkDispatcher一起进行的,轮番从mQueue(BlockingQueue<Request<?>>)中拿取(take)request。

一大堆代码下来,其实就两个要点,一是经过mNetwork去取得networkResponse,二是经过mDelivery投递response或许error。

mNetwork去取得networkResponse

上面讲到了NetworkDispatcher经过mNetwork去取得NetworkResponse,mNetwork的详细完结是BasicNetwork,而BasicNetwork经过mBaseHttpStack去获取httpResponse,会传入request和一些额定的header信息

    public NetworkResponse performRequest(Request<?> request) throws VolleyError {
        long requestStart = SystemClock.elapsedRealtime();
        // 留意这个循环,会重试、重定向
        while (true) {
            HttpResponse httpResponse = null;
            byte[] responseContents = null;
            List<Header> responseHeaders = Collections.emptyList();
            try {
                // Gather headers. 额定的header信息: "If-None-Match"、"If-Modified-Since"
                Map<String, String> additionalRequestHeaders =
                        HttpHeaderParser.getCacheHeaders(request.getCacheEntry());
                // mBaseHttpStack经过HttpURLConnection去网络恳求
                httpResponse = mBaseHttpStack.executeRequest(request, additionalRequestHeaders);
                int statusCode = httpResponse.getStatusCode();
                // 更新缓存header
                responseHeaders = httpResponse.getHeaders();
                // Handle cache validation.
                if (statusCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
                    long requestDuration = SystemClock.elapsedRealtime() - requestStart;
                    return NetworkUtility.getNotModifiedNetworkResponse(
                            request, requestDuration, responseHeaders);
                }
                // 读取body到byte数组,直接进内存!!!==> 大文件恳求别这么搞。。。
                InputStream inputStream = httpResponse.getContent();
                if (inputStream != null) {
                    responseContents =
                            NetworkUtility.inputStreamToBytes(
                                    inputStream, httpResponse.getContentLength(), mPool);
                } else {
                    // Add 0 byte response as a way of honestly representing a
                    // no-content request.
                    responseContents = new byte[0];
                }
                // if the request is slow, log it.
                long requestLifetime = SystemClock.elapsedRealtime() - requestStart;
                NetworkUtility.logSlowRequests(
                        requestLifetime, request, responseContents, statusCode);
                // 错误状况码
                if (statusCode < 200 || statusCode > 299) {
                    throw new IOException();
                }
                return new NetworkResponse(
                        statusCode,
                        responseContents,
                        /* notModified= */ false,
                        SystemClock.elapsedRealtime() - requestStart,
                        responseHeaders);
            } catch (IOException e) {
                // This will either throw an exception, breaking us from the loop, or will loop
                // again and retry the request.
                RetryInfo retryInfo = NetworkUtility.shouldRetryException(
                                request, e, requestStart, httpResponse, responseContents);
                // 记录信息,并重试
                // We should already be on a background thread, so we can invoke the retry inline.
                NetworkUtility.attemptRetryOnException(request, retryInfo);
            }
        }
    }

这个Network的performRequest上面简略讲过,这儿代码完整一些。

这儿留意下BasicNetwork拿到httpResponse后,会读取出其间的body(inputStream),并保存到byte[]数组中,并转成NetworkResponse,body的inputStream被封装为UrlConnectionInputStream,读取完结后会封闭UrlConnection。

mBaseHttpStack去取得httpResponse

接下来BaseHttpStack经过executeRequest办法取得httpResponse,完结类是HurlStack,代码如下:

    @Override
    public HttpResponse executeRequest(Request<?> request, Map<String, String> additionalHeaders)
            throws IOException, AuthFailureError {
        // 恳求链接
        String url = request.getUrl();
        // 处理悉数header
        HashMap<String, String> map = new HashMap<>();
        map.putAll(additionalHeaders);
        // Request.getHeaders() takes precedence over the given additional (cache) headers).
        map.putAll(request.getHeaders());
        // mUrlRewriter要自行设置,可疏忽
        if (mUrlRewriter != null) {
            String rewritten = mUrlRewriter.rewriteUrl(url);
            if (rewritten == null) {
                throw new IOException("URL blocked by rewriter: " + url);
            }
            url = rewritten;
        }
        // 运用HttpURLConnection恳求
        URL parsedUrl = new URL(url);
        HttpURLConnection connection = openConnection(parsedUrl, request);
        boolean keepConnectionOpen = false;
        try {
            for (String headerName : map.keySet()) {
                connection.setRequestProperty(headerName, map.get(headerName));
            }
            // 设置HttpURLConnection的恳求办法,GET、POST等
            setConnectionParametersForRequest(connection, request);
            // Initialize HttpResponse with data from the HttpURLConnection.
            int responseCode = connection.getResponseCode();
            if (responseCode == -1) {
                // -1 is returned by getResponseCode() if the response code could not be retrieved.
                // Signal to the caller that something was wrong with the connection.
                throw new IOException("Could not retrieve response code from HttpUrlConnection.");
            }
            // 恳求没有body
            if (!hasResponseBody(request.getMethod(), responseCode)) {
                return new HttpResponse(responseCode, convertHeaders(connection.getHeaderFields()));
            }
            // Need to keep the connection open until the stream is consumed by the caller. Wrap the
            // stream such that close() will disconnect the connection.
            keepConnectionOpen = true;
            return new HttpResponse(
                    responseCode,
                    // 处理Headers
                    convertHeaders(connection.getHeaderFields()),
                    connection.getContentLength(),
                    // 把body封装成UrlConnectionInputStream
                    createInputStream(request, connection));
        } finally {
            if (!keepConnectionOpen) {
                connection.disconnect();
            }
        }
    }

这儿代码也不杂乱,便是要看下body的封装:

    static class UrlConnectionInputStream extends FilterInputStream {
        private final HttpURLConnection mConnection;
        UrlConnectionInputStream(HttpURLConnection connection) {
            super(inputStreamFromConnection(connection));
            mConnection = connection;
        }
        @Override
        public void close() throws IOException {
            super.close();
            mConnection.disconnect();
        }
    }

body被封装成了UrlConnectionInputStream,这儿当body的InputStream被close的时分,mConnection也会被封闭衔接。

mDelivery投递response

NetworkDispatcher终究经过mDelivery传递response,mDelivery是一个ExecutorDelivery目标,内部有一个Executor来履行Runnable,默许在主线程handler履行:

    // RequestQueue结构函数中创立
    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
        this(
                cache,
                network,
                threadPoolSize,
                 // 传入主线程Looper
                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
    }
    // 结构办法
    public ExecutorDelivery(final Handler handler) {
        // Make an Executor that just wraps the handler.
        mResponsePoster =
                new Executor() {
                    @Override
                    public void execute(Runnable command) {
                        handler.post(command);
                    }
                };
    }

ExecutorDelivery中实践便是对mRequest做一些符号,最后经过Request的deliverResponse办法,将Response.result传递过去了,实践便是交给了Request里边的listener,经过回调将response交出去

    @Override
    public void postResponse(Request<?> request, Response<?> response, Runnable runnable) {
        request.markDelivered();
        request.addMarker("post-response");
        mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, runnable));
    }

下面是ResponseDeliveryRunnable的run办法:

        @Override
        public void run() {
            // If this request has canceled, finish it and don't deliver.
            if (mRequest.isCanceled()) {
                mRequest.finish("canceled-at-delivery");
                return;
            }
            // Deliver a normal response or error, depending.
            if (mResponse.isSuccess()) {
                mRequest.deliverResponse(mResponse.result);
            } else {
                mRequest.deliverError(mResponse.error);
            }
            // If this is an intermediate response, add a marker, otherwise we're done
            // and the request can be finished.
            if (mResponse.intermediate) {
                mRequest.addMarker("intermediate-response");
            } else {
                mRequest.finish("done");
            }
            // 传入的额定的runnable
            // If we have been provided a post-delivery runnable, run it.
            if (mRunnable != null) {
                mRunnable.run();
            }
        }

至于怎么到达用户代码,前面Request中咱们已经有提到了,要害就在Request的两个笼统办法: parseNetworkResponse和deliverResponse。

缓存流程

CacheDispatcher取缓存

CacheDispatcher和NetworkDispatcher相似,结构办法内多个WaitingRequestManager:

    public CacheDispatcher(
            BlockingQueue<Request<?>> cacheQueue,
            BlockingQueue<Request<?>> networkQueue,
            Cache cache,
            ResponseDelivery delivery) {
        mCacheQueue = cacheQueue;
        mNetworkQueue = networkQueue;
        mCache = cache;
        mDelivery = delivery;
        mWaitingRequestManager = new WaitingRequestManager(this, networkQueue, delivery);
    }

CacheDispatcher也是一个线程,它的run办法中会对mCache进行初始化,剩下的和CacheDispatcher相似:

@Override
    public void run() {
        // ...
        // Make a blocking call to initialize the cache.
        mCache.initialize();
        while (true) {
            try {
                processRequest();
            } catch (InterruptedException e) {
                // ...
            }
        }
    }

它从mCacheQueue取出request,再从mCache中经过request.getCacheKey取缓存,没缓存还要经过WaitingRequestManager判断下:

    private void processRequest() throws InterruptedException {
        final Request<?> request = mCacheQueue.take();
        processRequest(request);
    }
    void processRequest(final Request<?> request) throws InterruptedException {
        // ...
        try {
            // If the request has been canceled, don't bother dispatching it.
            if (request.isCanceled()) {
                request.finish("cache-discard-canceled");
                return;
            }
            // Attempt to retrieve this item from cache.
            Cache.Entry entry = mCache.get(request.getCacheKey());
            if (entry == null) {
                request.addMarker("cache-miss");
                // 未获取到缓存,则交给网络恳求堵塞行列,并完毕此processRequest
                if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
                    mNetworkQueue.put(request);
                }
                return;
            }
            // 缓存超时也重新网络恳求
            long currentTimeMillis = System.currentTimeMillis();
            if (entry.isExpired(currentTimeMillis)) {
                request.addMarker("cache-hit-expired");
                request.setCacheEntry(entry);
                if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
                    mNetworkQueue.put(request);
                }
                return;
            }
            // 击中缓存
            request.addMarker("cache-hit");
            Response<?> response = request.parseNetworkResponse(
                            new NetworkResponse(entry.data, entry.responseHeaders));
            request.addMarker("cache-hit-parsed");
            // 拿到缓存可是处理失利了
            if (!response.isSuccess()) {
                request.addMarker("cache-parsing-failed");
                mCache.invalidate(request.getCacheKey(), true);
                request.setCacheEntry(null);
                // 假如有和这个request相同使命的request正在恳求,可以等一等,由于一瞬间可以取得最新数据
                if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
                    mNetworkQueue.put(request);
                }
                return;
            }
            if (!entry.refreshNeeded(currentTimeMillis)) {
                // 不需求更新,立即返回
                mDelivery.postResponse(request, response);
            } else {
                // 即使击中了缓存,也去恳求下,取得最新书
                // Soft-expired cache hit. We can deliver the cached response,
                // but we need to also send the request to the network for
                // refreshing.
                request.addMarker("cache-hit-refresh-needed");
                request.setCacheEntry(entry);
                // Mark the response as intermediate. 立即收效
                response.intermediate = true;
                if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
                    // 不在等候行列中,投递response数据后,加到网络恳求堵塞行列去恳求
                    mDelivery.postResponse(
                            request,
                            response,
                            new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        mNetworkQueue.put(request);
                                    } catch (InterruptedException e) {
                                        // Restore the interrupted status
                                        Thread.currentThread().interrupt();
                                    }
                                }
                            });
                } else {
                    // 等候行列中有相同目的的request
                    mDelivery.postResponse(request, response);
                }
            }
        } finally {
            request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED);
        }
    }

注释写的清楚了,这儿要特别讲下WaitingRequestManager,它里边有一个mWaitingRequests储存着等候有相同恳求使命的request,这些request会等候第一个request(网络恳求)的结果,并更新它们一切。不太好了解,着重看下下面代码:

if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
    mDelivery.postResponse(request,response,new Runnable() {
        @Override
        public void run() {
            try {
                mNetworkQueue.put(request);
            } catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
            }
        }
    });
    // ...    
}

这是缓存击中后,去更新缓存的动作,maybeAddToWaitingRequests中返回false的那里,会把这个request加到mWaitingRequests中(第一个):

        } else {
            mWaitingRequests.put(cacheKey, null);
            request.setNetworkRequestCompleteListener(this);
            return false;
        }

再来看在mWaitingRequests有相同目的的request时,maybeAddToWaitingRequests也会把它加到mWaitingRequests中,可是它就(很有可能)不是第一个了,而是要第一个request更新的东东。

// maybeAddToWaitingRequests中
if (mWaitingRequests.containsKey(cacheKey)) {
    // There is already a request in flight. Queue up.
    List<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
    if (stagedRequests == null) {
        stagedRequests = new ArrayList<>();
    }
    request.addMarker("waiting-for-response");
    stagedRequests.add(request);
    // 加到等候行列
    mWaitingRequests.put(cacheKey, stagedRequests);
    return true;
}
if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
    // ...
} else {
    mDelivery.postResponse(request, response);
}

至于mWaitingRequests怎么被通知的,咱们看下下面代码:

    // WaitingRequestManager中
    @Override
    public void onResponseReceived(Request<?> request, Response<?> response) {
        // 。。。for循环更新,疏忽
    }
    // Request中被调用
    void notifyListenerResponseReceived(Response<?> response) {
        NetworkRequestCompleteListener listener;
        synchronized (mLock) {
            listener = mRequestCompleteListener;
        }
        if (listener != null) {
            listener.onResponseReceived(this, response);
        }
    }

实践便是Request中的notifyListenerResponseReceived会更新mWaitingRequests中这些request,而Request中的notifyListenerResponseReceived前面有提到,是在NetworkDispatcher的processRequest办法中调用的。

好了,这些代码有点绕,可是看懂它的先后次序,那就明了了。

缓存Cache

DiskBasedCache是担任Volley缓存的完结类,NoCache和ThrowingCache也承继了Cache接口,可是一个是没完结,一个是抛出反常。Cache接口办法如下,里边还有个Entry类就疏忽了:

public interface Cache {
    @Nullable
    Entry get(String key);
    void put(String key, Entry entry);
    void initialize();
    void invalidate(String key, boolean fullExpire);
    void remove(String key);
    void clear();
}

DiskBasedCache缓存经过mRootDirectorySupplier目录下的文件完结:

    // Volley中创立
    @NonNull
    private static RequestQueue newRequestQueue(Context context, Network network) {
        final Context appContext = context.getApplicationContext();
        DiskBasedCache.FileSupplier cacheSupplier =
                new DiskBasedCache.FileSupplier() {
                    private File cacheDir = null;
                    @Override
                    public File get() {
                        if (cacheDir == null) {
                            cacheDir = new File(appContext.getCacheDir(), DEFAULT_CACHE_DIR);
                        }
                        return cacheDir;
                    }
                };
        RequestQueue queue = new RequestQueue(new DiskBasedCache(cacheSupplier), network);
        queue.start();
        return queue;
    }

从initialize办法中可以看出,每个文件包括CacheHeader类部分信息、ResponseHeaders以及body数据。

    @Override
    public synchronized void initialize() {
        File rootDirectory = mRootDirectorySupplier.get();
        if (!rootDirectory.exists()) {
            if (!rootDirectory.mkdirs()) {
                VolleyLog.e("Unable to create cache dir %s", rootDirectory.getAbsolutePath());
            }
            return;
        }
        File[] files = rootDirectory.listFiles();
        if (files == null) {
            return;
        }
        for (File file : files) {
            try {
                long entrySize = file.length();
                CountingInputStream cis =
                        new CountingInputStream(
                                new BufferedInputStream(createInputStream(file)), entrySize);
                try {
                    // 只读取header信息,CountingInputStream会记录剩下巨细,及body的巨细
                    CacheHeader entry = CacheHeader.readHeader(cis);
                    entry.size = entrySize;
                    putEntry(entry.key, entry);
                } finally {
                    // Any IOException thrown here is handled by the below catch block by design.
                    //noinspection ThrowFromFinallyBlock
                    cis.close();
                }
            } catch (IOException e) {
                //noinspection ResultOfMethodCallIgnored
                file.delete();
            }
        }
    }

DiskBasedCache内经过mEntries(Map<String, CacheHeader>)维持缓存的一些信息,但不包括body数据,可是body会保存在Entry目标里边(data特点)。CacheHeader可以经过toCacheEntry得到Entry目标,需求传入body数据(byte[] data)

        // DiskBasedCache特点
        private final Map<String, CacheHeader> mEntries = new LinkedHashMap<>(16, .75f, true);
        // Entry的办法
        Entry toCacheEntry(byte[] data) {
            Entry e = new Entry();
            e.data = data;
            e.etag = etag;
            e.serverDate = serverDate;
            e.lastModified = lastModified;
            e.ttl = ttl;
            e.softTtl = softTtl;
            e.responseHeaders = HttpHeaderParser.toHeaderMap(allResponseHeaders);
            e.allResponseHeaders = Collections.unmodifiableList(allResponseHeaders);
            return e;
        }

CacheDispatcher中经过CacheKey拿到Cache.Entry,由entry的data(body数据)和responseHeaders即可构建NetworkResponse目标,继而转成Response,拿到缓存

    @Override
    public synchronized Entry get(String key) {
        CacheHeader entry = mEntries.get(key);
        // if the entry does not exist, return.
        if (entry == null) {
            return null;
        }
        File file = getFileForKey(key);
        try {
            CountingInputStream cis =
                    new CountingInputStream(
                            new BufferedInputStream(createInputStream(file)), file.length());
            try {
                // 读出header数据
                CacheHeader entryOnDisk = CacheHeader.readHeader(cis);
                // 文件反常了
                if (!TextUtils.equals(key, entryOnDisk.key)) {
                    // File was shared by two keys and now holds data for a different entry!
                    removeEntry(key);
                    return null;
                }
                // 读出body数据
                byte[] data = streamToBytes(cis, cis.bytesRemaining());
                return entry.toCacheEntry(data);
            } finally {
                // 不管任何反常,这儿都会封闭
                cis.close();
            }
        } catch (IOException e) {
            remove(key);
            return null;
        }
    }

DiskBasedCache缓存时,会依据key创立一个文件,然后按次序写入CacheHeader的一些信息、header信息,最后写入entry的data数组,即body信息

@Override
    public synchronized void put(String key, Entry entry) {
        // 满了
        if (mTotalSize + entry.data.length > mMaxCacheSizeInBytes
                && entry.data.length > mMaxCacheSizeInBytes * HYSTERESIS_FACTOR) {
            return;
        }
        File file = getFileForKey(key);
        try {
            BufferedOutputStream fos = new BufferedOutputStream(createOutputStream(file));
            CacheHeader e = new CacheHeader(key, entry);
            // 先写入CacheHeader的一些信息、header
            boolean success = e.writeHeader(fos);
            if (!success) {
                fos.close();
                throw new IOException();
            }
            // 再写入body
            fos.write(entry.data);
            fos.close();
            // 加到mEntries中,记录起来
            e.size = file.length();
            putEntry(key, e);
            // 判断是否要裁切
            pruneIfNeeded();
        } catch (IOException e) {
            boolean deleted = file.delete();
            initializeIfRootDirectoryDeleted();
        }
    }

AsyncRequestQueue相关

ps. 感觉AsyncRequestQueue这一套完全不相同,假如仅仅了解volley原理,看上面内容就行了。 不过在我看来,下面的代码等于对上面volley的一个重构,选用线程池和ALL TASK的思想完结,可以学习下,下面写得比较糙,过一下

Volley里边还有个AsyncRequestQueue,用于恳求,它不是经过Volley类直接创立,而是要经过Builder创立,需求一个AsyncNetwork,其他自行配置,不设置在build的时分会创立默许的

    val asyncNetwork = BasicAsyncNetwork.Builder(object : AsyncHttpStack() {
        override fun executeRequest(
            request: Request<*>?,
            additionalHeaders: MutableMap<String, String>?,
            callback: OnRequestComplete?
        ) {
        }
    }).build()
    val asyncRequestQueue = AsyncRequestQueue.Builder(asyncNetwork).build()
    asyncRequestQueue.add(request)
    asyncRequestQueue.start()

AsyncRequestQueue中调用start之后就会开端履行线程使命,与RequestQueue不同的是,AsyncRequestQueue是经过mBlockingExecutor和mNonBlockingExecutor两个线程池去履行使命的

@Override
    public void start() {
        stop(); // Make sure any currently running threads are stopped
        // 由mExecutorFactory创立线程池
        mNonBlockingExecutor = mExecutorFactory.createNonBlockingExecutor(getBlockingQueue());
        mBlockingExecutor = mExecutorFactory.createBlockingExecutor(getBlockingQueue());
        mNonBlockingScheduledExecutor = mExecutorFactory.createNonBlockingScheduledExecutor();
        // 这两个是履行使命的线程池
        mNetwork.setBlockingExecutor(mBlockingExecutor);
        mNetwork.setNonBlockingExecutor(mNonBlockingExecutor);
        // 这个没用到好像
        mNetwork.setNonBlockingScheduledExecutor(mNonBlockingScheduledExecutor);
        // Kick off cache initialization, which must complete before any requests can be processed.
        if (mAsyncCache != null) {
            mNonBlockingExecutor.execute( () -> {
                mAsyncCache.initialize( 
                    new AsyncCache.OnWriteCompleteCallback() {
                        @Override
                        public void onWriteComplete() {
                            // 这么长就这个办法有用。。。
                            onCacheInitializationComplete();
                        }
                });
            } 
        } else {
            // 没缓存,经过堵塞去履行,mAsyncCache和cache的区别
            mBlockingExecutor.execute( () -> {
                getCache().initialize();
                mNonBlockingExecutor.execute( () -> {
                    onCacheInitializationComplete();
                }
            }
        }
    }
    // 等缓存初始化完结后,再发动前面的request
    private void onCacheInitializationComplete() {
        List<Request<?>> requestsToDispatch;
        synchronized (mCacheInitializationLock) {
            requestsToDispatch = new ArrayList<>(mRequestsAwaitingCacheInitialization);
            mRequestsAwaitingCacheInitialization.clear();
            mIsCacheInitialized = true;
        }
        // Kick off any requests that were queued while waiting for cache initialization.
        for (Request<?> request : requestsToDispatch) {
            beginRequest(request);
        }
    }

AsyncRequestQueue内两个线程池履行的是CacheTask和NetworkTask,AsyncRequestQueue内界说了许多承继于RequestTask的类,实践便是一个可比较的Runnable,恳求的一系列操作都经过各种RequestTask完结,比方缓存、处理缓存、处理response、处理error等

private class CacheParseTask<T> extends RequestTask<T>
private class CachePutTask<T> extends RequestTask<T>
private class CacheTask<T> extends RequestTask<T>
private class InvokeRetryPolicyTask<T> extends RequestTask<T>
private class NetworkParseTask<T> extends RequestTask<T>
private class NetworkTask<T> extends RequestTask<T>
private class ParseErrorTask<T> extends RequestTask<T>
private class ResponseParsingTask<T> extends RequestTask<T>

AsyncRequestQueue承继了RequestQueue,调用add办法也会触发beginRequest,继而履行使命,假如缓存还没有初始化,还得加到等候列表,需求等start办法发动,start办法发动后等候列表的恳求会实践运转

    @Override
    <T> void beginRequest(Request<T> request) {
        // If the cache hasn't been initialized yet, add the request to a temporary queue to be
        // flushed once initialization completes.
        if (!mIsCacheInitialized) {
            synchronized (mCacheInitializationLock) {
                if (!mIsCacheInitialized) {
                    // 先保存起来,等候缓存初始化
                    mRequestsAwaitingCacheInitialization.add(request);
                    return;
                }
            }
        }
        // If the request is uncacheable, send it over the network.
        if (request.shouldCache()) {
            if (mAsyncCache != null) {
                mNonBlockingExecutor.execute(new CacheTask<>(request));
            } else {
                // 没缓存,经过堵塞去履行,mAsyncCache和cache的区别
                mBlockingExecutor.execute(new CacheTask<>(request));
            }
        } else {
            // 便是添加了个NetworkTask
            sendRequestOverNetwork(request);
        }
    }
    @Override
    <T> void sendRequestOverNetwork(Request<T> request) {
        mNonBlockingExecutor.execute(new NetworkTask<>(request));
    }

AsyncRequestQueue中的缓存由CacheTask完结,可以设置一个mAsyncCache来异步处理缓存的增修改查,假如没在builder中供给,则必须供给一个cache,操作就和RequestQueue相同了

    private class CacheTask<T> extends RequestTask<T> {
        // ...
        @Override
        public void run() {
            // If the request has been canceled, don't bother dispatching it.
            if (mRequest.isCanceled()) {
                mRequest.finish("cache-discard-canceled");
                return;
            }
            mRequest.addMarker("cache-queue-take");
            // Attempt to retrieve this item from cache.
            if (mAsyncCache != null) {
                mAsyncCache.get(
                        mRequest.getCacheKey(),
                        new OnGetCompleteCallback() {
                            @Override
                            public void onGetComplete(Entry entry) {
                                handleEntry(entry, mRequest);
                            }
                        });
            } else {
                Entry entry = getCache().get(mRequest.getCacheKey());
                handleEntry(entry, mRequest);
            }
        }
    }
    // 和上面处理缓存的功能差不多
    private void handleEntry(final Entry entry, final Request<?> mRequest) {
        if (entry == null) {
            mRequest.addMarker("cache-miss");
            // 取缓存失利,直接网络恳求
            if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) {
                sendRequestOverNetwork(mRequest);
            }
            return;
        }
        // 超时也运用网络恳求
        long currentTimeMillis = System.currentTimeMillis();
        if (entry.isExpired(currentTimeMillis)) {
            mRequest.addMarker("cache-hit-expired");
            mRequest.setCacheEntry(entry);
            if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) {
                sendRequestOverNetwork(mRequest);
            }
            return;
        }
        // We have a cache hit; parse its data for delivery back to the request.
        mBlockingExecutor.execute(new CacheParseTask<>(mRequest, entry, currentTimeMillis));
    }

AsyncRequestQueue中的网络恳求由NetworkTask完结,NetworkTask中由AsyncNetwork的performRequest办法履行

    private class NetworkTask<T> extends RequestTask<T> {
        // ...
        @Override
        public void run() {
            // If the request was cancelled already, do not perform the network request.
            if (mRequest.isCanceled()) { // ... }
            final long startTimeMs = SystemClock.elapsedRealtime();
            mRequest.addMarker("network-queue-take");
            // Perform the network request.
            mNetwork.performRequest(
                    mRequest,
                    new OnRequestComplete() {
                        @Override
                        public void onSuccess(final NetworkResponse networkResponse) {
                            // ...
                            // 拿到的networkResponse经过mBlockingExecutor履行NetworkParseTask得到,再投递到主线程
                            mBlockingExecutor.execute(
                                    new NetworkParseTask<>(mRequest, networkResponse));
                        }
                        @Override
                        public void onError(final VolleyError volleyError) {
                            // 反常也要经过ParseErrorTask再处理
                            volleyError.setNetworkTimeMs(
                                    SystemClock.elapsedRealtime() - startTimeMs);
                            mBlockingExecutor.execute(new ParseErrorTask<>(mRequest, volleyError));
                        }
                    });
        }
    }

AsyncNetwork的完结类是BasicAsyncNetwork,可是AsyncRequestQueue中并未默许供给,需求在其Builder结构办法中传入

    // AsyncNetwork需求自己构建并传入
    val asyncRequestQueue = AsyncRequestQueue.Builder(asyncNetwork).build()

BasicAsyncNetwork内部持有一个mAsyncStack,网络恳求实践是经过AsyncHttpStack完结的,AsyncHttpStack是个笼统类,需求在BasicAsyncNetwork的Builder结构函数传入,结合上面AsyncNetwork也是要用户创立,真就呵呵了,AsyncHttpStack的executeRequest办法要用户自己写

    // BasicAsyncNetwork.Builder中要传入一个AsyncHttpStack
    val asyncNetwork = BasicAsyncNetwork.Builder(object : AsyncHttpStack() {
        override fun executeRequest(
            request: Request<*>?,
            additionalHeaders: MutableMap<String, String>?,
            callback: OnRequestComplete?
        ) {
            // 需求自己写怎么处理request
        }
    }).build()

总而言之,AsyncRequestQueue这个没人写教程,网上也没人提,看完源码才知道是干嘛的,不过思路仍是可以学习学习的。

总结

和okhttp3比较,volley仍是简略了许多,一个是没有okio那样的IO优化,网络部分交给了HttpURLConnection去处理(最杂乱的部分),缓存也就一条恳求一个文件,甚至缓存的body直接读到内存中,在异步线程上,volley还仅仅四个线程,并没有用到线程池进行办理。

当然,尽管volley没有okhttp那么强大,可是假如仅仅普通恳求,仍是能cover的,增加了泛型及数据的parse,可以自动线程转化,用起来仍是很便利的。