[Python] Domande sulla xmlrpclib.ServerProxy e sugli unittest

Roberto Bettazzoni ml a bettazzoni.it
Ven 24 Ago 2007 14:21:46 CEST


Ciao a tutti,

avrei due domande da sottoporre a questo nobile convivio di serpidi
La risposta al Vs. buon cuore
In ogni caso, grazie :-)

1.
E' possibile dotare la xmlrpclib.ServerProxy di un timeout senza:
   - dover estendere le classi httplib.HTTPConnection, httplib.HTTP
e xmlrpclib.Transport. (come fatto sul file xmlrpc.py in allegato)
   - usare la socket.setdefaulttimeout()
sarebbe semplice, ma non posso permettermelo: il programma è
multi-thread ed usa le socket anche per altro. Inserire dei lock
per sincronizzare il tutto mi sembra molto più "brutto" della
soluzione che ho trovato.
   - usare le Twisted: il programma client è multi-thread,
cambiarlo tutto ora è fuori questione (anche se ci sto pensando)

Insomma, ho cercato su internet e non ho trovato di meglio
di questo, ma mi sembra complesso e poco pulito.
I miei sensi di ragno vibrano, sento odore di bugs
... ho "perso un pezzo"? (aka: c'è una via semplice?)
... qualcuno ha un'idea migliore o diversa?


2.
Sui test in allegato
Dovendo testare il funzionamento di un oggetto di questo genere
(client-server) il modo che ho utilizzato mi sembra troppo
complesso e propenso a generare ad errori (soprattutto su s.o.
diversi).
Dato che per poter testare _funzionalmente_ un server devo
istanziarlo in un thread/processo diverso e provare ad accedervi,
non ho trovato un metodo più semplice che istanziarlo nel setUp()
e rimuoverlo nel tearDown().
La libreria "nose"
http://somethingaboutorange.com/mrl/projects/nose/
risolve il problema: permette di definire un setUp e tearDown a
livello di modulo e di classe.
Mi domandavo se esiste un metodo per fare ciò con le librerie
"standard" in maniera pulita.

Nota per i puristi dei test unitari.
A dispetto dell'utilizzo del modulo "unittest", i test in allegato
non sono unitari ma funzionali: testano il funzionamento complessivo,
non di una sua piccola parte. Ed è proprio quello che volevo fare
;-)


Scusate la lunghezza,
Roberto




p.s.
--- Perchè tanta sofferenza?
--- Il dettaglio per i curiosi :-)
Il problema che il codice in allegato deve risolvere
è di base semplice: fornire la libreria per effettuare
una chiamata XML-RPC, però sono dati questi vincoli
  - sia il client che il server possono essere bloccati
    da "eventi non prevedibili", ma non si devono mai
    bloccare reciprocamente per un tempo indefinito.
  - il codice deve funzionare su s.o. diversi.
  - il client può essere invocato da un ambiente multi-thread

In particolare:
- il server accede a dei DB in rete, i suoi tempi di
risposta possono variare considerevolmente.
- il codice client è utilizzato da diversi processi e/o
Thread, su s.o. diversi, alcuni dei quali comunicano su
connessioni che possono staccarsi in  ogni momento senza
inviare la chiusura TCP (es: connessioni da cellulare).
Il server o il client se ne possono accorgere solo con
un timeout di qualche tipo.

-------------- parte successiva --------------
#!/usr/bin/env python

__author__="Roberto Bettazzoni"

import socket

### XML-RPC Server
from SimpleXMLRPCServer import SimpleXMLRPCServer

class XMLRPCServer(SimpleXMLRPCServer):
    """ Server XML-RPC
        Su 'host' diventano disponibili tutti i metodi non privati dell'istanza
        'publicMethods_instance'.
        La chiamata viene gestita sincronamente, con il timeout 'recv_socket_timeout'
        sulla socket di ricezione (nessun chiamante puo' bloccare con l'invio
        il server per piu' di quei secondi, di default 1.0)
    """
    def __init__(self, host , publicMethods_instance, recv_socket_timeout=1.0):
        SimpleXMLRPCServer.__init__(self, host)
        self.register_instance(publicMethods_instance)
        self.recv_socket_timeout = recv_socket_timeout
        
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        SimpleXMLRPCServer.server_bind(self)

    def handle_request(self):
        """Handle one request, possibly blocking."""
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except socket.timeout:
                self.close_request(request)
            except:
                self.handle_error(request, client_address)
                self.close_request(request)

    def get_request(self):
        while 1:
            try:
                sock, addr = self.socket.accept()
                sock.settimeout(self.recv_socket_timeout)
                return (sock, addr)
            except socket.timeout:
                pass


### XML-RPC Client

import xmlrpclib, httplib

class TimeoutedHTTPConnection(httplib.HTTPConnection):
    connection_timeout = None
    def connect(self):
        """override of the standard lib.
            original comment:
            Connect to the host and port specified in __init__."""
        msg = "getaddrinfo returns an empty list"
        for res in socket.getaddrinfo(self.host, self.port, 0,
                                      socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                self.sock = socket.socket(af, socktype, proto)
                # ------- TaZ hack
                if self.connection_timeout:
                    self.sock.settimeout(self.connection_timeout)
                # ------- end hack
                if self.debuglevel > 0:
                    print "connect: (%s, %s)" % (self.host, self.port)
                self.sock.connect(sa)
            except socket.error, msg:
                if self.debuglevel > 0:
                    print 'connect fail:', (self.host, self.port)
                if self.sock:
                    self.sock.close()
                self.sock = None
                continue
            break
        if not self.sock:
            raise socket.error, msg
    
class TimeoutedHTTP(httplib.HTTP):
    _connection_class = TimeoutedHTTPConnection

class TimeoutedTransport(xmlrpclib.Transport):        
    def make_connection(self, host):
        # create a HTTP connection object from a host descriptor
        host, extra_headers, x509 = self.get_host_info(host)
        http = TimeoutedHTTP(host)
        return http

class XMLRPCClient(xmlrpclib.ServerProxy):
    """ Identico alla xmlrpclib.ServerProxy(uri) con l'aggiunta di un timeout
        su tutte le comunicazioni della socket.
        Attenzione per il m-threading: utilizza un campo "di classe".
        Da verificare.
        Please, RTFC
    """
    def __init__(self, uri, timeout = 5.0):
        TimeoutedHTTPConnection.connection_timeout = timeout
        xmlrpclib.ServerProxy.__init__(self,  uri,
                                       transport=TimeoutedTransport())
  

    
-------------- parte successiva --------------
#!/usr/bin/env python

__author__="Roberto Bettazzoni"

import unittest, time, xmlrpclib, socket
from threading import Thread, Event
from xmlrpc import *

class DummyXmlrpcMethods:
    def __init__(self):
        self.stop = False

    def echo(self, x):
        return x

    def add(self, x, y):
        return x+y

    def wait(self, wait_time_in_seconds):
        wait_time = float(wait_time_in_seconds)
        time.sleep(wait_time)
        return 0

    def stopServer(self):
        self.stop = True
        return 0


class ThreadedServer(XMLRPCServer, Thread):
    def __init__(self, host, recv_timeout):
        Thread.__init__(self)
        self.ext_methods = DummyXmlrpcMethods()
        XMLRPCServer.__init__(self, host, self.ext_methods, recv_timeout)
        self.logRequests = 0 # this turns off log messages
        self.evt_running=Event()
        
    def run(self):
        self.evt_running.set()
        while not self.ext_methods.stop:
            self.handle_request()
        self.socket.close() # needed for *nix lib

    def handle_error(self, request, client_address):
        """ (The default is to print a traceback and continue.)
            Do nothing
        """
        pass




SERVER_PORT = 10069
SERVER_SOCKET_TIMEOUT = 0.05

class XMLRPCServer_Core_TestCase(unittest.TestCase):
    def test_server_timeout(self):
        srv = ThreadedServer(('', SERVER_PORT), SERVER_SOCKET_TIMEOUT)
        srv.start()
        srv.evt_running.wait() # wait for server
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(SERVER_SOCKET_TIMEOUT * 3) # client timeout > server timeout
            try:
                s.connect(("127.0.0.1",SERVER_PORT))
                s.sendall("POST ")                  
                self.assertEqual(s.recv(1024), "")  # blocking, the server has to close
            finally:
                s.close()
        finally:
            xmlrpclib.ServerProxy('http://localhost:%d' % SERVER_PORT).stopServer()
            srv.join()

class XMLRPCServer_Methods_TestCase(unittest.TestCase):
    def setUp(self):
        self.srv = ThreadedServer(('', SERVER_PORT), SERVER_SOCKET_TIMEOUT)
        self.srv.start()
        self.srv.evt_running.wait() # wait for server
        self.testObj = xmlrpclib.ServerProxy('http://localhost:%d' % SERVER_PORT)

    def tearDown(self):
        self.testObj.stopServer()
        self.srv.join()
    
    def test_echo(self):
        for x in (1, 2.0, "abc", [1,2,3], {"int":1, "list": [1,2,3]}):
            self.assertEqual(self.testObj.echo(x), x)

    def test_add(self):
        self.assertEqual(self.testObj.add(1,2),      3)
        self.assertEqual(self.testObj.add(1.0,2.0),  3.0)
        self.assertEqual(self.testObj.add("a","b"),  "ab")

    def test_not_existing_method(self):
        self.assertRaises(Exception, self.testObj.not_existing_method)
        self.assertRaises(Exception, self.testObj.not_existing_method, 1)
        self.assertRaises(Exception, self.testObj.not_existing_method, "")


CLIENT_RPC_TIMEOUT = 0.05

class XMLRPC_Client_TestCase(XMLRPCServer_Methods_TestCase):
    """ repeat all the test of XMLRPCServer_Methods_TestCase but:
        - use XMLRPCClient instead of the standard ServerProxy()
        - test the client timeout (too much time to read a response from server)
    """
    def setUp(self):
        self.srv = ThreadedServer(('', SERVER_PORT), SERVER_SOCKET_TIMEOUT)
        self.srv.start()
        self.srv.evt_running.wait() # wait for server
        self.testObj = XMLRPCClient('http://localhost:%d' % SERVER_PORT, CLIENT_RPC_TIMEOUT)
        
    def tearDown(self):
        self.testObj.stopServer()
        self.srv.join()
    
    def test_client_timeout(self):
        self.assertEqual(self.testObj.wait(CLIENT_RPC_TIMEOUT / 5), 0)
        self.assertRaises(socket.timeout, self.testObj.wait, CLIENT_RPC_TIMEOUT * 3)
        time.sleep(CLIENT_RPC_TIMEOUT * 2) # wait for server's sleep

if __name__ == '__main__':
    unittest.main()


Maggiori informazioni sulla lista Python