我正在参加会员专属活动-源码共读第一期,点击参加
在常见的网络请求的过程中,咱们都会遇到并发的状况,如果一次性建议过多的请求,会导致服务器压力过大,甚至会导致服务器溃散,所以咱们需要操控并发的数量,这样才干保证服务器的正常运转。
今天带来的便是并发操控的库:p-limit
运用
依据README
的介绍,咱们能够经过p-limit
来创立一个约束并发的函数,然后经过这个函数来履行咱们的异步使命。
import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);
源码剖析
咱们先来看一下p-limit
的源码:
import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const queue = new Queue();
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, args) => {
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
return generator;
}
加上注释和换行只要68行
代码,非常简单,咱们来一行一行的剖析:
能够看到最开端就导入了yocto-queue
这个库,这个库之前有剖析过:【源码共读】yocto-queue 一个微型行列数据结构
这个库便是一个行列的数据结构,不明白的能够直接将这个理解为数组就好;
跟着运用的代码来看,最开端便是经过pLimit
来创立一个约束并发的函数,这个函数接纳一个参数concurrency
,然后回来一个函数,来看看这一步的代码:
function pLimit(concurrency) {
if (
!((Number.isInteger(concurrency)
|| concurrency === Number.POSITIVE_INFINITY)
&& concurrency > 0)
) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
return generator;
}
首要这个函数接纳一个参数concurrency
,然后判断这个参数是否是一个大于0的整数,如果不是就抛出一个过错;
回来的函数很简单,便是接纳一个函数fn
和参数args
,然后回来一个Promise
;
然后调用回来的generator
函数就会履行enqueue
函数,对应的代码如下:
const enqueue = (fn, resolve, args) => {
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
这个函数接纳三个参数,fn
、resolve
、args
,然后将run
函数放入行列中;
这儿run
运用bind
是因为并不需要当即履行,参阅:function.prototype.bind()
然后当即履行一个异步函数,这个里边首要会等候下一个微使命,注释解释了这个原因,因为activeCount
是异步更新的,所以需要等候下一个微使命才干获取到最新的值;
然后判断activeCount
是否小于concurrency
,而且行列中有使命,如果满足条件就会将行列中的使命取出来履行,这一步便是并发的中心了;
这儿的
queue.dequeue()()
履行的是run
函数,这儿容易理解过错,所以框起来。
接下来看看run
函数:
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {
}
next();
};
run
函数便是用来履行异步并发使命的;
首要activeCount
加1,表明当时并发数加1;
然后履行fn
函数,这个函数便是咱们传入的异步函数,然后将结果赋值给result
,留意现在的result
是一个处在pending
状况的Promise
;
然后将result
传入resolve
函数,这个resolve
函数便是enqueue
函数中回来的Promise
的resolve
函数;
然后等候result
的状况产生改动,这儿运用了try...catch
,因为result
可能会出现异常,所以需要捕获异常;
最后履行next
函数,这个函数便是用来处理并发数的,对应的代码如下:
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
首要activeCount
减1,表明当时并发数减1;
然后判断行列中是否还有使命,如果有就取出来履行;
queue.dequeue()
能够理解为[].shift()
,取出行列中的第一个使命,因为确认里边是一个函数,所以直接履行就能够了;
最后面还看到了运用Object.defineProperties
为generator
函数添加了几个特点,来看看:
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
-
activeCount
:当时并发数 -
pendingCount
:行列中的使命数 -
clearQueue
:清空行列
这些特点都是只读的,能够让咱们在外部知道当时的并发数和行列中的使命数,而且手动清空行列;
动手完成
接下来咱们来动手完成一个,抛开行列直接运用数组
+ class
完成一个简易版:
class PLimit {
constructor(concurrency) {
this.concurrency = concurrency;
this.activeCount = 0;
this.queue = [];
return (fn, ...args) => {
return new Promise(resolve => {
this.enqueue(fn, resolve, args);
});
}
}
enqueue(fn, resolve, args) {
this.queue.push(this.run.bind(this, fn, resolve, args));
(async () => {
await Promise.resolve();
if (this.activeCount < this.concurrency && this.queue.length > 0) {
this.queue.shift()();
}
})();
}
async run(fn, resolve, args) {
this.activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {
}
this.next();
}
next() {
this.activeCount--;
if (this.queue.length > 0) {
this.queue.shift()();
}
}
}
一共十个并发的使命,每个使命花费 2秒,操控并发数为 2 时,一共花费 10秒。
总结
这篇文章主要介绍了Promise
的并发操控,主要是经过行列来完成的。
并发操控的中心便是操控并发数,所以咱们需要一个行列来存储使命,然后操控并发数,当并发数小于最大并发数时,就从行列中取出使命履行,这样就能够操控并发数了。
等候上一个使命的履行经过await
来完成,这样就能够保证每次都只要可控的并发数在履行。
代码量并不多,可是内部的操作还有细节处理都是知识点。