"""Capture a scrolling long screenshot from an Android device over ADB and stitch the frames into one image."""
import logging
import os
import re
import subprocess
import time

import cv2
import numpy as np
import uiautomator2 as u2
from PIL import Image
# Configure the root logger once at import time: INFO level, with a
# timestamp and the level name in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class ScreenshotStitcher:
    """Capture a scrolling "long screenshot" from an Android device via ADB.

    The device is scrolled with ``input swipe`` commands, one full-screen
    frame is captured after each swipe, and the frames are stacked
    vertically into a single image.  Frames are stacked whole: overlapping
    regions between consecutive frames are not detected or removed.
    """

    # Distance (px) kept between a swipe's end points and the screen edges.
    SWIPE_MARGIN = 200
    # Gesture duration (ms) passed to ``input swipe``.
    SWIPE_DURATION_MS = 800
    # Number of bottom rows compared by ``is_bottom_reached``.
    BOTTOM_STRIP_ROWS = 100

    def __init__(self, device_serial, adb_path):
        """Remember the target device and open a uiautomator2 session to it.

        Args:
            device_serial: Serial (or ``host:port``) passed to ``adb -s``.
            adb_path: Path of the ``adb`` executable to run.
        """
        self.device_serial = device_serial
        self.adb = adb_path
        self.d = u2.connect(device_serial)
        # Filled in by get_screen_size(); 0 means "not queried yet".
        self.screen_width = 0
        self.screen_height = 0

    @staticmethod
    def _device_is_online(devices_output, serial):
        """Return True if ``adb devices`` output lists *serial* in state ``device``."""
        for line in devices_output.splitlines():
            fields = line.split()
            # Compare whole fields: the header line "List of devices attached"
            # contains the substring "device", and a listed serial may be
            # "offline" or "unauthorized".
            if len(fields) >= 2 and fields[0] == serial and fields[1] == 'device':
                return True
        return False

    @staticmethod
    def _parse_wm_size(output):
        """Extract ``(width, height)`` from the output of ``wm size``.

        Raises:
            ValueError: If no ``<width>x<height>`` size line is found.
        """
        # A regex stops at the digits, so extra lines such as
        # "Override size: ..." after the physical size do not break parsing.
        # NOTE(review): the physical size is preferred, as before; confirm
        # whether swipe coordinates should follow an override size instead.
        match = (re.search(r'Physical size:\s*(\d+)x(\d+)', output)
                 or re.search(r'Size:\s*(\d+)x(\d+)', output))
        if match is None:
            raise ValueError("无法获取屏幕尺寸")
        return int(match.group(1)), int(match.group(2))

    def _swipe_vertical(self, start_y, end_y):
        """Swipe along the horizontal centre of the screen from start_y to end_y."""
        x = self.screen_width // 2
        self.adb_shell(
            f'input swipe {x} {start_y} {x} {end_y} {self.SWIPE_DURATION_MS}'
        )

    def _capture_frame(self, index, temp_files):
        """Grab screenshot number *index* and return it as a loaded RGB image.

        The temporary file name is appended to *temp_files* before anything
        can fail, so the caller can always clean it up.
        """
        filename = f'temp_screenshot_{index}.png'
        temp_files.append(filename)
        self.get_screenshot(filename)
        # convert() loads the pixel data, so the file handle can be closed
        # right away, and it gives every frame the same channel layout.
        with Image.open(filename) as img:
            return img.convert('RGB')

    def check_adb_connection(self):
        """Return True if the configured device is listed as online by ``adb devices``.

        Returns False (after logging) when adb is missing, when the command
        fails, or when the device is absent or not in the ``device`` state.
        """
        try:
            output = subprocess.check_output(
                [self.adb, '-s', self.device_serial, 'devices']
            ).decode('utf-8', errors='replace')
        except FileNotFoundError:
            logging.error("ADB 未找到,请确保已安装并配置 ADB")
            return False
        except subprocess.CalledProcessError as e:
            logging.error(f"ADB 设备列表获取失败:{e}")
            return False

        if self._device_is_online(output, self.device_serial):
            logging.info(f"ADB 设备 {self.device_serial} 连接正常")
            return True
        logging.error(f"ADB 设备 {self.device_serial} 连接异常")
        return False

    def adb_shell(self, command):
        """Run *command* through ``adb shell`` and return its combined output.

        Args:
            command: Shell command line to execute on the device.

        Returns:
            The decoded stdout and stderr of the command.

        Raises:
            subprocess.CalledProcessError: If adb exits with a non-zero
                status; the captured output is logged first.
        """
        try:
            raw = subprocess.check_output(
                [self.adb, '-s', self.device_serial, 'shell', command],
                stderr=subprocess.STDOUT,
            )
        except subprocess.CalledProcessError as e:
            # errors='replace' keeps the error handler itself from failing on
            # undecodable bytes.
            logging.error(f"ADB 命令执行失败:{e.output.decode('utf-8', errors='replace')}")
            raise
        return raw.decode('utf-8', errors='replace')

    def get_screen_size(self):
        """Query ``wm size`` and store the result in screen_width / screen_height.

        Raises:
            ValueError: If the size cannot be found in the command output.
            subprocess.CalledProcessError: If the adb command fails.
        """
        self.screen_width, self.screen_height = self._parse_wm_size(
            self.adb_shell('wm size')
        )

    def get_screenshot(self, filename):
        """Capture the device screen into the local file *filename*.

        The image is written to ``/sdcard/<filename>`` on the device, pulled
        to the current working directory and then removed from the device.

        Raises:
            subprocess.CalledProcessError: If capturing, pulling or removing
                the remote file fails.
        """
        remote_path = f'/sdcard/{filename}'
        self.adb_shell(f'screencap -p {remote_path}')
        try:
            # check_call so that a failed pull is reported instead of
            # surfacing later as a missing local file.
            subprocess.check_call(
                [self.adb, '-s', self.device_serial, 'pull', remote_path, filename]
            )
        finally:
            # Remove the remote copy even when the pull failed.
            self.adb_shell(f'rm {remote_path}')

    def merge_screenshots_vertical(self, screenshots):
        """Stack *screenshots* top to bottom into one array.

        Narrower images are scaled up (keeping their aspect ratio) to the
        width of the widest one before stacking.

        Args:
            screenshots: Non-empty sequence of images convertible with
                ``np.asarray`` (PIL images or arrays).

        Returns:
            A single array holding all images stacked along axis 0, in the
            same channel order as the inputs.

        Raises:
            ValueError: If *screenshots* is empty.
        """
        images = [np.asarray(img) for img in screenshots]
        if not images:
            raise ValueError("没有可拼接的截图")

        max_width = max(img.shape[1] for img in images)
        aligned_images = []
        for img in images:
            height, width = img.shape[:2]
            if width != max_width:
                scaled_height = int(height * max_width / width)
                img = cv2.resize(img, (max_width, scaled_height))
            aligned_images.append(img)
        return np.concatenate(aligned_images, axis=0)

    def is_bottom_reached(self, last_screenshot, current_screenshot):
        """Return True if the bottom strips of two screenshots look the same.

        The last BOTTOM_STRIP_ROWS rows of both images are compared with
        normalized cross-correlation template matching (not SSIM); a score
        above 0.95 counts as "same", i.e. scrolling no longer changes the page.
        """
        last_portion = np.asarray(last_screenshot)[-self.BOTTOM_STRIP_ROWS:, :]
        current_portion = np.asarray(current_screenshot)[-self.BOTTOM_STRIP_ROWS:, :]
        # Exact equality needs no scoring; it also covers flat strips, for
        # which the normalized correlation score is not meaningful.
        if np.array_equal(last_portion, current_portion):
            return True
        similarity = cv2.matchTemplate(
            last_portion, current_portion, cv2.TM_CCOEFF_NORMED
        )
        return similarity.max() > 0.95

    def scroll_to_top(self, swipes=3, pause=1):
        """Scroll the current page back towards its top.

        Args:
            swipes: Number of swipe gestures to perform.
            pause: Seconds to wait after each swipe.

        Returns:
            True if every swipe command succeeded, False otherwise.
        """
        try:
            if not (self.screen_width and self.screen_height):
                # Swipe coordinates are derived from the screen size.
                self.get_screen_size()
            for _ in range(swipes):
                # Drag from near the top edge down to near the bottom edge:
                # moving the finger downwards scrolls the content towards the
                # top (capture_long_screenshot uses the opposite gesture to
                # advance down the page).
                self._swipe_vertical(
                    self.SWIPE_MARGIN, self.screen_height - self.SWIPE_MARGIN
                )
                time.sleep(pause)
        except (subprocess.CalledProcessError, ValueError) as e:
            logging.error(f"滚动到顶部失败:{e}")
            return False
        logging.info("已滚动到页面顶部")
        return True

    def images_are_different(self, img1, img2, threshold=0.95):
        """Return True if two images are considered different.

        Args:
            img1: First image (RGB order, PIL image or array).
            img2: Second image, same format as *img1*.
            threshold: Template-matching score below which the images count
                as different.
        """
        first = np.asarray(img1)
        second = np.asarray(img2)
        # Pixel-identical frames are never "different"; this also covers flat
        # images, for which the normalized correlation score is not meaningful.
        if np.array_equal(first, second):
            return False
        first_bgr = cv2.cvtColor(first, cv2.COLOR_RGB2BGR)
        second_bgr = cv2.cvtColor(second, cv2.COLOR_RGB2BGR)
        result = cv2.matchTemplate(first_bgr, second_bgr, cv2.TM_CCOEFF_NORMED)
        return np.max(result) < threshold

    def capture_long_screenshot(self, output_filename='full_screenshot.png',
                                max_screenshots=10, scroll_pause=1.5):
        """Scroll through the current page and save it as one tall image.

        Args:
            output_filename: Path of the stitched image to write.
            max_screenshots: Upper bound on the number of frames captured.
            scroll_pause: Seconds to wait after each swipe so that scrolling
                settles before the next frame is captured.

        Returns:
            True if the stitched image was written, False if the device is
            not connected or stitching/saving failed.

        Raises:
            ValueError: If the screen size cannot be determined.
            subprocess.CalledProcessError: If capturing the first frame fails.
        """
        if not self.check_adb_connection():
            return False

        temp_files = []
        try:
            self.get_screen_size()
            screenshots = [self._capture_frame(0, temp_files)]

            for i in range(1, max_screenshots):
                try:
                    # Drag from near the bottom edge up to near the top edge
                    # to reveal the next part of the page.
                    self._swipe_vertical(
                        self.screen_height - self.SWIPE_MARGIN, self.SWIPE_MARGIN
                    )
                    time.sleep(scroll_pause)
                    current_screenshot = self._capture_frame(i, temp_files)
                except subprocess.CalledProcessError as e:
                    # Keep the frames collected so far, but say why we stopped.
                    logging.error(f"截图过程中 ADB 命令失败,停止截图:{e}")
                    break

                # An unchanged frame means the swipe no longer moved the page.
                if not self.images_are_different(screenshots[-1], current_screenshot):
                    logging.info("检测到重复页面,停止截图")
                    break
                screenshots.append(current_screenshot)

            try:
                result = self.merge_screenshots_vertical(screenshots)
                # The frames are RGB arrays; saving through PIL keeps that
                # channel order (cv2.imwrite would interpret them as BGR and
                # swap red and blue).
                Image.fromarray(result).save(output_filename)
            except Exception as e:
                logging.error(f"截图拼接失败:{e}")
                return False

            logging.info(f"长截图已保存至 {output_filename}")
            return True
        finally:
            # Remove every temporary frame, including after an early error.
            for temp_file in temp_files:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
def main(adb_path='/home/fzxs/Android/Sdk/platform-tools/adb',
         device_serial='127.0.0.1:6555',
         output_filename='long_screenshot.png'):
    """Capture a long screenshot from one device and report whether it worked.

    Args:
        adb_path: Path of the ``adb`` executable; replace the default with
            the location on your machine.
        device_serial: Serial (or ``host:port``) of the target device.
        output_filename: Path of the stitched image to write.

    Returns:
        The result of ``capture_long_screenshot``: True on success, False
        otherwise.
    """
    stitcher = ScreenshotStitcher(device_serial, adb_path)
    return stitcher.capture_long_screenshot(output_filename)


if __name__ == '__main__':
    # Propagate failure to the shell instead of always exiting with status 0.
    raise SystemExit(0 if main() else 1)