如何获得Kucoin Exchange的websocket连接



如何获得与Kucoin Exchange的websocket连接?我正在尝试那个代码:

import os
import time, hashlib, requests, base64, hmac, json, websocket, uuid    
now = str(int(time.time() * 1000))
token = "my token"
SOCKET = f'wss://push1-v2.kucoin.net/endpoint?token={token}&connectId={now}'
params = {"type": "subscribe", "topic": "/market/ticker:BTC-USDT", "response": True}
def on_open(ws):
print('Opened Connection')
ws.send(json.dumps(params))
def on_close(ws):
print('Closed Connection')
def on_ping(ws, message):
pass
def on_pong(ws, message):
pass
def on_message(ws, message):
print(message)
def on_error(ws, err):
print("Got a an error: ", err)
if __name__ == "__main__":
ws = websocket.WebSocketApp(SOCKET, on_open = on_open, on_close = on_close, on_message = on_message,on_error=on_error, on_ping=on_ping, on_pong=on_pong)
ws.run_forever(ping_interval=10, ping_timeout=5)

但我收到消息:[Erno 11001]getaddrinfo失败也许我用错了端点?我必须使用哪个端点?

import requests
import json
import hmac
import hashlib
import base64
from urllib.parse import urlencode
import time
import websocket,threading

def get_token(api_key,api_secret,api_passphrase):
base_uri = 'https://api-futures.kucoin.com'
def get_headers(method, endpoint):
now = int(time.time() * 1000)
str_to_sign = str(now) + method + endpoint
signature = base64.b64encode(hmac.new(api_secret.encode(), str_to_sign.encode(), hashlib.sha256).digest()).decode()
passphrase = base64.b64encode(hmac.new(api_secret.encode(), api_passphrase.encode(), hashlib.sha256).digest()).decode()
return {'KC-API-KEY': api_key,
'KC-API-KEY-VERSION': '2',
'KC-API-PASSPHRASE': passphrase,
'KC-API-SIGN': signature,
'KC-API-TIMESTAMP': str(now)}

req = requests.post(f"{base_uri}/api/v1/bullet-private",headers=get_headers("POST","/api/v1/bullet-private"))
print(req.json())
return(req.json())

def RunKucoinSocket(account):        
def connect_websocket(account):
now = str(int(time.time() * 1000))
Data = get_token(account['apikey'],account['apisecret'],account['passphrase'])['data']
server = Data['instanceServers'][0]
ws = websocket.WebSocketApp(f"{server['endpoint']}?token={Data['token']}&connectId={now}",
on_message = on_message,
on_error = on_error,
on_close = on_close,
on_open = on_open,
on_ping = on_ping,
on_pong = on_pong
)
ws.run_forever(ping_interval=15,ping_timeout=10)
def on_open(ws):
print('Opened Connection')
params = {"type": "subscribe", "topic": "/contractMarket/tradeOrders", "response": True, "privateChannel":True}
ws.send(json.dumps(params))
def on_close(ws,closemsg,closeerror):
print('Closed Connection')
def on_ping(ws, message):
pass
def on_pong(ws, message):
pass
def on_message(ws, message):
print(message)
def on_error(ws, err):
print("Got a an error: ", err)

connect_websocket(account)
if __name__ == "__main__":
data = {
"username":"Nate",
"exchange":"Kucoin",
"apikey":'KEY',
"apisecret":"SECRET",
"passphrase":"password"
}
RunKucoinSocket(data)
time.sleep(1000)

我希望这有帮助-它的格式很奇怪,因为我用它来缩放,但它应该做你想做的。

只需将此项(push1-v2.kucoin.net(更改为(ws-api.kucoin.com(即可

相关内容

  • 没有找到相关文章

最新更新