如何使用HMAC SHA256加密和Base64编码使用websocket对OKEx API V5登录进行签名



尽管此问题在使用REST的OKEx API的早期版本中已得到回答,但在使用websocket的API的最新版本5中尚未得到回答。文档在这里。

我得到以下错误{"event":"error","msg":"Invalid sign","code":"60007"},所以签名字符串算法一定有问题,但我似乎无法确定我在哪里犯了错误。

import hmac
import json
import time
import hashlib
import asyncio
import websockets
passphrase = "XXXX"
secret_key = b"XXXX"
api_key = "XXXX"
timestamp = int(time.time())
print("timestamp: " + str(timestamp))
sign = str(timestamp) + 'GET' + '/users/self/verify'
total_params = bytes(sign, encoding= 'utf-8')
signature = hmac.new(bytes(secret_key, encoding= 'utf-8'), total_params, digestmod=hashlib.sha256).digest()
signature = base64.b64encode(signature)
print("signature = {0}".format(signature))
async def main():
msg = 
{
"op": "login",
"args": [
{
"apiKey": f'{api_key}',
"passphrase": f'{passphrase}',
"timestamp": f'{timestamp}',
"sign": f'{signature}'
}
]
}
async with websockets.connect('wss://wspap.okx.com:8443/ws/v5/private?brokerId=9999') as websocket:
print(msg)
await websocket.send(json.dumps(msg))
response = await websocket.recv()
print(response)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

我想明白了。在发送签名之前,需要将其从字节字符串变量转换回字符串变量。

添加以下代码行可以做到这一点:signature = str(signature, 'utf-8')

import json
import time
import hmac
import hashlib
import base64
import asyncio
import websockets
passphrase = "XXXX"
secret_key = "XXXX"
api_key = "XXXX"
timestamp = int(time.time())
print("timestamp: " + str(timestamp))
sign = str(timestamp) + 'GET' + '/users/self/verify'
total_params = bytes(sign, encoding= 'utf-8')
signature = hmac.new(bytes(secret_key, encoding= 'utf-8'), total_params, digestmod=hashlib.sha256).digest()
signature = base64.b64encode(signature)
signature = str(signature, 'utf-8')
print("signature = {0}".format(signature))
async def main():
msg = 
{
"op": "login",
"args": [
{
"apiKey": f'{api_key}',
"passphrase": f'{passphrase}',
"timestamp": f'{timestamp}',
"sign": f'{signature}'
}
]
}
async with websockets.connect('wss://wspap.okx.com:8443/ws/v5/private?brokerId=9999') as websocket:
print(msg)
await websocket.send(json.dumps(msg))
response = await websocket.recv()
print(response)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

最新更新