问题1:如何实现相同的行为?但不是Observable.interval
它将由回调调用。
例如:
我有5000ms
间隔,但我的服务器非常慢,5000ms
后它没有返回结果。但是下一个调用是在 5000ms
之后调用的。我不希望这样。我希望在结果从服务器返回后,它将调用下一个调用。
问题2:
如何在不逐个创建多个文件的情况下立即将结果流式传输到csv文件。对于当前的实现,我使用FileSaver
,它可以在IE11
中工作。我想继续使用它。 有没有办法将数据流式传输到文件而不是将其收集到数组中,因为我的数据集很大。比如 1m 行等等... 例:
const progress = Observable.interval(1000)
.switchMap(() => this.messageService.getResults(query))
.map(messageResult => messageResult)
.subscribe((data: MessagesResult) => {
inProcess = true;
if (!data.isMoreResults && data.auditMessageList.length === 0) {
this.fileSaver.save(`result.csv`, csvData);
inProcess = false;
this.logger.info('Download file finished...');
progress.unsubscribe();
}
const start = this.filterModel.offset * this.filterModel.limit;
const rows = [...csvData];
rows.splice(start, 0, ...data.auditMessageList);
csvData = rows;
if (inProcess) {
this.logger.info('Exporting in progress...');
}
query.offset++;
}, error => this.logger.error(error));
}
正如您已经发现的那样,使用 Observable.interval
不会"等待"流的其余部分。
我通常将repeatWhen
与delay
一起使用
const progress = Observable.defer(() => this.messageService.getResults(query))
.repeatWhen(notifications => notifications.delay(1000))
...
这是工作示例:https://jsfiddle.net/a0rz6nLv/19/
我不太了解你们其他人的代码。
不要在方法中使用progress.unsubscribe();
subscribe
。相反,请考虑使用 takeWhile 或 takeUntil - 两者都将为您完成可观察量。
.takeWhile(data => data.isMoreResults data.auditMessageList.length > 0)
缓冲结果也可以通过使用reduce或toArray来完成
.reduce((accumulator, data) => data.auditMessageList.concat(accumulator), [])
副作用最好由do操作员处理
.do({
next: () => {
inProgress = true;
this.logger.info('Exporting in progress...');
},
complete: () => {
inProgress = false;
this.logger.info('Download file finished...');
}
})
关于第二个问题 - 我不知道 - 您应该能够从服务器流式传输csv。如果您无法修改服务器,也许其他人会知道如何在客户端上执行此操作......
问题 1
下面是一个实现函数的示例,该函数在获得响应时调用自身。
后端:
- 模拟在 5 秒和 10 秒内响应的慢速后端
- 在每个响应中,服务器都会给出当前
request_number
和state
- 对于 3 个第一个响应,
state
是active
,之后,state
是closed
法典:
/* Mocked backend. I'm slow, like really slow */
class SlowBackend {
MAX_ITERATIONS = 3; // suppose you're reading a table and you have pagination, with 3 pages
currentIteration = 0;
constructor() {}
getStuff() {
console.log(`**Request N. ${this.currentIteration}**n[Back] : received a request from the front`);
const responseDelay = Math.random() * 5000 + 5000; // response between 5s and 10s
let state = "open";
if(++this.currentIteration > this.MAX_ITERATIONS)
state = "closed";
return Observable
.timer(responseDelay)
.map( () => {
console.log(`[Back] : Responding after ${responseDelay} ms`)
return {
request_number : this.currentIteration,
state : state
};
})
}
}
前面:
这基本上是您的组件。
class Frontend {
isPollingActivated = true;
responses = [];
constructor(private backendService) {
this.backendService = new SlowBackend(); // connection to backend
this.requestOnRegularBasis();
}
requestOnRegularBasis() {
if (!this.isPollingActivated)
return;
this.backendService.getStuff()
.subscribe(response => {
console.log(`[Front] : received response from server. State : ${response.state}`);
// Choose one of the following blocks, comment the other according to what you need
// Block 1 : Sync processing example
console.log(`[Front] : doing some sync processing`);
this.doSomeSyncProcessing(response);
this.requestOnRegularBasis();
// Block 2 : Async processing example
// console.log(`[Front] : doing some async processing`);
// this.doSomeAsyncProcessing(response)
// .subscribe(this.requestOnRegularBasis);
})
}
private doSomeSyncProcessing(response){
if(response.state == 'closed'){
this.isPollingActivated = false; // stop polling
this.saveDataToCsv();
}
else
this.responses.push(Object.values(response).join(';')) // csv line separated by ';'
}
private saveDataToCsv(){
const headers = ['current_request;state']
this.responses = headers.concat(this.responses)
console.log('saving to csv : ', this.responses.join('n'));
// Uncomment this to use FileSaver API
/*
const blob = new Blob(headers.concat(this.responses), {type: "text/csv;charset=utf-8"});
saveAs(blob, "my_responses.csv");*
*/
}
private doSomeAsyncProcessing(response){
return Observable.timer(1000).map(() => this.doSomeSyncProcessing(response));
}
}
输出:
**Request N. 0**
[Back] : received a request from the front
[Back] : Responding after 5482 ms
[Front] : received response from server. State : open
[Front] : doing some sync processing
**Request N. 1**
[Back] : received a request from the front
[Back] : Responding after 7489 ms
[Front] : received response from server. State : open
[Front] : doing some sync processing
**Request N. 2**
[Back] : received a request from the front
[Back] : Responding after 9627 ms
[Front] : received response from server. State : open
[Front] : doing some sync processing
**Request N. 3**
[Back] : received a request from the front
[Back] : Responding after 5806 ms
[Front] : received response from server. State : closed
[Front] : doing some sync processing
saving to csv :
current_request;state
1;open
2;open
3;open
问题2
你不能。
至少不使用FileSaver
.因为它不支持逐块写入。当你实例化你的Blob
时,你必须 准备好所有数据。 有一些库支持块,但它们要么用于服务器端(例如节点.js要么非常特定于浏览器。
检查这个:将客户端生成的数据保存为JavaScript中的文件,以块为单位
注意:
如果您尝试使用 js 在客户端计算机中存储 1M 行 csv,那么架构可能有问题。 因为这不是浏览器的常见用例。客户端应该有弱机器,因此接收处理, 轻巧,易于解析信息。就此而言,例如,您可以在服务器端构造csv,这将 拥有写入流文件的所有权限,以及体面的处理/内存容量。
演示 : 问题 1
http://jsbin.com/rojutudayu/2/edit?html,js,console
演示 : 如何下载 blob ?
<script src="https://cdn.rawgit.com/eligrey/FileSaver.js/e9d941381475b5df8b7d7691013401e171014e89/FileSaver.min.js"> </script>
<script>
var blob = new Blob(["Hello, world!"], {type: "text/plain;charset=utf-8"});
saveAs(blob, "hello world.txt");
</script>
问题 1:
使用forkJoin
.它将等待所有可观察量完成。 当你与delay(5000)
结合时,最短时间为5s。如果在 5 秒之前没有返回 API 响应,它仍然会等到返回结果(演示(
const stream1$ = of(1).pipe(
delay(5000)
);
const intervalTime = Math.random() * 5000 + 5000
// replace with your API stream
const stream2$ = of(intervalTime).pipe(
delay(intervalTime)
);
forkJoin(stream1$, stream2$)
.subscribe(([_, s2]) => {
console.log(s2);
})
问题2:
如果文件很大,则应让 Web 浏览器处理它。最好将文件保存在服务器中,然后返回下载链接。对于小文件,性能不是问题。您可以将文件数据存储在RAM中,然后保存文件一次。
编辑:如果文件很大,开发人员建议使用StreamSaver。你应该看看它
StreamSaver.js采用了不同的方法。现在,您实际上可以直接创建可写流到文件系统,而不是将数据保存在客户端存储或内存中(我不是在谈论chromes沙盒文件系统(
StreamSaver.js 是在客户端保存流的解决方案。它非常适合需要保存在客户端创建的大量数据的 Web 应用程序,其中 RAM 非常有限,例如在移动设备上。