Python网络爬虫3 - 生产者消费者模型爬取某金融网站数据

应一位金融圈的朋友所托,帮忙写个爬虫,帮他爬取中国期货行业协议网站中所有金融机构的从业人员信息。网站数据的获取本身比较简单,但是为了学习一些新的爬虫方法和技巧,即本文要讲述的生产者消费者模型,我又学习了一下Python中队列库queue及线程库Thread的使用方法。

生产者消费者模型

生产者消费者模型非常简单,相信大部分程序员都知道,就是一方作为生产者不断提供资源,另一方作为消费者不断消费资源。简单点说,就好比餐馆的厨师和顾客,厨师作为生产者不断制作美味的食物,而顾客作为消费者不断食用厨师提供的食物。此外,生产者与消费者之间可以是一对一、一对多、多对一和多对多的关系。

那么这个模型和爬虫有什么关系呢?其实,爬虫可以认为是一个生产者,它不断从网站爬取数据,爬取到的数据就是食物;而所得数据需要消费者进行数据清洗,把有用的数据吸收掉,把无用的数据丢弃。

在实践过程中,爬虫爬取和数据清洗分别对应一个Thread,两个线程之间通过顺序队列queue传递数据,数据传递过程就好比餐馆服务员从厨房把食物送到顾客餐桌上的过程。爬取线程负责爬取网站数据,并将原始数据存入队列,清洗线程从队列中按入队顺序读取原始数据并提取出有效数据。

以上便是对生产者消费者模型的简单介绍了,下面针对本次爬取任务予以详细说明。

分析站点

http://www.cfachina.org/cfainfo/organbaseinfoServlet?all=personinfo

home page

我们要爬取的数据是主页显示的表格中所有期货公司的从业人员信息,每个公司对应一个机构编号(G01001~G01198)。从上图可以看到有主页有分页,共8页。以G01001方正中期期货公司为例,点击该公司名称跳转至对应网页如下:

personinfo

从网址及网页内容可以提取出以下信息:

  1. 网址
  2. 机构名称mechanism_name,在每页表格上方可以看到当前机构名称
  3. 从业人员信息,即每页的表格内容,也是我们要爬取的对象
  4. 该机构从业人员信息总页数page_cnt

我们最终爬取的数据可以按机构名称存储到对应的txt文件或excel文件中。

获取机构名称

get mechanism name

获取到某机构的任意从业信息页面后,使用BeautifulSoup可快速提取机构名称。

mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()

那么有人可能会问,既然主页表格都已经包含了所有机构的编号和名称,为何还要多此一举的再获取一次呢?这是因为,我压根就不想爬主页的那些表格,直接根据机构编号的递增规律生成对应的网址即可,所以获取机构名称的任务就放在了爬取每个机构首个信息页面之后。

获取机构信息对应的网页数量

get count of page

每个机构的数据量是不等的,幸好每个页面都包含了当前页面数及总页面数。使用以下代码即可获取页码数。

url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
page_cnt = url_re.search(html).group(1)

从每个机构首页获取页码数后,便可for循环修改网址参数中的currentPage,逐页获取机构信息。

获取当前页面从业人员信息

get personinfo

针对如上图所示的一个特定信息页时,人员信息被存放于一个表中,除了固定的表头信息外,人员信息均被包含在一个带有idtr标签中,所以使用BeautifulSoup可以很容易提取出页面内所有人员信息。

soup.find_all('tr', id=True)

确定爬取方案

一般的想法当然是逐页爬取主页信息,然后获取每页所有机构对应的网页链接,进而继续爬取每个机构信息。

但是由于该网站的机构信息网址具有明显的规律,我们根据每个机构的编号便可直接得到每个机构每个信息页面的网址。所以具体爬取方案如下:

  1. 将所有机构编号网址存入队列url_queue
  2. 新建生产者线程SpiderThread完成抓取任务
    • 循环从队列url_queue中读取一个编号,生成机构首页网址,使用requests抓取之
    • 从抓取结果中获取页码数量,若为0,则返回该线程第1步
    • 循环爬取当前机构剩余页面
    • 将页面信息存入队列html_queue
  3. 新建消费者线程DatamineThread完成数据清洗任务
    • 循环从队列html_queue中读取一组页面信息
    • 使用BeautifulSoup提取页面中的从业人员信息
    • 将信息以二维数组形式存储,最后交由数据存储类Storage存入本地文件

代码实现

生成者SpiderThread

爬虫线程先从队列获取一个机构编号,生成机构首页网址并进行爬取,接着判断机构页面数量是否为0,如若不为0则继续获取机构名称,并根据页面数循环爬取剩余页面,将原始html数据以如下dict格式存入队列html_queue:

{
    'name': mechanismId_mechanismName,
    'num': currentPage,
    'content': html
}

爬虫产生的数据队列html_queue将由数据清洗线程进行处理,下面是爬虫线程的主程序,整个线程代码请看后面的源码

def run(self):
    while True:
        mechanism_id = 'G0' + self.url_queue.get()

        # the first page's url
        url = self.__get_url(mechanism_id, 1)
        html = self.grab(url)

        page_cnt = self.url_re.search(html.text).group(1)
        if page_cnt == '0':
            self.url_queue.task_done()
            continue

        soup = BeautifulSoup(html.text, 'html.parser')
        mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
        print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

        # put data into html_queue
        self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
        for i in range(2, int(page_cnt) + 1):
            url = self.__get_url(mechanism_id, i)
            html = self.grab(url)
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})

        self.url_queue.task_done()

消费者DatamineThread

数据清洗线程比较简单,就是从生产者提供的数据队列html_queue逐一提取html数据,然后从html数据中提取从业人员信息,以二维数组形式存储,最后交由存储模块Storage完成数据存储工作。

class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos

    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))

            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()

数据存储Storage

我写了两类文件格式的存储函数,write_txt, write_excel,分别对应txt,excel文件。实际存储时由调用方确定文件格式。

def save(self, data):
    {
        '.txt': self.write_txt,
        '.xls': self.write_excel
    }.get(self.filetype)(data)

存入txt文件

存入txt文件是比较简单的,就是以附加(a)形式打开文件,写入数据,关闭文件。其中,文件名称由调用方提供。写入数据时,每个人员信息占用一行,以制表符\t分隔。

def write_txt(self, data):
    '''Write data to txt file'''
    fid = open(self.path, 'a', encoding='utf-8')

    # insert the header of table
    if not os.path.getsize(self.path):
        fid.write('\t'.join(self.table_header) + '\n')

    for info in data:
        fid.write('\t'.join(info) + '\n')
    fid.close()

存入Excel文件

存入Excel文件还是比较繁琐的,由于经验不多,选用的是xlwt, xlrdxlutils库。说实话,这3个库真心不大好用,勉强完成任务而已。为什么这么说,且看:

  1. 修改文件麻烦:xlwt只能写,xlrd只能读,需要xlutilscopy函数将xlrd读取的数据复制到内存,再用xlwt修改
  2. 只支持.xls文件:.xlsx经读写也会变成.xls格式
  3. 表格样式易变:只要重新写入文件,表格样式必然重置

所以后续我肯定会再学学其它的excel库,当然,当前解决方案暂时还用这三个。代码如下:

def write_excel(self, data):
    '''write data to excel file'''
    if not os.path.exists(self.path):
        header_style = xlwt.easyxf('font:name 楷体, color-index black, bold on')
        wb = xlwt.Workbook(encoding='utf-8')
        ws = wb.add_sheet('Data')

        # insert the header of table
        for i in range(len(self.table_header)):
            ws.write(0, i, self.table_header[i], header_style)
    else:
        rb = open_workbook(self.path)
        wb = copy(rb)
        ws = wb.get_sheet(0)

    # write data
    offset = len(ws.rows)
    for i in range(0, len(data)):
        for j in range(0, len(data[0])):
            ws.write(offset + i, j, data[i][j])

    # When use xlutils.copy.copy function to copy data from exist .xls file,
    # it will loss the origin style, so we need overwrite the width of column,
    # maybe there some other good solution, but I have not found yet.
    for i in range(len(self.table_header)):
        ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

    # save to file
    while True:
        try:
            wb.save(self.path)
            break
        except PermissionError as e:
            print('{0} error: {1}'.format(self.path, e.strerror))
            time.sleep(5)
        finally:
            pass

说明:

  1. 一个文件对应一个机构的数据,需要多次读取和写入,所以需要计算文件写入时的行数偏移量offset,即当前文件已包含数据的行数;
  2. 当被写入文件被人为打开时,会出现PermissionError异常,可以在捕获该异常然后提示错误信息,并定时等待直到文件被关闭。

main

主函数用于创建和启动生产者线程和消费者线程,同时为生产者线程提供机构编号队列。

url_queue = queue.Queue()
html_queue = queue.Queue()

def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()

从主函数可以看到,两个队列都调用了join函数,用于阻塞,直到对应队列为空为止。要注意的是,队列操作中,每个出队操作queue.get()需要对应一个queue.task_done()操作,否则会出现队列数据已全部处理完,但主线程仍在执行的情况。

至此,爬虫的主要代码便讲解完了,下面是完整源码。

源码

#!/usr/bin/python3
# -*-coding:utf-8-*-

import queue
from threading import Thread

import requests

import re
from bs4 import BeautifulSoup

import os
import platform

import xlwt
from xlrd import open_workbook
from xlutils.copy import copy

import time

# url format ↓
# http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+&currentPage=1&pageSize=20&selectType=personinfo&all=undefined
# organid: +G01001+, +G01002+, +G01003+, ...
# currentPage: 1, 2, 3, ...
# pageSize: 20(default)
#
# Algorithm design:
# 2 threads with 2 queues
# Thread-1, get first page url, then get page_num and mechanism_name from first page
# Thread-2, parse html file and get data from it, then output data to local file
# url_queue data -> 'url'  # first url of each mechanism
# html_queue data -> {'name':'mechanism_name', 'html':data}

url_queue = queue.Queue()
html_queue = queue.Queue()


class SpiderThread(Thread):
    """Threaded Url Grab"""
    def __init__(self, url_queue, html_queue):
        Thread.__init__(self)
        self.url_queue = url_queue
        self.html_queue = html_queue
        self.page_size = 20
        self.url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}

    def __get_url(self, mechanism_id, current_page):
        return 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+%s+&currentPage=%d&pageSize=%d&selectType=personinfo&all=undefined' \
        % (mechanism_id, current_page, self.page_size)

    def grab(self, url):
        '''Grab html of url from web'''
        while True:
            try:
                html = requests.get(url, headers=self.headers, timeout=20)
                if html.status_code == 200:
                    break
            except requests.exceptions.ConnectionError as e:
                print(url + ' Connection error, try again...')
            except requests.exceptions.ReadTimeout as e:
                print(url + ' Read timeout, try again...')
            except Exception as e:
                print(str(e))
            finally:
                pass
        return html

    def run(self):
        '''Grab all htmls of mechanism one by one
        Steps:
            1. grab first page of each mechanism from url_queue
            2. get number of pages and mechanism name from first page
            3. grab all html file of each mechanism
            4. push all html to html_queue
        '''
        while True:
            mechanism_id = 'G0' + self.url_queue.get()

            # the first page's url
            url = self.__get_url(mechanism_id, 1)
            html = self.grab(url)

            page_cnt = self.url_re.search(html.text).group(1)
            if page_cnt == '0':
                self.url_queue.task_done()
                continue

            soup = BeautifulSoup(html.text, 'html.parser')
            mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
            print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

            # put data into html_queue
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
            for i in range(2, int(page_cnt) + 1):
                url = self.__get_url(mechanism_id, i)
                html = self.grab(url)
                self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})

            self.url_queue.task_done()


class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos

    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))

            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()


class Storage():
    def __init__(self, filename, filetype):
        self.filetype = filetype
        self.filename = filename + filetype
        self.table_header = ('姓名', '性别', '从业资格号', '投资咨询从业证书号', '任职部门', '职务', '任现职时间')
        self.path = self.__get_path()

    def __get_path(self):
        path = {
            'Windows': 'D:/litreily/Documents/python/cfachina',
            'Linux': '/mnt/d/litreily/Documents/python/cfachina'
        }.get(platform.system())

        if not os.path.isdir(path):
            os.makedirs(path)
        return '%s/%s' % (path, self.filename)

    def write_txt(self, data):
        '''Write data to txt file'''
        fid = open(self.path, 'a', encoding='utf-8')

        # insert the header of table
        if not os.path.getsize(self.path):
            fid.write('\t'.join(self.table_header) + '\n')

        for info in data:
            fid.write('\t'.join(info) + '\n')
        fid.close()

    def write_excel(self, data):
        '''write data to excel file'''
        if not os.path.exists(self.path):
            header_style = xlwt.easyxf('font:name 楷体, color-index black, bold on')
            wb = xlwt.Workbook(encoding='utf-8')
            ws = wb.add_sheet('Data')

            # insert the header of table
            for i in range(len(self.table_header)):
                ws.write(0, i, self.table_header[i], header_style)
        else:
            rb = open_workbook(self.path)
            wb = copy(rb)
            ws = wb.get_sheet(0)

        # write data
        offset = len(ws.rows)
        for i in range(0, len(data)):
            for j in range(0, len(data[0])):
                ws.write(offset + i, j, data[i][j])

        # When use xlutils.copy.copy function to copy data from exist .xls file,
        # it will loss the origin style, so we need overwrite the width of column,
        # maybe there some other good solution, but I have not found yet.
        for i in range(len(self.table_header)):
            ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

        # save to file
        while True:
            try:
                wb.save(self.path)
                break
            except PermissionError as e:
                print('{0} error: {1}'.format(self.path, e.strerror))
                time.sleep(5)
            finally:
                pass

    def save(self, data):
        '''Write data to local file.

        According filetype to choose function to save data, filetype can be '.txt'
        or '.xls', but '.txt' type is saved more faster then '.xls' type

        Args:
            data: a 2d-list array that need be save
        '''
        {
            '.txt': self.write_txt,
            '.xls': self.write_excel
        }.get(self.filetype)(data)


def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()


if __name__ == '__main__':
    main()

爬取测试

spider

save to txt

save to excel

写在最后

  • 测试发现,写入txt的速度明显高于写入excel的速度
  • 如果将页面网址中的pageSize修改为1000或更大,则可以一次性获取某机构的所有从业人员信息,而不用逐页爬取,效率可以大大提高。
  • 该爬虫已托管至github Python-demos