diff --git a/Vagrantfile b/Vagrantfile index 1cde748..885f4e3 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -6,7 +6,11 @@ Vagrant.configure(2) do |config| config.vm.synced_folder '.', '/vagrant', type: :virtualbox config.vm.provision 'shell', inline: <<-EOP - apt-get install git -y -q + echo "deb http://ftp.debian.org/debian jessie-backports main" | tee --append /etc/apt/sources.list > /dev/null + sed -i 's/main/main contrib non-free/g' /etc/apt/sources.list + apt-get update + apt-get remove openjdk-7-jre-headless -y -q + apt-get install git openjdk-8-jre -y -q gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 curl -sSL https://get.rvm.io | bash -s stable --ruby=jruby-1.7 usermod -a -G rvm vagrant diff --git a/lib/logstash/outputs/jdbc.rb b/lib/logstash/outputs/jdbc.rb index 3ec278c..724163c 100644 --- a/lib/logstash/outputs/jdbc.rb +++ b/lib/logstash/outputs/jdbc.rb @@ -217,7 +217,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base submit_actions = actions count_as_attempt = true - attempts = 0 + attempts = 1 sleep_interval = @retry_initial_interval while @stopping.false? and (submit_actions and !submit_actions.empty?) @@ -280,7 +280,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base end def retry_exception?(exception) - retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState[0,2]) or @retry_sql_states.include?(exception.getSQLState))) + retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]) or @retry_sql_states.include?(exception.getSQLState))) log_jdbc_exception(exception, retrying) retrying @@ -290,9 +290,9 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base current_exception = exception loop do if retrying - @logger.warn('JDBC Exception. Retrying.', exception: current_exception) + @logger.warn('JDBC - Exception. Retrying.', exception: current_exception) else - @logger.warn('JDBC Exception. Not retrying. Dropping event.', exception: current_exception) + @logger.error('JDBC - Exception. Not retrying. Dropping event.', exception: current_exception) end current_exception = current_exception.getNextException break if current_exception.nil? diff --git a/spec/jdbc_spec_helper.rb b/spec/jdbc_spec_helper.rb index f4200ae..753b425 100644 --- a/spec/jdbc_spec_helper.rb +++ b/spec/jdbc_spec_helper.rb @@ -20,16 +20,22 @@ RSpec.shared_context 'when initializing' do end RSpec.shared_context 'when outputting messages' do + let(:logger) { double("logger") } + let(:jdbc_test_table) do 'logstash_output_jdbc_test' end let(:jdbc_drop_table) do - "DROP TABLE IF EXISTS #{jdbc_test_table}" + "DROP TABLE #{jdbc_test_table}" end let(:jdbc_create_table) do - "CREATE table #{jdbc_test_table} (created_at datetime, message varchar(512))" + "CREATE table #{jdbc_test_table} (created_at datetime not null, message varchar(512) not null)" + end + + let(:systemd_database_service) do + nil end let(:event_fields) do @@ -42,21 +48,26 @@ RSpec.shared_context 'when outputting messages' do # Setup plugin output = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings) output.register - output.logger.subscribe(STDOUT) if ENV['JDBC_DEBUG'] == '1' + output.logger = logger # Setup table c = output.instance_variable_get(:@pool).getConnection - unless jdbc_drop_table.nil? + # Derby doesn't support IF EXISTS. + # Seems like the quickest solution. Bleurgh. + begin stmt = c.createStatement stmt.executeUpdate(jdbc_drop_table) + rescue + # noop + ensure stmt.close - end - stmt = c.createStatement - stmt.executeUpdate(jdbc_create_table) - stmt.close - c.close + stmt = c.createStatement + stmt.executeUpdate(jdbc_create_table) + stmt.close + c.close + end output end @@ -76,4 +87,39 @@ RSpec.shared_context 'when outputting messages' do expect(count).to eq(1) end + + it 'should not save event, and log an unretryable exception' do + e = event + original_event = e.get('message') + e.set('message', nil) + + expect(logger).to receive(:error).once.with(/JDBC - Exception. Not retrying/, Hash) + expect { plugin.multi_receive([event]) }.to_not raise_error + + e.set('message', original_event) + end + + it 'it should retry after a connection loss, and log a warning' do + skip "does not run as a service" if systemd_database_service.nil? + + p = plugin + + # Check that everything is fine right now + expect { p.multi_receive([event]) }.not_to raise_error + + # Start a thread to stop and restart the service. + t = Thread.new(systemd_database_service) { |systemd_database_service| + `sudo systemctl stop #{systemd_database_service}` + sleep 10 + `sudo systemctl start #{systemd_database_service}` + } + + # Wait a few seconds to the service to stop + sleep 5 + + expect(logger).to receive(:warn).at_least(:once).with(/JDBC - Exception. Retrying/, Hash) + expect { p.multi_receive([event]) }.to_not raise_error + + t.join + end end diff --git a/spec/outputs/jdbc_derby_spec.rb b/spec/outputs/jdbc_derby_spec.rb index 652ed82..039b5ab 100644 --- a/spec/outputs/jdbc_derby_spec.rb +++ b/spec/outputs/jdbc_derby_spec.rb @@ -9,20 +9,17 @@ describe 'logstash-output-jdbc: derby', if: ENV['JDBC_DERBY_JAR'] do 'JDBC_DERBY_JAR' end - let(:jdbc_drop_table) do - nil - end - let(:jdbc_create_table) do - "CREATE table #{jdbc_test_table} (created_at timestamp, message varchar(512))" + "CREATE table #{jdbc_test_table} (created_at timestamp not null, message varchar(512) not null)" end - + let(:jdbc_settings) do { 'driver_class' => 'org.apache.derby.jdbc.EmbeddedDriver', 'connection_string' => 'jdbc:derby:memory:testdb;create=true', 'driver_jar_path' => ENV[jdbc_jar_env], - 'statement' => ['insert into logstash_output_jdbc_test (created_at, message) values(?, ?)', '@timestamp', 'message'] + 'statement' => ['insert into logstash_output_jdbc_test (created_at, message) values(?, ?)', '@timestamp', 'message'], + 'max_flush_exceptions' => 1 } end end diff --git a/spec/outputs/jdbc_mysql_spec.rb b/spec/outputs/jdbc_mysql_spec.rb index fd4236c..70aef24 100644 --- a/spec/outputs/jdbc_mysql_spec.rb +++ b/spec/outputs/jdbc_mysql_spec.rb @@ -9,12 +9,17 @@ describe 'logstash-output-jdbc: mysql', if: ENV['JDBC_MYSQL_JAR'] do 'JDBC_MYSQL_JAR' end + let(:systemd_database_service) do + 'mysql' + end + let(:jdbc_settings) do { 'driver_class' => 'com.mysql.jdbc.Driver', 'connection_string' => 'jdbc:mysql://localhost/logstash_output_jdbc_test?user=root', 'driver_jar_path' => ENV[jdbc_jar_env], - 'statement' => ["insert into #{jdbc_test_table} (created_at, message) values(?, ?)", '@timestamp', 'message'] + 'statement' => ["insert into #{jdbc_test_table} (created_at, message) values(?, ?)", '@timestamp', 'message'], + 'max_flush_exceptions' => 1 } end end diff --git a/spec/outputs/jdbc_sqlite_spec.rb b/spec/outputs/jdbc_sqlite_spec.rb index aa77d40..8b8aa55 100644 --- a/spec/outputs/jdbc_sqlite_spec.rb +++ b/spec/outputs/jdbc_sqlite_spec.rb @@ -20,7 +20,8 @@ describe 'logstash-output-jdbc: sqlite', if: ENV['JDBC_SQLITE_JAR'] do 'driver_class' => 'org.sqlite.JDBC', 'connection_string' => "jdbc:sqlite:#{JDBC_SQLITE_FILE}", 'driver_jar_path' => ENV[jdbc_jar_env], - 'statement' => ["insert into #{jdbc_test_table} (created_at, message) values(?, ?)", '@timestamp', 'message'] + 'statement' => ["insert into #{jdbc_test_table} (created_at, message) values(?, ?)", '@timestamp', 'message'], + 'max_flush_exceptions' => 1 } end end