前语
在开发一些杂乱的事务逻辑涉及多线程环境的情况下,或许对代码做一些优化的时候,咱们往往会涉及到一些多线程的战略“等候一切的线程使命履行完结后再进行下一步”,我将这种场景称为多使命的并发分合战略,或许多线程分合战略。这次就来简单聊聊有哪些办法能完成该战略。
正常做法
先写个Demo看正常情况下多线程的履行结果。
private fun initThreadPool() {
val threadPool =
ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))
Log.d("mmp", "========= task start")
threadPool.execute {
Log.d("mmp", "--------- A start")
taskA()
Log.d("mmp", "--------- A end")
}
threadPool.execute {
Log.d("mmp", "--------- B start")
taskB()
Log.d("mmp", "--------- B end")
}
threadPool.execute {
Log.d("mmp", "--------- C start")
taskC()
Log.d("mmp", "--------- C end")
}
Log.d("mmp", "========= task end")
}
private fun taskA() {
Thread.sleep(1000)
}
private fun taskB() {
Thread.sleep(2000)
}
private fun taskC() {
Thread.sleep(3000)
}
能够看到结果
这是一个正常情况下多线程分散履行的结果,要是想让使命有序,那便是履行完一个恳求之后再回调中再履行另一个恳求。
Log.d("mmp", "========= task start")
threadPool.execute {
Log.d("mmp", "--------- A start")
taskA()
Log.d("mmp", "--------- A end")
threadPool.execute {
Log.d("mmp", "--------- B start")
taskB()
Log.d("mmp", "--------- B end")
threadPool.execute {
Log.d("mmp", "--------- C start")
taskC()
Log.d("mmp", "--------- C end")
Log.d("mmp", "========= task end")
}
}
}
这样就能按照次序去履行使命,可是这种就不是一个并发的作用。
多线程并发分合场景
现在讲一种开发时会常常用到的多使命情况下的开发战略。完成并发的情况下并且在一切并发的使命履行完之后再一致做操作。
现在讲一种开发时会常常用到的多使命情况下的开发战略。完成并发的情况下并且在一切并发的使命履行完之后再一致做操作。
比如说我有这么一个需求,我有一个杂乱的页面,由于MVVM的设计去拆分子View,每个View有自己的情况,我这个页面要进行3个不同的网络恳求拿到页面数据,等悉数拿到之后显示在页面后,再去恳求一个弹窗的流程接口。或许举这个比如不是太好,反正意思便是等3次恳求完结之后,再去履行第四次恳求,这样说或许会比较容易解说这个场景。
由于大项目中,假如是一些事务逻辑,或许说优化点,就会用到这样的开发战略,由于我这边是不能直接拿我自己的项目的场景来举例,所以就随便想了上面的这个比如。小项目或许不会碰到这种场景,由于小项目连多线程的场景都少。这个模型大概的履行流程是这样。
这个战略我找不到一个具体的说法,表述上来说大致便是“并发处理使命,并等一切使命完结之后再进行下一步”,我称为为多使命的并发分合战略,也能够叫多线程分合战略。
1. 运用计数器
ok,要完成这个战略,其实办法很多,首要能够用一个计数器来完成吧,我每履行完一次使命,计数器就+1,当计数器到达使命数量时再进行下一步。
把上边的Demo代码改成这样
private val counter : AtomicInteger = AtomicInteger(0)
private fun initThreadPool() {
val threadPool =
ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))
Log.d("mmp", "========= task start")
threadPool.execute {
Log.d("mmp", "--------- A start")
taskA()
Log.d("mmp", "--------- A end")
counter.incrementAndGet()
nextTask()
}
threadPool.execute {
Log.d("mmp", "--------- B start")
taskB()
Log.d("mmp", "--------- B end")
counter.incrementAndGet()
nextTask()
}
threadPool.execute {
Log.d("mmp", "--------- C start")
taskC()
Log.d("mmp", "--------- C end")
counter.incrementAndGet()
nextTask()
}
}
private fun nextTask(){
if (counter.get() == 3){
Log.d("mmp", "========= task end")
}
}
能够看到这样就能到达咱们的作用了,可是如同不太高雅,并且是不是感觉有点古怪,古怪在于“每个使命履行完之后都会调用这个办法,然后由办法内部做判别”,虽然没问题,可是缺少高雅。
2. CountDownLatch
而关于这样的计划,其实java有供给一个类CountDownLatch,能够运用CountDownLatch直接完成咱们想要的作用。
private val counter = CountDownLatch(3)
private fun initThreadPool() {
val threadPool =
ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))
Log.d("mmp", "========= task start")
threadPool.execute {
Log.d("mmp", "--------- A start")
taskA()
Log.d("mmp", "--------- A end")
counter.countDown()
}
threadPool.execute {
Log.d("mmp", "--------- B start")
taskB()
Log.d("mmp", "--------- B end")
counter.countDown()
}
threadPool.execute {
Log.d("mmp", "--------- C start")
taskC()
Log.d("mmp", "--------- C end")
counter.countDown()
}
counter.await()
Log.d("mmp", "========= task end")
}
CountDownLatch为什么能完成这个作用呢?它内部其实是基于AQS,有爱好的能够看看我曾经写的这篇文章 /post/716801…
假如你想快速完成多使命的并发分合战略,CountDownLatch会是一个不错的选择,并且它是官方帮你封装好的,质量必定是有保证的。
3. wait/notify
可是咱们不能只会运用官方的东西,这样会禁锢咱们的思维,要知道,那些牛逼的结构,那些主流的结构,便是做到了官方想不到的工作。所以已然提到AQS,那又能联想到别的一个模型“等候通知范式”,用它也能来完成这个作用。
这个模型大概是这样的
synchronized (object){
while(条件不满足){
object.wait();
}
对应的处理逻辑
}
synchronized (object){
改动条件
object.notifyAll();
}
那运用在这个战略上,大概的写法能够是这样
private var counter: Int = 0
private val lock = java.lang.Object()
private fun initThreadPool() {
val threadPool =
ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))
Log.d("mmp", "========= task start")
threadPool.execute {
Log.d("mmp", "--------- A start")
taskA()
Log.d("mmp", "--------- A end")
synchronized(lock) {
counter++
lock.notifyAll()
}
}
threadPool.execute {
Log.d("mmp", "--------- B start")
taskB()
Log.d("mmp", "--------- B end")
synchronized(lock) {
counter++
lock.notifyAll()
}
}
threadPool.execute {
Log.d("mmp", "--------- C start")
taskC()
Log.d("mmp", "--------- C end")
synchronized(lock) {
counter++
lock.notifyAll()
}
}
synchronized(lock) {
while (counter < 3) {
lock.wait()
}
Log.d("mmp", "========= task end")
}
}
看到是能完成这么一个作用,可是作用如同和计数器一样,是的,有一部分的解题思路是用了计数器,可是另一半的思路,第一个比如是每个使命履行完结后都调用一个办法,这个比如是经过wait/notify。已然如此,这儿扩展一下提出一个问题,为什么第一个比如中counter目标用了AtomicInteger而这儿直接用Int?
当然留意一下,主线程中可不能直接这样玩。
4. 协程
ok,Java的东西玩完了,咱们来玩一点kotlin的东西,我想都不用想,我就觉得协程应该是能快速的完成这样的战略。把代码改成运用协程的方法。
lifecycle.coroutineScope.launch {
Log.d("mmp", "========= task start")
val joinA = GlobalScope.launch{
Log.d("mmp", "--------- A start")
taskA()
Log.d("mmp", "--------- A end")
}
val joinB = GlobalScope.launch{
Log.d("mmp", "--------- B start")
taskB()
Log.d("mmp", "--------- B end")
}
val joinC = GlobalScope.launch{
Log.d("mmp", "--------- C start")
taskC()
Log.d("mmp", "--------- C end")
}
joinA.join()
joinB.join()
joinC.join()
Log.d("mmp", "========= task end")
能够看到运用协程来完成仍是很方便的。
5. flow
已然运用协程能完成这个作用,那相同的运用flow必定也能完成。 声明一下,flow用得不多,我这个写法或许不太好,仅仅用于演示,假如各位大佬有比较好的写法,也能够分享让我学习一下。
private fun initThreadPool() {
runBlocking {
Log.d("mmp", "========= task start")
val actionA = flow {
emit(taskA())
}.flowOn(Dispatchers.IO)
val actionB = flow {
emit(taskB())
}.flowOn(Dispatchers.IO)
val actionC = flow {
emit(taskC())
}.flowOn(Dispatchers.IO)
actionA.zip(actionB){_,_ ->
}.zip(actionC){_,_ ->
}.collect{
Log.d("mmp", "========= task end")
}
}
}
private fun taskA() {
Log.d("mmp", "--------- A start")
Thread.sleep(1000)
Log.d("mmp", "--------- A end")
}
private fun taskB() {
Log.d("mmp", "--------- B start")
Thread.sleep(2000)
Log.d("mmp", "--------- B end")
}
private fun taskC() {
Log.d("mmp", "--------- C start")
Thread.sleep(3000)
Log.d("mmp", "--------- C end")
}
ok,这样便是用flow完成这个作用,当然这写法是有点不漂亮,我是必定信任flow能有更漂亮的写法来完成这个作用,主要我用得不多,所以这儿就献丑了。当然已然flow能完成的话,rxjava等办法也相同能完成这个作用,这儿就不再演示了。
总结
总结一下,其实这儿主要是拿多使命并发的一个场景进行分析举例。咱们往常的开发中,是有一些比较好的战略、思考去处理一些问题,能够是完成某些功能,也能够是做某些优化,由于我最近看文章的话会觉得写一些开发战略、思维的文章会比较少。我觉得这对开发来说也是比较重要的一个东西,先有了一些处理问题的思路之后(这很重要),然后你能够围绕这个思路,有很多种方法去完成。