Writing advanced adapters


After understanding KHIKA Data Format and going through the initial exercise of Writing your own KHIKA Data Adapters, it is time to create a production-level KHIKA Adapter. A few points to note before we begin writing our own Adapter:

  • Adapters are scripts that execute on KHIKA Data Aggregator
  • Adapters can be written in any programming language (our favorite is python 2.7)
  • Adapters are scheduled processes and KHIKA Data Aggregator is responsible for scheduling them to run at a periodic interval (typically 1 minute to 5 minutes)
  • The Adapter scripts
    • read the raw log messages one by one (from sources such as files, queues, APIs, databases etc),
    • parse the log messages,
    • convert them to KHIKA Data Format,
    • write the output to stdout
  • KHIKA Data Aggregator pipes the output of the Adapter script and sends it to KHIKA over an SSL connection
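
The read, parse, convert and write cycle listed above can be sketched in a few lines. This is only an illustration written for Python 3, not KHIKA's implementation: the function names to_khika_record and run_adapter are hypothetical, the three-field input layout is invented for the example, and the output layout (epoch time, " : ", then key "value" pairs ending with event_str) is inferred from the PaloAlto adapter excerpt shown later on this page.

```python
import sys


def to_khika_record(epoch_time, pairs):
    """Build one output line: <epoch> : key "value" ... event_str ""."""
    metadata = " ".join('%s "%s"' % (key, value) for key, value in pairs)
    return '%d : %s event_str ""' % (epoch_time, metadata)


def run_adapter(lines, out=sys.stdout):
    """Read raw lines one by one, parse them, and write records to out."""
    written = 0
    for raw in lines:
        raw = raw.strip()
        if not raw:
            continue  # skip blank lines
        # Hypothetical input layout: "<epoch> <user> <action>"
        parts = raw.split()
        if len(parts) != 3 or not parts[0].isdigit():
            continue  # a real adapter should log lines it cannot parse
        epoch, user, action = parts
        pairs = [("tl_tag", "demo"), ("user", user), ("action", action)]
        out.write(to_khika_record(int(epoch), pairs) + "\n")
        written += 1
    return written
```

Calling run_adapter(sys.stdin) would turn the script into a simple filter. Because the records go to stdout, the process that launches the script can pick them up through a pipe, which is the arrangement described in the last bullet.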


With these concepts in mind, let us proceed with an example of a production-ready KHIKA Data Adapter. Log in to your KHIKA Data Aggregator node (the default username/password is khika/khika123). We will study a syslog-based adapter that processes the messages received from a PaloAlto Firewall. Open the file TLHook_Adaptor_PaloAlto.py in the directory /opt/KHIKA/Apps/Adapters/PaloAltoFW.

Check the first few lines of this file, where we import some important Python modules:

     1 #!/bin/env python
     2 import os, sys
     3 import socket
     4 import csv,StringIO
     5 import logging #The logging libraries
     6 import re #python inbuilt regular expressions library
     7 import time
     8 from time import strptime #Useful time format conversion functions
     9 from datetime import datetime
    10 import random
    11 import calendar
    12 from ipaddress import IPv4Network,IPv4Address
    13 import pdb

Note that we have imported logging (line 5) and some useful time libraries (lines 8 and 9). We have also imported the 're' library (line 6) for Python regular expressions. We will be using these in the code.

Now, let's move to the bottom of this file and locate the block that begins with if __name__ == "__main__": (line 295). This is where execution of the script starts.

   295 if __name__ == "__main__":
   296         global isOccurrences
   297         isOccurrences = False
   298         dict_report_stats = {}
   299         g_smallest_day = 0
   300         g_highest_day = 0
   301         install_dir = os.path.dirname(os.path.realpath(__file__))
   302         sys.path.insert(0, install_dir)
   303         sys.path.insert(0, install_dir + os.sep + ".." + os.sep + "TLCOMMON" + os.sep)
   304         g_hostname = socket.gethostname()
   305         file_name_format = sys.argv[1] if len(sys.argv) == 2 else os.getenv("TL_WORKSPACE")+'_'+os.getenv("TL_ADAPTER")+'_'+os.getenv("TL_AGGREGATOR")
   306         from TLHook_common import *
   307         logfile_path = install_dir+ '/' + 'log_'+file_name_format+'.log'
   308         if not is_safe_path(logfile_path):
   309                 exit()
   310         logger = InitLogger(logfile_path,logging.INFO)
   311         logger.info("A new execution of script %s begins", __file__)
   312         int_time(logger)
   313         tz_file_path = install_dir+'/'+'timezone_'+file_name_format+'.csv'
   314         history_path = install_dir+'/history_'+file_name_format+'.csv'
   315         config_path = install_dir+'/config_'+file_name_format+'.csv'
   316
   317         if not is_safe_path(tz_file_path) or not is_safe_path(history_path) or not is_safe_path(config_path) :
   318                 logger.error("Path is invalid: history_path : %s ,timezone_file_path : %s , config_path : %s ",history_path,tz_file_path ,config_path )
   319                 exit()
   320         GetHostToTimeZoneDict(tz_file_path)
   321         ReadHistoryFile(history_path)
   322         ProcessUsingConfigFile(config_path, ProcessLineOfFile)
   323         PrintDashboardStatistics(dict_report_stats,logger,g_hostname,g_smallest_day,g_highest_day)
   324         WriteHistoryFile(history_path)
   325

Along with some initialization (such as setting the module search path, the log file and the timezone file), we import TLHook_common on line 306. This is a common library that provides functions for timezone handling, logging, offset maintenance etc. The source code for this library can be found in the /opt/KHIKA/Apps/Adapters/TLCOMMON/TLHook_common.py file on your KHIKA Data Aggregator node. The library has various useful functions such as ReadHistoryFile(), ProcessUsingConfigFile(), ProcessDir(), ProcessFile(), ProcessAllLinesOfFile() etc. We encourage you to read the code.

We also set the paths of two files:

  • history_file, which maintains the offset, timestamp etc after each execution (line 314)
  • config_file, from which the configuration is read; it tells the adapter which files to read from which directory (line 315)

After this, the real work starts. On line 321 we call the ReadHistoryFile() function with history_path as the argument. It reads the offsets maintained in the history_file so that we know where to start reading each file during this execution. Note that the script executes at a periodic interval, and every execution updates the offsets of the files (using the WriteHistoryFile() function) at the end of its run. ReadHistoryFile() sets some global data structures that help us seek() to the right offset when we call ProcessUsingConfigFile() on line 322.
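
The offset bookkeeping described above can be illustrated with a small sketch, written for Python 3. It is not the code from TLHook_common.py: the function names and the two-column CSV layout of the history file (file path, byte offset) are assumptions made for the example, and the real library also records timestamps and other state.

```python
import csv
import os


def read_history(history_path):
    """Return {file_path: byte_offset} from a two-column CSV (assumed layout)."""
    offsets = {}
    if os.path.exists(history_path):
        with open(history_path, "r", newline="") as fh:
            for row in csv.reader(fh):
                if len(row) == 2:
                    offsets[row[0]] = int(row[1])
    return offsets


def read_new_lines(log_path, offsets):
    """Seek to the saved offset, return the unread lines, update the offset."""
    with open(log_path, "rb") as fh:
        fh.seek(offsets.get(log_path, 0))
        lines = [raw.decode("utf-8", "replace") for raw in fh.readlines()]
        offsets[log_path] = fh.tell()
    return lines


def write_history(history_path, offsets):
    """Persist the offsets so the next run resumes where this one stopped."""
    with open(history_path, "w", newline="") as fh:
        writer = csv.writer(fh)
        for path, offset in sorted(offsets.items()):
            writer.writerow([path, offset])
```

A run would call read_history() once, read_new_lines() for each log file, and write_history() at the very end, which mirrors the order of lines 321 to 324 in the listing above. The file is opened in binary mode so that the stored offset is an exact byte position.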

Function ProcessUsingConfigFile() takes two arguments, config_path and ProcessLineOfFile.

  • config_path is the path of the config file we set during initialization. The config file is a CSV file. Below is a sample config file:
     /opt/remotesyslog/172.28.1.16,2.*.log$,None
     /opt/remotesyslog/172.28.1.17,2.*.log$,None

The first part is the directory from which we want to read the logs. The second part is a regular expression for the file names; files whose names match it are processed. KHIKA Data Aggregator receives PaloAlto Firewall logs over the syslog protocol and stores them under the /opt/remotesyslog directory. It dynamically creates a directory named after the IP address of the syslog source device (the PaloAlto firewall, in this case). Under that directory, one file is created per day in YYYY-MM-DD.log format (e.g. 2019-05-31.log).
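
To see how such a config row selects files, here is a minimal sketch written for Python 3. The helper names parse_config and matching_files are hypothetical, the third column is carried through without interpretation, and the use of re.match (anchored at the start of the file name) is an assumption; check TLHook_common.py for how the library actually applies the pattern.

```python
import csv
import io
import re


def parse_config(config_text):
    """Return (directory, compiled filename regex, third field) tuples."""
    entries = []
    for row in csv.reader(io.StringIO(config_text)):
        if len(row) < 2:
            continue  # skip blank or malformed rows
        directory, pattern = row[0].strip(), row[1].strip()
        extra = row[2].strip() if len(row) > 2 else None
        entries.append((directory, re.compile(pattern), extra))
    return entries


def matching_files(file_names, regex):
    """Keep the names that the regex matches from the start of the name."""
    return [name for name in file_names if regex.match(name)]


sample = "/opt/remotesyslog/172.28.1.16,2.*.log$,None\n"
directory, regex, _ = parse_config(sample)[0]
selected = matching_files(["2019-05-31.log", "2019-05-31.log.gz"], regex)
# selected == ["2019-05-31.log"]; the trailing $ rejects the .gz file
```

The pattern 2.*.log$ therefore picks up the daily files named after dates in the 2000s and ignores anything that does not end in "log".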

  • ProcessLineOfFile is a function that you implement to parse your specific data. This is where all the processing logic has to be coded. In this adapter the function is defined on line 173.


  173 def ProcessLineOfFile(line,file,line_count,logger1,hostname, keyArgsDict=None):
  174         global g_hostname,dict_report_stats,g_smallest_day,g_highest_day
  175         global isOccurrences
  176         if hostname != "" or hostname != None:
  177                 g_hostname = hostname
  178         try:
  179                 TL_GET_RAW_LOG=False
  180                 if keyArgsDict:
  181                         TL_GET_RAW_LOG=keyArgsDict['TL_GET_RAW_LOG']
  182
  183                 metadata = ""
  184                 metadata += "tl_tag" + " \"PaloAltofw\" " + "tl_src_host \""+str(g_hostname) + "\" "
  185                 keydata = ""
  186                 epoch_time = ""
  187                 event_str = ""
  188                 line1 = line
  189                 line = line.strip('\n')
  190                 reader = csv.reader(StringIO.StringIO(line), delimiter=',',quotechar='"')
  191                 line = reader.next()

ProcessLineOfFile() can contain any processing logic and can get really complicated. Once the line is in a variable, you can use regular expressions, split() or equivalent functions to separate the message into meaningful key-value pairs. The OEM's documentation helps in understanding the messages. You can convert the timestamp from the message to epoch time. Finally, you build the complete message buffer in KHIKA Data Format.
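
The parsing steps just described (split the line, pair values with keys, convert the timestamp) might look like the following sketch, written for Python 3. The three-field layout, the timestamp format and the helper names are hypothetical, and the timestamp is treated as UTC for simplicity, whereas the real adapter consults its timezone file.

```python
import calendar
import csv
import io
from time import strptime


def to_epoch(timestamp, fmt="%Y/%m/%d %H:%M:%S"):
    """Convert a human-readable timestamp, treated as UTC, to epoch seconds."""
    return calendar.timegm(strptime(timestamp, fmt))


def parse_line(line, field_names):
    """Split one comma-separated log line and pair values with field names."""
    reader = csv.reader(io.StringIO(line.strip("\n")), delimiter=",", quotechar='"')
    values = next(reader)
    return dict(zip(field_names, values))


# Hypothetical three-field layout; real PaloAlto messages carry many more fields.
FIELDS = ["receive_time", "src_ip", "action"]
record = parse_line('2019/05/31 10:15:00,10.0.0.5,"allow"\n', FIELDS)
epoch_time = to_epoch(record["receive_time"])
metadata = 'src_ip "%s" action "%s"' % (record["src_ip"], record["action"])
```

Using csv.reader rather than a plain split(',') keeps quoted fields that contain commas intact, which is presumably why the adapter does the same on line 190.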

epoch_time is calculated from the human-readable timestamp picked from the message. The 'metadata' variable contains all the key-value pairs. event_str, which is a mandatory key, has an empty value.


  277                 khika_output= str(epochTime) + " : " + metadata + " event_str \"\""

We then call the printData() library function:

  278                 printData(khika_output, line, TL_GET_RAW_LOG)

When the buffer is ready for output, we recommend using the printData() library function.

ProcessLineOfFile() is executed for every line of every file. We recommend strong exception handling and logging the errors using the "logger" object we initialized. Parsing can get really tricky at times, so it is important to log all errors; this is how you make sure that no lines are silently lost to parsing errors.
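
One way to follow this advice is to wrap the per-line parsing in a try/except block that logs the failure and moves on. The sketch below is an illustration only: safe_process and the logger name are hypothetical, and it uses the standard logging module directly rather than the InitLogger() helper from TLHook_common.

```python
import logging

logger = logging.getLogger("adapter_sketch")  # hypothetical logger name


def safe_process(lines, parse_func, file_name="<unknown>"):
    """Apply parse_func to each line; log and count failures instead of aborting."""
    results, failures = [], 0
    for line_count, line in enumerate(lines, start=1):
        try:
            results.append(parse_func(line))
        except Exception:
            failures += 1
            # logger.exception records the traceback along with the message
            logger.exception("Failed to parse %s line %d: %r",
                             file_name, line_count, line)
    return results, failures
```

Logging the file name, line number and raw line makes it possible to find the offending message later, and the failure count can be checked at the end of a run to confirm that nothing was dropped unnoticed.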

After processing is complete, we call the WriteHistoryFile() function with history_path as the argument. This records the offset, timestamp etc in the history_file so that the next execution starts by calling seek() to the appropriate offset.