[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28173 - gnunet-planetlab/gplmt/gplmt
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28173 - gnunet-planetlab/gplmt/gplmt |
Date: |
Thu, 18 Jul 2013 17:38:15 +0200 |
Author: wachs
Date: 2013-07-18 17:38:15 +0200 (Thu, 18 Jul 2013)
New Revision: 28173
Modified:
gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
changes to ssh worker
Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-18 14:48:06 UTC (rev
28172)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-18 15:38:15 UTC (rev
28173)
@@ -39,7 +39,7 @@
from gplmt.SCP import SCPClient
from gplmt.SCP import SCPException
except ImportError as e:
- print "That's a bug! please check README: " + str(e)
+ print "That's a bug! please check README: " + __file__ + " : " + str(e)
sys.exit(1)
@@ -229,21 +229,184 @@
def exec_get (self):
raise NotImplementedError (inspect.stack()[0][3])
+
class RemoteSSHWorker (AbstractWorker):
def connect (self):
- raise NotImplementedError (inspect.stack()[0][3])
+ try:
+ ssh = paramiko.SSHClient()
+ if (g_configuration.ssh_use_known_hosts):
+ g_logger.log (self.node.hostname + " : Loading known hosts")
+ ssh.load_system_host_keys ()
+ # Automatically add new hostkeys
+ if (g_configuration.ssh_add_unkown_hostkeys == True):
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ # check for private key existance
+ keyfile = None
+ if (g_configuration.ssh_keyfile != None):
+ if (os.path.exists (g_configuration.ssh_keyfile)):
+ g_logger.log (self.node.hostname + " : Found " +
g_configuration.ssh_keyfile)
+ keyfile = g_configuration.ssh_keyfile
+ else:
+ g_logger.log (self.node.hostname + " : Not found " +
g_configuration.ssh_keyfile)
+ g_logger.log (self.node.hostname + " : Trying to connect to '"
+Util.print_ssh_connection (self.node) + "'")
+ if self.node.username is not None: #credentials are supplied in
node file
+ if (self.node.password is not None):
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ username=self.node.username,
+ password=self.node.password,
+ timeout=10)
+ else:
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ username=self.node.username,
+ timeout=10)
+ elif ("" != g_configuration.ssh_username):
+ g_logger.log (self.node.hostname + " : Using private keyfile
'" +str(keyfile)+ "'")
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ username=g_configuration.ssh_username,
+ password=g_configuration.ssh_password,
+ timeout=10,
+ key_filename=keyfile)
+ elif ("" != g_configuration.ssh_password):
+ g_logger.log (self.node.hostname + " : Trying to connect to "
+
+ self.node.hostname +
+ " using password '" +
g_configuration.ssh_password+
+ "' and private keyfile '" +str(keyfile)+ "'")
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ password=g_configuration.ssh_password,
+ timeout=10,
+ key_filename=keyfile)
+ else:
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ timeout=10,
+ key_filename=keyfile)
+ self.transport = ssh.get_transport()
+ except (IOError,
+ paramiko.SSHException,
+ paramiko.BadHostKeyException,
+ paramiko.AuthenticationException,
+ socket.error) as e:
+ g_logger.log (self.node.hostname + " : Error while trying to
connect: " + str(e))
+ return False
+ g_logger.log (self.node.hostname + " : Connected!")
+ return True
def disconnect (self):
raise NotImplementedError (inspect.stack()[0][3])
def exec_run_per_host (self, task):
raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self, task):
- raise NotImplementedError (inspect.stack()[0][3])
+ global interrupt
+ message = "undefined"
+ output = ""
+ if(interrupt):
+ message = "'"+ task.name + "' interrupted by user"
+ g_logger.log (self.node.hostname + " : Task '"+ message)
+ return TaskExecutionResult(Tasks.Taskresult.user_interrupt,
"interrupted by user", "")
+ if ((task.command == None) and (task.arguments == None)):
+ message = "'"+ task.name + "' no command to execute"
+ g_logger.log (self.node.hostname + " : Task " + message)
+ return TaskExecutionResult(Tasks.Taskresult.fail, "no command to
execute", "")
+ try:
+ channel = self.transport.open_session()
+ channel.settimeout(1.0)
+ channel.get_pty ()
+ #print self.node + " CMD: " + task.command + " " + task.arguments
+ channel.exec_command(task.command + " " + task.arguments)
+
+ except SSHException as e:
+ g_logger.log (self.node.hostname + " : Error while trying to
connect: " + str(e))
+
+ if (task.timeout > 0):
+ timeout = task.timeout
+ else:
+ timeout = -1
+ result = Tasks.Taskresult.success
+ exit_status = -1
+ start_time = time.time ()
+
+ stdout_data = ""
+ stderr_data = ""
+
+ while 1:
+ if(interrupt):
+ result = Tasks.Taskresult.user_interrupt
+ break
+ if (timeout != -1):
+ delta = time.time() - start_time
+ if (delta > timeout):
+ g_logger.log (self.node.hostname + " : Timeout after "
+str(delta) +" seconds")
+ result = Tasks.Taskresult.timeout
+ break
+ (r, w, e) = select.select([channel], [], [], 1)
+ if r:
+ got_data = False
+ if channel.recv_ready():
+ data = r[0].recv(4096)
+ if data:
+ got_data = True
+ g_logger.log (self.node.hostname + " : " + data)
+ output += data
+ stdout_data += data
+ if channel.recv_stderr_ready():
+ data = r[0].recv_stderr(4096)
+ if data:
+ got_data = True
+ g_logger.log (self.node.hostname + " : " + data)
+ output += data
+ stderr_data += data
+ if not got_data:
+ break
+
+ if (result == Tasks.Taskresult.success):
+ exit_status = channel.recv_exit_status ()
+
+ if (result == Tasks.Taskresult.success):
+ if (task.expected_return_code != -1):
+ if (exit_status != task.expected_return_code):
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, but exit code "
+str(exit_status)+ " was not as expected " + str(task.expected_return_code))
+ g_logger.log (stdout_data)
+ g_logger.log (stderr_data)
+ result = Tasks.Taskresult.return_value_did_not_match
+ else:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code "
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
+
+ if (task.expected_output != None):
+ output_contained = False
+ if (task.expected_output in stdout_data):
+ output_contained = True
+ if (task.expected_output in stderr_data):
+ output_contained = True
+ if (output_contained == True):
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' expected output '"+task.expected_output+"' was found")
+ else:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' expected output '"+task.expected_output+"' was not found")
+ result = Tasks.Taskresult.output_did_not_match
+
+ if (result == Tasks.Taskresult.success):
+ message = "'"+ task.name + "' successful"
+ g_logger.log (self.node.hostname + " : Task " + message)
+ elif (result == Tasks.Taskresult.timeout):
+ message = "'"+ task.name + "' with timeout"
+ g_logger.log (self.node.hostname + " : Task "+ message)
+ elif (result == Tasks.Taskresult.user_interrupt):
+ message = "'"+ task.name + "' interrupted by user"
+ g_logger.log (self.node.hostname + " : Task "+ message)
+ else:
+ message = "'"+ task.name + "' failed"
+ g_logger.log (self.node.hostname + " : Task "+ message)
+ return TaskExecutionResult(result, message, output)
+
+ return TaskExecutionResult(Tasks.Taskresult.success, "exec_run
successful", "")
def exec_put (self):
raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self):
raise NotImplementedError (inspect.stack()[0][3])
-class PlanetLabWorker (AbstractWorker):
+class PlanetLabWorker (RemoteSSHWorker):
def connect (self):
raise NotImplementedError (inspect.stack()[0][3])
def disconnect (self):
@@ -256,7 +419,6 @@
raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self):
raise NotImplementedError (inspect.stack()[0][3])
-
class NodeWorker ():
def __init__(self, target, node, tasks):
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28173 - gnunet-planetlab/gplmt/gplmt,
gnunet <=