[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?

          

                 

                我们在之前分析源码的时候 ,说到了连接池初始化的中非常关键的两个方法 ,分别是  createAndStartCreatorThread()  和 createAndStartDestroyThread(),他们分别代表了连接的创建和销毁逻辑,我们上次从整个流程提炼出来,并对销毁逻辑做了拆解,这次我们再补齐对连接的创建逻辑的内容:


创建连接任务的主要流程如下:

[ 创建链接的核心流程 ]


0.入口



protected void createAndStartCreatorThread() {
    if (createScheduler == null) {
        String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
        createConnectionThread = new CreateConnectionThread(threadName);
        createConnectionThread.start();
        return;
    }
    
    initedLatch.countDown();
}

1.核心创建流程




public void run() {
    initedLatch.countDown();
    
    long lastDiscardCount = 0;
    int errorCount = 0;
    
    
    for (;;) {
        // addLast
        try {
            // 获取锁
            lock.lockInterruptibly();
        } catch (InterruptedException e2) {
            break;
        }
        
        long discardCount = DruidDataSource.this.discardCount;
        boolean discardChanged = discardCount - lastDiscardCount > 0;
        lastDiscardCount = discardCount;
        
        try {
            boolean emptyWait = true;
            
            // 螺丝刀补充: 创建失败,池中数量为0,抛弃状态false  -- 取消等待状态
            if (createError != null
                && poolingCount == 0
                && !discardChanged) {
                emptyWait = false;
            }
            
            // 螺丝刀补充: 异步初始化,链接创建数小于初始化大小  -- 取消等待状态
            if (emptyWait
                && asyncInit && createCount < initialSize) {
                emptyWait = false;
            }
            
            // 螺丝刀补充: 等待
            if (emptyWait) {
                // 必须存在线程等待,才创建连接
                if (poolingCount >= notEmptyWaitThreadCount
                    // 未开启保活 , 闲置连接数大于最小闲置连接
                    && (!(keepAlive && activeCount + poolingCount < minIdle))
                    // 没有执行失败的链接
                    && !isFailContinuous()
                   ) {
                    empty.await();
                }
                
                // 防止创建超过maxActive数量的连接
                if (activeCount + poolingCount >= maxActive) {
                    empty.await();
                    continue;
                }
            }
            
        } catch (InterruptedException e) {
            lastCreateError = e;
            lastErrorTimeMillis = System.currentTimeMillis();
            
            if ((!closing) && (!closed)) {
                LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
            }
            break;
        } finally {
            lock.unlock();
        }
        
        PhysicalConnectionInfo connection = null;
        
        try {
            // 螺丝刀补充:创建物理链接
            connection = createPhysicalConnection();
        } catch (SQLException e) {
            LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                      + ", state " + e.getSQLState(), e);
            
            errorCount++;
            // 螺丝刀补充: 如果失败了 但是 没有超过重试时间,则执行重试
            if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                // fail over retry attempts
                setFailContinuous(true);
                // 快速失败
                if (failFast) {
                    lock.lock();
                    try {
                        // 唤醒所有
                        notEmpty.signalAll();
                    } finally {
                        lock.unlock();
                    }
                }
                
                if (breakAfterAcquireFailure) {
                    break;
                }
                
                try {
                    Thread.sleep(timeBetweenConnectErrorMillis);
                } catch (InterruptedException interruptEx) {
                    break;
                }
            }
        } catch (RuntimeException e) {
            LOG.error("create connection RuntimeException", e);
            setFailContinuous(true);
            continue;
        } catch (Error e) {
            LOG.error("create connection Error", e);
            setFailContinuous(true);
            break;
        }
        
        if (connection == null) {
            continue;
        }
        
        // 螺丝刀补充: 把创建的连接放入池子中
        boolean result = put(connection);
        if (!result) {
            JdbcUtils.close(connection.getPhysicalConnection());
            LOG.info("put physical connection to pool failed.");
        }
        
        errorCount = 0; // reset errorCount
        
        if (closing || closed) {
            break;
        }
    }
}
    }


2.物理链接创建




public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
    
        // 下面是一系列创建连接的基础属性设置
        String url = this.getUrl();
        Properties connectProperties = getConnectProperties();

        String user;
        if (getUserCallback() != null) {
            user = getUserCallback().getName();
        } else {
            user = getUsername();
        }

        String password = getPassword();
        PasswordCallback passwordCallback = getPasswordCallback();

        if (passwordCallback != null) {
            if (passwordCallback instanceof DruidPasswordCallback) {
                DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

                druidPasswordCallback.setUrl(url);
                druidPasswordCallback.setProperties(connectProperties);
            }

            char[] chars = passwordCallback.getPassword();
            if (chars != null) {
                password = new String(chars);
            }
        }

        Properties physicalConnectProperties = new Properties();
        if (connectProperties != null) {
            physicalConnectProperties.putAll(connectProperties);
        }

        if (user != null && user.length() != 0) {
            physicalConnectProperties.put("user", user);
        }

        if (password != null && password.length() != 0) {
            physicalConnectProperties.put("password", password);
        }

        Connection conn = null;

        long connectStartNanos = System.nanoTime();
        long connectedNanos, initedNanos, validatedNanos;

        Map<String, Object> variables = initVariants
                ? new HashMap<String, Object>()
                : null;
        Map<String, Object> globalVariables = initGlobalVariants
                ? new HashMap<String, Object>()
                : null;

        createStartNanosUpdater.set(this, connectStartNanos);
        creatingCountUpdater.incrementAndGet(this);
        try {
            
            // 螺丝刀补充: 通过驱动创建物理连接核心逻辑
            conn = createPhysicalConnection(url, physicalConnectProperties);
            connectedNanos = System.nanoTime();

            if (conn == null) {
                throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
            }

            // 螺丝刀补充: 初始化物理连接
            initPhysicalConnection(conn, variables, globalVariables);
            initedNanos = System.nanoTime();

            // 螺丝刀补充: 校验链接
            validateConnection(conn);
            validatedNanos = System.nanoTime();

            setFailContinuous(false);
            setCreateError(null);
        } catch (SQLException ex) {
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (RuntimeException ex) {
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (Error ex) {
            createErrorCountUpdater.incrementAndGet(this);
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } finally {
            long nano = System.nanoTime() - connectStartNanos;
            createTimespan += nano;
            creatingCountUpdater.decrementAndGet(this);
        }

        return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
    }


2.1通过JDBC驱动创建连接




public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
        Connection conn;

        // 螺丝刀补充:JDBC驱动创建完链接并且记录( 先判断过滤器是否为空,不为空则先走过滤器 )
        if (getProxyFilters().isEmpty()) {
            conn = getDriver().connect(url, info);
        } else {
            conn = new FilterChainImpl(this).connection_connect(info);
        }

        createCountUpdater.incrementAndGet(this);

        return conn;
    }


2.2 创建连接如果失败的话,进行快速重试 以及 空连接判断和重试




        try {

                    // 螺丝刀补充:创建物理链接
                    connection = createPhysicalConnection();
                } catch (SQLException e) {
                    LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                              + ", state " + e.getSQLState(), e);

                    errorCount++;
                    // 螺丝刀补充: 如果失败了 但是 没有超过重试时间,则执行重试
                    if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                        // fail over retry attempts
                        setFailContinuous(true);
                        // 快速失败
                        if (failFast) {
                            lock.lock();
                            try {
                                // 唤醒所有
                                notEmpty.signalAll();
                            } finally {
                                lock.unlock();
                            }
                        }

                        if (breakAfterAcquireFailure) {
                            break;
                        }

                        try {
                            Thread.sleep(timeBetweenConnectErrorMillis);
                        } catch (InterruptedException interruptEx) {
                            break;
                        }
                    }
                } catch (RuntimeException e) {
                    LOG.error("create connection RuntimeException", e);
                    setFailContinuous(true);
                    continue;
                } catch (Error e) {
                    LOG.error("create connection Error", e);
                    setFailContinuous(true);
                    break;
                }

                if (connection == null) {
                    continue;
                }


3.放入连接池(数组)




protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
        DruidConnectionHolder holder = null;
        try {
            // 螺丝刀补充: 创建 holder
            holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
        } catch (SQLException ex) {
            // 螺丝刀补充: 如果创建过程发生了异常则清除创建记录
            lock.lock();
            try {
                if (createScheduler != null) {
                    clearCreateTask(physicalConnectionInfo.createTaskId);
                }
            } finally {
                lock.unlock();
            }
            LOG.error("create connection holder error", ex);
            return false;
        }

        return put(holder, physicalConnectionInfo.createTaskId, false);
    }



3.1 put() 放入连接池最终的核心逻辑




private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
        lock.lock();
        try {

            // 螺丝刀补充: 下面三个目的都是判断是否要把链接记录清除
            if (this.closing || this.closed) {
                return false;
            }

            if (poolingCount >= maxActive) {
                if (createScheduler != null) {
                    clearCreateTask(createTaskId);
                }
                return false;
            }

            if (checkExists) {
                for (int i = 0; i < poolingCount; i++) {
                    if (connections[i] == holder) {
                        return false;
                    }
                }
            }

            // 螺丝刀补充: 关键点,holder放入池子中,也就是所谓的数组,并且累加记录
            connections[poolingCount] = holder;
            incrementPoolingCount();

            if (poolingCount > poolingPeak) {
                poolingPeak = poolingCount;
                poolingPeakTime = System.currentTimeMillis();
            }

            // 唤醒notEmpty锁
            notEmpty.signal();
            notEmptySignalCount++;

            // 螺丝刀补充: 这里判断,如果创建链接的任务不为空,则清除当前任务(因为已经完成了创建)
            if (createScheduler != null) {
                clearCreateTask(createTaskId);

                // 螺丝刀补充: 这里又是一堆判断, 相当于doubleCheck,然后唤醒empty锁,创建链接
                if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
                    && activeCount + poolingCount + createTaskCount < maxActive) {
                    emptySignal();
                }
            }
        } finally {
            lock.unlock();
        }
        return true;
    }