import asyncio import time import json import hashlib import websockets from websockets_proxy import Proxy, proxy_connect import gzip async def listen_ibit_futures(): uri = f"wss://net-wss.deepcoin.com/streamlet/trade/public/swap?device=pc-0748a44d11158f4f2fd1fda01d1775df&platform=pc&isStreamlet=true" # uri = f"wss://net-wss.deepcoin.com/business/public?platform=pc" # 代理服务器地址 proxy_url = "http://127.0.0.1:7899" proxy = Proxy.from_url(proxy_url) # 添加完整的请求头以模拟浏览器请求 extra_headers = { # "Upgrade": "websocket", # "Origin": "https://www.deepcoin.com", # "Cache-Control": "no-cache", # "Accept-Language": "zh-CN,zh;q=0.9", # "Pragma": "no-cache", # "Connection": "Upgrade", "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36", # "Sec-WebSocket-Version": "13", # "Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits" } # 使用代理连接,并添加请求头 async with proxy_connect(uri, proxy=proxy, extra_headers=extra_headers) as websocket: print(f"成功通过代理 {proxy_url} 连接到 {uri}") # 发送订阅消息 sub = {"SendTopicAction": {"Action": "2", "FilterValue": "DeepCoin_BTCUSDT", "LocalNo": 9, "ResumeNo": -2, "TopicID": "7"}} sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT", "LocalNo": 9, "ResumeNo": -2, "TopicID": "7"}} # sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT", "LocalNo": 5, "ResumeNo": -30, "TopicID": "2"}} # sub = {"SendTopicAction": {"Action": "2", "FilterValue": "DeepCoin_BTCUSDT_0.1", "LocalNo": 4, "ResumeNo": -2, "TopicID": "25"}} # sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT_0.1", "LocalNo": 4, "ResumeNo": -2, "TopicID": "25"}} # sub = {"SendTopicAction": {"Action": "2", "FilterValue": "DeepCoin_BTCUSDT_15m", "LocalNo": 6, "ResumeNo": -50, "TopicID": "11"}} # sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT_15m", "LocalNo": 6, "ResumeNo": -50, "TopicID": "11"}} await websocket.send(json.dumps(sub)) print("已发送订阅消息") # 定义异步心跳函数 async def send_heartbeat(): while True: heartbeat_message = "ping" await websocket.send(heartbeat_message) print(f"发送心跳消息: {heartbeat_message}") await asyncio.sleep(10) # 启动异步心跳任务 heartbeat_task = asyncio.create_task(send_heartbeat()) try: # 接收消息循环 while True: message = await websocket.recv() # 处理二进制数据(gzip压缩) if isinstance(message, bytes): # 检查是否是gzip压缩数据 if message.startswith(b'\x1f\x8b'): try: # 解压gzip数据 decompressed_data = gzip.decompress(message) message = decompressed_data.decode('utf-8') except Exception as e: print(f"解压数据失败: {e}") print(f"原始数据 (前50字节): {message[:50]}") continue else: # 尝试多种编码解码非gzip二进制数据 decoded = False # 尝试UTF-8解码 try: message = message.decode('utf-8') decoded = True except UnicodeDecodeError: pass # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示 if not decoded: try: message = message.decode('gbk') decoded = True except UnicodeDecodeError: pass if not decoded: # 如果所有解码都失败,以十六进制形式显示数据 print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}") continue print(f"接收到消息: {message}") # 解析JSON数据 try: data = json.loads(message) except json.JSONDecodeError as e: print(f"JSON解析失败: {e}") print(f"原始消息: {message}") continue except websockets.exceptions.ConnectionClosed as e: print(f"连接关闭: {e}") except Exception as e: print(f"发生错误: {e}") import traceback traceback.print_exc() finally: # 取消心跳任务 heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass async def main(): await listen_ibit_futures() if __name__ == "__main__": asyncio.run(main())