function foo() { let itemCount = 0; let rateInMs = 500; // Only emits interval when items are available let rateInterval = Rx.Observable .interval(rateInMs) .filter(x=>intemCount > 0); let itemsSubject = new Rx.Subject(); // Zip the itemsSubject and rateInterval to move // items down the stream at a set rate let rateLimStream = Rx.Observable .zip(rateInterval, itemsSubject) .map(x=>{ // Do something that returns // a promise/observable }) .share(); let successStream = rateLimStream.filter(x=>!x.error); let errorStream = rateLimStream.filter(x=>x.error); successStream.subscribe(x=>{ itemCount--; // Do something with them }); errorStream.subscribe(x=>{ itemCount--; // Do something with them });Now all that would have to be done is expose itemsSubject somewhere, possibly through module.exports so that you may call itemsSubject.next(item).
Be careful though, items that throw errors during an operation will cause the stream to stop and subsequent calls to itemsSubject.next will not cause anything to happen. To avoid this, pass on custom objects instead of a Promise.reject or an Observer.error.
No comments:
Post a Comment