[Python] algoritmo in cerca di design pattern

Francesco Pischedda francesco.pischedda a gmail.com
Dom 11 Feb 2018 10:11:04 CET


ATTENZIONE! segue post lungo e pipposo

Salve lista,

questa settimana mi sono imbattuto in una parte di codice "legacy" che si
occupa di analizzare
un flusso di dati ed estrarre degli eventi che hanno un inizio e una fine
(tra le altre cose);
a questo codice andava aggiunto un controllo per verificare che l'evento
fosse stato
registrato in maniera corretta a causa di un bug della sorgente dati. Per
incasinare
ancora di più la situazione, la verifica dell'evento richiede l'analisi di
dati registrati
dopo la fine dell'evento. Provo a spiegare meglio il flusso con del codice
di esempio:

# pseudo codice "originale" senza analisi dell'evento
def extract_events(data_store, start_date, end_date):
    datagrams = data_store.get_datagrams(start_date, end_date)

    for event in extract_events_from_datagrams(datagrams):
        yield clean_event(event)

def extract_events_from_datagrams(datagrams):
    for d in datagrams:
        # logica di estrazione dell'evento molto complessa riassunta con
        if not interesting_data(d):
            continue
        if event_start_detected:
            event = Event(d)
        if questo and quel_altro:
            accumulate_event_data(event, d)
        elif altre_condizioni: # ripetere per molte altre condizioni
            cancel_event(event)
            event = None
        if event_end_detected:
            yield event


# pseudo codice con l'analisi a posteriori per la verifica dell'evento
def extract_events(data_store, start_date, end_date,
post_process_check=False):
    # in questo caso si legge tutto il dataset in memoria
    datagrams = list(data_store.get_datagrams(start_date, end_date))

    for event in extract_events_from_datagrams(datagrams):
        if post_process_check:
            event = post_process(event, datagram)
        if event is None:
            continue
        event = clean_event(event)
        if event:
            yield event

def extract_events_from_datagrams(datagrams):
    # si tiene traccia dell'indice di ogni dato per usarlo poi come
    # punto di partenza nel post processing dell'evento
    for idx, d in enumerate(datagrams):
        # logica di estrazione dell'evento molto complessa riassunta con
        if not interesting_data(d):
            continue
        if event_start_detected:
            # si crea l'evento tenendo traccia dell'indice del primo dato
            # associabile all'evento
            event = Event(d, first_datagram=idx)
        if questo and quel_altro:
            accumulate_event_data(event, d)
        if event_end_detected:
            yield event

# verifica la presenza di alcuni tipo di datagram che si possono essere
# presentati poco dopo l'inizio dell'evento e poco dopo la fine dell'evento
# restituisce None se l'evento va scartato altrimenti restituisce l'evento
stesso
def post_process(event, datagrams):
    inital_check_slot_start = even.start_timestamp + timedelta(seconds=50)
    inital_check_slot_end = even.start_timestamp + timedelta(seconds=100)

    final_check_slot_start = even.end_timestamp - timedelta(seconds=50)
    final_check_slot_end = even.end_timestamp + timedelta(seconds=50)

    for d in datagrams[event.first_datagram_idx: ]:

        if initial_check_slot_start <= d.recorded_at <=
initial_check_slot_end:
            # si verifica la presenza di varie condizioni legate al dato
            if should_discard_event_intial_slot(d):
                return None

        if final_check_slot_start <= d.recorded_at <= final_check_slot_end:
            # si verifica la presenza di varie condizioni legate al dato
            if should_discard_event_final_slot(d):
                return None

        if d.recorded_at > final_check_slot_end:
            break

    return event


Questo approccio funziona solo perché il dataset di partenza ha una
dimensione
ragionevole, nell'ordine di qualche decina di megabyte, e perché questo
post
processing viene richiamato manualmente solo in certe occasioni quindi il
rischio di
esaurire la memoria del processo generale, a causa di molteplici chiamate
in parallelo,
è molto basso.

Con questo finiamo il mega pippone introduttivo e passiamo al algoritmo in
oggetto per il
quale sono alla ricerca di un design pattern che lo descriva (sono sicuro
che esiste ma
vai e trovalo...)

Ora assumiamo che il dataset iniziale sia molto grande (nell'ordine di
decine di gigabyte)
o che i dati siano presi da uno stream e che quindi non sia possibile
mettere tutto in
ram, una soluzione veloce, assumendo di poter recuperare i dati storici da
un db,
potrebbe essere quella di recuperare i dati dell'evento e oltre con una
query del tipo:
SELECT * FROM data_table WHERE recorded_at >= initial_check_slot_start AND
recorded_at <= final_check_slot_end

in questo modo la pressione sulla ram diminuisce ma la si sposta sul
database o, nel caso
che questi dati siano presi da una risorsa in rete ad es. API REST, sul
network.

Finalmente passo a descrivere il funzionamento di sto benedetto algoritmo,
applicandolo
al problema descritto in precedenza

il corpo di extract_events_from_datagrams diventa più o meno

caching_iterator = CachingIterator(datagrams)
for d in caching_iterator:
    if is_event_start(d):
        event = create_event(d)
        # da questo momento ogni dato letto da datasource viene messo in
una cache
        # che può essere consumata iterando su out_of_stream_iterator
        out_of_stream_iterator = caching_iterator.start_caching_from(d)

    # ...altra roba che arricchisce l'evento e poi
    if is_event_end(d):
        yield event, out_of_stream_iterator


mentre il corpo di post_process rimane essenzialmente invariato iterando su
out_if_stream_iterator il quale, una volta esaurita la cache, mette i nuovi
dati in
un nuovo buffer che verrà usato da caching_iterator; una volta che il
controllo
ritorna al loop di caching_iterator questo leggerà prima dal buffer interno
e,
esaurito questo, caching_iterator ritornerà a chiedere dati al data source

Provo a riscrivere il comportamento del caching_iterator:

- leggi dal buffer privato, se vuoto leggi da datasource
- se è stato richiesto di tenere traccia dei dati alimenta la cache
- restituisci dato

mentre out_of_stream_iterator:

- leggi dalla cache
- se cache esaurita leggi da data_source e alimenta una cache locale
- restituisci dato
- quando si è finito passa la cache locale a caching_iterator nel su buffer
privato)
in modo che questo possa riprendere da dove si è fermato

un implementazione in Python potrebbe essere quella in allegato

Se siete arrivati sino a qui complimenti per la pazienza! :)

Ora torniamo alla domanda iniziale, esiste un design pattern che descrive
un approccio di questo tipo?

-- 
"Unix IS user friendly. It's just selective about who its friend are"

"Nevertheless I still think it’s a bad idea to make things harder for
ourselves if we can avoid it."

"C is quirky, flawed, and an enormous success."
                                   -- Dennis Ritchie

"Shipping is a feature. A really important feature. Your product must have
it."

"There is no such a thing as a temporary change or workaround: In most
cases, workarounds are tech debt."

"La gatta frettolosa ha fatto i gattini ciechi"
-------------- parte successiva --------------
Un allegato HTML è stato rimosso...
URL: <http://lists.python.it/pipermail/python/attachments/20180211/e4eadd75/attachment-0001.html>
-------------- parte successiva --------------
Un allegato non testuale è stato rimosso....
Nome:        caching-iterator.py
Tipo:        text/x-python-script
Dimensione:  2871 bytes
Descrizione: non disponibile
URL:         <http://lists.python.it/pipermail/python/attachments/20180211/e4eadd75/attachment-0001.bin>


Maggiori informazioni sulla lista Python