# 电商App商品滑动抓取项目
# 本项目展示如何构建一个电商App商品信息抓取系统,通过自动化滑动操作获取大量商品数据。
# 电商App抓取系统设计
# ecommerce_scraper_system.py
import uiautomator2 as u2
import time
import random
import json
import sqlite3
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple
import logging
from dataclasses import dataclass
import pandas as pd
# Configure logging output (timestamp - level - message) at INFO level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
@dataclass
class ScrapingConfig:
    """Settings for one scraping run.

    ``category_keywords`` may be left as ``None``; ``__post_init__`` then
    replaces it with a fresh built-in keyword list, so instances never share
    the default list.
    """

    # Android package name of the target app (defaults to Taobao).
    app_package: str = "com.taobao.taobao"
    # Search keywords, one per category; None selects the built-in defaults.
    category_keywords: Optional[List[str]] = None
    # Upper bound on products collected for each category.
    max_products_per_category: int = 100
    # Pause between scroll gestures, in seconds.
    scroll_interval: float = 2.0
    # Product detection timeout, in seconds.
    product_detection_timeout: int = 10

    def __post_init__(self):
        """Fill in the default keyword list when none was supplied."""
        if self.category_keywords is None:
            self.category_keywords = [
                "手机", "笔记本", "服装", "鞋子", "化妆品",
                "食品", "家电", "图书", "运动", "数码",
            ]
class EcommerceDatabase:
    """SQLite persistence layer for scraped e-commerce data.

    Manages four tables: ``products``, ``shops``, ``categories`` and
    ``crawl_logs``.  Every public method opens its own connection to
    ``db_path`` and always closes it again, so an instance holds no open
    database handle.  The ``save_*`` methods are best-effort: failures while
    writing are logged and swallowed instead of being raised.
    """

    # Idempotent DDL executed by init_database().
    _SCHEMA_STATEMENTS = (
        # Product information
        '''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT UNIQUE,
            title TEXT,
            price REAL,
            original_price REAL,
            discount REAL,
            sales_count INTEGER,
            shop_name TEXT,
            category TEXT,
            rating REAL,
            review_count INTEGER,
            image_urls TEXT,
            detail_url TEXT,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Shop information
        '''
        CREATE TABLE IF NOT EXISTS shops (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            shop_id TEXT UNIQUE,
            shop_name TEXT,
            shop_rating REAL,
            total_sales INTEGER,
            location TEXT,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Per-category statistics
        '''
        CREATE TABLE IF NOT EXISTS categories (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            category_name TEXT UNIQUE,
            search_keyword TEXT,
            product_count INTEGER,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Crawl run log
        '''
        CREATE TABLE IF NOT EXISTS crawl_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            category TEXT,
            products_found INTEGER,
            products_crawled INTEGER,
            success_rate REAL,
            duration_seconds INTEGER,
            error_count INTEGER,
            started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            finished_at TIMESTAMP
        )
        ''',
    )

    def __init__(self, db_path: str = "ecommerce_data.db"):
        """Remember *db_path* and make sure all tables exist."""
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the four tables if they do not exist yet.

        Unlike the ``save_*`` methods, errors raised here propagate to the
        caller; the connection is closed either way.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            for statement in self._SCHEMA_STATEMENTS:
                conn.execute(statement)
            conn.commit()
        finally:
            conn.close()
        logger.info("电商数据库初始化完成")

    def _execute_write(self, sql: str, params: tuple,
                       success_message: str, failure_prefix: str) -> None:
        """Run one write statement in its own connection and log the outcome.

        Args:
            sql: Parameterised SQL statement.
            params: Values bound to the ``?`` placeholders.
            success_message: Logged at INFO level after a successful commit.
            failure_prefix: Prefix of the ERROR log line when the write fails.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(sql, params)
            conn.commit()
        except Exception as e:
            # Best-effort boundary: a failed row must not abort the crawl.
            logger.error(f"{failure_prefix}: {e}")
        else:
            logger.info(success_message)
        finally:
            conn.close()

    def save_product(self, product_info: Dict):
        """Insert or replace one product row, keyed by ``product_id``.

        Missing keys fall back to empty strings / zeros / NULL exactly as the
        ``.get`` defaults below show; ``image_urls`` is stored as JSON text.
        """
        try:
            image_urls = json.dumps(product_info.get('image_urls', []))
        except (TypeError, ValueError) as e:
            logger.error(f"保存商品信息失败: {e}")
            return
        # A None / non-string title must not break the success log line.
        title_preview = str(product_info.get('title') or '')[:20]
        self._execute_write(
            '''
            INSERT OR REPLACE INTO products
            (product_id, title, price, original_price, discount,
            sales_count, shop_name, category, rating, review_count,
            image_urls, detail_url)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            (
                product_info.get('product_id'),
                product_info.get('title', ''),
                product_info.get('price'),
                product_info.get('original_price'),
                product_info.get('discount'),
                product_info.get('sales_count', 0),
                product_info.get('shop_name', ''),
                product_info.get('category', ''),
                product_info.get('rating'),
                product_info.get('review_count', 0),
                image_urls,
                product_info.get('detail_url', ''),
            ),
            f"商品信息已保存: {title_preview}...",
            "保存商品信息失败",
        )

    def save_shop(self, shop_info: Dict):
        """Insert or replace one shop row, keyed by ``shop_id``."""
        self._execute_write(
            '''
            INSERT OR REPLACE INTO shops
            (shop_id, shop_name, shop_rating, total_sales, location)
            VALUES (?, ?, ?, ?, ?)
            ''',
            (
                shop_info.get('shop_id'),
                shop_info.get('shop_name'),
                shop_info.get('shop_rating'),
                shop_info.get('total_sales'),
                shop_info.get('location'),
            ),
            f"店铺信息已保存: {shop_info.get('shop_name')}",
            "保存店铺信息失败",
        )

    def save_category_stats(self, category: str, search_keyword: str, product_count: int):
        """Insert or replace the statistics row for *category*."""
        self._execute_write(
            '''
            INSERT OR REPLACE INTO categories
            (category_name, search_keyword, product_count)
            VALUES (?, ?, ?)
            ''',
            (category, search_keyword, product_count),
            f"分类统计已保存: {category}",
            "保存分类统计失败",
        )

    def save_crawl_log(self, log_info: Dict):
        """Append one crawl-log row; ``finished_at`` is set to the current local time."""
        # Bind the timestamp as text: sqlite3's implicit datetime adapter is
        # deprecated since Python 3.12.  isoformat(sep=' ') is the same text
        # that adapter produced.
        finished_at = datetime.now().isoformat(sep=' ')
        self._execute_write(
            '''
            INSERT INTO crawl_logs
            (category, products_found, products_crawled, success_rate,
            duration_seconds, error_count, finished_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ''',
            (
                log_info.get('category'),
                log_info.get('products_found', 0),
                log_info.get('products_crawled', 0),
                log_info.get('success_rate', 0.0),
                log_info.get('duration_seconds', 0),
                log_info.get('error_count', 0),
                finished_at,
            ),
            f"抓取日志已保存: {log_info.get('category')}",
            "保存抓取日志失败",
        )
class EcommerceScraper:
    """Drives a shopping app through uiautomator2 and stores detected products.

    Workflow: connect to a device, launch the configured app, search one
    keyword per category, then alternate between reading the product cards
    on screen and swiping up until enough products were collected.

    Attributes:
        config: The ``ScrapingConfig`` in use.
        db: ``EcommerceDatabase`` that receives products and crawl logs.
        d: The connected uiautomator2 device, or ``None`` if connecting failed.
        product_counter: Initialised to 0; not updated by this class.
        error_count: Running total of extraction and scroll failures.
    """

    # Resource ids of the search box in the supported apps.
    _SEARCH_BOX_IDS = (
        'com.taobao.taobao:id/searchbar_hint_view',  # Taobao
        'com.taobao.taobao:id/searchEdit',  # Taobao
        'com.jingdong.app.mall:id/search_widget_text',  # JD
        'com.suning.mobile.ebuy:id/search_bar_text',  # Suning
    )
    # Resource ids of the "confirm search" button.
    _SEARCH_BUTTON_IDS = (
        'com.taobao.taobao:id/search_confirm',
        'com.jingdong.app.mall:id/search_btn',
        'com.suning.mobile.ebuy:id/search_btn',
    )
    # Resource ids of a single product card.
    _PRODUCT_ITEM_IDS = (
        'com.taobao.taobao:id/product_item',  # Taobao
        'com.jingdong.app.mall:id/product_item',  # JD
    )
    # Widget classes used as a fallback when no known product id matches.
    _GENERIC_CONTAINER_CLASSES = (
        'android.widget.RelativeLayout',
        'android.widget.LinearLayout',
    )
    # Accepts the half-width (U+00A5) or the full-width (U+FFE5) yen sign.
    _PRICE_TEXT_PATTERN = r'[¥¥]\d+\.?\d*'
    # Superset of the texts the former pattern accepted; also allows "1.2万" and "1000+".
    _SALES_TEXT_PATTERN = r'\d+(\.\d+)?[万千]?\+?(人付款|销量)'
    _SHOP_TEXT_PATTERN = r'.*(旗舰店|专卖店|专营店|个人).*'
    # Stop a scroll session after this many failures ...
    _MAX_ERRORS = 10
    # ... or after this many consecutive screens without any new product.
    _MAX_IDLE_ROUNDS = 5

    def __init__(self, config: Optional["ScrapingConfig"] = None, db_path: str = "ecommerce_data.db"):
        """Create the database wrapper and try to connect to a device.

        Args:
            config: Scraping settings; a default ``ScrapingConfig`` is used
                when omitted.
            db_path: SQLite file handed to ``EcommerceDatabase``.
        """
        self.config = config or ScrapingConfig()
        self.db = EcommerceDatabase(db_path)
        self.d = None
        self.product_counter = 0
        self.error_count = 0
        self.setup_device()

    def setup_device(self):
        """Connect to the device; on failure log the error and leave ``self.d`` unusable."""
        try:
            self.d = u2.connect()
            if self.d is None:
                logger.error("设备连接失败")
                return
            logger.info(f"设备连接成功: {self.d.serial}")
            logger.info(f"设备信息: {self.d.info}")
        except Exception as e:
            logger.error(f"设备连接失败: {e}")

    def launch_ecommerce_app(self):
        """Start the configured app and wait for it to settle.

        Returns:
            True when the start command succeeded, False otherwise (including
            when no device is connected).
        """
        if self.d is None:
            logger.error("启动应用失败: 设备未连接")
            return False
        try:
            self.d.app_start(self.config.app_package)
            time.sleep(8)  # give the app time to finish starting
            logger.info(f"电商应用已启动: {self.config.app_package}")
            return True
        except Exception as e:
            logger.error(f"启动应用失败: {e}")
            return False

    def _first_existing(self, selectors: List[Dict], timeout: float):
        """Return the first UI object from *selectors* that shows up, else None.

        ``exists(timeout=...)`` only yields a boolean, so the selector object
        itself is returned for the caller to click.
        """
        for selector_kwargs in selectors:
            candidate = self.d(**selector_kwargs)
            if candidate.exists(timeout=timeout):
                return candidate
        return None

    def search_products(self, keyword: str) -> bool:
        """Type *keyword* into the app's search box and submit it.

        Known resource ids are tried first, then a description/text based
        fallback.

        Returns:
            True when the search was submitted, False when the search box or
            button could not be found or any UI call raised.
        """
        try:
            box_selectors = [{'resourceId': rid} for rid in self._SEARCH_BOX_IDS]
            box_selectors.append({'descriptionMatches': r'搜索|Search|输入'})
            search_box = self._first_existing(box_selectors, timeout=3)
            if search_box is None:
                logger.error("未找到搜索框")
                return False
            search_box.click()
            time.sleep(2)
            # clear=True so a keyword left over from the previous category
            # is not concatenated with the new one.
            self.d.send_keys(keyword, clear=True)
            time.sleep(1)

            button_selectors = [{'resourceId': rid} for rid in self._SEARCH_BUTTON_IDS]
            button_selectors.append({'textMatches': r'搜索|Search'})
            search_btn = self._first_existing(button_selectors, timeout=2)
            if search_btn is None:
                logger.error("未找到搜索按钮")
                return False
            search_btn.click()
            time.sleep(5)  # wait for the result list to load
            logger.info(f"搜索商品成功: {keyword}")
            return True
        except Exception as e:
            logger.error(f"搜索商品失败: {e}")
            return False

    def scroll_and_detect_products(self, max_products: int = 50, category: str = "") -> List[Dict]:
        """Collect up to *max_products* products, swiping between screens.

        Each new product is saved to the database immediately.  Products
        already seen in this call (same ``product_id``) are skipped, because
        consecutive screens overlap.  The loop stops when the limit is
        reached, after ``_MAX_ERRORS`` failures, or after ``_MAX_IDLE_ROUNDS``
        consecutive screens without a new product, so an empty or exhausted
        list cannot spin forever.

        Args:
            max_products: Maximum number of products to return.
            category: When non-empty, written to each product's ``category``.

        Returns:
            The product dictionaries in detection order.
        """
        products: List[Dict] = []
        seen_ids = set()
        errors_at_start = self.error_count
        idle_rounds = 0
        logger.info(f"开始滑动检测商品,最多检测 {max_products} 个")

        while (len(products) < max_products
               and self.error_count - errors_at_start < self._MAX_ERRORS
               and idle_rounds < self._MAX_IDLE_ROUNDS):
            found_new = False
            for element in self.find_product_elements():
                if len(products) >= max_products:
                    break
                try:
                    product_info = self.extract_product_info(element)
                except Exception as e:
                    logger.error(f"提取商品信息失败: {e}")
                    self.error_count += 1
                    continue
                if not product_info or not product_info.get('product_id'):
                    continue
                if product_info['product_id'] in seen_ids:
                    continue
                seen_ids.add(product_info['product_id'])
                if category:
                    product_info['category'] = category
                products.append(product_info)
                found_new = True
                logger.info(f"检测到商品: {product_info.get('title', '')[:20]}...")
                self.db.save_product(product_info)

            idle_rounds = 0 if found_new else idle_rounds + 1

            if len(products) < max_products:
                if self.scroll_down_products():
                    time.sleep(self.config.scroll_interval)
                else:
                    self.error_count += 1
                    time.sleep(3)  # back off before retrying

        if idle_rounds >= self._MAX_IDLE_ROUNDS:
            logger.warning(f"连续 {idle_rounds} 屏未发现新商品,停止滑动")
        logger.info(f"商品检测完成,共检测到 {len(products)} 个商品")
        return products

    def find_product_elements(self) -> List:
        """Return the product-card UI objects currently on screen.

        Known product-card resource ids are tried first.  Only when none of
        them matches, generic layout containers that contain a price-like
        text are used instead.  Elements with identical bounds are returned
        once.
        """
        candidates = []
        for resource_id in self._PRODUCT_ITEM_IDS:
            try:
                candidates.extend(self.d(resourceId=resource_id))
            except Exception as e:
                logger.debug(f"查找商品元素失败 ({resource_id}): {e}")

        if not candidates:
            for class_name in self._GENERIC_CONTAINER_CLASSES:
                try:
                    for container in self.d(className=class_name):
                        if container.child(textMatches=self._PRICE_TEXT_PATTERN).exists:
                            candidates.append(container)
                except Exception as e:
                    logger.debug(f"查找商品元素失败 ({class_name}): {e}")

        unique_elements = []
        seen_bounds = set()
        for elem in candidates:
            try:
                raw_bounds = elem.bounds()
            except Exception as e:
                logger.debug(f"获取元素边界失败: {e}")
                continue
            # Accept both the tuple form and a left/top/right/bottom mapping.
            if isinstance(raw_bounds, dict):
                bounds_key = (raw_bounds['left'], raw_bounds['top'],
                              raw_bounds['right'], raw_bounds['bottom'])
            else:
                bounds_key = tuple(raw_bounds)
            if bounds_key not in seen_bounds:
                seen_bounds.add(bounds_key)
                unique_elements.append(elem)
        return unique_elements

    @staticmethod
    def _parse_price(text) -> float:
        """Return the first number found in *text*, or 0.0 when there is none."""
        match = re.search(r'\d+(?:\.\d+)?', text or '')
        return float(match.group()) if match else 0.0

    @staticmethod
    def _parse_sales_count(text) -> int:
        """Convert a sales label such as ``"1.2万人付款"`` to an integer count.

        A number directly followed by 万 is multiplied by 10000, by 千 by
        1000.  Returns 0 when *text* holds no number.
        """
        match = re.search(r'(\d+(?:\.\d+)?)\s*([万千])?', text or '')
        if not match:
            return 0
        multiplier = {'万': 10000, '千': 1000}.get(match.group(2), 1)
        return int(round(float(match.group(1)) * multiplier))

    @staticmethod
    def _make_product_id(title: str, shop_name: str, price: float) -> str:
        """Derive a stable id from the listing's visible content.

        A content hash (instead of a timestamp plus random number) lets the
        ``UNIQUE`` / ``INSERT OR REPLACE`` logic of the products table
        recognise a listing that is seen again.
        """
        import hashlib  # local import: the file header does not import hashlib

        fingerprint = f"{title}|{shop_name}|{price:.2f}"
        return "prod_" + hashlib.sha256(fingerprint.encode('utf-8')).hexdigest()[:16]

    def _child_text(self, element, selectors: List[Dict]) -> str:
        """Return the text of the first child of *element* matching any selector, or ''."""
        for selector_kwargs in selectors:
            try:
                child = element.child(**selector_kwargs)
                if child.exists:
                    text = child.get_text()
                    if text:
                        return text
            except Exception as e:
                logger.debug(f"读取子元素文本失败 {selector_kwargs}: {e}")
        return ''

    def extract_product_info(self, element) -> Optional[Dict]:
        """Build a product dictionary from one product-card UI object.

        Returns:
            The product dictionary, or ``None`` when no price could be read
            (treated as "not a product") or an unexpected error occurred; the
            latter also increments ``self.error_count``.
        """
        try:
            price = self._parse_price(
                self._child_text(element, [{'textMatches': self._PRICE_TEXT_PATTERN}])
            )
            # Without a price the element is most likely not a product card.
            if price == 0:
                return None

            title = self._child_text(element, [
                {'resourceId': 'com.taobao.taobao:id/title'},  # Taobao title
                {'className': 'android.widget.TextView'},  # any text view
            ])[:100]
            sales_count = self._parse_sales_count(
                self._child_text(element, [{'textMatches': self._SALES_TEXT_PATTERN}])
            )
            shop_name = self._child_text(
                element, [{'textMatches': self._SHOP_TEXT_PATTERN}]
            )[:50]

            return {
                'product_id': self._make_product_id(title, shop_name, price),
                'title': title,
                'price': price,
                'original_price': 0.0,
                'discount': 0.0,
                'sales_count': sales_count,
                'shop_name': shop_name,
                'category': '',
                'rating': 0.0,
                'review_count': 0,
                'image_urls': [],
                'detail_url': '',
            }
        except Exception as e:
            logger.error(f"提取商品信息失败: {e}")
            self.error_count += 1
            return None

    def scroll_down_products(self) -> bool:
        """Swipe up by half a screen to reveal more products; return success."""
        try:
            w, h = self.d.window_size()
            # Drag from 3/4 of the screen height up to 1/4, horizontally centred.
            self.d.swipe(w // 2, h * 3 // 4, w // 2, h // 4, 0.8)
            time.sleep(1)  # let new content load
            return True
        except Exception as e:
            logger.error(f"滚动失败: {e}")
            return False

    def scrape_category(self, category: str, keyword: str = None) -> Dict:
        """Search one category, collect its products and record statistics.

        Args:
            category: Category name stored with the products and the logs.
            keyword: Search keyword; defaults to *category*.

        Returns:
            ``{'success': False, 'error': 'search_failed'}`` when the search
            could not be submitted, otherwise a dict with ``success``,
            ``products_count``, ``duration`` (seconds) and ``products``.
        """
        if keyword is None:
            keyword = category
        start_time = time.time()
        errors_before = self.error_count
        logger.info(f"开始抓取分类: {category}, 关键词: {keyword}")

        if not self.search_products(keyword):
            logger.error(f"搜索失败: {keyword}")
            return {'success': False, 'error': 'search_failed'}

        products = self.scroll_and_detect_products(
            self.config.max_products_per_category, category=category
        )
        duration = int(time.time() - start_time)

        self.db.save_crawl_log({
            'category': category,
            'products_found': len(products),
            'products_crawled': len(products),
            'success_rate': 1.0 if len(products) > 0 else 0.0,
            'duration_seconds': duration,
            # Errors raised while handling this category only.
            'error_count': self.error_count - errors_before,
        })
        self.db.save_category_stats(category, keyword, len(products))
        logger.info(f"分类抓取完成: {category}, 发现 {len(products)} 个商品, 耗时 {duration} 秒")
        return {
            'success': True,
            'products_count': len(products),
            'duration': duration,
            'products': products,
        }

    def run_scraping_session(self):
        """Scrape every configured category and return an aggregate summary.

        Sleeps a random 10-30 seconds between categories.

        Returns:
            Dict with ``total_categories``, ``successful_categories``,
            ``total_products``, ``total_duration`` and the per-category
            ``results`` list.
        """
        logger.info("开始电商App抓取会话")
        categories = self.config.category_keywords
        results = []
        for i, category in enumerate(categories):
            logger.info(f"处理第 {i+1}/{len(categories)} 个分类: {category}")
            results.append(self.scrape_category(category))
            if i < len(categories) - 1:
                rest_time = random.uniform(10, 30)
                logger.info(f"休息 {rest_time:.1f} 秒后继续")
                time.sleep(rest_time)

        successful = [r for r in results if r.get('success')]
        total_products = sum(r.get('products_count', 0) for r in successful)
        total_duration = sum(r.get('duration', 0) for r in successful)
        summary = {
            'total_categories': len(results),
            'successful_categories': len(successful),
            'total_products': total_products,
            'total_duration': total_duration,
            'results': results,
        }
        logger.info(f"抓取会话完成 - 总计: {total_products} 个商品, {len(results)} 个分类")
        return summary
def main():
    """Entry point: configure a small Taobao crawl, launch the app and run one session."""
    logger.info("电商App商品抓取系统启动")

    # Crawl settings: three keywords, 20 products each, 3 s between swipes.
    settings = ScrapingConfig(
        scroll_interval=3.0,
        max_products_per_category=20,
        category_keywords=["手机", "笔记本电脑", "服装"],
        app_package="com.taobao.taobao",  # Taobao package name
    )
    crawler = EcommerceScraper(settings)

    # Nothing to scrape if the app cannot be started.
    if not crawler.launch_ecommerce_app():
        logger.error("无法启动电商应用")
        return

    summary = crawler.run_scraping_session()
    logger.info(f"抓取总结: {summary}")
# Run the scraper only when this file is executed as a script.
if __name__ == "__main__":
    main()
# Section: e-commerce platform adapters
# platform_adapters.py - 不同电商平台的适配器
from abc import ABC, abstractmethod
import uiautomator2 as u2
from typing import Dict, List, Optional
class PlatformAdapter(ABC):
    """Abstract base class for per-platform selector providers.

    Concrete adapters return the selector strings used to locate the search
    box, search button, product cards, prices and titles.
    """

    def __init__(self, d: u2.Device):
        # Device handle; stored for subclasses and callers.
        self.d = d

    @abstractmethod
    def get_search_box_selector(self) -> str:
        """Return the selector of the search box."""

    @abstractmethod
    def get_search_button_selector(self) -> str:
        """Return the selector of the search button."""

    @abstractmethod
    def get_product_item_selector(self) -> str:
        """Return the selector of a product item."""

    @abstractmethod
    def get_product_price_selector(self) -> str:
        """Return the selector of a product's price."""

    @abstractmethod
    def get_product_title_selector(self) -> str:
        """Return the selector of a product's title."""
class TaobaoAdapter(PlatformAdapter):
    """Adapter returning the Taobao app's resource ids."""

    _SEARCH_BOX = "com.taobao.taobao:id/searchbar_hint_view"
    _SEARCH_BUTTON = "com.taobao.taobao:id/search_confirm"
    _PRODUCT_ITEM = "com.taobao.taobao:id/product_item"
    _PRODUCT_PRICE = "com.taobao.taobao:id/price"
    _PRODUCT_TITLE = "com.taobao.taobao:id/title"

    def get_search_box_selector(self) -> str:
        """Return the resource id of the search box."""
        return self._SEARCH_BOX

    def get_search_button_selector(self) -> str:
        """Return the resource id of the search button."""
        return self._SEARCH_BUTTON

    def get_product_item_selector(self) -> str:
        """Return the resource id of a product item."""
        return self._PRODUCT_ITEM

    def get_product_price_selector(self) -> str:
        """Return the resource id of a product's price."""
        return self._PRODUCT_PRICE

    def get_product_title_selector(self) -> str:
        """Return the resource id of a product's title."""
        return self._PRODUCT_TITLE
class JDAdapter(PlatformAdapter):
    """Adapter returning the JD (Jingdong) app's resource ids."""

    _SEARCH_BOX = "com.jingdong.app.mall:id/search_widget_text"
    _SEARCH_BUTTON = "com.jingdong.app.mall:id/search_btn"
    _PRODUCT_ITEM = "com.jingdong.app.mall:id/product_item"
    _PRODUCT_PRICE = "com.jingdong.app.mall:id/jd_money"
    _PRODUCT_TITLE = "com.jingdong.app.mall:id/good_title"

    def get_search_box_selector(self) -> str:
        """Return the resource id of the search box."""
        return self._SEARCH_BOX

    def get_search_button_selector(self) -> str:
        """Return the resource id of the search button."""
        return self._SEARCH_BUTTON

    def get_product_item_selector(self) -> str:
        """Return the resource id of a product item."""
        return self._PRODUCT_ITEM

    def get_product_price_selector(self) -> str:
        """Return the resource id of a product's price."""
        return self._PRODUCT_PRICE

    def get_product_title_selector(self) -> str:
        """Return the resource id of a product's title."""
        return self._PRODUCT_TITLE
class PddAdapter(PlatformAdapter):
    """Adapter returning the Pinduoduo app's resource ids."""

    _SEARCH_BOX = "com.xunmeng.pinduoduo:id/search_edit_text"
    _SEARCH_BUTTON = "com.xunmeng.pinduoduo:id/search_btn"
    _PRODUCT_ITEM = "com.xunmeng.pinduoduo:id/product_item"
    _PRODUCT_PRICE = "com.xunmeng.pinduoduo:id/goods_price"
    _PRODUCT_TITLE = "com.xunmeng.pinduoduo:id/goods_title"

    def get_search_box_selector(self) -> str:
        """Return the resource id of the search box."""
        return self._SEARCH_BOX

    def get_search_button_selector(self) -> str:
        """Return the resource id of the search button."""
        return self._SEARCH_BUTTON

    def get_product_item_selector(self) -> str:
        """Return the resource id of a product item."""
        return self._PRODUCT_ITEM

    def get_product_price_selector(self) -> str:
        """Return the resource id of a product's price."""
        return self._PRODUCT_PRICE

    def get_product_title_selector(self) -> str:
        """Return the resource id of a product's title."""
        return self._PRODUCT_TITLE
class PlatformAdapterFactory:
    """Factory that builds the adapter matching a platform name."""

    @staticmethod
    def create_adapter(platform_name: str, d: u2.Device) -> Optional[PlatformAdapter]:
        """Return an adapter for *platform_name*, or ``None`` if none is registered.

        The lookup is case-insensitive; the adapter is constructed with *d*.
        """
        registry = {
            'taobao': TaobaoAdapter,
            'jd': JDAdapter,
            'pinduoduo': PddAdapter,
        }
        try:
            builder = registry[platform_name.lower()]
        except KeyError:
            return None
        return builder(d)
# E-commerce platform detector
def detect_ecommerce_platform(app_package: str) -> str:
    """Map an Android package name to its platform key.

    Returns ``'unknown'`` for any package that is not listed below.
    """
    try:
        return {
            'com.taobao.taobao': 'taobao',
            'com.jingdong.app.mall': 'jd',
            'com.xunmeng.pinduoduo': 'pinduoduo',
            'com.tmall.wireless': 'taobao',  # Tmall
            'com.suning.mobile.ebuy': 'suning',  # Suning
            'com.miui.shop': 'xiaomi',  # Xiaomi store
        }[app_package]
    except KeyError:
        return 'unknown'
# Platform-specific scraper
class PlatformSpecificScraper:
    """Searches a shopping app using platform adapters with a generic fallback.

    The platform is derived from the package name; when no adapter is
    registered for it, only the generic XPath-based search is used.
    """

    def __init__(self, d: u2.Device, app_package: str):
        """Resolve the platform and adapter for *app_package* on device *d*."""
        self.d = d
        self.app_package = app_package
        self.platform = detect_ecommerce_platform(app_package)
        self.adapter = PlatformAdapterFactory.create_adapter(self.platform, d)
        if not self.adapter:
            logger.warning(f"未找到 {self.platform} 的适配器,使用通用方法")

    def search_products_adaptive(self, keyword: str) -> bool:
        """Search for *keyword*, preferring the platform adapter's selectors.

        Returns:
            True once a search was submitted; otherwise the result of
            ``generic_search``.
        """
        if self.adapter:
            search_box = self.d(resourceId=self.adapter.get_search_box_selector())
            if search_box.exists:
                search_box.click()
                self.d.send_keys(keyword)
                search_btn = self.d(resourceId=self.adapter.get_search_button_selector())
                if search_btn.exists:
                    search_btn.click()
                else:
                    # The keyword is already typed: submit it with the search
                    # key instead of falling through to generic_search, which
                    # would type the keyword a second time.
                    self.d.press("search")
                time.sleep(5)
                return True
        # No adapter, or its search box is not on screen.
        return self.generic_search(keyword)

    def generic_search(self, keyword: str) -> bool:
        """Search via generic XPath guesses for the search field.

        Each XPath is tried in order; the first one that appears within three
        seconds is clicked, the keyword is typed and the search key pressed.

        Returns:
            True after the first successful attempt, False when every
            candidate is missing or fails.
        """
        search_selectors = [
            '//*[@resource-id="search" or contains(@resource-id, "search")]',
            '//*[@resource-id="edit" or contains(@resource-id, "edit")]',
            '//*[@text="搜索" or @text="Search"]',
        ]
        for selector in search_selectors:
            try:
                target = self.d.xpath(selector)
                # On XPath selectors ``exists`` is a plain property, so the
                # timed check has to go through wait().
                if not target.wait(timeout=3):
                    continue
                target.click()
                time.sleep(1)
                self.d.send_keys(keyword)
                time.sleep(1)
                # Submit the query with the search key.
                self.d.press("search")
                time.sleep(5)
                return True
            except Exception as e:
                # Best effort: log and move on to the next candidate.
                logger.debug(f"通用搜索尝试失败 ({selector}): {e}")
                continue
        return False
# Section: data analysis and reporting
# ecommerce_analytics.py - 电商数据分析
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from typing import Dict, List
import numpy as np
class EcommerceAnalytics:
    """Read-only analytics and charts over the scraped ``products`` table.

    Every query opens its own SQLite connection to ``db_path`` and closes it
    again, even when the query fails.
    """

    def __init__(self, db_path: str = "ecommerce_data.db"):
        self.db_path = db_path

    def _read_frame(self, query: str) -> pd.DataFrame:
        """Run *query* and return the rows as a DataFrame, always closing the connection."""
        conn = sqlite3.connect(self.db_path)
        try:
            return pd.read_sql_query(query, conn)
        finally:
            conn.close()

    @staticmethod
    def _number(value, default=0):
        """Return *value* as a float, or *default* when it is NULL/NaN."""
        return default if pd.isna(value) else float(value)

    def get_product_statistics(self) -> Dict:
        """Return overall product statistics.

        The totals are computed over the whole table (not only over the
        largest category); the per-category counts come from a second query.

        Returns:
            Dict with ``total_products``, ``average_price``, ``price_range``
            (``min``/``max``) and ``category_distribution`` (category ->
            count, largest first).  Numeric fields are 0 when the table is
            empty.
        """
        overall = self._read_frame("""
            SELECT
                COUNT(*) as total_products,
                AVG(price) as avg_price,
                MIN(price) as min_price,
                MAX(price) as max_price
            FROM products
        """).iloc[0]
        per_category = self._read_frame("""
            SELECT
                category,
                COUNT(*) as category_count
            FROM products
            GROUP BY category
            ORDER BY category_count DESC
        """)
        return {
            'total_products': int(overall['total_products']),
            'average_price': self._number(overall['avg_price']),
            'price_range': {
                'min': self._number(overall['min_price']),
                'max': self._number(overall['max_price']),
            },
            'category_distribution': {
                category: int(count)
                for category, count in zip(per_category['category'],
                                           per_category['category_count'])
            },
        }

    def get_price_analysis(self) -> Dict:
        """Analyse prices of products that have both a price and sales.

        Returns:
            ``{}`` when no such product exists, otherwise a dict with
            ``price_distribution`` (count per price band),
            ``avg_sales_by_price_range``, ``correlation_price_sales`` and
            ``top_priced_items`` (up to ten ``{'price', 'title'}`` records).
        """
        # ``title`` must be selected: top_priced_items reports it.
        df = self._read_frame("""
            SELECT
                title,
                price,
                sales_count,
                category,
                shop_name
            FROM products
            WHERE price > 0 AND sales_count > 0
        """)
        if df.empty:
            return {}

        # Left-closed price bands: [0, 50), [50, 100), ...
        price_bins = [0, 50, 100, 200, 500, 1000, float('inf')]
        price_labels = ['0-50', '50-100', '100-200', '200-500', '500-1000', '1000+']
        df['price_range'] = pd.cut(df['price'], bins=price_bins, labels=price_labels, right=False)

        return {
            'price_distribution': df['price_range'].value_counts().to_dict(),
            # observed=False keeps empty bands in the result and avoids the
            # pandas warning about the changing default.
            'avg_sales_by_price_range': (
                df.groupby('price_range', observed=False)['sales_count'].mean().to_dict()
            ),
            'correlation_price_sales': df['price'].corr(df['sales_count']) if len(df) > 1 else 0,
            'top_priced_items': df.nlargest(10, 'price')[['price', 'title']].to_dict('records'),
        }

    def get_category_performance(self) -> Dict:
        """Return per-category averages as a list of records, largest category first.

        Each record holds ``category``, ``product_count``, ``avg_price``,
        ``avg_sales`` and ``avg_rating``; the list is empty for an empty table.
        """
        df = self._read_frame("""
            SELECT
                category,
                COUNT(*) as product_count,
                AVG(price) as avg_price,
                AVG(sales_count) as avg_sales,
                AVG(rating) as avg_rating
            FROM products
            GROUP BY category
            ORDER BY product_count DESC
        """)
        return df.to_dict('records') if not df.empty else []

    def generate_report(self) -> str:
        """Build the plain-text analysis report from the three analyses."""
        product_stats = self.get_product_statistics()
        price_analysis = self.get_price_analysis()
        category_performance = self.get_category_performance()

        lines = [
            "",
            "=== 电商App商品抓取分析报告 ===",
            "📊 商品统计:",
            f"- 总商品数: {product_stats['total_products']}",
            f"- 平均价格: ¥{product_stats['average_price']:.2f}",
            f"- 价格范围: ¥{product_stats['price_range']['min']} - ¥{product_stats['price_range']['max']}",
            "🏷️ 分类分布:",
        ]
        for category, count in list(product_stats['category_distribution'].items())[:10]:
            lines.append(f"- {category}: {count} 件")

        if price_analysis:
            lines.append("")
            lines.append("💰 价格分析:")
            lines.append(f"- 价格区间分布: {dict(list(price_analysis['price_distribution'].items())[:5])}")
            lines.append(f"- 价格销量相关系数: {price_analysis['correlation_price_sales']:.3f}")

        lines.append("")
        lines.append("🏆 分类表现 (Top 5):")
        for perf in category_performance[:5]:
            # Averages are NULL when every value in the category is NULL.
            avg_price = self._number(perf['avg_price'])
            avg_sales = self._number(perf['avg_sales'])
            lines.append(f"- {perf['category']}: 平均价格¥{avg_price:.2f}, 平均销量{avg_sales:.0f}")
        return "\n".join(lines) + "\n"

    def plot_price_distribution(self):
        """Save and show a histogram and box plot of prices in (0, 1000)."""
        # Prices of 1000 and above are filtered out as outliers.
        df = self._read_frame("SELECT price FROM products WHERE price > 0 AND price < 1000")
        if df.empty:
            print("暂无价格数据可绘制")
            return

        fig = plt.figure(figsize=(12, 6))

        # Histogram
        plt.subplot(1, 2, 1)
        plt.hist(df['price'], bins=50, alpha=0.7, color='skyblue', edgecolor='black')
        plt.title('商品价格分布')
        plt.xlabel('价格 (¥)')
        plt.ylabel('商品数量')

        # Box plot
        plt.subplot(1, 2, 2)
        plt.boxplot(df['price'])
        plt.title('价格箱线图')
        plt.ylabel('价格 (¥)')

        plt.tight_layout()
        plt.savefig('price_distribution.png', dpi=300, bbox_inches='tight')
        plt.show()
        plt.close(fig)  # release the figure once it has been saved and shown
        print("📈 价格分布图已保存为 price_distribution.png")

    def plot_category_analysis(self):
        """Save and show four charts for the ten largest categories.

        Charts: product count, average price and average sales per category,
        plus an average-price vs. average-sales scatter plot with a linear
        trend line (drawn only when at least two distinct prices exist).
        """
        df = self._read_frame("""
            SELECT
                category,
                COUNT(*) as product_count,
                AVG(price) as avg_price,
                AVG(sales_count) as avg_sales
            FROM products
            WHERE category IS NOT NULL
            GROUP BY category
            ORDER BY product_count DESC
            LIMIT 10
        """)
        if df.empty:
            print("暂无分类数据可绘制")
            return

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        positions = range(len(df))

        bar_charts = (
            (axes[0, 0], 'product_count', '各分类商品数量'),
            (axes[0, 1], 'avg_price', '各分类平均价格'),
            (axes[1, 0], 'avg_sales', '各分类平均销量'),
        )
        for ax, column, title in bar_charts:
            ax.bar(positions, df[column])
            ax.set_title(title)
            ax.set_xticks(positions)
            ax.set_xticklabels(df['category'], rotation=45, ha='right')

        # Price vs. sales scatter plot
        scatter_ax = axes[1, 1]
        scatter_ax.scatter(df['avg_price'], df['avg_sales'], alpha=0.6)
        scatter_ax.set_title('平均价格 vs 平均销量')
        scatter_ax.set_xlabel('平均价格 (¥)')
        scatter_ax.set_ylabel('平均销量')

        # A straight-line fit needs two distinct x values and no NaNs.
        trend = df[['avg_price', 'avg_sales']].dropna()
        if trend['avg_price'].nunique() >= 2:
            coefficients = np.polyfit(trend['avg_price'], trend['avg_sales'], 1)
            trend_line = np.poly1d(coefficients)
            scatter_ax.plot(trend['avg_price'], trend_line(trend['avg_price']), "r--", alpha=0.8)

        plt.tight_layout()
        plt.savefig('category_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
        plt.close(fig)  # release the figure once it has been saved and shown
        print("📊 分类分析图已保存为 category_analysis.png")
def run_ecommerce_analytics():
    """Print the text report, then draw both charts, using the default database path."""
    analyzer = EcommerceAnalytics()

    # Text report first.
    print(analyzer.generate_report())

    # Then the charts, in the same order as before.
    analyzer.plot_price_distribution()
    analyzer.plot_category_analysis()
# Run the analytics only when this file is executed as a script.
if __name__ == "__main__":
    run_ecommerce_analytics()
