03.03 僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

MapReduce 是一種用於大規模數據集的並行運算編程模型

分為 Map(映射)和 Reduce(歸約)兩個步驟。

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

Py2 時代,map() 和 reduce() 都是標準函數。

不知為何,Py3 把 reduce() 藏到了標準模塊 functools 中,只保留了 map() 在標準函數庫內。

藉助於 Python 的標準進程模塊(不熟悉進程模塊的讀者),以及map() 和 reduce() 函數,我們可以非常容易地搭建一個 MapReduce 框架。

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

下面,我們以計算整數列表各元素的平方和為例,演示 MapReduce 並行運算編程模型。

<code># -*- coding: utf-8 -*-from functools import reduceimport multiprocessing as mpdef power(x): # 返回x的平方    return pow(x, 2)def adder(a, b): # 求和    return a + bdef map_reduce(dataset, processes=4): # 默認進程池最多4個進程    with mp.Pool(processes=processes) as mpp: # 創建進程池mpp        d = mpp.map(power, dataset) # 使用進程池Pool的映射方法,計算整數列表各元素的平方,返回列表        result = reduce(adder, d, 0) # 規約計算結果        return result # 返回計算結果if __name__ == '__main__':    dataset = range(10000) # 生成[0,10000)區間的整數列表(實際是生成器)作為數據集    print(map_reduce(dataset))/<code>

一個聽起來非常繁瑣的概念,我們只用了十幾行代碼就講清楚了。

事實上,如果將求和改用lambda 函數,這個代碼還可以寫得更簡潔。

需要說明一點:不能在進程池的 map() 方法中使用lambda 函數,因為在 Pool 實例化之前就應定義好這個函數——這就是所謂的可序列化(pickle)。

<code>from functools import reduceimport multiprocessing as mpdef power(x):    return pow(x, 2)if __name__ == '__main__':    with mp.Pool(processes=4) as mpp:        print(reduce(lambda a,b:a+b, mpp.map(power, range(10000)), 0))/<code>

最終,我們僅用7行Python代碼,就完美地演繹了MapReduce並行運算編程模型。

Process 類是 multiprocessing 模塊的子進程類,用於創建、啟動和管理子進程。

Process 和線程模塊 treading.Thread 的 API 幾乎完全相同。

Process 類用來描述一個進程對象。

創建子進程的時候,只需要傳入進程函數和函數的參數即可完成 Process 實例化。

下面這段代碼,主進程啟動了兩個子進程,然後等待用戶的鍵盤輸入以結束程序。主進程結束後,子進程也隨之結束。

<code># -*- coding: utf-8 -*-import os, timeimport multiprocessing as mpdef sub_process(name, delay):    """進程函數"""        while True:        time.sleep(delay)        print('我是子進程%s,進程id為%s'%(name, os.getpid()))if __name__ == '__main__':    print('主進程(%s)開始,按任意鍵結束本程序'%os.getpid())        p_a = mp.Process(target=sub_process, args=('A', 1))    p_a.daemon = True  # 設置子進程為守護進程    p_a.start()        p_b = mp.Process(target=sub_process, args=('B', 2))    p_b.daemon = True  # 如果子進程不是守護進程,主進程結束後子進程可能成為殭屍進程    p_b.start()            input() # 利用input函數阻塞主進程。這是常用的調試手段之一。/<code>

和線程一樣,處理並行任務時,處理效率和進程數量並不總是成正比——當進程數量超過一定限度後,完成任務所需時間反倒會延長。

進程池提供了一個保持合理進程數量的方案,但合理進程數量則需要根據硬件狀況及運行狀況來確定。

<code>multiprocessing.Pool(n) 用於創建n個進程的進程池供用戶調用。/<code> 

如果進程池任務不滿,則新的進程請求會被立即執行;

如果進程池任務已滿,則新的請求將等待至有可用進程才被執行。向進程池提交任務有兩種方式:

<code>apply_async(func[, args[, kwds[, callback]]])/<code>

非阻塞式提交。即使進程池已滿,也會接受新的任務,不會阻塞主進程。

新任務將處於等待狀態。

<code>apply(func[, args[, kwds]])/<code>

阻塞式提交。

若進程池已滿,則主進程阻塞,直至有空閒進成可用

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

下面的代碼,演示了進程池的典型用法。讀者可自行嘗試阻塞式提交和非阻塞式提交兩種方法的差異。

<code># -*- coding: utf-8 -*-import timeimport multiprocessing as mpdef power(x, a=2):    """進程函數:冪函數"""        time.sleep(1)    print('%d的%d次方等於%d'%(x, a, pow(x, a)))def demo():    mpp = mp.Pool(processes=4)        for item in [2,3,4,5,6,7,8,9]:        mpp.apply_async(power, (item, )) # 非阻塞式提交新任務        #mpp.apply(power, (item, )) # 阻塞式提交新任務        mpp.close() # 關閉進程池,意味著不再接受新的任務    print('主進程走到這裡,正在等待子進程結束')    mpp.join() # 等待所有子進程結束    print('程序結束')if __name__ == '__main__':
demo()/<code>

最後小編幫助大家整理了一套python教程,下面展示了部分,希望也能幫助對編程感興趣,想做數據分析,人工智能、爬蟲或者希望從事編程開發的小夥伴,畢竟python工資也還可以,如果能幫到你請點贊、點贊、點贊。

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

僅用7行Python代碼,完美演繹MapReduce並行運算編程模型

PS:如果你喜歡python,並覺得這篇文章對你有益的話,麻煩多多點贊關注支持!!!!


分享到:


相關文章: