Compare commits
89 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
eacd2a2c38 | ||
|
309768c893 | ||
|
aba4e08bf5 | ||
|
e91db61e8c | ||
|
5f2a99c4a6 | ||
|
8e73958359 | ||
|
f48c12a8da | ||
|
7a2996e985 | ||
|
4c04b8c24d | ||
|
fe982c95aa | ||
|
d1a733d195 | ||
|
21217f7b03 | ||
|
2bdb75f1b7 | ||
|
6b5398b152 | ||
|
daebe44f32 | ||
|
079c3a6c78 | ||
|
3804eb59d2 | ||
|
ef6ed66cdd | ||
|
147cd3d67b | ||
|
6affac0a0c | ||
|
508c769650 | ||
|
d36d659e16 | ||
|
51a04faca3 | ||
|
cdd88fe322 | ||
|
e74d67b477 | ||
|
710791c3aa | ||
|
ccb30c7edd | ||
|
6bae1d81e3 | ||
|
667e066d74 | ||
|
8e15bc5f45 | ||
|
43142287de | ||
|
2a6f048fa0 | ||
|
318c1bd86a | ||
|
3085606eb7 | ||
|
f0d88a237f | ||
|
ca1c71ea68 | ||
|
cb4cefdfad | ||
|
44e1947f31 | ||
|
64a6bcfd55 | ||
|
238ef153e4 | ||
|
3bdd8ef3a8 | ||
|
9164605aae | ||
|
d2f99b05d2 | ||
|
c110bdd551 | ||
|
b3a6de6340 | ||
|
37631a62b7 | ||
|
76e0f439a0 | ||
|
43eb5d969d | ||
|
0f37792177 | ||
|
0e2e883cd1 | ||
|
34708157f4 | ||
|
6c852d21dc | ||
|
53eaee001d | ||
|
fe131f750e | ||
|
f04e00019b | ||
|
867fd37805 | ||
|
b3e8d1a0f8 | ||
|
25d14f2624 | ||
|
ab566ee969 | ||
|
7d699e400c | ||
|
542003e4e5 | ||
|
290aa63d2d | ||
|
a61dd21046 | ||
|
4a573cb599 | ||
|
80560f6692 | ||
|
2c00c5d016 | ||
|
a26b6106d4 | ||
|
d6869f594c | ||
|
5ec985b0df | ||
|
5a1fdb7c7f | ||
|
b14d61ccf0 | ||
|
baaeba3c07 | ||
|
d362e791e5 | ||
|
5f0f897114 | ||
|
721e128f29 | ||
|
fe2e23ac27 | ||
|
85b3f31051 | ||
|
e32b6e9bbd | ||
|
f1202f6454 | ||
|
26a32079f1 | ||
|
8d27e0f90d | ||
|
df811f3d29 | ||
|
d056093ab8 | ||
|
e83af287f0 | ||
|
0ff6f16ec7 | ||
|
e6e9ac3b04 | ||
|
707c005979 | ||
|
8f5ceb451a | ||
|
e6537d053f |
26
.github/ISSUE_TEMPLATE.md
vendored
Normal file
26
.github/ISSUE_TEMPLATE.md
vendored
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
<!--
|
||||||
|
|
||||||
|
Trouble installing the plugin under Logstash 2.4.0 with the message "duplicate gems"? See https://github.com/elastic/logstash/issues/5852
|
||||||
|
|
||||||
|
Please remember:
|
||||||
|
- I have not used every database engine in the world
|
||||||
|
- I have not got access to every database engine in the world
|
||||||
|
- Any support I provide is done in my own personal time which is limited
|
||||||
|
- Understand that I won't always have the answer immediately
|
||||||
|
|
||||||
|
Please provide as much information as possible.
|
||||||
|
|
||||||
|
-->
|
||||||
|
|
||||||
|
<!--- Provide a general summary of the issue in the Title above -->
|
||||||
|
|
||||||
|
## Expected & Actual Behavior
|
||||||
|
<!--- If you're describing a bug, tell us what should happen, and what is actually happening, and if necessary how to reproduce it -->
|
||||||
|
<!--- If you're suggesting a change/improvement, tell us how it should work -->
|
||||||
|
|
||||||
|
## Your Environment
|
||||||
|
<!--- Include as many relevant details about the environment you experienced the bug in -->
|
||||||
|
* Version of plugin used:
|
||||||
|
* Version of Logstash used:
|
||||||
|
* Database engine & version you're connecting to:
|
||||||
|
* Have you checked you've met the Logstash requirements for Java versions?:
|
8
.gitignore
vendored
8
.gitignore
vendored
|
@ -2,3 +2,11 @@
|
||||||
Gemfile.lock
|
Gemfile.lock
|
||||||
Gemfile.bak
|
Gemfile.bak
|
||||||
.bundle
|
.bundle
|
||||||
|
.vagrant
|
||||||
|
.mvn
|
||||||
|
vendor
|
||||||
|
lib/**/*.jar
|
||||||
|
|
||||||
|
.DS_Store
|
||||||
|
*.swp
|
||||||
|
*.log
|
||||||
|
|
25
.rubocop.yml
Normal file
25
.rubocop.yml
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
# I don't care for underscores in numbers.
|
||||||
|
Style/NumericLiterals:
|
||||||
|
Enabled: false
|
||||||
|
|
||||||
|
Style/ClassAndModuleChildren:
|
||||||
|
Enabled: false
|
||||||
|
|
||||||
|
Metrics/AbcSize:
|
||||||
|
Enabled: false
|
||||||
|
|
||||||
|
Metrics/CyclomaticComplexity:
|
||||||
|
Max: 9
|
||||||
|
|
||||||
|
Metrics/PerceivedComplexity:
|
||||||
|
Max: 10
|
||||||
|
|
||||||
|
Metrics/LineLength:
|
||||||
|
Enabled: false
|
||||||
|
|
||||||
|
Metrics/MethodLength:
|
||||||
|
Max: 50
|
||||||
|
|
||||||
|
Style/FileName:
|
||||||
|
Exclude:
|
||||||
|
- 'lib/logstash-output-jdbc_jars.rb'
|
11
.travis.yml
11
.travis.yml
|
@ -1,8 +1,13 @@
|
||||||
|
sudo: required
|
||||||
language: ruby
|
language: ruby
|
||||||
cache: bundler
|
cache: bundler
|
||||||
rvm:
|
rvm:
|
||||||
- jruby
|
- jruby-1.7.25
|
||||||
|
jdk:
|
||||||
|
- oraclejdk8
|
||||||
before_script:
|
before_script:
|
||||||
- wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
|
- bundle exec rake vendor
|
||||||
- export JDBC_DERBY_JAR=/tmp/derby.jar
|
- bundle exec rake install_jars
|
||||||
|
- ./scripts/travis-before_script.sh
|
||||||
|
- source ./scripts/travis-variables.sh
|
||||||
script: bundle exec rspec
|
script: bundle exec rspec
|
||||||
|
|
40
CHANGELOG.md
40
CHANGELOG.md
|
@ -1,6 +1,46 @@
|
||||||
# Change Log
|
# Change Log
|
||||||
All notable changes to this project will be documented in this file, from 0.2.0.
|
All notable changes to this project will be documented in this file, from 0.2.0.
|
||||||
|
|
||||||
|
## [5.3.0] - 2017-11-08
|
||||||
|
- Adds configuration options `enable_event_as_json_keyword` and `event_as_json_keyword`
|
||||||
|
- Adds BigDecimal support
|
||||||
|
- Adds additional logging for debugging purposes (with thanks to @mlkmhd's work)
|
||||||
|
|
||||||
|
## [5.2.1] - 2017-04-09
|
||||||
|
- Adds Array and Hash to_json support for non-sprintf syntax
|
||||||
|
|
||||||
|
## [5.2.0] - 2017-04-01
|
||||||
|
- Upgrades HikariCP to latest
|
||||||
|
- Fixes HikariCP logging integration issues
|
||||||
|
|
||||||
|
## [5.1.0] - 2016-12-17
|
||||||
|
- phoenix-thin fixes for issue #60
|
||||||
|
|
||||||
|
## [5.0.0] - 2016-11-03
|
||||||
|
- logstash v5 support
|
||||||
|
|
||||||
|
## [0.3.1] - 2016-08-28
|
||||||
|
- Adds connection_test configuration option, to prevent the connection test from occuring, allowing the error to be suppressed.
|
||||||
|
Useful for cockroachdb deployments. https://github.com/theangryangel/logstash-output-jdbc/issues/53
|
||||||
|
|
||||||
|
## [0.3.0] - 2016-07-24
|
||||||
|
- Brings tests from v5 branch, providing greater coverage
|
||||||
|
- Removes bulk update support, due to inconsistent behaviour
|
||||||
|
- Plugin now marked as threadsafe, meaning only 1 instance per-Logstash
|
||||||
|
- Raises default max_pool_size to match the default number of workers (1 connection per worker)
|
||||||
|
|
||||||
|
## [0.2.10] - 2016-07-07
|
||||||
|
- Support non-string entries in statement array
|
||||||
|
- Adds backtrace to exception logging
|
||||||
|
|
||||||
|
## [0.2.9] - 2016-06-29
|
||||||
|
- Fix NameError exception.
|
||||||
|
- Moved log_jdbc_exception calls
|
||||||
|
|
||||||
|
## [0.2.7] - 2016-05-29
|
||||||
|
- Backport retry exception logic from v5 branch
|
||||||
|
- Backport improved timestamp compatibility from v5 branch
|
||||||
|
|
||||||
## [0.2.6] - 2016-05-02
|
## [0.2.6] - 2016-05-02
|
||||||
- Fix for exception infinite loop
|
- Fix for exception infinite loop
|
||||||
|
|
||||||
|
|
51
README.md
51
README.md
|
@ -14,7 +14,7 @@ If you do find this works for a JDBC driver without an example, let me know and
|
||||||
This plugin does not bundle any JDBC jar files, and does expect them to be in a
|
This plugin does not bundle any JDBC jar files, and does expect them to be in a
|
||||||
particular location. Please ensure you read the 4 installation lines below.
|
particular location. Please ensure you read the 4 installation lines below.
|
||||||
|
|
||||||
## ChangeLog
|
## Changelog
|
||||||
See CHANGELOG.md
|
See CHANGELOG.md
|
||||||
|
|
||||||
## Versions
|
## Versions
|
||||||
|
@ -27,23 +27,13 @@ For development:
|
||||||
- See v1.4 branch for logstash 1.4
|
- See v1.4 branch for logstash 1.4
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
|
- Run `bin/logstash-plugin install logstash-output-jdbc` in your logstash installation directory
|
||||||
- Now either:
|
- Now either:
|
||||||
- Use driver_jar_path in your configuraton to specify a path to your jar file
|
- Use driver_jar_path in your configuraton to specify a path to your jar file
|
||||||
- Or:
|
- Or:
|
||||||
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
|
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
|
||||||
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
|
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
|
||||||
- And then configure (examples below)
|
- And then configure (examples can be found in the examples directory)
|
||||||
|
|
||||||
## Running tests
|
|
||||||
At this time tests only run against Derby, in an in-memory database.
|
|
||||||
Acceptance tests for individual database engines will be added over time.
|
|
||||||
|
|
||||||
Assuming valid jruby is installed
|
|
||||||
- First time, issue `jruby -S bundle install` to install dependencies
|
|
||||||
- Next, download Derby jar from https://db.apache.org/derby/
|
|
||||||
- Run the tests `JDBC_DERBY_JAR=path/to/derby.jar jruby -S rspec`
|
|
||||||
- Optionally add the `JDBC_DEBUG=1` env variable to add logging to stdout
|
|
||||||
|
|
||||||
## Configuration options
|
## Configuration options
|
||||||
|
|
||||||
|
@ -53,17 +43,46 @@ Assuming valid jruby is installed
|
||||||
| driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
|
| driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
|
||||||
| driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
|
| driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
|
||||||
| connection_string | String | JDBC connection URL | Yes | |
|
| connection_string | String | JDBC connection URL | Yes | |
|
||||||
|
| connection_test | Boolean | Run a JDBC connection test. Some drivers do not function correctly, and you may need to disable the connection test to supress an error. Cockroach with the postgres JDBC driver is such an example. | No | Yes |
|
||||||
|
| connection_test_query | String | Connection test and init query string, required for some JDBC drivers that don't support isValid(). Typically you'd set to this "SELECT 1" | No | |
|
||||||
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
|
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
|
||||||
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
|
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
|
||||||
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
|
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
|
||||||
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
|
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
|
||||||
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
|
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
|
||||||
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
|
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
|
||||||
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
|
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
|
||||||
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 |
|
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before the set of events are discarded. Set to a value less than 1 if you never want it to stop. This should be carefully configured with respect to retry_initial_interval and retry_max_interval, if your SQL server is not highly available | No | 10 |
|
||||||
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
|
| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
|
||||||
|
| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
|
||||||
|
| retry_sql_states | Array of strings | An array of custom SQL state codes you wish to retry until `max_flush_exceptions`. Useful if you're using a JDBC driver which returns retry-able, but non-standard SQL state codes in it's exceptions. | No | [] |
|
||||||
|
| event_as_json_keyword | String | The magic key word that the plugin looks for to convert the entire event into a JSON object. As Logstash does not support this out of the box with it's `sprintf` implementation, you can use whatever this field is set to in the statement parameters | No | @event |
|
||||||
|
| enable_event_as_json_keyword | Boolean | Enables the magic keyword set in the configuration option `event_as_json_keyword`. Without this enabled the plugin will not convert the `event_as_json_keyword` into JSON encoding of the entire event. | No | False |
|
||||||
|
|
||||||
## Example configurations
|
## Example configurations
|
||||||
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
|
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
|
||||||
|
|
||||||
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
|
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
|
||||||
|
|
||||||
|
## Development and Running tests
|
||||||
|
For development tests are recommended to run inside a virtual machine (Vagrantfile is included in the repo), as it requires
|
||||||
|
access to various database engines and could completely destroy any data in a live system.
|
||||||
|
|
||||||
|
If you have vagrant available (this is temporary whilst I'm hacking on v5 support. I'll make this more streamlined later):
|
||||||
|
- `vagrant up`
|
||||||
|
- `vagrant ssh`
|
||||||
|
- `cd /vagrant`
|
||||||
|
- `gem install bundler`
|
||||||
|
- `cd /vagrant && bundle install && bundle exec rake vendor && bundle exec rake install_jars`
|
||||||
|
- `./scripts/travis-before_script.sh && source ./scripts/travis-variables.sh`
|
||||||
|
- `bundle exec rspec`
|
||||||
|
|
||||||
|
## Releasing
|
||||||
|
- Update Changelog
|
||||||
|
- Bump version in gemspec
|
||||||
|
- Commit
|
||||||
|
- Create tag `git tag v<version-number-in-gemspec>`
|
||||||
|
- `bundle exec rake install_jars`
|
||||||
|
- `bundle exec rake pre_release_checks`
|
||||||
|
- `gem build logstash-output-jdbc.gemspec`
|
||||||
|
- `gem push`
|
||||||
|
|
12
Rakefile
12
Rakefile
|
@ -1 +1,11 @@
|
||||||
require "logstash/devutils/rake"
|
# encoding: utf-8
|
||||||
|
require 'logstash/devutils/rake'
|
||||||
|
require 'jars/installer'
|
||||||
|
require 'rubygems'
|
||||||
|
|
||||||
|
desc 'Fetch any jars required for this plugin'
|
||||||
|
task :install_jars do
|
||||||
|
ENV['JARS_HOME'] = Dir.pwd + '/vendor/jar-dependencies/runtime-jars'
|
||||||
|
ENV['JARS_VENDOR'] = 'false'
|
||||||
|
Jars::Installer.new.vendor_jars!(false)
|
||||||
|
end
|
||||||
|
|
18
THANKS.md
Normal file
18
THANKS.md
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
logstash-output-jdbc is a project originally created by Karl Southern
|
||||||
|
(the_angry_angel), but there are a number of people that have contributed
|
||||||
|
or implemented key features over time. We do our best to keep this list
|
||||||
|
up-to-date, but you can also have a look at the nice contributor graphs
|
||||||
|
produced by GitHub: https://github.com/theangryangel/logstash-output-jdbc/graphs/contributors
|
||||||
|
|
||||||
|
* [hordijk](https://github.com/hordijk)
|
||||||
|
* [dmitryakadiamond](https://github.com/dmitryakadiamond)
|
||||||
|
* [MassimoSporchia](https://github.com/MassimoSporchia)
|
||||||
|
* [ebuildy](https://github.com/ebuildy)
|
||||||
|
* [kushtrimjunuzi](https://github.com/kushtrimjunuzi)
|
||||||
|
* [josemazo](https://github.com/josemazo)
|
||||||
|
* [aceoliver](https://github.com/aceoliver)
|
||||||
|
* [roflmao](https://github.com/roflmao)
|
||||||
|
* [onesuper](https://github.com/onesuper)
|
||||||
|
* [phr0gz](https://github.com/phr0gz)
|
||||||
|
* [jMonsinjon](https://github.com/jMonsinjon)
|
||||||
|
* [mlkmhd](https://github.com/mlkmhd)
|
35
Vagrantfile
vendored
Normal file
35
Vagrantfile
vendored
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
# -*- mode: ruby -*-
|
||||||
|
# vi: set ft=ruby :
|
||||||
|
|
||||||
|
JRUBY_VERSION = "jruby-1.7"
|
||||||
|
|
||||||
|
Vagrant.configure(2) do |config|
|
||||||
|
|
||||||
|
config.vm.define "debian" do |deb|
|
||||||
|
deb.vm.box = 'debian/stretch64'
|
||||||
|
deb.vm.synced_folder '.', '/vagrant', type: :virtualbox
|
||||||
|
|
||||||
|
deb.vm.provision 'shell', inline: <<-EOP
|
||||||
|
apt-get update
|
||||||
|
apt-get install openjdk-8-jre ca-certificates-java git curl -y -q
|
||||||
|
curl -sSL https://rvm.io/mpapis.asc | sudo gpg --import -
|
||||||
|
curl -sSL https://get.rvm.io | bash -s stable --ruby=#{JRUBY_VERSION}
|
||||||
|
usermod -a -G rvm vagrant
|
||||||
|
EOP
|
||||||
|
end
|
||||||
|
|
||||||
|
config.vm.define "centos" do |centos|
|
||||||
|
centos.vm.box = 'centos/7'
|
||||||
|
centos.ssh.insert_key = false # https://github.com/mitchellh/vagrant/issues/7610
|
||||||
|
centos.vm.synced_folder '.', '/vagrant', type: :virtualbox
|
||||||
|
|
||||||
|
centos.vm.provision 'shell', inline: <<-EOP
|
||||||
|
yum update
|
||||||
|
yum install java-1.7.0-openjdk
|
||||||
|
curl -sSL https://rvm.io/mpapis.asc | sudo gpg --import -
|
||||||
|
curl -sSL https://get.rvm.io | bash -s stable --ruby=#{JRUBY_VERSION}
|
||||||
|
usermod -a -G rvm vagrant
|
||||||
|
EOP
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
|
@ -1,6 +1,7 @@
|
||||||
# Example: Apache Phoenix (HBase SQL)
|
# Example: Apache Phoenix (HBase SQL)
|
||||||
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
|
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
|
||||||
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
|
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
|
||||||
|
* Please see apache-phoenix-thin-hbase-sql for phoenix-thin. The examples are different.
|
||||||
```
|
```
|
||||||
input
|
input
|
||||||
{
|
{
|
||||||
|
|
28
examples/apache-phoenix-thin-hbase-sql.md
Normal file
28
examples/apache-phoenix-thin-hbase-sql.md
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
# Example: Apache Phoenix-Thin (HBase SQL)
|
||||||
|
|
||||||
|
**There are special instructions for phoenix-thin. Please read carefully!**
|
||||||
|
|
||||||
|
* Tested with Logstash 5.1.1 / Apache Phoenix 4.9
|
||||||
|
* HBase and Zookeeper must be both accessible from logstash machine
|
||||||
|
* At time of writing phoenix-client does not include all the required jars (see https://issues.apache.org/jira/browse/PHOENIX-3476), therefore you must *not* use the driver_jar_path configuration option and instead:
|
||||||
|
- `mkdir -p vendor/jar/jdbc` in your logstash installation path
|
||||||
|
- copy `phoenix-queryserver-client-4.9.0-HBase-1.2.jar` from the phoenix distribution into this folder
|
||||||
|
- download the calcite jar from https://mvnrepository.com/artifact/org.apache.calcite/calcite-avatica/1.6.0 and place it into your `vendor/jar/jdbc` directory
|
||||||
|
* Use the following configuration as a base. The connection_test => false and connection_test_query are very important and should not be omitted. Phoenix-thin does not appear to support isValid and these are necessary for the connection to be added to the pool and be available.
|
||||||
|
|
||||||
|
```
|
||||||
|
input
|
||||||
|
{
|
||||||
|
stdin { }
|
||||||
|
}
|
||||||
|
output {
|
||||||
|
jdbc {
|
||||||
|
connection_test => false
|
||||||
|
connection_test_query => "select 1"
|
||||||
|
driver_class => "org.apache.phoenix.queryserver.client.Driver"
|
||||||
|
connection_string => "jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF"
|
||||||
|
statement => [ "UPSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
18
examples/cockroachdb.md
Normal file
18
examples/cockroachdb.md
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
# Example: CockroachDB
|
||||||
|
- Tested using postgresql-9.4.1209.jre6.jar
|
||||||
|
- **Warning** cockroach is known to throw a warning on connection test (at time of writing), thus the connection test is explicitly disabled.
|
||||||
|
|
||||||
|
```
|
||||||
|
input
|
||||||
|
{
|
||||||
|
stdin { }
|
||||||
|
}
|
||||||
|
output {
|
||||||
|
jdbc {
|
||||||
|
driver_jar_path => '/opt/postgresql-9.4.1209.jre6.jar'
|
||||||
|
connection_test => false
|
||||||
|
connection_string => 'jdbc:postgresql://127.0.0.1:26257/test?user=root'
|
||||||
|
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
|
@ -9,8 +9,9 @@ input
|
||||||
}
|
}
|
||||||
output {
|
output {
|
||||||
jdbc {
|
jdbc {
|
||||||
|
driver_class => "com.mysql.jdbc.Driver"
|
||||||
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
|
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
|
||||||
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
|
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST(? AS timestamp), ?)", "host", "@timestamp", "message" ]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# Example: SQL Server
|
# Example: SQL Server
|
||||||
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
|
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
|
||||||
|
* Known to be working with Microsoft SQL Server Always-On Cluster (see https://github.com/theangryangel/logstash-output-jdbc/issues/37). With thanks to [@phr0gz](https://github.com/phr0gz)
|
||||||
```
|
```
|
||||||
input
|
input
|
||||||
{
|
{
|
||||||
|
@ -7,8 +8,25 @@ input
|
||||||
}
|
}
|
||||||
output {
|
output {
|
||||||
jdbc {
|
jdbc {
|
||||||
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
|
driver_jar_path => '/opt/sqljdbc42.jar'
|
||||||
|
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password"
|
||||||
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
|
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Another example, with mixed static strings and parameters, with thanks to [@MassimoSporchia](https://github.com/MassimoSporchia)
|
||||||
|
```
|
||||||
|
input
|
||||||
|
{
|
||||||
|
stdin { }
|
||||||
|
}
|
||||||
|
output {
|
||||||
|
jdbc {
|
||||||
|
driver_jar_path => '/opt/sqljdbc42.jar'
|
||||||
|
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password"
|
||||||
|
statement => [ "INSERT INTO log (host, timestamp, message, comment) VALUES(?, ?, ?, 'static string')", "host", "@timestamp", "message" ]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
|
@ -10,6 +10,7 @@ output {
|
||||||
stdout { }
|
stdout { }
|
||||||
|
|
||||||
jdbc {
|
jdbc {
|
||||||
|
driver_class => "org.sqlite.JDBC"
|
||||||
connection_string => 'jdbc:sqlite:test.db'
|
connection_string => 'jdbc:sqlite:test.db'
|
||||||
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
|
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
# encoding: utf-8
|
# encoding: utf-8
|
||||||
require 'logstash/environment'
|
require 'logstash/environment'
|
||||||
|
|
||||||
root_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
|
root_dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
|
||||||
LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor")
|
LogStash::Environment.load_runtime_jars! File.join(root_dir, 'vendor')
|
||||||
|
|
|
@ -1,17 +0,0 @@
|
||||||
class RingBuffer < Array
|
|
||||||
attr_reader :max_size
|
|
||||||
|
|
||||||
def initialize(max_size, enum = nil)
|
|
||||||
@max_size = max_size
|
|
||||||
enum.each { |e| self << e } if enum
|
|
||||||
end
|
|
||||||
|
|
||||||
def <<(el)
|
|
||||||
if self.size < @max_size || @max_size.nil?
|
|
||||||
super
|
|
||||||
else
|
|
||||||
self.shift
|
|
||||||
self.push(el)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
|
@ -1,141 +1,141 @@
|
||||||
# encoding: utf-8
|
# encoding: utf-8
|
||||||
require "logstash/outputs/base"
|
require 'logstash/outputs/base'
|
||||||
require "logstash/namespace"
|
require 'logstash/namespace'
|
||||||
require "stud/buffer"
|
require 'concurrent'
|
||||||
require "java"
|
require 'stud/interval'
|
||||||
require "logstash-output-jdbc_jars"
|
require 'java'
|
||||||
require "logstash-output-jdbc_ring-buffer"
|
require 'logstash-output-jdbc_jars'
|
||||||
|
require 'json'
|
||||||
|
require 'bigdecimal'
|
||||||
|
|
||||||
|
# Write events to a SQL engine, using JDBC.
|
||||||
|
#
|
||||||
|
# It is upto the user of the plugin to correctly configure the plugin. This
|
||||||
|
# includes correctly crafting the SQL statement, and matching the number of
|
||||||
|
# parameters correctly.
|
||||||
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
||||||
# Adds buffer support
|
concurrency :shared
|
||||||
include Stud::Buffer
|
|
||||||
|
|
||||||
config_name "jdbc"
|
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
|
||||||
|
|
||||||
|
RETRYABLE_SQLSTATE_CLASSES = [
|
||||||
|
# Classes of retryable SQLSTATE codes
|
||||||
|
# Not all in the class will be retryable. However, this is the best that
|
||||||
|
# we've got right now.
|
||||||
|
# If a custom state code is required, set it in retry_sql_states.
|
||||||
|
'08', # Connection Exception
|
||||||
|
'24', # Invalid Cursor State (Maybe retry-able in some circumstances)
|
||||||
|
'25', # Invalid Transaction State
|
||||||
|
'40', # Transaction Rollback
|
||||||
|
'53', # Insufficient Resources
|
||||||
|
'54', # Program Limit Exceeded (MAYBE)
|
||||||
|
'55', # Object Not In Prerequisite State
|
||||||
|
'57', # Operator Intervention
|
||||||
|
'58', # System Error
|
||||||
|
].freeze
|
||||||
|
|
||||||
|
config_name 'jdbc'
|
||||||
|
|
||||||
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
|
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
|
||||||
config :driver_class, :validate => :string
|
config :driver_class, validate: :string
|
||||||
|
|
||||||
# Does the JDBC driver support autocommit?
|
# Does the JDBC driver support autocommit?
|
||||||
config :driver_auto_commit, :validate => :boolean, :default => true, :required => true
|
config :driver_auto_commit, validate: :boolean, default: true, required: true
|
||||||
|
|
||||||
# Where to find the jar
|
# Where to find the jar
|
||||||
# Defaults to not required, and to the original behaviour
|
# Defaults to not required, and to the original behaviour
|
||||||
config :driver_jar_path, :validate => :string, :required => false
|
config :driver_jar_path, validate: :string, required: false
|
||||||
|
|
||||||
# jdbc connection string
|
# jdbc connection string
|
||||||
config :connection_string, :validate => :string, :required => true
|
config :connection_string, validate: :string, required: true
|
||||||
|
|
||||||
# jdbc username - optional, maybe in the connection string
|
# jdbc username - optional, maybe in the connection string
|
||||||
config :username, :validate => :string, :required => false
|
config :username, validate: :string, required: false
|
||||||
|
|
||||||
# jdbc password - optional, maybe in the connection string
|
# jdbc password - optional, maybe in the connection string
|
||||||
config :password, :validate => :string, :required => false
|
config :password, validate: :string, required: false
|
||||||
|
|
||||||
# [ "insert into table (message) values(?)", "%{message}" ]
|
# [ "insert into table (message) values(?)", "%{message}" ]
|
||||||
config :statement, :validate => :array, :required => true
|
config :statement, validate: :array, required: true
|
||||||
|
|
||||||
# If this is an unsafe statement, use event.sprintf
|
# If this is an unsafe statement, use event.sprintf
|
||||||
# This also has potential performance penalties due to having to create a
|
# This also has potential performance penalties due to having to create a
|
||||||
# new statement for each event, rather than adding to the batch and issuing
|
# new statement for each event, rather than adding to the batch and issuing
|
||||||
# multiple inserts in 1 go
|
# multiple inserts in 1 go
|
||||||
config :unsafe_statement, :validate => :boolean, :default => false
|
config :unsafe_statement, validate: :boolean, default: false
|
||||||
|
|
||||||
# Number of connections in the pool to maintain
|
# Number of connections in the pool to maintain
|
||||||
config :max_pool_size, :validate => :number, :default => 5
|
config :max_pool_size, validate: :number, default: 5
|
||||||
|
|
||||||
# Connection timeout
|
# Connection timeout
|
||||||
config :connection_timeout, :validate => :number, :default => 10000
|
config :connection_timeout, validate: :number, default: 10000
|
||||||
|
|
||||||
# We buffer a certain number of events before flushing that out to SQL.
|
# We buffer a certain number of events before flushing that out to SQL.
|
||||||
# This setting controls how many events will be buffered before sending a
|
# This setting controls how many events will be buffered before sending a
|
||||||
# batch of events.
|
# batch of events.
|
||||||
config :flush_size, :validate => :number, :default => 1000
|
config :flush_size, validate: :number, default: 1000
|
||||||
|
|
||||||
# The amount of time since last flush before a flush is forced.
|
# Set initial interval in seconds between retries. Doubled on each retry up to `retry_max_interval`
|
||||||
#
|
config :retry_initial_interval, validate: :number, default: 2
|
||||||
# This setting helps ensure slow event rates don't get stuck in Logstash.
|
|
||||||
# For example, if your `flush_size` is 100, and you have received 10 events,
|
|
||||||
# and it has been more than `idle_flush_time` seconds since the last flush,
|
|
||||||
# Logstash will flush those 10 events automatically.
|
|
||||||
#
|
|
||||||
# This helps keep both fast and slow log streams moving along in
|
|
||||||
# a timely manner.
|
|
||||||
#
|
|
||||||
# If you change this value please ensure that you change
|
|
||||||
# max_flush_exceptions accordingly.
|
|
||||||
config :idle_flush_time, :validate => :number, :default => 1
|
|
||||||
|
|
||||||
# Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
|
# Maximum time between retries, in seconds
|
||||||
|
config :retry_max_interval, validate: :number, default: 128
|
||||||
|
|
||||||
|
# Any additional custom, retryable SQL state codes.
|
||||||
|
# Suitable for configuring retryable custom JDBC SQL state codes.
|
||||||
|
config :retry_sql_states, validate: :array, default: []
|
||||||
|
|
||||||
|
# Run a connection test on start.
|
||||||
|
config :connection_test, validate: :boolean, default: true
|
||||||
|
|
||||||
|
# Connection test and init string, required for some JDBC endpoints
|
||||||
|
# notable phoenix-thin - see logstash-output-jdbc issue #60
|
||||||
|
config :connection_test_query, validate: :string, required: false
|
||||||
|
|
||||||
|
# Maximum number of sequential failed attempts, before we stop retrying.
|
||||||
# If set to < 1, then it will infinitely retry.
|
# If set to < 1, then it will infinitely retry.
|
||||||
#
|
# At the default values this is a little over 10 minutes
|
||||||
# You should carefully tune this in relation to idle_flush_time if your SQL server
|
config :max_flush_exceptions, validate: :number, default: 10
|
||||||
# is not highly available.
|
|
||||||
# i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
|
|
||||||
# longer than 200 seconds to reboot, then logstash will stop.
|
|
||||||
config :max_flush_exceptions, :validate => :number, :default => 0
|
|
||||||
|
|
||||||
config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
|
config :max_repeat_exceptions, obsolete: 'This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation.'
|
||||||
config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
|
config :max_repeat_exceptions_time, obsolete: 'This is no longer required'
|
||||||
|
config :idle_flush_time, obsolete: 'No longer necessary under Logstash v5'
|
||||||
|
|
||||||
|
# Allows the whole event to be converted to JSON
|
||||||
|
config :enable_event_as_json_keyword, validate: :boolean, default: false
|
||||||
|
|
||||||
|
# The magic key used to convert the whole event to JSON. If you need this, and you have the default in your events, you can use this to change your magic keyword.
|
||||||
|
config :event_as_json_keyword, validate: :string, default: '@event'
|
||||||
|
|
||||||
public
|
|
||||||
def register
|
def register
|
||||||
@logger.info("JDBC - Starting up")
|
@logger.info('JDBC - Starting up')
|
||||||
|
|
||||||
load_jar_files!
|
load_jar_files!
|
||||||
|
|
||||||
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
|
@stopping = Concurrent::AtomicBoolean.new(false)
|
||||||
|
|
||||||
if (@flush_size > 1000)
|
@logger.warn('JDBC - Flush size is set to > 1000') if @flush_size > 1000
|
||||||
@logger.warn("JDBC - Flush size is set to > 1000")
|
|
||||||
|
if @statement.empty?
|
||||||
|
@logger.error('JDBC - No statement provided. Configuration error.')
|
||||||
end
|
end
|
||||||
|
|
||||||
if @statement.length < 1
|
if !@unsafe_statement && @statement.length < 2
|
||||||
@logger.error("JDBC - No statement provided. Configuration error.")
|
|
||||||
end
|
|
||||||
|
|
||||||
if (!@unsafe_statement and @statement.length < 2)
|
|
||||||
@logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
|
@logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
|
||||||
end
|
end
|
||||||
|
|
||||||
setup_and_test_pool!
|
setup_and_test_pool!
|
||||||
|
|
||||||
buffer_initialize(
|
|
||||||
:max_items => @flush_size,
|
|
||||||
:max_interval => @idle_flush_time,
|
|
||||||
:logger => @logger
|
|
||||||
)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def receive(event)
|
def multi_receive(events)
|
||||||
return unless output?(event) or event.cancelled?
|
events.each_slice(@flush_size) do |slice|
|
||||||
return unless @statement.length > 0
|
retrying_submit(slice)
|
||||||
|
|
||||||
buffer_receive(event)
|
|
||||||
end
|
|
||||||
|
|
||||||
def flush(events, teardown=false)
|
|
||||||
if @unsafe_statement == true
|
|
||||||
unsafe_flush(events, teardown)
|
|
||||||
else
|
|
||||||
safe_flush(events, teardown)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def on_flush_error(e)
|
def close
|
||||||
return if @max_flush_exceptions < 1
|
@stopping.make_true
|
||||||
|
@pool.close
|
||||||
@exceptions_tracker << e.class
|
|
||||||
|
|
||||||
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
|
|
||||||
@logger.error("JDBC - max_flush_exceptions has been reached")
|
|
||||||
log_jdbc_exception(e)
|
|
||||||
raise LogStash::ShutdownSignal.new
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def teardown
|
|
||||||
buffer_flush(:final => true)
|
|
||||||
@pool.close()
|
|
||||||
super
|
super
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -158,131 +158,196 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
||||||
|
|
||||||
validate_connection_timeout = (@connection_timeout / 1000) / 2
|
validate_connection_timeout = (@connection_timeout / 1000) / 2
|
||||||
|
|
||||||
# Test connection
|
if !@connection_test_query.nil? and @connection_test_query.length > 1
|
||||||
test_connection = @pool.getConnection()
|
@pool.setConnectionTestQuery(@connection_test_query)
|
||||||
unless test_connection.isValid(validate_connection_timeout)
|
@pool.setConnectionInitSql(@connection_test_query)
|
||||||
@logger.error("JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.")
|
|
||||||
end
|
end
|
||||||
test_connection.close()
|
|
||||||
|
return unless @connection_test
|
||||||
|
|
||||||
|
# Test connection
|
||||||
|
test_connection = @pool.getConnection
|
||||||
|
unless test_connection.isValid(validate_connection_timeout)
|
||||||
|
@logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
|
||||||
|
end
|
||||||
|
test_connection.close
|
||||||
end
|
end
|
||||||
|
|
||||||
def load_jar_files!
|
def load_jar_files!
|
||||||
# Load jar from driver path
|
# Load jar from driver path
|
||||||
unless @driver_jar_path.nil?
|
unless @driver_jar_path.nil?
|
||||||
raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path
|
raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
|
||||||
require @driver_jar_path
|
require @driver_jar_path
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
# Revert original behaviour of loading from vendor directory
|
# Revert original behaviour of loading from vendor directory
|
||||||
# if no path given
|
# if no path given
|
||||||
if ENV['LOGSTASH_HOME']
|
jarpath = if ENV['LOGSTASH_HOME']
|
||||||
jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
|
File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
|
||||||
else
|
else
|
||||||
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
|
File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
|
||||||
end
|
end
|
||||||
|
|
||||||
@logger.debug("JDBC - jarpath", path: jarpath)
|
@logger.trace('JDBC - jarpath', path: jarpath)
|
||||||
|
|
||||||
jars = Dir[jarpath]
|
jars = Dir[jarpath]
|
||||||
raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty?
|
raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty?
|
||||||
|
|
||||||
jars.each do |jar|
|
jars.each do |jar|
|
||||||
@logger.debug("JDBC - Loaded jar", :jar => jar)
|
@logger.trace('JDBC - Loaded jar', jar: jar)
|
||||||
require jar
|
require jar
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def safe_flush(events, teardown=false)
|
def submit(events)
|
||||||
connection = nil
|
connection = nil
|
||||||
statement = nil
|
statement = nil
|
||||||
|
events_to_retry = []
|
||||||
|
|
||||||
begin
|
begin
|
||||||
connection = @pool.getConnection()
|
connection = @pool.getConnection
|
||||||
statement = connection.prepareStatement(@statement[0])
|
|
||||||
|
|
||||||
events.each do |event|
|
|
||||||
next if event.cancelled?
|
|
||||||
next if @statement.length < 2
|
|
||||||
statement = add_statement_event_params(statement, event)
|
|
||||||
|
|
||||||
statement.addBatch()
|
|
||||||
end
|
|
||||||
|
|
||||||
statement.executeBatch()
|
|
||||||
statement.close()
|
|
||||||
@exceptions_tracker << nil
|
|
||||||
rescue => e
|
rescue => e
|
||||||
log_jdbc_exception(e)
|
log_jdbc_exception(e, true, nil)
|
||||||
ensure
|
# If a connection is not available, then the server has gone away
|
||||||
statement.close() unless statement.nil?
|
# We're not counting that towards our retry count.
|
||||||
connection.close() unless connection.nil?
|
return events, false
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
def unsafe_flush(events, teardown=false)
|
|
||||||
connection = nil
|
|
||||||
statement = nil
|
|
||||||
begin
|
|
||||||
connection = @pool.getConnection()
|
|
||||||
|
|
||||||
events.each do |event|
|
events.each do |event|
|
||||||
next if event.cancelled?
|
begin
|
||||||
|
statement = connection.prepareStatement(
|
||||||
statement = connection.prepareStatement(event.sprintf(@statement[0]))
|
(@unsafe_statement == true) ? event.sprintf(@statement[0]) : @statement[0]
|
||||||
|
)
|
||||||
statement = add_statement_event_params(statement, event) if @statement.length > 1
|
statement = add_statement_event_params(statement, event) if @statement.length > 1
|
||||||
|
statement.execute
|
||||||
statement.execute()
|
|
||||||
|
|
||||||
# cancel the event, since we may end up outputting the same event multiple times
|
|
||||||
# if an exception happens later down the line
|
|
||||||
event.cancel
|
|
||||||
@exceptions_tracker << nil
|
|
||||||
end
|
|
||||||
rescue => e
|
rescue => e
|
||||||
log_jdbc_exception(e)
|
if retry_exception?(e, event.to_json())
|
||||||
|
events_to_retry.push(event)
|
||||||
|
end
|
||||||
ensure
|
ensure
|
||||||
statement.close() unless statement.nil?
|
statement.close unless statement.nil?
|
||||||
connection.close() unless connection.nil?
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
connection.close unless connection.nil?
|
||||||
|
|
||||||
|
return events_to_retry, true
|
||||||
|
end
|
||||||
|
|
||||||
|
def retrying_submit(actions)
|
||||||
|
# Initially we submit the full list of actions
|
||||||
|
submit_actions = actions
|
||||||
|
count_as_attempt = true
|
||||||
|
|
||||||
|
attempts = 1
|
||||||
|
|
||||||
|
sleep_interval = @retry_initial_interval
|
||||||
|
while @stopping.false? and (submit_actions and !submit_actions.empty?)
|
||||||
|
return if !submit_actions || submit_actions.empty? # If everything's a success we move along
|
||||||
|
# We retry whatever didn't succeed
|
||||||
|
submit_actions, count_as_attempt = submit(submit_actions)
|
||||||
|
|
||||||
|
# Everything was a success!
|
||||||
|
break if !submit_actions || submit_actions.empty?
|
||||||
|
|
||||||
|
if @max_flush_exceptions > 0 and count_as_attempt == true
|
||||||
|
attempts += 1
|
||||||
|
|
||||||
|
if attempts > @max_flush_exceptions
|
||||||
|
@logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# If we're retrying the action sleep for the recommended interval
|
||||||
|
# Double the interval for the next time through to achieve exponential backoff
|
||||||
|
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
|
||||||
|
sleep_interval = next_sleep_interval(sleep_interval)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def add_statement_event_params(statement, event)
|
def add_statement_event_params(statement, event)
|
||||||
@statement[1..-1].each_with_index do |i, idx|
|
@statement[1..-1].each_with_index do |i, idx|
|
||||||
case event[i]
|
if @enable_event_as_json_keyword == true and i.is_a? String and i == @event_as_json_keyword
|
||||||
when Time
|
value = event.to_json
|
||||||
# Most reliable solution, cross JDBC driver
|
elsif i.is_a? String
|
||||||
statement.setString(idx + 1, event[i].iso8601())
|
value = event.get(i)
|
||||||
when LogStash::Timestamp
|
if value.nil? and i =~ /%\{/
|
||||||
# Most reliable solution, cross JDBC driver
|
value = event.sprintf(i)
|
||||||
statement.setString(idx + 1, event[i].to_iso8601())
|
end
|
||||||
when Fixnum, Integer
|
|
||||||
statement.setInt(idx + 1, event[i])
|
|
||||||
when Float
|
|
||||||
statement.setFloat(idx + 1, event[i])
|
|
||||||
when String
|
|
||||||
statement.setString(idx + 1, event[i])
|
|
||||||
when true
|
|
||||||
statement.setBoolean(idx + 1, true)
|
|
||||||
when false
|
|
||||||
statement.setBoolean(idx + 1, false)
|
|
||||||
else
|
else
|
||||||
if event[i].nil? and i =~ /%\{/
|
value = i
|
||||||
statement.setString(idx + 1, event.sprintf(i))
|
end
|
||||||
|
|
||||||
|
case value
|
||||||
|
when Time
|
||||||
|
# See LogStash::Timestamp, below, for the why behind strftime.
|
||||||
|
statement.setString(idx + 1, value.strftime(STRFTIME_FMT))
|
||||||
|
when LogStash::Timestamp
|
||||||
|
# XXX: Using setString as opposed to setTimestamp, because setTimestamp
|
||||||
|
# doesn't behave correctly in some drivers (Known: sqlite)
|
||||||
|
#
|
||||||
|
# Additionally this does not use `to_iso8601`, since some SQL databases
|
||||||
|
# choke on the 'T' in the string (Known: Derby).
|
||||||
|
#
|
||||||
|
# strftime appears to be the most reliable across drivers.
|
||||||
|
statement.setString(idx + 1, value.time.strftime(STRFTIME_FMT))
|
||||||
|
when Fixnum, Integer
|
||||||
|
if value > 2147483647 or value < -2147483648
|
||||||
|
statement.setLong(idx + 1, value)
|
||||||
|
else
|
||||||
|
statement.setInt(idx + 1, value)
|
||||||
|
end
|
||||||
|
when BigDecimal
|
||||||
|
# TODO: There has to be a better way than this. Find it.
|
||||||
|
statement.setBigDecimal(idx + 1, java.math.BigDecimal.new(value.to_s))
|
||||||
|
when Float
|
||||||
|
statement.setFloat(idx + 1, value)
|
||||||
|
when String
|
||||||
|
statement.setString(idx + 1, value)
|
||||||
|
when Array, Hash
|
||||||
|
statement.setString(idx + 1, value.to_json)
|
||||||
|
when true, false
|
||||||
|
statement.setBoolean(idx + 1, value)
|
||||||
else
|
else
|
||||||
statement.setString(idx + 1, nil)
|
statement.setString(idx + 1, nil)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
statement
|
statement
|
||||||
end
|
end
|
||||||
|
|
||||||
def log_jdbc_exception(exception)
|
def retry_exception?(exception, event)
|
||||||
|
retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]) or @retry_sql_states.include?(exception.getSQLState)))
|
||||||
|
log_jdbc_exception(exception, retrying, event)
|
||||||
|
|
||||||
|
retrying
|
||||||
|
end
|
||||||
|
|
||||||
|
def log_jdbc_exception(exception, retrying, event)
|
||||||
current_exception = exception
|
current_exception = exception
|
||||||
|
log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying')
|
||||||
|
|
||||||
|
log_method = (retrying ? 'warn' : 'error')
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => current_exception)
|
# TODO reformat event output so that it only shows the fields necessary.
|
||||||
|
|
||||||
|
@logger.send(log_method, log_text, :exception => current_exception, :statement => @statement[0], :event => event)
|
||||||
|
|
||||||
|
if current_exception.respond_to? 'getNextException'
|
||||||
current_exception = current_exception.getNextException()
|
current_exception = current_exception.getNextException()
|
||||||
|
else
|
||||||
|
current_exception = nil
|
||||||
|
end
|
||||||
|
|
||||||
break if current_exception == nil
|
break if current_exception == nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def next_sleep_interval(current_interval)
|
||||||
|
doubled = current_interval * 2
|
||||||
|
doubled > @retry_max_interval ? @retry_max_interval : doubled
|
||||||
|
end
|
||||||
end # class LogStash::Outputs::jdbc
|
end # class LogStash::Outputs::jdbc
|
||||||
|
|
17
log4j2.xml
Normal file
17
log4j2.xml
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Configuration>
|
||||||
|
<Appenders>
|
||||||
|
<File name="file" fileName="log4j2.log">
|
||||||
|
<PatternLayout pattern="%d{yyyy-mm-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
|
||||||
|
</File>
|
||||||
|
</Appenders>
|
||||||
|
|
||||||
|
<Loggers>
|
||||||
|
<!-- If we need to figure out whats happening for development purposes, disable this -->
|
||||||
|
<Logger name="com.zaxxer.hikari" level="off" />
|
||||||
|
|
||||||
|
<Root level="debug">
|
||||||
|
<AppenderRef ref="file"/>
|
||||||
|
</Root>
|
||||||
|
</Loggers>
|
||||||
|
</Configuration>
|
|
@ -1,29 +1,38 @@
|
||||||
Gem::Specification.new do |s|
|
Gem::Specification.new do |s|
|
||||||
s.name = 'logstash-output-jdbc'
|
s.name = 'logstash-output-jdbc'
|
||||||
s.version = "0.2.6"
|
s.version = '5.3.0'
|
||||||
s.licenses = [ "Apache License (2.0)" ]
|
s.licenses = ['Apache License (2.0)']
|
||||||
s.summary = "This plugin allows you to output to SQL, via JDBC"
|
s.summary = 'This plugin allows you to output to SQL, via JDBC'
|
||||||
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
|
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install 'logstash-output-jdbc'. This gem is not a stand-alone program"
|
||||||
s.authors = ["the_angry_angel"]
|
s.authors = ['the_angry_angel']
|
||||||
s.email = "karl+github@theangryangel.co.uk"
|
s.email = 'karl+github@theangryangel.co.uk'
|
||||||
s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
|
s.homepage = 'https://github.com/theangryangel/logstash-output-jdbc'
|
||||||
s.require_paths = [ "lib" ]
|
s.require_paths = ['lib']
|
||||||
|
|
||||||
|
# Java only
|
||||||
|
s.platform = 'java'
|
||||||
|
|
||||||
# Files
|
# Files
|
||||||
s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
|
s.files = Dir.glob('{lib,spec}/**/*.rb') + Dir.glob('vendor/**/*') + %w(LICENSE.txt README.md)
|
||||||
|
|
||||||
# Tests
|
# Tests
|
||||||
s.test_files = s.files.grep(%r{^(test|spec|features)/})
|
s.test_files = s.files.grep(%r{^(test|spec|features)/})
|
||||||
|
|
||||||
# Special flag to let us know this is actually a logstash plugin
|
# Special flag to let us know this is actually a logstash plugin
|
||||||
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
|
s.metadata = { 'logstash_plugin' => 'true', 'logstash_group' => 'output' }
|
||||||
|
|
||||||
# Gem dependencies
|
# Gem dependencies
|
||||||
s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
|
s.add_runtime_dependency 'logstash-core-plugin-api', '~> 2'
|
||||||
s.add_runtime_dependency 'stud'
|
s.add_runtime_dependency 'stud'
|
||||||
s.add_runtime_dependency "logstash-codec-plain"
|
s.add_runtime_dependency 'logstash-codec-plain'
|
||||||
|
|
||||||
s.add_development_dependency "logstash-devutils"
|
s.requirements << "jar 'com.zaxxer:HikariCP', '2.7.2'"
|
||||||
|
s.requirements << "jar 'org.apache.logging.log4j:log4j-slf4j-impl', '2.6.2'"
|
||||||
|
|
||||||
s.post_install_message = "logstash-output-jdbc 0.2.0 introduces several new features - please ensure you check the documentation in the README file"
|
s.add_development_dependency 'jar-dependencies'
|
||||||
|
s.add_development_dependency 'ruby-maven', '~> 3.3'
|
||||||
|
|
||||||
|
s.add_development_dependency "logstash-devutils", "~> 1.3", ">= 1.3.1"
|
||||||
|
|
||||||
|
s.add_development_dependency 'rubocop', '0.41.2'
|
||||||
end
|
end
|
||||||
|
|
19
scripts/minutes_to_retries.rb
Executable file
19
scripts/minutes_to_retries.rb
Executable file
|
@ -0,0 +1,19 @@
|
||||||
|
#!/usr/bin/env ruby -w
|
||||||
|
|
||||||
|
seconds_to_reach = 10 * 60
|
||||||
|
retry_max_interval = 128
|
||||||
|
|
||||||
|
current_interval = 2
|
||||||
|
total_interval = 0
|
||||||
|
exceptions_count = 1
|
||||||
|
|
||||||
|
loop do
|
||||||
|
break if total_interval > seconds_to_reach
|
||||||
|
exceptions_count += 1
|
||||||
|
|
||||||
|
current_interval = current_interval * 2 > retry_max_interval ? retry_max_interval : current_interval * 2
|
||||||
|
|
||||||
|
total_interval += current_interval
|
||||||
|
end
|
||||||
|
|
||||||
|
puts exceptions_count
|
10
scripts/travis-before_script.sh
Executable file
10
scripts/travis-before_script.sh
Executable file
|
@ -0,0 +1,10 @@
|
||||||
|
#!/bin/bash
|
||||||
|
wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
|
||||||
|
|
||||||
|
sudo apt-get install mysql-server postgresql-client postgresql -qq -y
|
||||||
|
echo "create database logstash; grant all privileges on logstash.* to 'logstash'@'localhost' identified by 'logstash'; flush privileges;" | sudo -u root mysql
|
||||||
|
echo "create user logstash PASSWORD 'logstash'; create database logstash; grant all privileges on database logstash to logstash;" | sudo -u postgres psql
|
||||||
|
|
||||||
|
wget http://search.maven.org/remotecontent?filepath=mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar -O /tmp/mysql.jar
|
||||||
|
wget http://search.maven.org/remotecontent?filepath=org/xerial/sqlite-jdbc/3.8.11.2/sqlite-jdbc-3.8.11.2.jar -O /tmp/sqlite.jar
|
||||||
|
wget http://central.maven.org/maven2/org/postgresql/postgresql/42.1.4/postgresql-42.1.4.jar -O /tmp/postgres.jar
|
5
scripts/travis-variables.sh
Normal file
5
scripts/travis-variables.sh
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
export JDBC_DERBY_JAR=/tmp/derby.jar
|
||||||
|
export JDBC_MYSQL_JAR=/tmp/mysql.jar
|
||||||
|
export JDBC_SQLITE_JAR=/tmp/sqlite.jar
|
||||||
|
export JDBC_POSTGRES_JAR=/tmp/postgres.jar
|
||||||
|
|
216
spec/jdbc_spec_helper.rb
Normal file
216
spec/jdbc_spec_helper.rb
Normal file
|
@ -0,0 +1,216 @@
|
||||||
|
require 'logstash/devutils/rspec/spec_helper'
|
||||||
|
require 'logstash/outputs/jdbc'
|
||||||
|
require 'stud/temporary'
|
||||||
|
require 'java'
|
||||||
|
require 'securerandom'
|
||||||
|
|
||||||
|
RSpec::Support::ObjectFormatter.default_instance.max_formatted_output_length = 80000
|
||||||
|
|
||||||
|
RSpec.configure do |c|
|
||||||
|
|
||||||
|
def start_service(name)
|
||||||
|
cmd = "sudo /etc/init.d/#{name}* start"
|
||||||
|
|
||||||
|
`which systemctl`
|
||||||
|
if $?.success?
|
||||||
|
cmd = "sudo systemctl start #{name}"
|
||||||
|
end
|
||||||
|
|
||||||
|
`#{cmd}`
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop_service(name)
|
||||||
|
cmd = "sudo /etc/init.d/#{name}* stop"
|
||||||
|
|
||||||
|
`which systemctl`
|
||||||
|
if $?.success?
|
||||||
|
cmd = "sudo systemctl stop #{name}"
|
||||||
|
end
|
||||||
|
|
||||||
|
`#{cmd}`
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
RSpec.shared_context 'rspec setup' do
|
||||||
|
it 'ensure jar is available' do
|
||||||
|
expect(ENV[jdbc_jar_env]).not_to be_nil, "#{jdbc_jar_env} not defined, required to run tests"
|
||||||
|
expect(File.exist?(ENV[jdbc_jar_env])).to eq(true), "#{jdbc_jar_env} defined, but not valid"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
RSpec.shared_context 'when initializing' do
|
||||||
|
it 'shouldn\'t register with a missing jar file' do
|
||||||
|
jdbc_settings['driver_jar_path'] = nil
|
||||||
|
plugin = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
|
||||||
|
expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
RSpec.shared_context 'when outputting messages' do
|
||||||
|
let(:logger) {
|
||||||
|
double("logger")
|
||||||
|
}
|
||||||
|
|
||||||
|
let(:jdbc_test_table) do
|
||||||
|
'logstash_output_jdbc_test'
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_drop_table) do
|
||||||
|
"DROP TABLE #{jdbc_test_table}"
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_statement_fields) do
|
||||||
|
[
|
||||||
|
{db_field: "created_at", db_type: "datetime", db_value: '?', event_field: '@timestamp'},
|
||||||
|
{db_field: "message", db_type: "varchar(512)", db_value: '?', event_field: 'message'},
|
||||||
|
{db_field: "message_sprintf", db_type: "varchar(512)", db_value: '?', event_field: 'sprintf-%{message}'},
|
||||||
|
{db_field: "static_int", db_type: "int", db_value: '?', event_field: 'int'},
|
||||||
|
{db_field: "static_bigint", db_type: "bigint", db_value: '?', event_field: 'bigint'},
|
||||||
|
{db_field: "static_float", db_type: "float", db_value: '?', event_field: 'float'},
|
||||||
|
{db_field: "static_bool", db_type: "boolean", db_value: '?', event_field: 'bool'},
|
||||||
|
{db_field: "static_bigdec", db_type: "decimal", db_value: '?', event_field: 'bigdec'}
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_create_table) do
|
||||||
|
fields = jdbc_statement_fields.collect { |entry| "#{entry[:db_field]} #{entry[:db_type]} not null" }.join(", ")
|
||||||
|
|
||||||
|
"CREATE table #{jdbc_test_table} (#{fields})"
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_drop_table) do
|
||||||
|
"DROP table #{jdbc_test_table}"
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_statement) do
|
||||||
|
fields = jdbc_statement_fields.collect { |entry| "#{entry[:db_field]}" }.join(", ")
|
||||||
|
values = jdbc_statement_fields.collect { |entry| "#{entry[:db_value]}" }.join(", ")
|
||||||
|
statement = jdbc_statement_fields.collect { |entry| entry[:event_field] }
|
||||||
|
|
||||||
|
statement.insert(0, "insert into #{jdbc_test_table} (#{fields}) values(#{values})")
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:systemd_database_service) do
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:event) do
|
||||||
|
# TODO: Auto generate fields from jdbc_statement_fields
|
||||||
|
LogStash::Event.new({
|
||||||
|
message: "test-message #{SecureRandom.uuid}",
|
||||||
|
float: 12.1,
|
||||||
|
bigint: 4000881632477184,
|
||||||
|
bool: true,
|
||||||
|
int: 1,
|
||||||
|
bigdec: BigDecimal.new("123.123")
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:plugin) do
|
||||||
|
# Setup logger
|
||||||
|
allow(LogStash::Outputs::Jdbc).to receive(:logger).and_return(logger)
|
||||||
|
|
||||||
|
# XXX: Suppress reflection logging. There has to be a better way around this.
|
||||||
|
allow(logger).to receive(:debug).with(/config LogStash::/)
|
||||||
|
|
||||||
|
# Suppress beta warnings.
|
||||||
|
allow(logger).to receive(:info).with(/Please let us know if you find bugs or have suggestions on how to improve this plugin./)
|
||||||
|
|
||||||
|
# Suppress start up messages.
|
||||||
|
expect(logger).to receive(:info).once.with(/JDBC - Starting up/)
|
||||||
|
|
||||||
|
# Setup plugin
|
||||||
|
output = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
|
||||||
|
output.register
|
||||||
|
|
||||||
|
output
|
||||||
|
end
|
||||||
|
|
||||||
|
before :each do
|
||||||
|
# Setup table
|
||||||
|
c = plugin.instance_variable_get(:@pool).getConnection
|
||||||
|
|
||||||
|
# Derby doesn't support IF EXISTS.
|
||||||
|
# Seems like the quickest solution. Bleurgh.
|
||||||
|
begin
|
||||||
|
stmt = c.createStatement
|
||||||
|
stmt.executeUpdate(jdbc_drop_table)
|
||||||
|
rescue
|
||||||
|
# noop
|
||||||
|
ensure
|
||||||
|
stmt.close
|
||||||
|
|
||||||
|
stmt = c.createStatement
|
||||||
|
stmt.executeUpdate(jdbc_create_table)
|
||||||
|
stmt.close
|
||||||
|
c.close
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Delete table after each
|
||||||
|
after :each do
|
||||||
|
c = plugin.instance_variable_get(:@pool).getConnection
|
||||||
|
|
||||||
|
stmt = c.createStatement
|
||||||
|
stmt.executeUpdate(jdbc_drop_table)
|
||||||
|
stmt.close
|
||||||
|
c.close
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'should save a event' do
|
||||||
|
expect { plugin.multi_receive([event]) }.to_not raise_error
|
||||||
|
|
||||||
|
# Verify the number of items in the output table
|
||||||
|
c = plugin.instance_variable_get(:@pool).getConnection
|
||||||
|
|
||||||
|
# TODO replace this simple count with a check of the actual contents
|
||||||
|
|
||||||
|
stmt = c.prepareStatement("select count(*) as total from #{jdbc_test_table} where message = ?")
|
||||||
|
stmt.setString(1, event.get('message'))
|
||||||
|
rs = stmt.executeQuery
|
||||||
|
count = 0
|
||||||
|
count = rs.getInt('total') while rs.next
|
||||||
|
stmt.close
|
||||||
|
c.close
|
||||||
|
|
||||||
|
expect(count).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'should not save event, and log an unretryable exception' do
|
||||||
|
e = event
|
||||||
|
original_event = e.get('message')
|
||||||
|
e.set('message', nil)
|
||||||
|
|
||||||
|
expect(logger).to receive(:error).once.with(/JDBC - Exception. Not retrying/, Hash)
|
||||||
|
expect { plugin.multi_receive([event]) }.to_not raise_error
|
||||||
|
|
||||||
|
e.set('message', original_event)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'it should retry after a connection loss, and log a warning' do
|
||||||
|
skip "does not run as a service, or known issue with test" if systemd_database_service.nil?
|
||||||
|
|
||||||
|
p = plugin
|
||||||
|
|
||||||
|
# Check that everything is fine right now
|
||||||
|
expect { p.multi_receive([event]) }.not_to raise_error
|
||||||
|
|
||||||
|
stop_service(systemd_database_service)
|
||||||
|
|
||||||
|
# Start a thread to restart the service after the fact.
|
||||||
|
t = Thread.new(systemd_database_service) { |systemd_database_service|
|
||||||
|
sleep 20
|
||||||
|
|
||||||
|
start_service(systemd_database_service)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.run
|
||||||
|
|
||||||
|
expect(logger).to receive(:warn).at_least(:once).with(/JDBC - Exception. Retrying/, Hash)
|
||||||
|
expect { p.multi_receive([event]) }.to_not raise_error
|
||||||
|
|
||||||
|
# Wait for the thread to finish
|
||||||
|
t.join
|
||||||
|
end
|
||||||
|
end
|
33
spec/outputs/jdbc_derby_spec.rb
Normal file
33
spec/outputs/jdbc_derby_spec.rb
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
require_relative '../jdbc_spec_helper'
|
||||||
|
|
||||||
|
describe 'logstash-output-jdbc: derby', if: ENV['JDBC_DERBY_JAR'] do
|
||||||
|
include_context 'rspec setup'
|
||||||
|
include_context 'when outputting messages'
|
||||||
|
|
||||||
|
let(:jdbc_jar_env) do
|
||||||
|
'JDBC_DERBY_JAR'
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_statement_fields) do
|
||||||
|
[
|
||||||
|
{db_field: "created_at", db_type: "timestamp", db_value: 'CAST(? as timestamp)', event_field: '@timestamp'},
|
||||||
|
{db_field: "message", db_type: "varchar(512)", db_value: '?', event_field: 'message'},
|
||||||
|
{db_field: "message_sprintf", db_type: "varchar(512)", db_value: '?', event_field: 'sprintf-%{message}'},
|
||||||
|
{db_field: "static_int", db_type: "int", db_value: '?', event_field: 'int'},
|
||||||
|
{db_field: "static_bigint", db_type: "bigint", db_value: '?', event_field: 'bigint'},
|
||||||
|
{db_field: "static_float", db_type: "float", db_value: '?', event_field: 'float'},
|
||||||
|
{db_field: "static_bool", db_type: "boolean", db_value: '?', event_field: 'bool'},
|
||||||
|
{db_field: "static_bigdec", db_type: "decimal", db_value: '?', event_field: 'bigdec'}
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_settings) do
|
||||||
|
{
|
||||||
|
'driver_class' => 'org.apache.derby.jdbc.EmbeddedDriver',
|
||||||
|
'connection_string' => 'jdbc:derby:memory:testdb;create=true',
|
||||||
|
'driver_jar_path' => ENV[jdbc_jar_env],
|
||||||
|
'statement' => jdbc_statement,
|
||||||
|
'max_flush_exceptions' => 1
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
24
spec/outputs/jdbc_mysql_spec.rb
Normal file
24
spec/outputs/jdbc_mysql_spec.rb
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
require_relative '../jdbc_spec_helper'
|
||||||
|
|
||||||
|
describe 'logstash-output-jdbc: mysql', if: ENV['JDBC_MYSQL_JAR'] do
|
||||||
|
include_context 'rspec setup'
|
||||||
|
include_context 'when outputting messages'
|
||||||
|
|
||||||
|
let(:jdbc_jar_env) do
|
||||||
|
'JDBC_MYSQL_JAR'
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:systemd_database_service) do
|
||||||
|
'mysql'
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_settings) do
|
||||||
|
{
|
||||||
|
'driver_class' => 'com.mysql.jdbc.Driver',
|
||||||
|
'connection_string' => 'jdbc:mysql://localhost/logstash?user=logstash&password=logstash',
|
||||||
|
'driver_jar_path' => ENV[jdbc_jar_env],
|
||||||
|
'statement' => jdbc_statement,
|
||||||
|
'max_flush_exceptions' => 1
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
41
spec/outputs/jdbc_postgres_spec.rb
Normal file
41
spec/outputs/jdbc_postgres_spec.rb
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
require_relative '../jdbc_spec_helper'
|
||||||
|
|
||||||
|
describe 'logstash-output-jdbc: postgres', if: ENV['JDBC_POSTGRES_JAR'] do
|
||||||
|
include_context 'rspec setup'
|
||||||
|
include_context 'when outputting messages'
|
||||||
|
|
||||||
|
let(:jdbc_jar_env) do
|
||||||
|
'JDBC_POSTGRES_JAR'
|
||||||
|
end
|
||||||
|
|
||||||
|
# TODO: Postgres doesnt kill connections fast enough for the test to pass
|
||||||
|
# Investigate options.
|
||||||
|
|
||||||
|
#let(:systemd_database_service) do
|
||||||
|
# 'postgresql'
|
||||||
|
#end
|
||||||
|
|
||||||
|
let(:jdbc_statement_fields) do
|
||||||
|
[
|
||||||
|
{db_field: "created_at", db_type: "timestamp", db_value: 'CAST(? as timestamp)', event_field: '@timestamp'},
|
||||||
|
{db_field: "message", db_type: "varchar(512)", db_value: '?', event_field: 'message'},
|
||||||
|
{db_field: "message_sprintf", db_type: "varchar(512)", db_value: '?', event_field: 'sprintf-%{message}'},
|
||||||
|
{db_field: "static_int", db_type: "int", db_value: '?', event_field: 'int'},
|
||||||
|
{db_field: "static_bigint", db_type: "bigint", db_value: '?', event_field: 'bigint'},
|
||||||
|
{db_field: "static_float", db_type: "float", db_value: '?', event_field: 'float'},
|
||||||
|
{db_field: "static_bool", db_type: "boolean", db_value: '?', event_field: 'bool'},
|
||||||
|
{db_field: "static_bigdec", db_type: "decimal", db_value: '?', event_field: 'bigdec'}
|
||||||
|
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_settings) do
|
||||||
|
{
|
||||||
|
'driver_class' => 'org.postgresql.Driver',
|
||||||
|
'connection_string' => 'jdbc:postgresql://localhost/logstash?user=logstash&password=logstash',
|
||||||
|
'driver_jar_path' => ENV[jdbc_jar_env],
|
||||||
|
'statement' => jdbc_statement,
|
||||||
|
'max_flush_exceptions' => 1
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
|
@ -1,97 +1,11 @@
|
||||||
require "logstash/devutils/rspec/spec_helper"
|
require_relative '../jdbc_spec_helper'
|
||||||
require "logstash/outputs/jdbc"
|
|
||||||
require "stud/temporary"
|
|
||||||
require "java"
|
|
||||||
|
|
||||||
describe LogStash::Outputs::Jdbc do
|
describe LogStash::Outputs::Jdbc do
|
||||||
|
|
||||||
let(:derby_settings) do
|
|
||||||
{
|
|
||||||
"driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
|
|
||||||
"connection_string" => "jdbc:derby:memory:testdb;create=true",
|
|
||||||
"driver_jar_path" => ENV['JDBC_DERBY_JAR'],
|
|
||||||
# Grumble. Grumble.
|
|
||||||
# Derby doesn't like 'T' in timestamps as of current writing, so for now
|
|
||||||
# we'll just use CURRENT_TIMESTAMP as opposed to the event @timestamp
|
|
||||||
"statement" => [ "insert into log (created_at, message) values(CURRENT_TIMESTAMP, ?)", "message" ]
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'rspec setup' do
|
|
||||||
|
|
||||||
it 'ensure derby is available' do
|
|
||||||
j = ENV['JDBC_DERBY_JAR']
|
|
||||||
expect(j).not_to be_nil, "JDBC_DERBY_JAR not defined, required to run tests"
|
|
||||||
expect(File.exists?(j)).to eq(true), "JDBC_DERBY_JAR defined, but not valid"
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'when initializing' do
|
context 'when initializing' do
|
||||||
|
|
||||||
it 'shouldn\'t register without a config' do
|
it 'shouldn\'t register without a config' do
|
||||||
expect {
|
expect do
|
||||||
LogStash::Plugin.lookup("output", "jdbc").new()
|
LogStash::Plugin.lookup('output', 'jdbc').new
|
||||||
}.to raise_error(LogStash::ConfigurationError)
|
end.to raise_error(LogStash::ConfigurationError)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'shouldn\'t register with a missing jar file' do
|
|
||||||
derby_settings['driver_jar_path'] = nil
|
|
||||||
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
|
|
||||||
expect { plugin.register }.to raise_error
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'shouldn\'t register with a missing jar file' do
|
|
||||||
derby_settings['connection_string'] = nil
|
|
||||||
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
|
|
||||||
expect { plugin.register }.to raise_error
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'when outputting messages' do
|
|
||||||
|
|
||||||
let(:event_fields) do
|
|
||||||
{ message: 'test-message' }
|
|
||||||
end
|
|
||||||
let(:event) { LogStash::Event.new(event_fields) }
|
|
||||||
let(:plugin) {
|
|
||||||
# Setup plugin
|
|
||||||
output = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
|
|
||||||
output.register
|
|
||||||
if ENV['JDBC_DEBUG'] == '1'
|
|
||||||
output.logger.subscribe(STDOUT)
|
|
||||||
end
|
|
||||||
|
|
||||||
# Setup table
|
|
||||||
c = output.instance_variable_get(:@pool).getConnection()
|
|
||||||
stmt = c.createStatement()
|
|
||||||
stmt.executeUpdate("CREATE table log (created_at timestamp, message varchar(512))")
|
|
||||||
stmt.close()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
output
|
|
||||||
}
|
|
||||||
|
|
||||||
it 'should save a event' do
|
|
||||||
expect { plugin.receive(event) }.to_not raise_error
|
|
||||||
|
|
||||||
# Wait for 1 second, for the buffer to flush
|
|
||||||
sleep 1
|
|
||||||
|
|
||||||
c = plugin.instance_variable_get(:@pool).getConnection()
|
|
||||||
stmt = c.createStatement()
|
|
||||||
rs = stmt.executeQuery("select count(*) as total from log")
|
|
||||||
count = 0
|
|
||||||
while rs.next()
|
|
||||||
count = rs.getInt("total")
|
|
||||||
end
|
|
||||||
stmt.close()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
expect(count).to be > 0
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
26
spec/outputs/jdbc_sqlite_spec.rb
Normal file
26
spec/outputs/jdbc_sqlite_spec.rb
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
require_relative '../jdbc_spec_helper'
|
||||||
|
|
||||||
|
describe 'logstash-output-jdbc: sqlite', if: ENV['JDBC_SQLITE_JAR'] do
|
||||||
|
JDBC_SQLITE_FILE = '/tmp/logstash_output_jdbc_test.db'.freeze
|
||||||
|
|
||||||
|
before(:context) do
|
||||||
|
File.delete(JDBC_SQLITE_FILE) if File.exist? JDBC_SQLITE_FILE
|
||||||
|
end
|
||||||
|
|
||||||
|
include_context 'rspec setup'
|
||||||
|
include_context 'when outputting messages'
|
||||||
|
|
||||||
|
let(:jdbc_jar_env) do
|
||||||
|
'JDBC_SQLITE_JAR'
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:jdbc_settings) do
|
||||||
|
{
|
||||||
|
'driver_class' => 'org.sqlite.JDBC',
|
||||||
|
'connection_string' => "jdbc:sqlite:#{JDBC_SQLITE_FILE}",
|
||||||
|
'driver_jar_path' => ENV[jdbc_jar_env],
|
||||||
|
'statement' => jdbc_statement,
|
||||||
|
'max_flush_exceptions' => 1
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue
Block a user