当应用需要进行业务处理时,首先会执行一个getConnection的操作,用于从连接池中获取连接,当业务处理完成后,需要把连接放回到连接池中,执行一个returnConnection的操作。
下面先看一下getConnection的源码:
//getConnection方法返回的值是一个连接监听对象ConnectionListener public ConnectionListener getConnection(Subject subject, ConnectionRequestInfo cri) throws ResourceException { subject = (subject == null) ? defaultSubject : subject; //获取连接信息 cri = (cri == null) ? defaultCri : cri; //打印startWait,即当前时间,精确到毫秒 long startWait = System.currentTimeMillis(); try { /*等待最多xx毫秒获取一个信号量(permit),即permit操作。 * permit操作用于获取当前可用的信号量,即是否有可以使用的信号量。 因此,因为在创建连接池的时候,我们也创建了一个max值的信号集, 所以,对于连接池中的连接数未达到max值的时候,肯定有可以使用的信号量。 除非,所有的连接都已经在使用状态,并且连接数已经达到max值, 这时,才有可能出现没有信号量,并出现超时的情况。 */ if (permits.attempt(poolParams.blockingTimeout)) { //计算本次获取连接的阻塞时间=当前时间-开始获取信号量的时间 long poolBlockTime = System.currentTimeMillis() - startWait ; //累加阻塞时间。 connectionCounter.updateBlockTime(poolBlockTime); //我们有一个权限去获取一个连接,判断是否在连接池中已经有一个可用的连接? ConnectionListener cl = null; do { //线程安全,即从连接池中获取连接是一个串行操作 synchronized (cls) { //判断连接池是否已经被shutdown,如果被shutdown, //则抛出异常"The pool has been shutdown",并释放信号量 if (shutdown.get()) { permits.release(); throw new ResourceException("The pool has been shutdown"); } //如果可用的连接事件监听的arraylist大于0,则从arraylist的尾部取一个连接 if (cls.size() > 0) { cl = (ConnectionListener) cls.remove(cls.size() - 1); //将arraylist中获取的连接加到到checkdout的一个hash结构中. checkedOut.add(cl); //计算已经使用的连接数 int size = (int) (maxSize - permits.permits()); //更新当前已经使用的最大连接数 if (size > maxUsedConnections) maxUsedConnections = size; } } //如果已经从连接事件监听数组中获取到了连接 if (cl != null) { //我们从一个pool取一个ManagedConnection,并检查它是否符合要求? try { Object matchedMC = mcf.matchManagedConnections (Collections.singleton(cl.getManagedConnection()) ,subject, cri); if (matchedMC != null) { if (trace) log.trace("supplying ManagedConnection from pool: " + cl); //通知connection listener是否它拥有权限 cl.grantPermit(true); //返回连接,结束 return cl; } /* * 匹配不成功,并且没有异常信息抛出。 * 在检查的时候,要么我们匹配错误,要么连接已经死亡 我们需要去辨别这些场景,但是现在,不管怎么样,我们都销毁连接。 */ log.warn("Destroying connection that could not be successfully matched: " + cl); synchronized (cls) { //从checkout的hashset中删除已经获取的连接。 checkedOut.remove(cl); } //销毁连接 doDestroy(cl); cl = null; } //不管发生任何事,都销毁连接 catch (Throwable t) { log.warn("Throwable while trying to match ManagedConnection, destroying connection: " + cl, t); synchronized (cls) { checkedOut.remove(cl); } doDestroy(cl); cl = null; } //如果发生意外,我们应该决定是否应该继续尝试去建立连接, //由jboss配置文件中的的useFastFail参数来决定,默认为false。 //这个useFastFail设置为true,则立刻跳出get connection,并报错。 if(poolParams.useFastFail) { log.trace("Fast failing for connection attempt. No more attempts will be made to acquire connection from pool and a new connection will be created immeadiately"); break; } } } //当连接监听队列>0,即还有可用的连接监听器 while (cls.size() > 0);//end of do loop //OK, 我们不能够找到一个可以使用的连接,则新建一个 try { //创建一个新的连接。这里不需要判断是否已经到达max连接数 //因为前面已经获取了信号量,所以肯定可以创建连接。 cl = createConnectionEventListener(subject, cri); synchronized (cls) { //将创建的连接加入到checkout数组中。 checkedOut.add(cl); int size = (int) (maxSize - permits.permits()); //更新当前已经使用的最大连接数 if (size > maxUsedConnections) maxUsedConnections = size; } //如果连接池还没有启动,则初始化连接池,并设置值started为true。 //这里连接池可能被启用多次(因为非线程安全),但是这里没有危害。 if (started == false) { started = true; if (poolParams.minSize > 0) PoolFiller.fillPool(this); } if (trace) log.trace("supplying new ManagedConnection: " + cl); //通知connection listener是否它拥有权限 cl.grantPermit(true); return cl; } catch (Throwable t) { log.warn("Throwable while attempting to get a new connection: " + cl, t); //return permit and rethrow synchronized (cls) { checkedOut.remove(cl); } permits.release(); JBossResourceException.rethrowAsResourceException( "Unexpected throwable while trying to create a connection: " + cl, t); throw new UnreachableStatementException(); } } //这里的else操作,不能获取信号量,则抛出异常,报错连接池超时。 else { // we timed out throw new ResourceException("No ManagedConnections available within configured blocking timeout ( " + poolParams.blockingTimeout + " [ms] )"); } } catch (InterruptedException ie) { long end = System.currentTimeMillis() - startWait; connectionCounter.updateBlockTime(end); throw new ResourceException("Interrupted while requesting permit! Waited " + end + " ms"); } }
关于getConnetion的几点说明
1.blockingTimeout是一个jboss的参数:< blocking-timeout-millis >5000< /blocking-timeout-millis >,它是一个获取信号量的超时时间,更确切的说,是从连接池中获取一个连接的超时时间。如果超过5000ms,不能够获取到信号量(连接),则jboss会抛出异常: “can not get connection,No ManagedConnections available within configured blocking timeout [xx] ms”。当然,只要当前正在使用的连接数没有到达MAX值,这个信号量一定能够被获取到。因为信号量一共有MAX值个,如果连接池中当前的连接不够用时,在获取信号量之后,新创建一个连接即可。建议可以设置成可以设成500ms左右,不需要设计过大,当应用中存在多个数据源时,可以防止因DB异常线程池带来的阻塞。如果网络环境不好的话,可以设置的更高一点。
2.连接池内部就是一个连接监听队列,每次都从队列的尾部获取连接。而IdleRemove线程,则是从这个队列头部开始进行清理。
3.连接池的获取,销毁,创建,这三个操作都是线程安全的操作。
4.业务在使用连接的过程中,会一直占有这个信号量,在returnConnection或者发生异常时释放信号量。能够获取信号量,则意味着肯定可以获取到连接。第二节中,我们讲到JBOSS在启动时会初始化一个信号量数组,长度为连接池的max参数。
当业务系统使用完连接后,需要把连接放回到连接池中它的主要见下图,源代码如下:
public void returnConnection(ConnectionListener cl, boolean kill) { synchronized (cls) { /* * 判断连接是否已经被DESTROYED? * 可能有其它的线程如background-validation及shuwdown * 标记这个连接临听器为DESTORYED状态。 * */ if (cl.getState() == ConnectionListener.DESTROYED) { if (trace) log .trace("ManagedConnection is being returned after it was destroyed" + cl); //释放信号量,并直接返回 if (cl.hasPermit()) { // release semaphore cl.grantPermit(false); permits.release(); } return; } } if (trace) log.trace("putting ManagedConnection back into pool kill=" + kill + " cl=" + cl); try { //前台应用强制清理连接。 cl.getManagedConnection().cleanup(); } catch (ResourceException re) { log.warn("ResourceException cleaning up ManagedConnection: " + cl, re); //清理失败,抛出异常,清理失败。 kill = true; } synchronized (cls) { // 连接监听的状态为DESTROY或者DESTROYED,则设置kill为true if (cl.getState() == ConnectionListener.DESTROY || cl.getState() == ConnectionListener.DESTROYED) kill = true; //checkedOut队列中移除连接监听器。 checkedOut.remove(cl); //如果kill==false,并且连接数>=最大连接max值,说明异常发生,再次设置kill=true if (kill == false && cls.size() >= poolParams.maxSize) { log .warn("Destroying returned connection, maximum pool size exceeded " + cl); kill = true; } //kill连接 if (kill) { // Adrian Brock: A resource adapter can asynchronously notify us // that // a connection error occurred. // This could happen while the connection is not checked out. // e.g. JMS can do this via an ExceptionListener on the // connection. // I have twice had to reinstate this line of code, PLEASE DO // NOT REMOVE IT! cls.remove(cl); } //如果kill==false else { cl.used(); //这个连接监听不属于连接监听队列,则加入。 if (cls.contains(cl) == false) cls.add(cl); else log.warn("Attempt to return connection twice (ignored): " + cl, new Throwable("STACKTRACE")); } if (cl.hasPermit()) { //释放信号量 cl.grantPermit(false); permits.release(); } } if (kill) { if (trace) log.trace("Destroying returned connection " + cl); //销毁连接。 doDestroy(cl); } }
ReturnConnetion总结:
1.释放连接也是一个线程安全的操作。
2.在连接return时,有可能已经是destory的状态(前面第三节中讲到的SHUTDOWN操作,会对连接打上DESTORY的标记),这时,直接进行remove即可。
3.释放的连接若不属于连接监听队列(连接池),即加入到连接监听队列中(即连接池中)。
4.释放连接需要释放信号量。
5.在释放过程中,出现任何异常,则将连接从连接池中移除,并进行强制销毁。
JBOSS对于异常连接的处理:
默认情况下,JBOSS不会对无效的连接进行销毁。
如果我们需要对异常列表中的连接进行销毁,则需要在连接池的ds.xml中添加以下配置:
< exception-sorter-class-name> org.jboss.resource.adapter.jdbc.vendor.OracleExceptionSorter < /exception-sorter-class-name>
这个是ORACLE的异常列表,可以查看这个类里面定义了ORACLE的异常列表,当连接抛出这个列表中的错误时,即会进行销毁,如不能销毁,异常连接会一直存在连接池中。ORACLE异常列表如下:
public boolean isExceptionFatal(final SQLException e) { // I can't remember if the errors are negative or positive. final int error_code = Math.abs( e.getErrorCode() ); if( ( error_code == 28 ) //session has been killed || ( error_code == 600 ) //Internal oracle error || ( error_code == 1012 ) //not logged on || ( error_code == 1014 ) //Oracle shutdown in progress || ( error_code == 1033 ) //Oracle initialization or shutdown in progress || ( error_code == 1034 ) //Oracle not available || ( error_code == 1035 ) //ORACLE only available to users with RESTRICTED SESSION privilege || ( error_code == 1089 ) //immediate shutdown in progress - no operations are permitted || ( error_code == 1090 ) //shutdown in progress - connection is not permitted || ( error_code == 1092 ) //ORACLE instance terminated. Disconnection forced || ( error_code == 1094 ) //ALTER DATABASE CLOSE in progress. Connections not permitted || ( error_code == 2396 ) //exceeded maximum idle time, please connect again || ( error_code == 3106 ) //fatal two-task communication protocol error || ( error_code == 3111 ) //break received on communication channel || ( error_code == 3113 ) //end-of-file on communication channel || ( error_code == 3114 ) //not connected to ORACLE || ( error_code >= 12100 && error_code = 21000 ) && ( (error_text.indexOf("SOCKET") > -1) //for control socket error || (error_text.indexOf("CONNECTION HAS ALREADY BEEN CLOSED") > -1) || (error_text.indexOf("BROKEN PIPE") > -1) ) ) { return true; } return false; }
类似的,我们也可以找到一个MYSQL的异常列表, org.jboss.resource.adapter.jdbc.vendor.OracleExceptionSorter:
public boolean isExceptionFatal(SQLException e) { if (e.getSQLState() != null) { // per Mark Matthews at MySQL if (e.getSQLState().startsWith("08")) { return true; } } switch (e.getErrorCode()) { // Communications Errors case 1040: // ER_CON_COUNT_ERROR case 1042: // ER_BAD_HOST_ERROR case 1043: // ER_HANDSHAKE_ERROR case 1047: // ER_UNKNOWN_COM_ERROR case 1081: // ER_IPSOCK_ERROR case 1129: // ER_HOST_IS_BLOCKED case 1130: // ER_HOST_NOT_PRIVILEGED // Authentication Errors case 1045: // ER_ACCESS_DENIED_ERROR // Resource errors case 1004: // ER_CANT_CREATE_FILE case 1005: // ER_CANT_CREATE_TABLE case 1015: // ER_CANT_LOCK case 1021: // ER_DISK_FULL case 1041: // ER_OUT_OF_RESOURCES // Out-of-memory errors case 1037: // ER_OUTOFMEMORY case 1038: // ER_OUT_OF_SORTMEMORY return true; } return false; }
还有各种数据库的异常列表,都可以在自行配置,都定义在这个包下: org.jboss.resource.adapter.jdbc.vendor。
发表评论