29 Commits

Author SHA1 Message Date
Karl Southern
bfcd9bf69a Addresses issue 26 2015-12-23 09:42:53 +00:00
Karl
af55fde54a Merge pull request #25 from ebuildy/patch-1
Add Apache Phoenix example from @ebuildy
2015-12-22 09:37:27 +00:00
Thomas Decaux
9e05a01dff Add Apache Phoenix example 2015-12-07 10:07:52 +01:00
Karl
064647607e Merge pull request #24 from dmitryakadiamond/MariaDB-working-example
Maria db working example kindly provided by @dmitryakadiamond
2015-12-04 18:28:58 +00:00
Dmitry Morozov
1ece7f9abc formatting fix 2015-12-04 13:20:58 +00:00
Dmitry Morozov
38b7096419 README.md updated 2015-12-04 13:16:19 +00:00
Karl Southern
eef7473a0b Pushing. 2015-11-22 23:19:29 +00:00
Karl Southern
7a1da5b7cd Fix exceptions counter 2015-11-22 18:57:13 +00:00
Karl Southern
e56176bbea Fix missing nil counter 2015-11-19 14:29:47 +00:00
Karl Southern
49e751a9f8 Have to start some real work. Will complete tests over lunch. 2015-11-18 10:06:11 +00:00
Karl Southern
9804850714 WIP 2015-11-17 10:32:16 +00:00
Karl Southern
a6c669cc52 Adds unsafe_statement support 2015-11-15 12:35:57 +00:00
Karl Southern
e615829310 Stupid gemspec version number bullshit 2015-11-14 20:09:38 +00:00
Karl Southern
362e9ad0a0 Adds connection pooling 2015-11-14 20:06:35 +00:00
Karl Southern
4994cd810b Bump version 2015-11-06 15:02:18 +00:00
Karl Southern
1487b41b3e In retrospective, when would nil ever enter the equation at all? 2015-11-06 15:01:02 +00:00
Karl
dd29d16a31 Update logstash-output-jdbc.gemspec
Bump version.
2015-11-06 14:56:14 +00:00
Karl
ebe5596469 Update jdbc.rb
Removes improper nil check which breaks event sprintf formatting examples
2015-11-06 14:55:24 +00:00
Karl Southern
275cd6fc2f v2.0 tested 2015-10-30 18:29:22 +00:00
Karl Southern
7da6317083 Initial untested v2.0 commit 2015-10-30 17:55:42 +00:00
Karl
470e6309b5 Update README.md 2015-09-04 08:46:50 +01:00
Karl
a7dde52b7b Merge pull request #12 from jMonsinjon/master
Added information for connecting to a MySql Database
2015-09-04 08:44:54 +01:00
Jeremie MONSINJON
53c0f5761d Added information for connecting to a MySql Database 2015-09-03 17:37:51 +02:00
Karl Southern
4287c01037 Small style fix up for patch 2015-07-22 16:30:37 +01:00
Karl
4f93fa7224 Merge pull request #11 from kushtrimjunuzi/master
Handling null values and added boolean data type.
2015-07-22 16:28:06 +01:00
kushtrim junuzi
b52a7358ff Handling null values and added boolean data type.\n Handling jdbc inner exceptions. 2015-07-22 08:06:57 -04:00
Karl Southern
beb8f72560 Fixes up README 2015-06-26 12:57:45 +01:00
Karl Southern
6b6973d6ee Fixes up 1.5+ compatibility 2015-06-26 12:45:45 +01:00
Karl Southern
c790a645f2 Initial commit of 1.5. Untested. Lunchtime is over 2015-06-10 12:31:31 +01:00
12 changed files with 429 additions and 88 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
*.gem
Gemfile.lock
Gemfile.bak
.bundle

2
Gemfile Normal file
View File

@@ -0,0 +1,2 @@
source 'https://rubygems.org'
gemspec

120
README.md
View File

@@ -1,28 +1,63 @@
# logstash-jdbc # logstash-output-jdbc
JDBC output plugin for Logstash.
This plugin is provided as an external plugin and is not part of the Logstash project. This plugin is provided as an external plugin and is not part of the Logstash project.
Currently untested with logstash 1.5+. Support is planned. This plugin allows you to output to SQL databases, using JDBC adapters.
See below for tested adapters, and example configurations.
## Warning
This has not yet been extensively tested with all JDBC drivers and may not yet work for you. This has not yet been extensively tested with all JDBC drivers and may not yet work for you.
If you do find this works for a JDBC driver not listed, let me know and provide a small example configuration.
This plugin does not bundle any JDBC jar files, and does expect them to be in a
particular location. Please ensure you read the 4 installation lines below.
## Headlines
- Support for connection pooling added in 0.2.0
- Support for unsafe statement handling (allowing dynamic queries) in 0.2.0
- Altered exception handling to now count sequential flushes with exceptions thrown in 0.2.0
## Versions
- See master branch for logstash v2+
- See v1.5 branch for logstash v1.5
- See v1.4 branch for logstash 1.4
## Installation ## Installation
- Copy lib directory contents into your logstash installation. - Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`) - Now either:
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation - Use driver_class in your configuraton to specify a path to your jar file
- Configure - Or:
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- And then configure (examples below)
## Running tests
Assuming valid JDBC jar, and jruby is setup and installed, and you have issued `jruby -S bundle install` in the development directory
- `SQL_JAR=path/to/your.jar jruby -S bundle exec rspec`
If you need to provide username and password you may do this via the environment variables `SQL_USERNAME` and `SQL_PASSWORD`.
Tests are not yet 100% complete.
## Configuration options ## Configuration options
* driver_class, string, JDBC driver class to load
* connection_string, string, JDBC connection string | Option | Type | Description | Required? | Default |
* statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | ------ | ---- | ----------- | --------- | ------- |
* flush_size, number, default = 1000, number of entries to buffer before sending to SQL | driver_class | String | Specify a driver class if autoloading fails | No | |
* idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time | driver_auto_commit | Boolean | If the driver does not support auto commit, you should set this to false | No | True |
* max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop | driver_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
* max_repeat_exceptions_time, number, default = 30, maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value | connection_string | String | JDBC connection URL | Yes | |
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 |
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
## Example configurations ## Example configurations
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
### SQLite3 ### SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc * Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db` * SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
@@ -35,7 +70,6 @@ output {
stdout { } stdout { }
jdbc { jdbc {
driver_class => 'org.sqlite.JDBC'
connection_string => 'jdbc:sqlite:test.db' connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
} }
@@ -51,7 +85,6 @@ input
} }
output { output {
jdbc { jdbc {
driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;" connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
} }
@@ -67,7 +100,6 @@ input
} }
output { output {
jdbc { jdbc {
driver_class => 'org.postgresql.Driver'
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password' connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
} }
@@ -85,11 +117,59 @@ input
} }
output { output {
jdbc { jdbc {
driver_class => "oracle.jdbc.driver.OracleDriver"
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID" connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ] statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
} }
} }
``` ```
/* vim: set ts=4 sw=4 tw=0 :*/ ### Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### MariaDB
* Tested with Ubuntu 14.04.3 LTS, Server version: 10.1.9-MariaDB-1~trusty-log mariadb.org binary distribution
* Tested using https://downloads.mariadb.com/enterprise/tqge-whfa/connectors/java/connector-java-1.3.2/mariadb-java-client-1.3.2.jar (mariadb-java-client-1.3.2.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### Apache Phoenix (HBase SQL)
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:phoenix:ZOOKEEPER_HOSTNAME"
statement => [ "UPSERT INTO EVENTS log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

1
Rakefile Normal file
View File

@@ -0,0 +1 @@
require "logstash/devutils/rake"

View File

@@ -0,0 +1,5 @@
# encoding: utf-8
require 'logstash/environment'
root_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor")

View File

@@ -0,0 +1,17 @@
class RingBuffer < Array
attr_reader :max_size
def initialize(max_size, enum = nil)
@max_size = max_size
enum.each { |e| self << e } if enum
end
def <<(el)
if self.size < @max_size || @max_size.nil?
super
else
self.shift
self.push(el)
end
end
end

View File

@@ -3,23 +3,49 @@ require "logstash/outputs/base"
require "logstash/namespace" require "logstash/namespace"
require "stud/buffer" require "stud/buffer"
require "java" require "java"
require "logstash-output-jdbc_jars"
require "logstash-output-jdbc_ring-buffer"
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Adds buffer support # Adds buffer support
include Stud::Buffer include Stud::Buffer
config_name "jdbc" config_name "jdbc"
milestone 1
# Driver class # Driver class - Reintroduced for https://github.com/theangryangel/logstash-output-jdbc/issues/26
config :driver_class, :validate => :string config :driver_class, :validate => :string
# connection string # Does the JDBC driver support autocommit?
config :driver_auto_commit, :validate => :boolean, :default => true, :required => true
# Where to find the jar
# Defaults to not required, and to the original behaviour
config :driver_jar_path, :validate => :string, :required => false
# jdbc connection string
config :connection_string, :validate => :string, :required => true config :connection_string, :validate => :string, :required => true
# jdbc username - optional, maybe in the connection string
config :username, :validate => :string, :required => false
# jdbc password - optional, maybe in the connection string
config :password, :validate => :string, :required => false
# [ "insert into table (message) values(?)", "%{message}" ] # [ "insert into table (message) values(?)", "%{message}" ]
config :statement, :validate => :array, :required => true config :statement, :validate => :array, :required => true
# If this is an unsafe statement, use event.sprintf
# This also has potential performance penalties due to having to create a
# new statement for each event, rather than adding to the batch and issuing
# multiple inserts in 1 go
config :unsafe_statement, :validate => :boolean, :default => false
# Number of connections in the pool to maintain
config :max_pool_size, :validate => :number, :default => 5
# Connection timeout
config :connection_timeout, :validate => :number, :default => 2800
# We buffer a certain number of events before flushing that out to SQL. # We buffer a certain number of events before flushing that out to SQL.
# This setting controls how many events will be buffered before sending a # This setting controls how many events will be buffered before sending a
# batch of events. # batch of events.
@@ -36,45 +62,44 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# a timely manner. # a timely manner.
# #
# If you change this value please ensure that you change # If you change this value please ensure that you change
# max_repeat_exceptions_time accordingly. # max_flush_exceptions accordingly.
config :idle_flush_time, :validate => :number, :default => 1 config :idle_flush_time, :validate => :number, :default => 1
# Maximum number of repeating (sequential) exceptions, before we stop retrying # Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
# If set to < 1, then it will infinitely retry. # If set to < 1, then it will infinitely retry.
config :max_repeat_exceptions, :validate => :number, :default => 5 #
# You should carefully tune this in relation to idle_flush_time if your SQL server
# is not highly available.
# i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
# longer than 200 seconds to reboot, then logstash will stop.
config :max_flush_exceptions, :validate => :number, :default => 0
# The max number of seconds since the last exception, before we consider it config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
# a different cause. config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
# This value should be carefully considered in respect to idle_flush_time.
config :max_repeat_exceptions_time, :validate => :number, :default => 30
public public
def register def register
@logger.info("JDBC - Starting up") @logger.info("JDBC - Starting up")
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar") load_jar_files!
@logger.info(jarpath)
Dir[jarpath].each do |jar|
@logger.debug("JDBC - Loaded jar", :jar => jar)
require jar
end
import @driver_class @pool = Java::ComZaxxerHikari::HikariDataSource.new
driver = Object.const_get(@driver_class[@driver_class.rindex('.') + 1, @driver_class.length]).new @pool.setAutoCommit(@driver_auto_commit)
@connection = driver.connect(@connection_string, java.util.Properties.new) @pool.setDriverClassName(@driver_class) if @driver_class
@logger.debug("JDBC - Created connection", :driver => driver, :connection => @connection) @pool.setJdbcUrl(@connection_string)
@pool.setUsername(@username) if @username
@pool.setPassword(@password) if @password
@pool.setMaximumPoolSize(@max_pool_size)
@pool.setConnectionTimeout(@connection_timeout)
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
if (@flush_size > 1000) if (@flush_size > 1000)
@logger.warn("JDBC - Flush size is set to > 1000. May have performance penalties, depending on your SQL engine.") @logger.warn("JDBC - Flush size is set to > 1000")
end
@repeat_exception_count = 0
@last_exception_time = Time.now
if (@max_repeat_exceptions > 0) and ((@idle_flush_time * @max_repeat_exceptions) > @max_repeat_exceptions_time)
@logger.warn("JDBC - max_repeat_exceptions_time is set such that it may still permit a looping exception. You probably changed idle_flush_time. Considering increasing max_repeat_exceptions_time.")
end end
buffer_initialize( buffer_initialize(
@@ -85,72 +110,155 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end end
def receive(event) def receive(event)
return unless output?(event) return unless output?(event) or event.cancelled?
return unless @statement.length > 0 return unless @statement.length > 0
buffer_receive(event) buffer_receive(event)
end end
def flush(events, teardown=false) def flush(events, teardown=false)
statement = @connection.prepareStatement(@statement[0]) if @unsafe_statement == true
unsafe_flush(events, teardown)
else
safe_flush(events, teardown)
end
end
def on_flush_error(e)
return if @max_flush_exceptions < 1
@exceptions_tracker << e.class
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
@logger.error("JDBC - max_flush_exceptions has been reached")
log_jdbc_exception(e)
raise LogStash::ShutdownSignal.new
end
end
def teardown
buffer_flush(:final => true)
@pool.close()
super
end
private
def load_jar_files!
# Load jar from driver path
unless @driver_jar_path.nil?
raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path
require @driver_jar_path
return
end
# Revert original behaviour of loading from vendor directory
# if no path given
if ENV['LOGSTASH_HOME']
jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
else
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
end
@logger.debug("JDBC - jarpath", path: jarpath)
jars = Dir[jarpath]
raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty?
jars.each do |jar|
@logger.debug("JDBC - Loaded jar", :jar => jar)
require jar
end
end
def safe_flush(events, teardown=false)
connection = @pool.getConnection()
statement = connection.prepareStatement(@statement[0])
events.each do |event| events.each do |event|
next if event.cancelled?
next if @statement.length < 2 next if @statement.length < 2
statement = add_statement_event_params(statement, event)
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].iso8601())
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
else
statement.setString(idx + 1, event.sprintf(i))
end
end
statement.addBatch() statement.addBatch()
end end
begin begin
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
statement.executeBatch() statement.executeBatch()
statement.close()
@exceptions_tracker << nil
rescue => e rescue => e
# Raising an exception will incur a retry from Stud::Buffer. # Raising an exception will incur a retry from Stud::Buffer.
# Since the exceutebatch failed this should mean any events failed to be # Since the exceutebatch failed this should mean any events failed to be
# inserted will be re-run. We're going to log it for the lols anyway. # inserted will be re-run. We're going to log it for the lols anyway.
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e) log_jdbc_exception(e)
ensure
connection.close();
end end
statement.close()
end end
def on_flush_error(e) def unsafe_flush(events, teardown=false)
return if @max_repeat_exceptions < 1 connection = @pool.getConnection()
if @last_exception == e.to_s events.each do |event|
@repeat_exception_count += 1 next if event.cancelled?
else
@repeat_exception_count = 0 statement = connection.prepareStatement(event.sprintf(@statement[0]))
statement = add_statement_event_params(statement, event) if @statement.length > 1
begin
statement.execute()
# cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line
event.cancel
@exceptions_tracker << nil
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# We log for the lols.
log_jdbc_exception(e)
ensure
statement.close()
connection.close()
end
end end
if (@repeat_exception_count >= @max_repeat_exceptions) and (Time.now - @last_exception_time) < @max_repeat_exceptions_time
@logger.error("JDBC - Exception repeated more than the maximum configured", :exception => e, :max_repeat_exceptions => @max_repeat_exceptions, :max_repeat_exceptions_time => @max_repeat_exceptions_time)
raise e
end
@last_exception_time = Time.now
@last_exception = e.to_s
end end
def teardown def add_statement_event_params(statement, event)
buffer_flush(:final => true) @statement[1..-1].each_with_index do |i, idx|
@connection.close() case event[i]
super when Time, LogStash::Timestamp
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].iso8601())
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
when true
statement.setBoolean(idx + 1, true)
when false
statement.setBoolean(idx + 1, false)
else
if event[i].nil? and i =~ /%\{/
statement.setString(idx + 1, event.sprintf(i))
else
statement.setString(idx + 1, nil)
end
end
end
statement
end end
def log_jdbc_exception(e)
ce = e
loop do
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => ce)
ce = e.getNextException()
break if ce == nil
end
end
end # class LogStash::Outputs::jdbc end # class LogStash::Outputs::jdbc

View File

@@ -0,0 +1,29 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc'
s.version = "0.2.2.rc1"
s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
s.authors = ["the_angry_angel"]
s.email = "karl+github@theangryangel.co.uk"
s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
s.require_paths = [ "lib" ]
# Files
s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})
# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
# Gem dependencies
s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
s.add_runtime_dependency 'stud'
s.add_runtime_dependency "logstash-codec-plain"
s.add_development_dependency "logstash-devutils"
s.post_install_message = "logstash-output-jdbc 0.2.0 introduces several new features - please ensure you check the documentation in the README file"
end

95
spec/outputs/jdbc_spec.rb Normal file
View File

@@ -0,0 +1,95 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/jdbc"
require "stud/temporary"
require "java"
describe LogStash::Outputs::Jdbc do
def fetch_log_table_rowcount
# sleep for a second to let the flush happen
sleep 1
stmt = @sql.createStatement()
rs = stmt.executeQuery("select count(*) as total from log")
count = 0
while rs.next()
count = rs.getInt("total")
end
stmt.close()
return count
end
let(:base_settings) { {
"driver_jar_path" => @driver_jar_path,
"connection_string" => @test_connection_string,
"username" => ENV['SQL_USERNAME'],
"password" => ENV['SQL_PASSWORD'],
"statement" => [ "insert into log (message) values(?)", "message" ],
"max_pool_size" => 1,
"flush_size" => 1,
"max_flush_exceptions" => 1
} }
let(:test_settings) { {} }
let(:plugin) { LogStash::Outputs::Jdbc.new(base_settings.merge(test_settings)) }
let(:event_fields) { { "message" => "This is a message!" } }
let(:event) { LogStash::Event.new(event_fields) }
before(:all) do
@driver_jar_path = File.absolute_path(ENV['SQL_JAR'])
@test_db_path = File.join(Stud::Temporary.directory, "test.db")
@test_connection_string = "jdbc:sqlite:#{@test_db_path}"
require @driver_jar_path
@sql = java.sql.DriverManager.get_connection(@test_connection_string, ENV['SQL_USERNAME'].to_s, ENV['SQL_PASSWORD'].to_s)
stmt = @sql.createStatement()
stmt.executeUpdate("CREATE table log (host text, timestamp datetime, message text);")
stmt.close()
end
before(:each) do
stmt = @sql.createStatement()
stmt.executeUpdate("delete from log")
stmt.close()
end
after(:all) do
File.unlink(@test_db_path)
Dir.rmdir(File.dirname(@test_db_path))
end
describe "safe statement" do
it "should register without errors" do
expect { plugin.register }.to_not raise_error
end
it "receive event, without error" do
plugin.register
expect { plugin.receive(event) }.to_not raise_error
expect(fetch_log_table_rowcount).to eq(1)
end
end
describe "unsafe statement" do
let(:event_fields) {
{ "message" => "This is a message!", "table" => "log" }
}
let(:test_settings) { {
"statement" => [ "insert into %{table} (message) values(?)", "message" ],
"unsafe_statement" => true
} }
it "should register without errors" do
expect { plugin.register }.to_not raise_error
end
it "receive event, without error" do
plugin.register
plugin.receive(event)
expect(fetch_log_table_rowcount).to eq(1)
end
end
end

Binary file not shown.

Binary file not shown.

Binary file not shown.