基于共享内存实现的无锁队列 0 基本方案概要之前在工作中遇到过类似结构的组件,使用共享内存来转发从终端设备到云端的日志数据,所以这次想要趁着假期时间,实现一下这个组件
实现原理基本的环形队列算法下面是复读机时间
由于共享内存是连续的,用数组实现环形队列,有一个用来读的索引位置readIndex,还有一个用来写的writeIndex,记数组的实际容器大小为capacity,已使用的容积是usedLen
那么有
123456789if readIndex < writeIndex: usedLen = writeIndex - readIndexelif readIndex == writeIndex: usedLen = 0elif readIndex > writeIndex: # 第一段是writeIndex - 0 # 第二段是 capacity - readIndex # 两段加一块就是 capacity - readIndex + writeIndex usedLen = capacity - readIndex + writeIndex
readIndex < writeIndex的情况
readIndex > writeIndex的情况
为了区分队空队满的情况,规定readIndex和writeIndex相同时为空
单生产者,单消费者的无锁队列一写一读的场景,实现起来最简单。
写操作:先判断是否可以写入,如果可以,则先写数据,写完数据后再修改write_index。
读操作:先判断是否可以读取used_len > 0,如果可以,则先读数据,读完再修改read_index。
因为read_index和write_index都只会有一个地方去写,所以其实不需要加锁也不需要原子操作,直接修改即可。需要注意读写数据的时候都需要考虑遇到数组尾部的情况。
多生产者,单消费者的无锁队列
先读取write_index,判断新的数据是否有足够的空间可以写入。如果没有足够空间则返回队列满。
如果有足够的空间,则准备写入。
单生产者,单消费者的无锁队列,是先写数据再改write_index。多生产者写的时候为了避免同时写到同一片内存,需要先申请空间再写入数据。即先原子增加write_index,如果成功,再写入数据。
为了避免在生产者还未写完数据的时候,消费者就尝试读取,所以需要个同步机制告诉消费者数据正在写入中。比如头部预留一个字节,初始为0表示正在写入,写完数据后再改为1表示写入完成。头部中一般还有2字节表示数据长度。
消费者发现used_len > 0即可尝试读取。
如果首字节为0,表示数据正在写入,等待。
如果首字节不为0,表示数据已写完,可以读取。
消费者读取数据后,需要将read_index前移到合适的位置,且因为只有一个消费者,这里无需使用原子操作。
这种实现看似没有问题,其实也有隐患。如果生产者在修改write_index之后,在修改头部首字节为1之前,这段时间内crash的话,就会导致消费者永远停留在等待生产者写完的状态上,且这个状态无法自动恢复。
针对以上提到的问题有下面的两点优化
优化1 等待适当的时间 跳过一直无法写入成功的共享内存段
消费者发现要读的block-chain头的首个字节是0的时候 则等待一段时间,根据业务的不同,可以选择一直等待,或者等待一段时间后认为该生产者已经crash了需要跳过当前消费者,根据当前生产者占用的block数量进行一段内存的整段跳过
再写入数据限制了最大长度的情况下,以现代计算机的速度 从修改write_index到copy全部的字节入共享内存中,然后修改内存段的头部的标志位为1 这段时间是非常快的
可以适当放弃某个一直卡着的的生产者
优化2 生产者需要在头部事先声明自己自己将使用的内存大小,可以以block块为单位进行跳过
将队列分成N个定长的block:
C++版本的定义
12345678910111213141516171819202122// 为什么要保证header的长度是8byte? 因为要为64位计算机的字长64bit 可以为后续解决False-sharing性能优化做准备// C++struct Block { union { struct { bool m_used; uint8_t m_blk_cnt; uint8_t m_blk_idx; uint16_t m_blk_len; }; char m_head_reserved[8]; }; char m_data[kBlockDataSize]; bool CanUsed(uint8_t expected_blk_idx) const { return m_used && expected_blk_idx == m_blk_idx && m_blk_cnt <= kMaxBlockCount && m_blk_idx < m_blk_cnt && m_blk_len <= kBlockDataSize; }};
go版本的定义
1234567891011121314package demotype blockHeader struct { completed uint8 placeholder0 uint8 blockCount uint16 blockIndex uint16 blockDataLen uint16}type block struct { blockHeader data []byte}
生产者在申请使用write_index的时候需要提前计算自己需要使用的block数量,再用CAS+自旋将write_index前移blockcount,这样可以获得一段内存,并且可以抽象化为一段block-chain,写入数据的时候从最后一个block写到第一个block,最后将block-chain中第一个block的以完成标志位设置成1
当等待一段时间之后,发现当前被申请的block-chain还没有被writer写入,认为当前的writer crash了,根据block header中记录的block count跳过这段内存读下一段block-chain
当然,根据具体的需求不同,也可以适当调整等待时间或者标记上次无法正常写入的位置 整个共享内存环环绕一圈了之后才会出现数据脏写,不过那个时候还是可以blockheader中的信息index和count来判断block是否正常,如果不正常可以进行跳过处理
多生产者,多消费者的情况我认为这种场景在使用过程中是不需要考虑的
因为单消费者可以批量读取多个生产者生产的block-chain,一次读取可以读取一定的足够的可读数据 一般单读的性能就已经足够了
可以在批量读取再给消费者做一次进程内的多线程分发
如果单消费者的性能真的不能满足需求,说明读了之后的数据处理业务逻辑非常的重,那么这个时候的性能瓶颈就不会是队列读取这个地方
共享内存的特性和golang实现第二章节
假共享优化与具体实现第三章节
C接口和跨语言调用第四章节
DPDK相关共享内存 多生产者多消费者无锁队列实现补充章节