Trying out some new stuff.
This commit is contained in:
		
							parent
							
								
									d6869f594c
								
							
						
					
					
						commit
						a26b6106d4
					
				@ -53,6 +53,7 @@ For development:
 | 
				
			|||||||
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 |
 | 
					| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 |
 | 
				
			||||||
| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
 | 
					| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
 | 
				
			||||||
| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
 | 
					| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
 | 
				
			||||||
 | 
					| retry_sql_states | Array of strings | An array of custom SQL state codes you wish to retry until `max_flush_exceptions`. Useful if you're using a JDBC driver which returns retry-able, but non-standard SQL state codes in it's exceptions. | No | [] |
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## Example configurations
 | 
					## Example configurations
 | 
				
			||||||
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
 | 
					Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
 | 
				
			||||||
 | 
				
			|||||||
@ -18,7 +18,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
    # Classes of retryable SQLSTATE codes
 | 
					    # Classes of retryable SQLSTATE codes
 | 
				
			||||||
    # Not all in the class will be retryable. However, this is the best that 
 | 
					    # Not all in the class will be retryable. However, this is the best that 
 | 
				
			||||||
    # we've got right now.
 | 
					    # we've got right now.
 | 
				
			||||||
    # If something is missing, or database specific they'll have to do.
 | 
					    # If a custom state code is required, set it in retry_sql_states.
 | 
				
			||||||
    '08', # Connection Exception
 | 
					    '08', # Connection Exception
 | 
				
			||||||
    '24', # Invalid Cursor State (Maybe retry-able in some circumstances)
 | 
					    '24', # Invalid Cursor State (Maybe retry-able in some circumstances)
 | 
				
			||||||
    '25', # Invalid Transaction State 
 | 
					    '25', # Invalid Transaction State 
 | 
				
			||||||
@ -77,6 +77,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
  # Maximum time between retries, in seconds
 | 
					  # Maximum time between retries, in seconds
 | 
				
			||||||
  config :retry_max_interval, validate: :number, default: 128
 | 
					  config :retry_max_interval, validate: :number, default: 128
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  # Any additional custom, retryable SQL state codes. 
 | 
				
			||||||
 | 
					  # Suitable for configuring retryable custom JDBC SQL state codes.
 | 
				
			||||||
 | 
					  config :retry_sql_states, validate: :array, default: []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Maximum number of sequential failed attempts, before we stop retrying.
 | 
					  # Maximum number of sequential failed attempts, before we stop retrying.
 | 
				
			||||||
  # If set to < 1, then it will infinitely retry.
 | 
					  # If set to < 1, then it will infinitely retry.
 | 
				
			||||||
  # At the default values this is a little over 10 minutes
 | 
					  # At the default values this is a little over 10 minutes
 | 
				
			||||||
@ -113,13 +117,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def teardown
 | 
					 | 
				
			||||||
    @pool.close
 | 
					 | 
				
			||||||
    super
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def close
 | 
					  def close
 | 
				
			||||||
    @stopping.make_true
 | 
					    @stopping.make_true
 | 
				
			||||||
 | 
					    @pool.close
 | 
				
			||||||
 | 
					    super
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private
 | 
					  private
 | 
				
			||||||
@ -185,7 +186,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
      connection = @pool.getConnection
 | 
					      connection = @pool.getConnection
 | 
				
			||||||
    rescue => e
 | 
					    rescue => e
 | 
				
			||||||
      log_jdbc_exception(e)
 | 
					      log_jdbc_exception(e)
 | 
				
			||||||
      return events
 | 
					      # If a connection is not available, then the server has gone away
 | 
				
			||||||
 | 
					      # We're not counting that towards our retry count.
 | 
				
			||||||
 | 
					      return events, false
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    events.each do |event|
 | 
					    events.each do |event|
 | 
				
			||||||
@ -196,11 +199,11 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
        statement = add_statement_event_params(statement, event) if @statement.length > 1
 | 
					        statement = add_statement_event_params(statement, event) if @statement.length > 1
 | 
				
			||||||
        statement.execute
 | 
					        statement.execute
 | 
				
			||||||
      rescue => e
 | 
					      rescue => e
 | 
				
			||||||
        if e.class == java.sql.SQLException and !RETRYABLE_SQLSTATE_CLASSES.include?(e.getSQLState.to_s[0,2])
 | 
					        if retry_exception?(e)
 | 
				
			||||||
          @logger.error('JDBC - Non-retryable SQL exception. Dropping event. If you think this is in error please log an issue with the details from this exception.', exception: e, state_code: e.getSQLState, event: event)
 | 
					 | 
				
			||||||
        else
 | 
					 | 
				
			||||||
          log_jdbc_exception(e)
 | 
					          log_jdbc_exception(e)
 | 
				
			||||||
          events_to_retry.push(event)
 | 
					          events_to_retry.push(event)
 | 
				
			||||||
 | 
					        else
 | 
				
			||||||
 | 
					          @logger.error('JDBC - Non-retryable exception. Dropping event. If you think this is in error please log an issue with the details from this exception.', exception: e, event: event)
 | 
				
			||||||
        end
 | 
					        end
 | 
				
			||||||
      ensure
 | 
					      ensure
 | 
				
			||||||
        statement.close unless statement.nil?
 | 
					        statement.close unless statement.nil?
 | 
				
			||||||
@ -209,25 +212,26 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    connection.close unless connection.nil?
 | 
					    connection.close unless connection.nil?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    events_to_retry
 | 
					    return events_to_retry, true
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def retrying_submit(actions)
 | 
					  def retrying_submit(actions)
 | 
				
			||||||
    # Initially we submit the full list of actions
 | 
					    # Initially we submit the full list of actions
 | 
				
			||||||
    submit_actions = actions
 | 
					    submit_actions = actions
 | 
				
			||||||
 | 
					    count_as_attempt = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    attempts = 0
 | 
					    attempts = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    sleep_interval = @retry_initial_interval
 | 
					    sleep_interval = @retry_initial_interval
 | 
				
			||||||
    while submit_actions && !submit_actions.empty?
 | 
					    while @stopping.false? and (submit_actions and !submit_actions.empty?)
 | 
				
			||||||
      return if !submit_actions || submit_actions.empty? # If everything's a success we move along
 | 
					      return if !submit_actions || submit_actions.empty? # If everything's a success we move along
 | 
				
			||||||
      # We retry whatever didn't succeed
 | 
					      # We retry whatever didn't succeed
 | 
				
			||||||
      submit_actions = submit(submit_actions)
 | 
					      submit_actions, count_as_attempt = submit(submit_actions)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      # Everything was a success!
 | 
					      # Everything was a success!
 | 
				
			||||||
      break if !submit_actions || submit_actions.empty?
 | 
					      break if !submit_actions || submit_actions.empty?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      if @max_flush_exceptions > 0
 | 
					      if @max_flush_exceptions > 0 and count_as_attempt == true
 | 
				
			||||||
        attempts += 1
 | 
					        attempts += 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if attempts > @max_flush_exceptions
 | 
					        if attempts > @max_flush_exceptions
 | 
				
			||||||
@ -278,6 +282,13 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
    statement
 | 
					    statement
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def retry_exception?(exception)
 | 
				
			||||||
 | 
					    return (exception.class != java.sql.SQLException or (
 | 
				
			||||||
 | 
					      RETRYABLE_SQLSTATE_CLASSES.include?(e.getSQLState[0,2]) or 
 | 
				
			||||||
 | 
					      @retry_sql_states.include?(e.getSQLState)
 | 
				
			||||||
 | 
					    ))
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def log_jdbc_exception(exception)
 | 
					  def log_jdbc_exception(exception)
 | 
				
			||||||
    current_exception = exception
 | 
					    current_exception = exception
 | 
				
			||||||
    loop do
 | 
					    loop do
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user