From e32b6e9bbdef99d6bbbfeed1f09ea3b7b1d4c6e8 Mon Sep 17 00:00:00 2001 From: Karl Southern Date: Sat, 14 May 2016 22:17:34 +0100 Subject: [PATCH] Switches to what I believe is the prefered method for retrying in logstash v5 --- lib/logstash-output-jdbc_ring-buffer.rb | 21 --- lib/logstash/outputs/jdbc.rb | 181 ++++++++++++++---------- logstash-output-jdbc.gemspec | 2 +- spec/jdbc_spec_helper.rb | 11 +- 4 files changed, 108 insertions(+), 107 deletions(-) delete mode 100644 lib/logstash-output-jdbc_ring-buffer.rb diff --git a/lib/logstash-output-jdbc_ring-buffer.rb b/lib/logstash-output-jdbc_ring-buffer.rb deleted file mode 100644 index b2f4570..0000000 --- a/lib/logstash-output-jdbc_ring-buffer.rb +++ /dev/null @@ -1,21 +0,0 @@ -class RingBuffer < Array - attr_reader :max_size - - def initialize(max_size, enum = nil) - @max_size = max_size - enum.each { |e| self << e } if enum - end - - def <<(el) - if self.size < @max_size || @max_size.nil? - super - else - self.shift - self.push(el) - end - end - - def not_nil_length - reject { |i| i.nil? }.count - end -end diff --git a/lib/logstash/outputs/jdbc.rb b/lib/logstash/outputs/jdbc.rb index f1cca8e..6b1cc67 100644 --- a/lib/logstash/outputs/jdbc.rb +++ b/lib/logstash/outputs/jdbc.rb @@ -1,15 +1,12 @@ # encoding: utf-8 require "logstash/outputs/base" require "logstash/namespace" -require "stud/buffer" +require "concurrent" +require "stud/interval" require "java" require "logstash-output-jdbc_jars" -require "logstash-output-jdbc_ring-buffer" class LogStash::Outputs::Jdbc < LogStash::Outputs::Base - # Adds buffer support - include Stud::Buffer - STRFTIME_FMT = "%Y-%m-%d %T.%L".freeze config_name "jdbc" @@ -53,31 +50,19 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base # batch of events. config :flush_size, :validate => :number, :default => 1000 - # The amount of time since last flush before a flush is forced. - # - # This setting helps ensure slow event rates don't get stuck in Logstash. - # For example, if your `flush_size` is 100, and you have received 10 events, - # and it has been more than `idle_flush_time` seconds since the last flush, - # Logstash will flush those 10 events automatically. - # - # This helps keep both fast and slow log streams moving along in - # a timely manner. - # - # If you change this value please ensure that you change - # max_flush_exceptions accordingly. - config :idle_flush_time, :validate => :number, :default => 1 + # Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval` + config :retry_initial_interval, :validate => :number, :default => 2 - # Maximum number of sequential flushes which encounter exceptions, before we stop retrying. + # Maximum time between retries, in seconds + config :retry_max_interval, :validate => :number, :default => 128 + + # Maximum number of sequential failed attempts, before we stop retrying. # If set to < 1, then it will infinitely retry. - # - # You should carefully tune this in relation to idle_flush_time if your SQL server - # is not highly available. - # i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes - # longer than 200 seconds to reboot, then logstash will stop. config :max_flush_exceptions, :validate => :number, :default => 0 config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation." config :max_repeat_exceptions_time, :obsolete => "This is no longer required" + config :idle_flush_time, :obsolete => "No longer necessary under Logstash v5" public def register @@ -86,7 +71,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base LogStash::Logger.setup_log4j(@logger) load_jar_files! - @exceptions_tracker = RingBuffer.new(@max_flush_exceptions) + @stopping = Concurrent::AtomicBoolean.new(false) if (@flush_size > 1000) @logger.warn("JDBC - Flush size is set to > 1000") @@ -101,47 +86,23 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base end setup_and_test_pool! - - buffer_initialize( - :max_items => @flush_size, - :max_interval => @idle_flush_time, - :logger => @logger - ) end - def receive(event) - return unless output?(event) or event.cancelled? - return unless @statement.length > 0 - - buffer_receive(event) - end - - def flush(events, teardown=false) - if @unsafe_statement == true - unsafe_flush(events, teardown) - else - safe_flush(events, teardown) - end - end - - def on_flush_error(e) - return if @max_flush_exceptions < 1 - - @exceptions_tracker << e.class - - if @exceptions_tracker.not_nil_length >= @max_flush_exceptions - @logger.error("JDBC - max_flush_exceptions has been reached") - log_jdbc_exception(e) - raise LogStash::ShutdownSignal.new + def multi_receive(events) + events.each_slice(@flush_size) do |slice| + retrying_submit(slice) end end def teardown - buffer_flush(:final => true) @pool.close() super end + def close + @stopping.make_true + end + private def setup_and_test_pool! @@ -196,13 +157,19 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base end end - def safe_flush(events, teardown=false) + def submit_safe(events) connection = nil statement = nil + events_to_retry = [] begin connection = @pool.getConnection() - statement = connection.prepareStatement(@statement[0]) + rescue => e + log_jdbc_exception(e) + return events + end + begin + statement = connection.prepareStatement(@statement[0]) events.each do |event| next if event.cancelled? next if @statement.length < 2 @@ -210,42 +177,99 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base statement.addBatch() end - statement.executeBatch() - statement.close() - @exceptions_tracker << nil + rescue java.sql.BatchUpdateException => e + # Only retry the failed items from the batch + updates = e.getUpdateCounts() + events_to_retry = events + updates.each_with_index{ |update, idx| + if (update != java.sql.Statement.EXECUTE_FAILED) + # Remove any successful events + events_to_retry[idx] = nil + end + } + # Remove the nil entries + events_to_retry = events_to_retry.compact + log_jdbc_exception(e) rescue => e + events_to_retry = events log_jdbc_exception(e) ensure statement.close() unless statement.nil? - connection.close() unless connection.nil? end + + connection.close() unless connection.nil? + return events_to_retry end - def unsafe_flush(events, teardown=false) + def submit_unsafe(events) connection = nil statement = nil + events_to_retry = [] + begin connection = @pool.getConnection() + rescue => e + log_jdbc_exception(e) + return events + end - events.each do |event| - next if event.cancelled? - + events.each do |event| + begin statement = connection.prepareStatement(event.sprintf(@statement[0])) statement = add_statement_event_params(statement, event) if @statement.length > 1 statement.execute() - - # cancel the event, since we may end up outputting the same event multiple times - # if an exception happens later down the line - event.cancel - @exceptions_tracker << nil + rescue => e + log_jdbc_exception(e) + events_to_retry.push(event) + ensure + statement.close() unless statement.nil? + statement = nil end - rescue => e - log_jdbc_exception(e) - ensure - statement.close() unless statement.nil? - connection.close() unless connection.nil? + end + + connection.close() unless connection.nil? + + return events_to_retry + end + + def retrying_submit(actions) + # Initially we submit the full list of actions + submit_actions = actions + + attempts = 0 + + sleep_interval = @retry_initial_interval + while submit_actions && submit_actions.length > 0 + return if !submit_actions || submit_actions.empty? # If everything's a success we move along + # We retry with whatever is didn't succeed + begin + if @unsafe_statement == true + submit_actions = submit_unsafe(submit_actions) + else + submit_actions = submit_safe(submit_actions) + end + rescue => e + log_jdbc_exception(e) + end + + if @max_flush_exceptions > 0 + attempts += 1 + + if attempts > @max_flush_exceptions + @logger.error("JDBC - max_flush_exceptions has been reached") + break + end + end + + # Everything was a success! + break if !submit_actions || submit_actions.empty? + + # If we're retrying the action sleep for the recommended interval + # Double the interval for the next time through to achieve exponential backoff + Stud.stoppable_sleep(sleep_interval) { @stopping.true? } + sleep_interval = next_sleep_interval(sleep_interval) end end @@ -294,4 +318,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base break if current_exception == nil end end + + def next_sleep_interval(current_interval) + doubled = current_interval * 2 + doubled > @retry_max_interval ? @retry_max_interval : doubled + end end # class LogStash::Outputs::jdbc diff --git a/logstash-output-jdbc.gemspec b/logstash-output-jdbc.gemspec index be3a4cb..2f1c301 100644 --- a/logstash-output-jdbc.gemspec +++ b/logstash-output-jdbc.gemspec @@ -13,7 +13,7 @@ Gem::Specification.new do |s| s.platform = 'java' # Files - s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md) + s.files = Dir.glob("{lib,spec}/**/*.rb") + Dir.glob("vendor/**/*") + %w(LICENSE.txt README.md) # Tests s.test_files = s.files.grep(%r{^(test|spec|features)/}) diff --git a/spec/jdbc_spec_helper.rb b/spec/jdbc_spec_helper.rb index 489e32b..f712b57 100644 --- a/spec/jdbc_spec_helper.rb +++ b/spec/jdbc_spec_helper.rb @@ -69,16 +69,9 @@ RSpec.shared_context 'when outputting messages' do } it 'should save a event' do + expect { plugin.multi_receive([event]) }.to_not raise_error - previous_exceptions_count = plugin.instance_variable_get(:@exceptions_tracker).not_nil_length - - expect { plugin.receive(event) }.to_not raise_error - - # Force flush the buffer, so that we know anything in the buffer - # has been sent - plugin.buffer_flush(force: true) - - expect(plugin.instance_variable_get(:@exceptions_tracker).not_nil_length).to eq(previous_exceptions_count) + sleep 5 # Verify the number of items in the output table c = plugin.instance_variable_get(:@pool).getConnection()