spring jdbc源码

2018-02-09 21:18:57

源码分析基于spring 4.3.x

文章主要记录阅读spring jdbc源码时的一些关键点,主要关注如下几点:

  1. 事务传播性
  2. 数据库连接
  3. 动态代理

事务传播性

transaction.png

spring事务管理的核心在于PlatformTransactionManager,其getTransaction/commit/rollback方法是spring事务管理的基石。
PlatformTransactionManager的基本逻辑由AbstractPlatformTransactionManager实现

看看AbstractPlatformTransactionManager.getTransaction

/**
 * Returns a transaction status for the given definition, choosing between handling an
 * existing transaction, starting a new one, or proceeding without a transaction,
 * depending on the definition's propagation behavior.
 * (Excerpt: parts of the original method are elided and marked with "...".)
 *
 * @param definition supplies the propagation behavior consulted below
 * @return the status describing the resulting transaction (or the absence of one)
 * @throws IllegalTransactionStateException when propagation is 'mandatory' and no
 *         transaction exists
 */
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // Template method: the concrete transaction object comes from the subclass.
    Object transaction = doGetTransaction();

    // NOTE(review): debugEnabled is used below but never declared in this excerpt;
    // presumably its declaration was trimmed from the quoted source — confirm.

    // A transaction already exists.
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // From here on no transaction exists.
    // PROPAGATION_MANDATORY: a transaction is required, so fail.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // REQUIRED / REQUIRES_NEW / NESTED: all three start a new transaction here.
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // There is no transaction to suspend, hence the null argument.
        SuspendedResourcesHolder suspendedResources = suspend(null);

        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // Create a TransactionStatus flagged as a new transaction (third argument is true).
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the transaction (template method implemented by the subclass).
            doBegin(transaction, definition);
            // Per the article: binds the transaction information to the current thread.
            prepareSynchronization(status, definition);
            return status;
        }
        ...
    }
    else {
        // Remaining propagation behaviors (e.g. SUPPORTS): no transaction exists, so a
        // status is prepared with a null transaction and execution is non-transactional.
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}

看看已存在事务时,不同事务传播性的处理方案

/**
 * Builds the transaction status for a call made while a transaction already exists,
 * branching on the definition's propagation behavior.
 * (Excerpt: parts of the original method are elided and marked with "...".)
 *
 * @param definition supplies the propagation behavior consulted below
 * @param transaction the existing transaction object
 * @param debugEnabled passed through to the status factory methods
 * @return the status for the suspended, new, or joined transaction
 * @throws IllegalTransactionStateException when propagation is 'never'
 */
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {
    // PROPAGATION_NEVER: an existing transaction is not allowed, so fail.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // PROPAGATION_NOT_SUPPORTED: suspend the existing transaction and continue with a
    // status that carries no transaction (null) plus the suspended resources.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // PROPAGATION_REQUIRES_NEW: always run in a new transaction — suspend the existing
    // one, then begin a fresh transaction.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        ...
    }
    // PROPAGATION_NESTED: nested transaction (handling elided in this excerpt).
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        ...

    }

    // Fall-through for the propagation behaviors not handled above (presumably
    // REQUIRED / SUPPORTS / MANDATORY): take part in the existing transaction; the
    // status is created with newTransaction = false and nothing is suspended.
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

数据库连接

doGetTransaction负责创建Transaction,由子类实现,看看DataSourceTransactionManager.doGetTransaction

/**
 * Creates the transaction object for the current call: a fresh
 * DataSourceTransactionObject carrying whatever ConnectionHolder is currently
 * registered for this manager's data source (possibly none).
 *
 * @return the newly created DataSourceTransactionObject
 */
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();    
    txObject.setSavepointAllowed(isNestedTransactionAllowed());        // savepoints are allowed iff nested transactions are allowed
    // Look up the ConnectionHolder registered under this data source, if any.
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    // The second argument (false) marks the holder as not newly created by this object.
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

TransactionSynchronizationManager.getResource主要调用doGetResource方法

/**
 * Looks up the value stored under the given key in the current resources map.
 * (Excerpt: part of the original method is elided and marked with "...".)
 *
 * @param actualKey the key to look up (e.g. a data source)
 * @return the stored value, or null when no resources map exists
 */
private static Object doGetResource(Object actualKey) {
    // Per the article, resources is a ThreadLocal<Map<Object, Object>>, so get()
    // yields the map belonging to the current thread.
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    ...
    return value;
}

注意这里的resources属性类型是ThreadLocal<Map<Object, Object>>,spring jdbc正是通过ThreadLocal,为每个线程保存各个dataSource及其对应的Connection(ConnectionHolder)。

doBegin负责开启数据库事务,它也是模板方法,由DataSourceTransactionManager实现

protected void doBegin(Object transaction, TransactionDefinition definition) {
    // 获取数据库连接
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // 创建数据库连接
        if (txObject.getConnectionHolder() == null ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = this.dataSource.getConnection();
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // 数据库操作
        ...

        // 绑定数据库连接
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(),         txObject.getConnectionHolder());
        }
     catch (Throwable ex) {
        ...
     }
}

从doBegin可以看到,如果事务对象中还没有ConnectionHolder(或者已有的ConnectionHolder已与事务同步),会从dataSource获取一个新的Connection;新创建的ConnectionHolder最后会被绑定到当前线程的上下文中。

动态代理

我们都知道,只要在spring配置中通过<tx:annotation-driven/>开启事务管理,然后在spring bean的方法上添加注解@Transactional,spring就会为我们管理事务,那么spring是如何做到的呢?

从TxNamespaceHandler看到AnnotationDrivenBeanDefinitionParser解析标签annotation-driven

/**
 * Parses the annotation-driven element: registers the transactional event listener
 * factory, then configures either the AspectJ transaction aspect or the auto-proxy
 * creator depending on the element's "mode" attribute.
 *
 * @param element the annotation-driven XML element
 * @param parserContext the current parser context
 * @return always null; the work is done by the registration calls
 */
public BeanDefinition parse(Element element, ParserContext parserContext) {
    registerTransactionalEventListenerFactory(parserContext);
    String mode = element.getAttribute("mode");
    if ("aspectj".equals(mode)) {
        // mode="aspectj"
        registerTransactionAspect(element, parserContext);
    }
    else {
        // Any other mode value, i.e. mode="proxy"
        AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
    }
    return null;
}

下面只关注mode为“proxy”的处理,看看AopAutoProxyConfigurer的configureAutoProxyCreator

/**
 * Registers the auto-proxy creator if necessary and, unless the transaction advisor
 * bean definition already exists, creates the bean definitions for the transaction
 * attribute source, the transaction interceptor and the advisor.
 * (Excerpt: parts of the original method are elided and marked with "...".)
 *
 * @param element the annotation-driven XML element
 * @param parserContext the current parser context, giving access to the registry
 */
public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
    AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

    String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
    // Skip everything below when the advisor definition is already registered.
    if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
        // Presumably attached to the definitions in the elided code — confirm.
        Object eleSource = parserContext.extractSource(element);

        // Create the TransactionAttributeSource definition.
        RootBeanDefinition sourceDef = new RootBeanDefinition(
                "org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
        ...
        // Create the TransactionInterceptor definition.
        RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
        ...

        // Create the TransactionAttributeSourceAdvisor definition.
        RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
        ...
    }
}

代码很多,但主要是创建了TransactionAttributeSource/TransactionInterceptor/TransactionAttributeSourceAdvisor definition。
注意,AopNamespaceUtils.registerAutoProxyCreatorIfNecessary会调用AopConfigUtils.registerAutoProxyCreatorIfNecessary

/**
 * Delegates to registerOrEscalateApcAsRequired with
 * InfrastructureAdvisorAutoProxyCreator as the auto-proxy creator class.
 *
 * @param registry the bean definition registry to register with
 * @param source passed through to registerOrEscalateApcAsRequired
 * @return whatever registerOrEscalateApcAsRequired returns
 */
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, Object source) {
    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

InfrastructureAdvisorAutoProxyCreator继承了AbstractAdvisorAutoProxyCreator,正是它负责完成动态代理工作。

看看关键的advisor类BeanFactoryTransactionAttributeSourceAdvisor,它聚合了切入点TransactionAttributeSourcePointcut,其matches方法如下:

/**
 * Decides whether the given method on the given target class is matched by this pointcut.
 *
 * @param method the candidate method
 * @param targetClass the class the method is invoked on
 * @return false when the target class is a TransactionalProxy; otherwise true when no
 *         transaction attribute source is available or when the source returns a
 *         non-null transaction attribute for the method
 */
public boolean matches(Method method, Class<?> targetClass) {
    // Classes assignable to TransactionalProxy are never matched.
    if (TransactionalProxy.class.isAssignableFrom(targetClass)) {
        return false;
    }
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // With no attribute source every method matches; otherwise a transaction
    // attribute must be found for the method.
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

getTransactionAttribute获取目标方法注解Transactional中的属性,如timeout/readOnly等。

拦截类TransactionInterceptor实现了MethodInterceptor,它负责实际的事务管理工作。 核心方法为

/**
 * Runs the given invocation inside a transaction: creates a transaction if necessary,
 * proceeds with the invocation, completes the transaction on a thrown exception via
 * completeTransactionAfterThrowing, and commits after a normal return.
 * (Excerpt: the callback-preferring branch is elided and marked with "...".)
 *
 * @param method the method being invoked
 * @param targetClass the class the method is invoked on
 * @param invocation callback that proceeds with the actual invocation
 * @return the value returned by the invocation
 * @throws Throwable whatever the invocation throws, rethrown after transaction completion
 */
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
        // Standard path: no transaction attribute, or a manager that is not
        // callback-preferring (the article labels this declarative transaction handling).
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
            // Reached only when the invocation returned normally (the catch block rethrows).
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        // The manager is a CallbackPreferringPlatformTransactionManager (the article
        // labels this programmatic transaction handling); body elided in this excerpt.
        else {
            ...
        }
    }

completeTransactionAfterThrowing会根据异常类型决定是回滚还是提交事务。

/**
 * Completes the transaction after the invocation threw an exception: rolls back when
 * the transaction attribute says to roll back on that exception, otherwise commits.
 * Does nothing when txInfo is null or holds no transaction.
 * (Excerpt: the code following each try block is elided and marked with "...".)
 *
 * @param txInfo information about the current transaction; may be null
 * @param ex the exception thrown by the invocation
 */
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.hasTransaction()) {
        // The transaction attribute asks for a rollback on this exception: roll back.
        if (txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            ...

        }
        // Not a rollback exception for this attribute: go ahead and commit.
        else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            ...
        }
    }
}