Sunday, August 18, 2019

Real-Time Data Analysis From Debug Logs


Debug logs contain a plethora of business and system information.  Most commonly, the logs are cached and post-processed, but occasionally it's valuable to process the logs as they are being appended to.  For instance, monitoring the current system state from logs requires a means to read the logs as they are being updated, extract log events, and calculate system metrics from those events.

Let's use a simple example, one that monitors (or follows) a 'top.dat' file which is generated by piping 'top' to the file.  The Python monitoring utility will process the 'top.dat' file log events as they are written.  Since we wish to monitor the log file(s) and display metrics along the way, we need at least two threads of control: one to monitor and extract information from the 'top.dat' file, the other to display info to the user.  This example calculates the CPU load by extracting the idle percentage from top.

     1 #!/usr/bin/python
     2 import logging
     3 import threading
     4 import time
     5 import glob;
     6 import re;
     7 import datetime;
     8
class Display:
  """Thread-safe key/value store of metrics, printable on demand.

  update() is called from the follower thread(s); display() from the
  main thread, so all access to map_ is serialized by lock_.
  """

  def __init__(self):
    # Guards map_ against concurrent update()/display() calls.
    self.lock_ = threading.Lock()
    self.map_ = dict()

  def update(self, key, val):
    """Record the latest value for *key* (called from follower threads)."""
    # 'with' releases the lock even if an exception is raised --
    # the original bare acquire()/release() pair would leave the
    # lock held forever on any error between the two calls.
    with self.lock_:
      self.map_[key] = val

  def display(self):
    """Print a timestamped snapshot of all tracked metrics."""
    with self.lock_:
      print("---------------")
      print(datetime.datetime.now())
      for k in self.map_.keys():
        print("%s : %s" % (k, self.map_[k]))
      print("\n")
    28
# Module-level singleton shared by the follower threads and the main loop.
display = Display()
    30
class fileProcessor:
  """Tails a log file forever, publishing the CPU load parsed from 'top' output.

  Note: the constructor never returns -- follow() loops forever -- so
  instances are meant to be created on a dedicated thread.
  """

  # Compiled once at class-definition time instead of on every input
  # line.  Captures the idle percentage from a 'top' summary line,
  # e.g. "...,  97.3 id, ...".
  idle_re_ = re.compile(r'.+, (.+) id,.*')

  def __init__(self, fileName):
    self.fileName_ = fileName
    logging.info("following %s" % (fileName))
    # 'with' guarantees the handle is closed if follow() ever raises;
    # the original leaked the open file object on any error.
    with open(self.fileName_, 'r') as fp:
      self.follow(fp)

  def follow(self, fp):
    """Poll *fp* for newly appended lines, forever (like 'tail -f')."""
    fp.seek(0, 2)  # seek to end-of-file: only new content is processed
    while True:
      line = fp.readline()
      if not line:
        time.sleep(0.1)  # nothing new yet; back off briefly
        continue
      self.handle(line)

  def handle(self, line):
    """Parse one 'top' summary line and publish the derived CPU load."""
    m = self.idle_re_.match(line)
    if m:
      # top reports the idle percentage; load is its complement.
      cpuLoad = 100.0 - float(m.group(1))
      print("cpuLoad: %s" % (cpuLoad))
      display.update('CpuLoad', cpuLoad)
    54
def thread_function(name):
  """Thread entry point: tail the log file at path *name* until the process exits."""
  logging.info("running %s" % (name))
  fileProcessor(name)
    58
if __name__ == "__main__":
  # Named 'log_format' to avoid shadowing the builtin 'format'.
  log_format = "%(asctime)s: %(message)s"
  logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")

  logging.info("main process initializing")
  # Non-daemon thread: the process stays alive following './top.dat'
  # even after the main thread falls off the end of this block, so the
  # "terminating" message below only marks the end of main-thread setup.
  t1 = threading.Thread(target=thread_function, args=('./top.dat',))
  t1.start()
  logging.info("main process terminating")

Not terribly interesting -- you could readily accomplish the same thing on the main thread, since there is only one file to monitor.

But suppose you wish to monitor an arbitrary number of files; then you really need threading.

     1 #!/usr/bin/python
     2 import logging
     3 import threading
     4 import time
     5 import glob;
     6 import re;
     7 import datetime;
     8
class Display:
  """Thread-safe key/value store of metrics, printable on demand.

  update() is called from the follower thread(s); display() from the
  main thread, so all access to map_ is serialized by lock_.
  """

  def __init__(self):
    # Guards map_ against concurrent update()/display() calls.
    self.lock_ = threading.Lock()
    self.map_ = dict()

  def update(self, key, val):
    """Record the latest value for *key* (called from follower threads)."""
    # 'with' releases the lock even on exception -- the original bare
    # acquire()/release() pair would leave the lock held forever if
    # anything between the two calls raised.
    with self.lock_:
      self.map_[key] = val

  def display(self):
    """Print a timestamped snapshot of all tracked metrics."""
    with self.lock_:
      print("---------------")
      print(datetime.datetime.now())
      for k in self.map_.keys():
        print("%s : %s" % (k, self.map_[k]))
      print("\n")
    28
# Module-level singleton shared by all follower threads and the main loop.
display = Display()
    30
class fileProcessor:
  """Tails a log file forever, publishing the CPU load parsed from 'top' output.

  Note: the constructor never returns -- follow() loops forever -- so
  instances are meant to be created on a dedicated thread.
  """

  # Compiled once at class-definition time instead of on every input
  # line.  Captures the idle percentage from a 'top' summary line,
  # e.g. "...,  97.3 id, ...".
  idle_re_ = re.compile(r'.+, (.+) id,.*')

  def __init__(self, fileName):
    self.fileName_ = fileName
    logging.info("following %s" % (fileName))
    # 'with' guarantees the handle is closed if follow() ever raises;
    # the original leaked the open file object on any error.
    with open(self.fileName_, 'r') as fp:
      self.follow(fp)

  def follow(self, fp):
    """Poll *fp* for newly appended lines, forever (like 'tail -f')."""
    fp.seek(0, 2)  # seek to end-of-file: only new content is processed
    while True:
      line = fp.readline()
      if not line:
        time.sleep(0.1)  # nothing new yet; back off briefly
        continue
      self.handle(line)

  def handle(self, line):
    """Parse one 'top' summary line and publish the derived CPU load."""
    m = self.idle_re_.match(line)
    if m:
      # top reports the idle percentage; load is its complement.
      cpuLoad = 100.0 - float(m.group(1))
      print("cpuLoad: %s" % (cpuLoad))
      display.update('CpuLoad', cpuLoad)
    54
def thread_function(name):
  """Thread entry point: tail the log file at path *name* until the process exits."""
  logging.info("running %s" % (name))
  fileProcessor(name)
    58
if __name__ == "__main__":
  # Named 'log_format' to avoid shadowing the builtin 'format'.
  log_format = "%(asctime)s: %(message)s"
  logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")

  logging.info("main process initializing")
  followers = dict()  # maps file path -> Thread following that file
  while True:
    fList = glob.glob("./*.dat")
    logging.info("fList: %s" % (str(fList)))
    # BUGFIX: in the original listing this 'for' was over-indented by
    # one space, which is an IndentationError in Python.
    for e in fList:
      if e not in followers:
        logging.info("starting thread for '%s'" % (e))
        followers[e] = threading.Thread(target=thread_function, args=(e,))
        followers[e].start()
    display.display()
    time.sleep(3)
  # Unreachable (the loop above never exits); kept from the original.
  logging.info("main process terminating")

If you had 100 '*.dat' files, the above would spawn 100 threads, each following a dedicated file and each updating the display, which is rendered every 3 seconds.

I recently put something similar together to monitor dozens of user logs, tracking uptime and job-submission events. It works really well; I hope you have equal success.

Cheers.

No comments:

Post a Comment