diff --git a/README.md b/README.md index 6e02e95..5631dca 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ For development: | max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 | | retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 | | retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 | +| retry_sql_states | Array of strings | An array of custom SQL state codes you wish to retry until `max_flush_exceptions`. Useful if you're using a JDBC driver which returns retry-able, but non-standard SQL state codes in it's exceptions. | No | [] | ## Example configurations Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar. diff --git a/lib/logstash/outputs/jdbc.rb b/lib/logstash/outputs/jdbc.rb index 743efbf..7c682fb 100644 --- a/lib/logstash/outputs/jdbc.rb +++ b/lib/logstash/outputs/jdbc.rb @@ -18,7 +18,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base # Classes of retryable SQLSTATE codes # Not all in the class will be retryable. However, this is the best that # we've got right now. - # If something is missing, or database specific they'll have to do. + # If a custom state code is required, set it in retry_sql_states. '08', # Connection Exception '24', # Invalid Cursor State (Maybe retry-able in some circumstances) '25', # Invalid Transaction State @@ -77,6 +77,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base # Maximum time between retries, in seconds config :retry_max_interval, validate: :number, default: 128 + # Any additional custom, retryable SQL state codes. + # Suitable for configuring retryable custom JDBC SQL state codes. + config :retry_sql_states, validate: :array, default: [] + # Maximum number of sequential failed attempts, before we stop retrying. # If set to < 1, then it will infinitely retry. # At the default values this is a little over 10 minutes @@ -113,13 +117,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base end end - def teardown - @pool.close - super - end - def close @stopping.make_true + @pool.close + super end private @@ -185,7 +186,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base connection = @pool.getConnection rescue => e log_jdbc_exception(e) - return events + # If a connection is not available, then the server has gone away + # We're not counting that towards our retry count. + return events, false end events.each do |event| @@ -196,11 +199,11 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base statement = add_statement_event_params(statement, event) if @statement.length > 1 statement.execute rescue => e - if e.class == java.sql.SQLException and !RETRYABLE_SQLSTATE_CLASSES.include?(e.getSQLState.to_s[0,2]) - @logger.error('JDBC - Non-retryable SQL exception. Dropping event. If you think this is in error please log an issue with the details from this exception.', exception: e, state_code: e.getSQLState, event: event) - else + if retry_exception?(e) log_jdbc_exception(e) events_to_retry.push(event) + else + @logger.error('JDBC - Non-retryable exception. Dropping event. If you think this is in error please log an issue with the details from this exception.', exception: e, event: event) end ensure statement.close unless statement.nil? @@ -209,25 +212,26 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base connection.close unless connection.nil? - events_to_retry + return events_to_retry, true end def retrying_submit(actions) # Initially we submit the full list of actions submit_actions = actions + count_as_attempt = true attempts = 0 sleep_interval = @retry_initial_interval - while submit_actions && !submit_actions.empty? + while @stopping.false? and (submit_actions and !submit_actions.empty?) return if !submit_actions || submit_actions.empty? # If everything's a success we move along # We retry whatever didn't succeed - submit_actions = submit(submit_actions) + submit_actions, count_as_attempt = submit(submit_actions) # Everything was a success! break if !submit_actions || submit_actions.empty? - if @max_flush_exceptions > 0 + if @max_flush_exceptions > 0 and count_as_attempt == true attempts += 1 if attempts > @max_flush_exceptions @@ -278,6 +282,13 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base statement end + def retry_exception?(exception) + return (exception.class != java.sql.SQLException or ( + RETRYABLE_SQLSTATE_CLASSES.include?(e.getSQLState[0,2]) or + @retry_sql_states.include?(e.getSQLState) + )) + end + def log_jdbc_exception(exception) current_exception = exception loop do