基础爬虫实战
时间:2024-4-17 19:11 作者:张海荣 分类: 学习分享
import requests
import logging
import re
from urllib.parse import urljoin
import json
from os import makedirs
from os.path import exists
import multiprocessing
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
BASE_URL = 'https://ssr1.scrape.center'# 爬取的目标网站的URL
TOTAL_PAGE = 10
RESULTS_DIR = 'results'
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)
# 爬取页面html源码
def scrape_page(url):
logging.info('scraping %s...', url)
try:
response = requests.get(url)
if response.status_code == 200:
return response.text
logging.error('get invalid status code %s while scraping %s', response.status_code, url)
except requests.RequestException:
logging.error('error occurred while scraping %s', url, exc_info=True)
# 页面url
def scrape_index(page):
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
# 解析详情页面url
def parse_index(html):
pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
items = re.findall(pattern, html)
if not items:
return []
for item in items:
detail_url = urljoin(BASE_URL, item)
logging.info('get detail url %s', detail_url)
yield detail_url
# 获取详情页面html源码
def scrape_detail(url):
return scrape_page(url)
# 解析详情页面源码
def parse_detail(html):
# 搜索图片模式
cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
# 搜索标题模式
name_pattern = re.compile('<h2.*?>(.*?)</h2>')
# 搜索分类模式
categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
# 搜索上映日期模式
published_pattern = re.compile('(\d{4}-\d{2}-\d{2})\s?上映')
# 搜索剧情介绍模式
drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
# 搜索评分模式
score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
cover = re.search(cover_pattern, html).group(1).strip() if re.search(cover_pattern, html) else None
name = re.search(name_pattern, html).group(1).strip() if re.search(name_pattern, html) else None
categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else None
published = re.search(published_pattern, html).group(1).strip() if re.search(published_pattern, html) else None
drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
score = float(re.search(score_pattern, html).group(1).strip()) if re.search(score_pattern, html) else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published': published,
'drama': drama,
'score': score
}
# 保存数据
def save_data(data):
name = data.get('name')
encodings = name.replace(':', '')
data['name'] = encodings
data_path = f'{RESULTS_DIR}/{encodings}.json'
json.dump(data, open(data_path, 'w', encoding='utf-8'), ensure_ascii=False, indent=2)
# main
def main(page):
# 获取页面html内容
index_html = scrape_index(page)
# 解析页面电影详情url
detail_urls = parse_index(index_html)
for detail_url in detail_urls:
# 获取详情页面html内容
detail_html = scrape_detail(detail_url)
# 解析详情页面数据
data = parse_detail(detail_html)
logging.info('get data %s', data)
logging.info('saving data to json data')
save_data(data)
if __name__ == '__main__':
# 创建了一个进程池对象,此处未指定进程数量,因此将使用默认值(通常是 CPU 核心数)
pool = multiprocessing.Pool()
# 定义了一个迭代器 pages,从 1 开始直到 TOTAL_PAGE 的范围
pages = range(1, TOTAL_PAGE + 1)
# 使用 pool.map() 将函数 main 应用于每个 pages 中的元素,实现了并行处理。这里假设 main 函数用于处理单个页面的逻辑。
pool.map(main, pages)
# 关闭进程池,不再接受新的任务
pool.close()
# 等待所有进程完成
pool.join()
推荐阅读:
收藏