从 HTTP 响应流流式传输到 Azure 存储 Blob



摘要

我需要从 Azure 函数(无服务器)从文本到语音转换 (tts) 流式传输到 Azure 存储 Blob。代码有效,但未按我预期的顺序完成。

功能说明

该代码通过流式处理将文本转换为音频 (.mp3) 文件。文本到语音转换 (tts) 转换是通过处理流的TextToSpeech类中的 REST API 进行的。tts 调用将音频流写入可写流 - 即 Azure 存储 blob。

期望

我认为对创建 blob 的函数的调用应该在 blob 完成写入后完成,但事实并非如此。这段代码是否正确,如果不正确,问题出在哪里?

我希望输出为:

  • 结束! - 来自 tts
  • 斑点结果 - 来自 fn
  • 然后 - 从代码调用 fn

收到

接收的输出(如底部所示)为:

  • 然后
  • 斑点结果
  • 结束!

无服务器函数的代码:

require('dotenv').config();
const TextToSpeech = require("./textToSpeech");
const azure = require('azure-storage');
const fn = async () => {
const blobService = azure.createBlobService(process.env.STORAGECONNECTIONSTRING);
const textToSpeech = new TextToSpeech({
accessTokenHost: process.env.SPEECHACCESSTOKENHOST,
ttsHost: process.env.SPEECHRESOURCETTSHOST,
ttsKey: process.env.SPEECHRESOURCETTSKEY
});
const userName = "diberry";
const container = "function-blob-tts";
const directory = userName;
const transformConfig = {"filenameandpath": 'test.mp3'};
const blobName = directory + "/" + transformConfig.filenameandpath;
// DOCS: https://azure.github.io/azure-storage-node/BlobService.html#createWriteStreamToBlockBlob__anchor
const writableStream = blobService.createWriteStreamToBlockBlob(container, blobName, { blockIdPrefix: 'block' });
await textToSpeech.transform(transformConfig, "This is a brand new world.", writableStream);
// blob properties
return await blobService.getBlobProperties(container, blobName, (err, results)=>{
if (err) throw err;
console.log(results);
if (results) return results;
});
}

调用无服务器 fn 的代码:

fn().then(results => {
console.log("then");
}).catch(err => {
console.log("err received");
console.log(err);
})

文字转语音类:

const rp = require('requestretry');
class TextToSpeech {
/**
*
* @param config - {key:"",endpoint:""}
*/
constructor(config) {
this.config = config;
this.delayMS = 500;
this.retry = 5;
}
// retry request if error or 429 received
retryStrategy(err, response) {
let shouldRetry = err || response.statusCode === 429;
return shouldRetry;
};
// Gets an access token.
async getAccessToken() {
const options = {
method: 'POST',
uri: `https://${this.config.accessTokenHost}/sts/v1.0/issueToken`,
headers: {
'Ocp-Apim-Subscription-Key': this.config.ttsKey,
},
};
const response = await rp(options);
return response.body;
};
// Make sure to update User-Agent with the name of your resource.
// You can also change the voice and output formats. See:
// https://learn.microsoft.com/azure/cognitive-services/speech-service/language-support#text-to-speech
/**
*
* @param accessToken - good for 10 minutes, used immediately
* @param transformConfig - ttsConfigs
* @param text
* @param writableStream
*/
async textToSpeech(accessToken, transformConfig, text, writableStream) {
try {
transformConfig.selectedVoice = {
gender: 'female',
locale: 'en-us',
code: 'Jessa24KRUS',
};
// Create the SSML request.
let body = `<?xml version="1.0"?><speak version="1.0" xml:lang="en-us"><voice xml:lang="en-us" name="Microsoft Server Speech Text to Speech Voice (${transformConfig.selectedVoice.locale}, ${transformConfig.selectedVoice.code})"><prosody rate="-20.00%">${text}</prosody></voice></speak>`;
let options = {
method: 'POST',
baseUrl: `https://${this.config.ttsHost}/`,
url: '/cognitiveservices/v1',
headers: {
Authorization: 'Bearer ' + accessToken,
'cache-control': 'no-cache',
'User-Agent': 'YOUR_RESOURCE_NAME',
'X-Microsoft-OutputFormat': 'audio-24khz-48kbitrate-mono-mp3',
'Content-Type': 'application/ssml+xml',
},
//timeout: 120000,
body: body,
maxAttempts: this.retry,
retryDelay: this.delayMS,
retryStrategy: this.retryStrategy,
};
// request has binary audio file
await rp(options)
.on('response', async (response) => {
if (response.statusCode === 200) {
writableStream.on('finish', () => {
console.log('The end!');
});
response.pipe(writableStream);
} else {
throw Error('Response statusCode ' + response.statusCode);
}
})
.on('error', err => {
throw err;
});
} catch (err) {
throw err;
}
}
/**
*
* @param transformConfig
* @param text
*/
async transform(transformConfig, text, writableStream) {
try {
// get token - access token is good for 10 minutes
const accessToken = await this.getAccessToken();
// get binary and return in in/out writableStream
await this.textToSpeech(accessToken, transformConfig, text, writableStream);
} catch (err) {
throw err;
}
}
}

module.exports = TextToSpeech;

从控制台输出

从控制台运行时:

then
BlobResult {
container: 'function-blob-tts',
name: 'diberry/test.mp3',
metadata: {},
lastModified: 'Sun, 25 Aug 2019 13:06:25 GMT',
creationTime: 'Sun, 25 Aug 2019 12:38:50 GMT',
etag: '"0x8D7295D08E34C0E"',
blobType: 'BlockBlob',
contentLength: '19008',
serverEncrypted: 'true',
requestId: 'caa7abc9-701e-00ff-0b47-5b694c000000',
contentSettings:
{ contentType: 'application/octet-stream',
contentMD5: 'FN99sCq5XC3DOnAucPHtCA==' },
lease: { status: 'unlocked', state: 'available' } }
The end!

必须更改代码以使订单按我想要的方式工作。

转换不是必需的 - 我只是想看看发生了什么。

可写流的管道位于承诺内,因此会保持引擎滴答声而不是继续。转换将流的处理显示为同步操作。

作为回调的 blob 属性也弄乱了即时报价,因此将其转换为承诺 - 不要混合回调和承诺。

无服务器函数的代码

require('dotenv').config();
const TextToSpeech = require("./textToSpeech");
const azure = require('azure-storage');
const getBlobProperties = async(blobService, container, blobName) => {
return new Promise((resolve, reject) => {
try {
// blob properties
blobService.getBlobProperties(container, blobName, (err, results)=>{
if (err) throw err;
console.log(`getBlobProperties - ${JSON.stringify(results)}`);
if (results) {
console.log(`getBlobProperties - done`);
resolve(results);
}
});
} catch (err) {
reject(err);
}
});
}
const fn = async () => {
try{
const blobService = azure.createBlobService(process.env.STORAGECONNECTIONSTRING);
const textToSpeech = new TextToSpeech({
accessTokenHost: process.env.SPEECHACCESSTOKENHOST,
ttsHost: process.env.SPEECHRESOURCETTSHOST,
ttsKey: process.env.SPEECHRESOURCETTSKEY
});
const userName = "diberry";
const container = "function-blob-tts";
const directory = userName;
const transformConfig = {"filenameandpath": '6-test.mp3'};
const blobName = directory + "/" + transformConfig.filenameandpath;
// DOCS: https://azure.github.io/azure-storage-node/BlobService.html#createWriteStreamToBlockBlob__anchor
const writableStream = blobService.createWriteStreamToBlockBlob(container, blobName, { blockIdPrefix: 'block' });
await textToSpeech.transform(transformConfig, "This is a brand new world.", writableStream);
console.log(`N-2 textToSpeech.transform done`);
await getBlobProperties(blobService, container, blobName);
console.log(`N-1 blob properties done`);
}catch(err){
console.log(`function error - ${err}`);
}
}

调用无服务器 fn 的代码:

fn().then(results => {
console.log("N function done");
}).catch(err => {
console.log("function err received");
console.log(err);
})

文字转语音类:

const rp = require('requestretry');
class TextToSpeech {
/**
*
* @param config - {key:"",endpoint:""}
*/
constructor(config) {
this.config = config;
this.delayMS = 500;
this.retry = 5;
}
// retry request if error or 429 received
retryStrategy(err, response) {
let shouldRetry = err || response.statusCode === 429;
return shouldRetry;
};
// Gets an access token.
async getAccessToken() {
const options = {
method: 'POST',
uri: `https://${this.config.accessTokenHost}/sts/v1.0/issueToken`,
headers: {
'Ocp-Apim-Subscription-Key': this.config.ttsKey,
},
};
const response = await rp(options);
return response.body;
};
// Make sure to update User-Agent with the name of your resource.
// You can also change the voice and output formats. See:
// https://learn.microsoft.com/azure/cognitive-services/speech-service/language-support#text-to-speech
/**
*
* @param accessToken - good for 10 minutes, used immediately
* @param transformConfig - ttsConfigs
* @param text
* @param writableStream
*/
async textToSpeech(accessToken, transformConfig, text, writableStream) {
return new Promise((resolve, reject) => {
try {
transformConfig.selectedVoice = {
gender: 'female',
locale: 'en-us',
code: 'Jessa24KRUS',
};
// Create the SSML request.
let body = `<?xml version="1.0"?><speak version="1.0" xml:lang="en-us"><voice xml:lang="en-us" name="Microsoft Server Speech Text to Speech Voice (${transformConfig.selectedVoice.locale}, ${transformConfig.selectedVoice.code})"><prosody rate="-20.00%">${text}</prosody></voice></speak>`;
let options = {
method: 'POST',
baseUrl: `https://${this.config.ttsHost}/`,
url: '/cognitiveservices/v1',
headers: {
Authorization: 'Bearer ' + accessToken,
'cache-control': 'no-cache',
'User-Agent': 'YOUR_RESOURCE_NAME',
'X-Microsoft-OutputFormat': 'audio-24khz-48kbitrate-mono-mp3',
'Content-Type': 'application/ssml+xml',
},
//timeout: 120000,
body: body,
maxAttempts: this.retry,
retryDelay: this.delayMS,
retryStrategy: this.retryStrategy,
};
const { Transform } = require('stream');
const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.');
callback(null, chunk);
}
});
// request has binary audio file
rp(options)
.pipe(reportProgress)
.pipe(writableStream)
.on('finish', () => {
console.log('Done');
resolve();
});

} catch (err) {
reject(err);
}
});
}

/**
*
* @param transformConfig
* @param text
*/
async transform(transformConfig, text, writableStream) {
try {
// get token - access token is good for 10 minutes
const accessToken = await this.getAccessToken();
// get binary and return in in/out writableStream
await this.textToSpeech(accessToken, transformConfig, text, writableStream);
console.log("transform done");
} catch (err) {
console.log(`transform error - ${err}`);
throw err;
}
}
}

module.exports = TextToSpeech;

相关内容

  • 没有找到相关文章

最新更新