druid源码解读 | 您所在的位置：网站首页 > druid超时回收 > druid源码解读


2023-09-10 12:24| 来源: 网络整理| 查看: 265



DruidDataSource#getConnection //重写DruidAbstractDataSource的获取连接方法 @Override public DruidPooledConnection getConnection() throws SQLException { return getConnection(maxWait); } public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { //初始化 所以init方法是第一次获取连接的时候才会初始化 init(); if (filters.size() > 0) { // 如果有拦截器 拦截器链 责任链模式执行所有的拦截 FilterChainImpl filterChain = new FilterChainImpl(this); // 拦截器获取数据库连接代码详解 单独分析 应该也是执行完拦截后获取连接 return filterChain.dataSource_connect(this, maxWaitMillis); } else { // 直接获取连接 return getConnectionDirect(maxWaitMillis); } } // 拦截器获取数据库连接代码详解 @Override public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException { if (this.pos //超时重试的次数 int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { //获取连接的内部方法 poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; } if (testOnBorrow) { // 借用测试 boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { //验证异常 if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); } //关闭连接 discardConnection(poolableConnection.holder); continue; } } else { if (poolableConnection.conn.isClosed()) { discardConnection(poolableConnection.holder); // 传入null,避免重复关闭 continue; } // todo 不太能理解 既然已经在init的时候已经启动了守护线程createAndStartDestroyThread去进行空闲线程和超时线程的定时扫描和回收,为什么在获取连接的时候还要重新处理一次呢??? 
if (testWhileIdle) { //空闲检测有效性 final DruidConnectionHolder holder = poolableConnection.holder; long currentTimeMillis = System.currentTimeMillis(); long lastActiveTimeMillis = holder.lastActiveTimeMillis; long lastExecTimeMillis = holder.lastExecTimeMillis; long lastKeepTimeMillis = holder.lastKeepTimeMillis; //更新最近活跃时间 if (checkExecuteTime && lastExecTimeMillis != lastActiveTimeMillis) { lastActiveTimeMillis = lastExecTimeMillis; } if (lastKeepTimeMillis > lastActiveTimeMillis) { lastActiveTimeMillis = lastKeepTimeMillis; } //存活时间 long idleMillis = currentTimeMillis - lastActiveTimeMillis; long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis; if (timeBetweenEvictionRunsMillis //校验是否有效 boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); } //关闭连接 discardConnection(poolableConnection.holder); continue; } } } } if (removeAbandoned) { //当前连接 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); poolableConnection.connectStackTrace = stackTrace; poolableConnection.setConnectedTimeNano(); poolableConnection.traceEnable = true; activeConnectionLock.lock(); try { //活跃的连接 activeConnections.put(poolableConnection, PRESENT); } finally { activeConnectionLock.unlock(); } } if (!this.defaultAutoCommit) { //自动提交 poolableConnection.setAutoCommit(false); } return poolableConnection; } } // 获取连接getConnectionInternal时序图


// 获取连接getConnectionInternal源码分析 private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { if (closed) { //关闭 connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); } if (!enable) { //递增获取 connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } // 最大等待时长 final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); //最大等待线程数量 final int maxWaitThreadCount = this.maxWaitThreadCount; DruidConnectionHolder holder; for (boolean createDirect = false;;) { if (createDirect) { //直接创建 createStartNanosUpdater.set(this, System.nanoTime()); if (creatingCountUpdater.compareAndSet(this, 0, 1)) { //创建物理连接 PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); holder.lastActiveTimeMillis = System.currentTimeMillis(); creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this); if (LOG.isDebugEnabled()) { LOG.debug("conn-direct_create "); } boolean discard = false; lock.lock(); try { if (activeCount //如果当前连接大小大于峰值 更新峰值 activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } break; } else { //活跃连接大于最大连接 直接关闭 discard = true; } } finally { lock.unlock(); } if (discard) { //关闭连接 JdbcUtils.close(pyConnInfo.getPhysicalConnection()); } } } try { //尝试获取锁 lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); } try { if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { //有最大等待线程数量并且不为空等待线程数量大于最大等待线程数量,抛出异常 connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } if (onFatalError && onFatalErrorMaxActive > 0 && activeCount 
>= onFatalErrorMaxActive) { connectErrorCountUpdater.incrementAndGet(this); StringBuilder errorMsg = new StringBuilder(); errorMsg.append("onFatalError, activeCount ") .append(activeCount) .append(", onFatalErrorMaxActive ") .append(onFatalErrorMaxActive); if (lastFatalErrorTimeMillis > 0) { errorMsg.append(", time '") .append(StringUtils.formatDateTime19( lastFatalErrorTimeMillis, TimeZone.getDefault())) .append("'"); } if (lastFatalErrorSql != null) { errorMsg.append(", sql \n") .append(lastFatalErrorSql); } throw new SQLException( errorMsg.toString(), lastFatalError); } // 成功创建连接 connectCount++; if (createScheduler != null && poolingCount == 0 && activeCount createDirect = true; continue; } } if (maxWait > 0) { //如果有等待的 则把当前这个创建连接方在等待队列的最后 等待创建 // todo 下面详细解读 holder = pollLast(nanos); } else { //触发创建连接池的信号量v直接触发创建 // todo 下面详细解读 holder = takeLast(); } if (holder != null) { if (holder.discard) { continue; } activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException(e.getMessage(), e); } catch (SQLException e) { connectErrorCountUpdater.incrementAndGet(this); throw e; } finally { lock.unlock(); } break; } if (holder == null) { //异常日志打印 ....... } holder.incrementUseCount(); // 封装成DruidPooledConnection对象返回 DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); return poolalbeConnection; } pollLast(nanos) or takeLast(); // 直接创建 DruidConnectionHolder takeLast() throws InterruptedException, SQLException { try { while (poolingCount == 0) { //池中的连接数=0,释放empty信号 emptySignal(); if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } // 非空等待线程池的数量??? 
notEmptyWaitThreadCount++; //峰值判断 if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { //等待notEmpty,就是CreateConnectionTask往datasource的连接池put的时候 notEmpty.await(); // signal by recycle or creator } finally { //等待线程池 -- notEmptyWaitThreadCount--; } // 非空等待数量 这个能用来干嘛呢 notEmptyWaitCount++; if (!enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } } } catch (InterruptedException ie) { // 释放信号量 notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } decrementPoolingCount(); //获取最后一个 DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; return last; } //放在等待线程池最后面进行创建 private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { long estimate = nanos; for (;;) { if (poolingCount == 0) { //同上 等待创建 emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } if (estimate notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { long startEstimate = estimate; estimate = notEmpty.awaitNanos(estimate); // signal by // recycle or // creator notEmptyWaitCount++; notEmptyWaitNanos += (startEstimate - estimate); if (!enable) { //自增 connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } finally { notEmptyWaitThreadCount--; } if (poolingCount == 0) { if (estimate > 0) { continue; } waitNanosLocal.set(nanos - estimate); return null; } } // 获取最后一个连接返回 decrementPoolingCount(); DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; long waitNanos = nanos - estimate; 
last.setLastNotEmptyWaitNanos(waitNanos); return last; } } 总结







      Copyright 2018-2019 实验室设备网 版权所有