20 Commits

Author SHA1 Message Date
Karl Southern
b5419813ba 0.2.9 2016-06-29 13:42:09 +01:00
Karl Southern
ded1106b13 Address issue 44. 2016-06-28 22:38:36 +01:00
Karl Southern
2b27f39088 0.2.7 2016-05-29 13:45:26 +01:00
Karl Southern
7b337a8b91 Backport functionality from v5 branch. 2016-05-29 13:40:47 +01:00
Karl Southern
927e532b2a 0.2.6 2016-05-02 18:11:27 +01:00
Karl Southern
26a32a3f08 README update 2016-04-16 14:48:21 +01:00
Karl Southern
6bb84b165f Fecking version strings 2016-04-16 14:34:34 +01:00
Karl Southern
4e0292d222 rc1 for #36 2016-04-16 14:33:30 +01:00
Karl Southern
909cae01b3 Adds travis-ci badge 2016-04-12 11:20:19 +01:00
Karl Southern
6f2bd2ab3e Fiddling with travis-ci 2016-04-12 11:16:37 +01:00
Karl Southern
c5aeae1b02 Tags and versions are out of sequence. Bugger. 2016-04-11 18:22:11 +01:00
Karl Southern
a7d5a2e623 v0.2.4 2016-04-11 18:11:52 +01:00
Karl
3a64a22ac4 Merge pull request #32 from hordijk/patch-1
Fix toString method of LogStash::Timestamp
2016-04-11 17:21:25 +01:00
hordijk
c4b62769b9 Fix toString method of LogStash::Timestamp
According to LogStash::Timestamp (bb30cc773b/logstash-core-event/lib/logstash/timestamp.rb), it doesn't support iso8601, which results in an error if the timestamp of logstash is used directly.

It should support to_s or to_iso8601.

 :message=>"Failed to flush outgoing items", :outgoing_count=>1, :exception=>"NoMethodError", :backtrace=>["/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:255:in `add_statement_event_params'", "org/jruby/RubyArray.java:1613:in `each'", "org/jruby/RubyEnumerable.java:974:in `each_with_index'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:251:in `add_statement_event_params'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:203:in `safe_flush'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:200:in `safe_flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:120:in `flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:219:in `buffer_flush'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:216:in `buffer_flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:159:in `buffer_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:113:in `receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/outputs/base.rb:83:in `multi_receive'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/outputs/base.rb:83:in `multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/output_delegator.rb:130:in `worker_multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/output_delegator.rb:114:in `multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:305:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:305:in `output_batch'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:236:in `worker_loop'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:205:in `start_workers'"], :level=>:warn}
2016-04-11 15:19:48 +02:00
Karl Southern
b9e5f64d40 Bump minor version to fix documentation 2016-04-07 08:40:14 +01:00
Karl
c0e358aafb Merge pull request #30 from hordijk/master
Fix incorrect configuration option in the README.md for driver_jar
With thanks to @hordijk
2016-04-07 08:38:27 +01:00
hordijk
442ddf16eb Update README.md
Fix issue in documentation: driver_jar is not supported, it should be driver_jar_path

If driver_jar is used, logstash will generate this error: message=>"Unknown setting 'driver_path' for jdbc"
Used driver_jar_path, which is used in class LogStash::Outputs::Jdbc, instead.
2016-04-07 08:35:58 +02:00
Karl Southern
4e7985dafd Addresses #28 - connection timeout bug 2016-02-16 15:29:08 +00:00
Karl Southern
ae51d77f05 Move examples and split up connection code
Bump version
2015-12-30 12:05:05 +00:00
Karl Southern
529c98aadb Addresses #22 - not giving a warning about incorrectly configured statements 2015-12-23 10:06:50 +00:00
15 changed files with 408 additions and 265 deletions

.gitignore (1 line changed)

@@ -2,3 +2,4 @@
Gemfile.lock
Gemfile.bak
.bundle
.vagrant

.travis.yml (new file, 8 lines)

@@ -0,0 +1,8 @@
language: ruby
cache: bundler
rvm:
- jruby
before_script:
- wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
- export JDBC_DERBY_JAR=/tmp/derby.jar
script: bundle exec rspec

CHANGELOG.md (new file, 32 lines)

@@ -0,0 +1,32 @@
# Change Log
All notable changes to this project will be documented in this file, from 0.2.0.
## [0.2.9] - 2016-06-29
- Fix NameError exception.
- Moved log_jdbc_exception calls
## [0.2.7] - 2016-05-29
- Backport retry exception logic from v5 branch
- Backport improved timestamp compatibility from v5 branch
## [0.2.6] - 2016-05-02
- Fix for exception infinite loop
## [0.2.5] - 2016-04-11
### Added
- Basic tests running against DerbyDB
- Fix for converting Logstash::Timestamp to iso8601 from @hordijk
## [0.2.4] - 2016-04-07
- Documentation fixes from @hordijk
## [0.2.3] - 2016-02-16
- Bug fixes
## [0.2.2] - 2015-12-30
- Bug fixes
## [0.2.1] - 2015-12-22
- Support for connection pooling added through HikariCP
- Support for unsafe statement handling (allowing dynamic queries)
- Altered exception handling to now count sequential flushes with exceptions thrown

README.md (150 lines changed)

@@ -1,4 +1,7 @@
# logstash-output-jdbc
[![Build Status](https://travis-ci.org/theangryangel/logstash-output-jdbc.svg?branch=master)](https://travis-ci.org/theangryangel/logstash-output-jdbc)
This plugin is provided as an external plugin and is not part of the Logstash project.
This plugin allows you to output to SQL databases, using JDBC adapters.
@@ -6,36 +9,41 @@ See below for tested adapters, and example configurations.
This has not yet been extensively tested with all JDBC drivers and may not yet work for you.
If you do find this works for a JDBC driver not listed, let me know and provide a small example configuration. If you do find this works for a JDBC driver without an example, let me know and provide a small example configuration if you can.
This plugin does not bundle any JDBC jar files, and does expect them to be in a
particular location. Please ensure you read the 4 installation lines below.
## Headlines ## ChangeLog
- Support for connection pooling added in 0.2.0 See CHANGELOG.md
- Support for unsafe statement handling (allowing dynamic queries) in 0.2.0
- Altered exception handling to now count sequential flushes with exceptions thrown in 0.2.0
## Versions
- See master branch for logstash v2+ Released versions are available via rubygems, and typically tagged.
For development:
- See master branch for logstash v5
- See v2.x branch for logstash v2
- See v1.5 branch for logstash v1.5
- See v1.4 branch for logstash 1.4
## Installation
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- Now either:
- Use driver_class in your configuration to specify a path to your jar file - Use driver_jar_path in your configuration to specify a path to your jar file
- Or:
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- And then configure (examples below)
## Running tests
Assuming valid JDBC jar, and jruby is setup and installed, and you have issued `jruby -S bundle install` in the development directory At this time tests only run against Derby, in an in-memory database.
- `SQL_JAR=path/to/your.jar jruby -S bundle exec rspec` Acceptance tests for individual database engines will be added over time.
If you need to provide username and password you may do this via the environment variables `SQL_USERNAME` and `SQL_PASSWORD`.
Tests are not yet 100% complete. Assuming valid jruby is installed
- First time, issue `jruby -S bundle install` to install dependencies
- Next, download Derby jar from https://db.apache.org/derby/
- Run the tests `JDBC_DERBY_JAR=path/to/derby.jar jruby -S rspec`
- Optionally add the `JDBC_DEBUG=1` env variable to add logging to stdout
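For reference, a minimal sketch of a jdbc output pointed at the same in-memory Derby database the tests use; the driver class and connection string are taken from the spec further down, the jar path mirrors the travis setup, and the statement assumes the `log` table the spec creates:
```
output {
  jdbc {
    driver_jar_path => "/tmp/derby.jar"
    driver_class => "org.apache.derby.jdbc.EmbeddedDriver"
    connection_string => "jdbc:derby:memory:testdb;create=true"
    statement => [ "insert into log (created_at, message) values(?, ?)", "@timestamp", "message" ]
  }
}
```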
## Configuration options
@@ -43,7 +51,7 @@ Tests are not yet 100% complete.
| ------ | ---- | ----------- | --------- | ------- |
| driver_class | String | Specify a driver class if autoloading fails | No | |
| driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
| driver_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | | | driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
| connection_string | String | JDBC connection URL | Yes | |
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
@@ -56,120 +64,6 @@ Tests are not yet 100% complete.
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
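For illustration only, the renamed `driver_jar_path` option slots in alongside the other settings like this; the jar path, connection string and table are placeholders, not values shipped with this repository:
```
output {
  jdbc {
    driver_jar_path => "/path/to/your-jdbc-driver.jar"
    connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
    max_flush_exceptions => 5
    statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
  }
}
```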
## Example configurations
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
If you have a working sample configuration for a DB that's not listed, pull requests are welcome.
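The `unsafe_statement` option (listed in the CHANGELOG above and exercised by the spec below) lets the statement be sprintf-interpolated from each event before parameters are bound, which allows dynamic queries such as a per-event table name. A minimal sketch, reusing the statement from the spec with a placeholder connection string:
```
output {
  jdbc {
    connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
    unsafe_statement => true
    statement => [ "insert into %{table} (message) values(?)", "message" ]
  }
}
```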
### SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
```
input
{
stdin { }
}
output {
stdout { }
jdbc {
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### Postgres
With thanks to [@roflmao](https://github.com/roflmao)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### Oracle
With thanks to [@josemazo](https://github.com/josemazo)
* Tested with Express Edition 11g Release 2
* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### MariaDB
* Tested with Ubuntu 14.04.3 LTS, Server version: 10.1.9-MariaDB-1~trusty-log mariadb.org binary distribution
* Tested using https://downloads.mariadb.com/enterprise/tqge-whfa/connectors/java/connector-java-1.3.2/mariadb-java-client-1.3.2.jar (mariadb-java-client-1.3.2.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### Apache Phoenix (HBase SQL)
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:phoenix:ZOOKEEPER_HOSTNAME"
statement => [ "UPSERT INTO EVENTS log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```


@@ -0,0 +1,16 @@
# Example: Apache Phoenix (HBase SQL)
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:phoenix:ZOOKEEPER_HOSTNAME"
statement => [ "UPSERT INTO EVENTS log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

examples/mariadb.md (new file, 16 lines)

@@ -0,0 +1,16 @@
# Example: MariaDB
* Tested with Ubuntu 14.04.3 LTS, Server version: 10.1.9-MariaDB-1~trusty-log mariadb.org binary distribution
* Tested using https://downloads.mariadb.com/enterprise/tqge-whfa/connectors/java/connector-java-1.3.2/mariadb-java-client-1.3.2.jar (mariadb-java-client-1.3.2.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

examples/mysql.md (new file, 16 lines)

@@ -0,0 +1,16 @@
# Example: Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

examples/odps.md (new file, 20 lines)

@@ -0,0 +1,20 @@
# Example: ODPS
With thanks to [@onesuper](https://github.com/onesuper)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => "com.aliyun.odps.jdbc.OdpsDriver"
driver_auto_commit => false
connection_string => "jdbc:odps:http://service.odps.aliyun.com/api?project=meta_dev&loglevel=DEBUG"
username => "abcd"
password => "1234"
max_pool_size => 5
flush_size => 10
statement => [ "INSERT INTO test_logstash VALUES(?, ?, ?);", "host", "@timestamp", "message" ]
}
}
```

examples/oracle.md (new file, 16 lines)

@@ -0,0 +1,16 @@
# Example: Oracle
With thanks to [@josemazo](https://github.com/josemazo)
* Tested with Express Edition 11g Release 2
* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

examples/postgres.md (new file, 15 lines)

@@ -0,0 +1,15 @@
# Example: Postgres
With thanks to [@roflmao](https://github.com/roflmao)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

examples/sql-server.md (new file, 14 lines)

@@ -0,0 +1,14 @@
# Example: SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

examples/sqlite.md (new file, 17 lines)

@@ -0,0 +1,17 @@
# Example: SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
```
input
{
stdin { }
}
output {
stdout { }
jdbc {
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

lib/logstash/outputs/jdbc.rb

@@ -6,10 +6,33 @@ require "java"
require "logstash-output-jdbc_jars" require "logstash-output-jdbc_jars"
require "logstash-output-jdbc_ring-buffer" require "logstash-output-jdbc_ring-buffer"
# Write events to a SQL engine, using JDBC.
#
# It is up to the user of the plugin to correctly configure the plugin. This
# includes correctly crafting the SQL statement, and matching the number of
# parameters correctly.
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Adds buffer support
include Stud::Buffer
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
RETRYABLE_SQLSTATE_CLASSES = [
# Classes of retryable SQLSTATE codes
# Not all in the class will be retryable. However, this is the best that
# we've got right now.
# If a custom state code is required, set it in retry_sql_states.
'08', # Connection Exception
'24', # Invalid Cursor State (Maybe retry-able in some circumstances)
'25', # Invalid Transaction State
'40', # Transaction Rollback
'53', # Insufficient Resources
'54', # Program Limit Exceeded (MAYBE)
'55', # Object Not In Prerequisite State
'57', # Operator Intervention
'58', # System Error
].freeze
config_name "jdbc" config_name "jdbc"
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26 # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
@@ -44,7 +67,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
config :max_pool_size, :validate => :number, :default => 5
# Connection timeout
config :connection_timeout, :validate => :number, :default => 2800 config :connection_timeout, :validate => :number, :default => 10000
# We buffer a certain number of events before flushing that out to SQL.
# This setting controls how many events will be buffered before sending a
@@ -83,25 +106,22 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
load_jar_files!
@pool = Java::ComZaxxerHikari::HikariDataSource.new
@pool.setAutoCommit(@driver_auto_commit)
@pool.setDriverClassName(@driver_class) if @driver_class
@pool.setJdbcUrl(@connection_string)
@pool.setUsername(@username) if @username
@pool.setPassword(@password) if @password
@pool.setMaximumPoolSize(@max_pool_size)
@pool.setConnectionTimeout(@connection_timeout)
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
if (@flush_size > 1000)
@logger.warn("JDBC - Flush size is set to > 1000")
end
if @statement.length < 1
@logger.error("JDBC - No statement provided. Configuration error.")
end
if (!@unsafe_statement and @statement.length < 2)
@logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
end
setup_and_test_pool!
buffer_initialize(
:max_items => @flush_size,
:max_interval => @idle_flush_time,
@@ -131,7 +151,6 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
@logger.error("JDBC - max_flush_exceptions has been reached")
log_jdbc_exception(e)
raise LogStash::ShutdownSignal.new
end
end
@@ -144,6 +163,31 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
private
def setup_and_test_pool!
# Setup pool
@pool = Java::ComZaxxerHikari::HikariDataSource.new
@pool.setAutoCommit(@driver_auto_commit)
@pool.setDriverClassName(@driver_class) if @driver_class
@pool.setJdbcUrl(@connection_string)
@pool.setUsername(@username) if @username
@pool.setPassword(@password) if @password
@pool.setMaximumPoolSize(@max_pool_size)
@pool.setConnectionTimeout(@connection_timeout)
validate_connection_timeout = (@connection_timeout / 1000) / 2
# Test connection
test_connection = @pool.getConnection()
unless test_connection.isValid(validate_connection_timeout)
@logger.error("JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.")
end
test_connection.close()
end
def load_jar_files!
# Load jar from driver path
unless @driver_jar_path.nil?
@@ -172,7 +216,17 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end
def safe_flush(events, teardown=false)
connection = nil
statement = nil
begin
connection = @pool.getConnection()
rescue => e
log_jdbc_exception(e, true)
raise
end
begin
statement = connection.prepareStatement(@statement[0])
events.each do |event|
@@ -183,54 +237,68 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
statement.addBatch()
end
begin
statement.executeBatch()
statement.close()
@exceptions_tracker << nil
rescue => e
# Raising an exception will incur a retry from Stud::Buffer. if retry_exception?(e)
# Since the exceutebatch failed this should mean any events failed to be raise
# inserted will be re-run. We're going to log it for the lols anyway. end
log_jdbc_exception(e)
ensure
connection.close(); statement.close() unless statement.nil?
connection.close() unless connection.nil?
end
end
def unsafe_flush(events, teardown=false)
connection = nil
statement = nil
begin
connection = @pool.getConnection()
rescue => e
log_jdbc_exception(e, true)
raise
end
begin
events.each do |event|
next if event.cancelled?
statement = connection.prepareStatement(event.sprintf(@statement[0]))
statement = add_statement_event_params(statement, event) if @statement.length > 1
begin
statement.execute()
# cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line
event.cancel
@exceptions_tracker << nil
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# We log for the lols.
log_jdbc_exception(e)
ensure
statement.close()
connection.close()
end
rescue => e
if retry_exception?(e)
raise
end
ensure
statement.close() unless statement.nil?
connection.close() unless connection.nil?
end
end
def add_statement_event_params(statement, event)
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time, LogStash::Timestamp when Time
# Most reliable solution, cross JDBC driver # See LogStash::Timestamp, below, for the why behind strftime.
statement.setString(idx + 1, event[i].iso8601()) statement.setString(idx + 1, event[i].strftime(STRFTIME_FMT))
when LogStash::Timestamp
# XXX: Using setString as opposed to setTimestamp, because setTimestamp
# doesn't behave correctly in some drivers (Known: sqlite)
#
# Additionally this does not use `to_iso8601`, since some SQL databases
# choke on the 'T' in the string (Known: Derby).
#
# strftime appears to be the most reliable across drivers.
statement.setString(idx + 1, event[i].time.strftime(STRFTIME_FMT))
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
@@ -253,12 +321,23 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
statement
end
def log_jdbc_exception(e) def log_jdbc_exception(exception, retrying)
ce = e current_exception = exception
loop do
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => ce) if retrying
ce = e.getNextException() @logger.error("JDBC Exception. Retrying.", :exception => current_exception)
break if ce == nil else
@logger.error("JDBC Exception. No retry.", :exception => current_exception)
end
current_exception = current_exception.getNextException()
break if current_exception == nil
end
end
def retry_exception?(exception)
retrying = (exception.respond_to? 'getSQLState' and RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState[0,2]))
log_jdbc_exception(exception, retrying)
retrying
end end
end # class LogStash::Outputs::jdbc

logstash-output-jdbc.gemspec

@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc'
s.version = "0.2.2.rc1" s.version = "0.2.9"
s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"


@@ -4,92 +4,91 @@ require "stud/temporary"
require "java" require "java"
describe LogStash::Outputs::Jdbc do describe LogStash::Outputs::Jdbc do
def fetch_log_table_rowcount
# sleep for a second to let the flush happen let(:derby_settings) do
{
"driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"connection_string" => "jdbc:derby:memory:testdb;create=true",
"driver_jar_path" => ENV['JDBC_DERBY_JAR'],
"statement" => [ "insert into log (created_at, message) values(?, ?)", "@timestamp" "message" ]
}
end
context 'rspec setup' do
it 'ensure derby is available' do
j = ENV['JDBC_DERBY_JAR']
expect(j).not_to be_nil, "JDBC_DERBY_JAR not defined, required to run tests"
expect(File.exists?(j)).to eq(true), "JDBC_DERBY_JAR defined, but not valid"
end
end
context 'when initializing' do
it 'shouldn\'t register without a config' do
expect {
LogStash::Plugin.lookup("output", "jdbc").new()
}.to raise_error(LogStash::ConfigurationError)
end
it 'shouldn\'t register with a missing jar file' do
derby_settings['driver_jar_path'] = nil
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
expect { plugin.register }.to raise_error
end
it 'shouldn\'t register with a missing connection string' do
derby_settings['connection_string'] = nil
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
expect { plugin.register }.to raise_error
end
end
context 'when outputting messages' do
let(:event_fields) do
{ message: 'test-message' }
end
let(:event) { LogStash::Event.new(event_fields) }
let(:plugin) {
# Setup plugin
output = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
output.register
if ENV['JDBC_DEBUG'] == '1'
output.logger.subscribe(STDOUT)
end
# Setup table
c = output.instance_variable_get(:@pool).getConnection()
stmt = c.createStatement()
stmt.executeUpdate("CREATE table log (created_at timestamp, message varchar(512))")
stmt.close()
c.close()
output
}
it 'should save a event' do
expect { plugin.receive(event) }.to_not raise_error
# Wait for 1 second, for the buffer to flush
sleep 1
stmt = @sql.createStatement() c = plugin.instance_variable_get(:@pool).getConnection()
stmt = c.createStatement()
rs = stmt.executeQuery("select count(*) as total from log")
count = 0
while rs.next()
count = rs.getInt("total")
end
stmt.close()
c.close()
return count expect(count).to be > 0
end
let(:base_settings) { {
"driver_jar_path" => @driver_jar_path,
"connection_string" => @test_connection_string,
"username" => ENV['SQL_USERNAME'],
"password" => ENV['SQL_PASSWORD'],
"statement" => [ "insert into log (message) values(?)", "message" ],
"max_pool_size" => 1,
"flush_size" => 1,
"max_flush_exceptions" => 1
} }
let(:test_settings) { {} }
let(:plugin) { LogStash::Outputs::Jdbc.new(base_settings.merge(test_settings)) }
let(:event_fields) { { "message" => "This is a message!" } }
let(:event) { LogStash::Event.new(event_fields) }
before(:all) do
@driver_jar_path = File.absolute_path(ENV['SQL_JAR'])
@test_db_path = File.join(Stud::Temporary.directory, "test.db")
@test_connection_string = "jdbc:sqlite:#{@test_db_path}"
require @driver_jar_path
@sql = java.sql.DriverManager.get_connection(@test_connection_string, ENV['SQL_USERNAME'].to_s, ENV['SQL_PASSWORD'].to_s)
stmt = @sql.createStatement()
stmt.executeUpdate("CREATE table log (host text, timestamp datetime, message text);")
stmt.close()
end
before(:each) do
stmt = @sql.createStatement()
stmt.executeUpdate("delete from log")
stmt.close()
end
after(:all) do
File.unlink(@test_db_path)
Dir.rmdir(File.dirname(@test_db_path))
end
describe "safe statement" do
it "should register without errors" do
expect { plugin.register }.to_not raise_error
end
it "receive event, without error" do
plugin.register
expect { plugin.receive(event) }.to_not raise_error
expect(fetch_log_table_rowcount).to eq(1)
end
end
describe "unsafe statement" do
let(:event_fields) {
{ "message" => "This is a message!", "table" => "log" }
}
let(:test_settings) { {
"statement" => [ "insert into %{table} (message) values(?)", "message" ],
"unsafe_statement" => true
} }
it "should register without errors" do
expect { plugin.register }.to_not raise_error
end
it "receive event, without error" do
plugin.register
plugin.receive(event)
expect(fetch_log_table_rowcount).to eq(1)
end
end
end