CSV to Elasticsearch in order to replace your Excel with Kibana

Posted on November 25, 2018 in Python

Last night, I was asked if I could set up a frontend to build some stats out of a CSV, in a more interactive and collaborative way than Excel.

I was first asked to build a small Django project, since it was my go-to technology at the time.

But for this need, Elasticsearch was a perfect fit, and Kibana spared me from developing any frontend at all, which saved a lot of time.

The CSV export looked like this, but with at least 30 columns and 200k lines:

Column1;Column2;Column3
1;2;3
a;b;c
blih;blah;bluh

Python way to push the CSV to Elasticsearch

Elasticsearch requires JSON documents, so the first step was to convert the CSV to JSON.

Instead of writing a CSV-to-JSON parser, I used the pandas library, which makes the whole process a lot easier and faster (the CSV file had hundreds of thousands of lines).

And by looking at the official Elasticsearch Python SDK, I saw that I just needed to transform the whole CSV into a list of dicts, one per row.
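
Here is a minimal sketch of that transformation on the sample CSV above (assuming it is saved as sample.csv; the filename is just a placeholder):

import pandas as pd

df = pd.read_csv('sample.csv', sep=';')
# one dict per CSV row, keyed by the column names:
# [{'Column1': ..., 'Column2': ..., 'Column3': ...}, ...]
print(df.to_dict(orient='records'))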

import argparse

import pandas as pd
from elasticsearch import Elasticsearch, helpers

def main():
    parser = argparse.ArgumentParser(description='Push a CSV file to Elasticsearch.')
    parser.add_argument('filename', type=str,
            help='CSV file to parse')
    parser.add_argument('index', type=str,
            help='index name to use')
    args = parser.parse_args()

    filename = args.filename
    index_name = args.index

    # initiate Elasticsearch connection
    es = Elasticsearch()

    # parse the csv with pandas, skipping malformed lines
    df = pd.read_csv(filename, sep=';', error_bad_lines=False)
    # trim surrounding whitespace on string columns
    data_frame_trimmed = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
    # replace `NaN` values with empty strings
    data_frame_trimmed = data_frame_trimmed.fillna('')
    # transform the whole data frame into a list of dicts, one per row
    records = data_frame_trimmed.to_dict(orient='records')

    # build one bulk action per record and push everything in a single call
    actions = []
    for i, r in enumerate(records):
        actions.append({"_index": index_name,
                "_type": "vuln",
                "_id": i,
                "_source": r})
    ret = helpers.bulk(es, actions=actions)
    print(ret)

if __name__ == '__main__':
    main()
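
Running it takes the CSV path and the target index name; the script name csv_to_es.py and the file export.csv below are just placeholders. helpers.bulk returns a tuple with the number of successfully indexed documents and a list of errors:

$ python csv_to_es.py export.csv vuln_2018-11-25
(3, [])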

And voilà!

$ curl http://localhost:9200/vuln_2018-11-25/_search?pretty | jq -r .
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1,
    "hits": [
      {
        "_index": "vuln_2018-11-25",
        "_type": "vuln",
        "_id": "0",
        "_score": 1,
        "_source": {
          "Column1": 1,
          "Column2": 2,
          "Column3": 3
        }
      },
      {
        "_index": "vuln_2018-11-25",
        "_type": "vuln",
        "_id": "1",
        "_score": 1,
        "_source": {
          "Column1": "a",
          "Column2": "b",
          "Column3": "c"
        }
      },
      {
        "_index": "vuln_2018-11-25",
        "_type": "vuln",
        "_id": "2",
        "_score": 1,
        "_source": {
          "Column1": "blih",
          "Column2": "blah",
          "Column3": "bluh"
        }
      }
    ]
  }
}

After that, you just need to install Kibana and enjoy your graphs, tables, dynamic filters and so on, in a much more collaborative way.
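
In Kibana, that boils down to creating an index pattern (something like vuln_* in this example) from the Management tab; every CSV column then shows up as a field you can search, filter and aggregate on.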