UI自动化控制技术

课程目标

  • 掌握主流UI自动化工具的特点和使用方法
  • 学会使用Appium、Airtest、uiautomator2进行App自动化控制
  • 掌握UI元素定位、图像识别、手势操作等核心技术
  • 实现抖音自动点赞与评论爬虫项目
  • 实现电商App商品滑动抓取与数据入库项目
  • 了解跨平台自动化测试的最佳实践

1. UI自动化工具对比与选择

1.1 Appium - 跨平台自动化框架

Appium是业界领先的开源移动应用自动化测试框架,支持iOS、Android和Windows应用的自动化测试。

Appium架构特点:

  • 跨平台支持:一套代码,多平台运行
  • 原生API访问:直接调用原生自动化框架
  • 多语言支持:支持Java、Python、Ruby、PHP等多种语言
  • WebDriver协议:基于标准的WebDriver协议

Appium安装与配置:

# 安装Node.js依赖
npm install -g appium
npm install -g appium-doctor

# 检查环境配置
appium-doctor --android  # 检查Android环境
appium-doctor --ios      # 检查iOS环境

# 启动Appium服务器
appium -a 127.0.0.1 -p 4723 --session-override

Python Appium客户端示例:

from appium import webdriver
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

class AppiumController:
    """Appium控制器类"""
    
    def __init__(self, platform_name="Android", device_name="Android Emulator"):
        self.desired_caps = {
            'platformName': platform_name,
            'deviceName': device_name,
            'automationName': 'UiAutomator2',  # Android使用UiAutomator2
            'appPackage': 'com.example.app',   # 替换为实际包名
            'appActivity': '.MainActivity',    # 替换为实际Activity
            'noReset': True,                   # 不重置应用状态
            'unicodeKeyboard': True,           # 支持Unicode输入
            'resetKeyboard': True              # 重置键盘
        }
        self.driver = None
        self.wait = None
    
    def connect(self, appium_server_url="http://127.0.0.1:4723"):
        """连接到Appium服务器"""
        try:
            self.driver = webdriver.Remote(appium_server_url, options=None)
            self.wait = WebDriverWait(self.driver, 10)
            print("✅ Appium连接成功")
            return True
        except Exception as e:
            print(f"❌ Appium连接失败: {e}")
            return False
    
    def find_element_by_id(self, element_id, timeout=10):
        """通过ID查找元素"""
        try:
            element = WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((AppiumBy.ID, element_id))
            )
            return element
        except Exception as e:
            print(f"❌ 未找到ID为 {element_id} 的元素: {e}")
            return None
    
    def find_element_by_xpath(self, xpath, timeout=10):
        """通过XPath查找元素"""
        try:
            element = WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((AppiumBy.XPATH, xpath))
            )
            return element
        except Exception as e:
            print(f"❌ 未找到XPath为 {xpath} 的元素: {e}")
            return None
    
    def find_elements_by_class_name(self, class_name, timeout=10):
        """通过类名查找多个元素"""
        try:
            elements = WebDriverWait(self.driver, timeout).until(
                EC.presence_of_all_elements_located((AppiumBy.CLASS_NAME, class_name))
            )
            return elements
        except Exception as e:
            print(f"❌ 未找到类名为 {class_name} 的元素: {e}")
            return []
    
    def tap_element(self, element):
        """点击元素"""
        try:
            element.click()
            print("✅ 元素点击成功")
            return True
        except Exception as e:
            print(f"❌ 元素点击失败: {e}")
            return False
    
    def input_text(self, element, text):
        """输入文本"""
        try:
            element.clear()
            element.send_keys(text)
            print(f"✅ 文本输入成功: {text}")
            return True
        except Exception as e:
            print(f"❌ 文本输入失败: {e}")
            return False
    
    def swipe_up(self, duration=1000):
        """向上滑动"""
        size = self.driver.get_window_size()
        start_x = size['width'] // 2
        start_y = size['height'] * 3 // 4
        end_x = size['width'] // 2
        end_y = size['height'] // 4
        self.driver.swipe(start_x, start_y, end_x, end_y, duration)
        print("✅ 向上滑动成功")
    
    def swipe_down(self, duration=1000):
        """向下滑动"""
        size = self.driver.get_window_size()
        start_x = size['width'] // 2
        start_y = size['height'] // 4
        end_x = size['width'] // 2
        end_y = size['height'] * 3 // 4
        self.driver.swipe(start_x, start_y, end_x, end_y, duration)
        print("✅ 向下滑动成功")
    
    def swipe_left(self, duration=1000):
        """向左滑动"""
        size = self.driver.get_window_size()
        start_x = size['width'] * 3 // 4
        start_y = size['height'] // 2
        end_x = size['width'] // 4
        end_y = size['height'] // 2
        self.driver.swipe(start_x, start_y, end_x, end_y, duration)
        print("✅ 向左滑动成功")
    
    def swipe_right(self, duration=1000):
        """向右滑动"""
        size = self.driver.get_window_size()
        start_x = size['width'] // 4
        start_y = size['height'] // 2
        end_x = size['width'] * 3 // 4
        end_y = size['height'] // 2
        self.driver.swipe(start_x, start_y, end_x, end_y, duration)
        print("✅ 向右滑动成功")
    
    def get_screenshot(self, filename):
        """截屏"""
        return self.driver.save_screenshot(filename)
    
    def get_page_source(self):
        """获取页面源码"""
        return self.driver.page_source
    
    def close_app(self):
        """关闭应用"""
        if self.driver:
            self.driver.quit()
            print("✅ App已关闭")

# Appium使用示例
def appium_example():
    """Appium使用示例"""
    controller = AppiumController()
    if controller.connect():
        try:
            # 这里可以添加具体的自动化操作
            print("Appium连接成功,可以开始自动化操作")
        finally:
            controller.close_app()

if __name__ == "__main__":
    appium_example()

1.2 Airtest - 网易出品的快速开发工具

Airtest是网易推出的一款基于图像识别的UI自动化测试框架,特别适合游戏和复杂界面的自动化。

Airtest特点:

  • 图像识别:基于图像识别的元素定位
  • UI树解析:同时支持UI树结构解析
  • 快速开发:IDE支持录制回放功能
  • 跨平台:支持Android、iOS、Windows、Mac、Linux

Airtest安装与使用:

# 安装Airtest
pip install airtest
pip install pocoui  # UI自动化支持

# Airtest IDE可视化工具
# 可以通过GUI界面录制和回放脚本

Airtest Python脚本示例:

from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
import pocounit
import time

class AirtestController:
    """Airtest控制器类"""
    
    def __init__(self, device_id=None):
        self.device_id = device_id
        self.poco = None
        self.init_device()
    
    def init_device(self):
        """初始化设备连接"""
        try:
            if self.device_id:
                connect_device(f"Android:///{self.device_id}")
            else:
                connect_device("Android:///")  # 连接默认设备
            
            # 初始化Poco(UI树解析器)
            self.poco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
            print("✅ Airtest设备连接成功")
        except Exception as e:
            print(f"❌ Airtest设备连接失败: {e}")
    
    def find_element_by_image(self, image_path, threshold=0.7):
        """通过图像查找元素"""
        try:
            pos = Template(image_path, threshold=threshold).match_in(get_screen())
            if pos:
                print(f"✅ 图像匹配成功: {image_path}")
                return pos
            else:
                print(f"❌ 未找到图像: {image_path}")
                return None
        except Exception as e:
            print(f"❌ 图像匹配失败: {e}")
            return None
    
    def find_element_by_text(self, text, timeout=10):
        """通过文本查找元素"""
        try:
            element = self.poco(text=text).wait(timeout)
            if element.exists():
                print(f"✅ 找到文本元素: {text}")
                return element
            else:
                print(f"❌ 未找到文本元素: {text}")
                return None
        except Exception as e:
            print(f"❌ 查找文本元素失败: {e}")
            return None
    
    def find_element_by_name(self, name, timeout=10):
        """通过名称查找元素"""
        try:
            element = self.poco(name=name).wait(timeout)
            if element.exists():
                print(f"✅ 找到名称元素: {name}")
                return element
            else:
                print(f"❌ 未找到名称元素: {name}")
                return None
        except Exception as e:
            print(f"❌ 查找名称元素失败: {e}")
            return None
    
    def click_element(self, element):
        """点击元素"""
        try:
            if hasattr(element, 'click'):
                element.click()
            else:
                touch(element)
            print("✅ 元素点击成功")
            return True
        except Exception as e:
            print(f"❌ 元素点击失败: {e}")
            return False
    
    def swipe_element(self, element, direction="up", duration=0.5):
        """滑动元素"""
        try:
            if direction == "up":
                element.swipe([0, -0.1])
            elif direction == "down":
                element.swipe([0, 0.1])
            elif direction == "left":
                element.swipe([-0.1, 0])
            elif direction == "right":
                element.swipe([0.1, 0])
            print(f"✅ {direction}方向滑动成功")
            return True
        except Exception as e:
            print(f"❌ 滑动失败: {e}")
            return False
    
    def input_text(self, element, text):
        """输入文本"""
        try:
            element.set_text(text)
            print(f"✅ 文本输入成功: {text}")
            return True
        except Exception as e:
            print(f"❌ 文本输入失败: {e}")
            return False
    
    def get_ui_tree(self):
        """获取UI树"""
        try:
            ui_tree = self.poco.agent.hierarchy.dump()
            print("✅ UI树获取成功")
            return ui_tree
        except Exception as e:
            print(f"❌ UI树获取失败: {e}")
            return None
    
    def wait_element_appear(self, element_name, timeout=10):
        """等待元素出现"""
        try:
            result = self.poco(element_name).wait(timeout)
            print(f"✅ 元素 {element_name} 出现")
            return result
        except Exception as e:
            print(f"❌ 元素 {element_name} 未在{timeout}秒内出现: {e}")
            return None
    
    def assert_element_exists(self, element_name, timeout=5):
        """断言元素存在"""
        try:
            exists = self.poco(element_name).exists()
            if exists:
                print(f"✅ 元素 {element_name} 存在")
                return True
            else:
                print(f"❌ 元素 {element_name} 不存在")
                return False
        except Exception as e:
            print(f"❌ 元素存在性检查失败: {e}")
            return False
    
    def take_screenshot(self, filename=None):
        """截屏"""
        if not filename:
            filename = f"screenshot_{int(time.time())}.png"
        try:
            snapshot(filename=filename)
            print(f"✅ 截屏保存: {filename}")
            return filename
        except Exception as e:
            print(f"❌ 截屏失败: {e}")
            return None

# Airtest使用示例
def airtest_example():
    """Airtest使用示例"""
    controller = AirtestController()
    
    # 示例操作
    controller.take_screenshot("initial_state.png")
    
    # 通过文本查找元素
    search_btn = controller.find_element_by_text("搜索")
    if search_btn:
        controller.click_element(search_btn)
    
    # 通过名称查找元素
    input_field = controller.find_element_by_name("输入框")
    if input_field:
        controller.input_text(input_field, "测试内容")
    
    # 等待元素出现
    result = controller.wait_element_appear("结果列表")
    if result:
        print("找到了结果列表")
    
    print("Airtest示例执行完成")

if __name__ == "__main__":
    airtest_example()

1.3 uiautomator2 - Python专属轻量级工具

uiautomator2是基于Android uiautomator框架的Python封装,专为Python开发者设计。

uiautomator2特点:

  • Python原生:纯Python接口,易于使用
  • 轻量级:相比Appium更轻量,启动更快
  • 功能丰富:支持触摸、滑动、截图、文本输入等
  • 稳定性好:基于Android官方uiautomator框架

uiautomator2安装与初始化:

# 安装uiautomator2
pip install uiautomator2

# 初始化设备(首次使用需要)
import uiautomator2 as u2
d = u2.connect()  # 连接默认设备
d.app_install("com.example.app")  # 安装应用
d.healthcheck()  # 检查ATX Agent状态

uiautomator2核心功能示例:

import uiautomator2 as u2
import time
import cv2
import numpy as np
from typing import Dict, List, Optional, Tuple
import re

class Uiautomator2Controller:
    """uiautomator2控制器类"""
    
    def __init__(self, device_id: str = None):
        self.device_id = device_id
        self.device = None
        self.connect_device()
    
    def connect_device(self):
        """连接设备"""
        try:
            if self.device_id:
                self.device = u2.connect(self.device_id)
            else:
                self.device = u2.connect()  # 连接默认设备
            
            if self.device.alive:
                print(f"✅ uiautomator2连接成功: {self.device.serial}")
                # 启动ATX Agent健康检查
                self.device.healthcheck()
                return True
            else:
                print("❌ uiautomator2连接失败")
                return False
        except Exception as e:
            print(f"❌ uiautomator2连接异常: {e}")
            return False
    
    def get_device_info(self) -> Dict:
        """获取设备信息"""
        try:
            info = self.device.info
            return {
                'serial': self.device.serial,
                'productName': info.get('productName', ''),
                'brand': info.get('brand', ''),
                'manufacturer': info.get('manufacturer', ''),
                'displayWidth': info.get('displayWidth', 0),
                'displayHeight': info.get('displayHeight', 0),
                'sdkInt': info.get('sdkInt', 0),
                'currentPackageName': self.device.app_current().get('package', ''),
                'rotation': info.get('rotation', 0)
            }
        except Exception as e:
            print(f"❌ 获取设备信息失败: {e}")
            return {}
    
    def start_app(self, package_name: str, activity: str = None) -> bool:
        """启动应用"""
        try:
            if activity:
                self.device.app_start(package_name, activity)
            else:
                self.device.app_start(package_name)
            time.sleep(2)  # 等待应用启动
            print(f"✅ 应用启动成功: {package_name}")
            return True
        except Exception as e:
            print(f"❌ 应用启动失败: {e}")
            return False
    
    def stop_app(self, package_name: str) -> bool:
        """停止应用"""
        try:
            self.device.app_stop(package_name)
            print(f"✅ 应用停止成功: {package_name}")
            return True
        except Exception as e:
            print(f"❌ 应用停止失败: {e}")
            return False
    
    def find_element_by_text(self, text: str, exact_match: bool = True) -> Optional[u2.UiObject]:
        """通过文本查找元素"""
        try:
            if exact_match:
                element = self.device(text=text)
            else:
                element = self.device(textMatches=f".*{text}.*")
            
            if element.exists:
                print(f"✅ 找到文本元素: {text}")
                return element
            else:
                print(f"❌ 未找到文本元素: {text}")
                return None
        except Exception as e:
            print(f"❌ 查找文本元素失败: {e}")
            return None
    
    def find_element_by_id(self, resource_id: str) -> Optional[u2.UiObject]:
        """通过ID查找元素"""
        try:
            element = self.device(resourceId=resource_id)
            if element.exists:
                print(f"✅ 找到ID元素: {resource_id}")
                return element
            else:
                print(f"❌ 未找到ID元素: {resource_id}")
                return None
        except Exception as e:
            print(f"❌ 查找ID元素失败: {e}")
            return None
    
    def find_element_by_class(self, class_name: str) -> Optional[u2.UiObject]:
        """通过类名查找元素"""
        try:
            element = self.device(className=class_name)
            if element.exists:
                print(f"✅ 找到类名元素: {class_name}")
                return element
            else:
                print(f"❌ 未找到类名元素: {class_name}")
                return None
        except Exception as e:
            print(f"❌ 查找类名元素失败: {e}")
            return None
    
    def find_elements_by_selector(self, **kwargs) -> List[u2.UiObject]:
        """通过选择器查找多个元素"""
        try:
            elements = self.device(**kwargs).all()
            count = len(elements)
            print(f"✅ 找到 {count} 个元素")
            return elements
        except Exception as e:
            print(f"❌ 查找多个元素失败: {e}")
            return []
    
    def click_element(self, element: u2.UiObject) -> bool:
        """点击元素"""
        try:
            element.click()
            print("✅ 元素点击成功")
            return True
        except Exception as e:
            print(f"❌ 元素点击失败: {e}")
            return False
    
    def long_click_element(self, element: u2.UiObject) -> bool:
        """长按元素"""
        try:
            element.long_click()
            print("✅ 元素长按成功")
            return True
        except Exception as e:
            print(f"❌ 元素长按失败: {e}")
            return False
    
    def double_click_element(self, element: u2.UiObject) -> bool:
        """双击元素"""
        try:
            element.double_click()
            print("✅ 元素双击成功")
            return True
        except Exception as e:
            print(f"❌ 元素双击失败: {e}")
            return False
    
    def input_text(self, element: u2.UiObject, text: str) -> bool:
        """在元素中输入文本"""
        try:
            element.set_text(text)
            print(f"✅ 文本输入成功: {text}")
            return True
        except Exception as e:
            print(f"❌ 文本输入失败: {e}")
            return False
    
    def swipe_up(self, duration: float = 0.5) -> bool:
        """向上滑动"""
        try:
            self.device.swipe_ext("up", scale=0.5, duration=duration)
            print("✅ 向上滑动成功")
            return True
        except Exception as e:
            print(f"❌ 向上滑动失败: {e}")
            return False
    
    def swipe_down(self, duration: float = 0.5) -> bool:
        """向下滑动"""
        try:
            self.device.swipe_ext("down", scale=0.5, duration=duration)
            print("✅ 向下滑动成功")
            return True
        except Exception as e:
            print(f"❌ 向下滑动失败: {e}")
            return False
    
    def swipe_left(self, duration: float = 0.5) -> bool:
        """向左滑动"""
        try:
            self.device.swipe_ext("left", scale=0.5, duration=duration)
            print("✅ 向左滑动成功")
            return True
        except Exception as e:
            print(f"❌ 向左滑动失败: {e}")
            return False
    
    def swipe_right(self, duration: float = 0.5) -> bool:
        """向右滑动"""
        try:
            self.device.swipe_ext("right", scale=0.5, duration=duration)
            print("✅ 向右滑动成功")
            return True
        except Exception as e:
            print(f"❌ 向右滑动失败: {e}")
            return False
    
    def swipe_to_find_element(self, text: str, direction: str = "down", max_swipes: int = 10) -> Optional[u2.UiObject]:
        """滑动查找元素"""
        for i in range(max_swipes):
            element = self.find_element_by_text(text)
            if element:
                return element
            
            if direction == "down":
                self.swipe_down()
            elif direction == "up":
                self.swipe_up()
            elif direction == "left":
                self.swipe_left()
            elif direction == "right":
                self.swipe_right()
            
            time.sleep(0.5)  # 等待滑动完成
        
        print(f"❌ 在 {max_swipes} 次滑动后未找到元素: {text}")
        return None
    
    def get_window_size(self) -> Tuple[int, int]:
        """获取屏幕尺寸"""
        return self.device.window_size()
    
    def take_screenshot(self, filename: str = None) -> str:
        """截屏"""
        if not filename:
            filename = f"screenshot_{int(time.time())}.png"
        try:
            self.device.screenshot(filename)
            print(f"✅ 截屏保存: {filename}")
            return filename
        except Exception as e:
            print(f"❌ 截屏失败: {e}")
            return ""
    
    def get_page_xml(self) -> str:
        """获取页面XML结构"""
        try:
            xml_content = self.device.dump_hierarchy()
            print("✅ 页面XML获取成功")
            return xml_content
        except Exception as e:
            print(f"❌ 页面XML获取失败: {e}")
            return ""
    
    def wait_until_element_appears(self, element_selector: Dict, timeout: int = 10) -> bool:
        """等待元素出现"""
        try:
            element = self.device(**element_selector)
            result = element.wait(timeout=timeout)
            if result:
                print("✅ 元素出现")
                return True
            else:
                print(f"❌ 元素在 {timeout} 秒内未出现")
                return False
        except Exception as e:
            print(f"❌ 等待元素出现失败: {e}")
            return False
    
    def wait_until_element_disappears(self, element_selector: Dict, timeout: int = 10) -> bool:
        """等待元素消失"""
        try:
            element = self.device(**element_selector)
            result = element.wait_gone(timeout=timeout)
            if result:
                print("✅ 元素消失")
                return True
            else:
                print(f"❌ 元素在 {timeout} 秒内未消失")
                return False
        except Exception as e:
            print(f"❌ 等待元素消失失败: {e}")
            return False
    
    def press_key(self, key: str) -> bool:
        """按键操作"""
        try:
            self.device.press(key)
            print(f"✅ 按键操作成功: {key}")
            return True
        except Exception as e:
            print(f"❌ 按键操作失败: {e}")
            return False
    
    def back(self) -> bool:
        """返回键"""
        return self.press_key("back")
    
    def home(self) -> bool:
        """Home键"""
        return self.press_key("home")
    
    def recent(self) -> bool:
        """最近任务键"""
        return self.press_key("recent")
    
    def sleep(self) -> bool:
        """睡眠键"""
        return self.press_key("sleep")
    
    def wake_up(self) -> bool:
        """唤醒设备"""
        try:
            self.device.wake_up()
            print("✅ 设备唤醒成功")
            return True
        except Exception as e:
            print(f"❌ 设备唤醒失败: {e}")
            return False
    
    def unlock(self) -> bool:
        """解锁设备"""
        try:
            self.device.unlock()
            print("✅ 设备解锁成功")
            return True
        except Exception as e:
            print(f"❌ 设备解锁失败: {e}")
            return False
    
    def freeze_rotation(self) -> bool:
        """冻结旋转"""
        try:
            self.device.freeze_rotation()
            print("✅ 旋转已冻结")
            return True
        except Exception as e:
            print(f"❌ 冻结旋转失败: {e}")
            return False
    
    def unfreeze_rotation(self) -> bool:
        """解除旋转冻结"""
        try:
            self.device.unfreeze_rotation()
            print("✅ 旋转已解冻")
            return True
        except Exception as e:
            print(f"❌ 解除旋转冻结失败: {e}")
            return False
    
    def get_clipboard(self) -> str:
        """获取剪贴板内容"""
        try:
            content = self.device.clipboard
            print("✅ 剪贴板内容获取成功")
            return content
        except Exception as e:
            print(f"❌ 剪贴板内容获取失败: {e}")
            return ""
    
    def set_clipboard(self, text: str) -> bool:
        """设置剪贴板内容"""
        try:
            self.device.set_clipboard(text)
            print(f"✅ 剪贴板内容设置成功: {text}")
            return True
        except Exception as e:
            print(f"❌ 剪贴板内容设置失败: {e}")
            return False

# uiautomator2使用示例
def u2_example():
    """uiautomator2使用示例"""
    controller = Uiautomator2Controller()
    
    if controller.connect_device():
        # 获取设备信息
        device_info = controller.get_device_info()
        print(f"设备信息: {device_info}")
        
        # 截屏
        controller.take_screenshot("test_screenshot.png")
        
        # 模拟一些操作
        controller.home()
        time.sleep(1)
        controller.wake_up()
        
        print("uiautomator2示例执行完成")
    else:
        print("无法连接到设备")

if __name__ == "__main__":
    u2_example()

2. UI元素定位技术详解

2.1 元素定位策略

在UI自动化中,准确的元素定位是关键。不同工具提供了多种定位方式:

Appium元素定位方式:

# Appium元素定位示例
from appium.webdriver.common.appiumby import AppiumBy

class ElementLocator:
    """元素定位器"""
    
    def __init__(self, driver):
        self.driver = driver
    
    def locate_by_id(self, locator):
        """通过ID定位"""
        return self.driver.find_element(AppiumBy.ID, locator)
    
    def locate_by_xpath(self, locator):
        """通过XPath定位"""
        return self.driver.find_element(AppiumBy.XPATH, locator)
    
    def locate_by_accessibility_id(self, locator):
        """通过无障碍ID定位(iOS/Android)"""
        return self.driver.find_element(AppiumBy.ACCESSIBILITY_ID, locator)
    
    def locate_by_android_uiautomator(self, locator):
        """通过Android UI Automator定位"""
        return self.driver.find_element(AppiumBy.ANDROID_UIAUTOMATOR, locator)
    
    def locate_by_ios_predicate(self, locator):
        """通过iOS Predicate定位"""
        return self.driver.find_element(AppiumBy.IOS_PREDICATE, locator)
    
    def locate_by_ios_class_chain(self, locator):
        """通过iOS Class Chain定位"""
        return self.driver.find_element(AppiumBy.IOS_CLASS_CHAIN, locator)
    
    def locate_by_class_name(self, locator):
        """通过类名定位"""
        return self.driver.find_element(AppiumBy.CLASS_NAME, locator)
    
    def locate_by_css_selector(self, locator):
        """通过CSS选择器定位"""
        return self.driver.find_element(AppiumBy.CSS_SELECTOR, locator)
    
    def locate_by_name(self, locator):
        """通过名称定位"""
        return self.driver.find_element(AppiumBy.NAME, locator)
    
    def locate_by_link_text(self, locator):
        """通过链接文本定位"""
        return self.driver.find_element(AppiumBy.LINK_TEXT, locator)

uiautomator2元素定位方式:

# uiautomator2元素定位示例
class U2ElementLocator:
    """uiautomator2元素定位器"""
    
    def __init__(self, device):
        self.device = device
    
    def locate_by_text(self, text, exact_match=True):
        """通过文本定位"""
        if exact_match:
            return self.device(text=text)
        else:
            return self.device(textMatches=f".*{text}.*")
    
    def locate_by_resource_id(self, resource_id):
        """通过资源ID定位"""
        return self.device(resourceId=resource_id)
    
    def locate_by_class_name(self, class_name):
        """通过类名定位"""
        return self.device(className=class_name)
    
    def locate_by_description(self, description):
        """通过描述定位"""
        return self.device(description=description)
    
    def locate_by_content_desc(self, content_desc):
        """通过内容描述定位"""
        return self.device(descriptionMatches=f".*{content_desc}.*")
    
    def locate_by_index(self, index):
        """通过索引定位"""
        return self.device(index=index)
    
    def locate_by_instance(self, instance):
        """通过实例定位"""
        return self.device(instance=instance)
    
    def locate_by_bounds(self, left, top, right, bottom):
        """通过边界定位"""
        return self.device(bounds={"left": left, "top": top, "right": right, "bottom": bottom})
    
    def locate_by_selected(self, selected=True):
        """通过选中状态定位"""
        return self.device(selected=selected)
    
    def locate_by_scrollable(self, scrollable=True):
        """通过可滚动状态定位"""
        return self.device(scrollable=scrollable)
    
    def locate_by_clickable(self, clickable=True):
        """通过可点击状态定位"""
        return self.device(clickable=clickable)
    
    def locate_by_long_clickable(self, long_clickable=True):
        """通过可长按状态定位"""
        return self.device(longClickable=long_clickable)
    
    def locate_by_enabled(self, enabled=True):
        """通过启用状态定位"""
        return self.device(enabled=enabled)
    
    def locate_by_focused(self, focused=True):
        """通过焦点状态定位"""
        return self.device(focused=focused)
    
    def locate_by_checkable(self, checkable=True):
        """通过可勾选状态定位"""
        return self.device(checkable=checkable)
    
    def locate_by_checked(self, checked=True):
        """通过勾选状态定位"""
        return self.device(checked=checked)
    
    def locate_by_child_selector(self, child_selector):
        """通过子元素选择器定位"""
        return self.device(childSelector=child_selector)
    
    def locate_by_sibling_selector(self, sibling_selector):
        """通过兄弟元素选择器定位"""
        return self.device(siblingSelector=sibling_selector)
    
    def locate_by_multiple_conditions(self, **conditions):
        """通过多个条件定位"""
        return self.device(**conditions)

2.2 图像识别定位技术

图像识别是Airtest的核心技术,特别适用于无法通过UI树定位的场景。

图像识别原理:

import cv2
import numpy as np
from typing import Tuple, List, Optional

class ImageMatcher:
    """图像匹配器"""
    
    def __init__(self):
        pass
    
    def template_match(self, screenshot: np.ndarray, template: np.ndarray, 
                      threshold: float = 0.8) -> List[Tuple[int, int, float]]:
        """模板匹配"""
        # 转换为灰度图
        screenshot_gray = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY)
        template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
        
        # 执行模板匹配
        result = cv2.matchTemplate(screenshot_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        
        # 找到匹配位置
        locations = np.where(result >= threshold)
        matches = []
        
        for pt in zip(*locations[::-1]):  # x, y
            confidence = float(result[pt[1], pt[0]])
            matches.append((pt[0], pt[1], confidence))
        
        return matches
    
    def multi_scale_template_match(self, screenshot: np.ndarray, template: np.ndarray,
                                  scales: List[float] = None, threshold: float = 0.7) -> List[Tuple[int, int, float, float]]:
        """多尺度模板匹配"""
        if scales is None:
            scales = [0.8, 0.9, 1.0, 1.1, 1.2]
        
        all_matches = []
        
        for scale in scales:
            # 缩放模板
            h, w = template.shape[:2]
            new_w, new_h = int(w * scale), int(h * scale)
            scaled_template = cv2.resize(template, (new_w, new_h))
            
            # 执行匹配
            matches = self.template_match(screenshot, scaled_template, threshold)
            
            # 调整坐标到原始尺寸
            scaled_matches = [(int(x/scale), int(y/scale), conf, scale) for x, y, conf in matches]
            all_matches.extend(scaled_matches)
        
        return all_matches
    
    def feature_match(self, screenshot: np.ndarray, template: np.ndarray,
                     detector_type: str = 'SIFT', threshold: float = 0.7) -> List[Tuple[int, int, float]]:
        """特征点匹配"""
        # 使用SIFT或ORB特征检测
        if detector_type == 'SIFT':
            detector = cv2.SIFT_create()
        elif detector_type == 'ORB':
            detector = cv2.ORB_create()
        else:
            raise ValueError("Unsupported detector type")
        
        # 检测特征点
        kp1, des1 = detector.detectAndCompute(template, None)
        kp2, des2 = detector.detectAndCompute(screenshot, None)
        
        if des1 is None or des2 is None:
            return []
        
        # 特征匹配
        bf = cv2.BFMatcher()
        matches = bf.knnMatch(des1, des2, k=2)
        
        # 应用 Lowe's ratio test
        good_matches = []
        for m, n in matches:
            if m.distance < threshold * n.distance:
                good_matches.append(m)
        
        # 计算匹配位置
        if len(good_matches) >= 4:
            src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
            dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
            
            # 计算变换矩阵
            M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
            
            if M is not None:
                # 获取模板在截图中的位置
                h, w = template.shape[:2]
                pts = np.float32([[0, 0], [0, h-1], [w-1, h-1], [w-1, 0]]).reshape(-1, 1, 2)
                dst = cv2.perspectiveTransform(pts, M)
                
                # 计算中心点
                center_x = int(np.mean(dst[:, 0, 0]))
                center_y = int(np.mean(dst[:, 0, 1]))
                
                return [(center_x, center_y, len(good_matches) / len(matches))]
        
        return []

# 图像识别在自动化中的应用
class VisionBasedAutomation:
    """基于视觉的自动化"""
    
    def __init__(self):
        self.matcher = ImageMatcher()
    
    def find_element_by_image(self, screenshot_path: str, template_path: str, 
                            threshold: float = 0.8) -> Optional[Tuple[int, int]]:
        """通过图像查找元素位置"""
        screenshot = cv2.imread(screenshot_path)
        template = cv2.imread(template_path)
        
        if screenshot is None or template is None:
            return None
        
        matches = self.matcher.template_match(screenshot, template, threshold)
        
        if matches:
            # 返回最佳匹配
            best_match = max(matches, key=lambda x: x[2])  # 按置信度排序
            return (best_match[0], best_match[1])
        
        return None
    
    def find_multiple_elements_by_image(self, screenshot_path: str, template_path: str,
                                      threshold: float = 0.7) -> List[Tuple[int, int]]:
        """查找多个相同元素"""
        screenshot = cv2.imread(screenshot_path)
        template = cv2.imread(template_path)
        
        if screenshot is None or template is None:
            return []
        
        matches = self.matcher.template_match(screenshot, template, threshold)
        
        # 过滤重叠的匹配(非极大值抑制)
        filtered_matches = self.non_max_suppression(matches, overlap_threshold=0.3)
        
        return [(x, y) for x, y, conf in filtered_matches]
    
    def non_max_suppression(self, matches: List[Tuple[int, int, float]], 
                          overlap_threshold: float = 0.3) -> List[Tuple[int, int, float]]:
        """非极大值抑制,去除重叠的检测框"""
        if not matches:
            return []
        
        # 按置信度排序
        matches = sorted(matches, key=lambda x: x[2], reverse=True)
        
        keep = []
        while matches:
            # 保留置信度最高的
            best = matches.pop(0)
            keep.append(best)
            
            # 计算与其他检测框的重叠度
            remaining = []
            for match in matches:
                # 简单的欧氏距离判断重叠
                distance = np.sqrt((best[0] - match[0])**2 + (best[1] - match[1])**2)
                # 假设模板大小为100x100像素
                min_dim = min(abs(best[0] - match[0]), abs(best[1] - match[1]))
                if distance > 50:  # 避免距离过近的重复检测
                    remaining.append(match)
            
            matches = remaining
        
        return keep

def vision_automation_example():
    """视觉自动化示例"""
    automation = VisionBasedAutomation()
    
    # 示例:查找点赞按钮
    # position = automation.find_element_by_image("screenshot.png", "like_button_template.png")
    # if position:
    #     print(f"找到点赞按钮位置: {position}")
    #     # 点击该位置
    #     # controller.tap(position[0], position[1])
    
    print("视觉自动化示例完成")

if __name__ == "__main__":
    vision_automation_example()

3. 手势操作与交互技术

3.1 基础手势操作

import time
from typing import List, Tuple
import random

class GestureController:
    """手势控制器"""
    
    def __init__(self, device_controller):
        self.device = device_controller
    
    def tap(self, x: int, y: int, duration: float = 0.1) -> bool:
        """点击操作"""
        try:
            # 添加随机偏移增加真实性
            offset_x = random.randint(-2, 2)
            offset_y = random.randint(-2, 2)
            real_x = max(0, x + offset_x)
            real_y = max(0, y + offset_y)
            
            self.device.touch.down(real_x, real_y)
            time.sleep(duration)
            self.device.touch.up(real_x, real_y)
            print(f"✅ 点击操作成功: ({real_x}, {real_y})")
            return True
        except Exception as e:
            print(f"❌ 点击操作失败: {e}")
            return False
    
    def long_press(self, x: int, y: int, duration: float = 1.0) -> bool:
        """长按操作"""
        try:
            self.device.touch.down(x, y)
            time.sleep(duration)
            self.device.touch.up(x, y)
            print(f"✅ 长按操作成功: ({x}, {y}), 时长: {duration}s")
            return True
        except Exception as e:
            print(f"❌ 长按操作失败: {e}")
            return False
    
    def swipe(self, start_x: int, start_y: int, end_x: int, end_y: int, 
              duration: float = 0.5, steps: int = 5) -> bool:
        """滑动操作"""
        try:
            # 分步滑动,模拟真实滑动轨迹
            dx = (end_x - start_x) / steps
            dy = (end_y - start_y) / steps
            step_duration = duration / steps
            
            self.device.touch.down(start_x, start_y)
            time.sleep(step_duration * 0.1)  # 初始停顿
            
            for i in range(1, steps + 1):
                current_x = int(start_x + dx * i)
                current_y = int(start_y + dy * i)
                
                # 添加轻微的随机偏移,模拟人手滑动的不规则性
                offset_x = random.randint(-1, 1)
                offset_y = random.randint(-1, 1)
                
                self.device.touch.move(current_x + offset_x, current_y + offset_y)
                time.sleep(step_duration)
            
            self.device.touch.up(end_x, end_y)
            print(f"✅ 滑动操作成功: ({start_x}, {start_y}) -> ({end_x}, {end_y})")
            return True
        except Exception as e:
            print(f"❌ 滑动操作失败: {e}")
            return False
    
    def flick(self, start_x: int, start_y: int, end_x: int, end_y: int, 
              duration: float = 0.1) -> bool:
        """快速滑动(轻拂)"""
        try:
            self.device.touch.down(start_x, start_y)
            time.sleep(0.05)  # 短暂停顿
            self.device.touch.move(end_x, end_y)
            time.sleep(duration)
            self.device.touch.up(end_x, end_y)
            print(f"✅ 轻拂操作成功: ({start_x}, {start_y}) -> ({end_x}, {end_y})")
            return True
        except Exception as e:
            print(f"❌ 轻拂操作失败: {e}")
            return False
    
    def pinch(self, center_x: int, center_y: int, 
              start_radius: int, end_radius: int, duration: float = 1.0) -> bool:
        """双指缩放(捏合/展开)"""
        try:
            # 计算两个手指的起始位置
            start_x1 = center_x - start_radius
            start_y1 = center_y
            start_x2 = center_x + start_radius
            start_y2 = center_y
            
            end_x1 = center_x - end_radius
            end_y1 = center_y
            end_x2 = center_x + end_radius
            end_y2 = center_y
            
            # 按下两个点
            self.device.touch.down(start_x1, start_y1)
            self.device.touch.down(start_x2, start_y2)
            time.sleep(0.1)
            
            # 同时移动两个点
            steps = 10
            for i in range(steps):
                t = i / (steps - 1)
                curr_x1 = int(start_x1 + (end_x1 - start_x1) * t)
                curr_y1 = int(start_y1 + (end_y1 - start_y1) * t)
                curr_x2 = int(start_x2 + (end_x2 - start_x2) * t)
                curr_y2 = int(start_y2 + (end_y2 - start_y2) * t)
                
                self.device.touch.move(curr_x1, curr_y1, 0)  # 第一个手指
                self.device.touch.move(curr_x2, curr_y2, 1)  # 第二个手指
                time.sleep(duration / steps)
            
            # 抬起两个点
            self.device.touch.up(end_x1, end_y1, 0)
            self.device.touch.up(end_x2, end_y2, 1)
            
            action = "缩小" if end_radius < start_radius else "放大"
            print(f"✅ 缩放操作成功: {action} {start_radius}px -> {end_radius}px")
            return True
        except Exception as e:
            print(f"❌ 缩放操作失败: {e}")
            return False
    
    def drag(self, start_x: int, start_y: int, end_x: int, end_y: int,
             duration: float = 1.0, press_duration: float = 0.5) -> bool:
        """拖拽操作"""
        try:
            self.device.touch.down(start_x, start_y)
            time.sleep(press_duration)  # 按住一段时间
            
            # 平滑移动到目标位置
            steps = int(duration / 0.05)  # 每0.05秒一步
            for i in range(steps):
                t = i / (steps - 1) if steps > 1 else 1
                curr_x = int(start_x + (end_x - start_x) * t)
                curr_y = int(start_y + (end_y - start_y) * t)
                
                self.device.touch.move(curr_x, curr_y)
                time.sleep(duration / steps)
            
            self.device.touch.up(end_x, end_y)
            print(f"✅ 拖拽操作成功: ({start_x}, {start_y}) -> ({end_x}, {end_y})")
            return True
        except Exception as e:
            print(f"❌ 拖拽操作失败: {e}")
            return False
    
    def multi_touch_tap(self, positions: List[Tuple[int, int]]) -> bool:
        """多点触控点击"""
        try:
            # 同时按下所有点
            for i, (x, y) in enumerate(positions):
                self.device.touch.down(x, y, i)  # 使用不同的pointer id
                time.sleep(0.05)  # 错开按下时间
            
            time.sleep(0.1)  # 保持按下状态
            
            # 同时抬起所有点
            for i, (x, y) in enumerate(positions):
                self.device.touch.up(x, y, i)
            
            print(f"✅ 多点触控点击成功: {len(positions)}个点")
            return True
        except Exception as e:
            print(f"❌ 多点触控点击失败: {e}")
            return False
    
    def draw_pattern(self, points: List[Tuple[int, int]], 
                     duration: float = 2.0) -> bool:
        """绘制图案"""
        try:
            if len(points) < 2:
                print("❌ 点数不足,无法绘制图案")
                return False
            
            start_x, start_y = points[0]
            self.device.touch.down(start_x, start_y)
            time.sleep(0.1)
            
            steps = len(points) - 1
            step_duration = duration / steps if steps > 0 else duration
            
            for i in range(1, len(points)):
                end_x, end_y = points[i]
                
                # 在两点间平滑移动
                segment_steps = 5  # 每段分5步
                dx = (end_x - points[i-1][0]) / segment_steps
                dy = (end_y - points[i-1][1]) / segment_steps
                
                for j in range(segment_steps):
                    curr_x = int(points[i-1][0] + dx * (j + 1))
                    curr_y = int(points[i-1][1] + dy * (j + 1))
                    
                    self.device.touch.move(curr_x, curr_y)
                    time.sleep(step_duration / segment_steps)
            
            self.device.touch.up(points[-1][0], points[-1][1])
            print(f"✅ 图案绘制成功: {len(points)}个点")
            return True
        except Exception as e:
            print(f"❌ 图案绘制失败: {e}")
            return False

# 手势操作示例
def gesture_example():
    """手势操作示例"""
    # 这里需要一个实际的设备控制器
    print("手势操作示例完成")
    print("实际使用时需要连接真实设备")

3.2 高级交互技术

class AdvancedInteraction:
    """高级交互技术"""
    
    def __init__(self, device_controller):
        self.device = device_controller
        self.gesture_controller = GestureController(device_controller)
    
    def simulate_human_like_behavior(self, base_delay: float = 1.0, 
                                   variation: float = 0.5) -> float:
        """模拟人类行为延时"""
        delay = base_delay + random.uniform(-variation, variation)
        # 添加一些不规律性
        if random.random() < 0.1:  # 10%概率添加额外延时
            delay += random.uniform(0.5, 2.0)
        time.sleep(max(0.1, delay))  # 确保至少0.1秒延时
        return delay
    
    def scroll_with_content_detection(self, direction: str = "down", 
                                    max_scrolls: int = 10) -> List[dict]:
        """滚动并检测内容变化"""
        content_changes = []
        previous_content = self.device.dump_hierarchy()
        
        for i in range(max_scrolls):
            if direction == "down":
                self.gesture_controller.swipe(500, 1500, 500, 500, duration=0.8)
            elif direction == "up":
                self.gesture_controller.swipe(500, 500, 500, 1500, duration=0.8)
            elif direction == "left":
                self.gesture_controller.swipe(1000, 1000, 200, 1000, duration=0.8)
            elif direction == "right":
                self.gesture_controller.swipe(200, 1000, 1000, 1000, duration=0.8)
            
            # 等待内容加载
            self.simulate_human_like_behavior(1.0, 0.3)
            
            current_content = self.device.dump_hierarchy()
            
            # 检测内容是否发生变化
            if current_content != previous_content:
                change_info = {
                    'scroll_number': i + 1,
                    'content_changed': True,
                    'timestamp': time.time()
                }
                content_changes.append(change_info)
                previous_content = current_content
            else:
                # 如果内容没有变化,可能已经到达底部
                change_info = {
                    'scroll_number': i + 1,
                    'content_changed': False,
                    'timestamp': time.time()
                }
                content_changes.append(change_info)
                if i > 3:  # 如果连续几次都没有变化,认为到底了
                    break
        
        return content_changes
    
    def adaptive_element_interaction(self, element_locator, action: str = "click",
                                   max_attempts: int = 3) -> bool:
        """自适应元素交互"""
        for attempt in range(max_attempts):
            try:
                # 尝试定位元素
                element = self.locate_element_with_fallback(element_locator)
                if element and element.exists:
                    # 根据元素类型选择合适的交互方式
                    element_bounds = element.bounds
                    center_x = (element_bounds['left'] + element_bounds['right']) // 2
                    center_y = (element_bounds['top'] + element_bounds['bottom']) // 2
                    
                    if action == "click":
                        success = self.gesture_controller.tap(center_x, center_y)
                    elif action == "long_press":
                        success = self.gesture_controller.long_press(center_x, center_y, 1.0)
                    elif action == "double_click":
                        success = (self.gesture_controller.tap(center_x, center_y, 0.1) and
                                 self.simulate_human_like_behavior(0.2, 0.1) and
                                 self.gesture_controller.tap(center_x, center_y, 0.1))
                    
                    if success:
                        print(f"✅ 第{attempt + 1}次尝试成功执行{action}")
                        return True
                    else:
                        print(f"⚠️  第{attempt + 1}次尝试交互失败")
                
                # 如果元素不存在或交互失败,等待后重试
                self.simulate_human_like_behavior(2.0, 0.5)
                
            except Exception as e:
                print(f"❌ 第{attempt + 1}次尝试异常: {e}")
                if attempt < max_attempts - 1:
                    self.simulate_human_like_behavior(3.0, 1.0)
        
        print(f"❌ 经过{max_attempts}次尝试仍未能成功交互")
        return False
    
    def locate_element_with_fallback(self, locator):
        """多策略元素定位"""
        strategies = [
            lambda: self.device(**locator),  # 直接使用传入的定位器
            lambda: self.device(**{k: v for k, v in locator.items() if k != 'description'}, descriptionMatches=f".*{locator.get('text', locator.get('resourceId', ''))}.*") if any(k in locator for k in ['text', 'resourceId']) else None,
            lambda: self.device(textMatches=f".*{locator.get('text', '')}.*") if 'text' in locator else None,
            lambda: self.device(resourceIdMatches=f".*{locator.get('resourceId', '')}.*") if 'resourceId' in locator else None,
        ]
        
        for strategy in strategies:
            try:
                element = strategy()
                if element and element.exists:
                    return element
            except:
                continue
        
        return None
    
    def batch_operation_with_error_recovery(self, operations: List[dict]) -> dict:
        """批量操作与错误恢复"""
        results = {
            'successful_operations': [],
            'failed_operations': [],
            'recovered_operations': [],
            'total_operations': len(operations)
        }
        
        for i, operation in enumerate(operations):
            try:
                op_result = self.execute_single_operation(operation)
                if op_result['success']:
                    results['successful_operations'].append({
                        'operation_index': i,
                        'operation_type': operation['type'],
                        'details': op_result.get('details', {})
                    })
                else:
                    # 操作失败,尝试恢复
                    recovery_result = self.attempt_operation_recovery(operation, op_result)
                    if recovery_result['success']:
                        results['recovered_operations'].append({
                            'operation_index': i,
                            'operation_type': operation['type'],
                            'recovery_method': recovery_result.get('method', 'unknown')
                        })
                    else:
                        results['failed_operations'].append({
                            'operation_index': i,
                            'operation_type': operation['type'],
                            'error': recovery_result.get('error', 'Unknown error')
                        })
                
                # 操作间隔
                self.simulate_human_like_behavior(1.0, 0.3)
                
            except Exception as e:
                results['failed_operations'].append({
                    'operation_index': i,
                    'operation_type': operation.get('type', 'unknown'),
                    'error': str(e)
                })
        
        return results
    
    def execute_single_operation(self, operation: dict) -> dict:
        """执行单个操作"""
        op_type = operation.get('type', '')
        try:
            if op_type == 'click':
                element_locator = operation['locator']
                element = self.locate_element_with_fallback(element_locator)
                if element and element.exists:
                    bounds = element.bounds
                    center_x = (bounds['left'] + bounds['right']) // 2
                    center_y = (bounds['top'] + bounds['bottom']) // 2
                    success = self.gesture_controller.tap(center_x, center_y)
                    return {'success': success, 'details': {'element_found': True}}
                else:
                    return {'success': False, 'details': {'element_found': False}}
            
            elif op_type == 'swipe':
                direction = operation['direction']
                distance = operation.get('distance', 1000)
                duration = operation.get('duration', 0.8)
                
                screen_width, screen_height = self.device.window_size()
                
                if direction == 'up':
                    success = self.gesture_controller.swipe(
                        screen_width//2, screen_height*3//4,
                        screen_width//2, screen_height//4,
                        duration=duration
                    )
                elif direction == 'down':
                    success = self.gesture_controller.swipe(
                        screen_width//2, screen_height//4,
                        screen_width//2, screen_height*3//4,
                        duration=duration
                    )
                elif direction == 'left':
                    success = self.gesture_controller.swipe(
                        screen_width*3//4, screen_height//2,
                        screen_width//4, screen_height//2,
                        duration=duration
                    )
                elif direction == 'right':
                    success = self.gesture_controller.swipe(
                        screen_width//4, screen_height//2,
                        screen_width*3//4, screen_height//2,
                        duration=duration
                    )
                else:
                    return {'success': False, 'details': {'invalid_direction': direction}}
                
                return {'success': success, 'details': {'swipe_direction': direction}}
            
            elif op_type == 'input_text':
                element_locator = operation['locator']
                text = operation['text']
                
                element = self.locate_element_with_fallback(element_locator)
                if element and element.exists:
                    element.set_text(text)
                    return {'success': True, 'details': {'text_entered': text}}
                else:
                    return {'success': False, 'details': {'element_found': False}}
            
            else:
                return {'success': False, 'details': {'unsupported_operation': op_type}}
        
        except Exception as e:
            return {'success': False, 'details': {'exception': str(e)}}
    
    def attempt_operation_recovery(self, operation: dict, failure_result: dict) -> dict:
        """尝试操作恢复"""
        op_type = operation.get('type', '')
        
        try:
            # 尝试不同的恢复策略
            if op_type in ['click', 'input_text']:
                # 策略1: 等待一段时间后重试
                time.sleep(3)
                retry_result = self.execute_single_operation(operation)
                if retry_result['success']:
                    return {'success': True, 'method': 'retry_after_wait'}
                
                # 策略2: 检查应用是否还在前台
                current_app = self.device.app_current()
                target_app = operation.get('expected_app_package')
                if target_app and current_app.get('package') != target_app:
                    # 重新启动应用
                    self.device.app_start(target_app)
                    time.sleep(3)
                    retry_result = self.execute_single_operation(operation)
                    if retry_result['success']:
                        return {'success': True, 'method': 'restart_app_and_retry'}
            
            elif op_type == 'swipe':
                # 策略: 尝试不同的滑动参数
                original_duration = operation.get('duration', 0.8)
                for new_duration in [original_duration * 1.5, original_duration * 0.7]:
                    operation['duration'] = new_duration
                    retry_result = self.execute_single_operation(operation)
                    if retry_result['success']:
                        return {'success': True, 'method': f'adjust_duration_to_{new_duration}'}
        
            return {'success': False, 'error': 'All recovery strategies failed'}
        
        except Exception as e:
            return {'success': False, 'error': f'Recovery failed: {str(e)}'}

def advanced_interaction_example():
    """高级交互示例"""
    print("高级交互技术示例完成")
    print("实际使用时需要连接真实设备并传入设备控制器")

4. 实战项目一:抖音自动点赞与评论爬虫

4.1 项目架构设计

import asyncio
import aiofiles
from datetime import datetime
from typing import Dict, List, Optional
import json
import csv

class DouyinAutomationProject:
    """抖音自动化项目主类"""
    
    def __init__(self, device_controller, data_storage):
        self.device = device_controller
        self.storage = data_storage
        self.interaction_controller = AdvancedInteraction(device_controller)
        self.project_config = {
            'app_package': 'com.ss.android.ugc.aweme',
            'target_hashtags': ['#热门', '#推荐'],
            'daily_limit': 100,
            'like_probability': 0.3,  # 30%概率点赞
            'comment_probability': 0.1,  # 10%概率评论
            'scroll_duration': 3.0,  # 每次滑动等待时间
            'session_duration': 1800  # 单次运行时长(秒) - 30分钟
        }
        self.stats = {
            'videos_processed': 0,
            'likes_given': 0,
            'comments_made': 0,
            'errors_encountered': 0,
            'start_time': datetime.now()
        }
    
    def setup_douyin_environment(self) -> bool:
        """设置抖音环境"""
        try:
            # 检查应用是否存在
            apps = self.device.app_list()
            if self.project_config['app_package'] not in apps:
                print(f"❌ 抖音应用未安装: {self.project_config['app_package']}")
                return False
            
            # 启动抖音
            self.device.app_start(self.project_config['app_package'])
            print("✅ 抖音应用启动成功")
            
            # 等待主界面加载
            time.sleep(5)
            
            # 检查是否需要处理权限请求
            self.handle_permissions_if_needed()
            
            return True
        except Exception as e:
            print(f"❌ 设置抖音环境失败: {e}")
            return False
    
    def handle_permissions_if_needed(self):
        """处理权限请求"""
        permission_dialogs = [
            {'text': '允许', 'resourceId': 'com.android.packageinstaller:id/permission_allow_button'},
            {'text': '始终允许', 'resourceId': 'com.android.packageinstaller:id/always_check'},
            {'text': '同意', 'resourceId': 'com.ss.android.ugc.aweme:id/btn_agree'},
            {'text': '我知道了', 'resourceId': 'com.ss.android.ugc.aweme:id/tv_know'}
        ]
        
        for dialog in permission_dialogs:
            element = None
            if 'text' in dialog:
                element = self.device(text=dialog['text'])
            elif 'resourceId' in dialog:
                element = self.device(resourceId=dialog['resourceId'])
            
            if element and element.exists:
                try:
                    element.click()
                    print(f"✅ 处理权限对话框: {list(dialog.values())[0]}")
                    time.sleep(1)
                except:
                    pass
    
    async def run_automation_session(self):
        """运行自动化会话"""
        print("🚀 开始抖音自动化会话...")
        start_time = time.time()
        
        try:
            if not self.setup_douyin_environment():
                print("❌ 环境设置失败,退出")
                return
            
            while time.time() - start_time < self.project_config['session_duration']:
                await self.process_single_video()
                
                # 随机延时,模拟人类行为
                delay = random.uniform(2, 6)
                await asyncio.sleep(delay)
                
                # 检查是否达到每日限制
                if self.stats['videos_processed'] >= self.project_config['daily_limit']:
                    print(f"✅ 达到每日限制 {self.project_config['daily_limit']},停止处理")
                    break
        
        except KeyboardInterrupt:
            print("⏸️ 用户中断自动化")
        except Exception as e:
            print(f"❌ 自动化会话异常: {e}")
            self.stats['errors_encountered'] += 1
        finally:
            await self.cleanup_session()
    
    async def process_single_video(self):
        """处理单个视频"""
        try:
            # 获取当前视频信息
            video_info = await self.extract_video_info()
            
            if video_info:
                self.stats['videos_processed'] += 1
                print(f"🎬 处理视频 {self.stats['videos_processed']}: {video_info.get('desc', '无标题')[:30]}...")
                
                # 决定是否点赞
                if random.random() < self.project_config['like_probability']:
                    await self.like_current_video()
                
                # 决定是否评论
                if random.random() < self.project_config['comment_probability']:
                    await self.comment_current_video()
                
                # 滑动到下一个视频
                await self.scroll_to_next_video()
                
                # 保存视频信息
                await self.storage.save_video_info(video_info)
            
        except Exception as e:
            print(f"❌ 处理视频时出错: {e}")
            self.stats['errors_encountered'] += 1
    
    async def extract_video_info(self) -> Dict:
        """提取当前视频信息"""
        try:
            # 获取页面层次结构
            hierarchy = self.device.dump_hierarchy()
            
            # 解析视频相关信息(简化版)
            video_info = {
                'video_id': f"vid_{int(time.time())}_{random.randint(1000, 9999)}",
                'timestamp': datetime.now().isoformat(),
                'desc': self.extract_description(hierarchy),
                'author': self.extract_author(hierarchy),
                'like_count': self.extract_like_count(hierarchy),
                'comment_count': self.extract_comment_count(hierarchy),
                'share_count': self.extract_share_count(hierarchy),
                'music': self.extract_music(hierarchy),
                'location': self.extract_location(hierarchy),
                'hashtags': self.extract_hashtags(hierarchy)
            }
            
            return video_info
        except Exception as e:
            print(f"❌ 提取视频信息失败: {e}")
            return {}
    
    def extract_description(self, hierarchy: str) -> str:
        """提取视频描述"""
        # 在实际实现中,这里会解析UI层次结构来提取描述
        # 简化实现
        import re
        desc_match = re.search(r'"desc":"([^"]*)"', hierarchy)
        return desc_match.group(1) if desc_match else "无描述"
    
    def extract_author(self, hierarchy: str) -> str:
        """提取作者信息"""
        # 简化实现
        import re
        author_match = re.search(r'"author":"([^"]*)"', hierarchy)
        return author_match.group(1) if author_match else "未知作者"
    
    def extract_like_count(self, hierarchy: str) -> str:
        """提取点赞数"""
        # 简化实现
        return f"{random.randint(100, 10000)}"
    
    def extract_comment_count(self, hierarchy: str) -> str:
        """提取评论数"""
        # 简化实现
        return f"{random.randint(10, 1000)}"
    
    def extract_share_count(self, hierarchy: str) -> str:
        """提取分享数"""
        # 简化实现
        return f"{random.randint(10, 500)}"
    
    def extract_music(self, hierarchy: str) -> str:
        """提取音乐信息"""
        # 简化实现
        return "未知音乐"
    
    def extract_location(self, hierarchy: str) -> str:
        """提取位置信息"""
        # 简化实现
        return "未知位置"
    
    def extract_hashtags(self, hierarchy: str) -> List[str]:
        """提取话题标签"""
        # 简化实现
        return ["#推荐", "#热门"]
    
    async def like_current_video(self):
        """点赞当前视频"""
        try:
            # 查找点赞按钮(可能需要多种定位策略)
            like_selectors = [
                {'desc': '赞', 'className': 'android.widget.Button'},
                {'text': '赞', 'className': 'android.widget.TextView'},
                {'resourceId': 'com.ss.android.ugc.aweme:id/like_icon'},
                {'resourceId': 'com.ss.android.ugc.aweme:id/aweme_like_layout'}
            ]
            
            for selector in like_selectors:
                element = self.device(**selector)
                if element.exists:
                    # 获取元素中心位置并点击
                    bounds = element.bounds
                    center_x = (bounds['left'] + bounds['right']) // 2
                    center_y = (bounds['top'] + bounds['bottom']) // 2
                    
                    self.device.click(center_x, center_y)
                    self.stats['likes_given'] += 1
                    print(f"👍 视频点赞成功 (总计: {self.stats['likes_given']})")
                    return True
            
            print("❌ 未找到点赞按钮")
            return False
            
        except Exception as e:
            print(f"❌ 点赞操作失败: {e}")
            return False
    
    async def comment_current_video(self):
        """评论当前视频"""
        try:
            # 随机选择评论内容
            comments = [
                "很棒的视频!",
                "学到了很多,谢谢分享!",
                "支持一下,加油!",
                "内容很实用,收藏了",
                "顶顶顶!",
                "666",
                "mark一下,以后看"
            ]
            comment_text = random.choice(comments)
            
            # 查找评论按钮
            comment_selectors = [
                {'desc': '评论', 'className': 'android.widget.Button'},
                {'resourceId': 'com.ss.android.ugc.aweme:id/comment_icon'},
                {'resourceId': 'com.ss.android.ugc.aweme:id/aweme_comment_layout'}
            ]
            
            for selector in comment_selectors:
                element = self.device(**selector)
                if element.exists:
                    # 点击评论按钮
                    element.click()
                    time.sleep(1)
                    
                    # 查找评论输入框
                    input_selectors = [
                        {'resourceId': 'com.ss.android.ugc.aweme:id/chat_input_view'},
                        {'className': 'android.widget.EditText'},
                        {'text': '说点什么...'}
                    ]
                    
                    for input_selector in input_selectors:
                        input_element = self.device(**input_selector)
                        if input_element.exists:
                            # 输入评论
                            input_element.set_text(comment_text)
                            time.sleep(0.5)
                            
                            # 查找发送按钮
                            send_selectors = [
                                {'text': '发送', 'className': 'android.widget.Button'},
                                {'desc': '发送', 'className': 'android.widget.Button'},
                                {'resourceId': 'com.ss.android.ugc.aweme:id/send_btn'}
                            ]