Add support for converting timestamps (#17)
All 3 targets support parsing `UNIX` and `UNIX_MS`.

ecs-mapper lets users specify other shorthands or date formats as well, but does not validate them. They are passed as-is to the target pipelines.

This is not a compatibility layer. However, it can be useful when users already know which kind of pipeline they want to use, and exactly which time format they need in the resulting pipeline.
Tony Meehan authored Apr 27, 2020
1 parent 813c8ae commit 9ffcfae
Showing 16 changed files with 494 additions and 70 deletions.
50 changes: 31 additions & 19 deletions README.md
@@ -2,23 +2,23 @@

## Warning

This tool is currently experimental and not yet officially supported.

Our goal is to solicit early feedback on making the process of converting data to ECS even easier.
So please feel welcome to open issues and PRs, and we will address them on a best effort basis.
This tool is currently experimental and not yet supported.
Our goal is to solicit feedback on the process of converting data to ECS, to
make it even easier in the stack. Please feel welcome to open issues and
PRs, and we will address them on a best-effort basis.

## Synopsis

This tool turns a field mapping CSV to roughly equivalent pipelines for:
This tool turns a field mapping from a CSV to an equivalent pipeline for:

- Beats
- Elasticsearch
- Logstash
- [Beats](https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html)
- [Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html)
- [Logstash](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html)

The goal of this tool is to generate starter pipelines of each flavor, to
help you get started quickly in mapping your event sources to ECS.
This tool generates starter pipelines for each solution above to help you
get started quickly in mapping new data sources to ECS.

A "mapping CSV" is what you get, when you start planning how to map an event
A mapping CSV is what you get when you start planning how to map a new data
source to ECS in a spreadsheet.

Colleagues may collaborate on a spreadsheet that looks like this:
@@ -31,37 +31,43 @@ Colleagues may collaborate on a spreadsheet that looks like this:
| ... | | |

You can export your spreadsheet to CSV, run it through the ECS mapper,
and get your starter pipelines generated.
and generate your starter pipelines.

Note that this tool generates starter pipelines. They only do the rename/copy
operations and some field format adjustments. It's up to you to integrate them
Note that this tool generates starter pipelines. They only do field rename and copy
operations as well as some field format adjustments. It's up to you to integrate them
in a complete pipeline that ingests and outputs the data however you need.

Scroll down to the [Examples](#examples) section below, to get right to a
Scroll down to the [Examples](#examples) section below to get right to a
concrete example you can play with.

## Maturity

This code is a proof of concept and is not officially supported.
The pipelines generated by this tool are not meant to be complete, nor production-ready.
The pipelines generated by this tool are likely not complete and probably need more
testing and validation before they are ready for production.
They are simply meant to give you a head start in mapping various sources to ECS.

## CSV Format

Here's more details on the CSV format supported by this tool. Since mapping
spreadsheets are primarily used by humans, it's totally fine to have as many columns
Here are more details on the CSV format supported by this tool. Since mapping
spreadsheets are used by humans, it's totally fine to have as many columns
as you need in your spreadsheets/CSV. Only the following columns will be considered:

| column name | required | allowed values | notes |
|-------------|----------|----------------|-------|
| source\_field | required | | A dotted Elasticsearch field name. Dots represent JSON nesting. Lines with empty "source\_field" are skipped. |
| destination\_field | required | | A dotted Elasticsearch field name. Dots represent JSON nesting. The column is required, but a value can be left empty when there's no copy action (just a format conversion). |
| format\_action | optional | to\_float, to\_integer, to\_string, to\_boolean, to\_array, uppercase, lowercase, (empty) | Simple conversion to apply to the field value. |
| format\_action | optional | to\_float, to\_integer, to\_string, to\_boolean, to\_array, parse\_timestamp, uppercase, lowercase, (empty) | Simple conversion to apply to the field value. |
| timestamp\_format | optional | UNIX, UNIX\_MS, other date formats, (empty) | Only UNIX and UNIX\_MS are supported across all three tools. You may also specify other formats, like ISO8601, TAI64N, or a Java time pattern, but we will not validate whether the format is supported by the target tool. |
| copy\_action | optional | rename, copy, (empty) | What to do with the field. If left empty, default action is based on the `--copy-action` flag. |

You can start from this
[spreadsheet template](https://docs.google.com/spreadsheets/d/1m5JiOTeZtUueW3VOVqS8bFYqNGEEyp0jAsgO12NFkNM). Make a copy of it in your Google Docs account, or download it as an Excel file.

When the destination field is `@timestamp`, we always enforce an explicit `format_action` of `parse_timestamp` (defaulting to `UNIX_MS`) to avoid conversion problems downstream. If no `timestamp_format` is provided, `UNIX_MS` is used. Please note that the timestamp layouts used by the [Filebeat processor for converting timestamps](https://www.elastic.co/guide/en/beats/filebeat/current/processor-timestamp.html) are different from the formats supported by the date processors in Logstash and Elasticsearch Ingest Node.
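As a concrete illustration, here are the timestamp rows from the `example/mapping.csv` updated in this commit: the first row has no `format_action` but targets `@timestamp`, so `parse_timestamp` with `UNIX_MS` is enforced; the second names `UNIX` explicitly.

```csv
source_field,copy_action,format_action,timestamp_format,destination_field,Notes
some_other_timestamp,,,,@timestamp,Convert this timestamp to default UNIX_MS
some_new_timestamp,,parse_timestamp,UNIX,destination_timestamp,Convert this timestamp to UNIX format
```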



## Usage and Dependencies

This is a simple Ruby program with no external dependencies, other than development
@@ -145,3 +151,9 @@ in [example/README.md](example/).

* At this time, the Beats pipelines don't perform "to\_array", "uppercase", or
  "lowercase" transformations. They could be implemented via the "script" processor
  (see the sketch after this list).
* Only UNIX and UNIX\_MS timestamp formats are supported across Beats, Elasticsearch,
  and Logstash. For other timestamp formats, please modify the generated starter
  pipeline by hand and add the appropriate date processor. Refer to the documentation
  for [Beats](https://www.elastic.co/guide/en/beats/filebeat/current/processor-timestamp.html), [Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/reference/master/date-processor.html), and [Logstash](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match).
* This tool does not currently support additional processors, like setting static
field values or dropping events based on a condition.
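To give a rough idea of the "script" route mentioned above, here is a hand-written, untested sketch of an "uppercase" transformation as a Filebeat `script` processor. This is not something the tool generates, and `log.level` is just a stand-in field name:

```yaml
processors:
  - script:
      lang: javascript
      source: |
        // Upper-case log.level in place, if the field is present.
        function process(event) {
          var v = event.Get("log.level");
          if (v !== null) {
            event.Put("log.level", v.toUpperCase());
          }
        }
```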
4 changes: 2 additions & 2 deletions ecs-mapper
@@ -37,8 +37,8 @@ def main(options)
beats_pl = generate_beats_pipeline(mapping)
puts output_beats_pipeline(beats_pl, options[:output])

mutations, array_fields = generate_logstash_pipeline(mapping)
puts output_logstash_pipeline(mutations, array_fields, options[:output])
mutations, dates, array_fields = generate_logstash_pipeline(mapping)
puts output_logstash_pipeline(mutations, dates, array_fields, options[:output])
end

def create_output_directory!(dir)
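The Logstash generator itself is not expanded in this view. Judging from the `example/logstash.conf` diff below, each `dates` entry renders as a `date` filter; here is a hypothetical Ruby sketch of that rendering step, with the entry shape assumed rather than taken from the real code:

```ruby
# Hypothetical sketch only: the real implementation lives in
# lib/logstash_pipeline_generator.rb, which is not expanded in this view.
def render_date_filter(field:, format:, target:)
  # Emit a Logstash date filter matching the shape seen in example/logstash.conf.
  <<~FILTER
    date {
      match => ["[#{field}]", "#{format}"]
      target => "[#{target}]"
    }
  FILTER
end

puts render_date_filter(field: 'some_new_timestamp', format: 'UNIX',
                        target: 'destination_timestamp')
```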
21 changes: 21 additions & 0 deletions example/beats.yml
@@ -1,4 +1,25 @@
processors:
- timestamp:
field: some_timestamp_field
target_field: "@timestamp"
layouts: UNIX_MS
timezone: UTC
ignore_missing: true
ignore_failure: true
- timestamp:
field: some_other_timestamp
target_field: "@timestamp"
layouts: UNIX_MS
timezone: UTC
ignore_missing: true
ignore_failure: true
- timestamp:
field: some_new_timestamp
target_field: destination_timestamp
layouts: UNIX
timezone: UTC
ignore_missing: true
ignore_failure: true
- copy_fields:
fields:
- from: srcip
21 changes: 21 additions & 0 deletions example/beats/filebeat.yml
@@ -16,6 +16,27 @@ output.console:
# Add the generated Beats pipeline below

processors:
- timestamp:
field: some_timestamp_field
target_field: "@timestamp"
layouts: UNIX_MS
timezone: UTC
ignore_missing: true
ignore_failure: true
- timestamp:
field: some_other_timestamp
target_field: "@timestamp"
layouts: UNIX_MS
timezone: UTC
ignore_missing: true
ignore_failure: true
- timestamp:
field: some_new_timestamp
target_field: destination_timestamp
layouts: UNIX
timezone: UTC
ignore_missing: true
ignore_failure: true
- copy_fields:
fields:
- from: srcip
33 changes: 33 additions & 0 deletions example/elasticsearch.json
@@ -20,6 +20,39 @@
"if": "ctx.new_event?.srcip != null"
}
},
{
"date": {
"field": "some_timestamp_field",
"target_field": "@timestamp",
"formats": [
"UNIX_MS"
],
"timezone": "UTC",
"ignore_failure": true
}
},
{
"date": {
"field": "some_other_timestamp",
"target_field": "@timestamp",
"formats": [
"UNIX_MS"
],
"timezone": "UTC",
"ignore_failure": true
}
},
{
"date": {
"field": "some_new_timestamp",
"target_field": "destination_timestamp",
"formats": [
"UNIX"
],
"timezone": "UTC",
"ignore_failure": true
}
},
{
"rename": {
"field": "srcport",
18 changes: 18 additions & 0 deletions example/logstash.conf
@@ -20,6 +20,24 @@ filter {
convert => { '[event][id]' => 'string' }
rename => { '[hostip]' => '[host][ip]' }
}

date {
match => ["[some_timestamp_field]", "UNIX_MS"]
target => "[@timestamp]"
}


date {
match => ["[some_other_timestamp]", "UNIX_MS"]
target => "[@timestamp]"
}


date {
match => ["[some_new_timestamp]", "UNIX"]
target => "[destination_timestamp]"
}

if [host][ip] {
ruby {
code => "event.set('[host][ip]', Array(event.get('[host][ip]')) )"
18 changes: 18 additions & 0 deletions example/logstash/2-ecs-conversion.conf
@@ -20,6 +20,24 @@ filter {
convert => { '[event][id]' => 'string' }
rename => { '[hostip]' => '[host][ip]' }
}

date {
match => ["[some_timestamp_field]", "UNIX_MS"]
target => "[@timestamp]"
}


date {
match => ["[some_other_timestamp]", "UNIX_MS"]
target => "[@timestamp]"
}


date {
match => ["[some_new_timestamp]", "UNIX"]
target => "[destination_timestamp]"
}

if [host][ip] {
ruby {
code => "event.set('[host][ip]', Array(event.get('[host][ip]')) )"
35 changes: 19 additions & 16 deletions example/mapping.csv
@@ -1,16 +1,19 @@
source_field,copy_action,format_action,destination_field,Notes
srcip,,,source.address,Copying srcip to source.address
srcip,,,source.ip,Copying srcip a second time to source.ip as well
new_event.srcip,,,source.ip,This new event type could also populate source.ip
srcport,rename,to_integer,source.port,
destip,,,destination.address,
destport,,to_integer,destination.port,
ts,copy,,timestamp,
action,rename,lowercase,event.action,
duration,rename,to_float,event.duration,
user_agent,rename,,user_agent.original,
log_level,rename,uppercase,log.level,
eventid,rename,to_string,event.id,IDs should be strings!
successful,,to_boolean,,
hostip,rename,to_array,host.ip,
process.args,,to_array,,
source_field,copy_action,format_action,timestamp_format,destination_field,Notes
srcip,,,,source.address,Copying srcip to source.address
srcip,,,,source.ip,Copying srcip a second time to source.ip as well
new_event.srcip,,,,source.ip,This new event type could also populate source.ip
some_timestamp_field,,parse_timestamp,,@timestamp,Convert this timestamp to UNIX_MS format
some_other_timestamp,,,,@timestamp,Convert this timestamp to default UNIX_MS
some_new_timestamp,,parse_timestamp,UNIX,destination_timestamp,Convert this timestamp to UNIX format
srcport,rename,to_integer,,source.port,
destip,,,,destination.address,
destport,,to_integer,,destination.port,
ts,copy,,,timestamp,
action,rename,lowercase,,event.action,
duration,rename,to_float,,event.duration,
user_agent,rename,,,user_agent.original,
log_level,rename,uppercase,,log.level,
eventid,rename,to_string,,event.id,IDs should be strings!
successful,,to_boolean,,,Format source field to boolean type
hostip,rename,to_array,,host.ip,
process.args,,to_array,,,Format source field to an array
25 changes: 14 additions & 11 deletions lib/beats_pipeline_generator.rb
@@ -6,14 +6,16 @@ def generate_beats_pipeline(mapping)
fields_to_copy = []
fields_to_rename = []
fields_to_convert = []
pipeline = []

mapping.each_pair do |_, row|
if same_field_name?(row)
next if row[:format_action].nil?
end

source_field = row[:source_field]

if row[:destination_field]
if row[:destination_field] and not ['parse_timestamp'].include?(row[:format_action])
statement = {
'from' => source_field,
'to' => row[:destination_field],
@@ -42,20 +44,21 @@ def generate_beats_pipeline(mapping)
statement = { 'from' => affected_field, 'type' => type }
fields_to_convert << statement

# elsif ['uppercase', 'lowercase', 'to_array'].include?(row[:format_action])
# processor = {
# 'script' => {
# 'lang' => 'javascript',
# 'source' => ... # herein lies all the fun :-)
# ignore_failure: true,
# }
# }

elsif ['parse_timestamp'].include?(row[:format_action])
pipeline << {
'timestamp' => {
'field' => row[:source_field],
'target_field' => row[:destination_field],
'layouts' => row[:timestamp_format],
'timezone' => "UTC",
'ignore_missing' => true,
'ignore_failure' => true
}
}
end
end
end

pipeline = []
if fields_to_copy.size > 0
pipeline << {
'copy_fields' => { 'fields' => fields_to_copy,
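For reference, a hedged sketch of what feeding a single `parse_timestamp` row through the generator above might look like; the row keys mirror the symbols the code reads, and the outer hash shape follows its `mapping.each_pair` loop:

```ruby
# Sketch under assumptions: row keys and mapping shape are inferred from the
# generator code above, not from a documented API.
mapping = {
  1 => {
    source_field: 'some_new_timestamp',
    destination_field: 'destination_timestamp',
    copy_action: nil,
    format_action: 'parse_timestamp',
    timestamp_format: 'UNIX',
  },
}
pipeline = generate_beats_pipeline(mapping)
# Expect a single `timestamp` processor with layouts UNIX,
# as in example/beats.yml.
```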
14 changes: 13 additions & 1 deletion lib/elasticsearch_pipeline_generator.rb
@@ -11,7 +11,7 @@ def generate_elasticsearch_pipeline(mapping)
source_field = row[:source_field]

# copy/rename
if row[:destination_field]
if row[:destination_field] and not ['parse_timestamp'].include?(row[:format_action])
if 'copy' == row[:copy_action]
processor = {
set: {
@@ -76,7 +76,19 @@ def generate_elasticsearch_pipeline(mapping)
if: field_presence_predicate(affected_field),
}
}

elsif ['parse_timestamp'].include?(row[:format_action])
processor = {
'date' => {
field: row[:source_field],
target_field: row[:destination_field],
formats: [ row[:timestamp_format] ],
timezone: "UTC",
ignore_failure: true
}
}
end

end
pipeline << processor if processor # Skip lower/upper and others not done by convert processor
end