I recently needed to build a system that collects trending information on the COVID-19 epidemic at home and abroad, which means crawling some data from Twitter and then running classification and sentiment analysis on it. As any competent programmer should, we take a "borrow the wheel" attitude: build the project on top of other people's work instead of starting from scratch.
I. The Scrapy Crawler Framework
Scrapy is an application framework written in Python for crawling websites and extracting structured data. Specialist work should go to a specialist framework, so this project uses Scrapy for the crawling. If you are not familiar with Scrapy yet, my earlier quick-start post, Python Scrapy爬蟲框架學習, will get you up to speed.
II. Finding an Open-Source Project
Before starting a project, avoid reinventing the wheel: search GitHub with the keywords "Scrapy" and "Twitter" to see whether a ready-made open-source project already exists.
The search turns up plenty of matching projects, so how do we choose? Three criteria: first, the star count — a high count suggests the project is of decent quality and has community approval; second, the last update time — recent activity means the project is still maintained; third, whether the documentation is complete — good docs let us start using the project quickly. Judged by these three criteria, the first result looks very good: a high star count, last updated a few months ago, and detailed documentation, so we will do our secondary development on top of it. The project's GitHub address is jonbakerfish/TweetScraper.
III. Local Installation and Debugging
1. Clone the project
The project requires Scrapy and PyMongo (also install MongoDB if you want to save the data to a database). Setting up:
```
$ git clone https://github.com/jonbakerfish/TweetScraper.git
$ cd TweetScraper/
$ pip install -r requirements.txt  # add '--user' if you are not root
$ scrapy list
$ # If the output is 'TweetScraper', then you are ready to go.
```
2. Persisting the data
Reading the documentation, we find that the project supports three ways of persisting data: saving to a file, saving to MongoDB, or saving to a MySQL database. Since the crawled data needs further analysis later, we will store it in MySQL.
By default the crawled data is saved to disk as JSON under ./Data/tweet/, so the configuration file TweetScraper/settings.py needs to be modified.
```python
ITEM_PIPELINES = {
    # 'TweetScraper.pipelines.SaveToFilePipeline': 100,
    # 'TweetScraper.pipelines.SaveToMongoPipeline': 100,  # replace `SaveToFilePipeline` with this to use MongoDB
    'TweetScraper.pipelines.SavetoMySQLPipeline': 100,    # replace `SaveToFilePipeline` with this to use MySQL
}

# settings for MySQL
MYSQL_SERVER = "18.126.219.16"
MYSQL_DB = "scraper"
MYSQL_TABLE = "tweets"      # the table will be created automatically
MYSQL_USER = "root"         # MySQL user to use (should have INSERT access granted to the Database/Table)
MYSQL_PWD = "admin123456"   # MySQL user's password
```
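Before launching the spider, it can save a round of debugging to confirm that these MySQL credentials actually work. A minimal sketch using the same mysql-connector-python driver the project depends on; the host, user, and password below are just the placeholder values from settings.py, so substitute your own:

```python
# Hypothetical one-off check, run once before `scrapy crawl`:
# verifies that the MySQL server configured in settings.py is reachable.
import mysql.connector

cnx = mysql.connector.connect(
    user="root",             # MYSQL_USER
    password="admin123456",  # MYSQL_PWD
    host="18.126.219.16",    # MYSQL_SERVER
    database="scraper",      # MYSQL_DB
)
print(cnx.is_connected())    # True means the pipeline will be able to connect
cnx.close()
```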
3. Test run
Go to the project root directory and run the following command:
```
# enter the project directory
# cd /work/Code/scraper/TweetScraper
scrapy crawl TweetScraper -a query="Novel coronavirus,#COVID-19"
```
Note that crawling Twitter requires either a proxy or a server deployed outside mainland China; I am using an overseas server.
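If you have to run the spider from inside the wall instead, one option is to route Scrapy's requests through a local proxy. A minimal sketch — the middleware class, file name, and the 127.0.0.1:1080 address are all placeholders for your own setup, not part of TweetScraper:

```python
# TweetScraper/middlewares.py (hypothetical): set a proxy on every request;
# Scrapy's built-in HttpProxyMiddleware then honours request.meta['proxy'].
class ProxyMiddleware(object):
    def process_request(self, request, spider):
        request.meta['proxy'] = 'http://127.0.0.1:1080'  # your local proxy

# TweetScraper/settings.py -- enable it:
# DOWNLOADER_MIDDLEWARES = {
#     'TweetScraper.middlewares.ProxyMiddleware': 100,
# }
```

Either way, once Twitter is reachable the test run produces output like this: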
```
[root@cs TweetScraper]# scrapy crawl TweetScraper -a query="Novel coronavirus,#COVID-19"
2020-04-16 19:22:40 [scrapy.utils.log] INFO: Scrapy 2.0.1 started (bot: TweetScraper)
2020-04-16 19:22:40 [scrapy.utils.log] INFO: Versions: lxml 4.2.1.0, libxml2 2.9.8, cssselect 1.1.0, parsel 1.5.2, w3lib 1.21.0, Twisted 20.3.0, Python 3.6.5 |Anaconda, Inc.| (default, Apr 29 2018, 16:14:56) - [GCC 7.2.0], pyOpenSSL 18.0.0 (OpenSSL 1.0.2o 27 Mar 2018), cryptography 2.2.2, Platform Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core
2020-04-16 19:22:40 [scrapy.crawler] INFO: Overridden settings: {'BOT_NAME': 'TweetScraper', 'LOG_LEVEL': 'INFO', 'NEWSPIDER_MODULE': 'TweetScraper.spiders', 'SPIDER_MODULES': ['TweetScraper.spiders'], 'USER_AGENT': 'TweetScraper'}
2020-04-16 19:22:40 [scrapy.extensions.telnet] INFO: Telnet Password: 1fb55da389e595db
2020-04-16 19:22:40 [scrapy.middleware] INFO: Enabled extensions:
['scrapy.extensions.corestats.CoreStats',
 'scrapy.extensions.telnet.TelnetConsole',
 'scrapy.extensions.memusage.MemoryUsage',
 'scrapy.extensions.logstats.LogStats']
2020-04-16 19:22:41 [scrapy.middleware] INFO: Enabled downloader middlewares:
['scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware',
 'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware',
 'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware',
 'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware',
 'scrapy.downloadermiddlewares.retry.RetryMiddleware',
 'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware',
 'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware',
 'scrapy.downloadermiddlewares.redirect.RedirectMiddleware',
 'scrapy.downloadermiddlewares.cookies.CookiesMiddleware',
 'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware',
 'scrapy.downloadermiddlewares.stats.DownloaderStats']
2020-04-16 19:22:41 [scrapy.middleware] INFO: Enabled spider middlewares:
['scrapy.spidermiddlewares.httperror.HttpErrorMiddleware',
 'scrapy.spidermiddlewares.offsite.OffsiteMiddleware',
 'scrapy.spidermiddlewares.referer.RefererMiddleware',
 'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware',
 'scrapy.spidermiddlewares.depth.DepthMiddleware']
Mysql連接成功###################################### MySQLCursorBuffered: (Nothing executed yet)
2020-04-16 19:22:41 [TweetScraper.pipelines] INFO: Table 'tweets' already exists
2020-04-16 19:22:41 [scrapy.middleware] INFO: Enabled item pipelines:
['TweetScraper.pipelines.SavetoMySQLPipeline']
2020-04-16 19:22:41 [scrapy.core.engine] INFO: Spider opened
2020-04-16 19:22:41 [scrapy.extensions.logstats] INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2020-04-16 19:22:41 [scrapy.extensions.telnet] INFO: Telnet console listening on 127.0.0.1:6023
2020-04-16 19:23:45 [scrapy.extensions.logstats] INFO: Crawled 1 pages (at 1 pages/min), scraped 11 items (at 11 items/min)
2020-04-16 19:24:44 [scrapy.extensions.logstats] INFO: Crawled 2 pages (at 1 pages/min), scraped 22 items (at 11 items/min)
^C2020-04-16 19:26:27 [scrapy.crawler] INFO: Received SIGINT, shutting down gracefully. Send again to force
2020-04-16 19:26:27 [scrapy.core.engine] INFO: Closing spider (shutdown)
2020-04-16 19:26:43 [scrapy.extensions.logstats] INFO: Crawled 3 pages (at 1 pages/min), scraped 44 items (at 11 items/min)
```
As we can see, the project runs fine and the crawled data has been saved to the database.
IV. Cleaning the Data
The crawled tweets contain emoji and other special symbols that cause errors when inserting into the database, so the captured content has to be cleaned first.
Add a filter_emoji method to TweetScraper/utils.py:
```python
import re


def filter_emoji(desstr, restr=''):
    """
    filter emoji
    desstr: origin str
    restr: replace str
    """
    # filter emoji
    try:
        res = re.compile(u'[\U00010000-\U0010ffff]')
    except re.error:
        res = re.compile(u'[\uD800-\uDBFF][\uDC00-\uDFFF]')
    return res.sub(restr, desstr)
```
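A quick sanity check of the filter; the sample tweet text here is made up:

```python
from TweetScraper.utils import filter_emoji

sample = 'Stay safe 😷🙏 #COVID-19'   # hypothetical tweet text
print(filter_emoji(sample, ''))       # -> 'Stay safe  #COVID-19'
```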
Call the method in TweetCrawler.py:
```python
from TweetScraper.utils import filter_emoji


def parse_tweet_item(self, items):
    for item in items:
        try:
            tweet = Tweet()

            tweet['usernameTweet'] = item.xpath('.//span[@class="username u-dir u-textTruncate"]/b/text()').extract()[0]

            ID = item.xpath('.//@data-tweet-id').extract()
            if not ID:
                continue
            tweet['ID'] = ID[0]

            ### get text content
            tweet['text'] = ' '.join(
                item.xpath('.//div[@class="js-tweet-text-container"]/p//text()').extract()).replace(' # ', '#').replace(
                ' @ ', '@')

            ### clean data [20200416]
            # tweet['text'] = re.sub(r"[\s+\.\!\/_,$%^*(+"\')]+|[+——?【】?~@#¥%……&*]+|\\n+|\\r+|(\\xa0)+|(\\u3000)+|\\t", "", tweet['text']);

            # filter out emoji [20200417]
            tweet['text'] = filter_emoji(tweet['text'], '')

            if tweet['text'] == '':
                # If there is not text, we ignore the tweet
                continue

            ### get meta data
            tweet['url'] = item.xpath('.//@data-permalink-path').extract()[0]

            nbr_retweet = item.css('span.ProfileTweet-action--retweet > span.ProfileTweet-actionCount').xpath(
                '@data-tweet-stat-count').extract()
            if nbr_retweet:
                tweet['nbr_retweet'] = int(nbr_retweet[0])
            else:
                tweet['nbr_retweet'] = 0

            nbr_favorite = item.css('span.ProfileTweet-action--favorite > span.ProfileTweet-actionCount').xpath(
                '@data-tweet-stat-count').extract()
            if nbr_favorite:
                tweet['nbr_favorite'] = int(nbr_favorite[0])
            else:
                tweet['nbr_favorite'] = 0

            nbr_reply = item.css('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount').xpath(
                '@data-tweet-stat-count').extract()
            if nbr_reply:
                tweet['nbr_reply'] = int(nbr_reply[0])
            else:
                tweet['nbr_reply'] = 0

            tweet['datetime'] = datetime.fromtimestamp(int(
                item.xpath('.//div[@class="stream-item-header"]/small[@class="time"]/a/span/@data-time').extract()[
                    0])).strftime('%Y-%m-%d %H:%M:%S')

            ### get photo
            has_cards = item.xpath('.//@data-card-type').extract()
            if has_cards and has_cards[0] == 'photo':
                tweet['has_image'] = True
                tweet['images'] = item.xpath('.//*/div/@data-image-url').extract()
            elif has_cards:
                logger.debug('Not handle "data-card-type":\n%s' % item.xpath('.').extract()[0])

            ### get animated_gif
            has_cards = item.xpath('.//@data-card2-type').extract()
            if has_cards:
                if has_cards[0] == 'animated_gif':
                    tweet['has_video'] = True
                    tweet['videos'] = item.xpath('.//*/source/@video-src').extract()
                elif has_cards[0] == 'player':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == 'summary_large_image':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == 'amplify':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == 'summary':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == '__entity_video':
                    pass  # TODO
                    # tweet['has_media'] = True
                    # tweet['medias'] = item.xpath('.//*/div/@data-src').extract()
                else:  # there are many other types of card2 !!!!
                    logger.debug('Not handle "data-card2-type":\n%s' % item.xpath('.').extract()[0])

            is_reply = item.xpath('.//div[@class="ReplyingToContextBelowAuthor"]').extract()
            tweet['is_reply'] = is_reply != []

            is_retweet = item.xpath('.//span[@class="js-retweet-text"]').extract()
            tweet['is_retweet'] = is_retweet != []

            tweet['user_id'] = item.xpath('.//@data-user-id').extract()[0]
            yield tweet

            if self.crawl_user:
                ### get user info
                user = User()
                user['ID'] = tweet['user_id']
                user['name'] = item.xpath('.//@data-name').extract()[0]
                user['screen_name'] = item.xpath('.//@data-screen-name').extract()[0]
                user['avatar'] = \
                    item.xpath('.//div[@class="content"]/div[@class="stream-item-header"]/a/img/@src').extract()[0]
                yield user
        except:
            logger.error("Error tweet:\n%s" % item.xpath('.').extract()[0])
            # raise
```
After this cleaning step, the data can be inserted into the table normally.
V. Translating into Chinese
The crawled content comes in many languages — English, Japanese, Arabic, French, and so on. To understand what the tweets say, we need to translate them into Chinese. How? It is actually simple: GitHub hosts an open-source Python wrapper for Google Translate, ssut/py-googletrans. It is very capable — it auto-detects the source language and translates into whatever language you specify — and all we need to do is install it.
1. Installation
```
$ pip install googletrans
```
2. Usage
```python
>>> from googletrans import Translator
>>> translator = Translator()
>>> translator.translate('안녕하세요.')
>>> translator.translate('안녕하세요.', dest='ja')
>>> translator.translate('veritas lux mea', src='la')
```
```python
from googletrans import Translator

destination = 'zh-CN'  # translate into Chinese
t = '안녕하세요.'
res = Translator().translate(t, dest=destination).text
print(res)  # 你好
```
3. Wiring it into the project
Call the translator in TweetCrawler.py, and add a new text_cn field in the database.
```python
# google translate [20200416]
# @see https://github.com/ssut/py-googletrans
from googletrans import Translator
from TweetScraper.utils import filter_emoji  # added earlier for emoji cleaning


def parse_tweet_item(self, items):
    for item in items:
        try:
            tweet = Tweet()

            tweet['usernameTweet'] = item.xpath('.//span[@class="username u-dir u-textTruncate"]/b/text()').extract()[0]

            ID = item.xpath('.//@data-tweet-id').extract()
            if not ID:
                continue
            tweet['ID'] = ID[0]

            ### get text content
            tweet['text'] = ' '.join(
                item.xpath('.//div[@class="js-tweet-text-container"]/p//text()').extract()).replace(' # ', '#').replace(
                ' @ ', '@')

            ### clean data [20200416]
            # tweet['text'] = re.sub(r"[\s+\.\!\/_,$%^*(+"\')]+|[+——?【】?~@#¥%……&*]+|\\n+|\\r+|(\\xa0)+|(\\u3000)+|\\t", "", tweet['text']);

            # filter out emoji [20200417]
            tweet['text'] = filter_emoji(tweet['text'], '')

            # translate into Chinese [20200417]
            tweet['text_cn'] = Translator().translate(tweet['text'], dest='zh-CN').text

            if tweet['text'] == '':
                # If there is not text, we ignore the tweet
                continue

            ### get meta data
            tweet['url'] = item.xpath('.//@data-permalink-path').extract()[0]

            nbr_retweet = item.css('span.ProfileTweet-action--retweet > span.ProfileTweet-actionCount').xpath(
                '@data-tweet-stat-count').extract()
            if nbr_retweet:
                tweet['nbr_retweet'] = int(nbr_retweet[0])
            else:
                tweet['nbr_retweet'] = 0

            nbr_favorite = item.css('span.ProfileTweet-action--favorite > span.ProfileTweet-actionCount').xpath(
                '@data-tweet-stat-count').extract()
            if nbr_favorite:
                tweet['nbr_favorite'] = int(nbr_favorite[0])
            else:
                tweet['nbr_favorite'] = 0

            nbr_reply = item.css('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount').xpath(
                '@data-tweet-stat-count').extract()
            if nbr_reply:
                tweet['nbr_reply'] = int(nbr_reply[0])
            else:
                tweet['nbr_reply'] = 0

            tweet['datetime'] = datetime.fromtimestamp(int(
                item.xpath('.//div[@class="stream-item-header"]/small[@class="time"]/a/span/@data-time').extract()[
                    0])).strftime('%Y-%m-%d %H:%M:%S')

            ### get photo
            has_cards = item.xpath('.//@data-card-type').extract()
            if has_cards and has_cards[0] == 'photo':
                tweet['has_image'] = True
                tweet['images'] = item.xpath('.//*/div/@data-image-url').extract()
            elif has_cards:
                logger.debug('Not handle "data-card-type":\n%s' % item.xpath('.').extract()[0])

            ### get animated_gif
            has_cards = item.xpath('.//@data-card2-type').extract()
            if has_cards:
                if has_cards[0] == 'animated_gif':
                    tweet['has_video'] = True
                    tweet['videos'] = item.xpath('.//*/source/@video-src').extract()
                elif has_cards[0] == 'player':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == 'summary_large_image':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == 'amplify':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == 'summary':
                    tweet['has_media'] = True
                    tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                elif has_cards[0] == '__entity_video':
                    pass  # TODO
                    # tweet['has_media'] = True
                    # tweet['medias'] = item.xpath('.//*/div/@data-src').extract()
                else:  # there are many other types of card2 !!!!
                    logger.debug('Not handle "data-card2-type":\n%s' % item.xpath('.').extract()[0])

            is_reply = item.xpath('.//div[@class="ReplyingToContextBelowAuthor"]').extract()
            tweet['is_reply'] = is_reply != []

            is_retweet = item.xpath('.//span[@class="js-retweet-text"]').extract()
            tweet['is_retweet'] = is_retweet != []

            tweet['user_id'] = item.xpath('.//@data-user-id').extract()[0]
            yield tweet

            if self.crawl_user:
                ### get user info
                user = User()
                user['ID'] = tweet['user_id']
                user['name'] = item.xpath('.//@data-name').extract()[0]
                user['screen_name'] = item.xpath('.//@data-screen-name').extract()[0]
                user['avatar'] = \
                    item.xpath('.//div[@class="content"]/div[@class="stream-item-header"]/a/img/@src').extract()[0]
                yield user
        except:
            logger.error("Error tweet:\n%s" % item.xpath('.').extract()[0])
            # raise
```
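One thing to watch: py-googletrans calls an unofficial web endpoint, so a failed or throttled request inside parse_tweet_item currently makes the whole tweet fall into the bare except and get dropped. A small hedged refinement — the helper name and retry count below are my own, not part of TweetScraper — is to reuse one Translator instance and fall back to the original text on failure:

```python
from googletrans import Translator

_translator = Translator()  # reuse one instance instead of creating one per tweet

def translate_cn(text, retries=2):
    """Best-effort translation to Chinese; returns the original text on failure."""
    for _ in range(retries):
        try:
            return _translator.translate(text, dest='zh-CN').text
        except Exception:
            continue
    return text

# in parse_tweet_item:
# tweet['text_cn'] = translate_cn(tweet['text'])
```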
Add the new field in items.py:
```python
# -*- coding: utf-8 -*-

# Define here the models for your scraped items

from scrapy import Item, Field


class Tweet(Item):
    ID = Field()        # tweet id
    url = Field()       # tweet url
    datetime = Field()  # post time
    text = Field()      # text content
    text_cn = Field()   # text Chinese content (new field)
    user_id = Field()   # user id
```
In the pipeline file pipelines.py, modify the database persistence method and add the text_cn field:
```python
class SavetoMySQLPipeline(object):
    ''' pipeline that save data to mysql '''

    def __init__(self):
        # connect to mysql server
        self.cnx = mysql.connector.connect(
            user=SETTINGS["MYSQL_USER"],
            password=SETTINGS["MYSQL_PWD"],
            host=SETTINGS["MYSQL_SERVER"],
            database=SETTINGS["MYSQL_DB"],
            buffered=True)
        self.cursor = self.cnx.cursor()
        print('Mysql連接成功######################################', self.cursor)
        self.table_name = SETTINGS["MYSQL_TABLE"]
        create_table_query = "CREATE TABLE `" + self.table_name + "` (\
            `ID` CHAR(20) NOT NULL,\
            `url` VARCHAR(140) NOT NULL,\
            `datetime` VARCHAR(22),\
            `text` VARCHAR(280),\
            `text_cn` VARCHAR(280),\
            `user_id` CHAR(20) NOT NULL,\
            `usernameTweet` VARCHAR(20) NOT NULL\
            )"
        try:
            self.cursor.execute(create_table_query)
        except mysql.connector.Error as err:
            logger.info(err.msg)
        else:
            self.cnx.commit()

    def find_one(self, trait, value):
        select_query = "SELECT " + trait + " FROM " + self.table_name + " WHERE " + trait + " = " + value + ";"
        try:
            val = self.cursor.execute(select_query)
        except mysql.connector.Error as err:
            return False

        if (val == None):
            return False
        else:
            return True

    def check_vals(self, item):
        ID = item['ID']
        url = item['url']
        datetime = item['datetime']
        text = item['text']
        user_id = item['user_id']
        username = item['usernameTweet']

        if (ID is None):
            return False
        elif (user_id is None):
            return False
        elif (url is None):
            return False
        elif (text is None):
            return False
        elif (username is None):
            return False
        elif (datetime is None):
            return False
        else:
            return True

    def insert_one(self, item):
        ret = self.check_vals(item)
        if not ret:
            return None

        ID = item['ID']
        user_id = item['user_id']
        url = item['url']
        text = item['text']
        text_cn = item['text_cn']
        username = item['usernameTweet']
        datetime = item['datetime']

        insert_query = 'INSERT INTO ' + self.table_name + ' (ID, url, datetime, text, text_cn, user_id, usernameTweet )'
        insert_query += ' VALUES ( %s, %s, %s, %s, %s, %s, %s)'
        insert_query += ' ON DUPLICATE KEY UPDATE'
        insert_query += ' url = %s, datetime = %s, text= %s, text_cn= %s, user_id = %s, usernameTweet = %s'

        try:
            self.cursor.execute(insert_query, (
                ID, url, datetime, text, text_cn, user_id, username,
                url, datetime, text, text_cn, user_id, username
            ))  # parameters appear twice: once for INSERT, once for UPDATE
        except mysql.connector.Error as err:
            logger.info(err.msg)
        else:
            self.cnx.commit()

    def process_item(self, item, spider):
        if isinstance(item, Tweet):
            self.insert_one(dict(item))  # Item is inserted or updated.
            logger.debug("Add tweet:%s" % item['url'])
```
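Note that if the tweets table was already created during the earlier test run (the log above shows "Table 'tweets' already exists"), the new column will not appear automatically, because the CREATE TABLE statement simply fails and is logged on an existing table. In that case the column has to be added by hand; a hedged one-off example, reusing the placeholder connection settings and sizing the column to match the existing text column:

```python
# One-off migration for an existing table; adjust names if you changed settings.py.
import mysql.connector

cnx = mysql.connector.connect(user="root", password="admin123456",
                              host="18.126.219.16", database="scraper")
cur = cnx.cursor()
cur.execute("ALTER TABLE `tweets` ADD COLUMN `text_cn` VARCHAR(280) AFTER `text`")
cnx.commit()
cnx.close()
```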
4. Run it again
Then run the command again:
```
scrapy crawl TweetScraper -a query="Novel coronavirus,#COVID-19"
```
You can see that the foreign-language tweets in the database have now been translated into Chinese ^_^
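To double-check from the command line rather than a GUI client, here is a quick hedged query, again using the placeholder credentials from settings.py:

```python
# Print a few rows to confirm that text_cn is being populated.
import mysql.connector

cnx = mysql.connector.connect(user="root", password="admin123456",
                              host="18.126.219.16", database="scraper")
cur = cnx.cursor()
cur.execute("SELECT text, text_cn FROM tweets LIMIT 5")
for text, text_cn in cur.fetchall():
    print(text, '=>', text_cn)
cnx.close()
```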