它是为了实现比 LinkedBlockingQueue 和 LinkedTransferQueue 更高的性能而特别定制的,根据连接池的特殊场景做了一些性能优化
public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {
// 底层存储 CopyOnWriteArrayList,可以无锁安全的读取连接池的信息
private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals; //默认是false
// 线程隔离的 FastList<PoolEntry>
private final ThreadLocal<List<Object>> threadList;
// 其实就是HikariPool,用来新建PoolEntry
private final IBagStateListener listener;
// 等待获取db连接的线程个数
private final AtomicInteger waiters;
private volatile boolean closed;
private final SynchronousQueue<T> handoffQueue;
//..
}
// 容器存放的的 IConcurrentBagEntry 接口实现类 PoolEntry
public interface IConcurrentBagEntry
{
// 空闲状态
int STATE_NOT_IN_USE = 0;
// 连接在使用
int STATE_IN_USE = 1;
// remove的时候先标记为被移除
int STATE_REMOVED = -1;
// 被预留状态,不可用但是可以移除
// 主要在检查线程中,要对连接进行softEvictConnection,确保能从not-in-use转到 reserved
int STATE_RESERVED = -2;
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
public ConcurrentBag(final IBagStateListener listener)
{
// 其实就是HikariPool, 负责创建PoolEntry
this.listener = listener;
// false
this.weakThreadLocals = useWeakThreadLocals();
this.handoffQueue = new SynchronousQueue<>(true);
// 等待获取db连接的线程个数
this.waiters = new **AtomicInteger**();
//底层存储 CopyOnWriteArrayList
this.sharedList = new **CopyOnWriteArrayList**<>();
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
}
else {
// ThreadLocal, 使用自定义的FastList
this.threadList = ThreadLocal.withInitial(() -> new **FastList**<>(IConcurrentBagEntry.class, 16));
}
}
borrow(long timeout, TimeUnit timeunit)
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
// **1.** 先从 threadlocal的FastList中取,倒序遍历, 同一个线程归还连接后再获取最新的连接 可避免一些检查
// 如果能取到就直接返回 这样无需锁
final List<Object> list = threadList.get();
for (int i = **list.size() - 1;** i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// **2.** threadlocal获取不到,则需要从底层的sharedList中去寻找
// 累加等待计数
// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
// 如果连接池里边无空闲连接了,则需要HikariPool去增加连接
listener.addBagItem(waiting);
// 进入超时等待循环
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
// 扣减等待计数
waiters.decrementAndGet();
}
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !**handoffQueue.offer(bagEntry))** {
Thread.yield();
}
}
将对象塞到底层 copyOnWriteArrayList里,如果有线程在等待连接,会把新增的这个 bagEntry塞到 handoffQueue,给阻塞在 borrow() 超时等待的线程
public void requite(final T bagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || **handoffQueue.offer(bagEntry)**) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
Thread.yield();
}
}
final List<Object> threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
**threadLocalList**.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}