Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,34 +135,44 @@ module.exports = {
return process.getLogger(__filename);
},

// this will return a promise, which will be resolved after getting two available ports
'enable': _.once(function(options, emitter){

options = options || {};
emitter = emitter || require('cluster-emitter');
emitter = require('./lib/utils').decorateEmitter(emitter);
//var tillCacheServerStarted = when.defer();

emitter.once('CLUSTER-ENABLE-CACHE', function(options){

var master = options.master,
mode = options.mode || 'master';

if(!master || mode === 'master'){
var mgr = require('./lib/cache-mgr'),
svr = mgr.createServer(mgr.app);

svr.listen(mgr.port, mgr.afterServerStarted);
process.env.CACHE_BASE_PORT = master ? master.port + 10 : 9190;
var mgr = require('./lib/cache-mgr');
mgr.configApp(mgr.app).then(function (ports) {
mgr.port = ports;
var svr = mgr.createServer(mgr.app);
process.cacheServer = svr;

svr.listen(ports, mgr.afterServerStarted);
}, function (error) {
logger.error('[cache] start server error %j', error);
});
}
else{
//it's master's job to fork another worker specificly as the cache manager
master.fork(master.options, {
'CACHE_MANAGER': true
'CACHE_MANAGER': true,
'CACHE_BASE_PORT': master.port + 10
});
}
});

//if it's none cluster mode, above registered cache initialization will take effect
//otherwise, in cluster mode, master will be responsible instead
emitter.to(['master']).emit('CLUSTER-ENABLE-CACHE', options);
//return tillCacheServerStarted.promise;
}),

'use': function(namespace, options){
Expand All @@ -180,10 +190,10 @@ module.exports = {

return new Cache(namespace, pipeline([

function(domain){
function(ports){

logger.debug('[cache] using domain:%s', domain);
return _this.user.use(domain);
logger.debug('[cache] using ports: %j', ports);
return _this.user.use(ports);
},

function(usr){
Expand All @@ -206,7 +216,7 @@ module.exports = {
'meta': meta
};
}
], actualOptions.domain));//optional
], actualOptions.ports));//optional
},

'manager': require('./lib/cache-mgr'),
Expand Down
173 changes: 66 additions & 107 deletions lib/cache-mgr.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,20 @@
var common = require('./cache-common'),
_ = require('underscore'),
fs = require('graceful-fs'),
os = require('os'),
net = require('net'),
util = require('util'),
path = require('path'),
when = require('when'),
timeout = require('when/timeout'),
ensureDir = require('./utils').ensureDir,
pickAvailablePorts = require('./utils').pickAvailablePorts,
logger = require('./utils').logger;

var success = common.status.success,
failure = common.status.failure,
NS = common.types.NS,
GET = common.types.GET,
SET = common.types.SET,
DEL = common.types.DEL,
ALL = common.types.ALL,
LOCK = common.types.LOCK,
INSPECT = common.types.INSPECT,
PING = common.types.PING,
PONG = common.types.PONG,
CHN = common.changeToken,
domain = '/tmp/cache.socket.' + process.pid,
domainPath = common.domainPath,
persistPath = common.persistPath,
ports = null,
namespaces = {
'': { //meta namespace
'': { //meta of meta namespace itself
Expand All @@ -47,36 +37,31 @@ var success = common.status.success,
'persist': false,
'expire': 0
};//default meta
},
conns = [];
};

var manager = {

'domain': domain,
'ports': ports,

'ns': function(conn, token){
'ns': function(reply){

common.write(conn, common.serialize({
'type': NS,
'token': token,
reply({
'namespaces': _.keys(namespaces),
'status': success
}));
});
},

'all': function(conn, token, namespace){
'all': function(reply, namespace){

var cache = namespaces[namespace] || {};

common.write(conn, common.serialize({
'type': ALL,
'token': token,
reply({
'keys': _.keys(cache),
'status': success
}));
});
},

'lock': function(conn, token, namespace, key, value){
'lock': function(reply, namespace, key, value){

var cache = namespaces[namespace] = namespaces[namespace] || {},
write = cache[key] === undefined;
Expand All @@ -87,15 +72,13 @@ var manager = {
};
}

common.write(conn, common.serialize({
'type': LOCK,
'token': token,
reply({
'key': key,
'status': write ? success: failure
}));
});
},

'set': function(conn, token, namespace, key, value, overwrite){
'set': function(reply, namespace, key, value, overwrite){

var cache = namespaces[namespace] = namespaces[namespace] || {},
meta = metaOfNs(namespace),
Expand All @@ -110,12 +93,10 @@ var manager = {
}
: cache[key];

common.write(conn, common.serialize({
'type': SET,
'token': token,
reply({
'key': key,
'status': write ? success: failure
}));
});

if(write){

Expand All @@ -127,16 +108,14 @@ var manager = {

'notify': function(namespace, key, value){

var notify = common.serialize({
var notify = {
'token': CHN,
'ns': namespace,
'key': key,
'value': value
});
};

_.each(conns, function(c){
common.write(c, notify);
});
this.cacheSocket.send(notify);

if(namespace === ''
&& value === undefined
Expand All @@ -146,40 +125,36 @@ var manager = {
}
},

'get': function(conn, token, namespace, key, value){
'get': function(reply, namespace, key, value){

var cache = namespaces[namespace] || {},
entry = cache[key] || {},
result = {
'type': GET,
'token': token,
'key': key,
'value': entry.value || value,//default
'status': success
};

common.write(conn, common.serialize(result));
reply(result);
},

'ins': function(conn, token, namespace, key){
'ins': function(reply, namespace, key){

var cache = namespaces[namespace] || {},
meta = metaOfNs(namespace),
entry = cache[key],
result = {
'type': INSPECT,
'token': token,
'key': key,
'value': entry ? entry.value : null,
'persist': meta.persist,
'expire': meta.expire && entry ? Date.now() - entry.expire : meta.expire,
'status': success
};

common.write(conn, common.serialize(result));
reply(result);
},

'del': function(conn, token, namespace, key){
'del': function(reply, namespace, key){

var cache = namespaces[namespace] || {},
entry = cache[key] || {},
Expand All @@ -189,83 +164,67 @@ var manager = {

metaOfNs(namespace).lastModified = Date.now();

common.write(conn, common.serialize({
'type': DEL,
'token': token,
reply({
'key': key,
'value': value,
'status': success
}));
});

manager.notify(namespace, key, undefined);
},

'ping': function(conn, token){
'ping': function(){

common.write(conn, common.serialize({
'type': PING,
this.cacheSocket.send({
'token': PONG
}));
});
},

'pong': function(conn, token){
'pong': function(reply){
//nothing
}
};

module.exports = {

'createServer': net.createServer,

'app': function(conn) { //'connection' listener

conns.push(conn);

var buff = '';

conn.setEncoding('utf-8');
conn.counter = 0;

conn.on('data', function(data){

buff += data;

var packs = buff.split('\r\n');

if(packs.length > 1){
buff = packs.pop();//whatever the last element is, could be empty string, or partial data, wait till next data arrives.

_.each(packs, function(pack){

conn.counter += 1;

var command = common.deserialize(pack);

manager[command.type].apply(manager, [
conn, command.token, command.ns, command.key, command.value, !command.leaveIfNonNull
]);
});
}//otherwise keep buffering...
});

var keepAlive = setInterval(function(){
//this is to keep the connection open, just to send PING, and receive PONG, and will be extended to validate the health of the connection later
manager.ping(conn, PONG);

}, 3000);//ping/pong every 3secs

conn.once('close', function(){

logger().info('connection closed who has received:%d', conn.counter);
'configApp': function configApp(app) {
var basePort = parseInt(process.env.CACHE_BASE_PORT) || 9190;
return pickAvailablePorts(basePort, basePort + 10, 2).then(function (ports) {
logger().info('[cache-mgr] picked up ports %j', ports);
manager.ports = ports;
return manager.ports;
}, function (error) {
logger().error(error);
});
},

conns = _.without(conns, conn);
conn.destroy();
'createServer': function createServer(app) {
return app;
},

clearInterval(keepAlive);
});
},
'app': (function () { //'connection' listener
manager.cacheSocket = require('./cache-socket/cache-socket-factory').getCacheSocket('manager', 'json');

manager.cacheSocket.on('message', function (command, reply) {
manager[command.type].apply(manager, [
reply, command.ns, command.key, command.value, !command.leaveIfNonNull
]);
});

var keepAlive = setInterval(function(){
//this is to keep the connection open, just to send PING, and receive PONG, and will be extended to validate the health of the connection later
manager.ping(PONG);
}, 3000);

manager.cacheSocket.once('close', function () {
logger().info('[cache-mgr] cacheSocket closed');
clearInterval(keepAlive);
});

'port': domain,
return manager.cacheSocket;
})(),

'port': manager.ports,

'afterServerStarted': function() { //'listening' listener

Expand Down Expand Up @@ -309,7 +268,7 @@ module.exports = {
logger().info('[cache] manager loaded from all persistences');

//register domain to where all other users and the master process could watch
fs.writeFileSync(domainPath, domain);
fs.writeFileSync(domainPath, JSON.stringify(manager.ports));

var nextUpdate = null,
updateTask = function updateTask(){
Expand Down
Loading