base_control.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. # 定义抽象类
  2. import json
  3. import logging
  4. import tempfile
  5. from abc import ABC, abstractmethod
  6. from typing import List
  7. import tkinter as tk
  8. from tkinter import ttk
  9. import uiautomator2 as u2
  10. from uiautomator2.exceptions import LaunchUiAutomationError
  11. class BaseControl(ABC):
  12. # ctx_dict = {
  13. # 'connect': 1,
  14. # 'platform': 2,
  15. # }
  16. connect_dict = {
  17. }
  18. app_pkg = {
  19. 'com.niocpeed.dna': 'DeepCoin',
  20. }
  21. def __init__(self, name: str, ctx: int = 2):
  22. # 名称
  23. self.name = name
  24. self.ctx = ctx
  25. pass
  26. def init_adb(self, commands: List[str], *args, **kwargs):
  27. """
  28. 连接设备
  29. 使用 python shell执行命令
  30. """
  31. pass
  32. def print_log(self, msg):
  33. logging.info(f'打印信息 {msg}')
  34. def connect_adb(self, serial: str):
  35. """
  36. 连接设备
  37. """
  38. logging.info(f'连接设[备{self.name}]:{serial}', )
  39. try:
  40. d = u2.connect(serial)
  41. # self.enable_click_monitor()
  42. d.shell('settings put system pointer_location 1')
  43. # d.debug = True
  44. return d
  45. except LaunchUiAutomationError as e:
  46. print(f"uiautomator2 连接失败: {e}")
  47. def re_connect(self):
  48. """
  49. 重新建立连接
  50. :return:
  51. """
  52. pass
  53. # def enable_click_monitor(self):
  54. # """
  55. # 启用点击监控,在屏幕上显示点击位置
  56. # """
  57. # # self.d.settings['operation_delay'] = (0.5, 0.5) # 增加操作延迟以便观察
  58. # # self.d.debug = True
  59. # self.d.toast.show('点击监控已启用') # 显示提示
  60. #
  61. # # 确保有悬浮窗权限
  62. # # self.d.set_fastinput_ime(True) # 启用ime
  63. # # self.d.show_float_window(True) # 显示悬浮窗
  64. #
  65. # # 可选:打开开发者选项中的"指针位置"
  66. # self.d.shell('settings put system pointer_location 1')
  67. def loadding(self, top, func):
  68. """
  69. 等待加载完成
  70. :return:
  71. """
  72. # 创建加载指示器
  73. loading_window = tk.Toplevel(top)
  74. loading_window.title("连接中")
  75. loading_window.geometry("200x100")
  76. loading_window.transient(top) # 设置为模态
  77. loading_window.grab_set() # 设置为模态
  78. loading_window.overrideredirect(True) # 移除窗口边框和按钮
  79. # 相对于父窗口居中显示
  80. window_width = loading_window.winfo_reqwidth()
  81. window_height = loading_window.winfo_reqheight()
  82. parent_x = top.winfo_rootx()
  83. parent_y = top.winfo_rooty()
  84. parent_width = top.winfo_width()
  85. parent_height = top.winfo_height()
  86. position_right = parent_x + (parent_width - window_width) // 2
  87. position_down = parent_y + (parent_height - window_height) // 2
  88. loading_window.geometry(f"+{position_right}+{position_down}")
  89. loading_label = ttk.Label(loading_window, text="连接中...", padding=20)
  90. loading_label.pack(expand=True)
  91. def thread_task():
  92. try:
  93. func()
  94. finally:
  95. # 在主线程中销毁加载指示器
  96. top.after(0, lambda: [loading_window.grab_release(), loading_window.destroy()])
  97. import threading
  98. thread = threading.Thread(target=thread_task)
  99. # thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束
  100. thread.start()
  101. pass
  102. class AbsControl(BaseControl):
  103. def __init__(self, name: str, ctx: int = 2, *args, **kwargs):
  104. """
  105. # :param : 设备序列号。 例如 127.0.0.1:6555
  106. # 可以通过 `adb devices` 获取
  107. """
  108. # u2.logger.setLevel(logging.DEBUG)
  109. super().__init__(name)
  110. self.ctx = ctx
  111. self.info = {}
  112. #
  113. self._d = None
  114. # 屏幕高度
  115. self.height = 0
  116. # 屏幕宽度
  117. self.width = 0
  118. self._points = {}
  119. self.point_path = 'point.json'
  120. self._func = []
  121. self._log_func = None
  122. def d(self, serial: str = ''):
  123. """
  124. 连接设备
  125. """
  126. if self._d is not None:
  127. return self._d
  128. if serial in self.connect_dict:
  129. self.info = self.connect_dict[serial]
  130. self._d = self.info['d']
  131. else:
  132. self._d = u2.connect(serial)
  133. return self._d
  134. def re_connect(self):
  135. pass
  136. # @abstractmethod
  137. # def get_app_package_name(self):
  138. # return ""
  139. def prevent_sleep(self):
  140. """
  141. 防止设备休眠
  142. """
  143. self.d().wake()
  144. self.d().screen_on()
  145. self.d().unlock()
  146. def screenshot(self):
  147. """
  148. 截图
  149. """
  150. # 获取系统/tmp路径
  151. tmp = tempfile.gettempdir()
  152. return self.d().screenshot(f"{tmp}/{self.name}.png")
  153. def to_top_swipe(self, sleep=0.1, times=2):
  154. """
  155. 滑动到屏幕最顶部,通过多次滑动确保到达顶部
  156. :param sleep: 滑动后等待时间
  157. :param times: 滑动次数
  158. :return:
  159. """
  160. width, height = self.get_screen_size()
  161. # 循环滑动直到无法继续滑动
  162. for _ in range(times):
  163. self.d().swipe(width // 2, height * 0.8, width // 2, height) # 向上滑动
  164. # 短暂等待确保滑动完成
  165. self.d().sleep(sleep)
  166. def click_point(self, x: int, y: int):
  167. """
  168. 点击指定坐标
  169. :param x: x坐标
  170. :param y: y坐标
  171. """
  172. self.d().click(x, y)
  173. def click_xpath(self, xpath: str):
  174. """
  175. 点击指定xpath
  176. """
  177. el = self.d().xpath(xpath).get()
  178. if el:
  179. el.click()
  180. else:
  181. logging.warning(f"未找到元素: {xpath}")
  182. def to_next_swipe(self, sleep=0.1):
  183. """
  184. 向上滑动一屏
  185. """
  186. width, height = self.get_screen_size()
  187. # 从屏幕下方向上滑动到顶部
  188. self.d().swipe(width // 2, height * 0.8, width // 2, height * 0.2) # 向上滑动
  189. # 短暂等待确保滑动完成
  190. self.d().sleep(sleep)
  191. def get_screen_size(self):
  192. """
  193. 获取屏幕尺寸,宽度和高度
  194. """
  195. self.width, self.height = self.d().window_size()
  196. logging.info(f"屏幕尺寸: {self.width}x{self.height}")
  197. return self.width, self.height
  198. def drag_slider_ext(self, start_x: int, start_y: int, end_x: int, end_y: int, steps: int = 50):
  199. """
  200. 精确拖动滑块
  201. :param start_x: 起始x坐标
  202. :param start_y: 起始y坐标
  203. :param end_x: 结束x坐标
  204. :param end_y: 结束y坐标
  205. :param steps: 步数,值越大滑动越平滑
  206. """
  207. self.d().swipe_ext(start_x, start_y, end_x, end_y, steps)
  208. def add_point(self, point=None):
  209. """
  210. 添加坐标点
  211. point = {
  212. "name": "",
  213. "x": 0,
  214. "y": 0,
  215. "desc": "",
  216. "xpath": ""
  217. }
  218. """
  219. logging.info("保存坐标点: %s", point)
  220. self._points[point['name']] = point
  221. def print_log(self, msg):
  222. """
  223. 打印日志
  224. """
  225. logging.info(f'>>111>{msg}')
  226. print(f'>222>>{msg},{self._log_func}')
  227. if self._log_func:
  228. self._log_func(msg)
  229. def add_func(self, func):
  230. """
  231. 添加坐标点采集函数
  232. """
  233. self._func.append(func)
  234. def set_log_func(self, func):
  235. """
  236. 添加日志函数
  237. """
  238. self._log_func = func
  239. def save_point(self):
  240. """
  241. 保存坐标
  242. """
  243. self.to_top_swipe()
  244. self.d().sleep(2)
  245. for func in self._func:
  246. func()
  247. __tmp__ = {}
  248. for k, v in self._points.items():
  249. __tmp__[k] = v
  250. with open(self.point_path, "w", encoding="utf-8") as f:
  251. json.dump(__tmp__, f, ensure_ascii=False, indent=4)
  252. def get_point(self, info: dict):
  253. """
  254. 获取坐标点
  255. :param info: 数据结构
  256. {
  257. "name": "btn_mairu_kaiduo",
  258. "desc": "买入/开多",
  259. "xpath": '//*[@content-desc="买入/开多"]'
  260. }
  261. :return: x, y, el
  262. """
  263. x, y, el = self.get_point_by_xpath(info['xpath'])
  264. self.add_point({
  265. "name": info['name'],
  266. "x": x,
  267. "y": y,
  268. "desc": info['desc'],
  269. "xpath": info['xpath'],
  270. })
  271. return x, y, el
  272. def get_point_by_xpath(self, xpath: str):
  273. """
  274. :param xpath: xpath
  275. 获取坐标第一个匹配的点
  276. info 数据结构
  277. :return: x, y, el
  278. """
  279. el = self.d().xpath(xpath).get()
  280. x, y = el.center()
  281. return x, y, el
  282. def get_points_by_xpath(self, xpath: str):
  283. """
  284. 获取所有坐标点
  285. info 数据结构
  286. """
  287. els = self.d().xpath(xpath).all()
  288. items = []
  289. for el in els:
  290. x, y = el.center()
  291. items.append((x, y, el))
  292. return items
  293. def check_element_exists(self, xpath: str, timeout: float = 0.05) -> bool:
  294. """
  295. 判断指定XPath的元素是否存在
  296. :param xpath: 元素的XPath
  297. :param timeout: 超时时间(秒)
  298. :return: 是否存在
  299. """
  300. self.d().sleep(timeout)
  301. return self.d().xpath(xpath).exists
  302. def input_xpath(self, xpath: str, text: str, clear: bool = True):
  303. """
  304. 给指定xpath的输入框输入文本
  305. :param xpath: 输入框的xpath
  306. :param text: 要输入的文本
  307. :param clear: 输入前是否清空原有内容
  308. """
  309. element = self.d().xpath(xpath).get()
  310. if element:
  311. if clear:
  312. element.text.set_text("") # 清空原有内容
  313. element.set_text(text) # 输入新内容
  314. else:
  315. logging.warning(f"未找到输入框: {xpath}")
  316. def input_by_position(self, x: int, y: int, text: str, clear: bool = True):
  317. """
  318. 通过坐标点击并在输入框中输入文本
  319. :param x: 输入框x坐标
  320. :param y: 输入框y坐标
  321. :param text: 要输入的文本
  322. :param clear: 是否清空原有内容
  323. """
  324. self.click_point(x, y) # 先点击获取焦点
  325. if clear:
  326. self.d().clear_text() # 清空原有内容
  327. self.d().send_keys(text) # 输入新文本
  328. @abstractmethod
  329. def event_f1(self):
  330. """
  331. F1 开仓界面,仓位滑竿百分比(30-60)
  332. """
  333. pass
  334. @abstractmethod
  335. def event_f2(self):
  336. """
  337. F2 确认开仓,开多 限价职中间数值
  338. 检查撤销限价委托单
  339. """
  340. pass
  341. @abstractmethod
  342. def event_f3(self):
  343. """
  344. F3 平仓界面 平空,仓位滑竿百分比(90-100)确认开仓,限价职中间数值
  345. """
  346. pass
  347. @abstractmethod
  348. def event_f4(self):
  349. """
  350. F4 确认平仓 平多 限价取中间数值 检查撤销限价委托单
  351. """
  352. pass
  353. @abstractmethod
  354. def event_f5(self):
  355. """
  356. F5 开仓界面 仓位滑竿百分比(30-60)
  357. """
  358. pass
  359. @abstractmethod
  360. def event_f6(self):
  361. """
  362. F6 确认开仓, 开空 限价值中间数值
  363. 检查撤销限价委托单
  364. """
  365. pass
  366. @abstractmethod
  367. def event_f7(self):
  368. """
  369. F7 平仓界面二仓位滑竿百分比(90-100)
  370. """
  371. pass
  372. @abstractmethod
  373. def event_f8(self):
  374. """
  375. F8 确认平仓, 平空 限价职中间数值
  376. 检查撤销限价委托单
  377. """
  378. pass