main-ibit.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import asyncio
  2. import time
  3. import json
  4. import hashlib
  5. import websockets
  6. from websockets_proxy import Proxy, proxy_connect
  7. import gzip
  8. async def listen_ibit_futures():
  9. # 获取当前时间戳(毫秒)
  10. n = str(int(round(time.time() * 1000)))
  11. # APP_TOKEN
  12. r = '66e50af7-b5ea-4079-a11e-66348a14f6f1376847'
  13. # 使用 SHA256 计算签名
  14. a = hashlib.sha256(f"ibit_{n}##".encode()).hexdigest()
  15. # 构造 WebSocket URI
  16. # uri = f"wss://prod-ibit-futures-ws-p-group.aka-line-a.com/ws/futures?ts={n}&sign={a}&platform=PC&token={r}"
  17. uri = f"wss://prod-ibit-market-server-p-co-p-group.aka-line-a.com/ws/quotation?ts={n}&sign={a}&env=local"
  18. # 代理服务器地址
  19. proxy_url = "http://127.0.0.1:7899"
  20. proxy = Proxy.from_url(proxy_url)
  21. # 使用代理连接
  22. async with proxy_connect(uri, proxy=proxy) as websocket:
  23. print(f"成功通过代理 {proxy_url} 连接到 {uri}")
  24. # 发送订阅消息
  25. sub = {
  26. "event": "sub",
  27. "params": {
  28. "channels": [
  29. # "account",
  30. # "currentPositions",
  31. # "followPositions",
  32. # "leadPositions",
  33. # "openOrders",
  34. # "planCloseOrders",
  35. # "planOpenOrders",
  36. # "unsettleProfit"
  37. ],
  38. "cb_id": 1
  39. }
  40. }
  41. sub = {"event": "sub", "params": {"channels": [
  42. "gzip_market_all_mark_price",
  43. # "gzip_market_all_index_price",
  44. # "gzip_market_all_latest_price",
  45. # "gzip_market_all_24h_ticker",
  46. # "gzip_market_all_position_fee_rate",
  47. # "gzip_market_abnormal"
  48. ]}}
  49. # sub = {"event": "sub", "params": {
  50. # "channels": [
  51. # "gzip_market_btcusdt_depth_step0",
  52. # "gzip_market_btcusdt_trade_ticker",
  53. # "gzip_market_btcusdt_kline_1day",
  54. # "gzip_market_btcusdt_ticker",
  55. # "gzip_market_depth_one_price_btcusdt",
  56. # "gzip_market_position_fee_rate_btcusdt"
  57. # ]}}
  58. await websocket.send(json.dumps(sub))
  59. print("已发送订阅消息")
  60. # 定义异步心跳函数
  61. async def send_heartbeat():
  62. while True:
  63. heartbeat_message = {"method": "ping"}
  64. await websocket.send(json.dumps(heartbeat_message))
  65. print(f"发送心跳消息: {heartbeat_message}")
  66. await asyncio.sleep(30)
  67. # 启动异步心跳任务
  68. heartbeat_task = asyncio.create_task(send_heartbeat())
  69. try:
  70. # 接收消息循环
  71. while True:
  72. message = await websocket.recv()
  73. # 处理二进制数据(gzip压缩)
  74. if isinstance(message, bytes):
  75. # 检查是否是gzip压缩数据
  76. if message.startswith(b'\x1f\x8b'):
  77. try:
  78. # 解压gzip数据
  79. decompressed_data = gzip.decompress(message)
  80. message = decompressed_data.decode('utf-8')
  81. except Exception as e:
  82. print(f"解压数据失败: {e}")
  83. continue
  84. else:
  85. # 非gzip二进制数据转为字符串
  86. message = message.decode('utf-8')
  87. print(f"接收到消息: {message}")
  88. # 解析JSON数据
  89. try:
  90. data = json.loads(message)
  91. except json.JSONDecodeError as e:
  92. print(f"JSON解析失败: {e}")
  93. continue
  94. if 'channel' in data and data['channel'] == 'pong':
  95. continue
  96. # 根据不同频道处理数据
  97. if 'channel' in data:
  98. channel = data['channel']
  99. if channel == 'account' and 'data' in data:
  100. # 账户信息
  101. for account in data['data']:
  102. print(f"账户余额: {account.get('balance', 'N/A')} {account.get('currency', 'N/A')}")
  103. elif channel == 'currentPositions' and 'data' in data:
  104. # 当前持仓
  105. for position in data['data']:
  106. print(f"持仓: {position}")
  107. elif channel in ['openOrders', 'planCloseOrders', 'planOpenOrders'] and 'data' in data:
  108. # 订单信息
  109. for order in data['data']:
  110. # 安全访问 'p' 字段,如果不存在则显示 N/A
  111. price = order.get('p', 'N/A')
  112. print(f"订单价格: {price}")
  113. # 其他频道可以按需添加处理逻辑
  114. except websockets.exceptions.ConnectionClosed as e:
  115. print(f"连接关闭: {e}")
  116. except Exception as e:
  117. print(f"发生错误: {e}")
  118. finally:
  119. # 取消心跳任务
  120. heartbeat_task.cancel()
  121. try:
  122. await heartbeat_task
  123. except asyncio.CancelledError:
  124. pass
  125. async def main():
  126. await listen_ibit_futures()
  127. if __name__ == "__main__":
  128. asyncio.run(main())