Skip to content

请求管道(Request Pipeline)

原文

https://martinfowler.com/articles/patterns-of-distributed-systems/request-pipeline.html

在连接上发送多个请求,而无需等待之前请求的应答,以此改善延迟。

2020.8.20

问题

在集群里服务器间使用单一 Socket 通道(Single Socket Channel)进行通信,如果一个请求需要等到之前请求对应应答的返回,这种做法可能会导致性能问题。为了达到更好的吞吐和延迟,服务端的请求队列应该充分填满,确保服务器容量得到完全地利用。比如,当服务器端使用了单一更新队列(Singular Update Queue),只要队列未填满,就可以继续接收更多的请求。如果只是一次只发一个请求,大多数服务器容量就毫无必要地浪费了。

解决方案

节点向另外的节点发送请求,无需等待之前请求的应答。只要创建两个单独的线程就可以做到,一个在网络通道上发送请求,一个从网络通道上接受应答。

请求管道

发送者节点通过 socket 通道发送请求,无需等待应答。

java
class SingleSocketChannel
  public void sendOneWay(RequestOrResponse request) throws IOException {
      var dataStream = new DataOutputStream(socketOutputStream);
      byte[] messageBytes = serialize(request);
      dataStream.writeInt(messageBytes.length);
      dataStream.write(messageBytes);
  }
class SingleSocketChannel
  public void sendOneWay(RequestOrResponse request) throws IOException {
      var dataStream = new DataOutputStream(socketOutputStream);
      byte[] messageBytes = serialize(request);
      dataStream.writeInt(messageBytes.length);
      dataStream.write(messageBytes);
  }

启动一个单独的线程用以读取应答。

java
class ResponseThread
  class ResponseThread extends Thread implements Logging {
      private volatile boolean isRunning = false;
      private SingleSocketChannel socketChannel;

      public ResponseThread(SingleSocketChannel socketChannel) {
          this.socketChannel = socketChannel;
      }

      @Override
      public void run() {
          try {
              isRunning = true;
              logger.info("Starting responder thread = " + isRunning);
              while (isRunning) {
                  doWork();
              }
          } catch (IOException e) {
              e.printStackTrace();
              getLogger().error(e); //thread exits if stopped or there is IO error
          }
      }

      public void doWork() throws IOException {
          RequestOrResponse response = socketChannel.read();
          logger.info("Read Response = " + response);
          processResponse(response);
      }
class ResponseThread
  class ResponseThread extends Thread implements Logging {
      private volatile boolean isRunning = false;
      private SingleSocketChannel socketChannel;

      public ResponseThread(SingleSocketChannel socketChannel) {
          this.socketChannel = socketChannel;
      }

      @Override
      public void run() {
          try {
              isRunning = true;
              logger.info("Starting responder thread = " + isRunning);
              while (isRunning) {
                  doWork();
              }
          } catch (IOException e) {
              e.printStackTrace();
              getLogger().error(e); //thread exits if stopped or there is IO error
          }
      }

      public void doWork() throws IOException {
          RequestOrResponse response = socketChannel.read();
          logger.info("Read Response = " + response);
          processResponse(response);
      }

应答处理器可以理解处理应答,或是将它提交到单一更新队列里(Singular Update Queue)

请求管道有两个问题需要处理。

如果无需等待应答,请求持续发送,接收请求的节点就可能会不堪重负。有鉴于此,一般会有一个上限,也就是一次可以有多少在途请求。任何一个节点都可以发送最大数量的请求给其它节点。一旦发出且未收到应答的请求数量达到最大值,再发送请求就不能再接收了,发送者就要阻塞住了。限制最大在途请求,一个非常简单的策略就是,用一个阻塞队列来跟踪请求。队列可以用可接受的最大在途请求数量进行初始化。一旦接收到一个请求的应答,就从队列中把它移除,为更多的请求创造空间。在下面的代码中,每个 socket 连接接收的最大请求数量是 5 个。

java
class RequestLimitingPipelinedConnection
  private final Map<InetAddressAndPort, ArrayBlockingQueue<RequestOrResponse>> inflightRequests = new ConcurrentHashMap<>();
  private int maxInflightRequests = 5;

  public void send(InetAddressAndPort to, RequestOrResponse request) throws InterruptedException {
      ArrayBlockingQueue<RequestOrResponse> requestsForAddress = inflightRequests.get(to);
      if (requestsForAddress == null) {
          requestsForAddress = new ArrayBlockingQueue<>(maxInflightRequests);
          inflightRequests.put(to, requestsForAddress);
      }
      requestsForAddress.put(request);
class RequestLimitingPipelinedConnection
  private final Map<InetAddressAndPort, ArrayBlockingQueue<RequestOrResponse>> inflightRequests = new ConcurrentHashMap<>();
  private int maxInflightRequests = 5;

  public void send(InetAddressAndPort to, RequestOrResponse request) throws InterruptedException {
      ArrayBlockingQueue<RequestOrResponse> requestsForAddress = inflightRequests.get(to);
      if (requestsForAddress == null) {
          requestsForAddress = new ArrayBlockingQueue<>(maxInflightRequests);
          inflightRequests.put(to, requestsForAddress);
      }
      requestsForAddress.put(request);

一旦接收到应答,请求就从在途请求中移除。

java
class RequestLimitingPipelinedConnection
  private void consume(SocketRequestOrResponse response) {
      Integer correlationId = response.getRequest().getCorrelationId();
      Queue<RequestOrResponse> requestsForAddress = inflightRequests.get(response.getAddress());
      RequestOrResponse first = requestsForAddress.peek();
      if (correlationId != first.getCorrelationId()) {
          throw new RuntimeException("First response should be for the first request");
      }
      requestsForAddress.remove(first);
      responseConsumer.accept(response.getRequest());
  }
class RequestLimitingPipelinedConnection
  private void consume(SocketRequestOrResponse response) {
      Integer correlationId = response.getRequest().getCorrelationId();
      Queue<RequestOrResponse> requestsForAddress = inflightRequests.get(response.getAddress());
      RequestOrResponse first = requestsForAddress.peek();
      if (correlationId != first.getCorrelationId()) {
          throw new RuntimeException("First response should be for the first request");
      }
      requestsForAddress.remove(first);
      responseConsumer.accept(response.getRequest());
  }

处理失败,以及要维护顺序的保证,这些都会让实现变得比较诡异。比如,有两个在途请求。第一个请求失败,然后,重试了,服务器在重试的第一个请求到达服务器之前,已经把第二个请求处理了。服务器需要有一些机制,确保拒绝掉乱序的请求。否则,如果有失败和重试的情况,就会存在消息重排序的风险。比如,Raft 总是发送之前的日志索引,我们会预期,每个日志条目都会有这么个索引。如果之前的日志索引无法匹配,服务器就会拒绝掉这个请求。Kafka 允许 max.in.flight.requests.per.connection 大于 1,还有幂等的 Producer 实现,它会给发送到 Broker 的每个消息批次分配一个唯一标识符。Broker 可以检查进来的请求序列号,如果这边的请求已经乱序,则拒绝掉新请求。

示例

所有像ZabRaft这样的共识算法都支持请求通道。

Kafka鼓励客户端使用请求通道来改善吞吐。

Released under the MIT License. Forked from dreamhead/patterns-of-distributed-systems