[Python] Domande sulla xmlrpclib.ServerProxy e sugli unittest
Roberto Bettazzoni
ml a bettazzoni.it
Ven 24 Ago 2007 14:21:46 CEST
Ciao a tutti,
avrei due domande da sottoporre a questo nobile convivio di serpidi
La risposta al Vs. buon cuore
In ogni caso, grazie :-)
1.
E' possibile dotare la xmlrpclib.ServerProxy di un timeout senza:
- dover estendere le classi httplib.HTTPConnection, httplib.HTTP
e xmlrpclib.Transport. (come fatto sul file xmlrpc.py in allegato)
- usare la socket.setdefaulttimeout()
sarebbe semplice, ma non posso permettermelo: il programma è
multi-thread ed usa le socket anche per altro. Inserire dei lock
per sincronizzare il tutto mi sembra molto più "brutto" della
soluzione che ho trovato.
- usare le Twisted: il programma client è multi-thread,
cambiarlo tutto ora è fuori questione (anche se ci sto pensando)
Insomma, ho cercato su internet e non ho trovato di meglio
di questo, ma mi sembra complesso e poco pulito.
I miei sensi di ragno vibrano, sento odore di bugs
... ho "perso un pezzo"? (aka: c'è una via semplice?)
... qualcuno ha un'idea migliore o diversa?
2.
Sui test in allegato
Dovendo testare il funzionamento di un oggetto di questo genere
(client-server) il modo che ho utilizzato mi sembra troppo
complesso e propenso a generare ad errori (soprattutto su s.o.
diversi).
Dato che per poter testare _funzionalmente_ un server devo
istanziarlo in un thread/processo diverso e provare ad accedervi,
non ho trovato un metodo più semplice che istanziarlo nel setUp()
e rimuoverlo nel tearDown().
La libreria "nose"
http://somethingaboutorange.com/mrl/projects/nose/
risolve il problema: permette di definire un setUp e tearDown a
livello di modulo e di classe.
Mi domandavo se esiste un metodo per fare ciò con le librerie
"standard" in maniera pulita.
Nota per i puristi dei test unitari.
A dispetto dell'utilizzo del modulo "unittest", i test in allegato
non sono unitari ma funzionali: testano il funzionamento complessivo,
non di una sua piccola parte. Ed è proprio quello che volevo fare
;-)
Scusate la lunghezza,
Roberto
p.s.
--- Perchè tanta sofferenza?
--- Il dettaglio per i curiosi :-)
Il problema che il codice in allegato deve risolvere
è di base semplice: fornire la libreria per effettuare
una chiamata XML-RPC, però sono dati questi vincoli
- sia il client che il server possono essere bloccati
da "eventi non prevedibili", ma non si devono mai
bloccare reciprocamente per un tempo indefinito.
- il codice deve funzionare su s.o. diversi.
- il client può essere invocato da un ambiente multi-thread
In particolare:
- il server accede a dei DB in rete, i suoi tempi di
risposta possono variare considerevolmente.
- il codice client è utilizzato da diversi processi e/o
Thread, su s.o. diversi, alcuni dei quali comunicano su
connessioni che possono staccarsi in ogni momento senza
inviare la chiusura TCP (es: connessioni da cellulare).
Il server o il client se ne possono accorgere solo con
un timeout di qualche tipo.
-------------- parte successiva --------------
#!/usr/bin/env python
__author__="Roberto Bettazzoni"
import socket
### XML-RPC Server
from SimpleXMLRPCServer import SimpleXMLRPCServer
class XMLRPCServer(SimpleXMLRPCServer):
""" Server XML-RPC
Su 'host' diventano disponibili tutti i metodi non privati dell'istanza
'publicMethods_instance'.
La chiamata viene gestita sincronamente, con il timeout 'recv_socket_timeout'
sulla socket di ricezione (nessun chiamante puo' bloccare con l'invio
il server per piu' di quei secondi, di default 1.0)
"""
def __init__(self, host , publicMethods_instance, recv_socket_timeout=1.0):
SimpleXMLRPCServer.__init__(self, host)
self.register_instance(publicMethods_instance)
self.recv_socket_timeout = recv_socket_timeout
def server_bind(self):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
SimpleXMLRPCServer.server_bind(self)
def handle_request(self):
"""Handle one request, possibly blocking."""
try:
request, client_address = self.get_request()
except socket.error:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except socket.timeout:
self.close_request(request)
except:
self.handle_error(request, client_address)
self.close_request(request)
def get_request(self):
while 1:
try:
sock, addr = self.socket.accept()
sock.settimeout(self.recv_socket_timeout)
return (sock, addr)
except socket.timeout:
pass
### XML-RPC Client
import xmlrpclib, httplib
class TimeoutedHTTPConnection(httplib.HTTPConnection):
connection_timeout = None
def connect(self):
"""override of the standard lib.
original comment:
Connect to the host and port specified in __init__."""
msg = "getaddrinfo returns an empty list"
for res in socket.getaddrinfo(self.host, self.port, 0,
socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
# ------- TaZ hack
if self.connection_timeout:
self.sock.settimeout(self.connection_timeout)
# ------- end hack
if self.debuglevel > 0:
print "connect: (%s, %s)" % (self.host, self.port)
self.sock.connect(sa)
except socket.error, msg:
if self.debuglevel > 0:
print 'connect fail:', (self.host, self.port)
if self.sock:
self.sock.close()
self.sock = None
continue
break
if not self.sock:
raise socket.error, msg
class TimeoutedHTTP(httplib.HTTP):
_connection_class = TimeoutedHTTPConnection
class TimeoutedTransport(xmlrpclib.Transport):
def make_connection(self, host):
# create a HTTP connection object from a host descriptor
host, extra_headers, x509 = self.get_host_info(host)
http = TimeoutedHTTP(host)
return http
class XMLRPCClient(xmlrpclib.ServerProxy):
""" Identico alla xmlrpclib.ServerProxy(uri) con l'aggiunta di un timeout
su tutte le comunicazioni della socket.
Attenzione per il m-threading: utilizza un campo "di classe".
Da verificare.
Please, RTFC
"""
def __init__(self, uri, timeout = 5.0):
TimeoutedHTTPConnection.connection_timeout = timeout
xmlrpclib.ServerProxy.__init__(self, uri,
transport=TimeoutedTransport())
-------------- parte successiva --------------
#!/usr/bin/env python
__author__="Roberto Bettazzoni"
import unittest, time, xmlrpclib, socket
from threading import Thread, Event
from xmlrpc import *
class DummyXmlrpcMethods:
def __init__(self):
self.stop = False
def echo(self, x):
return x
def add(self, x, y):
return x+y
def wait(self, wait_time_in_seconds):
wait_time = float(wait_time_in_seconds)
time.sleep(wait_time)
return 0
def stopServer(self):
self.stop = True
return 0
class ThreadedServer(XMLRPCServer, Thread):
def __init__(self, host, recv_timeout):
Thread.__init__(self)
self.ext_methods = DummyXmlrpcMethods()
XMLRPCServer.__init__(self, host, self.ext_methods, recv_timeout)
self.logRequests = 0 # this turns off log messages
self.evt_running=Event()
def run(self):
self.evt_running.set()
while not self.ext_methods.stop:
self.handle_request()
self.socket.close() # needed for *nix lib
def handle_error(self, request, client_address):
""" (The default is to print a traceback and continue.)
Do nothing
"""
pass
SERVER_PORT = 10069
SERVER_SOCKET_TIMEOUT = 0.05
class XMLRPCServer_Core_TestCase(unittest.TestCase):
def test_server_timeout(self):
srv = ThreadedServer(('', SERVER_PORT), SERVER_SOCKET_TIMEOUT)
srv.start()
srv.evt_running.wait() # wait for server
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(SERVER_SOCKET_TIMEOUT * 3) # client timeout > server timeout
try:
s.connect(("127.0.0.1",SERVER_PORT))
s.sendall("POST ")
self.assertEqual(s.recv(1024), "") # blocking, the server has to close
finally:
s.close()
finally:
xmlrpclib.ServerProxy('http://localhost:%d' % SERVER_PORT).stopServer()
srv.join()
class XMLRPCServer_Methods_TestCase(unittest.TestCase):
def setUp(self):
self.srv = ThreadedServer(('', SERVER_PORT), SERVER_SOCKET_TIMEOUT)
self.srv.start()
self.srv.evt_running.wait() # wait for server
self.testObj = xmlrpclib.ServerProxy('http://localhost:%d' % SERVER_PORT)
def tearDown(self):
self.testObj.stopServer()
self.srv.join()
def test_echo(self):
for x in (1, 2.0, "abc", [1,2,3], {"int":1, "list": [1,2,3]}):
self.assertEqual(self.testObj.echo(x), x)
def test_add(self):
self.assertEqual(self.testObj.add(1,2), 3)
self.assertEqual(self.testObj.add(1.0,2.0), 3.0)
self.assertEqual(self.testObj.add("a","b"), "ab")
def test_not_existing_method(self):
self.assertRaises(Exception, self.testObj.not_existing_method)
self.assertRaises(Exception, self.testObj.not_existing_method, 1)
self.assertRaises(Exception, self.testObj.not_existing_method, "")
CLIENT_RPC_TIMEOUT = 0.05
class XMLRPC_Client_TestCase(XMLRPCServer_Methods_TestCase):
""" repeat all the test of XMLRPCServer_Methods_TestCase but:
- use XMLRPCClient instead of the standard ServerProxy()
- test the client timeout (too much time to read a response from server)
"""
def setUp(self):
self.srv = ThreadedServer(('', SERVER_PORT), SERVER_SOCKET_TIMEOUT)
self.srv.start()
self.srv.evt_running.wait() # wait for server
self.testObj = XMLRPCClient('http://localhost:%d' % SERVER_PORT, CLIENT_RPC_TIMEOUT)
def tearDown(self):
self.testObj.stopServer()
self.srv.join()
def test_client_timeout(self):
self.assertEqual(self.testObj.wait(CLIENT_RPC_TIMEOUT / 5), 0)
self.assertRaises(socket.timeout, self.testObj.wait, CLIENT_RPC_TIMEOUT * 3)
time.sleep(CLIENT_RPC_TIMEOUT * 2) # wait for server's sleep
if __name__ == '__main__':
unittest.main()
Maggiori informazioni sulla lista
Python