diff --git a/.travis.yml b/.travis.yml index ef3807c..c1be87b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ language: ruby rvm: - - 1.9.3 - - 2.0 - 2.1 - 2.2.2 + - 2.3.4 + - 2.4.1 gemfile: - Gemfile diff --git a/fluent-plugin-gcloud-pubsub.gemspec b/fluent-plugin-gcloud-pubsub.gemspec index d97910a..ea9a173 100644 --- a/fluent-plugin-gcloud-pubsub.gemspec +++ b/fluent-plugin-gcloud-pubsub.gemspec @@ -16,7 +16,7 @@ Gem::Specification.new do |gem| gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } gem.require_paths = ['lib'] - gem.add_runtime_dependency "fluentd", "~> 0.12.0" + gem.add_runtime_dependency "fluentd", [">= 0.14.15", "< 2"] gem.add_runtime_dependency "gcloud", "= 0.6.3" gem.add_runtime_dependency "fluent-plugin-buffer-lightening", ">= 0.0.2" diff --git a/lib/fluent/plugin/in_gcloud_pubsub.rb b/lib/fluent/plugin/in_gcloud_pubsub.rb index 80a4d8c..cc92f48 100644 --- a/lib/fluent/plugin/in_gcloud_pubsub.rb +++ b/lib/fluent/plugin/in_gcloud_pubsub.rb @@ -1,40 +1,34 @@ require 'gcloud' -require 'fluent/input' -require 'fluent/parser' +require 'fluent/plugin/input' +require 'fluent/plugin/parser' -module Fluent +module Fluent::Plugin class GcloudPubSubInput < Input Fluent::Plugin.register_input('gcloud_pubsub', self) + helpers :parser, :thread + config_param :tag, :string config_param :project, :string, :default => nil - config_param :topic, :string, :default => nil - config_param :subscription, :string, :default => nil + config_param :topic, :string + config_param :subscription, :string config_param :key, :string, :default => nil config_param :pull_interval, :integer, :default => 5 config_param :max_messages, :integer, :default => 100 config_param :return_immediately, :bool, :default => true - unless method_defined?(:log) - define_method("log") { $log } - end - - unless method_defined?(:router) - define_method("router") { Fluent::Engine } + config_section :parse do + config_set_default :@type, 'json' end def configure(conf) super - raise Fluent::ConfigError, "'topic' must be specified." unless @topic - raise Fluent::ConfigError, "'subscription' must be specified." unless @subscription - configure_parser(conf) end def configure_parser(conf) - @parser = Fluent::TextParser.new - @parser.configure(conf) + @parser = parser_create end def start @@ -44,7 +38,7 @@ def start topic = pubsub.topic @topic @client = topic.subscription @subscription @stop_subscribing = false - @subscribe_thread = Thread.new(&method(:subscribe)) + @subscribe_thread = thread_create(:in_gcloud_pubsub_input, &method(:subscribe)) end def shutdown @@ -55,10 +49,6 @@ def shutdown end private - def configure_parser(conf) - @parser = Fluent::TextParser.new - @parser.configure(conf) - end def subscribe until @stop_subscribing @@ -87,7 +77,7 @@ def subscribe end def parse_messages(messages) - es = MultiEventStream.new + es = Fluent::MultiEventStream.new messages.each do |m| convert_line_to_event(m.message.data, es) end diff --git a/lib/fluent/plugin/out_gcloud_pubsub.rb b/lib/fluent/plugin/out_gcloud_pubsub.rb index d80d0bf..95eb86e 100644 --- a/lib/fluent/plugin/out_gcloud_pubsub.rb +++ b/lib/fluent/plugin/out_gcloud_pubsub.rb @@ -1,34 +1,33 @@ require 'gcloud' -require 'fluent/output' +require 'fluent/plugin/output' -module Fluent - class GcloudPubSubOutput < BufferedOutput +module Fluent::Plugin + class GcloudPubSubOutput < Output Fluent::Plugin.register_output('gcloud_pubsub', self) - config_set_default :buffer_type, 'lightening' - config_set_default :flush_interval, 1 - config_set_default :try_flush_interval, 0.05 - config_set_default :buffer_chunk_records_limit, 900 - config_set_default :buffer_chunk_limit, 9437184 - config_set_default :buffer_queue_limit, 64 + helpers :compat_parameters + + DEFAULT_BUFFER_TYPE = "memory" config_param :project, :string, :default => nil - config_param :topic, :string, :default => nil + config_param :topic, :string config_param :key, :string, :default => nil config_param :autocreate_topic, :bool, :default => false - unless method_defined?(:log) - define_method("log") { $log } - end - - unless method_defined?(:router) - define_method("router") { Fluent::Engine } + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + # In v0.14, buffer configurations are renamed. + # see: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/buffer.rb + config_set_default :flush_interval, 1 + config_set_default :try_flush_interval, 0.05 + config_set_default :chunk_limit_records, 900 + config_set_default :chunk_limit_size, 9437184 + config_set_default :queue_limit_length, 64 end def configure(conf) + compat_parameters_convert(conf, :buffer) super - - raise Fluent::ConfigError, "'topic' must be specified." unless @topic end def start @@ -42,6 +41,14 @@ def format(tag, time, record) [tag, time, record].to_msgpack end + def formatted_to_msgpack_binary? + true + end + + def multi_workers_ready? + true + end + def write(chunk) messages = [] diff --git a/test/plugin/test_in_gcloud_pubsub.rb b/test/plugin/test_in_gcloud_pubsub.rb index c185ee9..1f9d688 100644 --- a/test/plugin/test_in_gcloud_pubsub.rb +++ b/test/plugin/test_in_gcloud_pubsub.rb @@ -1,5 +1,5 @@ require_relative "../test_helper" - +require 'fluent/test/driver/input' class GcloudPubSubInputTest < Test::Unit::TestCase def setup @@ -7,7 +7,7 @@ def setup end def create_driver(conf=CONFIG) - Fluent::Test::InputTestDriver.new(Fluent::GcloudPubSubInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::GcloudPubSubInput).configure(conf) end def test_configure diff --git a/test/plugin/test_out_gcloud_pubsub.rb b/test/plugin/test_out_gcloud_pubsub.rb index d128fd9..7d71795 100644 --- a/test/plugin/test_out_gcloud_pubsub.rb +++ b/test/plugin/test_out_gcloud_pubsub.rb @@ -1,5 +1,5 @@ require_relative "../test_helper" - +require 'fluent/test/driver/output' class GcloudPubSubOutputTest < Test::Unit::TestCase DEFAULT_CONFIG = <<-EOC @@ -16,7 +16,7 @@ def setup end def create_driver(conf) - Fluent::Test::BufferedOutputTestDriver.new(Fluent::GcloudPubSubOutput).configure(conf) + Fluent::Test::Driver::Output.new(Fluent::Plugin::GcloudPubSubOutput).configure(conf) end def test_configure @@ -32,7 +32,7 @@ def test_configure assert_equal('topic-test', d.instance.topic) assert_equal('key-test', d.instance.key) assert_equal(false, d.instance.autocreate_topic) - assert_equal(1, d.instance.flush_interval) + assert_equal(1, d.instance.buffer_config.flush_interval) end def test_autocreate_topic @@ -47,7 +47,9 @@ def test_autocreate_topic assert_equal(true, d.instance.autocreate_topic) - chunk = Fluent::MemoryBufferChunk.new('key', 'data') + tag, time, record = "tag", Fluent::Engine.now, {"a" => "b"} + metadata = d.instance.metadata_for_test(tag, time, record) + chunk = d.instance.buffer.generate_chunk(metadata) client = mock! client.topic("topic-test", autocreate: true).once @@ -61,7 +63,14 @@ def test_autocreate_topic def test_re_raise_errors d = create_driver(DEFAULT_CONFIG) - chunk = Fluent::MemoryBufferChunk.new('key', 'data') + tag, time, record = "tag", Fluent::Engine.now, {"a" => "b"} + metadata = d.instance.metadata_for_test(tag, time, record) + chunk = d.instance.buffer.generate_chunk(metadata).tap do |c| + c.append([d.instance.format(tag, time, record)]) + end + # For chunk#msgpack_each + chunk.extend Fluent::ChunkMessagePackEventStreamer + client = Object.new def client.publish raise ReRaisedError