# main-trubit.py - WebSocket market-data listener for the TruBit futures feed.
import asyncio
import gzip
import hashlib
import json
import time
import traceback
from typing import Optional

import websockets
from websockets_proxy import Proxy, proxy_connect
  8. async def listen_ibit_futures():
  9. uri = f"wss://future.trubit.com/prod1/market"
  10. # 代理服务器地址
  11. proxy_url = "http://127.0.0.1:7899"
  12. proxy = Proxy.from_url(proxy_url)
  13. # 使用代理连接
  14. async with proxy_connect(uri, proxy=proxy) as websocket:
  15. print(f"成功通过代理 {proxy_url} 连接到 {uri}")
  16. # 发送订阅消息
  17. # sub = {"priority": "HIGH", "_csclass": "RefdataRequestEvent", "version": "4.9-20220729", "txId": "TX20250831-150609-554-2", "channel": "PC"}
  18. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "RefdataUpdateEvent"}
  19. # sub = {"priority": "HIGH", "_csclass": "FundingRateRequestEvent", "symbol": "BTCUSDT", "txId": "TX20250831-150609-555-3", "channel": "PC"}
  20. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "FundingRateUpdateEvent", "subKey": "BTCUSDT"}
  21. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "DepthUpdateEvent", "subKey": "BTCUSDT"}
  22. # sub = {"priority": "HIGH", "_csclass": "DepthRequestEvent", "key": "BTCUSDT", "txId": "TX20250831-150609-555-6", "channel": "PC"}
  23. # sub = {"priority": "HIGH", "_csclass": "PositionTotalRequestEvent", "key": "BTCUSDT", "txId": "TX20250831-150609-555-7", "channel": "PC"}
  24. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "PositionTotalUpdateEvent", "subKey": "BTCUSDT"}
  25. # sub = {"priority": "HIGH", "_csclass": "TradeStatisticsRequestEvent"}
  26. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "TradeStatisticsUpdateEvent"}
  27. # sub = {"fromTimeMillions": 1756623969561, "key": "MEXO_BTCUSDT_15M", "priority": "HIGH", "step": 300, "_csclass": "HistoricalPriceProRequestEvent", "txId": "basic"}
  28. # sub = {"priority": "HIGH", "_csclass": "TradeStatisticsRequestEvent"}
  29. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "TradeStatisticsUpdateEvent"}
  30. # sub = {"priority": "HIGH", "_csclass": "IndexRequestEvent", "symbols": ["BTCUSDT", "ETHUSDT"], "txId": "TX20250831-150609-732-5", "channel": "PC"}
  31. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "IndexUpdateEvent", "subKey": "BTCUSDT"}
  32. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "IndexUpdateEvent", "subKey": "ETHUSDT"}
  33. # sub = {"priority": "HIGH", "_csclass": "MarkRequestEvent", "symbols": ["BTCUSDT", "ETHUSDT"], "txId": "TX20250831-150609-732-4", "channel": "PC"}
  34. sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "MarkUpdateEvent", "subKey": "BTCUSDT"}
  35. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "MarkUpdateEvent", "subKey": "ETHUSDT"}
  36. # sub = {"fromIndex": 0, "key": "MEXO_BTCUSDT_15M", "priority": "HIGH", "step": 329, "_csclass": "HistoricalPriceProRequestEvent"}
  37. # sub = {"priority": "NORMAL", "_csclass": "RemoteSubscribeEvent", "clazz": "HistoricalPriceProUpdateEvent", "subKey": "MEXO_BTCUSDT_15M"}
  38. await websocket.send(json.dumps(sub))
  39. print("已发送订阅消息")
  40. # 定义异步心跳函数
  41. async def send_heartbeat():
  42. while True:
  43. heartbeat_message = "ping"
  44. await websocket.send(heartbeat_message)
  45. print(f"发送心跳消息: {heartbeat_message}")
  46. await asyncio.sleep(30)
  47. # 启动异步心跳任务
  48. heartbeat_task = asyncio.create_task(send_heartbeat())
  49. try:
  50. # 接收消息循环
  51. while True:
  52. message = await websocket.recv()
  53. # 处理二进制数据(gzip压缩)
  54. if isinstance(message, bytes):
  55. # 检查是否是gzip压缩数据
  56. if message.startswith(b'\x1f\x8b'):
  57. try:
  58. # 解压gzip数据
  59. decompressed_data = gzip.decompress(message)
  60. message = decompressed_data.decode('utf-8')
  61. except Exception as e:
  62. print(f"解压数据失败: {e}")
  63. print(f"原始数据 (前50字节): {message[:50]}")
  64. continue
  65. else:
  66. # 尝试多种编码解码非gzip二进制数据
  67. decoded = False
  68. # 尝试UTF-8解码
  69. try:
  70. message = message.decode('utf-8')
  71. decoded = True
  72. except UnicodeDecodeError:
  73. pass
  74. # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示
  75. if not decoded:
  76. try:
  77. message = message.decode('gbk')
  78. decoded = True
  79. except UnicodeDecodeError:
  80. pass
  81. if not decoded:
  82. # 如果所有解码都失败,以十六进制形式显示数据
  83. print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}")
  84. continue
  85. print(f"接收到消息: {message}")
  86. print("====================================================================")
  87. # 解析JSON数据
  88. try:
  89. data = json.loads(message)
  90. except json.JSONDecodeError as e:
  91. # print(f"JSON解析失败: {e}")
  92. print(f"原始消息: {message}")
  93. continue
  94. # if 'channel' in data and data['channel'] == 'pong':
  95. # continue
  96. #
  97. # # 根据不同频道处理数据
  98. # if 'channel' in data:
  99. # channel = data['channel']
  100. # if channel == 'account' and 'data' in data:
  101. # # 账户信息
  102. # for account in data['data']:
  103. # print(f"账户余额: {account.get('balance', 'N/A')} {account.get('currency', 'N/A')}")
  104. # elif channel == 'currentPositions' and 'data' in data:
  105. # # 当前持仓
  106. # for position in data['data']:
  107. # print(f"持仓: {position}")
  108. # elif channel in ['openOrders', 'planCloseOrders', 'planOpenOrders'] and 'data' in data:
  109. # # 订单信息
  110. # for order in data['data']:
  111. # # 安全访问 'p' 字段,如果不存在则显示 N/A
  112. # price = order.get('p', 'N/A')
  113. # print(f"订单价格: {price}")
  114. # # 其他频道可以按需添加处理逻辑
  115. except websockets.exceptions.ConnectionClosed as e:
  116. print(f"连接关闭: {e}")
  117. except Exception as e:
  118. print(f"发生错误: {e}")
  119. import traceback
  120. traceback.print_exc()
  121. finally:
  122. # 取消心跳任务
  123. heartbeat_task.cancel()
  124. try:
  125. await heartbeat_task
  126. except asyncio.CancelledError:
  127. pass
  128. async def main():
  129. await listen_ibit_futures()
  130. if __name__ == "__main__":
  131. asyncio.run(main())