多线程下载农产品数据并存储到本地

农产品下载地址

一、具体步骤

  • 1、导包

    import os
    import random
    import csv
    import time
    import queue
    import threading
    import requests
    from lxml import etree
    
  • 2、定义几个全局的变量(方便下面的类使用)

    # HTTP headers sent with every page request; only a desktop-browser
    # User-Agent string is set.
    headers = {
        'User-Agent': (
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) '
            'AppleWebKit/537.36 (KHTML, like Gecko) '
            'Chrome/65.0.3325.162 Safari/537.36'
        ),
    }

    # Output CSV file, placed in the same directory as this script.
    path = os.path.join(os.path.dirname(__file__), 'products.csv')
    
  • 3、定义一个生产者,用来抓取网页上的数据

    class Procuder(threading.Thread):
        """
        Producer thread: fetch listing pages and queue one dict per table row.

        Each worker repeatedly takes a URL from ``page_queue``, downloads it,
        extracts the rows of the price table and puts a dict with the keys
        ``name``, ``price``, ``unit``, ``address`` and ``date`` on
        ``data_queue``. The thread ends once ``page_queue`` has no URL left.

        :param page_queue: queue of page URLs still to be fetched
        :param data_queue: queue receiving the parsed row dicts
        """

        # Keys of the row dict, in the order of the <td> cells of a table row.
        FIELDS = ('name', 'price', 'unit', 'address', 'date')
        # Seconds to wait for the server before giving up on a page.
        REQUEST_TIMEOUT = 10

        def __init__(self, page_queue, data_queue, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.page_queue = page_queue
            self.data_queue = data_queue

        def run(self):
            """Process URLs until the page queue is drained."""
            while True:
                # Take the URL without blocking: checking empty() and then
                # calling a blocking get() would hang this thread forever if
                # another producer grabbed the last URL in between.
                try:
                    url = self.page_queue.get_nowait()
                except queue.Empty:
                    break
                self.parse_page(url)

        def parse_page(self, url):
            """
            Download one listing page and queue every complete table row.

            Pages that cannot be fetched or do not answer with HTTP 200 are
            skipped; rows with fewer cells than ``FIELDS`` are ignored.

            :param url: address of the listing page
            :return: None
            """
            try:
                response = requests.get(url=url, headers=headers,
                                        timeout=self.REQUEST_TIMEOUT)
            except requests.RequestException as exc:
                # One unreachable page must not kill the whole worker thread.
                print('request failed, page skipped:', url, exc)
                return

            # Random 0-2 second pause between requests.
            time.sleep(random.randrange(3))
            if response.status_code != 200:
                return

            html = etree.HTML(response.text)
            if html is None:
                # Defensive: the parser may yield no tree for an empty body.
                return

            for tr in html.xpath('//table[@class="table table-hover"]/tbody/tr'):
                cells = tr.xpath('./td')
                if len(cells) < len(self.FIELDS):
                    continue
                row = {key: self._first_text(cell)
                       for key, cell in zip(self.FIELDS, cells)}
                self.data_queue.put(row)

        @staticmethod
        def _first_text(cell):
            """Return the first text node of ``cell``, or '' when it has none."""
            texts = cell.xpath('./text()')
            return texts[0] if texts else ''
    
  • 4、定义一个消费者,用来从生产者那边获取数据并写入本地 CSV 文件

    class Consumer(threading.Thread):
        """
        Consumer thread: append queued row dicts to the CSV file at ``path``.

        Rows are taken from ``data_queue`` and written one per line with the
        columns ``name, price, unit, address, date``. No header row is
        written.

        The thread stops when no row arrived within ``poll_timeout`` seconds
        and ``page_queue`` is empty. NOTE: this is a heuristic - a producer
        that needs longer than ``poll_timeout`` for its last page can still
        deliver rows after every consumer has exited.

        :param page_queue: queue of page URLs the producers still have to fetch
        :param data_queue: queue of parsed row dicts to store
        """

        # Column order of the CSV file.
        FIELDS = ['name', 'price', 'unit', 'address', 'date']
        # Seconds to wait for a row before checking whether work is finished.
        poll_timeout = 10.0
        # Shared by all consumers so appends to the single file do not interleave.
        _write_lock = threading.Lock()

        def __init__(self, page_queue, data_queue, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.page_queue = page_queue
            self.data_queue = data_queue

        def run(self):
            """Write rows until the queues stay empty."""
            while True:
                # A timed get() replaces the empty()-then-get() sequence, which
                # could block forever when another consumer took the last row
                # between the check and the get().
                try:
                    data_row = self.data_queue.get(timeout=self.poll_timeout)
                except queue.Empty:
                    if self.page_queue.empty():
                        break
                    # Pages are still pending, so more rows may arrive.
                    continue

                print('开始写入==>', data_row)

                with self._write_lock:
                    with open(path, 'a', newline='', encoding='utf8') as f:
                        writer = csv.DictWriter(f, fieldnames=self.FIELDS)
                        writer.writerow(data_row)
    
  • 5、定义一个运行的main函数

    def main(page_count=100, worker_count=5):
        """
        Run the crawl: queue the page URLs, start the workers and wait for them.

        Any CSV left at ``path`` by a previous run is removed first, because
        the consumers only ever append to that file.

        :param page_count: number of listing pages to fetch (pages 1..page_count)
        :param worker_count: number of producer threads and of consumer threads
        :return: None
        """
        # EAFP instead of exists()+remove(): no window between check and delete.
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

        page_queue = queue.Queue(page_count)
        data_queue = queue.Queue(1000)

        # Queue every page URL before any worker starts.
        url_template = "http://www.gznw.gov.cn/priceInfo/getPriceInfoByAreaId.jx?areaid=22572&page={0}"
        for page in range(1, page_count + 1):
            page_queue.put(url_template.format(page))

        # One producer and one consumer per worker slot.
        workers = []
        for _ in range(worker_count):
            workers.append(Procuder(page_queue, data_queue))
            workers.append(Consumer(page_queue, data_queue))

        for worker in workers:
            worker.start()

        # Return only after every worker has finished.
        for worker in workers:
            worker.join()

    if __name__ == '__main__':
        main()
    

二、完整代码

import os
import random
import csv
import time
import queue
import threading
import requests
from lxml import etree

# HTTP headers sent with every page request; only a desktop-browser
# User-Agent string is set.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/65.0.3325.162 Safari/537.36'
    ),
}

# Output CSV file, placed in the same directory as this script.
path = os.path.join(os.path.dirname(__file__), 'products.csv')

class Procuder(threading.Thread):
    """
    Producer thread: fetch listing pages and queue one dict per table row.

    Each worker repeatedly takes a URL from ``page_queue``, downloads it,
    extracts the rows of the price table and puts a dict with the keys
    ``name``, ``price``, ``unit``, ``address`` and ``date`` on
    ``data_queue``. The thread ends once ``page_queue`` has no URL left.

    :param page_queue: queue of page URLs still to be fetched
    :param data_queue: queue receiving the parsed row dicts
    """

    # Keys of the row dict, in the order of the <td> cells of a table row.
    FIELDS = ('name', 'price', 'unit', 'address', 'date')
    # Seconds to wait for the server before giving up on a page.
    REQUEST_TIMEOUT = 10

    def __init__(self, page_queue, data_queue, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.data_queue = data_queue

    def run(self):
        """Process URLs until the page queue is drained."""
        while True:
            # Take the URL without blocking: checking empty() and then
            # calling a blocking get() would hang this thread forever if
            # another producer grabbed the last URL in between.
            try:
                url = self.page_queue.get_nowait()
            except queue.Empty:
                break
            self.parse_page(url)

    def parse_page(self, url):
        """
        Download one listing page and queue every complete table row.

        Pages that cannot be fetched or do not answer with HTTP 200 are
        skipped; rows with fewer cells than ``FIELDS`` are ignored.

        :param url: address of the listing page
        :return: None
        """
        try:
            response = requests.get(url=url, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as exc:
            # One unreachable page must not kill the whole worker thread.
            print('request failed, page skipped:', url, exc)
            return

        # Random 0-2 second pause between requests.
        time.sleep(random.randrange(3))
        if response.status_code != 200:
            return

        html = etree.HTML(response.text)
        if html is None:
            # Defensive: the parser may yield no tree for an empty body.
            return

        for tr in html.xpath('//table[@class="table table-hover"]/tbody/tr'):
            cells = tr.xpath('./td')
            if len(cells) < len(self.FIELDS):
                continue
            row = {key: self._first_text(cell)
                   for key, cell in zip(self.FIELDS, cells)}
            self.data_queue.put(row)

    @staticmethod
    def _first_text(cell):
        """Return the first text node of ``cell``, or '' when it has none."""
        texts = cell.xpath('./text()')
        return texts[0] if texts else ''


class Consumer(threading.Thread):
    """
    Consumer thread: append queued row dicts to the CSV file at ``path``.

    Rows are taken from ``data_queue`` and written one per line with the
    columns ``name, price, unit, address, date``. No header row is written.

    The thread stops when no row arrived within ``poll_timeout`` seconds
    and ``page_queue`` is empty. NOTE: this is a heuristic - a producer
    that needs longer than ``poll_timeout`` for its last page can still
    deliver rows after every consumer has exited.

    :param page_queue: queue of page URLs the producers still have to fetch
    :param data_queue: queue of parsed row dicts to store
    """

    # Column order of the CSV file.
    FIELDS = ['name', 'price', 'unit', 'address', 'date']
    # Seconds to wait for a row before checking whether work is finished.
    poll_timeout = 10.0
    # Shared by all consumers so appends to the single file do not interleave.
    _write_lock = threading.Lock()

    def __init__(self, page_queue, data_queue, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.data_queue = data_queue

    def run(self):
        """Write rows until the queues stay empty."""
        while True:
            # A timed get() replaces the empty()-then-get() sequence, which
            # could block forever when another consumer took the last row
            # between the check and the get().
            try:
                data_row = self.data_queue.get(timeout=self.poll_timeout)
            except queue.Empty:
                if self.page_queue.empty():
                    break
                # Pages are still pending, so more rows may arrive.
                continue

            print('开始写入==>', data_row)

            with self._write_lock:
                with open(path, 'a', newline='', encoding='utf8') as f:
                    writer = csv.DictWriter(f, fieldnames=self.FIELDS)
                    writer.writerow(data_row)

def main(page_count=100, worker_count=5):
    """
    Run the crawl: queue the page URLs, start the workers and wait for them.

    Any CSV left at ``path`` by a previous run is removed first, because the
    consumers only ever append to that file.

    :param page_count: number of listing pages to fetch (pages 1..page_count)
    :param worker_count: number of producer threads and of consumer threads
    :return: None
    """
    # EAFP instead of exists()+remove(): no window between check and delete.
    try:
        os.remove(path)
    except FileNotFoundError:
        pass

    page_queue = queue.Queue(page_count)
    data_queue = queue.Queue(1000)

    # Queue every page URL before any worker starts.
    url_template = "http://www.gznw.gov.cn/priceInfo/getPriceInfoByAreaId.jx?areaid=22572&page={0}"
    for page in range(1, page_count + 1):
        page_queue.put(url_template.format(page))

    # One producer and one consumer per worker slot.
    workers = []
    for _ in range(worker_count):
        workers.append(Procuder(page_queue, data_queue))
        workers.append(Consumer(page_queue, data_queue))

    for worker in workers:
        worker.start()

    # Return only after every worker has finished.
    for worker in workers:
        worker.join()


if __name__ == '__main__':
    main()

results matching ""

    No results matching ""