28 Commits
v1.4 ... v0.2.1

Author SHA1 Message Date
Karl
af55fde54a Merge pull request #25 from ebuildy/patch-1
Add Apache Phoenix example from @ebuildy
2015-12-22 09:37:27 +00:00
Thomas Decaux
9e05a01dff Add Apache Phoenix example 2015-12-07 10:07:52 +01:00
Karl
064647607e Merge pull request #24 from dmitryakadiamond/MariaDB-working-example
Maria db working example kindly provided by @dmitryakadiamond
2015-12-04 18:28:58 +00:00
Dmitry Morozov
1ece7f9abc formatting fix 2015-12-04 13:20:58 +00:00
Dmitry Morozov
38b7096419 README.md updated 2015-12-04 13:16:19 +00:00
Karl Southern
eef7473a0b Pushing. 2015-11-22 23:19:29 +00:00
Karl Southern
7a1da5b7cd Fix exceptions counter 2015-11-22 18:57:13 +00:00
Karl Southern
e56176bbea Fix missing nil counter 2015-11-19 14:29:47 +00:00
Karl Southern
49e751a9f8 Have to start some real work. Will complete tests over lunch. 2015-11-18 10:06:11 +00:00
Karl Southern
9804850714 WIP 2015-11-17 10:32:16 +00:00
Karl Southern
a6c669cc52 Adds unsafe_statement support 2015-11-15 12:35:57 +00:00
Karl Southern
e615829310 Stupid gemspec version number bullshit 2015-11-14 20:09:38 +00:00
Karl Southern
362e9ad0a0 Adds connection pooling 2015-11-14 20:06:35 +00:00
Karl Southern
4994cd810b Bump version 2015-11-06 15:02:18 +00:00
Karl Southern
1487b41b3e In retrospective, when would nil ever enter the equation at all? 2015-11-06 15:01:02 +00:00
Karl
dd29d16a31 Update logstash-output-jdbc.gemspec
Bump version.
2015-11-06 14:56:14 +00:00
Karl
ebe5596469 Update jdbc.rb
Removes improper nil check which breaks event sprintf formatting examples
2015-11-06 14:55:24 +00:00
Karl Southern
275cd6fc2f v2.0 tested 2015-10-30 18:29:22 +00:00
Karl Southern
7da6317083 Initial untested v2.0 commit 2015-10-30 17:55:42 +00:00
Karl
470e6309b5 Update README.md 2015-09-04 08:46:50 +01:00
Karl
a7dde52b7b Merge pull request #12 from jMonsinjon/master
Added information for connecting to a MySql Database
2015-09-04 08:44:54 +01:00
Jeremie MONSINJON
53c0f5761d Added information for connecting to a MySql Database 2015-09-03 17:37:51 +02:00
Karl Southern
4287c01037 Small style fix up for patch 2015-07-22 16:30:37 +01:00
Karl
4f93fa7224 Merge pull request #11 from kushtrimjunuzi/master
Handling null values and added boolean data type.
2015-07-22 16:28:06 +01:00
kushtrim junuzi
b52a7358ff Handling null values and added boolean data type.\n Handling jdbc inner exceptions. 2015-07-22 08:06:57 -04:00
Karl Southern
beb8f72560 Fixes up README 2015-06-26 12:57:45 +01:00
Karl Southern
6b6973d6ee Fixes up 1.5+ compatibility 2015-06-26 12:45:45 +01:00
Karl Southern
c790a645f2 Initial commit of 1.5. Untested. Lunchtime is over 2015-06-10 12:31:31 +01:00
12 changed files with 421 additions and 89 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
*.gem
Gemfile.lock
Gemfile.bak
.bundle

2
Gemfile Normal file
View File

@@ -0,0 +1,2 @@
source 'https://rubygems.org'
gemspec

118
README.md
View File

@@ -1,28 +1,61 @@
# logstash-jdbc
JDBC output plugin for Logstash.
# logstash-output-jdbc
This plugin is provided as an external plugin and is not part of the Logstash project.
Currently untested with logstash 1.5+. Support is planned.
This plugin allows you to output to SQL databases, using JDBC adapters.
See below for tested adapters, and example configurations.
## Warning
This has not yet been extensively tested with all JDBC drivers and may not yet work for you.
If you do find this works for a JDBC driver not listed, let me know and provide a small example configuration.
This plugin does not bundle any JDBC jar files, and does expect them to be in a
particular location. Please ensure you read the 4 installation lines below.
## Headlines
- Support for connection pooling added in 0.2.0
- Support for unsafe statement handling (allowing dynamic queries) in 0.2.0
- Altered exception handling to now count sequential flushes with exceptions thrown in 0.2.0
## Versions
- See master branch for logstash v2+
- See v1.5 branch for logstash v1.5
- See v1.4 branch for logstash 1.4
## Installation
- Copy lib directory contents into your logstash installation.
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- Configure
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- Now either:
- Use driver_class in your configuraton to specify a path to your jar file
- Or:
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- And then configure (examples below)
## Running tests
Assuming valid JDBC jar, and jruby is setup and installed, and you have issued `jruby -S bundle install` in the development directory
- `SQL_JAR=path/to/your.jar jruby -S bundle exec rspec`
If you need to provide username and password you may do this via the environment variables `SQL_USERNAME` and `SQL_PASSWORD`.
Tests are not yet 100% complete.
## Configuration options
* driver_class, string, JDBC driver class to load
* connection_string, string, JDBC connection string
* statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim.
* flush_size, number, default = 1000, number of entries to buffer before sending to SQL
* idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time
* max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop
* max_repeat_exceptions_time, number, default = 30, maxium number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should consider this value
| Option | Type | Description | Required? | Default |
| ------ | ---- | ----------- | --------- | ------- |
| driver_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No | |
| connection_string | String | JDBC connection URL | Yes | |
| username | String | JDBC username - this is optional as it may be included in the connection string, for many drivers | No | |
| password | String | JDBC password - this is optional as it may be included in the connection string, for many drivers | No | |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared, all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (i.e. "@timestamp", or "host") or a formatted string (i.e. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it's a formatted string then it will be passed in verbatim. | Yes | |
| unsafe_statement | Boolean | If yes, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so will result in possible SQL injections. Please be aware that there is also a potential performance penalty as each event must be evaluated and inserted into SQL one at a time, where as when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No | False |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any 1 time | No | 5 |
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No | 2800 |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL - if this is reached before idle_flush_time | No | 1000 |
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No | 1 |
| max_flush_exceptions | Number | Number of sequential flushes which cause an exception, before we stop logstash. Set to a value less than 1 if you never want it to stop. This should be carefully configured with relation to idle_flush_time if your SQL instance is not highly available. | No | 0 |
## Example configurations
If you have a working sample configuration, for a DB thats not listed, pull requests are welcome.
### SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
@@ -35,7 +68,6 @@ output {
stdout { }
jdbc {
driver_class => 'org.sqlite.JDBC'
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
@@ -51,7 +83,6 @@ input
}
output {
jdbc {
driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
@@ -67,7 +98,6 @@ input
}
output {
jdbc {
driver_class => 'org.postgresql.Driver'
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
@@ -85,11 +115,59 @@ input
}
output {
jdbc {
driver_class => "oracle.jdbc.driver.OracleDriver"
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
/* vim: set ts=4 sw=4 tw=0 :*/
### Mysql
With thanks to [@jMonsinjon](https://github.com/jMonsinjon)
* Tested with Version 14.14 Distrib 5.5.43, for debian-linux-gnu (x86_64)
* Tested using http://dev.mysql.com/downloads/file.php?id=457911 (mysql-connector-java-5.1.36-bin.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```
### MariaDB
* Tested with Ubuntu 14.04.3 LTS, Server version: 10.1.9-MariaDB-1~trusty-log mariadb.org binary distribution
* Tested using https://downloads.mariadb.com/enterprise/tqge-whfa/connectors/java/connector-java-1.3.2/mariadb-java-client-1.3.2.jar (mariadb-java-client-1.3.2.jar)
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```
### Apache Phoenix (HBase SQL)
* Tested with Ubuntu 14.04.03 / Logstash 2.1 / Apache Phoenix 4.6
* <!> HBase and Zookeeper must be both accessible from logstash machine <!>
```
input
{
stdin { }
}
output {
jdbc {
connection_string => "jdbc:phoenix:ZOOKEEPER_HOSTNAME"
statement => [ "UPSERT INTO EVENTS log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}
```

1
Rakefile Normal file
View File

@@ -0,0 +1 @@
require "logstash/devutils/rake"

View File

@@ -0,0 +1,5 @@
# encoding: utf-8
require 'logstash/environment'
root_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor")

View File

@@ -0,0 +1,17 @@
class RingBuffer < Array
attr_reader :max_size
def initialize(max_size, enum = nil)
@max_size = max_size
enum.each { |e| self << e } if enum
end
def <<(el)
if self.size < @max_size || @max_size.nil?
super
else
self.shift
self.push(el)
end
end
end

View File

@@ -3,23 +3,46 @@ require "logstash/outputs/base"
require "logstash/namespace"
require "stud/buffer"
require "java"
require "logstash-output-jdbc_jars"
require "logstash-output-jdbc_ring-buffer"
class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# Adds buffer support
include Stud::Buffer
config_name "jdbc"
milestone 1
# Driver class
config :driver_class, :validate => :string
# Driver class - No longer required
config :driver_class, :obsolete => "driver_class is no longer required and can be removed from your configuration"
# connection string
# Where to find the jar
# Defaults to not required, and to the original behaviour
config :driver_jar_path, :validate => :string, :required => false
# jdbc connection string
config :connection_string, :validate => :string, :required => true
# jdbc username - optional, maybe in the connection string
config :username, :validate => :string, :required => false
# jdbc password - optional, maybe in the connection string
config :password, :validate => :string, :required => false
# [ "insert into table (message) values(?)", "%{message}" ]
config :statement, :validate => :array, :required => true
# If this is an unsafe statement, use event.sprintf
# This also has potential performance penalties due to having to create a
# new statement for each event, rather than adding to the batch and issuing
# multiple inserts in 1 go
config :unsafe_statement, :validate => :boolean, :default => false
# Number of connections in the pool to maintain
config :max_pool_size, :validate => :number, :default => 5
# Connection timeout
config :connection_timeout, :validate => :number, :default => 2800
# We buffer a certain number of events before flushing that out to SQL.
# This setting controls how many events will be buffered before sending a
# batch of events.
@@ -36,45 +59,40 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
# a timely manner.
#
# If you change this value please ensure that you change
# max_repeat_exceptions_time accordingly.
# max_flush_exceptions accordingly.
config :idle_flush_time, :validate => :number, :default => 1
# Maximum number of repeating (sequential) exceptions, before we stop retrying
# Maximum number of sequential flushes which encounter exceptions, before we stop retrying.
# If set to < 1, then it will infinitely retry.
config :max_repeat_exceptions, :validate => :number, :default => 5
#
# You should carefully tune this in relation to idle_flush_time if your SQL server
# is not highly available.
# i.e. If your idle_flush_time is 1, and your max_flush_exceptions is 200, and your SQL server takes
# longer than 200 seconds to reboot, then logstash will stop.
config :max_flush_exceptions, :validate => :number, :default => 0
# The max number of seconds since the last exception, before we consider it
# a different cause.
# This value should be carefully considered in respect to idle_flush_time.
config :max_repeat_exceptions_time, :validate => :number, :default => 30
config :max_repeat_exceptions, :obsolete => "This has been replaced by max_flush_exceptions - which behaves slightly differently. Please check the documentation."
config :max_repeat_exceptions_time, :obsolete => "This is no longer required"
public
def register
@logger.info("JDBC - Starting up")
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
@logger.info(jarpath)
Dir[jarpath].each do |jar|
@logger.debug("JDBC - Loaded jar", :jar => jar)
require jar
end
load_jar_files!
import @driver_class
@pool = Java::ComZaxxerHikari::HikariDataSource.new
@pool.setJdbcUrl(@connection_string)
driver = Object.const_get(@driver_class[@driver_class.rindex('.') + 1, @driver_class.length]).new
@connection = driver.connect(@connection_string, java.util.Properties.new)
@pool.setUsername(@username) if @username
@pool.setPassword(@password) if @password
@logger.debug("JDBC - Created connection", :driver => driver, :connection => @connection)
@pool.setMaximumPoolSize(@max_pool_size)
@pool.setConnectionTimeout(@connection_timeout)
@exceptions_tracker = RingBuffer.new(@max_flush_exceptions)
if (@flush_size > 1000)
@logger.warn("JDBC - Flush size is set to > 1000. May have performance penalties, depending on your SQL engine.")
end
@repeat_exception_count = 0
@last_exception_time = Time.now
if (@max_repeat_exceptions > 0) and ((@idle_flush_time * @max_repeat_exceptions) > @max_repeat_exceptions_time)
@logger.warn("JDBC - max_repeat_exceptions_time is set such that it may still permit a looping exception. You probably changed idle_flush_time. Considering increasing max_repeat_exceptions_time.")
@logger.warn("JDBC - Flush size is set to > 1000")
end
buffer_initialize(
@@ -85,72 +103,155 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
end
def receive(event)
return unless output?(event)
return unless output?(event) or event.cancelled?
return unless @statement.length > 0
buffer_receive(event)
end
def flush(events, teardown=false)
statement = @connection.prepareStatement(@statement[0])
if @unsafe_statement == true
unsafe_flush(events, teardown)
else
safe_flush(events, teardown)
end
end
def on_flush_error(e)
return if @max_flush_exceptions < 1
@exceptions_tracker << e.class
if @exceptions_tracker.reject { |i| i.nil? }.count >= @max_flush_exceptions
@logger.error("JDBC - max_flush_exceptions has been reached")
log_jdbc_exception(e)
raise LogStash::ShutdownSignal.new
end
end
def teardown
buffer_flush(:final => true)
@pool.close()
super
end
private
def load_jar_files!
# Load jar from driver path
unless @driver_jar_path.nil?
raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path
require @driver_jar_path
return
end
# Revert original behaviour of loading from vendor directory
# if no path given
if ENV['LOGSTASH_HOME']
jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
else
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
end
@logger.debug("JDBC - jarpath", path: jarpath)
jars = Dir[jarpath]
raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty?
jars.each do |jar|
@logger.debug("JDBC - Loaded jar", :jar => jar)
require jar
end
end
def safe_flush(events, teardown=false)
connection = @pool.getConnection()
statement = connection.prepareStatement(@statement[0])
events.each do |event|
next if event.cancelled?
next if @statement.length < 2
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].iso8601())
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
else
statement.setString(idx + 1, event.sprintf(i))
end
end
statement = add_statement_event_params(statement, event)
statement.addBatch()
end
begin
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
statement.executeBatch()
statement.close()
@exceptions_tracker << nil
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# Since the exceutebatch failed this should mean any events failed to be
# inserted will be re-run. We're going to log it for the lols anyway.
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
log_jdbc_exception(e)
ensure
connection.close();
end
statement.close()
end
def on_flush_error(e)
return if @max_repeat_exceptions < 1
def unsafe_flush(events, teardown=false)
connection = @pool.getConnection()
if @last_exception == e.to_s
@repeat_exception_count += 1
else
@repeat_exception_count = 0
events.each do |event|
next if event.cancelled?
statement = connection.prepareStatement(event.sprintf(@statement[0]))
statement = add_statement_event_params(statement, event) if @statement.length > 1
begin
statement.execute()
# cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line
event.cancel
@exceptions_tracker << nil
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# We log for the lols.
log_jdbc_exception(e)
ensure
statement.close()
connection.close()
end
end
if (@repeat_exception_count >= @max_repeat_exceptions) and (Time.now - @last_exception_time) < @max_repeat_exceptions_time
@logger.error("JDBC - Exception repeated more than the maximum configured", :exception => e, :max_repeat_exceptions => @max_repeat_exceptions, :max_repeat_exceptions_time => @max_repeat_exceptions_time)
raise e
end
@last_exception_time = Time.now
@last_exception = e.to_s
end
def teardown
buffer_flush(:final => true)
@connection.close()
super
def add_statement_event_params(statement, event)
@statement[1..-1].each_with_index do |i, idx|
case event[i]
when Time, LogStash::Timestamp
# Most reliable solution, cross JDBC driver
statement.setString(idx + 1, event[i].iso8601())
when Fixnum, Integer
statement.setInt(idx + 1, event[i])
when Float
statement.setFloat(idx + 1, event[i])
when String
statement.setString(idx + 1, event[i])
when true
statement.setBoolean(idx + 1, true)
when false
statement.setBoolean(idx + 1, false)
else
if event[i].nil? and i =~ /%\{/
statement.setString(idx + 1, event.sprintf(i))
else
statement.setString(idx + 1, nil)
end
end
end
statement
end
def log_jdbc_exception(e)
ce = e
loop do
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => ce)
ce = e.getNextException()
break if ce == nil
end
end
end # class LogStash::Outputs::jdbc

View File

@@ -0,0 +1,29 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc'
s.version = "0.2.1"
s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
s.authors = ["the_angry_angel"]
s.email = "karl+github@theangryangel.co.uk"
s.homepage = "https://github.com/theangryangel/logstash-output-jdbc"
s.require_paths = [ "lib" ]
# Files
s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})
# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
# Gem dependencies
s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
s.add_runtime_dependency 'stud'
s.add_runtime_dependency "logstash-codec-plain"
s.add_development_dependency "logstash-devutils"
s.post_install_message = "logstash-output-jdbc 0.2.0 introduces several new features - please ensure you check the documentation in the README file"
end

95
spec/outputs/jdbc_spec.rb Normal file
View File

@@ -0,0 +1,95 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/jdbc"
require "stud/temporary"
require "java"
describe LogStash::Outputs::Jdbc do
def fetch_log_table_rowcount
# sleep for a second to let the flush happen
sleep 1
stmt = @sql.createStatement()
rs = stmt.executeQuery("select count(*) as total from log")
count = 0
while rs.next()
count = rs.getInt("total")
end
stmt.close()
return count
end
let(:base_settings) { {
"driver_jar_path" => @driver_jar_path,
"connection_string" => @test_connection_string,
"username" => ENV['SQL_USERNAME'],
"password" => ENV['SQL_PASSWORD'],
"statement" => [ "insert into log (message) values(?)", "message" ],
"max_pool_size" => 1,
"flush_size" => 1,
"max_flush_exceptions" => 1
} }
let(:test_settings) { {} }
let(:plugin) { LogStash::Outputs::Jdbc.new(base_settings.merge(test_settings)) }
let(:event_fields) { { "message" => "This is a message!" } }
let(:event) { LogStash::Event.new(event_fields) }
before(:all) do
@driver_jar_path = File.absolute_path(ENV['SQL_JAR'])
@test_db_path = File.join(Stud::Temporary.directory, "test.db")
@test_connection_string = "jdbc:sqlite:#{@test_db_path}"
require @driver_jar_path
@sql = java.sql.DriverManager.get_connection(@test_connection_string, ENV['SQL_USERNAME'].to_s, ENV['SQL_PASSWORD'].to_s)
stmt = @sql.createStatement()
stmt.executeUpdate("CREATE table log (host text, timestamp datetime, message text);")
stmt.close()
end
before(:each) do
stmt = @sql.createStatement()
stmt.executeUpdate("delete from log")
stmt.close()
end
after(:all) do
File.unlink(@test_db_path)
Dir.rmdir(File.dirname(@test_db_path))
end
describe "safe statement" do
it "should register without errors" do
expect { plugin.register }.to_not raise_error
end
it "receive event, without error" do
plugin.register
expect { plugin.receive(event) }.to_not raise_error
expect(fetch_log_table_rowcount).to eq(1)
end
end
describe "unsafe statement" do
let(:event_fields) {
{ "message" => "This is a message!", "table" => "log" }
}
let(:test_settings) { {
"statement" => [ "insert into %{table} (message) values(?)", "message" ],
"unsafe_statement" => true
} }
it "should register without errors" do
expect { plugin.register }.to_not raise_error
end
it "receive event, without error" do
plugin.register
plugin.receive(event)
expect(fetch_log_table_rowcount).to eq(1)
end
end
end

Binary file not shown.

Binary file not shown.

Binary file not shown.