This commit is contained in:
Karl Southern 2016-04-16 14:33:30 +01:00
parent 909cae01b3
commit 4e0292d222
3 changed files with 33 additions and 33 deletions

View File

@ -98,7 +98,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end end
setup_and_test_pool! setup_and_test_pool!
buffer_initialize( buffer_initialize(
:max_items => @flush_size, :max_items => @flush_size,
:max_interval => @idle_flush_time, :max_interval => @idle_flush_time,
@ -194,56 +194,55 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end end
def safe_flush(events, teardown=false) def safe_flush(events, teardown=false)
connection = @pool.getConnection() connection = nil
statement = connection.prepareStatement(@statement[0]) statement = nil
events.each do |event|
next if event.cancelled?
next if @statement.length < 2
statement = add_statement_event_params(statement, event)
statement.addBatch()
end
begin begin
connection = @pool.getConnection()
statement = connection.prepareStatement(@statement[0])
events.each do |event|
next if event.cancelled?
next if @statement.length < 2
statement = add_statement_event_params(statement, event)
statement.addBatch()
end
statement.executeBatch() statement.executeBatch()
statement.close() statement.close()
@exceptions_tracker << nil @exceptions_tracker << nil
rescue => e rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# Since the exceutebatch failed this should mean any events failed to be
# inserted will be re-run. We're going to log it for the lols anyway.
log_jdbc_exception(e) log_jdbc_exception(e)
ensure ensure
connection.close(); statement.close() unless statement.nil?
connection.close() unless connection.nil?
end end
end end
def unsafe_flush(events, teardown=false) def unsafe_flush(events, teardown=false)
connection = @pool.getConnection() connection = nil
statement = nil
begin
connection = @pool.getConnection()
events.each do |event| events.each do |event|
next if event.cancelled? next if event.cancelled?
statement = connection.prepareStatement(event.sprintf(@statement[0])) statement = connection.prepareStatement(event.sprintf(@statement[0]))
statement = add_statement_event_params(statement, event) if @statement.length > 1 statement = add_statement_event_params(statement, event) if @statement.length > 1
begin
statement.execute() statement.execute()
# cancel the event, since we may end up outputting the same event multiple times # cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line # if an exception happens later down the line
event.cancel event.cancel
@exceptions_tracker << nil @exceptions_tracker << nil
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# We log for the lols.
log_jdbc_exception(e)
ensure
statement.close()
connection.close()
end end
rescue => e
log_jdbc_exception(e)
ensure
statement.close() unless statement.nil?
connection.close() unless connection.nil?
end end
end end

View File

@ -1,6 +1,6 @@
Gem::Specification.new do |s| Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc' s.name = 'logstash-output-jdbc'
s.version = "0.2.5" s.version = "0.2.6-rc1"
s.licenses = [ "Apache License (2.0)" ] s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC" s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"

View File

@ -88,6 +88,7 @@ describe LogStash::Outputs::Jdbc do
end end
stmt.close() stmt.close()
c.close() c.close()
expect(count).to be > 0 expect(count).to be > 0
end end