Adding a Custom GeoIP Field to Filebeat and Elasticsearch

As part of my project to create a Kibana dashboard to visualize external threats, I decided I wanted a map view, driven by GeoIP data, of where the blocked IP addresses were coming from. By default, Filebeat installs several dashboards that show what can be done; I used those as inspiration and set out to imitate them.

My Goal

However, it was not at all easy. Here is what I had to do.

General Design

The way I needed the data to flow is shown in the flowchart below.

Log Data Flow

However, there is no Filebeat package distributed as part of pfSense. FreeBSD has one, but installing it would mean adding software to my router outside the pfSense ecosystem, which would be a headache later on. Instead, I ship the logs to an internal CentOS server where Filebeat is installed; Filebeat then reads and parses the firewall log.
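For reference, here is a minimal sketch of the Filebeat side, using the system module and assuming rsyslog on the CentOS box drops the forwarded messages into /var/log/messages (adjust the path to wherever yours land):

# /etc/filebeat/modules.d/system.yml -- enable with: filebeat modules enable system
- module: system
  syslog:
    enabled: true
    # read the file that receives the remote pfSense logs (path is an assumption)
    var.paths: ["/var/log/messages"]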

Turn on Logging of the Default Block Rule in pfSense

First off, I had to enable logging of the traffic the firewall was blocking. By default, pfSense blocks all incoming traffic but does not log it, so I had to turn that on.

pfSense Log Settings

Once that was done, since I already had remote logging set up, my firewall logs flowed to my internal server.
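For completeness, the receiving side just needs rsyslog listening for remote syslog; a minimal sketch of what that looks like on CentOS (UDP port 514 is the common default):

# /etc/rsyslog.conf additions -- load the UDP input module and listen on 514
$ModLoad imudp
$UDPServerRun 514

With the stock rules, the forwarded pfSense messages then land in /var/log/messages along with everything else.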

Reading The Logs and Grok

The log files that were shipped over are in the format:

Jan 1 15:16:34 gateway filterlog: 5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol

It’s all one line, and it has a specific format that I wanted to parse out. By default, filebeat’s Grok parser will parse the message and render the firewall data as:

system.syslog.message: 7,,,1000000105,re0,match,block,in,6,0x50,0x00000,247,ICMPv6,58,8,2607:ae80:2::238,2001:558:6020:10b:44f6:849f:8998:2390,

That does not really help me turn it into structured data.

To produce that output, Grok applies a parsing string that looks like:

%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:system.syslog.hostname} %{DATA:system.syslog.program}(?:\\[%{POSINT:system.syslog.pid}\\])?: %{GREEDYMULTILINE:system.syslog.message}

That string is located, on CentOS, at:

/usr/share/filebeat/module/system/syslog/ingest/pipeline.json

That parsing string, applied to the log line beginning ‘Jan 1…’ above, results in a JSON block that looks like:

{
  "system": {
    "syslog": {
      "hostname": "gateway",
      "program": "filterlog",
      "message": "5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol",
      "timestamp": "Jan 1 15:16:34"
    }
  }
}

You can paste these two examples (the log line and the parsing string) into the Grok Debugger, found in Kibana under Dev Tools. In this case, you also need to add the line:

GREEDYMULTILINE (.|\n)*

In the ‘Custom Patterns’ area, since it’s not a pre-defined pattern.

How Does It Work?

Grok, I’ll admit, is pretty cool. It unites Regular Expressions’ group matching with a convenient output format.

If we break down the parsing string, we see that it has several parts:

%{SYSLOGTIMESTAMP:system.syslog.timestamp} 

Look for a SYSLOGTIMESTAMP (a pre-defined format) followed by a space, capture it, and emit it as a field called timestamp nested in a syslog block, nested in a system block. In this case, ‘Jan 1 15:16:34’.

%{SYSLOGHOST:system.syslog.hostname} 

Look for a SYSLOGHOST (basically a word) followed by a space, capture it, and emit it as a field called hostname nested in a syslog block, nested in a system block. In this case, ‘gateway’.

%{DATA:system.syslog.program}(?:\\[%{POSINT:system.syslog.pid}\\])?:

Look for a DATA (any text), optionally followed by a POSINT in square brackets, then a colon. This captures the program name, plus the PID if there is one, and emits them as above.

%{GREEDYMULTILINE:system.syslog.message}

Finally, capture the rest of the data.
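If you would rather test outside the Grok Debugger, you can exercise the same pattern through Elasticsearch’s ingest simulate API from the same Dev Tools console. This is just a sketch using a throwaway pipeline, but the response should contain the nested system.syslog fields shown above:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:system.syslog.hostname} %{DATA:system.syslog.program}(?:\\[%{POSINT:system.syslog.pid}\\])?: %{GREEDYMULTILINE:system.syslog.message}"
          ],
          "pattern_definitions": { "GREEDYMULTILINE": "(.|\n)*" }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "Jan 1 15:16:34 gateway filterlog: 5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol"
      }
    }
  ]
}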

Field Definition

The file /etc/filebeat/fields.yml contains all the field definitions that are fed to Elasticsearch (as an index template) before the first index is created. If the definitions aren’t loaded first, Elasticsearch will infer the field types when it reads the first document, and you can’t easily change them afterwards.

If you look in fields.yml, you’ll see all the output fields used above. The following command shows what is sent to Elasticsearch:

filebeat export template

What I am pretty sure happens is that when Filebeat does its setup step, it sends over either the cached JSON copy of fields.yml stored in

/usr/share/filebeat/kibana/5/index-pattern/filebeat.json

or generates a new one from fields.yml. No matter what I did to fields.yml, it did not take effect until I did the following (generally following this and this):

  1. Set these two parameters in filebeat.yml:

     setup.template.name: "filebeat"
     setup.template.fields: "fields.yml"

  2. Deleted the existing filebeat index template (and the indexes that were using it) from the Kibana Dev Tools Console:

     DELETE _template/filebeat

  3. Ran this on my filebeat server:

     filebeat setup --template
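After the setup run, you can sanity-check that the template actually landed by pulling it back in the Kibana Dev Tools Console and looking for your fields in the mappings:

GET _template/filebeat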

Adding my GeoIP field

Since Elasticsearch needs to have the GeoIP field mapping in place before the first document is indexed, you need to proceed in a very specific order. If you don’t, you will end up not with a geo_point field, but with two plain numeric sub-fields called ‘lat’ and ‘lon’.

Figuring out your fields

To figure out my fields, I put my firewall’s log entry into the Kibana Grok Debugger and fiddled until I had what I wanted. This log line:

Jan 1 15:16:34 gateway filterlog: 5,,,1000000103,bge0.3,match,block,in,4,0x0,,64,0,0,DF,6,tcp,52,192.168.3.100,192.168.1.152,62078,52804,0,SA,2679530145,2021659250,65535,,mss;nop;wscale;sackOK;eol

ended up matched by a parsing string like:

%{SYSLOGTIMESTAMP:system.syslog.timestamp} gateway filterlog%{GREEDYDATA:system.firewall.sequence},,,%{NUMBER:system.firewall.sequence2},%{BACULA_DEVICE:system.firewall.interface},match,block,in,%{NUMBER:system.firewall.data_1},%{WORD:system.firewall.data_2},,%{WORD:system.firewall.data_3},%{WORD:system.firewall.data_4},%{WORD:system.firewall.data_5},%{WORD:system.firewall.data_6},%{WORD:system.firewall.data_7},%{WORD:system.firewall.protocol},%{WORD:system.firewall.data_8},%{IP:system.firewall.source_ip},%{IP:system.firewall.dest_ip},%{NUMBER:system.firewall.source_port},%{NUMBER:system.firewall.dest_port},%{GREEDYMULTILINE:system.syslog.message}

I wasn’t sure how to skip fields whose contents varied, and I didn’t want to write a complex regex, so I stored them in fields called data_x and kludged much of it together, since all I wanted were the IP and port data. This could definitely be done better. A useful resource was this grok constructor, which helps with the odd built-in patterns; that’s why you see ‘BACULA_DEVICE’ as one of my patterns. Also, after several days of working on this, I didn’t want to mess with what I had working.

I added this to my /usr/share/filebeat/module/system/syslog/ingest/pipeline.json file as the first pattern in the ‘patterns’ array. Then, to enable GeoIP processing, I added another entry in the ‘processors’ section:

{
  "geoip": {
    "field": "system.firewall.source_ip",
    "target_field": "system.firewall.fw-geoip",
    "ignore_failure": true
  }
}

This reads the system.firewall.source_ip field and inserts the geocoding data into system.firewall.fw-geoip, which is an object field with several subfields.
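To make the placement concrete, here is an abridged sketch of how the edited pipeline.json hangs together. The surrounding keys come from the stock 6.5 module (which also has other processors I have omitted here), and the pattern strings are placeholders for the full ones above:

{
  "description": "Pipeline for parsing Syslog messages.",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "<your firewall pattern goes here, first>",
          "<the stock syslog patterns follow>"
        ],
        "pattern_definitions": { "GREEDYMULTILINE": "(.|\n)*" }
      }
    },
    {
      "geoip": {
        "field": "system.firewall.source_ip",
        "target_field": "system.firewall.fw-geoip",
        "ignore_failure": true
      }
    }
  ]
}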

The important thing to know here is that the pipeline.json file is injected into Elasticsearch. You can see it in the Kibana Console with:

GET _ingest/pipeline/filebeat-6.5.4-system-syslog-pipeline

Once that is all in and you have determined your fields, it’s time to edit fields.yml.

Fields.yml

For this, I highly recommend a YAML validator, like http://www.yamllint.com/, since I did this wrong a lot.

I added the following to my fields.yml file. The snippet below starts at the system key entry for reference; my ‘firewall’ block is inserted right before the existing ‘auth’ block. (The leading system/group lines are already in the file.)

- key: system
  title: "System"
  description: >
    Module for parsing system log files.
  short_config: true
  fields:
    - name: system
      type: group
      description: >
        Fields from the system log files.
      fields:
        - name: firewall
          type: group
          fields:
          - name: sequence
            type: keyword
          - name: sequence2
            type: keyword
          - name: interface
            type: keyword
          - name: data_1
            type: keyword
          - name: data_2
            type: keyword
...
          - name: data_8
            type: keyword
          - name: source_ip
            type: ip
          - name: dest_ip
            type: ip
          - name: source_port
            type: long
          - name: dest_port
            type: long
          - name: fw-geoip
            type: group
            description: >
                Contains GeoIP information gathered based on the `system.firewall.source_ip` field.
                Only present if the GeoIP Elasticsearch plugin is available and
                used.
            fields:
              - name: continent_name
                type: keyword
                description: >
                  The name of the continent.
              - name: city_name
                type: keyword
                description: >
                  The name of the city.
              - name: region_name
                type: keyword
                description: >
                  The name of the region.
              - name: country_iso_code  
                type: keyword
                description: >
                  Country ISO code.
              - name: location
                type: geo_point
                description: >
                  The longitude and latitude.
              - name: region_iso_code
                type: keyword
                description: >
                  Region ISO code.

Note that this is hand-formatted for this article; you’ll need to adjust the spacing. YAML is pretty unforgiving: no tabs, just spaces, and very sensitive to alignment.

Once you’ve saved that, you can test it with:

filebeat export template | grep firewall

If this returns the lines of the ‘firewall’ group, then you can install it in Elasticsearch. Note that the nesting of the YAML corresponds to the nesting in your Grok parsing.
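In particular, the mapping you most want to confirm is location; a quick way to eyeball it is something like the following, where the type should come back as geo_point (the exact layout of the exported JSON may vary by version):

filebeat export template | grep -A 2 '"location"'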

If that works, delete any existing templates and pipeline from the Kibana Dev Console:

DELETE  _template/filebeat

DELETE _ingest/pipeline/filebeat-*

and install the new ones:

filebeat setup --template

A few warnings: if you are using SSL on Kibana, your filebeat.yml will need (per here):

ssl.verification_mode: none
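For context, that setting lives under the section that talks to Kibana; a sketch of the relevant filebeat.yml fragment, with a made-up hostname:

setup.kibana:
  host: "https://kibana.example.local:5601"
  # skip certificate verification for a self-signed cert
  ssl.verification_mode: none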


Also, make sure all Filebeat instances are stopped. Otherwise, one of them may recreate the template from ingestion before you load the new one.

At this point, you ought to be able to start Filebeat and watch it pull in log files and geocode them properly:

filebeat -e

This runs Filebeat in the foreground with logging to the console, recreates your pipeline, and starts the data flowing. It has some error reporting, so you ought to be able to muddle through problems. The ones I hit were bad fields.yml mappings, errors in my pipeline.json, and forgetting to turn off other Filebeat instances on other servers. You ought to see, in the Kibana/Management/Index Patterns area, that system.firewall.fw-geoip.location is of type geo_point. If you see entries for system.firewall.fw-geoip.location.lat and system.firewall.fw-geoip.location.lon instead, it didn’t work. You can then see the correctly formatted data in Kibana and create a map visualization using your new fw-geoip.location field. If you try this and don’t see that field available, it also didn’t work (it’s not a geo_point field).
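If you want to double-check what an existing index actually did with the field, the field mapping API (again from the Kibana console) will tell you; a geo_point type means it worked, while numeric lat/lon sub-fields mean it didn’t:

GET filebeat-*/_mapping/field/system.firewall.fw-geoip.location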

Finally, when you build the visualization, make sure you filter on your WAN interface only. By default, you’ll be getting all blocks, including internal ones.
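Something along these lines in the visualization’s query bar does the trick, where the interface name is a placeholder for whatever your WAN interface is:

system.firewall.interface: "re0"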

Working Map with Geocoding

Conclusion

Wow. It’s a very clever way of doing this, and it works great once you grasp (grok) the programming paradigm. However, the overall flow is very convoluted and, in my opinion, not clearly explained anywhere. I hope this helps.

What I’m listening to as I do this: Beastie Boys’ Hot Sauce Committee Part 2, specifically ‘Make Some Noise’. One of their last, and one of their best.