【源码共读】大并发量如何控制并发数

【源码共读】大并发量如何控制并发数

我正在参加会员专属活动-源码共读第一期,点击参加

在常见的网络请求的过程中,咱们都会遇到并发的状况,如果一次性建议过多的请求,会导致服务器压力过大,甚至会导致服务器溃散,所以咱们需要操控并发的数量,这样才干保证服务器的正常运转。

今天带来的便是并发操控的库:p-limit

运用

依据README的介绍,咱们能够经过p-limit来创立一个约束并发的函数,然后经过这个函数来履行咱们的异步使命。

import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
   limit(() => fetchSomething('foo')),
   limit(() => fetchSomething('bar')),
   limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);

源码剖析

咱们先来看一下p-limit的源码:

import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
   if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
   }
   const queue = new Queue();
   let activeCount = 0;
   const next = () => {
      activeCount--;
      if (queue.size > 0) {
         queue.dequeue()();
      }
   };
   const run = async (fn, resolve, args) => {
      activeCount++;
      const result = (async () => fn(...args))();
      resolve(result);
      try {
         await result;
      } catch {}
      next();
   };
   const enqueue = (fn, resolve, args) => {
      queue.enqueue(run.bind(undefined, fn, resolve, args));
      (async () => {
         // This function needs to wait until the next microtask before comparing
         // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
         // when the run function is dequeued and called. The comparison in the if-statement
         // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
         await Promise.resolve();
         if (activeCount < concurrency && queue.size > 0) {
            queue.dequeue()();
         }
      })();
   };
   const generator = (fn, ...args) => new Promise(resolve => {
      enqueue(fn, resolve, args);
   });
   Object.defineProperties(generator, {
      activeCount: {
         get: () => activeCount,
      },
      pendingCount: {
         get: () => queue.size,
      },
      clearQueue: {
         value: () => {
            queue.clear();
         },
      },
   });
   return generator;
}

加上注释和换行只要68行代码,非常简单,咱们来一行一行的剖析:

能够看到最开端就导入了yocto-queue这个库,这个库之前有剖析过:【源码共读】yocto-queue 一个微型行列数据结构

这个库便是一个行列的数据结构,不明白的能够直接将这个理解为数组就好;

跟着运用的代码来看,最开端便是经过pLimit来创立一个约束并发的函数,这个函数接纳一个参数concurrency,然后回来一个函数,来看看这一步的代码:

function pLimit(concurrency) {
   if (
        !((Number.isInteger(concurrency)
        || concurrency === Number.POSITIVE_INFINITY)
        && concurrency > 0)
    ) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
   }
   const generator = (fn, ...args) => new Promise(resolve => {
      enqueue(fn, resolve, args);
   });
   return generator;
}

首要这个函数接纳一个参数concurrency,然后判断这个参数是否是一个大于0的整数,如果不是就抛出一个过错;

回来的函数很简单,便是接纳一个函数fn和参数args,然后回来一个Promise;

然后调用回来的generator函数就会履行enqueue函数,对应的代码如下:

const enqueue = (fn, resolve, args) => {
    queue.enqueue(run.bind(undefined, fn, resolve, args));
    (async () => {
        // This function needs to wait until the next microtask before comparing
        // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
        // when the run function is dequeued and called. The comparison in the if-statement
        // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
        await Promise.resolve();
        if (activeCount < concurrency && queue.size > 0) {
            queue.dequeue()();
        }
    })();
};

这个函数接纳三个参数,fnresolveargs,然后将run函数放入行列中;

这儿run运用bind是因为并不需要当即履行,参阅:function.prototype.bind()

然后当即履行一个异步函数,这个里边首要会等候下一个微使命,注释解释了这个原因,因为activeCount是异步更新的,所以需要等候下一个微使命才干获取到最新的值;

然后判断activeCount是否小于concurrency,而且行列中有使命,如果满足条件就会将行列中的使命取出来履行,这一步便是并发的中心了;

这儿的queue.dequeue()()履行的是run函数,这儿容易理解过错,所以框起来。

接下来看看run函数:

const run = async (fn, resolve, args) => {
    activeCount++;
    const result = (async () => fn(...args))();
    resolve(result);
    try {
        await result;
    } catch {
    }
    next();
};

run函数便是用来履行异步并发使命的;

首要activeCount加1,表明当时并发数加1;

然后履行fn函数,这个函数便是咱们传入的异步函数,然后将结果赋值result,留意现在的result是一个处在pending状况的Promise

然后将result传入resolve函数,这个resolve函数便是enqueue函数中回来的Promiseresolve函数;

然后等候result的状况产生改动,这儿运用了try...catch,因为result可能会出现异常,所以需要捕获异常;

最后履行next函数,这个函数便是用来处理并发数的,对应的代码如下:

const next = () => {
    activeCount--;
    if (queue.size > 0) {
        queue.dequeue()();
    }
};

首要activeCount减1,表明当时并发数减1;

然后判断行列中是否还有使命,如果有就取出来履行;

queue.dequeue()能够理解为[].shift(),取出行列中的第一个使命,因为确认里边是一个函数,所以直接履行就能够了;

最后面还看到了运用Object.definePropertiesgenerator函数添加了几个特点,来看看:

Object.defineProperties(generator, {
    activeCount: {
        get: () => activeCount,
    },
    pendingCount: {
        get: () => queue.size,
    },
    clearQueue: {
        value: () => {
            queue.clear();
        },
    },
});
  • activeCount:当时并发数
  • pendingCount:行列中的使命数
  • clearQueue:清空行列

这些特点都是只读的,能够让咱们在外部知道当时的并发数和行列中的使命数,而且手动清空行列;

动手完成

接下来咱们来动手完成一个,抛开行列直接运用数组 + class完成一个简易版:

class PLimit {
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.activeCount = 0;
        this.queue = [];
        return (fn, ...args) => {
            return new Promise(resolve => {
               this.enqueue(fn, resolve, args);
            });
        }
    }
    enqueue(fn, resolve, args) {
        this.queue.push(this.run.bind(this, fn, resolve, args));
        (async () => {
            await Promise.resolve();
            if (this.activeCount < this.concurrency && this.queue.length > 0) {
                this.queue.shift()();
            }
        })();
    }
    async run(fn, resolve, args) {
        this.activeCount++;
        const result = (async () => fn(...args))();
        resolve(result);
        try {
            await result;
        } catch {
        }
        this.next();
    }
    next() {
        this.activeCount--;
        if (this.queue.length > 0) {
            this.queue.shift()();
        }
    }
}

【源码共读】大并发量怎么操控并发数

一共十个并发的使命,每个使命花费 2秒,操控并发数为 2 时,一共花费 10秒。

总结

这篇文章主要介绍了Promise的并发操控,主要是经过行列来完成的。

并发操控的中心便是操控并发数,所以咱们需要一个行列来存储使命,然后操控并发数,当并发数小于最大并发数时,就从行列中取出使命履行,这样就能够操控并发数了。

等候上一个使命的履行经过await来完成,这样就能够保证每次都只要可控的并发数在履行。

代码量并不多,可是内部的操作还有细节处理都是知识点。