Writing advanced adapters
After understanding Khika Data Format and going through the initial exercise of Writing your own KHIKA Data Adapters, it is time to create a production-level KHIKA Adapter. A few points to note before we begin writing our own Adapter:
- Adapters are scripts that execute on the KHIKA Data Aggregator
- Adapters can be written in any programming language (our favorite being Python 2.7)
- Adapters are scheduled processes, and the KHIKA Data Aggregator is responsible for scheduling them to run at a periodic interval (typically every 1 to 5 minutes)
- The Adapter scripts (see the sketch after this list):
  - read the raw log messages one by one (from sources such as files, queues, APIs, databases, etc.),
  - parse the log messages,
  - convert them into Khika Data Format, and
  - write the output to stdout
- The KHIKA Data Aggregator pipes the output of the Adapter script and sends it to KHIKA over an SSL connection
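To make this flow concrete, below is a minimal, hypothetical adapter sketch in Python 2.7. The input format, field names, and tl_tag value are invented for illustration only; a production adapter (such as the PaloAlto one studied next) delegates scheduling, offsets, and file handling to the TLCOMMON library instead of reading stdin.

 #!/bin/env python
 # Minimal illustrative adapter sketch (NOT the production PaloAlto adapter).
 # Assumes a hypothetical comma-separated input line of the form:
 #   2019-05-31 10:15:42,src_ip,dst_ip,action        (timestamp assumed UTC)
 import sys
 import calendar
 from time import strptime

 def to_khika_format(raw_line):
     raw_line = raw_line.strip()
     if not raw_line:
         return None
     ts, src, dst, action = raw_line.split(',', 3)
     # convert the human-readable timestamp to EPOCH seconds
     epoch = calendar.timegm(strptime(ts, "%Y-%m-%d %H:%M:%S"))
     metadata = 'tl_tag "SampleDevice" src_ip "%s" dst_ip "%s" action "%s"' % (src, dst, action)
     return str(epoch) + " : " + metadata + ' event_str ""'

 if __name__ == "__main__":
     for raw_line in sys.stdin:              # read raw messages one by one
         output = to_khika_format(raw_line)  # parse and convert
         if output:
             print output                    # write KHIKA Data Format to stdout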
With these concepts in mind, let us proceed with an example of a production-ready KHIKA Data Adapter. Log in to your KHIKA Data Aggregator node (the default username/password is khika/khika123). We will study a syslog-based adapter that processes the messages received from a PaloAlto Firewall. Open the file TLHook_Adaptor_PaloAlto.py in the directory /opt/KHIKA/Apps/Adapters/PaloAltoFW.
Check the first few lines of this file, where we import some important Python modules:
 1 #!/bin/env python
 2 import os, sys
 3 import socket
 4 import csv,StringIO
 5 import logging #The logging libraries
 6 import re #python inbuilt regular expressions library
 7 import time
 8 from time import strptime #Useful time format conversion functions
 9 from datetime import datetime
 10 import random
 11 import calendar
 12 from ipaddress import IPv4Network,IPv4Address
 13 import pdb
Note that we have imported logging (line 5) and some useful time libraries (lines 8 and 9). We have also imported the 're' library for Python regular expressions; we will be using it in the code.
Now, let's move to the bottom of this file and locate the "__main__" block. This is where execution of the code starts.
 295 if __name__ == "__main__":
 296     global isOccurrences
 297     isOccurrences = False
 298     dict_report_stats = {}
 299     g_smallest_day = 0
 300     g_highest_day = 0
 301     install_dir = os.path.dirname(os.path.realpath(__file__))
 302     sys.path.insert(0, install_dir)
 303     sys.path.insert(0, install_dir + os.sep + ".." + os.sep + "TLCOMMON" + os.sep)
 304     g_hostname = socket.gethostname()
 305     file_name_format = sys.argv[1] if len(sys.argv) == 2 else os.getenv("TL_WORKSPACE")+'_'+os.getenv("TL_ADAPTER")+'_'+os.getenv("TL_AGGREGATOR")
 306     from TLHook_common import *
 307     logfile_path = install_dir+ '/' + 'log_'+file_name_format+'.log'
 308     if not is_safe_path(logfile_path):
 309         exit()
 310     logger = InitLogger(logfile_path,logging.INFO)
 311     logger.info("A new execution of script %s begins", __file__)
 312     int_time(logger)
 313     tz_file_path = install_dir+'/'+'timezone_'+file_name_format+'.csv'
 314     history_path = install_dir+'/history_'+file_name_format+'.csv'
 315     config_path = install_dir+'/config_'+file_name_format+'.csv'
 316
 317     if not is_safe_path(tz_file_path) or not is_safe_path(history_path) or not is_safe_path(config_path) :
 318         logger.error("Path is invalid: history_path : %s ,timezone_file_path : %s , config_path : %s ",history_path,tz_file_path ,config_path )
 319         exit()
 320     GetHostToTimeZoneDict(tz_file_path)
 321     ReadHistoryFile(history_path)
 322     ProcessUsingConfigFile(config_path, ProcessLineOfFile)
 323     PrintDashboardStatistics(dict_report_stats,logger,g_hostname,g_smallest_day,g_highest_day)
 324     WriteHistoryFile(history_path)
 325
After doing some initializations (such as setting the PATH, log file, timezone, etc.), we import TLHook_common on line 306. This is a common library that provides functions for timezone handling, logging, offset maintenance, and so on. The source code for this library can be found in the /opt/KHIKA/Apps/Adapters/TLCOMMON/TLHook_common.py file on your KHIKA Data Aggregator node. The library has various useful functions such as ReadHistoryFile(), ProcessUsingConfigFile(), ProcessDir(), ProcessFile(), ProcessAllLinesOfFile(), etc. We encourage you to read the code.
We also set the:
- history_file, for maintaining the offset, timestamp, etc. after each execution (line 314)
- config_file, for reading the configuration from (it basically tells which files to read from which directory) (line 315)
After this, the real work starts. On line 321 we call the ReadHistoryFile() function with history_path as the argument. Here we read the offsets maintained in the history_file to understand where this execution should start reading each file from. Note that the script executes at a periodic interval, and every execution updates the file offsets (using the WriteHistoryFile() function) at the end of its run. Some global data structures are set in ReadHistoryFile() to help us seek() to the offset when we call ProcessUsingConfigFile() on line 322.
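The following sketch illustrates this offset-tracking pattern in isolation. It assumes a simple "filepath,offset" CSV layout for the history file; the actual format and bookkeeping done by TLHook_common may differ.

 # Illustrative offset-tracking pattern (the real format used by TLHook_common
 # may differ). Assumes a "filepath,offset" CSV history file.
 import csv
 import os

 g_offsets = {}  # filepath -> byte offset reached in the previous run

 def read_history_file(history_path):
     if not os.path.exists(history_path):
         return
     with open(history_path, "rb") as f:
         for path, offset in csv.reader(f):
             g_offsets[path] = int(offset)

 def process_file(path, process_line):
     with open(path, "rb") as f:
         f.seek(g_offsets.get(path, 0))   # resume where the previous run stopped
         for line in f:
             process_line(line)
         g_offsets[path] = f.tell()       # remember how far we got this time

 def write_history_file(history_path):
     with open(history_path, "wb") as f:
         writer = csv.writer(f)
         for path, offset in g_offsets.items():
             writer.writerow([path, offset])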
Function ProcessUsingConfigFile() takes two arguments, config_path and ProcessLineOfFile.
- config_path is the file we set during initialization. The config file is a CSV file. Below is a sample config file:
 /opt/remotesyslog/172.28.1.16,2.*.log$,None
 /opt/remotesyslog/172.28.1.17,2.*.log$,None
The first field is the directory from which we want to read the logs. The second field is a regular expression for filenames; files matching the regular expression will be processed. The KHIKA Data Aggregator receives PaloAlto Firewall logs over the syslog protocol and stores them in the /opt/remotesyslog directory. It dynamically creates a directory named after the IP address of the syslog source device (the PaloAlto firewall, in this case). Under that directory, files are created per day in YYYY-MM-DD.log format (e.g. 2019-05-31.log).
- ProcessLineOfFile is a function that you implement for parsing the specific data source; this is where all the processing logic has to be coded. In this adapter it is defined on line 173. (A conceptual sketch of how these two pieces fit together follows this list.)
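The sketch below shows, conceptually, what a function like ProcessUsingConfigFile() does with the config file and the callback. It is illustrative only; the real implementation in TLHook_common.py additionally handles offsets, timezones, and extra keyword arguments.

 # Conceptual sketch only -- not the actual TLHook_common implementation.
 import os
 import re

 def process_using_config_file_sketch(config_path, process_line_of_file, logger, hostname):
     with open(config_path) as cfg:
         for entry in cfg:
             directory, pattern, _ = entry.strip().split(',')
             regex = re.compile(pattern)
             for name in sorted(os.listdir(directory)):
                 if not regex.match(name):      # e.g. 2019-05-31.log matches 2.*.log$
                     continue
                 path = os.path.join(directory, name)
                 with open(path) as f:
                     for line_count, line in enumerate(f, 1):
                         process_line_of_file(line, path, line_count, logger, hostname)

The actual ProcessLineOfFile() of the PaloAlto adapter, which the real library invokes in a similar way, begins as follows: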
 173 def ProcessLineOfFile(line,file,line_count,logger1,hostname, keyArgsDict=None):
 174     global g_hostname,dict_report_stats,g_smallest_day,g_highest_day
 175     global isOccurrences
 176     if hostname != "" or hostname != None:
 177         g_hostname = hostname
 178     try:
 179         TL_GET_RAW_LOG=False
 180         if keyArgsDict:
 181             TL_GET_RAW_LOG=keyArgsDict['TL_GET_RAW_LOG']
 182
 183         metadata = ""
 184         metadata += "tl_tag" + " \"PaloAltofw\" " + "tl_src_host \""+str(g_hostname) + "\" "
 185         keydata = ""
 186         epoch_time = ""
 187         event_str = ""
 188         line1 = line
 189         line = line.strip('\n')
 190         reader = csv.reader(StringIO.StringIO(line), delimiter=',',quotechar='"')
 191         line = reader.next()
This function can contain any processing logic and can get quite complicated. Once the line is in a variable, you can use regular expressions, split(), or equivalent functions to separate the message into meaningful key-value pairs. The OEM's documentation helps in understanding the messages, and specific key names can be added to the message at this point. KHIKA tries to maintain the key names suggested by the OEM, in addition to adding its own keys. The additional keys added by KHIKA start with the tl_ prefix.
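For illustration, here is a small, self-contained sketch of turning a comma-separated line into such key-value pairs. The sample line and the field names are invented and do not match the real PaloAlto field layout documented by the OEM.

 # Illustrative only: the sample line and field names are invented and do NOT
 # match the real PaloAlto log layout.
 import csv, StringIO

 sample = '2019/05/31 10:15:42,0001A1B2,TRAFFIC,end'
 fields = csv.reader(StringIO.StringIO(sample)).next()
 field_names = ["receive_time", "serial", "type", "subtype"]   # hypothetical names
 metadata = 'tl_tag "PaloAltofw" tl_src_host "fw01" '          # tl_ keys added by KHIKA
 for name, value in zip(field_names, fields):
     metadata += '%s "%s" ' % (name, value)
 print metadata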
You can convert the timestamp from the message to EPOCH time using the appropriate time library routines, and then create the complete message buffer in KHIKA Data Format.
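One common way to do this conversion, assuming the timestamp is in UTC (device-local timestamps would additionally need the timezone offset maintained via the timezone CSV):

 # Convert a human-readable timestamp to EPOCH seconds (timestamp assumed UTC).
 import calendar
 from time import strptime

 ts = "2019/05/31 10:15:42"                        # hypothetical sample
 epoch_time = calendar.timegm(strptime(ts, "%Y/%m/%d %H:%M:%S"))
 print epoch_time                                  # 1559297742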
epoch_time is calculated from the human-readable timestamp picked from the message. The 'metadata' variable contains all the key-value pairs. event_str is a mandatory key, but it can have an empty value. We construct the entire message in the khika_output variable:
277 khika_output= str(epochTime) + " : " + metadata + " event_str \"\""
and then call the printData() library function:
278 printData(khika_output, line, TL_GET_RAW_LOG)
When the buffer is ready for output, we recommend using the printData() library function. You may simply print the buffer using a print statement, which is perfectly fine. However, the printData() library function is handy as it does some cleanup for you. It also takes a useful argument, TL_GET_RAW_LOG, which, when set to 1 (true), preserves the raw message in a separate key, "tl_raw". Maintaining the raw messages is a requirement in some audits and RFPs.
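Roughly, the behaviour described above amounts to something like the sketch below; this is only an approximation for intuition, and the real printData() in TLHook_common.py may perform additional cleanup.

 # Rough approximation of the behaviour described above (NOT the real printData()).
 def print_data_sketch(khika_output, raw_line, tl_get_raw_log):
     out = khika_output.replace('\n', ' ').strip()       # keep one event per output line
     if tl_get_raw_log:
         # preserve the original message in a separate tl_raw key
         out += ' tl_raw "%s"' % raw_line.replace('"', "'").strip()
     print out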
ProcessLineOfFile() is executed for every line of every file. We recommend strong exception handling and logging errors using the "logger" object we initialized. Parsing can get really tricky at times, so it is important to log all errors to ensure you are not silently dropping lines due to parsing failures.
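A sketch of the recommended pattern inside your own ProcessLineOfFile() (the body is elided; only the error handling is shown):

 # Never let one bad line kill the run; log enough context to investigate later.
 def ProcessLineOfFile(line, file, line_count, logger1, hostname, keyArgsDict=None):
     try:
         # ... parsing and printData() as shown above ...
         pass
     except Exception as e:
         logger1.error("Parse error in %s line %d: %s (line=%r)",
                       file, line_count, e, line)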
After processing is complete, we call the WriteHistoryFile() function with history_path as the argument. This records the offset, timestamp, etc. in the history_file so that the next execution starts by seek()ing to the appropriate offset.
This was an overview of writing advanced KHIKA Adapters.
In the subsequent sections we will talk about parsing messages to create useful key-value pairs, with specific examples. We will walk through some sample code focusing only on the ProcessLineOfFile() function, which you will override most of the time when integrating a new data source over the syslog protocol.
We will also talk about a few more adapters where the data source is:
- A local file on a server
- Amazon S3
- Kafka
- Logstash
- some third-party APIs
- databases, and so on