它是为了实现比 LinkedBlockingQueue 和 LinkedTransferQueue 更高的性能而特别定制的,根据连接池的特殊场景做了一些性能优化

容器类定义

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {
   // 底层存储 CopyOnWriteArrayList,可以无锁安全的读取连接池的信息
   private final CopyOnWriteArrayList<T> sharedList;
   private final boolean weakThreadLocals; //默认是false 
	 // 线程隔离的 FastList<PoolEntry>
   private final ThreadLocal<List<Object>> threadList;
	 // 其实就是HikariPool,用来新建PoolEntry
   private final IBagStateListener listener;
	 // 等待获取db连接的线程个数
   private final AtomicInteger waiters;
   private volatile boolean closed;

   private final SynchronousQueue<T> handoffQueue;
  //..
}

容器存储对象(条目)的定义

// 容器存放的的 IConcurrentBagEntry 接口实现类 PoolEntry 
public interface IConcurrentBagEntry
   {  
      // 空闲状态
      int STATE_NOT_IN_USE = 0;
      // 连接在使用
      int STATE_IN_USE = 1;
      // remove的时候先标记为被移除
      int STATE_REMOVED = -1;
      // 被预留状态,不可用但是可以移除 
      // 主要在检查线程中,要对连接进行softEvictConnection,确保能从not-in-use转到 reserved
      int STATE_RESERVED = -2;

      boolean compareAndSet(int expectState, int newState);
      void setState(int newState);
      int getState();
   }

它的几种状态扭转

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/3380ec01-f4eb-41f1-b764-97b0136dcc25/Untitled.png

连接池容器初始化

public ConcurrentBag(final IBagStateListener listener)
{
	 // 其实就是HikariPool, 负责创建PoolEntry
   this.listener = listener;
	 // false
   this.weakThreadLocals = useWeakThreadLocals();
   this.handoffQueue = new SynchronousQueue<>(true);
   // 等待获取db连接的线程个数
   this.waiters = new **AtomicInteger**();
   //底层存储 CopyOnWriteArrayList
   this.sharedList = new **CopyOnWriteArrayList**<>();
   if (weakThreadLocals) {
      this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
   }
   else {
			// ThreadLocal, 使用自定义的FastList
      this.threadList = ThreadLocal.withInitial(() -> new **FastList**<>(IConcurrentBagEntry.class, 16));
   }
}

容器操作 crud

  1. 从容器获取对象 borrow(long timeout, TimeUnit timeunit)
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
   // Try the thread-local list first
   // **1.** 先从 threadlocal的FastList中取,倒序遍历, 同一个线程归还连接后再获取最新的连接 可避免一些检查
   // 如果能取到就直接返回 这样无需锁
   final List<Object> list = threadList.get();
   for (int i = **list.size() - 1;** i >= 0; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked")
      final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }

   // **2.** threadlocal获取不到,则需要从底层的sharedList中去寻找
   // 累加等待计数
   // Otherwise, scan the shared list ... then poll the handoff queue
   final int waiting = waiters.incrementAndGet();
   try {
      for (T bagEntry : sharedList) {
         if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            // If we may have stolen another waiter's connection, request another bag add.
            if (waiting > 1) {
               listener.addBagItem(waiting - 1);
            }
            return bagEntry;
         }
      }

		  // 如果连接池里边无空闲连接了,则需要HikariPool去增加连接
      listener.addBagItem(waiting);
		  
      // 进入超时等待循环 
      timeout = timeUnit.toNanos(timeout);
      do {
         final long start = currentTime();
         final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
         if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }

         timeout -= elapsedNanos(start);
      } while (timeout > 10_000);

      return null;
   }
   finally {
     // 扣减等待计数
      waiters.decrementAndGet();
   }
  1. 新增对象 add(final T bagEntry)
public void add(final T bagEntry)
{
   if (closed) {
      LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }

   sharedList.add(bagEntry);

   // spin until a thread takes it or none are waiting
   while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !**handoffQueue.offer(bagEntry))** {
      Thread.yield();
   }
}

将对象塞到底层 copyOnWriteArrayList里,如果有线程在等待连接,会把新增的这个 bagEntry塞到 handoffQueue,给阻塞在 borrow() 超时等待的线程

  1. 归还对象 requite(final T bagEntry)
public void requite(final T bagEntry)
{
   bagEntry.setState(STATE_NOT_IN_USE);

   for (int i = 0; waiters.get() > 0; i++) {
      if (bagEntry.getState() != STATE_NOT_IN_USE || **handoffQueue.offer(bagEntry)**) {
         return;
      }
      else if ((i & 0xff) == 0xff) {
         parkNanos(MICROSECONDS.toNanos(10));
      }
      else {
         Thread.yield();
      }
   }

   final List<Object> threadLocalList = threadList.get();
   if (threadLocalList.size() < 50) {
      **threadLocalList**.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
}