我为什么要记载这次 bug 剖析进程?由于此次剖析进程 3 ~ 4 小时,进程比较弯曲,可是最终的原因又很简略,让我觉得浪费了不少时间,假如其时换个思路就能更快的发现问题。一起也由于“一些原因”,让我对 RxJava3CallAdapterFactory 略有微辞。

故障现象

前提:工程里有 RxJava2 和 RxJava3 两个版别,为了一致决议全工程运用 RxJava3 版别。

代码改变内容很简略,将 RxJava2 的依靠悉数替换成 RxJava3 版别(也包括 Retrofit、Room 的 adapter 依靠)。

基础功能测验完毕,就去跑单元测验,然后单元测验成果失利了,失利的地方是网络恳求办法,反常如下:

记录一次 Bug 分析过程 关于「RxJavaCallAdapterFactory」
单元测验办法如下(类似代码演示):

@Test
fun login() {
    WanAndroidService.create()
        .login("1", "22")
        .test()
        .assertValueCount(1)
}
interface WanAndroidService {
  @FormUrlEncoded
  @POST("user/login")
  fun login(
    @Field("username") username: String,
    @Field("password") password: String
  ): Single<BaseRezhsponse<User>>
  companion object {
    private const val BASE_URL = "https://www.wanandroid.com/"
    fun create(): WanAndroidService {
      val client = OkHttpClient.Builder()
        .build()
      return Retrofit.Builder()
        .baseUrl(BASE_URL)
        .client(client)
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
        .build()
        .create(WanAndroidService::class.java)
    }
  }
}

剖析原因

这两个类中的修正仅仅是将 RxJava2 版别更新为 RxJava3,RxJava3CallAdapterFactory 改变为 RxJava3CallAdapterFactory 。在问题出现后,我先将分支切换到 RxJava2 版别,发现测验是经过的,那问题能够承认便是在本次提交。莫非是 RxJava3 有什么机制改变?

剖析反常日志

Value counts differ; Expected: 1, Actual: 0 (latch = 1, values = 0, errors = 0, completions = 0)

预期数量为 1,实际为 0,所以抛出反常。这里有个 latch = 1 ,成功、失利、完成 数量都为 0,阐明这个 Single 订阅并没有履行完测验就结束了。来看看源码。

Single#test 办法会创立一个 TestObserver 并且直接订阅。

public final TestObserver<T> test() {
    TestObserver<T> ts = new TestObserver<T>();
    subscribe(ts);
    return ts;
}
public final U assertValueCount(int count) {
    int s = values.size();
    if (s != count) {
        throw fail("Value counts differ; Expected: " + count + ", Actual: " + s);
    }
    return (U)this;
}
protected final AssertionError fail(String message) {
    StringBuilder b = new StringBuilder(64 + message.length());
    b.append(message);
    b.append(" (")
    // 这里的 down 便是 CountDownLatch
    // 在 TestObserver 的 onComplete 和 onError 办法都会履行
    // down.countDown,办法较长就不列出来了,咱们能够自行查看
    .append("latch = ").append(done.getCount()).append(", ")
    .append("values = ").append(values.size()).append(", ")
    .append("errors = ").append(errors.size()).append(", ")
    .append("completions = ").append(completions)
    .append(')')
    ;
    AssertionError ae = new AssertionError(b.toString());
    if (!errors.isEmpty()) {
        if (errors.size() == 1) {
            ae.initCause(errors.get(0));
        } else {
            CompositeException ce = new CompositeException(errors);
            ae.initCause(ce);
        }
    }
    return ae;
}

从以上几段代码能够看到,假如 RxJava 正常履行,latch 应该为 0,那为什么这边会提前结束呢?阐明 RxJava 中切换了线程,而主线程提前结束了,所以 latch = 1

关于 CountDownLatch的机制,假如想具体了解的能够经过其他途径了解,我这边就简略说下。CountDownLatch是一个线程同步工具,其内部保护了一个计数器count,当count不为 0 时,一切调用 CountDownLatch#await() 办法的线程都会被阻塞,当 count 为 0 时,唤醒一切被阻塞线程。

为什么 RxJava 会切换线程

从代码能看到,这里并没有 subscribeOn 代码,理论上就应该是在主线程履行啊?那咱们先承认下,切换的是哪个线程,履行如下代码

fun main() {
    WanAndroidService.create()
        .login("1", "22")
        .subscribe({
            println("success:$it ${Thread.currentThread().name}")
        }) {
            println("error:$it ${Thread.currentThread().name}")
        }
}

输出成果:

success:BaseResponse(data=null, errorCode=-1, errorMsg=账号密码不匹配!) OkHttp https://www.wanandroid.com/...

OkHttp 的线程,那我能不能指定履行线程呢,试试看。

fun main() {
    WanAndroidService.create()
        .login("1", "22")
        .subscribeOn(Schedulers.io())
        .subscribe({
            println("success:$it ${Thread.currentThread().name}")
        }) {
            println("error:$it ${Thread.currentThread().name}")
        }
    Thread.sleep(4000)
}

输出成果仍是一样:

success:BaseResponse(data=null, errorCode=-1, errorMsg=账号密码不匹配!) OkHttp https://www.wanandroid.com/...

我不由对我此前的 RxJava 经历陷入了置疑,竟然指定线程不收效?RxJava3 跟 RxJava2 有差异?根本没这个或许,作为全球受众极广的技术结构,不或许有这种差异还没人评论。这咋办呢?只能先看看 OkHttp 源码,看看在什么地方创立线程吧。

OkHttp 源码

OkHttp#Call 履行网络恳求的办法有两个,一个同步一个异步

fun execute(): Response
fun enqueue(responseCallback: Callback)

enqueue 办法调用时不需求切线程,由于其内部履行网络恳求操作时会切换到子线程,在调用 Callback 回调时再切换到主线程,而 execute 办法不会切线程,是同步办法。所以切换线程的操作肯定是在这个办法内,咱们来看看这个办法,这个办法在 Call 的子类 RealCall中。

override fun enqueue(responseCallback: Callback) {
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  callStart()
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}

这个办法将 responseCallback 包装到 AsyncCall中,再将其入队到 client.dispatcher,而 AsyncCall 是一个 Runnable,所以应该能猜到,client.dispatcher 便是一个线程调度器。

internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    readyAsyncCalls.add(call)
    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.call.forWebSocket) {
      val existingCall = findExistingCallWithHost(call.host)
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {//循环遍历 executableCalls,executableCalls里正是`AsyncCall`调集
      val asyncCall = i.next()
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      executableCalls.add(asyncCall)//asynCall 加到 executableCalls
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService) //真实履行恳求操作的地方
  }
  return isRunning
}

这里主要看 promoteAndExecute办法,这个办法会遍历可履行的 asnycCall, asyncCall.executeOn(executorService) 这段代码的意思是在指定的线程池中履行 asyncCall

fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()
    var success = false
    try {
      executorService.execute(this)
      success = true
    } catch (e: RejectedExecutionException) {
      val ioException = InterruptedIOException("executor rejected")
      ioException.initCause(e)
      noMoreExchanges(ioException)
      responseCallback.onFailure(this@RealCall, ioException)
    } finally {
      if (!success) {
        client.dispatcher.finished(this) // This call is no longer running!
      }
    }
  }
  override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
      var signalledCallback = false
      timeout.enter()
      try {
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
        } else {
          responseCallback.onFailure(this@RealCall, e)
        }
      } catch (t: Throwable) {
        cancel()
        if (!signalledCallback) {
          val canceledException = IOException("canceled due to $t")
          canceledException.addSuppressed(t)
          responseCallback.onFailure(this@RealCall, canceledException)
        }
        throw t
      } finally {
        client.dispatcher.finished(this)
      }
    }
  }
}

看到了,看到了,threadName("OkHttp ${redactedUrl()}") 这不便是刚刚看到的线程名么?那思路便是没错,确实是履行了Call#enque 才会导致切换了新线程。可是在 RxJava2 中履行,我记住是会抛出不能在主线程履行网络操作的反常呀。那就只有一个或许了,RxJava3 精确的说是 RxJava3CallAdapterFactory一定跟 RxJava2CallAdapterFactory 有差异,比照看一下。

记录一次 Bug 分析过程 关于「RxJavaCallAdapterFactory」

记录一次 Bug 分析过程 关于「RxJavaCallAdapterFactory」

这便是破案了呀,结构参数isAsync 在 RxJava2 为 fasle, 在 RxJava3 中为 true,然后这个参数在 RxJavaCallAdapter 中用来创立不同的 Observer

public Object adapt(Call<R> call) {
  Observable<Response<R>> responseObservable =
      isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
      ......
  }

创立了 EnqueObservable 和 ExecuteObservable,相应去履行 Call#enque Call#execute 两个办法。到这边其实就定位到问题根本原因了,便是 RxJava2CallAdapterFactoryRxJava3CallAdapterFactory 的 create 办法不一样,前者是同步,后者是异步。

考虑

为什么两个版别要做这个调整?

public static RxJava2CallAdapterFactory create() {
  return new RxJava2CallAdapterFactory(null, false);
}
public static RxJava2CallAdapterFactory createAsync() {
  return new RxJava2CallAdapterFactory(null, true);
}
public static RxJava3CallAdapterFactory create() {
  return new RxJava3CallAdapterFactory(null, true);
}
public static RxJava3CallAdapterFactory createSynchronous() {
  return new RxJava3CallAdapterFactory(null, false);
}

能够看到旧版别 create 创立的是同步恳求,而新版别变成了异步。我大概能猜到作者的意图,作者应该是觉得:网络恳求必须在子线程,create办法又是最常用的办法,假如是同步恳求,那需求开发者去指定订阅 Scheduler,这个作业有点重复。假如是异步恳求,则开发者不管指定不指定 Scheduler最终都会在 OkHttp的线程履行,虽然由同步变成了异步,不过在大多数状况开发者是无感知的,不会影响前史代码,而假如开发者看到了 create 的改变,则下次履行 Retrofit 恳求或许就不会去指定 Scheduler了。也许是这样吧。

确实,上述单元测验没过的代码,在业务代码中履行是没有问题的。可是当一段代码在大多数时候履行没有问题,然后在某一刻出现不可思议的反常时,往往网络上的相关资料是比较少的,在定位问题时就会走一些弯路。Square在全世界的 Android 开发者中有着无足轻重的地位,这个改变他们团队应该也是深思熟虑的决议。只不过我个人变成了遇到问题的少数人,当定位到问题后仍是有些许震惊,希望能帮到有类似疑惑的人把。

总结

在今后的作业中,遇到类似问题,真不能着急自我否定,仍是着手于当前的改变内容,剖析差异。假如这个问题,我早点看到改变类的create办法,或许半小时就搞定了。