基础爬虫实战 学习分享

import requests
import logging
import  re
from urllib.parse import urljoin
import json
from os import makedirs
from os.path import exists
import multiprocessing

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
BASE_URL = 'https://ssr1.scrape.center'# 爬取的目标网站的URL
TOTAL_PAGE = 10
RESULTS_DIR = 'results'
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)

# 爬取页面html源码
def scrape_page(url):
    logging.info('scraping %s...', url)
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error('get invalid status code %s while scraping %s', response.status_code, url)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)

# 页面url
def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

# 解析详情页面url
def parse_index(html):
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info('get detail url %s', detail_url)
        yield detail_url

# 获取详情页面html源码
def scrape_detail(url):
    return scrape_page(url)

# 解析详情页面源码
def parse_detail(html):
    # 搜索图片模式
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
    # 搜索标题模式
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    # 搜索分类模式
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    # 搜索上映日期模式
    published_pattern = re.compile('(\d{4}-\d{2}-\d{2})\s?上映')
    # 搜索剧情介绍模式
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    # 搜索评分模式
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
    cover = re.search(cover_pattern, html).group(1).strip() if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else None
    published = re.search(published_pattern, html).group(1).strip() if re.search(published_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
    score = float(re.search(score_pattern, html).group(1).strip()) if re.search(score_pattern, html) else None

    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published': published,
        'drama': drama,
        'score': score
    }

# 保存数据
def save_data(data):
    name = data.get('name')
    encodings = name.replace(':', '')
    data['name'] = encodings
    data_path = f'{RESULTS_DIR}/{encodings}.json'
    json.dump(data, open(data_path, 'w', encoding='utf-8'), ensure_ascii=False, indent=2)

# main
def main(page):
    # 获取页面html内容
    index_html = scrape_index(page)
    # 解析页面电影详情url
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        # 获取详情页面html内容
        detail_html = scrape_detail(detail_url)
        # 解析详情页面数据
        data = parse_detail(detail_html)
        logging.info('get data %s', data)
        logging.info('saving data to json data')
        save_data(data)

if __name__ == '__main__':
    # 创建了一个进程池对象,此处未指定进程数量,因此将使用默认值(通常是 CPU 核心数)
    pool = multiprocessing.Pool()
    # 定义了一个迭代器 pages,从 1 开始直到 TOTAL_PAGE 的范围
    pages = range(1, TOTAL_PAGE + 1)
    # 使用 pool.map() 将函数 main 应用于每个 pages 中的元素,实现了并行处理。这里假设 main 函数用于处理单个页面的逻辑。
    pool.map(main, pages)
    # 关闭进程池,不再接受新的任务
    pool.close()
    # 等待所有进程完成
    pool.join()

张海荣 发布于  2024-4-17 19:11 
article cover

张海荣 发布于  2024-3-19 20:50