ps-lite代碼解析

花了一個下午,重新閱讀了ps-lite的代碼,把以前沒有想通的問題,差不多都想通了,各位想了解ps-lite可以結合代碼加上我這篇文章來看, 應該會有一些理解


Postoffice

Postoffice是個單例類,是整個ps-lite的核心,相當於整個ps-lite的調控中心,包括對調起Van負責整個網絡的拉起、通信、命令管理如增加節點、移除節點、恢復節點等等;整個集群基本信息的管理,比如worker、server數的獲取、server端feature分佈的獲取、worker/server Rank與node id的互轉、節點角色身份等等;

Van 和ZMQVan: 網絡如何被構建, 如何通信

Van是ps-lite的一個基類, 實現了基礎的公共函數,ZMQVan是基於zeromq的Van的實現,Van是整個Parameter Server的通信模塊;在整個訓練任務的生命週期中,有以下幾點值得注意:

  1. 任務啟動時,所有nodes,發送消息到scheduler,;
  2. 啟動好scheduler後, worker與server會互相連接,注意worker之間、server之間不會連接;
  3. 框架運行過程中,通信中包括以下多種信息類型,如數據信息:worker向server更新梯度、心跳信息:worker/server向scheduler發送心跳、server和worker的連接、scheduler端的處理命令:如添加節點、恢復dead節點等等;
  4. Message中的Meta,如是否request、app_id、timestamp、nodes的ip、port、role等等在網絡通信過程中會打包成protobuf,減少通信壓力;
  5. 在節點掛掉(心跳時間內沒回應)會恢復節點,這個過程中會將掛掉節點的id賦給恢復的節點;

Customer 消息如何被處理

Customer主要是request、response,比如新建一次request,會返回一個timestamp,這個timestamp會作為這次request的id,每次請求會自增1,相應的res也會自增1,調用wait時會保證 後續比如做Wait以此為ID識別,tracker_是Customer內用來記錄request(使用request id)和對應的response的次數的一個map;recv_handle_綁定Customer接收到request後的處理函數(SimpleApp::Process);Customer會新拉起一個線程,用於在customer生命週期內,使用recv_handle_來處理接受的請求,這裡是使用了一個線程安全隊列,Accept()用於往隊列中一直髮送消息,對於Worker,比如KVWorker,recv_handle_保存拉取的msg中的數據,對於Server,需要使用set_request_handle來設置對應的處理函數,如KVServerDefaultHandle,使用std::unordered_map store保存server的參數,當請求為push時,對store參數做更新,請求為pull時對參數進行拉取;

Message: 消息數據結構

Message封裝包括Meta, Control, Node等消息, 其中data的部分,採用的SArray這個數據結構可以理解為一個零拷貝的vector,能兼容vector的數據結構,另外為了保證高效會對Message裡的Meta,Control,Node使用protobuf來打包,這裡有個疑問,為啥不會數據比如推送的梯度信息用protobuf打包呢?

PS如何構建網絡

1. scheduler節點拉起;2. worker、server節點想scheduler發送請求,彙報ip、端口等等,scheduler分配node id給相應節點;3. 所有節點啟動後,scheduler發送消息周知;4. 所有節點內部啟動線程,拉起心跳線程, scheduler會來處理相應的命令;

同步操作

ps-lite裡面有兩個涉及到等待同步的地方:1. Worker pull時是異步操作,通常調用Wait來調用Customer::WaitRequest()來保證customer裡面的request和response兩者相等,即保證Pull完成後再做其他操作;2. 另外在一個worker內,可以存在多個Customer,當第一個發送barrier後,scheduler接收到request請求,然後根據msg判斷是request,然後,向barrier_group裡的所有node,node接到後, Postoffice::Get()->Manage(*msg)將barrier_done_中的customer_id對應的bool置true,完成同步操作,這裡貌似沒有我們常說的asp、bsp、ssp,可以通過增加相應的Command來完成;3. 當構建節點連接時,也可以進行一個barrier;4. 更復雜的比如Asp,bsp,ssp可以通過發送新定Command來完成

<code>void Van::ProcessBarrierCommand(Message* msg) {
auto& ctrl = msg->meta.control;
if (msg->meta.request) {
if (barrier_count_.empty()) {
barrier_count_.resize(8, 0);
}
int group = ctrl.barrier_group;
++barrier_count_[group];
PS_VLOG(1) << "Barrier count for " << group << " : " << barrier_count_[group];
if (barrier_count_[group] ==
static_cast
(Postoffice::Get()->GetNodeIDs(group).size())) {
barrier_count_[group] = 0;
Message res;
res.meta.request = false;
res.meta.app_id = msg->meta.app_id;
res.meta.customer_id = msg->meta.customer_id;
res.meta.control.cmd = Control::BARRIER;
for (int r : Postoffice::Get()->GetNodeIDs(group)) {
int recver_id = r;
if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
res.meta.recver = recver_id;
res.meta.timestamp = timestamp_++;
CHECK_GT(Send(res), 0);
}
}
}
} else {
Postoffice::Get()->Manage(*msg);
}
}
/<code>

SampleApp

一個基類,封裝了基本的PS app的操作,KVWorker、KVServer集成SampleApp,完成相應邏輯: 如push、pull、key的切片等等;


分享到:


相關文章: