main-websea.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import asyncio
  2. import time
  3. import json
  4. import hashlib
  5. import websockets
  6. from websockets_proxy import Proxy, proxy_connect
  7. import gzip
  8. import zlib
  9. async def listen_ibit_futures():
  10. uri = f"wss://cws.websea.com/ws/realTime?compress=0"
  11. # uri = f"wss://api.digifinex.com/ws?access_token=J9oxYwf86pX7kbtwFaUbPzeOYE8HGayG&user_msg=true"
  12. # 代理服务器地址
  13. proxy_url = "http://127.0.0.1:7899"
  14. proxy = Proxy.from_url(proxy_url)
  15. # 使用代理连接
  16. async with proxy_connect(uri, proxy=proxy) as websocket:
  17. print(f"成功通过代理 {proxy_url} 连接到 {uri}")
  18. # 发送订阅消息
  19. # sub = {"subs": [{"token": "undefined_pc", "type": 16}], "unSubs": []}
  20. sub = {"subs": [{"symbol": "BTC-USDT", "type": 8}], "unSubs": []}
  21. # sub = {"subs": [{"type": 9}], "unSubs": []}
  22. # sub = {"subs": [{"symbol": "BTC-USDT", "depth": "0.1", "type": 1}], "unSubs": []}
  23. # sub = {"subs": [{"symbol": "BTC-USDT", "type": 3}], "unSubs": []}
  24. # sub = {"subs": [{"symbol": "BTC-USDT", "time": "30min", "type": 5}], "unSubs": []}
  25. # sub = {"subs": [{"symbol": "BTC-USDT", "time": "30min", "type": 14}], "unSubs": []}
  26. # sub = {"subs": [{"symbol": "BTC-USDT", "time": "30min", "type": 13}], "unSubs": []}
  27. await websocket.send(json.dumps(sub))
  28. print("已发送订阅消息")
  29. # 定义异步心跳函数
  30. async def send_heartbeat():
  31. while True:
  32. # heartbeat_message = {"id": "921848", "method": "server.ping", "params": []}
  33. print("==ping==>", int(time.time()))
  34. heartbeat_message = str(int(time.time())) # {"action":"ping","data":{}}
  35. await websocket.send(json.dumps(heartbeat_message))
  36. print(f"发送心跳消息: {heartbeat_message}")
  37. await asyncio.sleep(10)
  38. # 启动异步心跳任务
  39. heartbeat_task = asyncio.create_task(send_heartbeat())
  40. try:
  41. # 接收消息循环
  42. while True:
  43. message = await websocket.recv()
  44. # 处理二进制数据(gzip压缩)
  45. if isinstance(message, bytes):
  46. # 首先尝试直接解码为UTF-8,可能不是压缩数据
  47. try:
  48. message = message.decode('utf-8')
  49. except UnicodeDecodeError:
  50. # 如果直接解码失败,再尝试解压
  51. # 检查是否是gzip压缩数据
  52. if message.startswith(b'\x1f\x8b'):
  53. try:
  54. # 解压gzip数据
  55. decompressed_data = gzip.decompress(message)
  56. message = decompressed_data.decode('utf-8')
  57. except Exception as e:
  58. print(f"gzip解压失败: {e}")
  59. print(f"原始数据 (前100字节): {message[:100].hex()}")
  60. continue
  61. # 检查是否是deflate压缩数据(以78da开头)
  62. elif message.startswith(b'\x78\xda') or message.startswith(b'\x78\x9c') or message.startswith(b'\x78\x01'):
  63. # 尝试多种deflate解压方法
  64. decompressed_success = False
  65. # 方法1: 使用zlib.decompress和-wbits参数
  66. if not decompressed_success:
  67. try:
  68. decompressed_data = zlib.decompress(message, -zlib.MAX_WBITS)
  69. message = decompressed_data.decode('utf-8')
  70. decompressed_success = True
  71. except Exception as e:
  72. print(f"deflate解压方法1失败: {e}")
  73. # 方法2: 使用zlib.decompress和默认wbits参数
  74. if not decompressed_success:
  75. try:
  76. decompressed_data = zlib.decompress(message)
  77. message = decompressed_data.decode('utf-8')
  78. decompressed_success = True
  79. except Exception as e:
  80. print(f"deflate解压方法2失败: {e}")
  81. # 方法3: 使用zlib.decompress和+16参数(gzip格式)
  82. if not decompressed_success:
  83. try:
  84. decompressed_data = zlib.decompress(message, zlib.MAX_WBITS | 16)
  85. message = decompressed_data.decode('utf-8')
  86. decompressed_success = True
  87. except Exception as e:
  88. print(f"deflate解压方法3失败: {e}")
  89. # 如果所有方法都失败,打印原始数据
  90. if not decompressed_success:
  91. print(f"deflate解压失败: Error -3 while decompressing data: invalid stored block lengths")
  92. print(f"原始数据 (前100字节): {message[:100].hex()}")
  93. print(f"数据长度: {len(message)}")
  94. continue
  95. else:
  96. # 尝试多种编码解码非gzip二进制数据
  97. decoded = False
  98. # 尝试UTF-8解码
  99. try:
  100. message = message.decode('utf-8')
  101. decoded = True
  102. except UnicodeDecodeError:
  103. pass
  104. # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示
  105. if not decoded:
  106. try:
  107. message = message.decode('gbk')
  108. decoded = True
  109. except UnicodeDecodeError:
  110. pass
  111. if not decoded:
  112. # 如果所有解码都失败,以十六进制形式显示数据
  113. print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}")
  114. continue
  115. print(f"接收到消息: {message}")
  116. # 解析JSON数据
  117. try:
  118. data = json.loads(message)
  119. except json.JSONDecodeError as e:
  120. print(f"JSON解析失败: {e}")
  121. print(f"原始消息: {message}")
  122. continue
  123. if 'channel' in data and data['channel'] == 'pong':
  124. continue
  125. # 处理event类型的消息
  126. if 'event' in data:
  127. event = data['event']
  128. if event == 'mark_price.update' and 'data' in data:
  129. mark_data = data['data']
  130. print(f"标记价格更新: {mark_data.get('instrument_id', 'N/A')} - {mark_data.get('mark_price', 'N/A')}")
  131. except websockets.exceptions.ConnectionClosed as e:
  132. print(f"连接关闭: {e}")
  133. except Exception as e:
  134. print(f"发生错误: {e}")
  135. import traceback
  136. traceback.print_exc()
  137. finally:
  138. # 取消心跳任务
  139. heartbeat_task.cancel()
  140. try:
  141. await heartbeat_task
  142. except asyncio.CancelledError:
  143. pass
  144. async def main():
  145. await listen_ibit_futures()
  146. if __name__ == "__main__":
  147. asyncio.run(main())