48 Commits
v1.4 ... v0.2.8

Author SHA1 Message Date
Karl Southern
ded1106b13 Address issue 44. 2016-06-28 22:38:36 +01:00
Karl Southern
2b27f39088 0.2.7 2016-05-29 13:45:26 +01:00
Karl Southern
7b337a8b91 Backport functionality from v5 branch. 2016-05-29 13:40:47 +01:00
Karl Southern
927e532b2a 0.2.6 2016-05-02 18:11:27 +01:00
Karl Southern
26a32a3f08 README update 2016-04-16 14:48:21 +01:00
Karl Southern
6bb84b165f Fecking version strings 2016-04-16 14:34:34 +01:00
Karl Southern
4e0292d222 rc1 for #36 2016-04-16 14:33:30 +01:00
Karl Southern
909cae01b3 Adds travis-ci badge 2016-04-12 11:20:19 +01:00
Karl Southern
6f2bd2ab3e Fiddling with travis-ci 2016-04-12 11:16:37 +01:00
Karl Southern
c5aeae1b02 Tags and versions are out of sequence. Bugger. 2016-04-11 18:22:11 +01:00
Karl Southern
a7d5a2e623 v0.2.4 2016-04-11 18:11:52 +01:00
Karl
3a64a22ac4 Merge pull request #32 from hordijk/patch-1
Fix toString method of LogStash::Timestamp
2016-04-11 17:21:25 +01:00
hordijk
c4b62769b9 Fix toString method of LogStash::Timestamp
According to LogStash::Timestamp (bb30cc773b/logstash-core-event/lib/logstash/timestamp.rb) doesn't support iso8601, which results in error if the timestamp of logstash is used directly.

If should support to_s of to_iso8601.

 :message=>"Failed to flush outgoing items", :outgoing_count=>1, :exception=>"NoMethodError", :backtrace=>["/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:255:in `add_statement_event_params'", "org/jruby/RubyArray.java:1613:in `each'", "org/jruby/RubyEnumerable.java:974:in `each_with_index'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:251:in `add_statement_event_params'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:203:in `safe_flush'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:200:in `safe_flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:120:in `flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:219:in `buffer_flush'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:216:in `buffer_flush'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.22/lib/stud/buffer.rb:159:in `buffer_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-jdbc-0.2.3/lib/logstash/outputs/jdbc.rb:113:in `receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/outputs/base.rb:83:in `multi_receive'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/outputs/base.rb:83:in `multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/output_delegator.rb:130:in `worker_multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/output_delegator.rb:114:in `multi_receive'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:305:in 
`output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:305:in `output_batch'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:236:in `worker_loop'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.0-java/lib/logstash/pipeline.rb:205:in `start_workers'"], :level=>:warn}
2016-04-11 15:19:48 +02:00
Karl Southern
b9e5f64d40 Bump minor version to fix documentation 2016-04-07 08:40:14 +01:00
Karl
c0e358aafb Merge pull request #30 from hordijk/master
Fix incorrect configuration option in the README.md for driver_jar
With thanks to @hordijk
2016-04-07 08:38:27 +01:00
hordijk
442ddf16eb Update README.md
Fix issue in documentation: driver_jar is not supported, it should be driver_jar_path

If driver_jar is used logstash will generate this error message=>"Unknown setting 'driver_path' for jdbc"
Used the driver_jar_path which is used in class LogStash::Outputs::Jdbc instead.
2016-04-07 08:35:58 +02:00
Karl Southern
4e7985dafd Addresses #28 - connection timeout bug 2016-02-16 15:29:08 +00:00
Karl Southern
ae51d77f05 Move examples and split up connection code
Bump version
2015-12-30 12:05:05 +00:00
Karl Southern
529c98aadb Addresses 22 not giving warning about incorrectly configured statements 2015-12-23 10:06:50 +00:00
Karl Southern
bfcd9bf69a Addresses issue 26 2015-12-23 09:42:53 +00:00
Karl
af55fde54a Merge pull request #25 from ebuildy/patch-1
Add Apache Phoenix example from @ebuildy
2015-12-22 09:37:27 +00:00
Thomas Decaux
9e05a01dff Add Apache Phoenix example 2015-12-07 10:07:52 +01:00
Karl
064647607e Merge pull request #24 from dmitryakadiamond/MariaDB-working-example
Maria db working example kindly provided by @dmitryakadiamond
2015-12-04 18:28:58 +00:00
Dmitry Morozov
1ece7f9abc formatting fix 2015-12-04 13:20:58 +00:00
Dmitry Morozov
38b7096419 README.md updated 2015-12-04 13:16:19 +00:00
Karl Southern
eef7473a0b Pushing. 2015-11-22 23:19:29 +00:00
Karl Southern
7a1da5b7cd Fix exceptions counter 2015-11-22 18:57:13 +00:00
Karl Southern
e56176bbea Fix missing nil counter 2015-11-19 14:29:47 +00:00
Karl Southern
49e751a9f8 Have to start some real work. Will complete tests over lunch. 2015-11-18 10:06:11 +00:00
Karl Southern
9804850714 WIP 2015-11-17 10:32:16 +00:00
Karl Southern
a6c669cc52 Adds unsafe_statement support 2015-11-15 12:35:57 +00:00
Karl Southern
e615829310 Stupid gemspec version number bullshit 2015-11-14 20:09:38 +00:00
Karl Southern
362e9ad0a0 Adds connection pooling 2015-11-14 20:06:35 +00:00
Karl Southern
4994cd810b Bump version 2015-11-06 15:02:18 +00:00
Karl Southern
1487b41b3e In retrospective, when would nil ever enter the equation at all? 2015-11-06 15:01:02 +00:00
Karl
dd29d16a31 Update logstash-output-jdbc.gemspec
Bump version.
2015-11-06 14:56:14 +00:00
Karl
ebe5596469 Update jdbc.rb
Removes improper nil check which breaks event sprintf formatting examples
2015-11-06 14:55:24 +00:00
Karl Southern
275cd6fc2f v2.0 tested 2015-10-30 18:29:22 +00:00
Karl Southern
7da6317083 Initial untested v2.0 commit 2015-10-30 17:55:42 +00:00
Karl
470e6309b5 Update README.md 2015-09-04 08:46:50 +01:00
Karl
a7dde52b7b Merge pull request #12 from jMonsinjon/master
Added information for connecting to a MySql Database
2015-09-04 08:44:54 +01:00
Jeremie MONSINJON
53c0f5761d Added information for connecting to a MySql Database 2015-09-03 17:37:51 +02:00
Karl Southern
4287c01037 Small style fix up for patch 2015-07-22 16:30:37 +01:00
Karl
4f93fa7224 Merge pull request #11 from kushtrimjunuzi/master
Handling null values and added boolean data type.
2015-07-22 16:28:06 +01:00
kushtrim junuzi
b52a7358ff Handling null values and added boolean data type.\n Handling jdbc inner exceptions. 2015-07-22 08:06:57 -04:00
Karl Southern
beb8f72560 Fixes up README 2015-06-26 12:57:45 +01:00
Karl Southern
6b6973d6ee Fixes up 1.5+ compatibility 2015-06-26 12:45:45 +01:00
Karl Southern
c790a645f2 Initial commit of 1.5. Untested. Lunchtime is over 2015-06-10 12:31:31 +01:00
22 changed files with 642 additions and 162 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
*.gem
Gemfile.lock
Gemfile.bak
.bundle
.vagrant

8
.travis.yml Normal file
View File

@@ -0,0 +1,8 @@
language: ruby
cache: bundler
rvm:
- jruby
before_script:
- wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
- export JDBC_DERBY_JAR=/tmp/derby.jar
script: bundle exec rspec

28
CHANGELOG.md Normal file
View File

@@ -0,0 +1,28 @@
# Change Log
All notable changes to this project will be documented in this file, from 0.2.0.
## [0.2.7] - 2016-05-29
- Backport retry exception logic from v5 branch
- Backport improved timestamp compatibility from v5 branch
## [0.2.6] - 2016-05-02
- Fix for exception infinite loop
## [0.2.5] - 2016-04-11
### Added
- Basic tests running against DerbyDB
- Fix for converting Logstash::Timestamp to iso8601 from @hordijk
## [0.2.4] - 2016-04-07
- Documentation fixes from @hordijk
## [0.2.3] - 2016-02-16
- Bug fixes
## [0.2.2] - 2015-12-30
- Bug fixes
## [0.2.1] - 2015-12-22
- Support for connection pooling added through HikariCP
- Support for unsafe statement handling (allowing dynamic queries)
- Altered exception handling to now count sequential flushes with exceptions thrown

2
Gemfile Normal file
View File

@@ -0,0 +1,2 @@
source 'https://rubygems.org'
gemspec

142
README.md
View File

@@ -1,95 +1,69 @@
# logstash-jdbc
JDBC output plugin for Logstash.
# logstash-output-jdbc
[![Build Status](https://travis-ci.org/theangryangel/logstash-output-jdbc.svg?branch=master)](https://travis-ci.org/theangryangel/logstash-output-jdbc)
This plugin is provided as an external plugin and is not part of the Logstash project.
Currently untested with logstash 1.5+. Support is planned.
This plugin allows you to output to SQL databases, using JDBC adapters.
See below for tested adapters, and example configurations.
## Warning
This has not yet been extensively tested with all JDBC drivers and may not yet work for you.
If you do find this works for a JDBC driver without an example, let me know and provide a small example configuration if you can.
This plugin does not bundle any JDBC jar files, and does expect them to be in a
particular location. Please ensure you read the 4 installation lines below.
## ChangeLog
See CHANGELOG.md
## Versions
Released versions are available via rubygems, and typically tagged.
For development:
- See master branch for logstash v5
- See v2.x branch for logstash v2
- See v1.5 branch for logstash v1.5
- See v1.4 branch for logstash 1.4
## Installation
- Copy lib directory contents into your logstash installation.
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- Configure
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- Now either:
- Use driver_jar_path in your configuration to specify a path to your jar file
- Or:
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- And then configure (examples below)
## Running tests
At this time tests only run against Derby, in an in-memory database.
Acceptance tests for individual database engines will be added over time.
Assuming valid jruby is installed
- First time, issue `jruby -S bundle install` to install dependencies
- Next, download Derby jar from https://db.apache.org/derby/
- Run the tests `JDBC_DERBY_JAR=path/to/derby.jar jruby -S rspec`
- Optionally add the `JDBC_DEBUG=1` env variable to add logging to stdout
## Configuration options
* driver_class, string, JDBC driver class to load
* connection_string, string, JDBC connection string
* statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim.
* flush_size, number, default = 1000, number of entries to buffer before sending to SQL
* idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time
* max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop
* max_repeat_exceptions_time, number, default = 30, maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value
| Option | Type | Description | Required? | Default |
| ------ | ---- | ----------- | --------- | ------- |
| driver_class | String | Specify a driver class if autoloading fails | No | |
| driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
| driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
| connection_string | String | JDBC connection URL | Yes | |
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
| connection_timeout | Number | Maximum time, in milliseconds, to wait for a connection from the pool (passed to HikariCP's connection timeout) | No | 10000 |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 |
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
## Example configurations
### SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
```
input
{
stdin { }
}
output {
stdout { }
Example logstash configurations can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
jdbc {
driver_class => 'org.sqlite.JDBC'
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
```
input
{
stdin { }
}
output {
jdbc {
driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### Postgres
With thanks to [@roflmao](https://github.com/roflmao)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => 'org.postgresql.Driver'
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### Oracle
With thanks to [@josemazo](https://github.com/josemazo)
* Tested with Express Edition 11g Release 2
* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => "oracle.jdbc.driver.OracleDriver"
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
/* vim: set ts=4 sw=4 tw=0 :*/
If you have a working sample configuration for a DB that's not listed, pull requests are welcome.

1
Rakefile Normal file
View File

@@ -0,0 +1 @@
require "logstash/devutils/rake"

View File

@@ -0,0 +1,16 @@
# Example: Apache Phoenix (HBase SQL)
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:phoenix:ZOOKEEPER_HOSTNAME"
statement => [ "UPSERT INTO EVENTS log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

16
examples/mariadb.md Normal file
View File

@@ -0,0 +1,16 @@
# Example: MariaDB
* Tested with Ubuntu 14.04.3 LTS, Server version: 10.1.9-MariaDB-1~trusty-log mariadb.org binary distribution
* Tested using https://downloads.mariadb.com/enterprise/tqge-whfa/connectors/java/connector-java-1.3.2/mariadb-java-client-1.3.2.jar (mariadb-java-client-1.3.2.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

16
examples/mysql.md Normal file
View File

@@ -0,0 +1,16 @@
# Example: Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

20
examples/odps.md Normal file
View File

@@ -0,0 +1,20 @@
# Example: ODPS
With thanks to [@onesuper](https://github.com/onesuper)
```
input
{
stdin { }
}
output {
jdbc {
driver_class => "com.aliyun.odps.jdbc.OdpsDriver"
driver_auto_commit => false
connection_string => "jdbc:odps:http://service.odps.aliyun.com/api?project=meta_dev&loglevel=DEBUG"
username => "abcd"
password => "1234"
max_pool_size => 5
flush_size => 10
statement => [ "INSERT INTO test_logstash VALUES(?, ?, ?);", "host", "@timestamp", "message" ]
}
}
```

16
examples/oracle.md Normal file
View File

@@ -0,0 +1,16 @@
# Example: Oracle
With thanks to [@josemazo](https://github.com/josemazo)
* Tested with Express Edition 11g Release 2
* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

15
examples/postgres.md Normal file
View File

@@ -0,0 +1,15 @@
# Example: Postgres
With thanks to [@roflmao](https://github.com/roflmao)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

14
examples/sql-server.md Normal file
View File

@@ -0,0 +1,14 @@
# Example: SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

17
examples/sqlite.md Normal file
View File

@@ -0,0 +1,17 @@
# Example: SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
```
input
{
stdin { }
}
output {
stdout { }
jdbc {
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

View File

@@ -0,0 +1,5 @@
# encoding: utf-8
require 'logstash/environment'
# Resolve the directory one level above this file, then hand its "vendor"
# sub-directory to LogStash::Environment.load_runtime_jars!.
root_dir = File.expand_path("..", File.dirname(__FILE__))
vendor_dir = File.join(root_dir, "vendor")
LogStash::Environment.load_runtime_jars!(vendor_dir)

View File

@@ -0,0 +1,17 @@
# Array subclass whose `<<` operator keeps at most `max_size` elements.
#
# When the buffer is full, `<<` drops the oldest (first) element before
# appending the new one. A `max_size` of nil means the buffer is unbounded.
# Only `<<` is overridden; other Array mutators (e.g. `push`) are not capped.
class RingBuffer < Array
  # Capacity given at construction time (nil when unbounded).
  attr_reader :max_size

  # max_size - maximum number of elements retained by `<<`, or nil for no limit.
  # enum     - optional object responding to `each`; its elements are appended
  #            in order through `<<`, so the capacity applies to them as well.
  def initialize(max_size, enum = nil)
    @max_size = max_size
    enum.each { |e| self << e } if enum
  end

  # Appends `el`, evicting the oldest element first when the buffer is full.
  # Returns self so calls can be chained.
  def <<(el)
    # The nil check has to come first: comparing an Integer with nil raises
    # ArgumentError, which made unbounded buffers unusable.
    if @max_size.nil? || size < @max_size
      super
    else
      shift
      push(el)
    end
  end
end

View File

@@ -3,23 +3,72 @@ require "logstash/outputs/base"
require "logstash/namespace"
require "stud/buffer"
require "java"
require "logstash-output-jdbc_jars"
require "logstash-output-jdbc_ring-buffer"
# Write events to a SQL engine, using JDBC.
#
# It is up to the user of the plugin to correctly configure the plugin. This
# includes correctly crafting the SQL statement, and matching the number of
# parameters correctly.
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Adds buffer support
include Stud::Buffer
config_name "jdbc"
milestone 1
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
# Driver class
RETRYABLE_SQLSTATE_CLASSES = [
# Classes of retryable SQLSTATE codes
# Not all in the class will be retryable. However, this is the best that
# we've got right now.
# If a custom state code is required, set it in retry_sql_states.
'08', # Connection Exception
'24', # Invalid Cursor State (Maybe retry-able in some circumstances)
'25', # Invalid Transaction State
'40', # Transaction Rollback
'53', # Insufficient Resources
'54', # Program Limit Exceeded (MAYBE)
'55', # Object Not In Prerequisite State
'57', # Operator Intervention
'58', # System Error
].freeze
config_name "jdbc"
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
config :driver_class, :validate => :string
# connection string
# Does the JDBC driver support autocommit?
config :driver_auto_commit, :validate => :boolean, :default => true, :required => true
# Where to find the jar
# Defaults to not required, and to the original behaviour
config :driver_jar_path, :validate => :string, :required => false
# jdbc connection string
config :connection_string, :validate => :string, :required => true
# jdbc username - optional, maybe in the connection string
config :username, :validate => :string, :required => false
# jdbc password - optional, maybe in the connection string
config :password, :validate => :string, :required => false
# [ "insert into table (message) values(?)", "%{message}" ]
config :statement, :validate => :array, :required => true
# If this is an unsafe statement, use event.sprintf
# This also has potential performance penalties due to having to create a
# new statement for each event, rather than adding to the batch and issuing
# multiple inserts in 1 go
config :unsafe_statement, :validate => :boolean, :default => false
# Number of connections in the pool to maintain
config :max_pool_size, :validate => :number, :default => 5
# Connection timeout
config :connection_timeout, :validate => :number, :default => 10000
# We buffer a certain number of events before flushing that out to SQL.
# This setting controls how many events will be buffered before sending a
# batch of events.
@@ -36,47 +85,43 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# a timely manner.
#
# If you change this value please ensure that you change
# max_repeat_exceptions_time accordingly.
# max_flush_exceptions accordingly.
config :idle_flush_time, :validate => :number, :default => 1
# Maximum number of repeating (sequential) exceptions, before we stop retrying
# Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
# If set to < 1, then it will infinitely retry.
config :max_repeat_exceptions, :validate => :number, :default => 5
#
# You should carefully tune this in relation to idle_flush_time if your SQL server
# is not highly available.
# i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
# longer than 200 seconds to reboot, then logstash will stop.
config :max_flush_exceptions, :validate => :number, :default => 0
# The max number of seconds since the last exception, before we consider it
# a different cause.
# This value should be carefully considered in respect to idle_flush_time.
config :max_repeat_exceptions_time, :validate => :number, :default => 30
config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
public
def register
@logger.info("JDBC - Starting up")
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
@logger.info(jarpath)
Dir[jarpath].each do |jar|
@logger.debug("JDBC - Loaded jar", :jar => jar)
require jar
end
load_jar_files!
import @driver_class
driver = Object.const_get(@driver_class[@driver_class.rindex('.') + 1, @driver_class.length]).new
@connection = driver.connect(@connection_string, java.util.Properties.new)
@logger.debug("JDBC - Created connection", :driver => driver, :connection => @connection)
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
if (@flush_size > 1000)
@logger.warn("JDBC - Flush size is set to > 1000. May have performance penalties, depending on your SQL engine.")
@logger.warn("JDBC - Flush size is set to > 1000")
end
@repeat_exception_count = 0
@last_exception_time = Time.now
if (@max_repeat_exceptions > 0) and ((@idle_flush_time * @max_repeat_exceptions) > @max_repeat_exceptions_time)
@logger.warn("JDBC - max_repeat_exceptions_time is set such that it may still permit a looping exception. You probably changed idle_flush_time. Considering increasing max_repeat_exceptions_time.")
if @statement.length < 1
@logger.error("JDBC - No statement provided. Configuration error.")
end
if (!@unsafe_statement and @statement.length < 2)
@logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
end
setup_and_test_pool!
buffer_initialize(
:max_items => @flush_size,
:max_interval => @idle_flush_time,
@@ -85,72 +130,214 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end
def receive(event)
return unless output?(event)
return unless output?(event) or event.cancelled?
return unless @statement.length > 0
buffer_receive(event)
end
def flush(events, teardown=false)
statement = @connection.prepareStatement(@statement[0])
events.each do |event|
next if @statement.length < 2
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].iso8601())
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
else
statement.setString(idx + 1, event.sprintf(i))
end
end
statement.addBatch()
if @unsafe_statement == true
unsafe_flush(events, teardown)
else
safe_flush(events, teardown)
end
begin
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
statement.executeBatch()
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# Since the exceutebatch failed this should mean any events failed to be
# inserted will be re-run. We're going to log it for the lols anyway.
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
end
statement.close()
end
def on_flush_error(e)
return if @max_repeat_exceptions < 1
return if @max_flush_exceptions < 1
if @last_exception == e.to_s
@repeat_exception_count += 1
else
@repeat_exception_count = 0
@exceptions_tracker << e.class
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
@logger.error("JDBC - max_flush_exceptions has been reached")
log_jdbc_exception(e)
raise LogStash::ShutdownSignal.new
end
if (@repeat_exception_count >= @max_repeat_exceptions) and (Time.now - @last_exception_time) < @max_repeat_exceptions_time
@logger.error("JDBC - Exception repeated more than the maximum configured", :exception => e, :max_repeat_exceptions => @max_repeat_exceptions, :max_repeat_exceptions_time => @max_repeat_exceptions_time)
raise e
end
@last_exception_time = Time.now
@last_exception = e.to_s
end
def teardown
buffer_flush(:final => true)
@connection.close()
@pool.close()
super
end
private
# Builds the HikariCP connection pool from the plugin settings and checks
# that a connection can be obtained and validated.
#
# Stores the pool in @pool. Logs an error (but does not raise) when the test
# connection reports itself as invalid. Exceptions raised while obtaining or
# validating the connection propagate to the caller.
def setup_and_test_pool!
  # Setup pool
  @pool = Java::ComZaxxerHikari::HikariDataSource.new

  @pool.setAutoCommit(@driver_auto_commit)
  # Driver class, username and password are optional settings.
  @pool.setDriverClassName(@driver_class) if @driver_class

  @pool.setJdbcUrl(@connection_string)

  @pool.setUsername(@username) if @username
  @pool.setPassword(@password) if @password

  @pool.setMaximumPoolSize(@max_pool_size)
  @pool.setConnectionTimeout(@connection_timeout)

  # Half of the configured timeout, after dividing it by 1000.
  validate_connection_timeout = (@connection_timeout / 1000) / 2

  # Test connection
  test_connection = @pool.getConnection()
  begin
    unless test_connection.isValid(validate_connection_timeout)
      @logger.error("JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.")
    end
  ensure
    # Always hand the test connection back, even if validation raised.
    test_connection.close()
  end
end
# Loads the JDBC driver jar(s).
#
# When @driver_jar_path is set, only that file is required. Otherwise every
# *.jar under vendor/jar/jdbc is required, looked up beneath $LOGSTASH_HOME
# when that environment variable is set, or relative to this file if not.
#
# Raises Exception when the configured jar path does not exist, or when no
# jars are found in the vendor directory. The exception class is kept as-is
# so existing propagation behaviour does not change.
def load_jar_files!
  # Load jar from driver path
  unless @driver_jar_path.nil?
    # File.exist? replaces the deprecated File.exists? alias.
    raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exist? @driver_jar_path
    require @driver_jar_path
    return
  end

  # Revert original behaviour of loading from vendor directory
  # if no path given
  if ENV['LOGSTASH_HOME']
    jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
  else
    jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
  end

  @logger.debug("JDBC - jarpath", path: jarpath)

  jars = Dir[jarpath]
  raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty?

  jars.each do |jar|
    @logger.debug("JDBC - Loaded jar", :jar => jar)
    require jar
  end
end
# Flushes a batch of events through a single prepared statement.
#
# events   - the events to insert; cancelled events are skipped.
# teardown - accepted for interface compatibility; not used here.
#
# A connection is taken from @pool; @statement[0] is prepared once and every
# event's parameters are added to one JDBC batch, which is then executed. On
# success a nil marker is appended to @exceptions_tracker.
#
# Failures are logged. They are re-raised only when retry_exception? says the
# error is retryable; other errors are swallowed after logging. Failure to
# obtain a connection is always logged and re-raised.
def safe_flush(events, teardown=false)
  connection = nil
  statement = nil

  begin
    connection = @pool.getConnection()
  rescue => e
    log_jdbc_exception(e)
    raise
  end

  begin
    statement = connection.prepareStatement(@statement[0])

    events.each do |event|
      next if event.cancelled?
      # Without parameters there is no event data to bind, so nothing is batched.
      next if @statement.length < 2
      statement = add_statement_event_params(statement, event)

      statement.addBatch()
    end

    statement.executeBatch()
    @exceptions_tracker << nil
  rescue => e
    log_jdbc_exception(e)
    raise if retry_exception?(e)
  ensure
    # Single cleanup point: the statement is closed exactly once here rather
    # than once in the body and again in this ensure clause.
    statement.close() unless statement.nil?
    connection.close() unless connection.nil?
  end
end
# Flushes events one at a time, interpolating event fields into the SQL text.
#
# events   - the events to insert; cancelled events are skipped.
# teardown - accepted for interface compatibility; not used here.
#
# For each event, @statement[0] is expanded with event.sprintf, prepared,
# bound (when parameters are configured) and executed on its own. After a
# successful execute the event is cancelled and a nil marker is appended to
# @exceptions_tracker.
#
# Failures are logged. They are re-raised only when retry_exception? says the
# error is retryable; other errors are swallowed after logging. Failure to
# obtain a connection is always logged and re-raised.
def unsafe_flush(events, teardown=false)
  connection = nil

  begin
    connection = @pool.getConnection()
  rescue => e
    log_jdbc_exception(e)
    raise
  end

  begin
    events.each do |event|
      next if event.cancelled?

      statement = connection.prepareStatement(event.sprintf(@statement[0]))
      begin
        statement = add_statement_event_params(statement, event) if @statement.length > 1
        statement.execute()
      ensure
        # A new statement is prepared for every event, so each one is closed
        # here instead of only closing the last one at the end of the flush.
        statement.close()
      end

      # cancel the event, since we may end up outputting the same event multiple times
      # if an exception happens later down the line
      event.cancel
      @exceptions_tracker << nil
    end
  rescue => e
    log_jdbc_exception(e)
    raise if retry_exception?(e)
  ensure
    connection.close() unless connection.nil?
  end
end
# Binds one JDBC parameter per field name listed after the SQL text in
# @statement, choosing the setter from the Ruby type of the event's value.
#
# statement - prepared statement to bind on
# event     - event to read from; must respond to [] and sprintf
#
# Values of any other type are bound as a nil string, except that a field
# name containing "%{" whose lookup is nil is bound as event.sprintf(name).
#
# Returns the statement that was passed in.
def add_statement_event_params(statement, event)
  @statement[1..-1].each_with_index do |i, idx|
    position = idx + 1 # each_with_index counts from 0, JDBC parameters from 1
    value = event[i]

    case value
    when Time
      # See LogStash::Timestamp, below, for the why behind strftime.
      statement.setString(position, value.strftime(STRFTIME_FMT))
    when LogStash::Timestamp
      # XXX: Using setString as opposed to setTimestamp, because setTimestamp
      # doesn't behave correctly in some drivers (Known: sqlite)
      #
      # Additionally this does not use `to_iso8601`, since some SQL databases
      # choke on the 'T' in the string (Known: Derby).
      #
      # strftime appears to be the most reliable across drivers.
      statement.setString(position, value.time.strftime(STRFTIME_FMT))
    when Integer
      # Integer alone covers every integer class. Values that do not fit a
      # signed 32-bit int are bound with setLong instead of setInt.
      if value > 2147483647 || value < -2147483648
        statement.setLong(position, value)
      else
        statement.setInt(position, value)
      end
    when Float
      # NOTE(review): setFloat targets a 32-bit Java float; confirm whether
      # setDouble would be preferable for the drivers in use.
      statement.setFloat(position, value)
    when String
      statement.setString(position, value)
    when true
      statement.setBoolean(position, true)
    when false
      statement.setBoolean(position, false)
    else
      if value.nil? && i =~ /%\{/
        statement.setString(position, event.sprintf(i))
      else
        statement.setString(position, nil)
      end
    end
  end

  statement
end
# Logs an exception at error level, followed by every exception chained to it
# through getNextException.
#
# exception - the rescued exception. It may be any error, not only one that
#             exposes getNextException, so the chain is followed only when
#             the object responds to that method; calling it blindly would
#             raise NoMethodError and hide the error being reported.
def log_jdbc_exception(exception)
  current_exception = exception
  loop do
    @logger.error("JDBC Exception encountered: Will automatically retry.", :exception => current_exception)
    break unless current_exception.respond_to?(:getNextException)

    current_exception = current_exception.getNextException()
    break if current_exception.nil?
  end
end
# Decides whether a failed flush should be retried.
#
# exception - the rescued exception
#
# Returns true for exceptions that do not expose getSQLState. For those that
# do, returns true only when the first two characters of the SQLState (its
# class code) are listed in RETRYABLE_SQLSTATE_CLASSES. A nil SQLState is
# treated as not retryable.
def retry_exception?(exception)
  return true unless exception.respond_to?('getSQLState')

  # to_s guards against a nil SQLState.
  RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0, 2])
end
end # class LogStash::Outputs::jdbc

View File

@@ -0,0 +1,29 @@
# Gem specification for the logstash-output-jdbc plugin.
Gem::Specification.new do |spec|
  # Identity
  spec.name     = 'logstash-output-jdbc'
  spec.version  = "0.2.8"
  spec.licenses = ["Apache License (2.0)"]
  spec.authors  = ["the_angry_angel"]
  spec.email    = "karl+github@theangryangel.co.uk"
  spec.homepage = "https://github.com/theangryangel/logstash-output-jdbc"

  # Descriptions
  spec.summary     = "This plugin allows you to output to SQL, via JDBC"
  spec.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"

  spec.require_paths = ["lib"]

  # Packaged files: everything under lib, vendor and spec, plus the top-level docs
  spec.files = Dir["{lib,vendor,spec}/**/*"] + ["LICENSE.txt", "README.md"]

  # Test files are the packaged files living under test/, spec/ or features/
  spec.test_files = spec.files.select { |path| path =~ %r{^(test|spec|features)/} }

  # Special flag to let us know this is actually a logstash plugin
  spec.metadata = {
    "logstash_plugin" => "true",
    "logstash_group"  => "output"
  }

  # Runtime dependencies
  spec.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
  spec.add_runtime_dependency 'stud'
  spec.add_runtime_dependency "logstash-codec-plain"

  # Development-only dependencies
  spec.add_development_dependency "logstash-devutils"

  spec.post_install_message = "logstash-output-jdbc 0.2.0 introduces several new features - please ensure you check the documentation in the README file"
end

94
spec/outputs/jdbc_spec.rb Normal file
View File

@@ -0,0 +1,94 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/jdbc"
require "stud/temporary"
require "java"
# Specs for the JDBC output plugin, exercised against an in-memory Apache
# Derby database. The Derby driver jar is located through the JDBC_DERBY_JAR
# environment variable.
describe LogStash::Outputs::Jdbc do

  # Baseline plugin settings; individual examples override single keys.
  let(:derby_settings) do
    {
      "driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
      "connection_string" => "jdbc:derby:memory:testdb;create=true",
      "driver_jar_path" => ENV['JDBC_DERBY_JAR'],
      # SQL text followed by one event field name per `?` placeholder. The
      # field names must be separate array elements: adjacent string literals
      # without a comma are concatenated into a single name.
      "statement" => [ "insert into log (created_at, message) values(?, ?)", "@timestamp", "message" ]
    }
  end

  context 'rspec setup' do

    it 'ensure derby is available' do
      j = ENV['JDBC_DERBY_JAR']
      expect(j).not_to be_nil, "JDBC_DERBY_JAR not defined, required to run tests"
      expect(File.exist?(j)).to eq(true), "JDBC_DERBY_JAR defined, but not valid"
    end

  end

  context 'when initializing' do

    it 'shouldn\'t register without a config' do
      expect {
        LogStash::Plugin.lookup("output", "jdbc").new()
      }.to raise_error(LogStash::ConfigurationError)
    end

    it 'shouldn\'t register with a missing jar file' do
      derby_settings['driver_jar_path'] = nil
      plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
      expect { plugin.register }.to raise_error
    end

    it 'shouldn\'t register with a missing connection string' do
      derby_settings['connection_string'] = nil
      plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
      expect { plugin.register }.to raise_error
    end

  end

  context 'when outputting messages' do

    let(:event_fields) do
      { message: 'test-message' }
    end

    let(:event) { LogStash::Event.new(event_fields) }

    # A registered plugin instance whose database already holds the `log`
    # table that the configured statement inserts into.
    let(:plugin) {
      # Setup plugin
      output = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
      output.register
      if ENV['JDBC_DEBUG'] == '1'
        output.logger.subscribe(STDOUT)
      end

      # Setup table
      c = output.instance_variable_get(:@pool).getConnection()
      stmt = c.createStatement()
      stmt.executeUpdate("CREATE table log (created_at timestamp, message varchar(512))")
      stmt.close()
      c.close()

      output
    }

    it 'should save a event' do
      expect { plugin.receive(event) }.to_not raise_error

      # Wait for 1 second, for the buffer to flush
      sleep 1

      # Count the rows through the plugin's own connection pool
      c = plugin.instance_variable_get(:@pool).getConnection()
      stmt = c.createStatement()
      rs = stmt.executeQuery("select count(*) as total from log")
      count = 0
      while rs.next()
        count = rs.getInt("total")
      end
      stmt.close()
      c.close()

      expect(count).to be > 0
    end

  end

end

Binary file not shown.

Binary file not shown.

Binary file not shown.