import asyncio import time import json import hashlib import websockets from websockets_proxy import Proxy, proxy_connect import gzip import zlib async def listen_ibit_futures(): uri = f"wss://cws.websea.com/ws/realTime?compress=0" # uri = f"wss://api.digifinex.com/ws?access_token=J9oxYwf86pX7kbtwFaUbPzeOYE8HGayG&user_msg=true" # 代理服务器地址 proxy_url = "http://127.0.0.1:7899" proxy = Proxy.from_url(proxy_url) # 使用代理连接 async with proxy_connect(uri, proxy=proxy) as websocket: print(f"成功通过代理 {proxy_url} 连接到 {uri}") # 发送订阅消息 # sub = {"subs": [{"token": "undefined_pc", "type": 16}], "unSubs": []} sub = {"subs": [{"symbol": "BTC-USDT", "type": 8}], "unSubs": []} # sub = {"subs": [{"type": 9}], "unSubs": []} # sub = {"subs": [{"symbol": "BTC-USDT", "depth": "0.1", "type": 1}], "unSubs": []} # sub = {"subs": [{"symbol": "BTC-USDT", "type": 3}], "unSubs": []} # sub = {"subs": [{"symbol": "BTC-USDT", "time": "30min", "type": 5}], "unSubs": []} # sub = {"subs": [{"symbol": "BTC-USDT", "time": "30min", "type": 14}], "unSubs": []} # sub = {"subs": [{"symbol": "BTC-USDT", "time": "30min", "type": 13}], "unSubs": []} await websocket.send(json.dumps(sub)) print("已发送订阅消息") # 定义异步心跳函数 async def send_heartbeat(): while True: # heartbeat_message = {"id": "921848", "method": "server.ping", "params": []} print("==ping==>", int(time.time())) heartbeat_message = str(int(time.time())) # {"action":"ping","data":{}} await websocket.send(json.dumps(heartbeat_message)) print(f"发送心跳消息: {heartbeat_message}") await asyncio.sleep(10) # 启动异步心跳任务 heartbeat_task = asyncio.create_task(send_heartbeat()) try: # 接收消息循环 while True: message = await websocket.recv() # 处理二进制数据(gzip压缩) if isinstance(message, bytes): # 首先尝试直接解码为UTF-8,可能不是压缩数据 try: message = message.decode('utf-8') except UnicodeDecodeError: # 如果直接解码失败,再尝试解压 # 检查是否是gzip压缩数据 if message.startswith(b'\x1f\x8b'): try: # 解压gzip数据 decompressed_data = gzip.decompress(message) message = decompressed_data.decode('utf-8') except Exception as e: print(f"gzip解压失败: {e}") print(f"原始数据 (前100字节): {message[:100].hex()}") continue # 检查是否是deflate压缩数据(以78da开头) elif message.startswith(b'\x78\xda') or message.startswith(b'\x78\x9c') or message.startswith(b'\x78\x01'): # 尝试多种deflate解压方法 decompressed_success = False # 方法1: 使用zlib.decompress和-wbits参数 if not decompressed_success: try: decompressed_data = zlib.decompress(message, -zlib.MAX_WBITS) message = decompressed_data.decode('utf-8') decompressed_success = True except Exception as e: print(f"deflate解压方法1失败: {e}") # 方法2: 使用zlib.decompress和默认wbits参数 if not decompressed_success: try: decompressed_data = zlib.decompress(message) message = decompressed_data.decode('utf-8') decompressed_success = True except Exception as e: print(f"deflate解压方法2失败: {e}") # 方法3: 使用zlib.decompress和+16参数(gzip格式) if not decompressed_success: try: decompressed_data = zlib.decompress(message, zlib.MAX_WBITS | 16) message = decompressed_data.decode('utf-8') decompressed_success = True except Exception as e: print(f"deflate解压方法3失败: {e}") # 如果所有方法都失败,打印原始数据 if not decompressed_success: print(f"deflate解压失败: Error -3 while decompressing data: invalid stored block lengths") print(f"原始数据 (前100字节): {message[:100].hex()}") print(f"数据长度: {len(message)}") continue else: # 尝试多种编码解码非gzip二进制数据 decoded = False # 尝试UTF-8解码 try: message = message.decode('utf-8') decoded = True except UnicodeDecodeError: pass # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示 if not decoded: try: message = message.decode('gbk') decoded = True except UnicodeDecodeError: pass if not decoded: # 如果所有解码都失败,以十六进制形式显示数据 print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}") continue print(f"接收到消息: {message}") # 解析JSON数据 try: data = json.loads(message) except json.JSONDecodeError as e: print(f"JSON解析失败: {e}") print(f"原始消息: {message}") continue if 'channel' in data and data['channel'] == 'pong': continue # 处理event类型的消息 if 'event' in data: event = data['event'] if event == 'mark_price.update' and 'data' in data: mark_data = data['data'] print(f"标记价格更新: {mark_data.get('instrument_id', 'N/A')} - {mark_data.get('mark_price', 'N/A')}") except websockets.exceptions.ConnectionClosed as e: print(f"连接关闭: {e}") except Exception as e: print(f"发生错误: {e}") import traceback traceback.print_exc() finally: # 取消心跳任务 heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass async def main(): await listen_ibit_futures() if __name__ == "__main__": asyncio.run(main())