Sunday, February 26, 2017

Rate Limiter with RxJS

Use Subjects and Ovservables to create a stream of operations that get executed at a set rate. Subjects allow you to create an entry point to the stream and insert new items. An Observable Interval and a zip operator will give a set interval to move items down the stream. We can use a map/flatmap/concatmap operator to do a request or operation. Finally, a share will allow the stream to be sent to different observable streams based on conditions from a filter.

function foo()
{
  let itemCount = 0;
  let rateInMs = 500;

  // Only emits interval when items are available
  let rateInterval = Rx.Observable
                       .interval(rateInMs)  
                       .filter(x=>intemCount > 0);
  let itemsSubject = new Rx.Subject();
  // Zip the itemsSubject and rateInterval to move 
  // items down the stream at a set rate
  let rateLimStream = Rx.Observable
                        .zip(rateInterval, itemsSubject)
                        .map(x=>{
                          // Do something that returns 
                          // a promise/observable
                        })
                        .share();
  let successStream = rateLimStream.filter(x=>!x.error);
  let errorStream = rateLimStream.filter(x=>x.error);

  successStream.subscribe(x=>{
    itemCount--;
    // Do something with them
  });

  errorStream.subscribe(x=>{
  itemCount--;
  // Do something with them
  });
Now all that would have to be done is expose itemsSubject somewhere, possibly through module.exports so that you may call itemsSubject.next(item).

Be careful though, items that throw errors during an operation will cause the stream to stop and subsequent calls to itemsSubject.next will not cause anything to happen. To avoid this, pass on custom objects instead of a Promise.reject or an Observer.error.