[Python] Richiesta di consiglio...

Roberto Bettazzoni ml a bettazzoni.it
Sab 20 Gen 2007 11:17:46 CET


Marco ha scritto:
> Salve
> io vorrei far lanciare a un programma python un programma esterno,
> se uso il comando
> status = os.system(cmd)
> python si ferma e resta in attesa del programma esterno.
> A me servirebbe invece che il programma esterno fosse indipendente ma 
> recuperare il numero del processo per poterlo killare successimante.
> 
> Qualcuo ha qualche consiglio.
> Grazie...
> -- 
> Marco M.

Ciao,

mi sono appena iscritto alla lista e premetto che non mi sento
abbastanza pitonico da dare consigli ... comunque ci provo.
Il problema l'ho risolto così (in allegato, con gli unittest).
E' specifico per lanciare un'altro programma Python, ma con poche
modifiche può lanciare qualsiasi altro programma.
Funziona solo con solo Python >= 2.4

Spero che ti sia utile,
(ovviamente ogni critica/consiglio/miglioramento è ben accetta)

Roberto


-------------- parte successiva --------------
#!/usr/bin/env python
#
# author: Roberto Bettazzoni 
# date  : 10/01/2006
#

import os, sys, time, unittest
from pyprocessrunner import * 

PY_FILE_NAME_NOEND ="test1.py"
PY_FILE_NAME    ="test2.py"
PY3_FILE_NAME   ="test3.py"

HELLO_WORLD_PROGRAM = """
print "Hello World!"
"""

NEVERENDING_PROGRAM = """
import time
while 1:
    print "never ending program"
    time.sleep(0.1)
"""

ENDING_PROGRAM = """
import time, sys
print 'sleep for 0.1 sec.'
time.sleep(0.1)
print 'EoJ'
sys.exit(69) # it returns 69 to OS !!!
"""

ENDING_PROGRAM_WITH_2_INT_PARAMETERS = """
import time, sys
if len(sys.argv) != 3:
    print "BAD PARAMETERS"
    print sys.argv
    print 'sleep for 10 sec.'
    time.sleep(10.0)
a = int(sys.argv[1])
b = int(sys.argv[2])
c = a + b
print 'risultato = %d' % c
print 'sleep for 0.1 sec.'
time.sleep(0.1)
sys.exit(c) # return a + b to os !!!
"""

class File_pyprocessrunner_TestCase(unittest.TestCase):
    """
    Unit test for class PyProcess
    """    
    def testConstructor(self):
        p1= PyProcess(PY_FILE_NAME_NOEND)
        self.assertEqual(p1.fileName, PY_FILE_NAME_NOEND)
        self.assertEqual(p1.process, None)
        self.assertEqual(p1.cmd_list, ['python', PY_FILE_NAME_NOEND])
        self.assertFalse(p1.isRunning())
        self.assertEqual(p1.desc, PY_FILE_NAME_NOEND)
        p2= PyProcess(PY_FILE_NAME_NOEND, ["a", "b", "c"])
        self.assertEqual(p2.fileName, PY_FILE_NAME_NOEND)
        self.assertEqual(p2.process, None)
        self.assertEqual(p2.cmd_list, ['python', PY_FILE_NAME_NOEND, "a", "b", "c"])
        self.assertFalse(p2.isRunning())
        self.assertEqual(p2.desc, PY_FILE_NAME_NOEND)
        p3= PyProcess("A", None, "Descrizione")
        self.assertEqual(p3.fileName, "A")
        self.assertEqual(p3.process, None)
        self.assertEqual(p3.cmd_list, ['python', "A"])
        self.assertFalse(p3.isRunning())
        self.assertEqual(p3.desc, "Descrizione")
        self.assertFalse(p3.stdout_on_consolle)
        p4= PyProcess("BCD", ("a",), "", True)
        self.assertEqual(p4.fileName, "BCD")
        self.assertEqual(p4.process, None)
        self.assertEqual(p4.cmd_list, ['python', "BCD", "a"])
        self.assertFalse(p4.isRunning())
        self.assertEqual(p4.desc, "BCD")
        self.assertTrue(p4.stdout_on_consolle)

    def testProcessStandardOutput(self):
         # crea il programma su file
        f = file(PY_FILE_NAME, "wt")
        f.write(HELLO_WORLD_PROGRAM)
        f.close()
        try:
            p1= PyProcess(PY_FILE_NAME)
            p1.start()
            while (p1.isRunning()):
                time.sleep(0.05)
            f = file(p1.tmpOutput[1], "rt")
            try:
                self.assertEqual(f.read().strip(), "Hello World!")
            finally:
                f.close()
        finally:
            p1.kill() # effettua anche la delete dell'output
            os.remove(PY_FILE_NAME)

    def testLauncer(self):
        # crea il programma su file
        f = file(PY_FILE_NAME_NOEND, "wt")
        f.write(NEVERENDING_PROGRAM)
        f.close()

        p1= PyProcess(PY_FILE_NAME_NOEND)
        p2= PyProcess(PY_FILE_NAME_NOEND)
        p1.start()
        p2.start()
        time.sleep(0.05)
        p1.kill()
        time.sleep(0.05)
        p2.kill()
        time.sleep(0.05)
        try:
            os.remove(PY_FILE_NAME_NOEND)
        except:
            pass
 
    def testEndingProgramWith2Parameters(self):
        # crea il programma su file
        f = file(PY3_FILE_NAME, "wt")
        f.write(ENDING_PROGRAM_WITH_2_INT_PARAMETERS)
        f.close()
        
        p3= PyProcess(PY3_FILE_NAME, [ "11", "2" ])
        p3.start()
        while (not p3.isRunning()):
            pass
        # qui il processo gira
        while (p3.isRunning()):
            pass
        # qui no
        self.assertEqual(p3.process.poll(), 13)
        p3.kill() # deve essere safe !
        time.sleep(0.05)
        try:
            os.remove(PY3_FILE_NAME)
        except:
            pass
        
    def testEndingProgram(self):
        # crea il programma su file
        f = file(PY_FILE_NAME, "wt")
        f.write(ENDING_PROGRAM)
        f.close()
        
        p1= PyProcess(PY_FILE_NAME)
        p1.start()
        while (not p1.isRunning()):
            pass
        # qui il processo gira
        if (p1.isRunning()):
            self.assertRaises(Exception, p1.start)
        while (p1.isRunning()):
            pass
        # qui no
        self.assertEqual(p1.process.poll(), 69)
        p1.kill() # deve essere safe !
        time.sleep(0.05)
        try:
            os.remove(PY_FILE_NAME)
        except:
            pass



if __name__ == "__main__":
    unittest.main()
-------------- parte successiva --------------
#!/usr/bin/env python
#
# author: Roberto Bettazzoni 
# date  : 10/01/2006
#
import subprocess, tempfile, os
try:
    import win32api
    def killProcess(pid):
            PROCESS_TERMINATE = 1
            handle = win32api.OpenProcess(PROCESS_TERMINATE, False, pid)
            win32api.TerminateProcess(handle, -1)
            win32api.CloseHandle(handle)
    
except ImportError:
    # it's not windows 
    from signal import SIGTERM
    def killProcess(pid):
        os.kill(pid, SIGTERM)
    
TMP_POSTFIX = "_PyProc.tmp"
def removeTmpFiles():
    for f in os.listdir(tempfile.gettempdir()):
        if f.find(TMP_POSTFIX) > 0:
            try:
                os.remove(f)
            except OSError:
                pass

class PyProcess:
    def __init__(self, py_program_fileName, parameters = None, desc=None, stdout_on_consolle = False):
        self.fileName=str(py_program_fileName)
        self.process = None
        self.cmd_list = ['python', self.fileName]
        if not parameters is None:
            self.cmd_list.extend(parameters)
        self.tmpOutput = None
        self.desc = desc or py_program_fileName
        self.stdout_on_consolle = stdout_on_consolle

    def isRunning(self):
        return (not self.process is None) and (self.process.poll() is None)
        
    def start(self):
        """ start the process as a python script file.
            if the process is running
                raise an Exception
        """
        if self.isRunning():
            raise Exception("Cannot start a running process")
        self.deleteTmp()
        self.tmpOutput = tempfile.mkstemp(TMP_POSTFIX)
        if self.stdout_on_consolle:
            self.process = subprocess.Popen(self.cmd_list)
        else:
            self.process = subprocess.Popen(self.cmd_list, stdout = self.tmpOutput[0])

    def deleteTmp(self):
        if self.tmpOutput:
            try:
                os.close(self.tmpOutput[0])
            except (OSError, TypeError):
                pass
            try:
                os.remove(self.tmpOutput[1])
            except (OSError, TypeError):
                pass
        self.tmpOutput = None
    
    def kill( self ):
        if self.isRunning():
            killProcess(self.process.pid)
        self.deleteTmp()
 
   


Maggiori informazioni sulla lista Python