A hands-on guide to building your own proxy IP pool, so your scrapers can fetch data without IP bans or IP-based anti-crawling blocks.

The proxy pool is made up of four modules: a storage module, a getter module, a tester module, and an API module. A scheduler at the end ties them together by running them as separate processes.


Storage module

Here we use a Redis sorted set: every member of the set is unique, and each member also carries a score field, which is what the pool uses to track how reliable each proxy is.
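To make that concrete, here is a minimal sketch of the sorted-set behaviour the storage module relies on, assuming a local Redis instance on the default port; the key name and proxy address are throwaway placeholders.

<code>import redis

# Placeholder connection and key, only for illustrating sorted-set semantics.
r = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)

r.zadd('demo_proxies', {'127.0.0.1:8080': 10})     # new member with score 10
r.zadd('demo_proxies', {'127.0.0.1:8080': 10})     # same member again: no duplicate is created
print(r.zcard('demo_proxies'))                     # 1
r.zincrby('demo_proxies', -1, '127.0.0.1:8080')    # adjust the member's score in place
print(r.zscore('demo_proxies', '127.0.0.1:8080'))  # 9.0</code>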

The concrete implementation is as follows (ippool_save.py):

<code>import redis
from random import choice

MAX_SCORE = 100        # score given to a proxy that passes testing
MIN_SCORE = 0          # proxies that fall to this score are removed
INITIAL_SCORE = 10     # score assigned to a freshly crawled proxy
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'


class PoolEmptyError(Exception):
    def __str__(self):
        return 'proxy pool is empty'


class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Initialize the Redis connection.
        :param host: host
        :param port: port
        :param password: password
        """
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """
        Add a proxy and set its initial score.
        :param proxy: proxy
        :param score: score
        :return: result of the add
        """
        if not self.db.zscore(REDIS_KEY, proxy):
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        """
        Get a valid proxy at random: first try the proxies with the maximum score;
        if none exist, fall back to the highest-ranked ones.
        :return: a proxy
        """
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        """
        Decrease a proxy's score by one; once the score is no longer above the minimum, remove the proxy.
        :param proxy: proxy
        :return: the updated score
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('Proxy', proxy, 'current score', score, 'decreased by 1')
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print('Proxy', proxy, 'current score', score, 'removed')
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """
        Check whether a proxy exists.
        :param proxy: proxy
        :return: whether it exists
        """
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        """
        Set a proxy's score to MAX_SCORE.
        :param proxy: proxy
        :return: result of the update
        """
        print('Proxy', proxy, 'is valid, score set to', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        """
        Get the number of proxies in the pool.
        :return: count
        """
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """
        Get all proxies.
        :return: list of all proxies
        """
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)</code>
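A minimal usage sketch for RedisClient, assuming a local Redis server is running and ippool_save.py is on the import path; the proxy address is a placeholder.

<code>from ippool_save import RedisClient

client = RedisClient()
client.add('127.0.0.1:8080')            # placeholder proxy, stored with INITIAL_SCORE (10)
print(client.exists('127.0.0.1:8080'))  # True
print(client.count())                   # number of proxies currently in the pool
client.decrease('127.0.0.1:8080')       # score drops to 9; removed once it reaches MIN_SCORE
proxy = client.random()                 # prefers MAX_SCORE proxies, otherwise the top 100 by rank
client.max(proxy)                       # mark the proxy as verified</code>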

Getter module

The getter module's logic is relatively simple. First, define ippool_crawler.py to crawl proxies from the major free-proxy sites; the code is as follows (a sketch of an individual crawl_ method follows the block):

<code>import json
import requests
from lxml import etree
from ippool_save import RedisClient


class ProxyMetaclass(type):
    """Collect every method whose name starts with 'crawl_' into __CrawlFunc__."""
    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    def __init__(self):
        # Note: RedisClient().random() raises PoolEmptyError when the pool is empty,
        # so this request proxy is only available once the pool has been seeded.
        self.proxy = RedisClient().random()
        self.proxies = {
            'http': 'http://' + self.proxy,
            'https': 'https://' + self.proxy
        }

    def get_proxies(self, callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('Got proxy', proxy)
            proxies.append(proxy)
        return proxies</code>
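The Crawler above only defines the plumbing; the individual crawl_ methods are not shown. Below is a hedged sketch of what one might look like. The site URL and XPath expressions are hypothetical placeholders, not from the original article, and must be adapted to a real free-proxy listing page; pasted into the Crawler class, the method is collected into __CrawlFunc__ automatically because its name starts with crawl_.

<code># A hypothetical crawl_ method; URL and table layout are placeholders, not from the article.
import requests
from lxml import etree

def crawl_example_site(self):
    url = 'http://proxy-list.example.com/free/'   # placeholder proxy-listing page
    response = requests.get(url, timeout=10)
    html = etree.HTML(response.text)
    # assumed layout: one table row per proxy, IP in the first cell, port in the second
    for row in html.xpath('//table//tr[position()>1]'):
        ip = row.xpath('./td[1]/text()')[0].strip()
        port = row.xpath('./td[2]/text()')[0].strip()
        yield '{}:{}'.format(ip, port)</code>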

We also need a Getter class that dynamically invokes every method whose name starts with crawl_, collects the proxies they return, and stores them in the database. The code is as follows (ippool_getter.py):

<code>from ippool_save import RedisClient
from ippool_crawler import Crawler

POOL_UPPER_THRESHOLD = 1000    # stop crawling once the pool holds this many proxies


class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """Check whether the pool has reached its size limit."""
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    def run(self):
        print('Getter starts running')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy)</code>
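A minimal sketch of running the Getter once, assuming Redis is running locally. Because the Crawler constructor above pulls a request proxy via RedisClient().random(), which raises PoolEmptyError on an empty pool, the sketch seeds one placeholder entry first.

<code>from ippool_save import RedisClient
from ippool_getter import Getter

client = RedisClient()
if not client.count():
    client.add('127.0.0.1:8080')    # placeholder seed so Crawler.__init__ can pick a request proxy

getter = Getter()
getter.run()                        # invokes every crawl_ method and stores the results
print('pool size:', client.count())</code>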

Tester module

Now that proxies have been crawled from the various sites, we need a tester module that puts every proxy through repeated rounds of checks (ippool_check.py, which the scheduler imports later):

<code>from ippool_save import RedisClient
import aiohttp
import asyncio
import time

VALID_STATUS_CODES = [200]
TEST_URL = "http://www.baidu.com"
BATCH_TEST_SIZE = 100


class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('Proxy is valid', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('Invalid response status code', proxy)
            except (TimeoutError, asyncio.TimeoutError, aiohttp.ClientError):
                # timeouts and connection failures lower the proxy's score
                self.redis.decrease(proxy)
                print('Proxy request failed', proxy)

    def run(self):
        print('Tester starts running')
        try:
            proxies = self.redis.all()
            loop = asyncio.get_event_loop()
            for i in range(0, len(proxies), BATCH_TEST_SIZE):
                test_proxies = proxies[i:i + BATCH_TEST_SIZE]
                # wrap the coroutines in Tasks; newer asyncio versions no longer
                # accept bare coroutines in asyncio.wait()
                tasks = [loop.create_task(self.test_single_proxy(proxy)) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(5)
        except Exception as e:
            print('Tester error', e.args)</code>
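A minimal sketch of a single test round, assuming the pool already contains proxies; proxies that pass are promoted to MAX_SCORE and will be preferred by RedisClient.random().

<code>from ippool_check import Tester
from ippool_save import RedisClient

Tester().run()                 # tests every stored proxy in batches of BATCH_TEST_SIZE
print(RedisClient().random())  # verified proxies now sit at MAX_SCORE and are picked first</code>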

API module

To make it easier to fetch a usable proxy, we add an API module.

The API module is implemented with Flask; the code is as follows (ippool_api.py):

<code>from flask import Flask, g
from ippool_save import RedisClient

__all__ = ['app']
app = Flask(__name__)


def get_conn():
    """Get (or lazily create) a RedisClient for the current request context."""
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


# The route decorators were lost in the original article's formatting;
# the paths below are assumed, typical choices.
@app.route('/')
def index():
    return 'Welcome to Proxy Pool System'


@app.route('/random')
def get_proxy():
    """Return one random usable proxy."""
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    """Return the current size of the pool."""
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()</code>
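On the client side, a crawler can then ask the API for a fresh proxy before each request. A minimal sketch, assuming the Flask app runs on the default http://127.0.0.1:5000 and the /random route shown above is registered; the target URL is only an example.

<code>import requests

PROXY_POOL_URL = 'http://127.0.0.1:5000/random'

def get_random_proxy():
    """Fetch one proxy (as 'ip:port') from the pool's API."""
    response = requests.get(PROXY_POOL_URL)
    if response.status_code == 200:
        return response.text
    return None

proxy = get_random_proxy()
if proxy:
    proxies = {'http': 'http://' + proxy, 'https': 'https://' + proxy}
    # route a request through the pooled proxy
    print(requests.get('http://httpbin.org/ip', proxies=proxies, timeout=10).text)</code>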

Scheduler module

The scheduler simply invokes the three modules defined above and runs them as separate processes via multiprocessing.

Finally, calling the Scheduler's run() method is all it takes to start the whole proxy pool.

<code>from multiprocessing import Process
from ippool_api import app
from ippool_getter import Getter
from ippool_check import Tester
import time

TESTER_CYCLE = 20      # seconds between test rounds
GETTER_CYCLE = 20      # seconds between crawl rounds
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True


class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        tester = Tester()
        while True:
            print('Tester starts running')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        getter = Getter()
        while True:
            print('Start crawling proxies')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        app.run()

    def run(self):
        print('Proxy pool starts running')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()


if __name__ == '__main__':
    Scheduler().run()</code>
