HikariDataSource核心源码分析

一、前言

上一篇文章讲了不合理的连接池代码导致的内存泄露事件,详见这篇文章记一次多数据源路由造成的数据库连接泄露排查过程。其中粗略的分析了HikariDataSource连接池的代码,并没有仔细分析。本篇文章带读者们一起去分析一波源码,看完本篇文章后,你可以
1、对锁和高并发有一定理解以及其在连接池中的运用
2、了解HikariDataSource业界性能最高连接池的原因
3、可以对连接池的原理有大致的了解,可以尝试自己实现一个连接池

二、源码分析

2.0 关键代码类介绍

  1. HikariDataSource对象:Hikari中的核心类为HikariDataSource,实现了DataSource的getConnetion接口
  2. HikariPool对象:HikariDataSource中有两个HikariPool对象,一个是fastPathPool是在HikariPool有参构造函数中创建, 如果没有创建fastPathPool,那么就会在getConnection方法时创建pool对象。
  3. ConcurrentBag对象:连接池的真正实现,实现了高性能高并发的无锁设计,主要的方法有borrow 借,requite 归还,add 新增,remove 去除。
  4. PoolEntry对象:对数据库connection对象进行包装,增加额外的属性,包括最后一次访问时间,是否丢弃,当前状态等等。HikariDataSource源码底层里面都是操作这个对象。

2.1 初始化(以下所有代码只列出关键实现)

初始化工作包括HikariDataSource初始化,HikariPool初始化,connectionBag初始化,一些线程池的初始化,最小连接数的初始化,以下逐一分析

public HikariDataSource(HikariConfig configuration)
{
  // 一个datasource有两个HikariPool成员变量,fastPathPool无参构造为null,用final修饰,pool有参构造,用volatile修饰
  // 因为volatile修饰的对象,需要从主内存读取,而且需要写入主内存等操作,所以最好在使用上用有参构造来构造HikariDataSource
  pool = fastPathPool = new HikariPool(this);
  // 连接池配置锁定无法修改
  this.seal();
}
// HikariPool初始化成员变量EntryCreator,创建PoolEntry的任务
private final PoolEntryCreator postFillPoolEntryCreator = new PoolEntryCreator("After adding ");

public HikariPool(final HikariConfig config)
{
  // 初始化配置,根据配置生成datasource变量
  super(config);
  // 生成真正的连接池,后续的获取连接释放连接都是从这里面弄的
  this.connectionBag = new ConcurrentBag<>(this);
  // 测试数据库的连通性
  checkFailFast();
  // 增加连接的线程池
  this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
  // 关闭连接的线程池
  this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
  // 初始化维持最小连接数任务
  this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
}

重点是维持最小连接数任务,如下:

private final class HouseKeeper implements Runnable{
  public void run(){
     // Detect retrograde time, allowing +128ms as per NTP spec.
    if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
       previous = now;
       // 标为丢弃的连接关闭
       softEvictConnections();
       return;
    }
    if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
       // 获取当前连接池中已经不是使用中的连接集合
       final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
       int toRemove = notInUse.size() - config.getMinimumIdle();
       for (PoolEntry entry : notInUse) {
          // 如果PoolEntry的最后一次访问的时间超过了idleTimeout并且将这个PoolEntry的状态变为不可借状态STATE_RESERVED
          // STATE_RESERVED状态底层变更是CAS变更,用到的类是AtomicIntegerFieldUpdater,可以对指定类的指定 volatile int 字段进行原子更新
          if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
             // 关闭连接
             closeConnection(entry, "(connection has passed idleTimeout)");
             toRemove--;
          }
       }
    }
    // 填充最小连接到minimum。 在初始化时候就填充连接,异步填充
    fillPool();
  }
}
// 填充连接
// 当没有达到最大连接数之前 或者 空闲连接数小于最小连接数时候 就异步提交创建poolEntryCreator任务
private synchronized void fillPool(){
  final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
                               - addConnectionQueueReadOnlyView.size();
  for (int i = 0; i < connectionsToAdd; i++) {
     addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
  }
}

// 关闭连接
void closeConnection(final PoolEntry poolEntry, final String closureReason){
    // 如果remove成功,将状态设置为STATE_REMOVED
	if (connectionBag.remove(poolEntry)) {
     final Connection connection = poolEntry.close();
     // 异步关闭
     closeConnectionExecutor.execute(() -> {
        quietlyCloseConnection(connection, closureReason);
        if (poolState == POOL_NORMAL) {
           fillPool();
        }
     });
  }
}

以上大致逻辑就是,将被标为丢弃的连接关闭,将空闲超时的连接进行关闭,然后进行进连接填充连接,填充连接的逻辑就是增加poolEntryCreator任务,poolEntryCreator逻辑在后面分析。

2.2 获取连接

获取连接是主要的实现逻辑,首先看HikariDataSource对象getConnnection方法

// 双重锁实现
public Connection getConnection(){
	HikariPool result = pool;
	if (result == null) {
	 synchronized (this) {
	    result = pool;
	    if (result == null) {
	       pool = result = new HikariPool(this);
	       // 锁定配置,不能热更新配置
	       this.seal();
	    }
	 }
	}
	return result.getConnection();
}

result.getConnection()就是HikariPool getConnection方法,这里面大致核心逻辑就是加锁在超时时间内获取poolEntry的connectionBag.borrow方法,重点着重borrow方法实现。

在讲之前,先介绍以下connectionBag的几个重要的成员变量

  1. final CopyOnWriteArrayList sharedList:当前所有缓存的poolEntry连接,都在这个list内,CopyOnWriteArrayList写时复制,在读多写少的场景下性能更高,一般情况下连接池中的poolEntry连接不会增加或者关闭,读场景多。
  2. final ThreadLocal<List> threadList :当前线程缓存的本地poolEntry的list。朝生夕灭的线程,是无法有效利用本地线程缓存的,只有在线程池场景或者当前线程多次使用getConnetion获取connection方法进行增删改时候,才会有效的使用ThreadLocal。
  3. final boolean weakThreadLocals:是否是弱引用,ThreadLocal可能会存在内存泄露的风险,当值为true时包装的poolEntry对象是弱引用,在内存不足时GC的时候会被回收,避免了出现内存泄露的问题。
  4. final IBagStateListener listener:监听器,监听创建poolEntry的任务
  5. final AtomicInteger waiters:当前正在等待获取连接的数量
  6. final SynchronousQueue handoffQueue:无存储元素的单个提供者和消费者通信队列,并且是公平模式。比如,是谁先来take操作,谁就会优先take成功,类似FIFO。
  7. this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16)):默认情况下会ThreadLocal value默认使用fastList来存储poolEntry,fastList是Hikari自己写一个不需要范围检查的一个List,而且它的remove方法是从后往前遍历删除的(和arrayList相反),刚好符合下面倒叙遍历获取poolEntry的逻辑
  8. public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
       {
          // Try the thread-local list first
          // 1、优先从本地线程缓存中获取poolEntry
          final List<Object> list = threadList.get();
          // 倒叙遍历,优先获取最近使用的poolEntry
          for (int i = list.size() - 1; i >= 0; i--) {
             final Object entry = list.remove(i);
             // 开启弱引用就对value进行弱引用包装
             final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
             // CAS成功就返回poolEntry
             if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return bagEntry;
             }
          }
    
          // Otherwise, scan the shared list ... then poll the handoff queue
          // 2、本地缓存没有,那么从所有缓存的poolEntry连接列表中获取
          final int waiting = waiters.incrementAndGet();
          try {
             for (T bagEntry : sharedList) {
                if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                   // If we may have stolen another waiter's connection, request another bag add.
                   // 如果等待的任务大于1,添加一个监听任务
                   if (waiting > 1) {
                      listener.addBagItem(waiting - 1);
                   }
                   return bagEntry;
                }
             }
    
             // 如果sharedList都在使用状态中,添加一个监听任务
             listener.addBagItem(waiting);
    
    		 // 3、所有的连接正在被使用,超时等待其他poolEntry被归还通知handoffQueue
             timeout = timeUnit.toNanos(timeout);
             do {
                final long start = currentTime();
                // 从阻塞队列中获取bagEntry
                final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
                if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                   return bagEntry;
                }
    
                timeout -= elapsedNanos(start);
             } while (timeout > 10_000);
             return null;
          }finally {
             waiters.decrementAndGet();
          }
       }
    

    以上具体实现步骤

    1. 优先从本地线程缓存中获取poolEntry
    2. 本地缓存没有,那么从所有缓存的poolEntry连接列表中获取
    3. 所有的连接正在被使用,增加一个监听任务,这个任务就是异步创建poolEntry,以便给此次阻塞的线程提供poolEntry
    4. 超时等待其他poolEntry被归还或者新建后 通知handoffQueue,以便获取poolEntry

    可以总结出,Hikari连接池最大限度上减少多线程锁竞争,提升连接池的性能。

    然后看一下异步创建poolEntry poolEntryCreator的实现:

    private final class PoolEntryCreator implements Callable<Boolean> {
      @Override
      public Boolean call()
      {
         // 只有特定条件下才创建PoolEntry
         while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
            // 创建PoolEntry
            final PoolEntry poolEntry = createPoolEntry();
            if (poolEntry != null) {
               // 在连接池中增加PoolEntry
               connectionBag.add(poolEntry);
               return Boolean.TRUE;
            }
         }
    
         // Pool is suspended or shutdown or at max size
         return Boolean.FALSE;
      }
    
      // 1、总的连接数小于最大连接数 
      // 2、当前连接池中的等待获取连接的线程大于0  或者 连接池中的空闲连接小于最小连接池数
      // 满足所有上述条件后才创建poolEntry
      private synchronized boolean shouldCreateAnotherConnection() {
         return getTotalConnections() < config.getMaximumPoolSize() &&
            (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
      }
    }
    
    // 真正创建poolEntry的实现
    private PoolEntry createPoolEntry(){
      try {
          // 创建poolEntry
         final PoolEntry poolEntry = newPoolEntry();
    
         final long maxLifetime = config.getMaxLifetime();
         if (maxLifetime > 0) {
            // variance up to 2.5% of the maxlifetime
            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
            final long lifetime = maxLifetime - variance;
            // 如果配置了maxlifetime,那么会给这一个连接增加一个延迟任务
            // 延迟任务主要就是将这个连接标记为Evict不可用
            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
               () -> {
                  if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
                     addBagItem(connectionBag.getWaitingThreadCount());
                  }
               },
               lifetime, MILLISECONDS));
         }
         return poolEntry;
      }
      return null;
    }
    

    由上面createPoolEntry可以知道,HikariCP在使用时不会关闭连接。如果使用中的连接到达maxLifetime时它将被标记为驱逐,并且在下一次线程尝试借用它时将被驱逐,这也是Hikari连接池设计的一个核心精髓。

    再来看一下connectionBag的add方法

    public void add(final T bagEntry){
          // 在sharedList增加bagEntry
          sharedList.add(bagEntry);
          // spin until a thread takes it or none are waiting
          // 满足一下条件后,让出当前线程CPU时间片,让其他线程去工作
          // 1、当等待获取连接的线程大于0  2、当前的PoolEntry状态是没占用  3、没有其他线程去从队列中取任务
          while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
             Thread.yield();
          }
       }
    

    可以看到,新增的poolEntry会加入到sharedList所有的缓存连接中,并且要满足上述三个条件的时候会一直循环让出CPU时间片,让其他线程从无界队列中去取连接

    2.3 释放连接

    Hikari实现了关闭连接Connection的方法,不是真正的关闭连接,而是归还到连接池当中,实现逻辑在ProxyConnection的close方法中,然后会调用poolEntry的recycle方法,最终会调用ConcurrentBag的requite方法,我们着重分析这个方法:

    // 此方法会将借来的对象返回到ConcurrentBag中。如果不归还会导致内存泄露
    public void requite(final T bagEntry){
      // 设置为未使用状态以便其他线程获取连接
      bagEntry.setState(STATE_NOT_IN_USE);
      for (int i = 0; waiters.get() > 0; i++) {
         // 传递信号给队列,优先告诉其他阻塞等待获取的线程获取poolEntry
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         // 循环多次后,等待阻塞10纳秒
         else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            // 让出时间片
            Thread.yield();
         }
      }
    
      // 更新最新poolEntry的本地线程的缓存中,以便当前线程下次获取连接
      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         // 对应上文中的fastList, 每次新增都是放到数组最后面
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
    }
    

    最后我们分析以下remove方法,很简单

    public boolean remove(final T bagEntry)
       {
          if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
             LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
             return false;
          }
          // 从所有缓存的list去除
          final boolean removed = sharedList.remove(bagEntry);
          // 从当前本地线程缓存中去除
          threadList.get().remove(bagEntry);
          return removed;
       }
    
    

    什么时候会调用remove,笔者总结了以下:
    1、当调用datasource shutdown方法时候
    2、当前connetion没有用的时候(可能是MySQL服务器down机)
    3、当前conntion被标记为丢弃时候,超过了maxLifetime被标记为丢弃
    4、当前connetion的最后一次使用时间和当前时间的差值大于idleTimeout时候

    以上可以理解了从初始化到getConnection获取连接,到closeConnectiond的核心逻辑,实现了核心逻辑的小闭环。

    三、自己写个连接池需要考虑哪些方面

    根据上面分析的源码,可以看到,如果自己实现一个连接池的话,需要考虑:

    1. 初始化的步骤,初始化最初的最小空闲数连接
    2. 取连接,从连接池中取,并且要上锁
    3. 归还连接,需要放回到连接池中,要上锁
    4. 如果连接池全被占用,是返回失败,还是让上游等待
    5. 拿到的连接,需要检测这个连接是否可用,是否还是活的,因为不知道服务端是否挂掉,抑或是连接超过maxlife被标记为evit正在关闭
    6. 连接池用什么数据结构存储,数组还是链表,还要考虑并发安全

    四、总结

    本篇文章从源码角度分析了一波,明白了为什么HikariDataSource是业界性能最高连接池的原因。我们可以更深入理解连接池背后的工作原理,以便后面出了线上问题可以轻松应对。最后读者可以试试自己实现一个连接池加深理解。
    以上文章有任何表达上或者技术问题欢迎指正。

    四、参考

    1. 数据库连接池之Hikari源码解析——https://www.cnblogs.com/jackion5/p/14193025.html