Compare commits
No commits in common. "master" and "v5.3.0" have entirely different histories.
@ -1,11 +1,6 @@
|
|||||||
# Change Log
|
# Change Log
|
||||||
All notable changes to this project will be documented in this file, from 0.2.0.
|
All notable changes to this project will be documented in this file, from 0.2.0.
|
||||||
|
|
||||||
## [5.3.0] - 2017-11-08
|
|
||||||
- Adds configuration options `enable_event_as_json_keyword` and `event_as_json_keyword`
|
|
||||||
- Adds BigDecimal support
|
|
||||||
- Adds additional logging for debugging purposes (with thanks to @mlkmhd's work)
|
|
||||||
|
|
||||||
## [5.2.1] - 2017-04-09
|
## [5.2.1] - 2017-04-09
|
||||||
- Adds Array and Hash to_json support for non-sprintf syntax
|
- Adds Array and Hash to_json support for non-sprintf syntax
|
||||||
|
|
||||||
|
@ -56,8 +56,6 @@ For development:
|
|||||||
| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
|
| retry_initial_interval | Number | Number of seconds before the initial retry in the event of a failure. On each failure it will be doubled until it reaches retry_max_interval | No | 2 |
|
||||||
| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
|
| retry_max_interval | Number | Maximum number of seconds between each retry | No | 128 |
|
||||||
| retry_sql_states | Array of strings | An array of custom SQL state codes you wish to retry until `max_flush_exceptions`. Useful if you're using a JDBC driver which returns retry-able, but non-standard SQL state codes in it's exceptions. | No | [] |
|
| retry_sql_states | Array of strings | An array of custom SQL state codes you wish to retry until `max_flush_exceptions`. Useful if you're using a JDBC driver which returns retry-able, but non-standard SQL state codes in it's exceptions. | No | [] |
|
||||||
| event_as_json_keyword | String | The magic key word that the plugin looks for to convert the entire event into a JSON object. As Logstash does not support this out of the box with it's `sprintf` implementation, you can use whatever this field is set to in the statement parameters | No | @event |
|
|
||||||
| enable_event_as_json_keyword | Boolean | Enables the magic keyword set in the configuration option `event_as_json_keyword`. Without this enabled the plugin will not convert the `event_as_json_keyword` into JSON encoding of the entire event. | No | False |
|
|
||||||
|
|
||||||
## Example configurations
|
## Example configurations
|
||||||
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
|
Example logstash configurations, can now be found in the examples directory. Where possible we try to link every configuration with a tested jar.
|
||||||
|
@ -269,7 +269,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
|||||||
|
|
||||||
def add_statement_event_params(statement, event)
|
def add_statement_event_params(statement, event)
|
||||||
@statement[1..-1].each_with_index do |i, idx|
|
@statement[1..-1].each_with_index do |i, idx|
|
||||||
if @enable_event_as_json_keyword == true and i.is_a? String and i == @event_as_json_keyword
|
if @enable_event_as_json_keyword and i.is_a? String and i == @event_as_json_keyword
|
||||||
value = event.to_json
|
value = event.to_json
|
||||||
elsif i.is_a? String
|
elsif i.is_a? String
|
||||||
value = event.get(i)
|
value = event.get(i)
|
||||||
@ -300,8 +300,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
|||||||
statement.setInt(idx + 1, value)
|
statement.setInt(idx + 1, value)
|
||||||
end
|
end
|
||||||
when BigDecimal
|
when BigDecimal
|
||||||
# TODO: There has to be a better way than this. Find it.
|
statement.setBigDecimal(idx + 1, value)
|
||||||
statement.setBigDecimal(idx + 1, java.math.BigDecimal.new(value.to_s))
|
|
||||||
when Float
|
when Float
|
||||||
statement.setFloat(idx + 1, value)
|
statement.setFloat(idx + 1, value)
|
||||||
when String
|
when String
|
||||||
@ -327,14 +326,16 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
|
|||||||
|
|
||||||
def log_jdbc_exception(exception, retrying, event)
|
def log_jdbc_exception(exception, retrying, event)
|
||||||
current_exception = exception
|
current_exception = exception
|
||||||
log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying')
|
log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying') + '.'
|
||||||
|
|
||||||
|
if(event != nil)
|
||||||
|
log_text += ' event: "' + event + '".'
|
||||||
|
end
|
||||||
|
|
||||||
log_method = (retrying ? 'warn' : 'error')
|
log_method = (retrying ? 'warn' : 'error')
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
# TODO reformat event output so that it only shows the fields necessary.
|
@logger.send(log_method, log_text, :exception => current_exception)
|
||||||
|
|
||||||
@logger.send(log_method, log_text, :exception => current_exception, :statement => @statement[0], :event => event)
|
|
||||||
|
|
||||||
if current_exception.respond_to? 'getNextException'
|
if current_exception.respond_to? 'getNextException'
|
||||||
current_exception = current_exception.getNextException()
|
current_exception = current_exception.getNextException()
|
||||||
|
@ -1,10 +1,8 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
|
wget http://search.maven.org/remotecontent?filepath=org/apache/derby/derby/10.12.1.1/derby-10.12.1.1.jar -O /tmp/derby.jar
|
||||||
|
|
||||||
sudo apt-get install mysql-server postgresql-client postgresql -qq -y
|
sudo apt-get install mysql-server -qq -y
|
||||||
echo "create database logstash; grant all privileges on logstash.* to 'logstash'@'localhost' identified by 'logstash'; flush privileges;" | sudo -u root mysql
|
echo "create database logstash_output_jdbc_test;" | mysql -u root
|
||||||
echo "create user logstash PASSWORD 'logstash'; create database logstash; grant all privileges on database logstash to logstash;" | sudo -u postgres psql
|
|
||||||
|
|
||||||
wget http://search.maven.org/remotecontent?filepath=mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar -O /tmp/mysql.jar
|
wget http://search.maven.org/remotecontent?filepath=mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar -O /tmp/mysql.jar
|
||||||
wget http://search.maven.org/remotecontent?filepath=org/xerial/sqlite-jdbc/3.8.11.2/sqlite-jdbc-3.8.11.2.jar -O /tmp/sqlite.jar
|
wget http://search.maven.org/remotecontent?filepath=org/xerial/sqlite-jdbc/3.8.11.2/sqlite-jdbc-3.8.11.2.jar -O /tmp/sqlite.jar
|
||||||
wget http://central.maven.org/maven2/org/postgresql/postgresql/42.1.4/postgresql-42.1.4.jar -O /tmp/postgres.jar
|
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
export JDBC_DERBY_JAR=/tmp/derby.jar
|
export JDBC_DERBY_JAR=/tmp/derby.jar
|
||||||
export JDBC_MYSQL_JAR=/tmp/mysql.jar
|
export JDBC_MYSQL_JAR=/tmp/mysql.jar
|
||||||
export JDBC_SQLITE_JAR=/tmp/sqlite.jar
|
export JDBC_SQLITE_JAR=/tmp/sqlite.jar
|
||||||
export JDBC_POSTGRES_JAR=/tmp/postgres.jar
|
|
||||||
|
|
||||||
|
@ -4,8 +4,6 @@ require 'stud/temporary'
|
|||||||
require 'java'
|
require 'java'
|
||||||
require 'securerandom'
|
require 'securerandom'
|
||||||
|
|
||||||
RSpec::Support::ObjectFormatter.default_instance.max_formatted_output_length = 80000
|
|
||||||
|
|
||||||
RSpec.configure do |c|
|
RSpec.configure do |c|
|
||||||
|
|
||||||
def start_service(name)
|
def start_service(name)
|
||||||
@ -60,53 +58,24 @@ RSpec.shared_context 'when outputting messages' do
|
|||||||
"DROP TABLE #{jdbc_test_table}"
|
"DROP TABLE #{jdbc_test_table}"
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:jdbc_statement_fields) do
|
|
||||||
[
|
|
||||||
{db_field: "created_at", db_type: "datetime", db_value: '?', event_field: '@timestamp'},
|
|
||||||
{db_field: "message", db_type: "varchar(512)", db_value: '?', event_field: 'message'},
|
|
||||||
{db_field: "message_sprintf", db_type: "varchar(512)", db_value: '?', event_field: 'sprintf-%{message}'},
|
|
||||||
{db_field: "static_int", db_type: "int", db_value: '?', event_field: 'int'},
|
|
||||||
{db_field: "static_bigint", db_type: "bigint", db_value: '?', event_field: 'bigint'},
|
|
||||||
{db_field: "static_float", db_type: "float", db_value: '?', event_field: 'float'},
|
|
||||||
{db_field: "static_bool", db_type: "boolean", db_value: '?', event_field: 'bool'},
|
|
||||||
{db_field: "static_bigdec", db_type: "decimal", db_value: '?', event_field: 'bigdec'}
|
|
||||||
]
|
|
||||||
end
|
|
||||||
|
|
||||||
let(:jdbc_create_table) do
|
let(:jdbc_create_table) do
|
||||||
fields = jdbc_statement_fields.collect { |entry| "#{entry[:db_field]} #{entry[:db_type]} not null" }.join(", ")
|
"CREATE table #{jdbc_test_table} (created_at datetime not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit bit not null, static_bigint bigint not null, static_float float not null)"
|
||||||
|
|
||||||
"CREATE table #{jdbc_test_table} (#{fields})"
|
|
||||||
end
|
|
||||||
|
|
||||||
let(:jdbc_drop_table) do
|
|
||||||
"DROP table #{jdbc_test_table}"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:jdbc_statement) do
|
let(:jdbc_statement) do
|
||||||
fields = jdbc_statement_fields.collect { |entry| "#{entry[:db_field]}" }.join(", ")
|
["insert into #{jdbc_test_table} (created_at, message, message_sprintf, static_int, static_bit, static_bigint, static_float) values(?, ?, ?, ?, ?, ?, ?)", '@timestamp', 'message', 'sprintf-%{message}', 1, true, 4000881632477184, 12.1]
|
||||||
values = jdbc_statement_fields.collect { |entry| "#{entry[:db_value]}" }.join(", ")
|
|
||||||
statement = jdbc_statement_fields.collect { |entry| entry[:event_field] }
|
|
||||||
|
|
||||||
statement.insert(0, "insert into #{jdbc_test_table} (#{fields}) values(#{values})")
|
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:systemd_database_service) do
|
let(:systemd_database_service) do
|
||||||
nil
|
nil
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:event) do
|
let(:event_fields) do
|
||||||
# TODO: Auto generate fields from jdbc_statement_fields
|
{ message: "test-message #{SecureRandom.uuid}" }
|
||||||
LogStash::Event.new({
|
|
||||||
message: "test-message #{SecureRandom.uuid}",
|
|
||||||
float: 12.1,
|
|
||||||
bigint: 4000881632477184,
|
|
||||||
bool: true,
|
|
||||||
int: 1,
|
|
||||||
bigdec: BigDecimal.new("123.123")
|
|
||||||
})
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
let(:event) { LogStash::Event.new(event_fields) }
|
||||||
|
|
||||||
let(:plugin) do
|
let(:plugin) do
|
||||||
# Setup logger
|
# Setup logger
|
||||||
allow(LogStash::Outputs::Jdbc).to receive(:logger).and_return(logger)
|
allow(LogStash::Outputs::Jdbc).to receive(:logger).and_return(logger)
|
||||||
@ -124,12 +93,8 @@ RSpec.shared_context 'when outputting messages' do
|
|||||||
output = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
|
output = LogStash::Plugin.lookup('output', 'jdbc').new(jdbc_settings)
|
||||||
output.register
|
output.register
|
||||||
|
|
||||||
output
|
|
||||||
end
|
|
||||||
|
|
||||||
before :each do
|
|
||||||
# Setup table
|
# Setup table
|
||||||
c = plugin.instance_variable_get(:@pool).getConnection
|
c = output.instance_variable_get(:@pool).getConnection
|
||||||
|
|
||||||
# Derby doesn't support IF EXISTS.
|
# Derby doesn't support IF EXISTS.
|
||||||
# Seems like the quickest solution. Bleurgh.
|
# Seems like the quickest solution. Bleurgh.
|
||||||
@ -146,16 +111,8 @@ RSpec.shared_context 'when outputting messages' do
|
|||||||
stmt.close
|
stmt.close
|
||||||
c.close
|
c.close
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
# Delete table after each
|
output
|
||||||
after :each do
|
|
||||||
c = plugin.instance_variable_get(:@pool).getConnection
|
|
||||||
|
|
||||||
stmt = c.createStatement
|
|
||||||
stmt.executeUpdate(jdbc_drop_table)
|
|
||||||
stmt.close
|
|
||||||
c.close
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'should save a event' do
|
it 'should save a event' do
|
||||||
@ -163,9 +120,6 @@ RSpec.shared_context 'when outputting messages' do
|
|||||||
|
|
||||||
# Verify the number of items in the output table
|
# Verify the number of items in the output table
|
||||||
c = plugin.instance_variable_get(:@pool).getConnection
|
c = plugin.instance_variable_get(:@pool).getConnection
|
||||||
|
|
||||||
# TODO replace this simple count with a check of the actual contents
|
|
||||||
|
|
||||||
stmt = c.prepareStatement("select count(*) as total from #{jdbc_test_table} where message = ?")
|
stmt = c.prepareStatement("select count(*) as total from #{jdbc_test_table} where message = ?")
|
||||||
stmt.setString(1, event.get('message'))
|
stmt.setString(1, event.get('message'))
|
||||||
rs = stmt.executeQuery
|
rs = stmt.executeQuery
|
||||||
@ -189,7 +143,7 @@ RSpec.shared_context 'when outputting messages' do
|
|||||||
end
|
end
|
||||||
|
|
||||||
it 'it should retry after a connection loss, and log a warning' do
|
it 'it should retry after a connection loss, and log a warning' do
|
||||||
skip "does not run as a service, or known issue with test" if systemd_database_service.nil?
|
skip "does not run as a service" if systemd_database_service.nil?
|
||||||
|
|
||||||
p = plugin
|
p = plugin
|
||||||
|
|
||||||
|
@ -2,23 +2,15 @@ require_relative '../jdbc_spec_helper'
|
|||||||
|
|
||||||
describe 'logstash-output-jdbc: derby', if: ENV['JDBC_DERBY_JAR'] do
|
describe 'logstash-output-jdbc: derby', if: ENV['JDBC_DERBY_JAR'] do
|
||||||
include_context 'rspec setup'
|
include_context 'rspec setup'
|
||||||
|
include_context 'when initializing'
|
||||||
include_context 'when outputting messages'
|
include_context 'when outputting messages'
|
||||||
|
|
||||||
let(:jdbc_jar_env) do
|
let(:jdbc_jar_env) do
|
||||||
'JDBC_DERBY_JAR'
|
'JDBC_DERBY_JAR'
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:jdbc_statement_fields) do
|
let(:jdbc_create_table) do
|
||||||
[
|
"CREATE table #{jdbc_test_table} (created_at timestamp not null, message varchar(512) not null, message_sprintf varchar(512) not null, static_int int not null, static_bit boolean not null, static_bigint bigint not null, static_float float not null)"
|
||||||
{db_field: "created_at", db_type: "timestamp", db_value: 'CAST(? as timestamp)', event_field: '@timestamp'},
|
|
||||||
{db_field: "message", db_type: "varchar(512)", db_value: '?', event_field: 'message'},
|
|
||||||
{db_field: "message_sprintf", db_type: "varchar(512)", db_value: '?', event_field: 'sprintf-%{message}'},
|
|
||||||
{db_field: "static_int", db_type: "int", db_value: '?', event_field: 'int'},
|
|
||||||
{db_field: "static_bigint", db_type: "bigint", db_value: '?', event_field: 'bigint'},
|
|
||||||
{db_field: "static_float", db_type: "float", db_value: '?', event_field: 'float'},
|
|
||||||
{db_field: "static_bool", db_type: "boolean", db_value: '?', event_field: 'bool'},
|
|
||||||
{db_field: "static_bigdec", db_type: "decimal", db_value: '?', event_field: 'bigdec'}
|
|
||||||
]
|
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:jdbc_settings) do
|
let(:jdbc_settings) do
|
||||||
|
@ -2,6 +2,7 @@ require_relative '../jdbc_spec_helper'
|
|||||||
|
|
||||||
describe 'logstash-output-jdbc: mysql', if: ENV['JDBC_MYSQL_JAR'] do
|
describe 'logstash-output-jdbc: mysql', if: ENV['JDBC_MYSQL_JAR'] do
|
||||||
include_context 'rspec setup'
|
include_context 'rspec setup'
|
||||||
|
include_context 'when initializing'
|
||||||
include_context 'when outputting messages'
|
include_context 'when outputting messages'
|
||||||
|
|
||||||
let(:jdbc_jar_env) do
|
let(:jdbc_jar_env) do
|
||||||
@ -15,7 +16,7 @@ describe 'logstash-output-jdbc: mysql', if: ENV['JDBC_MYSQL_JAR'] do
|
|||||||
let(:jdbc_settings) do
|
let(:jdbc_settings) do
|
||||||
{
|
{
|
||||||
'driver_class' => 'com.mysql.jdbc.Driver',
|
'driver_class' => 'com.mysql.jdbc.Driver',
|
||||||
'connection_string' => 'jdbc:mysql://localhost/logstash?user=logstash&password=logstash',
|
'connection_string' => 'jdbc:mysql://localhost/logstash_output_jdbc_test?user=root',
|
||||||
'driver_jar_path' => ENV[jdbc_jar_env],
|
'driver_jar_path' => ENV[jdbc_jar_env],
|
||||||
'statement' => jdbc_statement,
|
'statement' => jdbc_statement,
|
||||||
'max_flush_exceptions' => 1
|
'max_flush_exceptions' => 1
|
||||||
|
@ -1,41 +0,0 @@
|
|||||||
require_relative '../jdbc_spec_helper'
|
|
||||||
|
|
||||||
describe 'logstash-output-jdbc: postgres', if: ENV['JDBC_POSTGRES_JAR'] do
|
|
||||||
include_context 'rspec setup'
|
|
||||||
include_context 'when outputting messages'
|
|
||||||
|
|
||||||
let(:jdbc_jar_env) do
|
|
||||||
'JDBC_POSTGRES_JAR'
|
|
||||||
end
|
|
||||||
|
|
||||||
# TODO: Postgres doesnt kill connections fast enough for the test to pass
|
|
||||||
# Investigate options.
|
|
||||||
|
|
||||||
#let(:systemd_database_service) do
|
|
||||||
# 'postgresql'
|
|
||||||
#end
|
|
||||||
|
|
||||||
let(:jdbc_statement_fields) do
|
|
||||||
[
|
|
||||||
{db_field: "created_at", db_type: "timestamp", db_value: 'CAST(? as timestamp)', event_field: '@timestamp'},
|
|
||||||
{db_field: "message", db_type: "varchar(512)", db_value: '?', event_field: 'message'},
|
|
||||||
{db_field: "message_sprintf", db_type: "varchar(512)", db_value: '?', event_field: 'sprintf-%{message}'},
|
|
||||||
{db_field: "static_int", db_type: "int", db_value: '?', event_field: 'int'},
|
|
||||||
{db_field: "static_bigint", db_type: "bigint", db_value: '?', event_field: 'bigint'},
|
|
||||||
{db_field: "static_float", db_type: "float", db_value: '?', event_field: 'float'},
|
|
||||||
{db_field: "static_bool", db_type: "boolean", db_value: '?', event_field: 'bool'},
|
|
||||||
{db_field: "static_bigdec", db_type: "decimal", db_value: '?', event_field: 'bigdec'}
|
|
||||||
|
|
||||||
]
|
|
||||||
end
|
|
||||||
|
|
||||||
let(:jdbc_settings) do
|
|
||||||
{
|
|
||||||
'driver_class' => 'org.postgresql.Driver',
|
|
||||||
'connection_string' => 'jdbc:postgresql://localhost/logstash?user=logstash&password=logstash',
|
|
||||||
'driver_jar_path' => ENV[jdbc_jar_env],
|
|
||||||
'statement' => jdbc_statement,
|
|
||||||
'max_flush_exceptions' => 1
|
|
||||||
}
|
|
||||||
end
|
|
||||||
end
|
|
@ -8,6 +8,7 @@ describe 'logstash-output-jdbc: sqlite', if: ENV['JDBC_SQLITE_JAR'] do
|
|||||||
end
|
end
|
||||||
|
|
||||||
include_context 'rspec setup'
|
include_context 'rspec setup'
|
||||||
|
include_context 'when initializing'
|
||||||
include_context 'when outputting messages'
|
include_context 'when outputting messages'
|
||||||
|
|
||||||
let(:jdbc_jar_env) do
|
let(:jdbc_jar_env) do
|
||||||
|
Loading…
x
Reference in New Issue
Block a user