跳至主要內容

谈关系数据库的设计与实现(5)——并发控制

pedrogaodatabasesqldatabaseoltp大约 47 分钟

并发控制

在并发 B+树这一节中,我们实现了支持并发访问的 B+树,B+树可用于存储索引,但是数据库还有更加重要的数据部分——数据表记录(Tuple)。

在 bustub 中,是通过 TransactionManager(事务管理器) 和 LockManager(锁管理器) 来实现 Tuple 并发访问控制和数据库事务的。

事务

事务是数据库访问、更新数据的一个基本执行单元,它可由多个数据库操作组成,且多个操作不可分割,要么全部成功,要么全部失败。

事务具有如下四个特性(摘自维基百科),习惯上被称之为 ACID特性open in new window

  • 原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
  • 一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。
  • 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
  • 持久性(Durability):已被提交的事务对数据库的修改应该永久保存在数据库中。

事务隔离级别

多事务并发执行的情况下,可能会出现如下几个问题:

  • 脏读(dirty read):A 事务读取 B 事务尚未提交的数据,此时如果 B 事务发生错误并执行回滚操作,那么 A 事务读取到的数据就是脏数据;

  • 不可重复读(non-repeatable read):A 事务第一次读取数据 1,然后 B 事务更改了数据 1 为 2,然后 A 事务再次读取,发现数据变成了 2;

  • 幻读(phantom read):幻读发生在其他事务 insert 数据,A 事务第一次查询有 100 条,第二次查询却发现有 200 条,原因在于其它事务插入了数据。 为了解决这些问题,就有了「事务隔离级别」这个概念。隔离级别用于控制多个事务并发访问时,彼此的隔离状态,级别越高,隔离度也越好,但是整体运行效率也越低。

  • 读未提交:一个事务还没提交,其变更就能被其它事务看到。

  • 读提交:一个事务提交之后,其变更才能被其它事务看到。

  • 可重复读:一个事务执行过程中看到的数据,总是跟这个事务在启动时看到的数据是一致的。

  • 串行化:后访问的事务必须等前一个事务执行完成,才能继续执行。 隔离级别越高,所能解决的问题就越多,但同样的,效率也就越低。如下表 1 所示,读未提交无法解决脏读、不可重复读、幻读中的任何一个,而串行化能够解决它们。

脏读不可重复读幻读
读未提交✖️✖️✖️
读提交☑️✖️✖️
可重复读☑️☑️✖️
串行化☑️☑️☑️

表 1

那么是不是隔离级别越高越好了?显然不是的,如 MySQL 默认的隔离级别是 RR(可重复读),而 PostgreSQL 默认隔离级别是 RC(读提交),性能是数据库一个重要的考量指标,而不可重读等问题实际上出现的并不多,很少有人在一个事务过程中,多次读取同一条记录。

二阶段事务

介绍事务基本概念后,又该如何实现事务了?

一个事务访问数据库时,可能会对多个数据页加锁并读取数据,那么又该如何保证事务之间不发生数据冲突了?

在 bustub 中,是通过「二阶段事务」来解决这两个问题的。事务一般有三个操作:

  • 开始(Begin):启动一个事务;
  • 提交(Commit):提交一个事务;
  • 回滚(Rollback):回滚一个事务。 二阶段事务沿用了这三个操作,在 bustub 中也可以通过 Begin 开始一个事务,Commit 提交一个事务,如果发生问题,也可以通过 Rollback 回滚事务。

但二阶段事务却有 4 个状态:

  • Growing:获取锁阶段,事务向锁管理器请求锁;
  • Shrinking:释放锁阶段,事务不能再获取锁,只能释放锁;
  • Commited:提交阶段,事务已提交;
  • Aborted:终止阶段,事务已终止。 事务是数据访问的基本单位,任何一个连接建立后,数据库系统都会为其分配一个事务(由事务 id 唯一标识),此后该连接对数据的访问都会以此事务作为基本单位。

因此对于并发连接的数据访问控制,实则就是对事务的控制。、

两阶段锁定协议(2PL)是一种并发控制协议,决定事务是否可以动态地访问数据库中的对象。

2PL 两个阶段:

  • Growing:
    • 每个事务当需要时向锁管理器请求锁;
    • 锁管理器负责向事务授予锁、或者阻塞事物。
  • Shrinking:
    • 事务无法再获取新的锁(也不能升级),只能释放原来获取的锁。 图片

2PL 仍然会存在:

  • 脏读问题,解决方案是 Strong Strict 2PL;
  • 死锁问题,解决方案是检测、预防。 为什么会存在脏读?因为再 shrinking 阶段,若事务释放了一个数据的锁,那么该锁可以被其它事务获取,而当前事务还未提交,因此其它事务可能读到脏数据。

强二阶段锁定协议示意图如下:

图片

强二阶段锁规定,shrinkong 阶段不能释放某一个锁,只能在结束时释放所有的锁。

锁管理器

锁是为了解决不同事务并发操作的隔离性问题,如下:

图片

事务 T1 最后访问 A 的同时,T2 更新了 A 并提交了,因此 T1 会读到脏数据。

我们需要一种机制来保证所有事务执行是正确的(即事务本身觉得是序列化执行的),事务不用关系其它事务之间的数据依赖。

解决办法:使用锁来保护数据对象。如下:

图片

事务 T1 来访问 A 时,必须对 A 加锁,加锁后,T2 不能访问 A,必须等待 T1 释放锁后,才能对 A 加锁,然后再更改。

锁可分为如下两种类型:

  • S:共享锁,其它事务可读;

  • X:独占锁,其它事物不可读、不可写。 锁管理器的执行机制如下:

  • 事务请求(或者升级)锁;

  • 锁管理器授予事务锁、或者阻塞事务执行;

  • 事务释放锁;

  • 锁管理器在内部维护了事务锁表:跟踪哪些事务持有哪些锁以及哪些事务正在等待获取其它锁。

死锁

当事务之间存在数据依赖,并且发生了环等待,就会产生死锁。解决死锁的方法有两种:

  • 死锁检测

  • 死锁预防 锁管理器在内部维护一个名为 wait-for 的图,用来记录事务正在等待获取的锁:

  • 图中的节点是事务;

  • 如果 T1 正在等待 T2 释放锁,那么 T1 有一条边指向 T2。 数据库系统会周期性地检查 wait-for 图,找到死锁,并决定如何打破它。

如图所示:

图片

T1 等待 T2 释放锁,T2 等待 T3 释放锁,而 T3 又在等待 T1 释放锁,因此会出现环形等待,即死锁。

当系统检测到死锁时,它会选择一个事务回滚以打破循环等待。回滚事务将重新启动或中止,这取决于具体情况。

事务淘汰策略有多种,比如:

  • By age,长事务先淘汰;
  • By number of locks,锁多的先淘汰;
  • 等等

实现

锁管理器(LockManager)负责对维护事务与数据表记录之间的锁联系,任何事务在访问数据记录之前,必须先尝试获取该记录的锁,这个过程被称为「请求锁」,请求成功后,锁管理器会赋予事务锁,否则返回失败。

比如,事务获取某条记录的共享锁:

bool LockManager::LockShared(Transaction *txn, const RID &rid) {
  // 事务共享锁加入 rid
  // Transaction txn tries to take a shared lock on record id rid.
  // This should be blocked on waiting and should return true when granted.
  // Return false if transaction is rolled back (aborts).
  // 如果是读未提交,那么直接 abort
  if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
    AbortImplicitly(txn, AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
    return false;
  }
  // 如果是可重复读,且事务处于 shrinking 状态,那么直接 abort
  // 因为 shrinking 状态下,事务不能再加锁
  if (txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ && txn->GetState() == TransactionState::SHRINKING) {
    AbortImplicitly(txn, AbortReason::LOCK_ON_SHRINKING);
    return false;
  }
  // 如果 rid 已经被锁了,那么直接返回 true
  if (txn->IsSharedLocked(rid) || txn->IsExclusiveLocked(rid)) {
    return true;
  }
  // latch_ 用来保护 lock_table
  // lock_table 中保存了 rid 对应的锁队列
  std::unique_lock<std::mutex> latch(latch_);
  auto &lock_request_queue = lock_table_[rid];
  latch.unlock();  // 解锁
  // 操作 rid 对应的队列,对队列加锁
  std::unique_lock<std::mutex> queue_latch(lock_request_queue.latch_);
  // 将事务 id 和加锁模式加入到队列
  auto &lock_request =
      lock_request_queue.request_queue_.emplace_back(txn->GetTransactionId(), LockManager::LockMode::SHARED);
  // 加入后等待
  lock_request_queue.cv_.wait(queue_latch, [&lock_request_queue, &lock_request, &txn] {
    // 如果事务 abort,或者当前锁请求与队列中的其它锁兼容,比如都是共享锁,那么直接停止等待,继续执行
    return LockManager::IsLockCompatible(lock_request_queue, lock_request) ||
           txn->GetState() == TransactionState::ABORTED;
  });
  // abort,直接返回
  if (txn->GetState() == TransactionState::ABORTED) {
    AbortImplicitly(txn, AbortReason::DEADLOCK);
  }
  // 授予锁
  lock_request.granted_ = true;
  txn->GetSharedLockSet()->emplace(rid);
  return true;
}

LockShared 方法接受两个参数:

  • RID:每一条数据记录都由 RID 唯一标识;
  • Transaction:事务,即当前请求共享锁的事务实例。 当调用 LockShared 方法时,txn 向锁管理器请求 rid 记录的共享锁。方法会根据事务隔离级别、以及事务阶段来判断是否授予锁。同理锁管理器还提供了 LockExclusive,LockUpgrade,Unlock 等方法分别用于互斥锁、锁升级和解锁等操作。

而对于死锁检测,锁管理器维护了一个 waits_for_ 等待列表,并启动一个后台线程专门用于检查事务之间的依赖关系,如下:

void LockManager::RunCycleDetection() {
  // 检查是否存在死锁
  // You should not be maintaining a graph, it should be built and destroyed every time the thread wakes up.
  while (enable_cycle_detection_) {
    std::this_thread::sleep_for(cycle_detection_interval);
    {
      std::unique_lock<std::mutex> latch(latch_);
      if (!enable_cycle_detection_) {
        break;
      }
      waits_for_.clear();
      BuildWaitsForGraph();
      txn_id_t txn_id;
      while (HasCycle(&txn_id)) {
        // 得到死锁的 txn_id
        auto txn = TransactionManager::GetTransaction(txn_id);
        txn->SetState(TransactionState::ABORTED);  // abort
        // 清除 txn_id 上的锁
        // 得到 txn_id 上所有的边
        for (const auto &wait_on_txn_id : waits_for_[txn_id]) {
          auto wait_on_txn = TransactionManager::GetTransaction(wait_on_txn_id);
          // 得到事务上的所有 share、exclusive 的 RID
          std::unordered_set<RID> lock_set;
          lock_set.insert(wait_on_txn->GetSharedLockSet()->begin(), wait_on_txn->GetSharedLockSet()->end());
          lock_set.insert(wait_on_txn->GetExclusiveLockSet()->begin(), wait_on_txn->GetExclusiveLockSet()->end());
          for (auto locked_rid : lock_set) {
            // 通知
            lock_table_[locked_rid].cv_.notify_all();
          }
        }
        // 重新建立图
        waits_for_.clear();
        BuildWaitsForGraph();
      }
    }
  }
}

后台线程每隔一段时间就会构建出事务之间的依赖图,然后判断事务之间是否存在循环依赖(Cycle),如果存在则将事务的状态设置为 Abort。

执行器

在上一讲中,我们介绍了 bustub 如何通过执行器来执行 SQL 查询计划,但很显然,前面的执行器是无法支持事务隔离的,因此我们需要通过锁管理器来完善执行器的事务支持。

仍然以全表扫描为例,SeqScanExecutor 是如何来支持事务的呢?在 Next 函数中,加上事务隔离级别的判断,如下:

bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
  Tuple tup;
  // 谓词不为空,而且执行 ok
  // tuples are returned if predicate(tuple) = true or predicate = nullptr
  do {
    // 到末尾了,直接返回
    if (*table_iterator_ == table_metadata_->table_->End()) {
      return false;
    }
    tup = *(*table_iterator_);  // 得到当前 tuple
    ++(*table_iterator_);       // 下一个
  } while (plan_->GetPredicate() != nullptr &&
           // 执行
           !plan_->GetPredicate()->Evaluate(&tup, &(table_metadata_->schema_)).GetAs<bool>());
  // 判断事务隔离级别
+  switch (exec_ctx_->GetTransaction()->GetIsolationLevel()) {
+    case IsolationLevel::READ_UNCOMMITTED:
+      break;  // 读未提交,未加任何锁,直接 break
+    case IsolationLevel::READ_COMMITTED:
+      // 读已提交
+      // 1.没有加读锁;
+      // 2.也没有加写锁;
+      // 3.不能加读锁后,立即解锁成功
+      if (!exec_ctx_->GetTransaction()->IsSharedLocked(tup.GetRid()) &&
+          !exec_ctx_->GetTransaction()->IsExclusiveLocked(tup.GetRid()) &&
+          !(exec_ctx_->GetLockManager()->LockShared(exec_ctx_->GetTransaction(), tup.GetRid()) &&
+            exec_ctx_->GetLockManager()->Unlock(exec_ctx_->GetTransaction(), tup.GetRid()))) {
+        return false;
+      }
+      break;
+    case IsolationLevel::REPEATABLE_READ:
+      // 可重读读
+      // 1. 没有加读锁
+      // 2. 没有加写锁
+      // 3. 且不能加读锁
+      if (!exec_ctx_->GetTransaction()->IsSharedLocked(tup.GetRid()) &&
+          !exec_ctx_->GetTransaction()->IsExclusiveLocked(tup.GetRid()) &&
+          !exec_ctx_->GetLockManager()->LockShared(exec_ctx_->GetTransaction(), tup.GetRid())) {
+        return false;
+      }
+      break;
+    default:
+      break;
  }
  // 一个 Tuple 是一条记录,values 是字段值,schema 是字段名称
  std::vector<Value> values;
  std::transform(plan_->OutputSchema()->GetColumns().begin(), plan_->OutputSchema()->GetColumns().end(),
                 std::back_inserter(values), [&tup, &table_metadata_ = table_metadata_](const Column &col) {
                   // Column 是数据列,即字段的定义,调用 Evaluate 获取列数据
                   return col.GetExpr()->Evaluate(&tup, &(table_metadata_->schema_));
                 });
  // 赋值
  *tuple = Tuple{values, plan_->OutputSchema()};
  *rid = tup.GetRid();
  return true;
}

对于三个事务隔离级别的处理方式如下:

  • 读未提交,未加任何锁,直接 break;
  • 读已提交,没有加读、写锁,且无法加读锁,并能解锁;
  • 可重复读:没有加读、写锁,切无法加读锁 TODO 还未完全理清楚

参考资料