import requests
import logging
import re
import pymongo
from pyquery import PyQuery as pq
from urllib.parse import urljoin
import multiprocessing
# 1. Configure logging: timestamp, level and message in each record for easier debugging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')
# Constants
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
MONGO_DB_NAME = 'movies'
MONGO_COLLECTION_NAME = 'movies'
# 2. Initialize the MongoDB connection
client = pymongo.MongoClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_COLLECTION_NAME]
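# Note (deployment assumption, not from the original script): pymongo clients
# are not fork-safe. Under the 'fork' start method each pool worker would
# ideally create its own MongoClient after forking; under 'spawn' every worker
# re-imports this module and therefore gets its own fresh client anyway.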
def scrape_page(url):
    """Generic fetch helper with exception handling."""
    logging.info('scraping %s...', url)
    try:
        # the timeout keeps a stalled connection from blocking a worker forever
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return response.text
        logging.error('fetch failed, status code: %s, URL: %s',
                      response.status_code, url)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)
    return None  # make the failure case explicit for callers
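# Optional hardening sketch (not in the original script): transient network
# errors can be retried by mounting urllib3's Retry on a requests.Session:
#   import urllib3
#   session = requests.Session()
#   session.mount('https://', requests.adapters.HTTPAdapter(
#       max_retries=urllib3.util.retry.Retry(total=3, backoff_factor=0.5)))
# scrape_page would then call session.get(url, timeout=10) instead of requests.get.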
def scrape_index(page):
    """Fetch one paginated index (listing) page."""
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)
def parse_index(html):
    """Parse an index page with PyQuery and yield every detail-page URL."""
    doc = pq(html)
    links = doc('.el-card .name')  # CSS selector for the movie title links
    for link in links.items():
        href = link.attr('href')
        detail_url = urljoin(BASE_URL, href)
        logging.info('found detail page URL: %s', detail_url)
        yield detail_url
def scrape_detail(url):
    """Fetch a detail page."""
    return scrape_page(url)
def parse_detail(html):
    """Parse a detail page, clean the values and map them to fields."""
    doc = pq(html)
    cover = doc('img.cover').attr('src')
    name = doc('a > h2').text()
    categories = [item.text() for item in doc('.categories button span').items()]
    # Extract the release date with a regex; the raw string avoids the invalid
    # escape-sequence warnings that Python 3.12+ raises for string literals.
    # "上映" is the release-date label in the scraped page's Chinese markup.
    published_at = doc('.info:contains(上映)').text()
    match = re.search(r'(\d{4}-\d{2}-\d{2})', published_at) if published_at else None
    published_at = match.group(1) if match else None
    drama = doc('.drama p').text()
    score = doc('p.score').text()
    score = float(score) if score else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }
def save_data(data):
    """Save one record to MongoDB in upsert mode."""
    if data:
        collection.update_one({
            'name': data.get('name')  # the movie name serves as the unique key
        }, {
            '$set': data
        }, upsert=True)  # update the document if it exists, insert it otherwise
def main(page):
    """Crawl one index page plus all of its detail pages."""
    index_html = scrape_index(page)
    if not index_html:
        return
    for detail_url in parse_index(index_html):
        detail_html = scrape_detail(detail_url)
        if not detail_html:
            # guard against failed downloads so parse_detail never sees None
            continue
        data = parse_detail(detail_html)
        logging.info('got detail data for: %s', data.get('name'))
        save_data(data)
        logging.info('data saved to MongoDB')
if __name__ == '__main__':
    # 3. Dispatch the pages concurrently across a process pool
    pool = multiprocessing.Pool()  # defaults to one worker per CPU core
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join()  # wait for every page task to finish
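# Usage sketch (a hypothetical sanity check, not part of the crawler): once the
# pool has drained, the stored documents can be inspected with a quick query:
#   for doc in collection.find().limit(3):
#       print(doc['name'], doc['score'])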