| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- import asyncio
- import time
- import json
- import hashlib
- import websockets
- from websockets_proxy import Proxy, proxy_connect
- import gzip
- async def listen_ibit_futures():
- # 获取当前时间戳(毫秒)
- n = str(int(round(time.time() * 1000)))
- # APP_TOKEN
- r = '66e50af7-b5ea-4079-a11e-66348a14f6f1376847'
- # 使用 SHA256 计算签名
- a = hashlib.sha256(f"ibit_{n}##".encode()).hexdigest()
- # 构造 WebSocket URI
- # uri = f"wss://prod-ibit-futures-ws-p-group.aka-line-a.com/ws/futures?ts={n}&sign={a}&platform=PC&token={r}"
- uri = f"wss://prod-ibit-market-server-p-co-p-group.aka-line-a.com/ws/quotation?ts={n}&sign={a}&env=local"
- # 代理服务器地址
- proxy_url = "http://127.0.0.1:7899"
- proxy = Proxy.from_url(proxy_url)
- # 使用代理连接
- async with proxy_connect(uri, proxy=proxy) as websocket:
- print(f"成功通过代理 {proxy_url} 连接到 {uri}")
- # 发送订阅消息
- sub = {
- "event": "sub",
- "params": {
- "channels": [
- # "account",
- # "currentPositions",
- # "followPositions",
- # "leadPositions",
- # "openOrders",
- # "planCloseOrders",
- # "planOpenOrders",
- # "unsettleProfit"
- ],
- "cb_id": 1
- }
- }
- sub = {"event": "sub", "params": {"channels": [
- "gzip_market_all_mark_price",
- # "gzip_market_all_index_price",
- # "gzip_market_all_latest_price",
- # "gzip_market_all_24h_ticker",
- # "gzip_market_all_position_fee_rate",
- # "gzip_market_abnormal"
- ]}}
- # sub = {"event": "sub", "params": {
- # "channels": [
- # "gzip_market_btcusdt_depth_step0",
- # "gzip_market_btcusdt_trade_ticker",
- # "gzip_market_btcusdt_kline_1day",
- # "gzip_market_btcusdt_ticker",
- # "gzip_market_depth_one_price_btcusdt",
- # "gzip_market_position_fee_rate_btcusdt"
- # ]}}
- await websocket.send(json.dumps(sub))
- print("已发送订阅消息")
- # 定义异步心跳函数
- async def send_heartbeat():
- while True:
- heartbeat_message = {"method": "ping"}
- await websocket.send(json.dumps(heartbeat_message))
- print(f"发送心跳消息: {heartbeat_message}")
- await asyncio.sleep(30)
- # 启动异步心跳任务
- heartbeat_task = asyncio.create_task(send_heartbeat())
- try:
- # 接收消息循环
- while True:
- message = await websocket.recv()
- # 处理二进制数据(gzip压缩)
- if isinstance(message, bytes):
- # 检查是否是gzip压缩数据
- if message.startswith(b'\x1f\x8b'):
- try:
- # 解压gzip数据
- decompressed_data = gzip.decompress(message)
- message = decompressed_data.decode('utf-8')
- except Exception as e:
- print(f"解压数据失败: {e}")
- continue
- else:
- # 非gzip二进制数据转为字符串
- message = message.decode('utf-8')
- print(f"接收到消息: {message}")
- # 解析JSON数据
- try:
- data = json.loads(message)
- except json.JSONDecodeError as e:
- print(f"JSON解析失败: {e}")
- continue
- if 'channel' in data and data['channel'] == 'pong':
- continue
- # 根据不同频道处理数据
- if 'channel' in data:
- channel = data['channel']
- if channel == 'account' and 'data' in data:
- # 账户信息
- for account in data['data']:
- print(f"账户余额: {account.get('balance', 'N/A')} {account.get('currency', 'N/A')}")
- elif channel == 'currentPositions' and 'data' in data:
- # 当前持仓
- for position in data['data']:
- print(f"持仓: {position}")
- elif channel in ['openOrders', 'planCloseOrders', 'planOpenOrders'] and 'data' in data:
- # 订单信息
- for order in data['data']:
- # 安全访问 'p' 字段,如果不存在则显示 N/A
- price = order.get('p', 'N/A')
- print(f"订单价格: {price}")
- # 其他频道可以按需添加处理逻辑
- except websockets.exceptions.ConnectionClosed as e:
- print(f"连接关闭: {e}")
- except Exception as e:
- print(f"发生错误: {e}")
- finally:
- # 取消心跳任务
- heartbeat_task.cancel()
- try:
- await heartbeat_task
- except asyncio.CancelledError:
- pass
- async def main():
- await listen_ibit_futures()
- if __name__ == "__main__":
- asyncio.run(main())
|