#Airtest框架详解
Airtest是由网易开发的自动化测试框架,基于图像识别和UI树,特别适合游戏和App自动化。
#Airtest基础配置
# Airtest安装和基础使用
# pip install airtest opencv-contrib-python pocoui
from airtest.core.api import *
from airtest.cli.runner import runner
import logging
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
from poco.utils.airtest import AirtestInput, AirtestScreen
import time
import cv2
import numpy as np
class AirtestController:
"""Airtest控制器 - 用于基于图像识别的App自动化"""
def __init__(self, device_id=None):
self.device_id = device_id
self.poco = None
self.screen = None
self.setup_device()
def setup_device(self):
"""设置设备连接"""
try:
if self.device_id:
connect_device(f"Android:///{self.device_id}")
else:
connect_device("Android:///") # 连接默认设备
# 初始化poco(UI树分析)
self.poco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
print("✅ Airtest设备连接成功")
print(f"📱 当前设备: {device().get_serial()}")
except Exception as e:
print(f"❌ Airtest设备连接失败: {e}")
def find_and_click(self, image_path, threshold=0.8):
"""查找图像并点击"""
try:
pos = self.find_image(image_path, threshold)
if pos:
touch(pos)
print(f"✅ 点击图像: {image_path}")
return True
else:
print(f"❌ 未找到图像: {image_path}")
return False
except Exception as e:
print(f"❌ 图像点击失败: {e}")
return False
def find_image(self, image_path, threshold=0.8):
"""查找图像位置"""
try:
pos = exists(Template(image_path, threshold=threshold))
return pos
except Exception as e:
print(f"❌ 图像查找失败: {e}")
return None
def swipe_direction(self, direction='down', distance=200):
"""滑动操作"""
screen_size = device().get_current_resolution()
center_x, center_y = screen_size[0] // 2, screen_size[1] // 2
if direction.lower() == 'up':
start = (center_x, center_y + distance)
end = (center_x, center_y - distance)
elif direction.lower() == 'down':
start = (center_x, center_y - distance)
end = (center_x, center_y + distance)
elif direction.lower() == 'left':
start = (center_x + distance, center_y)
end = (center_x - distance, center_y)
elif direction.lower() == 'right':
start = (center_x - distance, center_y)
end = (center_x + distance, center_y)
else:
return False
swipe(start, end)
print(f"✅ 滑动操作: {direction}, 距离: {distance}")
return True
def wait_for_image(self, image_path, timeout=30, threshold=0.8):
"""等待图像出现"""
try:
pos = wait(Template(image_path, threshold=threshold), timeout=timeout)
print(f"✅ 图像出现: {image_path}")
return pos
except:
print(f"❌ 等待图像超时: {image_path}")
return None
# Airtest高级功能
def airtest_advanced_features():
"""Airtest高级功能演示"""
# 初始化控制器
controller = AirtestController()
# 截图功能
screen_img = snapshot()
print(f"📸 截图保存成功: {screen_img}")
# 获取设备信息
device_info = device().get_info()
print(f"📱 设备信息: {device_info}")
# 等待应用加载
time.sleep(2)
# 使用poco获取UI树
if controller.poco:
# 获取当前页面的所有节点
ui_tree = controller.poco.agent.hierarchy.dump()
print(f"🌳 UI树节点数: {len(ui_tree)}")
# 查找特定元素
elements = controller.poco("android.widget.TextView").exists()
print(f"🏷️ TextView元素: {elements}")#Airtest实战 - 抖音自动化脚本
from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
import time
import random
from airtest.aircv import imread
import cv2
class DouyinAirtestAutomation:
"""基于Airtest的抖音自动化类"""
def __init__(self, device_id=None):
self.device_id = device_id
self.poco = None
self.setup_environment()
def setup_environment(self):
"""设置Airtest环境"""
try:
if self.device_id:
connect_device(f"Android:///{self.device_id}")
else:
connect_device("Android:///") # 连接默认设备
# 初始化poco(UI树)
self.poco = AndroidUiautomationPoco(
use_airtest_input=True,
screenshot_each_action=False
)
print("✅ Airtest环境初始化成功")
except Exception as e:
print(f"❌ Airtest初始化失败: {e}")
def launch_douyin(self):
"""启动抖音"""
try:
start_app("com.ss.android.ugc.aweme")
time.sleep(5) # 等待应用启动
print("📱 抖音应用已启动")
return True
except Exception as e:
print(f"❌ 启动抖音失败: {e}")
return False
def scroll_feed(self, count=1):
"""滚动信息流"""
screen_size = device().get_current_resolution()
center_x, center_y = screen_size[0] // 2, screen_size[1] // 2
for i in range(count):
# 从屏幕下方向上滑动
start_point = [center_x, screen_size[1] * 4 // 5]
end_point = [center_x, screen_size[1] // 5]
swipe(start_point, end_point, duration=0.5)
time.sleep(random.uniform(2, 4)) # 随机等待
print(f" 📱 滑动第 {i+1} 次")
def like_video_by_image(self):
"""通过图像识别点赞"""
# 这里需要提前准备好点赞按钮的截图模板
# 实际使用时,需要先截取点赞按钮的图片作为模板
like_templates = [
"like_button_template.png", # 点赞按钮模板
"heart_outline_template.png", # 心形外框模板
]
for template_path in like_templates:
try:
# 尝试查找点赞按钮
pos = exists(Template(template_path, threshold=0.7))
if pos:
touch(pos)
print("❤️ 点赞成功")
time.sleep(0.5)
return True
except:
continue
# 如果图像识别失败,尝试固定坐标点击(近似位置)
screen_size = device().get_current_resolution()
# 点赞按钮通常在屏幕右侧中间偏下位置
like_pos = [screen_size[0] * 4 // 5, screen_size[1] // 2 + 100]
# 检查当前位置是否有可点击元素
if self.poco:
element = self.poco.get_screen().get_position()
touch(like_pos)
print("❤️ 点赞(坐标点击)")
time.sleep(0.5)
return True
print("❌ 未找到点赞按钮")
return False
def comment_on_video(self, comment_text="不错👍"):
"""评论当前视频"""
screen_size = device().get_current_resolution()
# 点击评论按钮(通常在屏幕右侧中间偏下)
comment_pos = [screen_size[0] * 4 // 5, screen_size[1] // 2 + 200]
touch(comment_pos)
time.sleep(2)
# 点击输入框
input_pos = [screen_size[0] // 2, screen_size[1] * 5 // 6]
touch(input_pos)
time.sleep(1)
# 输入评论
text(comment_text)
time.sleep(1)
# 点击发送
send_pos = [screen_size[0] * 9 // 10, screen_size[1] * 5 // 6]
touch(send_pos)
time.sleep(2)
print(f"💬 评论成功: {comment_text}")
# 返回
keyevent("BACK")
time.sleep(1)
def follow_author(self):
"""关注当前视频作者"""
# 点击作者头像(通常在左下角)
screen_size = device().get_current_resolution()
author_pos = [screen_size[0] // 2, screen_size[1] - 100]
touch(author_pos)
time.sleep(3)
# 寻找关注按钮
if self.poco:
follow_btn = self.poco("android.widget.Button").attr("text", "关注").exists()
if follow_btn:
follow_btn.click()
print("✅ 关注成功")
time.sleep(1)
# 返回
keyevent("BACK")
time.sleep(1)
def use_poco_find_elements(self):
"""使用Poco查找UI元素"""
if not self.poco:
return []
elements = []
try:
# 查找所有文本元素
text_elements = self.poco("android.widget.TextView")
for element in text_elements:
if element.attr('visible') and element.attr('text'):
elements.append({
'text': element.attr('text'),
'position': element.get_position(),
'size': element.get_size()
})
# 查找所有按钮元素
button_elements = self.poco("android.widget.Button")
for element in button_elements:
if element.attr('visible'):
elements.append({
'type': 'button',
'text': element.attr('text'),
'position': element.get_position()
})
print(f"🔍 发现 {len(elements)} 个UI元素")
return elements
except Exception as e:
print(f"❌ UI元素查找失败: {e}")
return elements
def analyze_current_page(self):
"""分析当前页面内容"""
print("📊 开始分析当前页面...")
# 截图分析
current_screen = snapshot()
print(f"📸 页面截图: {current_screen}")
# UI树分析
elements = self.use_poco_find_elements()
# 识别页面类型
page_type = "unknown"
for element in elements:
if element.get('text') and '抖音' in element['text']:
page_type = "home"
break
elif element.get('text') and '推荐' in element['text']:
page_type = "recommend"
break
print(f"🏠 页面类型: {page_type}")
return page_type
def run_automation_cycle(self, config):
"""运行自动化循环"""
print("🤖 开始Airtest抖音自动化任务")
# 启动抖音
if not self.launch_douyin():
print("❌ 无法启动抖音应用")
return
for cycle in range(config.get('cycles', 5)):
print(f"\n🔄 第 {cycle + 1} 个循环:")
# 随机执行动作
action = random.choices(
['like', 'comment', 'follow', 'scroll'],
weights=[
config.get('like_weight', 70),
config.get('comment_weight', 10),
config.get('follow_weight', 5),
config.get('scroll_weight', 15)
]
)[0]
if action == 'like':
success = self.like_video_by_image()
if success:
print(" ✅ 执行点赞操作")
elif action == 'comment':
if random.random() < config.get('comment_probability', 0.3):
comments = [
"不错👍", "好看!", "支持一下", "厉害了", "666",
"学到了", "收藏了", "转走", "顶", "马克"
]
comment = random.choice(comments)
self.comment_on_video(comment)
print(f" 💬 执行评论操作: {comment}")
elif action == 'follow':
if random.random() < config.get('follow_probability', 0.1):
self.follow_author()
print(" 👤 执行关注操作")
elif action == 'scroll':
self.scroll_feed(count=1)
print(" 📱 执行滑动操作")
# 随机等待
wait_time = random.uniform(3, 8)
print(f" ⏳ 等待 {wait_time:.1f} 秒")
time.sleep(wait_time)
print("🏁 Airtest自动化任务完成")
def close(self):
"""关闭连接"""
stop_app("com.ss.android.ugc.aweme")
def demo_airtest_automation():
"""演示Airtest自动化"""
automation = DouyinAirtestAutomation()
config = {
'cycles': 5,
'like_weight': 70,
'comment_weight': 15,
'follow_weight': 5,
'scroll_weight': 10,
'comment_probability': 0.3,
'follow_probability': 0.1
}
try:
automation.run_automation_cycle(config)
except KeyboardInterrupt:
print("\n⏸️ 用户中断")
finally:
automation.close()
# Airtest测试报告生成
def generate_airtest_report():
"""生成Airtest测试报告"""
import os
from airtest.report.report import simple_report
# 这是一个示例,实际使用时需要在测试执行后调用
try:
# 生成HTML报告
html_report = simple_report(
"airtest_test.ag",
logfile="log/log.txt",
output="report.html"
)
print(f"📊 Airtest报告生成成功: {html_report}")
except Exception as e:
print(f"❌ 报告生成失败: {e}")
if __name__ == "__main__":
demo_airtest_automation()
