From 98048507148904b0fd637361aa0d124d3bb5de51 Mon Sep 17 00:00:00 2001
From: Karl Southern
Date: Tue, 17 Nov 2015 10:32:16 +0000
Subject: [PATCH] WIP

---
 README.md                    | 49 +++++++++++++++++++++++++-----------
 lib/logstash/outputs/jdbc.rb | 41 +++++++++++++++++++++----------
 logstash-output-jdbc.gemspec |  5 ++--
 3 files changed, 66 insertions(+), 29 deletions(-)

diff --git a/README.md b/README.md
index 8eb8c60..94e8882 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,15 @@ See below for tested adapters, and example configurations.
 This has not yet been extensively tested with all JDBC drivers and may not yet
 work for you.
 
+If you do find this works for a JDBC driver not listed, let me know and provide a small example configuration.
+
 This plugin does not bundle any JDBC jar files, and does expect them to be in a
 particular location. Please ensure you read the 4 installation lines below.
 
+## Headlines
+ - Support for connection pooling added in 0.2.0 [unreleased until #21 is resolved]
+ - Support for unsafe statement handling (allowing dynamic queries) added in 0.2.0 [unreleased until #21 is resolved]
+
 ## Versions
  - See master branch for logstash v2+
  - See v1.5 branch for logstash v1.5
@@ -16,20 +22,33 @@ particular location. Please ensure you read the 4 installation lines below.
 
 ## Installation
  - Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- - Configure
+ - Now either:
+   - Use driver_path in your configuration to specify the path to your jar file (see the sketch just below this list)
+ - Or:
+   - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
+   - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
+ - And then configure (examples below)
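+
+For example, a minimal sketch (the jar path, database and values below are illustrative placeholders, not defaults):
+
+```
+output {
+  jdbc {
+    # driver_path is only needed when the jar is not in vendor/jar/jdbc
+    driver_path => '/opt/jdbc-drivers/sqlite-jdbc.jar'
+    connection_string => 'jdbc:sqlite:test.db'
+    statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
+  }
+}
+```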
 
 ## Configuration options
- * driver_class, string, JDBC driver class to load
- * connection_string, string, JDBC connection string
- * statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim.
- * flush_size, number, default = 1000, number of entries to buffer before sending to SQL
- * idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time
- * max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop
- * max_repeat_exceptions_time, number, default = 30, maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value
+
+| Option | Type | Description | Required? |
+| ------ | ---- | ----------- | --------- |
+| driver_path | String | File path to the jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No |
+| connection_string | String | JDBC connection URL | Yes |
+| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No |
+| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No |
+| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared; all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (e.g. "@timestamp", or "host") or a formatted string (e.g. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes |
+| unsafe_statement | Boolean | If true, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injection. Be aware that there is also a potential performance penalty, as each event must be evaluated and inserted into SQL one at a time, whereas when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] (see the sketch just after this table) | No |
+| max_pool_size | Number | Maximum number of connections to open to the SQL server at any one time | No |
+| connection_timeout | Number | Number of seconds before a SQL connection is closed | No |
+| flush_size | Number | Maximum number of entries to buffer before sending to SQL, if this number is reached before idle_flush_time elapses | No |
+| idle_flush_time | Number | Number of idle seconds before sending data to SQL, even if the flush_size has not yet been reached | No |
+| max_repeat_exceptions | Number | Number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop | No |
+| max_repeat_exceptions_time | Number | Maximum number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value | No |
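+
+For example, a sketch of an unsafe_statement configuration (the table_name_field and other values here are illustrative only, and this is only safe if you fully control that field's contents):
+
+```
+output {
+  jdbc {
+    connection_string => 'jdbc:sqlite:test.db'
+    unsafe_statement => true
+    # %{table_name_field} is interpolated from the event before the statement is
+    # prepared, so never populate it from untrusted input
+    statement => [ "INSERT INTO %{table_name_field} (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
+  }
+}
+```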
 
 ## Example configurations
+If you have a working sample configuration for a DB that's not listed, pull requests are welcome.
+
 ### SQLite3
 * Tested using https://bitbucket.org/xerial/sqlite-jdbc
 * SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
@@ -42,7 +61,6 @@ output {
   stdout { }
 
   jdbc {
-    driver_class => 'org.sqlite.JDBC'
     connection_string => 'jdbc:sqlite:test.db'
     statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
   }
 ```
@@ -58,7 +76,6 @@ input
 }
 output {
   jdbc {
-    driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
     connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
     statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
   }
 ```
@@ -74,7 +91,6 @@ input
 }
 output {
   jdbc {
-    driver_class => 'org.postgresql.Driver'
     connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
     statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
   }
 ```
@@ -92,7 +108,6 @@ input
 }
 output {
   jdbc {
-    driver_class => "oracle.jdbc.driver.OracleDriver"
     connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
     statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
   }
 ```
@@ -110,9 +125,13 @@ input
 }
 output {
   jdbc {
-    driver_class => "com.mysql.jdbc.Driver"
     connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
     statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
   }
 }
 ```
+
+### MariaDB
+This is reportedly working, according to [@db2882](https://github.com/db2882) in issue #20.
+No example configuration provided.
+If you have a working sample, pull requests are welcome.
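+
+### Tuning batching and pooling
+The max_pool_size, connection_timeout, flush_size and idle_flush_time options described above can be combined with any of the examples. The values below are illustrative starting points, not recommendations:
+
+```
+output {
+  jdbc {
+    connection_string => 'jdbc:sqlite:test.db'
+    statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
+    # open at most 10 connections, closing each after 30 seconds
+    max_pool_size => 10
+    connection_timeout => 30
+    # send a batch at 500 buffered events, or after 5 idle seconds, whichever comes first
+    flush_size => 500
+    idle_flush_time => 5
+  }
+}
+```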
diff --git a/lib/logstash/outputs/jdbc.rb b/lib/logstash/outputs/jdbc.rb
index 343db59..58c29c1 100644
--- a/lib/logstash/outputs/jdbc.rb
+++ b/lib/logstash/outputs/jdbc.rb
@@ -12,7 +12,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
   config_name "jdbc"
 
   # Driver class - No longer required
-  config :driver_class, :obsolete => true
+  config :driver_class, :obsolete => "driver_class is no longer required and can be removed from your configuration"
 
   # Where to find the jar
   # Defaults to not required, and to the original behaviour
@@ -104,7 +104,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
   end
 
   def receive(event)
-    return unless output?(event)
+    return unless output?(event) && !event.cancelled?
     return unless @statement.length > 0
 
     buffer_receive(event)
@@ -173,10 +173,10 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 
   def safe_flush(events, teardown=false)
     connection = @pool.getConnection()
-
     statement = connection.prepareStatement(@statement[0])
 
     events.each do |event|
+      next if event.cancelled?
       next if @statement.length < 2
 
       statement = add_statement_event_params(statement, event)
     end
 
     begin
-      @logger.debug("JDBC - Sending SQL", :sql => statement.toString())
       statement.executeBatch()
       statement.close()
     rescue => e
       # Raising an exception will incur a retry from Stud::Buffer.
       # Since the exceutebatch failed this should mean any events failed to be
       # inserted will be re-run. We're going to log it for the lols anyway.
-      @logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
-      if e.getNextException() != nil
-        @logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
-      end
+      log_jdbc_exception(e)
     ensure
       connection.close();
     end
@@ -204,13 +200,25 @@
     connection = @pool.getConnection()
 
     events.each do |event|
+      next if event.cancelled?
+
       statement = connection.prepareStatement(event.sprintf(@statement[0]))
       statement = add_statement_event_params(statement, event) if @statement.length > 1
-
-      statement.execute()
-      statement.close()
-      connection.close()
+      begin
+        statement.execute()
+
+        # cancel the event, since we may end up outputting the same event multiple times
+        # if an exception happens later down the line
+        event.cancel
+      rescue => e
+        # Raising an exception will incur a retry from Stud::Buffer.
+        # We log for the lols.
+        log_jdbc_exception(e)
+      ensure
+        statement.close()
+        connection.close()
+      end
     end
   end
@@ -237,4 +245,13 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 
     statement
   end
+
+  def log_jdbc_exception(e)
+    ce = e
+    loop do
+      @logger.error("JDBC Exception encountered: Will automatically retry.", :exception => ce)
+      ce = ce.getNextException()
+      break if ce == nil
+    end
+  end
 end # class LogStash::Outputs::jdbc
diff --git a/logstash-output-jdbc.gemspec b/logstash-output-jdbc.gemspec
index 381764b..9867725 100644
--- a/logstash-output-jdbc.gemspec
+++ b/logstash-output-jdbc.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-jdbc'
-  s.version = "0.2.0.rc2"
+  s.version = "0.2.0.rc3"
   s.licenses = [ "Apache License (2.0)" ]
   s.summary = "This plugin allows you to output to SQL, via JDBC"
   s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@@ -10,7 +10,8 @@ Gem::Specification.new do |s|
   s.require_paths = [ "lib" ]
 
   # Files
-  s.files = `git ls-files`.split($\)
+  s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
+
   # Tests
   s.test_files = s.files.grep(%r{^(test|spec|features)/})