幂等接收者(Idempotent Receiver)
原文
https://martinfowler.com/articles/patterns-of-distributed-systems/idempotent-receiver.html
识别来自客户端的请求是否唯一,以便在客户端重试时,忽略重复的请求。
2021.1.26
问题
客户端给服务器发请求,可能没有得到应答。客户端不可能知道应答是丢失了,还是服务端在处理请求之前就崩溃了。为了确保请求得到处理,客户端唯有重复发送请求。
如果服务器已经处理了请求,然后奔溃了,之后,客户端重试时,服务器端会收到客户端的重复请求。
解决方案
每个客户端都会分配得到一个唯一的 ID,用以对客户端进行识别。发送任何请求之前,客户端需要先向服务器进行注册。
class ConsistentCoreClient…
private void registerWithLeader() {
RequestOrResponse request
= new RequestOrResponse(RequestId.RegisterClientRequest.getId(),
correlationId.incrementAndGet());
//blockingSend will attempt to create a new connection if there is a network error.
RequestOrResponse response = blockingSend(request);
RegisterClientResponse registerClientResponse
= JsonSerDes.deserialize(response.getMessageBodyJson(),
RegisterClientResponse.class);
this.clientId = registerClientResponse.getClientId();
}
class ConsistentCoreClient…
private void registerWithLeader() {
RequestOrResponse request
= new RequestOrResponse(RequestId.RegisterClientRequest.getId(),
correlationId.incrementAndGet());
//blockingSend will attempt to create a new connection if there is a network error.
RequestOrResponse response = blockingSend(request);
RegisterClientResponse registerClientResponse
= JsonSerDes.deserialize(response.getMessageBodyJson(),
RegisterClientResponse.class);
this.clientId = registerClientResponse.getClientId();
}
当服务器接收到来自客户端的注册请求,它就给客户端分配一个唯一的 ID,如果服务器是一个一致性内核(Consistent Core),它可以先分配预写日志索引当做客户端标识符。
class ReplicatedKVStore…
private Map<Long, Session> clientSessions = new ConcurrentHashMap<>();
private RegisterClientResponse registerClient(WALEntry walEntry) {
Long clientId = walEntry.getEntryId();
//clientId to store client responses.
clientSessions.put(clientId, new Session(clock.nanoTime()));
return new RegisterClientResponse(clientId);
}
class ReplicatedKVStore…
private Map<Long, Session> clientSessions = new ConcurrentHashMap<>();
private RegisterClientResponse registerClient(WALEntry walEntry) {
Long clientId = walEntry.getEntryId();
//clientId to store client responses.
clientSessions.put(clientId, new Session(clock.nanoTime()));
return new RegisterClientResponse(clientId);
}
服务器会创建一个会话(session),以便为注册客户端的请求存储应答。它还会追踪会话的创建时间,这样,会话不起作用时,就可以把它丢弃了,这会在后面详细讨论。
class Session {
long lastAccessTimestamp;
Queue<Response> clientResponses = new ArrayDeque<>();
public Session(long lastAccessTimestamp) {
this.lastAccessTimestamp = lastAccessTimestamp;
}
public long getLastAccessTimestamp() {
return lastAccessTimestamp;
}
public Optional<Response> getResponse(int requestNumber) {
return clientResponses.stream().
filter(r -> requestNumber == r.getRequestNumber()).findFirst();
}
private static final int MAX_SAVED_RESPONSES = 5;
public void addResponse(Response response) {
if (clientResponses.size() == MAX_SAVED_RESPONSES) {
clientResponses.remove(); //remove the oldest request
}
clientResponses.add(response);
}
public void refresh(long nanoTime) {
this.lastAccessTimestamp = nanoTime;
}
}
class Session {
long lastAccessTimestamp;
Queue<Response> clientResponses = new ArrayDeque<>();
public Session(long lastAccessTimestamp) {
this.lastAccessTimestamp = lastAccessTimestamp;
}
public long getLastAccessTimestamp() {
return lastAccessTimestamp;
}
public Optional<Response> getResponse(int requestNumber) {
return clientResponses.stream().
filter(r -> requestNumber == r.getRequestNumber()).findFirst();
}
private static final int MAX_SAVED_RESPONSES = 5;
public void addResponse(Response response) {
if (clientResponses.size() == MAX_SAVED_RESPONSES) {
clientResponses.remove(); //remove the oldest request
}
clientResponses.add(response);
}
public void refresh(long nanoTime) {
this.lastAccessTimestamp = nanoTime;
}
}
对一个一致性内核而言,客户端的注册请求也要作为共识算法的一部分进行复制。如此一来,即便既有的领导者失效了,客户端的注册依然是可用的。对于后续的请求,服务器还要存储发送给客户端的应答。
幂等和非幂等请求
注意到一些请求的幂等属性是很重要的。比如说,在一个键值存储中,设置键值和值就天然是幂等的。即便同样的键值和值设置了多次,也不会产生什么问题。 另一方面,创建租约(Lease)却并不幂等。如果租约已经创建,再次尝试创建租约 的请求就会失败。这就是问题了。考虑一下这样一个场景。一个客户端发送请求创建租约,服务器成功地创建了租约,然后,崩溃了,或者是,应答发给客户端之前连接断开了。客户端会重新创建连接,重新尝试创建租约;因为服务端已经有了一个指定名称的租约,所以,它会返回错误。因此,客户端就会认为没有这个租约。显然,这并不是我们预期的行为。 有了幂等接收者,客户端会用同样的请求号发送租约请求。因为表示“请求已经处理过”的应答已经存在服务器上了,这个应答就可以直接返回。这样一来,如果是客户端在连接断开之前已经成功创建了租约,后续重试相同的请求时,它会得到应有的应答。
对于收到的每个非幂等请求(参见边栏),服务端成功执行之后,都会将应答存在客户端会话中。
class ReplicatedKVStore…
private Response applyRegisterLeaseCommand(WALEntry walEntry, RegisterLeaseCommand command) {
logger.info("Creating lease with id " + command.getName()
+ "with timeout " + command.getTimeout()
+ " on server " + getServer().getServerId());
try {
leaseTracker.addLease(command.getName(),
command.getTimeout());
Response success = Response.success(walEntry.getEntryId());
if (command.hasClientId()) {
Session session = clientSessions.get(command.getClientId());
session.addResponse(success.withRequestNumber(command.getRequestNumber()));
}
return success;
} catch (DuplicateLeaseException e) {
return Response.error(1, e.getMessage(), walEntry.getEntryId());
}
}
class ReplicatedKVStore…
private Response applyRegisterLeaseCommand(WALEntry walEntry, RegisterLeaseCommand command) {
logger.info("Creating lease with id " + command.getName()
+ "with timeout " + command.getTimeout()
+ " on server " + getServer().getServerId());
try {
leaseTracker.addLease(command.getName(),
command.getTimeout());
Response success = Response.success(walEntry.getEntryId());
if (command.hasClientId()) {
Session session = clientSessions.get(command.getClientId());
session.addResponse(success.withRequestNumber(command.getRequestNumber()));
}
return success;
} catch (DuplicateLeaseException e) {
return Response.error(1, e.getMessage(), walEntry.getEntryId());
}
}
客户端发送给服务器的每个请求里都包含客户端的标识符。客户端还保持了一个计数器,每个发送给服务器的请求都会分配到一个请求号。
class ConsistentCoreClient…
int nextRequestNumber = 1;
public void registerLease(String name, long ttl) {
RegisterLeaseRequest registerLeaseRequest
= new RegisterLeaseRequest(clientId, nextRequestNumber, name, ttl);
nextRequestNumber++; //increment request number for next request.
var serializedRequest = serialize(registerLeaseRequest);
logger.info("Sending RegisterLeaseRequest for " + name);
blockingSendWithRetries(serializedRequest);
}
private static final int MAX_RETRIES = 3;
private RequestOrResponse blockingSendWithRetries(RequestOrResponse request) {
for (int i = 0; i <= MAX_RETRIES; i++) {
try {
//blockingSend will attempt to create a new connection is there is no connection.
return blockingSend(request);
} catch (NetworkException e) {
resetConnectionToLeader();
logger.error("Failed sending request " + request + ". Try " + i, e);
}
}
throw new NetworkException("Timed out after " + MAX_RETRIES + " retries");
}
class ConsistentCoreClient…
int nextRequestNumber = 1;
public void registerLease(String name, long ttl) {
RegisterLeaseRequest registerLeaseRequest
= new RegisterLeaseRequest(clientId, nextRequestNumber, name, ttl);
nextRequestNumber++; //increment request number for next request.
var serializedRequest = serialize(registerLeaseRequest);
logger.info("Sending RegisterLeaseRequest for " + name);
blockingSendWithRetries(serializedRequest);
}
private static final int MAX_RETRIES = 3;
private RequestOrResponse blockingSendWithRetries(RequestOrResponse request) {
for (int i = 0; i <= MAX_RETRIES; i++) {
try {
//blockingSend will attempt to create a new connection is there is no connection.
return blockingSend(request);
} catch (NetworkException e) {
resetConnectionToLeader();
logger.error("Failed sending request " + request + ". Try " + i, e);
}
}
throw new NetworkException("Timed out after " + MAX_RETRIES + " retries");
}
服务器收到请求时,它会先检查来自同一个客户端给定的请求号是否已经处理过了。如果找到已保存的应答,它就会把相同的应答返回给客户端,而无需重新处理请求。
class ReplicatedKVStore…
private Response applyWalEntry(WALEntry walEntry) {
Command command = deserialize(walEntry);
if (command.hasClientId()) {
Session session = clientSessions.get(command.getClientId());
Optional<Response> savedResponse = session.getResponse(command.getRequestNumber());
if(savedResponse.isPresent()) {
return savedResponse.get();
} //else continue and execute this command.
}
class ReplicatedKVStore…
private Response applyWalEntry(WALEntry walEntry) {
Command command = deserialize(walEntry);
if (command.hasClientId()) {
Session session = clientSessions.get(command.getClientId());
Optional<Response> savedResponse = session.getResponse(command.getRequestNumber());
if(savedResponse.isPresent()) {
return savedResponse.get();
} //else continue and execute this command.
}
已保存的客户端请求过期处理
按客户端存储的请求不可能是永久保存的。有几种方式可以对请求进行过期处理。在 Raft 的参考实现中,客户端会保存一个单独的号码,以便记录成功收到应答的请求号。这个号码稍后会随着每个请求发送给服务器。这样,对于请求号小于这个号码的请求,服务器就可以安全地将其丢弃了。
如果客户端能够保证只在接收到上一个请求的应答之后,再发起下一个请求,那么,服务器端一旦接收到来自这个客户端的请求,就可以放心地删除之前所有的请求。使用请求管道(Request Pipeline)还会有个问题,可能有在途(in-flight)请求存在,也就是客户端没有收到应答。如果服务器端知道客户端能够接受的在途请求的最大数量,它就可以保留那么多的应答,删除其它的应答。比如说,kafka 的 producer 能够接受的最大在途请求数量是 5 个,因此,它最多保存 5 个之前的请求。
class Session…
private static final int MAX_SAVED_RESPONSES = 5;
public void addResponse(Response response) {
if (clientResponses.size() == MAX_SAVED_RESPONSES) {
clientResponses.remove(); //remove the oldest request
}
clientResponses.add(response);
}
class Session…
private static final int MAX_SAVED_RESPONSES = 5;
public void addResponse(Response response) {
if (clientResponses.size() == MAX_SAVED_RESPONSES) {
clientResponses.remove(); //remove the oldest request
}
clientResponses.add(response);
}
删除已注册的客户端
客户端的会话也不会在服务器上永久保存。一个服务器会对其存储的客户端会话有一个最大保活时间。客户端周期性地发送心跳(HeartBeat)。如果在保活时间内没有收到心跳,服务器上客户端的状态就会被删除掉。
服务器会启动一个定时任务,周期性地检查是否有过期会话,删除已过期的会话。
class ReplicatedKVStore…
private long heartBeatIntervalMs = TimeUnit.SECONDS.toMillis(10);
private long sessionTimeoutNanos = TimeUnit.MINUTES.toNanos(5);
private void startSessionCheckerTask() {
scheduledTask = executor.scheduleWithFixedDelay(()->{
removeExpiredSession();
}, heartBeatIntervalMs, heartBeatIntervalMs, TimeUnit.MILLISECONDS);
}
private void removeExpiredSession() {
long now = System.nanoTime();
for (Long clientId : clientSessions.keySet()) {
Session session = clientSessions.get(clientId);
long elapsedNanosSinceLastAccess = now - session.getLastAccessTimestamp();
if (elapsedNanosSinceLastAccess > sessionTimeoutNanos) {
clientSessions.remove(clientId);
}
}
}
class ReplicatedKVStore…
private long heartBeatIntervalMs = TimeUnit.SECONDS.toMillis(10);
private long sessionTimeoutNanos = TimeUnit.MINUTES.toNanos(5);
private void startSessionCheckerTask() {
scheduledTask = executor.scheduleWithFixedDelay(()->{
removeExpiredSession();
}, heartBeatIntervalMs, heartBeatIntervalMs, TimeUnit.MILLISECONDS);
}
private void removeExpiredSession() {
long now = System.nanoTime();
for (Long clientId : clientSessions.keySet()) {
Session session = clientSessions.get(clientId);
long elapsedNanosSinceLastAccess = now - session.getLastAccessTimestamp();
if (elapsedNanosSinceLastAccess > sessionTimeoutNanos) {
clientSessions.remove(clientId);
}
}
}
示例
Raft 有一个实现了幂等性的参考实现,提供了线性一致性的行为。
Kafka 有一个幂等 Producer,允许客户端重试请求,忽略重复的请求。
ZooKeeper 有 Session 的概念,还有 zxid,用于客户端恢复。HBase 有一个 hbase-recoverable-zookeeper 的封装,它实现了遵循 zookeeper-error-handling 指导的幂等的行为。