3 回答
TA贡献1853条经验 获得超6个赞
您可以debounceTime使用switchMap和进行模拟delay。然后取消内部 ObservabletakeUntil以防止发出等待值。
private updateSubject = new Subject<string>();
private interrupt = new Subject();
ngOnInit() {
this.updateSubject.pipe(
switchMap(val => of(val).pipe(
delay(3000),
takeUntil(this.interrupt)
))
).subscribe(val => publish(val));
}
doChange(val: string) {
this.updateSubject.next(val);
}
doImmediateChange(val: string) {
this.interrupt.next();
publish(val);
}
https://stackblitz.com/edit/rxjs-ya93fb
TA贡献1820条经验 获得超10个赞
使用竞赛运算符:
第一个完成的observable成为唯一订阅的observable,因此这个递归函数将在一次发射后完成take(1),然后重新订阅() => this.raceRecursive()。
private timed$ = new Subject<string>();
private event$ = new Subject<string>();
ngOnInit() {
this.raceRecursive()
}
raceRecursive() {
race(
this.timed$.pipe(debounceTime(1000)),
this.event$
)
.pipe(take(1)) // force it to complete
.subscribe(
val => console.log(val), // srv call here
err => console.error(err),
() => this.raceRecursive() // reset it once complete
)
}
doChange(val: string) {
this.timed$.next(val)
}
doImmediateChange(val: string) {
this.event$.next(val)
}
TA贡献1842条经验 获得超21个赞
您可以使用debounce和race实现此行为:
使用您提供的代码
private destroy$ = new Subject<void>();
private immediate$ = new Subject<void>();
private updateSubject$ = new Subject<string>();
constructor(private srv: PubSubService) {}
ngOnInit() {
this.updateSubject$.pipe(
takeUntil(this.destroy$),
debounce(() => race(timer(3000), this.immediate$))
).subscribe(val => {
this.srv.publishChange(val);
});
}
doChange(val: string, immediate?: boolean) {
this.updateSubject$.next(val);
if (immediate) this.immediate$.next();
}
// don't forget to unsubscribe
ngOnDestroy() {
this.destroy$.next();
}
立即发出更改将替换之前的正常更改(即去抖 3 秒),而不会延迟(感谢我们的种族可观察)。
添加回答
举报