有版本的值(Versioned Value)
原文
https://martinfowler.com/articles/patterns-of-distributed-systems/versioned-value.html
将值的每次更新连同新版本一同存储起来,以允许读取历史值。
2021.6.22
问题
在分布式系统中,节点需要知道对于某个键值而言,哪个值是最新的。有时,它们知道过去的值,这样,它们就可以对一个值的变化做出恰当的反应了。
解决方案
在每个值里面都存储一个版本号。版本在每次更新时递增。这样,每次更新就都可以转换为一次新的写入,而无需阻塞读取。客户端可以读取特定版本号的对应的历史值。
考虑一个复制的键值存储的简单例子。集群的领导者处理所有对键值存储的写入。它将写入请求保存在预写日志中。预写日志使用领导者和追随者进行复制。在到达高水位标记时,领导者将预写日志的条目应用到键值存储中。这是一种标准的复制方法,称之为状态机复制(state-machine-replication)。大多数数据系统,其背后如果像 Raft 这样的共识算法支撑,都是这样实现的。在这种情况下,键值存储中会保存一个整数的版本计数器。每次根据预写日志应用键值与值的写命令时,这个计数器都要递增。然后,它会根据递增之后的版本计数器,构建一个新的键值。这样一来,不存在值的更新,但每次写入请求都会向后面的存储中附加一个新的值。
class ReplicatedKVStore…
int version = 0;
MVCCStore mvccStore = new MVCCStore();
@Override
public CompletableFuture<Response> put(String key, String value) {
return server.propose(new SetValueCommand(key, value));
}
private Response applySetValueCommand(SetValueCommand setValueCommand) {
getLogger().info("Setting key value " + setValueCommand);
version = version + 1;
mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue());
Response response = Response.success(version);
return response;
}
class ReplicatedKVStore…
int version = 0;
MVCCStore mvccStore = new MVCCStore();
@Override
public CompletableFuture<Response> put(String key, String value) {
return server.propose(new SetValueCommand(key, value));
}
private Response applySetValueCommand(SetValueCommand setValueCommand) {
getLogger().info("Setting key value " + setValueCommand);
version = version + 1;
mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue());
Response response = Response.success(version);
return response;
}
有版本键值的排序
能够快速定位到最佳匹配的版本,这是一个重要的实现考量,所以,有版本的价值常按这种方式进行组织:使用版本号当做键值的后缀,形成一个自然排序。这样就可以保持一个与底层数据结构相适应的顺序。比如说,一个键值有两个版本,key1 和 key2,key1 就会排在 key2 前面。
要存储有版本的键值与值,可以使用某种数据结构,比如,跳表,这样可以快速定位到最近的匹配版本上。使用 Java,可以像下面这样构建 MVCC 存储:
class MVCCStore…
public class MVCCStore {
NavigableMap<VersionedKey, String> kv = new ConcurrentSkipListMap<>();
public void put(VersionedKey key, String value) {
kv.put(key, value);
}
class MVCCStore…
public class MVCCStore {
NavigableMap<VersionedKey, String> kv = new ConcurrentSkipListMap<>();
public void put(VersionedKey key, String value) {
kv.put(key, value);
}
为了使用 NavigableMap,有版本的键值可以像下面这样实现。它会实现一个比较器,允许键值的自然排序。
class VersionedKey…
public class VersionedKey implements Comparable<VersionedKey> {
private String key;
private int version;
public VersionedKey(String key, int version) {
this.key = key;
this.version = version;
}
public String getKey() {
return key;
}
public int getVersion() {
return version;
}
@Override
public int compareTo(VersionedKey other) {
int keyCompare = this.key.compareTo(other.key);
if (keyCompare != 0) {
return keyCompare;
}
return Integer.compare(this.version, other.version);
}
}
class VersionedKey…
public class VersionedKey implements Comparable<VersionedKey> {
private String key;
private int version;
public VersionedKey(String key, int version) {
this.key = key;
this.version = version;
}
public String getKey() {
return key;
}
public int getVersion() {
return version;
}
@Override
public int compareTo(VersionedKey other) {
int keyCompare = this.key.compareTo(other.key);
if (keyCompare != 0) {
return keyCompare;
}
return Integer.compare(this.version, other.version);
}
}
这个实现允许通过 NavigableMap 的 API 获取特定版本的值。
class MVCCStore…
public Optional<String> get(final String key, final int readAt) {
Map.Entry<VersionedKey, String> entry = kv.floorEntry(new VersionedKey(key, readAt));
return (entry == null)? Optional.empty(): Optional.of(entry.getValue());
}
class MVCCStore…
public Optional<String> get(final String key, final int readAt) {
Map.Entry<VersionedKey, String> entry = kv.floorEntry(new VersionedKey(key, readAt));
return (entry == null)? Optional.empty(): Optional.of(entry.getValue());
}
看一个例子,一个键值有四个版本,存储的版本号分别是 1、2、3 和 5。根据客户端所使用版本去读取值,返回的是最接近匹配版本的键值。
存储有特定键值和值的版本就会返回给客户端。客户端使用这个版本读取值。整体工作情况如下所示:
versioned-value-logical-clock-put
读取多个版本
有时,客户端需要获取从某个给定版本号开始的所有版本。比如,在状态监控中,客户端就要获取从指定版本开始的所有事件。
集群节点可以多存一个索引结构,以便将一个键值所有的版本都存起来。
class IndexedMVCCStore…
public class IndexedMVCCStore {
NavigableMap<String, List<Integer>> keyVersionIndex = new TreeMap<>();
NavigableMap<VersionedKey, String> kv = new TreeMap<>();
ReadWriteLock rwLock = new ReentrantReadWriteLock();
int version = 0;
public int put(String key, String value) {
rwLock.writeLock().lock();
try {
version = version + 1;
kv.put(new VersionedKey(key, version), value);
updateVersionIndex(key, version);
return version;
} finally {
rwLock.writeLock().unlock();
}
}
private void updateVersionIndex(String key, int newVersion) {
List<Integer> versions = getVersions(key);
versions.add(newVersion);
keyVersionIndex.put(key, versions);
}
private List<Integer> getVersions(String key) {
List<Integer> versions = keyVersionIndex.get(key);
if (versions == null) {
versions = new ArrayList<>();
keyVersionIndex.put(key, versions);
}
return versions;
}
class IndexedMVCCStore…
public class IndexedMVCCStore {
NavigableMap<String, List<Integer>> keyVersionIndex = new TreeMap<>();
NavigableMap<VersionedKey, String> kv = new TreeMap<>();
ReadWriteLock rwLock = new ReentrantReadWriteLock();
int version = 0;
public int put(String key, String value) {
rwLock.writeLock().lock();
try {
version = version + 1;
kv.put(new VersionedKey(key, version), value);
updateVersionIndex(key, version);
return version;
} finally {
rwLock.writeLock().unlock();
}
}
private void updateVersionIndex(String key, int newVersion) {
List<Integer> versions = getVersions(key);
versions.add(newVersion);
keyVersionIndex.put(key, versions);
}
private List<Integer> getVersions(String key) {
List<Integer> versions = keyVersionIndex.get(key);
if (versions == null) {
versions = new ArrayList<>();
keyVersionIndex.put(key, versions);
}
return versions;
}
这样,就可以提供一个客户端 API,读取从指定版本开始或者一个版本范围内的所有值。
class IndexedMVCCStore…
public List<String> getRange(String key, final int fromRevision, int toRevision) {
rwLock.readLock().lock();
try {
List<Integer> versions = keyVersionIndex.get(key);
Integer maxRevisionForKey = versions.stream().max(Integer::compareTo).get();
Integer revisionToRead = maxRevisionForKey > toRevision ? toRevision : maxRevisionForKey;
SortedMap<VersionedKey, String> versionMap = kv.subMap(new VersionedKey(key, revisionToRead), new VersionedKey(key, toRevision));
getLogger().info("Available version keys " + versionMap + ". Reading@" + fromRevision + ":" + toRevision);
return new ArrayList<>(versionMap.values());
} finally {
rwLock.readLock().unlock();
}
}
class IndexedMVCCStore…
public List<String> getRange(String key, final int fromRevision, int toRevision) {
rwLock.readLock().lock();
try {
List<Integer> versions = keyVersionIndex.get(key);
Integer maxRevisionForKey = versions.stream().max(Integer::compareTo).get();
Integer revisionToRead = maxRevisionForKey > toRevision ? toRevision : maxRevisionForKey;
SortedMap<VersionedKey, String> versionMap = kv.subMap(new VersionedKey(key, revisionToRead), new VersionedKey(key, toRevision));
getLogger().info("Available version keys " + versionMap + ". Reading@" + fromRevision + ":" + toRevision);
return new ArrayList<>(versionMap.values());
} finally {
rwLock.readLock().unlock();
}
}
有一点必须多加小心,根据某个索引进行相应的更新和读取时,需要使用恰当的锁。
There is an alternate implementation possible to save a list of all the versioned values with the key, as used in Gossip Dissemination to avoid unnecessary state exchange.
有一种替代的实现方案,就是将所有值的列表和键值存在一起,就像在Gossip 传播(Gossip Dissemination)中所用的一样,以规避不必要的状态交换
MVCC 与事务隔离
并发控制,讨论的的是有多个并发请求访问同一数据时,该如何使用锁。当用锁对访问进行同步时,在持有锁的请求完成并释放掉锁之前,所有其它请求都会阻塞住。而使用了有版本的值,每个写请求都添加一条新的记录。这就可以用非阻塞数据结构存储值了。
Transaction isolation levels, such as [snapshot-isolation], can be naturally implemented as well. When a client starts reading at a particular version, it's guaranteed to get the same value every time it reads from the database, even if there are concurrent write transactions which commit a different value between multiple read requests.
事务隔离级别,比如快照隔离,实现起来就很自然了。当客户端在某个特定版本开始读取,它保证从数据库中每次读到的都是同一个值,即便存在并发写的事务,在多个读请求之间提交了不同的值。
使用 RocksDb 当做存储引擎
有一种很常见的数据存储的做法,就是使用 rocksdb 或类似的嵌入式存储引擎当做存储后端。比如,etcd 使用 boltdb,cockroachdb 早期使用 rocksdb,现在它用的是 RocksDb的一个 Go 语言的克隆版,称为 pebble。
这些存储引擎提供的实现很适合存储有版本的值。在内部,它们使用了跳表,其方式就如上面讨论的那样,依赖于键值的顺序。当然,需要为键值排序提供一个订制的比较器。
class VersionedKeyComparator…
public class VersionedKeyComparator extends Comparator {
public VersionedKeyComparator() {
super(new ComparatorOptions());
}
@Override
public String name() {
return "VersionedKeyComparator";
}
@Override
public int compare(Slice s1, Slice s2) {
VersionedKey key1 = VersionedKey.deserialize(ByteBuffer.wrap(s1.data()));
VersionedKey key2 = VersionedKey.deserialize(ByteBuffer.wrap(s2.data()));
return key1.compareTo(key2);
}
}
class VersionedKeyComparator…
public class VersionedKeyComparator extends Comparator {
public VersionedKeyComparator() {
super(new ComparatorOptions());
}
@Override
public String name() {
return "VersionedKeyComparator";
}
@Override
public int compare(Slice s1, Slice s2) {
VersionedKey key1 = VersionedKey.deserialize(ByteBuffer.wrap(s1.data()));
VersionedKey key2 = VersionedKey.deserialize(ByteBuffer.wrap(s2.data()));
return key1.compareTo(key2);
}
}
使用 rocksdb 可以这么做:
class RocksDBMvccStore…
private final RocksDB db;
public RocksDBMvccStore(File cacheDir) throws RocksDBException {
Options options = new Options();
options.setKeepLogFileNum(30);
options.setCreateIfMissing(true);
options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
options.setComparator(new VersionedKeyComparator());
db = RocksDB.open(options, cacheDir.getPath());
}
public void put(String key, int version, String value) throws RocksDBException {
VersionedKey versionKey = new VersionedKey(key, version);
db.put(versionKey.serialize(), value.getBytes());
}
public String get(String key, int readAtVersion) {
RocksIterator rocksIterator = db.newIterator();
rocksIterator.seekForPrev(new VersionedKey(key, readAtVersion).serialize());
byte[] valueBytes = rocksIterator.value();
return new String(valueBytes);
}
class RocksDBMvccStore…
private final RocksDB db;
public RocksDBMvccStore(File cacheDir) throws RocksDBException {
Options options = new Options();
options.setKeepLogFileNum(30);
options.setCreateIfMissing(true);
options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
options.setComparator(new VersionedKeyComparator());
db = RocksDB.open(options, cacheDir.getPath());
}
public void put(String key, int version, String value) throws RocksDBException {
VersionedKey versionKey = new VersionedKey(key, version);
db.put(versionKey.serialize(), value.getBytes());
}
public String get(String key, int readAtVersion) {
RocksIterator rocksIterator = db.newIterator();
rocksIterator.seekForPrev(new VersionedKey(key, readAtVersion).serialize());
byte[] valueBytes = rocksIterator.value();
return new String(valueBytes);
}
示例
etcd3 使用的 MVCC 后端有一个单独的整数表示版本。
mongodb 和 cockroachdb 使用的 MVCC 后端有一个混合逻辑时钟。