电商App商品滑动抓取项目

本项目展示如何构建一个电商App商品信息抓取系统,通过自动化滑动操作获取大量商品数据。

电商App抓取系统设计

# ecommerce_scraper_system.py
import uiautomator2 as u2
import time
import random
import json
import sqlite3
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple
import logging
from dataclasses import dataclass
import pandas as pd

# Configure logging at import time: INFO level, "<timestamp> - <level> - <message>" records.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the classes and functions below.
logger = logging.getLogger(__name__)

@dataclass
class ScrapingConfig:
    """Settings for one scraping session.

    ``category_keywords`` may be left as ``None``; ``__post_init__`` then
    replaces it with a fresh built-in keyword list, so instances never share
    the same list object.
    """
    # Android package name of the shopping app to drive (Taobao by default).
    app_package: str = "com.taobao.taobao"
    # Search keywords, one per category; None selects the built-in list.
    category_keywords: Optional[List[str]] = None
    # Upper bound on products collected for a single category.
    max_products_per_category: int = 100
    # Pause between two swipes, in seconds.
    scroll_interval: float = 2.0
    # Product detection timeout, in seconds.
    product_detection_timeout: int = 10

    def __post_init__(self):
        """Fill in the default keyword list when none was supplied."""
        if self.category_keywords is None:
            self.category_keywords = [
                "手机", "笔记本", "服装", "鞋子", "化妆品",
                "食品", "家电", "图书", "运动", "数码"
            ]

class EcommerceDatabase:
    """SQLite persistence layer for scraped e-commerce data.

    Manages four tables: ``products``, ``shops``, ``categories`` and
    ``crawl_logs``. Every method opens its own connection to ``db_path`` and
    always closes it again, including when a statement fails.
    """

    # Idempotent DDL, executed in order by init_database().
    _SCHEMA_STATEMENTS = (
        # Product details
        '''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id TEXT UNIQUE,
                title TEXT,
                price REAL,
                original_price REAL,
                discount REAL,
                sales_count INTEGER,
                shop_name TEXT,
                category TEXT,
                rating REAL,
                review_count INTEGER,
                image_urls TEXT,
                detail_url TEXT,
                crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''',
        # Shop details
        '''
            CREATE TABLE IF NOT EXISTS shops (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                shop_id TEXT UNIQUE,
                shop_name TEXT,
                shop_rating REAL,
                total_sales INTEGER,
                location TEXT,
                crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''',
        # Per-category statistics
        '''
            CREATE TABLE IF NOT EXISTS categories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                category_name TEXT UNIQUE,
                search_keyword TEXT,
                product_count INTEGER,
                crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''',
        # Crawl run log
        '''
            CREATE TABLE IF NOT EXISTS crawl_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                category TEXT,
                products_found INTEGER,
                products_crawled INTEGER,
                success_rate REAL,
                duration_seconds INTEGER,
                error_count INTEGER,
                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                finished_at TIMESTAMP
            )
        ''',
    )

    def __init__(self, db_path: str = "ecommerce_data.db"):
        """Remember ``db_path`` and make sure all tables exist."""
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the four tables if they do not exist yet.

        Errors from SQLite are not caught here; the connection is closed
        either way.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            for statement in self._SCHEMA_STATEMENTS:
                cursor.execute(statement)
            conn.commit()
        finally:
            conn.close()
        logger.info("电商数据库初始化完成")

    def _execute_write(self, sql: str, params: tuple, failure_label: str) -> bool:
        """Execute and commit one write statement on a fresh connection.

        Returns True on success. Any exception raised while executing or
        committing is logged as ``"<failure_label>: <error>"`` and swallowed,
        in which case False is returned.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(sql, params)
            conn.commit()
            return True
        except Exception as e:
            logger.error(f"{failure_label}: {e}")
            return False
        finally:
            conn.close()

    def save_product(self, product_info: Dict):
        """Insert or replace one product row, keyed by ``product_id``.

        Missing keys fall back to the defaults used below; ``image_urls`` is
        stored as a JSON string. Failures are logged, never raised.
        """
        try:
            image_urls = json.dumps(product_info.get('image_urls', []))
        except (TypeError, ValueError) as e:
            logger.error(f"保存商品信息失败: {e}")
            return

        saved = self._execute_write(
            '''
                INSERT OR REPLACE INTO products
                (product_id, title, price, original_price, discount,
                 sales_count, shop_name, category, rating, review_count,
                 image_urls, detail_url)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            (
                product_info.get('product_id'),
                product_info.get('title', ''),
                product_info.get('price'),
                product_info.get('original_price'),
                product_info.get('discount'),
                product_info.get('sales_count', 0),
                product_info.get('shop_name', ''),
                product_info.get('category', ''),
                product_info.get('rating'),
                product_info.get('review_count', 0),
                image_urls,
                product_info.get('detail_url', ''),
            ),
            "保存商品信息失败",
        )
        if saved:
            # A None title must not turn a successful save into a reported failure.
            title = str(product_info.get('title') or '')
            logger.info(f"商品信息已保存: {title[:20]}...")

    def save_shop(self, shop_info: Dict):
        """Insert or replace one shop row, keyed by ``shop_id``.

        Failures are logged, never raised.
        """
        saved = self._execute_write(
            '''
                INSERT OR REPLACE INTO shops
                (shop_id, shop_name, shop_rating, total_sales, location)
                VALUES (?, ?, ?, ?, ?)
            ''',
            (
                shop_info.get('shop_id'),
                shop_info.get('shop_name'),
                shop_info.get('shop_rating'),
                shop_info.get('total_sales'),
                shop_info.get('location'),
            ),
            "保存店铺信息失败",
        )
        if saved:
            logger.info(f"店铺信息已保存: {shop_info.get('shop_name')}")

    def save_category_stats(self, category: str, search_keyword: str, product_count: int):
        """Insert or replace the statistics row for ``category``.

        Failures are logged, never raised.
        """
        saved = self._execute_write(
            '''
                INSERT OR REPLACE INTO categories
                (category_name, search_keyword, product_count)
                VALUES (?, ?, ?)
            ''',
            (category, search_keyword, product_count),
            "保存分类统计失败",
        )
        if saved:
            logger.info(f"分类统计已保存: {category}")

    def save_crawl_log(self, log_info: Dict):
        """Append one crawl-log row; ``finished_at`` is set to the current local time.

        Failures are logged, never raised.
        """
        # Bind the timestamp as text ("YYYY-MM-DD HH:MM:SS.ffffff"), the same
        # format sqlite3's default datetime adapter produced; that adapter is
        # deprecated since Python 3.12.
        finished_at = datetime.now().isoformat(sep=' ')
        saved = self._execute_write(
            '''
                INSERT INTO crawl_logs
                (category, products_found, products_crawled, success_rate,
                 duration_seconds, error_count, finished_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''',
            (
                log_info.get('category'),
                log_info.get('products_found', 0),
                log_info.get('products_crawled', 0),
                log_info.get('success_rate', 0.0),
                log_info.get('duration_seconds', 0),
                log_info.get('error_count', 0),
                finished_at,
            ),
            "保存抓取日志失败",
        )
        if saved:
            logger.info(f"抓取日志已保存: {log_info.get('category')}")

class EcommerceScraper:
    """Scrapes product cards from a shopping app through uiautomator2.

    The scraper searches for a keyword, repeatedly inspects the visible
    screen for product cards, stores every card with a readable price in the
    database and swipes to load more.

    NOTE(review): the resource ids below were carried over unchanged from the
    previous implementation; verify them against the installed app versions.
    """

    # Resource ids probed, in order, for the search input box.
    _SEARCH_BOX_IDS = (
        'com.taobao.taobao:id/searchbar_hint_view',     # Taobao
        'com.taobao.taobao:id/searchEdit',              # Taobao
        'com.jingdong.app.mall:id/search_widget_text',  # JD
        'com.suning.mobile.ebuy:id/search_bar_text',    # Suning
    )
    # Resource ids probed, in order, for the "submit search" button.
    _SEARCH_BUTTON_IDS = (
        'com.taobao.taobao:id/search_confirm',
        'com.jingdong.app.mall:id/search_btn',
        'com.suning.mobile.ebuy:id/search_btn',
    )
    # Resource ids of product cards in the result list.
    _PRODUCT_ITEM_IDS = (
        'com.taobao.taobao:id/product_item',      # Taobao
        'com.jingdong.app.mall:id/product_item',  # JD
    )
    # Generic layout classes used only when no product-card id matches.
    _GENERIC_CONTAINER_CLASSES = (
        'android.widget.RelativeLayout',
        'android.widget.LinearLayout',
    )
    # Child selectors tried for the title; the specific id goes first because
    # the first generic TextView of a card is not necessarily the title.
    _TITLE_SELECTORS = (
        {'resourceId': 'com.taobao.taobao:id/title'},
        {'className': 'android.widget.TextView'},
    )
    # Text patterns (matched by uiautomator against the whole text).
    _PRICE_TEXT_PATTERN = r'[¥￥]\d+\.?\d*'
    _SALES_TEXT_PATTERN = r'\d+(\.\d+)?[万千]?\+?(人付款|销量)'
    _SHOP_TEXT_PATTERN = r'.*(旗舰店|专卖店|专营店|个人).*'
    # Abort a detection run after this many extraction/scroll failures ...
    _MAX_ERRORS = 10
    # ... or after this many consecutive screens that yielded nothing new.
    _MAX_IDLE_ROUNDS = 5

    def __init__(self, config: Optional["ScrapingConfig"] = None, db_path: str = "ecommerce_data.db"):
        """Create the database wrapper and connect to the device.

        Args:
            config: Scraping settings; a default ``ScrapingConfig`` is used
                when omitted.
            db_path: SQLite file handed to ``EcommerceDatabase``.
        """
        self.config = config or ScrapingConfig()
        self.db = EcommerceDatabase(db_path)
        self.d = None
        self.product_counter = 0
        self.error_count = 0
        self.setup_device()

    def setup_device(self):
        """Connect to the device; on failure log the error and leave ``self.d`` as None."""
        try:
            self.d = u2.connect()
            if self.d:
                logger.info(f"设备连接成功: {self.d.serial}")
                logger.info(f"设备信息: {self.d.info}")
            else:
                logger.error("设备连接失败")
        except Exception as e:
            logger.error(f"设备连接失败: {e}")

    def launch_ecommerce_app(self):
        """Start the configured app and wait for it to settle.

        Returns:
            True when the start call succeeded, False when it raised
            (including when no device is connected).
        """
        try:
            self.d.app_start(self.config.app_package)
            time.sleep(8)  # give the app time to finish starting
            logger.info(f"电商应用已启动: {self.config.app_package}")
            return True
        except Exception as e:
            logger.error(f"启动应用失败: {e}")
            return False

    def _first_existing(self, resource_ids, timeout: float):
        """Return the selector for the first resource id present on screen, else None."""
        for resource_id in resource_ids:
            candidate = self.d(resourceId=resource_id)
            # exists(timeout=...) only answers True/False, so the selector
            # object itself is what must be kept for the later click().
            if candidate.exists(timeout=timeout):
                return candidate
        return None

    def search_products(self, keyword: str) -> bool:
        """Type ``keyword`` into the app's search box and submit it.

        Returns:
            True when the search was submitted; False when the search box or
            button could not be found or any device call raised.
        """
        try:
            search_box = self._first_existing(self._SEARCH_BOX_IDS, timeout=3)
            if search_box is None:
                # Fall back to the content description.
                by_description = self.d(descriptionMatches=r'搜索|Search|输入')
                if by_description.exists(timeout=3):
                    search_box = by_description
            if search_box is None:
                logger.error("未找到搜索框")
                return False

            search_box.click()
            time.sleep(2)

            self.d.send_keys(keyword)
            time.sleep(1)

            search_btn = self._first_existing(self._SEARCH_BUTTON_IDS, timeout=2)
            if search_btn is None:
                # Fall back to a button labelled "search".
                by_text = self.d(textMatches=r'搜索|Search')
                if by_text.exists(timeout=2):
                    search_btn = by_text
            if search_btn is None:
                logger.error("未找到搜索按钮")
                return False

            search_btn.click()
            time.sleep(5)  # wait for the result list to load
            logger.info(f"搜索商品成功: {keyword}")
            return True

        except Exception as e:
            logger.error(f"搜索商品失败: {e}")
            return False

    def scroll_and_detect_products(self, max_products: int = 50, category: str = '') -> List[Dict]:
        """Collect products from the result list, swiping for more as needed.

        Every accepted product is saved to the database immediately. The run
        stops when ``max_products`` were collected, when too many failures
        occurred, or when several consecutive screens produced nothing new
        (otherwise an empty or exhausted list would loop forever).

        Args:
            max_products: Maximum number of products to collect.
            category: When non-empty, written to each product that has no
                category of its own.

        Returns:
            The collected product dicts, in detection order.
        """
        products = []
        # (title, price, shop) of accepted cards; a card that is still visible
        # after a partial scroll must not be counted twice.
        seen = set()
        failures = 0
        idle_rounds = 0

        logger.info(f"开始滑动检测商品,最多检测 {max_products} 个")

        while (len(products) < max_products
               and failures < self._MAX_ERRORS
               and idle_rounds < self._MAX_IDLE_ROUNDS):
            count_before = len(products)

            for element in self.find_product_elements():
                if len(products) >= max_products:
                    break

                try:
                    product_info = self.extract_product_info(element)
                except Exception as e:
                    logger.error(f"提取商品信息失败: {e}")
                    failures += 1
                    continue

                if not product_info or not product_info.get('product_id'):
                    continue

                title = product_info.get('title')
                fingerprint = (title, product_info.get('price'), product_info.get('shop_name'))
                # Without a title the fingerprint is too weak to tell two
                # different products apart, so only titled cards are de-duplicated.
                if title and fingerprint in seen:
                    continue
                seen.add(fingerprint)

                if category and not product_info.get('category'):
                    product_info['category'] = category

                products.append(product_info)
                logger.info(f"检测到商品: {str(title or '')[:20]}...")
                self.db.save_product(product_info)

            idle_rounds = idle_rounds + 1 if len(products) == count_before else 0

            if len(products) < max_products:
                if self.scroll_down_products():
                    time.sleep(self.config.scroll_interval)
                else:
                    failures += 1
                    time.sleep(3)  # back off before retrying

        self.error_count += failures
        self.product_counter += len(products)
        logger.info(f"商品检测完成,共检测到 {len(products)} 个商品")
        return products

    def _matches(self, **selector) -> List:
        """Return every on-screen element matching ``selector`` (empty list on error)."""
        try:
            # Iterating a uiautomator2 selector yields one object per match.
            return list(self.d(**selector))
        except Exception as e:
            logger.debug(f"查找元素失败 {selector}: {e}")
            return []

    @staticmethod
    def _bounds_key(element):
        """Return the element's bounds as a hashable (left, top, right, bottom) tuple."""
        bounds = element.bounds()
        if isinstance(bounds, dict):
            return (bounds['left'], bounds['top'], bounds['right'], bounds['bottom'])
        return tuple(bounds)

    def find_product_elements(self) -> List:
        """Return candidate product-card elements on the current screen.

        Known product-card resource ids are tried first. Only when none of
        them matches are generic layout containers returned instead; those
        are filtered later because ``extract_product_info`` rejects elements
        without a price. Elements with identical bounds are returned once.

        NOTE(review): the generic fallback is a heuristic and may return
        nested containers of the same card.
        """
        candidates = []
        for resource_id in self._PRODUCT_ITEM_IDS:
            candidates.extend(self._matches(resourceId=resource_id))

        if not candidates:
            for class_name in self._GENERIC_CONTAINER_CLASSES:
                candidates.extend(self._matches(className=class_name))

        unique_elements = []
        seen_bounds = set()
        for element in candidates:
            try:
                key = self._bounds_key(element)
            except Exception as e:
                # The element may have scrolled away since it was matched.
                logger.debug(f"读取元素位置失败: {e}")
                continue
            if key not in seen_bounds:
                seen_bounds.add(key)
                unique_elements.append(element)

        return unique_elements

    @staticmethod
    def _child_text(element, **selector) -> Optional[str]:
        """Return the text of the first child of ``element`` matching ``selector``.

        Returns None when no such child exists or the lookup raises.
        """
        try:
            child = element.child(**selector)
            if child.exists:
                return child.get_text()
        except Exception as e:
            logger.debug(f"读取子元素文本失败 {selector}: {e}")
        return None

    @staticmethod
    def _parse_price(text: Optional[str]) -> Optional[float]:
        """Extract the first number from a price label such as ``"¥1,299.00"``.

        Thousands separators are ignored. Returns None when ``text`` is empty
        or contains no digits.
        """
        if not text:
            return None
        match = re.search(r'\d+(?:\.\d+)?', text.replace(',', ''))
        return float(match.group()) if match else None

    @staticmethod
    def _parse_sales_count(text: Optional[str]) -> int:
        """Convert a sales label such as ``"1.5万+人付款"`` to an integer count.

        A ``万`` directly after the number multiplies it by 10000, a ``千`` by
        1000. Returns 0 when ``text`` is empty or contains no digits.
        """
        if not text:
            return 0
        match = re.search(r'(\d+(?:\.\d+)?)\s*([万千]?)', text)
        if not match:
            return 0
        multiplier = {'万': 10000, '千': 1000}.get(match.group(2), 1)
        return int(round(float(match.group(1)) * multiplier))

    def extract_product_info(self, element) -> Optional[Dict]:
        """Read title, price, sales count and shop name from one product card.

        Each field is looked up independently, so a missing field leaves its
        default in place. ``product_id`` is a locally generated
        ``prod_<unix time>_<random>`` value, not an id taken from the app.

        Returns:
            The product dict, or None when no price could be read (the
            element is then not treated as a product) or extraction raised.
        """
        try:
            # The price decides whether this is a product at all, so read it
            # first and skip the remaining device round trips otherwise.
            price = self._parse_price(
                self._child_text(element, textMatches=self._PRICE_TEXT_PATTERN)
            )
            if not price:
                return None

            product_info = {
                'product_id': f"prod_{int(time.time())}_{random.randint(1000, 9999)}",
                'title': '',
                'price': price,
                'original_price': 0.0,
                'discount': 0.0,
                'sales_count': 0,
                'shop_name': '',
                'category': '',
                'rating': 0.0,
                'review_count': 0,
                'image_urls': [],
                'detail_url': ''
            }

            for selector in self._TITLE_SELECTORS:
                title = self._child_text(element, **selector)
                if title:
                    product_info['title'] = title[:100]  # cap the stored length
                    break

            product_info['sales_count'] = self._parse_sales_count(
                self._child_text(element, textMatches=self._SALES_TEXT_PATTERN)
            )

            shop_name = self._child_text(element, textMatches=self._SHOP_TEXT_PATTERN)
            if shop_name:
                product_info['shop_name'] = shop_name[:50]

            return product_info

        except Exception as e:
            logger.error(f"提取商品信息失败: {e}")
            return None

    def scroll_down_products(self) -> bool:
        """Swipe up from 3/4 to 1/4 of the screen height to reveal more items.

        Returns:
            True when the swipe was issued, False when a device call raised.
        """
        try:
            w, h = self.d.window_size()
            self.d.swipe(w//2, h*3//4, w//2, h//4, 0.8)
            time.sleep(1)  # let newly revealed content load
            return True
        except Exception as e:
            logger.error(f"滚动失败: {e}")
            return False

    def scrape_category(self, category: str, keyword: str = None) -> Dict:
        """Search for one category and collect its products.

        Args:
            category: Category name recorded with the products and the logs.
            keyword: Search term; defaults to ``category``.

        Returns:
            ``{'success': False, 'error': 'search_failed'}`` when the search
            could not be submitted; otherwise a dict with ``success``,
            ``products_count``, ``duration`` (seconds) and ``products``.
        """
        if keyword is None:
            keyword = category

        start_time = time.time()
        errors_before = self.error_count
        logger.info(f"开始抓取分类: {category}, 关键词: {keyword}")

        if not self.search_products(keyword):
            logger.error(f"搜索失败: {keyword}")
            return {'success': False, 'error': 'search_failed'}

        products = self.scroll_and_detect_products(
            self.config.max_products_per_category, category=category
        )

        duration = int(time.time() - start_time)

        log_info = {
            'category': category,
            'products_found': len(products),
            'products_crawled': len(products),
            'success_rate': 1.0 if len(products) > 0 else 0.0,
            'duration_seconds': duration,
            # Only the failures that happened during this category.
            'error_count': self.error_count - errors_before
        }

        self.db.save_crawl_log(log_info)
        self.db.save_category_stats(category, keyword, len(products))

        logger.info(f"分类抓取完成: {category}, 发现 {len(products)} 个商品, 耗时 {duration} 秒")

        return {
            'success': True,
            'products_count': len(products),
            'duration': duration,
            'products': products
        }

    def run_scraping_session(self):
        """Scrape every configured category in order, resting between categories.

        Returns:
            A summary dict with ``total_categories``, ``successful_categories``,
            ``total_products``, ``total_duration`` and the per-category
            ``results``.
        """
        logger.info("开始电商App抓取会话")

        categories = self.config.category_keywords
        results = []

        for i, category in enumerate(categories):
            logger.info(f"处理第 {i+1}/{len(categories)} 个分类: {category}")

            results.append(self.scrape_category(category))

            # Random pause between categories, skipped after the last one.
            if i < len(categories) - 1:
                rest_time = random.uniform(10, 30)
                logger.info(f"休息 {rest_time:.1f} 秒后继续")
                time.sleep(rest_time)

        successful = [r for r in results if r.get('success')]
        total_products = sum(r.get('products_count', 0) for r in successful)
        total_duration = sum(r.get('duration', 0) for r in successful)

        summary = {
            'total_categories': len(results),
            'successful_categories': len(successful),
            'total_products': total_products,
            'total_duration': total_duration,
            'results': results
        }

        logger.info(f"抓取会话完成 - 总计: {total_products} 个商品, {len(results)} 个分类")
        return summary

def main():
    """Run one scraping session against Taobao with a small demo configuration."""
    logger.info("电商App商品抓取系统启动")

    # Demo settings: three keywords, at most 20 products each, 3 s between swipes.
    settings = {
        'app_package': "com.taobao.taobao",
        'category_keywords': ["手机", "笔记本电脑", "服装"],
        'max_products_per_category': 20,
        'scroll_interval': 3.0,
    }
    scraper = EcommerceScraper(ScrapingConfig(**settings))

    # Nothing can be scraped unless the app came up.
    if not scraper.launch_ecommerce_app():
        logger.error("无法启动电商应用")
        return

    summary = scraper.run_scraping_session()
    logger.info(f"抓取总结: {summary}")

if __name__ == "__main__":
    main()

电商平台适配器

# platform_adapters.py - 不同电商平台的适配器
from abc import ABC, abstractmethod
import uiautomator2 as u2
from typing import Dict, List, Optional

class PlatformAdapter(ABC):
    """Abstract base for per-platform selector providers.

    A concrete adapter supplies the selector strings for one shopping app;
    the device handle passed to the constructor is stored as ``self.d``.
    """

    def __init__(self, d: u2.Device):
        self.d = d

    @abstractmethod
    def get_search_box_selector(self) -> str:
        """Return the selector of the search input box."""

    @abstractmethod
    def get_search_button_selector(self) -> str:
        """Return the selector of the button that submits a search."""

    @abstractmethod
    def get_product_item_selector(self) -> str:
        """Return the selector of one product entry in a result list."""

    @abstractmethod
    def get_product_price_selector(self) -> str:
        """Return the selector of a product's price label."""

    @abstractmethod
    def get_product_title_selector(self) -> str:
        """Return the selector of a product's title label."""

class TaobaoAdapter(PlatformAdapter):
    """Selector provider for the Taobao app (package ``com.taobao.taobao``)."""

    # Selector strings returned by the getters below.
    _SEARCH_BOX = "com.taobao.taobao:id/searchbar_hint_view"
    _SEARCH_BUTTON = "com.taobao.taobao:id/search_confirm"
    _PRODUCT_ITEM = "com.taobao.taobao:id/product_item"
    _PRODUCT_PRICE = "com.taobao.taobao:id/price"
    _PRODUCT_TITLE = "com.taobao.taobao:id/title"

    def get_search_box_selector(self) -> str:
        return self._SEARCH_BOX

    def get_search_button_selector(self) -> str:
        return self._SEARCH_BUTTON

    def get_product_item_selector(self) -> str:
        return self._PRODUCT_ITEM

    def get_product_price_selector(self) -> str:
        return self._PRODUCT_PRICE

    def get_product_title_selector(self) -> str:
        return self._PRODUCT_TITLE

class JDAdapter(PlatformAdapter):
    """Selector provider for the JD app (package ``com.jingdong.app.mall``)."""

    # Selector strings returned by the getters below.
    _SEARCH_BOX = "com.jingdong.app.mall:id/search_widget_text"
    _SEARCH_BUTTON = "com.jingdong.app.mall:id/search_btn"
    _PRODUCT_ITEM = "com.jingdong.app.mall:id/product_item"
    _PRODUCT_PRICE = "com.jingdong.app.mall:id/jd_money"
    _PRODUCT_TITLE = "com.jingdong.app.mall:id/good_title"

    def get_search_box_selector(self) -> str:
        return self._SEARCH_BOX

    def get_search_button_selector(self) -> str:
        return self._SEARCH_BUTTON

    def get_product_item_selector(self) -> str:
        return self._PRODUCT_ITEM

    def get_product_price_selector(self) -> str:
        return self._PRODUCT_PRICE

    def get_product_title_selector(self) -> str:
        return self._PRODUCT_TITLE

class PddAdapter(PlatformAdapter):
    """Selector provider for the Pinduoduo app (package ``com.xunmeng.pinduoduo``)."""

    # Selector strings returned by the getters below.
    _SEARCH_BOX = "com.xunmeng.pinduoduo:id/search_edit_text"
    _SEARCH_BUTTON = "com.xunmeng.pinduoduo:id/search_btn"
    _PRODUCT_ITEM = "com.xunmeng.pinduoduo:id/product_item"
    _PRODUCT_PRICE = "com.xunmeng.pinduoduo:id/goods_price"
    _PRODUCT_TITLE = "com.xunmeng.pinduoduo:id/goods_title"

    def get_search_box_selector(self) -> str:
        return self._SEARCH_BOX

    def get_search_button_selector(self) -> str:
        return self._SEARCH_BUTTON

    def get_product_item_selector(self) -> str:
        return self._PRODUCT_ITEM

    def get_product_price_selector(self) -> str:
        return self._PRODUCT_PRICE

    def get_product_title_selector(self) -> str:
        return self._PRODUCT_TITLE

class PlatformAdapterFactory:
    """Builds the adapter that belongs to a platform name."""

    @staticmethod
    def create_adapter(platform_name: str, d: u2.Device) -> Optional[PlatformAdapter]:
        """Return an adapter bound to ``d`` for ``platform_name``, or None.

        The name is compared case-insensitively against ``taobao``, ``jd``
        and ``pinduoduo``; any other name yields None.
        """
        key = platform_name.lower()
        if key == 'taobao':
            return TaobaoAdapter(d)
        if key == 'jd':
            return JDAdapter(d)
        if key == 'pinduoduo':
            return PddAdapter(d)
        return None

# E-commerce platform detector
def detect_ecommerce_platform(app_package: str) -> str:
    """Map an Android package name to a platform key.

    Returns ``'unknown'`` for any package that is not in the table below.
    """
    known_packages = {
        'com.taobao.taobao': 'taobao',
        'com.jingdong.app.mall': 'jd',
        'com.xunmeng.pinduoduo': 'pinduoduo',
        'com.tmall.wireless': 'taobao',  # Tmall is mapped to the Taobao key
        'com.suning.mobile.ebuy': 'suning',  # Suning
        'com.miui.shop': 'xiaomi',  # Xiaomi store
    }

    try:
        return known_packages[app_package]
    except KeyError:
        return 'unknown'

# Platform-specific scraper
class PlatformSpecificScraper:
    """Searches inside a shopping app, preferring platform-specific selectors.

    The platform is derived from ``app_package``; when no adapter exists for
    it, a warning is logged and only the generic search strategy is used.
    """

    def __init__(self, d: u2.Device, app_package: str):
        """Resolve the platform and its adapter for ``app_package``."""
        self.d = d
        self.app_package = app_package
        self.platform = detect_ecommerce_platform(app_package)
        self.adapter = PlatformAdapterFactory.create_adapter(self.platform, d)

        if not self.adapter:
            logger.warning(f"未找到 {self.platform} 的适配器,使用通用方法")

    def search_products_adaptive(self, keyword: str) -> bool:
        """Search for ``keyword`` with the adapter's selectors, else generically.

        Returns:
            True when a search was submitted, False when no strategy found a
            usable search field.
        """
        if self.adapter:
            search_box = self.d(resourceId=self.adapter.get_search_box_selector())
            if search_box.exists:
                search_box.click()
                self.d.send_keys(keyword)

                search_btn = self.d(resourceId=self.adapter.get_search_button_selector())
                if search_btn.exists:
                    search_btn.click()
                else:
                    # The keyword is already typed; submit it with the search
                    # key instead of falling through to generic_search(),
                    # which would type the keyword a second time.
                    self.d.press("search")
                time.sleep(5)
                return True

        return self.generic_search(keyword)

    def generic_search(self, keyword: str) -> bool:
        """Try several XPath guesses for a search field, then type and submit.

        Returns:
            True after the first selector that could be clicked, False when
            none matched.
        """
        search_selectors = [
            '//*[@resource-id="search" or contains(@resource-id, "search")]',
            '//*[@resource-id="edit" or contains(@resource-id, "edit")]',
            '//*[@text="搜索" or @text="Search"]',
        ]

        for selector in search_selectors:
            try:
                # On an XPath selector `exists` is a plain bool property, so
                # use click_exists() to wait up to 3 s and click if found.
                if not self.d.xpath(selector).click_exists(timeout=3):
                    continue
                time.sleep(1)
                self.d.send_keys(keyword)
                time.sleep(1)

                # Submit with the search key
                self.d.press("search")
                time.sleep(5)
                return True
            except Exception as e:
                logger.debug(f"通用搜索选择器失败 {selector}: {e}")
                continue

        return False

数据分析和报告

# ecommerce_analytics.py - 电商数据分析
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from typing import Dict, List
import numpy as np

class EcommerceAnalytics:
    """Read-only statistics, text report and charts over the ``products`` table."""

    def __init__(self, db_path: str = "ecommerce_data.db"):
        self.db_path = db_path

    def _read_frame(self, query: str) -> pd.DataFrame:
        """Run ``query`` on a fresh connection, which is closed even if the query fails."""
        conn = sqlite3.connect(self.db_path)
        try:
            return pd.read_sql_query(query, conn)
        finally:
            conn.close()

    @staticmethod
    def _number_or_zero(value) -> float:
        """Return ``value`` as float, or 0.0 for None/NaN (SQL NULL aggregates)."""
        if value is None or pd.isna(value):
            return 0.0
        return float(value)

    def get_product_statistics(self) -> Dict:
        """Return overall product statistics plus the per-category counts.

        Returns:
            A dict with ``total_products``, ``average_price``, ``price_range``
            (``min``/``max``) and ``category_distribution`` (category ->
            count, largest first). All numbers are 0 for an empty table.
        """
        # Totals come from an ungrouped query; taking them from the first row
        # of a GROUP BY result would describe one category only.
        totals = self._read_frame("""
        SELECT
            COUNT(*) as total_products,
            AVG(price) as avg_price,
            MIN(price) as min_price,
            MAX(price) as max_price
        FROM products
        """)
        by_category = self._read_frame("""
        SELECT
            category,
            COUNT(*) as category_count
        FROM products
        GROUP BY category
        ORDER BY category_count DESC
        """)

        overall = totals.iloc[0]
        stats = {
            'total_products': int(self._number_or_zero(overall['total_products'])),
            'average_price': self._number_or_zero(overall['avg_price']),
            'price_range': {
                'min': self._number_or_zero(overall['min_price']),
                'max': self._number_or_zero(overall['max_price'])
            },
            'category_distribution': {
                category: int(count)
                for category, count in zip(by_category['category'], by_category['category_count'])
            }
        }

        return stats

    def get_price_analysis(self) -> Dict:
        """Analyse prices of products that have both a price and sales.

        Returns:
            An empty dict when no such product exists; otherwise a dict with
            ``price_distribution`` (count per price band),
            ``avg_sales_by_price_range``, ``correlation_price_sales`` and
            ``top_priced_items`` (up to 10 ``{'price', 'title'}`` records).
        """
        # title is selected because top_priced_items reports it.
        df = self._read_frame("""
        SELECT
            title,
            price,
            sales_count,
            category,
            shop_name
        FROM products
        WHERE price > 0 AND sales_count > 0
        """)

        if df.empty:
            return {}

        # Half-open price bands: [0, 50), [50, 100), ...
        price_bins = [0, 50, 100, 200, 500, 1000, float('inf')]
        price_labels = ['0-50', '50-100', '100-200', '200-500', '500-1000', '1000+']
        df['price_range'] = pd.cut(df['price'], bins=price_bins, labels=price_labels, right=False)

        analysis = {
            'price_distribution': df['price_range'].value_counts().to_dict(),
            # observed=False keeps empty bands in the result (as NaN).
            'avg_sales_by_price_range': df.groupby('price_range', observed=False)['sales_count'].mean().to_dict(),
            'correlation_price_sales': df['price'].corr(df['sales_count']) if len(df) > 1 else 0,
            'top_priced_items': df.nlargest(10, 'price')[['price', 'title']].to_dict('records')
        }

        return analysis

    def get_category_performance(self) -> List[Dict]:
        """Return one record per category with count and average price/sales/rating.

        Records are ordered by product count, largest first; the list is
        empty when the table is empty.
        """
        df = self._read_frame("""
        SELECT
            category,
            COUNT(*) as product_count,
            AVG(price) as avg_price,
            AVG(sales_count) as avg_sales,
            AVG(rating) as avg_rating
        FROM products
        GROUP BY category
        ORDER BY product_count DESC
        """)

        return df.to_dict('records') if not df.empty else []

    def generate_report(self) -> str:
        """Build the plain-text analysis report and return it."""
        product_stats = self.get_product_statistics()
        price_analysis = self.get_price_analysis()
        category_performance = self.get_category_performance()

        report = f"""
=== 电商App商品抓取分析报告 ===

📊 商品统计:
- 总商品数: {product_stats['total_products']}
- 平均价格: ¥{product_stats['average_price']:.2f}
- 价格范围: ¥{product_stats['price_range']['min']} - ¥{product_stats['price_range']['max']}

🏷️  分类分布:
"""
        for category, count in list(product_stats['category_distribution'].items())[:10]:
            report += f"- {category}: {count}\n"

        if price_analysis:
            report += f"\n💰 价格分析:\n"
            report += f"- 价格区间分布: {dict(list(price_analysis['price_distribution'].items())[:5])}\n"
            report += f"- 价格销量相关系数: {price_analysis['correlation_price_sales']:.3f}\n"

        report += f"\n🏆 分类表现 (Top 5):\n"
        for perf in category_performance[:5]:
            # Averages are NULL when a category has no values for a column.
            avg_price = self._number_or_zero(perf['avg_price'])
            avg_sales = self._number_or_zero(perf['avg_sales'])
            report += f"- {perf['category']}: 平均价格¥{avg_price:.2f}, 平均销量{avg_sales:.0f}\n"

        return report

    def plot_price_distribution(self):
        """Plot a histogram and a box plot of prices in (0, 1000).

        Saves the figure as ``price_distribution.png`` and shows it; prints a
        notice and returns when there is no price data.
        """
        # Prices of 1000 and above are excluded as outliers.
        df = self._read_frame("SELECT price FROM products WHERE price > 0 AND price < 1000")

        if df.empty:
            print("暂无价格数据可绘制")
            return

        plt.figure(figsize=(12, 6))

        # Histogram
        plt.subplot(1, 2, 1)
        plt.hist(df['price'], bins=50, alpha=0.7, color='skyblue', edgecolor='black')
        plt.title('商品价格分布')
        plt.xlabel('价格 (¥)')
        plt.ylabel('商品数量')

        # Box plot
        plt.subplot(1, 2, 2)
        plt.boxplot(df['price'])
        plt.title('价格箱线图')
        plt.ylabel('价格 (¥)')

        plt.tight_layout()
        plt.savefig('price_distribution.png', dpi=300, bbox_inches='tight')
        plt.show()

        print("📈 价格分布图已保存为 price_distribution.png")

    def plot_category_analysis(self):
        """Plot count, average price and average sales for the 10 largest categories.

        Saves the figure as ``category_analysis.png`` and shows it; prints a
        notice and returns when there is no category data.
        """
        df = self._read_frame("""
        SELECT
            category,
            COUNT(*) as product_count,
            AVG(price) as avg_price,
            AVG(sales_count) as avg_sales
        FROM products
        WHERE category IS NOT NULL
        GROUP BY category
        ORDER BY product_count DESC
        LIMIT 10
        """)

        if df.empty:
            print("暂无分类数据可绘制")
            return

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        positions = range(len(df))

        # Three bar charts sharing the same category axis.
        bar_charts = (
            (axes[0, 0], 'product_count', '各分类商品数量'),
            (axes[0, 1], 'avg_price', '各分类平均价格'),
            (axes[1, 0], 'avg_sales', '各分类平均销量'),
        )
        for ax, column, title in bar_charts:
            ax.bar(positions, df[column])
            ax.set_title(title)
            ax.set_xticks(positions)
            ax.set_xticklabels(df['category'], rotation=45, ha='right')

        # Average price vs. average sales
        axes[1, 1].scatter(df['avg_price'], df['avg_sales'], alpha=0.6)
        axes[1, 1].set_title('平均价格 vs 平均销量')
        axes[1, 1].set_xlabel('平均价格 (¥)')
        axes[1, 1].set_ylabel('平均销量')

        # A linear trend needs at least two distinct, non-missing x values.
        trend = df[['avg_price', 'avg_sales']].dropna()
        if trend['avg_price'].nunique() >= 2:
            z = np.polyfit(trend['avg_price'], trend['avg_sales'], 1)
            p = np.poly1d(z)
            axes[1, 1].plot(trend['avg_price'], p(trend['avg_price']), "r--", alpha=0.8)

        plt.tight_layout()
        plt.savefig('category_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()

        print("📊 分类分析图已保存为 category_analysis.png")

def run_ecommerce_analytics():
    """Print the text report, then render both charts, from the default database."""
    analytics = EcommerceAnalytics()

    # Text report first
    print(analytics.generate_report())

    # Then the charts: price distribution, followed by the category overview.
    for render in (analytics.plot_price_distribution, analytics.plot_category_analysis):
        render()

if __name__ == "__main__":
    run_ecommerce_analytics()