Using pagination, a thread pool, and a proxy pool, we quickly scrape nearly 40,000 listings of second-hand homes for sale from Lianjia (lianjia.com), at speeds of up to 10,000 records per 5 minutes.
By analyzing the second-hand housing data, we make a vertical comparison of housing prices across (new) first-tier cities such as Beijing, Shanghai, Guangzhou, and Shenzhen, plus a horizontal comparison across each city's districts, and visualize the results.
The main libraries and modules used are:
- Requests
- PyQuery
- ThreadPoolExecutor
- JSON
- Matplotlib
- PyEcharts
Environment:
- Windows 10
- Python 3.5
- PyCharm 2018
Data Scraping
Crawler Architecture Design
Analyzing Lianjia's URLs, it is easy to see that each city's site follows the basic pattern:
city-name pinyin abbreviation + ".lianjia.com"
So the outermost layer of the crawler should iterate over a list of city abbreviations, build a start URL from each one, and crawl the corresponding city's Lianjia site.
For each city's site, we first obtain the total number of second-hand homes on sale. Each page shows 30 listings, so the total page count is the total number of listings divided by 30 and rounded up. However, since only the first 100 pages can be browsed, we add a check: if the total page count exceeds 100, cap it at 100.
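For example (illustrative numbers only; the full function appears below):

total_num = 3510                               # hypothetical number of listings on sale
total_page = min((total_num + 29) // 30, 100)  # ceil(3510 / 30) = 117, capped at 100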
Looking at the per-page URLs within one city's site, taking Beijing as an example, the URL of page N is
bj.lianjia.com/ershoufang/pg{N}, so we can generate every page's URL with the following code:
for i in range(total_page):
    page_url = "bj.lianjia.com/ershoufang/pg{}".format(i + 1)
With each page's URL we can get the price information and the detail-page URL of each of the 30 homes on that page, but the list page does not show which district a home is in.
So we have to go one level deeper and request each detail-page URL in order to extract all the data we want.
In summary, the whole framework can be divided into four layers from top to bottom, as shown in the figure below:
Based on this idea, the code can be implemented layer by layer from the top down, which makes debugging easier.
Layers 1 & 2: Getting the Total Number of Listings
Build the start URL from the city's pinyin abbreviation and get the total number of listings, in preparation for pagination.
def get_list_page_url(city):
    start_url = "https://{}.lianjia.com/ershoufang".format(city)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
    }
    try:
        response = requests.get(start_url, headers=headers)
        doc = pq(response.text)
        total_num = int(doc(".resultDes .total span").text())
        # ceiling division: 30 listings per page
        total_page = (total_num + 29) // 30
        # only the first 100 pages are browsable
        if total_page > 100:
            total_page = 100
        page_url_list = list()
        for i in range(total_page):
            url = start_url + "/pg" + str(i + 1) + "/"
            page_url_list.append(url)
        return page_url_list
    except Exception:
        print("Failed to get the total number of listings; check that the start URL is correct")
        return None
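A quick sanity check for these two layers (hypothetical usage; the actual page count depends on the live site):

page_urls = get_list_page_url("bj")  # "bj" = Beijing
if page_urls:
    print(len(page_urls), "list pages, first:", page_urls[0])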
Layer 3: Getting Detail-Page URLs from the List Pages
def get_detail_page_url(page_url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
        'Referer': 'https://bj.lianjia.com/ershoufang'
    }
    try:
        response = requests.get(page_url, headers=headers, timeout=3)
        doc = pq(response.text)
        i = 0
        detail_urls = list()
        for item in doc(".sellListContent li").items():
            i += 1
            if i == 31:
                break
            child_item = item(".noresultRecommend")
            if not child_item:
                # skip entries that are not normal listings (e.g. ads)
                i -= 1
                continue
            detail_url = child_item.attr("href")
            detail_urls.append(detail_url)
        return detail_urls
    except Exception:
        print("Error fetching list page " + page_url)
Layer 4: Parsing the Detail Pages
This layer does the actual parsing, using the PyQuery library, which supports CSS selectors and is more convenient than Beautiful Soup. Just a few lines of code get us the target data:
response = requests.get(url=detail_url, headers=headers, proxies=proxies)
# detail_url is a detail-page URL obtained by layer 3
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]  # strip the "元/平米" unit suffix
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url
Multithreaded Crawling
A ThreadPoolExecutor with 30 workers downloads the list pages concurrently; when each task finishes, its Future is handed to detail_page_parser via add_done_callback, which calls res.result() to obtain the detail URLs returned by get_detail_page_url:
p = ThreadPoolExecutor(30)
for page_url in page_url_list:
p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
p.shutdown()
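For comparison, a minimal sketch of the same flow using concurrent.futures.as_completed instead of callbacks; results are then consumed in the main thread as each download finishes (detail_page_parser itself calls res.result(), so it accepts a Future either way):

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(30) as pool:
    futures = [pool.submit(get_detail_page_url, u) for u in page_url_list]
    for future in as_completed(futures):
        detail_page_parser(future)  # parses the 30 detail pages of one list page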
IP Proxy Pool
This project uses the open-source proxy pool at https://github.com/Python3WebSpider/ProxyPool. After downloading it, run it in a separate PyCharm window; a usable proxy IP can then be fetched like this:
def get_valid_ip():
    url = "http://localhost:5000/get"
    try:
        ip = requests.get(url).text
        return ip
    except Exception:
        print("Please start the proxy pool first")
Then pass the proxy through the proxies parameter. Since Lianjia's pages are served over HTTPS, the proxy is registered under both the "http" and "https" keys:

proxy = "http://" + get_valid_ip()
proxies = {
    "http": proxy,
    "https": proxy,
}
response = requests.get(url=detail_url, headers=headers, proxies=proxies)
Saving the Data
The data is saved as JSON, one file per city, named with the city's pinyin abbreviation.
def save_data(data, filename):
    with open(filename + ".json", 'w', encoding="utf-8") as f:
        f.write(json.dumps(data, indent=2, ensure_ascii=False))
After a short wait, all the data is saved locally.
Data Analysis
Data Aggregation
Here we compute the maximum, minimum, and average price for each district and unify the data format:
def split_data():
    global region_data
    region_data = dict()
    # dic_data maps each district to a list of unit prices (see the loader sketch below)
    for region in dic_data.keys():
        # maximum, minimum, average
        region_data[region] = {"max": dic_data[region][0], "min": dic_data[region][0], "average": 0}
        for per_price in dic_data[region]:
            if per_price > region_data[region]["max"]:
                region_data[region]["max"] = per_price
            if per_price < region_data[region]["min"]:
                region_data[region]["min"] = per_price
            region_data[region]["average"] += per_price
        region_data[region]["average"] /= len(dic_data[region])
        # keep two decimal places
        region_data[region]["average"] = round(region_data[region]["average"], 2)
Data Visualization
The analysis results are rendered as a grouped bar chart with Matplotlib. The code for this part is as follows:
import matplotlib.pyplot as plt

def data_viewer():
    label_list = region_data.keys()  # x-axis tick labels
    max_list = []
    min_list = []
    average_list = []
    for label in label_list:
        max_list.append(region_data[label].get("max"))
        min_list.append(region_data[label].get("min"))
        average_list.append(region_data[label].get("average"))
    x = range(len(max_list))
    """
    Draw the grouped bars.
    x: x-coordinate of each bar
    height: bar height
    width: bar width (matplotlib's default is 0.8)
    label: used by plt.legend() below
    """
    rects1 = plt.bar(x=x, height=max_list, width=0.25, alpha=0.8, color='red', label="max")
    rects2 = plt.bar(x=[i + 0.25 for i in x], height=average_list, width=0.25, color='green', label="average")
    rects3 = plt.bar(x=[i + 0.5 for i in x], height=min_list, width=0.25, color='blue', label="min")
    # plt.ylim(0, 50)  # y-axis range
    plt.ylabel("Unit price (yuan/m²)")
    """
    Set the x-axis ticks.
    arg 1: tick positions (centre of each three-bar group)
    arg 2: labels to display
    """
    plt.xticks([index + 0.25 for index in x], label_list)
    plt.xlabel("District")
    plt.legend()
    # annotate each bar with its value
    for rect in (*rects1, *rects2, *rects3):
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom")
    plt.show()
The result is shown below.
For reasons of space, the charts for the other cities are omitted.
Next, an ordered bar chart of second-hand housing prices across the country's major first-tier cities:
As the chart shows, Beijing, Shanghai, and Shenzhen sit at roughly the same level, with Xiamen fourth and Guangzhou sixth. Finally, the geographic price map:
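The two charts above were drawn with PyEcharts; the article does not include that code. A minimal sketch, assuming pyecharts 0.5.x (contemporary with the Python 3.5 environment above) and a hypothetical dict city_avg mapping city names to average unit prices; rendering city-level points on the China map may also require the extra echarts-china-cities-pypkg map package:

from pyecharts import Bar, Map

# sort cities by average price, descending, for the ordered bar chart
sorted_items = sorted(city_avg.items(), key=lambda kv: kv[1], reverse=True)
cities = [name for name, _ in sorted_items]
prices = [price for _, price in sorted_items]

bar = Bar("Second-hand housing prices in major cities")
bar.add("average unit price (yuan/m²)", cities, prices)
bar.render("city_bar.html")

map_chart = Map("Housing price map", width=1000, height=600)
map_chart.add("", cities, prices, maptype="china", is_visualmap=True,
              visual_range=[min(prices), max(prices)])
map_chart.render("city_map.html")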
Final Code
import requests
from concurrent.futures import ThreadPoolExecutor
from pyquery import PyQuery as pq
import json
import threading
import time
def get_list_page_url(city):
    start_url = "https://{}.lianjia.com/ershoufang".format(city)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
    }
    try:
        response = requests.get(start_url, headers=headers)
        doc = pq(response.text)
        total_num = int(doc(".resultDes .total span").text())
        # ceiling division: 30 listings per page
        total_page = (total_num + 29) // 30
        # only the first 100 pages are browsable
        if total_page > 100:
            total_page = 100
        page_url_list = list()
        for i in range(total_page):
            url = start_url + "/pg" + str(i + 1) + "/"
            page_url_list.append(url)
        return page_url_list
    except Exception:
        print("Failed to get the total number of listings; check that the start URL is correct")
        return None
detail_list = list()
# The proxy pool must be running locally before the crawl starts
# Proxy pool repo: https://github.com/Python3WebSpider/ProxyPool
def get_valid_ip():
    url = "http://localhost:5000/get"
    try:
        ip = requests.get(url).text
        return ip
    except Exception:
        print("Please start the proxy pool first")
def get_detail_page_url(page_url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
        'Referer': 'https://bj.lianjia.com/ershoufang'
    }
    try:
        response = requests.get(page_url, headers=headers, timeout=3)
        doc = pq(response.text)
        i = 0
        detail_urls = list()
        for item in doc(".sellListContent li").items():
            i += 1
            if i == 31:
                break
            child_item = item(".noresultRecommend")
            if not child_item:
                # skip entries that are not normal listings (e.g. ads)
                i -= 1
                continue
            detail_url = child_item.attr("href")
            detail_urls.append(detail_url)
        return detail_urls
    except Exception:
        print("Error fetching list page " + page_url)
# serializes appends to the shared detail_list from callback threads
lock = threading.Lock()
def detail_page_parser(res):
    global detail_list
    detail_urls = res.result()
    if not detail_urls:
        print("detail url list is empty")
        return None
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
        'Referer': 'https://bj.lianjia.com/ershoufang'
    }
    for detail_url in detail_urls:
        try:
            response = requests.get(url=detail_url, headers=headers, timeout=3)
            detail_dict = dict()
            doc = pq(response.text)
            unit_price = doc(".unitPriceValue").text()
            unit_price = unit_price[0:unit_price.index("元")]
            title = doc("h1").text()
            area = doc(".areaName .info a").eq(0).text().strip()
            url = detail_url
            detail_dict["title"] = title
            detail_dict["area"] = area
            detail_dict["price"] = unit_price
            detail_dict["url"] = url
            with lock:
                detail_list.append(detail_dict)
            print(unit_price, title, area)
        except Exception:
            print("Error fetching detail page; retrying with a new proxy IP")
            proxy = "http://" + get_valid_ip()
            # Lianjia pages are HTTPS, so register the proxy for both schemes
            proxies = {
                "http": proxy,
                "https": proxy,
            }
            try:
                response = requests.get(url=detail_url, headers=headers, proxies=proxies)
                doc = pq(response.text)
                unit_price = doc(".unitPriceValue").text()
                # delisted homes are still fetched, but have no price
                if "元" in unit_price:
                    detail_dict = dict()
                    detail_dict["title"] = doc("h1").text()
                    detail_dict["area"] = doc(".areaName .info a").eq(0).text().strip()
                    detail_dict["price"] = unit_price[0:unit_price.index("元")]
                    detail_dict["url"] = detail_url
                    with lock:
                        detail_list.append(detail_dict)
                    print(detail_dict["price"], detail_dict["title"], detail_dict["area"])
            except Exception:
                print("Retry failed...")
def save_data(data, filename):
    with open(filename + ".json", 'w', encoding="utf-8") as f:
        f.write(json.dumps(data, indent=2, ensure_ascii=False))
def main():
    # city pinyin abbreviations, e.g. cq, cs, nj, dl, wh, cc
    city_list = ['nj']
    for city in city_list:
        page_url_list = get_list_page_url(city)
        if not page_url_list:
            # skip cities whose start page failed to load
            continue
        p = ThreadPoolExecutor(30)
        for page_url in page_url_list:
            # The callback receives a Future object; it must call res.result()
            # to get the return value of get_detail_page_url
            p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
        p.shutdown()
        print(detail_list)
        save_data(detail_list, city)
        detail_list.clear()
if __name__ == '__main__':
    old = time.time()
    main()
    new = time.time()
    delta_time = new - old
    print("Program ran for {}s".format(delta_time))