03.03 仅用7行Python代码,完美演绎MapReduce并行运算编程模型

MapReduce 是一种用于大规模数据集的并行运算编程模型

分为 Map(映射)和 Reduce(归约)两个步骤。

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

Py2 时代,map() 和 reduce() 都是标准函数。

不知为何,Py3 把 reduce() 藏到了标准模块 functools 中,只保留了 map() 在标准函数库内。

借助于 Python 的标准进程模块(不熟悉进程模块的读者),以及map() 和 reduce() 函数,我们可以非常容易地搭建一个 MapReduce 框架。

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

下面,我们以计算整数列表各元素的平方和为例,演示 MapReduce 并行运算编程模型。

<code># -*- coding: utf-8 -*-from functools import reduceimport multiprocessing as mpdef power(x): # 返回x的平方    return pow(x, 2)def adder(a, b): # 求和    return a + bdef map_reduce(dataset, processes=4): # 默认进程池最多4个进程    with mp.Pool(processes=processes) as mpp: # 创建进程池mpp        d = mpp.map(power, dataset) # 使用进程池Pool的映射方法,计算整数列表各元素的平方,返回列表        result = reduce(adder, d, 0) # 规约计算结果        return result # 返回计算结果if __name__ == '__main__':    dataset = range(10000) # 生成[0,10000)区间的整数列表(实际是生成器)作为数据集    print(map_reduce(dataset))/<code>

一个听起来非常繁琐的概念,我们只用了十几行代码就讲清楚了。

事实上,如果将求和改用lambda 函数,这个代码还可以写得更简洁。

需要说明一点:不能在进程池的 map() 方法中使用lambda 函数,因为在 Pool 实例化之前就应定义好这个函数——这就是所谓的可序列化(pickle)。

<code>from functools import reduceimport multiprocessing as mpdef power(x):    return pow(x, 2)if __name__ == '__main__':    with mp.Pool(processes=4) as mpp:        print(reduce(lambda a,b:a+b, mpp.map(power, range(10000)), 0))/<code>

最终,我们仅用7行Python代码,就完美地演绎了MapReduce并行运算编程模型。

Process 类是 multiprocessing 模块的子进程类,用于创建、启动和管理子进程。

Process 和线程模块 treading.Thread 的 API 几乎完全相同。

Process 类用来描述一个进程对象。

创建子进程的时候,只需要传入进程函数和函数的参数即可完成 Process 实例化。

下面这段代码,主进程启动了两个子进程,然后等待用户的键盘输入以结束程序。主进程结束后,子进程也随之结束。

<code># -*- coding: utf-8 -*-import os, timeimport multiprocessing as mpdef sub_process(name, delay):    """进程函数"""        while True:        time.sleep(delay)        print('我是子进程%s,进程id为%s'%(name, os.getpid()))if __name__ == '__main__':    print('主进程(%s)开始,按任意键结束本程序'%os.getpid())        p_a = mp.Process(target=sub_process, args=('A', 1))    p_a.daemon = True  # 设置子进程为守护进程    p_a.start()        p_b = mp.Process(target=sub_process, args=('B', 2))    p_b.daemon = True  # 如果子进程不是守护进程,主进程结束后子进程可能成为僵尸进程    p_b.start()            input() # 利用input函数阻塞主进程。这是常用的调试手段之一。/<code>

和线程一样,处理并行任务时,处理效率和进程数量并不总是成正比——当进程数量超过一定限度后,完成任务所需时间反倒会延长。

进程池提供了一个保持合理进程数量的方案,但合理进程数量则需要根据硬件状况及运行状况来确定。

<code>multiprocessing.Pool(n) 用于创建n个进程的进程池供用户调用。/<code> 

如果进程池任务不满,则新的进程请求会被立即执行;

如果进程池任务已满,则新的请求将等待至有可用进程才被执行。向进程池提交任务有两种方式:

<code>apply_async(func[, args[, kwds[, callback]]])/<code>

非阻塞式提交。即使进程池已满,也会接受新的任务,不会阻塞主进程。

新任务将处于等待状态。

<code>apply(func[, args[, kwds]])/<code>

阻塞式提交。

若进程池已满,则主进程阻塞,直至有空闲进成可用

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

下面的代码,演示了进程池的典型用法。读者可自行尝试阻塞式提交和非阻塞式提交两种方法的差异。

<code># -*- coding: utf-8 -*-import timeimport multiprocessing as mpdef power(x, a=2):    """进程函数:幂函数"""        time.sleep(1)    print('%d的%d次方等于%d'%(x, a, pow(x, a)))def demo():    mpp = mp.Pool(processes=4)        for item in [2,3,4,5,6,7,8,9]:        mpp.apply_async(power, (item, )) # 非阻塞式提交新任务        #mpp.apply(power, (item, )) # 阻塞式提交新任务        mpp.close() # 关闭进程池,意味着不再接受新的任务    print('主进程走到这里,正在等待子进程结束')    mpp.join() # 等待所有子进程结束    print('程序结束')if __name__ == '__main__':
demo()/<code>

最后小编帮助大家整理了一套python教程,下面展示了部分,希望也能帮助对编程感兴趣,想做数据分析,人工智能、爬虫或者希望从事编程开发的小伙伴,毕竟python工资也还可以,如果能帮到你请点赞、点赞、点赞。

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

仅用7行Python代码,完美演绎MapReduce并行运算编程模型

PS:如果你喜欢python,并觉得这篇文章对你有益的话,麻烦多多点赞关注支持!!!!


分享到:


相關文章: