From 496fc10f815a945c3ecea6fb63955e22d0fa1bf5 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 13 Aug 2023 12:44:20 -0500 Subject: [PATCH 1/4] Add support for NATS as a backend --- .gitignore | 4 +- examples/multi_backend/README.md | 36 +++++++++++ examples/multi_backend/shard.lock | 42 +++++++++++++ examples/multi_backend/shard.yml | 23 ++++++++ .../multi_backend/spec/multi_backend_spec.cr | 4 ++ examples/multi_backend/spec/spec_helper.cr | 1 + examples/multi_backend/src/multi_backend.cr | 48 +++++++++++++++ src/backend/nats.cr | 59 +++++++++++++++++++ src/backend/redis/backend.cr | 5 ++ src/cable.cr | 9 ++- src/cable/backend_core.cr | 32 ++++++++++ 11 files changed, 258 insertions(+), 5 deletions(-) create mode 100644 examples/multi_backend/README.md create mode 100644 examples/multi_backend/shard.lock create mode 100644 examples/multi_backend/shard.yml create mode 100644 examples/multi_backend/spec/multi_backend_spec.cr create mode 100644 examples/multi_backend/spec/spec_helper.cr create mode 100644 examples/multi_backend/src/multi_backend.cr create mode 100644 src/backend/nats.cr diff --git a/.gitignore b/.gitignore index 0bbd4a9..4b01810 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ /docs/ -/lib/ -/bin/ +lib/ +bin/ /.shards/ *.dwarf diff --git a/examples/multi_backend/README.md b/examples/multi_backend/README.md new file mode 100644 index 0000000..976e389 --- /dev/null +++ b/examples/multi_backend/README.md @@ -0,0 +1,36 @@ +# Multi-Backend example + +This is an example Turbo+Cable app to demo the use of multiple backends with the Cable shard. + +## Installation + +To use the Redis and NATS backends, you will need to install both the Redis and +NATS servers. A package manager for your operating system might make this +simple. + +Once you have Redis and NATS installed, install the Crystal dependencies: + +```shell +shards install +``` + +## Usage + +To use either backend, specify the url in the `CABLE_BACKEND_URL` environment variable: + +```shell +CABLE_BACKEND_URL=redis:/// +CABLE_BACKEND_URL=nats:/// +``` + +If you would like to see the messages passing through Redis, you can use the Redis CLI with the following command: + +```shell +redis-cli subscribe time +``` + +If you would like to see the messages passing through NATS, you can use the [NATS CLI](https://github.com/nats-io/natscli), which may need to be installed separately from the NATS server. + +```shell +nats sub time +``` diff --git a/examples/multi_backend/shard.lock b/examples/multi_backend/shard.lock new file mode 100644 index 0000000..3feb84e --- /dev/null +++ b/examples/multi_backend/shard.lock @@ -0,0 +1,42 @@ +version: 2.0 +shards: + base32: + git: https://github.com/jgaskins/base32.git + version: 0.1.1+git.commit.18f5647c42dae4de654e3003825fab43dc95b029 + + cable: + path: ../.. + version: 0.2.2 + + cron_parser: + git: https://github.com/kostya/cron_parser.git + version: 0.4.0 + + db: + git: https://github.com/crystal-lang/crystal-db.git + version: 0.12.0 + + future: + git: https://github.com/crystal-community/future.cr.git + version: 1.0.0 + + habitat: + git: https://github.com/luckyframework/habitat.git + version: 0.4.7 + + nats: + git: https://github.com/jgaskins/nats.git + version: 1.3.3 + + redis: + git: https://github.com/jgaskins/redis.git + version: 0.7.0 + + tasker: + git: https://github.com/spider-gazelle/tasker.git + version: 2.1.4 + + turbo: + git: https://github.com/jgaskins/turbo.git + version: 0.1.0+git.commit.8685616e26d7903d1559f5f3f8b96085bc10af12 + diff --git a/examples/multi_backend/shard.yml b/examples/multi_backend/shard.yml new file mode 100644 index 0000000..1164a26 --- /dev/null +++ b/examples/multi_backend/shard.yml @@ -0,0 +1,23 @@ +name: multi_backend +version: 0.1.0 + +authors: + - Jamie Gaskins + +targets: + multi_backend: + main: src/multi_backend.cr + +dependencies: + cable: + path: ../.. + nats: + github: jgaskins/nats + redis: + github: jgaskins/redis + turbo: + github: jgaskins/turbo + +crystal: 1.9.2 + +license: MIT diff --git a/examples/multi_backend/spec/multi_backend_spec.cr b/examples/multi_backend/spec/multi_backend_spec.cr new file mode 100644 index 0000000..df39e44 --- /dev/null +++ b/examples/multi_backend/spec/multi_backend_spec.cr @@ -0,0 +1,4 @@ +require "./spec_helper" + +describe "multiple backends" do +end diff --git a/examples/multi_backend/spec/spec_helper.cr b/examples/multi_backend/spec/spec_helper.cr new file mode 100644 index 0000000..e2f4f80 --- /dev/null +++ b/examples/multi_backend/spec/spec_helper.cr @@ -0,0 +1 @@ +require "spec" diff --git a/examples/multi_backend/src/multi_backend.cr b/examples/multi_backend/src/multi_backend.cr new file mode 100644 index 0000000..d4976d3 --- /dev/null +++ b/examples/multi_backend/src/multi_backend.cr @@ -0,0 +1,48 @@ +require "turbo/cable" +require "cable/backend/nats" +require "cable/backend/redis/backend" + +module AppCable + class Connection < Cable::Connection + identified_by id + + getter id = UUID.random.to_s + + def connect + end + end +end + +Cable.configure do |settings| + settings.route = "/cable" # the URL your JS Client will connect + # settings.url = "redis:///" + # settings.url = ENV.fetch("NATS_URL", "nats:///") + settings.url = ENV.fetch("CABLE_BACKEND_URL", "redis:///") +end + +Turbo::StreamsChannel.signing_key = "this is my signing key" + +spawn do + loop do + duration = Time.measure do + Turbo::StreamsChannel.broadcast_update_to "time", + message: Time.local.to_s + end + sleep 1.second - duration + end +end + +http = HTTP::Server.new([ + HTTP::LogHandler.new, + Cable::Handler(AppCable::Connection).new, +]) do |context| + context.response << <<-HTML + + #{Turbo.javascript_tag} + #{Turbo.cable_tag} + #{Turbo::Frame.new(id: "time") { }} + #{Turbo.stream_from "time"} + HTML +end + +http.listen 3200 diff --git a/src/backend/nats.cr b/src/backend/nats.cr new file mode 100644 index 0000000..cb9bf74 --- /dev/null +++ b/src/backend/nats.cr @@ -0,0 +1,59 @@ +require "nats" + +module Cable + class NATSBackend < BackendCore + register "nats" + + getter nats : NATS::Client do + NATS::Client.new(URI.parse(Cable.settings.url)) + end + getter streams = Hash(String, Set(Int64)).new { |streams, channel| + streams[channel] = Set(Int64).new + } + + def subscribe_connection + nats + end + + def publish_connection + nats + end + + def close_subscribe_connection + nats.close rescue nil + end + + def close_publish_connection + nats.close rescue nil + end + + def open_subscribe_connection(channel) + nats + end + + def publish_message(stream_identifier : String, message : String) + nats.publish stream_identifier, message + end + + def subscribe(stream_identifier : String) + subscription = nats.subscribe stream_identifier, queue_group: object_id.to_s do |msg| + Cable.server.fiber_channel.send({ + msg.subject.sub(/\Acable\./, ""), + String.new(msg.body), + }) + end + streams[stream_identifier] << subscription.sid + end + + def unsubscribe(stream_identifier : String) + end + + def ping_redis_subscribe + nats.ping + end + + def ping_redis_publish + nats.ping + end + end +end diff --git a/src/backend/redis/backend.cr b/src/backend/redis/backend.cr index 08a4f9a..e193813 100644 --- a/src/backend/redis/backend.cr +++ b/src/backend/redis/backend.cr @@ -1,5 +1,10 @@ +require "redis" + module Cable class RedisBackend < Cable::BackendCore + register "redis" + register "rediss" + # connection management getter redis_subscribe : Redis::Connection = Redis::Connection.new(URI.parse(Cable.settings.url)) getter redis_publish : Redis::Client = Redis::Client.new(URI.parse(Cable.settings.url)) diff --git a/src/cable.cr b/src/cable.cr index af3f441..f1e2761 100644 --- a/src/cable.cr +++ b/src/cable.cr @@ -1,6 +1,5 @@ require "habitat" require "json" -require "redis" require "./cable/**" # TODO: Write documentation for `Cable` @@ -32,8 +31,12 @@ module Cable setting token : String = "token", example: "token" setting url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379" setting disable_sec_websocket_protocol_header : Bool = false - setting backend_class : Cable::BackendCore.class = Cable::RedisBackend, example: "Cable::RedisBackend" - setting redis_ping_interval : Time::Span = 15.seconds + setting backend_class : Cable::BackendCore.class = Cable::RegistryBackend, example: "Cable::RedisBackend" + setting backend_ping_interval : Time::Span = 15.seconds + @[Deprecated("Use backend_ping_interval")] + setting redis_ping_interval : Time::Span do + backend_ping_interval + end setting restart_error_allowance : Int32 = 20 setting on_error : Proc(Exception, String, Nil) = ->(exception : Exception, message : String) do Cable::Logger.error(exception: exception) { message } diff --git a/src/cable/backend_core.cr b/src/cable/backend_core.cr index 9d04b9c..3038b28 100644 --- a/src/cable/backend_core.cr +++ b/src/cable/backend_core.cr @@ -1,5 +1,9 @@ module Cable abstract class BackendCore + def self.register(uri_scheme : String, backend : BackendCore.class = self) + ::Cable::RegistryBackend.register uri_scheme, backend + end + # connection management abstract def subscribe_connection abstract def publish_connection @@ -21,4 +25,32 @@ module Cable abstract def ping_redis_subscribe abstract def ping_redis_publish end + + class RegistryBackend < BackendCore + REGISTERED_BACKENDS = {} of String => BackendCore.class + + def self.register(uri_scheme : String, backend : BackendCore.class = self) + REGISTERED_BACKENDS[uri_scheme] = backend + end + + @backend : BackendCore + + def initialize + @backend = REGISTERED_BACKENDS[URI.parse(::Cable.settings.url).scheme].new + end + + delegate( + subscribe_connection, + publish_connection, + close_subscribe_connection, + close_publish_connection, + open_subscribe_connection, + publish_message, + subscribe, + unsubscribe, + ping_redis_subscribe, + ping_redis_publish, + to: @backend + ) + end end From 0fb0f59ba8d16edabb48c282d1c482b2d6886bfb Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 13 Aug 2023 12:50:40 -0500 Subject: [PATCH 2/4] Remove example specs There was nothing there anyway --- examples/multi_backend/spec/multi_backend_spec.cr | 4 ---- examples/multi_backend/spec/spec_helper.cr | 1 - 2 files changed, 5 deletions(-) delete mode 100644 examples/multi_backend/spec/multi_backend_spec.cr delete mode 100644 examples/multi_backend/spec/spec_helper.cr diff --git a/examples/multi_backend/spec/multi_backend_spec.cr b/examples/multi_backend/spec/multi_backend_spec.cr deleted file mode 100644 index df39e44..0000000 --- a/examples/multi_backend/spec/multi_backend_spec.cr +++ /dev/null @@ -1,4 +0,0 @@ -require "./spec_helper" - -describe "multiple backends" do -end diff --git a/examples/multi_backend/spec/spec_helper.cr b/examples/multi_backend/spec/spec_helper.cr deleted file mode 100644 index e2f4f80..0000000 --- a/examples/multi_backend/spec/spec_helper.cr +++ /dev/null @@ -1 +0,0 @@ -require "spec" From 12707cd6c09de387ff3f80076314e1185182ba7f Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 13 Aug 2023 12:58:50 -0500 Subject: [PATCH 3/4] Add support for unsubscribing via NATS --- src/backend/nats.cr | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/backend/nats.cr b/src/backend/nats.cr index cb9bf74..2984d0b 100644 --- a/src/backend/nats.cr +++ b/src/backend/nats.cr @@ -7,8 +7,8 @@ module Cable getter nats : NATS::Client do NATS::Client.new(URI.parse(Cable.settings.url)) end - getter streams = Hash(String, Set(Int64)).new { |streams, channel| - streams[channel] = Set(Int64).new + getter streams = Hash(String, Set(NATS::Subscription)).new { |streams, channel| + streams[channel] = Set(NATS::Subscription).new } def subscribe_connection @@ -38,14 +38,19 @@ module Cable def subscribe(stream_identifier : String) subscription = nats.subscribe stream_identifier, queue_group: object_id.to_s do |msg| Cable.server.fiber_channel.send({ - msg.subject.sub(/\Acable\./, ""), + stream_identifier, String.new(msg.body), }) end - streams[stream_identifier] << subscription.sid + streams[stream_identifier] << subscription end def unsubscribe(stream_identifier : String) + if subscriptions = streams.delete(stream_identifier) + subscriptions.each do |subscription| + nats.unsubscribe subscription + end + end end def ping_redis_subscribe From e64bd3360afc3a281d3332f7c5a55f5f7577b5a4 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 13 Aug 2023 13:06:37 -0500 Subject: [PATCH 4/4] Add note about demo.nats.io --- examples/multi_backend/README.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/examples/multi_backend/README.md b/examples/multi_backend/README.md index 976e389..7d6cc31 100644 --- a/examples/multi_backend/README.md +++ b/examples/multi_backend/README.md @@ -4,9 +4,11 @@ This is an example Turbo+Cable app to demo the use of multiple backends with the ## Installation -To use the Redis and NATS backends, you will need to install both the Redis and -NATS servers. A package manager for your operating system might make this -simple. +To use the Redis and NATS backends, you will need to have access to running +Redis and NATS servers. A package manager for your operating system can simplify +the installation of them on your machine. + +If you don't want to install a NATS server, you can use publicly available servers. For example, there is a public NATS server available at `demo.nats.io` — just don't use it for production. 😄 Once you have Redis and NATS installed, install the Crystal dependencies: @@ -21,15 +23,16 @@ To use either backend, specify the url in the `CABLE_BACKEND_URL` environment va ```shell CABLE_BACKEND_URL=redis:/// CABLE_BACKEND_URL=nats:/// +CABLE_BACKEND_URL=nats://demo.nats.io/ ``` -If you would like to see the messages passing through Redis, you can use the Redis CLI with the following command: +If you would like to see the messages passing through Redis when using the Redis backend, you can use the Redis CLI with the following command: ```shell redis-cli subscribe time ``` -If you would like to see the messages passing through NATS, you can use the [NATS CLI](https://github.com/nats-io/natscli), which may need to be installed separately from the NATS server. +If you would like to see the messages passing through NATS when using the NATS backend, you can use the [NATS CLI](https://github.com/nats-io/natscli), which may need to be installed separately from the NATS server. ```shell nats sub time