Backport functionality from v5 branch.

This commit is contained in:
Karl Southern
2016-05-29 13:40:47 +01:00
parent 927e532b2a
commit 7b337a8b91
5 changed files with 63 additions and 9 deletions

View File

@@ -6,10 +6,33 @@ require "java"
require "logstash-output-jdbc_jars"
require "logstash-output-jdbc_ring-buffer"
# Write events to a SQL engine, using JDBC.
#
# It is upto the user of the plugin to correctly configure the plugin. This
# includes correctly crafting the SQL statement, and matching the number of
# parameters correctly.
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Adds buffer support
include Stud::Buffer
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
RETRYABLE_SQLSTATE_CLASSES = [
# Classes of retryable SQLSTATE codes
# Not all in the class will be retryable. However, this is the best that
# we've got right now.
# If a custom state code is required, set it in retry_sql_states.
'08', # Connection Exception
'24', # Invalid Cursor State (Maybe retry-able in some circumstances)
'25', # Invalid Transaction State
'40', # Transaction Rollback
'53', # Insufficient Resources
'54', # Program Limit Exceeded (MAYBE)
'55', # Object Not In Prerequisite State
'57', # Operator Intervention
'58', # System Error
].freeze
config_name "jdbc"
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
@@ -196,8 +219,15 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
def safe_flush(events, teardown=false)
connection = nil
statement = nil
begin
connection = @pool.getConnection()
rescue => e
log_jdbc_exception(e)
raise
end
begin
statement = connection.prepareStatement(@statement[0])
events.each do |event|
@@ -213,6 +243,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
@exceptions_tracker << nil
rescue => e
log_jdbc_exception(e)
if retry_exception?(e)
raise
end
ensure
statement.close() unless statement.nil?
connection.close() unless connection.nil?
@@ -224,7 +257,12 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
statement = nil
begin
connection = @pool.getConnection()
rescue => e
log_jdbc_exception(e)
raise
end
begin
events.each do |event|
next if event.cancelled?
@@ -240,6 +278,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end
rescue => e
log_jdbc_exception(e)
if retry_exception?(e)
raise
end
ensure
statement.close() unless statement.nil?
connection.close() unless connection.nil?
@@ -250,11 +291,17 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].iso8601())
# See LogStash::Timestamp, below, for the why behind strftime.
statement.setString(idx + 1, event[i].strftime(STRFTIME_FMT))
when LogStash::Timestamp
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].to_iso8601())
# XXX: Using setString as opposed to setTimestamp, because setTimestamp
# doesn't behave correctly in some drivers (Known: sqlite)
#
# Additionally this does not use `to_iso8601`, since some SQL databases
# choke on the 'T' in the string (Known: Derby).
#
# strftime appears to be the most reliable across drivers.
statement.setString(idx + 1, event[i].time.strftime(STRFTIME_FMT))
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
@@ -285,4 +332,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
break if current_exception == nil
end
end
def retry_exception?(exception)
return (exception.class != java.sql.SQLException or
RETRYABLE_SQLSTATE_CLASSES.include?(e.getSQLState[0,2]))
end
end # class LogStash::Outputs::jdbc