速率约束

实际国际中的用户是残暴的,而且没耐性,充满着各种不确定性。在高并发体系中,或许会呈现服务器被虚假恳求轰炸的状况,因此您或许希望操控这种状况。

一些实际运用情形或许如下所示:

  1. API配额办理-作为提供者,您或许希望根据用户的付款状况约束向服务器宣布API恳求的速率。这能够在客户端或服务端完结。
  2. 安全性-防止DDOS攻击。
  3. 本钱操控–这对服务方甚至客户方来说都不是必需的。假如某个组件以十分高的速率宣布一个事情,它或许有助于操控它,它或许有助于操控从客户端发送的遥测。

限速处理时的选项

根据咱们处理的恳求/事情类型,或许会产生以下状况:

  1. 咱们能够抛弃额外的恳求
  2. 咱们能够选择让恳求等候,直到体系将它们降低到预界说的速率。

常用限速算法

  1. 令牌桶算法
  2. 漏桶算法

咱们将不深化讨论这些算法的内部细节,因为这超出了本文的范围。

咱们将以令牌桶算法为中心。其要求如下。

令牌桶算法根据以固定速率添加令牌的固定容量桶的类比。在答应API继续之前,将检查桶,以检查它当时是否包括至少一个令牌。假如令牌存在,则进行API调用。假如不是,则丢掉该消息/或使其等候。

需求

  1. 应该能够承受每秒所需的(TPS)业务或速率。
  2. 假如超过咱们界说的比率,则应抛弃买卖。
  3. 应该在同时产生的状况下起作用。

高级功用(在后续文章中完结)

  1. 应该能够滑润突发的恳求。例如,假如咱们将TPS界说为5,而且所有五个恳求都在同一时间到达,那么它应该能够以固定的时间间隔将它们排成一行,即以200ms的时间间隔履行每个恳求。它需要一个内部定时电路。
  2. 假如咱们的TPS为5,而且在其间一个1秒的时段中,咱们鄙人一秒只运用3个代币,那么咱们应该能够提供5+2 = 7个代币作为奖赏。但速率为每个令牌1/7(142.28ms)。奖金不应结转到下一个插槽。

让咱们首先界说咱们的 速率约束器

/**
 * Rate limiter helps in limiting the rate of execution of a piece of code. The rate is defined in terms of
 * TPS(Transactions per second). Rate of 5 would suggest, 5 transactions/second. Transaction could be a DB call, API call,
 * or a simple function call.
 * <p>
 * Every {@link RateLimiter} implementation should implement either {@link RateLimiter#throttle(Code)} or, {@link RateLimiter#enter()}.
 * They can also choose to implement all.
 * <p>
 * {@link Code} represents a piece of code that needs to be rate limited. It could be a function call, if the code to be rate limited
 * spreads across multiple functions, we need to use entry() and exit() contract.
 */
public interface RateLimiter {
/**
     * Rate limits the code passed inside as an argument.
     *
     * @param code representation of the piece of code that needs to be rate limited.
     * @return true if executed, false otherwise.
     */
    boolean throttle(Code code);
    /**
     * When the piece of code that needs to be rate limited cannot be represented as a contiguous
     * code, then entry() should be used before we start executing the code. This brings the code inside the rate
     * limiting boundaries.
     *
     * @return true if the code will execute and false if rate limited.
     * <p
     */
    boolean enter();
    /**
     * Interface to represent a contiguous piece of code that needs to be rate limited.
     */
    interface Code {
        /**
         * Calling this function should execute the code that is delegated to this interface.
         */
        void invoke();
    }
}

咱们的 RateLimit有两组API:一个是throttle(code),另一个是enter()。这两种办法都满足相同的功用,但采用以下两种方法:

  1. boolean throttle(代码)-假如咱们有连续的代码,能够用来传递一个代码块。
  2. 布尔输入() – 通常能够在API、DB或任何咱们想要节省的调用之前运用。假如履行此代码后边的代码,则将回来 ,以及 假的假如它是速率受限的话。您能够将这些恳求排队或拒绝。

在生产环境中您永远不会看到节省(代码)完结,因为它不是最佳的。请在谈论中告知我原因。大多数速率约束器运用类似于enter()的API

中心功用

为了构建速率约束器的中心,咱们需要确保在任意两秒之间不答应超过N个业务。咱们将怎么做到这一点?

考虑咱们进行第一笔买卖的时间t0。 t0 .所以,
直到(t0 + 1)s,咱们只答应进行N次买卖。 (t0 + 1)s , we are allowed to make only N transactions.怎么确保这一点?鄙人次买卖时,咱们将检查
当时时间≤(t0 + 1)。.假如没有,那么这意味着咱们进入了不同的秒,而且咱们被答应进行N次买卖。 N transactions.让咱们看一小段代码,它演示了:

long now = System.nanoTime();
if (now <= mNextSecondBoundary) { // If we are within the time limit of current second
    if (mCounter < N) { // If token available
        mLastExecutionNanos = now;
        mCounter++; // Allocate token
        invoke(code); // Invoke the code passed the throttle method.
    }
}

那么,咱们怎么界说mNextSecondBoundary呢?这将在咱们进行第一个业务时完结,如前所述,咱们将在完结第一个业务的时间添加一秒。

if (mLastExecutionNanos == 0L) {
    mCounter++; // Allocate the very first token here.
    mLastExecutionNanos = System.nanoTime();
    mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;  // (10^9)
}

现在,假如咱们履行代码并看到咱们进入了不同的秒,咱们应该怎么做?咱们将经过重置上次履行时间、可用令牌数来增强前面的代码,并经过调用 节省阀()再一次。咱们的办法现已知道怎么处理新的秒。

@Override
public boolean throttle(Code code) {
    if (mTPS <= 0) {
        // We do not want anything to pass.
        return false;
    }
synchronized (mLock) {
        if (mLastExecutionNanos == 0L) {
            mCounter++;
            mLastExecutionNanos = System.nanoTime();
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            invoke(code);
            return true;
        } else {
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) {
                if (mCounter < mTPS) {
                    mLastExecutionNanos = now;
                    mCounter++;
                    invoke(code);
                    return true;
                } else {
                    return false;
                }
            } else {
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return throttle(code);
            }
        }
    }
}

在这个完结中,咱们能够传递需要节省的代码块,可是这个代码有一个问题。这将作业,但它会体现欠安。不推荐,但为什么呢?请在谈论中告知我。

现在,能够运用相同的构建块和enter()构建第二个API。咱们将运用相同的逻辑,但咱们不会履行办法内部的代码块。相反,它将在调用enter()之后履行,就像咱们履行状况办理一样。该办法的完结如下:

@Override
public boolean enter() {
    if (mTPS == 0L) {
        return false;
    }
synchronized (mBoundaryLock) {
        if (mLastExecutionNanos == 0L) {
            mLastExecutionNanos = System.nanoTime();
            mCounter++;
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            return true;
        } else {
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) {
                if (mCounter < mTPS) {
                    mLastExecutionNanos = now;
                    mCounter++;
                    return true;
                } else return false;
            } else {
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return enter();
            }
        }
    }
}

现在,咱们简单的速率约束器现已能够运用了。您能够检查完好的代码 这里。

结果

咱们将测验创立一个可创立六个线程的驱动程序代码。每个线程测验从0到100计数,推迟为50ms(能够设置为任何数字)。咱们将按如下方法发动咱们的限速器:

public static void main(String[] args) {
    RateLimiter limiter = new SimpleTokenBucketRateLimiter(1);
    Thread[] group = new Thread[6];
    Runnable r = () -> {
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (limiter.enter()) {
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
            }
        }
    };
for (int i = 0; i < 6; i++) {
        group[i] = new Thread(r);
        group[i].start();
    }
}

咱们的API不支持滑润业务,而是让业务等候下一个令牌被分配,而不是丢掉恳求。在拒绝它们之后,它回来false,所以假如咱们真的想的话,咱们能够把它们排队。

if (limiter.enter()) {
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
} else { // queue the work again }

Java:构建简单的速率限制器

这是TPS设置为1时的输出。

当咱们测验将TPS设置为 2咱们将看到以下输出:

Java:构建简单的速率限制器

真管用!

Android的视点看

  1. 考虑这样一种状况:您正在编写代码以捕获用户签名。当他们拖动指针时,您会捕获数千个点。滑润签名或许不需要所有这些参数,因此您运用速率约束进行采样。
  2. 一些事情调用频率很高。你能操控的。
  3. 咱们有MessageQueue的闲暇侦听器。当咱们在主线程中侦听它时,它被随意调用。有时候,它在一秒钟内被调用好几次。假如咱们想构建一个心跳体系来告知咱们主线程何时闲暇,咱们能够运用它来接收每秒的事情。假如咱们一秒钟内没有收到事情,咱们能够假定主线程处于繁忙状况。
  4. 对于您的结构/库的API配额办理,您能够根据用户选择的付款计划状况API调用。

今日先到这里吧。 咱们将在后续文章中构建一个更杂乱的速率约束器。