import requests
import logging
import re
import pymongo
from pyquery import PyQuery as pq
from urllib.parse import urljoin
import multiprocessing
# ======================== 1. 全局配置 ========================
# 日志配置:带时间戳、级别,方便定位爬取/解析问题
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 常量配置(所有可修改的都放这里,后续改目标网站不用改核心逻辑)
BASE_URL = 'https://ssr1.scrape.center' # 崔老师的公开爬虫测试站
TOTAL_PAGE = 10 # 要爬的总页数
MONGO_URI = 'mongodb://localhost:27017' # MongoDB 连接地址
MONGO_DB = 'scrape_movies' # 数据库名
MONGO_COL = 'movies' # 集合名
# ======================== 2. MongoDB 初始化 ========================
client = pymongo.MongoClient(MONGO_URI)
db = client[MONGO_DB]
collection = db[MONGO_COL]
# ======================== 3. 通用请求模块 ========================
def scrape_page(url):
"""
通用的单页抓取函数:加超时、加异常处理
只要返回HTML文本,否则None
"""
logging.info('🚀 开始抓取: %s', url)
try:
# 超时设置是必须的!防止网络卡住整个进程
resp = requests.get(url, timeout=10)
if resp.status_code == 200:
return resp.text
logging.error('❌ 状态码错误: %d, URL: %s', resp.status_code, url)
except requests.RequestException as e:
logging.error('❌ 请求异常: %s, URL: %s', str(e), url, exc_info=False)
return None
# ======================== 4. 列表页(索引页)处理 ========================
def scrape_index(page_num):
"""抓取指定页码的列表页"""
index_url = f"{BASE_URL}/page/{page_num}"
return scrape_page(index_url)
def parse_index(html):
"""
解析列表页:用PyQuery的CSS选择器提取所有详情页URL
用yield返回迭代器,避免一次性存太多URL占用内存
"""
doc = pq(html)
# 定位电影卡片里的标题链接(崔老师的站结构很清晰)
link_nodes = doc('.el-card .name')
for node in link_nodes.items():
# 获取相对路径后转成绝对路径
detail_url = urljoin(BASE_URL, node.attr('href'))
logging.info('📍 发现详情页: %s', detail_url)
yield detail_url
# ======================== 5. 详情页处理 ========================
def scrape_detail(url):
"""抓取详情页(复用通用请求函数)"""
return scrape_page(url)
def parse_detail(html):
"""
解析详情页:清洗并提取成结构化的字典
重点:处理缺失值、用contains定位无特定class的元素
"""
doc = pq(html)
# 1. 封面图URL
cover = doc('img.cover').attr('src')
# 2. 电影名
name = doc('a > h2').text()
# 3. 分类列表
categories = [item.text() for item in doc('.categories button span').items()]
# 4. 上映时间(注意:有些电影没上映时间,或者不在标准位置)
published_info = doc('.info:contains(上映)').text() # PyQuery独有的contains选择器!
published_at = None
if published_info:
match = re.search(r'\d{4}-\d{2}-\d{2}', published_info)
published_at = match.group(1) if match else None
# 5. 剧情简介
drama = doc('.drama p').text()
# 6. 评分(转成浮点数,方便后续排序/分析)
score = doc('p.score').text()
score = float(score.strip()) if score and score.strip() else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
# ======================== 6. 数据存储模块 ========================
def save_movie(data):
"""
存储电影数据:用MongoDB的upsert模式
核心优势:存在则更新,不存在则插入,彻底解决重复问题
"""
if not data:
return
# 用「电影名」作为唯一索引键(崔老师的站电影名不会重复)
result = collection.update_one(
{'name': data['name']},
{'$set': data},
upsert=True
)
# 日志反馈插入/更新状态
if result.upserted_id:
logging.info('✅ 新增电影: %s', data['name'])
else:
logging.info('🔄 更新电影: %s', data['name'])
# ======================== 7. 单页完整逻辑 + 多进程调度 ========================
def process_page(page_num):
"""单页的完整流程:抓列表→解析出URL→抓每个详情→解析→存储"""
index_html = scrape_index(page_num)
if not index_html:
return
detail_urls = parse_index(index_html)
for url in detail_urls:
detail_html = scrape_detail(url)
if not detail_html:
continue
movie_data = parse_detail(detail_html)
save_movie(movie_data)
if __name__ == '__main__':
"""
多进程启动入口:必须写在if __name__ == '__main__'里!
Pool()默认开启CPU核心数个进程,I/O密集型爬虫可以开更多
"""
logging.info('🎬 电影爬虫正式启动!')
pool = multiprocessing.Pool(processes=5) # 崔老师的站比较小,开5个足够
pages = range(1, TOTAL_PAGE + 1)
pool.map(process_page, pages) # map会自动把pages里的每个元素传给process_page
pool.close() # 关闭进程池,不再接受新任务
pool.join() # 等待所有子进程完成
logging.info('🎉 所有任务完成!')