Baidu - UidGenerator Source Code Analysis

Understand how Baidu UidGenerator works

Posted by Chris on August 2, 2017


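UidGenerator is a unique ID generator implemented in Java and based on the Snowflake algorithm. It runs as a component inside the application project and supports custom workerId bit widths and initialization strategies, which makes it suitable for scenarios such as automatic instance restart and migration in virtualized environments like Docker. In terms of implementation, UidGenerator uses a RingBuffer to cache generated UIDs so that UID production and consumption run in parallel, and it pads cache lines to avoid the hardware-level "false sharing" problem that the RingBuffer would otherwise introduce.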


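This article is based on commit ba696f535ba6b000b96dd73a7b697e4a00c88085, which was the latest commit on the master branch at the time of writing (2017-08-02 21:59:47). Keep in mind that later versions may differ in functionality.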


 └── baidu
     └── fsg
         └── uid
             ├── BitsAllocator.java			- Bit allocator (C)
             ├── UidGenerator.java			- Interface for UID generation (I)
             ├── buffer
             │   ├── BufferPaddingExecutor.java		- Executor that fills the RingBuffer (C)
             │   ├── BufferedUidProvider.java		- Provider of the UIDs put into the RingBuffer (I)
             │   ├── RejectedPutBufferHandler.java	- Handler for a rejected put into the RingBuffer (I)
             │   ├── RejectedTakeBufferHandler.java	- Handler for a rejected take from the RingBuffer (I)
             │   └── RingBuffer.java			- Contains two circular arrays (C)
             ├── exception
             │   └── UidGenerateException.java		- Runtime exception
             ├── impl
             │   ├── CachedUidGenerator.java		- UID generator backed by the RingBuffer (C)
             │   └── DefaultUidGenerator.java		- Default UID generator without a RingBuffer (C)
             ├── utils
             │   ├── DateUtils.java
             │   ├── DockerUtils.java
             │   ├── EnumUtils.java
             │   ├── NamingThreadFactory.java
             │   ├── NetUtils.java
             │   ├── PaddedAtomicLong.java
             │   └── ValuedEnum.java
             └── worker
                 ├── DisposableWorkerIdAssigner.java	- Disposable (use-once) WorkerId assigner (C)
                 ├── WorkerIdAssigner.java		- WorkerId assigner (I)
                 ├── WorkerNodeType.java		- Worker node type (E)
                 ├── dao
                 │   └── WorkerNodeDAO.java		- MyBatis Mapper
                 └── entity
                     └── WorkerNodeEntity.java		- MyBatis Entity




BitsAllocator - Bit allocator



    /**
     * Allocate bits for UID according to delta seconds & workerId & sequence<br>
     * <b>Note that: </b>The highest bit will always be 0 for sign
     *
     * @param deltaSeconds
     * @param workerId
     * @param sequence
     * @return
     */
    public long allocate(long deltaSeconds, long workerId, long sequence) {
        return (deltaSeconds << timestampShift) | (workerId << workerIdShift) | sequence;
    }
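
To make the shifts concrete, here is a minimal sketch of the same bit packing. The 1 + 28 + 22 + 13 bit split (sign, delta seconds, worker ID, sequence), the class name BitLayoutSketch, and the three decoding helpers are assumptions for illustration, not values read from the snippet above.

```java
final class BitLayoutSketch {
    // Assumed layout: 1 sign bit + 28 timestamp bits + 22 worker bits + 13 sequence bits = 64.
    static final int TIMESTAMP_BITS = 28;
    static final int WORKER_ID_BITS = 22;
    static final int SEQUENCE_BITS = 13;

    // Each shift equals the total width of the fields to the right of that field.
    static final int WORKER_ID_SHIFT = SEQUENCE_BITS;
    static final int TIMESTAMP_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;

    // ~(-1L << bits) has exactly the low `bits` bits set, i.e. the largest value of the field.
    static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
    static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);

    static long allocate(long deltaSeconds, long workerId, long sequence) {
        return (deltaSeconds << TIMESTAMP_SHIFT) | (workerId << WORKER_ID_SHIFT) | sequence;
    }

    // Hypothetical decoding helpers: reverse the packing done by allocate().
    static long deltaSecondsOf(long uid) { return uid >>> TIMESTAMP_SHIFT; }
    static long workerIdOf(long uid) { return (uid >>> WORKER_ID_SHIFT) & MAX_WORKER_ID; }
    static long sequenceOf(long uid) { return uid & MAX_SEQUENCE; }

    public static void main(String[] args) {
        long uid = allocate(100L, 7L, 3L);
        System.out.println(uid + " -> " + deltaSecondsOf(uid) + "/" + workerIdOf(uid) + "/" + sequenceOf(uid));
    }
}
```

Because the three fields occupy disjoint bit ranges, the bitwise OR in allocate() never lets one field overwrite another, as long as each argument fits in its field.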

DisposableWorkerIdAssigner - Worker ID assigner


    public long assignWorkerId() {
        // build worker node entity
        WorkerNodeEntity workerNodeEntity = buildWorkerNode();

        // add worker node for new (ignore the same IP + PORT)
        workerNodeDAO.addWorkerNode(workerNodeEntity);
        LOGGER.info("Add worker node:" + workerNodeEntity);

        return workerNodeEntity.getId();
    }
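
The "disposable" idea can be sketched without a database. The example below replaces the worker-node table with an in-memory counter, which is purely an assumption for illustration (the real class persists a WorkerNodeEntity); the class name DisposableIdSketch and the host strings are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class DisposableIdSketch {
    // Stand-in for an auto-increment primary key of a worker-node table (assumption).
    private final AtomicLong nextRowId = new AtomicLong(0);

    // Each call models one application start: register a new node and use its row ID as the worker ID.
    long assignWorkerId(String hostAndPort) {
        long id = nextRowId.incrementAndGet();
        System.out.println("Add worker node: " + hostAndPort + " -> " + id);
        return id;
    }

    public static void main(String[] args) {
        DisposableIdSketch assigner = new DisposableIdSketch();
        assigner.assignWorkerId("10.0.0.1:8080");
        assigner.assignWorkerId("10.0.0.1:8080"); // the same host restarts and gets a fresh ID
    }
}
```

Under this assumption a restarted instance never reuses its previous worker ID, so it cannot collide with UIDs it generated before the restart.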


RingBuffer - A dual circular array structure for storing UIDs

Let's first look at the field outline of RingBuffer, which gives a rough idea of how it works:

    /**
     * Constants
     */
    private static final int START_POINT = -1;
    private static final long CAN_PUT_FLAG = 0L;
    private static final long CAN_TAKE_FLAG = 1L;
    // Default padding threshold, as a percentage of bufferSize
    public static final int DEFAULT_PADDING_PERCENT = 50;

    /**
     * The size of RingBuffer's slots, each slot hold a UID
     * <p>
     * The buffer size is 2^n
     */
    private final int bufferSize;
    /** Because bufferSize is 2^n, indexMask is bufferSize - 1 and serves as a bit mask for fast modulo */
    private final long indexMask;
    /** Array that holds the UIDs */
    private final long[] slots;
    /** Array that holds one flag per slot (whether the slot can be taken from or put to) */
    private final PaddedAtomicLong[] flags;

    /** Tail: last position sequence to produce */
    private final AtomicLong tail = new PaddedAtomicLong(START_POINT);

    /** Cursor: current position sequence to consume */
    private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);

    /** Threshold for trigger padding buffer */
    private final int paddingThreshold;

    /**
     * Reject put buffer handle policy
     * <p>
     * The default rejection policy only writes a log entry
     */
    private RejectedPutBufferHandler rejectedPutHandler = this::discardPutBuffer;
    /**
     * Reject take buffer handle policy
     * <p>
     * The default rejection policy writes a log entry and throws an exception
     */
    private RejectedTakeBufferHandler rejectedTakeHandler = this::exceptionRejectedTakeBuffer;

    /**
     * Executor of padding buffer
     * <p>
     * The executor that fills the RingBuffer
     */
    private BufferPaddingExecutor bufferPaddingExecutor;
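
The two derived fields, indexMask and paddingThreshold, can be illustrated with a small sketch. The class and method names are hypothetical, and the threshold formula (a percentage of the buffer size) is an assumption based on the name DEFAULT_PADDING_PERCENT rather than something shown in the fields above.

```java
final class IndexMaskSketch {
    // Maps an ever-growing sequence onto a slot index. Valid only when bufferSize is a power of two.
    static int slotIndex(long sequence, int bufferSize) {
        long indexMask = bufferSize - 1;
        return (int) (sequence & indexMask);
    }

    // Assumed formula: the threshold is paddingPercent percent of the buffer size.
    static int paddingThreshold(int bufferSize, int paddingPercent) {
        return bufferSize * paddingPercent / 100;
    }

    public static void main(String[] args) {
        System.out.println(slotIndex(9L, 8));        // same result as 9 % 8
        System.out.println(paddingThreshold(8, 50)); // half of the buffer
    }
}
```

For non-negative sequences and a power-of-two size, `sequence & (bufferSize - 1)` gives the same result as `sequence % bufferSize` while avoiding a division.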


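Even though writes to slots are serialized across threads (detailed below), on a multi-core processor the array is probably still exposed to false sharing. Java threads currently cannot be pinned to a CPU core, so successive writes to the same cache line may come from different cores, and such writes are very likely to generate RFO (Read For Ownership) requests.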
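
As a rough illustration of cache-line padding, the sketch below surrounds an AtomicLong value with extra long fields so that two hot counters are less likely to share one cache line. It is a hypothetical class written for this article, not a copy of PaddedAtomicLong, and it assumes a 64-byte cache line; the actual field layout is decided by the JVM, so this is a best-effort technique.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical padded counter, assuming 64-byte cache lines.
class PaddedCounterSketch extends AtomicLong {
    private static final long serialVersionUID = 1L;

    // Six extra longs (48 bytes) next to the 8-byte value inherited from AtomicLong.
    // Only p6 is initialized to 7L; p1..p5 keep the default value 0.
    public volatile long p1, p2, p3, p4, p5, p6 = 7L;

    PaddedCounterSketch(long initialValue) {
        super(initialValue);
    }

    // Reads every padding field so that they are not obviously unused.
    public long sumPadding() {
        return p1 + p2 + p3 + p4 + p5 + p6;
    }
}
```

The padding costs memory per instance, which is acceptable for a few hot counters such as tail and cursor but adds up for a large flags array.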





    /**
     * Put an UID in the ring & tail moved<br>
     * We use 'synchronized' to guarantee the UID fill in slot & publish new tail sequence as atomic operations<br>
     * <p>
     * <b>Note that: </b> It is recommended to put UID in a serialize way, cause we once batch generate a series UIDs and put
     * the one by one into the buffer, so it is unnecessary put in multi-threads
     *
     * @param uid
     * @return false means that the buffer is full, apply {@link RejectedPutBufferHandler}
     */
    public synchronized boolean put(long uid) {
        long currentTail = tail.get();
        long currentCursor = cursor.get();
        // On the first put, currentTail is -1 and currentCursor is START_POINT (-1), which is treated as 0, so distance is -1
        long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
        // tail catches the cursor, means that you can't put anything cause of RingBuffer is full
        if (distance == bufferSize - 1) {
            rejectedPutHandler.rejectPutBuffer(this, uid);
            return false;
        }

        // 1. pre-check whether the flag is CAN_PUT_FLAG
        // On the first put, currentTail is -1, so nextTailIndex is 0
        int nextTailIndex = calSlotIndex(currentTail + 1);
        if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
            rejectedPutHandler.rejectPutBuffer(this, uid);
            return false;
        }

        // 2. put UID in the next slot
        slots[nextTailIndex] = uid;
        // 3. update next slot' flag to CAN_TAKE_FLAG
        flags[nextTailIndex].set(CAN_TAKE_FLAG);
        // 4. publish tail with sequence increase by one
        tail.incrementAndGet();

        // The atomicity of operations above, guarantees by 'synchronized'. In another word,
        // the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet())
        return true;
    }


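Taking a UID is a lock-free operation: once a thread's CAS succeeds in advancing the cursor, that thread owns the sequence, which is what makes take() thread safe.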

    /**
     * Take an UID of the ring at the next cursor, this is a lock free operation by using atomic cursor<p>
     * <p>
     * Before getting the UID, we also check whether reach the padding threshold,
     * the padding buffer operation will be triggered in another thread<br>
     * If there is no more available UID to be taken, the specified {@link RejectedTakeBufferHandler} will be applied<br>
     *
     * @return UID
     * @throws IllegalStateException if the cursor moved back
     */
    public long take() {
        // spin get next available cursor
        long currentCursor = cursor.get();
        // cursor starts at -1; while cursor equals tail (for example before anything has been put), nextCursor stays unchanged
        long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);

        // check for safety consideration, it never occurs
        // nextCursor == currentCursor right after initialization or when all UIDs have been used up
        Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");

        // trigger padding in an async-mode if reach the threshold
        long currentTail = tail.get();
        // Several threads may trigger the padding event, but only one thread ends up executing the padding
        if (currentTail - nextCursor < paddingThreshold) {
            LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
                    nextCursor, currentTail - nextCursor);
            bufferPaddingExecutor.asyncPadding();	// (a)
        }

        // cursor catch the tail, means that there is no more available UID to take
        if (nextCursor == currentCursor) {
            rejectedTakeHandler.rejectTakeBuffer(this);
        }

        // 1. check next slot flag is CAN_TAKE_FLAG
        int nextCursorIndex = calSlotIndex(nextCursor);
        // This slot must be in the CAN_TAKE state
        Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");

        // 2. get UID from next slot
        long uid = slots[nextCursorIndex];
        // 3. set next slot flag as CAN_PUT_FLAG.
        // Tell the flags array that this slot can be reused
        flags[nextCursorIndex].set(CAN_PUT_FLAG);

        // Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
        // slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
        return uid;
    }
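
The interplay of put() and take() is easier to follow in a stripped-down form. The following is a minimal sketch under simplifying assumptions: no padding against false sharing, no rejection handlers (put returns false, take throws), and no padding trigger. The class name MiniRingSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

final class MiniRingSketch {
    private static final long CAN_PUT = 0L;
    private static final long CAN_TAKE = 1L;

    private final long[] slots;
    private final AtomicLongArray flags; // unpadded stand-in for the per-slot flags
    private final long indexMask;
    private final AtomicLong tail = new AtomicLong(-1);
    private final AtomicLong cursor = new AtomicLong(-1);

    MiniRingSketch(int sizePowerOfTwo) {
        slots = new long[sizePowerOfTwo];
        flags = new AtomicLongArray(sizePowerOfTwo); // all zeros, i.e. every slot is CAN_PUT
        indexMask = sizePowerOfTwo - 1;
    }

    // Producer side: serialized by 'synchronized', publishes the slot before moving tail.
    synchronized boolean put(long uid) {
        long currentTail = tail.get();
        long currentCursor = cursor.get();
        long distance = currentTail - (currentCursor == -1 ? 0 : currentCursor);
        if (distance == slots.length - 1) {
            return false; // tail has caught up with cursor: buffer is full
        }
        int index = (int) ((currentTail + 1) & indexMask);
        if (flags.get(index) != CAN_PUT) {
            return false;
        }
        slots[index] = uid;
        flags.set(index, CAN_TAKE);
        tail.incrementAndGet();
        return true;
    }

    // Consumer side: lock free, a sequence is claimed by the CAS loop inside updateAndGet.
    long take() {
        long current = cursor.get();
        long next = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
        if (next == current) {
            throw new IllegalStateException("no UID available");
        }
        int index = (int) (next & indexMask);
        long uid = slots[index];    // read the value first ...
        flags.set(index, CAN_PUT);  // ... then release the slot for reuse
        return uid;
    }
}
```

A short usage example: with a size of 4, four puts succeed and the fifth is rejected; after two takes, the first slot can be reused by the next put.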


BufferPaddingExecutor - Fills the RingBuffer with elements


    /**
     * Padding buffer fill the slots until to catch the cursor
     * <p>
     * This method is called by both on-demand padding and scheduled padding
     */
    public void paddingBuffer() {
        LOGGER.info("{} Ready to padding buffer lastSecond:{}. {}", this, lastSecond.get(), ringBuffer);

        // is still running
        // The flag means that the padding executor is running, not the RingBuffer. It keeps several threads from padding at once.
        if (!running.compareAndSet(false, true)) {
            LOGGER.info("Padding buffer is still running. {}", ringBuffer);
            return;
        }

        // fill the rest slots until to catch the cursor
        boolean isFullRingBuffer = false;
        while (!isFullRingBuffer) {
            // Put all UIDs of one second after another until the buffer is full
            List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
            for (Long uid : uidList) {
                isFullRingBuffer = !ringBuffer.put(uid);
                if (isFullRingBuffer) {
                    break;
                }
            }
        }

        // not running now
        running.compareAndSet(true, false);
        LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
    }
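
The running flag is a common "only one caller does the work" guard built on compare-and-set. Here is a minimal sketch of that pattern in isolation; the class name SingleRunnerSketch is hypothetical, and resetting the flag in a finally block is a defensive choice of this sketch rather than what the method above does.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleRunnerSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    final AtomicInteger executed = new AtomicInteger();

    // Returns true if this call did the work, false if another call was already in progress.
    boolean runExclusively(Runnable work) {
        if (!running.compareAndSet(false, true)) {
            return false; // somebody else flipped the flag first
        }
        try {
            work.run();
            executed.incrementAndGet();
            return true;
        } finally {
            running.set(false); // always release, even if work throws
        }
    }
}
```

Callers that lose the compare-and-set return immediately instead of blocking, which is why many threads can request padding in take() while only one actually fills the buffer.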


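  1. The class also provides scheduled padding. It takes effect only if the corresponding switch is set; periodic padding is disabled by default.
    /**
     * Start executors such as schedule
     */
    public void start() {
        if (bufferPadSchedule != null) {
            bufferPadSchedule.scheduleWithFixedDelay(this::paddingBuffer, scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
        }
    }
  2. When take() detects that the padding threshold has been reached, padding is performed asynchronously.
    /**
     * Padding buffer in the thread pool
     */
    public void asyncPadding() {
        bufferPadExecutors.submit(this::paddingBuffer);
    }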


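BufferedUidProvider - The provider of UIDs. In this repository it takes the form of a lambda referring to com.baidu.fsg.uid.impl.CachedUidGenerator#nextIdsForOneSecond

RejectedPutBufferHandler - The handler that rejects further puts when the RingBuffer is full. In this repository it is implemented by com.baidu.fsg.uid.buffer.RingBuffer#discardPutBuffer

RejectedTakeBufferHandler - The handler that rejects taking a UID when the RingBuffer is empty. In this repository it is implemented by com.baidu.fsg.uid.buffer.RingBuffer#exceptionRejectedTakeBuffer

CachedUidGenerator - UID generator that uses the RingBuffer

In an application this class is injected into other components as a Spring Bean. Its main job is to initialize the RingBuffer and the BufferPaddingExecutor. Getting an ID is delegated to RingBuffer's take() method, and the most important method is the one that backs the BufferedUidProvider, namely nextIdsForOneSecond(long), which is passed as a lambda expression.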

    /**
     * Get the UIDs in the same specified second under the max sequence
     *
     * @param currentSecond
     * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
     */
    protected List<Long> nextIdsForOneSecond(long currentSecond) {
        // Initialize result list size of (max sequence + 1)
        int listSize = (int) bitsAllocator.getMaxSequence() + 1;
        List<Long> uidList = new ArrayList<>(listSize);

        // Allocate the first sequence of the second, the others can be calculated with the offset
        long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
        for (int offset = 0; offset < listSize; offset++) {
            uidList.add(firstSeqUid + offset);
        }

        return uidList;
    }
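
Adding the offset to the first UID works because the sequence occupies the lowest bits and is 0 in firstSeqUid, so an offset up to the max sequence never carries into the worker ID bits. The sketch below checks this with an assumed 22-bit worker ID and 13-bit sequence; the bit widths and the class name OneSecondBatchSketch are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class OneSecondBatchSketch {
    static final int WORKER_ID_BITS = 22; // assumed width
    static final int SEQUENCE_BITS = 13;  // assumed width
    static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);

    static long allocate(long deltaSeconds, long workerId, long sequence) {
        return (deltaSeconds << (WORKER_ID_BITS + SEQUENCE_BITS)) | (workerId << SEQUENCE_BITS) | sequence;
    }

    // All UIDs of one second: the first one plus every offset from 0 to MAX_SEQUENCE.
    static List<Long> idsForOneSecond(long deltaSeconds, long workerId) {
        int listSize = (int) MAX_SEQUENCE + 1;
        List<Long> uids = new ArrayList<>(listSize);
        long first = allocate(deltaSeconds, workerId, 0L);
        for (int offset = 0; offset < listSize; offset++) {
            uids.add(first + offset);
        }
        return uids;
    }

    public static void main(String[] args) {
        List<Long> uids = idsForOneSecond(1L, 7L);
        System.out.println(uids.size() + " UIDs, first=" + uids.get(0) + ", last=" + uids.get(uids.size() - 1));
    }
}
```

Each batch is strictly increasing, and the first UID of the following second is larger than the last UID of the current one, so UIDs from one worker stay ordered across batches.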



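  1. The RingBuffer is padded on three occasions: when CachedUidGenerator initializes the RingBuffer, when RingBuffer#take() detects that the threshold has been reached, and periodically (if scheduled padding is enabled).
  2. The slots array of the RingBuffer is read far more often than it is written, so false sharing is not addressed for it.
  3. On JDK 8, @sun.misc.Contended combined with the -XX:-RestrictContended JVM flag can be used to let the JVM do the cache-line padding.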