13 Commits

Author SHA1 Message Date
Karl Southern
f52c14b79a Fix travis 2016-08-28 21:59:35 +01:00
Karl Southern
f46fd58048 connection_test supression support for issue #53 2016-08-28 21:48:06 +01:00
Karl Southern
3dc7627782 v0.3.0 2016-07-24 12:14:31 +01:00
Karl Southern
9235c48c88 Fix travis for v2.x 2016-07-13 17:46:42 +01:00
Karl Southern
53e665bbb6 0.3.0 uses jar-dependencies 2016-07-13 17:41:32 +01:00
Karl Southern
fa2d226fbf 0.3.0.pre - Preparing for threadsafety 2016-07-13 17:40:35 +01:00
Karl Southern
da5a3d8be3 0.2.10 2016-07-07 11:03:14 +01:00
Karl Southern
b10462dacd Preparing for 0.2.10 2016-07-07 10:09:31 +01:00
Karl Southern
61c7a1307e Provisionally address issue 46 2016-07-07 08:50:58 +01:00
Karl Southern
b5419813ba 0.2.9 2016-06-29 13:42:09 +01:00
Karl Southern
ded1106b13 Address issue 44. 2016-06-28 22:38:36 +01:00
Karl Southern
2b27f39088 0.2.7 2016-05-29 13:45:26 +01:00
Karl Southern
7b337a8b91 Backport functionality from v5 branch. 2016-05-29 13:40:47 +01:00
16 changed files with 59 additions and 221 deletions

View File

@@ -1,26 +0,0 @@
<!--
Trouble installing the plugin under Logstash 2.4.0 with the message "duplicate gems"? See https://github.com/elastic/logstash/issues/5852
Please remember:
- I have not used every database engine in the world
- I have not got access to every database engine in the world
- Any support I provide is done in my own personal time which is limited
- Understand that I won't always have the answer immediately
Please provide as much information as possible.
-->
<!--- Provide a general summary of the issue in the Title above -->
## Expected & Actual Behavior
<!--- If you're describing a bug, tell us what should happen, and what is actually happening, and if necessary how to reproduce it -->
<!--- If you're suggesting a change/improvement, tell us how it should work -->
## Your Environment
<!--- Include as many relevant details about the environment you experienced the bug in -->
* Version of plugin used:
* Version of Logstash used:
* Database engine & version you're connecting to:
* Have you checked you've met the Logstash requirements for Java versions?:

View File

@@ -1,25 +0,0 @@
# I don't care for underscores in numbers.
Style/NumericLiterals:
Enabled: false
Style/ClassAndModuleChildren:
Enabled: false
Metrics/AbcSize:
Enabled: false
Metrics/CyclomaticComplexity:
Max: 9
Metrics/PerceivedComplexity:
Max: 10
Metrics/LineLength:
Enabled: false
Metrics/MethodLength:
Max: 50
Style/FileName:
Exclude:
- 'lib/logstash-output-jdbc_jars.rb'

View File

@@ -2,9 +2,7 @@ sudo: required
language: ruby
cache: bundler
rvm:
- jruby-1.7.25
jdk:
- oraclejdk8
- jruby
before_script:
- bundle exec rake vendor
- bundle exec rake install_jars

View File

@@ -21,7 +21,7 @@ See CHANGELOG.md
Released versions are available via rubygems, and typically tagged.
For development:
- See master branch for logstash v5
- See master branch for logstash v5 (currently **development only**)
- See v2.x branch for logstash v2
- See v1.5 branch for logstash v1.5
- See v1.4 branch for logstash 1.4
@@ -47,8 +47,8 @@ For development:
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time. Default set to same as Logstash default number of workers | No | 24 |
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 |

18
Vagrantfile vendored
View File

@@ -1,18 +0,0 @@
# -*- mode: ruby -*-
# vi: set ft=ruby :
Vagrant.configure(2) do |config|
config.vm.box = 'debian/jessie64'
config.vm.synced_folder '.', '/vagrant', type: :virtualbox
config.vm.provision 'shell', inline: <<-EOP
echo "deb http://ftp.debian.org/debian jessie-backports main" | tee --append /etc/apt/sources.list > /dev/null
sed -i 's/main/main contrib non-free/g' /etc/apt/sources.list
apt-get update
apt-get remove openjdk-7-jre-headless -y -q
apt-get install git openjdk-8-jre curl -y -q
gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
curl -sSL https://get.rvm.io | bash -s stable --ruby=jruby-1.7
usermod -a -G rvm vagrant
EOP
end

View File

@@ -1,18 +0,0 @@
# Example: CockroachDB
- Tested using postgresql-9.4.1209.jre6.jar
- **Warning** cockroach is known to throw a warning on connection test (at time of writing), thus the connection test is explicitly disabled.
```
input
{
stdin { }
}
output {
jdbc {
driver_jar_path => '/opt/postgresql-9.4.1209.jre6.jar'
connection_test => false
connection_string => 'jdbc:postgresql://127.0.0.1:26257/test?user=root'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

View File

@@ -9,9 +9,8 @@ input
}
output {
jdbc {
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST(? AS timestamp), ?)", "host", "@timestamp", "message" ]
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

View File

@@ -1,6 +1,5 @@
# Example: SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
* Known to be working with Microsoft SQL Server Always-On Cluster (see https://github.com/theangryangel/logstash-output-jdbc/issues/37). With thanks to [@phr0gz](https://github.com/phr0gz)
```
input
{
@@ -8,7 +7,7 @@ input
}
output {
jdbc {
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password"
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}

View File

@@ -10,7 +10,6 @@ output {
stdout { }
jdbc {
driver_class => "org.sqlite.JDBC"
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}

View File

@@ -1,5 +1,5 @@
# encoding: utf-8
require 'logstash/environment'
root_dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
LogStash::Environment.load_runtime_jars! File.join(root_dir, 'vendor')
root_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor")

View File

@@ -12,7 +12,7 @@ require 'logstash-output-jdbc_jars'
# includes correctly crafting the SQL statement, and matching the number of
# parameters correctly.
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
concurrency :shared
declare_threadsafe! if self.respond_to?(:declare_threadsafe!)
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
@@ -98,6 +98,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
def register
@logger.info('JDBC - Starting up')
LogStash::Logger.setup_log4j(@logger)
load_jar_files!
@stopping = Concurrent::AtomicBoolean.new(false)
@@ -121,6 +122,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end
end
def receive(event)
retrying_submit([event])
end
def close
@stopping.make_true
@pool.close
@@ -151,7 +156,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Test connection
test_connection = @pool.getConnection
unless test_connection.isValid(validate_connection_timeout)
@logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
@logger.error('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
end
test_connection.close
end
@@ -172,13 +177,13 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
end
@logger.trace('JDBC - jarpath', path: jarpath)
@logger.debug('JDBC - jarpath', path: jarpath)
jars = Dir[jarpath]
raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty?
jars.each do |jar|
@logger.trace('JDBC - Loaded jar', jar: jar)
@logger.debug('JDBC - Loaded jar', jar: jar)
require jar
end
end
@@ -253,7 +258,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
def add_statement_event_params(statement, event)
@statement[1..-1].each_with_index do |i, idx|
if i.is_a? String
value = event.get(i)
value = event[i]
if value.nil? and i =~ /%\{/
value = event.sprintf(i)
end
@@ -275,11 +280,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# strftime appears to be the most reliable across drivers.
statement.setString(idx + 1, value.time.strftime(STRFTIME_FMT))
when Fixnum, Integer
if value > 2147483647 or value < -2147483648
statement.setLong(idx + 1, value)
else
statement.setInt(idx + 1, value)
end
statement.setInt(idx + 1, value)
when Float
statement.setFloat(idx + 1, value)
when String
@@ -307,7 +308,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
log_method = (retrying ? 'warn' : 'error')
loop do
@logger.send(log_method, log_text, :exception => current_exception)
@logger.send(log_method, log_text, :exception => current_exception, :backtrace => current_exception.backtrace)
if current_exception.respond_to? 'getNextException'
current_exception = current_exception.getNextException()

View File

@@ -1,17 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
<File name="file" fileName="log4j2.log">
<PatternLayout pattern="%d{yyyy-mm-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</File>
</Appenders>
<Loggers>
<!-- If we need to figure out whats happening for development purposes, disable this -->
<Logger name="com.zaxxer.hikari" level="off" />
<Root level="debug">
<AppenderRef ref="file"/>
</Root>
</Loggers>
</Configuration>

View File

@@ -1,13 +1,13 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc'
s.version = '5.0.0'
s.licenses = ['Apache License (2.0)']
s.summary = 'This plugin allows you to output to SQL, via JDBC'
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install 'logstash-output-jdbc'. This gem is not a stand-alone program"
s.authors = ['the_angry_angel']
s.email = 'karl+github@theangryangel.co.uk'
s.homepage = 'https://github.com/theangryangel/logstash-output-jdbc'
s.require_paths = ['lib']
s.version = "0.3.1"
s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
s.authors = ["the_angry_angel"]
s.email = "karl+github@theangryangel.co.uk"
s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
s.require_paths = [ "lib" ]
# Java only
s.platform = 'java'
@@ -15,18 +15,18 @@ Gem::Specification.new do |s|
# Files
s.files = Dir.glob('{lib,spec}/**/*.rb') + Dir.glob('vendor/**/*') + %w(LICENSE.txt README.md)
# Tests
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})
# Special flag to let us know this is actually a logstash plugin
s.metadata = { 'logstash_plugin' => 'true', 'logstash_group' => 'output' }
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
# Gem dependencies
s.add_runtime_dependency 'logstash-core-plugin-api', '>= 1.60', '<= 2.99'
s.add_runtime_dependency 'logstash-core-plugin-api', '~> 1.0'
s.add_runtime_dependency 'stud'
s.add_runtime_dependency 'logstash-codec-plain'
s.requirements << "jar 'com.zaxxer:HikariCP', '2.4.7'"
s.requirements << "jar 'com.zaxxer:HikariCP', '2.4.2'"
s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.21'"
s.add_development_dependency 'jar-dependencies'

View File

@@ -1,19 +0,0 @@
#!/usr/bin/env ruby -w
seconds_to_reach = 10 * 60
retry_max_interval = 128
current_interval = 2
total_interval = 0
exceptions_count = 1
loop do
break if total_interval > seconds_to_reach
exceptions_count += 1
current_interval = current_interval * 2 > retry_max_interval ? retry_max_interval : current_interval * 2
total_interval += current_interval
end
puts exceptions_count

View File

@@ -4,32 +4,6 @@ require 'stud/temporary'
require 'java'
require 'securerandom'
RSpec.configure do |c|
def start_service(name)
cmd = "sudo /etc/init.d/#{name}* start"
`which systemctl`
if $?.success?
cmd = "sudo systemctl start #{name}"
end
`#{cmd}`
end
def stop_service(name)
cmd = "sudo /etc/init.d/#{name}* stop"
`which systemctl`
if $?.success?
cmd = "sudo systemctl stop #{name}"
end
`#{cmd}`
end
end
RSpec.shared_context 'rspec setup' do
it 'ensure jar is available' do
expect(ENV[jdbc_jar_env]).not_to be_nil, "#{jdbc_jar_env} not defined, required to run tests"
@@ -46,9 +20,7 @@ RSpec.shared_context 'when initializing' do
end
RSpec.shared_context 'when outputting messages' do
let(:logger) {
double("logger")
}
let(:logger) { double("logger") }
let(:jdbc_test_table) do
'logstash_output_jdbc_test'
@@ -59,11 +31,11 @@ RSpec.shared_context 'when outputting messages' do
end
let(:jdbc_create_table) do
"CREATE table #{jdbc_test_table} (created_at datetime not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit bit not null, static_bigint bigint not null)"
"CREATE table #{jdbc_test_table} (created_at datetime not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit bit not null)"
end
let(:jdbc_statement) do
["insert into #{jdbc_test_table} (created_at, message, message_sprintf, static_int, static_bit, static_bigint) values(?, ?, ?, ?, ?, ?)", '@timestamp', 'message', 'sprintf-%{message}', 1, true, 4000881632477184]
["insert into #{jdbc_test_table} (created_at, message, message_sprintf, static_int, static_bit) values(?, ?, ?, ?, ?)", '@timestamp', 'message', 'sprintf-%{message}', 1, true]
end
let(:systemd_database_service) do
@@ -71,27 +43,16 @@ RSpec.shared_context 'when outputting messages' do
end
let(:event_fields) do
{ message: "test-message #{SecureRandom.uuid}" }
{ 'message' => "test-message #{SecureRandom.uuid}" }
end
let(:event) { LogStash::Event.new(event_fields) }
let(:plugin) do
# Setup logger
allow(LogStash::Outputs::Jdbc).to receive(:logger).and_return(logger)
# XXX: Suppress reflection logging. There has to be a better way around this.
allow(logger).to receive(:debug).with(/config LogStash::/)
# Suppress beta warnings.
allow(logger).to receive(:info).with(/Please let us know if you find bugs or have suggestions on how to improve this plugin./)
# Suppress start up messages.
expect(logger).to receive(:info).once.with(/JDBC - Starting up/)
# Setup plugin
output = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
output.register
output.logger = logger
# Setup table
c = output.instance_variable_get(:@pool).getConnection
@@ -121,7 +82,7 @@ RSpec.shared_context 'when outputting messages' do
# Verify the number of items in the output table
c = plugin.instance_variable_get(:@pool).getConnection
stmt = c.prepareStatement("select count(*) as total from #{jdbc_test_table} where message = ?")
stmt.setString(1, event.get('message'))
stmt.setString(1, event['message'])
rs = stmt.executeQuery
count = 0
count = rs.getInt('total') while rs.next
@@ -132,14 +93,10 @@ RSpec.shared_context 'when outputting messages' do
end
it 'should not save event, and log an unretryable exception' do
e = event
original_event = e.get('message')
e.set('message', nil)
e = LogStash::Event.new({})
expect(logger).to receive(:error).once.with(/JDBC - Exception. Not retrying/, Hash)
expect { plugin.multi_receive([event]) }.to_not raise_error
e.set('message', original_event)
expect { plugin.multi_receive([e]) }.to_not raise_error
end
it 'it should retry after a connection loss, and log a warning' do
@@ -150,21 +107,29 @@ RSpec.shared_context 'when outputting messages' do
# Check that everything is fine right now
expect { p.multi_receive([event]) }.not_to raise_error
stop_service(systemd_database_service)
# Start a thread to restart the service after the fact.
# Start a thread to stop and restart the service.
t = Thread.new(systemd_database_service) { |systemd_database_service|
sleep 20
start_stop_cmd = 'sudo /etc/init.d/%<service>s* %<action>s'
start_service(systemd_database_service)
`which systemctl`
if $?.success?
start_stop_cmd = 'sudo systemctl %<action>s %<service>s'
end
cmd = start_stop_cmd % { action: 'stop', service: systemd_database_service }
`#{cmd}`
sleep 10
cmd = start_stop_cmd % { action: 'start', service: systemd_database_service }
`#{cmd}`
}
t.run
# Wait a few seconds to the service to stop
sleep 5
expect(logger).to receive(:warn).at_least(:once).with(/JDBC - Exception. Retrying/, Hash)
expect { p.multi_receive([event]) }.to_not raise_error
# Wait for the thread to finish
t.join
end
end

View File

@@ -10,7 +10,7 @@ describe 'logstash-output-jdbc: derby', if: ENV['JDBC_DERBY_JAR'] do
end
let(:jdbc_create_table) do
"CREATE table #{jdbc_test_table} (created_at timestamp not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit boolean not null, static_bigint bigint not null)"
"CREATE table #{jdbc_test_table} (created_at timestamp not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit boolean not null)"
end
let(:jdbc_settings) do