This commit is contained in:
Karl Southern 2015-11-17 10:32:16 +00:00
parent a6c669cc52
commit 9804850714
3 changed files with 66 additions and 29 deletions

View File

@ -6,9 +6,15 @@ See below for tested adapters, and example configurations.
This has not yet been extensively tested with all JDBC drivers and may not yet work for you. This has not yet been extensively tested with all JDBC drivers and may not yet work for you.
If you do find this works for a JDBC driver not listed, let me know and provide a small example configuration.
This plugin does not bundle any JDBC jar files, and does expect them to be in a This plugin does not bundle any JDBC jar files, and does expect them to be in a
particular location. Please ensure you read the 4 installation lines below. particular location. Please ensure you read the 4 installation lines below.
## Headlines
- Support for connection pooling added in 0.2.0 [unreleased until #21 is resolved]
- Support for unsafe statement handling (allowing dynamic queries) in 0.2.0 [unreleased until #21 is resolved]
## Versions ## Versions
- See master branch for logstash v2+ - See master branch for logstash v2+
- See v1.5 branch for logstash v1.5 - See v1.5 branch for logstash v1.5
@ -16,20 +22,33 @@ particular location. Please ensure you read the 4 installation lines below.
## Installation ## Installation
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory - Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- Now either:
- Use driver_class in your configuraton to specify a path to your jar file
- Or:
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`) - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- Configure - And then configure (examples below)
## Configuration options ## Configuration options
* driver_class, string, JDBC driver class to load
* connection_string, string, JDBC connection string | Option | Type | Description | Required? |
* statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | ------ | ---- | ----------- | --------- |
* flush_size, number, default = 1000, number of entries to buffer before sending to SQL | driver_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No |
* idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time | connection_string | String | JDBC connection URL | Yes |
* max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop | username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No |
* max_repeat_exceptions_time, number, default = 30, maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value | password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No |
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No |
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No |
| max_repeat_exceptions | Number | Number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop | No |
| max_repeat_exceptions_time | Number | Maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value | No |
## Example configurations ## Example configurations
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
### SQLite3 ### SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc * Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db` * SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
@ -42,7 +61,6 @@ output {
stdout { } stdout { }
jdbc { jdbc {
driver_class => 'org.sqlite.JDBC'
connection_string => 'jdbc:sqlite:test.db' connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
} }
@ -58,7 +76,6 @@ input
} }
output { output {
jdbc { jdbc {
driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;" connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
} }
@ -74,7 +91,6 @@ input
} }
output { output {
jdbc { jdbc {
driver_class => 'org.postgresql.Driver'
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password' connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
} }
@ -92,7 +108,6 @@ input
} }
output { output {
jdbc { jdbc {
driver_class => "oracle.jdbc.driver.OracleDriver"
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID" connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
} }
@ -110,9 +125,13 @@ input
} }
output { output {
jdbc { jdbc {
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD" connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
} }
} }
``` ```
### MariaDB
This is reportedly working, according to [@db2882](https://github.com/db2882) in issue #20.
No example configuration provided.
If you have a working sample, pull requests are welcome.

View File

@ -12,7 +12,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
config_name "jdbc" config_name "jdbc"
# Driver class - No longer required # Driver class - No longer required
config :driver_class, :obsolete => true config :driver_class, :obsolete => "driver_class is no longer required and can be removed from your configuration"
# Where to find the jar # Where to find the jar
# Defaults to not required, and to the original behaviour # Defaults to not required, and to the original behaviour
@ -104,7 +104,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end end
def receive(event) def receive(event)
return unless output?(event) return unless output?(event) or event.cancelled?
return unless @statement.length > 0 return unless @statement.length > 0
buffer_receive(event) buffer_receive(event)
@ -173,10 +173,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
def safe_flush(events, teardown=false) def safe_flush(events, teardown=false)
connection = @pool.getConnection() connection = @pool.getConnection()
statement = connection.prepareStatement(@statement[0]) statement = connection.prepareStatement(@statement[0])
events.each do |event| events.each do |event|
next if event.cancelled?
next if @statement.length < 2 next if @statement.length < 2
statement = add_statement_event_params(statement, event) statement = add_statement_event_params(statement, event)
@ -184,17 +184,13 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end end
begin begin
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
statement.executeBatch() statement.executeBatch()
statement.close() statement.close()
rescue => e rescue => e
# Raising an exception will incur a retry from Stud::Buffer. # Raising an exception will incur a retry from Stud::Buffer.
# Since the exceutebatch failed this should mean any events failed to be # Since the exceutebatch failed this should mean any events failed to be
# inserted will be re-run. We're going to log it for the lols anyway. # inserted will be re-run. We're going to log it for the lols anyway.
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e) log_jdbc_exception(e)
if e.getNextException() != nil
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
end
ensure ensure
connection.close(); connection.close();
end end
@ -204,15 +200,27 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
connection = @pool.getConnection() connection = @pool.getConnection()
events.each do |event| events.each do |event|
statement = connection.prepareStatement(event.sprintf(@statement[0])) next if event.cancelled?
statement = connection.prepareStatement(event.sprintf(@statement[0]))
statement = add_statement_event_params(statement, event) if @statement.length > 1 statement = add_statement_event_params(statement, event) if @statement.length > 1
begin
statement.execute() statement.execute()
# cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line
event.cancel
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# We log for the lols.
log_jdbc_exception(e)
ensure
statement.close() statement.close()
connection.close() connection.close()
end end
end end
end
def add_statement_event_params(statement, event) def add_statement_event_params(statement, event)
@statement[1..-1].each_with_index do |i, idx| @statement[1..-1].each_with_index do |i, idx|
@ -237,4 +245,13 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
statement statement
end end
def log_jdbc_exception(e)
ce = e
loop do
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => ce)
ce = e.getNextException()
break if ce == nil
end
end
end # class LogStash::Outputs::jdbc end # class LogStash::Outputs::jdbc

View File

@ -1,6 +1,6 @@
Gem::Specification.new do |s| Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc' s.name = 'logstash-output-jdbc'
s.version = "0.2.0.rc2" s.version = "0.2.0.rc3"
s.licenses = [ "Apache License (2.0)" ] s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC" s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@ -10,7 +10,8 @@ Gem::Specification.new do |s|
s.require_paths = [ "lib" ] s.require_paths = [ "lib" ]
# Files # Files
s.files = `git ls-files`.split($\) s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
# Tests # Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/}) s.test_files = s.files.grep(%r{^(test|spec|features)/})