面试可能会问题的问题
Rxjava
- Rxjava常用操作符
- map和flatMap有什么差异
- Rxjava1.0和Rxjava2.0有什么差异?
- subscribeOn与observeOn屡次履行会怎样样?
- Rxjava是怎样切回到主线程的
协程
- 进程、线程、协程的差异
- 什么回调地狱以及协程在这方面的处理
- 开发中怎样挑选合适的调度器
Rxjava
Rxjava常用操作符
-
map()
操作符:用于将流中的每个元素经过一个函数转换为另一个元素。 -
flatMap()
操作符:用于将流中的每个元素经过一个函数转换为多个元素,并将这些元素组合成一个新的流。 -
filter()
操作符:用于过滤流中的元素,只保存契合条件的元素。 -
take()
操作符:用于从流中取前 n 个元素。 -
reduce()
操作符:用于将流中的元素经过一个函数进行累加,得到一个终究成果。 -
scan()
操作符:用于将流中的元素经过一个函数进行累加,得到每一步的中间成果。 -
concat()
操作符:用于将多个流组合成一个新的流。 -
merge()
操作符:用于将多个流合并成一个新的流。 -
zip()
操作符:用于将多个流中的元素按次序一一组合成一个新的元素,并形成一个新的流。
–debounce()
操作符:用于过滤流中发射过快的元素,只保存一个元素。
map和flatMap有什么差异
-
map
和flatMap
都能够用来对数据流中的数据进行改换,但它们的完成方法有所不同。map 只进行一次改换,并将改换后的成果发射出去,而 flatMap 则进行屡次改换,并将得到的 Observable 合并成一个新的 Observable 发射出去
在源码层面,map 操作符的完成十分简略,它实际上便是在原有的 Observable 上添加了一个新的 MapObservable 观察者,并将改换函数作为参数传递给 MapObservable。在 MapObservable 的 onNext 方法中,会将接收到的元素传递给改换函数进行改换,并将改换后的成果作为新的元素发射出去。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
在源码层面,flatMap 操作符的完成相对比较复杂。它实际上是在原有的 Observable 上添加了一个新的 FlatMapObservable 观察者,并将改换函数作为参数传递给 FlatMapObservable。在 FlatMapObservable 的 onNext 方法中,会将接收到的元素传递给改换函数进行改换,并得到一个新的 Observable。然后,它会将这个新的 Observable 注册到一个 FlatMapSubscriber 中,等候下一次数据的到来。当所有数据都处理完成后,FlatMapObservable 会调用 FlatMapSubscriber 的 onComplete 方法,将所有得到的 Observable 合并成一个新的 Observable,并将它发送给下流的观察者。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false, bufferSize(), bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
ScalarCallable<T> scalarCallable = (ScalarCallable<T>)this;
R r = scalarCallable.call();
if (r == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(r, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
Rxjava1.0和Rxjava2.0有什么差异?
-
改善的反常处理:RxJava 2.0 改善了反常处理机制,使得开发者能够更好地处理反常,防止应用程序溃散。
-
新的操作符:RxJava 2.0 引入了一些新的操作符,如 Flowable,Single 和 Completable,来取代旧版本的 Observable。这些新的操作符能够更好地处理背压(backpressure)和错误处理。
-
改善的背压支撑:RxJava 2.0 引入了更好的背压支撑,能够更好地处理在数据源发送很多数据时的状况。
-
改善的线程调度:RxJava 2.0 改善了线程调度机制,使得开发者能够更好地操控并发性。
-
更好的功能:RxJava 2.0 在功能上也有所提高,能够更好地处理很多数据流。
总的来说,RxJava 2.0 在反常处理、背压支撑、线程调度和功能等方面都有所改善和提高
什么是背压?怎样改善的?
背压(Backpressure)是指当数据发生速度大于消费速度,程序处理不过来是音讯就会呈现堆积。然后导致内存溢出、程序溃散等问题。这种状况被称为背压问题
逻辑上的改善方法
- 生产者数量=顾客数量
- 节省,丢掉一部分请求
- 打包,把所有事情封装在一个集合中发送
Rxjava1.x的时分没有对背压的支撑,只提供了onBackpressureBuffer(time)、onBackpressureDrop() 等)来缓解背压问题,但这些解决方案都仅仅对数据流进行了缓存或者丢掉处理
RxJava 2.0后 引入了新的数据类型 Flowable
,它支撑背压,并提供了更多的背压操控战略。
Flowable 类型是一个支撑背压的数据源,能够经过 onBackpressureBuffer
,onBackpressureDrop
,onBackpressureLatest
等方法来处理背压问题。其间
-
onBackpressureBuffer
战略会在内存中缓存数据,直到顾客能够消费这些数据; -
onBackpressureDrop
战略会在数据流中丢掉一部分数据,直到顾客能够消费; -
onBackpressureLatest
战略会只保存最新的数据,丢掉旧数据。
别的Flowable
的方法和 Observable
相似,仅仅Flowable
在运用的时分需求注意要拟定背压战略。
subscribeOn
与observeOn
屡次履行会怎样样?
结论:subscribeOn
只跟第一次指定的线程有关,履行屡次跟最终一次有关。
-
subscribeOn
只要第一次会收效,所以只跟第一次指定的线程有关。
当咱们在一个 Observable
中运用多个 subscribeOn
操作符时,它们的履行次序只会影响到代码中的次序,但实际上只要第一个 subscribeOn
会收效。原因是在 ObservableSubscribeOn
类的完成中,只会在第一个 subscribeOn
操作符中调用 scheduler.scheduleDirect
方法,后面的 subscribeOn
操作符调用该方法也会被拦截,也就不会改动 Observable 的履行线程。这便是为什么在同一个 Observable 中运用多个 subscribeOn
操作符时,只要第一个 subscribeOn
会收效的原因。
//Observable.java
@Override
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
//-----------------
// ObservableSubscribeOn.java
@Override
public void subscribeActual(Observer<? super T> observer) {
if (once) {
source.subscribe(observer);
return;
}
once = true;
Scheduler scheduler = this.scheduler;
SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent, source)));
}
-
observeOn
履行屡次跟最终一次有关。
@Override
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker worker = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, worker, delayError, bufferSize));
}
其实比较好了解,subscribeOn了解为一个管道的进口,observeOn 了解为一个管道的出口。数据进去之后就没方法指定了,但是数据出来之前都能够在切换出口
Rxjava是怎样切回到主线程的
运用observeOn(AndroidSchedulers.mainThread())
,内部的完成其实是new Handler(Looper.getMainLooper())
public class MainThreadScheduler extends Scheduler {
private static MainThreadScheduler INSTANCE;
private MainThreadScheduler() {}
public static MainThreadScheduler instance() {
if (INSTANCE == null) {
INSTANCE = new MainThreadScheduler();
}
return INSTANCE;
}
@NonNull
@Override
public Worker createWorker() {
return new MainThreadWorker(new Handler(Looper.getMainLooper()));
}
private static class MainThreadWorker extends Worker {
private final Handler mHandler;
MainThreadWorker(Handler handler) {
mHandler = handler;
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable runnable) {
mHandler.post(runnable);
return Disposables.empty();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable runnable, long delay, @NonNull TimeUnit unit) {
mHandler.postDelayed(runnable, unit.toMillis(delay));
return Disposables.empty();
}
@Override
public void dispose() {}
@Override
public boolean isDisposed() {
return false;
}
}
}
协程
进程、线程、协程的差异
- 进程(Process)是指操作系统中的一个履行单位,它有自己独立的内存空间和资源,能够履行独立的程序,是程序运转的基本单位。一个进程能够包括多个线程。
- 线程(Thread)是进程中的一个履行单元,它同享进程的内存空间和资源,但具有独立的履行序列和运转堆栈。一个进程能够包括多个线程,线程之间能够并发履行,完成多使命处理。
- 协程(Coroutine)是一种用户态的轻量级线程,由程序员自己操控调度,而不是由操作系统操控。协程能够在同一线程中完成并发履行,利用时间片轮转算法切换使命,防止了线程上下文切换带来的开销,能够提高程序的履行效率。
Kotlin 的协程是依据 Kotlin 规范库中的协程结构完成的。该结构依据一种称为“挂起函数”的特别函数类型完成,这些函数能够暂停履行并在稍后的某个时分康复履行,然后完成了协程的作用。不依赖于操作系统和编译器。
什么回调地狱以及协程在这方面的处理
回调地狱指的是在异步编程中,假如屡次嵌套运用回调函数来处理异步操作,会形成代码的可读性和可维护性变差,代码逻辑难以了解和调试的状况。举个比方
getUserInfo(userId) { user ->
getUserOrders(user.id) { orders ->
for (order in orders) {
getItems(order.id) { items ->
for (item in items) {
processItem(item) { result ->
saveResult(result) {
// ...
}
}
}
}
}
}
}
协程中的挂起函数写法是
suspend fun processOrders(userId: String) = withContext(Dispatchers.IO) {
val user = getUserInfo(userId)
val orders = getUserOrders(user.id)
for (order in orders) {
val items = getItems(order.id)
for (item in items) {
val result = processItem(item)
saveResult(result)
}
}
}
运用
withContext
能够指定协程履行的上下文,这儿运用了 IO 线程池,防止了主线程的堵塞。
开发中怎样挑选合适的调度器
其实无非便是三个,一个主线程,一个io密集型,一个cpu密集型
rxjava中的调度器
-
Schedulers.io()
:用于 I/O 密集型使命,比方网络请求等。 -
Schedulers.computation()
:用于 CPU 密集型使命,比方相似视频编解码这种很多的计算和数据处理等。 -
Schedulers.newThread()
:每次都创立一个新线程,不引荐运用。 -
AndroidSchedulers.mainThread()
:用于 Android 平台的 UI 线程。
对应kotlin协程里面的是
-
Dispatchers.Default
:合适履行 CPU 密集型使命的调度器,它会主动依据可用的 CPU 数量进行调度。 -
Dispatchers.IO
:合适履行 I/O 密集型使命的调度器,比方网络请求和磁盘 I/O 等。 -
Dispatchers.Main
:合适在 Android 应用程序中履行 UI 操作的调度器。在 Android 应用程序中,Main 调度器会将协程的履行切换到主线程上。 -
Dispatchers.Unconfined
:一个不受约束的调度器,它答应协程在调用挂起函数的线程中继续履行。运用这个调度器时需求特别小心,因为它可能会导致一些古怪的行为。