关于多个事务并发执行的一个问题

今天群里一个哥们提了一个问题,他面试中遇到的,问题大概是这么个意思:
现在有10个任务,每个任务需要执行10s,用线程池并发处理,每个线程会插入数据,需求:只要有一个线程插入数据失败了,其他的所有线程的数据都需要回滚。
这个是一个并发事务问题,也可以说是一个分布式事务问题,解决方案也不少,这里基于一种两阶段提交的方式进行处理。
即这里主线程作为事务协调者,多个线程作为参与者,直到参与者全部执行了事务操作后会给协调者返回执行结果,在此期间协调者会一直阻塞,同时参与者也会阻塞直到协调者收到所有参与者的执行结果,只要有一个参与者执行失败,协调者就会通知所有参与者都进行回滚操作,反之则提交事务。
代码如下:
建表语句:

create table `abc` (
	`age` int (11)
); 

Mapper 接口:

/**
 * @author Dongguabai
 * @description
 * @date 2021-03-24 18:49
 */
public interface AbcMapper {

    @Insert("insert into abc values (#{val})")
    int insert(@Param("val")Object val);

}

测试类:

	@Autowired
    private AbcMapper abcMapper;

    @Autowired
    private PlatformTransactionManager transactionManager;

    private CountDownLatch countDownLatch = new CountDownLatch(10);

    private final Object lock = new Object();

    private volatile boolean commit = true;

    @Test
    public void tes() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for (int i = 1; i <= 10; i++) {
            int a = i;
            executorService.execute(() -> {
                Object value = a == 10 ? "a" : a;
                //Object value = a ;
                insert(value);
            });
        }
        countDownLatch.await();
        synchronized (lock){
            lock.notifyAll();
        }
    }

    private void insert(Object value) {
        TransactionStatus txStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try {
            abcMapper.insert(value);
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
            countDownLatch.countDown();
        } catch (Exception e) {
            countDownLatch.countDown();
            commit = false;
            System.out.println(Thread.currentThread().getName() + "->error");
        }
        synchronized (lock) {
            try {
                System.out.println(Thread.currentThread().getName()+"->wait");
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName()+"->do");
        System.out.println("commit:"+commit);
        if (commit) {
            transactionManager.commit(txStatus);
            System.out.println(Thread.currentThread().getName() + "->commit");
            return;
        }
        try {
            transactionManager.rollback(txStatus);
        }catch (Exception e){
            System.out.println("exception:"+e.getLocalizedMessage());
        }
        System.out.println(Thread.currentThread().getName() + "->rollback");
    }

执行结果:

pool-4-thread-10->error
pool-4-thread-10->wait
pool-4-thread-8->wait
pool-4-thread-2->wait
pool-4-thread-6->wait
pool-4-thread-5->wait
pool-4-thread-9->wait
pool-4-thread-1->wait
pool-4-thread-4->wait
pool-4-thread-3->wait
pool-4-thread-7->wait
pool-4-thread-7->do
pool-4-thread-3->do
commit:false
pool-4-thread-5->do
pool-4-thread-2->do
commit:false
pool-4-thread-1->do
commit:false
pool-4-thread-10->do
pool-4-thread-4->do
commit:false
pool-4-thread-6->do
commit:false
pool-4-thread-8->do
commit:false
pool-4-thread-9->do
commit:false
commit:false
commit:false
commit:false
pool-4-thread-10->rollback
pool-4-thread-1->rollback
pool-4-thread-5->rollback
pool-4-thread-6->rollback
pool-4-thread-2->rollback
pool-4-thread-8->rollback
pool-4-thread-4->rollback
pool-4-thread-7->rollback
pool-4-thread-9->rollback
pool-4-thread-3->rollback

同时 abc 表中也并未插入数据,实现了问题中的需求。

欢迎关注公众号:
在这里插入图片描述