解读Disruptor系列–解读源码(0)之源码导读

澳门新葡亰网站注册 7

这个故事源自一个很简单的想法:创建一个对开发人员友好的、简单轻量的线程间通讯框架,完全不用锁、同步器、信号量、等待和通知,在Java里开发一个轻量、无锁的线程内通讯框架;并且也没有队列、消息、事件或任何其他并发专用的术语或工具。

本篇文章是后续解读Disruptor源码的导读,适合对Disruptor还不了解的同学。如果有兴趣,还可以看下我之前发的Disruptor系列文章。
要大概弄明白Disruptor是个什么玩意,可以先回答这几个问题:
Disruptor是什么?
为什么要用Disruptor?
Disruptor为什么那么快?

只用普通的老式Java接口实现POJO的通讯。

Disruptor是什么?

我也来个一句话总结:Disruptor是LMAX开源的、用于替代并发线程间数据交换的有界队列的、可选无锁的、高性能的线程间通讯框架。

听起来是不是感觉高大上?是不是觉得很好奇怎么个高性能?是不是还有点云里雾里咋交换数据?这也是我最初的感觉。
慢慢来,我们逐步分解这些名词。
Disruptor:首先我们看看Disruptor这个词。熟悉星际迷航(Star
Trek)的同学可能对这个词会更熟悉,Disruptor(裂解炮)和Phaser(相位枪)都是星际迷航中的武器,Phaser是联邦武器,而Disruptor是克林贡的等价物。看起来Disruptor和Phaser是类似的,那Phaser又是啥呢?在jdk1.7中,Doug
Lea大师为我们带来了Phaser,一个可重用的同步屏障,功能类似CyclicBarrier和CountDownLatch,但是使用更加灵活。
LMAX团队之所以为Disruptor起了这个名字,一方面是因为Disruptor和Phaser在处理依赖图表(或者说是消费链、多阶段处理、流水线)时有类似的地方;另一方面,Disruptor这个词本身就有分裂、破坏的意思,很符合Disruptor中的设计思想对于传统并发编程思想的某种“对立”–并发编程总是在想如何利用多线程去拥有更大的吞吐量、更少的延迟,而Disruptor却通过非常多的性能测试发现,在并发编程中正是过多的线程间通讯、争抢导致了性能瓶颈,而最终选择了单线程的处理逻辑去完成业务处理。
LMAX:LMAX是英国一个交易量很大的金融交易所,也是开源Disruptor的组织。
开源:https://github.com/LMAX-Exchange/disruptor
并发线程间数据交换的有界队列
:在并发编程时,经常需要在线程间交换数据,如任务分发、事件传递、状态变更等,
这时通常需要多个线程去访问一个线程安全的数据结构,一般选择使用队列(Queue)实现。队列在容量上又分为有界和无界,使用无界队列通常都需要保证生产者生产速度小于消费者消费速度,否则将由于内存耗尽导致灾难结果。为了避免这种情况,队列通常是有界的。Java中经常使用BlockingQueue作为并发线程使用的有界队列,使用put()、take()在队列满、空的情况下进行等待,非常适合多线程间的数据共享,实现方式一般有ArrayBlockingQueue和LinkedBlockingQueue。
线程间通讯:等同于并发线程间的数据交换。
可选无锁:使用Disruptor唯一可能遇到Java锁的时候,就是在消费者等待可用事件进行消费时。而Disruptor为这个等待过程,编写了包括使用锁和不使用锁的多种策略,可根据不同场景和需求进行选择。
高性能:单生产者+单消费者性能测试:
使用LinkedBlockingQueue(https://github.com/LMAX-Exchange/disruptor/blob/master/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueThroughputTest.java

Starting Queue tests
Run 0, BlockingQueue=3,687,315 ops/sec
Run 1, BlockingQueue=2,721,088 ops/sec
Run 2, BlockingQueue=3,471,017 ops/sec
Run 3, BlockingQueue=5,347,593 ops/sec
Run 4, BlockingQueue=5,316,321 ops/sec
Run 5, BlockingQueue=5,449,591 ops/sec
Run 6, BlockingQueue=5,173,305 ops/sec

使用常用的带Translator的Disruptor测试(https://github.com/LMAX-Exchange/disruptor/blob/master/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java)

Starting Disruptor tests
Run 0, Disruptor=20,222,446 ops/sec
Run 1, Disruptor=70,671,378 ops/sec
Run 2, Disruptor=37,878,787 ops/sec
Run 3, Disruptor=37,105,751 ops/sec
Run 4, Disruptor=38,986,354 ops/sec
Run 5, Disruptor=27,578,599 ops/sec
Run 6, Disruptor=34,281,796 ops/sec

综上的名词解释,聪明如你,想必已经明白个大概了。如果还是不明白,肯定是我讲的不好,请留言。

它可能跟Akka的类型化actor类似,但作为一个必须超级轻量,并且要针对单台多核计算机进行优化的新框架,那个可能有点过了。

为什么要用Disruptor?

那为什么要用Disruptor呢?或者说Disruptor解决了什么问题呢?
一般情况下,新的发明创造都是伴随着对旧有事物的痛点产生的。在并发线程间交换数据这个问题上,使用传统的阻塞队列有什么问题呢?
根据LMAX团队在2011年发布的论文,可简单归结为以下几个问题:

  1. 锁的成本:
    传统阻塞队列使用锁保证线程安全。而锁通过操作系统内核的上下文切换实现,会暂停线程去等待锁直到释放。执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态。这种状态会导致高水平的写入争用。
  2. 伪共享问题导致的性能低下。
  3. 澳门新葡亰网站注册,队列是垃圾的重要来源,队列中的元素和用于存储元素的节点对象需要进行频繁的重新分配。

论文中给出的,循环5亿次64位计数操作使用时间对比:

澳门新葡亰网站注册 1

执行5亿次64位计数操作使用时间对比

Disruptor通过良好的设计,最大限度解决了上述三个问题。
好奇如你,难道就不想知道其中奥妙吗?

当actor跨越不同JVM实例(在同一台机器上,或分布在网络上的不同机器上)的进程边界时,Akka框架很善于处理进程间的通讯。

Disruptor为什么那么快?

简单来说,Disruptor在设计上有以下几点优势:

  1. 内部数据存储使用环形缓冲(Ring
    Buffer),在启动时进行对象内存分配,这个对象并非数据本身,而只是一个数据容器。这个容器由用户提供,在Disruptor运行时,生产者负责拿到容器设置好数据,消费者再去可用的容器中拿到数据完成消费。这样分配对象内存,将有极大可能让这些对象内存在主存中连续分配,从而支持了CPU缓存位置预测。否则每次new一个对象,很难知道对象内存分配到哪了。这样做好有个好处,环形缓冲在JVM生命周期中通常是永生的,GC的压力更小
  2. 尽量使用无锁设计,合理使用CAS。举两个例子。
    a.
    通过区分是否允许并发生产者,细分单生产者模式和多生产者模式,单生产者单线程更新游标,多生产者模式使用CAS更新游标位置。
    b.
    在阻塞型等待策略中,将等待分两部分。第一部分,消费者等待生产者发布新数据,由于不确定等待时间,使用锁的条件等待。而第二部分,消费者等待上一组消费者(Disruptor支持链式消费,类似Phaser分阶段处理)
    完成此数据消费时,由于已确定有可消费数据,且假定通常的消费时间较短,所以使用自旋(忙循环)来避免上下文切换导致的性能开销。能不用锁就不用锁,即便要用锁,也要将使用锁的粒度用的最小。
  3. 优化数据结构,解决伪共享问题。Java中通过填充缓存行,来解决伪共享问题的思路,现在可能已经是老生常谈,连Java8中都新增了sun.misc.Contended注解来避免伪共享问题。但在Disruptor刚出道那会儿,能够以“机械同情(mechanical
    sympathy)”的思考方式,来优化Java数据结构,这恐怕还很新潮。
  4. 优化其他实现细节。举几个例子。
    a.
    如一个RingBuffer,容量bufferSize为2的幂。使用sequence表示事件序号,可通过sequence & (bufferSize - 1)定位元素的index,比普通的求余取模(%)要快得多。事先计算好bufferSize以2为底的对数indexShift,可通过sequence >>> indexShift快速计算出sequence/bufferSize的商flag(其实相当于当前sequence在环形跑道上跑了几圈,在数据生产时要设置好flag,在数据消费时就可以通过比对flag判断此位置的数据是不是本圈的数据),比除法要快得多。
    b.
    合理使用Unsafe,实现更加高效地内存管理和原子访问。Unsafe封装了一些类似C/C++中的指针操作,除了JDK,在Netty、Spring、Kafka、Storm等非常多的流行开源项目中都使用了Unsafe。

简单讲到这,后续开始分享解读Disruptor源码的内容。


参考资料
1.Java8使用@sun.misc.Contended避免伪共享 –
http://www.jianshu.com/p/c3c108c3dcfd
2.写Java也得了解CPU–CPU缓存
http://www.cnblogs.com/techyc/p/3607085.html
3.linux查看CPU高速缓存(cache)信息
http://www.cnblogs.com/kekukele/p/3829369.html
4.理解 CPU Cache
http://wsfdl.com/linux/2016/06/11/%E7%90%86%E8%A7%A3CPU%E7%9A%84cache.html
5.关于CPU Cache — 程序猿需要知道的那些事
http://cenalulu.github.io/linux/all-about-cpu-cache/
6.Mechanical Sympathy – Memory Barriers/Fences
https://mechanical-sympathy.blogspot.tw/2011/07/memory-barriersfences.html
7.《Java高并发程序设计》- 4.4.3 Java中的指针:Unsafe类

但对于那种只需要线程间通讯的小型项目而言,用Akka类型化actor可能有点儿像用牛刀杀鸡,不过类型化actor仍然是一种理想的实现方式。

我花了几天时间,用动态代理,阻塞队列和缓存线程池创建了一个解决方案。

图一是这个框架的高层次架构:

澳门新葡亰网站注册 2
图一: 框架的高层次架构

SPSC队列是指单一生产者/单一消费者队列。MPSC队列是指多生产者/单一消费者队列。

派发线程负责接收Actor线程发送的消息,并把它们派发到对应的SPSC队列中去。

接收到消息的Actor线程用其中的数据调用相应的actor实例中的方法。借助其他actor的代理,actor实例可以将消息发送到MPSC队列中,然后消息会被发送给目标actor线程。

我创建了一个简单的例子来测试,就是下面这个打乒乓球的程序:

public interface PlayerA (
  void pong(long ball); //发完就忘的方法调用 
}
public interface PlayerB {   
  void ping(PlayerA playerA, long ball); //发完就忘的方法调用 
}    
public class PlayerAImpl implements PlayerA {    
  @Override    
  public void pong(long ball) {    
  }    
}
public class PlayerBImpl implements PlayerB {   
  @Override    
  public void ping(PlayerA playerA, long ball) {    
    playerA.pong(ball);    
  }    
}
public class PingPongExample {   
  public void testPingPong() {
    // 管理器隐藏了线程间通讯的复杂性
    // 控制actor代理,actor实现和线程  
    ActorManager manager = new ActorManager();
    // 在管理器内注册actor实现 
    manager.registerImpl(PlayerAImpl.class);    
    manager.registerImpl(PlayerBImpl.class);
    //创建actor代理。代理会将方法调用转换成内部消息。 
    //会在线程间发给特定的actor实例。    
    PlayerA playerA = manager.createActor(PlayerA.class);    
    PlayerB playerB = manager.createActor(PlayerB.class);    
    for(int i = 0; i < 1000000; i++) {    
       playerB.ping(playerA, i);     
   }    
}

经过测试,速度大约在每秒500,000
次乒/乓左右;还不错吧。然而跟单线程的运行速度比起来,我突然就感觉没那么好了。在 单线程中运行的代码每秒速度能达到20亿
(2,681,850,373)!

居然差了5,000
多倍。太让我失望了。在大多数情况下,单线程代码的效果都比多线程代码更高效。

我开始找原因,想看看我的乒乓球运动员们为什么这么慢。经过一番调研和测试,我发现是阻塞队列的问题,我用来在actor间传递消息的队列影响了性能。

澳门新葡亰网站注册 3

图 2: 只有一个生产者和一个消费者的SPSC队列

所以我发起了一场竞赛,要将它换成Java里最快的队列。我发现了Nitsan
Wakart的 博客 。他发了几篇文章介绍单一生产者/单一消费者(SPSC)无锁队列的实现。这些文章受到了Martin
Thompson的演讲 终极性能的无锁算法的启发。

跟基于私有锁的队列相比,无锁队列的性能更优。在基于锁的队列中,当一个线程得到锁时,其它线程就要等着锁被释放。而在无锁的算法中,某个生产者线程生产消息时不会阻塞其它生产者线程,消费者也不会被其它读取队列的消费者阻塞。

在Martin
Thompson的演讲以及在Nitsan的博客中介绍的SPSC队列的性能简直令人难以置信—— 超过了100M
ops/sec。比JDK的并发队列实现还要快10倍
(在4核的 Intel Core i7 上的性能大约在 8M ops/sec 左右)。

我怀着极大的期望,将所有actor上连接的链式阻塞队列都换成了无锁的SPSC队列。可惜,在吞吐量上的性能测试并没有像我预期的那样出现大幅提升。不过很快我就意识到,瓶颈并不在SPSC队列上,而是在多个生产者/单一消费者(MPSC)那里。

用SPSC队列做MPSC队列的任务并不那么简单;在做put操作时,多个生产者可能会覆盖掉彼此的值。SPSC
队列就没有控制多个生产者put操作的代码。所以即便换成最快的SPSC队列,也解决不了我的问题。

为了处理多个生产者/单一消费者的情况,我决定启用LMAX
Disruptor ——一个基于环形缓冲区的高性能进程间消息库。

澳门新葡亰网站注册 4

图3: 单一生产者和单一消费者的LMAX Disruptor

借助Disruptor,很容易实现低延迟、高吞吐量的线程间消息通讯。它还为生产者和消费者的不同组合提供了不同的用例。几个线程可以互不阻塞地读取环形缓冲中的消息:

澳门新葡亰网站注册 5

图 4: 单一生产者和两个消费者的LMAX Disruptor    

下面是有多个生产者写入环形缓冲区,多个消费者从中读取消息的场景。

澳门新葡亰网站注册 6

图 5: 两个生产者和两个消费者的LMAX Disruptor

经过对性能测试的快速搜索,我找到了 三个发布者和一个消费者的吞吐量测试。
这个真是正合我意,它给出了下面这个结果:

LinkedBlockingQueue Disruptor
Run 0 4,550,625 ops/sec 11,487,650 ops/sec
Run 1 4,651,162 ops/sec 11,049,723 ops/sec
Run 2 4,404,316 ops/sec 11,142,061 ops/sec

在3 个生产者/1个 消费者场景下,
Disruptor要比LinkedBlockingQueue快两倍多。然而这跟我所期望的性能上提升10倍仍有很大差距。

这让我觉得很沮丧,并且我的大脑一直在搜寻解决方案。就像命中注定一样,我最近不在跟人拼车上下班,而是改乘地铁了。突然灵光一闪,我的大脑开始将车站跟生产者消费者对应起来。在一个车站里,既有生产者(车和下车的人),也有消费者(同一辆车和上车的人)。

我创建了
Railway类,并用AtomicLong追踪从一站到下一站的列车。我先从简单的场景开始,只有一辆车的铁轨。

public class RailWay {  
 private final Train train = new Train();  
 // stationNo追踪列车并定义哪个车站接收到了列车
 private final AtomicInteger stationIndex = new AtomicInteger();
// 会有多个线程访问这个方法,并等待特定车站上的列车 
public Train waitTrainOnStation(final int stationNo) {

   while (stationIndex.get() % stationCount != stationNo) {
    Thread.yield(); // 为保证高吞吐量的消息传递,这个是必须的。
                   //但在等待列车时它会消耗CPU周期 
   }  
   // 只有站号等于stationIndex.get() % stationCount时,这个忙循环才会返回

   return train;
 }
// 这个方法通过增加列车的站点索引将这辆列车移到下一站
  public void sendTrain() {
    stationIndex.getAndIncrement();
   }
  }

为了测试,我用的条件跟在Disruptor性能测试中用的一样,并且也是测的SPSC队列——测试在线程间传递long值。我创建了下面这个Train类,其中包含了一个long数组:

public class Train {   
  //   
  public static int CAPACITY = 2*1024;
  private final long[] goodsArray; // 传输运输货物的数组

  private int index;

  public Train() {   
      goodsArray = new long[CAPACITY];     
 }

 public int goodsCount() { //返回货物数量    
  return index;    
 }    
 public void addGoods(long i) { // 向列车中添加条目    
  goodsArray[index++] = i;    
 }    
 public long getGoods(int i) { //从列车中移走条目    
  index--;    
  return goodsArray[i];    
 }    
}

然后我写了一个简单的测试 :两个线程通过列车互相传递long值。

澳门新葡亰网站注册 7

图 6: 使用单辆列车的单一生产者和单一消费者Railway

public void testRailWay() {   
  final Railway railway = new Railway();    
  final long n = 20000000000l;    
  //启动一个消费者进程 
  new Thread() {    
   long lastValue = 0;
   @Override   
   public void run() {    
    while (lastValue < n) {    
      Train train = railway.waitTrainOnStation(1); //在#1站等列车
      int count = train.goodsCount();    
      for (int i = 0; i < count; i++) {    
        lastValue = train.getGoods(i); // 卸货   
      }    
      railway.sendTrain(); //将当前列车送到第一站 
     }    
   }    
 }.start();

final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
 Train train = railway.waitTrainOnStation(0); // 在#0站等列车    
 int capacity = train.getCapacity();    
 for (int j = 0; j < capacity; j++) {    
   train.addGoods((int)i++); // 将货物装到列车上 
 }    
 railway.sendTrain();
 if (i % 100000000 == 0) { //每隔100M个条目测量一次性能 
    final long duration = System.nanoTime() - start;    
    final long ops = (i * 1000L * 1000L * 1000L) / duration;    
    System.out.format("ops/sec = %,dn", ops);    
    System.out.format("trains/sec = %,dn", ops / Train.CAPACITY);    
    System.out.format("latency nanos = %.3f%nn", 
    duration / (float)(i) * (float)Train.CAPACITY);    
  }    
 }    
}

在不同的列车容量下运行这个测试,结果惊着我了:

容量 吞吐量: ops/sec 延迟: ns
1 5,190,883 192.6
2 10,282,820 194.5
32 104,878,614 305.1
256 344,614,640 742. 9
2048 608,112,493 3,367.8
32768 767,028,751 42,720.7

在列车容量达到32,768时,两个线程传送消息的吞吐量达到了767,028,751
ops/sec。比Nitsan博客中的SPSC队列快了几倍。