事务那些事儿

事务,保证数据一致性,非常重要,开发人员必备知识点。看看它的定义[1]

数据库事务(简称:事务)是数据库管理系统执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成。

再复习一下它的特性:

并非任意的对数据库的操作序列都是数据库事务。数据库事务拥有以下四个特性,习惯上被称之为ACID特性

  • 原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
  • 一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。
  • 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
  • 持久性(Durability):已被提交的事务对数据库的修改应该永久保存在数据库中。

话不多说,简单整理总结一下这些年来遇到的一些事务(数据库事务或者框架层的事务)问题及一些重要知识点。

问题

分布式锁和事务的顺序问题

一个经典问题,事务须在分布式锁内部

如果事务在分布式锁的外部,那么考虑这样一种场景:线程A和线程B出现竞争,均先开启了事务。然后A先获取到锁,B则阻塞等待。A执行完业务逻辑,释放了锁之后才会进行事务提交。那么,在A的事务未提交完成时,B可能已经获取了锁,进入了业务逻辑查到了A提交前的数据或者重复写入了数据(如果表没有唯一键之类的控制)。

因此,把事务放在分布式锁的内部,在事务提交完成之后再释放锁,能规避此类问题。

Spring注解式事务不生效

主要是两种情况:自调用(self-invocation)或者注解放在private方法上。

放在private方法上:

@Transactional
private void doTransaction() {
    ...
}

自调用:

public void doSomething() {
    this.doTransaction();
}

@Transactional
public void doTransaction() {
    ...
}

这两种情况事务都不会生效(默认Spring AOP的动态代理机制下)。直接看官方说明[2]

Method visibility and @Transactional in proxy mode

The @Transactional annotation is typically used on methods with public visibility. As of 6.0, protected or package-visible methods can also be made transactional for class-based proxies by default. Note that transactional methods in interface-based proxies must always be public and defined in the proxied interface. For both kinds of proxies, only external method calls coming in through the proxy are intercepted.

...

In proxy mode (which is the default), only external method calls coming in through the proxy are intercepted. This means that self-invocation (in effect, a method within the target object calling another method of the target object) does not lead to an actual transaction at runtime even if the invoked method is marked with @Transactional. Also, the proxy must be fully initialized to provide the expected behavior, so you should not rely on this feature in your initialization code — e.g. in a @PostConstruct method.

即注解必须放在public方法上且必须是外部对象触发的方法调用,代理才会被触发,进而事务才会生效。

Spring的TransactionTemplate

在某些场景下,可能不想使用@Transactional注解来控制事务,那么可以使用TransactionTemplate手动开启更小粒度的事务,而不用拆分太多方法和类。

查看TransactionTemplate的源码可以知道,它也是遵循Spring的事务传播机制的,但是得通过构造器手动指定。Spring初始化的默认bean的传播机制就会执行默认的REQUIRED - 如果当前没有事务,则自己新建一个事务,如果当前存在事务,则加入这个事务

因此,推荐这两种方式不要混用,很容易产生事务控制不当的问题,因为可能正常情况下都不会单独实例化TransactionTemplate且改变其传播属性。譬如之前就遇到过这样的代码,外层@Transactional默认机制,内层TransactionTemplate手动控制也是默认机制,那就起不到内外隔离的作用。

TransactionTemplate构造器:

public TransactionTemplate(PlatformTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
    // 使用transactionDefinition可以指定此模板bean的事务传播机制和隔离级别
    super(transactionDefinition);
    this.transactionManager = transactionManager;
}

TransactionTemplate#execute方法:

public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

    if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager cpptm) {
        return cpptm.execute(this, action);
    }
    else {
        // 这里会去查找上下文的事务或者创建一个新事务
        TransactionStatus status = this.transactionManager.getTransaction(this);
        T result;
        try {
            result = action.doInTransaction(status);
        }
        catch (RuntimeException | Error ex) {
            // Transactional code threw application exception -> rollback
            rollbackOnException(status, ex);
            throw ex;
        }
        catch (Throwable ex) {
            // Transactional code threw unexpected exception -> rollback
            rollbackOnException(status, ex);
            throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
        }
        this.transactionManager.commit(status);
        return result;
    }
}

AbstractPlatformTransactionManager#getTransaction方法:

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        // 此方法check事务传播机制的配置,如无指定,则默认 PROPAGATION_REQUIRED
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    ...
}

MyBatis流式查询和Spring事务

此处指MyBatis指定ResultHandler的所谓流式查询。由于其特性,SqlSession开启后,直到查询完毕才会关闭。

以下是我遇到的一个问题:PostgreSQL数据库,利用这种流式查询在ResultHandler中对数据进行批处理,要求每批数据的事务处理相互独立。使用了Spring的事务注解,但使用了默认的传播机制REQUIRED。当某一批次出现问题时,会抛异常回滚事务,然后继续处理下一批。然而这批数据正常处理完毕,提交事务时会抛错:

current transaction is aborted, commands ignored until end of transaction block.

显然,上一批数据虽然抛出异常但实际并未回滚成功,导致事务处于了aborted的状态而无法提交。解决方案很简单,指定事务的传播级别为REQUIRES_NEW,或者直接使用SqlSession手动控制每次的commit和rollback。猜测是因为在流式查询中SqlSession一直未关闭?

当然,问题目前只出现在PostgreSQL数据库中,其他数据库是否也会有类似的问题,有待进一步验证。

模拟代码如下:

public class Task {
   	public void execute() {  
        try (SqlSession sqlSession = sqlSessionFactory.openSession(false)) {
            sqlSession.select(statement, (resultContext) -> {
                Object obj = resultContext.getResultObject();
                // 这里假定一次只处理一条,实际上是每批次多条
                service.doSomething(obj);
        });
    }
}

public class Service {
    // open a new transaction use spring annotation
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void doSomething(Object obj) {
        // do something
        ...
    }

    // or use sqlSession
    public void doSomething(SqlSession sqlSession, Object obj) {
        try {
            // do something
            ...
            sqlSession.commit();
        } catch(Throwable ex) {
            sqlSession.rollback();
        }
    }
}

PostgreSQL数据库事务aborted

正如上一节提到的报错:

current transaction is aborted, commands ignored until end of transaction block.

PostgreSQL数据库事务有一个特性,如果一个SQL出错后,事务就会变成aborted的状态,此时必须调用rollback或者abort[3]命令来回滚事务。所以,如果想实现类似下面这种功能,那必须得提前设置一个savepoint便于回滚或者修改数据库连接参数

public void doSomething(SqlSession sqlSession) {
    // 第一个更新操作
    mapper1.update(...);
    // 第一个更新之后设置一个savePoint
    Connection connection = sqlSession.getConnection();
    Savepoint savepoint = connection.getSavepoint();
    try {
        // 尝试插入,如果抛出DuplicateKey的异常,则更新
        mapper2.insert(...);
    } catch(DuplicateKeyException ex) {
        // 先rollback至savepoint,再更新。否则会抛错
        connection.rollback(savepoint);
        mapper2.update(...);
    }
}

MyBatis事务管理器和ShardingJdbc

MyBatis提供了几种事务管理器,常用的可能就是JdbcTransactionSpringManagedTransaction,后者在引入了mybatis-spring的整合包之后才会有。

一般情况下Spring和MyBatis集成,默认都是使用的SpringManagedTransaction。实例化org.mybatis.spring.SqlSessionFactoryBean的时候可以看出来:

// SqlSessionFactoryBean#buildSqlSessionFactory

protected SqlSessionFactory buildSqlSessionFactory() throws Exception {
    ...
    // 默认情况下指定SpringManagedTransactionFactory
    targetConfiguration.setEnvironment(new Environment(this.environment,
            this.transactionFactory == null ? new SpringManagedTransactionFactory() : this.transactionFactory,
            this.dataSource));
    ...
}

所以,如果集成了mybatis-spring,但又想要手动控制SqlSession(autoCommit置为false,在需要的时候手动提交和回滚),那么一定记得使用JdbcTransaction。因为SpringManagedTransaction每次开启connection时,总是会从connection中取autoCommit值,可能导致手动控制事务失效(目前此处的问题是和ShardingJdbc的集成有关)。下面来追踪一下。

先来看看JdbcTransaction的源码:

public class JdbcTransaction implements Transaction {
  
  // 持有autoCommit属性,未指定默认值
  protected boolean autoCommit

  // 构造器提供了autoCommit参数
  public JdbcTransaction(DataSource ds, TransactionIsolationLevel desiredLevel, boolean desiredAutoCommit) {
    this(ds, desiredLevel, desiredAutoCommit, false);
  }

  public JdbcTransaction(DataSource ds, TransactionIsolationLevel desiredLevel, boolean desiredAutoCommit, boolean skipSetAutoCommitOnClose) {
    dataSource = ds;
    level = desiredLevel;
    autoCommit = desiredAutoCommit;
    this.skipSetAutoCommitOnClose = skipSetAutoCommitOnClose;
  }

  protected void openConnection() throws SQLException {
    if (log.isDebugEnabled()) {
      log.debug("Opening JDBC Connection");
    }
    connection = dataSource.getConnection();
    if (level != null) {
      connection.setTransactionIsolation(level.getLevel());
    }
    setDesiredAutoCommit(autoCommit);
  }

  protected void setDesiredAutoCommit(boolean desiredAutoCommit) {
    try {
      if (connection.getAutoCommit() != desiredAutoCommit) {
        if (log.isDebugEnabled()) {
          log.debug("Setting autocommit to " + desiredAutoCommit + " on JDBC Connection [" + connection + "]");
        } 
        // 此处会重置autoCommit的值为transaction实例化时指定的值。如果手动开启事务时指定了false,那么这里就一定会被置为false
        connection.setAutoCommit(desiredAutoCommit);
      }
    } catch (SQLException e) {
      // Only a very poorly implemented driver would fail here,
      // and there's not much we can do about that.
      throw new TransactionException("Error configuring AutoCommit.  "
          + "Your driver may not support getAutoCommit() or setAutoCommit(). "
          + "Requested setting: " + desiredAutoCommit + ".  Cause: " + e, e);
    }
  }
}

再来SpringManagedTransaction的源码:

public class SpringManagedTransaction implements Transaction {

  // 持有autoCommit属性,未指定默认值
  private boolean autoCommit;

  // 构造器未提供autoCommit参数
  public SpringManagedTransaction(DataSource dataSource) {
    notNull(dataSource, "No DataSource specified");
    this.dataSource = dataSource;
  }
 
  // 这里有关于autoCommit的解释,为什么每次开启都需要从connection中取值
  /**
   * Gets a connection from Spring transaction manager and discovers if this {@code Transaction} should manage
   * connection or let it to Spring.
   * <p>
   * It also reads autocommit setting because when using Spring Transaction MyBatis thinks that autocommit is always
   * false and will always call commit/rollback so we need to no-op that calls.
   */
  private void openConnection() throws SQLException {
    this.connection = DataSourceUtils.getConnection(this.dataSource);
    // 这里会从connection中取值,那么手动开启事务时指定的autoCommit=false,可能就会失效
    this.autoCommit = this.connection.getAutoCommit();
    this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);

    LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will"
        + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
  }

  public void commit() throws SQLException {
    if (this.connection != null && !this.isConnectionTransactional && !this.autoCommit) {
      LOGGER.debug(() -> "Committing JDBC Connection [" + this.connection + "]");
      this.connection.commit();
    }
  }

  public void rollback() throws SQLException {
    if (this.connection != null && !this.isConnectionTransactional && !this.autoCommit) {
      LOGGER.debug(() -> "Rolling back JDBC Connection [" + this.connection + "]");
      this.connection.rollback();
    }
  }
}

从注释来看,这里会出现两种情况:一种是事务完全交给Spring管理,那这里autoCommit自然不为true且总是会调用commit或者rollback方法提交或回滚。另一种情况则由connection本身来决定,Spring无需干涉。

所以,当引入了ShardingSphere并完全使用ShardingSphereDataSource来进行数据源的管理时,问题就出现了。因为Connection的实现类会被替换成ShardingSphereConnection

public final class ShardingSphereConnection extends AbstractConnectionAdapter {
	// 这里,autoCommit默认为true
    private boolean autoCommit = true;
}

那么这里默认为true,再结合上面对SpringManagedTransaction的分析,不难看出会忽略手动开启事务时设置的autoCommit值,事务也随之失效。

issue上找到一些官方的回复,可以知道此处的默认值跟其底层设计理念有关:

最后再梳理下,可以看出其实MyBatis配置错误才是根因:

  • MyBatis指定事务控制实现时,未正确配置对应的实现类,想手动控制事务却指定了SpringManagedTransaction的实现

  • 在配置错误的情况下,引入ShardingJdbc,再由于ShardingSphereConnection的实现对于autoCommit值的处理有其特殊性,最终导致事务失效

当然,除了替换Transaction的实现,也可以通过下面这种方式,直接使用connection来手动控制(并不推荐):

要点

Spring事务传播

老生常谈,复习下Spring的几种事务传播机制:

public enum Propagation {

	/**
	 * Support a current transaction, create a new one if none exists.
	 * Analogous to EJB transaction attribute of the same name.
	 * <p>This is the default setting of a transaction annotation.
	 */
    // 如果上下文存在事务,则加入当前事务。如果不存在,则创建一个新事务并执行
	REQUIRED(TransactionDefinition.PROPAGATION_REQUIRED),

	/**
	 * Support a current transaction, execute non-transactionally if none exists.
	 * Analogous to EJB transaction attribute of the same name.
	 * <p>Note: For transaction managers with transaction synchronization,
	 * {@code SUPPORTS} is slightly different from no transaction at all,
	 * as it defines a transaction scope that synchronization will apply for.
	 * As a consequence, the same resources (JDBC Connection, Hibernate Session, etc)
	 * will be shared for the entire specified scope. Note that this depends on
	 * the actual synchronization configuration of the transaction manager.
	 * @see org.springframework.transaction.support.AbstractPlatformTransactionManager#setTransactionSynchronization
	 */
    // 如果上下文存在事务,则加入当前事务。如果不存在,则以非事务的方式执行
	SUPPORTS(TransactionDefinition.PROPAGATION_SUPPORTS),

	/**
	 * Support a current transaction, throw an exception if none exists.
	 * Analogous to EJB transaction attribute of the same name.
	 */
    // 如果上下文存在事务,则加入当前事务。如果不存在,则抛出异常
	MANDATORY(TransactionDefinition.PROPAGATION_MANDATORY),

	/**
	 * Create a new transaction, and suspend the current transaction if one exists.
	 * Analogous to the EJB transaction attribute of the same name.
	 * <p><b>NOTE:</b> Actual transaction suspension will not work out-of-the-box
	 * on all transaction managers. This in particular applies to
	 * {@link org.springframework.transaction.jta.JtaTransactionManager},
	 * which requires the {@code jakarta.transaction.TransactionManager} to be
	 * made available to it (which is server-specific in standard Jakarta EE).
	 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
	 */
    // 总是创建一个新事务(完全与上下文隔离的事务),不管上下文是否已存在事务。如果上下文已存在事务,则挂起它,等待新事务执行完毕再恢复执行
	REQUIRES_NEW(TransactionDefinition.PROPAGATION_REQUIRES_NEW),

	/**
	 * Execute non-transactionally, suspend the current transaction if one exists.
	 * Analogous to EJB transaction attribute of the same name.
	 * <p><b>NOTE:</b> Actual transaction suspension will not work out-of-the-box
	 * on all transaction managers. This in particular applies to
	 * {@link org.springframework.transaction.jta.JtaTransactionManager},
	 * which requires the {@code jakarta.transaction.TransactionManager} to be
	 * made available to it (which is server-specific in standard Jakarta EE).
	 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
	 */
    // 总是以非事务方式执行。如果上下文已存在事务,则挂起它
	NOT_SUPPORTED(TransactionDefinition.PROPAGATION_NOT_SUPPORTED),

	/**
	 * Execute non-transactionally, throw an exception if a transaction exists.
	 * Analogous to EJB transaction attribute of the same name.
	 */
    //总是以非事务方式执行。如果上下文已存在事务,则抛出异常
	NEVER(TransactionDefinition.PROPAGATION_NEVER),

	/**
	 * Execute within a nested transaction if a current transaction exists,
	 * behave like {@code REQUIRED} otherwise. There is no analogous feature in EJB.
	 * <p>Note: Actual creation of a nested transaction will only work on specific
	 * transaction managers. Out of the box, this only applies to the JDBC
	 * DataSourceTransactionManager. Some JTA providers might support nested
	 * transactions as well.
	 * @see org.springframework.jdbc.datasource.DataSourceTransactionManager
	 */
    // 如果上下文存在事务,则以嵌套子事务的方式执行。否则就同PROPAGATION_REQUIRED
	NESTED(TransactionDefinition.PROPAGATION_NESTED);

}

关于最后一个NESTED的解释,可以看看官方文档[4]的说明:

PROPAGATION_NESTED uses a single physical transaction with multiple savepoints that it can roll back to. Such partial rollbacks let an inner transaction scope trigger a rollback for its scope, with the outer transaction being able to continue the physical transaction despite some operations having been rolled back. This setting is typically mapped onto JDBC savepoints, so it works only with JDBC resource transactions. See Spring’s DataSourceTransactionManager.

区别于PROPAGATION_REQUIRES_NEW - 创建和上下文事务完全隔离的新事务,NESTED是创建一个所谓的”嵌套“事务,它是上下文事务的一部分,只有上下文事务提交后才会提交。回滚依赖设置的savepoints,回滚后不会影响到上下文事务的继续执行。只能作用于JDBC资源事务。

数据库事务隔离[5]

事务隔离(英语:Transaction Isolation)定义了数据库系统中一个事务中操作的结果在何时以何种方式对其他并发事务操作可见。隔离是事务ACID(原子性、一致性、隔离性、持久性)四大属性之一。

java.sql.Connection类中有对应的常量定义:

public interface Connection  extends Wrapper, AutoCloseable {
    /**
     * A constant indicating that transactions are not supported.
     */
    // 不支持事务
    int TRANSACTION_NONE             = 0;

    /**
     * A constant indicating that
     * dirty reads, non-repeatable reads and phantom reads can occur.
     * This level allows a row changed by one transaction to be read
     * by another transaction before any changes in that row have been
     * committed (a "dirty read").  If any of the changes are rolled back,
     * the second transaction will have retrieved an invalid row.
     */
    // 读未提交:脏读,不可重复读和幻读都可能发生
    int TRANSACTION_READ_UNCOMMITTED = 1;

    /**
     * A constant indicating that
     * dirty reads are prevented; non-repeatable reads and phantom
     * reads can occur.  This level only prohibits a transaction
     * from reading a row with uncommitted changes in it.
     */
    // 读已提交:可以避免脏读,但是不可重复读和幻读仍可能发生
    int TRANSACTION_READ_COMMITTED   = 2;

    /**
     * A constant indicating that
     * dirty reads and non-repeatable reads are prevented; phantom
     * reads can occur.  This level prohibits a transaction from
     * reading a row with uncommitted changes in it, and it also
     * prohibits the situation where one transaction reads a row,
     * a second transaction alters the row, and the first transaction
     * rereads the row, getting different values the second time
     * (a "non-repeatable read").
     */
    // 可重复读:可以避免脏读和不可重复读,但是幻读可能发生
    int TRANSACTION_REPEATABLE_READ  = 4;

    /**
     * A constant indicating that
     * dirty reads, non-repeatable reads and phantom reads are prevented.
     * This level includes the prohibitions in
     * {@code TRANSACTION_REPEATABLE_READ} and further prohibits the
     * situation where one transaction reads all rows that satisfy
     * a {@code WHERE} condition, a second transaction inserts a row that
     * satisfies that {@code WHERE} condition, and the first transaction
     * rereads for the same condition, retrieving the additional
     * "phantom" row in the second read.
     */
   	// 串行:脏读、不可重复读和幻读均可避免。但性能较低
    int TRANSACTION_SERIALIZABLE     = 8;

}

分布式事务

理论上的东西不多谈,主要整理下用过的两种分布式事务方案:

  • 基于可靠消息的最终一致性
  • 类TCC机制 + 补偿任务

基于可靠消息的最终一致性

主要是基于RocketMQ提供的事务消息,可以参见这篇文章:性能优化实践小记

类TCC机制 + 补偿任务

配置多数据源的服务,可以类似下面的处理逻辑:

  1. A库开启事务1,执行一条插入,但其状态为初始状态(待生效/草稿/处理中)。B库开启事务2执行其逻辑,亦插入一条数据,最终状态(有效/已提交/处理完成)。正常情况下的业务异常,直接抛出并回滚所有的事务。
  2. 提交事务:
    • 提交事务1,如果事务1提交失败则同时回滚事务2,AB库均不会新增数据。
    • 事务1提交成功,再提交事务2。事务2提交成功后,再开启事务3,更新A库数据状态为最终状态。事务3提交成功,则整个事务才算成功。
    • 事务2提交失败,则抛出异常后回滚(事务1不受影响)。此时需引入补偿任务,扫描A库中初始状态的数据,查B库数据是否已存在。如果不存在,则重试执行事务2。成功后,更新A库的数据为最终状态。如果已存在,则直接更新A库数据即可。重试超过一定失败次数后需引入人工处理。
  3. 回滚:目前不存在自动回滚,一直补偿直至成功或者人工介入。

对非多数据源的服务,需要通过RPC调用而出现的分布式事务问题,类似的处理,但会多一个回滚。

  1. A服务开启事务1,写入数据,调RPC接口触发B服务的事务2,写入数据,但状态为初始状态。
  2. RPC调用成功后,事务1提交。RPC调用失败或者提交失败,回滚事务1。(此时可能出现RPC超时返回失败,但B服务已处理成功的情况,需要补偿任务)
  3. 事务1提交成功后,再次调用RPC接口,更新B服务的数据为最终状态。
  4. 补偿任务,扫描B服务中初始状态的数据,反查A服务,如果存在,则更新为最终状态。如果不存在,则回滚,删除初始状态的数据。

上述情况均需保证数据的幂等,已处于初始状态或者最终状态则认为已经提交成功,直接返回。



事务那些事儿
https://luckycaesar.github.io/article/事务那些事儿/
作者
LuckyCaesar
发布于
2024年1月13日
许可协议