Difference between revisions of "Writing advanced adapters"

Revision as of 10:12, 31 May 2019

After understanding the KHIKA Data Format and going through the initial exercise of Writing your own KHIKA Data Adapters, it is time to create a production-level KHIKA Adapter. A few points to note before we begin writing our own Adapter:

  • Adapters are scripts that execute on KHIKA Data Aggregator
  • Adapters can be written in any programming language (our favorite is python 2.7)
  • Adapters are scheduled processes and KHIKA Data Aggregator is responsible for scheduling them to run at a periodic interval (typically 1 minute to 5 minutes)
  • The Adapter scripts
    • read the raw log messages one-by-one (from source such as files, queues, APIs, Databases etc),
    • parse the log messages,
    • convert them to KHIKA Data Format,
    • write the output to stdout
  • KHIKA Data Aggregator pipes the output of the Adapter script and sends it to KHIKA over an SSL connection
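
The read, parse, convert and write loop listed above can be sketched as follows. This is a minimal illustration, not the shipped adapter: the names to_khika_record and run_adapter are hypothetical, the "demo" tag is invented, and the output layout (epoch time, " : ", then space-separated key "value" pairs) is an assumption inferred from the output line of the PaloAlto adapter quoted at the end of this page. It is written for Python 3.

```python
import sys
import time


def to_khika_record(epoch_time, fields):
    """Build one output line: '<epoch> : key "value" key "value" ...'.

    The layout is an assumption inferred from the PaloAlto adapter's output.
    """
    pairs = " ".join('%s "%s"' % (key, value) for key, value in fields)
    return "%d : %s" % (epoch_time, pairs)


def run_adapter(lines, out=sys.stdout, now=time.time):
    """Read raw messages one by one, convert each, and write it to `out`."""
    written = 0
    for raw in lines:
        message = raw.strip()
        if not message:
            continue  # skip blank lines
        # Hypothetical fields: a fixed tag plus the raw message text.
        fields = [("tl_tag", "demo"), ("event_str", message)]
        out.write(to_khika_record(int(now()), fields) + "\n")
        written += 1
    return written


if __name__ == "__main__":
    run_adapter(sys.stdin)
```

Because the Aggregator captures stdout and forwards it, a script shaped like this never needs to open a network connection itself.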


With these concepts in mind, let us proceed with an example of a production-ready KHIKA Data Adapter. Log in to your KHIKA Data Aggregator node (the default username/password is khika/khika123). We will study a syslog-based adapter that processes the messages received from a PaloAlto firewall. Open the file TLHook_Adaptor_PaloAlto.py in the directory /opt/KHIKA/Apps/Adapters/PaloAltoFW.

Check the first few lines of this file, where we import some important Python modules:

     1 #!/bin/env python
     2 import os, sys
     3 import socket
     4 import csv,StringIO
     5 import logging #The logging libraries
     6 import re #python inbuilt regular expressions library
     7 import time
     8 from time import strptime #Useful time format conversion functions
     9 from datetime import datetime
    10 import random
    11 import calendar
    12 from ipaddress import IPv4Network,IPv4Address
    13 import pdb

Note that we have imported logging (line 5) and some useful time libraries (lines 8 and 9). We have also imported the 're' library (line 6) for Python regular expressions. We will be using these in the code.

Now, let's move to the bottom of this file and locate the if __name__ == "__main__": block. This is where execution of the code starts.

   295 if __name__ == "__main__":
   296         global isOccurrences
   297         isOccurrences = False
   298         dict_report_stats = {}
   299         g_smallest_day = 0
   300         g_highest_day = 0
   301         install_dir = os.path.dirname(os.path.realpath(__file__))
   302         sys.path.insert(0, install_dir)
   303         sys.path.insert(0, install_dir + os.sep + ".." + os.sep + "TLCOMMON" + os.sep)
   304         g_hostname = socket.gethostname()
   305         file_name_format = sys.argv[1] if len(sys.argv) == 2 else os.getenv("TL_WORKSPACE")+'_'+os.getenv("TL_ADAPTER")+'_'+os.getenv("TL_AGGREGATOR")
   306         from TLHook_common import *
   307         logfile_path = install_dir+ '/' + 'log_'+file_name_format+'.log'
   308         if not is_safe_path(logfile_path):
   309                 exit()
   310         logger = InitLogger(logfile_path,logging.INFO)
   311         logger.info("A new execution of script %s begins", __file__)
   312         int_time(logger)
   313         tz_file_path = install_dir+'/'+'timezone_'+file_name_format+'.csv'
   314         history_path = install_dir+'/history_'+file_name_format+'.csv'
   315         config_path = install_dir+'/config_'+file_name_format+'.csv'
   316
   317         if not is_safe_path(tz_file_path) or not is_safe_path(history_path) or not is_safe_path(config_path) :
   318                 logger.error("Path is invalid: history_path : %s ,timezone_file_path : %s , config_path : %s ",history_path,tz_file_path ,config_path )
   319                 exit()
   320         GetHostToTimeZoneDict(tz_file_path)
   321         ReadHistoryFile(history_path)
   322         ProcessUsingConfigFile(config_path, ProcessLineOfFile)
   323         PrintDashboardStatistics(dict_report_stats,logger,g_hostname,g_smallest_day,g_highest_day)
   324         WriteHistoryFile(history_path)
   325

Along with some initialization (setting the path, log file, timezone and so on), we import TLHook_common on line 306. This is a common library that provides functions for timezone handling, logging, offset maintenance and more. The source code for this library is in /opt/KHIKA/Apps/Adapters/TLCOMMON/TLHook_common.py on your KHIKA Data Aggregator node. It has various useful functions such as ReadHistoryFile(), ProcessUsingConfigFile(), ProcessDir(), ProcessFile() and ProcessAllLinesOfFile(). We encourage you to read the code.

We also set the

  • history_file (history_path, line 314), which maintains the offset, timestamp and similar state after each execution.
  • config_file (config_path, line 315), which holds the configuration; it tells the adapter which files to read from which directory.

After this, the real work starts. On line 321 we call ReadHistoryFile() with history_path as the argument. It reads the offsets kept in the history file so that we know where to resume reading each file during this execution. Note that the script runs at a periodic interval, and every execution updates the file offsets (using WriteHistoryFile()) at the end. ReadHistoryFile() sets some global data structures that let us seek() to the saved offset when we call ProcessUsingConfigFile() on line 322.
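
The offset bookkeeping can be pictured with the sketch below. It is not the code in TLHook_common.py: the helper names (read_history, write_history, read_new_lines) and the two-column path,offset CSV layout are assumptions made for illustration, and the real history file may store more fields, such as timestamps. It is written for Python 3.

```python
import csv
import os


def read_history(history_path):
    """Load {file_path: byte_offset} from a CSV; empty if the file is missing."""
    offsets = {}
    if os.path.exists(history_path):
        with open(history_path, newline="") as handle:
            for row in csv.reader(handle):
                if len(row) == 2:
                    offsets[row[0]] = int(row[1])
    return offsets


def write_history(history_path, offsets):
    """Persist the offsets so the next scheduled run resumes from them."""
    with open(history_path, "w", newline="") as handle:
        csv.writer(handle).writerows(sorted(offsets.items()))


def read_new_lines(log_path, offsets):
    """Return complete lines appended since the last run; update the offset."""
    start = offsets.get(log_path, 0)
    with open(log_path, "rb") as handle:
        handle.seek(start)
        data = handle.read()
    # Keep only whole lines; a trailing partial line is left for the next run.
    complete = data[: data.rfind(b"\n") + 1]
    offsets[log_path] = start + len(complete)
    return complete.decode("utf-8", "replace").splitlines()
```

Leaving a half-written last line unread is a design choice of this sketch: syslog may still be appending to the file when the adapter runs, and the rest of that line will be picked up on the next execution.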

Function ProcessUsingConfigFile() takes two arguments, config_path and ProcessLineOfFile.

  • config_path is the file we set during initialization. The config file is a CSV file. Below is a sample config file:
     /opt/remotesyslog/172.28.1.16,2.*.log$,None
     /opt/remotesyslog/172.28.1.17,2.*.log$,None

The first part of each row is the directory from which we want to read the logs. The second part is a regular expression for filenames; files matching it will be processed. KHIKA Data Aggregator receives PaloAlto firewall logs over the syslog protocol and stores them in the /opt/remotesyslog directory. It dynamically creates a directory named after the IP address of the syslog source device (the PaloAlto firewall, in this case). Under that directory, one file is created per day in YYYY-MM-DD.log format (e.g. 2019-05-31.log).
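
To make the config format concrete, here is a small sketch of how such a file could be turned into a list of files to process. It does not reproduce ProcessUsingConfigFile(): the function name files_from_config is hypothetical, the third column is ignored because its meaning is not described here, and applying re.match to the bare filename is an assumption. It is written for Python 3.

```python
import csv
import os
import re


def files_from_config(config_path):
    """Yield paths of files whose names match the regex in each config row."""
    with open(config_path, newline="") as handle:
        for row in csv.reader(handle):
            if len(row) < 2:
                continue  # skip blank or malformed rows
            directory = row[0].strip()
            pattern = re.compile(row[1].strip())
            if not os.path.isdir(directory):
                continue  # the source device has not sent any logs yet
            for name in sorted(os.listdir(directory)):
                if pattern.match(name):
                    yield os.path.join(directory, name)
```

Note that in the sample pattern 2.*.log$ the dots are unescaped, so each one matches any character; 2.*\.log$ would be stricter about the extension.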

  • ProcessLineOfFile is a function that you implement for parsing specific data. This is where all the processing logic has to be coded. This function is coded on line 173.

   173 def ProcessLineOfFile(line,file,line_count,logger1,hostname, keyArgsDict=None):
   174         global g_hostname,dict_report_stats,g_smallest_day,g_highest_day
   175         global isOccurrences
   176         if hostname != "" or hostname != None:
   177                 g_hostname = hostname
   178         try:
   179                 TL_GET_RAW_LOG=False
   180                 if keyArgsDict:
   181                         TL_GET_RAW_LOG=keyArgsDict['TL_GET_RAW_LOG']
   182
   183                 metadata = ""
   184                 metadata += "tl_tag" + " \"PaloAltofw\" " + "tl_src_host \""+str(g_hostname) + "\" "
   185                 keydata = ""
   186                 epoch_time = ""
   187                 event_str = ""
   188                 line1 = line
   189                 line = line.strip('\n')
   190                 reader = csv.reader(StringIO.StringIO(line), delimiter=',',quotechar='"')
   191                 line = reader.next()

ProcessLineOfFile can contain any processing logic and can get quite complicated. Once the line is in a variable, you can use regular expressions, split() or equivalent functions to separate the message into meaningful key-value pairs. The OEM's documentation helps in understanding the messages. You can convert the timestamp in the message to epoch time. Finally, you build the complete message buffer in KHIKA Data Format and call the printData() library function:

   277                 khika_output= str(epochTime) + " : " + metadata + " event_str \"\""
   278                 printData(khika_output, line, TL_GET_RAW_LOG)
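
The parsing steps described above (split the line, convert the timestamp, build the output) might look like this in miniature. The function name parse_line, the field positions, the timestamp format and the key names src_ip and action are invented for illustration and do not reflect the real PaloAlto log layout. printData() is replaced by returning the string, and the timestamp is treated as UTC, whereas the real adapter consults the timezone file. The sketch is written for Python 3, where io.StringIO and next(reader) take the place of StringIO.StringIO and reader.next().

```python
import calendar
import csv
import io
from time import strptime


def parse_line(line, hostname):
    """Convert one CSV log line into a KHIKA-style record string."""
    reader = csv.reader(io.StringIO(line.strip("\n")), delimiter=",", quotechar='"')
    fields = next(reader)
    # Hypothetical layout: timestamp, source IP, action.
    timestamp, src_ip, action = fields[0], fields[1], fields[2]
    # Interpret the timestamp as UTC and convert it to seconds since the epoch.
    epoch_time = calendar.timegm(strptime(timestamp, "%Y/%m/%d %H:%M:%S"))
    metadata = 'tl_tag "PaloAltofw" tl_src_host "%s" ' % hostname
    keydata = 'src_ip "%s" action "%s"' % (src_ip, action)
    return "%d : %s%s" % (epoch_time, metadata, keydata)
```

Using csv.reader rather than split(',') keeps quoted fields that contain commas intact. Separately, when adapting the listing from line 173 onward, be aware that the condition on line 176 (hostname != "" or hostname != None) is true for every value; "and" is probably what was intended.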