1
0

main-digifinex.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. import asyncio
  2. import time
  3. import json
  4. import hashlib
  5. import websockets
  6. from websockets_proxy import Proxy, proxy_connect
  7. import gzip
  8. import zlib
  9. async def listen_ibit_futures():
  10. uri = f"wss://api.digifinex.com/swap_ws/v2/"
  11. uri = f"wss://api.digifinex.com/ws?access_token=J9oxYwf86pX7kbtwFaUbPzeOYE8HGayG&user_msg=true"
  12. # 代理服务器地址
  13. proxy_url = "http://127.0.0.1:7899"
  14. proxy = Proxy.from_url(proxy_url)
  15. # 使用代理连接
  16. async with proxy_connect(uri, proxy=proxy) as websocket:
  17. print(f"成功通过代理 {proxy_url} 连接到 {uri}")
  18. # 发送订阅消息
  19. sub = {"id": "701441", "event": "trades.subscribe", "instrument_id": "BTCUSDTPERP"}
  20. sub = {"id": "362033", "event": "ticker.subscribe", "instrument_id": "BTCUSDTPERP"}
  21. sub = {"id": "265011", "event": "price_range.subscribe", "instrument_id": "BTCUSDTPERP"}
  22. sub = {"id": "889274", "event": "index_price.subscribe", "instrument_id": "BTCUSDTPERP"}
  23. sub = {"id": "375625", "event": "fund_rate.subscribe", "instrument_id": "BTCUSDTPERP"}
  24. sub = {"id": "299202", "event": "depth.subscribe", "level": 100, "instrument_id": "BTCUSDTPERP"}
  25. sub = {"id": "220604", "event": "all_ticker.subscribe"}
  26. sub = {"id": "667725", "event": "cur_candle.subscribe", "instrument_id": "BTCUSDTPERP", "granularity": "15m"}
  27. # sub = {"id": "605176", "event": "mark_price.subscribe", "instrument_id": "BTCUSDTPERP"}
  28. # sub = {"id": "phWjnWnbjGbGCKyyhTPCJMiC7eP5PBFc", "cmd": "sub", "topic": "alpha-market-depth-contract-btcusdt-trade", "data": {}}
  29. # sub = {"id": "YnmB4W4mjiSTdnCPhCfBxHH34WwidKyW", "cmd": "sub", "topic": "alpha-market-ticker", "data": {}}
  30. sub = {"action": "USER_TRADE_TICK", "data": {"currency_id": 30, "base_currency_id": 104, "get_orders": 1, "get_balance": 1, "get_asset_trade": 1, "get_positions": 1}}
  31. await websocket.send(json.dumps(sub))
  32. print("已发送订阅消息")
  33. # 定义异步心跳函数
  34. async def send_heartbeat():
  35. while True:
  36. heartbeat_message = {"id": "921848", "method": "server.ping", "params": []}
  37. heartbeat_message ={"action":"ping","data":{}}
  38. await websocket.send(json.dumps(heartbeat_message))
  39. print(f"发送心跳消息: {heartbeat_message}")
  40. await asyncio.sleep(10)
  41. # 启动异步心跳任务
  42. heartbeat_task = asyncio.create_task(send_heartbeat())
  43. try:
  44. # 接收消息循环
  45. while True:
  46. message = await websocket.recv()
  47. # 处理二进制数据(gzip压缩)
  48. if isinstance(message, bytes):
  49. # 首先尝试直接解码为UTF-8,可能不是压缩数据
  50. try:
  51. message = message.decode('utf-8')
  52. except UnicodeDecodeError:
  53. # 如果直接解码失败,再尝试解压
  54. # 检查是否是gzip压缩数据
  55. if message.startswith(b'\x1f\x8b'):
  56. try:
  57. # 解压gzip数据
  58. decompressed_data = gzip.decompress(message)
  59. message = decompressed_data.decode('utf-8')
  60. except Exception as e:
  61. print(f"gzip解压失败: {e}")
  62. print(f"原始数据 (前100字节): {message[:100].hex()}")
  63. continue
  64. # 检查是否是deflate压缩数据(以78da开头)
  65. elif message.startswith(b'\x78\xda') or message.startswith(b'\x78\x9c') or message.startswith(b'\x78\x01'):
  66. # 尝试多种deflate解压方法
  67. decompressed_success = False
  68. # 方法1: 使用zlib.decompress和-wbits参数
  69. # if not decompressed_success:
  70. # try:
  71. # decompressed_data = zlib.decompress(message, -zlib.MAX_WBITS)
  72. # message = decompressed_data.decode('utf-8')
  73. # decompressed_success = True
  74. # except Exception as e:
  75. # print(f"deflate解压方法1失败: {e}")
  76. #
  77. # # 方法2: 使用zlib.decompress和默认wbits参数
  78. # if not decompressed_success:
  79. try:
  80. decompressed_data = zlib.decompress(message)
  81. message = decompressed_data.decode('utf-8')
  82. decompressed_success = True
  83. except Exception as e:
  84. print(f"deflate解压方法2失败: {e}")
  85. #
  86. # # 方法3: 使用zlib.decompress和+16参数(gzip格式)
  87. # if not decompressed_success:
  88. # try:
  89. # decompressed_data = zlib.decompress(message, zlib.MAX_WBITS | 16)
  90. # message = decompressed_data.decode('utf-8')
  91. # decompressed_success = True
  92. # except Exception as e:
  93. # print(f"deflate解压方法3失败: {e}")
  94. #
  95. # # 如果所有方法都失败,打印原始数据
  96. # if not decompressed_success:
  97. # print(f"deflate解压失败: Error -3 while decompressing data: invalid stored block lengths")
  98. # print(f"原始数据 (前100字节): {message[:100].hex()}")
  99. # print(f"数据长度: {len(message)}")
  100. # continue
  101. else:
  102. # 尝试多种编码解码非gzip二进制数据
  103. decoded = False
  104. # 尝试UTF-8解码
  105. try:
  106. message = message.decode('utf-8')
  107. decoded = True
  108. except UnicodeDecodeError:
  109. pass
  110. # 如果UTF-8解码失败,尝试其他编码或以十六进制形式显示
  111. if not decoded:
  112. try:
  113. message = message.decode('gbk')
  114. decoded = True
  115. except UnicodeDecodeError:
  116. pass
  117. if not decoded:
  118. # 如果所有解码都失败,以十六进制形式显示数据
  119. print(f"无法解码的二进制数据 (前100字节): {message[:100].hex()}")
  120. continue
  121. print(f"接收到消息: {message}")
  122. # 解析JSON数据
  123. try:
  124. data = json.loads(message)
  125. except json.JSONDecodeError as e:
  126. print(f"JSON解析失败: {e}")
  127. print(f"原始消息: {message}")
  128. continue
  129. if 'channel' in data and data['channel'] == 'pong':
  130. continue
  131. # 根据不同频道处理数据
  132. # if 'channel' in data:
  133. # channel = data['channel']
  134. # if channel == 'account' and 'data' in data:
  135. # # 账户信息
  136. # for account in data['data']:
  137. # print(f"账户余额: {account.get('balance', 'N/A')} {account.get('currency', 'N/A')}")
  138. # elif channel == 'currentPositions' and 'data' in data:
  139. # # 当前持仓
  140. # for position in data['data']:
  141. # print(f"持仓: {position}")
  142. # elif channel in ['openOrders', 'planCloseOrders', 'planOpenOrders'] and 'data' in data:
  143. # # 订单信息
  144. # for order in data['data']:
  145. # # 安全访问 'p' 字段,如果不存在则显示 N/A
  146. # price = order.get('p', 'N/A')
  147. # print(f"订单价格: {price}")
  148. # # 其他频道可以按需添加处理逻辑
  149. # 处理event类型的消息
  150. if 'event' in data:
  151. event = data['event']
  152. if event == 'mark_price.update' and 'data' in data:
  153. mark_data = data['data']
  154. print(f"标记价格更新: {mark_data.get('instrument_id', 'N/A')} - {mark_data.get('mark_price', 'N/A')}")
  155. except websockets.exceptions.ConnectionClosed as e:
  156. print(f"连接关闭: {e}")
  157. except Exception as e:
  158. print(f"发生错误: {e}")
  159. import traceback
  160. traceback.print_exc()
  161. finally:
  162. # 取消心跳任务
  163. heartbeat_task.cancel()
  164. try:
  165. await heartbeat_task
  166. except asyncio.CancelledError:
  167. pass
  168. async def main():
  169. await listen_ibit_futures()
  170. if __name__ == "__main__":
  171. asyncio.run(main())