分布式事务系列(3.2)jotm分布式事务源码分析

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

1 系列目录

2 了解xapool

我们在前一篇文章中了解到jotm配合xapool共同完成了分布式事务。jotm主要提供了事务管理器TransactionManager的功能。而xapool则通过使用非XA数据库驱动实现了XA数据库驱动的效果。深入了解xapool之前,我们需要认识下XA数据库驱动到底是什么

2.1 XA数据库驱动

2.1.1 DataSource和ConnectionPoolDataSource

先来看下javax.sql.DataSource接口内容:

public interface DataSource  extends CommonDataSource,Wrapper {
  Connection getConnection() throws SQLException;
  Connection getConnection(String username, String password)
    throws SQLException;
}

DataSource就是提供Connection的。再来看下ConnectionPoolDataSource接口内容

public interface ConnectionPoolDataSource  extends CommonDataSource {
  PooledConnection getPooledConnection() throws SQLException;
  PooledConnection getPooledConnection(String user, String password)
    throws SQLException;
 }

ConnectionPoolDataSource就是提供PooledConnection,它代表了一个与数据库的物理连接,接口如下:

    public interface PooledConnection {
        Connection getConnection() throws SQLException;
      }

这个物理连接能够产生Connection。Connection作为PooledConnection的一个handle,即PooledConnection可以产生多个Connection来供使用。总结如下:

ConnectionPoolDataSource-》PooledConnection-》Connection

DataSource-》Connection

更多详细内容见JDBC分布式事务浅析

总之需要带着怀疑的眼光去看待问题,以上看法不一定对。如果想深入探究,就需要各位去仔细去斟酌。

2.1.2 XADataSource

再来看下javax.sql.XADataSource的接口内容:

public interface XADataSource extends CommonDataSource {
  XAConnection getXAConnection() throws SQLException;
  XAConnection getXAConnection(String user, String password)
    throws SQLException;
 }

XADataSource就是提供XAConnection的,什么是XAConnection呢?

    public interface XAConnection extends PooledConnection {
          javax.transaction.xa.XAResource getXAResource();
        }

可以看到XAConnection不仅能够获取Connection(PooledConnection的功能),还能获取与数据库通信的一个代表XAResource(上上一篇文章已说明,见AP、TM、RM三大对象

有了这个通信代表,我们就可以与数据库进行交互,实现两阶段提交协议。

2.1.3 什么是XA数据库驱动

即实现了XADataSource接口的数据库驱动,它能够为我们创建XAConnection,有了XAConnection我们既能获取普通常见的Connection,又能获取XAResource,实现与数据库的交互,进而可以实现两阶段提交协议。

所以XA数据库驱动的最重要核心就是:有了XAResource的实现,能与数据库进行双向交互

目前大部分数据库都是支持XA驱动的,如

  • 对mysql来说,我们所用的mysql-connector-java jar包,就是支持的,它所提供的XADataSource实现是: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
  • 对oracle来说,它所提供的XADataSource实现是: oracle.jdbc.xa.client.OracleXADataSource

2.2 xapool的内容

XA数据库驱动按道理上讲,应该是数据库来负责提供的。xapool就是在数据库驱动不是XA驱动的前提下,模拟了XA驱动。这种模拟情况下,与数据库的交互仍然是单向的。

2.2.1 xapool的整体功能

  • Generic Pool:一个pool的功能,用来存储Object
  • StandardDataSource和StandardXADataSource,分别用于产生Connection和XAConnection
  • StandardPoolDataSource和StandardXAPoolDataSource 对上述那一对dataSource都加上了Generic Pool的功能,使用pool来维护和管理Connection(实际上是PooledConnection,后文会说到)和XAConnection

2.2.2 认识GenericPool

它实现了一个pool的功能,和其他的pool的大体功能都差不多,它可以存储和管理任何对象。只要你实现了相应的接口,这里即PoolHelper接口,看下GenericPool的构造函数:

public GenericPool(PoolHelper helper) {
    this(
        helper,
        DEFAULT_MINSIZE,
        DEFAULT_MAXSIZE,
        DEFAULT_EXPIRATION,
        DEFAULT_SLEEPTIME);
}

GenericPool需要一个PoolHelper和一些参数设置(pool中object个数的最大值、最小值等)。来看下PoolHelper:

public interface PoolHelper {
    public void expire(Object o); // object specific work to kill the object
    public boolean checkThisObject(Object o);
    // check if the object is still valid
    public boolean testThisObject(Object o); // check if the object is closed
    public GenerationObject create() throws SQLException;
    public GenerationObject create(String _user, String _password)
        throws SQLException;
    public String toString();
}

可以看到GenericPool通过PoolHelper对外暴漏出一些方法,通过PoolHelper的create方法将创建的object存放至pool中,通过PoolHelper的expire方法将pool中的object真正意义上的删除掉(而不仅仅是从pool中移除)。

2.2.3 StandardDataSource和StandardXADataSource

StandardDataSource需要实现DataSource的功能,即能够获取Connection。这个实现还是比较简单的,就是利用jdbc原始方式DriverManager来获取,如下:

Connection conn=DriverManager.getConnection(url, prop)

StandardXADataSource需要实现XADataSource的功能,即能够获取XAConnection,同时通过XAConnection能获取XAResource。

这里的XAResource留到和jotm的事务管理器一起来说。

2.2.4 StandardPoolDataSource和StandardXAPoolDataSource

StandardPoolDataSource:内部拥有一个GenericPool,它自己实现了PoolHelper,我们来看看它把什么对象交给pool来管理了,即看看它怎么实现PoolHelper的create方法的:

StandardPoolDataSource向pool中存放的对象

它内部拥有一个ConnectionPoolDataSource对象,即图片中cpds对象,它能够产生PooledConnection(上文已说过,可以回去查看)。

可以看到向pool中存放的对象,并不是Connection,而是PooledConnection,上文说过,它是与数据库的一个物理连接,它能够产生Connection。

总结一下就是:StandardPoolDataSource利用内部ConnectionPoolDataSource对象来创建PooledConnection,然后把创建的PooledConnection交给内部的GenericPool来维护和管理。

StandardXAPoolDataSource:内部拥有一个GenericPool,它自己实现了PoolHelper,我们来看看它把什么对象交给pool来管理了,即看看它怎么实现PoolHelper的create方法的:

StandardXAPoolDataSource向pool中存放的对象

它内部拥有一个XADataSource对象,即图片中的xads,它能够产生XAConnection(上文已说过,可以回去查看)。

可以看到向pool中存放的对象是XAConnection。

总结一下就是:StandardXAPoolDataSource利用内部的XADataSource对象来创建XAConnection,然后把创建的XAConnection交给内部的GenericPool来维护和管理。

3 jotm的事务管理器

3.1 jotm要实现的JTA接口

jotm需要实现的JTA定义的接口有:

  • javax.transaction.UserTransaction:面向开发者的使用接口(jotm的实现是org.objectweb.jotm.Current)
  • javax.transaction.TransactionManager:事务管理器接口(jotm的实现是org.objectweb.jotm.Current)
  • javax.transaction.Transaction:事务接口(jotm的实现是org.objectweb.jotm.TransactionImpl)
  • javax.transaction.xa.Xid:全局事务的唯一标示(jotm的实现是org.objectweb.jotm.XidImpl)

3.2 UserTransaction开启一个事务

即org.objectweb.jotm.Current实现begin方法的源码内容:

  • 第一步:创建一个全局的唯一标示Xid

    XidImpl otid = new XidImpl();

  • 第二步:根据唯一标示和超时设置创建一个事务

    TransactionImpl tx = new TransactionImpl(otid, transactionTimeout)

  • 第三步:对上述事务设定一个定时器

    tx.setTimer(timermgr.addTimer(tx, transactionTimeout, null, false));

  • 第四步:把上述事务和Xid关系绑定到当前线程

    threadTx.set(tx); txXids.put(xid, tx);

    org.objectweb.jotm.Current有这三个线程绑定数据:

    private static transient ThreadLocal<TransactionImpl> threadTx = new ThreadLocal<TransactionImpl>(); private static transient ThreadLocal<Integer> threadTimeout = new ThreadLocal<Integer>(); private transient static Map<Xid, TransactionImpl> txXids = new HashMap<Xid, TransactionImpl>();

3.3 TransactionImpl与XAResource的交互

在我们创建完成事务后,开始使用业务逻辑操作时即上一篇的工程项目中使用JdbcTemplate时,就会执行TransactionImpl与XAResource的交互,即javax.transaction.Transaction接口方法enlistResource(XAResource xaRes):即把与资源管理器的通信代表XAResource加入到当前事务中来,在看TransactionImpl是如何来实现的之前,先了解下TransactionImpl中SubCoordinator的内部结构

3.3.1 TransactionImpl中SubCoordinator的内部结构

TransactionImpl内部有一个SubCoordinator,这就是一个两阶段提交中的协调者:

SubCoordinator内部含有2个集合:

private Vector resourceList = new Vector();
private Vector javaxxidList = new Vector();

一个是存放XAResource的,另一个是存放对应XAResource的xid。这样的做法有点难以接受,他们仅仅靠位置来对应XAResource和Xid的关系。这里说明下,每个事务都会有一个xid进行标示,每个XAResource都会有另一个xid来标示。

SubCoordinator实现了一个接口org.objectweb.jotm.Resource,接口定义如下:

public interface Resource extends Remote {

    /** 
     * phase 1 of the 2PC.
     *
     * @return int vote commit, rollback, or readonly.
     */
    public int prepare() throws RemoteException;
    public final static int VOTE_COMMIT = 0;
    public final static int VOTE_ROLLBACK = 1;
    public final static int VOTE_READONLY = 2;

    /** 
     * rollback transaction
     */
    public void rollback() throws RemoteException;

    /** 
     * phase 2 of the 2PC.
     */
    public void commit() throws RemoteException;

    /** 
     * commit 1 phase.
     */
    public void commit_one_phase() throws RemoteException;

    /** 
     * forget heuristics about this transaction.
     */
    public void forget() throws RemoteException;
}

可以清楚的看到这个org.objectweb.jotm.Resource接口就是针对两阶段提交来定义的。先有prepare方法进行投票,根据返回值结果(上述三个结果VOTE_COMMIT、VOTE_ROLLBACK、VOTE_READONLY)来选择下一步是执行commit还是rollback还是forget(当返回结果为VOTE_READONLY只读时,执行forget,即忽略这个事务)。

来看下SubCoordinator是如何来实现这个接口的:

接口方法int prepare():

  • 第一步:先将所有的XAResource结束事务边界,即调用XAResource的end方法,然后从事务TransactionImpl的enlistedXARes列表中移至delistedXARes

    事务TransactionImpl也有如下三个数据:

    private List<XAResource> enlistedXARes = Collections.synchronizedList(new ArrayList<XAResource>());

    private List<XAResource> delistedXARes = null;

    private List<javax.transaction.xa.Xid> enlistedJavaxXid = Collections.synchronizedList(new ArrayList<javax.transaction.xa.Xid>());

    也就是说事务中保存了一份XAResource,事务中的SubCoordinator也保存了一份

    整个过程代码如下:

    //先将enlistedXARes全部移至delistedXARes delistedXARes = new ArrayList<XAResource>(enlistedXARes); for (XAResource xar : delistedXARes) {

       delistResource(xar, flag);
    

    }

    public boolean delistResource(XAResource xares, int flag){

       //结束事务边界
       xares.end (myjavaxxid, flag);
    
       //从enlistedXARes、enlistedJavaxXid列表中移除
       enlistedXARes.remove(xares);
       enlistedJavaxXid.remove(javaxxid);
       //略
    

    }

  • 第二步:开始准备预提交,遍历SubCoordinator中所有的XAResource,依次进行预提交即执行每个XAResource的prepare方法(简化了部分内容)

    ret=VOTE_READONLY; int errors = 0; for (int i = 0; i < resourceList.size(); i++) {

       XAResource res = (XAResource) resourceList.elementAt(i);
       javax.transaction.xa.Xid myjavaxxid = (javax.transaction.xa.Xid) javaxxidList.elementAt(i);
       if (errors > 0) {
           res.rollback(myjavaxxid);
       }else{
           try {
               switch (res.prepare(myjavaxxid)) {
                       case XAResource.XA_OK :
                           ret = VOTE_COMMIT;
                           break;
                       case XAResource.XA_RDONLY :
                           break;
                       }
           } catch (XAException e) {
                ret = VOTE_ROLLBACK;
                errors++
           }
       }
    

    } return ret;

这里还是同步执行,效率肯定很慢。

接口方法void rollback():

  • 第一步:同上第一步,先将所有的XAResource结束事务边界,然后从事务TransactionImpl的enlistedXARes列表中移至delistedXARes
  • 第二步:遍历SubCoordinator中所有的XAResource,依次执行每个XAResource的rollback方法。

接口方法void commit():也是类似,将XAResource依次执行commit操作。

3.3.2 TransactionImpl的enlistResource操作

即把与资源管理器的通信代表XAResource加入到当前事务中来,enlistResource(XAResource xares)含有如下操作:

  • 第一步:添加到TransactionImpl中的SubCoordinator中

    subcoord.addResource(xares);

  • 第二步:创建该XAResource对应的xid

    Xid resXid = new XidImpl( getXid(),subcoord.getXaresIndex(xares) ); javax.transaction.xa.Xid javaxxid = new JavaXidImpl(resXid);

  • 第三步:把该xid加入进SubCoordinator中

    subcoord.addJavaxXid(javaxxid);

  • 第四步:开始XAResource的事务边界

    xares.start (javaxxid, flag);

  • 第五步:把上述XAResource和xid也存放到事务TransactionImpl中

    enlistedXARes.add(xares); enlistedJavaxXid.add(javaxxid);

TransactionImpl的delistResource操作就不再说明了。详情自己去看源码。

3.4 UserTransaction提交或回滚一个事务

这个就不再详细说明了。

  • 第一步:首先从当前线程中获取绑定的事务即TransactionImpl,依托该事务进行回滚或者提交,TransactionImpl会依托内部的协调者SubCoordinator来进行提交或者回滚
  • 第二步:清空当前线程绑定的事务

4 jotm和xapool的交互

4.1 在业务逻辑操作中XAResource加入进事务

在执行业务逻辑的时候即如下:

jdbcTemplate.update("insert into user(name,age) values(?,?)",user.getName(),user.getAge());

再对照着配置文件来看:

<bean id="dataSourceA" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource"  destroy-method="shutdown">
     <property name="dataSource">  
        <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">  
            <property name="transactionManager" ref="jotm" />  
            <property name="driverName" value="com.mysql.jdbc.Driver" />  
            <property name="url" value="jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8" />  
        </bean>  
    </property>     
    <property name="user" value="root" />  
    <property name="password" value="ligang" />  
</bean>

这时候jdbcTemplate会从dataSource中获取一个Connection来执行sql,即从StandardXAPoolDataSource中获取Connection。具体步骤如下:

  • 第一步:StandardXAPoolDataSource发现它内部的pool还没有初始化,启动初始化。初始化会创建指定数量的的对象,StandardXAPoolDataSource在pool中存放和维护的对象是StandardXAConnection。而StandardXAConnection的创建委托给了由内部的StandardXADataSource来进行创建。初始化过后,pool中就存在了指定数量的StandardXAConnection对象
  • 第二步:获取Connection,会先从pool中获取一个StandardXAConnection,该对象再进行包装,封装成一个StandardXAConnectionHandle,返回给用户,所以用户使用的Connection是StandardXAConnectionHandle类型的
  • 第三步:使用StandardXAConnectionHandle来执行sql操作前,会进行事务的判断,如下:

    StandardXAConnectionHandle的事务逻辑

    先获取和当前线程绑定的事务,它是通过上述配置文件中配置的transactionManager来获取的,如下:

    Transaction ntx = transactionManager.getTransaction(); 如果存在事务,则设置当前自动提交为false。因为StandardXAConnection底层还是使用的是普通的Connection来完成的,所以此操作就是设置一个普通的Connection的自动提交为false。

    将XAResource加入到当前事务中。这部分内容上文已经说了。

这个XAResource是什么类型呢?就是StandardXAConnection,它实现了XAResource:

public class StandardXAConnection extends StandardPooledConnection implements XAConnection, XAResource

也就是说事务TransactionImpl的提交和回滚和prepare操作依赖于TransactionImpl内部的SubCoordinator,而SubCoordinator又会依次调用每个XAResource(这里即StandardXAConnection)的提交和回滚和prepare操作。而StandardXAConnection内部只有一个普通的Connection,所以StandardXAConnection要利用普通的Connection来模拟XAResource的操作。具体怎么模拟的,这里就不再介绍了,详情去看源代码。

4.2 案例总结

针对上一篇文章介绍的工程项目,简单总结下:

@Transactional
public void save(User user){
    userDao.save(user);
    logDao.save(user);
    throw new RuntimeException();
}
  • 第一步:有了@Transactional注解,会把创建出代理对象,加入事务拦截器,在执行save方法之前,会先使用UserTransaction开启一个事务即TransactionImpl,并创建一个xid,进行标示,然后把该事务绑定到当前线程。
  • 第二步:使用userDao执行业务逻辑时,即使用JdbcTemplate操作时,会从StandardXAPoolDataSource中获取一个Connection。

    首选会初始化StandardXAPoolDataSource中的pool,创建出指定数量的StandardXAConnection

    然后再从pool中获取一个StandardXAConnection,StandardXAConnection又进行再次包装成StandardXAConnectionHandle,返回给用户作为Connection。

  • 第三步:使用Connection(这里即StandardXAConnectionHandle)执行sql前,会根据事务管理器获取当前线程绑定的事务,如果有,则设置StandardXAConnectionHandle的自动提交为false,最终是设置到普通的Connection上了。并且把XAResource加入到当前事务中,即把StandardXAConnection加入到TransactionImpl中,同时开启XAResource的事务边界,即调用start方法

  • 第四步:一旦执行过程发生异常,spring的PlatformTransactionManager这里即JtaTransactionManager,会获取UserTransaction(这里即org.objectweb.jotm.Current)进行回滚操作,它会获取当前此线程绑定的事务TransactionImpl进行回滚,TransactionImpl会委托到内部的协调者SubCoordinator,SubCoordinator会调用每个加入进来的XAResource(这里即StandardXAConnection)执行回滚操作,StandardXAConnection则会依托内部的普通Connection进行回滚操作。

4.3 遗留的问题

可以看到上述过程,事务TransactionImpl内部的协调者SubCoordinator虽然实现了两阶段提交过程的代码,但是在上述案例中并没有体现出来,也就是没有去调用过prepare过程。

上述过程SubCoordinator仅仅起到了一个收集Connection的作用,首选把所有的Connection的自动提交设置为false,执行业务操作,一旦发现异常,则执行每个Connection的rollback操作。

所以到底怎么去使用两阶段提交模式呢?还需要去仔细去研究研究这两方面的内容:

  • javax.resource.spi.XATerminator接口和org.objectweb.jotm.XATerminatorImpl实现,该接口也是javax针对两阶段提交协议定义的接口,和jotm中定义的org.objectweb.jotm.Resource差不多。实现其实还是基于SubCoordinator来执行的,这在什么情况下使用呢?

  • SubCoordinator的源代码,它还继承了一个远程调用的PortableRemoteObject。有兴趣的可以去研究下

    /**

    * This object is the local coordinator. It may be registered as
    * sub-coordinator in case of distributed transaction, so it must
    * be callable remotely and implement Resource
    */
    

    public class SubCoordinator extends PortableRemoteObject implements Resource

5 结束语

本篇文章主要介绍了使用jotm和xapool实现分布式事务的原理。下一篇就开始介绍atomikos对于分布式事务的支持。同样先给出例子:

  • atomikos使用非XA数据库驱动实现分布式事务的例子
  • atomikos使用XA数据库驱动实现分布式事务的例子
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
缓存 算法 NoSQL
【分布式详解】一致性算法、全局唯一ID、分布式锁、分布式事务、 分布式缓存、分布式任务、分布式会话
分布式系统通过副本控制协议,使得从系统外部读取系统内部各个副本的数据在一定的约束条件下相同,称之为副本一致性(consistency)。副本一致性是针对分布式系统而言的,不是针对某一个副本而言。强一致性(strong consistency):任何时刻任何用户或节点都可以读到最近一次成功更新的副本数据。强一致性是程度最高的一致性要求,也是实践中最难以实现的一致性。单调一致性(monotonic consistency):任何时刻,任何用户一旦读到某个数据在某次更新后的值,这个用户不会再读到比这个值更旧的值。
380 0
|
2月前
|
消息中间件 Dubbo 应用服务中间件
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
74 0
|
2月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
54 0
|
Dubbo 应用服务中间件 微服务
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)(上)
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
41 1
|
开发框架 Java 数据库连接
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)(下)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
37 0
|
数据库 微服务
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)(上)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
39 0
|
21天前
|
存储 Java 应用服务中间件
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
42 0
|
5月前
|
负载均衡 Java 应用服务中间件
分布式系列教程(23) -分布式事务解决方案(实践篇)
分布式系列教程(23) -分布式事务解决方案(实践篇)
57 0
|
5月前
|
SQL Dubbo Java
分布式系列教程(22) -分布式事务解决方案(设计篇)
分布式系列教程(22) -分布式事务解决方案(设计篇)
65 0
|
5月前
|
SQL NoSQL 关系型数据库
分布式系列教程(21) -分布式事务解决方案(框架篇)
分布式系列教程(21) -分布式事务解决方案(框架篇)
63 0

热门文章

最新文章