How to aggregate logs with no common data with Logstash and rsyslog

How to aggregate logs with no common data with Logstash and rsyslog

Have this example logfile:


aaaa ffff ggg
1111 6666 777
xxxx pppp qqq

Any of these lines have any field in common. So, we can not correlate them, having
in consideration the aggregate plugin in Logstash require a common field to use as task_id.

So, my take was to use rsyslog to forward these logs to a centralized location for ingesting later.

The basic idea is this: rsyslog, being syslog, adds some fields to each line: a timestamp, the host from
where the logs originate and the program generating the messages.

First, the forwarding: have rsyslog read the original logfile and send it to a remote syslog server for storage:


module(load="imfile" PollingInterval="10")
input(type="imfile"
File="{{ forwarder_log_file }}"
Tag="{{ forwarder_log_tag }}"
Severity="notice"
Facility="local7")
:syslogtag, isequal, "{{ forwarder_log_tag }}" {{ forwarder_syslog_logfile }}
& stop

Then, on the receiving side, enable the rsyslog tcp module for storing the logs locally:


module(load="imudp")
input(type="imudp" port={{ receiver_syslog_port }})
module(load="imtcp")
input(type="imtcp" port={{ receiver_syslog_port }})

Next, configure Filebeat to read the stored logs and send them to Logstash


filebeat.inputs:
- type: filestream
id: {{ forwarder_log_tag }}
enabled: true
paths:
- {{ forwarder_syslog_logfile }}
filebeat.config.modules:
path: /etc/filebeat/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
output.logstash:
hosts: ["{{ processor_logstash_address }}:{{ processor_logstash_port }}"]
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

After receiving the logs, rsyslog stores them in a logfile in /var/log

The result is this:


May 17 05:15:47 fear test-log aaaa ffff ggg
May 17 05:15:55 fear test-log 1111 6666 777
May 17 05:16:04 fear test-log xxxx pppp qqq

Note the timestamp, hostname and program name added to the beggining of each line.
Now you have a common field, the program name (test-log).

You will use the program name as task_id for the aggregation step in Logstash.

Now, the pipeline in Logstash.

This pipeline do these things:
1. Identifies each line by looking for a string. This assumes each line has a unique string.
2. For each line, adds a tag, identifying each line, in order.
3. Adds a name to each field.
4. Initializes a map, adding each field to it.
5. After reading the last line, creates new, distinct fields, using as values the stored contents on the aforementioned map. Your data are on these new fields

For this to work, remember to configure pipeline.workers to 1 in logstash.yml. You are required to do this.

This is the pipeline:


input {
beats {
port => {{ processor_beats_port }}
}
}
filter {
if "aaaa" in [message] {
mutate {
add_tag => ["start"]
}
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:eventdate} %{SYSLOGHOST:eventhost} %{SYSLOGPROG:eventprog} %{GREEDYDATA:field1} %{GREEDYDATA:field2} %{GREEDYDATA:field3}"
}
}
aggregate {
task_id => "%{eventprog}"
code => "
map['eventdata'] = []
map['eventdata'] << event.get('field1')
map['eventdata'] << event.get('field2')
map['eventdata'] << event.get('field3')
"
map_action => "create"
}
}
elseif "1111" in [message] {
mutate {
add_tag => ["middle"]
}
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:middledate} %{SYSLOGHOST:middlehost} %{SYSLOGPROG:eventprog} %{GREEDYDATA:field4} %{GREEDYDATA:field5} %{GREEDYDATA:field6}"
}
}
aggregate {
task_id => "%{eventprog}"
code => "
map['eventdata'] << event.get('field4')
map['eventdata'] << event.get('field5')
map['eventdata'] << event.get('field6')
"
map_action => "update"
}
}
elseif "xxxx" in [message] {
mutate {
add_tag => ["finish"]
}
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:enddate} %{SYSLOGHOST:endhost} %{SYSLOGPROG:eventprog} %{GREEDYDATA:field7} %{GREEDYDATA:field8} %{GREEDYDATA:field9}"
}
}
aggregate {
task_id => "%{eventprog}"
code => "
map['eventdata'] << event.get('field7')
map['eventdata'] << event.get('field8')
map['eventdata'] << event.get('field9')
event.set('eventdata', map['eventdata'])
event.set('eventfield1', map['eventdata'][0])
event.set('eventfield2', map['eventdata'][1])
event.set('eventfield3', map['eventdata'][2])
event.set('eventfield4', map['eventdata'][3])
event.set('eventfield5', map['eventdata'][4])
event.set('eventfield6', map['eventdata'][5])
event.set('eventfield7', map['eventdata'][6])
event.set('eventfield8', map['eventdata'][7])
event.set('eventfield9', map['eventdata'][8])
"
map_action => "update"
end_of_task => true
}
}
}
output {
elasticsearch {
hosts => ["https://{{ search_elasticsearch_address }}:{{ search_elasticsearch_port }}"]
index => "{{ forwarder_log_tag }}-%{+YYYY.MM.dd}"
ssl => true
cacert => '{{ processor_logstash_ca_certificate }}'
user => '{{ search_elasticsearch_user }}'
password => '{{ search_elasticsearch_password }}'
ssl_verification_mode => none
}
}

After processing, Logstash generate these:


{
"field2" => "ffff",
"process" => {
"name" => "test-log"
},
"field1" => "aaaa",
"host" => {
"hostname" => "fear.hell"
},
"tags" => [
[0] "start"
],
"@timestamp" => 2024-05-17T09:30:52.360920825Z,
"@version" => "1",
"message" => "May 17 05:15:47 fear test-log aaaa ffff ggg",
"field3" => "ggg",
"eventhost" => "fear",
"event" => {
"original" => "May 17 05:15:47 fear test-log aaaa ffff ggg"
},
"eventprog" => "test-log",
"eventdate" => "May 17 05:15:47"
}
{
"process" => {
"name" => "test-log"
},
"host" => {
"hostname" => "fear.hell"
},
"tags" => [
[0] "middle"
],
"middlehost" => "fear",
"@timestamp" => 2024-05-17T09:30:52.363651601Z,
"@version" => "1",
"field4" => "1111",
"message" => "May 17 05:15:55 fear test-log 1111 6666 777",
"field5" => "6666",
"middledate" => "May 17 05:15:55",
"field6" => "777",
"event" => {
"original" => "May 17 05:15:55 fear test-log 1111 6666 777"
},
"eventprog" => "test-log"
}
{
"eventdata" => [
[0] "aaaa",
[1] "ffff",
[2] "ggg",
[3] "1111",
[4] "6666",
[5] "777",
[6] "xxxx",
[7] "pppp",
[8] "qqq"
],
"host" => {
"hostname" => "fear.hell"
},
"tags" => [
[0] "finish"
],
"endhost" => "fear",
"field7" => "xxxx",
"enddate" => "May 17 05:16:04",
"@version" => "1",
"field8" => "pppp",
"message" => "May 17 05:16:04 fear test-log xxxx pppp qqq",
"eventfield2" => "ffff",
"field9" => "qqq",
"eventfield6" => "777",
"process" => {
"name" => "test-log"
},
"@timestamp" => 2024-05-17T09:30:52.364583258Z,
"eventfield4" => "1111",
"eventfield5" => "6666",
"eventfield8" => "pppp",
"eventfield3" => "ggg",
"eventfield7" => "xxxx",
"event" => {
"original" => "May 17 05:16:04 fear test-log xxxx pppp qqq"
},
"eventprog" => "test-log",
"eventfield1" => "aaaa",
"eventfield9" => "qqq"
}

With this, you have your data on separate fields, eventfieldX.

Now, you can send them to Elasticsearch for further analysis.

I have an Ansible playbook i used to deploy all the components: Deploy ELK + rsyslog