1
0

main-deepcoin.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import asyncio
  2. import time
  3. import json
  4. import hashlib
  5. import websockets
  6. from websockets_proxy import Proxy, proxy_connect
  7. import gzip
  8. async def listen_ibit_futures():
  9. uri = f"wss://net-wss.deepcoin.com/streamlet/trade/public/swap?device=pc-0748a44d11158f4f2fd1fda01d1775df&platform=pc&isStreamlet=true"
  10. # uri = f"wss://net-wss.deepcoin.com/business/public?platform=pc"
  11. # 代理服务器地址
  12. proxy_url = "http://127.0.0.1:7899"
  13. proxy = Proxy.from_url(proxy_url)
  14. # 添加完整的请求头以模拟浏览器请求
  15. extra_headers = {
  16. # "Upgrade": "websocket",
  17. # "Origin": "https://www.deepcoin.com",
  18. # "Cache-Control": "no-cache",
  19. # "Accept-Language": "zh-CN,zh;q=0.9",
  20. # "Pragma": "no-cache",
  21. # "Connection": "Upgrade",
  22. "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
  23. # "Sec-WebSocket-Version": "13",
  24. # "Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits"
  25. }
  26. # 使用代理连接,并添加请求头
  27. async with proxy_connect(uri, proxy=proxy, extra_headers=extra_headers) as websocket:
  28. print(f"成功通过代理 {proxy_url} 连接到 {uri}")
  29. # 发送订阅消息
  30. sub = {"SendTopicAction": {"Action": "2", "FilterValue": "DeepCoin_BTCUSDT", "LocalNo": 9, "ResumeNo": -2, "TopicID": "7"}}
  31. sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT", "LocalNo": 9, "ResumeNo": -2, "TopicID": "7"}}
  32. # sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT", "LocalNo": 5, "ResumeNo": -30, "TopicID": "2"}}
  33. # sub = {"SendTopicAction": {"Action": "2", "FilterValue": "DeepCoin_BTCUSDT_0.1", "LocalNo": 4, "ResumeNo": -2, "TopicID": "25"}}
  34. # sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT_0.1", "LocalNo": 4, "ResumeNo": -2, "TopicID": "25"}}
  35. # sub = {"SendTopicAction": {"Action": "2", "FilterValue": "DeepCoin_BTCUSDT_15m", "LocalNo": 6, "ResumeNo": -50, "TopicID": "11"}}
  36. # sub = {"SendTopicAction": {"Action": "1", "FilterValue": "DeepCoin_BTCUSDT_15m", "LocalNo": 6, "ResumeNo": -50, "TopicID": "11"}}
  37. await websocket.send(json.dumps(sub))
  38. print("已发送订阅消息")
  39. # 定义异步心跳函数
  40. async def send_heartbeat():
  41. while True:
  42. heartbeat_message = "ping"
  43. await websocket.send(heartbeat_message)
  44. print(f"发送心跳消息: {heartbeat_message}")
  45. await asyncio.sleep(10)
  46. # 启动异步心跳任务
  47. heartbeat_task = asyncio.create_task(send_heartbeat())
  48. try:
  49. # 接收消息循环
  50. while True:
  51. message = await websocket.recv()
  52. # 处理二进制数据(gzip压缩)
  53. if isinstance(message, bytes):
  54. # 检查是否是gzip压缩数据
  55. if message.startswith(b'\x1f\x8b'):
  56. try:
  57. # 解压gzip数据
  58. decompressed_data = gzip.decompress(message)
  59. message = decompressed_data.decode('utf-8')
  60. except Exception as e:
  61. print(f"解压数据失败: {e}")
  62. print(f"原始数据 (前50字节): {message[:50]}")
  63. continue
  64. else:
  65. # 尝试多种编码解码非gzip二进制数据
  66. decoded = False
  67. # 尝试UTF-8解码
  68. try:
  69. message = message.decode('utf-8')
  70. decoded = True
  71. except UnicodeDecodeError:
  72. pass
  73. # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示
  74. if not decoded:
  75. try:
  76. message = message.decode('gbk')
  77. decoded = True
  78. except UnicodeDecodeError:
  79. pass
  80. if not decoded:
  81. # 如果所有解码都失败,以十六进制形式显示数据
  82. print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}")
  83. continue
  84. print(f"接收到消息: {message}")
  85. # 解析JSON数据
  86. try:
  87. data = json.loads(message)
  88. except json.JSONDecodeError as e:
  89. print(f"JSON解析失败: {e}")
  90. print(f"原始消息: {message}")
  91. continue
  92. except websockets.exceptions.ConnectionClosed as e:
  93. print(f"连接关闭: {e}")
  94. except Exception as e:
  95. print(f"发生错误: {e}")
  96. import traceback
  97. traceback.print_exc()
  98. finally:
  99. # 取消心跳任务
  100. heartbeat_task.cancel()
  101. try:
  102. await heartbeat_task
  103. except asyncio.CancelledError:
  104. pass
  105. async def main():
  106. await listen_ibit_futures()
  107. if __name__ == "__main__":
  108. asyncio.run(main())