一、前言
上一篇文章讲了不合理的连接池代码导致的内存泄露事件,详见这篇文章记一次多数据源路由造成的数据库连接泄露排查过程。其中粗略的分析了HikariDataSource连接池的代码,并没有仔细分析。本篇文章带读者们一起去分析一波源码,看完本篇文章后,你可以
1、对锁和高并发有一定理解以及其在连接池中的运用
2、了解HikariDataSource业界性能最高连接池的原因
3、可以对连接池的原理有大致的了解,可以尝试自己实现一个连接池
二、源码分析
2.0 关键代码类介绍
- HikariDataSource对象:Hikari中的核心类为HikariDataSource,实现了DataSource的getConnetion接口
- HikariPool对象:HikariDataSource中有两个HikariPool对象,一个是fastPathPool是在HikariPool有参构造函数中创建, 如果没有创建fastPathPool,那么就会在getConnection方法时创建pool对象。
- ConcurrentBag对象:连接池的真正实现,实现了高性能高并发的无锁设计,主要的方法有borrow 借,requite 归还,add 新增,remove 去除。
- PoolEntry对象:对数据库connection对象进行包装,增加额外的属性,包括最后一次访问时间,是否丢弃,当前状态等等。HikariDataSource源码底层里面都是操作这个对象。
2.1 初始化(以下所有代码只列出关键实现)
初始化工作包括HikariDataSource初始化,HikariPool初始化,connectionBag初始化,一些线程池的初始化,最小连接数的初始化,以下逐一分析
public HikariDataSource(HikariConfig configuration)
{
// 一个datasource有两个HikariPool成员变量,fastPathPool无参构造为null,用final修饰,pool有参构造,用volatile修饰
// 因为volatile修饰的对象,需要从主内存读取,而且需要写入主内存等操作,所以最好在使用上用有参构造来构造HikariDataSource
pool = fastPathPool = new HikariPool(this);
// 连接池配置锁定无法修改
this.seal();
}
// HikariPool初始化成员变量EntryCreator,创建PoolEntry的任务
private final PoolEntryCreator postFillPoolEntryCreator = new PoolEntryCreator("After adding ");
public HikariPool(final HikariConfig config)
{
// 初始化配置,根据配置生成datasource变量
super(config);
// 生成真正的连接池,后续的获取连接释放连接都是从这里面弄的
this.connectionBag = new ConcurrentBag<>(this);
// 测试数据库的连通性
checkFailFast();
// 增加连接的线程池
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
// 关闭连接的线程池
this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化维持最小连接数任务
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
}
重点是维持最小连接数任务,如下:
private final class HouseKeeper implements Runnable{
public void run(){
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
previous = now;
// 标为丢弃的连接关闭
softEvictConnections();
return;
}
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
// 获取当前连接池中已经不是使用中的连接集合
final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
int toRemove = notInUse.size() - config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
// 如果PoolEntry的最后一次访问的时间超过了idleTimeout并且将这个PoolEntry的状态变为不可借状态STATE_RESERVED
// STATE_RESERVED状态底层变更是CAS变更,用到的类是AtomicIntegerFieldUpdater,可以对指定类的指定 volatile int 字段进行原子更新
if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
// 关闭连接
closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
}
// 填充最小连接到minimum。 在初始化时候就填充连接,异步填充
fillPool();
}
}
// 填充连接
// 当没有达到最大连接数之前 或者 空闲连接数小于最小连接数时候 就异步提交创建poolEntryCreator任务
private synchronized void fillPool(){
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueueReadOnlyView.size();
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
}
}
// 关闭连接
void closeConnection(final PoolEntry poolEntry, final String closureReason){
// 如果remove成功,将状态设置为STATE_REMOVED
if (connectionBag.remove(poolEntry)) {
final Connection connection = poolEntry.close();
// 异步关闭
closeConnectionExecutor.execute(() -> {
quietlyCloseConnection(connection, closureReason);
if (poolState == POOL_NORMAL) {
fillPool();
}
});
}
}
以上大致逻辑就是,将被标为丢弃的连接关闭,将空闲超时的连接进行关闭,然后进行进连接填充连接,填充连接的逻辑就是增加poolEntryCreator任务,poolEntryCreator逻辑在后面分析。
2.2 获取连接
获取连接是主要的实现逻辑,首先看HikariDataSource对象getConnnection方法
// 双重锁实现
public Connection getConnection(){
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
pool = result = new HikariPool(this);
// 锁定配置,不能热更新配置
this.seal();
}
}
}
return result.getConnection();
}
result.getConnection()就是HikariPool getConnection方法,这里面大致核心逻辑就是加锁在超时时间内获取poolEntry的connectionBag.borrow方法,重点着重borrow方法实现。
在讲之前,先介绍以下connectionBag的几个重要的成员变量
- final CopyOnWriteArrayList
sharedList:当前所有缓存的poolEntry连接,都在这个list内,CopyOnWriteArrayList写时复制,在读多写少的场景下性能更高,一般情况下连接池中的poolEntry连接不会增加或者关闭,读场景多。 - final ThreadLocal<List
- final boolean weakThreadLocals:是否是弱引用,ThreadLocal可能会存在内存泄露的风险,当值为true时包装的poolEntry对象是弱引用,在内存不足时GC的时候会被回收,避免了出现内存泄露的问题。
- final IBagStateListener listener:监听器,监听创建poolEntry的任务
- final AtomicInteger waiters:当前正在等待获取连接的数量
- final SynchronousQueue
handoffQueue:无存储元素的单个提供者和消费者通信队列,并且是公平模式。比如,是谁先来take操作,谁就会优先take成功,类似FIFO。 - this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16)):默认情况下会ThreadLocal value默认使用fastList来存储poolEntry,fastList是Hikari自己写一个不需要范围检查的一个List,而且它的remove方法是从后往前遍历删除的(和arrayList相反),刚好符合下面倒叙遍历获取poolEntry的逻辑
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
// 1、优先从本地线程缓存中获取poolEntry
final List<Object> list = threadList.get();
// 倒叙遍历,优先获取最近使用的poolEntry
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
// 开启弱引用就对value进行弱引用包装
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
// CAS成功就返回poolEntry
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// Otherwise, scan the shared list ... then poll the handoff queue
// 2、本地缓存没有,那么从所有缓存的poolEntry连接列表中获取
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
// 如果等待的任务大于1,添加一个监听任务
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
// 如果sharedList都在使用状态中,添加一个监听任务
listener.addBagItem(waiting);
// 3、所有的连接正在被使用,超时等待其他poolEntry被归还通知handoffQueue
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
// 从阻塞队列中获取bagEntry
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}finally {
waiters.decrementAndGet();
}
}
以上具体实现步骤
- 优先从本地线程缓存中获取poolEntry
- 本地缓存没有,那么从所有缓存的poolEntry连接列表中获取
- 所有的连接正在被使用,增加一个监听任务,这个任务就是异步创建poolEntry,以便给此次阻塞的线程提供poolEntry
- 超时等待其他poolEntry被归还或者新建后 通知handoffQueue,以便获取poolEntry
可以总结出,Hikari连接池最大限度上减少多线程锁竞争,提升连接池的性能。
然后看一下异步创建poolEntry poolEntryCreator的实现:
private final class PoolEntryCreator implements Callable<Boolean> {
@Override
public Boolean call()
{
// 只有特定条件下才创建PoolEntry
while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
// 创建PoolEntry
final PoolEntry poolEntry = createPoolEntry();
if (poolEntry != null) {
// 在连接池中增加PoolEntry
connectionBag.add(poolEntry);
return Boolean.TRUE;
}
}
// Pool is suspended or shutdown or at max size
return Boolean.FALSE;
}
// 1、总的连接数小于最大连接数
// 2、当前连接池中的等待获取连接的线程大于0 或者 连接池中的空闲连接小于最小连接池数
// 满足所有上述条件后才创建poolEntry
private synchronized boolean shouldCreateAnotherConnection() {
return getTotalConnections() < config.getMaximumPoolSize() &&
(connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
}
}
// 真正创建poolEntry的实现
private PoolEntry createPoolEntry(){
try {
// 创建poolEntry
final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
// 如果配置了maxlifetime,那么会给这一个连接增加一个延迟任务
// 延迟任务主要就是将这个连接标记为Evict不可用
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
() -> {
if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
addBagItem(connectionBag.getWaitingThreadCount());
}
},
lifetime, MILLISECONDS));
}
return poolEntry;
}
return null;
}
由上面createPoolEntry可以知道,HikariCP在使用时不会关闭连接。如果使用中的连接到达maxLifetime时它将被标记为驱逐,并且在下一次线程尝试借用它时将被驱逐,这也是Hikari连接池设计的一个核心精髓。
再来看一下connectionBag的add方法
public void add(final T bagEntry){
// 在sharedList增加bagEntry
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
// 满足一下条件后,让出当前线程CPU时间片,让其他线程去工作
// 1、当等待获取连接的线程大于0 2、当前的PoolEntry状态是没占用 3、没有其他线程去从队列中取任务
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
可以看到,新增的poolEntry会加入到sharedList所有的缓存连接中,并且要满足上述三个条件的时候会一直循环让出CPU时间片,让其他线程从无界队列中去取连接
2.3 释放连接
Hikari实现了关闭连接Connection的方法,不是真正的关闭连接,而是归还到连接池当中,实现逻辑在ProxyConnection的close方法中,然后会调用poolEntry的recycle方法,最终会调用ConcurrentBag的requite方法,我们着重分析这个方法:
// 此方法会将借来的对象返回到ConcurrentBag中。如果不归还会导致内存泄露
public void requite(final T bagEntry){
// 设置为未使用状态以便其他线程获取连接
bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {
// 传递信号给队列,优先告诉其他阻塞等待获取的线程获取poolEntry
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
// 循环多次后,等待阻塞10纳秒
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
// 让出时间片
Thread.yield();
}
}
// 更新最新poolEntry的本地线程的缓存中,以便当前线程下次获取连接
final List<Object> threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
// 对应上文中的fastList, 每次新增都是放到数组最后面
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
最后我们分析以下remove方法,很简单
public boolean remove(final T bagEntry)
{
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
// 从所有缓存的list去除
final boolean removed = sharedList.remove(bagEntry);
// 从当前本地线程缓存中去除
threadList.get().remove(bagEntry);
return removed;
}
什么时候会调用remove,笔者总结了以下:
1、当调用datasource shutdown方法时候
2、当前connetion没有用的时候(可能是MySQL服务器down机)
3、当前conntion被标记为丢弃时候,超过了maxLifetime被标记为丢弃
4、当前connetion的最后一次使用时间和当前时间的差值大于idleTimeout时候
以上可以理解了从初始化到getConnection获取连接,到closeConnectiond的核心逻辑,实现了核心逻辑的小闭环。
三、自己写个连接池需要考虑哪些方面
根据上面分析的源码,可以看到,如果自己实现一个连接池的话,需要考虑:
- 初始化的步骤,初始化最初的最小空闲数连接
- 取连接,从连接池中取,并且要上锁
- 归还连接,需要放回到连接池中,要上锁
- 如果连接池全被占用,是返回失败,还是让上游等待
- 拿到的连接,需要检测这个连接是否可用,是否还是活的,因为不知道服务端是否挂掉,抑或是连接超过maxlife被标记为evit正在关闭
- 连接池用什么数据结构存储,数组还是链表,还要考虑并发安全
四、总结
本篇文章从源码角度分析了一波,明白了为什么HikariDataSource是业界性能最高连接池的原因。我们可以更深入理解连接池背后的工作原理,以便后面出了线上问题可以轻松应对。最后读者可以试试自己实现一个连接池加深理解。
以上文章有任何表达上或者技术问题欢迎指正。
四、参考
- 数据库连接池之Hikari源码解析——https://www.cnblogs.com/jackion5/p/14193025.html