Source: lib/collection.js

/**
 * @description
 * A Collection of caches designed to allow easier management of several
 * caches by the server.
 */


"use strict";


// exporting the Constructor
exports = module.exports = Collection;
exports.Collection = Collection;


// npm-installed modules
var _ = require("lodash");
var async = require("async");
var debug = require("debug")("ss-interface:collection");


// own modules
var defaults = require("./defaults");


/**
 * @callback SwitchFunction
 * @param {Object} message - data/message payload
 * @return {Choice} choice - determined choice
 */


/**
 * @callback PopulateFunction
 * @param {String} id - id of the cache. Note that this will always be a
 *   string even if the original ID was a Number
 * @param {Cache} cache - the server cache
 * @param {Function} next - function that MUST be called to indicate completion
 *   of cache population
 */


/**
 * Required return value from a {@link switchFunction}
 * @typedef {Object} Choice
 * @property {Number} choice.cacheId - id of target cache
 * @property {Number} choice.id - id of the message itself
 * @property {Object} choice.data - message that will be inserted into cache
 */


/**
 * Create a new collection
 *
 * @constructor
 * @public
 * @param {Object} [configurations] - configurations for collection
 * @param {Number} [configurations.refreshInterval] - refresh interval
 * @return {Collection}
 */
function Collection(configurations) {
    this._configurations = configurations || {};
    _.defaultsDeep(this._configurations, {
        refreshInterval: defaults.cache_refresh_interval,
    });
    this._caches = { };
    this._sources = [ ];
    this._chooser = function() { throw new Error("no chooser function added"); };
    this._refreshing = false;
    this._refreshInterval = null;
}


/**
 * Add a cache
 *
 * @public
 *
 * @param {Number} id - id of cache
 * @param {cache.Server} cache - server's cache client
 * @param {PopulateFunction} populate - function called to populate cache
 * @return {this} for chaining
 */
Collection.prototype.addCache = function(id, cache, populate) {
    this._caches[id] = {
        cache: cache,
        populate: populate,
    };
    return this;
};


/**
 * Return a reference to a cache. Returns 'undefined' if cache is not found.
 *
 * @public
 *
 * @param {Number} id - id of the cache
 * @return {cache.Server|undefined}
 */
Collection.prototype.getCache = function getCache(id) {
    var c = this._caches[id];
    return c ? c.cache : c;
}


/**
 * Add function for deciding which cache is to be used
 *
 * If the function returns `null`, or the id it returns does
 * not resolve to a cache, the message is ignored.
 *
 * @public
 *
 * @param {SwitchFunction} choose - choose function
 * @return {this} for chaining
 */
Collection.prototype.switch = function(choose) {
    this._chooser = choose;
    return this;
};


/**
 * Add a source of data.
 *
 * While it might seem too much, I shall add option to add more than one
 * source of data. This might be useful if more sources would ensure more
 * fault tolerance or we actually have different sources.
 *
 * If a cache can not be resolved, the message is ignored.
 * If an error occurs while adding an item to cache, it is ignored.
 *
 * @public
 *
 * @param {EventEmitter} source - an Event Emitter
 * @return {this} for chaining
 */
Collection.prototype.addSource = function(source) {
    var self = this;
    source.on("message", function(data) {
        // do not insert an item, if we are in a refresh process
        if (self._refreshing) {
            return;
        }

        debug("new items from source");
        if (_.isArray(data)) {
            data.forEach(function(item) {
                putInItem(item);
            });
        } else {
            putInItem(data);
        }
    });

    function putInItem(item) {
        try {
            if (!_.isPlainObject(item)) item = JSON.parse(item);
        } catch(parseErr) {
            // ignore the message, if we can not parse it
            debug("could not parse message into object: %s", item);
            return;
        }
        var det = self._chooser(item);
        if (!det) {
            return;
        }
        var c = self._caches[det.cacheId];
        if (c) {
            c.cache.addOne(det.id, det.data);
        }
    }

    this._sources.push(source);
    return this;
};


/**
 * Start the refresh interval
 *
 * @return {this}
 */
Collection.prototype.startRefreshInterval = function() {
    debug("starting refresh interval");
    var self = this;
    var funcs = {};

    for (var cacheId in self._caches) {
        funcs[cacheId] = (function(id) {
            return function(done) {
                var c = self._caches[id];
                c.populate(id, c.cache, done);
            };
        })(cacheId);
    }

    refresh();
    self._refreshInterval = setInterval(refresh, self._configurations.refreshInterval);

    function refresh() {
        if (self._refreshing) {
            return null;
        }

        self._refreshing = true;
        debug("## CACHE_REFRESH at %s", Date.now());
        return async.parallel(funcs, function(err) {
            if (err) {
                debug("error occurred during cache refresh: %s", err);
            }
            self._refreshing = false;
        });
    }

    return self;
};


/**
 * Stop the refresh interval
 *
 * @return {this}
 */
Collection.prototype.stopRefreshInterval = function() {
    clearInterval(this._refreshInterval);
    return this;
};