From 2b71cfdc32344d207abdf9b0526e1bf598b47458 Mon Sep 17 00:00:00 2001 From: zhuwang Date: Sun, 22 Dec 2013 21:46:45 -0800 Subject: [PATCH 1/2] Axon rewrite cache --- index.js | 27 ++++--- lib/cache-mgr.js | 135 ++++++++++++++++------------------- lib/cache-usr.js | 178 ++++++++++++++++++++++------------------------- package.json | 3 +- 4 files changed, 162 insertions(+), 181 deletions(-) diff --git a/index.js b/index.js index 3541ef3..be45a6f 100644 --- a/index.js +++ b/index.js @@ -135,22 +135,30 @@ module.exports = { return process.getLogger(__filename); }, + // this will return a promise, which will be resolved after getting two available ports 'enable': _.once(function(options, emitter){ options = options || {}; emitter = emitter || require('cluster-emitter'); emitter = require('./lib/utils').decorateEmitter(emitter); + var tillCacheServerStarted = when.defer(); emitter.once('CLUSTER-ENABLE-CACHE', function(options){ var master = options.master, - mode = options.mode || 'master'; + mode = options.mode || 'master', if(!master || mode === 'master'){ - var mgr = require('./lib/cache-mgr'), - svr = mgr.createServer(mgr.app); - - svr.listen(mgr.port, mgr.afterServerStarted); + process.env.CACHE_BASE_PORTS = master ? master.port + 10 : 9190; + var mgr = require('./lib/cache-mgr'); + mgr.configApp(mgr.app).then(function () { + svr = mgr.createServer(mgr.app), + process.cacheServer = svr; + + svr.listen(mgr.port, mgr.afterServerStarted); + }, function (error) { + logger.error('[cache] start server error %j', error); + }); } else{ //it's master's job to fork another worker specificly as the cache manager @@ -163,6 +171,7 @@ module.exports = { //if it's none cluster mode, above registered cache initialization will take effect //otherwise, in cluster mode, master will be responsible instead emitter.to(['master']).emit('CLUSTER-ENABLE-CACHE', options); + return tillCacheServerStarted.promise; }), 'use': function(namespace, options){ @@ -180,10 +189,10 @@ module.exports = { return new Cache(namespace, pipeline([ - function(domain){ + function(ports){ - logger.debug('[cache] using domain:%s', domain); - return _this.user.use(domain); + logger.debug('[cache] using ports: %j', ports); + return _this.user.use(ports); }, function(usr){ @@ -206,7 +215,7 @@ module.exports = { 'meta': meta }; } - ], actualOptions.domain));//optional + ], actualOptions.ports));//optional }, 'manager': require('./lib/cache-mgr'), diff --git a/lib/cache-mgr.js b/lib/cache-mgr.js index a0fa539..2903b0f 100644 --- a/lib/cache-mgr.js +++ b/lib/cache-mgr.js @@ -10,6 +10,7 @@ var common = require('./cache-common'), when = require('when'), timeout = require('when/timeout'), ensureDir = require('./utils').ensureDir, + pickAvailablePorts = require('./utils').pickAvailablePorts, logger = require('./utils').logger; var success = common.status.success, @@ -27,6 +28,7 @@ var success = common.status.success, domain = '/tmp/cache.socket.' + process.pid, domainPath = common.domainPath, persistPath = common.persistPath, + ports = null, namespaces = { '': { //meta namespace '': { //meta of meta namespace itself @@ -52,31 +54,31 @@ var success = common.status.success, var manager = { - 'domain': domain, + 'ports': ports, - 'ns': function(conn, token){ + 'ns': function(reply, token){ - common.write(conn, common.serialize({ + reply({ 'type': NS, 'token': token, 'namespaces': _.keys(namespaces), 'status': success - })); + }); }, - 'all': function(conn, token, namespace){ + 'all': function(reply, token, namespace){ var cache = namespaces[namespace] || {}; - common.write(conn, common.serialize({ + reply({ 'type': ALL, 'token': token, 'keys': _.keys(cache), 'status': success - })); + }); }, - 'lock': function(conn, token, namespace, key, value){ + 'lock': function(reply, token, namespace, key, value){ var cache = namespaces[namespace] = namespaces[namespace] || {}, write = cache[key] === undefined; @@ -87,15 +89,15 @@ var manager = { }; } - common.write(conn, common.serialize({ + reply({ 'type': LOCK, 'token': token, 'key': key, 'status': write ? success: failure - })); + }); }, - 'set': function(conn, token, namespace, key, value, overwrite){ + 'set': function(reply, token, namespace, key, value, overwrite){ var cache = namespaces[namespace] = namespaces[namespace] || {}, meta = metaOfNs(namespace), @@ -110,12 +112,12 @@ var manager = { } : cache[key]; - common.write(conn, common.serialize({ + reply({ 'type': SET, 'token': token, 'key': key, 'status': write ? success: failure - })); + }); if(write){ @@ -134,9 +136,7 @@ var manager = { 'value': value }); - _.each(conns, function(c){ - common.write(c, notify); - }); + this.cacheSocket.send(notify); if(namespace === '' && value === undefined @@ -146,7 +146,7 @@ var manager = { } }, - 'get': function(conn, token, namespace, key, value){ + 'get': function(reply, token, namespace, key, value){ var cache = namespaces[namespace] || {}, entry = cache[key] || {}, @@ -158,10 +158,10 @@ var manager = { 'status': success }; - common.write(conn, common.serialize(result)); + reply(result); }, - 'ins': function(conn, token, namespace, key){ + 'ins': function(reply, token, namespace, key){ var cache = namespaces[namespace] || {}, meta = metaOfNs(namespace), @@ -176,10 +176,10 @@ var manager = { 'status': success }; - common.write(conn, common.serialize(result)); + reply(result); }, - 'del': function(conn, token, namespace, key){ + 'del': function(reply, token, namespace, key){ var cache = namespaces[namespace] || {}, entry = cache[key] || {}, @@ -189,23 +189,23 @@ var manager = { metaOfNs(namespace).lastModified = Date.now(); - common.write(conn, common.serialize({ + reply({ 'type': DEL, 'token': token, 'key': key, 'value': value, 'status': success - })); + }); manager.notify(namespace, key, undefined); }, 'ping': function(conn, token){ - common.write(conn, common.serialize({ + this.cacheSocket.send({ 'type': PING, 'token': PONG - })); + }); }, 'pong': function(conn, token){ @@ -213,59 +213,44 @@ var manager = { } }; -module.exports = { - - 'createServer': net.createServer, - - 'app': function(conn) { //'connection' listener - - conns.push(conn); - - var buff = ''; - - conn.setEncoding('utf-8'); - conn.counter = 0; - - conn.on('data', function(data){ +module.exporots = { - buff += data; - - var packs = buff.split('\r\n'); - - if(packs.length > 1){ - buff = packs.pop();//whatever the last element is, could be empty string, or partial data, wait till next data arrives. - - _.each(packs, function(pack){ - - conn.counter += 1; - - var command = common.deserialize(pack); - - manager[command.type].apply(manager, [ - conn, command.token, command.ns, command.key, command.value, !command.leaveIfNonNull - ]); - }); - }//otherwise keep buffering... - }); - - var keepAlive = setInterval(function(){ - //this is to keep the connection open, just to send PING, and receive PONG, and will be extended to validate the health of the connection later - manager.ping(conn, PONG); - - }, 3000);//ping/pong every 3secs - - conn.once('close', function(){ - - logger().info('connection closed who has received:%d', conn.counter); + 'configApp': function configApp(app) { + var basePort = process.env.CACHE_BASE_PORT || 9190; + return pickAvailablePorts(basePort, basePort + 100, 2).then(function (ports) { + logger.info('[cache-mgr] pick up ports %j', ports); + manager.ports = ports; + return manager.ports; + }); + }, - conns = _.without(conns, conn); - conn.destroy(); + 'createServer': function createServer(app) { + return app; + }, - clearInterval(keepAlive); - }); - }, + 'app': (function () { //'connection' listener + manager.cacheSocket = require('./cache-socket/cache-socket-factory').getCacheSocket('manager', 'json'); + + manager.cacheSocket.on('message', function (command, reply) { + manager[command.type].apply(manager, [ + reply, command.token, command.ns, command.key, command.value, !command.leaveIfNonNull + ]); + }); + + var keepAlive = setInterval(function(){ + //this is to keep the connection open, just to send PING, and receive PONG, and will be extended to validate the health of the connection later + manager.ping(PONG); + }, 3000); + + manager.cacheSocket.once('close', function () { + logger.info('[cache-mgr] cacheSocket closed'); + clearInterval(keepAlive); + }); - 'port': domain, + return manager.cacheSocket; + })(), + + 'port': manager.ports, 'afterServerStarted': function() { //'listening' listener @@ -309,7 +294,7 @@ module.exports = { logger().info('[cache] manager loaded from all persistences'); //register domain to where all other users and the master process could watch - fs.writeFileSync(domainPath, domain); + fs.writeFileSync(domainPath, JSON.stringify(ports)); var nextUpdate = null, updateTask = function updateTask(){ diff --git a/lib/cache-usr.js b/lib/cache-usr.js index 42e7a91..e179027 100644 --- a/lib/cache-usr.js +++ b/lib/cache-usr.js @@ -7,7 +7,8 @@ var _ = require('underscore'), timeout = require('when/timeout'), fs = require('graceful-fs'), common = require('./cache-common'), - logger = require('./utils').logger; + logger = require('./utils').logger, + cacheSocket = require('./cache-socket/cache-socket-factory').getCacheSocket('user', 'json'); var success = common.status.success, NS = common.types.NS, @@ -35,78 +36,68 @@ var success = common.status.success, }, conn = null, + ports = null, //TODO, support connection pooling to speed up the cache operations if needed. reconnect = function reconnect(error){ - var options = _.isString(domain) ? {'path': domain} : domain, - connecting = net.connect(options, - function(){ - connecting.writable = true; - conn = connecting; - if(!userDeferred.hasBeenResolved){ - userDeferred.resolve(process.user = user); - userDeferred.hasBeenResolved = true; - } - }), - buff = ''; - - connecting.setEncoding('utf-8'); - connecting.on('data', function(data){ - - buff += data; - - var packs = buff.split('\r\n'); - - if(packs.length > 1){ - - buff = packs.pop(); - - _.each(packs, function(pack){ - - var response = common.deserialize(pack), - token = response.token, - namespace = response.ns, - key = response.key, - value = response.value; + cacheSocket.on('error', function (error) { + logger.error('[cache-usr] cacheSocket error %j', error); + } - if(token === CHN){ - - changes[namespace] = changes[namespace] || {}; - _.each(changes[namespace][key] || [], function(whenChange){ - whenChange(value, key); - }); + cacheSocket.connect(ports, function (error) { + logger.info('[cache-usr] connected to cache server on %j', ports); + if (error) { + logger.error('[cache-usr] connect error %j', error); + userDeferred.reject(error); + }else if (!userDeferred.hasBeenResolved) { + userDeferred.resolve(process.user = user); + userDeferred.hasBeenResolved = true; + } + }); - _.invoke(anyChanges[namespace] || [], 'call', null, value, key); - } - else if(token === PONG){//just to keep the connection open - - common.write(connecting, common.serialize({ - 'type': PONG, - 'token': PONG - })); - } - else{ - - handlers[token].apply(user, [ - response.status, - key || response.keys || response.namespaces, - value, - response.persist, - response.expire - ]); - } - }); - } - }); - - connecting.once('close', function(error){ + cacheSocket.on('message', function (response) { + var token = response.token, + namespace = response.ns, + key = response.key, + value = response.value; + + if (token === CHN) { + changes[namespace] = changes[namespace] || {}; + _.each(changes[namespace][key] || [], function(whenChange){ + whenChange(value, key); + }); + + _.invoke(anyChanges[namespace] || [], 'call', null, value, key); + }else if(token === PONG) { + cacheSocket.send({ + 'type': PONG, + 'token': PONG + }, function () { + // do not need to reply + }); + } + }); - connecting.writable = false; - connecting.destroy(); - - reconnect(error); - }); - }; + cacheSocket.once('close'. function (error) { + reconnect(error); + }); + }, + reply = function reply(response) { + logger.debug('[cache-usr] get response %j', response); + + var token = response.token, + namespace = response.ns, + key = response.key, + value = response.value; + + handlers[token].apply(user, [ + response.status, + key || response.key || response.namespaces, + value, + response.persist, + response.expire + ]); + }; var user = process.user || { @@ -131,10 +122,10 @@ var user = process.user || { handlers[token] = handler; - common.write(conn, common.serialize({ + cacheSocket.send({ 'type': NS, 'token': token - })); + }, reply); return (wait > 0 ? timeout(wait, tillNs.promise) : tillNs.promise).ensure(function(){ delete handlers[token]; @@ -162,11 +153,11 @@ var user = process.user || { handlers[token] = handler; - common.write(conn, common.serialize({ + cacheSocket.send({ 'type': ALL, 'token': token, 'ns': namespace - })); + }, reply); return (wait > 0 ? timeout(wait, tillKeys.promise) : tillKeys.promise).ensure(function(){ delete handlers[token]; @@ -255,12 +246,12 @@ var user = process.user || { //local copy miss, fetch from master cache handlers[token] = handler; - common.write(conn, common.serialize({ + cacheSocket.send({ 'type': GET, 'token': token, 'ns': namespace, 'key': key - })); + , reply}); return (wait > 0 ? timeout(wait, tillGet.promise) : tillGet.promise).ensure(function(){ delete handlers[token]; @@ -288,12 +279,12 @@ var user = process.user || { //local copy miss, fetch from master cache handlers[token] = handler; - common.write(conn, common.serialize({ + cacheSocket.send({ 'type': INSPECT, 'token': token, 'ns': namespace, 'key': key - })); + }, reply); return (wait > 0 ? timeout(wait, tillInspect.promise) : tillInspect.promise).ensure(function(){ delete handlers[token]; @@ -324,14 +315,14 @@ var user = process.user || { handlers[token] = handler; - common.write(conn, common.serialize({ + cacheSocket.send({ 'type': SET, 'token': token, 'ns': namespace, 'key': key, 'value': value, 'leaveIfNonNull': options.leaveIfNonNull - })); + }, reply); return (wait > 0 ? timeout(wait, tillSet.promise) : tillSet.promise).ensure(function(){ delete handlers[token]; @@ -352,13 +343,13 @@ var user = process.user || { handlers[token] = handler; - common.write(conn, common.serialize({ + cacheSocket.send({ 'type': LOCK, 'token': token, 'ns': namespace, 'key': key, 'value': process.pid - })); + }, reply); return (wait > 0 ? timeout(wait, tillLock.promise) : tillLock.promise).ensure(function(){ delete handlers[token]; @@ -383,12 +374,12 @@ var user = process.user || { handlers[token] = handler; - common.write(conn, common.serialize({ + cacheSocket.send({ 'type': DEL, 'token': token, 'ns': namespace, 'key': key - })); + }, reply); return (wait > 0 ? timeout(wait, tillDel.promise) : tillDel.promise).ensure(function(){ delete handlers[token]; @@ -436,18 +427,13 @@ var user = process.user || { return stat; }, - 'switchDomain': function(d){//only in case of cache-mgr down and needs to switch to a new one + 'switchPorts': function(p){//only in case of cache-mgr down and needs to switch to a new one - if(d && domain !== d){ - - domain = d; - if(conn){ - conn.writable = false; - conn.end(); - } + if(p && JSON.stringify(ports) !== p){ + ports = JSON.parse(p); reconnect(); } - }, + }, 'pong': function(){ @@ -463,9 +449,9 @@ var tillExists = function(path){ fs.readFile(path, { 'encoding': 'utf-8' }, - function(err, d){ - logger().info('[cache] switching domain:%s', d); - user.switchDomain(d); + function(err, p){ + logger().info('[cache-usr] switching ports:%s', p); + user.switchPorts(p); }); }); } @@ -477,15 +463,15 @@ var tillExists = function(path){ }); }; -exports.use = function(domain){ +exports.use = function(p){ if(!process.userPromise){//each process needs at most one cache user - logger().info('[cache] user created'); + logger().info('[cache-usr] created'); tillExists(common.domainPath); - user.switchDomain(domain || fs.readFileSync(common.domainPath, {'encoding':'utf-8'})); + user.switchPorts(p || fs.readFileSync(common.domainPath, {'encoding':'utf-8'})); process.userPromise = userDeferred.promise; } diff --git a/package.json b/package.json index 8eb7745..b46a8ab 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,8 @@ "graceful-fs": "~2.0.1", "underscore": "~1.5.2", "request": "~2.30.0", - "when": "~2.7.0" + "when": "~2.7.0", + "axon": "~1.0.0" }, "devDependencies": { "optimist": "~0.6.0", From 3abcdaf01e6719a28ab47b6cbe82b04d64920d8e Mon Sep 17 00:00:00 2001 From: zhuwang Date: Fri, 27 Dec 2013 13:49:33 -0800 Subject: [PATCH 2/2] Cache axon implementation --- index.js | 19 ++-- lib/cache-mgr.js | 68 +++++--------- lib/cache-usr.js | 180 ++++++++++++------------------------- lib/utils.js | 2 - test/cluster-cache-test.js | 2 +- 5 files changed, 88 insertions(+), 183 deletions(-) diff --git a/index.js b/index.js index be45a6f..8974ee4 100644 --- a/index.js +++ b/index.js @@ -141,21 +141,21 @@ module.exports = { options = options || {}; emitter = emitter || require('cluster-emitter'); emitter = require('./lib/utils').decorateEmitter(emitter); - var tillCacheServerStarted = when.defer(); + //var tillCacheServerStarted = when.defer(); emitter.once('CLUSTER-ENABLE-CACHE', function(options){ - var master = options.master, - mode = options.mode || 'master', + mode = options.mode || 'master'; if(!master || mode === 'master'){ - process.env.CACHE_BASE_PORTS = master ? master.port + 10 : 9190; + process.env.CACHE_BASE_PORT = master ? master.port + 10 : 9190; var mgr = require('./lib/cache-mgr'); - mgr.configApp(mgr.app).then(function () { - svr = mgr.createServer(mgr.app), + mgr.configApp(mgr.app).then(function (ports) { + mgr.port = ports; + var svr = mgr.createServer(mgr.app); process.cacheServer = svr; - svr.listen(mgr.port, mgr.afterServerStarted); + svr.listen(ports, mgr.afterServerStarted); }, function (error) { logger.error('[cache] start server error %j', error); }); @@ -163,7 +163,8 @@ module.exports = { else{ //it's master's job to fork another worker specificly as the cache manager master.fork(master.options, { - 'CACHE_MANAGER': true + 'CACHE_MANAGER': true, + 'CACHE_BASE_PORT': master.port + 10 }); } }); @@ -171,7 +172,7 @@ module.exports = { //if it's none cluster mode, above registered cache initialization will take effect //otherwise, in cluster mode, master will be responsible instead emitter.to(['master']).emit('CLUSTER-ENABLE-CACHE', options); - return tillCacheServerStarted.promise; + //return tillCacheServerStarted.promise; }), 'use': function(namespace, options){ diff --git a/lib/cache-mgr.js b/lib/cache-mgr.js index 2903b0f..7ee1fcb 100644 --- a/lib/cache-mgr.js +++ b/lib/cache-mgr.js @@ -3,9 +3,6 @@ var common = require('./cache-common'), _ = require('underscore'), fs = require('graceful-fs'), - os = require('os'), - net = require('net'), - util = require('util'), path = require('path'), when = require('when'), timeout = require('when/timeout'), @@ -15,17 +12,8 @@ var common = require('./cache-common'), var success = common.status.success, failure = common.status.failure, - NS = common.types.NS, - GET = common.types.GET, - SET = common.types.SET, - DEL = common.types.DEL, - ALL = common.types.ALL, - LOCK = common.types.LOCK, - INSPECT = common.types.INSPECT, - PING = common.types.PING, PONG = common.types.PONG, CHN = common.changeToken, - domain = '/tmp/cache.socket.' + process.pid, domainPath = common.domainPath, persistPath = common.persistPath, ports = null, @@ -49,36 +37,31 @@ var success = common.status.success, 'persist': false, 'expire': 0 };//default meta - }, - conns = []; + }; var manager = { 'ports': ports, - 'ns': function(reply, token){ + 'ns': function(reply){ reply({ - 'type': NS, - 'token': token, 'namespaces': _.keys(namespaces), 'status': success }); }, - 'all': function(reply, token, namespace){ + 'all': function(reply, namespace){ var cache = namespaces[namespace] || {}; reply({ - 'type': ALL, - 'token': token, 'keys': _.keys(cache), 'status': success }); }, - 'lock': function(reply, token, namespace, key, value){ + 'lock': function(reply, namespace, key, value){ var cache = namespaces[namespace] = namespaces[namespace] || {}, write = cache[key] === undefined; @@ -90,14 +73,12 @@ var manager = { } reply({ - 'type': LOCK, - 'token': token, 'key': key, 'status': write ? success: failure }); }, - 'set': function(reply, token, namespace, key, value, overwrite){ + 'set': function(reply, namespace, key, value, overwrite){ var cache = namespaces[namespace] = namespaces[namespace] || {}, meta = metaOfNs(namespace), @@ -113,8 +94,6 @@ var manager = { : cache[key]; reply({ - 'type': SET, - 'token': token, 'key': key, 'status': write ? success: failure }); @@ -129,12 +108,12 @@ var manager = { 'notify': function(namespace, key, value){ - var notify = common.serialize({ + var notify = { 'token': CHN, 'ns': namespace, 'key': key, 'value': value - }); + }; this.cacheSocket.send(notify); @@ -146,13 +125,11 @@ var manager = { } }, - 'get': function(reply, token, namespace, key, value){ + 'get': function(reply, namespace, key, value){ var cache = namespaces[namespace] || {}, entry = cache[key] || {}, result = { - 'type': GET, - 'token': token, 'key': key, 'value': entry.value || value,//default 'status': success @@ -161,14 +138,12 @@ var manager = { reply(result); }, - 'ins': function(reply, token, namespace, key){ + 'ins': function(reply, namespace, key){ var cache = namespaces[namespace] || {}, meta = metaOfNs(namespace), entry = cache[key], result = { - 'type': INSPECT, - 'token': token, 'key': key, 'value': entry ? entry.value : null, 'persist': meta.persist, @@ -179,7 +154,7 @@ var manager = { reply(result); }, - 'del': function(reply, token, namespace, key){ + 'del': function(reply, namespace, key){ var cache = namespaces[namespace] || {}, entry = cache[key] || {}, @@ -190,8 +165,6 @@ var manager = { metaOfNs(namespace).lastModified = Date.now(); reply({ - 'type': DEL, - 'token': token, 'key': key, 'value': value, 'status': success @@ -200,27 +173,28 @@ var manager = { manager.notify(namespace, key, undefined); }, - 'ping': function(conn, token){ + 'ping': function(){ this.cacheSocket.send({ - 'type': PING, 'token': PONG }); }, - 'pong': function(conn, token){ + 'pong': function(reply){ //nothing } }; -module.exporots = { +module.exports = { 'configApp': function configApp(app) { - var basePort = process.env.CACHE_BASE_PORT || 9190; - return pickAvailablePorts(basePort, basePort + 100, 2).then(function (ports) { - logger.info('[cache-mgr] pick up ports %j', ports); + var basePort = parseInt(process.env.CACHE_BASE_PORT) || 9190; + return pickAvailablePorts(basePort, basePort + 10, 2).then(function (ports) { + logger().info('[cache-mgr] picked up ports %j', ports); manager.ports = ports; return manager.ports; + }, function (error) { + logger().error(error); }); }, @@ -233,7 +207,7 @@ module.exporots = { manager.cacheSocket.on('message', function (command, reply) { manager[command.type].apply(manager, [ - reply, command.token, command.ns, command.key, command.value, !command.leaveIfNonNull + reply, command.ns, command.key, command.value, !command.leaveIfNonNull ]); }); @@ -243,7 +217,7 @@ module.exporots = { }, 3000); manager.cacheSocket.once('close', function () { - logger.info('[cache-mgr] cacheSocket closed'); + logger().info('[cache-mgr] cacheSocket closed'); clearInterval(keepAlive); }); @@ -294,7 +268,7 @@ module.exporots = { logger().info('[cache] manager loaded from all persistences'); //register domain to where all other users and the master process could watch - fs.writeFileSync(domainPath, JSON.stringify(ports)); + fs.writeFileSync(domainPath, JSON.stringify(manager.ports)); var nextUpdate = null, updateTask = function updateTask(){ diff --git a/lib/cache-usr.js b/lib/cache-usr.js index e179027..f612640 100644 --- a/lib/cache-usr.js +++ b/lib/cache-usr.js @@ -1,15 +1,13 @@ 'use strict'; var _ = require('underscore'), - net = require('net'), - util = require('util'), when = require('when'), timeout = require('when/timeout'), fs = require('graceful-fs'), common = require('./cache-common'), logger = require('./utils').logger, cacheSocket = require('./cache-socket/cache-socket-factory').getCacheSocket('user', 'json'); - + var success = common.status.success, NS = common.types.NS, ALL = common.types.ALL, @@ -20,12 +18,7 @@ var success = common.status.success, INSPECT = common.types.INSPECT, PONG = common.types.PONG, CHN = common.changeToken, - nextToken = common.nextToken, - domain = null, userDeferred = when.defer(), - handlers = { - - }, changes = { }, @@ -35,19 +28,18 @@ var success = common.status.success, stats = { }, - conn = null, ports = null, //TODO, support connection pooling to speed up the cache operations if needed. - reconnect = function reconnect(error){ + reconnect = function reconnect(error) { cacheSocket.on('error', function (error) { - logger.error('[cache-usr] cacheSocket error %j', error); - } + logger().error('[cache-usr] cacheSocket error %j', error); + }); cacheSocket.connect(ports, function (error) { - logger.info('[cache-usr] connected to cache server on %j', ports); + logger().info('[cache-usr] connected to cache server on %j', ports); if (error) { - logger.error('[cache-usr] connect error %j', error); + logger().error('[cache-usr] connect error %j', error); userDeferred.reject(error); }else if (!userDeferred.hasBeenResolved) { userDeferred.resolve(process.user = user); @@ -55,13 +47,15 @@ var success = common.status.success, } }); - cacheSocket.on('message', function (response) { - var token = response.token, - namespace = response.ns, - key = response.key, - value = response.value; + cacheSocket.on('message', function (msg) { + // Only CHN and PONG msg will enter here + var token = msg.token, + namespace = msg.ns, + key = msg.key, + value = msg.value; if (token === CHN) { + // get notified from cache-mgr changes[namespace] = changes[namespace] || {}; _.each(changes[namespace][key] || [], function(whenChange){ whenChange(value, key); @@ -69,35 +63,19 @@ var success = common.status.success, _.invoke(anyChanges[namespace] || [], 'call', null, value, key); }else if(token === PONG) { + // get ping from cache-mgr cacheSocket.send({ 'type': PONG, - 'token': PONG }, function () { // do not need to reply }); } }); - cacheSocket.once('close'. function (error) { + cacheSocket.once('close', function (error) { reconnect(error); }); - }, - reply = function reply(response) { - logger.debug('[cache-usr] get response %j', response); - - var token = response.token, - namespace = response.ns, - key = response.key, - value = response.value; - - handlers[token].apply(user, [ - response.status, - key || response.key || response.namespaces, - value, - response.persist, - response.expire - ]); - }; + }; var user = process.user || { @@ -105,14 +83,13 @@ var user = process.user || { options = options || {}; - var token = nextToken(), - wait = options.wait, + var wait = options.wait, tillNs = when.defer(), - handler = function(status, namespaces){ + reply = function(response){ - if(success === status){ + if(success === response.status){ - tillNs.resolve(namespaces || []); + tillNs.resolve(response.namespaces || []); } else{ @@ -120,30 +97,24 @@ var user = process.user || { } }; - handlers[token] = handler; - cacheSocket.send({ 'type': NS, - 'token': token }, reply); - return (wait > 0 ? timeout(wait, tillNs.promise) : tillNs.promise).ensure(function(){ - delete handlers[token]; - }); + return (wait > 0 ? timeout(wait, tillNs.promise) : tillNs.promise); }, 'keys': function(namespace, options){ options = options || {}; - var token = nextToken(), - wait = options.wait, + var wait = options.wait, tillKeys = when.defer(), - handler = function(status, keys){ + reply = function(response){ - if(success === status){ + if(success === response.status){ - tillKeys.resolve(keys || []); + tillKeys.resolve(response.keys || []); } else{ @@ -151,32 +122,26 @@ var user = process.user || { } }; - handlers[token] = handler; - cacheSocket.send({ 'type': ALL, - 'token': token, 'ns': namespace }, reply); - return (wait > 0 ? timeout(wait, tillKeys.promise) : tillKeys.promise).ensure(function(){ - delete handlers[token]; - }); + return (wait > 0 ? timeout(wait, tillKeys.promise) : tillKeys.promise); }, 'get': function(namespace, key, loader, options){ options = options || {}; - var token = nextToken(), - wait = options.wait, + var wait = options.wait, tillGet = when.defer(), - handler = function handler(status, key, value){ + reply = function (response){ - if(success === status && value !== undefined){//got the value + if(success === response.status && response.value !== undefined){//got the value user.stat(namespace, 'hit'); - tillGet.resolve(value); + tillGet.resolve(response.value); } else if(loader){//must atomically load the value @@ -242,53 +207,39 @@ var user = process.user || { } }; - - //local copy miss, fetch from master cache - handlers[token] = handler; - cacheSocket.send({ 'type': GET, - 'token': token, 'ns': namespace, 'key': key - , reply}); + }, reply); - return (wait > 0 ? timeout(wait, tillGet.promise) : tillGet.promise).ensure(function(){ - delete handlers[token]; - }); + return (wait > 0 ? timeout(wait, tillGet.promise) : tillGet.promise); }, 'inspect': function(namespace, key, options){ options = options || {}; - var token = nextToken(), - wait = options.wait, + var wait = options.wait, tillInspect = when.defer(), - handler = function(status, key, value, persist, expire){ + reply = function(response){ - if(success === status && value !== undefined){ + if(success === response.status && response.value !== undefined){ - tillInspect.resolve([value, persist, expire]); + tillInspect.resolve([response.value, response.persist, response.expire]); } else{ tillInspect.reject(new Error('no value found for key')); } }; - //local copy miss, fetch from master cache - handlers[token] = handler; - cacheSocket.send({ 'type': INSPECT, - 'token': token, 'ns': namespace, 'key': key }, reply); - return (wait > 0 ? timeout(wait, tillInspect.promise) : tillInspect.promise).ensure(function(){ - delete handlers[token]; - }); + return (wait > 0 ? timeout(wait, tillInspect.promise) : tillInspect.promise); }, /** @@ -305,55 +256,43 @@ var user = process.user || { options = options || {}; - var token = nextToken(), - wait = options.wait, + var wait = options.wait, tillSet = when.defer(), - handler = function(status, key){ + reply = function(response){ - tillSet.resolve(success === status); + tillSet.resolve(success === response.status); }; - handlers[token] = handler; - cacheSocket.send({ 'type': SET, - 'token': token, 'ns': namespace, 'key': key, 'value': value, 'leaveIfNonNull': options.leaveIfNonNull }, reply); - return (wait > 0 ? timeout(wait, tillSet.promise) : tillSet.promise).ensure(function(){ - delete handlers[token]; - }); + return (wait > 0 ? timeout(wait, tillSet.promise) : tillSet.promise); }, 'lock': function(namespace, key, options){//must guarantee that value has no '\r\n' in it (if it's a string or any complex type) options = options || {}; - var token = nextToken(), - wait = options.wait, + var wait = options.wait, tillLock = when.defer(), - handler = function(status, key){ + reply = function(response){ - tillLock.resolve(success === status); + tillLock.resolve(success === response.status); }; - handlers[token] = handler; - cacheSocket.send({ 'type': LOCK, - 'token': token, 'ns': namespace, 'key': key, 'value': process.pid }, reply); - return (wait > 0 ? timeout(wait, tillLock.promise) : tillLock.promise).ensure(function(){ - delete handlers[token]; - }); + return (wait > 0 ? timeout(wait, tillLock.promise) : tillLock.promise); }, /** @@ -364,26 +303,20 @@ var user = process.user || { options = options || {}; - var token = nextToken(), - wait = options.wait, + var wait = options.wait, tillDel = when.defer(), - handler = function(status, key, value){ + reply = function(response){ - tillDel.resolve(success === status ? value : null); + tillDel.resolve(success === response.status ? response.value : null); }; - handlers[token] = handler; - cacheSocket.send({ 'type': DEL, - 'token': token, 'ns': namespace, 'key': key }, reply); - return (wait > 0 ? timeout(wait, tillDel.promise) : tillDel.promise).ensure(function(){ - delete handlers[token]; - }); + return (wait > 0 ? timeout(wait, tillDel.promise) : tillDel.promise); }, 'watch': function(namespace, key, callback){ @@ -427,23 +360,20 @@ var user = process.user || { return stat; }, - 'switchPorts': function(p){//only in case of cache-mgr down and needs to switch to a new one + 'switchPorts': function(p) {//only in case of cache-mgr down and needs to switch to a new one if(p && JSON.stringify(ports) !== p){ ports = JSON.parse(p); reconnect(); } - }, - - 'pong': function(){ - - } + } }; //when the cache-mgr is created by a replacement process, the new domain will be written to the same file being watched below var tillExists = function(path){ fs.exists(path, function(exists){ if(exists){ + user.switchPorts(fs.readFileSync(path, {'encoding': 'utf-8'})); fs.watchFile(path, function(){ fs.readFile(path, { @@ -471,8 +401,10 @@ exports.use = function(p){ tillExists(common.domainPath); - user.switchPorts(p || fs.readFileSync(common.domainPath, {'encoding':'utf-8'})); - + if (p) { + user.switchPorts(p); + } + process.userPromise = userDeferred.promise; } diff --git a/lib/utils.js b/lib/utils.js index 9744948..add15b3 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -96,9 +96,7 @@ exports.pickAvailablePort = function pickAvailablePort(min, max){ }; exports.pickAvailablePorts = function pickAvailablePorts(min, max, count){ - return when.map(_.range(0, count), function(ith){ - return exports.pickAvailablePort(min, max); }); }; diff --git a/test/cluster-cache-test.js b/test/cluster-cache-test.js index dd78273..55a6675 100644 --- a/test/cluster-cache-test.js +++ b/test/cluster-cache-test.js @@ -189,4 +189,4 @@ describe('cache', function(){ }); -}); \ No newline at end of file +});