[Python] Richiesta di consiglio...
Roberto Bettazzoni
ml a bettazzoni.it
Sab 20 Gen 2007 11:17:46 CET
Marco ha scritto:
> Salve
> io vorrei far lanciare a un programma python un programma esterno,
> se uso il comando
> status = os.system(cmd)
> python si ferma e resta in attesa del programma esterno.
> A me servirebbe invece che il programma esterno fosse indipendente ma
> recuperare il numero del processo per poterlo killare successimante.
>
> Qualcuo ha qualche consiglio.
> Grazie...
> --
> Marco M.
Ciao,
mi sono appena iscritto alla lista e premetto che non mi sento
abbastanza pitonico da dare consigli ... comunque ci provo.
Il problema l'ho risolto così (in allegato, con gli unittest).
E' specifico per lanciare un'altro programma Python, ma con poche
modifiche può lanciare qualsiasi altro programma.
Funziona solo con solo Python >= 2.4
Spero che ti sia utile,
(ovviamente ogni critica/consiglio/miglioramento è ben accetta)
Roberto
-------------- parte successiva --------------
#!/usr/bin/env python
#
# author: Roberto Bettazzoni
# date : 10/01/2006
#
import os, sys, time, unittest
from pyprocessrunner import *
PY_FILE_NAME_NOEND ="test1.py"
PY_FILE_NAME ="test2.py"
PY3_FILE_NAME ="test3.py"
HELLO_WORLD_PROGRAM = """
print "Hello World!"
"""
NEVERENDING_PROGRAM = """
import time
while 1:
print "never ending program"
time.sleep(0.1)
"""
ENDING_PROGRAM = """
import time, sys
print 'sleep for 0.1 sec.'
time.sleep(0.1)
print 'EoJ'
sys.exit(69) # it returns 69 to OS !!!
"""
ENDING_PROGRAM_WITH_2_INT_PARAMETERS = """
import time, sys
if len(sys.argv) != 3:
print "BAD PARAMETERS"
print sys.argv
print 'sleep for 10 sec.'
time.sleep(10.0)
a = int(sys.argv[1])
b = int(sys.argv[2])
c = a + b
print 'risultato = %d' % c
print 'sleep for 0.1 sec.'
time.sleep(0.1)
sys.exit(c) # return a + b to os !!!
"""
class File_pyprocessrunner_TestCase(unittest.TestCase):
"""
Unit test for class PyProcess
"""
def testConstructor(self):
p1= PyProcess(PY_FILE_NAME_NOEND)
self.assertEqual(p1.fileName, PY_FILE_NAME_NOEND)
self.assertEqual(p1.process, None)
self.assertEqual(p1.cmd_list, ['python', PY_FILE_NAME_NOEND])
self.assertFalse(p1.isRunning())
self.assertEqual(p1.desc, PY_FILE_NAME_NOEND)
p2= PyProcess(PY_FILE_NAME_NOEND, ["a", "b", "c"])
self.assertEqual(p2.fileName, PY_FILE_NAME_NOEND)
self.assertEqual(p2.process, None)
self.assertEqual(p2.cmd_list, ['python', PY_FILE_NAME_NOEND, "a", "b", "c"])
self.assertFalse(p2.isRunning())
self.assertEqual(p2.desc, PY_FILE_NAME_NOEND)
p3= PyProcess("A", None, "Descrizione")
self.assertEqual(p3.fileName, "A")
self.assertEqual(p3.process, None)
self.assertEqual(p3.cmd_list, ['python', "A"])
self.assertFalse(p3.isRunning())
self.assertEqual(p3.desc, "Descrizione")
self.assertFalse(p3.stdout_on_consolle)
p4= PyProcess("BCD", ("a",), "", True)
self.assertEqual(p4.fileName, "BCD")
self.assertEqual(p4.process, None)
self.assertEqual(p4.cmd_list, ['python', "BCD", "a"])
self.assertFalse(p4.isRunning())
self.assertEqual(p4.desc, "BCD")
self.assertTrue(p4.stdout_on_consolle)
def testProcessStandardOutput(self):
# crea il programma su file
f = file(PY_FILE_NAME, "wt")
f.write(HELLO_WORLD_PROGRAM)
f.close()
try:
p1= PyProcess(PY_FILE_NAME)
p1.start()
while (p1.isRunning()):
time.sleep(0.05)
f = file(p1.tmpOutput[1], "rt")
try:
self.assertEqual(f.read().strip(), "Hello World!")
finally:
f.close()
finally:
p1.kill() # effettua anche la delete dell'output
os.remove(PY_FILE_NAME)
def testLauncer(self):
# crea il programma su file
f = file(PY_FILE_NAME_NOEND, "wt")
f.write(NEVERENDING_PROGRAM)
f.close()
p1= PyProcess(PY_FILE_NAME_NOEND)
p2= PyProcess(PY_FILE_NAME_NOEND)
p1.start()
p2.start()
time.sleep(0.05)
p1.kill()
time.sleep(0.05)
p2.kill()
time.sleep(0.05)
try:
os.remove(PY_FILE_NAME_NOEND)
except:
pass
def testEndingProgramWith2Parameters(self):
# crea il programma su file
f = file(PY3_FILE_NAME, "wt")
f.write(ENDING_PROGRAM_WITH_2_INT_PARAMETERS)
f.close()
p3= PyProcess(PY3_FILE_NAME, [ "11", "2" ])
p3.start()
while (not p3.isRunning()):
pass
# qui il processo gira
while (p3.isRunning()):
pass
# qui no
self.assertEqual(p3.process.poll(), 13)
p3.kill() # deve essere safe !
time.sleep(0.05)
try:
os.remove(PY3_FILE_NAME)
except:
pass
def testEndingProgram(self):
# crea il programma su file
f = file(PY_FILE_NAME, "wt")
f.write(ENDING_PROGRAM)
f.close()
p1= PyProcess(PY_FILE_NAME)
p1.start()
while (not p1.isRunning()):
pass
# qui il processo gira
if (p1.isRunning()):
self.assertRaises(Exception, p1.start)
while (p1.isRunning()):
pass
# qui no
self.assertEqual(p1.process.poll(), 69)
p1.kill() # deve essere safe !
time.sleep(0.05)
try:
os.remove(PY_FILE_NAME)
except:
pass
if __name__ == "__main__":
unittest.main()
-------------- parte successiva --------------
#!/usr/bin/env python
#
# author: Roberto Bettazzoni
# date : 10/01/2006
#
import subprocess, tempfile, os
try:
import win32api
def killProcess(pid):
PROCESS_TERMINATE = 1
handle = win32api.OpenProcess(PROCESS_TERMINATE, False, pid)
win32api.TerminateProcess(handle, -1)
win32api.CloseHandle(handle)
except ImportError:
# it's not windows
from signal import SIGTERM
def killProcess(pid):
os.kill(pid, SIGTERM)
TMP_POSTFIX = "_PyProc.tmp"
def removeTmpFiles():
for f in os.listdir(tempfile.gettempdir()):
if f.find(TMP_POSTFIX) > 0:
try:
os.remove(f)
except OSError:
pass
class PyProcess:
def __init__(self, py_program_fileName, parameters = None, desc=None, stdout_on_consolle = False):
self.fileName=str(py_program_fileName)
self.process = None
self.cmd_list = ['python', self.fileName]
if not parameters is None:
self.cmd_list.extend(parameters)
self.tmpOutput = None
self.desc = desc or py_program_fileName
self.stdout_on_consolle = stdout_on_consolle
def isRunning(self):
return (not self.process is None) and (self.process.poll() is None)
def start(self):
""" start the process as a python script file.
if the process is running
raise an Exception
"""
if self.isRunning():
raise Exception("Cannot start a running process")
self.deleteTmp()
self.tmpOutput = tempfile.mkstemp(TMP_POSTFIX)
if self.stdout_on_consolle:
self.process = subprocess.Popen(self.cmd_list)
else:
self.process = subprocess.Popen(self.cmd_list, stdout = self.tmpOutput[0])
def deleteTmp(self):
if self.tmpOutput:
try:
os.close(self.tmpOutput[0])
except (OSError, TypeError):
pass
try:
os.remove(self.tmpOutput[1])
except (OSError, TypeError):
pass
self.tmpOutput = None
def kill( self ):
if self.isRunning():
killProcess(self.process.pid)
self.deleteTmp()
Maggiori informazioni sulla lista
Python