Compare commits

47 Commits:

2b27f39088, 7b337a8b91, 927e532b2a, 26a32a3f08, 6bb84b165f, 4e0292d222, 909cae01b3, 6f2bd2ab3e, c5aeae1b02, a7d5a2e623, 3a64a22ac4, c4b62769b9, b9e5f64d40, c0e358aafb, 442ddf16eb, 4e7985dafd, ae51d77f05, 529c98aadb, bfcd9bf69a, af55fde54a, 9e05a01dff, 064647607e, 1ece7f9abc, 38b7096419, eef7473a0b, 7a1da5b7cd, e56176bbea, 49e751a9f8, 9804850714, a6c669cc52, e615829310, 362e9ad0a0, 4994cd810b, 1487b41b3e, dd29d16a31, ebe5596469, 275cd6fc2f, 7da6317083, 470e6309b5, a7dde52b7b, 53c0f5761d, 4287c01037, 4f93fa7224, b52a7358ff, beb8f72560, 6b6973d6ee, c790a645f2
.gitignore (new file, vendored, 5 lines)

```
*.gem
Gemfile.lock
Gemfile.bak
.bundle
.vagrant
```
.travis.yml (new file, 8 lines)

```
language: ruby
cache: bundler
rvm:
- jruby
before_script:
- wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
- export JDBC_DERBY_JAR=/tmp/derby.jar
script: bundle exec rspec
```
CHANGELOG.md (new file, 28 lines)

```
# Change Log
All notable changes to this project will be documented in this file, from 0.2.0.

## [0.2.7] - 2016-05-29
- Backport retry exception logic from v5 branch
- Backport improved timestamp compatibility from v5 branch

## [0.2.6] - 2016-05-02
- Fix for exception infinite loop

## [0.2.5] - 2016-04-11
### Added
- Basic tests running against DerbyDB
- Fix for converting Logstash::Timestamp to iso8601 from @hordijk

## [0.2.4] - 2016-04-07
- Documentation fixes from @hordijk

## [0.2.3] - 2016-02-16
- Bug fixes

## [0.2.2] - 2015-12-30
- Bug fixes

## [0.2.1] - 2015-12-22
- Support for connection pooling added through HikariCP
- Support for unsafe statement handling (allowing dynamic queries)
- Altered exception handling to now count sequential flushes with exceptions thrown
```
README.md (modified)

````diff
@@ -1,96 +1,69 @@
-# logstash-jdbc
+# logstash-output-jdbc
 
-**This is an old version of the plugin, for Logstash v1.4 only. It is no longer receiving backports of fixes and contains several flaws which have been addressed in the Logstash v2 compatible versions, and above. Please upgrade/see the other branches in the source repository.**
+[](https://travis-ci.org/theangryangel/logstash-output-jdbc)
 
-JDBC output plugin for Logstash.
 This plugin is provided as an external plugin and is not part of the Logstash project.
 
-## Warning
+This plugin allows you to output to SQL databases, using JDBC adapters.
+See below for tested adapters, and example configurations.
 
 This has not yet been extensively tested with all JDBC drivers and may not yet work for you.
 
+If you do find this works for a JDBC driver without an example, let me know and provide a small example configuration if you can.
+
+This plugin does not bundle any JDBC jar files, and does expect them to be in a
+particular location. Please ensure you read the 4 installation lines below.
+
+## ChangeLog
+See CHANGELOG.md
+
+## Versions
+Released versions are available via rubygems, and typically tagged.
+
+For development:
+- See master branch for logstash v5
+- See v2.x branch for logstash v2
+- See v1.5 branch for logstash v1.5
+- See v1.4 branch for logstash 1.4
+
 ## Installation
-- Copy lib directory contents into your logstash installation.
-- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
-- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
-- Configure
+- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
+- Now either:
+  - Use driver_jar_path in your configuration to specify a path to your jar file
+- Or:
+  - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
+  - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
+- And then configure (examples below)
+
+## Running tests
+At this time tests only run against Derby, in an in-memory database.
+Acceptance tests for individual database engines will be added over time.
+
+Assuming valid jruby is installed
+- First time, issue `jruby -S bundle install` to install dependencies
+- Next, download Derby jar from https://db.apache.org/derby/
+- Run the tests `JDBC_DERBY_JAR=path/to/derby.jar jruby -S rspec`
+- Optionally add the `JDBC_DEBUG=1` env variable to add logging to stdout
 
 ## Configuration options
-* driver_class, string, JDBC driver class to load
-* connection_string, string, JDBC connection string
-* statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim.
-* flush_size, number, default = 1000, number of entries to buffer before sending to SQL
-* idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time
-* max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop
-* max_repeat_exceptions_time, number, default = 30, maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value
+
+| Option | Type | Description | Required? | Default |
+| ------ | ---- | ----------- | --------- | ------- |
+| driver_class | String | Specify a driver class if autoloading fails | No | |
+| driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
+| driver_jar_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
+| connection_string | String | JDBC connection URL | Yes | |
+| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
+| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
+| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
+| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, whereas when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
+| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
+| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
+| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
+| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 |
+| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
 
 ## Example configurations
-### SQLite3
-* Tested using https://bitbucket.org/xerial/sqlite-jdbc
-* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
-```
-input
-{
-	stdin { }
-}
-output {
-	stdout { }
-
-	jdbc {
-		driver_class => 'org.sqlite.JDBC'
-		connection_string => 'jdbc:sqlite:test.db'
-		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
-	}
-}
-```
-
-### SQL Server
-* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
-```
-input
-{
-	stdin { }
-}
-output {
-	jdbc {
-		driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
-		connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
-		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
-	}
-}
-```
-
-### Postgres
-With thanks to [@roflmao](https://github.com/roflmao)
-```
-input
-{
-	stdin { }
-}
-output {
-	jdbc {
-		driver_class => 'org.postgresql.Driver'
-		connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
-		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
-	}
-}
-```
-
-### Oracle
-With thanks to [@josemazo](https://github.com/josemazo)
-* Tested with Express Edition 11g Release 2
-* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
-```
-input
-{
-	stdin { }
-}
-output {
-	jdbc {
-		driver_class => "oracle.jdbc.driver.OracleDriver"
-		connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
-		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
-	}
-}
-```
-
-/* vim: set ts=4 sw=4 tw=0 :*/
+Example logstash configurations can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
+
+If you have a working sample configuration, for a DB that's not listed, pull requests are welcome.
````
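The `statement` option documented in the table above splits a single array into the prepared SQL (index 0) and its ordered parameters (the remaining entries), where each parameter is either an event property key or a `%{...}` formatted string passed in verbatim. As an illustration only (the helper names below are hypothetical and not part of the plugin), that split can be sketched in plain Ruby:

```ruby
# Hypothetical helper: resolve one statement parameter against an event.
# A "%{field}" formatted string is substituted textually; a plain key is
# looked up as an event property, as the statement docs above describe.
def resolve_param(event, param)
  if param.include?('%{')
    param.gsub(/%\{(.+?)\}/) { event[Regexp.last_match(1)].to_s }
  else
    event[param]
  end
end

# Hypothetical helper: split a statement array into SQL plus bind values.
def split_statement(statement, event)
  sql = statement[0]
  params = statement[1..-1].map { |p| resolve_param(event, p) }
  [sql, params]
end

event = { 'host' => 'web-1', 'message' => 'hello' }
sql, params = split_statement(
  ["INSERT INTO log (host, message) VALUES(?, ?)", "host", "%{host} - %{message}"],
  event
)
# sql keeps the ? placeholders; params holds ['web-1', 'web-1 - hello']
```

This is a sketch of the documented semantics only; the real plugin binds the resolved values onto a `java.sql.PreparedStatement` rather than returning them.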
examples/apache-phoenix-hbase-sql.md (new file, 16 lines)

# Example: Apache Phoenix (HBase SQL)
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
```
input
{
	stdin { }
}
output {
	jdbc {
		connection_string => "jdbc:phoenix:ZOOKEEPER_HOSTNAME"
		statement => [ "UPSERT INTO EVENTS log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
	}

}
```
examples/mariadb.md (new file, 16 lines)

# Example: MariaDB
* Tested with Ubuntu 14.04.3 LTS, Server version: 10.1.9-MariaDB-1~trusty-log mariadb.org binary distribution
* Tested using https://downloads.mariadb.com/enterprise/tqge-whfa/connectors/java/connector-java-1.3.2/mariadb-java-client-1.3.2.jar (mariadb-java-client-1.3.2.jar)
```
input
{
	stdin { }
}
output {
	jdbc {
		connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
	}

}
```
examples/mysql.md (new file, 16 lines)

# Example: Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
	stdin { }
}
output {
	jdbc {
		connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
	}
}
```
examples/odps.md (new file, 20 lines)

# Example: ODPS
With thanks to [@onesuper](https://github.com/onesuper)
```
input
{
	stdin { }
}
output {
	jdbc {
		driver_class => "com.aliyun.odps.jdbc.OdpsDriver"
		driver_auto_commit => false
		connection_string => "jdbc:odps:http://service.odps.aliyun.com/api?project=meta_dev&loglevel=DEBUG"
		username => "abcd"
		password => "1234"
		max_pool_size => 5
		flush_size => 10
		statement => [ "INSERT INTO test_logstash VALUES(?, ?, ?);", "host", "@timestamp", "message" ]
	}
}
```
examples/oracle.md (new file, 16 lines)

# Example: Oracle
With thanks to [@josemazo](https://github.com/josemazo)
* Tested with Express Edition 11g Release 2
* Tested using http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html (ojdbc6.jar)
```
input
{
	stdin { }
}
output {
	jdbc {
		connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
	}
}
```
examples/postgres.md (new file, 15 lines)

# Example: Postgres
With thanks to [@roflmao](https://github.com/roflmao)
```
input
{
	stdin { }
}
output {
	jdbc {
		connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
	}
}
```
examples/sql-server.md (new file, 14 lines)

# Example: SQL Server
* Tested using http://msdn.microsoft.com/en-gb/sqlserver/aa937724.aspx
```
input
{
	stdin { }
}
output {
	jdbc {
		connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
	}
}
```
examples/sqlite.md (new file, 17 lines)

# Example: SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
```
input
{
	stdin { }
}
output {
	stdout { }

	jdbc {
		connection_string => 'jdbc:sqlite:test.db'
		statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
	}
}
```
lib/logstash-output-jdbc_jars.rb (new file, 5 lines)

```
# encoding: utf-8
require 'logstash/environment'

root_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor")
```
lib/logstash-output-jdbc_ring-buffer.rb (new file, 17 lines)

```
class RingBuffer < Array
  attr_reader :max_size

  def initialize(max_size, enum = nil)
    @max_size = max_size
    enum.each { |e| self << e } if enum
  end

  def <<(el)
    if self.size < @max_size || @max_size.nil?
      super
    else
      self.shift
      self.push(el)
    end
  end
end
```
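The RingBuffer above is an Array that never grows past `max_size`: once full, each push discards the oldest element. A standalone sketch of that behavior, with the class body reproduced from the file above:

```ruby
# Reproduced from lib/logstash-output-jdbc_ring-buffer.rb: a bounded Array
# that drops its oldest element once max_size entries are held.
class RingBuffer < Array
  attr_reader :max_size

  def initialize(max_size, enum = nil)
    @max_size = max_size
    enum.each { |e| self << e } if enum
  end

  def <<(el)
    if self.size < @max_size || @max_size.nil?
      super
    else
      self.shift
      self.push(el)
    end
  end
end

buf = RingBuffer.new(3)
[1, 2, 3, 4].each { |n| buf << n }
# buf is now [2, 3, 4]: the oldest element (1) was discarded on overflow.
```

The plugin uses this to remember only the outcomes of the most recent flushes when deciding whether to shut down.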
@@ -3,23 +3,72 @@ require "logstash/outputs/base"
|
|||||||
require "logstash/namespace"
|
require "logstash/namespace"
|
||||||
require "stud/buffer"
|
require "stud/buffer"
|
||||||
require "java"
|
require "java"
|
||||||
|
require "logstash-output-jdbc_jars"
|
||||||
|
require "logstash-output-jdbc_ring-buffer"
|
||||||
|
|
||||||
|
# Write events to a SQL engine, using JDBC.
|
||||||
|
#
|
||||||
|
# It is upto the user of the plugin to correctly configure the plugin. This
|
||||||
|
# includes correctly crafting the SQL statement, and matching the number of
|
||||||
|
# parameters correctly.
|
||||||
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
||||||
# Adds buffer support
|
# Adds buffer support
|
||||||
include Stud::Buffer
|
include Stud::Buffer
|
||||||
|
|
||||||
config_name "jdbc"
|
STRFTIME_FMT = '%Y-%m-%d %T.%L'.freeze
|
||||||
milestone 1
|
|
||||||
|
|
||||||
# Driver class
|
RETRYABLE_SQLSTATE_CLASSES = [
|
||||||
|
# Classes of retryable SQLSTATE codes
|
||||||
|
# Not all in the class will be retryable. However, this is the best that
|
||||||
|
# we've got right now.
|
||||||
|
# If a custom state code is required, set it in retry_sql_states.
|
||||||
|
'08', # Connection Exception
|
||||||
|
'24', # Invalid Cursor State (Maybe retry-able in some circumstances)
|
||||||
|
'25', # Invalid Transaction State
|
||||||
|
'40', # Transaction Rollback
|
||||||
|
'53', # Insufficient Resources
|
||||||
|
'54', # Program Limit Exceeded (MAYBE)
|
||||||
|
'55', # Object Not In Prerequisite State
|
||||||
|
'57', # Operator Intervention
|
||||||
|
'58', # System Error
|
||||||
|
].freeze
|
||||||
|
|
||||||
|
config_name "jdbc"
|
||||||
|
|
||||||
|
# Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
|
||||||
config :driver_class, :validate => :string
|
config :driver_class, :validate => :string
|
||||||
|
|
||||||
# connection string
|
# Does the JDBC driver support autocommit?
|
||||||
|
config :driver_auto_commit, :validate => :boolean, :default => true, :required => true
|
||||||
|
|
||||||
|
# Where to find the jar
|
||||||
|
# Defaults to not required, and to the original behaviour
|
||||||
|
config :driver_jar_path, :validate => :string, :required => false
|
||||||
|
|
||||||
|
# jdbc connection string
|
||||||
config :connection_string, :validate => :string, :required => true
|
config :connection_string, :validate => :string, :required => true
|
||||||
|
|
||||||
|
# jdbc username - optional, maybe in the connection string
|
||||||
|
config :username, :validate => :string, :required => false
|
||||||
|
|
||||||
|
# jdbc password - optional, maybe in the connection string
|
||||||
|
config :password, :validate => :string, :required => false
|
||||||
|
|
||||||
# [ "insert into table (message) values(?)", "%{message}" ]
|
# [ "insert into table (message) values(?)", "%{message}" ]
|
||||||
config :statement, :validate => :array, :required => true
|
config :statement, :validate => :array, :required => true
|
||||||
|
|
||||||
|
# If this is an unsafe statement, use event.sprintf
|
||||||
|
# This also has potential performance penalties due to having to create a
|
||||||
|
# new statement for each event, rather than adding to the batch and issuing
|
||||||
|
# multiple inserts in 1 go
|
||||||
|
config :unsafe_statement, :validate => :boolean, :default => false
|
||||||
|
|
||||||
|
# Number of connections in the pool to maintain
|
||||||
|
config :max_pool_size, :validate => :number, :default => 5
|
||||||
|
|
||||||
|
# Connection timeout
|
||||||
|
config :connection_timeout, :validate => :number, :default => 10000
|
||||||
|
|
||||||
# We buffer a certain number of events before flushing that out to SQL.
|
# We buffer a certain number of events before flushing that out to SQL.
|
||||||
# This setting controls how many events will be buffered before sending a
|
# This setting controls how many events will be buffered before sending a
|
||||||
# batch of events.
|
# batch of events.
|
||||||
@@ -36,47 +85,43 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
|||||||
# a timely manner.
|
# a timely manner.
|
||||||
#
|
#
|
||||||
# If you change this value please ensure that you change
|
# If you change this value please ensure that you change
|
||||||
# max_repeat_exceptions_time accordingly.
|
# max_flush_exceptions accordingly.
|
||||||
config :idle_flush_time, :validate => :number, :default => 1
|
config :idle_flush_time, :validate => :number, :default => 1
|
||||||
|
|
||||||
# Maximum number of repeating (sequential) exceptions, before we stop retrying
|
# Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
|
||||||
# If set to < 1, then it will infinitely retry.
|
# If set to < 1, then it will infinitely retry.
|
||||||
config :max_repeat_exceptions, :validate => :number, :default => 5
|
#
|
||||||
|
# You should carefully tune this in relation to idle_flush_time if your SQL server
|
||||||
|
# is not highly available.
|
||||||
|
# i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
|
||||||
|
# longer than 200 seconds to reboot, then logstash will stop.
|
||||||
|
config :max_flush_exceptions, :validate => :number, :default => 0
|
||||||
|
|
||||||
# The max number of seconds since the last exception, before we consider it
|
config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
|
||||||
# a different cause.
|
config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
|
||||||
# This value should be carefully considered in respect to idle_flush_time.
|
|
||||||
config :max_repeat_exceptions_time, :validate => :number, :default => 30
|
|
||||||
|
|
||||||
public
|
public
|
||||||
def register
|
def register
|
||||||
@logger.info("JDBC - Starting up")
|
@logger.info("JDBC - Starting up")
|
||||||
|
|
||||||
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
|
load_jar_files!
|
||||||
@logger.info(jarpath)
|
|
||||||
Dir[jarpath].each do |jar|
|
|
||||||
@logger.debug("JDBC - Loaded jar", :jar => jar)
|
|
||||||
require jar
|
|
||||||
end
|
|
||||||
|
|
||||||
import @driver_class
|
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
|
||||||
|
|
||||||
driver = Object.const_get(@driver_class[@driver_class.rindex('.') + 1, @driver_class.length]).new
|
|
||||||
@connection = driver.connect(@connection_string, java.util.Properties.new)
|
|
||||||
|
|
||||||
@logger.debug("JDBC - Created connection", :driver => driver, :connection => @connection)
|
|
||||||
|
|
||||||
if (@flush_size > 1000)
|
if (@flush_size > 1000)
|
||||||
@logger.warn("JDBC - Flush size is set to > 1000. May have performance penalties, depending on your SQL engine.")
|
@logger.warn("JDBC - Flush size is set to > 1000")
|
||||||
end
|
end
|
||||||
|
|
||||||
@repeat_exception_count = 0
|
if @statement.length < 1
|
||||||
@last_exception_time = Time.now
|
@logger.error("JDBC - No statement provided. Configuration error.")
|
||||||
|
|
||||||
if (@max_repeat_exceptions > 0) and ((@idle_flush_time * @max_repeat_exceptions) > @max_repeat_exceptions_time)
|
|
||||||
@logger.warn("JDBC - max_repeat_exceptions_time is set such that it may still permit a looping exception. You probably changed idle_flush_time. Considering increasing max_repeat_exceptions_time.")
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if (!@unsafe_statement and @statement.length < 2)
|
||||||
|
@logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
|
||||||
|
end
|
||||||
|
|
||||||
|
setup_and_test_pool!
|
||||||
|
|
||||||
buffer_initialize(
|
buffer_initialize(
|
||||||
:max_items => @flush_size,
|
:max_items => @flush_size,
|
||||||
:max_interval => @idle_flush_time,
|
:max_interval => @idle_flush_time,
|
||||||
@@ -85,72 +130,211 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
|||||||
end
|
end
|
||||||
|
|
||||||
def receive(event)
|
def receive(event)
|
||||||
return unless output?(event)
|
return unless output?(event) or event.cancelled?
|
||||||
return unless @statement.length > 0
|
return unless @statement.length > 0
|
||||||
|
|
||||||
buffer_receive(event)
|
buffer_receive(event)
|
||||||
end
|
end
|
||||||
|
|
||||||
def flush(events, teardown=false)
|
def flush(events, teardown=false)
|
||||||
statement = @connection.prepareStatement(@statement[0])
|
if @unsafe_statement == true
|
||||||
|
unsafe_flush(events, teardown)
|
||||||
events.each do |event|
|
else
|
||||||
next if @statement.length < 2
|
safe_flush(events, teardown)
|
||||||
|
|
||||||
@statement[1..-1].each_with_index do |i, idx|
|
|
||||||
case event[i]
|
|
||||||
when Time
|
|
||||||
# Most reliable solution, cross JDBC driver
|
|
||||||
statement.setString(idx + 1, event[i].iso8601())
|
|
||||||
when Fixnum, Integer
|
|
||||||
statement.setInt(idx + 1, event[i])
|
|
||||||
when Float
|
|
||||||
statement.setFloat(idx + 1, event[i])
|
|
||||||
when String
|
|
||||||
statement.setString(idx + 1, event[i])
|
|
||||||
else
|
|
||||||
statement.setString(idx + 1, event.sprintf(i))
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
statement.addBatch()
|
|
||||||
end
|
end
|
||||||
|
|
||||||
begin
|
|
||||||
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
|
|
||||||
statement.executeBatch()
|
|
||||||
rescue => e
|
|
||||||
# Raising an exception will incur a retry from Stud::Buffer.
|
|
||||||
# Since the exceutebatch failed this should mean any events failed to be
|
|
||||||
# inserted will be re-run. We're going to log it for the lols anyway.
|
|
||||||
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
|
|
||||||
end
|
|
||||||
|
|
||||||
statement.close()
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def on_flush_error(e)
|
def on_flush_error(e)
|
||||||
return if @max_repeat_exceptions < 1
|
return if @max_flush_exceptions < 1
|
||||||
|
|
||||||
if @last_exception == e.to_s
|
@exceptions_tracker << e.class
|
||||||
@repeat_exception_count += 1
|
|
||||||
else
|
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
|
||||||
@repeat_exception_count = 0
|
@logger.error("JDBC - max_flush_exceptions has been reached")
|
||||||
|
log_jdbc_exception(e)
|
||||||
|
raise LogStash::ShutdownSignal.new
|
||||||
end
|
end
|
||||||
|
|
||||||
if (@repeat_exception_count >= @max_repeat_exceptions) and (Time.now - @last_exception_time) < @max_repeat_exceptions_time
|
|
||||||
@logger.error("JDBC - Exception repeated more than the maximum configured", :exception => e, :max_repeat_exceptions => @max_repeat_exceptions, :max_repeat_exceptions_time => @max_repeat_exceptions_time)
|
|
||||||
raise e
|
|
||||||
end
|
|
||||||
|
|
||||||
@last_exception_time = Time.now
|
|
||||||
@last_exception = e.to_s
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def teardown
|
def teardown
|
||||||
buffer_flush(:final => true)
|
buffer_flush(:final => true)
|
||||||
@connection.close()
|
@pool.close()
|
||||||
super
|
super
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
  def setup_and_test_pool!
    # Setup pool
    @pool = Java::ComZaxxerHikari::HikariDataSource.new

    @pool.setAutoCommit(@driver_auto_commit)
    @pool.setDriverClassName(@driver_class) if @driver_class

    @pool.setJdbcUrl(@connection_string)

    @pool.setUsername(@username) if @username
    @pool.setPassword(@password) if @password

    @pool.setMaximumPoolSize(@max_pool_size)
    @pool.setConnectionTimeout(@connection_timeout)

    validate_connection_timeout = (@connection_timeout / 1000) / 2

    # Test connection
    test_connection = @pool.getConnection()
    unless test_connection.isValid(validate_connection_timeout)
      @logger.error("JDBC - Connection is not valid. Please check connection string or that your JDBC endpoint is available.")
    end
    test_connection.close()
  end

  def load_jar_files!
    # Load jar from driver path
    unless @driver_jar_path.nil?
      raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path
      require @driver_jar_path
      return
    end

    # Revert original behaviour of loading from vendor directory
    # if no path given
    if ENV['LOGSTASH_HOME']
      jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
    else
      jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
    end

    @logger.debug("JDBC - jarpath", path: jarpath)

    jars = Dir[jarpath]
    raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty?

    jars.each do |jar|
      @logger.debug("JDBC - Loaded jar", :jar => jar)
      require jar
    end
  end

  def safe_flush(events, teardown=false)
    connection = nil
    statement = nil

    begin
      connection = @pool.getConnection()
    rescue => e
      log_jdbc_exception(e)
      raise
    end

    begin
      statement = connection.prepareStatement(@statement[0])

      events.each do |event|
        next if event.cancelled?
        next if @statement.length < 2
        statement = add_statement_event_params(statement, event)

        statement.addBatch()
      end

      statement.executeBatch()
      statement.close()
      @exceptions_tracker << nil
    rescue => e
      log_jdbc_exception(e)
      if retry_exception?(e)
        raise
      end
    ensure
      statement.close() unless statement.nil?
      connection.close() unless connection.nil?
    end
  end

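The `@statement` array that `safe_flush` consumes follows a simple convention: element 0 is the SQL with `?` placeholders, and elements `1..-1` name the event fields bound to those placeholders. A minimal sketch (not the plugin's actual code; a plain Hash stands in for a LogStash event) of how field names map to 1-indexed JDBC parameter positions:

```ruby
# Element 0: SQL template; elements 1..-1: event fields, in placeholder order.
statement = ["insert into log (created_at, message) values(?, ?)", "@timestamp", "message"]
event = { "@timestamp" => "2016-05-29 12:00:00", "message" => "test-message" }

# JDBC parameter indices start at 1, hence idx + 1.
bindings = statement[1..-1].each_with_index.map do |field, idx|
  [idx + 1, event[field]]
end
```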
  def unsafe_flush(events, teardown=false)
    connection = nil
    statement = nil
    begin
      connection = @pool.getConnection()
    rescue => e
      log_jdbc_exception(e)
      raise
    end

    begin
      events.each do |event|
        next if event.cancelled?

        statement = connection.prepareStatement(event.sprintf(@statement[0]))
        statement = add_statement_event_params(statement, event) if @statement.length > 1

        statement.execute()

        # cancel the event, since we may end up outputting the same event multiple times
        # if an exception happens later down the line
        event.cancel
        @exceptions_tracker << nil
      end
    rescue => e
      log_jdbc_exception(e)
      if retry_exception?(e)
        raise
      end
    ensure
      statement.close() unless statement.nil?
      connection.close() unless connection.nil?
    end
  end

  def add_statement_event_params(statement, event)
    @statement[1..-1].each_with_index do |i, idx|
      case event[i]
      when Time
        # See LogStash::Timestamp, below, for the why behind strftime.
        statement.setString(idx + 1, event[i].strftime(STRFTIME_FMT))
      when LogStash::Timestamp
        # XXX: Using setString as opposed to setTimestamp, because setTimestamp
        # doesn't behave correctly in some drivers (Known: sqlite)
        #
        # Additionally this does not use `to_iso8601`, since some SQL databases
        # choke on the 'T' in the string (Known: Derby).
        #
        # strftime appears to be the most reliable across drivers.
        statement.setString(idx + 1, event[i].time.strftime(STRFTIME_FMT))
      when Fixnum, Integer
        statement.setInt(idx + 1, event[i])
      when Float
        statement.setFloat(idx + 1, event[i])
      when String
        statement.setString(idx + 1, event[i])
      when true
        statement.setBoolean(idx + 1, true)
      when false
        statement.setBoolean(idx + 1, false)
      else
        if event[i].nil? and i =~ /%\{/
          statement.setString(idx + 1, event.sprintf(i))
        else
          statement.setString(idx + 1, nil)
        end
      end
    end

    statement
  end

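The timestamp comments above come down to one character: ISO-8601 puts a 'T' between date and time, and some databases (Derby is named above) reject it, so the plugin formats with strftime instead. A quick illustration — the `STRFTIME_FMT` constant is defined elsewhere in the plugin, so the pattern below is an illustrative assumption, not the plugin's actual format:

```ruby
require 'time'

t = Time.utc(2016, 5, 29, 12, 0, 0)
iso  = t.iso8601                        # ISO-8601, contains the 'T' separator
flat = t.strftime("%Y-%m-%d %H:%M:%S")  # space-separated, no 'T'
```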
  def log_jdbc_exception(exception)
    current_exception = exception
    loop do
      @logger.error("JDBC Exception encountered: Will automatically retry.", :exception => current_exception)
      current_exception = current_exception.getNextException()
      break if current_exception == nil
    end
  end

  def retry_exception?(exception)
    return (exception.class != java.sql.SQLException or
            RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState[0,2]))
  end
end # class LogStash::Outputs::Jdbc
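`retry_exception?` keys off the SQLSTATE class code: SQLSTATE values are five characters, and the first two identify the error class, which is what gets compared against `RETRYABLE_SQLSTATE_CLASSES`. A stand-alone sketch of that check — the class list below is a hypothetical stand-in, since the real constant is defined earlier in the file:

```ruby
# Hypothetical retryable classes: "08" (connection exception), "40"
# (transaction rollback). The plugin's actual list lives in
# RETRYABLE_SQLSTATE_CLASSES, defined elsewhere.
RETRYABLE_CLASSES = ["08", "40"]

def retryable_sqlstate?(sqlstate)
  RETRYABLE_CLASSES.include?(sqlstate[0, 2]) # first two chars = error class
end
```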
29  logstash-output-jdbc.gemspec  Normal file
@@ -0,0 +1,29 @@
Gem::Specification.new do |s|
  s.name = 'logstash-output-jdbc'
  s.version = "0.2.7"
  s.licenses = [ "Apache License (2.0)" ]
  s.summary = "This plugin allows you to output to SQL, via JDBC"
  s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
  s.authors = ["the_angry_angel"]
  s.email = "karl+github@theangryangel.co.uk"
  s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
  s.require_paths = [ "lib" ]

  # Files
  s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)

  # Tests
  s.test_files = s.files.grep(%r{^(test|spec|features)/})

  # Special flag to let us know this is actually a logstash plugin
  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }

  # Gem dependencies
  s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
  s.add_runtime_dependency 'stud'
  s.add_runtime_dependency "logstash-codec-plain"

  s.add_development_dependency "logstash-devutils"

  s.post_install_message = "logstash-output-jdbc 0.2.0 introduces several new features - please ensure you check the documentation in the README file"
end
94  spec/outputs/jdbc_spec.rb  Normal file
@@ -0,0 +1,94 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/jdbc"
require "stud/temporary"
require "java"

describe LogStash::Outputs::Jdbc do

  let(:derby_settings) do
    {
      "driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
      "connection_string" => "jdbc:derby:memory:testdb;create=true",
      "driver_jar_path" => ENV['JDBC_DERBY_JAR'],
      "statement" => [ "insert into log (created_at, message) values(?, ?)", "@timestamp", "message" ]
    }
  end

  context 'rspec setup' do

    it 'ensure derby is available' do
      j = ENV['JDBC_DERBY_JAR']
      expect(j).not_to be_nil, "JDBC_DERBY_JAR not defined, required to run tests"
      expect(File.exists?(j)).to eq(true), "JDBC_DERBY_JAR defined, but not valid"
    end

  end

  context 'when initializing' do

    it 'shouldn\'t register without a config' do
      expect {
        LogStash::Plugin.lookup("output", "jdbc").new()
      }.to raise_error(LogStash::ConfigurationError)
    end

    it 'shouldn\'t register with a missing jar file' do
      derby_settings['driver_jar_path'] = nil
      plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
      expect { plugin.register }.to raise_error
    end

    it 'shouldn\'t register with a missing connection string' do
      derby_settings['connection_string'] = nil
      plugin = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
      expect { plugin.register }.to raise_error
    end

  end

  context 'when outputting messages' do

    let(:event_fields) do
      { message: 'test-message' }
    end
    let(:event) { LogStash::Event.new(event_fields) }
    let(:plugin) {
      # Setup plugin
      output = LogStash::Plugin.lookup("output", "jdbc").new(derby_settings)
      output.register
      if ENV['JDBC_DEBUG'] == '1'
        output.logger.subscribe(STDOUT)
      end

      # Setup table
      c = output.instance_variable_get(:@pool).getConnection()
      stmt = c.createStatement()
      stmt.executeUpdate("CREATE table log (created_at timestamp, message varchar(512))")
      stmt.close()
      c.close()

      output
    }

    it 'should save an event' do
      expect { plugin.receive(event) }.to_not raise_error

      # Wait for 1 second, for the buffer to flush
      sleep 1

      c = plugin.instance_variable_get(:@pool).getConnection()
      stmt = c.createStatement()
      rs = stmt.executeQuery("select count(*) as total from log")
      count = 0
      while rs.next()
        count = rs.getInt("total")
      end
      stmt.close()
      c.close()

      expect(count).to be > 0
    end

  end

end
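For context, the settings exercised by the spec above map directly onto a Logstash pipeline configuration. A sketch of the equivalent output block — values are taken from the Derby test setup (the Travis build drops the driver at /tmp/derby.jar), not a production recommendation:

```conf
output {
  jdbc {
    driver_class => "org.apache.derby.jdbc.EmbeddedDriver"
    connection_string => "jdbc:derby:memory:testdb;create=true"
    driver_jar_path => "/tmp/derby.jar"
    statement => [ "insert into log (created_at, message) values(?, ?)", "@timestamp", "message" ]
  }
}
```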
BIN  vendor/jar-dependencies/runtime-jars/HikariCP-2.4.2.jar  vendored  Normal file — Binary file not shown.
BIN  vendor/jar-dependencies/runtime-jars/slf4j-api-1.7.13.jar  vendored  Normal file — Binary file not shown.
BIN  vendor/jar-dependencies/runtime-jars/slf4j-nop-1.7.13.jar  vendored  Normal file — Binary file not shown.