1
0

main-bittap.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import asyncio
  2. import time
  3. import json
  4. import hashlib
  5. import websockets
  6. from websockets_proxy import Proxy, proxy_connect
  7. import gzip
  8. async def listen_ibit_futures():
  9. uri = f"wss://stream.bittap.com/endpoint?format=JSON"
  10. # 代理服务器地址
  11. proxy_url = "http://127.0.0.1:7899"
  12. proxy = Proxy.from_url(proxy_url)
  13. # 使用代理连接
  14. async with proxy_connect(uri, proxy=proxy) as websocket:
  15. print(f"成功通过代理 {proxy_url} 连接到 {uri}")
  16. # 发送订阅消息
  17. sub = {"method": "SUBSCRIBE", "params": ["f_trade@BTC-USDT-M"], "id": "fsb7qjv"}
  18. sub = {"method": "SUBSCRIBE", "params": ["f_markPrice@BTC-USDT-M"], "id": "sld73t7"}
  19. # sub = {"method": "SUBSCRIBE", "params": ["f_ticker@BTC-USDT-M"], "id": "pccg3m1"}
  20. # sub = {"method": "SUBSCRIBE", "params": ["f_depth30@BTC-USDT-M_0.1"], "id": "3fst1jg"}
  21. # sub = {"method": "SUBSCRIBE", "params": ["f_kline_15m@BTC-USDT-M"], "id": "dhhn9h9"}
  22. # sub = {"method": "UNSUBSCRIBE", "params": ["f_kline_15m@BTC-USDT-M"]}
  23. # sub = {"method": "SUBSCRIBE", "params": ["f_kline_15m@BTC-USDT-M"], "id": "el13nfi"}
  24. await websocket.send(json.dumps(sub))
  25. print("已发送订阅消息")
  26. # 定义异步心跳函数
  27. async def send_heartbeat():
  28. while True:
  29. heartbeat_message = {"id": "a88y2z7", "method": "PING"}
  30. await websocket.send(json.dumps(heartbeat_message))
  31. print(f"发送心跳消息: {heartbeat_message}")
  32. await asyncio.sleep(30)
  33. # 启动异步心跳任务
  34. heartbeat_task = asyncio.create_task(send_heartbeat())
  35. try:
  36. # 接收消息循环
  37. while True:
  38. message = await websocket.recv()
  39. # 处理二进制数据(gzip压缩)
  40. if isinstance(message, bytes):
  41. # 检查是否是gzip压缩数据
  42. if message.startswith(b'\x1f\x8b'):
  43. try:
  44. # 解压gzip数据
  45. decompressed_data = gzip.decompress(message)
  46. message = decompressed_data.decode('utf-8')
  47. except Exception as e:
  48. print(f"解压数据失败: {e}")
  49. print(f"原始数据 (前50字节): {message[:50]}")
  50. continue
  51. else:
  52. # 尝试多种编码解码非gzip二进制数据
  53. decoded = False
  54. # 尝试UTF-8解码
  55. try:
  56. message = message.decode('utf-8')
  57. decoded = True
  58. except UnicodeDecodeError:
  59. pass
  60. # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示
  61. if not decoded:
  62. try:
  63. message = message.decode('gbk')
  64. decoded = True
  65. except UnicodeDecodeError:
  66. pass
  67. if not decoded:
  68. # 如果所有解码都失败,以十六进制形式显示数据
  69. print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}")
  70. continue
  71. print(f"接收到消息: {message}")
  72. print("====================================================================")
  73. # 解析JSON数据
  74. try:
  75. data = json.loads(message)
  76. except json.JSONDecodeError as e:
  77. # print(f"JSON解析失败: {e}")
  78. print(f"原始消息: {message}")
  79. continue
  80. # if 'channel' in data and data['channel'] == 'pong':
  81. # continue
  82. #
  83. # # 根据不同频道处理数据
  84. # if 'channel' in data:
  85. # channel = data['channel']
  86. # if channel == 'account' and 'data' in data:
  87. # # 账户信息
  88. # for account in data['data']:
  89. # print(f"账户余额: {account.get('balance', 'N/A')} {account.get('currency', 'N/A')}")
  90. # elif channel == 'currentPositions' and 'data' in data:
  91. # # 当前持仓
  92. # for position in data['data']:
  93. # print(f"持仓: {position}")
  94. # elif channel in ['openOrders', 'planCloseOrders', 'planOpenOrders'] and 'data' in data:
  95. # # 订单信息
  96. # for order in data['data']:
  97. # # 安全访问 'p' 字段,如果不存在则显示 N/A
  98. # price = order.get('p', 'N/A')
  99. # print(f"订单价格: {price}")
  100. # # 其他频道可以按需添加处理逻辑
  101. except websockets.exceptions.ConnectionClosed as e:
  102. print(f"连接关闭: {e}")
  103. except Exception as e:
  104. print(f"发生错误: {e}")
  105. import traceback
  106. traceback.print_exc()
  107. finally:
  108. # 取消心跳任务
  109. heartbeat_task.cancel()
  110. try:
  111. await heartbeat_task
  112. except asyncio.CancelledError:
  113. pass
  114. async def main():
  115. await listen_ibit_futures()
  116. if __name__ == "__main__":
  117. asyncio.run(main())