import os
import time
import random
from loguru import logger
from DrissionPage import ChromiumPage
from DataRecorder import Recorder
def extract_note_data(raw_json: dict) -> dict:
"""
解析小红书详情 API 返回的 JSON 数据包
兼容两种常见的返回结构
"""
# 提取笔记核心信息节点
if isinstance(raw_json, dict):
# 结构1:feed流接口返回的嵌套 items
note = raw_json.get('data', {}).get('items', [{}])[0].get('note_card', {})
if not note:
# 结构2:偶尔直接返回的 note 节点
note = raw_json.get('note', {})
else:
note = {}
# 结构化输出你需要的字段(可自行增减)
return {
'博主昵称': note.get('user', {}).get('nickname', '未知博主'),
'博主ID': note.get('user', {}).get('user_id', ''),
'笔记标题': note.get('title', ''),
'笔记正文': note.get('desc', ''),
'评论数': note.get('interact_info', {}).get('comment_count', 0),
'点赞数': note.get('interact_info', {}).get('liked_count', 0),
'收藏数': note.get('interact_info', {}).get('collected_count', 0),
'分享数': note.get('interact_info', {}).get('shared_count', 0),
'发布时间': note.get('time', ''),
}
def create_data_recorder(keyword: str) -> Recorder:
"""
为每个关键词创建独立的 Excel 存储文件
自动删除同关键词的旧文件,避免数据混乱
"""
filename = f'小红书_{keyword.strip()}_{time.strftime("%Y%m%d_%H%M%S")}.xlsx'
if os.path.exists(filename):
os.remove(filename)
logger.warning(f'🗑️ 已清理同关键词旧文件: {filename}')
recorder = Recorder(filename)
recorder.set.show_msg(False) # 关闭 DataRecorder 的默认提示
return recorder
def crawl_single_keyword(page: ChromiumPage, keyword: str, max_data: int = 20) -> None:
"""
单个关键词的采集闭环
"""
# 1️⃣ 初始化存储+布网
recorder = create_data_recorder(keyword)
page.listen.start('web/v1/feed')
logger.info(f'🚀 已启动关键词【{keyword}】的采集,目标条数:{max_data}')
# 2️⃣ 进入搜索页
search_url = f'https://www.xiaohongshu.com/search_result?keyword={keyword}&source=web_explore_feed'
page.get(search_url)
page.wait.load_start() # 等待页面DOM初步加载完成
# 3️⃣ 去重+计数初始化
seen_index = set()
data_collected = 0
while data_collected < max_data:
# 获取当前页面所有可见的笔记卡片(外层带 data-index 的 section 最稳)
cards = page.eles('xpath://*[@id="global"]//div[contains(@class,"feeds-page")]//section')
# 没抓到卡片说明滚动过快/页面没渲染,多等会儿再试
if not cards:
logger.warning('⚠️ 未加载到新卡片,尝试等待+滚动...')
page.wait(random.uniform(1.5, 2.5))
page.scroll.down(random.randint(800, 1200))
continue
# 遍历当前页的卡片
for card in cards:
if data_collected >= max_data:
break
# 用 data-index 做唯一标识去重(比用标题/图片URL稳)
card_index = card.attr('data-index')
if not card_index or card_index in seen_index:
continue
seen_index.add(card_index)
try:
logger.info(f'正在处理第 {data_collected + 1}/{max_data} 条数据...')
# 4️⃣ JS点击触发详情请求(优先点图片,兜底点卡片)
target_click = card.ele('xpath:.//img[contains(@class,"Cover")]')
if target_click:
target_click.click(by_js=True)
else:
card.click(by_js=True)
# 5️⃣ 等待详情数据包(超时4秒防卡死)
api_response = page.listen.wait(timeout=4)
if not api_response:
logger.warning(f'⏱️ 第 {data_collected + 1} 条抓包超时,跳过')
continue
# 解析+存储数据
note_info = extract_note_data(api_response.response.body)
recorder.add_data(note_info)
recorder.record() # 逐条保存防止程序崩溃数据丢失
data_collected += 1
logger.success(f'✅ 第 {data_collected} 条数据已保存')
except Exception as e:
logger.error(f'❌ 第 {data_collected + 1} 条处理异常: {str(e)[:50]}')
finally:
# 6️⃣ 无论成功失败都ESC关闭详情页,重置页面状态
page.actions.type('\ue00c')
page.wait(random.uniform(0.5, 1.2)) # 给页面0.5-1.2秒缓冲
# 7️⃣ 本轮卡片处理完,自适应滚动加载下一批
logger.info('📜 本轮卡片处理完毕,正在滚动加载更多...')
page.scroll.down(random.randint(1000, 1500))
page.wait(random.uniform(1.2, 2.0))
logger.success(f'🏆 关键词【{keyword}】采集完成!共保存 {data_collected} 条数据到: {recorder.path}')
def main():
"""
主程序入口:配置关键词、启动浏览器、扫码登录、批量采集
"""
# 配置你要采集的关键词列表
target_keywords = ['极简书桌布置', '平价减脂早餐']
# 配置每个关键词的采集条数
per_keyword_max = 15
# 启动 Chromium 浏览器(自动下载适配的驱动,无需手动配置)
page = ChromiumPage()
try:
# 先打开首页,等待人工扫码登录(必须登录!否则大概率被风控)
page.get('https://www.xiaohongshu.com')
input('🔐 请在浏览器中扫码登录小红书,完成后在此按【回车键】继续...')
# 批量采集关键词
for kw in target_keywords:
crawl_single_keyword(page, kw, per_keyword_max)
# 不同关键词之间多等一会儿,降低连续请求的风险
logger.info('⏸️ 不同关键词之间等待 3-5 秒...')
time.sleep(random.uniform(3, 5))
except Exception as main_e:
logger.error(f'❌ 主程序异常: {main_e}')
finally:
# 无论成功失败都关闭浏览器
logger.info('🚪 正在关闭浏览器...')
page.quit()
if __name__ == '__main__':
main()