Compare commits

...

18 Commits
master ... v2.x

Author SHA1 Message Date
Karl
efa84fb63f Update README.md 2017-04-01 12:20:41 +01:00
Karl
a9acb33adf Update README.md
Removes incorrect performance information on unsafe_statement. No longer relevant for current releases.
2016-10-05 11:48:18 +01:00
Karl Southern
1788c81f91 Backport travisci fix 2016-09-15 22:13:53 +01:00
Karl Southern
47fdf7d442 Adds bigint/long support to address #61 2016-09-15 11:07:39 +01:00
Karl
ffb2f700be Update README.md 2016-08-28 23:16:25 +01:00
Karl Southern
f52c14b79a Fix travis 2016-08-28 21:59:35 +01:00
Karl Southern
f46fd58048 connection_test supression support for issue #53 2016-08-28 21:48:06 +01:00
Karl Southern
3dc7627782 v0.3.0 2016-07-24 12:14:31 +01:00
Karl Southern
9235c48c88 Fix travis for v2.x 2016-07-13 17:46:42 +01:00
Karl Southern
53e665bbb6 0.3.0 uses jar-dependencies 2016-07-13 17:41:32 +01:00
Karl Southern
fa2d226fbf 0.3.0.pre - Preparing for threadsafety 2016-07-13 17:40:35 +01:00
Karl Southern
da5a3d8be3 0.2.10 2016-07-07 11:03:14 +01:00
Karl Southern
b10462dacd Preparing for 0.2.10 2016-07-07 10:09:31 +01:00
Karl Southern
61c7a1307e Provisionally address issue 46 2016-07-07 08:50:58 +01:00
Karl Southern
b5419813ba 0.2.9 2016-06-29 13:42:09 +01:00
Karl Southern
ded1106b13 Address issue 44. 2016-06-28 22:38:36 +01:00
Karl Southern
2b27f39088 0.2.7 2016-05-29 13:45:26 +01:00
Karl Southern
7b337a8b91 Backport functionality from v5 branch. 2016-05-29 13:40:47 +01:00
18 changed files with 553 additions and 297 deletions

8
.gitignore vendored
View File

@ -2,3 +2,11 @@
Gemfile.lock Gemfile.lock
Gemfile.bak Gemfile.bak
.bundle .bundle
.vagrant
.mvn
vendor
lib/**/*.jar
.DS_Store
*.swp
*.log

View File

@ -1,8 +1,11 @@
sudo: required
language: ruby language: ruby
cache: bundler cache: bundler
rvm: rvm:
- jruby - jruby
before_script: before_script:
- wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar - bundle exec rake vendor
- export JDBC_DERBY_JAR=/tmp/derby.jar - bundle exec rake install_jars
- ./scripts/travis-before_script.sh
- source ./scripts/travis-variables.sh
script: bundle exec rspec script: bundle exec rspec

View File

@ -1,6 +1,31 @@
# Change Log # Change Log
All notable changes to this project will be documented in this file, from 0.2.0. All notable changes to this project will be documented in this file, from 0.2.0.
## [0.3.2] - 2016-09-15
- Adds long/bigint support to address https://github.com/theangryangel/logstash-output-jdbc/issues/61
## [0.3.1] - 2016-08-28
- Adds connection_test configuration option, to prevent the connection test from occuring, allowing the error to be suppressed.
Useful for cockroachdb deployments. https://github.com/theangryangel/logstash-output-jdbc/issues/53
## [0.3.0] - 2016-07-24
- Brings tests from v5 branch, providing greater coverage
- Removes bulk update support, due to inconsistent behaviour
- Plugin now marked as threadsafe, meaning only 1 instance per-Logstash
- Raises default max_pool_size to match the default number of workers (1 connection per worker)
## [0.2.10] - 2016-07-07
- Support non-string entries in statement array
- Adds backtrace to exception logging
## [0.2.9] - 2016-06-29
- Fix NameError exception.
- Moved log_jdbc_exception calls
## [0.2.7] - 2016-05-29
- Backport retry exception logic from v5 branch
- Backport improved timestamp compatibility from v5 branch
## [0.2.6] - 2016-05-02 ## [0.2.6] - 2016-05-02
- Fix for exception infinite loop - Fix for exception infinite loop

View File

@ -1,6 +1,8 @@
# logstash-output-jdbc # logstash-output-jdbc
[![Build Status](https://travis-ci.org/theangryangel/logstash-output-jdbc.svg?branch=master)](https://travis-ci.org/theangryangel/logstash-output-jdbc) [![Build Status](https://travis-ci.org/theangryangel/logstash-output-jdbc.svg?branch=v2.x)](https://travis-ci.org/theangryangel/logstash-output-jdbc)
⚠️ The logstash v2 version of the plugin does not contain all fixes covered by the v5 version. If you find an issue is resolved under v5, but require the same issue fixed under v2, please raise an issue and I will do my best to find the time to backport the fix. At this time I recommend Logstash v5 where possible.
This plugin is provided as an external plugin and is not part of the Logstash project. This plugin is provided as an external plugin and is not part of the Logstash project.
@ -14,7 +16,7 @@ If you do find this works for a JDBC driver without an example, let me know and
This plugin does not bundle any JDBC jar files, and does expect them to be in a This plugin does not bundle any JDBC jar files, and does expect them to be in a
particular location. Please ensure you read the 4 installation lines below. particular location. Please ensure you read the 4 installation lines below.
## ChangeLog ## Changelog
See CHANGELOG.md See CHANGELOG.md
## Versions ## Versions
@ -27,23 +29,13 @@ For development:
- See v1.4 branch for logstash 1.4 - See v1.4 branch for logstash 1.4
## Installation ## Installation
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory - Run `bin/logstash-plugin install logstash-output-jdbc` in your logstash installation directory
- Now either: - Now either:
- Use driver_jar_path in your configuraton to specify a path to your jar file - Use driver_jar_path in your configuraton to specify a path to your jar file
- Or: - Or:
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`) - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- And then configure (examples below) - And then configure (examples can be found in the examples directory)
## Running tests
At this time tests only run against Derby, in an in-memory database.
Acceptance tests for individual database engines will be added over time.
Assuming valid jruby is installed
- First time, issue `jruby -S bundle install` to install dependencies
- Next, download Derby jar from https://db.apache.org/derby/
- Run the tests `JDBC_DERBY_JAR=path/to/derby.jar jruby -S rspec`
- Optionally add the `JDBC_DEBUG=1` env variable to add logging to stdout
## Configuration options ## Configuration options
@ -53,17 +45,43 @@ Assuming valid jruby is installed
| driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True | | driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
| driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | | | driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
| connection_string | String | JDBC connection URL | Yes | | | connection_string | String | JDBC connection URL | Yes | |
| connection_test | Boolean | Run a JDBC connection test. Some drivers do not function correctly, and you may need to disable the connection test to supress an error. Cockroach with the postgres JDBC driver is such an example. | No | Yes |
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | | | username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | | | password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | | | statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False | | unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 | | max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time. Default set to same as Logstash default number of workers | No | 24 |
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 | | connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 | | flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 | | max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 |
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 | | retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
| retry_sql_states | Array of strings | An array of custom SQL state codes you wish to retry until `max_flush_exceptions`. Useful if you're using a JDBC driver which returns retry-able, but non-standard SQL state codes in it's exceptions. | No | [] |
## Example configurations ## Example configurations
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar. Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome. If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
## Development and Running tests
For development tests are recommended to run inside a virtual machine (Vagrantfile is included in the repo), as it requires
access to various database engines and could completely destroy any data in a live system.
If you have vagrant available (this is temporary whilst I'm hacking on v5 support. I'll make this more streamlined later):
- `vagrant up`
- `vagrant ssh`
- `cd /vagrant`
- `gem install bundler`
- `cd /vagrant && bundle install && bundle exec rake vendor && bundle exec rake install_jars`
- `./scripts/travis-before_script.sh && source ./scripts/travis-variables.sh`
- `bundle exec rspec`
## Releasing
- Update Changelog
- Bump version in gemspec
- Commit
- Create tag `git tag v<version-number-in-gemspec>`
- `bundle exec rake install_jars`
- `bundle exec rake pre_release_checks`
- `gem build logstash-output-jdbc.gemspec`
- `gem push`

View File

@ -1 +1,11 @@
require "logstash/devutils/rake" # encoding: utf-8
require 'logstash/devutils/rake'
require 'jars/installer'
require 'rubygems'
desc 'Fetch any jars required for this plugin'
task :install_jars do
ENV['JARS_HOME'] = Dir.pwd + '/vendor/jar-dependencies/runtime-jars'
ENV['JARS_VENDOR'] = 'false'
Jars::Installer.new.vendor_jars!(false)
end

View File

@ -1,17 +0,0 @@
class RingBuffer < Array
attr_reader :max_size
def initialize(max_size, enum = nil)
@max_size = max_size
enum.each { |e| self << e } if enum
end
def <<(el)
if self.size < @max_size || @max_size.nil?
super
else
self.shift
self.push(el)
end
end
end

View File

@ -1,141 +1,134 @@
# encoding: utf-8 # encoding: utf-8
require "logstash/outputs/base" require 'logstash/outputs/base'
require "logstash/namespace" require 'logstash/namespace'
require "stud/buffer" require 'concurrent'
require "java" require 'stud/interval'
require "logstash-output-jdbc_jars" require 'java'
require "logstash-output-jdbc_ring-buffer" require 'logstash-output-jdbc_jars'
# Write events to a SQL engine, using JDBC.
#
# It is upto the user of the plugin to correctly configure the plugin. This
# includes correctly crafting the SQL statement, and matching the number of
# parameters correctly.
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Adds buffer support declare_threadsafe! if self.respond_to?(:declare_threadsafe!)
include Stud::Buffer
config_name "jdbc" STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
RETRYABLE_SQLSTATE_CLASSES = [
# Classes of retryable SQLSTATE codes
# Not all in the class will be retryable. However, this is the best that
# we've got right now.
# If a custom state code is required, set it in retry_sql_states.
'08', # Connection Exception
'24', # Invalid Cursor State (Maybe retry-able in some circumstances)
'25', # Invalid Transaction State
'40', # Transaction Rollback
'53', # Insufficient Resources
'54', # Program Limit Exceeded (MAYBE)
'55', # Object Not In Prerequisite State
'57', # Operator Intervention
'58', # System Error
].freeze
config_name 'jdbc'
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26 # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
config :driver_class, :validate => :string config :driver_class, validate: :string
# Does the JDBC driver support autocommit? # Does the JDBC driver support autocommit?
config :driver_auto_commit, :validate => :boolean, :default => true, :required => true config :driver_auto_commit, validate: :boolean, default: true, required: true
# Where to find the jar # Where to find the jar
# Defaults to not required, and to the original behaviour # Defaults to not required, and to the original behaviour
config :driver_jar_path, :validate => :string, :required => false config :driver_jar_path, validate: :string, required: false
# jdbc connection string # jdbc connection string
config :connection_string, :validate => :string, :required => true config :connection_string, validate: :string, required: true
# jdbc username - optional, maybe in the connection string # jdbc username - optional, maybe in the connection string
config :username, :validate => :string, :required => false config :username, validate: :string, required: false
# jdbc password - optional, maybe in the connection string # jdbc password - optional, maybe in the connection string
config :password, :validate => :string, :required => false config :password, validate: :string, required: false
# [ "insert into table (message) values(?)", "%{message}" ] # [ "insert into table (message) values(?)", "%{message}" ]
config :statement, :validate => :array, :required => true config :statement, validate: :array, required: true
# If this is an unsafe statement, use event.sprintf # If this is an unsafe statement, use event.sprintf
# This also has potential performance penalties due to having to create a # This also has potential performance penalties due to having to create a
# new statement for each event, rather than adding to the batch and issuing # new statement for each event, rather than adding to the batch and issuing
# multiple inserts in 1 go # multiple inserts in 1 go
config :unsafe_statement, :validate => :boolean, :default => false config :unsafe_statement, validate: :boolean, default: false
# Number of connections in the pool to maintain # Number of connections in the pool to maintain
config :max_pool_size, :validate => :number, :default => 5 config :max_pool_size, validate: :number, default: 24
# Connection timeout # Connection timeout
config :connection_timeout, :validate => :number, :default => 10000 config :connection_timeout, validate: :number, default: 10000
# We buffer a certain number of events before flushing that out to SQL. # We buffer a certain number of events before flushing that out to SQL.
# This setting controls how many events will be buffered before sending a # This setting controls how many events will be buffered before sending a
# batch of events. # batch of events.
config :flush_size, :validate => :number, :default => 1000 config :flush_size, validate: :number, default: 1000
# The amount of time since last flush before a flush is forced. # Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval`
# config :retry_initial_interval, validate: :number, default: 2
# This setting helps ensure slow event rates don't get stuck in Logstash.
# For example, if your `flush_size` is 100, and you have received 10 events,
# and it has been more than `idle_flush_time` seconds since the last flush,
# Logstash will flush those 10 events automatically.
#
# This helps keep both fast and slow log streams moving along in
# a timely manner.
#
# If you change this value please ensure that you change
# max_flush_exceptions accordingly.
config :idle_flush_time, :validate => :number, :default => 1
# Maximum number of sequential flushes which encounter exceptions, before we stop retrying. # Maximum time between retries, in seconds
config :retry_max_interval, validate: :number, default: 128
# Any additional custom, retryable SQL state codes.
# Suitable for configuring retryable custom JDBC SQL state codes.
config :retry_sql_states, validate: :array, default: []
# Run a connection test on start.
config :connection_test, validate: :boolean, default: true
# Maximum number of sequential failed attempts, before we stop retrying.
# If set to < 1, then it will infinitely retry. # If set to < 1, then it will infinitely retry.
# # At the default values this is a little over 10 minutes
# You should carefully tune this in relation to idle_flush_time if your SQL server config :max_flush_exceptions, validate: :number, default: 10
# is not highly available.
# i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
# longer than 200 seconds to reboot, then logstash will stop.
config :max_flush_exceptions, :validate => :number, :default => 0
config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation." config :max_repeat_exceptions, obsolete: 'This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation.'
config :max_repeat_exceptions_time, :obsolete => "This is no longer required" config :max_repeat_exceptions_time, obsolete: 'This is no longer required'
config :idle_flush_time, obsolete: 'No longer necessary under Logstash v5'
public
def register def register
@logger.info("JDBC - Starting up") @logger.info('JDBC - Starting up')
LogStash::Logger.setup_log4j(@logger)
load_jar_files! load_jar_files!
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions) @stopping = Concurrent::AtomicBoolean.new(false)
if (@flush_size > 1000) @logger.warn('JDBC - Flush size is set to > 1000') if @flush_size > 1000
@logger.warn("JDBC - Flush size is set to > 1000")
if @statement.empty?
@logger.error('JDBC - No statement provided. Configuration error.')
end end
if @statement.length < 1 if !@unsafe_statement && @statement.length < 2
@logger.error("JDBC - No statement provided. Configuration error.")
end
if (!@unsafe_statement and @statement.length < 2)
@logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.") @logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
end end
setup_and_test_pool! setup_and_test_pool!
end
buffer_initialize( def multi_receive(events)
:max_items => @flush_size, events.each_slice(@flush_size) do |slice|
:max_interval => @idle_flush_time, retrying_submit(slice)
:logger => @logger end
)
end end
def receive(event) def receive(event)
return unless output?(event) or event.cancelled? retrying_submit([event])
return unless @statement.length > 0
buffer_receive(event)
end end
def flush(events, teardown=false) def close
if @unsafe_statement == true @stopping.make_true
unsafe_flush(events, teardown) @pool.close
else
safe_flush(events, teardown)
end
end
def on_flush_error(e)
return if @max_flush_exceptions < 1
@exceptions_tracker << e.class
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
@logger.error("JDBC - max_flush_exceptions has been reached")
log_jdbc_exception(e)
raise LogStash::ShutdownSignal.new
end
end
def teardown
buffer_flush(:final => true)
@pool.close()
super super
end end
@ -158,131 +151,183 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
validate_connection_timeout = (@connection_timeout / 1000) / 2 validate_connection_timeout = (@connection_timeout / 1000) / 2
return unless @connection_test
# Test connection # Test connection
test_connection = @pool.getConnection() test_connection = @pool.getConnection
unless test_connection.isValid(validate_connection_timeout) unless test_connection.isValid(validate_connection_timeout)
@logger.error("JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.") @logger.error('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
end end
test_connection.close() test_connection.close
end end
def load_jar_files! def load_jar_files!
# Load jar from driver path # Load jar from driver path
unless @driver_jar_path.nil? unless @driver_jar_path.nil?
raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
require @driver_jar_path require @driver_jar_path
return return
end end
# Revert original behaviour of loading from vendor directory # Revert original behaviour of loading from vendor directory
# if no path given # if no path given
if ENV['LOGSTASH_HOME'] jarpath = if ENV['LOGSTASH_HOME']
jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar") File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
else else
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar") File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
end end
@logger.debug("JDBC - jarpath", path: jarpath) @logger.debug('JDBC - jarpath', path: jarpath)
jars = Dir[jarpath] jars = Dir[jarpath]
raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty? raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty?
jars.each do |jar| jars.each do |jar|
@logger.debug("JDBC - Loaded jar", :jar => jar) @logger.debug('JDBC - Loaded jar', jar: jar)
require jar require jar
end end
end end
def safe_flush(events, teardown=false) def submit(events)
connection = nil connection = nil
statement = nil statement = nil
events_to_retry = []
begin begin
connection = @pool.getConnection() connection = @pool.getConnection
statement = connection.prepareStatement(@statement[0])
events.each do |event|
next if event.cancelled?
next if @statement.length < 2
statement = add_statement_event_params(statement, event)
statement.addBatch()
end
statement.executeBatch()
statement.close()
@exceptions_tracker << nil
rescue => e rescue => e
log_jdbc_exception(e) log_jdbc_exception(e, true)
ensure # If a connection is not available, then the server has gone away
statement.close() unless statement.nil? # We're not counting that towards our retry count.
connection.close() unless connection.nil? return events, false
end end
end
def unsafe_flush(events, teardown=false)
connection = nil
statement = nil
begin
connection = @pool.getConnection()
events.each do |event| events.each do |event|
next if event.cancelled? begin
statement = connection.prepareStatement(
statement = connection.prepareStatement(event.sprintf(@statement[0])) (@unsafe_statement == true) ? event.sprintf(@statement[0]) : @statement[0]
)
statement = add_statement_event_params(statement, event) if @statement.length > 1 statement = add_statement_event_params(statement, event) if @statement.length > 1
statement.execute
statement.execute()
# cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line
event.cancel
@exceptions_tracker << nil
end
rescue => e rescue => e
log_jdbc_exception(e) if retry_exception?(e)
events_to_retry.push(event)
end
ensure ensure
statement.close() unless statement.nil? statement.close unless statement.nil?
connection.close() unless connection.nil? end
end
connection.close unless connection.nil?
return events_to_retry, true
end
def retrying_submit(actions)
# Initially we submit the full list of actions
submit_actions = actions
count_as_attempt = true
attempts = 1
sleep_interval = @retry_initial_interval
while @stopping.false? and (submit_actions and !submit_actions.empty?)
return if !submit_actions || submit_actions.empty? # If everything's a success we move along
# We retry whatever didn't succeed
submit_actions, count_as_attempt = submit(submit_actions)
# Everything was a success!
break if !submit_actions || submit_actions.empty?
if @max_flush_exceptions > 0 and count_as_attempt == true
attempts += 1
if attempts > @max_flush_exceptions
@logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
break
end
end
# If we're retrying the action sleep for the recommended interval
# Double the interval for the next time through to achieve exponential backoff
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
sleep_interval = next_sleep_interval(sleep_interval)
end end
end end
def add_statement_event_params(statement, event) def add_statement_event_params(statement, event)
@statement[1..-1].each_with_index do |i, idx| @statement[1..-1].each_with_index do |i, idx|
case event[i] if i.is_a? String
when Time value = event[i]
# Most reliable solution, cross JDBC driver if value.nil? and i =~ /%\{/
statement.setString(idx + 1, event[i].iso8601()) value = event.sprintf(i)
when LogStash::Timestamp end
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].to_iso8601())
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
when true
statement.setBoolean(idx + 1, true)
when false
statement.setBoolean(idx + 1, false)
else else
if event[i].nil? and i =~ /%\{/ value = i
statement.setString(idx + 1, event.sprintf(i)) end
case value
when Time
# See LogStash::Timestamp, below, for the why behind strftime.
statement.setString(idx + 1, value.strftime(STRFTIME_FMT))
when LogStash::Timestamp
# XXX: Using setString as opposed to setTimestamp, because setTimestamp
# doesn't behave correctly in some drivers (Known: sqlite)
#
# Additionally this does not use `to_iso8601`, since some SQL databases
# choke on the 'T' in the string (Known: Derby).
#
# strftime appears to be the most reliable across drivers.
statement.setString(idx + 1, value.time.strftime(STRFTIME_FMT))
when Fixnum, Integer
# bit_length doesn't exist in the current version of ruby/jruby logstash targets
# and this seems quicker than doing some Math.log2(value < 0 ? -value : value+1).ceil shit
if value > 2147483647 or value < -2147483648
statement.setLong(idx + 1, value)
else
statement.setInt(idx + 1, value)
end
when Float
statement.setFloat(idx + 1, value)
when String
statement.setString(idx + 1, value)
when true, false
statement.setBoolean(idx + 1, value)
else else
statement.setString(idx + 1, nil) statement.setString(idx + 1, nil)
end end
end end
end
statement statement
end end
def log_jdbc_exception(exception) def retry_exception?(exception)
retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]) or @retry_sql_states.include?(exception.getSQLState)))
log_jdbc_exception(exception, retrying)
retrying
end
def log_jdbc_exception(exception, retrying)
current_exception = exception current_exception = exception
log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying') + '.'
log_method = (retrying ? 'warn' : 'error')
loop do loop do
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => current_exception) @logger.send(log_method, log_text, :exception => current_exception, :backtrace => current_exception.backtrace)
if current_exception.respond_to? 'getNextException'
current_exception = current_exception.getNextException() current_exception = current_exception.getNextException()
else
current_exception = nil
end
break if current_exception == nil break if current_exception == nil
end end
end end
def next_sleep_interval(current_interval)
doubled = current_interval * 2
doubled > @retry_max_interval ? @retry_max_interval : doubled
end
end # class LogStash::Outputs::jdbc end # class LogStash::Outputs::jdbc

View File

@ -1,6 +1,6 @@
Gem::Specification.new do |s| Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc' s.name = 'logstash-output-jdbc'
s.version = "0.2.6" s.version = "0.3.2"
s.licenses = [ "Apache License (2.0)" ] s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC" s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@ -9,8 +9,11 @@ Gem::Specification.new do |s|
s.homepage = "https://github.com/theangryangel/logstash-output-jdbc" s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
s.require_paths = [ "lib" ] s.require_paths = [ "lib" ]
# Java only
s.platform = 'java'
# Files # Files
s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md) s.files = Dir.glob('{lib,spec}/**/*.rb') + Dir.glob('vendor/**/*') + %w(LICENSE.txt README.md)
# Tests # Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/}) s.test_files = s.files.grep(%r{^(test|spec|features)/})
@ -19,11 +22,17 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
# Gem dependencies # Gem dependencies
s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" s.add_runtime_dependency 'logstash-core-plugin-api', '~> 1.0'
s.add_runtime_dependency 'stud' s.add_runtime_dependency 'stud'
s.add_runtime_dependency "logstash-codec-plain" s.add_runtime_dependency 'logstash-codec-plain'
s.add_development_dependency "logstash-devutils" s.requirements << "jar 'com.zaxxer:HikariCP', '2.4.2'"
s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.21'"
s.post_install_message = "logstash-output-jdbc 0.2.0 introduces several new features - please ensure you check the documentation in the README file" s.add_development_dependency 'jar-dependencies'
s.add_development_dependency 'ruby-maven', '~> 3.3'
s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'rubocop', '0.41.2'
end end

View File

@ -0,0 +1,8 @@
#!/bin/bash
wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
sudo apt-get install mysql-server -qq -y
echo "create database logstash_output_jdbc_test;" | mysql -u root
wget http://search.maven.org/remotecontent?filepath=mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar -O /tmp/mysql.jar
wget http://search.maven.org/remotecontent?filepath=org/xerial/sqlite-jdbc/3.8.11.2/sqlite-jdbc-3.8.11.2.jar -O /tmp/sqlite.jar

View File

@ -0,0 +1,3 @@
export JDBC_DERBY_JAR=/tmp/derby.jar
export JDBC_MYSQL_JAR=/tmp/mysql.jar
export JDBC_SQLITE_JAR=/tmp/sqlite.jar

153
spec/jdbc_spec_helper.rb Normal file
View File

@ -0,0 +1,153 @@
require 'logstash/devutils/rspec/spec_helper'
require 'logstash/outputs/jdbc'
require 'stud/temporary'
require 'java'
require 'securerandom'
RSpec.configure do |c|
def start_service(name)
cmd = "sudo /etc/init.d/#{name}* start"
`which systemctl`
if $?.success?
cmd = "sudo systemctl start #{name}"
end
`#{cmd}`
end
def stop_service(name)
cmd = "sudo /etc/init.d/#{name}* stop"
`which systemctl`
if $?.success?
cmd = "sudo systemctl stop #{name}"
end
`#{cmd}`
end
end
RSpec.shared_context 'rspec setup' do
it 'ensure jar is available' do
expect(ENV[jdbc_jar_env]).not_to be_nil, "#{jdbc_jar_env} not defined, required to run tests"
expect(File.exist?(ENV[jdbc_jar_env])).to eq(true), "#{jdbc_jar_env} defined, but not valid"
end
end
RSpec.shared_context 'when initializing' do
it 'shouldn\'t register with a missing jar file' do
jdbc_settings['driver_jar_path'] = nil
plugin = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end
RSpec.shared_context 'when outputting messages' do
let(:logger) { double("logger") }
let(:jdbc_test_table) do
'logstash_output_jdbc_test'
end
let(:jdbc_drop_table) do
"DROP TABLE #{jdbc_test_table}"
end
let(:jdbc_create_table) do
"CREATE table #{jdbc_test_table} (created_at datetime not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit bit not null, static_bigint bigint not null)"
end
let(:jdbc_statement) do
["insert into #{jdbc_test_table} (created_at, message, message_sprintf, static_int, static_bit, static_bigint) values(?, ?, ?, ?, ?, ?)", '@timestamp', 'message', 'sprintf-%{message}', 1, true, 4000881632477184]
end
let(:systemd_database_service) do
nil
end
let(:event_fields) do
{ 'message' => "test-message #{SecureRandom.uuid}" }
end
let(:event) { LogStash::Event.new(event_fields) }
let(:plugin) do
# Setup plugin
output = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
output.register
output.logger = logger
# Setup table
c = output.instance_variable_get(:@pool).getConnection
# Derby doesn't support IF EXISTS.
# Seems like the quickest solution. Bleurgh.
begin
stmt = c.createStatement
stmt.executeUpdate(jdbc_drop_table)
rescue
# noop
ensure
stmt.close
stmt = c.createStatement
stmt.executeUpdate(jdbc_create_table)
stmt.close
c.close
end
output
end
it 'should save a event' do
expect { plugin.multi_receive([event]) }.to_not raise_error
# Verify the number of items in the output table
c = plugin.instance_variable_get(:@pool).getConnection
stmt = c.prepareStatement("select count(*) as total from #{jdbc_test_table} where message = ?")
stmt.setString(1, event['message'])
rs = stmt.executeQuery
count = 0
count = rs.getInt('total') while rs.next
stmt.close
c.close
expect(count).to eq(1)
end
it 'should not save event, and log an unretryable exception' do
e = LogStash::Event.new({})
expect(logger).to receive(:error).once.with(/JDBC - Exception. Not retrying/, Hash)
expect { plugin.multi_receive([e]) }.to_not raise_error
end
it 'it should retry after a connection loss, and log a warning' do
skip "does not run as a service" if systemd_database_service.nil?
p = plugin
# Check that everything is fine right now
expect { p.multi_receive([event]) }.not_to raise_error
stop_service(systemd_database_service)
# Start a thread to restart the service after the fact.
t = Thread.new(systemd_database_service) { |systemd_database_service|
sleep 20
start_service(systemd_database_service)
}
t.run
expect(logger).to receive(:warn).at_least(:once).with(/JDBC - Exception. Retrying/, Hash)
expect { p.multi_receive([event]) }.to_not raise_error
# Wait for the thread to finish
t.join
end
end

View File

@ -0,0 +1,25 @@
require_relative '../jdbc_spec_helper'
describe 'logstash-output-jdbc: derby', if: ENV['JDBC_DERBY_JAR'] do
include_context 'rspec setup'
include_context 'when initializing'
include_context 'when outputting messages'
let(:jdbc_jar_env) do
'JDBC_DERBY_JAR'
end
let(:jdbc_create_table) do
"CREATE table #{jdbc_test_table} (created_at timestamp not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit boolean not null, static_bigint bigint not null)"
end
let(:jdbc_settings) do
{
'driver_class' => 'org.apache.derby.jdbc.EmbeddedDriver',
'connection_string' => 'jdbc:derby:memory:testdb;create=true',
'driver_jar_path' => ENV[jdbc_jar_env],
'statement' => jdbc_statement,
'max_flush_exceptions' => 1
}
end
end

View File

@ -0,0 +1,25 @@
require_relative '../jdbc_spec_helper'
describe 'logstash-output-jdbc: mysql', if: ENV['JDBC_MYSQL_JAR'] do
include_context 'rspec setup'
include_context 'when initializing'
include_context 'when outputting messages'
let(:jdbc_jar_env) do
'JDBC_MYSQL_JAR'
end
let(:systemd_database_service) do
'mysql'
end
let(:jdbc_settings) do
{
'driver_class' => 'com.mysql.jdbc.Driver',
'connection_string' => 'jdbc:mysql://localhost/logstash_output_jdbc_test?user=root',
'driver_jar_path' => ENV[jdbc_jar_env],
'statement' => jdbc_statement,
'max_flush_exceptions' => 1
}
end
end

View File

@ -1,97 +1,11 @@
require "logstash/devutils/rspec/spec_helper" require_relative '../jdbc_spec_helper'
require "logstash/outputs/jdbc"
require "stud/temporary"
require "java"
describe LogStash::Outputs::Jdbc do describe LogStash::Outputs::Jdbc do
let(:derby_settings) do
{
"driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"connection_string" => "jdbc:derby:memory:testdb;create=true",
"driver_jar_path" => ENV['JDBC_DERBY_JAR'],
# Grumble. Grumble.
# Derby doesn't like 'T' in timestamps as of current writing, so for now
# we'll just use CURRENT_TIMESTAMP as opposed to the event @timestamp
"statement" => [ "insert into log (created_at, message) values(CURRENT_TIMESTAMP, ?)", "message" ]
}
end
context 'rspec setup' do
it 'ensure derby is available' do
j = ENV['JDBC_DERBY_JAR']
expect(j).not_to be_nil, "JDBC_DERBY_JAR not defined, required to run tests"
expect(File.exists?(j)).to eq(true), "JDBC_DERBY_JAR defined, but not valid"
end
end
context 'when initializing' do context 'when initializing' do
it 'shouldn\'t register without a config' do it 'shouldn\'t register without a config' do
expect { expect do
LogStash::Plugin.lookup("output", "jdbc").new() LogStash::Plugin.lookup('output', 'jdbc').new
}.to raise_error(LogStash::ConfigurationError) end.to raise_error(LogStash::ConfigurationError)
end end
it 'shouldn\'t register with a missing jar file' do
derby_settings['driver_jar_path'] = nil
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
expect { plugin.register }.to raise_error
end end
it 'shouldn\'t register with a missing jar file' do
derby_settings['connection_string'] = nil
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
expect { plugin.register }.to raise_error
end
end
context 'when outputting messages' do
let(:event_fields) do
{ message: 'test-message' }
end
let(:event) { LogStash::Event.new(event_fields) }
let(:plugin) {
# Setup plugin
output = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
output.register
if ENV['JDBC_DEBUG'] == '1'
output.logger.subscribe(STDOUT)
end
# Setup table
c = output.instance_variable_get(:@pool).getConnection()
stmt = c.createStatement()
stmt.executeUpdate("CREATE table log (created_at timestamp, message varchar(512))")
stmt.close()
c.close()
output
}
it 'should save a event' do
expect { plugin.receive(event) }.to_not raise_error
# Wait for 1 second, for the buffer to flush
sleep 1
c = plugin.instance_variable_get(:@pool).getConnection()
stmt = c.createStatement()
rs = stmt.executeQuery("select count(*) as total from log")
count = 0
while rs.next()
count = rs.getInt("total")
end
stmt.close()
c.close()
expect(count).to be > 0
end
end
end end

View File

@ -0,0 +1,27 @@
require_relative '../jdbc_spec_helper'
describe 'logstash-output-jdbc: sqlite', if: ENV['JDBC_SQLITE_JAR'] do
JDBC_SQLITE_FILE = '/tmp/logstash_output_jdbc_test.db'.freeze
before(:context) do
File.delete(JDBC_SQLITE_FILE) if File.exist? JDBC_SQLITE_FILE
end
include_context 'rspec setup'
include_context 'when initializing'
include_context 'when outputting messages'
let(:jdbc_jar_env) do
'JDBC_SQLITE_JAR'
end
let(:jdbc_settings) do
{
'driver_class' => 'org.sqlite.JDBC',
'connection_string' => "jdbc:sqlite:#{JDBC_SQLITE_FILE}",
'driver_jar_path' => ENV[jdbc_jar_env],
'statement' => jdbc_statement,
'max_flush_exceptions' => 1
}
end
end