代理池主要分为4个模块:存储模块、获取模块、检测模块、接口模块
无私分享全套Python爬虫干货,如果你也想学习Python,可以私信小编获取。
存储模块
这里我们使用Redis的有序集合,集合的每一个元素都是不重复的。另外,有序集合的每一个元素都有一个分数字段。
具体代码实现如下(ippool_save.py)
MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'

import redis
from random import choice


class PoolEmptyError(Exception):
    # Must derive from Exception: `raise PoolEmptyError` on a plain class
    # would itself fail with "exceptions must derive from BaseException".
    """Raised when the proxy pool contains no proxies at all."""

    def __str__(self):
        # __str__ must return a str (the original returned the class object,
        # which raises TypeError when the exception is printed).
        return 'PoolEmptyError: the proxy pool is empty'


class RedisClient(object):
    """Storage backend: a Redis sorted set keyed by proxy, scored by quality."""

    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """Initialize the Redis connection.

        :param host: Redis host
        :param port: Redis port
        :param password: Redis password
        """
        # decode_responses=True so zrange* return str, not bytes
        self.db = redis.StrictRedis(
            host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """Add a proxy with the initial score if it is not already stored.

        :param proxy: proxy address ("host:port")
        :param score: initial score
        :return: result of zadd, or None when the proxy already exists
        """
        # `is None` (not truthiness): a proxy whose score happens to be 0
        # already exists and must not be reset to INITIAL_SCORE.
        if self.db.zscore(REDIS_KEY, proxy) is None:
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        """Return a random usable proxy.

        Prefer proxies at MAX_SCORE; otherwise fall back to the top 100 by
        rank. Raises PoolEmptyError when the pool is empty.
        """
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if result:
            return choice(result)
        result = self.db.zrevrange(REDIS_KEY, 0, 100)
        if result:
            return choice(result)
        raise PoolEmptyError

    def decrease(self, proxy):
        """Decrement a proxy's score by one; remove it at or below MIN_SCORE.

        :param proxy: proxy address
        :return: new score, or result of zrem when removed
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print("代理", proxy, "当前分数", score, "减1")
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print("代理", proxy, "当前分数", score, "移除")
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """Return True if the proxy is stored in the pool."""
        # identity test against None, not `not ... == None`
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        """Mark a proxy as fully usable by setting its score to MAX_SCORE."""
        print("代理", proxy, "可用,设置为", MAX_SCORE)
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        """Return the number of proxies in the pool."""
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """Return every stored proxy (scores MIN_SCORE..MAX_SCORE)."""
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
获取模块
获取模块的逻辑相对简单,首先要定义一个ippool_crawler.py来从各大网站抓取,具体代码如下:
import json
import requests
from lxml import etree
from ippool_save import RedisClient


class ProxyMetaclass(type):
    """Metaclass that records every ``crawl_*`` method of a class.

    Adds two attributes to the class being created:
    ``__CrawlFunc__`` (list of crawl method names) and
    ``__CrawlFuncCount__`` (how many there are).
    """

    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    """Base crawler; subclasses add ``crawl_*`` methods that yield proxies."""

    def __init__(self):
        # Crawl through a proxy taken from the pool itself.
        self.proxy = RedisClient().random()
        self.proxies = {
            'http': 'http://' + self.proxy,
            'https': 'https://' + self.proxy,
        }

    def get_proxies(self, callback):
        """Invoke the crawl method named *callback* and collect its proxies.

        :param callback: name of a ``crawl_*`` method
        :return: list of proxies the method yielded
        """
        proxies = []
        # getattr instead of eval("self.{}()".format(...)): same dynamic
        # dispatch without evaluating an arbitrary string as code.
        for proxy in getattr(self, callback)():
            print('成功获取代理', proxy)
            proxies.append(proxy)
        return proxies
我们还需要定义一个Getter类,用来动态地调用所有以crawl开头的方法,然后获取抓取到的代理,将其加入到数据库存储起来,具体代码如下(ippool_getter.py)
from ippool_save import RedisClient
from ippool_crawler import Crawler

POOL_UPPER_THRESHOLD = 1000


class Getter():
    """Runs every crawler and stores the harvested proxies in Redis."""

    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """Return True when the pool already holds enough proxies."""
        # Return the comparison directly instead of if/else True/False.
        return self.redis.count() >= POOL_UPPER_THRESHOLD

    def run(self):
        """Invoke each ``crawl_*`` method (found via the metaclass) and
        add every proxy it returns to the pool, unless the pool is full."""
        print("获取器开始执行")
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy)
检测模块
我们已经将各个网站的代理都抓取下来了,现在就需要一个检测模块来对所有代理进行多轮检测。
VALID_STATUS_CODES = [200]
TEST_URL = "http://www.baidu.com"
BATCH_TEST_SIZE = 100

from ippool_save import RedisClient
import aiohttp
import asyncio
import time


class Tester(object):
    """Tests every stored proxy against TEST_URL and adjusts its score."""

    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        """Fetch TEST_URL through *proxy*; set score to max on success,
        decrease it on a bad status code or any request failure.

        :param proxy: proxy address (str or bytes)
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print("正在测试", proxy)
                async with session.get(TEST_URL, proxy=real_proxy,
                                       timeout=15) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('代理可用', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('请求响应码不合法', proxy)
            # aiohttp raises ClientError / asyncio.TimeoutError on network
            # failures; the original only caught (TimeoutError,
            # ArithmeticError), so a dead proxy crashed the coroutine.
            except (aiohttp.ClientError, asyncio.TimeoutError,
                    TimeoutError, ArithmeticError):
                self.redis.decrease(proxy)
                print('代理请求失败', proxy)

    def run(self):
        """Test all proxies in batches of BATCH_TEST_SIZE."""
        print('测试开始运行')
        try:
            proxies = self.redis.all()
            loop = asyncio.get_event_loop()
            for i in range(0, len(proxies), BATCH_TEST_SIZE):
                test_proxies = proxies[i:i + BATCH_TEST_SIZE]
                tasks = [self.test_single_proxy(proxy)
                         for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(5)
        except Exception as e:
            print('测试器发生错误', e.args)
接口模块
为了更方便地获取可用代理,我们增加了一个接口模块。
使用Flask来实现这个接口模块,实现代码如下(ippool_api.py)
from flask import Flask, g
from ippool_save import RedisClient

__all__ = ['app']

app = Flask(__name__)


def get_conn():
    """Return a RedisClient cached on flask.g for the current request."""
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


# NOTE(review): the route decorators were missing from the listing; without
# them Flask registers no endpoints at all. Restored the conventional paths.
@app.route('/')
def index():
    """Landing page."""
    return 'Welcome to Proxy Pool System'


@app.route('/random')
def get_proxy():
    """Return one random usable proxy from the pool."""
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    """Return the current pool size as a string."""
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()
调度模块
调度模块就是调用以上定义的3个模块,将这3个模块通过多进程的形式运行起来。
最后,只需要调用Scheduler的run()方法即可启动整个代理池。
TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

from multiprocessing import Process
from ippool_api import app
from ippool_getter import Getter
from ippool_check import Tester
import time


class Scheduler():
    """Launches the tester, getter and API modules, each in its own process."""

    def schedule_tester(self, cycle=TESTER_CYCLE):
        """Run the tester forever, once every *cycle* seconds."""
        tester = Tester()
        while True:
            print('测试器开始运行')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """Run the proxy getter forever, once every *cycle* seconds."""
        getter = Getter()
        while True:
            print('开始抓取代理')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """Serve the Flask API (blocks for the lifetime of the process)."""
        app.run()

    def run(self):
        """Start one child process per enabled module."""
        print('代理池开始运行')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()


if __name__ == '__main__':
    Scheduler().run()
为了帮助大家更轻松的学好Python,我给大家分享一套Python学习资料,希望对正在学习的你有所帮助!
获取方式:关注并私信小编 “ 学习 ”,即可免费获取!