diff --git a/redis/README.md b/redis/README.md index cf7523d..35b881e 100644 --- a/redis/README.md +++ b/redis/README.md @@ -20,8 +20,10 @@ val o = play.api.cache.Cache.getAs[String]("mykey") #### Configurable -* Point to your Redis server using configuration settings ```redis.host```, ```redis.port```, ```redis.password``` and ```redis.database``` (defaults: ```localhost```, ```6379```, ```null``` and ```0```) +* Point to your Redis server using configuration settings ```redis.host```, ```redis.port```, ```redis.password``` and ```redis.database``` (defaults: ```localhost```, ```6379```, ```null``` and ```0```). * Alternatively, specify a URI-based configuration using ```redis.uri``` (for example: ```redis.uri="redis://user:password@localhost:6379"```). +* Configure your Sentinels using ```redis.master.name``` and ```redis.sentinel.hosts``` (defaults: ```mymaster```, ```localhost:26379```). +* If ```redis..sentinel.mode``` is true (default: ```false```), then the `````` named cached will always contact the master node. Otherwise, the ```redis.host```, ```redis.port```, or ```redis.uri``` settings will be used. * Set the timeout in milliseconds using ```redis.timeout``` (default is 2000). * Configure any aspect of the connection pool. See [the documentation for commons-pool2 ```GenericObjectPoolConfig```](https://commons.apache.org/proper/commons-pool/apidocs/org/apache/commons/pool2/impl/GenericObjectPoolConfig.html), the underlying pool implementation, for more information on each setting. * redis.pool.maxIdle @@ -68,7 +70,7 @@ pool.withJedisClient { client => } ``` play = 2.4.x and 2.5.x: -Because the underlying Sedis Pool was injected for the cache module to use, you can just inject the sedis Pool yourself, something like this: +Because the underlying Sedis and Sentinel Pool was injected for the cache module to use, you can just inject the Pool yourself, something like this: ```scala //scala diff --git a/redis/src/main/scala/com/typesafe/play/redis/JedisPoolProvider.scala b/redis/src/main/scala/com/typesafe/play/redis/JedisPoolProvider.scala index cf1aaeb..67a5fad 100644 --- a/redis/src/main/scala/com/typesafe/play/redis/JedisPoolProvider.scala +++ b/redis/src/main/scala/com/typesafe/play/redis/JedisPoolProvider.scala @@ -1,12 +1,14 @@ package com.typesafe.play.redis +import PoolConfig.createPoolConfig + import java.net.URI import javax.inject.{Provider, Inject, Singleton} import org.apache.commons.lang3.builder.ReflectionToStringBuilder import play.api.inject.ApplicationLifecycle import play.api.{Logger, Configuration} -import redis.clients.jedis.{JedisPool, JedisPoolConfig} +import redis.clients.jedis.JedisPool import scala.concurrent.Future @@ -53,22 +55,4 @@ class JedisPoolProvider @Inject()(config: Configuration, lifecycle: ApplicationL jedisPool } - - private def createPoolConfig(config: Configuration): JedisPoolConfig = { - val poolConfig: JedisPoolConfig = new JedisPoolConfig() - config.getInt("redis.pool.maxIdle").foreach(poolConfig.setMaxIdle) - config.getInt("redis.pool.minIdle").foreach(poolConfig.setMinIdle) - config.getInt("redis.pool.maxTotal").foreach(poolConfig.setMaxTotal) - config.getLong("redis.pool.maxWaitMillis").foreach(poolConfig.setMaxWaitMillis) - config.getBoolean("redis.pool.testOnBorrow").foreach(poolConfig.setTestOnBorrow) - config.getBoolean("redis.pool.testOnReturn").foreach(poolConfig.setTestOnReturn) - config.getBoolean("redis.pool.testWhileIdle").foreach(poolConfig.setTestWhileIdle) - config.getLong("redis.pool.timeBetweenEvictionRunsMillis").foreach(poolConfig.setTimeBetweenEvictionRunsMillis) - config.getInt("redis.pool.numTestsPerEvictionRun").foreach(poolConfig.setNumTestsPerEvictionRun) - config.getLong("redis.pool.minEvictableIdleTimeMillis").foreach(poolConfig.setMinEvictableIdleTimeMillis) - config.getLong("redis.pool.softMinEvictableIdleTimeMillis").foreach(poolConfig.setSoftMinEvictableIdleTimeMillis) - config.getBoolean("redis.pool.lifo").foreach(poolConfig.setLifo) - config.getBoolean("redis.pool.blockWhenExhausted").foreach(poolConfig.setBlockWhenExhausted) - poolConfig - } } diff --git a/redis/src/main/scala/com/typesafe/play/redis/JedisSentinelPoolProvider.scala b/redis/src/main/scala/com/typesafe/play/redis/JedisSentinelPoolProvider.scala new file mode 100644 index 0000000..9821c9f --- /dev/null +++ b/redis/src/main/scala/com/typesafe/play/redis/JedisSentinelPoolProvider.scala @@ -0,0 +1,53 @@ +package com.typesafe.play.redis + +import PoolConfig.createPoolConfig + +import java.net.URI +import javax.inject.{Inject, Provider, Singleton} + +import collection.JavaConverters._ +import org.apache.commons.lang3.builder.ReflectionToStringBuilder +import play.api.{Configuration, Logger} +import play.api.inject.ApplicationLifecycle +import redis.clients.jedis.JedisSentinelPool + +import scala.concurrent.Future + +@Singleton +class JedisSentinelPoolProvider @Inject()(config: Configuration, lifecycle: ApplicationLifecycle) extends Provider[JedisSentinelPool] { + + lazy val logger = Logger("redis.module") + lazy val get: JedisSentinelPool = { + val jedisSentinelPool = { + val redisUri = config.getString("redis.uri").map(new URI(_)) + + val masterName = config.getString("redis.master.name").getOrElse("mymaster") + + val sentinelHosts = config.getStringList("redis.sentinel.hosts").getOrElse(Seq("localhost:26379").asJava) + + val sentinelSet = new java.util.HashSet[String]() + sentinelSet.addAll(sentinelHosts) + + val password = config.getString("redis.password") + .orElse(redisUri.map(_.getUserInfo).filter(_ != null).filter(_ contains ":").map(_.split(":", 2)(1))) + .orNull + + val timeout = config.getInt("redis.timeout").getOrElse(2000) + + val poolConfig = createPoolConfig(config) + Logger.info(s"Redis Plugin enabled. Monitoring Redis master $masterName with Sentinels $sentinelSet and timeout $timeout.") + Logger.info("Redis Plugin pool configuration: " + new ReflectionToStringBuilder(poolConfig).toString) + + new JedisSentinelPool(masterName, sentinelSet, poolConfig, timeout, password) + } + + logger.info("Starting Jedis Sentinel Pool Provider") + + lifecycle.addStopHook(() => Future.successful { + logger.info("Stopping Jedis Sentinel Pool Provider") + jedisSentinelPool.destroy() + }) + + jedisSentinelPool + } +} diff --git a/redis/src/main/scala/com/typesafe/play/redis/PoolConfig.scala b/redis/src/main/scala/com/typesafe/play/redis/PoolConfig.scala new file mode 100644 index 0000000..f5a4dfc --- /dev/null +++ b/redis/src/main/scala/com/typesafe/play/redis/PoolConfig.scala @@ -0,0 +1,24 @@ +package com.typesafe.play.redis + +import play.api.Configuration +import redis.clients.jedis.JedisPoolConfig + +object PoolConfig { + def createPoolConfig(config: Configuration): JedisPoolConfig = { + val poolConfig: JedisPoolConfig = new JedisPoolConfig() + config.getInt("redis.pool.maxIdle").foreach(poolConfig.setMaxIdle) + config.getInt("redis.pool.minIdle").foreach(poolConfig.setMinIdle) + config.getInt("redis.pool.maxTotal").foreach(poolConfig.setMaxTotal) + config.getLong("redis.pool.maxWaitMillis").foreach(poolConfig.setMaxWaitMillis) + config.getBoolean("redis.pool.testOnBorrow").foreach(poolConfig.setTestOnBorrow) + config.getBoolean("redis.pool.testOnReturn").foreach(poolConfig.setTestOnReturn) + config.getBoolean("redis.pool.testWhileIdle").foreach(poolConfig.setTestWhileIdle) + config.getLong("redis.pool.timeBetweenEvictionRunsMillis").foreach(poolConfig.setTimeBetweenEvictionRunsMillis) + config.getInt("redis.pool.numTestsPerEvictionRun").foreach(poolConfig.setNumTestsPerEvictionRun) + config.getLong("redis.pool.minEvictableIdleTimeMillis").foreach(poolConfig.setMinEvictableIdleTimeMillis) + config.getLong("redis.pool.softMinEvictableIdleTimeMillis").foreach(poolConfig.setSoftMinEvictableIdleTimeMillis) + config.getBoolean("redis.pool.lifo").foreach(poolConfig.setLifo) + config.getBoolean("redis.pool.blockWhenExhausted").foreach(poolConfig.setBlockWhenExhausted) + poolConfig + } +} diff --git a/redis/src/main/scala/com/typesafe/play/redis/RedisModule.scala b/redis/src/main/scala/com/typesafe/play/redis/RedisModule.scala index c3fbdc0..a6e4faf 100644 --- a/redis/src/main/scala/com/typesafe/play/redis/RedisModule.scala +++ b/redis/src/main/scala/com/typesafe/play/redis/RedisModule.scala @@ -2,12 +2,12 @@ package com.typesafe.play.redis import javax.inject.{Inject, Provider} -import org.sedis.Pool +import org.sedis.{Pool, SentinelPool} import play.api.cache.{CacheApi, Cached, NamedCache} import play.api.inject._ import play.api.{Configuration, Environment} -import play.cache.{CacheApi => JavaCacheApi, DefaultCacheApi => DefaultJavaCacheApi, NamedCacheImpl} -import redis.clients.jedis.JedisPool +import play.cache.{NamedCacheImpl, CacheApi => JavaCacheApi, DefaultCacheApi => DefaultJavaCacheApi} +import redis.clients.jedis.{JedisPool, JedisSentinelPool} /** * Redis cache components for compile time injection @@ -49,7 +49,10 @@ class RedisModule extends Module { val namedCache = named(name) val cacheApiKey = bind[CacheApi].qualifiedWith(namedCache) Seq( - cacheApiKey.to(new NamedRedisCacheApiProvider(name, bind[Pool], environment.classLoader)), + if (configuration.getBoolean(s"redis.$name.sentinel.mode").getOrElse(false)) + cacheApiKey.to(new NamedSentinelCacheApiProvider(name, bind[SentinelPool], environment.classLoader)) + else + cacheApiKey.to(new NamedRedisCacheApiProvider(name, bind[Pool], environment.classLoader)), bind[JavaCacheApi].qualifiedWith(namedCache).to(new NamedJavaCacheApiProvider(cacheApiKey)), bind[Cached].qualifiedWith(namedCache).to(new NamedCachedProvider(cacheApiKey)) ) @@ -58,6 +61,8 @@ class RedisModule extends Module { val defaultBindings = Seq( bind[JedisPool].toProvider[JedisPoolProvider], bind[Pool].toProvider[SedisPoolProvider], + bind[JedisSentinelPool].toProvider[JedisSentinelPoolProvider], + bind[SentinelPool].toProvider[SedisSentinelPoolProvider], bind[JavaCacheApi].to[DefaultJavaCacheApi] ) ++ bindCaches.flatMap(bindCache) @@ -82,6 +87,13 @@ class NamedRedisCacheApiProvider(namespace: String, client: BindingKey[Pool], cl } } +class NamedSentinelCacheApiProvider(namespace: String, client: BindingKey[SentinelPool], classLoader: ClassLoader) extends Provider[CacheApi] { + @Inject private var injector: Injector = _ + lazy val get: CacheApi = { + new SentinelCacheApi(namespace, injector.instanceOf(client), classLoader) + } +} + class NamedJavaCacheApiProvider(key: BindingKey[CacheApi]) extends Provider[JavaCacheApi] { @Inject private var injector: Injector = _ lazy val get: JavaCacheApi = { diff --git a/redis/src/main/scala/com/typesafe/play/redis/SedisSentinelPoolProvider.scala b/redis/src/main/scala/com/typesafe/play/redis/SedisSentinelPoolProvider.scala new file mode 100644 index 0000000..ca57ff1 --- /dev/null +++ b/redis/src/main/scala/com/typesafe/play/redis/SedisSentinelPoolProvider.scala @@ -0,0 +1,16 @@ +package com.typesafe.play.redis + +import javax.inject.{Inject, Provider, Singleton} + +import org.sedis.SentinelPool +import redis.clients.jedis.JedisSentinelPool + +@Singleton +class SedisSentinelPoolProvider @Inject()(jedisSentinelPool: JedisSentinelPool) extends Provider[SentinelPool] { + lazy val get: SentinelPool = { + val sedisSentinelPool = { + new SentinelPool(jedisSentinelPool) + } + sedisSentinelPool + } +} diff --git a/redis/src/main/scala/com/typesafe/play/redis/SentinelCacheApi.scala b/redis/src/main/scala/com/typesafe/play/redis/SentinelCacheApi.scala new file mode 100644 index 0000000..c26577c --- /dev/null +++ b/redis/src/main/scala/com/typesafe/play/redis/SentinelCacheApi.scala @@ -0,0 +1,122 @@ +package com.typesafe.play.redis + +import java.io._ +import javax.inject.{Inject, Singleton} + +import biz.source_code.base64Coder.Base64Coder +import org.sedis.SentinelPool +import play.api.Logger +import play.api.cache.CacheApi + +import scala.concurrent.duration.Duration +import scala.reflect.ClassTag + + +@Singleton +class SentinelCacheApi @Inject()(val namespace: String, sedisPool: SentinelPool, classLoader: ClassLoader) extends CacheApi { + + private val namespacedKey: (String => String) = { x => s"$namespace::$x" } + + def get[T](userKey: String)(implicit ct: ClassTag[T]): Option[T] = { + Logger.trace(s"Reading key ${namespacedKey(userKey)}") + + try { + val rawData = sedisPool.withJedisClient { client => client.get(namespacedKey(userKey)) } + rawData match { + case null => + None + case _ => + val data: Seq[String] = rawData.split("-") + val bytes = Base64Coder.decode(data.last) + data.head match { + case "oos" => Some(withObjectInputStream(bytes)(_.readObject().asInstanceOf[T])) + case "string" => Some(withDataInputStream(bytes)(_.readUTF().asInstanceOf[T])) + case "int" => Some(withDataInputStream(bytes)(_.readInt().asInstanceOf[T])) + case "long" => Some(withDataInputStream(bytes)(_.readLong().asInstanceOf[T])) + case "boolean" => Some(withDataInputStream(bytes)(_.readBoolean().asInstanceOf[T])) + case _ => throw new IOException(s"was not able to recognize the type of serialized value. The type was ${data.head} ") + } + } + } catch { + case ex: Exception => + Logger.warn("could not deserialize key:" + namespacedKey(userKey), ex) + None + } + } + + def getOrElse[A: ClassTag](userKey: String, expiration: Duration)(orElse: => A) = { + get[A](userKey).getOrElse { + val value = orElse + set(userKey, value, expiration) + value + } + } + + def remove(userKey: String): Unit = sedisPool.withJedisClient(_.del(namespacedKey(userKey))) + + def set(userKey: String, value: Any, expiration: Duration) { + val expirationInSec = if (expiration == Duration.Inf) 0 else expiration.toSeconds.toInt + val key = namespacedKey(userKey) + + var oos: ObjectOutputStream = null + var dos: DataOutputStream = null + try { + val baos = new ByteArrayOutputStream() + val prefix = value match { + case _: String => + dos = new DataOutputStream(baos) + dos.writeUTF(value.asInstanceOf[String]) + "string" + case _: Int => + dos = new DataOutputStream(baos) + dos.writeInt(value.asInstanceOf[Int]) + "int" + case _: Long => + dos = new DataOutputStream(baos) + dos.writeLong(value.asInstanceOf[Long]) + "long" + case _: Boolean => + dos = new DataOutputStream(baos) + dos.writeBoolean(value.asInstanceOf[Boolean]) + "boolean" + case _: Serializable => + oos = new ObjectOutputStream(baos) + oos.writeObject(value) + oos.flush() + "oos" + case _ => + throw new IOException("could not serialize: " + value.toString) + } + + val redisV = prefix + "-" + new String(Base64Coder.encode(baos.toByteArray)) + Logger.trace(s"Setting key $key to $redisV") + + sedisPool.withJedisClient { client => + client.set(key, redisV) + if (expirationInSec != 0) client.expire(key, expirationInSec) + } + } catch { + case ex: IOException => + Logger.warn("could not serialize key:" + key + " and value:" + value.toString + " ex:" + ex.toString) + } finally { + if (oos != null) oos.close() + if (dos != null) dos.close() + } + } + + private class ClassLoaderObjectInputStream(stream: InputStream) extends ObjectInputStream(stream) { + override protected def resolveClass(desc: ObjectStreamClass) = { + Class.forName(desc.getName, false, classLoader) + } + } + + private def withDataInputStream[T](bytes: Array[Byte])(f: DataInputStream => T): T = { + val dis = new DataInputStream(new ByteArrayInputStream(bytes)) + try f(dis) finally dis.close() + } + + private def withObjectInputStream[T](bytes: Array[Byte])(f: ObjectInputStream => T): T = { + val ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes)) + try f(ois) finally ois.close() + } +}