tp5框架使用XA事务遇到的坑/think php

我tp框架版本是5.1.39 LTS,因为公司项目的原因,一个操作需要修改到两个数据库的数据,所以就用了XA分布式事务,接下来就说一下针对一些报错的解决方法。

SQLSTATE[HY000]: General error: 2014 Cannot execute queries while other unbuffered queries are active. Consider using PDOStatement::fetchAll(). Alternatively, if your code is only ever going to run against mysql, you may enable query buffering by setting the PDO::MYSQL_ATTR_USE_BUFFERED_QUERY attribute.

1.打开thinkphp/library/think/db/connector/Mysql.php
2.把startTransXa、prepareXa、commitXa、rollbackXa这4个方法里的$this->execute()改成$this->linkID->exec()

例如startTransXa方法:

* 启动XA事务 * @access public * @param string $xid XA事务id * @return void public function startTransXa($xid) $this->initConnect(true); if (!$this->linkID) { return false; $this->execute("XA START '$xid'"); * 启动XA事务 * @access public * @param string $xid XA事务id * @return void public function startTransXa($xid) $this->initConnect(true); if (!$this->linkID) { return false; $this->linkID->exec("XA START '$xid'");

SQLSTATE[XAE08]: <<Unknown error>>: 1440 XAER_DUPID: The XID already exists

1.打开thinkphp/library/think/db/Query.php,第404行
2.一次操作开启多个XA事务时,xid是不能重复的,所以我们要每开启一个XA事务就用一个新的xid
3.看下面的修改前后变化

原来的方法代码:

* 执行数据库Xa事务 * @access public * @param callable $callback 数据操作方法回调 * @param array $dbs 多个查询对象或者连接对象 * @return mixed * @throws PDOException * @throws \Exception * @throws \Throwable public function transactionXa($callback, array $dbs = []) $xid = uniqid('xa'); if (empty($dbs)) { $dbs[] = $this->getConnection(); foreach ($dbs as $key => $db) { if ($db instanceof Query) { $db = $db->getConnection(); $dbs[$key] = $db; $db->startTransXa($xid); try { $result = null; if (is_callable($callback)) { $result = call_user_func_array($callback, [$this]); foreach ($dbs as $db) { $db->prepareXa($xid); foreach ($dbs as $db) { $db->commitXa($xid); return $result; } catch (\Exception $e) { foreach ($dbs as $db) { $db->rollbackXa($xid); throw $e; } catch (\Throwable $e) { foreach ($dbs as $db) { $db->rollbackXa($xid); throw $e; * 执行数据库Xa事务 * @access public * @param callable $callback 数据操作方法回调 * @param array $dbs 多个查询对象或者连接对象 * @return mixed * @throws PDOException * @throws \Exception * @throws \Throwable public function transactionXa($callback, array $dbs = []) if (empty($dbs)) { $dbs[] = $this->getConnection(); //定义一个装xid的空数组 $xid_data=[]; //根据事务中数据库个数生成xid foreach ($dbs as $key => $db) { $xid_data[$key] = uniqid('xa'); foreach ($dbs as $key => $db) { if ($db instanceof Query) { $db = $db->getConnection(); $dbs[$key] = $db; //每个事务操作使用自己的xid $db->startTransXa($xid_data[$key]); try { $result = null; if (is_callable($callback)) { $result = call_user_func_array($callback,[$this]); foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->prepareXa($xid_data[$key]); foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->commitXa($xid_data[$key]); return $result; } catch (\Exception $e) { foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->rollbackXa($xid_data[$key]); throw $e; } catch (\Throwable $e) { foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->rollbackXa($xid_data[$key]); throw $e;

SQLSTATE[XAE07]: <<Unknown error>>: 1399 XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state

解决方法:

1.会出现这个原因主要是在xa事务中,有一个操作执行错误了,需要执行回滚操作,但是根据 XA分布式事务的相关知识 知道,提交或回滚一个事务时,这个事务的状态不能是active状态
2.所以我们只要在捕捉异常里通过执行XA END [xid]和XA PREPARE[xid]来改变事务状态就可以回滚了,那prepareXa方法里包含了这两个语句的执行,所以我们只需要在捕捉异常里执行一下这个方法就可以了,代码沿用了错误2修改后的代码

* 执行数据库Xa事务 * @access public * @param callable $callback 数据操作方法回调 * @param array $dbs 多个查询对象或者连接对象 * @return mixed * @throws PDOException * @throws \Exception * @throws \Throwable public function transactionXa($callback, array $dbs = []) if (empty($dbs)) { $dbs[] = $this->getConnection(); //定义一个装xid的空数组 $xid_data=[]; //根据事务中数据库个数生成xid foreach ($dbs as $key => $db) { $xid_data[$key] = uniqid('xa'); foreach ($dbs as $key => $db) { if ($db instanceof Query) { $db = $db->getConnection(); $dbs[$key] = $db; //每个事务操作使用自己的xid $db->startTransXa($xid_data[$key]); try { $result = null; if (is_callable($callback)) { $result = call_user_func_array($callback,[$this]); foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->prepareXa($xid_data[$key]); foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->commitXa($xid_data[$key]); return $result; } catch (\Exception $e) { //执行prepareXa方法改变事务状态 foreach ($dbs as $key => $db) { $db->prepareXa($xid_data[$key]); foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->rollbackXa($xid_data[$key]); throw $e; } catch (\Throwable $e) { //执行prepareXa方法改变事务状态 foreach ($dbs as $key => $db) { $db->prepareXa($xid_data[$key]); foreach ($dbs as $key => $db) { //每个事务操作使用自己的xid $db->rollbackXa($xid_data[$key]); throw $e;

完整测试例子代码:

      $test1='test1';
      $test2='test2';
        try {
            Db::transactionXa(function ()use($test1,$test2) {
                $ar=Db::connect('db_config_test1')->table('pre_test')->insert(['name'=>$test1]);
                if(!$ar){
                    throw new \Exception('新增失败1');
                $ar=Db::connect('db_config_test2')->table('pre_test')->insert(['name'=>$test2]);
                if(!$ar){
                    throw new \Exception('新增失败2');
            }, [Db::connect('db_config_test1'),Db::connect('db_config_test2')]);
        } catch (\Exception $e) {
            echo $e->getMessage();
            exit;
        echo 'success';