Switches to what I believe is the prefered method for retrying in logstash v5
This commit is contained in:
		
							parent
							
								
									f1202f6454
								
							
						
					
					
						commit
						e32b6e9bbd
					
				@ -1,21 +0,0 @@
 | 
			
		||||
class RingBuffer < Array
 | 
			
		||||
  attr_reader :max_size
 | 
			
		||||
 | 
			
		||||
  def initialize(max_size, enum = nil)
 | 
			
		||||
    @max_size = max_size
 | 
			
		||||
    enum.each { |e| self << e } if enum
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def <<(el)
 | 
			
		||||
    if self.size < @max_size || @max_size.nil?
 | 
			
		||||
      super
 | 
			
		||||
    else
 | 
			
		||||
      self.shift
 | 
			
		||||
      self.push(el)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def not_nil_length
 | 
			
		||||
    reject { |i| i.nil? }.count
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
@ -1,15 +1,12 @@
 | 
			
		||||
# encoding: utf-8
 | 
			
		||||
require "logstash/outputs/base"
 | 
			
		||||
require "logstash/namespace"
 | 
			
		||||
require "stud/buffer"
 | 
			
		||||
require "concurrent"
 | 
			
		||||
require "stud/interval"
 | 
			
		||||
require "java"
 | 
			
		||||
require "logstash-output-jdbc_jars"
 | 
			
		||||
require "logstash-output-jdbc_ring-buffer"
 | 
			
		||||
 | 
			
		||||
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
  # Adds buffer support
 | 
			
		||||
  include Stud::Buffer
 | 
			
		||||
 | 
			
		||||
  STRFTIME_FMT = "%Y-%m-%d %T.%L".freeze
 | 
			
		||||
 | 
			
		||||
  config_name "jdbc"
 | 
			
		||||
@ -53,31 +50,19 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
  # batch of events.
 | 
			
		||||
  config :flush_size, :validate => :number, :default => 1000
 | 
			
		||||
 | 
			
		||||
  # The amount of time since last flush before a flush is forced.
 | 
			
		||||
  #
 | 
			
		||||
  # This setting helps ensure slow event rates don't get stuck in Logstash.
 | 
			
		||||
  # For example, if your `flush_size` is 100, and you have received 10 events,
 | 
			
		||||
  # and it has been more than `idle_flush_time` seconds since the last flush,
 | 
			
		||||
  # Logstash will flush those 10 events automatically.
 | 
			
		||||
  #
 | 
			
		||||
  # This helps keep both fast and slow log streams moving along in
 | 
			
		||||
  # a timely manner.
 | 
			
		||||
  #
 | 
			
		||||
  # If you change this value please ensure that you change
 | 
			
		||||
  # max_flush_exceptions accordingly.
 | 
			
		||||
  config :idle_flush_time, :validate => :number, :default => 1
 | 
			
		||||
  # Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval`
 | 
			
		||||
  config :retry_initial_interval, :validate => :number, :default => 2
 | 
			
		||||
 | 
			
		||||
  # Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
 | 
			
		||||
  # Maximum time between retries, in seconds
 | 
			
		||||
  config :retry_max_interval, :validate => :number, :default => 128
 | 
			
		||||
 | 
			
		||||
  # Maximum number of sequential failed attempts, before we stop retrying.
 | 
			
		||||
  # If set to < 1, then it will infinitely retry.
 | 
			
		||||
  # 
 | 
			
		||||
  # You should carefully tune this in relation to idle_flush_time if your SQL server
 | 
			
		||||
  # is not highly available.
 | 
			
		||||
  # i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
 | 
			
		||||
  # longer than 200 seconds to reboot, then logstash will stop.
 | 
			
		||||
  config :max_flush_exceptions, :validate => :number, :default => 0
 | 
			
		||||
 | 
			
		||||
  config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
 | 
			
		||||
  config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
 | 
			
		||||
  config :idle_flush_time, :obsolete => "No longer necessary under Logstash v5"
 | 
			
		||||
 | 
			
		||||
  public
 | 
			
		||||
  def register
 | 
			
		||||
@ -86,7 +71,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
    LogStash::Logger.setup_log4j(@logger)
 | 
			
		||||
    load_jar_files!
 | 
			
		||||
 | 
			
		||||
    @exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
 | 
			
		||||
    @stopping = Concurrent::AtomicBoolean.new(false)
 | 
			
		||||
 | 
			
		||||
    if (@flush_size > 1000)
 | 
			
		||||
      @logger.warn("JDBC - Flush size is set to > 1000")
 | 
			
		||||
@ -101,47 +86,23 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    setup_and_test_pool!
 | 
			
		||||
 | 
			
		||||
    buffer_initialize(
 | 
			
		||||
      :max_items => @flush_size,
 | 
			
		||||
      :max_interval => @idle_flush_time,
 | 
			
		||||
      :logger => @logger
 | 
			
		||||
    )
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def receive(event)
 | 
			
		||||
    return unless output?(event) or event.cancelled?
 | 
			
		||||
    return unless @statement.length > 0
 | 
			
		||||
 | 
			
		||||
    buffer_receive(event)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def flush(events, teardown=false)
 | 
			
		||||
    if @unsafe_statement == true
 | 
			
		||||
      unsafe_flush(events, teardown)
 | 
			
		||||
    else
 | 
			
		||||
      safe_flush(events, teardown)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def on_flush_error(e)
 | 
			
		||||
    return if @max_flush_exceptions < 1
 | 
			
		||||
 | 
			
		||||
    @exceptions_tracker << e.class
 | 
			
		||||
 | 
			
		||||
    if @exceptions_tracker.not_nil_length >= @max_flush_exceptions
 | 
			
		||||
      @logger.error("JDBC - max_flush_exceptions has been reached")
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
      raise LogStash::ShutdownSignal.new
 | 
			
		||||
  def multi_receive(events)
 | 
			
		||||
    events.each_slice(@flush_size) do |slice|
 | 
			
		||||
      retrying_submit(slice)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def teardown
 | 
			
		||||
    buffer_flush(:final => true)
 | 
			
		||||
    @pool.close()
 | 
			
		||||
    super
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def close
 | 
			
		||||
    @stopping.make_true
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def setup_and_test_pool!  
 | 
			
		||||
@ -196,13 +157,19 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def safe_flush(events, teardown=false)
 | 
			
		||||
  def submit_safe(events)
 | 
			
		||||
    connection = nil
 | 
			
		||||
    statement = nil
 | 
			
		||||
    events_to_retry = []
 | 
			
		||||
    begin
 | 
			
		||||
      connection = @pool.getConnection()
 | 
			
		||||
      statement = connection.prepareStatement(@statement[0])
 | 
			
		||||
    rescue => e
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
      return events
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    begin
 | 
			
		||||
      statement = connection.prepareStatement(@statement[0])
 | 
			
		||||
      events.each do |event|
 | 
			
		||||
        next if event.cancelled?
 | 
			
		||||
        next if @statement.length < 2
 | 
			
		||||
@ -210,42 +177,99 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
 | 
			
		||||
        statement.addBatch()
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      statement.executeBatch()
 | 
			
		||||
      statement.close()
 | 
			
		||||
      @exceptions_tracker << nil
 | 
			
		||||
    rescue java.sql.BatchUpdateException => e
 | 
			
		||||
      # Only retry the failed items from the batch
 | 
			
		||||
      updates = e.getUpdateCounts()
 | 
			
		||||
      events_to_retry = events
 | 
			
		||||
      updates.each_with_index{ |update, idx|
 | 
			
		||||
        if (update != java.sql.Statement.EXECUTE_FAILED)
 | 
			
		||||
          # Remove any successful events
 | 
			
		||||
          events_to_retry[idx] = nil
 | 
			
		||||
        end
 | 
			
		||||
      }
 | 
			
		||||
      # Remove the nil entries
 | 
			
		||||
      events_to_retry = events_to_retry.compact
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
    rescue => e
 | 
			
		||||
      events_to_retry = events
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
    ensure
 | 
			
		||||
      statement.close() unless statement.nil?
 | 
			
		||||
      connection.close() unless connection.nil?
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    connection.close() unless connection.nil?    
 | 
			
		||||
    return events_to_retry
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def unsafe_flush(events, teardown=false)
 | 
			
		||||
  def submit_unsafe(events)
 | 
			
		||||
    connection = nil
 | 
			
		||||
    statement = nil
 | 
			
		||||
    events_to_retry = []
 | 
			
		||||
    
 | 
			
		||||
    begin
 | 
			
		||||
      connection = @pool.getConnection()
 | 
			
		||||
    rescue => e
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
      return events
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
      events.each do |event|
 | 
			
		||||
        next if event.cancelled?
 | 
			
		||||
 | 
			
		||||
    events.each do |event|
 | 
			
		||||
      begin
 | 
			
		||||
        statement = connection.prepareStatement(event.sprintf(@statement[0]))
 | 
			
		||||
        statement = add_statement_event_params(statement, event) if @statement.length > 1
 | 
			
		||||
 | 
			
		||||
        statement.execute()
 | 
			
		||||
 | 
			
		||||
        # cancel the event, since we may end up outputting the same event multiple times
 | 
			
		||||
        # if an exception happens later down the line
 | 
			
		||||
        event.cancel
 | 
			
		||||
        @exceptions_tracker << nil
 | 
			
		||||
      rescue => e
 | 
			
		||||
        log_jdbc_exception(e)
 | 
			
		||||
        events_to_retry.push(event)
 | 
			
		||||
      ensure
 | 
			
		||||
        statement.close() unless statement.nil?
 | 
			
		||||
        statement = nil
 | 
			
		||||
      end
 | 
			
		||||
    rescue => e
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
    ensure
 | 
			
		||||
      statement.close() unless statement.nil?
 | 
			
		||||
      connection.close() unless connection.nil?
 | 
			
		||||
    end
 | 
			
		||||
    
 | 
			
		||||
    connection.close() unless connection.nil?
 | 
			
		||||
 | 
			
		||||
    return events_to_retry
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def retrying_submit(actions)
 | 
			
		||||
    # Initially we submit the full list of actions
 | 
			
		||||
    submit_actions = actions
 | 
			
		||||
 | 
			
		||||
    attempts = 0
 | 
			
		||||
 | 
			
		||||
    sleep_interval = @retry_initial_interval
 | 
			
		||||
    while submit_actions && submit_actions.length > 0
 | 
			
		||||
      return if !submit_actions || submit_actions.empty? # If everything's a success we move along
 | 
			
		||||
      # We retry with whatever is didn't succeed
 | 
			
		||||
      begin
 | 
			
		||||
        if @unsafe_statement == true
 | 
			
		||||
          submit_actions = submit_unsafe(submit_actions)
 | 
			
		||||
        else
 | 
			
		||||
          submit_actions = submit_safe(submit_actions)
 | 
			
		||||
        end
 | 
			
		||||
      rescue => e
 | 
			
		||||
        log_jdbc_exception(e)
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      if @max_flush_exceptions > 0
 | 
			
		||||
        attempts += 1
 | 
			
		||||
 | 
			
		||||
        if attempts > @max_flush_exceptions
 | 
			
		||||
          @logger.error("JDBC - max_flush_exceptions has been reached")
 | 
			
		||||
          break
 | 
			
		||||
        end
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      # Everything was a success!
 | 
			
		||||
      break if !submit_actions || submit_actions.empty?
 | 
			
		||||
 | 
			
		||||
      # If we're retrying the action sleep for the recommended interval
 | 
			
		||||
      # Double the interval for the next time through to achieve exponential backoff
 | 
			
		||||
      Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
 | 
			
		||||
      sleep_interval = next_sleep_interval(sleep_interval)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
@ -294,4 +318,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
      break if current_exception == nil
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def next_sleep_interval(current_interval)
 | 
			
		||||
    doubled = current_interval * 2
 | 
			
		||||
    doubled > @retry_max_interval ? @retry_max_interval : doubled
 | 
			
		||||
  end
 | 
			
		||||
end # class LogStash::Outputs::jdbc
 | 
			
		||||
 | 
			
		||||
@ -13,7 +13,7 @@ Gem::Specification.new do |s|
 | 
			
		||||
  s.platform = 'java'
 | 
			
		||||
 | 
			
		||||
  # Files
 | 
			
		||||
  s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
 | 
			
		||||
  s.files = Dir.glob("{lib,spec}/**/*.rb") + Dir.glob("vendor/**/*") + %w(LICENSE.txt README.md)
 | 
			
		||||
 | 
			
		||||
   # Tests
 | 
			
		||||
  s.test_files = s.files.grep(%r{^(test|spec|features)/})
 | 
			
		||||
 | 
			
		||||
@ -69,16 +69,9 @@ RSpec.shared_context 'when outputting messages' do
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  it 'should save a event' do
 | 
			
		||||
    expect { plugin.multi_receive([event]) }.to_not raise_error
 | 
			
		||||
 | 
			
		||||
    previous_exceptions_count = plugin.instance_variable_get(:@exceptions_tracker).not_nil_length
 | 
			
		||||
 | 
			
		||||
    expect { plugin.receive(event) }.to_not raise_error
 | 
			
		||||
 | 
			
		||||
    # Force flush the buffer, so that we know anything in the buffer
 | 
			
		||||
    # has been sent
 | 
			
		||||
    plugin.buffer_flush(force: true)
 | 
			
		||||
 | 
			
		||||
    expect(plugin.instance_variable_get(:@exceptions_tracker).not_nil_length).to eq(previous_exceptions_count)
 | 
			
		||||
    sleep 5
 | 
			
		||||
 | 
			
		||||
    # Verify the number of items in the output table
 | 
			
		||||
    c = plugin.instance_variable_get(:@pool).getConnection()
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user