[PIPython] Credo di diventare pazzo

Valentino Volonghi aka Dialtone dialton3
Ven 19 Nov 2004 15:16:44 CET


Credo di essere li` li` per diventare pazzo...

Sto usando Twisted Matrix per creare un rss-aggregator ad altissime 
prestazioni (sicuramente superiori agli altri aggregators che ho visto 
in rete).
Ma tra me e la gloria assoluta (:P) si sta frapponendo un 'Ctrl+C' e 
qualche problemino di troppo...

Dunque, il problema principale e` che faccio partire 730 download di 
feed da vari indirizzi.

Tutto gira bene fino a circa il 400esimo. E da circa il 340esimo noto un 
certo rallentamento nella chain.
Arrivato al 400esimo comincia a rallentare sempre piu` fino a bloccarsi.
A questo punto usando Ctrl+C tutto si sblocca e l'esecuzione arriva fino 
alla fine (interrompendosi ovviamente) e lo
fa a una velocita` incredibile.

Il momento in cui si blocca coincide quasi perfettamente col momento in 
cui il carico sulla rete si azzera (perche` ormai ha
scaricato tutti i feed).

Sarebbe interessante risolvere il problema, perche` ora come ora sono 
riuscito a impiegare 4 minuti e 38 secondi per 730 feed, ovvero
circa 380 decimi di secondo per ogni feed (parsing compreso), eliminando 
questo rallentamento assurdo, sono sicuro di poter scendere sotto
i 4 minuti come minimo, e diventerebbe l'aggregator piu` potente di 
internet :P.

Tanto per fare un esempio, Straw (l'rss aggregator per GNOME) impiega 
almeno un minuto abbondante per scaricarne una ventina.
Io per scaricarne 35 impiego 12 secondi (compreso il parsing). E va in 
crisi con 200 feed (ma anche meno), nel senso che resetta la connessione 
e tante
cosette simili. Io riesco a portare a termine tutti e 730 feed, ma ho 
questo problema bastardo del Ctrl+C...

Come parser utilizzo lo Universal Feed Parser di Mark Pilgrim, 
scaricabile dal suo sito, che non mi ha dato nessun problema.

Non capendo dove stesse il problema ho provato a installare uno spewer 
come 'debugger' e ho ottenuto questo:

function callWithLogger in 
/usr/lib/python2.3/site-packages/twisted/python/log.py, line 54
method logPrefix of twisted.internet.tcp.Client at 1085149132
function callWithContext in 
/usr/lib/python2.3/site-packages/twisted/python/log.py, line 49
method getContext of twisted.python.context.ContextTracker at 1081585292
method callWithContext of twisted.python.context.ContextTracker at 
1081585292
method _doReadOrWrite of twisted.internet.default.SelectReactor at 
1077521228
method doRead of twisted.internet.tcp.Client at 1085149132
method fileno of socket._socketobject at 1085157164
method removeReader of twisted.internet.default.SelectReactor at 1077521228
method removeWriter of twisted.internet.default.SelectReactor at 1077521228
method connectionLost of twisted.internet.tcp.Client at 1085149132
method connectionLost of twisted.internet.tcp.Client at 1085149132
method connectionLost of twisted.internet.tcp.Client at 1085149132
method _closeSocket of twisted.internet.tcp.Client at 1085149132
method shutdown of socket._socketobject at 1085157164
method connectionLost of twisted.web.client.HTTPPageGetter at 1086481388
method connectionLost of twisted.web.client.HTTPPageGetter at 1086481388
method handleResponseEnd of twisted.web.client.HTTPPageGetter at 1086481388
method noPage of twisted.web.client.HTTPClientFactory at 1085148876
method connectionLost of twisted.internet.tcp.Connector at 1085149100
method clientConnectionLost of twisted.web.client.HTTPClientFactory at 
1085148876
method doStop of twisted.web.client.HTTPClientFactory at 1085148876
method __repr__ of twisted.web.client.HTTPClientFactory at 1085148876
method msg of twisted.python.log.LogPublisher at 1081585612
method getContext of twisted.python.context.ContextTracker at 1081585292
method _emit of twisted.python.log.DefaultObserver at 1081585644
method stopFactory of twisted.web.client.HTTPClientFactory at 1085148876
method runUntilCurrent of twisted.internet.default.SelectReactor at 
1077521228
method timeout of twisted.internet.default.SelectReactor at 1077521228
method doSelect of twisted.internet.default.SelectReactor at 1077521228
method fileno of socket._socketobject at 1086221044
method fileno of socket._socketobject at 1086589444
method fileno of socket._socketobject at 1085552996
method fileno of socket._socketobject at 1086453556
[UN SACCO DI QUESTI]
method fileno of socket._socketobject at 1086477372
method fileno of socket._socketobject at 1086623612
method fileno of socket._socketobject at 1085203316
method fileno of socket._socketobject at 1085733820

##@#@#@ Ctrl+C

[ANCORA UN PO' DI QUELLI SOPRA]

E dopo questo punto si va dritti fino alla fine a velocita` warp.

Il blocco avviene a caso su un qualsiasi feed dal 400esimo in su.

Il codice lo trovato in allegato, e` MOLTO commentato, anche perche` non 
capendo dove stesse il problema ho fatto
pesante refactoring e poi lo sto scrivendo anche per un mio amico (e 
quindi deve capire cosa faccio col codice).

Lo so che gli utilizzatori e fruitori del magico mondo di twisted sono 
pochi la fuori, ma se qualcuno sa darmi una mano...
Lo scongiuro in ginocchio!!!!

PS: Ho chiesto sulla ML di twisted, ma non sembra che abbia sortito 
effetti positivi purtroppo :(
Il massimo che mi e` stato detto e` che potrebbe essere colpa del 
resolver di sistema che e` bloccante, mi e` stato
consigliato di sostituirlo con il resolver di twisted, ma ha un baco... 
sic...

Se volete il file con la lista dei feed e il parser ve li mando 
tranquillamente.

-- 
Valentino Volonghi aka Dialtone
Linux User #310274, Gentoo Proud User
X Python Newsreader developer
http://sourceforge.net/projects/xpn/

-------------- parte successiva --------------
from twisted.internet import reactor, protocol, defer
#from twisted.names import client
#reactor.installResolver(client.createResolver())
from twisted.python.util import spewer
from twisted.web import client
import feedparser, time, out, sys
#sys.settrace(spewer)

#rss_feeds = out.rss_feed
# This is the default site list
rss_feeds = [('http://www.nongnu.org/straw/news.rss','straw'),
             ('http://googlenews.74d.com/rss/google_it.rss','google'),
             ('http://www.pythonware.com/daily/rss.xml','pythonware'),
             ('http://www.theinquirer.net/inquirer.rss','inq'),
             ('http://www.groklaw.net/backend/GrokLaw.rdf','grok'),
             ('http://www.livejournal.com/users/moshez/data/rss','zadka'),
             ('http://www.pythonware.com/news.rdf','pwn'),
             ('http://www.docuverse.com/blog/donpark/rss.xml','780'),
             ('http://www.gawker.com/index.xml','781'),
             ('http://www.gizmodo.net/index.xml','782'),
             ('http://www.gotdotnet.com/team/dbox/rss.aspx','783'),
             ('http://www.hutteman.com/weblog/rss.xml','784'),
             ('http://www.hyperorg.com/blogger/index.rdf','785'),
             ('http://www.infoworld.com/rss/news.rdf','787'),
             ('http://www.intertwingly.net/blog/index.rss','790'),
             ('http://www.itfacts.biz/index.xml','791'),
             ('http://www.joelonsoftware.com/rss.xml','792'),
             ('http://www.kottke.org/index.rdf','794'),
             ('http://www.kottke.org/remainder/index.rdf','795'),
             ('http://www.lessig.org/blog/index.rdf','797'),
             ('http://www.lockergnome.com/lockergnome.xml','798'),
             ('http://www.macintouch.com/rss.xml','799'),
             ('http://www.macmerc.com/index.rss','800'),
             ('http://www.macminute.com/headlines.xml','801'),
             ('http://www.macosxhints.com/backend/geeklog.rdf','802'),
             ('http://www.macrumors.com/macrumors.xml','803'),
             ('http://www.megnut.com/index.xml','804'),
             ('http://www.mezzoblue.com/rss/index.xml','805'),
             ('http://www.moskalyuk.com/blog/index.rdf','806'),
             ('http://www.nwfusion.com/netflash.rss','807'),
             ('http://www.ozzie.net/blog/rss.xml','809'),
             ('http://www.scripting.com/rss.xml','812'),
             ('http://www.simplebits.com/xml/rss.xml','814'),
             ('http://www.sixapart.com/log/index.rdf','816'),
             ('http://www.smartmobs.com/index.rdf','817')
            ]

# michele a berthold.com

INTER_QUERY_TIME = 300

class FeederProtocol(object):
    def __init__(self):
        self.parsed = 1
        self.with_errors = 0
        self.error_list = []
        # This dict structure will be the following:
        # { 'URL': (TIMESTAMP, value) }
        self.cache = {}
        
    def startDownloading(self, site):
        #print "Looking if",site[0],"cached...",
        
        # Try to get the tuple (TIMESTAMP, FEED_STRUCT) from the dict if it has
        # already been downloaded. Otherwise assign None to already_got
        already_got = self.cache.get(site[0], None)

        # Ok guys, we got it cached, let's see what we will do
        if already_got:
            # Well, it's cached, but will it be recent enough?
            elapsed_time = time.time() - already_got[0]
            
            # Woooohooo it is, elapsed_time is less than INTER_QUERY_TIME so I
            # can get the page from the memory, recent enough
            if elapsed_time < INTER_QUERY_TIME:
                return True
            
            else:    
                # Uhmmm... actually it's a bit old, I'm going to get it from the
                # Net then, then I'll parse it and then I'll try to memoize it
                # again
                return False
            
        else: 
            # Well... We hadn't it cached in, so we need to get it from the Net
            # now, It's useless to check if it's recent enough, it's not there.
            return False

    def gotError(self, traceback, extra_args):
        # An Error as occurred, print traceback infos and go on
        print traceback
        self.with_errors += 1
        self.error_list.append(extra_args)
        print "="*20
        print "Trying to go on..."
        
    def getPageFromMemory(self, key=None):
        # Getting the second element of the tuple which is the parsed structure
        # of the feed at address key, the first element of the tuple is the
        # timestamp
        return defer.succeed(self.cache.get(key,key)[1])

    def parseFeed(self, feed):
        # This is self explaining :)
        print "parsing..."
        parsed = feedparser.parse(feed)
        print "parsed feed"
        return parsed
   
    def memoize(self, feed, addr):
        # feed is the raw structure, just as returned from feedparser.parse()
        # while addr is the address from which the feed was got.
        print "Memoizing",addr,"..."
        self.cache.setdefault(addr, (time.time(),feed))
        return feed
    
    def workOnPage(self, parsed_feed, addr):
        # As usual, addr is the feed address and file is the file in
        # which you can eventually save the structure.
        print "-"*20
        print "finished retrieving"
        print "Feed Version:",parsed_feed.get('version','Unknown')
        
        #
        #  Uncomment the following if you want to print the feeds
        #
        chan = parsed_feed.get('channel', None)
        if chan:
            print chan.get('title', '')
            #print chan.get('link', '')
            #print chan.get('tagline', '')
            #print chan.get('description','')
        print "-"*20
        #items = parsed_feed.get('items', None)
        #if items:
        #    for item in items:
        #        print '\tTitle: ', item.get('title','')
        #        print '\tDate: ', item.get('date', '')
        #        print '\tLink: ', item.get('link', '')
        #        print '\tDescription: ', item.get('description', '')
        #        print '\tSummary: ', item.get('summary','')
        #        print "-"*20
        #print "got",addr
        #print "="*40
        
    def stopWorking(self, data=None):
        print "Closing connection number %d..."%(self.parsed,)
        print "=-"*20
        
        # This is here only for testing. When a protocol/interface will be
        # created to communicate with this rss-aggregator server, we won't need
        # to die after we parsed some feeds just one time.
        self.parsed += 1
        print self.parsed,  len(rss_feeds)
        if self.parsed > len(rss_feeds):
            print "Closing all..."
            print self.with_errors
            #for i in self.cache:
            #    print i
            #print time.time()-tp
            for i in self.error_list:
                print i
            reactor.stop()

    def getFeeds(self, where=None):
        #print "getting feeds"
        # This is to get the feeds we want
        if not where: # We don't have a database, then we use the local
                      # variabile rss_feeds
            return rss_feeds
        else: return None
        
    def start(self, data=None):
        # Here we gather all the urls for the feeds
        #self.factory.tries += 1
        for feed in self.getFeeds():
        
            # Now we start telling the reactor that it has
            # to get all the feeds one by one...
            cached = self.startDownloading(feed)
            if not cached: 
                # When the feed is not cached, it's time to
                # go and get it from the web directly
                d = client.getPage(feed[0])
                d.addErrback(self.gotError, (feed[0], 'getting'))
                
                # Parse the feed and if there's some errors call self.gotError
                d.addCallback(self.parseFeed)
                d.addErrback(self.gotError, (feed[0], 'parsing'))
                
                # Now memoize it, if there's some error call self.getError
                d.addCallback(self.memoize, feed[0])
                d.addErrback(self.gotError, (feed[0], 'memoizing'))
                
            else: # If it's cached
                d = self.getPageFromMemory(feed[0])
                d.addErrback(self.gotError, (feed[0], 'getting from memory'))
            
            # When you get the raw structure you can work on
            # to format it in the best way you can think of.
            # For any error call self.gotError.
            d.addCallback(self.workOnPage, feed[0])
            d.addErrback(self.gotError, (feed[0], 'working on page'))
            
            # And when the for loop is ended we put 
            # stopWorking on the callback for the last 
            # feed gathered
            d.addCallback(self.stopWorking)
            d.addErrback(self.gotError, (feed[0], 'while stopping'))

        # This is to try the memoize feature
        #if self.factory.tries<3:
        #    d.addCallback(self.start)    

class FeederFactory(protocol.ClientFactory):
    protocol = FeederProtocol()
    def __init__(self):
        # tries is used to make more connection to use the
        # memoizing feature
        #self.tries = 0
        
        # Here we put in the FeederProtocol instance a reference to
        # FeederFactory under the name of self.factory (seen from protocol)
        self.protocol.factory = self
        self.protocol.start()

f = FeederFactory()
#tp = time.time()
reactor.run()


More information about the Python mailing list