Airtest框架详解

Airtest是由网易开发的自动化测试框架,基于图像识别和UI树,特别适合游戏和App自动化。

Airtest基础配置

# Airtest安装和基础使用
# pip install airtest opencv-contrib-python pocoui

from airtest.core.api import *
from airtest.cli.runner import runner
import logging
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
from poco.utils.airtest import AirtestInput, AirtestScreen
import time
import cv2
import numpy as np

class AirtestController:
    """Airtest控制器 - 用于基于图像识别的App自动化"""
    
    def __init__(self, device_id=None):
        self.device_id = device_id
        self.poco = None
        self.screen = None
        self.setup_device()
    
    def setup_device(self):
        """设置设备连接"""
        try:
            if self.device_id:
                connect_device(f"Android:///{self.device_id}")
            else:
                connect_device("Android:///")  # 连接默认设备
            
            # 初始化poco(UI树分析)
            self.poco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
            
            print("✅ Airtest设备连接成功")
            print(f"📱 当前设备: {device().get_serial()}")
            
        except Exception as e:
            print(f"❌ Airtest设备连接失败: {e}")
    
    def find_and_click(self, image_path, threshold=0.8):
        """查找图像并点击"""
        try:
            pos = self.find_image(image_path, threshold)
            if pos:
                touch(pos)
                print(f"✅ 点击图像: {image_path}")
                return True
            else:
                print(f"❌ 未找到图像: {image_path}")
                return False
        except Exception as e:
            print(f"❌ 图像点击失败: {e}")
            return False
    
    def find_image(self, image_path, threshold=0.8):
        """查找图像位置"""
        try:
            pos = exists(Template(image_path, threshold=threshold))
            return pos
        except Exception as e:
            print(f"❌ 图像查找失败: {e}")
            return None
    
    def swipe_direction(self, direction='down', distance=200):
        """滑动操作"""
        screen_size = device().get_current_resolution()
        center_x, center_y = screen_size[0] // 2, screen_size[1] // 2
        
        if direction.lower() == 'up':
            start = (center_x, center_y + distance)
            end = (center_x, center_y - distance)
        elif direction.lower() == 'down':
            start = (center_x, center_y - distance)
            end = (center_x, center_y + distance)
        elif direction.lower() == 'left':
            start = (center_x + distance, center_y)
            end = (center_x - distance, center_y)
        elif direction.lower() == 'right':
            start = (center_x - distance, center_y)
            end = (center_x + distance, center_y)
        else:
            return False
        
        swipe(start, end)
        print(f"✅ 滑动操作: {direction}, 距离: {distance}")
        return True
    
    def wait_for_image(self, image_path, timeout=30, threshold=0.8):
        """等待图像出现"""
        try:
            pos = wait(Template(image_path, threshold=threshold), timeout=timeout)
            print(f"✅ 图像出现: {image_path}")
            return pos
        except:
            print(f"❌ 等待图像超时: {image_path}")
            return None

# Airtest高级功能
def airtest_advanced_features():
    """Airtest高级功能演示"""
    
    # 初始化控制器
    controller = AirtestController()
    
    # 截图功能
    screen_img = snapshot()
    print(f"📸 截图保存成功: {screen_img}")
    
    # 获取设备信息
    device_info = device().get_info()
    print(f"📱 设备信息: {device_info}")
    
    # 等待应用加载
    time.sleep(2)
    
    # 使用poco获取UI树
    if controller.poco:
        # 获取当前页面的所有节点
        ui_tree = controller.poco.agent.hierarchy.dump()
        print(f"🌳 UI树节点数: {len(ui_tree)}")
        
        # 查找特定元素
        elements = controller.poco("android.widget.TextView").exists()
        print(f"🏷️  TextView元素: {elements}")

Airtest实战 - 抖音自动化脚本

from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
import time
import random
from airtest.aircv import imread
import cv2

class DouyinAirtestAutomation:
    """基于Airtest的抖音自动化类"""
    
    def __init__(self, device_id=None):
        self.device_id = device_id
        self.poco = None
        self.setup_environment()
    
    def setup_environment(self):
        """设置Airtest环境"""
        try:
            if self.device_id:
                connect_device(f"Android:///{self.device_id}")
            else:
                connect_device("Android:///")  # 连接默认设备
            
            # 初始化poco(UI树)
            self.poco = AndroidUiautomationPoco(
                use_airtest_input=True, 
                screenshot_each_action=False
            )
            
            print("✅ Airtest环境初始化成功")
            
        except Exception as e:
            print(f"❌ Airtest初始化失败: {e}")
    
    def launch_douyin(self):
        """启动抖音"""
        try:
            start_app("com.ss.android.ugc.aweme")
            time.sleep(5)  # 等待应用启动
            print("📱 抖音应用已启动")
            return True
        except Exception as e:
            print(f"❌ 启动抖音失败: {e}")
            return False
    
    def scroll_feed(self, count=1):
        """滚动信息流"""
        screen_size = device().get_current_resolution()
        center_x, center_y = screen_size[0] // 2, screen_size[1] // 2
        
        for i in range(count):
            # 从屏幕下方向上滑动
            start_point = [center_x, screen_size[1] * 4 // 5]
            end_point = [center_x, screen_size[1] // 5]
            
            swipe(start_point, end_point, duration=0.5)
            time.sleep(random.uniform(2, 4))  # 随机等待
            print(f"  📱 滑动第 {i+1} 次")
    
    def like_video_by_image(self):
        """通过图像识别点赞"""
        # 这里需要提前准备好点赞按钮的截图模板
        # 实际使用时,需要先截取点赞按钮的图片作为模板
        like_templates = [
            "like_button_template.png",  # 点赞按钮模板
            "heart_outline_template.png",  # 心形外框模板
        ]
        
        for template_path in like_templates:
            try:
                # 尝试查找点赞按钮
                pos = exists(Template(template_path, threshold=0.7))
                if pos:
                    touch(pos)
                    print("❤️  点赞成功")
                    time.sleep(0.5)
                    return True
            except:
                continue
        
        # 如果图像识别失败,尝试固定坐标点击(近似位置)
        screen_size = device().get_current_resolution()
        # 点赞按钮通常在屏幕右侧中间偏下位置
        like_pos = [screen_size[0] * 4 // 5, screen_size[1] // 2 + 100]
        
        # 检查当前位置是否有可点击元素
        if self.poco:
            element = self.poco.get_screen().get_position()
            touch(like_pos)
            print("❤️  点赞(坐标点击)")
            time.sleep(0.5)
            return True
        
        print("❌ 未找到点赞按钮")
        return False
    
    def comment_on_video(self, comment_text="不错👍"):
        """评论当前视频"""
        screen_size = device().get_current_resolution()
        
        # 点击评论按钮(通常在屏幕右侧中间偏下)
        comment_pos = [screen_size[0] * 4 // 5, screen_size[1] // 2 + 200]
        touch(comment_pos)
        time.sleep(2)
        
        # 点击输入框
        input_pos = [screen_size[0] // 2, screen_size[1] * 5 // 6]
        touch(input_pos)
        time.sleep(1)
        
        # 输入评论
        text(comment_text)
        time.sleep(1)
        
        # 点击发送
        send_pos = [screen_size[0] * 9 // 10, screen_size[1] * 5 // 6]
        touch(send_pos)
        time.sleep(2)
        
        print(f"💬 评论成功: {comment_text}")
        
        # 返回
        keyevent("BACK")
        time.sleep(1)
    
    def follow_author(self):
        """关注当前视频作者"""
        # 点击作者头像(通常在左下角)
        screen_size = device().get_current_resolution()
        author_pos = [screen_size[0] // 2, screen_size[1] - 100]
        
        touch(author_pos)
        time.sleep(3)
        
        # 寻找关注按钮
        if self.poco:
            follow_btn = self.poco("android.widget.Button").attr("text", "关注").exists()
            if follow_btn:
                follow_btn.click()
                print("✅ 关注成功")
                time.sleep(1)
        
        # 返回
        keyevent("BACK")
        time.sleep(1)
    
    def use_poco_find_elements(self):
        """使用Poco查找UI元素"""
        if not self.poco:
            return []
        
        elements = []
        try:
            # 查找所有文本元素
            text_elements = self.poco("android.widget.TextView")
            for element in text_elements:
                if element.attr('visible') and element.attr('text'):
                    elements.append({
                        'text': element.attr('text'),
                        'position': element.get_position(),
                        'size': element.get_size()
                    })
            
            # 查找所有按钮元素
            button_elements = self.poco("android.widget.Button")
            for element in button_elements:
                if element.attr('visible'):
                    elements.append({
                        'type': 'button',
                        'text': element.attr('text'),
                        'position': element.get_position()
                    })
            
            print(f"🔍 发现 {len(elements)} 个UI元素")
            return elements
            
        except Exception as e:
            print(f"❌ UI元素查找失败: {e}")
            return elements
    
    def analyze_current_page(self):
        """分析当前页面内容"""
        print("📊 开始分析当前页面...")
        
        # 截图分析
        current_screen = snapshot()
        print(f"📸 页面截图: {current_screen}")
        
        # UI树分析
        elements = self.use_poco_find_elements()
        
        # 识别页面类型
        page_type = "unknown"
        for element in elements:
            if element.get('text') and '抖音' in element['text']:
                page_type = "home"
                break
            elif element.get('text') and '推荐' in element['text']:
                page_type = "recommend"
                break
        
        print(f"🏠 页面类型: {page_type}")
        return page_type
    
    def run_automation_cycle(self, config):
        """运行自动化循环"""
        print("🤖 开始Airtest抖音自动化任务")
        
        # 启动抖音
        if not self.launch_douyin():
            print("❌ 无法启动抖音应用")
            return
        
        for cycle in range(config.get('cycles', 5)):
            print(f"\n🔄 第 {cycle + 1} 个循环:")
            
            # 随机执行动作
            action = random.choices(
                ['like', 'comment', 'follow', 'scroll'],
                weights=[
                    config.get('like_weight', 70),
                    config.get('comment_weight', 10),
                    config.get('follow_weight', 5),
                    config.get('scroll_weight', 15)
                ]
            )[0]
            
            if action == 'like':
                success = self.like_video_by_image()
                if success:
                    print("  ✅ 执行点赞操作")
            
            elif action == 'comment':
                if random.random() < config.get('comment_probability', 0.3):
                    comments = [
                        "不错👍", "好看!", "支持一下", "厉害了", "666",
                        "学到了", "收藏了", "转走", "顶", "马克"
                    ]
                    comment = random.choice(comments)
                    self.comment_on_video(comment)
                    print(f"  💬 执行评论操作: {comment}")
            
            elif action == 'follow':
                if random.random() < config.get('follow_probability', 0.1):
                    self.follow_author()
                    print("  👤 执行关注操作")
            
            elif action == 'scroll':
                self.scroll_feed(count=1)
                print("  📱 执行滑动操作")
            
            # 随机等待
            wait_time = random.uniform(3, 8)
            print(f"  ⏳ 等待 {wait_time:.1f} 秒")
            time.sleep(wait_time)
        
        print("🏁 Airtest自动化任务完成")
    
    def close(self):
        """关闭连接"""
        stop_app("com.ss.android.ugc.aweme")

def demo_airtest_automation():
    """演示Airtest自动化"""
    automation = DouyinAirtestAutomation()
    
    config = {
        'cycles': 5,
        'like_weight': 70,
        'comment_weight': 15,
        'follow_weight': 5,
        'scroll_weight': 10,
        'comment_probability': 0.3,
        'follow_probability': 0.1
    }
    
    try:
        automation.run_automation_cycle(config)
    except KeyboardInterrupt:
        print("\n⏸️  用户中断")
    finally:
        automation.close()

# Airtest测试报告生成
def generate_airtest_report():
    """生成Airtest测试报告"""
    import os
    from airtest.report.report import simple_report
    
    # 这是一个示例,实际使用时需要在测试执行后调用
    try:
        # 生成HTML报告
        html_report = simple_report(
            "airtest_test.ag",
            logfile="log/log.txt",
            output="report.html"
        )
        print(f"📊 Airtest报告生成成功: {html_report}")
    except Exception as e:
        print(f"❌ 报告生成失败: {e}")

if __name__ == "__main__":
    demo_airtest_automation()