import asyncio import time import json import hashlib import websockets from websockets_proxy import Proxy, proxy_connect import gzip async def listen_ibit_futures(): # 获取当前时间戳(毫秒) n = str(int(round(time.time() * 1000))) # APP_TOKEN r = '66e50af7-b5ea-4079-a11e-66348a14f6f1376847' # 使用 SHA256 计算签名 a = hashlib.sha256(f"ibit_{n}##".encode()).hexdigest() # 构造 WebSocket URI # uri = f"wss://prod-ibit-futures-ws-p-group.aka-line-a.com/ws/futures?ts={n}&sign={a}&platform=PC&token={r}" uri = f"wss://prod-ibit-market-server-p-co-p-group.aka-line-a.com/ws/quotation?ts={n}&sign={a}&env=local" # 代理服务器地址 proxy_url = "http://127.0.0.1:7899" proxy = Proxy.from_url(proxy_url) # 使用代理连接 async with proxy_connect(uri, proxy=proxy) as websocket: print(f"成功通过代理 {proxy_url} 连接到 {uri}") # 发送订阅消息 sub = { "event": "sub", "params": { "channels": [ # "account", # "currentPositions", # "followPositions", # "leadPositions", # "openOrders", # "planCloseOrders", # "planOpenOrders", # "unsettleProfit" ], "cb_id": 1 } } sub = {"event": "sub", "params": {"channels": [ "gzip_market_all_mark_price", # "gzip_market_all_index_price", # "gzip_market_all_latest_price", # "gzip_market_all_24h_ticker", # "gzip_market_all_position_fee_rate", # "gzip_market_abnormal" ]}} # sub = {"event": "sub", "params": { # "channels": [ # "gzip_market_btcusdt_depth_step0", # "gzip_market_btcusdt_trade_ticker", # "gzip_market_btcusdt_kline_1day", # "gzip_market_btcusdt_ticker", # "gzip_market_depth_one_price_btcusdt", # "gzip_market_position_fee_rate_btcusdt" # ]}} await websocket.send(json.dumps(sub)) print("已发送订阅消息") # 定义异步心跳函数 async def send_heartbeat(): while True: heartbeat_message = {"method": "ping"} await websocket.send(json.dumps(heartbeat_message)) print(f"发送心跳消息: {heartbeat_message}") await asyncio.sleep(30) # 启动异步心跳任务 heartbeat_task = asyncio.create_task(send_heartbeat()) try: # 接收消息循环 while True: message = await websocket.recv() # 处理二进制数据(gzip压缩) if isinstance(message, bytes): # 检查是否是gzip压缩数据 if message.startswith(b'\x1f\x8b'): try: # 解压gzip数据 decompressed_data = gzip.decompress(message) message = decompressed_data.decode('utf-8') except Exception as e: print(f"解压数据失败: {e}") continue else: # 非gzip二进制数据转为字符串 message = message.decode('utf-8') print(f"接收到消息: {message}") # 解析JSON数据 try: data = json.loads(message) except json.JSONDecodeError as e: print(f"JSON解析失败: {e}") continue if 'channel' in data and data['channel'] == 'pong': continue # 根据不同频道处理数据 if 'channel' in data: channel = data['channel'] if channel == 'account' and 'data' in data: # 账户信息 for account in data['data']: print(f"账户余额: {account.get('balance', 'N/A')} {account.get('currency', 'N/A')}") elif channel == 'currentPositions' and 'data' in data: # 当前持仓 for position in data['data']: print(f"持仓: {position}") elif channel in ['openOrders', 'planCloseOrders', 'planOpenOrders'] and 'data' in data: # 订单信息 for order in data['data']: # 安全访问 'p' 字段,如果不存在则显示 N/A price = order.get('p', 'N/A') print(f"订单价格: {price}") # 其他频道可以按需添加处理逻辑 except websockets.exceptions.ConnectionClosed as e: print(f"连接关闭: {e}") except Exception as e: print(f"发生错误: {e}") finally: # 取消心跳任务 heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass async def main(): await listen_ibit_futures() if __name__ == "__main__": asyncio.run(main())