# 定义抽象类 import json import logging import tempfile from abc import ABC, abstractmethod import uiautomator2 as u2 class AbsControl(ABC): def __init__(self, name: str): """ # :param : 设备序列号。 例如 127.0.0.1:6555 # 可以通过 `adb devices` 获取 """ # u2.logger.setLevel(logging.DEBUG) # 名称 self.name = name self.d = None # 屏幕高度 self.height = 0 # 屏幕宽度 self.width = 0 self._points = {} self.point_path = 'point.json' self._func = [] self._log_func = None def connect_adb(self, serial: str): """ 连接设备 """ self.d = u2.connect(serial) self.enable_click_monitor() # 如果连接成功,返回截图,否则返回None def screenshot(self): """ 截图 """ # 获取系统/tmp路径 tmp = tempfile.gettempdir() return self.d.screenshot(f"{tmp}/{self.name}.png") def enable_click_monitor(self): """ 启用点击监控,在屏幕上显示点击位置 """ # self.d.settings['operation_delay'] = (0.5, 0.5) # 增加操作延迟以便观察 self.d.debug = True self.d.toast.show('点击监控已启用') # 显示提示 # 确保有悬浮窗权限 self.d.set_fastinput_ime(True) # 启用ime self.d.show_float_window(True) # 显示悬浮窗 # 可选:打开开发者选项中的"指针位置" self.d.shell('settings put system pointer_location 1') def to_top_swipe(self, sleep=0.1, times=2): """ 滑动到屏幕最顶部,通过多次滑动确保到达顶部 :param sleep: 滑动后等待时间 :param times: 滑动次数 :return: """ width, height = self.get_screen_size() # 循环滑动直到无法继续滑动 for _ in range(times): self.d.swipe(width // 2, height * 0.8, width // 2, height) # 向上滑动 # 短暂等待确保滑动完成 self.d.sleep(sleep) def click_point(self, x: int, y: int): """ 点击指定坐标 :param x: x坐标 :param y: y坐标 """ self.d.click(x, y) def click_xpath(self, xpath: str): """ 点击指定xpath """ el = self.d.xpath(xpath).get() if el: el.click() else: logging.warning(f"未找到元素: {xpath}") def to_next_swipe(self, sleep=0.1): """ 向上滑动一屏 """ width, height = self.get_screen_size() # 从屏幕下方向上滑动到顶部 self.d.swipe(width // 2, height * 0.8, width // 2, height * 0.2) # 向上滑动 # 短暂等待确保滑动完成 self.d.sleep(sleep) def get_screen_size(self): """ 获取屏幕尺寸,宽度和高度 """ self.width, self.height = self.d.window_size() logging.info(f"屏幕尺寸: {self.width}x{self.height}") return self.width, self.height def drag_slider_ext(self, start_x: int, start_y: int, end_x: int, end_y: int, steps: int = 50): """ 精确拖动滑块 :param start_x: 起始x坐标 :param start_y: 起始y坐标 :param end_x: 结束x坐标 :param end_y: 结束y坐标 :param steps: 步数,值越大滑动越平滑 """ self.d.swipe_ext(start_x, start_y, end_x, end_y, steps) def add_point(self, point=None): """ 添加坐标点 point = { "name": "", "x": 0, "y": 0, "desc": "", "xpath": "" } """ logging.info("保存坐标点: %s", point) self._points[point['name']] = point def print_log(self, msg): """ 打印日志 """ logging.info(f'>>111>{msg}') print(f'>222>>{msg},{self._log_func}') if self._log_func: self._log_func(msg) def add_func(self, func): """ 添加坐标点采集函数 """ self._func.append(func) def set_log_func(self, func): """ 添加日志函数 """ self._log_func = func def save_point(self): """ 保存坐标 """ self.to_top_swipe() self.d.sleep(2) for func in self._func: func() __tmp__ = {} for k, v in self._points.items(): __tmp__[k] = v with open(self.point_path, "w", encoding="utf-8") as f: json.dump(__tmp__, f, ensure_ascii=False, indent=4) def get_point(self, info: dict): """ 获取坐标点 :param info: 数据结构 { "name": "btn_mairu_kaiduo", "desc": "买入/开多", "xpath": '//*[@content-desc="买入/开多"]' } :return: x, y, el """ x, y, el = self.get_point_by_xpath(info['xpath']) self.add_point({ "name": info['name'], "x": x, "y": y, "desc": info['desc'], "xpath": info['xpath'], }) return x, y, el def get_point_by_xpath(self, xpath: str): """ :param xpath: xpath 获取坐标第一个匹配的点 info 数据结构 :return: x, y, el """ el = self.d.xpath(xpath).get() x, y = el.center() return x, y, el def get_points_by_xpath(self, xpath: str): """ 获取所有坐标点 info 数据结构 """ els = self.d.xpath(xpath).all() items = [] for el in els: x, y = el.center() items.append((x, y, el)) return items def check_element_exists(self, xpath: str, timeout: float = 0.05) -> bool: """ 判断指定XPath的元素是否存在 :param xpath: 元素的XPath :param timeout: 超时时间(秒) :return: 是否存在 """ self.d.sleep(timeout) return self.d.xpath(xpath).exists def input_xpath(self, xpath: str, text: str, clear: bool = True): """ 给指定xpath的输入框输入文本 :param xpath: 输入框的xpath :param text: 要输入的文本 :param clear: 输入前是否清空原有内容 """ element = self.d.xpath(xpath).get() if element: if clear: element.text.set_text("") # 清空原有内容 element.set_text(text) # 输入新内容 else: logging.warning(f"未找到输入框: {xpath}") def input_by_position(self, x: int, y: int, text: str, clear: bool = True): """ 通过坐标点击并在输入框中输入文本 :param x: 输入框x坐标 :param y: 输入框y坐标 :param text: 要输入的文本 :param clear: 是否清空原有内容 """ self.click_point(x, y) # 先点击获取焦点 if clear: self.d.clear_text() # 清空原有内容 self.d.send_keys(text) # 输入新文本 @abstractmethod def event_f1(self): """ F1 开仓界面,仓位滑竿百分比(30-60) """ pass @abstractmethod def event_f2(self): """ F2 确认开仓,开多 限价职中间数值 检查撤销限价委托单 """ pass @abstractmethod def event_f3(self): """ F3 平仓界面 平空,仓位滑竿百分比(90-100)确认开仓,限价职中间数值 """ pass @abstractmethod def event_f4(self): """ F4 确认平仓 平多 限价取中间数值 检查撤销限价委托单 """ pass @abstractmethod def event_f5(self): """ F5 开仓界面 仓位滑竿百分比(30-60) """ pass @abstractmethod def event_f6(self): """ F6 确认开仓, 开空 限价值中间数值 检查撤销限价委托单 """ pass @abstractmethod def event_f7(self): """ F7 平仓界面二仓位滑竿百分比(90-100) """ pass @abstractmethod def event_f8(self): """ F8 确认平仓, 平空 限价职中间数值 检查撤销限价委托单 """ pass