gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r30574 - eclectic/gplmt/gplmt


From: gnunet
Subject: [GNUnet-SVN] r30574 - eclectic/gplmt/gplmt
Date: Wed, 6 Nov 2013 18:46:37 +0100

Author: peterb
Date: 2013-11-06 18:46:36 +0100 (Wed, 06 Nov 2013)
New Revision: 30574

Modified:
   eclectic/gplmt/gplmt/Configuration.py
   eclectic/gplmt/gplmt/Tasks.py
   eclectic/gplmt/gplmt/Worker.py
Log:
Added support for "node", "start_absolute", "stop_absolute", "start_relative" 
and "stop_relative" tags in task lists


Modified: eclectic/gplmt/gplmt/Configuration.py
===================================================================
--- eclectic/gplmt/gplmt/Configuration.py       2013-11-06 17:32:36 UTC (rev 30573)
+++ eclectic/gplmt/gplmt/Configuration.py       2013-11-06 17:46:36 UTC (rev 30574)
@@ -41,7 +41,7 @@
         assert (None != logger)
         self.gplmt_filename = filename
         self.gplmt_logger = logger
-        self.gplmt_parallelism = 0
+        self.gplmt_parallelism = sys.maxint
         self.gplmt_notifications = ""
         self.gplmt_taskfile = None
         self.gplmt_nodesfile = None

Modified: eclectic/gplmt/gplmt/Tasks.py
===================================================================
--- eclectic/gplmt/gplmt/Tasks.py       2013-11-06 17:32:36 UTC (rev 30573)
+++ eclectic/gplmt/gplmt/Tasks.py       2013-11-06 17:46:36 UTC (rev 30574)
@@ -25,7 +25,9 @@
     import Util
     import sys
     import os
+    import re
     import gplmt
+    from datetime import datetime
     from xml.parsers import expat  
     from minixsv import pyxsval as xsv 
     from elementtree import ElementTree
@@ -76,6 +78,11 @@
         self.dest = None
         self.command_file = None
         self.output_prefix = None
+        self.node = ""
+        self.start_absolute = datetime.min
+        self.stop_absolute = datetime.max
+        self.start_relative = 0
+        self.stop_relative = sys.maxint
         
     def copy (self):
         t = Task ()
@@ -93,6 +100,11 @@
         t.dest = self.dest
         t.command_file = self.command_file
         t.output_prefix = self.output_prefix
+        t.node = self.node
+        t.start_absolute = self.start_absolute
+        t.stop_absolute = self.stop_absolute
+        t.start_relative = self.start_relative
+        t.stop_relative = self.stop_relative
         return t
     def log (self):
         glogger.log ("Task " + str(self.id) + ": " + self.name)
@@ -119,6 +131,12 @@
     def __init__(self):
         self.set = list()
 
+def parse_relative (text):
+    regex  = re.compile('(?P<sign>-?)P(?:(?P<years>\d+)Y)?(?:(?P<months>\d+)M)?(?:(?P<days>\d+)D)?(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?')
+    duration = regex.match(text).groupdict(0)
+    
+    return int(duration['seconds']) + 60 * (int(duration['minutes']) + 60 * (int(duration['hours']) + 24 * \
+    (int(duration['days']) + 31 * int(duration['months']) + 365 * int(duration ['years']))))
 
 def handle_task (elem, tasks):
     t = Task ()
@@ -194,6 +212,38 @@
             t.dest = child.text
             if ('' != g_configuration.gplmt_userdir and Operation.get == t.type):
                 t.dest = os.path.join(g_configuration.gplmt_userdir, os.path.basename(t.dest))
+        
+        if ((child.tag == "node") and (child.text != None)):
+            t.node = child.text
+            print "Node: " + t.node
+            
+        if ((child.tag == "start_absolute") and (child.text != None)):
+            try:
+                t.start_absolute = datetime.strptime(child.text, "%Y-%m-%dT%H:%M:%S")
+            except ValueError:
+                print "Invalid absolute start time '" +child.text+ "' for task id " + str (t.id) + " name " + t.name
+            print "start_absolute: " + t.start_absolute.strftime("%A, %d. %B %Y %I:%M%p")
+        
+        if ((child.tag == "stop_absolute") and (child.text != None)):
+            try:
+                t.stop_absolute = datetime.strptime(child.text, "%Y-%m-%dT%H:%M:%S")
+            except ValueError:
+                print "Invalid absolute stop time '" +child.text+ "' for task id " + str (t.id) + " name " + t.name
+            print "stop_absolute: " + t.stop_absolute.strftime("%A, %d. %B %Y %I:%M%p")
+                
+        if ((child.tag == "start_relative") and (child.text != None)):
+            try:
+                t.start_relative = parse_relative(child.text)
+            except ValueError:
+                print "Invalid relative start time '" +child.text+ "' for task id " + str (t.id) + " name " + t.name
+            print "start_relative: " + str(t.start_relative)
+            
+        if ((child.tag == "stop_relative") and (child.text != None)):
+            try:
+                t.stop_relative = parse_relative(child.text)
+            except ValueError:
+                print "Invalid relative stop time '" +child.text+ "' for task id " + str (t.id) + " name " + t.name
+            print "stop_relative: " + str(t.stop_relative)
 
     if (False == t.check()):
         print "Parsed invalid task with id " + str (t.id) + " name '" + t.name + "'"

Modified: eclectic/gplmt/gplmt/Worker.py
===================================================================
--- eclectic/gplmt/gplmt/Worker.py      2013-11-06 17:32:36 UTC (rev 30573)
+++ eclectic/gplmt/gplmt/Worker.py      2013-11-06 17:46:36 UTC (rev 30574)
@@ -31,6 +31,7 @@
 import signal
 import inspect
 import subprocess
+import datetime
 
 try:
     import gplmt.Configuration as Configuration
@@ -93,7 +94,8 @@
         threading.Thread.__init__(self)
         self.threadID = threadID
         self.node = node
-        self.tasks = tasks    
+        self.tasks = tasks
+        self.timer = None
     def connect (self):
         raise NotImplementedError (inspect.stack()[0][3]) 
     def disconnect (self):       
@@ -105,8 +107,10 @@
     def exec_put (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self, task):
-        raise NotImplementedError (inspect.stack()[0][3]) 
-    def run(self):    
+        raise NotImplementedError (inspect.stack()[0][3])
+    def interrupt_task (self):
+        raise NotImplementedError (inspect.stack()[0][3])
+    def run(self):   
         global interrupt
         tasklist_success = True
         # Connecting
@@ -130,8 +134,41 @@
         if (interrupt):
             g_notifications.tasklist_completed (self.node, self.tasks, Tasks.Taskresult.user_interrupt, "")                        
         # Executing Tasks 
-        while (None != task and not interrupt):            
-            g_logger.log (self.node.hostname + " : Running task id " +str(task.id)+" '" + task.name + "'")                        
+        while (None != task and not interrupt):
+        
+            if (None != self.timer):
+                self.timer.cancel()
+                self.timer = None
+        
+            if (task.node and task.node != self.node.hostname):
+                g_logger.log (self.node.hostname + " : Ignoring task due to set node attribute");
+                task = self.tasks.get()
+                continue
+                
+            delta = int(max((task.start_absolute - datetime.datetime.now()).total_seconds(), task.start_relative))
+            
+            if (delta > 0):
+                g_logger.log (self.node.hostname + " : Continuing execution in " + str(delta) + " seconds")
+                for x in range(0, delta):
+                    time.sleep(1) 
+                    if (interrupt):
+                        g_notifications.tasklist_completed (self.node, self.tasks, Tasks.Taskresult.user_interrupt, "")
+                        g_notifications.task_completed (self.node, task, Tasks.Taskresult.user_interrupt, "task was interrupted", "")
+                        return
+                        
+                                                                     
+            
+            g_logger.log (self.node.hostname + " : Running task id " +str(task.id)+" '" + task.name + "'")     
+            
+            
+            delta = int(min((task.stop_absolute - datetime.datetime.now()).total_seconds(), task.stop_relative))
+            
+            if (delta > 0):
+                g_logger.log (self.node.hostname + " : Task will be interrupted in " + str(delta) + " seconds")
+                self.timer = threading.Timer(delta, self.interrupt_task)
+                self.timer.start()
+            
+                               
             g_notifications.task_started (self.node, task, "")
             task_result = None
             try:
@@ -217,9 +254,14 @@
         return TaskExecutionResult(Tasks.Taskresult.success, "exec_put successful", "")             
     def exec_get (self, task):
         print "TestWorker gets '" + task.name + "' " + task.src + "' '" + task.dest+ "'"
-        return TaskExecutionResult(Tasks.Taskresult.success, "exec_get successful", "")         
+        return TaskExecutionResult(Tasks.Taskresult.success, "exec_get successful", "")     
+    def interrupt_task (self):
+        print "TestWorker task is interrupted by timeout"  
 
 class LocalWorker (AbstractWorker):
+    def __init__(self, threadID, node, tasks):
+        AbstractWorker.__init__(self, threadID, node, tasks)
+        self.process = None
     def connect (self):
         g_logger.log ("LocalWorker connects to '" + self.node.hostname + "'")
         try:
@@ -241,7 +283,10 @@
         output = ""
         found = False
         try:
-            output = subprocess.check_output(task.command + " " + task.arguments, shell=True)
+            self.process = subprocess.Popen("exec " + task.command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
+            stdoutdata, stderrdata = self.process.communicate()
+            output = stdoutdata
+            #subprocess.check_output(task.command + " " + task.arguments, shell=True)
             output = output.rstrip()
         except subprocess.CalledProcessError as e:
             returncode = e.returncode
@@ -259,10 +304,17 @@
     def exec_put (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self, task):
-        raise NotImplementedError (inspect.stack()[0][3]) 
+        raise NotImplementedError (inspect.stack()[0][3])
+    def interrupt_task (self):
+        g_logger.log (self.node.hostname + " : Task interrupted by timeout")
+        if (None != self.process):
+            self.process.terminate()
 
 
 class RemoteSSHWorker (AbstractWorker):
+    def __init__(self, threadID, node, tasks):
+        AbstractWorker.__init__(self, threadID, node, tasks)
+        self.task_interrupted = False
     def connect (self):
         self.ssh = None
         if (interrupt):
@@ -369,6 +421,7 @@
         return self.exec_run (t)
     def exec_run (self, task):        
         global interrupt
+        self.task_interrupted = False
         message = "undefined"
         output = ""
         if(interrupt):
@@ -402,6 +455,11 @@
             if(interrupt):
                 result = Tasks.Taskresult.user_interrupt
                 break
+            if (self.task_interrupted):
+                channel.close()
+                exit_status = 0
+                break
+                
             if (timeout != -1):
                 delta = time.time() - start_time
                 if (delta > timeout):
@@ -427,7 +485,7 @@
                         stderr_data += data
                 if not got_data:
                     break        
-        if (result == Tasks.Taskresult.success):
+        if (not self.task_interrupted and result == Tasks.Taskresult.success):
             exit_status = channel.recv_exit_status ()          
         if (result == Tasks.Taskresult.success):
             if (task.expected_return_code != -1):                    
@@ -520,7 +578,10 @@
             pass     
         if (None == result):          
             result = TaskExecutionResult(Tasks.Taskresult.success, "Store source '"+task.src+"' in '" +task.dest+"'", "")      
-        return result     
+        return result
+    def interrupt_task (self):
+        g_logger.log (self.node.hostname + " : Task interrupted by timeout")
+        self.task_interrupted = True   
     
 class PlanetLabWorker (RemoteSSHWorker):
     def connect (self):
@@ -588,7 +649,7 @@
     def start (self):
         g_logger.log ("Starting execution for node " + self.node.hostname)
         self.thread.start()
-    
+
 class Worker:
     def __init__(self, logger, configuration, target, nodes, tasks, notifications):
         global g_logger;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]