More v5 adjustments
This commit is contained in:
		
							parent
							
								
									85b3f31051
								
							
						
					
					
						commit
						fe2e23ac27
					
				@ -50,8 +50,8 @@ For development:
 | 
			
		||||
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
 | 
			
		||||
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
 | 
			
		||||
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
 | 
			
		||||
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 0 |
 | 
			
		||||
| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure | No | 2 |
 | 
			
		||||
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 |
 | 
			
		||||
| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
 | 
			
		||||
| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
 | 
			
		||||
 | 
			
		||||
## Example configurations
 | 
			
		||||
 | 
			
		||||
@ -9,6 +9,40 @@ require "logstash-output-jdbc_jars"
 | 
			
		||||
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
  STRFTIME_FMT = "%Y-%m-%d %T.%L".freeze
 | 
			
		||||
 | 
			
		||||
  # Will never work, but only because it duplicates data (i.e. duplicate keys)
 | 
			
		||||
  # Will throw a warning.
 | 
			
		||||
  SQL_STATES_IGNORE = [
 | 
			
		||||
    ### Constraint Violation 
 | 
			
		||||
    # Integrity constraint violation.
 | 
			
		||||
    23000,
 | 
			
		||||
    # A violation of the constraint imposed by a unique index or a unique constraint occurred.
 | 
			
		||||
    23505
 | 
			
		||||
  ]
 | 
			
		||||
 | 
			
		||||
  # Will never work because of SQL statement errors.
 | 
			
		||||
  # Will throw an error.
 | 
			
		||||
  SQL_STATES_FATAL = [
 | 
			
		||||
    ### Data Exception
 | 
			
		||||
    # Character data, right truncation occurred. Field too small.
 | 
			
		||||
    22001,
 | 
			
		||||
    # Numeric value out of range.
 | 
			
		||||
    22003,
 | 
			
		||||
    # A null value is not allowed.
 | 
			
		||||
    22004,
 | 
			
		||||
    # Invalid datetime format.
 | 
			
		||||
    22007,
 | 
			
		||||
    # A parameter or host variable value is invalid.
 | 
			
		||||
    22023,
 | 
			
		||||
    # Character conversion resulted in truncation.
 | 
			
		||||
    22524,
 | 
			
		||||
 | 
			
		||||
    ### Constraint Violation 
 | 
			
		||||
    # The insert or update value of a foreign key is invalid.
 | 
			
		||||
    23503,
 | 
			
		||||
    # The range of values for the identity column or sequence is exhausted.
 | 
			
		||||
    23522
 | 
			
		||||
  ]
 | 
			
		||||
 | 
			
		||||
  config_name "jdbc"
 | 
			
		||||
 | 
			
		||||
  # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
 | 
			
		||||
@ -58,7 +92,8 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
 | 
			
		||||
  # Maximum number of sequential failed attempts, before we stop retrying.
 | 
			
		||||
  # If set to < 1, then it will infinitely retry.
 | 
			
		||||
  config :max_flush_exceptions, :validate => :number, :default => 0
 | 
			
		||||
  # At the default values this is a little over 10 minutes
 | 
			
		||||
  config :max_flush_exceptions, :validate => :number, :default => 10 
 | 
			
		||||
 | 
			
		||||
  config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
 | 
			
		||||
  config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
 | 
			
		||||
@ -157,52 +192,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
      
 | 
			
		||||
  def submit_safe(events)
 | 
			
		||||
    connection = nil
 | 
			
		||||
    statement = nil
 | 
			
		||||
    events_to_retry = []
 | 
			
		||||
    begin
 | 
			
		||||
      connection = @pool.getConnection()
 | 
			
		||||
    rescue => e
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
      return events
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    begin
 | 
			
		||||
      statement = connection.prepareStatement(@statement[0])
 | 
			
		||||
      events.each do |event|
 | 
			
		||||
        next if event.cancelled?
 | 
			
		||||
        next if @statement.length < 2
 | 
			
		||||
        statement = add_statement_event_params(statement, event)
 | 
			
		||||
 | 
			
		||||
        statement.addBatch()
 | 
			
		||||
      end
 | 
			
		||||
      statement.executeBatch()
 | 
			
		||||
    rescue java.sql.BatchUpdateException => e
 | 
			
		||||
      # Only retry the failed items from the batch
 | 
			
		||||
      updates = e.getUpdateCounts()
 | 
			
		||||
      events_to_retry = events
 | 
			
		||||
      updates.each_with_index{ |update, idx|
 | 
			
		||||
        if (update != java.sql.Statement.EXECUTE_FAILED)
 | 
			
		||||
          # Remove any successful events
 | 
			
		||||
          events_to_retry[idx] = nil
 | 
			
		||||
        end
 | 
			
		||||
      }
 | 
			
		||||
      # Remove the nil entries
 | 
			
		||||
      events_to_retry = events_to_retry.compact
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
    rescue => e
 | 
			
		||||
      events_to_retry = events
 | 
			
		||||
      log_jdbc_exception(e)
 | 
			
		||||
    ensure
 | 
			
		||||
      statement.close() unless statement.nil?
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    connection.close() unless connection.nil?    
 | 
			
		||||
    return events_to_retry
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def submit_unsafe(events)
 | 
			
		||||
  def submit(events)
 | 
			
		||||
    connection = nil
 | 
			
		||||
    statement = nil
 | 
			
		||||
    events_to_retry = []
 | 
			
		||||
@ -216,16 +206,30 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
 | 
			
		||||
    events.each do |event|
 | 
			
		||||
      begin
 | 
			
		||||
        statement = connection.prepareStatement(event.sprintf(@statement[0]))
 | 
			
		||||
        if @unsafe_statement == true
 | 
			
		||||
          statement = connection.prepareStatement(event.sprintf(@statement[0]))
 | 
			
		||||
        else
 | 
			
		||||
          statement = connection.prepareStatement(@statement[0])
 | 
			
		||||
        end
 | 
			
		||||
        
 | 
			
		||||
        statement = add_statement_event_params(statement, event) if @statement.length > 1
 | 
			
		||||
 | 
			
		||||
        statement.execute()
 | 
			
		||||
      rescue java.sql.SQLException => e
 | 
			
		||||
        if SQL_STATES_IGNORE.include? e.getSQLState()
 | 
			
		||||
          @logger.warn('JDBC - Dropping event. Ignore-able exception (duplicate key most likely)', exception: e, event: event)
 | 
			
		||||
        elsif SQL_STATES_FATAL.include? e.getSQLState()
 | 
			
		||||
          @logger.error('JDBC - Fatal SQL exception. Can never succeed. Dropping event.', exception: e, event: event)
 | 
			
		||||
        else
 | 
			
		||||
          log_jdbc_exception(e)
 | 
			
		||||
          events_to_retry.push(event)
 | 
			
		||||
        end
 | 
			
		||||
      rescue => e
 | 
			
		||||
        # Something else happened.
 | 
			
		||||
        log_jdbc_exception(e)        
 | 
			
		||||
        events_to_retry.push(event)
 | 
			
		||||
      ensure
 | 
			
		||||
        statement.close() unless statement.nil?
 | 
			
		||||
        statement = nil
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
    
 | 
			
		||||
@ -243,29 +247,21 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
    sleep_interval = @retry_initial_interval
 | 
			
		||||
    while submit_actions && submit_actions.length > 0
 | 
			
		||||
      return if !submit_actions || submit_actions.empty? # If everything's a success we move along
 | 
			
		||||
      # We retry with whatever is didn't succeed
 | 
			
		||||
      begin
 | 
			
		||||
        if @unsafe_statement == true
 | 
			
		||||
          submit_actions = submit_unsafe(submit_actions)
 | 
			
		||||
        else
 | 
			
		||||
          submit_actions = submit_safe(submit_actions)
 | 
			
		||||
        end
 | 
			
		||||
      rescue => e
 | 
			
		||||
        log_jdbc_exception(e)
 | 
			
		||||
      end
 | 
			
		||||
      # We retry whatever didn't succeed
 | 
			
		||||
      submit_actions = submit(submit_actions)
 | 
			
		||||
 | 
			
		||||
      # Everything was a success!
 | 
			
		||||
      break if !submit_actions || submit_actions.empty?
 | 
			
		||||
 | 
			
		||||
      if @max_flush_exceptions > 0
 | 
			
		||||
        attempts += 1
 | 
			
		||||
 | 
			
		||||
        if attempts > @max_flush_exceptions
 | 
			
		||||
          @logger.error("JDBC - max_flush_exceptions has been reached")
 | 
			
		||||
          @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
 | 
			
		||||
          break
 | 
			
		||||
        end
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      # Everything was a success!
 | 
			
		||||
      break if !submit_actions || submit_actions.empty?
 | 
			
		||||
 | 
			
		||||
      # If we're retrying the action sleep for the recommended interval
 | 
			
		||||
      # Double the interval for the next time through to achieve exponential backoff
 | 
			
		||||
      Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
 | 
			
		||||
@ -313,7 +309,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
			
		||||
  def log_jdbc_exception(exception)
 | 
			
		||||
    current_exception = exception
 | 
			
		||||
    loop do
 | 
			
		||||
      @logger.error("JDBC Exception encountered: Will automatically retry.", :exception => current_exception)
 | 
			
		||||
      @logger.warn("JDBC Exception encountered: Will automatically retry.", :exception => current_exception)
 | 
			
		||||
      current_exception = current_exception.getNextException()
 | 
			
		||||
      break if current_exception == nil
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										20
									
								
								scripts/minutes_to_retries.rb
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										20
									
								
								scripts/minutes_to_retries.rb
									
									
									
									
									
										Executable file
									
								
							@ -0,0 +1,20 @@
 | 
			
		||||
#!/usr/bin/env ruby -w
 | 
			
		||||
 | 
			
		||||
seconds_to_reach = 10 * 60
 | 
			
		||||
default_interval = 2
 | 
			
		||||
retry_max_interval = 128
 | 
			
		||||
 | 
			
		||||
current_interval = 2
 | 
			
		||||
total_interval = 0
 | 
			
		||||
exceptions_count = 1
 | 
			
		||||
 | 
			
		||||
loop do
 | 
			
		||||
  break if total_interval > seconds_to_reach  
 | 
			
		||||
  exceptions_count += 1
 | 
			
		||||
  
 | 
			
		||||
  current_interval = current_interval*2 > retry_max_interval ? retry_max_interval : current_interval*2
 | 
			
		||||
 | 
			
		||||
  total_interval += current_interval
 | 
			
		||||
end
 | 
			
		||||
 | 
			
		||||
puts exceptions_count
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user