有网友碰到这样的问题“聊一聊disruptor-无锁并发框架”。小编为您整理了以下解决方案,希望对您有帮助:
解决方案1:
Disruptor论文中讲述一个实验,一个计数器循环自增5亿次
CAS:Compare And Swap/Set 顾名思义比较和交换
CPU级别的指令,cpu去更新一个值,但如果跟新过程中值发生了变化,操作就失败,然后重试,直到更新成功!
Disruptor的sequence的自增就是CAS的自旋自增,对应的,ArrayBlockQueue的数组索引index是互斥自增!
ArrayBloackQueue出队takeIndex索引所在元素设置为NULL,高吞吐量下队列会产生大量GC
RingBuffer的指针(cursor)属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一
生产者对RingBuffer更新序列号,之后会对volatile字段(cursor)的写操作创建了一个内存屏障,这个屏障将刷新所有缓存里的值(缓存失效)
消费者获取RingBuffer序列号,涉及到读冲突的缓存失效,C2在C1之后,C2拿到C1更新过的序列号之后,C2才能获取next序列号。内存屏障保证了他们之前的执行顺序,消费者总能获取最新的序列号
读写并行简图
多个生产者的情况下,会遇到“多个线程重复写同一个元素”的问题,解决方法是,每个线程获取不同的一段数组空间进行操作,这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS无脑自增即可判断。
Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:AvailableBuffer。当某个位置写入成功的时候,便把Availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。
消费端部分源码分析
1、读写不存在冲突:消费者读取到序号 x 位置元素都被生产者写入成功,消费者消费这一段区间数据。
2、读写存在冲突:消费者读取到序号x位置生产者正在写入,也就是下图availble Buffer中标记为-1的位置,则消费者返回该序号x,并执行一段等待策略
多线程环境下,多个生产者通过do/while循环的条件CAS,来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。
计算机系统中为了解决主内存与CPU运行速度的差距,在CPU与主内存之间添加(Cache)CPU硬件级别缓存系统中是以缓存行(cache line)为单位存储的,当多线程修改互相的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享,多线程环境下会导致缓存命中率很低!
Sequence实际value变量的左右均被填充了7个long型变量,其自身也是long型变量,一个long型变量占据8个字节,所以序号与他上一个/下一个序号之间的 最小内存 距离为:15 8=120byte,加上对象头的8个字节,可以确保sequence大小128byte=2 byte(有的CPU缓存行是128byte)
这样直接的代价就是增大的15倍的内存消耗空间,这样的设计导致不可能有两个cursor出现在同一个cpu cache line中, 就解决了”伪共享”问题!