go-disruptor: a High-Performance Messaging Framework
GleQav
8 years ago
<p>Every Java programmer knows <a href="/misc/goto?guid=4958857406893331306" rel="nofollow,noindex">Disruptor</a>, a high-performance framework for inter-thread messaging, i.e. passing messages between threads within a single JVM process, developed by LMAX.</p> <p>Disruptor is so fast that LMAX uses it to process 6 million orders per second, sustaining 100K+ throughput at 1 microsecond of latency. Is there a comparable library in the Go ecosystem?</p> <p><a href="/misc/goto?guid=4959675740128920947" rel="nofollow,noindex">go-disruptor</a> is a port of the Java Disruptor. It offers an API design similar to the Java version's and is not hard to use.</p> <p>As for its performance, that is covered below, and it is the focus of this article.</p> <p>Because of Disruptor's performance, many people have taken an interest in it, and there is a whole series of articles and resources about it, for example:</p> <ul> <li><a href="/misc/goto?guid=4959675740227817536" rel="nofollow,noindex">Disruptor Google Group</a></li> <li><a href="/misc/goto?guid=4959675740305666559" rel="nofollow,noindex">Bad Concurrency</a> (Michael Barker)</li> <li><a href="/misc/goto?guid=4959675740392487096" rel="nofollow,noindex">LMAX</a> (Planet)</li> <li><a href="/misc/goto?guid=4959675740488208661" rel="nofollow,noindex">LMAX Exchange</a></li> <li><a href="/misc/goto?guid=4959619587769777873" rel="nofollow,noindex">Disruptor presentation @ QCon SF</a></li> <li><a href="/misc/goto?guid=4959675740603254384" rel="nofollow,noindex">Disruptor Technical Paper</a></li> <li><a href="/misc/goto?guid=4958529908407552812" rel="nofollow,noindex">Mechanical Sympathy</a> (Martin Thompson)</li> <li><a href="/misc/goto?guid=4959675740723841408" rel="nofollow,noindex">Martin Fowler's Technical Review</a></li> <li><a href="/misc/goto?guid=4959675740801040282" rel="nofollow,noindex">.NET Disruptor Port</a></li> <li><a href="/misc/goto?guid=4959675740875558604" rel="nofollow,noindex">Introduction to the Disruptor</a></li> <li><a href="/misc/goto?guid=4959675740970316034" rel="nofollow,noindex">Disruptor wiki</a></li> </ul> <p>There are also Chinese translations and introductions, such as <a href="/misc/goto?guid=4959618569401777925" rel="nofollow,noindex">the Disruptor topic on 并发编程网</a> and <a href="/misc/goto?guid=4959675741085571633" rel="nofollow,noindex">Alibaba's Feng Zhongyan: How to Use Disruptor Elegantly</a>.</p>
<p>Disruptor was developed by LMAX, whose goal is to become the fastest trading platform in the world. To achieve low latency and high throughput, they had to build a high-performance producer-consumer messaging framework of their own, because Java's built-in queues still carry noticeable latency. The chart below compares Disruptor against the JDK's ArrayBlockingQueue.</p> <p><img src="https://simg.open-open.com/show/6ccaefdb9b6ea3e856579cfa8cbdb328.png"></p> <p>The X axis shows latency and the Y axis the number of operations. Disruptor's latency is lower and its throughput higher.</p> <p>Disruptor supports several usage models and configurations; the official benchmark results for some of these models are linked <a href="/misc/goto?guid=4959618569235981727" rel="nofollow,noindex">here</a>.</p> <p>What I actually want to do is compare go-disruptor against the official Java Disruptor. Because Disruptor can be configured in several ways (single or multiple producers, single or multiple consumers) and performance differs considerably between configurations, a fair comparison has to use the same configuration on both sides, even though they are implemented in different languages.</p> <p>The test scenario I chose is three producers and one consumer; with a single producer, the Java Disruptor's throughput would be several times higher.</p> <h2>Java Disruptor</h2> <p>The main Java test class is as follows:</p> <pre> <code class="language-java">import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.util.DaemonThreadFactory;

import static com.lmax.disruptor.RingBuffer.createMultiProducer;

public class Main {
    private static final int NUM_PUBLISHERS = 3; // Runtime.getRuntime().availableProcessors();
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 20L;

    private final ExecutorService executor =
        Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
    private final RingBuffer<ValueEvent> ringBuffer =
        createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor =
        new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler);
    private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

    {
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            valuePublishers[i] =
                new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 16);
        }
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }

    public long runDisruptorPass() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        handler.reset(latch,
            batchEventProcessor.getSequence().get() + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS));

        Future<?>[] futures = new Future[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await(); // start test

        for (int i = 0; i < NUM_PUBLISHERS; i++) {
            futures[i].get();
        }
        // all published

        latch.await(); // all handled

        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        batchEventProcessor.halt();
        return opsPerSecond;
    }

    public static void main(String[] args) throws Exception {
        Main m = new Main();
        System.out.println("opsPerSecond:" + m.runDisruptorPass());
    }
}
</code></pre> <p>The producer and consumer classes are as follows:</p> <pre> <code class="language-java">import java.util.concurrent.CountDownLatch;

import com.lmax.disruptor.EventHandler;

public final class ValueAdditionEventHandler implements EventHandler<ValueEvent> {
    private long value = 0;
    private long count;
    private CountDownLatch latch;

    public long getValue() {
        return value;
    }

    public void reset(final CountDownLatch latch, final long expectedCount) {
        value = 0;
        this.latch = latch;
        count = expectedCount;
    }

    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
        value = event.getValue();
        if (count == sequence) {
            latch.countDown();
        }
    }
}
</code></pre> <pre> <code class="language-java">import java.util.concurrent.CyclicBarrier;

import com.lmax.disruptor.RingBuffer;

public final class ValueBatchPublisher implements Runnable {
    private final CyclicBarrier cyclicBarrier;
    private final RingBuffer<ValueEvent> ringBuffer;
    private final long iterations;
    private final int batchSize;

    public ValueBatchPublisher(
        final CyclicBarrier cyclicBarrier,
        final RingBuffer<ValueEvent> ringBuffer,
        final long iterations,
        final int batchSize) {
        this.cyclicBarrier = cyclicBarrier;
        this.ringBuffer = ringBuffer;
        this.iterations = iterations;
        this.batchSize = batchSize;
    }

    @Override
    public void run() {
        try {
            cyclicBarrier.await();

            for (long i = 0; i < iterations; i += batchSize) {
                long hi = ringBuffer.next(batchSize);
                long lo = hi - (batchSize - 1);
                for (long l = lo; l <= hi; l++) {
                    ValueEvent event = ringBuffer.get(l);
                    event.setValue(l);
                }
                ringBuffer.publish(lo, hi);
            }
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}
</code></pre> <pre> <code class="language-java">import com.lmax.disruptor.EventFactory;

public final class ValueEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(final long value) {
        this.value = value;
    }

    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };
}
</code></pre> <p>Three producer threads write data and a single consumer processes it; the producers publish in batches, 16 events at a time.</p> <p>In my test it reached a throughput of <strong>183486238</strong> ops/second, i.e. about 180 million per second.</p> <h2>go-disruptor</h2> <p>Now let's see how much throughput go-disruptor can deliver.</p> <p>As we know, Go's goroutines pass messages through channels. The go-disruptor project page compares go-disruptor against channels, and go-disruptor is clearly ahead:</p> <table> <thead> <tr> <th>Scenario</th> <th>Per Operation Time</th> </tr> </thead> <tbody> <tr> <td>Channels: Buffered, Blocking, GOMAXPROCS=1</td> <td>58.6 ns</td> </tr> <tr> <td>Channels: Buffered, Blocking, GOMAXPROCS=2</td> <td>86.6 ns</td> </tr> <tr> <td>Channels: Buffered, Blocking, GOMAXPROCS=3, Contended Write</td> <td>194 ns</td> </tr> <tr> <td>Channels: Buffered, Non-blocking, GOMAXPROCS=1</td> <td>26.4 ns</td> </tr> <tr> <td>Channels: Buffered, Non-blocking, GOMAXPROCS=2</td> <td>29.2 ns</td> </tr> <tr> <td>Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write</td> <td>110 ns</td> </tr> <tr> <td>Disruptor: Writer, Reserve One</td> <td>4.3 ns</td> </tr> <tr> <td>Disruptor: Writer, Reserve Many</td> <td>1.0 ns</td> </tr> <tr> <td>Disruptor: Writer, Reserve One, Multiple Readers</td> <td>4.5 ns</td> </tr> <tr> <td>Disruptor: Writer, Reserve Many, Multiple Readers</td> <td>0.9 ns</td> </tr> <tr> <td>Disruptor: Writer, Await One</td> <td>3.0 ns</td> </tr> <tr> <td>Disruptor: Writer, Await Many</td> <td>0.7 ns</td> </tr> <tr> <td>Disruptor: SharedWriter, Reserve One</td> <td>13.6 ns</td> </tr> <tr> <td>Disruptor: SharedWriter, Reserve Many</td> <td>2.5 ns</td> </tr> <tr> <td>Disruptor: SharedWriter, Reserve One, Contended Write</td> <td>56.9 ns</td> </tr> <tr> <td>Disruptor: SharedWriter, Reserve Many, Contended Write</td> <td>3.1 ns</td> </tr> </tbody> </table> <p>So how does go-disruptor perform under the same test conditions as the Java Disruptor?</p> <p>Here is the test code:</p> <pre> <code class="language-go">package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"

	disruptor "github.com/smartystreets/go-disruptor"
)

const (
	RingBufferSize   = 1024 * 64
	RingBufferMask   = RingBufferSize - 1
	ReserveOne       = 1
	ReserveMany      = 16
	ReserveManyDelta = ReserveMany - 1
	DisruptorCleanup = time.Millisecond * 10
)

var ringBuffer = [RingBufferSize]int64{}

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)
	fmt.Printf("Total: %d, Iterations: %d, Publisher: %d, Consumer: 1\n",
		totalIterations, iterations, NumPublishers)
	runtime.GOMAXPROCS(NumPublishers)

	var consumer = &countConsumer{TotalIterations: totalIterations, Count: 0}
	consumer.WG.Add(1)

	controller := disruptor.Configure(RingBufferSize).WithConsumerGroup(consumer).BuildShared()
	controller.Start()
	defer controller.Stop()

	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)

	var sendWG sync.WaitGroup
	sendWG.Add(NumPublishers)
	for i := 0; i < NumPublishers; i++ {
		go func() {
			writer := controller.Writer()
			wg.Done()
			wg.Wait()

			current := disruptor.InitialSequenceValue
			for current < totalIterations {
				current = writer.Reserve(ReserveMany)
				// the claimed batch is [current-ReserveManyDelta, current],
				// i.e. exactly ReserveMany slots
				for j := current - ReserveManyDelta; j <= current; j++ {
					ringBuffer[j&RingBufferMask] = j
				}
				writer.Commit(current-ReserveManyDelta, current)
			}
			sendWG.Done()
		}()
	}

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait() // waiting for ready, as a barrier
	fmt.Println("start to publish")

	sendWG.Wait()
	fmt.Println("Finished to publish")
	consumer.WG.Wait() // waiting for the consumer
	fmt.Println("Finished to consume")

	t = (time.Now().UnixNano() - t) / 1000000 // ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}

type countConsumer struct {
	Count           int64
	TotalIterations int64
	WG              sync.WaitGroup
}

func (cc *countConsumer) Consume(lower, upper int64) {
	for lower <= upper {
		message := ringBuffer[lower&RingBufferMask]
		if message != lower {
			warning := fmt.Sprintf("\nRace condition--Sequence: %d, Message: %d\n", lower, message)
			fmt.Print(warning)
			panic(warning)
		}
		lower++
		cc.Count++
		//fmt.Printf("count: %d, message: %d\n", cc.Count-1, message)
		if cc.Count == cc.TotalIterations {
			cc.WG.Done()
			return
		}
	}
}
</code></pre> <p>In my test go-disruptor reached a throughput of <strong>137931020</strong> ops/second.</p> <p>So we now have two sets of numbers for the same test case. I also ran the same case over a Go channel, which gives three results in total:</p> <ul> <li>Java Disruptor : 183486238 ops/s</li> <li>go-disruptor : 137931020 ops/s</li> <li>go channel : 6995452 ops/s</li> </ul> <p>go-disruptor is slightly slower than the Java Disruptor, but at roughly 140 million ops/second it is still plenty fast and well worth our attention. Go channels fall far behind both.</p> <h2>Go Channel</h2> <p>Implemented with a Go channel, the throughput is 6995452 ops/second.</p> <p>The code is as follows:</p> <pre> <code class="language-go">package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)

	channel := make(chan int64, 1024*64)

	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)

	var readerWG sync.WaitGroup
	readerWG.Add(1)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			wg.Done()
			wg.Wait()

			for i := int64(0); i < iterations; {
				select {
				case channel <- i:
					i++
				default:
					continue
				}
			}
		}()
	}

	go func() {
		for i := int64(0); i < totalIterations; {
			select {
			case msg := <-channel:
				if NumPublishers == 1 && msg != i {
					//panic("Out of sequence")
				}
				i++ // advance only on a successful receive
			default:
				continue
			}
		}
		readerWG.Done()
	}()

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait()
	readerWG.Wait()

	t = (time.Now().UnixNano() - t) / 1000000 // ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}
</code></pre> <p>Source: http://colobu.com/2016/07/22/using-go-disruptor/</p>
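<p>A closing note: every benchmark above leans on the same trick. The ring buffer's size is a power of two, so the slot for an ever-growing sequence number can be found by masking with <code>size - 1</code> instead of a slower modulo, exactly as <code>ringBuffer[lower&amp;RingBufferMask]</code> does. A minimal stdlib-only sketch of that indexing (the names <code>ring</code>, <code>ringSize</code>, and <code>ringMask</code> are mine, not from either library):</p> <pre> <code class="language-go">package main

import "fmt"

const (
	ringSize = 8            // must be a power of two
	ringMask = ringSize - 1 // 0b111: masking replaces the slower `seq % ringSize`
)

func main() {
	var ring [ringSize]int64

	// Sequences grow without bound; the mask wraps them onto ring slots.
	for seq := int64(0); seq < 20; seq++ {
		ring[seq&ringMask] = seq
	}

	// After 20 writes, slot i holds the latest sequence that mapped onto it:
	// slot 0 saw 0, 8, 16; slot 3 saw 3, 11, 19; slot 7 saw 7, 15.
	fmt.Println(ring[0], ring[3], ring[7]) // 16 19 15
}
</code></pre> <p>This only works for power-of-two sizes, which is why both benchmarks use 1024*64 and why the consumer can detect a torn or overwritten slot simply by comparing the stored value against the expected sequence.</p>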
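<p>The batched publishing in the Java and Go versions is also the same arithmetic: claim an upper sequence <code>hi</code> for a batch, derive <code>lo = hi - (batchSize - 1)</code>, fill slots <code>lo..hi</code>, then publish the pair. A tiny stdlib sketch of that bookkeeping, where the atomic counter <code>next</code> is merely my stand-in for <code>RingBuffer.next(n)</code> / <code>Writer.Reserve(n)</code>, not either library's lock-free implementation:</p> <pre> <code class="language-go">package main

import (
	"fmt"
	"sync/atomic"
)

const batchSize = 16

// next mimics the shape of RingBuffer.next(n)/Writer.Reserve(n): it advances
// a shared cursor by n and returns the highest sequence in the claimed batch.
func next(cursor *int64, n int64) int64 {
	return atomic.AddInt64(cursor, n) - 1
}

func main() {
	cursor := int64(0) // next unclaimed sequence

	for batch := 0; batch < 3; batch++ {
		hi := next(&amp;cursor, batchSize)
		lo := hi - (batchSize - 1) // batch covers exactly batchSize slots
		fmt.Printf("claimed [%d, %d]\n", lo, hi)
	}
	// prints: claimed [0, 15], claimed [16, 31], claimed [32, 47]
}
</code></pre> <p>Because each claim covers exactly <code>batchSize</code> slots, consecutive batches tile the sequence space with no gaps and no overlaps, which is what lets the consumer verify every sequence in order.</p>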