前语
介绍高功能行列Disruptor原理以及运用比如。
Disruptor是什么?
Disruptor是外汇和加密货币交易所运营商 LMAX group 建立高功能的金融交易所的成果。用于处理出产者、顾客及其数据存储的规划问题的高功能行列完成。能够对标JDK中的ArrayBlockingQueue。是目前单机且依据内存存储的最高功能的行列完成。见 与ArrayBlockingQueue功能对比。
Disruptor高功能秘诀
运用CAS替代锁
锁非常贵重,因为它们在竞赛时需求裁定。这种裁定是经过到操作系统内核的上下文切换来完成的,该内核将挂起等候锁的线程,直到它被开释。系统供给的原子操作CAS(Compare And Swap/Set)是很好的锁替代计划,Disruptor中同步便是运用的这种。
比如多出产者形式中com.lmax.disruptor.MultiProducerSequencer便是用了Java里sun.misc.Unsafe类依据CAS完成的API。
等候战略com.lmax.disruptor.BlockingWaitStrategy运用了依据CAS完成的ReentrantLock。
独占缓存行
为了进步效率CPU硬件不会以字节或字为单位移动内存,而是以缓存行,通常大小为 32-256 字节的缓存行,最常见的缓存行是 64 字节。这意味着,假如两个变量在同一个缓存行中,而且由不同的线程写入,那么它们会出现与单个变量相同的写入争用问题。为了获得高功能,假如要最小化争用,那么确保独立但一起写入的变量不同享相同的缓存行是很重要的。
比如com.lmax.disruptor.RingBuffer中特点前后都用未赋值的long来独占。com.lmax.disruptor.SingleProducerSequencerPad也有相同处理方法。
环形行列
- 运用有界行列,削减线程争用
行列比较链表在拜访速度上占据优势,而有界行列比较可动态扩容的无界行列则防止扩容产生的同步问题效率更高。Disruptor和JDK中的ArrayBlockingQueue相同运用有界行列。行列长度要设为2的n次幂,有利于二进制计算。
- 运用环形数组,防止出产和消费速度差异导致行列头和尾争用
Disruptor在逻辑大将数组的的头尾看成是相连的,即一个环形数组(RingBuffer)。
- Sequence
出产和消费都需求维护自增序列值(Sequence),从0开端。
出产方只维护一个代表出产的最终一个元素的序号。代表出产的最终一个元素的序号。每次向Disruptor发布一个元素都调用Sequenced.next()来获取下个方位的写入权。
在单出产者形式(SINGLE)因为不存在并发写入,则不需求处理同步问题。在多出产者形式(MULTI)就需求借助JDK中依据CAS(Compare And Swap/Set)完成的API来确保线程安全。
多个顾客各自维护自己的消费序列值(Sequence)保存数组中。
而环形经过与运算(sequence & indexMask)完成的,indexMask便是环形行列的长度-1。以环形行列长度8为例,第9个元素Sequence为8,8 & 7 = 0,刚好又回到了数组第1个方位。
见com.lmax.disruptor.RingBuffer.elementAt(long sequence)
预分配内存
环形行列存放的是Event目标,而且是在Disruptor创立的时候调用EventFactory创立并一次将行列填满。Event保存出产者出产的数据,消费也是经过Event获取,后续出产则只需求替换掉Event中的特点值。这种方法防止了重复创立目标,下降JVM的GC产频率。
见com.lmax.disruptor.RingBuffer.fill(EventFactory eventFactory)
顾客8种等候战略
当消费速度大于出产速度状况下,顾客履行的等候战略。
战略类名 | 描绘 |
---|---|
BlockingWaitStrategy(常用) | 运用ReentrantLock,失败则进入等候行列等候唤醒重试。当吞吐量和低推迟不如CPU资源重要时运用。 |
YieldingWaitStrategy(常用) | 测验100次,全失败后调用Thread.yield()让出CPU。该战略将运用100%的CPU,假如其他线程请求CPU资源,这种战略更简单让出CPU资源。 |
SleepingWaitStrategy(常用) | 测验200次 。前100次直接重试,后100次每次失败后调用Thread.yield()让出CPU,全失败线程睡觉(默许100纳秒 )。 |
BusySpinWaitStrategy | 线程一直自旋等候,比较耗CPU。最好是将线程绑定到特定的CPU核心上运用。 |
LiteBlockingWaitStrategy | 与BlockingWaitStrategy相似,差异在增加了原子变量signalNeeded,假如两个线程一起别离拜访waitFor()和signalAllWhenBlocking(),能够削减ReentrantLock加锁次数。 |
LiteTimeoutBlockingWaitStrategy | 与LiteBlockingWaitStrategy相似,差异在于设置了阻塞时刻,超过时刻后抛反常。 |
TimeoutBlockingWaitStrategy | 与BlockingWaitStrategy相似,差异在于设置了阻塞时刻,超过时刻后抛反常。 |
PhasedBackoffWaitStrategy | 依据时刻参数和传入的等候战略来决定运用哪种等候战略。当吞吐量和低推迟不如CPU资源重要时,能够运用此战略。 |
顾客序列
所有顾客的消费序列(Sequence)都放在一个数组中,见com.lmax.disruptor.AbstractSequencer,经过SEQUENCE_UPDATER来更新对应的序列值。
调用更新的当地在com.lmax.disruptor.RingBuffer.addGatingSequences(Sequence… gatingSequences)。
消费太慢行列满了怎么办?
出产者线程被阻塞。出产者调用Sequenced.next()争夺写入权的时候需求判断最小的消费序列值进行比较。假如写入的方位还未消费则会进入循环不断获取最小消费序列值进行比较。
见包com.lmax.disruptor下SingleProducerSequencer或MultiProducerSequencer中next(int n)方法。
Disruptor开发步骤
- 创立Event、EventFactory、EventHandler和ExceptionHandler类
Event是环形行列(RingBuffer)中的元素,是出产者数据的载体;EventFactory是界说Event创立方法的工厂类;EventHandler则是Event的处理器,界说如何消费Event中的数据。
别的有必要界说一个消费反常处理器ExceptionHandler,它是和EventHandler绑定的。当EventHandler.onEvent()履行抛出反常时会履行对应的反常回调方法。
- 实例化Disruptor
创立Disruptor需求指定5个参数eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy。
EventFactory是上面界说的Event工厂类;
ringBufferSize是环形行列的长度,这个值要是2的N次方;
threadFactory是界说顾客线程创立方法的工厂类;
producerType是指明出产者是一个(SINGLE)仍是多个(MULTI)。默许是MULTI,会运用CAS(Compare And Swap/Set)确保线程安全。假如指定为SINGLE,则不运用没必要的CAS,使单线程处理更高效。
waitStrategy指明顾客等候出产时的战略。
- 设置顾客
指明EventHandler并绑定ExceptionHandler。指定多个EventHandler时,会为每个EventHandler分配一个线程,一个Event会被多个并行EventHandler处理。
也能够指明多个WorkHandler,每个WorkHandler分配一个线程并行消费行列中的Event,一个Event只会被一个WorkHandler处理。
- 创立/实例化EventTranslator
EventTranslator界说出产者数据转换为Event的方法,不同数量参数有不同的接口用来完成。
- 最终用Disruptor.publishEvent() 来发布元素指明EventTranslator和参数
比如程序
- 先引入Maven依靠
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
- Event
/**
* 事情
*
* @param <T>发布的数据类型
*/
public class MyEvent<T> {
private T data;
public T getData() {
return data;
}
public MyEvent<T> setData(T data) {
this.data = data;
return this;
}
}
- EventFactory
import com.lmax.disruptor.EventFactory;
/**
* 创立事情的工厂
*
* @param <T>发布的数据类型
*/
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {
@Override
public MyEvent<T> newInstance() {
return new MyEvent<>();
}
}
- EventHandler
import com.lmax.disruptor.EventHandler;
/**
* 事情消费方法
*
* @param <T>发布的数据类型
*/
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {
@Override
public void onEvent(MyEvent<T> tMyEvent, long l, boolean b) throws Exception {
System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + tMyEvent.getData());
}
}
- ExceptionHandler
import com.lmax.disruptor.ExceptionHandler;
/**
* 顾客反常处理器
*
* @param <T>发布的数据类型
*/
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {
@Override
public void handleEventException(Throwable ex, long sequence, MyEvent<T> event) {
System.out.println("handleEventException");
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("handleOnStartException");
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("handleOnShutdownException");
}
}
单顾客
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
/**
* 单顾客
*/
public class SingleConsumerSample {
public static void main(String[] args) {
// 环形数组长度,有必要是2的n次幂
int ringBufferSize = 1024;
// 创立事情(Event)目标的工厂
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 创立顾客线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等候战略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 指定一个处理器
MyEventHandler<String> eventHandler = new MyEventHandler<>();
disruptor.handleEventsWith(eventHandler);
// 处理器反常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
disruptor.start();
// 经过事情转换器(EventTranslator)来指明如何将发布的数据转换到事情目标(Event)中
// 这里是一个参数的转换器,别的还有两个(EventTranslatorTwoArg)、三个(EventTranslatorThreeArg)
// 和多个(EventTranslatorVararg)参数的转换器能够运用,参数类型能够不相同
EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
new EventTranslatorOneArg<MyEvent<String>, String>() {
@Override
public void translateTo(MyEvent<String> event, long sequence, String arg0) {
event.setData(arg0);
}
};
// 发布
for (int i = 0; i < 10; i++) {
disruptor.publishEvent(eventTranslatorOneArg, "One arg " + i);
}
disruptor.shutdown();
}
}
单顾客Lambda写法
这种只是迎合Java8 Lambda语法特性,代码更简洁。
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
public class LambdaSample {
public static void main(String[] args) {
// 环形数组长度,有必要是2的n次幂
int ringBufferSize = 1024;
// 创立顾客线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等候战略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 指定一个处理器
EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
disruptor.handleEventsWith(eventHandler);
// 处理器反常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
disruptor.start();
// 经过事情转换器(EventTranslator)来指明如何将发布的数据转换到事情目标(Event)中
// 一个参数的转换器
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg ");
// 两个参数的转换器
disruptor.publishEvent((event, sequence, pA, pB) -> event.setData(pA + pB), "Two arg ", 1);
// 三个参数的转换器
disruptor.publishEvent((event, sequence, pA, pB, pC) -> event.setData(pA + pB + pC)
, "Three arg ", 1, false);
// 多个参数的转换器
disruptor.getRingBuffer().publishEvent((event, sequence, params) -> {
List<String> paramList = Arrays.stream(params).map(Object::toString).collect(Collectors.toList());
event.setData("Var arg " + String.join(",", paramList));
}, "param1", "param2", "param3");
disruptor.shutdown();
}
}
多顾客重复消费元素
要害只在于指定多个EventHandler,而且EventHandler还能够别离绑定不同的ExceptionHandler。
每个EventHandler分配一个线程,一个Event会被每个EventHandler处理,合适两个不同的业务都需求处理同一个元素的状况,相似播送形式。
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
/**
* 一个元素多个顾客重复消费
*/
public class RepetitionConsumerSample {
public static void main(String[] args) {
// 环形数组长度,有必要是2的n次幂
int ringBufferSize = 1024;
// 创立事情(Event)目标的工厂
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 创立顾客线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等候战略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 这里指定了2个顾客,那就会产生2个消费线程,一个事情会被消费2次
EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler——2消费:" + event.getData());
disruptor.handleEventsWith(eventHandler, eventHandler2);
// 别离指定反常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);
disruptor.handleExceptionsFor(eventHandler2).with(exceptionHandler);
disruptor.start();
for (int i = 0; i < 10; i++) {
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
}
disruptor.shutdown();
}
}
多顾客
要害只在于界说WorkHandler,然后实例化多个来消费。
每个WorkHandler分配一个线程,一个元素只会被一个WorkHandler处理。
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
public class MultiConsumerSample {
public static void main(String[] args) {
// 环形数组长度,有必要是2的n次幂
int ringBufferSize = 1024;
// 创立事情(Event)目标的工厂
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 创立顾客线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等候战略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 处理器反常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
// 设置2个顾客,2个线程,一个Event只被一个顾客消费
WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
System.out.println(Thread.currentThread().getName() + "WorkHandler消费:" + tMyEvent.getData());
disruptor.handleEventsWithWorkerPool(workHandler, workHandler2);
disruptor.start();
for (int i = 0; i < 10; i++) {
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
}
disruptor.shutdown();
}
}
参考链接
Disruptor 主页
Disruptor 技能文档
GitHub Disruptor
GitHub Disruptor Getting Started
Maven Repository Disruptor Framework
LMAX 官网