Fastflux Home Reference Source Repository

src/core/observable/base.js

import ObservableState from './state';
import Subscription from './subscription';

/**
 * @example
 * let ready = new Observable;
 * ready.subscribe(data => console.log("Received:", data));
 *
 * ready.emit("Foo");
 * //> Received: Foo
 */
export default class Observable<T> {

  _listeners = [];

  /**
   * Register listener
   * @param {function} listener
   * @throws {Error} if `listener` is already subscribed to this observable
   * @returns {Subscription}
   */
  subscribe(listener: (v: T) => void): void {
    if (this._listeners.indexOf(listener) === -1) {
      this._listeners.push(listener);
      return new Subscription(this, listener);
    }
    else throw new Error("subscribe: cannot register twice");
  }

  /**
   * True if `listener` has subscribed to `this` observable
   * @param {function} listener
   * @returns {boolean}
   */
  hasSubscribed(listener: (v: T) => void): boolean {
    return this._listeners.indexOf(listener) !== -1
  }

  /**
   * Unregister listener
   * @param {function} listener
   * @throws {Error} if `listener` is not registered
   */
  unsubscribe(listener: (v: T) => void): void {
    var idx = this._listeners.indexOf(listener);
    if (idx !== -1)
      this._listeners.splice(idx, 1);
    else
      throw new Error("unsubscribe: listener is not registered");
  }

  /**
   * Invoke each registered listener with `value`.
   * @param {Any} value - argument will be passed to listeners
   */
  emit(value: T): void {
    var len = this._listeners.length;
    for (var i = 0; i < len; i++) {
      this._listeners[i](value)
    }
  }

  /**
   * @example
   * let x = new Observable;
   * x.map(v => v * 2).subscribe(v => console.log("Value:", v));
   *
   * x.emit(10);
   * //> Value: 20
   *
   * @param {function(value: Any): Any} mapper - how to transform the emitted value
   * @returns {Observable}
   */
  map(mapper: (p: T) => T): Observable<T> {
    let mapped: Observable<T> = new Observable;
    this.subscribe(p => {
      mapped.emit(mapper(p));
    });
    return mapped
  }

  /**
   * @example
   * let numbers = new Observable;
   * let even = numbers.filter(v => v % 2 === 0);
   *
   * numbers.subscribe(v => console.log("Any:", v))
   * even.subscribe(v => console.log("Even:", v));
   *
   * numbers.emit(1);
   * //> Any: 1

   * numbers.emit(2);
   * //> Any: 2
   * //> Even: 2
   *
   * @param {function(value: Any): boolean} predicate
   * @returns {Observable}
   */
  filter(predicate: (p: T) => boolean): Observable<T> {
    let filtered: Observable<T> = new Observable;
    this.subscribe(p => {
      if (predicate(p)) filtered.emit(p)
    });
    return filtered
  }

  /**
   * @example
   * let emitter = new Observable;
   * let sum = emitter.reduce(0, (a, v) => a + v);
   *
   * sum.subscribe(s => console.log(s));
   *
   * emitter.emit(10);
   * //> 10
   * emitter.emit(15);
   * //> 25
   *
   * @param {Any} accumulator - initial state
   * @param {function(accumulator: Any, value: Any): Any} reducer
   * @returns {ObservableState}
   */
  reduce<R>(accumulator: R, reducer: (a: R, p: T) => R): ObservableState<R> {
    let reduced: ObservableState<R> = new ObservableState(accumulator);
    this.subscribe(p => {
      reduced.getState(a => {
        reduced.emit(reducer(a, p))
      })
    });
    return reduced
  }

}