Scraping Twitter Data with the Python Scrapy Framework and Persisting It: Three Days of Work, Written Up

I recently needed to build a system that collects trending information about the COVID-19 outbreak at home and abroad, which means scraping data from Twitter and then running classification and sentiment analysis on it. As a self-respecting programmer, I believe in not reinventing the wheel: build your project on other people's wheels instead of starting from scratch.

I. The Scrapy Crawler Framework

Scrapy is an application framework written in Python for crawling websites and extracting structured data. Specialized work belongs to a specialized framework, so this project uses Scrapy for the scraping. If you're not yet familiar with Scrapy, my earlier post, Python Scrapy Crawler Framework Tutorial, will get you up to speed quickly.

II. Finding an Open-Source Project

Before starting a project, avoid reinventing the wheel: search GitHub with the keywords "Scrapy" and "Twitter" to see whether a ready-made open-source project already exists.


The search turns up plenty of matching projects, so how do we choose among them? Three criteria: first, the star count — many stars suggest the project is well regarded and of decent quality; second, the last update time — recent activity means the project is still being maintained; third, the documentation — complete docs let us start using the project quickly. By these criteria, the top result looks very good: a high star count, last updated only a few months ago, and detailed documentation. We'll use it as the base for our own development. Project GitHub address: jonbakerfish/TweetScraper

III. Local Installation and Debugging

1. Pull the Project

It requires Scrapy and PyMongo (also install MongoDB if you want to save the data to a database). Setting up:

<code>$ git clone https://github.com/jonbakerfish/TweetScraper.git
$ cd TweetScraper/
$ pip install -r requirements.txt  # add '--user' if you are not root
$ scrapy list
$ # If the output is 'TweetScraper', then you are ready to go.</code>

2. Data Persistence

Reading the docs, we find the project supports three ways to persist data: saving to a file, saving to MongoDB, or saving to a MySQL database. Since the scraped data needs later analysis, we'll save it to MySQL.

By default the scraped data is saved as JSON on disk under ./Data/tweet/, so we need to edit the configuration file TweetScraper/settings.py.

<code>ITEM_PIPELINES = {
    # 'TweetScraper.pipelines.SaveToFilePipeline':100,
    #'TweetScraper.pipelines.SaveToMongoPipeline':100, # replace `SaveToFilePipeline` with this to use MongoDB
    'TweetScraper.pipelines.SavetoMySQLPipeline':100, # replace `SaveToFilePipeline` with this to use MySQL
}

#settings for mysql
MYSQL_SERVER = "18.126.219.16"
MYSQL_DB     = "scraper"
MYSQL_TABLE  = "tweets" # the table will be created automatically
MYSQL_USER   = "root"        # MySQL user to use (should have INSERT access granted to the Database/Table)
MYSQL_PWD    = "admin123456"        # MySQL user's password</code>

3. Testing

From the project root directory, run the following command:

<code># cd into the project directory
# cd /work/Code/scraper/TweetScraper
scrapy crawl TweetScraper -a query="Novel coronavirus,#COVID-19"</code>
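Note that `-a` passes `query` to the spider as a single raw string; the commas bundle several search terms into one value. A quick illustration of how such a string breaks apart (plain Python, not TweetScraper's actual parsing code):

```python
# The -a option hands the spider one raw string; a comma-separated
# query like ours can be split into individual search terms.
query = "Novel coronavirus,#COVID-19"
terms = [t.strip() for t in query.split(',') if t.strip()]
print(terms)  # ['Novel coronavirus', '#COVID-19']
```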

Note that scraping Twitter requires either a way around the firewall or a server deployed outside mainland China; I'm using an overseas server.
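If deploying abroad isn't an option, Scrapy's built-in HttpProxyMiddleware (enabled by default) honors the standard proxy environment variables, so you can route the crawl through a local tunnel. A sketch — the address below is a placeholder for whatever proxy you actually run:

```python
import os

# Scrapy's default HttpProxyMiddleware reads these standard environment
# variables; set them before invoking `scrapy crawl ...`.
proxy = 'http://127.0.0.1:1080'  # hypothetical local tunnel address
os.environ['http_proxy'] = proxy
os.environ['https_proxy'] = proxy
print(os.environ['http_proxy'])
```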

<code>[root@cs TweetScraper]#  scrapy crawl TweetScraper -a query="Novel coronavirus,#COVID-19"
2020-04-16 19:22:40 [scrapy.utils.log] INFO: Scrapy 2.0.1 started (bot: TweetScraper)
2020-04-16 19:22:40 [scrapy.utils.log] INFO: Versions: lxml 4.2.1.0, libxml2 2.9.8, cssselect 1.1.0, parsel 1.5.2, w3lib 1.21.0, Twisted 20.3.0, Python 3.6.5 |Anaconda, Inc.| (default, Apr 29 2018, 16:14:56) - [GCC 7.2.0], pyOpenSSL 18.0.0 (OpenSSL 1.0.2o  27 Mar 2018), cryptography 2.2.2, Platform Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core
2020-04-16 19:22:40 [scrapy.crawler] INFO: Overridden settings:
{'BOT_NAME': 'TweetScraper',
 'LOG_LEVEL': 'INFO',
 'NEWSPIDER_MODULE': 'TweetScraper.spiders',
 'SPIDER_MODULES': ['TweetScraper.spiders'],
 'USER_AGENT': 'TweetScraper'}
2020-04-16 19:22:40 [scrapy.extensions.telnet] INFO: Telnet Password: 1fb55da389e595db
2020-04-16 19:22:40 [scrapy.middleware] INFO: Enabled extensions:
['scrapy.extensions.corestats.CoreStats',
 'scrapy.extensions.telnet.TelnetConsole',
 'scrapy.extensions.memusage.MemoryUsage',
 'scrapy.extensions.logstats.LogStats']
2020-04-16 19:22:41 [scrapy.middleware] INFO: Enabled downloader middlewares:
['scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware',
 'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware',
 'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware',
 'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware',
 'scrapy.downloadermiddlewares.retry.RetryMiddleware',
 'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware',
 'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware',
 'scrapy.downloadermiddlewares.redirect.RedirectMiddleware',
 'scrapy.downloadermiddlewares.cookies.CookiesMiddleware',
 'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware',
 'scrapy.downloadermiddlewares.stats.DownloaderStats']
2020-04-16 19:22:41 [scrapy.middleware] INFO: Enabled spider middlewares:
['scrapy.spidermiddlewares.httperror.HttpErrorMiddleware',
 'scrapy.spidermiddlewares.offsite.OffsiteMiddleware',
 'scrapy.spidermiddlewares.referer.RefererMiddleware',
 'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware',
 'scrapy.spidermiddlewares.depth.DepthMiddleware']
Mysql連接成功###################################### MySQLCursorBuffered: (Nothing executed yet)
2020-04-16 19:22:41 [TweetScraper.pipelines] INFO: Table 'tweets' already exists
2020-04-16 19:22:41 [scrapy.middleware] INFO: Enabled item pipelines:
['TweetScraper.pipelines.SavetoMySQLPipeline']
2020-04-16 19:22:41 [scrapy.core.engine] INFO: Spider opened
2020-04-16 19:22:41 [scrapy.extensions.logstats] INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2020-04-16 19:22:41 [scrapy.extensions.telnet] INFO: Telnet console listening on 127.0.0.1:6023
2020-04-16 19:23:45 [scrapy.extensions.logstats] INFO: Crawled 1 pages (at 1 pages/min), scraped 11 items (at 11 items/min)
2020-04-16 19:24:44 [scrapy.extensions.logstats] INFO: Crawled 2 pages (at 1 pages/min), scraped 22 items (at 11 items/min)

^C2020-04-16 19:26:27 [scrapy.crawler] INFO: Received SIGINT, shutting down gracefully. Send again to force 
2020-04-16 19:26:27 [scrapy.core.engine] INFO: Closing spider (shutdown)
2020-04-16 19:26:43 [scrapy.extensions.logstats] INFO: Crawled 3 pages (at 1 pages/min), scraped 44 items (at 11 items/min)</code>

As we can see, the project runs fine, and the scraped data has been saved to the database.

IV. Cleaning the Data

Scraped tweets contain emoji and other special symbols that cause errors when inserting into the database, so the scraped text needs cleaning first.

Add a filter_emoji method to the TweetScraper/utils.py file:

<code>import re

def filter_emoji(desstr, restr=''):
    """
    filter emoji
    desstr: origin str
    restr: replace str
    """
    # filter emoji
    try:
        res = re.compile(u'[\U00010000-\U0010ffff]')
    except re.error:
        res = re.compile(u'[\uD800-\uDBFF][\uDC00-\uDFFF]')
    return res.sub(restr, desstr)</code>
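A quick sanity check of the filter — a self-contained copy of the function above, run on a sample string of my own that contains an emoji:

```python
import re

def filter_emoji(desstr, restr=''):
    """Replace characters outside the BMP (where emoji live) with restr."""
    try:
        # wide build: match astral-plane characters directly
        res = re.compile(u'[\U00010000-\U0010ffff]')
    except re.error:
        # narrow builds store astral characters as surrogate pairs
        res = re.compile(u'[\uD800-\uDBFF][\uDC00-\uDFFF]')
    return res.sub(restr, desstr)

print(filter_emoji('COVID 😷 update'))  # 'COVID  update' (emoji stripped)
```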

Call the method in the TweetCrawler.py file:

<code>from TweetScraper.utils import filter_emoji

def parse_tweet_item(self, items):
        for item in items:
            try:
                tweet = Tweet()

                tweet['usernameTweet'] = item.xpath('.//span[@class="username u-dir u-textTruncate"]/b/text()').extract()[0]

                ID = item.xpath('.//@data-tweet-id').extract()
                if not ID:
                    continue
                tweet['ID'] = ID[0]

                ### get text content
                tweet['text'] = ' '.join(
                    item.xpath('.//div[@class="js-tweet-text-container"]/p//text()').extract()).replace(' # ',
                                                                                                        '#').replace(
                    ' @ ', '@')

                ### clean data [20200416]
                # tweet['text'] = re.sub(r"[\s+\.\!\/_,$%^*(+"\')]+|[+——?【】?~@#¥%……&*]+|\\n+|\\r+|(\\xa0)+|(\\u3000)+|\\t", "", tweet['text']);

                # filter out emoji [20200417]
                tweet['text'] = filter_emoji(tweet['text'], '')

                if tweet['text'] == '':
                    # If there is not text, we ignore the tweet
                    continue

                ### get meta data
                tweet['url'] = item.xpath('.//@data-permalink-path').extract()[0]

                nbr_retweet = item.css('span.ProfileTweet-action--retweet > span.ProfileTweet-actionCount').xpath(
                    '@data-tweet-stat-count').extract()
                if nbr_retweet:
                    tweet['nbr_retweet'] = int(nbr_retweet[0])
                else:
                    tweet['nbr_retweet'] = 0

                nbr_favorite = item.css('span.ProfileTweet-action--favorite > span.ProfileTweet-actionCount').xpath(
                    '@data-tweet-stat-count').extract()
                if nbr_favorite:
                    tweet['nbr_favorite'] = int(nbr_favorite[0])
                else:
                    tweet['nbr_favorite'] = 0

                nbr_reply = item.css('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount').xpath(
                    '@data-tweet-stat-count').extract()
                if nbr_reply:
                    tweet['nbr_reply'] = int(nbr_reply[0])
                else:
                    tweet['nbr_reply'] = 0

                tweet['datetime'] = datetime.fromtimestamp(int(
                    item.xpath('.//div[@class="stream-item-header"]/small[@class="time"]/a/span/@data-time').extract()[
                        0])).strftime('%Y-%m-%d %H:%M:%S')

                ### get photo
                has_cards = item.xpath('.//@data-card-type').extract()
                if has_cards and has_cards[0] == 'photo':
                    tweet['has_image'] = True
                    tweet['images'] = item.xpath('.//*/div/@data-image-url').extract()
                elif has_cards:
                    logger.debug('Not handle "data-card-type":\n%s' % item.xpath('.').extract()[0])

                ### get animated_gif
                has_cards = item.xpath('.//@data-card2-type').extract()
                if has_cards:
                    if has_cards[0] == 'animated_gif':
                        tweet['has_video'] = True
                        tweet['videos'] = item.xpath('.//*/source/@video-src').extract()
                    elif has_cards[0] == 'player':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == 'summary_large_image':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == 'amplify':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == 'summary':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == '__entity_video':
                        pass  # TODO
                        # tweet['has_media'] = True
                        # tweet['medias'] = item.xpath('.//*/div/@data-src').extract()
                    else:  # there are many other types of card2 !!!!
                        logger.debug('Not handle "data-card2-type":\n%s' % item.xpath('.').extract()[0])

                is_reply = item.xpath('.//div[@class="ReplyingToContextBelowAuthor"]').extract()
                tweet['is_reply'] = is_reply != []

                is_retweet = item.xpath('.//span[@class="js-retweet-text"]').extract()
                tweet['is_retweet'] = is_retweet != []

                tweet['user_id'] = item.xpath('.//@data-user-id').extract()[0]
                yield tweet

                if self.crawl_user:
                    ### get user info
                    user = User()
                    user['ID'] = tweet['user_id']
                    user['name'] = item.xpath('.//@data-name').extract()[0]
                    user['screen_name'] = item.xpath('.//@data-screen-name').extract()[0]
                    user['avatar'] = \
                        item.xpath('.//div[@class="content"]/div[@class="stream-item-header"]/a/img/@src').extract()[0]
                    yield user
            except:
                logger.error("Error tweet:\n%s" % item.xpath('.').extract()[0])
                # raise</code>
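The join/replace chain at the top of the loop exists because the tweet DOM splits hashtags and mentions into separate text nodes; joining on spaces and collapsing ' # ' / ' @ ' glues them back together (note it also swallows the space before the symbol). A quick check with hypothetical text nodes of my own:

```python
# Hypothetical text nodes as they might come out of the tweet's <p> element:
parts = ['Breaking', '#', 'COVID19', 'update from', '@', 'WHO']
text = ' '.join(parts).replace(' # ', '#').replace(' @ ', '@')
print(text)  # 'Breaking#COVID19 update from@WHO'
```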

After this cleaning step, rows now insert into the table without errors.

V. Translating into Chinese

Looking at the scraped data, the content spans many languages — English, Japanese, Arabic, French, and so on. To understand what it says, we need to translate it into Chinese. How? It's actually simple: GitHub hosts an open-source Python Google Translate package, ssut/py-googletrans. It's very capable — it auto-detects the source language and translates into whatever language you specify — and we only need to install it to use it.

1. Installation

<code>$ pip install googletrans</code>

2. Usage

<code>>>> from googletrans import Translator
>>> translator = Translator()
>>> translator.translate('안녕하세요.')
>>> translator.translate('안녕하세요.', dest='ja')
>>> translator.translate('veritas lux mea', src='la')</code>
<code>from googletrans import Translator

destination = 'zh-CN'  # translate into Chinese
t = '안녕하세요.'
res = Translator().translate(t, dest=destination).text
print(res)  # prints: 你好</code>

3. Integrating into the Project

Call the method in the TweetCrawler.py file; we also need to add a new field, text_cn, to the database.
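If the tweets table was already created by an earlier run, the new column has to be added by hand. A sketch of the one-off DDL, sizing the column to match the existing `text` column — run it once against the scraper database with any MySQL client:

```python
# One-off DDL to add the translated-text column; the size mirrors the
# existing `text` column (VARCHAR(280)).
alter_sql = "ALTER TABLE `tweets` ADD COLUMN `text_cn` VARCHAR(280) AFTER `text`"
print(alter_sql)
```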

<code># google translate[20200416]
# @see https://github.com/ssut/py-googletrans
from googletrans import Translator

def parse_tweet_item(self, items):
        for item in items:
            try:
                tweet = Tweet()

                tweet['usernameTweet'] = item.xpath('.//span[@class="username u-dir u-textTruncate"]/b/text()').extract()[0]

                ID = item.xpath('.//@data-tweet-id').extract()
                if not ID:
                    continue
                tweet['ID'] = ID[0]

                ### get text content
                tweet['text'] = ' '.join(
                    item.xpath('.//div[@class="js-tweet-text-container"]/p//text()').extract()).replace(' # ',
                                                                                                        '#').replace(
                    ' @ ', '@')

                ### clean data [20200416]
                # tweet['text'] = re.sub(r"[\s+\.\!\/_,$%^*(+"\')]+|[+——?【】?~@#¥%……&*]+|\\n+|\\r+|(\\xa0)+|(\\u3000)+|\\t", "", tweet['text']);

                # filter out emoji [20200417]
                tweet['text'] = filter_emoji(tweet['text'], '')

                if tweet['text'] == '':
                    # If there is no text, we ignore the tweet
                    continue

                # translate into Chinese [20200417]; check for empty text first
                tweet['text_cn'] = Translator().translate(tweet['text'], dest='zh-CN').text

                ### get meta data
                tweet['url'] = item.xpath('.//@data-permalink-path').extract()[0]

                nbr_retweet = item.css('span.ProfileTweet-action--retweet > span.ProfileTweet-actionCount').xpath(
                    '@data-tweet-stat-count').extract()
                if nbr_retweet:
                    tweet['nbr_retweet'] = int(nbr_retweet[0])
                else:
                    tweet['nbr_retweet'] = 0

                nbr_favorite = item.css('span.ProfileTweet-action--favorite > span.ProfileTweet-actionCount').xpath(
                    '@data-tweet-stat-count').extract()
                if nbr_favorite:
                    tweet['nbr_favorite'] = int(nbr_favorite[0])
                else:
                    tweet['nbr_favorite'] = 0

                nbr_reply = item.css('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount').xpath(
                    '@data-tweet-stat-count').extract()
                if nbr_reply:
                    tweet['nbr_reply'] = int(nbr_reply[0])
                else:
                    tweet['nbr_reply'] = 0

                tweet['datetime'] = datetime.fromtimestamp(int(
                    item.xpath('.//div[@class="stream-item-header"]/small[@class="time"]/a/span/@data-time').extract()[
                        0])).strftime('%Y-%m-%d %H:%M:%S')

                ### get photo
                has_cards = item.xpath('.//@data-card-type').extract()
                if has_cards and has_cards[0] == 'photo':
                    tweet['has_image'] = True
                    tweet['images'] = item.xpath('.//*/div/@data-image-url').extract()
                elif has_cards:
                    logger.debug('Not handle "data-card-type":\n%s' % item.xpath('.').extract()[0])

                ### get animated_gif
                has_cards = item.xpath('.//@data-card2-type').extract()
                if has_cards:
                    if has_cards[0] == 'animated_gif':
                        tweet['has_video'] = True
                        tweet['videos'] = item.xpath('.//*/source/@video-src').extract()
                    elif has_cards[0] == 'player':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == 'summary_large_image':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == 'amplify':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == 'summary':
                        tweet['has_media'] = True
                        tweet['medias'] = item.xpath('.//*/div/@data-card-url').extract()
                    elif has_cards[0] == '__entity_video':
                        pass  # TODO
                        # tweet['has_media'] = True
                        # tweet['medias'] = item.xpath('.//*/div/@data-src').extract()
                    else:  # there are many other types of card2 !!!!
                        logger.debug('Not handle "data-card2-type":\n%s' % item.xpath('.').extract()[0])

                is_reply = item.xpath('.//div[@class="ReplyingToContextBelowAuthor"]').extract()
                tweet['is_reply'] = is_reply != []

                is_retweet = item.xpath('.//span[@class="js-retweet-text"]').extract()
                tweet['is_retweet'] = is_retweet != []

                tweet['user_id'] = item.xpath('.//@data-user-id').extract()[0]
                yield tweet

                if self.crawl_user:
                    ### get user info
                    user = User()
                    user['ID'] = tweet['user_id']
                    user['name'] = item.xpath('.//@data-name').extract()[0]
                    user['screen_name'] = item.xpath('.//@data-screen-name').extract()[0]
                    user['avatar'] = \
                        item.xpath('.//div[@class="content"]/div[@class="stream-item-header"]/a/img/@src').extract()[0]
                    yield user
            except:
                logger.error("Error tweet:\n%s" % item.xpath('.').extract()[0])
                # raise</code>

Add the new field in items.py:

<code># -*- coding: utf-8 -*-

# Define here the models for your scraped items
from scrapy import Item, Field

class Tweet(Item):
    ID = Field()       # tweet id
    url = Field()      # tweet url
    datetime = Field() # post time
    text = Field()     # text content
    text_cn = Field()  # text Chinese content (new field)
    user_id = Field()  # user id</code>

In the pipeline file pipelines.py, modify the database persistence method to handle the new text_cn field:

<code>class SavetoMySQLPipeline(object):

    ''' pipeline that saves data to MySQL '''
    def __init__(self):
        # connect to mysql server
        self.cnx = mysql.connector.connect(
            user=SETTINGS["MYSQL_USER"],
            password=SETTINGS["MYSQL_PWD"],
            host=SETTINGS["MYSQL_SERVER"],
            database=SETTINGS["MYSQL_DB"],
            buffered=True)
        self.cursor = self.cnx.cursor()

        print('Mysql連接成功######################################', self.cursor)
        self.table_name = SETTINGS["MYSQL_TABLE"]
        create_table_query =   "CREATE TABLE `" + self.table_name + "` (\
                `ID` CHAR(20) NOT NULL,\
                `url` VARCHAR(140) NOT NULL,\
                `datetime` VARCHAR(22),\
                `text` VARCHAR(280),\
                `text_cn` VARCHAR(280),\
                `user_id` CHAR(20) NOT NULL,\
                `usernameTweet` VARCHAR(20) NOT NULL\
                )"

        try:
            self.cursor.execute(create_table_query)
        except mysql.connector.Error as err:
            logger.info(err.msg)
        else:
            self.cnx.commit()

    def find_one(self, trait, value):
        # NOTE: cursor.execute() always returns None in mysql.connector, so we
        # must fetch a row to know whether a match exists; the value is passed
        # as a bound parameter rather than concatenated into the SQL.
        select_query = "SELECT " + trait + " FROM " + self.table_name + " WHERE " + trait + " = %s;"
        try:
            self.cursor.execute(select_query, (value,))
        except mysql.connector.Error:
            return False
        return self.cursor.fetchone() is not None

    def check_vals(self, item):
        ID = item['ID']
        url = item['url']
        datetime = item['datetime']
        text = item['text']
        user_id = item['user_id']
        username = item['usernameTweet']

        if (ID is None):
            return False
        elif (user_id is None):
            return False
        elif (url is None):
            return False
        elif (text is None):
            return False
        elif (username is None):
            return False
        elif (datetime is None):
            return False
        else:
            return True


    def insert_one(self, item):
        ret = self.check_vals(item)

        if not ret:
            return None

        ID = item['ID']
        user_id = item['user_id']
        url = item['url']
        text = item['text']
        text_cn = item['text_cn']

        username = item['usernameTweet']
        datetime = item['datetime']

        insert_query =  'INSERT INTO ' + self.table_name + ' (ID, url, datetime, text, text_cn, user_id, usernameTweet )'
        insert_query += ' VALUES ( %s, %s, %s, %s, %s, %s, %s)'
        insert_query += ' ON DUPLICATE KEY UPDATE'
        insert_query += ' url = %s, datetime = %s, text= %s, text_cn= %s, user_id = %s, usernameTweet = %s'

        try:
            self.cursor.execute(insert_query, (
                ID,
                url,
                datetime,
                text,
                text_cn,
                user_id,
                username,
                url,
                datetime,
                text,
                text_cn,
                user_id,
                username
                ))
        # the values repeat because the query has both INSERT and UPDATE placeholders
        except mysql.connector.Error as err:
            logger.info(err.msg)
        else:
            self.cnx.commit()

    def process_item(self, item, spider):
        if isinstance(item, Tweet):
           self.insert_one(dict(item))  # Item is inserted or updated.
           logger.debug("Add tweet:%s" % item['url'])</code>
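The parameter tuple in insert_one lists every value twice — once for the INSERT columns and once (minus the key) for the ON DUPLICATE KEY UPDATE clause — which is easy to get out of sync. A small helper of my own that builds such a query and makes the placeholder count explicit:

```python
def build_upsert(table, cols, key):
    """Build INSERT ... ON DUPLICATE KEY UPDATE with %s placeholders.

    The caller must supply the values once for INSERT and again,
    minus the key column, for the UPDATE clause - in that order.
    """
    placeholders = ', '.join(['%s'] * len(cols))
    updates = ', '.join('%s = %%s' % c for c in cols if c != key)
    return ('INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s'
            % (table, ', '.join(cols), placeholders, updates))

cols = ['ID', 'url', 'datetime', 'text', 'text_cn', 'user_id', 'usernameTweet']
query = build_upsert('tweets', cols, key='ID')
print(query.count('%s'))  # 13 placeholders: 7 for INSERT + 6 for UPDATE
```

Note that ON DUPLICATE KEY UPDATE only takes effect if the table has a PRIMARY or UNIQUE key; the CREATE TABLE in the pipeline above doesn't declare one on ID, so it's worth adding.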

4. Run Again

Now run the command again:

<code>scrapy crawl TweetScraper -a query="Novel coronavirus,#COVID-19"</code>

As you can see, the foreign-language text in the database has now been translated into Chinese ^_^

