From a6c669cc522fd2cf0abbd1c279a066c775af1ad5 Mon Sep 17 00:00:00 2001 From: Karl Southern Date: Sun, 15 Nov 2015 12:35:57 +0000 Subject: [PATCH] Adds unsafe_statement support --- lib/logstash/outputs/jdbc.rb | 123 ++++++++++++++++++++++------------- logstash-output-jdbc.gemspec | 2 +- 2 files changed, 79 insertions(+), 46 deletions(-) diff --git a/lib/logstash/outputs/jdbc.rb b/lib/logstash/outputs/jdbc.rb index 3002478..343db59 100644 --- a/lib/logstash/outputs/jdbc.rb +++ b/lib/logstash/outputs/jdbc.rb @@ -30,6 +30,12 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base # [ "insert into table (message) values(?)", "%{message}" ] config :statement, :validate => :array, :required => true + # If this is an unsafe statement, use event.sprintf + # This also has potential performance penalties due to having to create a + # new statement for each event, rather than adding to the batch and issuing + # multiple inserts in 1 go + config :unsafe_statement, :validate => :boolean, :default => false + # Number of connections in the pool to maintain config :max_pool_size, :validate => :number, :default => 5 @@ -72,7 +78,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base @pool = Java::ComZaxxerHikari::HikariDataSource.new @pool.setJdbcUrl(@connection_string) - + @pool.setUsername(@username) if @username @pool.setPassword(@password) if @password @@ -105,50 +111,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base end def flush(events, teardown=false) - connection = @pool.getConnection() - - statement = connection.prepareStatement(@statement[0]) - - events.each do |event| - next if @statement.length < 2 - - @statement[1..-1].each_with_index do |i, idx| - case event[i] - when Time, LogStash::Timestamp - # Most reliable solution, cross JDBC driver - statement.setString(idx + 1, event[i].iso8601()) - when Fixnum, Integer - statement.setInt(idx + 1, event[i]) - when Float - statement.setFloat(idx + 1, event[i]) - when String - statement.setString(idx + 1, event[i]) - when true - statement.setBoolean(idx + 1, true) - when false - statement.setBoolean(idx + 1, false) - else - statement.setString(idx + 1, event.sprintf(i)) - end - end - - statement.addBatch() - end - - begin - @logger.debug("JDBC - Sending SQL", :sql => statement.toString()) - statement.executeBatch() - statement.close() - rescue => e - # Raising an exception will incur a retry from Stud::Buffer. - # Since the exceutebatch failed this should mean any events failed to be - # inserted will be re-run. We're going to log it for the lols anyway. - @logger.warn("JDBC - Exception. Will automatically retry", :exception => e) - if e.getNextException() != nil - @logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException()) - end - ensure - connection.close(); + if @unsafe_statement == true + unsafe_flush(events, teardown) + else + safe_flush(events, teardown) end end @@ -204,4 +170,71 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base require jar end end + + def safe_flush(events, teardown=false) + connection = @pool.getConnection() + + statement = connection.prepareStatement(@statement[0]) + + events.each do |event| + next if @statement.length < 2 + statement = add_statement_event_params(statement, event) + + statement.addBatch() + end + + begin + @logger.debug("JDBC - Sending SQL", :sql => statement.toString()) + statement.executeBatch() + statement.close() + rescue => e + # Raising an exception will incur a retry from Stud::Buffer. + # Since the exceutebatch failed this should mean any events failed to be + # inserted will be re-run. We're going to log it for the lols anyway. + @logger.warn("JDBC - Exception. Will automatically retry", :exception => e) + if e.getNextException() != nil + @logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException()) + end + ensure + connection.close(); + end + end + + def unsafe_flush(events, teardown=false) + connection = @pool.getConnection() + + events.each do |event| + statement = connection.prepareStatement(event.sprintf(@statement[0])) + + statement = add_statement_event_params(statement, event) if @statement.length > 1 + + statement.execute() + statement.close() + connection.close() + end + end + + def add_statement_event_params(statement, event) + @statement[1..-1].each_with_index do |i, idx| + case event[i] + when Time, LogStash::Timestamp + # Most reliable solution, cross JDBC driver + statement.setString(idx + 1, event[i].iso8601()) + when Fixnum, Integer + statement.setInt(idx + 1, event[i]) + when Float + statement.setFloat(idx + 1, event[i]) + when String + statement.setString(idx + 1, event[i]) + when true + statement.setBoolean(idx + 1, true) + when false + statement.setBoolean(idx + 1, false) + else + statement.setString(idx + 1, event.sprintf(i)) + end + end + + statement + end end # class LogStash::Outputs::jdbc diff --git a/logstash-output-jdbc.gemspec b/logstash-output-jdbc.gemspec index 4537dc9..381764b 100644 --- a/logstash-output-jdbc.gemspec +++ b/logstash-output-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-jdbc' - s.version = "0.2.0.rc1" + s.version = "0.2.0.rc2" s.licenses = [ "Apache License (2.0)" ] s.summary = "This plugin allows you to output to SQL, via JDBC" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"