uiautomator2详解

uiautomator2是专为Android自动化设计的Python库,轻量级且功能强大。

uiautomator2基础配置

# uiautomator2安装和配置
# pip install uiautomator2

import uiautomator2 as u2
import time
import cv2
import numpy as np
from typing import Optional, Dict, List, Tuple
import os

class Uiautomator2Controller:
    """uiautomator2控制器 - 轻量级Android自动化工具"""
    
    def __init__(self, device_id: Optional[str] = None):
        self.device_id = device_id
        self.d = None
        self.setup_device()
    
    def setup_device(self):
        """设置设备连接"""
        try:
            if self.device_id:
                self.d = u2.connect(self.device_id)
            else:
                self.d = u2.connect()  # 连接默认设备
            
            # 检查连接状态
            if self.d.info:
                print(f"✅ uiautomator2连接成功: {self.d.serial}")
                print(f"📱 设备信息: {self.d.info}")
            else:
                print("❌ 设备连接失败")
                
        except Exception as e:
            print(f"❌ uiautomator2连接失败: {e}")
    
    def click_element(self, selector: str, by: str = 'text'):
        """点击元素"""
        try:
            if by == 'text':
                element = self.d(text=selector)
            elif by == 'resource_id':
                element = self.d(resourceId=selector)
            elif by == 'class_name':
                element = self.d(className=selector)
            elif by == 'xpath':
                element = self.d.xpath(selector)
            else:
                element = self.d(description=selector)
            
            if element.exists:
                element.click()
                print(f"✅ 点击元素: {selector}")
                return True
            else:
                print(f"❌ 元素不存在: {selector}")
                return False
        except Exception as e:
            print(f"❌ 点击元素失败: {e}")
            return False
    
    def swipe_up(self, duration: float = 0.5):
        """向上滑动"""
        w, h = self.d.window_size()
        self.d.swipe(w//2, h*4//5, w//2, h//5, duration)
        print(f"✅ 向上滑动")
    
    def swipe_down(self, duration: float = 0.5):
        """向下滑动"""
        w, h = self.d.window_size()
        self.d.swipe(w//2, h//5, w//2, h*4//5, duration)
        print(f"✅ 向下滑动")
    
    def swipe_left(self, duration: float = 0.5):
        """向左滑动"""
        w, h = self.d.window_size()
        self.d.swipe(w*4//5, h//2, w//5, h//2, duration)
        print(f"✅ 向左滑动")
    
    def swipe_right(self, duration: float = 0.5):
        """向右滑动"""
        w, h = self.d.window_size()
        self.d.swipe(w//5, h//2, w*4//5, h//2, duration)
        print(f"✅ 向右滑动")
    
    def find_elements(self, selector: str, by: str = 'text') -> List:
        """查找多个元素"""
        try:
            if by == 'text':
                elements = self.d(text=selector).all()
            elif by == 'resource_id':
                elements = self.d(resourceId=selector).all()
            elif by == 'class_name':
                elements = self.d(className=selector).all()
            elif by == 'xpath':
                elements = self.d.xpath(selector).all()
            else:
                elements = self.d(description=selector).all()
            
            print(f"✅ 找到 {len(elements)} 个元素: {selector}")
            return elements
        except Exception as e:
            print(f"❌ 查找元素失败: {e}")
            return []
    
    def get_page_source(self) -> str:
        """获取页面源码"""
        try:
            source = self.d.dump_hierarchy()
            print(f"📄 页面源码长度: {len(source)}")
            return source
        except Exception as e:
            print(f"❌ 获取页面源码失败: {e}")
            return ""
    
    def screenshot(self, filename: str = None) -> str:
        """截图"""
        if not filename:
            filename = f"screenshot_{int(time.time())}.png"
        
        try:
            self.d.screenshot(filename)
            print(f"📸 截图保存: {filename}")
            return filename
        except Exception as e:
            print(f"❌ 截图失败: {e}")
            return ""
    
    def press_key(self, key: str):
        """按键操作"""
        try:
            self.d.press(key)
            print(f"⌨️  按键: {key}")
        except Exception as e:
            print(f"❌ 按键失败: {e}")
    
    def send_text(self, text: str):
        """输入文本"""
        try:
            self.d.send_keys(text)
            print(f"📝 输入文本: {text}")
        except Exception as e:
            print(f"❌ 输入文本失败: {e}")

# uiautomator2高级功能
def u2_advanced_features():
    """uiautomator2高级功能演示"""
    controller = Uiautomator2Controller()
    
    if not controller.d:
        print("❌ 设备未连接,无法演示高级功能")
        return
    
    # 获取屏幕信息
    width, height = controller.d.window_size()
    print(f"📏 屏幕尺寸: {width} x {height}")
    
    # 获取当前应用信息
    current_app = controller.d.app_current()
    print(f"📱 当前应用: {current_app}")
    
    # 获取电池信息
    battery_info = controller.d.battery()
    print(f"🔋 电池信息: {battery_info}")
    
    # 获取网络状态
    network_info = controller.d.network_stats()
    print(f"🌐 网络状态: {network_info}")
    
    # 截图并保存
    screenshot_file = controller.screenshot()
    
    # 获取通知栏
    notifications = controller.d.open_notification()
    time.sleep(2)
    controller.d.press("back")  # 返回
    print("🔔 尝试打开通知栏")

uiautomator2实战 - 抖音自动化脚本

import uiautomator2 as u2
import time
import random
from typing import Optional
import cv2
import numpy as np

class DouyinU2Automation:
    """基于uiautomator2的抖音自动化类"""
    
    def __init__(self, device_id: Optional[str] = None):
        self.device_id = device_id
        self.d = None
        self.setup_device()
    
    def setup_device(self):
        """设置设备连接"""
        try:
            if self.device_id:
                self.d = u2.connect(self.device_id)
            else:
                self.d = u2.connect()
            
            if self.d:
                print(f"✅ uiautomator2连接成功: {self.d.serial}")
            else:
                print("❌ 设备连接失败")
                
        except Exception as e:
            print(f"❌ 连接失败: {e}")
    
    def launch_douyin(self):
        """启动抖音"""
        try:
            self.d.app_start("com.ss.android.ugc.aweme")
            time.sleep(5)  # 等待应用启动
            print("📱 抖音应用已启动")
            return True
        except Exception as e:
            print(f"❌ 启动抖音失败: {e}")
            return False
    
    def scroll_feed(self, count: int = 1):
        """滚动信息流"""
        for i in range(count):
            # 向上滑动切换视频
            w, h = self.d.window_size()
            self.d.swipe(w//2, h*4//5, w//2, h//5, 0.8)
            time.sleep(random.uniform(2, 4))  # 随机等待
            print(f"  📱 滑动第 {i+1} 次")
    
    def like_current_video(self):
        """点赞当前视频"""
        try:
            # 点赞按钮通常在屏幕右侧中间偏下位置
            w, h = self.d.window_size()
            x, y = w*4//5, h//2 + 100
            
            # 点击点赞按钮
            self.d.click(x, y)
            time.sleep(0.5)
            
            print("❤️  点赞成功")
            return True
        except Exception as e:
            print(f"❌ 点赞失败: {e}")
            return False
    
    def comment_on_video(self, comment_text: str = "不错👍"):
        """评论当前视频"""
        try:
            # 点击评论按钮(屏幕右侧中间偏下)
            w, h = self.d.window_size()
            comment_x, comment_y = w*4//5, h//2 + 200
            
            # 点击评论按钮
            self.d.click(comment_x, comment_y)
            time.sleep(2)
            
            # 点击输入框
            input_x, input_y = w//2, h*5//6
            self.d.click(input_x, input_y)
            time.sleep(1)
            
            # 输入评论
            self.d.send_keys(comment_text)
            time.sleep(1)
            
            # 点击发送按钮
            send_x, send_y = w*9//10, h*5//6
            self.d.click(send_x, send_y)
            time.sleep(2)
            
            print(f"💬 评论成功: {comment_text}")
            
            # 返回
            self.d.press("back")
            return True
        except Exception as e:
            print(f"❌ 评论失败: {e}")
            # 确保返回
            self.d.press("back")
            return False
    
    def follow_current_author(self):
        """关注当前视频作者"""
        try:
            # 点击作者头像(通常在视频左侧下方)
            w, h = self.d.window_size()
            author_x, author_y = w//2, h - 100
            
            # 点击作者区域
            self.d.click(author_x, author_y)
            time.sleep(3)
            
            # 查找关注按钮
            follow_btn = self.d(text="关注").exists(timeout=3)
            if follow_btn:
                self.d(text="关注").click()
                print("✅ 关注成功")
                time.sleep(1)
            else:
                print("❌ 未找到关注按钮")
            
            # 返回
            self.d.press("back")
            return True
        except Exception as e:
            print(f"❌ 关注失败: {e}")
            # 确保返回
            self.d.press("back")
            return False
    
    def share_video(self):
        """分享当前视频"""
        try:
            # 点击分享按钮(屏幕右侧最下方)
            w, h = self.d.window_size()
            share_x, share_y = w*4//5, h - 100
            
            self.d.click(share_x, share_y)
            time.sleep(2)
            
            # 点击复制链接
            copy_link = self.d(text="复制链接").exists(timeout=2)
            if copy_link:
                self.d(text="复制链接").click()
                print("🔗 链接已复制")
                time.sleep(1)
            
            # 返回
            self.d.press("back")
            return True
        except Exception as e:
            print(f"❌ 分享失败: {e}")
            self.d.press("back")
            return False
    
    def search_function(self, keyword: str):
        """搜索功能"""
        try:
            # 点击搜索图标
            search_icon = self.d(resourceId="com.ss.android.ugc.aweme:id/search").exists(timeout=3)
            if search_icon:
                self.d(resourceId="com.ss.android.ugc.aweme:id/search").click()
            else:
                # 尝试通过描述查找
                search_desc = self.d(description="搜索").exists(timeout=3)
                if search_desc:
                    self.d(description="搜索").click()
                else:
                    print("❌ 未找到搜索入口")
                    return False
            
            time.sleep(2)
            
            # 输入搜索关键词
            search_input = self.d(resourceId="com.ss.android.ugc.aweme:id/search_input").exists(timeout=3)
            if search_input:
                self.d(resourceId="com.ss.android.ugc.aweme:id/search_input").set_text(keyword)
            else:
                # 尝试通用搜索输入框
                self.d.send_keys(keyword)
            
            time.sleep(1)
            
            # 点击搜索按钮
            search_btn = self.d(text="搜索").exists(timeout=2)
            if search_btn:
                self.d(text="搜索").click()
            
            time.sleep(3)
            print(f"🔍 搜索完成: {keyword}")
            return True
        except Exception as e:
            print(f"❌ 搜索失败: {e}")
            return False
    
    def get_current_video_info(self) -> dict:
        """获取当前视频信息"""
        try:
            # 通过页面源码分析获取视频信息
            hierarchy = self.d.dump_hierarchy()
            
            # 简单的解析,实际项目中需要更复杂的解析逻辑
            video_info = {
                'timestamp': time.time(),
                'screen_size': self.d.window_size(),
                'current_activity': self.d.app_current().get('activity', ''),
                'app_package': self.d.app_current().get('package', '')
            }
            
            print(f"📺 当前视频信息: {video_info}")
            return video_info
        except Exception as e:
            print(f"❌ 获取视频信息失败: {e}")
            return {}
    
    def run_automation_cycle(self, config: dict):
        """运行自动化循环"""
        print("🤖 开始uiautomator2抖音自动化任务")
        
        # 启动抖音
        if not self.launch_douyin():
            print("❌ 无法启动抖音应用")
            return
        
        for cycle in range(config.get('cycles', 5)):
            print(f"\n🔄 第 {cycle + 1} 个循环:")
            
            # 随机执行动作
            actions = []
            
            # 按权重随机选择动作
            action_weights = [
                ('like', config.get('like_weight', 70)),
                ('comment', config.get('comment_weight', 10)), 
                ('follow', config.get('follow_weight', 5)),
                ('share', config.get('share_weight', 5)),
                ('scroll', config.get('scroll_weight', 10))
            ]
            
            # 根据权重选择动作
            total_weight = sum(weight for _, weight in action_weights)
            rand_num = random.randint(1, total_weight)
            
            current_weight = 0
            selected_action = None
            for action, weight in action_weights:
                current_weight += weight
                if rand_num <= current_weight:
                    selected_action = action
                    break
            
            # 执行选中的动作
            if selected_action == 'like':
                success = self.like_current_video()
                if success:
                    print("  ✅ 执行点赞操作")
            
            elif selected_action == 'comment':
                if random.random() < config.get('comment_probability', 0.3):
                    comments = [
                        "不错👍", "好看!", "支持一下", "厉害了", "666",
                        "学到了", "收藏了", "转走", "顶", "马克",
                        "优秀!", "棒棒哒", "安排!", "稳"
                    ]
                    comment = random.choice(comments)
                    self.comment_on_video(comment)
                    print(f"  💬 执行评论操作: {comment}")
            
            elif selected_action == 'follow':
                if random.random() < config.get('follow_probability', 0.1):
                    self.follow_current_author()
                    print("  👤 执行关注操作")
            
            elif selected_action == 'share':
                if random.random() < config.get('share_probability', 0.05):
                    self.share_video()
                    print("  🔗 执行分享操作")
            
            elif selected_action == 'scroll':
                self.scroll_feed(count=1)
                print("  📱 执行滑动操作")
            
            # 获取当前视频信息
            self.get_current_video_info()
            
            # 随机等待
            wait_time = random.uniform(3, 8)
            print(f"  ⏳ 等待 {wait_time:.1f} 秒")
            time.sleep(wait_time)
        
        print("🏁 uiautomator2自动化任务完成")
    
    def close(self):
        """关闭连接"""
        if self.d:
            self.d.app_stop("com.ss.android.ugc.aweme")
            print("📱 抖音应用已关闭")

def demo_u2_automation():
    """演示uiautomator2自动化"""
    automation = DouyinU2Automation()
    
    config = {
        'cycles': 5,
        'like_weight': 70,
        'comment_weight': 15,
        'follow_weight': 5,
        'share_weight': 5,
        'scroll_weight': 5,
        'comment_probability': 0.3,
        'follow_probability': 0.1,
        'share_probability': 0.05
    }
    
    try:
        automation.run_automation_cycle(config)
    except KeyboardInterrupt:
        print("\n⏸️  用户中断")
    finally:
        automation.close()

# uiautomator2工具函数
def u2_utils():
    """uiautomator2实用工具函数"""
    
    # 设备列表
    devices = u2.list_device()
    print(f"🔌 连接的设备: {devices}")
    
    # 获取设备信息
    if devices:
        d = u2.connect(devices[0]['serial'])
        print(f"📱 设备型号: {d.info.get('productName')}")
        print(f"🔧 Android版本: {d.info.get('display')['density']}")
        print(f"📊 屏幕分辨率: {d.window_size()}")

if __name__ == "__main__":
    demo_u2_automation()