#UI自动化控制技术
#课程目标
- 掌握主流UI自动化工具的特点和使用方法
- 学会使用Appium、Airtest、uiautomator2进行App自动化控制
- 掌握UI元素定位、图像识别、手势操作等核心技术
- 实现抖音自动点赞与评论爬虫项目
- 实现电商App商品滑动抓取与数据入库项目
- 了解跨平台自动化测试的最佳实践
#1. UI自动化工具对比与选择
#1.1 Appium - 跨平台自动化框架
Appium是业界领先的开源移动应用自动化测试框架,支持iOS、Android和Windows应用的自动化测试。
#Appium架构特点:
- 跨平台支持:一套代码,多平台运行
- 原生API访问:直接调用原生自动化框架
- 多语言支持:支持Java、Python、Ruby、PHP等多种语言
- WebDriver协议:基于标准的WebDriver协议
#Appium安装与配置:
# 安装Node.js依赖
npm install -g appium
npm install -g appium-doctor
# 检查环境配置
appium-doctor --android # 检查Android环境
appium-doctor --ios # 检查iOS环境
# 启动Appium服务器
appium -a 127.0.0.1 -p 4723 --session-override#Python Appium客户端示例:
from appium import webdriver
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
class AppiumController:
"""Appium控制器类"""
def __init__(self, platform_name="Android", device_name="Android Emulator"):
self.desired_caps = {
'platformName': platform_name,
'deviceName': device_name,
'automationName': 'UiAutomator2', # Android使用UiAutomator2
'appPackage': 'com.example.app', # 替换为实际包名
'appActivity': '.MainActivity', # 替换为实际Activity
'noReset': True, # 不重置应用状态
'unicodeKeyboard': True, # 支持Unicode输入
'resetKeyboard': True # 重置键盘
}
self.driver = None
self.wait = None
def connect(self, appium_server_url="http://127.0.0.1:4723"):
"""连接到Appium服务器"""
try:
self.driver = webdriver.Remote(appium_server_url, options=None)
self.wait = WebDriverWait(self.driver, 10)
print("✅ Appium连接成功")
return True
except Exception as e:
print(f"❌ Appium连接失败: {e}")
return False
def find_element_by_id(self, element_id, timeout=10):
"""通过ID查找元素"""
try:
element = WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((AppiumBy.ID, element_id))
)
return element
except Exception as e:
print(f"❌ 未找到ID为 {element_id} 的元素: {e}")
return None
def find_element_by_xpath(self, xpath, timeout=10):
"""通过XPath查找元素"""
try:
element = WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((AppiumBy.XPATH, xpath))
)
return element
except Exception as e:
print(f"❌ 未找到XPath为 {xpath} 的元素: {e}")
return None
def find_elements_by_class_name(self, class_name, timeout=10):
"""通过类名查找多个元素"""
try:
elements = WebDriverWait(self.driver, timeout).until(
EC.presence_of_all_elements_located((AppiumBy.CLASS_NAME, class_name))
)
return elements
except Exception as e:
print(f"❌ 未找到类名为 {class_name} 的元素: {e}")
return []
def tap_element(self, element):
"""点击元素"""
try:
element.click()
print("✅ 元素点击成功")
return True
except Exception as e:
print(f"❌ 元素点击失败: {e}")
return False
def input_text(self, element, text):
"""输入文本"""
try:
element.clear()
element.send_keys(text)
print(f"✅ 文本输入成功: {text}")
return True
except Exception as e:
print(f"❌ 文本输入失败: {e}")
return False
def swipe_up(self, duration=1000):
"""向上滑动"""
size = self.driver.get_window_size()
start_x = size['width'] // 2
start_y = size['height'] * 3 // 4
end_x = size['width'] // 2
end_y = size['height'] // 4
self.driver.swipe(start_x, start_y, end_x, end_y, duration)
print("✅ 向上滑动成功")
def swipe_down(self, duration=1000):
"""向下滑动"""
size = self.driver.get_window_size()
start_x = size['width'] // 2
start_y = size['height'] // 4
end_x = size['width'] // 2
end_y = size['height'] * 3 // 4
self.driver.swipe(start_x, start_y, end_x, end_y, duration)
print("✅ 向下滑动成功")
def swipe_left(self, duration=1000):
"""向左滑动"""
size = self.driver.get_window_size()
start_x = size['width'] * 3 // 4
start_y = size['height'] // 2
end_x = size['width'] // 4
end_y = size['height'] // 2
self.driver.swipe(start_x, start_y, end_x, end_y, duration)
print("✅ 向左滑动成功")
def swipe_right(self, duration=1000):
"""向右滑动"""
size = self.driver.get_window_size()
start_x = size['width'] // 4
start_y = size['height'] // 2
end_x = size['width'] * 3 // 4
end_y = size['height'] // 2
self.driver.swipe(start_x, start_y, end_x, end_y, duration)
print("✅ 向右滑动成功")
def get_screenshot(self, filename):
"""截屏"""
return self.driver.save_screenshot(filename)
def get_page_source(self):
"""获取页面源码"""
return self.driver.page_source
def close_app(self):
"""关闭应用"""
if self.driver:
self.driver.quit()
print("✅ App已关闭")
# Appium使用示例
def appium_example():
"""Appium使用示例"""
controller = AppiumController()
if controller.connect():
try:
# 这里可以添加具体的自动化操作
print("Appium连接成功,可以开始自动化操作")
finally:
controller.close_app()
if __name__ == "__main__":
appium_example()#1.2 Airtest - 网易出品的快速开发工具
Airtest是网易推出的一款基于图像识别的UI自动化测试框架,特别适合游戏和复杂界面的自动化。
#Airtest特点:
- 图像识别:基于图像识别的元素定位
- UI树解析:同时支持UI树结构解析
- 快速开发:IDE支持录制回放功能
- 跨平台:支持Android、iOS、Windows、Mac、Linux
#Airtest安装与使用:
# 安装Airtest
pip install airtest
pip install pocoui # UI自动化支持
# Airtest IDE可视化工具
# 可以通过GUI界面录制和回放脚本#Airtest Python脚本示例:
from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
import pocounit
import time
class AirtestController:
"""Airtest控制器类"""
def __init__(self, device_id=None):
self.device_id = device_id
self.poco = None
self.init_device()
def init_device(self):
"""初始化设备连接"""
try:
if self.device_id:
connect_device(f"Android:///{self.device_id}")
else:
connect_device("Android:///") # 连接默认设备
# 初始化Poco(UI树解析器)
self.poco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
print("✅ Airtest设备连接成功")
except Exception as e:
print(f"❌ Airtest设备连接失败: {e}")
def find_element_by_image(self, image_path, threshold=0.7):
"""通过图像查找元素"""
try:
pos = Template(image_path, threshold=threshold).match_in(get_screen())
if pos:
print(f"✅ 图像匹配成功: {image_path}")
return pos
else:
print(f"❌ 未找到图像: {image_path}")
return None
except Exception as e:
print(f"❌ 图像匹配失败: {e}")
return None
def find_element_by_text(self, text, timeout=10):
"""通过文本查找元素"""
try:
element = self.poco(text=text).wait(timeout)
if element.exists():
print(f"✅ 找到文本元素: {text}")
return element
else:
print(f"❌ 未找到文本元素: {text}")
return None
except Exception as e:
print(f"❌ 查找文本元素失败: {e}")
return None
def find_element_by_name(self, name, timeout=10):
"""通过名称查找元素"""
try:
element = self.poco(name=name).wait(timeout)
if element.exists():
print(f"✅ 找到名称元素: {name}")
return element
else:
print(f"❌ 未找到名称元素: {name}")
return None
except Exception as e:
print(f"❌ 查找名称元素失败: {e}")
return None
def click_element(self, element):
"""点击元素"""
try:
if hasattr(element, 'click'):
element.click()
else:
touch(element)
print("✅ 元素点击成功")
return True
except Exception as e:
print(f"❌ 元素点击失败: {e}")
return False
def swipe_element(self, element, direction="up", duration=0.5):
"""滑动元素"""
try:
if direction == "up":
element.swipe([0, -0.1])
elif direction == "down":
element.swipe([0, 0.1])
elif direction == "left":
element.swipe([-0.1, 0])
elif direction == "right":
element.swipe([0.1, 0])
print(f"✅ {direction}方向滑动成功")
return True
except Exception as e:
print(f"❌ 滑动失败: {e}")
return False
def input_text(self, element, text):
"""输入文本"""
try:
element.set_text(text)
print(f"✅ 文本输入成功: {text}")
return True
except Exception as e:
print(f"❌ 文本输入失败: {e}")
return False
def get_ui_tree(self):
"""获取UI树"""
try:
ui_tree = self.poco.agent.hierarchy.dump()
print("✅ UI树获取成功")
return ui_tree
except Exception as e:
print(f"❌ UI树获取失败: {e}")
return None
def wait_element_appear(self, element_name, timeout=10):
"""等待元素出现"""
try:
result = self.poco(element_name).wait(timeout)
print(f"✅ 元素 {element_name} 出现")
return result
except Exception as e:
print(f"❌ 元素 {element_name} 未在{timeout}秒内出现: {e}")
return None
def assert_element_exists(self, element_name, timeout=5):
"""断言元素存在"""
try:
exists = self.poco(element_name).exists()
if exists:
print(f"✅ 元素 {element_name} 存在")
return True
else:
print(f"❌ 元素 {element_name} 不存在")
return False
except Exception as e:
print(f"❌ 元素存在性检查失败: {e}")
return False
def take_screenshot(self, filename=None):
"""截屏"""
if not filename:
filename = f"screenshot_{int(time.time())}.png"
try:
snapshot(filename=filename)
print(f"✅ 截屏保存: {filename}")
return filename
except Exception as e:
print(f"❌ 截屏失败: {e}")
return None
# Airtest使用示例
def airtest_example():
"""Airtest使用示例"""
controller = AirtestController()
# 示例操作
controller.take_screenshot("initial_state.png")
# 通过文本查找元素
search_btn = controller.find_element_by_text("搜索")
if search_btn:
controller.click_element(search_btn)
# 通过名称查找元素
input_field = controller.find_element_by_name("输入框")
if input_field:
controller.input_text(input_field, "测试内容")
# 等待元素出现
result = controller.wait_element_appear("结果列表")
if result:
print("找到了结果列表")
print("Airtest示例执行完成")
if __name__ == "__main__":
airtest_example()#1.3 uiautomator2 - Python专属轻量级工具
uiautomator2是基于Android uiautomator框架的Python封装,专为Python开发者设计。
#uiautomator2特点:
- Python原生:纯Python接口,易于使用
- 轻量级:相比Appium更轻量,启动更快
- 功能丰富:支持触摸、滑动、截图、文本输入等
- 稳定性好:基于Android官方uiautomator框架
#uiautomator2安装与初始化:
# 安装uiautomator2
pip install uiautomator2
# 初始化设备(首次使用需要)
import uiautomator2 as u2
d = u2.connect() # 连接默认设备
d.app_install("com.example.app") # 安装应用
d.healthcheck() # 检查ATX Agent状态#uiautomator2核心功能示例:
import uiautomator2 as u2
import time
import cv2
import numpy as np
from typing import Dict, List, Optional, Tuple
import re
class Uiautomator2Controller:
"""uiautomator2控制器类"""
def __init__(self, device_id: str = None):
self.device_id = device_id
self.device = None
self.connect_device()
def connect_device(self):
"""连接设备"""
try:
if self.device_id:
self.device = u2.connect(self.device_id)
else:
self.device = u2.connect() # 连接默认设备
if self.device.alive:
print(f"✅ uiautomator2连接成功: {self.device.serial}")
# 启动ATX Agent健康检查
self.device.healthcheck()
return True
else:
print("❌ uiautomator2连接失败")
return False
except Exception as e:
print(f"❌ uiautomator2连接异常: {e}")
return False
def get_device_info(self) -> Dict:
"""获取设备信息"""
try:
info = self.device.info
return {
'serial': self.device.serial,
'productName': info.get('productName', ''),
'brand': info.get('brand', ''),
'manufacturer': info.get('manufacturer', ''),
'displayWidth': info.get('displayWidth', 0),
'displayHeight': info.get('displayHeight', 0),
'sdkInt': info.get('sdkInt', 0),
'currentPackageName': self.device.app_current().get('package', ''),
'rotation': info.get('rotation', 0)
}
except Exception as e:
print(f"❌ 获取设备信息失败: {e}")
return {}
def start_app(self, package_name: str, activity: str = None) -> bool:
"""启动应用"""
try:
if activity:
self.device.app_start(package_name, activity)
else:
self.device.app_start(package_name)
time.sleep(2) # 等待应用启动
print(f"✅ 应用启动成功: {package_name}")
return True
except Exception as e:
print(f"❌ 应用启动失败: {e}")
return False
def stop_app(self, package_name: str) -> bool:
"""停止应用"""
try:
self.device.app_stop(package_name)
print(f"✅ 应用停止成功: {package_name}")
return True
except Exception as e:
print(f"❌ 应用停止失败: {e}")
return False
def find_element_by_text(self, text: str, exact_match: bool = True) -> Optional[u2.UiObject]:
"""通过文本查找元素"""
try:
if exact_match:
element = self.device(text=text)
else:
element = self.device(textMatches=f".*{text}.*")
if element.exists:
print(f"✅ 找到文本元素: {text}")
return element
else:
print(f"❌ 未找到文本元素: {text}")
return None
except Exception as e:
print(f"❌ 查找文本元素失败: {e}")
return None
def find_element_by_id(self, resource_id: str) -> Optional[u2.UiObject]:
"""通过ID查找元素"""
try:
element = self.device(resourceId=resource_id)
if element.exists:
print(f"✅ 找到ID元素: {resource_id}")
return element
else:
print(f"❌ 未找到ID元素: {resource_id}")
return None
except Exception as e:
print(f"❌ 查找ID元素失败: {e}")
return None
def find_element_by_class(self, class_name: str) -> Optional[u2.UiObject]:
"""通过类名查找元素"""
try:
element = self.device(className=class_name)
if element.exists:
print(f"✅ 找到类名元素: {class_name}")
return element
else:
print(f"❌ 未找到类名元素: {class_name}")
return None
except Exception as e:
print(f"❌ 查找类名元素失败: {e}")
return None
def find_elements_by_selector(self, **kwargs) -> List[u2.UiObject]:
"""通过选择器查找多个元素"""
try:
elements = self.device(**kwargs).all()
count = len(elements)
print(f"✅ 找到 {count} 个元素")
return elements
except Exception as e:
print(f"❌ 查找多个元素失败: {e}")
return []
def click_element(self, element: u2.UiObject) -> bool:
"""点击元素"""
try:
element.click()
print("✅ 元素点击成功")
return True
except Exception as e:
print(f"❌ 元素点击失败: {e}")
return False
def long_click_element(self, element: u2.UiObject) -> bool:
"""长按元素"""
try:
element.long_click()
print("✅ 元素长按成功")
return True
except Exception as e:
print(f"❌ 元素长按失败: {e}")
return False
def double_click_element(self, element: u2.UiObject) -> bool:
"""双击元素"""
try:
element.double_click()
print("✅ 元素双击成功")
return True
except Exception as e:
print(f"❌ 元素双击失败: {e}")
return False
def input_text(self, element: u2.UiObject, text: str) -> bool:
"""在元素中输入文本"""
try:
element.set_text(text)
print(f"✅ 文本输入成功: {text}")
return True
except Exception as e:
print(f"❌ 文本输入失败: {e}")
return False
def swipe_up(self, duration: float = 0.5) -> bool:
"""向上滑动"""
try:
self.device.swipe_ext("up", scale=0.5, duration=duration)
print("✅ 向上滑动成功")
return True
except Exception as e:
print(f"❌ 向上滑动失败: {e}")
return False
def swipe_down(self, duration: float = 0.5) -> bool:
"""向下滑动"""
try:
self.device.swipe_ext("down", scale=0.5, duration=duration)
print("✅ 向下滑动成功")
return True
except Exception as e:
print(f"❌ 向下滑动失败: {e}")
return False
def swipe_left(self, duration: float = 0.5) -> bool:
"""向左滑动"""
try:
self.device.swipe_ext("left", scale=0.5, duration=duration)
print("✅ 向左滑动成功")
return True
except Exception as e:
print(f"❌ 向左滑动失败: {e}")
return False
def swipe_right(self, duration: float = 0.5) -> bool:
"""向右滑动"""
try:
self.device.swipe_ext("right", scale=0.5, duration=duration)
print("✅ 向右滑动成功")
return True
except Exception as e:
print(f"❌ 向右滑动失败: {e}")
return False
def swipe_to_find_element(self, text: str, direction: str = "down", max_swipes: int = 10) -> Optional[u2.UiObject]:
"""滑动查找元素"""
for i in range(max_swipes):
element = self.find_element_by_text(text)
if element:
return element
if direction == "down":
self.swipe_down()
elif direction == "up":
self.swipe_up()
elif direction == "left":
self.swipe_left()
elif direction == "right":
self.swipe_right()
time.sleep(0.5) # 等待滑动完成
print(f"❌ 在 {max_swipes} 次滑动后未找到元素: {text}")
return None
def get_window_size(self) -> Tuple[int, int]:
"""获取屏幕尺寸"""
return self.device.window_size()
def take_screenshot(self, filename: str = None) -> str:
"""截屏"""
if not filename:
filename = f"screenshot_{int(time.time())}.png"
try:
self.device.screenshot(filename)
print(f"✅ 截屏保存: {filename}")
return filename
except Exception as e:
print(f"❌ 截屏失败: {e}")
return ""
def get_page_xml(self) -> str:
"""获取页面XML结构"""
try:
xml_content = self.device.dump_hierarchy()
print("✅ 页面XML获取成功")
return xml_content
except Exception as e:
print(f"❌ 页面XML获取失败: {e}")
return ""
def wait_until_element_appears(self, element_selector: Dict, timeout: int = 10) -> bool:
"""等待元素出现"""
try:
element = self.device(**element_selector)
result = element.wait(timeout=timeout)
if result:
print("✅ 元素出现")
return True
else:
print(f"❌ 元素在 {timeout} 秒内未出现")
return False
except Exception as e:
print(f"❌ 等待元素出现失败: {e}")
return False
def wait_until_element_disappears(self, element_selector: Dict, timeout: int = 10) -> bool:
"""等待元素消失"""
try:
element = self.device(**element_selector)
result = element.wait_gone(timeout=timeout)
if result:
print("✅ 元素消失")
return True
else:
print(f"❌ 元素在 {timeout} 秒内未消失")
return False
except Exception as e:
print(f"❌ 等待元素消失失败: {e}")
return False
def press_key(self, key: str) -> bool:
"""按键操作"""
try:
self.device.press(key)
print(f"✅ 按键操作成功: {key}")
return True
except Exception as e:
print(f"❌ 按键操作失败: {e}")
return False
def back(self) -> bool:
"""返回键"""
return self.press_key("back")
def home(self) -> bool:
"""Home键"""
return self.press_key("home")
def recent(self) -> bool:
"""最近任务键"""
return self.press_key("recent")
def sleep(self) -> bool:
"""睡眠键"""
return self.press_key("sleep")
def wake_up(self) -> bool:
"""唤醒设备"""
try:
self.device.wake_up()
print("✅ 设备唤醒成功")
return True
except Exception as e:
print(f"❌ 设备唤醒失败: {e}")
return False
def unlock(self) -> bool:
"""解锁设备"""
try:
self.device.unlock()
print("✅ 设备解锁成功")
return True
except Exception as e:
print(f"❌ 设备解锁失败: {e}")
return False
def freeze_rotation(self) -> bool:
"""冻结旋转"""
try:
self.device.freeze_rotation()
print("✅ 旋转已冻结")
return True
except Exception as e:
print(f"❌ 冻结旋转失败: {e}")
return False
def unfreeze_rotation(self) -> bool:
"""解除旋转冻结"""
try:
self.device.unfreeze_rotation()
print("✅ 旋转已解冻")
return True
except Exception as e:
print(f"❌ 解除旋转冻结失败: {e}")
return False
def get_clipboard(self) -> str:
"""获取剪贴板内容"""
try:
content = self.device.clipboard
print("✅ 剪贴板内容获取成功")
return content
except Exception as e:
print(f"❌ 剪贴板内容获取失败: {e}")
return ""
def set_clipboard(self, text: str) -> bool:
"""设置剪贴板内容"""
try:
self.device.set_clipboard(text)
print(f"✅ 剪贴板内容设置成功: {text}")
return True
except Exception as e:
print(f"❌ 剪贴板内容设置失败: {e}")
return False
# uiautomator2使用示例
def u2_example():
"""uiautomator2使用示例"""
controller = Uiautomator2Controller()
if controller.connect_device():
# 获取设备信息
device_info = controller.get_device_info()
print(f"设备信息: {device_info}")
# 截屏
controller.take_screenshot("test_screenshot.png")
# 模拟一些操作
controller.home()
time.sleep(1)
controller.wake_up()
print("uiautomator2示例执行完成")
else:
print("无法连接到设备")
if __name__ == "__main__":
u2_example()#2. UI元素定位技术详解
#2.1 元素定位策略
在UI自动化中,准确的元素定位是关键。不同工具提供了多种定位方式:
#Appium元素定位方式:
# Appium元素定位示例
from appium.webdriver.common.appiumby import AppiumBy
class ElementLocator:
"""元素定位器"""
def __init__(self, driver):
self.driver = driver
def locate_by_id(self, locator):
"""通过ID定位"""
return self.driver.find_element(AppiumBy.ID, locator)
def locate_by_xpath(self, locator):
"""通过XPath定位"""
return self.driver.find_element(AppiumBy.XPATH, locator)
def locate_by_accessibility_id(self, locator):
"""通过无障碍ID定位(iOS/Android)"""
return self.driver.find_element(AppiumBy.ACCESSIBILITY_ID, locator)
def locate_by_android_uiautomator(self, locator):
"""通过Android UI Automator定位"""
return self.driver.find_element(AppiumBy.ANDROID_UIAUTOMATOR, locator)
def locate_by_ios_predicate(self, locator):
"""通过iOS Predicate定位"""
return self.driver.find_element(AppiumBy.IOS_PREDICATE, locator)
def locate_by_ios_class_chain(self, locator):
"""通过iOS Class Chain定位"""
return self.driver.find_element(AppiumBy.IOS_CLASS_CHAIN, locator)
def locate_by_class_name(self, locator):
"""通过类名定位"""
return self.driver.find_element(AppiumBy.CLASS_NAME, locator)
def locate_by_css_selector(self, locator):
"""通过CSS选择器定位"""
return self.driver.find_element(AppiumBy.CSS_SELECTOR, locator)
def locate_by_name(self, locator):
"""通过名称定位"""
return self.driver.find_element(AppiumBy.NAME, locator)
def locate_by_link_text(self, locator):
"""通过链接文本定位"""
return self.driver.find_element(AppiumBy.LINK_TEXT, locator)#uiautomator2元素定位方式:
# uiautomator2元素定位示例
class U2ElementLocator:
"""uiautomator2元素定位器"""
def __init__(self, device):
self.device = device
def locate_by_text(self, text, exact_match=True):
"""通过文本定位"""
if exact_match:
return self.device(text=text)
else:
return self.device(textMatches=f".*{text}.*")
def locate_by_resource_id(self, resource_id):
"""通过资源ID定位"""
return self.device(resourceId=resource_id)
def locate_by_class_name(self, class_name):
"""通过类名定位"""
return self.device(className=class_name)
def locate_by_description(self, description):
"""通过描述定位"""
return self.device(description=description)
def locate_by_content_desc(self, content_desc):
"""通过内容描述定位"""
return self.device(descriptionMatches=f".*{content_desc}.*")
def locate_by_index(self, index):
"""通过索引定位"""
return self.device(index=index)
def locate_by_instance(self, instance):
"""通过实例定位"""
return self.device(instance=instance)
def locate_by_bounds(self, left, top, right, bottom):
"""通过边界定位"""
return self.device(bounds={"left": left, "top": top, "right": right, "bottom": bottom})
def locate_by_selected(self, selected=True):
"""通过选中状态定位"""
return self.device(selected=selected)
def locate_by_scrollable(self, scrollable=True):
"""通过可滚动状态定位"""
return self.device(scrollable=scrollable)
def locate_by_clickable(self, clickable=True):
"""通过可点击状态定位"""
return self.device(clickable=clickable)
def locate_by_long_clickable(self, long_clickable=True):
"""通过可长按状态定位"""
return self.device(longClickable=long_clickable)
def locate_by_enabled(self, enabled=True):
"""通过启用状态定位"""
return self.device(enabled=enabled)
def locate_by_focused(self, focused=True):
"""通过焦点状态定位"""
return self.device(focused=focused)
def locate_by_checkable(self, checkable=True):
"""通过可勾选状态定位"""
return self.device(checkable=checkable)
def locate_by_checked(self, checked=True):
"""通过勾选状态定位"""
return self.device(checked=checked)
def locate_by_child_selector(self, child_selector):
"""通过子元素选择器定位"""
return self.device(childSelector=child_selector)
def locate_by_sibling_selector(self, sibling_selector):
"""通过兄弟元素选择器定位"""
return self.device(siblingSelector=sibling_selector)
def locate_by_multiple_conditions(self, **conditions):
"""通过多个条件定位"""
return self.device(**conditions)#2.2 图像识别定位技术
图像识别是Airtest的核心技术,特别适用于无法通过UI树定位的场景。
#图像识别原理:
import cv2
import numpy as np
from typing import Tuple, List, Optional
class ImageMatcher:
"""图像匹配器"""
def __init__(self):
pass
def template_match(self, screenshot: np.ndarray, template: np.ndarray,
threshold: float = 0.8) -> List[Tuple[int, int, float]]:
"""模板匹配"""
# 转换为灰度图
screenshot_gray = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY)
template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
# 执行模板匹配
result = cv2.matchTemplate(screenshot_gray, template_gray, cv2.TM_CCOEFF_NORMED)
# 找到匹配位置
locations = np.where(result >= threshold)
matches = []
for pt in zip(*locations[::-1]): # x, y
confidence = float(result[pt[1], pt[0]])
matches.append((pt[0], pt[1], confidence))
return matches
def multi_scale_template_match(self, screenshot: np.ndarray, template: np.ndarray,
scales: List[float] = None, threshold: float = 0.7) -> List[Tuple[int, int, float, float]]:
"""多尺度模板匹配"""
if scales is None:
scales = [0.8, 0.9, 1.0, 1.1, 1.2]
all_matches = []
for scale in scales:
# 缩放模板
h, w = template.shape[:2]
new_w, new_h = int(w * scale), int(h * scale)
scaled_template = cv2.resize(template, (new_w, new_h))
# 执行匹配
matches = self.template_match(screenshot, scaled_template, threshold)
# 调整坐标到原始尺寸
scaled_matches = [(int(x/scale), int(y/scale), conf, scale) for x, y, conf in matches]
all_matches.extend(scaled_matches)
return all_matches
def feature_match(self, screenshot: np.ndarray, template: np.ndarray,
detector_type: str = 'SIFT', threshold: float = 0.7) -> List[Tuple[int, int, float]]:
"""特征点匹配"""
# 使用SIFT或ORB特征检测
if detector_type == 'SIFT':
detector = cv2.SIFT_create()
elif detector_type == 'ORB':
detector = cv2.ORB_create()
else:
raise ValueError("Unsupported detector type")
# 检测特征点
kp1, des1 = detector.detectAndCompute(template, None)
kp2, des2 = detector.detectAndCompute(screenshot, None)
if des1 is None or des2 is None:
return []
# 特征匹配
bf = cv2.BFMatcher()
matches = bf.knnMatch(des1, des2, k=2)
# 应用 Lowe's ratio test
good_matches = []
for m, n in matches:
if m.distance < threshold * n.distance:
good_matches.append(m)
# 计算匹配位置
if len(good_matches) >= 4:
src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
# 计算变换矩阵
M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
if M is not None:
# 获取模板在截图中的位置
h, w = template.shape[:2]
pts = np.float32([[0, 0], [0, h-1], [w-1, h-1], [w-1, 0]]).reshape(-1, 1, 2)
dst = cv2.perspectiveTransform(pts, M)
# 计算中心点
center_x = int(np.mean(dst[:, 0, 0]))
center_y = int(np.mean(dst[:, 0, 1]))
return [(center_x, center_y, len(good_matches) / len(matches))]
return []
# 图像识别在自动化中的应用
class VisionBasedAutomation:
"""基于视觉的自动化"""
def __init__(self):
self.matcher = ImageMatcher()
def find_element_by_image(self, screenshot_path: str, template_path: str,
threshold: float = 0.8) -> Optional[Tuple[int, int]]:
"""通过图像查找元素位置"""
screenshot = cv2.imread(screenshot_path)
template = cv2.imread(template_path)
if screenshot is None or template is None:
return None
matches = self.matcher.template_match(screenshot, template, threshold)
if matches:
# 返回最佳匹配
best_match = max(matches, key=lambda x: x[2]) # 按置信度排序
return (best_match[0], best_match[1])
return None
def find_multiple_elements_by_image(self, screenshot_path: str, template_path: str,
threshold: float = 0.7) -> List[Tuple[int, int]]:
"""查找多个相同元素"""
screenshot = cv2.imread(screenshot_path)
template = cv2.imread(template_path)
if screenshot is None or template is None:
return []
matches = self.matcher.template_match(screenshot, template, threshold)
# 过滤重叠的匹配(非极大值抑制)
filtered_matches = self.non_max_suppression(matches, overlap_threshold=0.3)
return [(x, y) for x, y, conf in filtered_matches]
def non_max_suppression(self, matches: List[Tuple[int, int, float]],
overlap_threshold: float = 0.3) -> List[Tuple[int, int, float]]:
"""非极大值抑制,去除重叠的检测框"""
if not matches:
return []
# 按置信度排序
matches = sorted(matches, key=lambda x: x[2], reverse=True)
keep = []
while matches:
# 保留置信度最高的
best = matches.pop(0)
keep.append(best)
# 计算与其他检测框的重叠度
remaining = []
for match in matches:
# 简单的欧氏距离判断重叠
distance = np.sqrt((best[0] - match[0])**2 + (best[1] - match[1])**2)
# 假设模板大小为100x100像素
min_dim = min(abs(best[0] - match[0]), abs(best[1] - match[1]))
if distance > 50: # 避免距离过近的重复检测
remaining.append(match)
matches = remaining
return keep
def vision_automation_example():
"""视觉自动化示例"""
automation = VisionBasedAutomation()
# 示例:查找点赞按钮
# position = automation.find_element_by_image("screenshot.png", "like_button_template.png")
# if position:
# print(f"找到点赞按钮位置: {position}")
# # 点击该位置
# # controller.tap(position[0], position[1])
print("视觉自动化示例完成")
if __name__ == "__main__":
vision_automation_example()#3. 手势操作与交互技术
#3.1 基础手势操作
import time
from typing import List, Tuple
import random
class GestureController:
"""手势控制器"""
def __init__(self, device_controller):
self.device = device_controller
def tap(self, x: int, y: int, duration: float = 0.1) -> bool:
"""点击操作"""
try:
# 添加随机偏移增加真实性
offset_x = random.randint(-2, 2)
offset_y = random.randint(-2, 2)
real_x = max(0, x + offset_x)
real_y = max(0, y + offset_y)
self.device.touch.down(real_x, real_y)
time.sleep(duration)
self.device.touch.up(real_x, real_y)
print(f"✅ 点击操作成功: ({real_x}, {real_y})")
return True
except Exception as e:
print(f"❌ 点击操作失败: {e}")
return False
def long_press(self, x: int, y: int, duration: float = 1.0) -> bool:
"""长按操作"""
try:
self.device.touch.down(x, y)
time.sleep(duration)
self.device.touch.up(x, y)
print(f"✅ 长按操作成功: ({x}, {y}), 时长: {duration}s")
return True
except Exception as e:
print(f"❌ 长按操作失败: {e}")
return False
def swipe(self, start_x: int, start_y: int, end_x: int, end_y: int,
duration: float = 0.5, steps: int = 5) -> bool:
"""滑动操作"""
try:
# 分步滑动,模拟真实滑动轨迹
dx = (end_x - start_x) / steps
dy = (end_y - start_y) / steps
step_duration = duration / steps
self.device.touch.down(start_x, start_y)
time.sleep(step_duration * 0.1) # 初始停顿
for i in range(1, steps + 1):
current_x = int(start_x + dx * i)
current_y = int(start_y + dy * i)
# 添加轻微的随机偏移,模拟人手滑动的不规则性
offset_x = random.randint(-1, 1)
offset_y = random.randint(-1, 1)
self.device.touch.move(current_x + offset_x, current_y + offset_y)
time.sleep(step_duration)
self.device.touch.up(end_x, end_y)
print(f"✅ 滑动操作成功: ({start_x}, {start_y}) -> ({end_x}, {end_y})")
return True
except Exception as e:
print(f"❌ 滑动操作失败: {e}")
return False
def flick(self, start_x: int, start_y: int, end_x: int, end_y: int,
duration: float = 0.1) -> bool:
"""快速滑动(轻拂)"""
try:
self.device.touch.down(start_x, start_y)
time.sleep(0.05) # 短暂停顿
self.device.touch.move(end_x, end_y)
time.sleep(duration)
self.device.touch.up(end_x, end_y)
print(f"✅ 轻拂操作成功: ({start_x}, {start_y}) -> ({end_x}, {end_y})")
return True
except Exception as e:
print(f"❌ 轻拂操作失败: {e}")
return False
def pinch(self, center_x: int, center_y: int,
start_radius: int, end_radius: int, duration: float = 1.0) -> bool:
"""双指缩放(捏合/展开)"""
try:
# 计算两个手指的起始位置
start_x1 = center_x - start_radius
start_y1 = center_y
start_x2 = center_x + start_radius
start_y2 = center_y
end_x1 = center_x - end_radius
end_y1 = center_y
end_x2 = center_x + end_radius
end_y2 = center_y
# 按下两个点
self.device.touch.down(start_x1, start_y1)
self.device.touch.down(start_x2, start_y2)
time.sleep(0.1)
# 同时移动两个点
steps = 10
for i in range(steps):
t = i / (steps - 1)
curr_x1 = int(start_x1 + (end_x1 - start_x1) * t)
curr_y1 = int(start_y1 + (end_y1 - start_y1) * t)
curr_x2 = int(start_x2 + (end_x2 - start_x2) * t)
curr_y2 = int(start_y2 + (end_y2 - start_y2) * t)
self.device.touch.move(curr_x1, curr_y1, 0) # 第一个手指
self.device.touch.move(curr_x2, curr_y2, 1) # 第二个手指
time.sleep(duration / steps)
# 抬起两个点
self.device.touch.up(end_x1, end_y1, 0)
self.device.touch.up(end_x2, end_y2, 1)
action = "缩小" if end_radius < start_radius else "放大"
print(f"✅ 缩放操作成功: {action} {start_radius}px -> {end_radius}px")
return True
except Exception as e:
print(f"❌ 缩放操作失败: {e}")
return False
def drag(self, start_x: int, start_y: int, end_x: int, end_y: int,
duration: float = 1.0, press_duration: float = 0.5) -> bool:
"""拖拽操作"""
try:
self.device.touch.down(start_x, start_y)
time.sleep(press_duration) # 按住一段时间
# 平滑移动到目标位置
steps = int(duration / 0.05) # 每0.05秒一步
for i in range(steps):
t = i / (steps - 1) if steps > 1 else 1
curr_x = int(start_x + (end_x - start_x) * t)
curr_y = int(start_y + (end_y - start_y) * t)
self.device.touch.move(curr_x, curr_y)
time.sleep(duration / steps)
self.device.touch.up(end_x, end_y)
print(f"✅ 拖拽操作成功: ({start_x}, {start_y}) -> ({end_x}, {end_y})")
return True
except Exception as e:
print(f"❌ 拖拽操作失败: {e}")
return False
def multi_touch_tap(self, positions: List[Tuple[int, int]]) -> bool:
"""多点触控点击"""
try:
# 同时按下所有点
for i, (x, y) in enumerate(positions):
self.device.touch.down(x, y, i) # 使用不同的pointer id
time.sleep(0.05) # 错开按下时间
time.sleep(0.1) # 保持按下状态
# 同时抬起所有点
for i, (x, y) in enumerate(positions):
self.device.touch.up(x, y, i)
print(f"✅ 多点触控点击成功: {len(positions)}个点")
return True
except Exception as e:
print(f"❌ 多点触控点击失败: {e}")
return False
def draw_pattern(self, points: List[Tuple[int, int]],
duration: float = 2.0) -> bool:
"""绘制图案"""
try:
if len(points) < 2:
print("❌ 点数不足,无法绘制图案")
return False
start_x, start_y = points[0]
self.device.touch.down(start_x, start_y)
time.sleep(0.1)
steps = len(points) - 1
step_duration = duration / steps if steps > 0 else duration
for i in range(1, len(points)):
end_x, end_y = points[i]
# 在两点间平滑移动
segment_steps = 5 # 每段分5步
dx = (end_x - points[i-1][0]) / segment_steps
dy = (end_y - points[i-1][1]) / segment_steps
for j in range(segment_steps):
curr_x = int(points[i-1][0] + dx * (j + 1))
curr_y = int(points[i-1][1] + dy * (j + 1))
self.device.touch.move(curr_x, curr_y)
time.sleep(step_duration / segment_steps)
self.device.touch.up(points[-1][0], points[-1][1])
print(f"✅ 图案绘制成功: {len(points)}个点")
return True
except Exception as e:
print(f"❌ 图案绘制失败: {e}")
return False
# 手势操作示例
def gesture_example():
"""手势操作示例"""
# 这里需要一个实际的设备控制器
print("手势操作示例完成")
print("实际使用时需要连接真实设备")#3.2 高级交互技术
class AdvancedInteraction:
"""高级交互技术"""
def __init__(self, device_controller):
self.device = device_controller
self.gesture_controller = GestureController(device_controller)
def simulate_human_like_behavior(self, base_delay: float = 1.0,
variation: float = 0.5) -> float:
"""模拟人类行为延时"""
delay = base_delay + random.uniform(-variation, variation)
# 添加一些不规律性
if random.random() < 0.1: # 10%概率添加额外延时
delay += random.uniform(0.5, 2.0)
time.sleep(max(0.1, delay)) # 确保至少0.1秒延时
return delay
def scroll_with_content_detection(self, direction: str = "down",
max_scrolls: int = 10) -> List[dict]:
"""滚动并检测内容变化"""
content_changes = []
previous_content = self.device.dump_hierarchy()
for i in range(max_scrolls):
if direction == "down":
self.gesture_controller.swipe(500, 1500, 500, 500, duration=0.8)
elif direction == "up":
self.gesture_controller.swipe(500, 500, 500, 1500, duration=0.8)
elif direction == "left":
self.gesture_controller.swipe(1000, 1000, 200, 1000, duration=0.8)
elif direction == "right":
self.gesture_controller.swipe(200, 1000, 1000, 1000, duration=0.8)
# 等待内容加载
self.simulate_human_like_behavior(1.0, 0.3)
current_content = self.device.dump_hierarchy()
# 检测内容是否发生变化
if current_content != previous_content:
change_info = {
'scroll_number': i + 1,
'content_changed': True,
'timestamp': time.time()
}
content_changes.append(change_info)
previous_content = current_content
else:
# 如果内容没有变化,可能已经到达底部
change_info = {
'scroll_number': i + 1,
'content_changed': False,
'timestamp': time.time()
}
content_changes.append(change_info)
if i > 3: # 如果连续几次都没有变化,认为到底了
break
return content_changes
def adaptive_element_interaction(self, element_locator, action: str = "click",
max_attempts: int = 3) -> bool:
"""自适应元素交互"""
for attempt in range(max_attempts):
try:
# 尝试定位元素
element = self.locate_element_with_fallback(element_locator)
if element and element.exists:
# 根据元素类型选择合适的交互方式
element_bounds = element.bounds
center_x = (element_bounds['left'] + element_bounds['right']) // 2
center_y = (element_bounds['top'] + element_bounds['bottom']) // 2
if action == "click":
success = self.gesture_controller.tap(center_x, center_y)
elif action == "long_press":
success = self.gesture_controller.long_press(center_x, center_y, 1.0)
elif action == "double_click":
success = (self.gesture_controller.tap(center_x, center_y, 0.1) and
self.simulate_human_like_behavior(0.2, 0.1) and
self.gesture_controller.tap(center_x, center_y, 0.1))
if success:
print(f"✅ 第{attempt + 1}次尝试成功执行{action}")
return True
else:
print(f"⚠️ 第{attempt + 1}次尝试交互失败")
# 如果元素不存在或交互失败,等待后重试
self.simulate_human_like_behavior(2.0, 0.5)
except Exception as e:
print(f"❌ 第{attempt + 1}次尝试异常: {e}")
if attempt < max_attempts - 1:
self.simulate_human_like_behavior(3.0, 1.0)
print(f"❌ 经过{max_attempts}次尝试仍未能成功交互")
return False
def locate_element_with_fallback(self, locator):
"""多策略元素定位"""
strategies = [
lambda: self.device(**locator), # 直接使用传入的定位器
lambda: self.device(**{k: v for k, v in locator.items() if k != 'description'}, descriptionMatches=f".*{locator.get('text', locator.get('resourceId', ''))}.*") if any(k in locator for k in ['text', 'resourceId']) else None,
lambda: self.device(textMatches=f".*{locator.get('text', '')}.*") if 'text' in locator else None,
lambda: self.device(resourceIdMatches=f".*{locator.get('resourceId', '')}.*") if 'resourceId' in locator else None,
]
for strategy in strategies:
try:
element = strategy()
if element and element.exists:
return element
except:
continue
return None
def batch_operation_with_error_recovery(self, operations: List[dict]) -> dict:
"""批量操作与错误恢复"""
results = {
'successful_operations': [],
'failed_operations': [],
'recovered_operations': [],
'total_operations': len(operations)
}
for i, operation in enumerate(operations):
try:
op_result = self.execute_single_operation(operation)
if op_result['success']:
results['successful_operations'].append({
'operation_index': i,
'operation_type': operation['type'],
'details': op_result.get('details', {})
})
else:
# 操作失败,尝试恢复
recovery_result = self.attempt_operation_recovery(operation, op_result)
if recovery_result['success']:
results['recovered_operations'].append({
'operation_index': i,
'operation_type': operation['type'],
'recovery_method': recovery_result.get('method', 'unknown')
})
else:
results['failed_operations'].append({
'operation_index': i,
'operation_type': operation['type'],
'error': recovery_result.get('error', 'Unknown error')
})
# 操作间隔
self.simulate_human_like_behavior(1.0, 0.3)
except Exception as e:
results['failed_operations'].append({
'operation_index': i,
'operation_type': operation.get('type', 'unknown'),
'error': str(e)
})
return results
def execute_single_operation(self, operation: dict) -> dict:
"""执行单个操作"""
op_type = operation.get('type', '')
try:
if op_type == 'click':
element_locator = operation['locator']
element = self.locate_element_with_fallback(element_locator)
if element and element.exists:
bounds = element.bounds
center_x = (bounds['left'] + bounds['right']) // 2
center_y = (bounds['top'] + bounds['bottom']) // 2
success = self.gesture_controller.tap(center_x, center_y)
return {'success': success, 'details': {'element_found': True}}
else:
return {'success': False, 'details': {'element_found': False}}
elif op_type == 'swipe':
direction = operation['direction']
distance = operation.get('distance', 1000)
duration = operation.get('duration', 0.8)
screen_width, screen_height = self.device.window_size()
if direction == 'up':
success = self.gesture_controller.swipe(
screen_width//2, screen_height*3//4,
screen_width//2, screen_height//4,
duration=duration
)
elif direction == 'down':
success = self.gesture_controller.swipe(
screen_width//2, screen_height//4,
screen_width//2, screen_height*3//4,
duration=duration
)
elif direction == 'left':
success = self.gesture_controller.swipe(
screen_width*3//4, screen_height//2,
screen_width//4, screen_height//2,
duration=duration
)
elif direction == 'right':
success = self.gesture_controller.swipe(
screen_width//4, screen_height//2,
screen_width*3//4, screen_height//2,
duration=duration
)
else:
return {'success': False, 'details': {'invalid_direction': direction}}
return {'success': success, 'details': {'swipe_direction': direction}}
elif op_type == 'input_text':
element_locator = operation['locator']
text = operation['text']
element = self.locate_element_with_fallback(element_locator)
if element and element.exists:
element.set_text(text)
return {'success': True, 'details': {'text_entered': text}}
else:
return {'success': False, 'details': {'element_found': False}}
else:
return {'success': False, 'details': {'unsupported_operation': op_type}}
except Exception as e:
return {'success': False, 'details': {'exception': str(e)}}
def attempt_operation_recovery(self, operation: dict, failure_result: dict) -> dict:
"""尝试操作恢复"""
op_type = operation.get('type', '')
try:
# 尝试不同的恢复策略
if op_type in ['click', 'input_text']:
# 策略1: 等待一段时间后重试
time.sleep(3)
retry_result = self.execute_single_operation(operation)
if retry_result['success']:
return {'success': True, 'method': 'retry_after_wait'}
# 策略2: 检查应用是否还在前台
current_app = self.device.app_current()
target_app = operation.get('expected_app_package')
if target_app and current_app.get('package') != target_app:
# 重新启动应用
self.device.app_start(target_app)
time.sleep(3)
retry_result = self.execute_single_operation(operation)
if retry_result['success']:
return {'success': True, 'method': 'restart_app_and_retry'}
elif op_type == 'swipe':
# 策略: 尝试不同的滑动参数
original_duration = operation.get('duration', 0.8)
for new_duration in [original_duration * 1.5, original_duration * 0.7]:
operation['duration'] = new_duration
retry_result = self.execute_single_operation(operation)
if retry_result['success']:
return {'success': True, 'method': f'adjust_duration_to_{new_duration}'}
return {'success': False, 'error': 'All recovery strategies failed'}
except Exception as e:
return {'success': False, 'error': f'Recovery failed: {str(e)}'}
def advanced_interaction_example():
"""高级交互示例"""
print("高级交互技术示例完成")
print("实际使用时需要连接真实设备并传入设备控制器")#4. 实战项目一:抖音自动点赞与评论爬虫
#4.1 项目架构设计
import asyncio
import aiofiles
from datetime import datetime
from typing import Dict, List, Optional
import json
import csv
class DouyinAutomationProject:
"""抖音自动化项目主类"""
def __init__(self, device_controller, data_storage):
self.device = device_controller
self.storage = data_storage
self.interaction_controller = AdvancedInteraction(device_controller)
self.project_config = {
'app_package': 'com.ss.android.ugc.aweme',
'target_hashtags': ['#热门', '#推荐'],
'daily_limit': 100,
'like_probability': 0.3, # 30%概率点赞
'comment_probability': 0.1, # 10%概率评论
'scroll_duration': 3.0, # 每次滑动等待时间
'session_duration': 1800 # 单次运行时长(秒) - 30分钟
}
self.stats = {
'videos_processed': 0,
'likes_given': 0,
'comments_made': 0,
'errors_encountered': 0,
'start_time': datetime.now()
}
def setup_douyin_environment(self) -> bool:
"""设置抖音环境"""
try:
# 检查应用是否存在
apps = self.device.app_list()
if self.project_config['app_package'] not in apps:
print(f"❌ 抖音应用未安装: {self.project_config['app_package']}")
return False
# 启动抖音
self.device.app_start(self.project_config['app_package'])
print("✅ 抖音应用启动成功")
# 等待主界面加载
time.sleep(5)
# 检查是否需要处理权限请求
self.handle_permissions_if_needed()
return True
except Exception as e:
print(f"❌ 设置抖音环境失败: {e}")
return False
def handle_permissions_if_needed(self):
"""处理权限请求"""
permission_dialogs = [
{'text': '允许', 'resourceId': 'com.android.packageinstaller:id/permission_allow_button'},
{'text': '始终允许', 'resourceId': 'com.android.packageinstaller:id/always_check'},
{'text': '同意', 'resourceId': 'com.ss.android.ugc.aweme:id/btn_agree'},
{'text': '我知道了', 'resourceId': 'com.ss.android.ugc.aweme:id/tv_know'}
]
for dialog in permission_dialogs:
element = None
if 'text' in dialog:
element = self.device(text=dialog['text'])
elif 'resourceId' in dialog:
element = self.device(resourceId=dialog['resourceId'])
if element and element.exists:
try:
element.click()
print(f"✅ 处理权限对话框: {list(dialog.values())[0]}")
time.sleep(1)
except:
pass
async def run_automation_session(self):
"""运行自动化会话"""
print("🚀 开始抖音自动化会话...")
start_time = time.time()
try:
if not self.setup_douyin_environment():
print("❌ 环境设置失败,退出")
return
while time.time() - start_time < self.project_config['session_duration']:
await self.process_single_video()
# 随机延时,模拟人类行为
delay = random.uniform(2, 6)
await asyncio.sleep(delay)
# 检查是否达到每日限制
if self.stats['videos_processed'] >= self.project_config['daily_limit']:
print(f"✅ 达到每日限制 {self.project_config['daily_limit']},停止处理")
break
except KeyboardInterrupt:
print("⏸️ 用户中断自动化")
except Exception as e:
print(f"❌ 自动化会话异常: {e}")
self.stats['errors_encountered'] += 1
finally:
await self.cleanup_session()
async def process_single_video(self):
"""处理单个视频"""
try:
# 获取当前视频信息
video_info = await self.extract_video_info()
if video_info:
self.stats['videos_processed'] += 1
print(f"🎬 处理视频 {self.stats['videos_processed']}: {video_info.get('desc', '无标题')[:30]}...")
# 决定是否点赞
if random.random() < self.project_config['like_probability']:
await self.like_current_video()
# 决定是否评论
if random.random() < self.project_config['comment_probability']:
await self.comment_current_video()
# 滑动到下一个视频
await self.scroll_to_next_video()
# 保存视频信息
await self.storage.save_video_info(video_info)
except Exception as e:
print(f"❌ 处理视频时出错: {e}")
self.stats['errors_encountered'] += 1
async def extract_video_info(self) -> Dict:
"""提取当前视频信息"""
try:
# 获取页面层次结构
hierarchy = self.device.dump_hierarchy()
# 解析视频相关信息(简化版)
video_info = {
'video_id': f"vid_{int(time.time())}_{random.randint(1000, 9999)}",
'timestamp': datetime.now().isoformat(),
'desc': self.extract_description(hierarchy),
'author': self.extract_author(hierarchy),
'like_count': self.extract_like_count(hierarchy),
'comment_count': self.extract_comment_count(hierarchy),
'share_count': self.extract_share_count(hierarchy),
'music': self.extract_music(hierarchy),
'location': self.extract_location(hierarchy),
'hashtags': self.extract_hashtags(hierarchy)
}
return video_info
except Exception as e:
print(f"❌ 提取视频信息失败: {e}")
return {}
def extract_description(self, hierarchy: str) -> str:
"""提取视频描述"""
# 在实际实现中,这里会解析UI层次结构来提取描述
# 简化实现
import re
desc_match = re.search(r'"desc":"([^"]*)"', hierarchy)
return desc_match.group(1) if desc_match else "无描述"
def extract_author(self, hierarchy: str) -> str:
"""提取作者信息"""
# 简化实现
import re
author_match = re.search(r'"author":"([^"]*)"', hierarchy)
return author_match.group(1) if author_match else "未知作者"
def extract_like_count(self, hierarchy: str) -> str:
"""提取点赞数"""
# 简化实现
return f"{random.randint(100, 10000)}"
def extract_comment_count(self, hierarchy: str) -> str:
"""提取评论数"""
# 简化实现
return f"{random.randint(10, 1000)}"
def extract_share_count(self, hierarchy: str) -> str:
"""提取分享数"""
# 简化实现
return f"{random.randint(10, 500)}"
def extract_music(self, hierarchy: str) -> str:
"""提取音乐信息"""
# 简化实现
return "未知音乐"
def extract_location(self, hierarchy: str) -> str:
"""提取位置信息"""
# 简化实现
return "未知位置"
def extract_hashtags(self, hierarchy: str) -> List[str]:
"""提取话题标签"""
# 简化实现
return ["#推荐", "#热门"]
async def like_current_video(self):
"""点赞当前视频"""
try:
# 查找点赞按钮(可能需要多种定位策略)
like_selectors = [
{'desc': '赞', 'className': 'android.widget.Button'},
{'text': '赞', 'className': 'android.widget.TextView'},
{'resourceId': 'com.ss.android.ugc.aweme:id/like_icon'},
{'resourceId': 'com.ss.android.ugc.aweme:id/aweme_like_layout'}
]
for selector in like_selectors:
element = self.device(**selector)
if element.exists:
# 获取元素中心位置并点击
bounds = element.bounds
center_x = (bounds['left'] + bounds['right']) // 2
center_y = (bounds['top'] + bounds['bottom']) // 2
self.device.click(center_x, center_y)
self.stats['likes_given'] += 1
print(f"👍 视频点赞成功 (总计: {self.stats['likes_given']})")
return True
print("❌ 未找到点赞按钮")
return False
except Exception as e:
print(f"❌ 点赞操作失败: {e}")
return False
async def comment_current_video(self):
"""评论当前视频"""
try:
# 随机选择评论内容
comments = [
"很棒的视频!",
"学到了很多,谢谢分享!",
"支持一下,加油!",
"内容很实用,收藏了",
"顶顶顶!",
"666",
"mark一下,以后看"
]
comment_text = random.choice(comments)
# 查找评论按钮
comment_selectors = [
{'desc': '评论', 'className': 'android.widget.Button'},
{'resourceId': 'com.ss.android.ugc.aweme:id/comment_icon'},
{'resourceId': 'com.ss.android.ugc.aweme:id/aweme_comment_layout'}
]
for selector in comment_selectors:
element = self.device(**selector)
if element.exists:
# 点击评论按钮
element.click()
time.sleep(1)
# 查找评论输入框
input_selectors = [
{'resourceId': 'com.ss.android.ugc.aweme:id/chat_input_view'},
{'className': 'android.widget.EditText'},
{'text': '说点什么...'}
]
for input_selector in input_selectors:
input_element = self.device(**input_selector)
if input_element.exists:
# 输入评论
input_element.set_text(comment_text)
time.sleep(0.5)
# 查找发送按钮
send_selectors = [
{'text': '发送', 'className': 'android.widget.Button'},
{'desc': '发送', 'className': 'android.widget.Button'},
{'resourceId': 'com.ss.android.ugc.aweme:id/send_btn'}
]
