技術文章:分布式系統(tǒng)模式10-Request Pipeline
作者: Unmesh Joshi
譯者: java達人
在連接上發(fā)送多個請求而不等待前一個請求的響應,從而減少延遲。
問題
如果請求需要等待對前一個請求的響應,使用單一套接字通道在集群服務器之間通信可能會導致性能問題。為了達到更好的吞吐量和更少的延遲,服務器上的請求隊列應該被填滿,以確保服務器容量得到充分利用。例如,當服務器使用Singular Update Queue,處理一個請求時,它總是可以接受更多的請求,直到隊列滿為止。如果一次只發(fā)送一個請求,服務器的大部分容量都被不必要地浪費了。
解決方案
節(jié)點向其他節(jié)點發(fā)送請求,而不等待以前請求的響應。這是通過創(chuàng)建兩個獨立的線程來實現(xiàn)的,一個用于通過網(wǎng)絡通道發(fā)送請求,另一個用于從網(wǎng)絡通道接收響應。
發(fā)送方節(jié)點通過套接字通道發(fā)送請求,而不等待響應。
class SingleSocketChannel…
public void sendOneWay(RequestOrResponse request) throws IOException { var dataStream = new DataOutputStream(socketOutputStream); byte[] messageBytes = serialize(request); dataStream.writeInt(messageBytes.length); dataStream.write(messageBytes); }
啟動一個單獨的線程來讀取響應。
class ResponseThread…
class ResponseThread extends Thread implements Logging { private volatile boolean isRunning = false; private SingleSocketChannel socketChannel;
public ResponseThread(SingleSocketChannel socketChannel) { this.socketChannel = socketChannel; }
@Override public void run() { try { isRunning = true; logger.info("Starting responder thread = " + isRunning); while (isRunning) { doWork(); }
} catch (IOException e) { getLogger().error(e); //thread exits if stopped or there is IO error } }
public void doWork() throws IOException { RequestOrResponse response = socketChannel.read(); logger.info("Read Response = " + response); processResponse(response); }
響應處理程序可以立即處理響應或將其提交到單一更新隊列
請求管道有兩個問題需要處理。
如果在不等待響應的情況下連續(xù)發(fā)送請求,則接受請求的節(jié)點可能會不堪重負。由于這個原因,對于一次可以保持的請求數(shù)量有一個上限。任何節(jié)點都可以向其他節(jié)點發(fā)送最大數(shù)量的請求。一旦發(fā)送了最大數(shù)量的執(zhí)行中請求而沒有收到響應,就不會接受更多的請求,發(fā)送方將被阻塞。限制最大數(shù)量執(zhí)行中請求的一個非常簡單的策略是保持一個阻塞隊列來跟蹤請求。隊列由請求數(shù)量參數(shù)進行初始化。一旦接收到請求的響應,就會從隊列中刪除它,以便為更多請求騰出空間。如下面的代碼所示,每個套接字連接最多可接受五個執(zhí)行中請求。
class RequestLimitingPipelinedConnection…
private final Map<inetaddressandport, arrayblockingqueue
一旦收到響應,該請求將從執(zhí)行中請求隊列中刪除。
class RequestLimitingPipelinedConnection…
private void consume(SocketRequestOrResponse response) { Integer correlationId = response.getRequest().getCorrelationId(); Queue
處理故障和維護順序保證的實現(xiàn)比較棘手。假設有兩個正在運行的請求。第一個請求失敗并重試,服務器可能在重試的第一個請求到達服務器之前已經(jīng)處理了第二個請求。服務器需要某種機制來確保錯誤的請求被拒絕。否則,在失敗和重試的情況下,總是有消息被重新排序的風險。例如,Raft總是發(fā)送每個日志條目所期望的前一個日志索引。如果前一個日志索引不匹配,服務器拒絕請求。Kafka可以允許max.in.flight.requests.per.connection 的值大于1,使用冪等生產(chǎn)者實現(xiàn),該實現(xiàn)為發(fā)送給broker的每個消息批次分配唯一標識符。然后,broker可以檢查傳入請求的序列號,并在請求亂序時拒絕該請求。
例子
? 所有的共識算法如Zab和Raft都允許request pipeline支持。
? Kafka鼓勵客戶使用request pipeline來提高吞吐量。

請輸入評論內(nèi)容...
請輸入評論/評論長度6~500個字
最新活動更多
推薦專題
- 1 AI 眼鏡讓百萬 APP「集體失業(yè)」?
- 2 大廠紛紛入局,百度、阿里、字節(jié)搶奪Agent話語權
- 3 深度報告|中國AI產(chǎn)業(yè)正在崛起成全球力量,市場潛力和關鍵挑戰(zhàn)有哪些?
- 4 上海跑出80億超級獨角獸:獲上市公司戰(zhàn)投,干人形機器人
- 5 國家數(shù)據(jù)局局長劉烈宏調研格創(chuàng)東智
- 6 下一代入口之戰(zhàn):大廠為何紛紛押注智能體?
- 7 百億AI芯片訂單,瘋狂傾銷中東?
- 8 Robotaxi新消息密集釋放,量產(chǎn)元年誰在領跑?
- 9 格斗大賽出圈!人形機器人致命短板曝光:頭腦過于簡單
- 10 一文看懂視覺語言動作模型(VLA)及其應用