1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | function foo() { let itemCount = 0; let rateInMs = 500; // Only emits interval when items are available let rateInterval = Rx.Observable .interval(rateInMs) .filter(x=>intemCount > 0); let itemsSubject = new Rx.Subject(); // Zip the itemsSubject and rateInterval to move // items down the stream at a set rate let rateLimStream = Rx.Observable .zip(rateInterval, itemsSubject) .map(x=>{ // Do something that returns // a promise/observable }) .share(); let successStream = rateLimStream.filter(x=>!x.error); let errorStream = rateLimStream.filter(x=>x.error); successStream.subscribe(x=>{ itemCount--; // Do something with them }); errorStream.subscribe(x=>{ itemCount--; // Do something with them }); |
Be careful though, items that throw errors during an operation will cause the stream to stop and subsequent calls to itemsSubject.next will not cause anything to happen. To avoid this, pass on custom objects instead of a Promise.reject or an Observer.error.
No comments:
Post a Comment