0.3.0.pre - Preparing for threadsafety
This commit is contained in:
		
							parent
							
								
									da5a3d8be3
								
							
						
					
					
						commit
						fa2d226fbf
					
				
							
								
								
									
										7
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										7
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@ -3,3 +3,10 @@ Gemfile.lock
 | 
				
			|||||||
Gemfile.bak
 | 
					Gemfile.bak
 | 
				
			||||||
.bundle
 | 
					.bundle
 | 
				
			||||||
.vagrant
 | 
					.vagrant
 | 
				
			||||||
 | 
					.mvn
 | 
				
			||||||
 | 
					vendor
 | 
				
			||||||
 | 
					lib/**/*.jar
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					.DS_Store
 | 
				
			||||||
 | 
					*.swp
 | 
				
			||||||
 | 
					*.log
 | 
				
			||||||
 | 
				
			|||||||
@ -1,6 +1,12 @@
 | 
				
			|||||||
# Change Log
 | 
					# Change Log
 | 
				
			||||||
All notable changes to this project will be documented in this file, from 0.2.0.
 | 
					All notable changes to this project will be documented in this file, from 0.2.0.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## [0.3.0.pre] - 2016-07-13
 | 
				
			||||||
 | 
					  - Brings tests from v5 branch, providing greater coverage
 | 
				
			||||||
 | 
					  - Removes bulk update support, due to inconsistent behaviour
 | 
				
			||||||
 | 
					  - Plugin now marked as threadsafe, meaning only 1 instance per-Logstash
 | 
				
			||||||
 | 
					    - Raises default max_pool_size to match the default number of works (1 connection per worker)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## [0.2.10] - 2016-07-07
 | 
					## [0.2.10] - 2016-07-07
 | 
				
			||||||
  - Support non-string entries in statement array
 | 
					  - Support non-string entries in statement array
 | 
				
			||||||
  - Adds backtrace to exception logging
 | 
					  - Adds backtrace to exception logging
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										49
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										49
									
								
								README.md
									
									
									
									
									
								
							@ -14,36 +14,26 @@ If you do find this works for a JDBC driver without an example, let me know and
 | 
				
			|||||||
This plugin does not bundle any JDBC jar files, and does expect them to be in a
 | 
					This plugin does not bundle any JDBC jar files, and does expect them to be in a
 | 
				
			||||||
particular location. Please ensure you read the 4 installation lines below.
 | 
					particular location. Please ensure you read the 4 installation lines below.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## ChangeLog
 | 
					## Changelog
 | 
				
			||||||
See CHANGELOG.md
 | 
					See CHANGELOG.md
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## Versions
 | 
					## Versions
 | 
				
			||||||
Released versions are available via rubygems, and typically tagged.
 | 
					Released versions are available via rubygems, and typically tagged.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
For development:
 | 
					For development:
 | 
				
			||||||
  - See master branch for logstash v5
 | 
					  - See master branch for logstash v5 (currently **development only**)
 | 
				
			||||||
  - See v2.x branch for logstash v2
 | 
					  - See v2.x branch for logstash v2
 | 
				
			||||||
  - See v1.5 branch for logstash v1.5 
 | 
					  - See v1.5 branch for logstash v1.5 
 | 
				
			||||||
  - See v1.4 branch for logstash 1.4
 | 
					  - See v1.4 branch for logstash 1.4
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## Installation
 | 
					## Installation
 | 
				
			||||||
  - Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
 | 
					  - Run `bin/logstash-plugin install logstash-output-jdbc` in your logstash installation directory
 | 
				
			||||||
  - Now either:
 | 
					  - Now either:
 | 
				
			||||||
    - Use driver_jar_path in your configuraton to specify a path to your jar file
 | 
					    - Use driver_jar_path in your configuraton to specify a path to your jar file
 | 
				
			||||||
  - Or:
 | 
					  - Or:
 | 
				
			||||||
    - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
 | 
					    - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
 | 
				
			||||||
    - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
 | 
					    - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
 | 
				
			||||||
  - And then configure (examples below)
 | 
					  - And then configure (examples can be found in the examples directory)
 | 
				
			||||||
 | 
					 | 
				
			||||||
## Running tests
 | 
					 | 
				
			||||||
At this time tests only run against Derby, in an in-memory database.
 | 
					 | 
				
			||||||
Acceptance tests for individual database engines will be added over time.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
Assuming valid jruby is installed
 | 
					 | 
				
			||||||
  - First time, issue `jruby -S bundle install` to install dependencies
 | 
					 | 
				
			||||||
  - Next, download Derby jar from https://db.apache.org/derby/
 | 
					 | 
				
			||||||
  - Run the tests `JDBC_DERBY_JAR=path/to/derby.jar jruby -S rspec`
 | 
					 | 
				
			||||||
  - Optionally add the `JDBC_DEBUG=1` env variable to add logging to stdout
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
## Configuration options
 | 
					## Configuration options
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -57,13 +47,38 @@ Assuming valid jruby is installed
 | 
				
			|||||||
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
 | 
					| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
 | 
				
			||||||
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes |  |
 | 
					| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes |  |
 | 
				
			||||||
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
 | 
					| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
 | 
				
			||||||
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
 | 
					| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time. Default set to same as Logstash default number of workers | No | 24 |
 | 
				
			||||||
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
 | 
					| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
 | 
				
			||||||
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
 | 
					| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
 | 
				
			||||||
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 |
 | 
					| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 |
 | 
				
			||||||
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
 | 
					| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
 | 
				
			||||||
 | 
					| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
 | 
				
			||||||
 | 
					| retry_sql_states | Array of strings | An array of custom SQL state codes you wish to retry until `max_flush_exceptions`. Useful if you're using a JDBC driver which returns retry-able, but non-standard SQL state codes in it's exceptions. | No | [] |
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## Example configurations
 | 
					## Example configurations
 | 
				
			||||||
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
 | 
					Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
 | 
					If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## Development and Running tests
 | 
				
			||||||
 | 
					For development tests are recommended to run inside a virtual machine (Vagrantfile is included in the repo), as it requires
 | 
				
			||||||
 | 
					access to various database engines and could completely destroy any data in a live system.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					If you have vagrant available (this is temporary whilst I'm hacking on v5 support. I'll make this more streamlined later):
 | 
				
			||||||
 | 
					  - `vagrant up`
 | 
				
			||||||
 | 
					  - `vagrant ssh`
 | 
				
			||||||
 | 
					  - `cd /vagrant`
 | 
				
			||||||
 | 
					  - `gem install bundler`
 | 
				
			||||||
 | 
					  - `cd /vagrant && bundle install && bundle exec rake vendor && bundle exec rake install_jars`
 | 
				
			||||||
 | 
					  - `./scripts/travis-before_script.sh && source ./scripts/travis-variables.sh`
 | 
				
			||||||
 | 
					  - `bundle exec rspec`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## Releasing
 | 
				
			||||||
 | 
					  - Update Changelog
 | 
				
			||||||
 | 
					  - Bump version in gemspec
 | 
				
			||||||
 | 
					  - Commit
 | 
				
			||||||
 | 
					  - Create tag `git tag v<version-number-in-gemspec>`
 | 
				
			||||||
 | 
					  - `bundle exec rake install_jars`
 | 
				
			||||||
 | 
					  - `bundle exec rake pre_release_checks`
 | 
				
			||||||
 | 
					  - `gem build logstash-output-jdbc.gemspec`
 | 
				
			||||||
 | 
					  - `gem push`
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										12
									
								
								Rakefile
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								Rakefile
									
									
									
									
									
								
							@ -1 +1,11 @@
 | 
				
			|||||||
require "logstash/devutils/rake"
 | 
					# encoding: utf-8
 | 
				
			||||||
 | 
					require 'logstash/devutils/rake'
 | 
				
			||||||
 | 
					require 'jars/installer'
 | 
				
			||||||
 | 
					require 'rubygems'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					desc 'Fetch any jars required for this plugin'
 | 
				
			||||||
 | 
					task :install_jars do
 | 
				
			||||||
 | 
					  ENV['JARS_HOME'] = Dir.pwd + '/vendor/jar-dependencies/runtime-jars'
 | 
				
			||||||
 | 
					  ENV['JARS_VENDOR'] = 'false'
 | 
				
			||||||
 | 
					  Jars::Installer.new.vendor_jars!(false)
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
 | 
				
			|||||||
@ -1,17 +0,0 @@
 | 
				
			|||||||
class RingBuffer < Array
 | 
					 | 
				
			||||||
  attr_reader :max_size
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def initialize(max_size, enum = nil)
 | 
					 | 
				
			||||||
    @max_size = max_size
 | 
					 | 
				
			||||||
    enum.each { |e| self << e } if enum
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def <<(el)
 | 
					 | 
				
			||||||
    if self.size < @max_size || @max_size.nil?
 | 
					 | 
				
			||||||
      super
 | 
					 | 
				
			||||||
    else
 | 
					 | 
				
			||||||
      self.shift
 | 
					 | 
				
			||||||
      self.push(el)
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
end
 | 
					 | 
				
			||||||
@ -1,10 +1,10 @@
 | 
				
			|||||||
# encoding: utf-8
 | 
					# encoding: utf-8
 | 
				
			||||||
require "logstash/outputs/base"
 | 
					require 'logstash/outputs/base'
 | 
				
			||||||
require "logstash/namespace"
 | 
					require 'logstash/namespace'
 | 
				
			||||||
require "stud/buffer"
 | 
					require 'concurrent'
 | 
				
			||||||
require "java"
 | 
					require 'stud/interval'
 | 
				
			||||||
require "logstash-output-jdbc_jars"
 | 
					require 'java'
 | 
				
			||||||
require "logstash-output-jdbc_ring-buffer"
 | 
					require 'logstash-output-jdbc_jars'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Write events to a SQL engine, using JDBC.
 | 
					# Write events to a SQL engine, using JDBC.
 | 
				
			||||||
#
 | 
					#
 | 
				
			||||||
@ -12,8 +12,7 @@ require "logstash-output-jdbc_ring-buffer"
 | 
				
			|||||||
# includes correctly crafting the SQL statement, and matching the number of
 | 
					# includes correctly crafting the SQL statement, and matching the number of
 | 
				
			||||||
# parameters correctly.
 | 
					# parameters correctly.
 | 
				
			||||||
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
					class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			||||||
  # Adds buffer support
 | 
					  declare_threadsafe! if self.respond_to?(:declare_threadsafe!)
 | 
				
			||||||
  include Stud::Buffer
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
 | 
					  STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -33,131 +32,100 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
    '58', # System Error
 | 
					    '58', # System Error
 | 
				
			||||||
  ].freeze
 | 
					  ].freeze
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  config_name "jdbc"
 | 
					  config_name 'jdbc'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
 | 
					  # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
 | 
				
			||||||
  config :driver_class, :validate => :string
 | 
					  config :driver_class, validate: :string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Does the JDBC driver support autocommit?
 | 
					  # Does the JDBC driver support autocommit?
 | 
				
			||||||
  config :driver_auto_commit, :validate => :boolean, :default => true, :required => true
 | 
					  config :driver_auto_commit, validate: :boolean, default: true, required: true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Where to find the jar
 | 
					  # Where to find the jar
 | 
				
			||||||
  # Defaults to not required, and to the original behaviour
 | 
					  # Defaults to not required, and to the original behaviour
 | 
				
			||||||
  config :driver_jar_path, :validate => :string, :required => false
 | 
					  config :driver_jar_path, validate: :string, required: false
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # jdbc connection string
 | 
					  # jdbc connection string
 | 
				
			||||||
  config :connection_string, :validate => :string, :required => true
 | 
					  config :connection_string, validate: :string, required: true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # jdbc username - optional, maybe in the connection string
 | 
					  # jdbc username - optional, maybe in the connection string
 | 
				
			||||||
  config :username, :validate => :string, :required => false
 | 
					  config :username, validate: :string, required: false
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # jdbc password - optional, maybe in the connection string
 | 
					  # jdbc password - optional, maybe in the connection string
 | 
				
			||||||
  config :password, :validate => :string, :required => false
 | 
					  config :password, validate: :string, required: false
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # [ "insert into table (message) values(?)", "%{message}" ]
 | 
					  # [ "insert into table (message) values(?)", "%{message}" ]
 | 
				
			||||||
  config :statement, :validate => :array, :required => true
 | 
					  config :statement, validate: :array, required: true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # If this is an unsafe statement, use event.sprintf
 | 
					  # If this is an unsafe statement, use event.sprintf
 | 
				
			||||||
  # This also has potential performance penalties due to having to create a
 | 
					  # This also has potential performance penalties due to having to create a
 | 
				
			||||||
  # new statement for each event, rather than adding to the batch and issuing
 | 
					  # new statement for each event, rather than adding to the batch and issuing
 | 
				
			||||||
  # multiple inserts in 1 go
 | 
					  # multiple inserts in 1 go
 | 
				
			||||||
  config :unsafe_statement, :validate => :boolean, :default => false
 | 
					  config :unsafe_statement, validate: :boolean, default: false
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Number of connections in the pool to maintain
 | 
					  # Number of connections in the pool to maintain
 | 
				
			||||||
  config :max_pool_size, :validate => :number, :default => 5
 | 
					  config :max_pool_size, validate: :number, default: 24
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Connection timeout
 | 
					  # Connection timeout
 | 
				
			||||||
  config :connection_timeout, :validate => :number, :default => 10000
 | 
					  config :connection_timeout, validate: :number, default: 10000
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # We buffer a certain number of events before flushing that out to SQL.
 | 
					  # We buffer a certain number of events before flushing that out to SQL.
 | 
				
			||||||
  # This setting controls how many events will be buffered before sending a
 | 
					  # This setting controls how many events will be buffered before sending a
 | 
				
			||||||
  # batch of events.
 | 
					  # batch of events.
 | 
				
			||||||
  config :flush_size, :validate => :number, :default => 1000
 | 
					  config :flush_size, validate: :number, default: 1000
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # The amount of time since last flush before a flush is forced.
 | 
					  # Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval`
 | 
				
			||||||
  #
 | 
					  config :retry_initial_interval, validate: :number, default: 2
 | 
				
			||||||
  # This setting helps ensure slow event rates don't get stuck in Logstash.
 | 
					 | 
				
			||||||
  # For example, if your `flush_size` is 100, and you have received 10 events,
 | 
					 | 
				
			||||||
  # and it has been more than `idle_flush_time` seconds since the last flush,
 | 
					 | 
				
			||||||
  # Logstash will flush those 10 events automatically.
 | 
					 | 
				
			||||||
  #
 | 
					 | 
				
			||||||
  # This helps keep both fast and slow log streams moving along in
 | 
					 | 
				
			||||||
  # a timely manner.
 | 
					 | 
				
			||||||
  #
 | 
					 | 
				
			||||||
  # If you change this value please ensure that you change
 | 
					 | 
				
			||||||
  # max_flush_exceptions accordingly.
 | 
					 | 
				
			||||||
  config :idle_flush_time, :validate => :number, :default => 1
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
 | 
					  # Maximum time between retries, in seconds
 | 
				
			||||||
 | 
					  config :retry_max_interval, validate: :number, default: 128
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  # Any additional custom, retryable SQL state codes. 
 | 
				
			||||||
 | 
					  # Suitable for configuring retryable custom JDBC SQL state codes.
 | 
				
			||||||
 | 
					  config :retry_sql_states, validate: :array, default: []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  # Maximum number of sequential failed attempts, before we stop retrying.
 | 
				
			||||||
  # If set to < 1, then it will infinitely retry.
 | 
					  # If set to < 1, then it will infinitely retry.
 | 
				
			||||||
  # 
 | 
					  # At the default values this is a little over 10 minutes
 | 
				
			||||||
  # You should carefully tune this in relation to idle_flush_time if your SQL server
 | 
					  config :max_flush_exceptions, validate: :number, default: 10
 | 
				
			||||||
  # is not highly available.
 | 
					 | 
				
			||||||
  # i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
 | 
					 | 
				
			||||||
  # longer than 200 seconds to reboot, then logstash will stop.
 | 
					 | 
				
			||||||
  config :max_flush_exceptions, :validate => :number, :default => 0
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
 | 
					  config :max_repeat_exceptions, obsolete: 'This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation.'
 | 
				
			||||||
  config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
 | 
					  config :max_repeat_exceptions_time, obsolete: 'This is no longer required'
 | 
				
			||||||
 | 
					  config :idle_flush_time, obsolete: 'No longer necessary under Logstash v5'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public
 | 
					 | 
				
			||||||
  def register
 | 
					  def register
 | 
				
			||||||
    @logger.info("JDBC - Starting up")
 | 
					    @logger.info('JDBC - Starting up')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    LogStash::Logger.setup_log4j(@logger)
 | 
				
			||||||
    load_jar_files!
 | 
					    load_jar_files!
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
 | 
					    @stopping = Concurrent::AtomicBoolean.new(false)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (@flush_size > 1000)
 | 
					    @logger.warn('JDBC - Flush size is set to > 1000') if @flush_size > 1000
 | 
				
			||||||
      @logger.warn("JDBC - Flush size is set to > 1000")
 | 
					
 | 
				
			||||||
 | 
					    if @statement.empty?
 | 
				
			||||||
 | 
					      @logger.error('JDBC - No statement provided. Configuration error.')
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if @statement.length < 1
 | 
					    if !@unsafe_statement && @statement.length < 2
 | 
				
			||||||
      @logger.error("JDBC - No statement provided. Configuration error.")
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if (!@unsafe_statement and @statement.length < 2) 
 | 
					 | 
				
			||||||
      @logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
 | 
					      @logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    setup_and_test_pool!
 | 
					    setup_and_test_pool!
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    buffer_initialize(
 | 
					  def multi_receive(events)
 | 
				
			||||||
      :max_items => @flush_size,
 | 
					    events.each_slice(@flush_size) do |slice|
 | 
				
			||||||
      :max_interval => @idle_flush_time,
 | 
					      retrying_submit(slice)
 | 
				
			||||||
      :logger => @logger
 | 
					    end
 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def receive(event)
 | 
					  def receive(event)
 | 
				
			||||||
    return unless output?(event) or event.cancelled?
 | 
					    retrying_submit([event])
 | 
				
			||||||
    return unless @statement.length > 0
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    buffer_receive(event)
 | 
					 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def flush(events, teardown=false)
 | 
					  def close
 | 
				
			||||||
    if @unsafe_statement == true
 | 
					    @stopping.make_true
 | 
				
			||||||
      unsafe_flush(events, teardown)
 | 
					    @pool.close
 | 
				
			||||||
    else
 | 
					 | 
				
			||||||
      safe_flush(events, teardown)
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def on_flush_error(e)
 | 
					 | 
				
			||||||
    return if @max_flush_exceptions < 1
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @exceptions_tracker << e.class
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
 | 
					 | 
				
			||||||
      @logger.error("JDBC - max_flush_exceptions has been reached")
 | 
					 | 
				
			||||||
      raise LogStash::ShutdownSignal.new
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def teardown
 | 
					 | 
				
			||||||
    buffer_flush(:final => true)
 | 
					 | 
				
			||||||
    @pool.close()
 | 
					 | 
				
			||||||
    super
 | 
					    super
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -181,106 +149,104 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
    validate_connection_timeout = (@connection_timeout / 1000) / 2
 | 
					    validate_connection_timeout = (@connection_timeout / 1000) / 2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Test connection
 | 
					    # Test connection
 | 
				
			||||||
    test_connection = @pool.getConnection()
 | 
					    test_connection = @pool.getConnection
 | 
				
			||||||
    unless test_connection.isValid(validate_connection_timeout)
 | 
					    unless test_connection.isValid(validate_connection_timeout)
 | 
				
			||||||
      @logger.error("JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.")
 | 
					      @logger.error('JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.')
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
    test_connection.close()
 | 
					    test_connection.close
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def load_jar_files!
 | 
					  def load_jar_files!
 | 
				
			||||||
    # Load jar from driver path
 | 
					    # Load jar from driver path
 | 
				
			||||||
    unless @driver_jar_path.nil?
 | 
					    unless @driver_jar_path.nil?
 | 
				
			||||||
      raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path
 | 
					      raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
 | 
				
			||||||
      require @driver_jar_path
 | 
					      require @driver_jar_path
 | 
				
			||||||
      return
 | 
					      return
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Revert original behaviour of loading from vendor directory
 | 
					    # Revert original behaviour of loading from vendor directory
 | 
				
			||||||
    # if no path given
 | 
					    # if no path given
 | 
				
			||||||
    if ENV['LOGSTASH_HOME']
 | 
					    jarpath = if ENV['LOGSTASH_HOME']
 | 
				
			||||||
      jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
 | 
					                File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
 | 
				
			||||||
    else
 | 
					              else
 | 
				
			||||||
      jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
 | 
					                File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
 | 
				
			||||||
    end
 | 
					              end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @logger.debug("JDBC - jarpath", path: jarpath)
 | 
					    @logger.debug('JDBC - jarpath', path: jarpath)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    jars = Dir[jarpath]
 | 
					    jars = Dir[jarpath]
 | 
				
			||||||
    raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty?
 | 
					    raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    jars.each do |jar|
 | 
					    jars.each do |jar|
 | 
				
			||||||
      @logger.debug("JDBC - Loaded jar", :jar => jar)
 | 
					      @logger.debug('JDBC - Loaded jar', jar: jar)
 | 
				
			||||||
      require jar
 | 
					      require jar
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def safe_flush(events, teardown=false)
 | 
					  def submit(events)
 | 
				
			||||||
    connection = nil
 | 
					    connection = nil
 | 
				
			||||||
    statement = nil
 | 
					    statement = nil
 | 
				
			||||||
 | 
					    events_to_retry = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    begin
 | 
					    begin
 | 
				
			||||||
      connection = @pool.getConnection()
 | 
					      connection = @pool.getConnection
 | 
				
			||||||
    rescue => e
 | 
					    rescue => e
 | 
				
			||||||
      log_jdbc_exception(e, true)
 | 
					      log_jdbc_exception(e, true)
 | 
				
			||||||
      raise
 | 
					      # If a connection is not available, then the server has gone away
 | 
				
			||||||
 | 
					      # We're not counting that towards our retry count.
 | 
				
			||||||
 | 
					      return events, false
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    begin
 | 
					    events.each do |event|
 | 
				
			||||||
      statement = connection.prepareStatement(@statement[0])
 | 
					      begin
 | 
				
			||||||
 | 
					        statement = connection.prepareStatement(
 | 
				
			||||||
      events.each do |event|
 | 
					          (@unsafe_statement == true) ? event.sprintf(@statement[0]) : @statement[0]
 | 
				
			||||||
        next if event.cancelled?
 | 
					        )
 | 
				
			||||||
        next if @statement.length < 2
 | 
					        statement = add_statement_event_params(statement, event) if @statement.length > 1
 | 
				
			||||||
        statement = add_statement_event_params(statement, event)
 | 
					        statement.execute
 | 
				
			||||||
 | 
					      rescue => e
 | 
				
			||||||
        statement.addBatch()
 | 
					        if retry_exception?(e)
 | 
				
			||||||
 | 
					          events_to_retry.push(event)
 | 
				
			||||||
 | 
					        end
 | 
				
			||||||
 | 
					      ensure
 | 
				
			||||||
 | 
					        statement.close unless statement.nil?
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
 | 
					 | 
				
			||||||
      statement.executeBatch()
 | 
					 | 
				
			||||||
      statement.close()
 | 
					 | 
				
			||||||
      @exceptions_tracker << nil
 | 
					 | 
				
			||||||
    rescue => e
 | 
					 | 
				
			||||||
      if retry_exception?(e)
 | 
					 | 
				
			||||||
        raise
 | 
					 | 
				
			||||||
      end
 | 
					 | 
				
			||||||
    ensure
 | 
					 | 
				
			||||||
      statement.close() unless statement.nil?
 | 
					 | 
				
			||||||
      connection.close() unless connection.nil?
 | 
					 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    connection.close unless connection.nil?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return events_to_retry, true
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def unsafe_flush(events, teardown=false)
 | 
					  def retrying_submit(actions)
 | 
				
			||||||
    connection = nil
 | 
					    # Initially we submit the full list of actions
 | 
				
			||||||
    statement = nil
 | 
					    submit_actions = actions
 | 
				
			||||||
    begin
 | 
					    count_as_attempt = true
 | 
				
			||||||
      connection = @pool.getConnection()
 | 
					 | 
				
			||||||
    rescue => e
 | 
					 | 
				
			||||||
      log_jdbc_exception(e, true)
 | 
					 | 
				
			||||||
      raise
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    begin
 | 
					    attempts = 1
 | 
				
			||||||
      events.each do |event|
 | 
					 | 
				
			||||||
        next if event.cancelled?
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        statement = connection.prepareStatement(event.sprintf(@statement[0]))
 | 
					    sleep_interval = @retry_initial_interval
 | 
				
			||||||
        statement = add_statement_event_params(statement, event) if @statement.length > 1
 | 
					    while @stopping.false? and (submit_actions and !submit_actions.empty?)
 | 
				
			||||||
 | 
					      return if !submit_actions || submit_actions.empty? # If everything's a success we move along
 | 
				
			||||||
 | 
					      # We retry whatever didn't succeed
 | 
				
			||||||
 | 
					      submit_actions, count_as_attempt = submit(submit_actions)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        statement.execute()
 | 
					      # Everything was a success!
 | 
				
			||||||
 | 
					      break if !submit_actions || submit_actions.empty?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # cancel the event, since we may end up outputting the same event multiple times
 | 
					      if @max_flush_exceptions > 0 and count_as_attempt == true
 | 
				
			||||||
        # if an exception happens later down the line
 | 
					        attempts += 1
 | 
				
			||||||
        event.cancel
 | 
					
 | 
				
			||||||
        @exceptions_tracker << nil
 | 
					        if attempts > @max_flush_exceptions
 | 
				
			||||||
 | 
					          @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
 | 
				
			||||||
 | 
					          break
 | 
				
			||||||
 | 
					        end
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    rescue => e
 | 
					
 | 
				
			||||||
      if retry_exception?(e)
 | 
					      # If we're retrying the action sleep for the recommended interval
 | 
				
			||||||
        raise
 | 
					      # Double the interval for the next time through to achieve exponential backoff
 | 
				
			||||||
      end
 | 
					      Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
 | 
				
			||||||
    ensure
 | 
					      sleep_interval = next_sleep_interval(sleep_interval)
 | 
				
			||||||
      statement.close() unless statement.nil?
 | 
					 | 
				
			||||||
      connection.close() unless connection.nil?
 | 
					 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -324,6 +290,12 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
    statement
 | 
					    statement
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def retry_exception?(exception)
 | 
				
			||||||
 | 
					    retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]) or @retry_sql_states.include?(exception.getSQLState)))
 | 
				
			||||||
 | 
					    log_jdbc_exception(exception, retrying)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    retrying
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def log_jdbc_exception(exception, retrying)
 | 
					  def log_jdbc_exception(exception, retrying)
 | 
				
			||||||
    current_exception = exception
 | 
					    current_exception = exception
 | 
				
			||||||
@ -343,10 +315,8 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
 | 
				
			|||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def retry_exception?(exception)
 | 
					  def next_sleep_interval(current_interval)
 | 
				
			||||||
    retrying = (exception.respond_to? 'getSQLState' and RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]))
 | 
					    doubled = current_interval * 2
 | 
				
			||||||
    log_jdbc_exception(exception, retrying)
 | 
					    doubled > @retry_max_interval ? @retry_max_interval : doubled
 | 
				
			||||||
 | 
					 | 
				
			||||||
    retrying
 | 
					 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
end # class LogStash::Outputs::jdbc
 | 
					end # class LogStash::Outputs::jdbc
 | 
				
			||||||
 | 
				
			|||||||
@ -1,6 +1,6 @@
 | 
				
			|||||||
Gem::Specification.new do |s|
 | 
					Gem::Specification.new do |s|
 | 
				
			||||||
  s.name = 'logstash-output-jdbc'
 | 
					  s.name = 'logstash-output-jdbc'
 | 
				
			||||||
  s.version = "0.2.10"
 | 
					  s.version = "0.3.0.pre"
 | 
				
			||||||
  s.licenses = [ "Apache License (2.0)" ]
 | 
					  s.licenses = [ "Apache License (2.0)" ]
 | 
				
			||||||
  s.summary = "This plugin allows you to output to SQL, via JDBC"
 | 
					  s.summary = "This plugin allows you to output to SQL, via JDBC"
 | 
				
			||||||
  s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
 | 
					  s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
 | 
				
			||||||
@ -9,8 +9,11 @@ Gem::Specification.new do |s|
 | 
				
			|||||||
  s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
 | 
					  s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
 | 
				
			||||||
  s.require_paths = [ "lib" ]
 | 
					  s.require_paths = [ "lib" ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  # Java only
 | 
				
			||||||
 | 
					  s.platform = 'java'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Files
 | 
					  # Files
 | 
				
			||||||
  s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
 | 
					  s.files = Dir.glob('{lib,spec}/**/*.rb') + Dir.glob('vendor/**/*') + %w(LICENSE.txt README.md)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   # Tests
 | 
					   # Tests
 | 
				
			||||||
  s.test_files = s.files.grep(%r{^(test|spec|features)/})
 | 
					  s.test_files = s.files.grep(%r{^(test|spec|features)/})
 | 
				
			||||||
@ -19,11 +22,17 @@ Gem::Specification.new do |s|
 | 
				
			|||||||
  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
 | 
					  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Gem dependencies
 | 
					  # Gem dependencies
 | 
				
			||||||
 | 
					  s.add_runtime_dependency 'logstash-core-plugin-api', '~> 1.0'
 | 
				
			||||||
  s.add_runtime_dependency 'stud'
 | 
					  s.add_runtime_dependency 'stud'
 | 
				
			||||||
 | 
					  s.add_runtime_dependency 'logstash-codec-plain'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
 | 
					  s.requirements << "jar 'com.zaxxer:HikariCP', '2.4.2'"
 | 
				
			||||||
  s.add_runtime_dependency "logstash-codec-plain"
 | 
					  s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.21'"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # https://github.com/elastic/logstash-devutils/issues/48
 | 
					  s.add_development_dependency 'jar-dependencies'
 | 
				
			||||||
  s.add_development_dependency "logstash-devutils", '0.0.18'
 | 
					  s.add_development_dependency 'ruby-maven', '~> 3.3'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  s.add_development_dependency 'logstash-devutils'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  s.add_development_dependency 'rubocop'
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										135
									
								
								spec/jdbc_spec_helper.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										135
									
								
								spec/jdbc_spec_helper.rb
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,135 @@
 | 
				
			|||||||
 | 
					require 'logstash/devutils/rspec/spec_helper'
 | 
				
			||||||
 | 
					require 'logstash/outputs/jdbc'
 | 
				
			||||||
 | 
					require 'stud/temporary'
 | 
				
			||||||
 | 
					require 'java'
 | 
				
			||||||
 | 
					require 'securerandom'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					RSpec.shared_context 'rspec setup' do
 | 
				
			||||||
 | 
					  it 'ensure jar is available' do
 | 
				
			||||||
 | 
					    expect(ENV[jdbc_jar_env]).not_to be_nil, "#{jdbc_jar_env} not defined, required to run tests"
 | 
				
			||||||
 | 
					    expect(File.exist?(ENV[jdbc_jar_env])).to eq(true), "#{jdbc_jar_env} defined, but not valid"
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					RSpec.shared_context 'when initializing' do
 | 
				
			||||||
 | 
					  it 'shouldn\'t register with a missing jar file' do
 | 
				
			||||||
 | 
					    jdbc_settings['driver_jar_path'] = nil
 | 
				
			||||||
 | 
					    plugin = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
 | 
				
			||||||
 | 
					    expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					RSpec.shared_context 'when outputting messages' do
 | 
				
			||||||
 | 
					  let(:logger) { double("logger") }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_test_table) do
 | 
				
			||||||
 | 
					    'logstash_output_jdbc_test'
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_drop_table) do
 | 
				
			||||||
 | 
					    "DROP TABLE #{jdbc_test_table}"
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_create_table) do
 | 
				
			||||||
 | 
					    "CREATE table #{jdbc_test_table} (created_at datetime not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit bit not null)"
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_statement) do
 | 
				
			||||||
 | 
					    ["insert into #{jdbc_test_table} (created_at, message, message_sprintf, static_int, static_bit) values(?, ?, ?, ?, ?)", '@timestamp', 'message', 'sprintf-%{message}', 1, true]
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:systemd_database_service) do
 | 
				
			||||||
 | 
					    nil
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:event_fields) do
 | 
				
			||||||
 | 
					    { 'message' => "test-message #{SecureRandom.uuid}" }
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:event) { LogStash::Event.new(event_fields) }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:plugin) do
 | 
				
			||||||
 | 
					    # Setup plugin
 | 
				
			||||||
 | 
					    output = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
 | 
				
			||||||
 | 
					    output.register
 | 
				
			||||||
 | 
					    output.logger = logger
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Setup table
 | 
				
			||||||
 | 
					    c = output.instance_variable_get(:@pool).getConnection
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Derby doesn't support IF EXISTS. 
 | 
				
			||||||
 | 
					    # Seems like the quickest solution. Bleurgh.
 | 
				
			||||||
 | 
					    begin
 | 
				
			||||||
 | 
					      stmt = c.createStatement
 | 
				
			||||||
 | 
					      stmt.executeUpdate(jdbc_drop_table)
 | 
				
			||||||
 | 
					    rescue
 | 
				
			||||||
 | 
					      # noop
 | 
				
			||||||
 | 
					    ensure
 | 
				
			||||||
 | 
					      stmt.close
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      stmt = c.createStatement
 | 
				
			||||||
 | 
					      stmt.executeUpdate(jdbc_create_table)
 | 
				
			||||||
 | 
					      stmt.close
 | 
				
			||||||
 | 
					      c.close
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    output
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  it 'should save a event' do
 | 
				
			||||||
 | 
					    expect { plugin.multi_receive([event]) }.to_not raise_error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Verify the number of items in the output table
 | 
				
			||||||
 | 
					    c = plugin.instance_variable_get(:@pool).getConnection
 | 
				
			||||||
 | 
					    stmt = c.prepareStatement("select count(*) as total from #{jdbc_test_table} where message = ?")
 | 
				
			||||||
 | 
					    stmt.setString(1, event['message'])
 | 
				
			||||||
 | 
					    rs = stmt.executeQuery
 | 
				
			||||||
 | 
					    count = 0
 | 
				
			||||||
 | 
					    count = rs.getInt('total') while rs.next
 | 
				
			||||||
 | 
					    stmt.close
 | 
				
			||||||
 | 
					    c.close
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    expect(count).to eq(1)
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  it 'should not save event, and log an unretryable exception' do
 | 
				
			||||||
 | 
					    e = LogStash::Event.new({})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    expect(logger).to receive(:error).once.with(/JDBC - Exception. Not retrying/, Hash)
 | 
				
			||||||
 | 
					    expect { plugin.multi_receive([e]) }.to_not raise_error
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  it 'it should retry after a connection loss, and log a warning' do
 | 
				
			||||||
 | 
					    skip "does not run as a service" if systemd_database_service.nil?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    p = plugin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Check that everything is fine right now
 | 
				
			||||||
 | 
					    expect { p.multi_receive([event]) }.not_to raise_error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Start a thread to stop and restart the service.
 | 
				
			||||||
 | 
					    t = Thread.new(systemd_database_service) { |systemd_database_service|
 | 
				
			||||||
 | 
					      start_stop_cmd = 'sudo /etc/init.d/%<service>s* %<action>s'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      `which systemctl`
 | 
				
			||||||
 | 
					      if $?.success?
 | 
				
			||||||
 | 
					        start_stop_cmd = 'sudo systemctl %<action>s %<service>s'
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      cmd = start_stop_cmd % { action: 'stop', service: systemd_database_service }
 | 
				
			||||||
 | 
					      `#{cmd}`
 | 
				
			||||||
 | 
					      sleep 10
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      cmd = start_stop_cmd % { action: 'start', service: systemd_database_service }
 | 
				
			||||||
 | 
					      `#{cmd}`
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Wait a few seconds to the service to stop
 | 
				
			||||||
 | 
					    sleep 5
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    expect(logger).to receive(:warn).at_least(:once).with(/JDBC - Exception. Retrying/, Hash)
 | 
				
			||||||
 | 
					    expect { p.multi_receive([event]) }.to_not raise_error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    t.join
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
							
								
								
									
										25
									
								
								spec/outputs/jdbc_derby_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								spec/outputs/jdbc_derby_spec.rb
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,25 @@
 | 
				
			|||||||
 | 
					require_relative '../jdbc_spec_helper'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					describe 'logstash-output-jdbc: derby', if: ENV['JDBC_DERBY_JAR'] do
 | 
				
			||||||
 | 
					  include_context 'rspec setup'
 | 
				
			||||||
 | 
					  include_context 'when initializing'
 | 
				
			||||||
 | 
					  include_context 'when outputting messages'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_jar_env) do
 | 
				
			||||||
 | 
					    'JDBC_DERBY_JAR'
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_create_table) do
 | 
				
			||||||
 | 
					    "CREATE table #{jdbc_test_table} (created_at timestamp not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit boolean not null)"
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					 
 | 
				
			||||||
 | 
					  let(:jdbc_settings) do
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					      'driver_class' => 'org.apache.derby.jdbc.EmbeddedDriver',
 | 
				
			||||||
 | 
					      'connection_string' => 'jdbc:derby:memory:testdb;create=true',
 | 
				
			||||||
 | 
					      'driver_jar_path' => ENV[jdbc_jar_env],
 | 
				
			||||||
 | 
					      'statement' => jdbc_statement,
 | 
				
			||||||
 | 
					      'max_flush_exceptions' => 1
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
							
								
								
									
										25
									
								
								spec/outputs/jdbc_mysql_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								spec/outputs/jdbc_mysql_spec.rb
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,25 @@
 | 
				
			|||||||
 | 
					require_relative '../jdbc_spec_helper'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					describe 'logstash-output-jdbc: mysql', if: ENV['JDBC_MYSQL_JAR'] do
 | 
				
			||||||
 | 
					  include_context 'rspec setup'
 | 
				
			||||||
 | 
					  include_context 'when initializing'
 | 
				
			||||||
 | 
					  include_context 'when outputting messages'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_jar_env) do
 | 
				
			||||||
 | 
					    'JDBC_MYSQL_JAR'
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:systemd_database_service) do
 | 
				
			||||||
 | 
					    'mysql'
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_settings) do
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					      'driver_class' => 'com.mysql.jdbc.Driver',
 | 
				
			||||||
 | 
					      'connection_string' => 'jdbc:mysql://localhost/logstash_output_jdbc_test?user=root',
 | 
				
			||||||
 | 
					      'driver_jar_path' => ENV[jdbc_jar_env],
 | 
				
			||||||
 | 
					      'statement' => jdbc_statement,
 | 
				
			||||||
 | 
					      'max_flush_exceptions' => 1
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
@ -1,94 +1,11 @@
 | 
				
			|||||||
require "logstash/devutils/rspec/spec_helper"
 | 
					require_relative '../jdbc_spec_helper'
 | 
				
			||||||
require "logstash/outputs/jdbc"
 | 
					 | 
				
			||||||
require "stud/temporary"
 | 
					 | 
				
			||||||
require "java"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
describe LogStash::Outputs::Jdbc do
 | 
					describe LogStash::Outputs::Jdbc do
 | 
				
			||||||
 | 
					 | 
				
			||||||
  let(:derby_settings) do
 | 
					 | 
				
			||||||
    { 
 | 
					 | 
				
			||||||
      "driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
 | 
					 | 
				
			||||||
      "connection_string" => "jdbc:derby:memory:testdb;create=true",
 | 
					 | 
				
			||||||
      "driver_jar_path" => ENV['JDBC_DERBY_JAR'],
 | 
					 | 
				
			||||||
      "statement" => [ "insert into log (created_at, message) values(?, ?)", "@timestamp" "message" ]
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  context 'rspec setup' do
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    it 'ensure derby is available' do
 | 
					 | 
				
			||||||
      j = ENV['JDBC_DERBY_JAR']
 | 
					 | 
				
			||||||
      expect(j).not_to be_nil, "JDBC_DERBY_JAR not defined, required to run tests"
 | 
					 | 
				
			||||||
      expect(File.exists?(j)).to eq(true), "JDBC_DERBY_JAR defined, but not valid"
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
    
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  context 'when initializing' do
 | 
					  context 'when initializing' do
 | 
				
			||||||
 | 
					 | 
				
			||||||
    it 'shouldn\'t register without a config' do
 | 
					    it 'shouldn\'t register without a config' do
 | 
				
			||||||
      expect { 
 | 
					      expect do
 | 
				
			||||||
        LogStash::Plugin.lookup("output", "jdbc").new()
 | 
					        LogStash::Plugin.lookup('output', 'jdbc').new
 | 
				
			||||||
      }.to raise_error(LogStash::ConfigurationError)
 | 
					      end.to raise_error(LogStash::ConfigurationError)
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					 | 
				
			||||||
    it 'shouldn\'t register with a missing jar file' do
 | 
					 | 
				
			||||||
      derby_settings['driver_jar_path'] = nil
 | 
					 | 
				
			||||||
      plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
 | 
					 | 
				
			||||||
      expect { plugin.register }.to raise_error
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    it 'shouldn\'t register with a missing jar file' do
 | 
					 | 
				
			||||||
      derby_settings['connection_string'] = nil
 | 
					 | 
				
			||||||
      plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
 | 
					 | 
				
			||||||
      expect { plugin.register }.to raise_error
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					 | 
				
			||||||
  context 'when outputting messages' do
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let(:event_fields) do
 | 
					 | 
				
			||||||
      { message: 'test-message' }
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
    let(:event) { LogStash::Event.new(event_fields) }
 | 
					 | 
				
			||||||
    let(:plugin) {
 | 
					 | 
				
			||||||
      # Setup plugin
 | 
					 | 
				
			||||||
      output = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
 | 
					 | 
				
			||||||
      output.register
 | 
					 | 
				
			||||||
      if ENV['JDBC_DEBUG'] == '1'
 | 
					 | 
				
			||||||
        output.logger.subscribe(STDOUT)
 | 
					 | 
				
			||||||
      end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      # Setup table
 | 
					 | 
				
			||||||
      c = output.instance_variable_get(:@pool).getConnection()
 | 
					 | 
				
			||||||
      stmt = c.createStatement()
 | 
					 | 
				
			||||||
      stmt.executeUpdate("CREATE table log (created_at timestamp, message varchar(512))")
 | 
					 | 
				
			||||||
      stmt.close()
 | 
					 | 
				
			||||||
      c.close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      output
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    it 'should save a event' do
 | 
					 | 
				
			||||||
      expect { plugin.receive(event) }.to_not raise_error
 | 
					 | 
				
			||||||
      
 | 
					 | 
				
			||||||
      # Wait for 1 second, for the buffer to flush
 | 
					 | 
				
			||||||
      sleep 1
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      c = plugin.instance_variable_get(:@pool).getConnection()
 | 
					 | 
				
			||||||
      stmt = c.createStatement()
 | 
					 | 
				
			||||||
      rs = stmt.executeQuery("select count(*) as total from log")
 | 
					 | 
				
			||||||
      count = 0
 | 
					 | 
				
			||||||
      while rs.next()
 | 
					 | 
				
			||||||
        count = rs.getInt("total")
 | 
					 | 
				
			||||||
      end
 | 
					 | 
				
			||||||
      stmt.close()
 | 
					 | 
				
			||||||
      c.close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      expect(count).to be > 0
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										27
									
								
								spec/outputs/jdbc_sqlite_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								spec/outputs/jdbc_sqlite_spec.rb
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,27 @@
 | 
				
			|||||||
 | 
					require_relative '../jdbc_spec_helper'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					describe 'logstash-output-jdbc: sqlite', if: ENV['JDBC_SQLITE_JAR'] do
 | 
				
			||||||
 | 
					  JDBC_SQLITE_FILE = '/tmp/logstash_output_jdbc_test.db'.freeze
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  before(:context) do
 | 
				
			||||||
 | 
					    File.delete(JDBC_SQLITE_FILE) if File.exist? JDBC_SQLITE_FILE
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  include_context 'rspec setup'
 | 
				
			||||||
 | 
					  include_context 'when initializing'
 | 
				
			||||||
 | 
					  include_context 'when outputting messages'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_jar_env) do
 | 
				
			||||||
 | 
					    'JDBC_SQLITE_JAR'
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:jdbc_settings) do
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					      'driver_class' => 'org.sqlite.JDBC',
 | 
				
			||||||
 | 
					      'connection_string' => "jdbc:sqlite:#{JDBC_SQLITE_FILE}",
 | 
				
			||||||
 | 
					      'driver_jar_path' => ENV[jdbc_jar_env],
 | 
				
			||||||
 | 
					      'statement' => jdbc_statement,
 | 
				
			||||||
 | 
					      'max_flush_exceptions' => 1
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
										
											Binary file not shown.
										
									
								
							
										
											Binary file not shown.
										
									
								
							
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user