import asyncio import time import json import hashlib import websockets from websockets_proxy import Proxy, proxy_connect import gzip async def listen_ibit_futures(): uri = f"wss://future.trubit.com/prod1/market" # 代理服务器地址 proxy_url = "http://127.0.0.1:7899" proxy = Proxy.from_url(proxy_url) # 使用代理连接 async with proxy_connect(uri, proxy=proxy) as websocket: print(f"成功通过代理 {proxy_url} 连接到 {uri}") # 发送订阅消息 # sub = {"priority": "HIGH", "_csclass": "RefdataRequestEvent", "version": "4.9-20220729", "txId": "TX20250831-150609-554-2", "channel": "PC"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "RefdataUpdateEvent"} # sub = {"priority": "HIGH", "_csclass": "FundingRateRequestEvent", "symbol": "BTCUSDT", "txId": "TX20250831-150609-555-3", "channel": "PC"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "FundingRateUpdateEvent", "subKey": "BTCUSDT"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "DepthUpdateEvent", "subKey": "BTCUSDT"} # sub = {"priority": "HIGH", "_csclass": "DepthRequestEvent", "key": "BTCUSDT", "txId": "TX20250831-150609-555-6", "channel": "PC"} # sub = {"priority": "HIGH", "_csclass": "PositionTotalRequestEvent", "key": "BTCUSDT", "txId": "TX20250831-150609-555-7", "channel": "PC"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "PositionTotalUpdateEvent", "subKey": "BTCUSDT"} # sub = {"priority": "HIGH", "_csclass": "TradeStatisticsRequestEvent"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "TradeStatisticsUpdateEvent"} # sub = {"fromTimeMillions": 1756623969561, "key": "MEXO_BTCUSDT_15M", "priority": "HIGH", "step": 300, "_csclass": "HistoricalPriceProRequestEvent", "txId": "basic"} # sub = {"priority": "HIGH", "_csclass": "TradeStatisticsRequestEvent"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "TradeStatisticsUpdateEvent"} # sub = {"priority": "HIGH", "_csclass": "IndexRequestEvent", "symbols": ["BTCUSDT", "ETHUSDT"], "txId": "TX20250831-150609-732-5", "channel": "PC"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "IndexUpdateEvent", "subKey": "BTCUSDT"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "IndexUpdateEvent", "subKey": "ETHUSDT"} # sub = {"priority": "HIGH", "_csclass": "MarkRequestEvent", "symbols": ["BTCUSDT", "ETHUSDT"], "txId": "TX20250831-150609-732-4", "channel": "PC"} sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "MarkUpdateEvent", "subKey": "BTCUSDT"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "MarkUpdateEvent", "subKey": "ETHUSDT"} # sub = {"fromIndex": 0, "key": "MEXO_BTCUSDT_15M", "priority": "HIGH", "step": 329, "_csclass": "HistoricalPriceProRequestEvent"} # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "HistoricalPriceProUpdateEvent", "subKey": "MEXO_BTCUSDT_15M"} await websocket.send(json.dumps(sub)) print("已发送订阅消息") # 定义异步心跳函数 async def send_heartbeat(): while True: heartbeat_message = "ping" await websocket.send(heartbeat_message) print(f"发送心跳消息: {heartbeat_message}") await asyncio.sleep(30) # 启动异步心跳任务 heartbeat_task = asyncio.create_task(send_heartbeat()) try: # 接收消息循环 while True: message = await websocket.recv() # 处理二进制数据(gzip压缩) if isinstance(message, bytes): # 检查是否是gzip压缩数据 if message.startswith(b'\x1f\x8b'): try: # 解压gzip数据 decompressed_data = gzip.decompress(message) message = decompressed_data.decode('utf-8') except Exception as e: print(f"解压数据失败: {e}") print(f"原始数据 (前50字节): {message[:50]}") continue else: # 尝试多种编码解码非gzip二进制数据 decoded = False # 尝试UTF-8解码 try: message = message.decode('utf-8') decoded = True except UnicodeDecodeError: pass # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示 if not decoded: try: message = message.decode('gbk') decoded = True except UnicodeDecodeError: pass if not decoded: # 如果所有解码都失败,以十六进制形式显示数据 print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}") continue print(f"接收到消息: {message}") print("====================================================================") # 解析JSON数据 try: data = json.loads(message) except json.JSONDecodeError as e: # print(f"JSON解析失败: {e}") print(f"原始消息: {message}") continue # if 'channel' in data and data['channel'] == 'pong': # continue # # # 根据不同频道处理数据 # if 'channel' in data: # channel = data['channel'] # if channel == 'account' and 'data' in data: # # 账户信息 # for account in data['data']: # print(f"账户余额: {account.get('balance', 'N/A')} {account.get('currency', 'N/A')}") # elif channel == 'currentPositions' and 'data' in data: # # 当前持仓 # for position in data['data']: # print(f"持仓: {position}") # elif channel in ['openOrders', 'planCloseOrders', 'planOpenOrders'] and 'data' in data: # # 订单信息 # for order in data['data']: # # 安全访问 'p' 字段,如果不存在则显示 N/A # price = order.get('p', 'N/A') # print(f"订单价格: {price}") # # 其他频道可以按需添加处理逻辑 except websockets.exceptions.ConnectionClosed as e: print(f"连接关闭: {e}") except Exception as e: print(f"发生错误: {e}") import traceback traceback.print_exc() finally: # 取消心跳任务 heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass async def main(): await listen_ibit_futures() if __name__ == "__main__": asyncio.run(main())