Introduction

    Logstash is the one core parts of the now reknown ELK stack. A lot of people in the IT industry now use this stack to centralize its logs, analyze them and create dashboards using all this data. Most of us do so by sending logs into a Logstash instance, whether it’s using Filebeat for files, Logspout for containers or anything else. We then extract some fields from these logs using a grok pattern, and tunnel these sets of fields into elasticsearch events.

    But what happens if your “events” are spread along several lines of logs, and you want to have all of this event’s data into a single elasticsearch event ?

    One example of this need is java/python stack traces. They belong together and lines are worthless on their own. Filebeat has a “multiline” feature where you define a start pattern, and it will aggregate all lignes between 2 occurrences of this pattern with the first one.
    But what’s interesting us today are postfix logs, they’re not that simple.

Postfix Logs

    Postfix is a very complex piece of software, capable of doing a lot of things in a lot of different ways. In my opinion, it is a very early example of a microservice architecture of some sort. It has several components, who are either daemons, scheduled tasks or on-the-fly agents handling a single task each time.
    But let’s assume we have a pretty common setup, where postfix is simply configured to listen on the loopback interface (lo : 127.0.0.1) on port 25, and forward any correctly formatted email to a centralized mail relay.

Nov  8 11:02:12 myserver postfix/smtpd[31097]: connect from localhost[127.0.0.1]
Nov  8 11:02:12 myserver postfix/smtpd[31097]: 3A9C96093D: client=localhost[127.0.0.1]
Nov  8 11:02:12 myserver postfix/cleanup[31124]: 3A9C96093D: message-id=<20201108100212.3A9C96093D@myserver.localdomain>
Nov  8 11:02:12 myserver postfix/smtpd[31097]: disconnect from localhost[127.0.0.1]
Nov  8 11:02:12 myserver postfix/qmgr[2638]: 3A9C96093D: from=<sender@domain.com>, size=5493, nrcpt=2 (queue active)
Nov  8 11:02:42 myserver postfix/smtp[31126]: connect to myrelay.domain.com[198.51.100.12]:587: Connection timed out
Nov  8 11:02:42 myserver postfix/smtp[31126]: 3A9C96093D: to=<recipient1@domain.com>, relay=myrelay.domain.com[198.51.100.12]:587, delay=31, delays=0.01/0/30/0.24, dsn=2.0.0, status=sent (250 Ok)
Nov  8 11:02:42 myserver postfix/smtp[31126]: 3A9C96093D: to=<recipient2@domain.com>, relay=myrelay.domain.com[198.51.100.12]:587, delay=31, delays=0.01/0/30/0.24, dsn=2.0.0, status=sent (250 Ok)
Nov  8 11:02:42 myserver postfix/qmgr[2638]: 3A9C96093D: removed

    So we can see that, first, “smtpd” daemon received a connection from localhost. This connection created an email, that was put in queue. Then, “qmgr” started to read the queue and parse the email. Since destination was the default mail relay which is an smtp server, a “smtp” proces was spawned, which connected to the relay, and sent 1 email per recpient. The relay accepted them (“250 Ok” & “status=sent”).
    Once all recipients where handled, “qmgr” has been able to delete the message from its queue. Done.

    But indexing all these lines in ELK makes no sense. This whole set of lines represents one single event. How do we manage this ?

Logstash aggregate filter plugin

    This plugin is what we need.
    It will aggregate all lines that contains a common value for a field. We can then extract fields and data from each of them, and save all of it in a single event. The documentation is at the end of this article.

First we will match the log against a series of GROK patterns so that each of the lines we want is parsed :

grok {
    patterns_dir => ["/etc/logstash/conf.d/patterns/"]
    match => [
      "message",
      "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOHST:hostname} (?<program>postfix/smtpd)\[\d+\]: %{POSTFIX_MSG_ID:taskid}: client=(?<client_hostname>[^\]]+)\[%{IP:client_ip}\]",
      "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOHST:hostname} (?<program>postfix/qmgr)\[\d+\]: %{POSTFIX_MSG_ID:taskid}: from=<(?<from_mail>[^>]+)>, size=%{INT:size}, nrcpt=%{INT:nrcpt}",
      "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOHST:hostname} (?<program>postfix/smtp)\[\d+\]: %{POSTFIX_MSG_ID:taskid}: to=<(?<to_mail>[^>]+)>,.* relay=%{NOTSPACE:relay},.* status=%{NOTSPACE:status}.*",
      "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOHST:hostname} (?<program>postfix/qmgr)\[\d+\]: %{POSTFIX_MSG_ID:taskid}: removed"
    ]
  }

Some of these patterns being non-standard :

SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
POSTFIX_MSG_ID [A-F0-9]{10}

This way we have everything we want:

  • The sender
  • The recipients (and how many of them)
  • The status of the emission

Now the interesting part. Let’s make it one single event (code explained just below)

  if ![taskid] {
    drop {}
  } else if [program] == "postfix/smtpd" {
    aggregate {
      task_id => "%{taskid}"
      code => "
        map['client_hostname'] ||= event.get('client_hostname')
        map['client_ip'] ||= event.get('client_ip')
        map['program'] ||= event.get('program')
        event.cancel()
      "
      map_action => create
    }
  } else if [program] == "postfix/qmgr" and [from_mail] {
    aggregate {
      task_id => "%{taskid}"
      code => "
        map['from_mail'] ||= event.get('from_mail')
        map['size'] ||= event.get('size')
        map['nrcpt'] ||= event.get('nrcpt')
        event.cancel()
      "
      map_action => update
    }
  } else if [program] == "postfix/smtp" {
    aggregate {
      task_id => "%{taskid}"
      code => "
        map['to_mail'] ||= [];
        map['to_mail'] << event.get('to_mail');
        map['relay'] ||= [];
        map['relay'] << event.get('relay');
        map['status'] ||= [];
        map['status'] << event.get('status')
        event.cancel()
      "
      map_action => update
    }
  } else if [program] == "postfix/qmgr" and ![from_mail] {
    aggregate {
      task_id => "%{taskid}"
      map_action => update
      code => "
        event.set('client_hostname',map['client_hostname']);
        event.set('client_ip',map['client_ip']);
        event.set('from_mail',map['from_mail']);
        event.set('size',map['size']);
        event.set('nrcpt',map['nrcpt']);
        event.set('to_mail',map['to_mail']);
        event.set('relay',map['relay']);
        event.set('status',map['status']);
        event.set('program',map['program']);
      "
      end_of_task => true
      timeout_task_id_field => "taskid"
      timeout_timestamp_field  => "@timestamp"
      timeout => 600
      push_map_as_event_on_timeout => true
      timeout_tags => ['_aggregatetimeout']
    }
  }
  mutate {
    convert => [
      "size", "integer",
      "nrcpt", "integer"
    ]
    remove_field => ["host","message","beat.version","beat.hostname","prospector.type","input.type","offset","source","timestamp","type"]

    This is a lot. What happens here ?
    First, we drop any event that did not parse, and thus, does not have a “taskid”. We don’t need those.
    Then, we’re looking for the first line of log, which is the “smtpd” line. In this case, we create a “map”, which is a persistent set of data in memory, associated with this value of “taskid”. It will last longer than the current event, but no more than a timeout, defined in the last “aggregate” block.
    After that, all other lines that match will add more data in the map. (The “from”, “to” and their other attributes). Note that “event.cancel()” means the content of the line isn’t sent to the output but dropped. only the data in the map remains.
    Eventually, we’ll see the “removed” line, which means the whole workflow is finished. Now we’ll add all the data stored in the map in the current event, and sent it to the output.

You can see the result in Kibana : Kibana event

Logstash configuration

Be careful, there’s a very important parameter to change in Logstash’s configuration “logstash.yml” for this to work :

pipeline.workers: 1

Otherwise, Logstash will have several worker threads which do not share maps, so that would prevent your aggregation from working correctly, as there would be no guarantee a single worker would receive all lines from a single message.

Going Further

    I’ve removed from the above code the ability to handle local emails, that it, not sent through localhost:25 but through the unix socket, but if you need that, just take a look at your log. The log lines are pretty similar to “smtpd”.
    I also think that some of my configuration may be redundant, because it comes from hours of fiddling with them until I found a typo in the first lines of it.

    What you do with this data now depends on your needs. In our case, we wanted to see which VMs were sending the most mails, to whom, and through which relay. This setup fufilled this needs perfectly.

References