39 Commits
v1.5 ... v0.2.8

Author SHA1 Message Date
Karl Southern
ded1106b13 Address issue 44. 2016-06-28 22:38:36 +01:00
Karl Southern
2b27f39088 0.2.7 2016-05-29 13:45:26 +01:00
Karl Southern
7b337a8b91 Backport functionality from v5 branch. 2016-05-29 13:40:47 +01:00
Karl Southern
927e532b2a 0.2.6 2016-05-02 18:11:27 +01:00
Karl Southern
26a32a3f08 README update 2016-04-16 14:48:21 +01:00
Karl Southern
6bb84b165f Fecking version strings 2016-04-16 14:34:34 +01:00
Karl Southern
4e0292d222 rc1 for #36 2016-04-16 14:33:30 +01:00
Karl Southern
909cae01b3 Adds travis-ci badge 2016-04-12 11:20:19 +01:00
Karl Southern
6f2bd2ab3e Fiddling with travis-ci 2016-04-12 11:16:37 +01:00
Karl Southern
c5aeae1b02 Tags and versions are out of sequence. Bugger. 2016-04-11 18:22:11 +01:00
Karl Southern
a7d5a2e623 v0.2.4 2016-04-11 18:11:52 +01:00
Karl
3a64a22ac4 Merge pull request #32 from hordijk/patch-1
Fix toString method of LogStash::Timestamp
2016-04-11 17:21:25 +01:00
hordijk
c4b62769b9 Fix toString method of LogStash::Timestamp
According to LogStash::Timestamp (bb30cc773b/logstash-core-event/lib/logstash/timestamp.rb) doesn't support iso8601, which results in error if the timestamp of logstash is used directly.

If should support to_s of to_iso8601.

 :message=>"Failed to flush outgoing items", :outgoing_count=>1, :exception=>"NoMethodError", :backtrace=>["/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:255:in `add_statement_event_params'", "org/jruby/RubyArray.java:1613:in `each'", "org/jruby/RubyEnumerable.java:974:in `each_with_index'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:251:in `add_statement_event_params'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:203:in `safe_flush'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:200:in `safe_flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:120:in `flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:219:in `buffer_flush'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:216:in `buffer_flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:159:in `buffer_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:113:in `receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/outputs/base.rb:83:in `multi_receive'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/outputs/base.rb:83:in `multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/output_delegator.rb:130:in `worker_multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/output_delegator.rb:114:in `multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:305:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:305:in `output_batch'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:236:in `worker_loop'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:205:in `start_workers'"], :level=>:warn}
2016-04-11 15:19:48 +02:00
Karl Southern
b9e5f64d40 Bump minor version to fix documentation 2016-04-07 08:40:14 +01:00
Karl
c0e358aafb Merge pull request #30 from hordijk/master
Fix incorrect configuration option in the README.md for driver_jar
With thanks to @hordijk
2016-04-07 08:38:27 +01:00
hordijk
442ddf16eb Update README.md
Fix issue in documentation: driver_jar is not supported, it should be driver_jar_path

If driver_jar is used logstash will generate this error message=>"Unknown setting 'driver_path' for jdbc"
Used the driver_jar_path which is used in class LogStash::Outputs::Jdbc instead.
2016-04-07 08:35:58 +02:00
Karl Southern
4e7985dafd Addresses #28 - connection timeout bug 2016-02-16 15:29:08 +00:00
Karl Southern
ae51d77f05 Move examples and split up connection code
Bump version
2015-12-30 12:05:05 +00:00
Karl Southern
529c98aadb Addresses 22 not giving warning about incorrectly configured statements 2015-12-23 10:06:50 +00:00
Karl Southern
bfcd9bf69a Addresses issue 26 2015-12-23 09:42:53 +00:00
Karl
af55fde54a Merge pull request #25 from ebuildy/patch-1
Add Apache Phoenix example from @ebuildy
2015-12-22 09:37:27 +00:00
Thomas Decaux
9e05a01dff Add Apache Phoenix example 2015-12-07 10:07:52 +01:00
Karl
064647607e Merge pull request #24 from dmitryakadiamond/MariaDB-working-example
Maria db working example kindly provided by @dmitryakadiamond
2015-12-04 18:28:58 +00:00
Dmitry Morozov
1ece7f9abc formatting fix 2015-12-04 13:20:58 +00:00
Dmitry Morozov
38b7096419 README.md updated 2015-12-04 13:16:19 +00:00
Karl Southern
eef7473a0b Pushing. 2015-11-22 23:19:29 +00:00
Karl Southern
7a1da5b7cd Fix exceptions counter 2015-11-22 18:57:13 +00:00
Karl Southern
e56176bbea Fix missing nil counter 2015-11-19 14:29:47 +00:00
Karl Southern
49e751a9f8 Have to start some real work. Will complete tests over lunch. 2015-11-18 10:06:11 +00:00
Karl Southern
9804850714 WIP 2015-11-17 10:32:16 +00:00
Karl Southern
a6c669cc52 Adds unsafe_statement support 2015-11-15 12:35:57 +00:00
Karl Southern
e615829310 Stupid gemspec version number bullshit 2015-11-14 20:09:38 +00:00
Karl Southern
362e9ad0a0 Adds connection pooling 2015-11-14 20:06:35 +00:00
Karl Southern
4994cd810b Bump version 2015-11-06 15:02:18 +00:00
Karl Southern
1487b41b3e In retrospective, when would nil ever enter the equation at all? 2015-11-06 15:01:02 +00:00
Karl
dd29d16a31 Update logstash-output-jdbc.gemspec
Bump version.
2015-11-06 14:56:14 +00:00
Karl
ebe5596469 Update jdbc.rb
Removes improper nil check which breaks event sprintf formatting examples
2015-11-06 14:55:24 +00:00
Karl Southern
275cd6fc2f v2.0 tested 2015-10-30 18:29:22 +00:00
Karl Southern
7da6317083 Initial untested v2.0 commit 2015-10-30 17:55:42 +00:00
20 changed files with 605 additions and 197 deletions

2
.gitignore vendored
View File

@@ -2,4 +2,4 @@
Gemfile.lock
Gemfile.bak
.bundle
vendor
.vagrant

8
.travis.yml Normal file
View File

@@ -0,0 +1,8 @@
language: ruby
cache: bundler
rvm:
- jruby
before_script:
- wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
- export JDBC_DERBY_JAR=/tmp/derby.jar
script: bundle exec rspec

28
CHANGELOG.md Normal file
View File

@@ -0,0 +1,28 @@
# Change Log
All notable changes to this project will be documented in this file, from 0.2.0.
## [0.2.7] - 2016-05-29
- Backport retry exception logic from v5 branch
- Backport improved timestamp compatibility from v5 branch
## [0.2.6] - 2016-05-02
- Fix for exception infinite loop
## [0.2.5] - 2016-04-11
### Added
- Basic tests running against DerbyDB
- Fix for converting Logstash::Timestamp to iso8601 from @hordijk
## [0.2.4] - 2016-04-07
- Documentation fixes from @hordijk
## [0.2.3] - 2016-02-16
- Bug fixes
## [0.2.2] - 2015-12-30
- Bug fixes
## [0.2.1] - 2015-12-22
- Support for connection pooling support added through HikariCP
- Support for unsafe statement handling (allowing dynamic queries)
- Altered exception handling to now count sequential flushes with exceptions thrown

144
README.md
View File

@@ -1,4 +1,7 @@
# logstash-output-jdbc
[![Build Status](https://travis-ci.org/theangryangel/logstash-output-jdbc.svg?branch=master)](https://travis-ci.org/theangryangel/logstash-output-jdbc)
This plugin is provided as an external plugin and is not part of the Logstash project.
This plugin allows you to output to SQL databases, using JDBC adapters.
@@ -6,112 +9,61 @@ See below for tested adapters, and example configurations.
This has not yet been extensively tested with all JDBC drivers and may not yet work for you.
If you do find this works for a JDBC driver without an example, let me know and provide a small example configuration if you can.
This plugin does not bundle any JDBC jar files, and does expect them to be in a
particular location. Please ensure you read the 4 installation lines below.
## ChangeLog
See CHANGELOG.md
## Versions
- See master branch for logstash v1.5
Released versions are available via rubygems, and typically tagged.
For development:
- See master branch for logstash v5
- See v2.x branch for logstash v2
- See v1.5 branch for logstash v1.5
- See v1.4 branch for logstash 1.4
## Installation
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- Configure
- Now either:
- Use driver_jar_path in your configuraton to specify a path to your jar file
- Or:
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- And then configure (examples below)
## Running tests
At this time tests only run against Derby, in an in-memory database.
Acceptance tests for individual database engines will be added over time.
Assuming valid jruby is installed
- First time, issue `jruby -S bundle install` to install dependencies
- Next, download Derby jar from https://db.apache.org/derby/
- Run the tests `JDBC_DERBY_JAR=path/to/derby.jar jruby -S rspec`
- Optionally add the `JDBC_DEBUG=1` env variable to add logging to stdout
## Configuration options
* driver_class, string, JDBC driver class to load
* connection_string, string, JDBC connection string
* statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim.
* flush_size, number, default = 1000, number of entries to buffer before sending to SQL
* idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time
* max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop
* max_repeat_exceptions_time, number, default = 30, maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value
| Option | Type | Description | Required? | Default |
| ------ | ---- | ----------- | --------- | ------- |
| driver_class | String | Specify a driver class if autoloading fails | No | |
| driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
| driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
| connection_string | String | JDBC connection URL | Yes | |
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 |
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
## Example configurations
### SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
```
input
{
stdin { }
}
output {
stdout { }
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
jdbc {
driver_class => 'org.sqlite.JDBC'
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
```
input
{
stdin { }
}
output {
jdbc {
driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### Postgres
With thanks to [@roflmao](https://github.com/roflmao)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => 'org.postgresql.Driver'
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### Oracle
With thanks to [@josemazo](https://github.com/josemazo)
* Tested with Express Edition 11g Release 2
* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => "oracle.jdbc.driver.OracleDriver"
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.

View File

@@ -0,0 +1,16 @@
# Example: Apache Phoenix (HBase SQL)
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:phoenix:ZOOKEEPER_HOSTNAME"
statement => [ "UPSERT INTO EVENTS log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

16
examples/mariadb.md Normal file
View File

@@ -0,0 +1,16 @@
# Example: MariaDB
* Tested with Ubuntu 14.04.3 LTS, Server version: 10.1.9-MariaDB-1~trusty-log mariadb.org binary distribution
* Tested using https://downloads.mariadb.com/enterprise/tqge-whfa/connectors/java/connector-java-1.3.2/mariadb-java-client-1.3.2.jar (mariadb-java-client-1.3.2.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

16
examples/mysql.md Normal file
View File

@@ -0,0 +1,16 @@
# Example: Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

20
examples/odps.md Normal file
View File

@@ -0,0 +1,20 @@
# Example: ODPS
With thanks to [@onesuper](https://github.com/onesuper)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => "com.aliyun.odps.jdbc.OdpsDriver"
driver_auto_commit => false
connection_string => "jdbc:odps:http://service.odps.aliyun.com/api?project=meta_dev&loglevel=DEBUG"
username => "abcd"
password => "1234"
max_pool_size => 5
flush_size => 10
statement => [ "INSERT INTO test_logstash VALUES(?, ?, ?);", "host", "@timestamp", "message" ]
}
}
```

16
examples/oracle.md Normal file
View File

@@ -0,0 +1,16 @@
# Example: Oracle
With thanks to [@josemazo](https://github.com/josemazo)
* Tested with Express Edition 11g Release 2
* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

15
examples/postgres.md Normal file
View File

@@ -0,0 +1,15 @@
# Example: Postgres
With thanks to [@roflmao](https://github.com/roflmao)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

14
examples/sql-server.md Normal file
View File

@@ -0,0 +1,14 @@
# Example: SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

17
examples/sqlite.md Normal file
View File

@@ -0,0 +1,17 @@
# Example: SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
```
input
{
stdin { }
}
output {
stdout { }
jdbc {
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

View File

@@ -0,0 +1,5 @@
# encoding: utf-8
require 'logstash/environment'
root_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor")

View File

@@ -0,0 +1,17 @@
class RingBuffer < Array
attr_reader :max_size
def initialize(max_size, enum = nil)
@max_size = max_size
enum.each { |e| self << e } if enum
end
def <<(el)
if self.size < @max_size || @max_size.nil?
super
else
self.shift
self.push(el)
end
end
end

View File

@@ -3,22 +3,72 @@ require "logstash/outputs/base"
require "logstash/namespace"
require "stud/buffer"
require "java"
require "logstash-output-jdbc_jars"
require "logstash-output-jdbc_ring-buffer"
# Write events to a SQL engine, using JDBC.
#
# It is upto the user of the plugin to correctly configure the plugin. This
# includes correctly crafting the SQL statement, and matching the number of
# parameters correctly.
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Adds buffer support
include Stud::Buffer
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
RETRYABLE_SQLSTATE_CLASSES = [
# Classes of retryable SQLSTATE codes
# Not all in the class will be retryable. However, this is the best that
# we've got right now.
# If a custom state code is required, set it in retry_sql_states.
'08', # Connection Exception
'24', # Invalid Cursor State (Maybe retry-able in some circumstances)
'25', # Invalid Transaction State
'40', # Transaction Rollback
'53', # Insufficient Resources
'54', # Program Limit Exceeded (MAYBE)
'55', # Object Not In Prerequisite State
'57', # Operator Intervention
'58', # System Error
].freeze
config_name "jdbc"
# Driver class
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
config :driver_class, :validate => :string
# connection string
# Does the JDBC driver support autocommit?
config :driver_auto_commit, :validate => :boolean, :default => true, :required => true
# Where to find the jar
# Defaults to not required, and to the original behaviour
config :driver_jar_path, :validate => :string, :required => false
# jdbc connection string
config :connection_string, :validate => :string, :required => true
# jdbc username - optional, maybe in the connection string
config :username, :validate => :string, :required => false
# jdbc password - optional, maybe in the connection string
config :password, :validate => :string, :required => false
# [ "insert into table (message) values(?)", "%{message}" ]
config :statement, :validate => :array, :required => true
# If this is an unsafe statement, use event.sprintf
# This also has potential performance penalties due to having to create a
# new statement for each event, rather than adding to the batch and issuing
# multiple inserts in 1 go
config :unsafe_statement, :validate => :boolean, :default => false
# Number of connections in the pool to maintain
config :max_pool_size, :validate => :number, :default => 5
# Connection timeout
config :connection_timeout, :validate => :number, :default => 10000
# We buffer a certain number of events before flushing that out to SQL.
# This setting controls how many events will be buffered before sending a
# batch of events.
@@ -35,23 +85,120 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# a timely manner.
#
# If you change this value please ensure that you change
# max_repeat_exceptions_time accordingly.
# max_flush_exceptions accordingly.
config :idle_flush_time, :validate => :number, :default => 1
# Maximum number of repeating (sequential) exceptions, before we stop retrying
# Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
# If set to < 1, then it will infinitely retry.
config :max_repeat_exceptions, :validate => :number, :default => 5
#
# You should carefully tune this in relation to idle_flush_time if your SQL server
# is not highly available.
# i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
# longer than 200 seconds to reboot, then logstash will stop.
config :max_flush_exceptions, :validate => :number, :default => 0
# The max number of seconds since the last exception, before we consider it
# a different cause.
# This value should be carefully considered in respect to idle_flush_time.
config :max_repeat_exceptions_time, :validate => :number, :default => 30
config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
public
def register
@logger.info("JDBC - Starting up")
load_jar_files!
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
if (@flush_size > 1000)
@logger.warn("JDBC - Flush size is set to > 1000")
end
if @statement.length < 1
@logger.error("JDBC - No statement provided. Configuration error.")
end
if (!@unsafe_statement and @statement.length < 2)
@logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
end
setup_and_test_pool!
buffer_initialize(
:max_items => @flush_size,
:max_interval => @idle_flush_time,
:logger => @logger
)
end
def receive(event)
return unless output?(event) or event.cancelled?
return unless @statement.length > 0
buffer_receive(event)
end
def flush(events, teardown=false)
if @unsafe_statement == true
unsafe_flush(events, teardown)
else
safe_flush(events, teardown)
end
end
def on_flush_error(e)
return if @max_flush_exceptions < 1
@exceptions_tracker << e.class
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
@logger.error("JDBC - max_flush_exceptions has been reached")
log_jdbc_exception(e)
raise LogStash::ShutdownSignal.new
end
end
def teardown
buffer_flush(:final => true)
@pool.close()
super
end
private
def setup_and_test_pool!
# Setup pool
@pool = Java::ComZaxxerHikari::HikariDataSource.new
@pool.setAutoCommit(@driver_auto_commit)
@pool.setDriverClassName(@driver_class) if @driver_class
@pool.setJdbcUrl(@connection_string)
@pool.setUsername(@username) if @username
@pool.setPassword(@password) if @password
@pool.setMaximumPoolSize(@max_pool_size)
@pool.setConnectionTimeout(@connection_timeout)
validate_connection_timeout = (@connection_timeout / 1000) / 2
# Test connection
test_connection = @pool.getConnection()
unless test_connection.isValid(validate_connection_timeout)
@logger.error("JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.")
end
test_connection.close()
end
def load_jar_files!
# Load jar from driver path
unless @driver_jar_path.nil?
raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path
require @driver_jar_path
return
end
# Revert original behaviour of loading from vendor directory
# if no path given
if ENV['LOGSTASH_HOME']
jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
else
@@ -67,108 +214,130 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
@logger.debug("JDBC - Loaded jar", :jar => jar)
require jar
end
import @driver_class
driver = Object.const_get(@driver_class[@driver_class.rindex('.') + 1, @driver_class.length]).new
@connection = driver.connect(@connection_string, java.util.Properties.new)
@logger.debug("JDBC - Created connection", :driver => driver, :connection => @connection)
if (@flush_size > 1000)
@logger.warn("JDBC - Flush size is set to > 1000. May have performance penalties, depending on your SQL engine.")
end
@repeat_exception_count = 0
@last_exception_time = Time.now
if (@max_repeat_exceptions > 0) and ((@idle_flush_time * @max_repeat_exceptions) > @max_repeat_exceptions_time)
@logger.warn("JDBC - max_repeat_exceptions_time is set such that it may still permit a looping exception. You probably changed idle_flush_time. Considering increasing max_repeat_exceptions_time.")
end
buffer_initialize(
:max_items => @flush_size,
:max_interval => @idle_flush_time,
:logger => @logger
)
end
def receive(event)
return unless output?(event)
return unless @statement.length > 0
def safe_flush(events, teardown=false)
connection = nil
statement = nil
buffer_receive(event)
end
def flush(events, teardown=false)
statement = @connection.prepareStatement(@statement[0])
events.each do |event|
next if @statement.length < 2
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time, LogStash::Timestamp
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].iso8601())
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
when true
statement.setBoolean(idx + 1, true)
when false
statement.setBoolean(idx + 1, false)
when nil
statement.setString(idx + 1, nil)
else
statement.setString(idx + 1, event.sprintf(i))
end
end
statement.addBatch()
begin
connection = @pool.getConnection()
rescue => e
log_jdbc_exception(e)
raise
end
begin
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
statement = connection.prepareStatement(@statement[0])
events.each do |event|
next if event.cancelled?
next if @statement.length < 2
statement = add_statement_event_params(statement, event)
statement.addBatch()
end
statement.executeBatch()
statement.close()
@exceptions_tracker << nil
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# Since the exceutebatch failed this should mean any events failed to be
# inserted will be re-run. We're going to log it for the lols anyway.
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
if e.getNextException() != nil
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
end
log_jdbc_exception(e)
if retry_exception?(e)
raise
end
ensure
statement.close() unless statement.nil?
connection.close() unless connection.nil?
end
statement.close()
end
def on_flush_error(e)
return if @max_repeat_exceptions < 1
if @last_exception == e.to_s
@repeat_exception_count += 1
else
@repeat_exception_count = 0
def unsafe_flush(events, teardown=false)
connection = nil
statement = nil
begin
connection = @pool.getConnection()
rescue => e
log_jdbc_exception(e)
raise
end
if (@repeat_exception_count >= @max_repeat_exceptions) and (Time.now - @last_exception_time) < @max_repeat_exceptions_time
@logger.error("JDBC - Exception repeated more than the maximum configured", :exception => e, :max_repeat_exceptions => @max_repeat_exceptions, :max_repeat_exceptions_time => @max_repeat_exceptions_time)
raise e
begin
events.each do |event|
next if event.cancelled?
statement = connection.prepareStatement(event.sprintf(@statement[0]))
statement = add_statement_event_params(statement, event) if @statement.length > 1
statement.execute()
# cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line
event.cancel
@exceptions_tracker << nil
end
rescue => e
log_jdbc_exception(e)
if retry_exception?(e)
raise
end
ensure
statement.close() unless statement.nil?
connection.close() unless connection.nil?
end
end
def add_statement_event_params(statement, event)
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time
# See LogStash::Timestamp, below, for the why behind strftime.
statement.setString(idx + 1, event[i].strftime(STRFTIME_FMT))
when LogStash::Timestamp
# XXX: Using setString as opposed to setTimestamp, because setTimestamp
# doesn't behave correctly in some drivers (Known: sqlite)
#
# Additionally this does not use `to_iso8601`, since some SQL databases
# choke on the 'T' in the string (Known: Derby).
#
# strftime appears to be the most reliable across drivers.
statement.setString(idx + 1, event[i].time.strftime(STRFTIME_FMT))
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
when true
statement.setBoolean(idx + 1, true)
when false
statement.setBoolean(idx + 1, false)
else
if event[i].nil? and i =~ /%\{/
statement.setString(idx + 1, event.sprintf(i))
else
statement.setString(idx + 1, nil)
end
end
end
@last_exception_time = Time.now
@last_exception = e.to_s
statement
end
def teardown
buffer_flush(:final => true)
@connection.close()
super
def log_jdbc_exception(exception)
current_exception = exception
loop do
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => current_exception)
current_exception = current_exception.getNextException()
break if current_exception == nil
end
end
def retry_exception?(exception)
if exception.respond_to? 'getSQLState'
return RETRYABLE_SQLSTATE_CLASSES.include?(e.getSQLState[0,2])
end
true
end
end # class LogStash::Outputs::jdbc

View File

@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc'
s.version = "0.1.1"
s.version = "0.2.8"
s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@@ -10,7 +10,8 @@ Gem::Specification.new do |s|
s.require_paths = [ "lib" ]
# Files
s.files = `git ls-files`.split($\)
s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})
@@ -18,7 +19,11 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
# Gem dependencies
s.add_runtime_dependency "logstash-core", ">= 1.4.0", "< 2.0.0"
s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
s.add_runtime_dependency 'stud'
s.add_runtime_dependency "logstash-codec-plain"
s.add_development_dependency "logstash-devutils"
s.post_install_message = "logstash-output-jdbc 0.2.0 introduces several new features - please ensure you check the documentation in the README file"
end

94
spec/outputs/jdbc_spec.rb Normal file
View File

@@ -0,0 +1,94 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/jdbc"
require "stud/temporary"
require "java"
describe LogStash::Outputs::Jdbc do
let(:derby_settings) do
{
"driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"connection_string" => "jdbc:derby:memory:testdb;create=true",
"driver_jar_path" => ENV['JDBC_DERBY_JAR'],
"statement" => [ "insert into log (created_at, message) values(?, ?)", "@timestamp" "message" ]
}
end
context 'rspec setup' do
it 'ensure derby is available' do
j = ENV['JDBC_DERBY_JAR']
expect(j).not_to be_nil, "JDBC_DERBY_JAR not defined, required to run tests"
expect(File.exists?(j)).to eq(true), "JDBC_DERBY_JAR defined, but not valid"
end
end
context 'when initializing' do
it 'shouldn\'t register without a config' do
expect {
LogStash::Plugin.lookup("output", "jdbc").new()
}.to raise_error(LogStash::ConfigurationError)
end
it 'shouldn\'t register with a missing jar file' do
derby_settings['driver_jar_path'] = nil
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
expect { plugin.register }.to raise_error
end
it 'shouldn\'t register with a missing jar file' do
derby_settings['connection_string'] = nil
plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
expect { plugin.register }.to raise_error
end
end
context 'when outputting messages' do
let(:event_fields) do
{ message: 'test-message' }
end
let(:event) { LogStash::Event.new(event_fields) }
let(:plugin) {
# Setup plugin
output = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
output.register
if ENV['JDBC_DEBUG'] == '1'
output.logger.subscribe(STDOUT)
end
# Setup table
c = output.instance_variable_get(:@pool).getConnection()
stmt = c.createStatement()
stmt.executeUpdate("CREATE table log (created_at timestamp, message varchar(512))")
stmt.close()
c.close()
output
}
it 'should save a event' do
expect { plugin.receive(event) }.to_not raise_error
# Wait for 1 second, for the buffer to flush
sleep 1
c = plugin.instance_variable_get(:@pool).getConnection()
stmt = c.createStatement()
rs = stmt.executeQuery("select count(*) as total from log")
count = 0
while rs.next()
count = rs.getInt("total")
end
stmt.close()
c.close()
expect(count).to be > 0
end
end
end

Binary file not shown.

Binary file not shown.

Binary file not shown.