多线程下载农产品产品存储到本地
一、具体步骤
1、导包
import os import random import csv import time import queue import threading import requests from lxml import etree
2、定义几个全局的变量(方便下面的类使用)
headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36', } path = os.path.join(os.path.dirname(__file__), 'products.csv')
3、定义一个生产者,用来抓取网页上的数据
class Procuder(threading.Thread): """ 创建一个生产者(获取全部的url地址) """ def __init__(self, page_queue, data_queue, *args, **kwargs): super(Procuder, self).__init__(*args, **kwargs) self.page_queue = page_queue self.data_queue = data_queue def run(self): while True: if self.page_queue.empty(): break url = self.page_queue.get() self.parse_page(url) def parse_page(self, url): """ 定义一个根据url请求数据的方法 :param url: :return: """ response = requests.get(url=url, headers=headers) time.sleep(random.randrange(3)) if response.status_code == 200: html = etree.HTML(response.text) trs = html.xpath('//table[@class="table table-hover"]/tbody/tr') for tr in trs: tds = tr.xpath('./td') name = tds[0].xpath('./text()')[0] price = tds[1].xpath('./text()')[0] unit = tds[2].xpath('./text()')[0] address = tds[3].xpath('./text()')[0] date = tds[4].xpath('./text()')[0] # print(name, price, unit, address, date) # 加入队列中 self.data_queue.put(({'name': name, 'price': price, 'unit': unit, 'address': address, 'date': date}))
4、定义一个消费者用来从生产者那边获取数据并下载
class Consumer(threading.Thread): """ 定义一个消费者用来存储数据 """ def __init__(self, page_queue, data_queue, *args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.page_queue = page_queue self.data_queue = data_queue def run(self): while True: if self.page_queue.empty() and self.data_queue.empty(): break # 从队列中获取数据 data_row = self.data_queue.get() print('开始写入==>', data_row) with open(path, 'a', newline='', encoding='utf8') as f: # # 写入到本地的csv文件中 table_headers = ['name', 'price', 'unit', 'address', 'date'] writer = csv.DictWriter(f, fieldnames=table_headers) # # 写入头部 # writer.writeheader() # 一行一行写入数据 writer.writerow(data_row)
5、定义一个运行的
main
函数def main(): """ 定义一个主运行方法 :return: """ if os.path.exists(path): os.remove(path) page_queue = queue.Queue(100) data_queue = queue.Queue(1000) # 创建爬取页面 for i in range(1, 101): page_queue.put("http://www.gznw.gov.cn/priceInfo/getPriceInfoByAreaId.jx?areaid=22572&page={0}".format(str(i))) # 创建线程 for x in range(5): p = Procuder(page_queue, data_queue) c = Consumer(page_queue, data_queue) p.start() c.start() if __name__ == '__main__': main()
二、完整代码
import os
import random
import csv
import time
import queue
import threading
import requests
from lxml import etree
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36',
}
path = os.path.join(os.path.dirname(__file__), 'products.csv')
class Procuder(threading.Thread):
"""
创建一个生产者(获取全部的url地址)
"""
def __init__(self, page_queue, data_queue, *args, **kwargs):
super(Procuder, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.data_queue = data_queue
def run(self):
while True:
if self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)
def parse_page(self, url):
"""
定义一个根据url请求数据的方法
:param url:
:return:
"""
response = requests.get(url=url, headers=headers)
time.sleep(random.randrange(3))
if response.status_code == 200:
html = etree.HTML(response.text)
trs = html.xpath('//table[@class="table table-hover"]/tbody/tr')
for tr in trs:
tds = tr.xpath('./td')
name = tds[0].xpath('./text()')[0]
price = tds[1].xpath('./text()')[0]
unit = tds[2].xpath('./text()')[0]
address = tds[3].xpath('./text()')[0]
date = tds[4].xpath('./text()')[0]
# print(name, price, unit, address, date)
# 加入队列中
self.data_queue.put(({'name': name, 'price': price, 'unit': unit, 'address': address, 'date': date}))
class Consumer(threading.Thread):
"""
定义一个消费者用来存储数据
"""
def __init__(self, page_queue, data_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.data_queue = data_queue
def run(self):
while True:
if self.page_queue.empty() and self.data_queue.empty():
break
# 从队列中获取数据
data_row = self.data_queue.get()
print('开始写入==>', data_row)
with open(path, 'a', newline='', encoding='utf8') as f:
# # 写入到本地的csv文件中
table_headers = ['name', 'price', 'unit', 'address', 'date']
writer = csv.DictWriter(f, fieldnames=table_headers)
# # 写入头部
# writer.writeheader()
# 一行一行写入数据
writer.writerow(data_row)
def main():
"""
定义一个主运行方法
:return:
"""
if os.path.exists(path):
os.remove(path)
page_queue = queue.Queue(100)
data_queue = queue.Queue(1000)
# 创建爬取页面
for i in range(1, 101):
page_queue.put("http://www.gznw.gov.cn/priceInfo/getPriceInfoByAreaId.jx?areaid=22572&page={0}".format(str(i)))
# 创建线程
for x in range(5):
p = Procuder(page_queue, data_queue)
c = Consumer(page_queue, data_queue)
p.start()
c.start()
if __name__ == '__main__':
main()