訂閱
糾錯(cuò)
加入自媒體

技術(shù)文章:分布式系統(tǒng)模式10-Request Pipeline

作者: Unmesh Joshi

譯者: java達(dá)人

在連接上發(fā)送多個(gè)請(qǐng)求而不等待前一個(gè)請(qǐng)求的響應(yīng),從而減少延遲。

問題

如果請(qǐng)求需要等待對(duì)前一個(gè)請(qǐng)求的響應(yīng),使用單一套接字通道在集群服務(wù)器之間通信可能會(huì)導(dǎo)致性能問題。為了達(dá)到更好的吞吐量和更少的延遲,服務(wù)器上的請(qǐng)求隊(duì)列應(yīng)該被填滿,以確保服務(wù)器容量得到充分利用。例如,當(dāng)服務(wù)器使用Singular Update Queue,處理一個(gè)請(qǐng)求時(shí),它總是可以接受更多的請(qǐng)求,直到隊(duì)列滿為止。如果一次只發(fā)送一個(gè)請(qǐng)求,服務(wù)器的大部分容量都被不必要地浪費(fèi)了。

解決方案

節(jié)點(diǎn)向其他節(jié)點(diǎn)發(fā)送請(qǐng)求,而不等待以前請(qǐng)求的響應(yīng)。這是通過創(chuàng)建兩個(gè)獨(dú)立的線程來實(shí)現(xiàn)的,一個(gè)用于通過網(wǎng)絡(luò)通道發(fā)送請(qǐng)求,另一個(gè)用于從網(wǎng)絡(luò)通道接收響應(yīng)。

發(fā)送方節(jié)點(diǎn)通過套接字通道發(fā)送請(qǐng)求,而不等待響應(yīng)。

class SingleSocketChannel…
 public void sendOneWay(RequestOrResponse request) throws IOException {      var dataStream = new DataOutputStream(socketOutputStream);      byte[] messageBytes = serialize(request);      dataStream.writeInt(messageBytes.length);      dataStream.write(messageBytes);  }

啟動(dòng)一個(gè)單獨(dú)的線程來讀取響應(yīng)。

class ResponseThread…
 class ResponseThread extends Thread implements Logging {      private volatile boolean isRunning = false;      private SingleSocketChannel socketChannel;
     public ResponseThread(SingleSocketChannel socketChannel) {          this.socketChannel = socketChannel;      }
     @Override      public void run() {          try {              isRunning = true;              logger.info("Starting responder thread = " + isRunning);              while (isRunning) {                  doWork();              }
         } catch (IOException e) {              getLogger().error(e); //thread exits if stopped or there is IO error          }      }
     public void doWork() throws IOException {          RequestOrResponse response = socketChannel.read();          logger.info("Read Response = " + response);          processResponse(response);      }

響應(yīng)處理程序可以立即處理響應(yīng)或?qū)⑵涮峤坏絾我桓玛?duì)列

請(qǐng)求管道有兩個(gè)問題需要處理。

如果在不等待響應(yīng)的情況下連續(xù)發(fā)送請(qǐng)求,則接受請(qǐng)求的節(jié)點(diǎn)可能會(huì)不堪重負(fù)。由于這個(gè)原因,對(duì)于一次可以保持的請(qǐng)求數(shù)量有一個(gè)上限。任何節(jié)點(diǎn)都可以向其他節(jié)點(diǎn)發(fā)送最大數(shù)量的請(qǐng)求。一旦發(fā)送了最大數(shù)量的執(zhí)行中請(qǐng)求而沒有收到響應(yīng),就不會(huì)接受更多的請(qǐng)求,發(fā)送方將被阻塞。限制最大數(shù)量執(zhí)行中請(qǐng)求的一個(gè)非常簡單的策略是保持一個(gè)阻塞隊(duì)列來跟蹤請(qǐng)求。隊(duì)列由請(qǐng)求數(shù)量參數(shù)進(jìn)行初始化。一旦接收到請(qǐng)求的響應(yīng),就會(huì)從隊(duì)列中刪除它,以便為更多請(qǐng)求騰出空間。如下面的代碼所示,每個(gè)套接字連接最多可接受五個(gè)執(zhí)行中請(qǐng)求。

class RequestLimitingPipelinedConnection…
 private final Map<inetaddressandport, arrayblockingqueue

一旦收到響應(yīng),該請(qǐng)求將從執(zhí)行中請(qǐng)求隊(duì)列中刪除。

class RequestLimitingPipelinedConnection…
 private void consume(SocketRequestOrResponse response) {      Integer correlationId = response.getRequest().getCorrelationId();      Queue

處理故障和維護(hù)順序保證的實(shí)現(xiàn)比較棘手。假設(shè)有兩個(gè)正在運(yùn)行的請(qǐng)求。第一個(gè)請(qǐng)求失敗并重試,服務(wù)器可能在重試的第一個(gè)請(qǐng)求到達(dá)服務(wù)器之前已經(jīng)處理了第二個(gè)請(qǐng)求。服務(wù)器需要某種機(jī)制來確保錯(cuò)誤的請(qǐng)求被拒絕。否則,在失敗和重試的情況下,總是有消息被重新排序的風(fēng)險(xiǎn)。例如,Raft總是發(fā)送每個(gè)日志條目所期望的前一個(gè)日志索引。如果前一個(gè)日志索引不匹配,服務(wù)器拒絕請(qǐng)求。Kafka可以允許max.in.flight.requests.per.connection 的值大于1,使用冪等生產(chǎn)者實(shí)現(xiàn),該實(shí)現(xiàn)為發(fā)送給broker的每個(gè)消息批次分配唯一標(biāo)識(shí)符。然后,broker可以檢查傳入請(qǐng)求的序列號(hào),并在請(qǐng)求亂序時(shí)拒絕該請(qǐng)求。

例子

? 所有的共識(shí)算法如Zab和Raft都允許request pipeline支持。

? Kafka鼓勵(lì)客戶使用request pipeline來提高吞吐量。

聲明: 本文由入駐維科號(hào)的作者撰寫,觀點(diǎn)僅代表作者本人,不代表OFweek立場。如有侵權(quán)或其他問題,請(qǐng)聯(lián)系舉報(bào)。

發(fā)表評(píng)論

0條評(píng)論,0人參與

請(qǐng)輸入評(píng)論內(nèi)容...

請(qǐng)輸入評(píng)論/評(píng)論長度6~500個(gè)字

您提交的評(píng)論過于頻繁,請(qǐng)輸入驗(yàn)證碼繼續(xù)

  • 看不清,點(diǎn)擊換一張  刷新

暫無評(píng)論

暫無評(píng)論

    掃碼關(guān)注公眾號(hào)
    OFweek人工智能網(wǎng)
    獲取更多精彩內(nèi)容
    文章糾錯(cuò)
    x
    *文字標(biāo)題:
    *糾錯(cuò)內(nèi)容:
    聯(lián)系郵箱:
    *驗(yàn) 證 碼:

    粵公網(wǎng)安備 44030502002758號(hào)