Adds unsafe_statement support
This commit is contained in:
parent
e615829310
commit
a6c669cc52
|
@ -30,6 +30,12 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
||||||
# [ "insert into table (message) values(?)", "%{message}" ]
|
# [ "insert into table (message) values(?)", "%{message}" ]
|
||||||
config :statement, :validate => :array, :required => true
|
config :statement, :validate => :array, :required => true
|
||||||
|
|
||||||
|
# If this is an unsafe statement, use event.sprintf
|
||||||
|
# This also has potential performance penalties due to having to create a
|
||||||
|
# new statement for each event, rather than adding to the batch and issuing
|
||||||
|
# multiple inserts in 1 go
|
||||||
|
config :unsafe_statement, :validate => :boolean, :default => false
|
||||||
|
|
||||||
# Number of connections in the pool to maintain
|
# Number of connections in the pool to maintain
|
||||||
config :max_pool_size, :validate => :number, :default => 5
|
config :max_pool_size, :validate => :number, :default => 5
|
||||||
|
|
||||||
|
@ -105,50 +111,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
||||||
end
|
end
|
||||||
|
|
||||||
def flush(events, teardown=false)
|
def flush(events, teardown=false)
|
||||||
connection = @pool.getConnection()
|
if @unsafe_statement == true
|
||||||
|
unsafe_flush(events, teardown)
|
||||||
statement = connection.prepareStatement(@statement[0])
|
|
||||||
|
|
||||||
events.each do |event|
|
|
||||||
next if @statement.length < 2
|
|
||||||
|
|
||||||
@statement[1..-1].each_with_index do |i, idx|
|
|
||||||
case event[i]
|
|
||||||
when Time, LogStash::Timestamp
|
|
||||||
# Most reliable solution, cross JDBC driver
|
|
||||||
statement.setString(idx + 1, event[i].iso8601())
|
|
||||||
when Fixnum, Integer
|
|
||||||
statement.setInt(idx + 1, event[i])
|
|
||||||
when Float
|
|
||||||
statement.setFloat(idx + 1, event[i])
|
|
||||||
when String
|
|
||||||
statement.setString(idx + 1, event[i])
|
|
||||||
when true
|
|
||||||
statement.setBoolean(idx + 1, true)
|
|
||||||
when false
|
|
||||||
statement.setBoolean(idx + 1, false)
|
|
||||||
else
|
else
|
||||||
statement.setString(idx + 1, event.sprintf(i))
|
safe_flush(events, teardown)
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
statement.addBatch()
|
|
||||||
end
|
|
||||||
|
|
||||||
begin
|
|
||||||
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
|
|
||||||
statement.executeBatch()
|
|
||||||
statement.close()
|
|
||||||
rescue => e
|
|
||||||
# Raising an exception will incur a retry from Stud::Buffer.
|
|
||||||
# Since the exceutebatch failed this should mean any events failed to be
|
|
||||||
# inserted will be re-run. We're going to log it for the lols anyway.
|
|
||||||
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
|
|
||||||
if e.getNextException() != nil
|
|
||||||
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
|
|
||||||
end
|
|
||||||
ensure
|
|
||||||
connection.close();
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -204,4 +170,71 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
||||||
require jar
|
require jar
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def safe_flush(events, teardown=false)
|
||||||
|
connection = @pool.getConnection()
|
||||||
|
|
||||||
|
statement = connection.prepareStatement(@statement[0])
|
||||||
|
|
||||||
|
events.each do |event|
|
||||||
|
next if @statement.length < 2
|
||||||
|
statement = add_statement_event_params(statement, event)
|
||||||
|
|
||||||
|
statement.addBatch()
|
||||||
|
end
|
||||||
|
|
||||||
|
begin
|
||||||
|
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
|
||||||
|
statement.executeBatch()
|
||||||
|
statement.close()
|
||||||
|
rescue => e
|
||||||
|
# Raising an exception will incur a retry from Stud::Buffer.
|
||||||
|
# Since the exceutebatch failed this should mean any events failed to be
|
||||||
|
# inserted will be re-run. We're going to log it for the lols anyway.
|
||||||
|
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
|
||||||
|
if e.getNextException() != nil
|
||||||
|
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
connection.close();
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def unsafe_flush(events, teardown=false)
|
||||||
|
connection = @pool.getConnection()
|
||||||
|
|
||||||
|
events.each do |event|
|
||||||
|
statement = connection.prepareStatement(event.sprintf(@statement[0]))
|
||||||
|
|
||||||
|
statement = add_statement_event_params(statement, event) if @statement.length > 1
|
||||||
|
|
||||||
|
statement.execute()
|
||||||
|
statement.close()
|
||||||
|
connection.close()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def add_statement_event_params(statement, event)
|
||||||
|
@statement[1..-1].each_with_index do |i, idx|
|
||||||
|
case event[i]
|
||||||
|
when Time, LogStash::Timestamp
|
||||||
|
# Most reliable solution, cross JDBC driver
|
||||||
|
statement.setString(idx + 1, event[i].iso8601())
|
||||||
|
when Fixnum, Integer
|
||||||
|
statement.setInt(idx + 1, event[i])
|
||||||
|
when Float
|
||||||
|
statement.setFloat(idx + 1, event[i])
|
||||||
|
when String
|
||||||
|
statement.setString(idx + 1, event[i])
|
||||||
|
when true
|
||||||
|
statement.setBoolean(idx + 1, true)
|
||||||
|
when false
|
||||||
|
statement.setBoolean(idx + 1, false)
|
||||||
|
else
|
||||||
|
statement.setString(idx + 1, event.sprintf(i))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
statement
|
||||||
|
end
|
||||||
end # class LogStash::Outputs::jdbc
|
end # class LogStash::Outputs::jdbc
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
Gem::Specification.new do |s|
|
Gem::Specification.new do |s|
|
||||||
s.name = 'logstash-output-jdbc'
|
s.name = 'logstash-output-jdbc'
|
||||||
s.version = "0.2.0.rc1"
|
s.version = "0.2.0.rc2"
|
||||||
s.licenses = [ "Apache License (2.0)" ]
|
s.licenses = [ "Apache License (2.0)" ]
|
||||||
s.summary = "This plugin allows you to output to SQL, via JDBC"
|
s.summary = "This plugin allows you to output to SQL, via JDBC"
|
||||||
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
|
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
|
||||||
|
|
Loading…
Reference in New Issue
Block a user