<div dir="ltr">ATTENZIONE! segue post lungo e pipposo<br><br>Salve lista,<br><br>questa settimana mi sono imbattuto in una parte di codice "legacy" che si occupa di analizzare <br>un flusso di dati ed estrarre degli eventi che hanno un inizio e una fine (tra le altre cose);<br>a questo codice andava aggiunto un controllo per verificare che l'evento fosse stato <br>registrato in maniera corretta a causa di un bug della sorgente dati. Per incasinare<br>ancora di più la situazione, la verifica dell'evento richiede l'analisi di dati registrati <br>dopo la fine dell'evento. Provo a spiegare meglio il flusso con del codice di esempio:<br><br># pseudo codice "originale" senza analisi dell'evento<br>def extract_events(data_store, start_date, end_date):<br>    datagrams = data_store.get_datagrams(start_date, end_date)<br><br>    for event in extract_events_from_datagrams(datagrams):<br>        yield clean_event(event)<br><br>def extract_events_from_datagrams(datagrams):<br>    for d in datagrams:<br>        # logica di estrazione dell'evento molto complessa riassunta con<br>        if not interesting_data(d):<br>            continue<br>        if event_start_detected:<br>            event = Event(d)<br>        if questo and quel_altro:<br>            accumulate_event_data(event, d)<br>        elif altre_condizioni: # ripetere per molte altre condizioni<br>            cancel_event(event)<br>            event = None<br>        if event_end_detected:<br>            yield event<br><br><br># pseudo codice con l'analisi a posteriori per la verifica dell'evento<br>def extract_events(data_store, start_date, end_date, post_process_check=False):<br>    # in questo caso si legge tutto il dataset in memoria<br>    datagrams = list(data_store.get_datagrams(start_date, end_date))<br><br>    for event in extract_events_from_datagrams(datagrams):<br>        if post_process_check:<br>            event = post_process(event, datagram)<br>        if event is None:<br>            continue<br>        event = clean_event(event)<br>        if event:<br>            yield event<br><br>def extract_events_from_datagrams(datagrams):<br>    # si tiene traccia dell'indice di ogni dato per usarlo poi come<br>    # punto di partenza nel post processing dell'evento<br>    for idx, d in enumerate(datagrams):<br>        # logica di estrazione dell'evento molto complessa riassunta con<br>        if not interesting_data(d):<br>            continue<br>        if event_start_detected:<br>            # si crea l'evento tenendo traccia dell'indice del primo dato<br>            # associabile all'evento<br>            event = Event(d, first_datagram=idx)<br>        if questo and quel_altro:<br>            accumulate_event_data(event, d)<br>        if event_end_detected:<br>            yield event<br><br># verifica la presenza di alcuni tipo di datagram che si possono essere <br># presentati poco dopo l'inizio dell'evento e poco dopo la fine dell'evento<br># restituisce None se l'evento va scartato altrimenti restituisce l'evento stesso<br>def post_process(event, datagrams):<br>    inital_check_slot_start = even.start_timestamp + timedelta(seconds=50)<br>    inital_check_slot_end = even.start_timestamp + timedelta(seconds=100)<br><br>    final_check_slot_start = even.end_timestamp - timedelta(seconds=50)<br>    final_check_slot_end = even.end_timestamp + timedelta(seconds=50)<br><br>    for d in datagrams[event.first_datagram_idx: ]:<br><br>        if initial_check_slot_start <= d.recorded_at <= initial_check_slot_end:<br>            # si verifica la presenza di varie condizioni legate al dato<br>            if should_discard_event_intial_slot(d):<br>                return None<br><br>        if final_check_slot_start <= d.recorded_at <= final_check_slot_end:<br>            # si verifica la presenza di varie condizioni legate al dato<br>            if should_discard_event_final_slot(d):<br>                return None<br><br>        if d.recorded_at > final_check_slot_end:<br>            break<br><br>    return event<br><br><br>Questo approccio funziona solo perché il dataset di partenza ha una dimensione<br>ragionevole, nell'ordine di qualche decina di megabyte, e perché questo post <br>processing viene richiamato manualmente solo in certe occasioni quindi il rischio di<br>esaurire la memoria del processo generale, a causa di molteplici chiamate in parallelo,<br>è molto basso.<br><br>Con questo finiamo il mega pippone introduttivo e passiamo al algoritmo in oggetto per il <br>quale sono alla ricerca di un design pattern che lo descriva (sono sicuro che esiste ma<br>vai e trovalo...)<br><br>Ora assumiamo che il dataset iniziale sia molto grande (nell'ordine di decine di gigabyte) <br>o che i dati siano presi da uno stream e che quindi non sia possibile mettere tutto in<br>ram, una soluzione veloce, assumendo di poter recuperare i dati storici da un db, <br>potrebbe essere quella di recuperare i dati dell'evento e oltre con una query del tipo:<br>SELECT * FROM data_table WHERE recorded_at >= initial_check_slot_start AND recorded_at <= final_check_slot_end<br><br>in questo modo la pressione sulla ram diminuisce ma la si sposta sul database o, nel caso<br>che questi dati siano presi da una risorsa in rete ad es. API REST, sul network.<br><br>Finalmente passo a descrivere il funzionamento di sto benedetto algoritmo, applicandolo<br>al problema descritto in precedenza<br><br>il corpo di extract_events_from_datagrams diventa più o meno<br><br>caching_iterator = CachingIterator(datagrams)<br>for d in caching_iterator:<br>    if is_event_start(d):<br>        event = create_event(d)<br>        # da questo momento ogni dato letto da datasource viene messo in una cache<br>        # che può essere consumata iterando su out_of_stream_iterator<br>        out_of_stream_iterator = caching_iterator.start_caching_from(d) <br><br>    # ...altra roba che arricchisce l'evento e poi<br>    if is_event_end(d):<br>        yield event, out_of_stream_iterator<br><br><br>mentre il corpo di post_process rimane essenzialmente invariato iterando su<br>out_if_stream_iterator il quale, una volta esaurita la cache, mette i nuovi dati in<br>un nuovo buffer che verrà usato da caching_iterator; una volta che il controllo<br>ritorna al loop di caching_iterator questo leggerà prima dal buffer interno e,<br>esaurito questo, caching_iterator ritornerà a chiedere dati al data source<br><br>Provo a riscrivere il comportamento del caching_iterator:<br><br>- leggi dal buffer privato, se vuoto leggi da datasource<br>- se è stato richiesto di tenere traccia dei dati alimenta la cache<br>- restituisci dato<br><br>mentre out_of_stream_iterator:<br><br>- leggi dalla cache<br>- se cache esaurita leggi da data_source e alimenta una cache locale<br>- restituisci dato<br>- quando si è finito passa la cache locale a caching_iterator nel su buffer privato)<br>in modo che questo possa riprendere da dove si è fermato<br><br>un implementazione in Python potrebbe essere quella in allegato<br><br>Se siete arrivati sino a qui complimenti per la pazienza! :)<br><br>Ora torniamo alla domanda iniziale, esiste un design pattern che descrive un approccio di questo tipo?<br clear="all"><br>-- <br><div class="gmail_signature"><div dir="ltr"><div><div dir="ltr"><div><div dir="ltr"><div><div dir="ltr"><div><div dir="ltr">"Unix IS user friendly. It's just selective about who its friend are"<br><br>"Nevertheless I still think it’s a bad idea to make things harder for ourselves
if we can avoid it."<br><span><br>"C is quirky, flawed, and an enormous success."<br>                                   -- Dennis Ritchie<br></span><br><span>"Shipping is a feature. A really important feature. Your product must have it."<br><br>"</span><span>There is no such a thing as a temporary change or workaround: In most cases, workarounds are tech debt."<br><br></span></div><div>"La gatta frettolosa ha fatto i gattini ciechi"<br><br></div></div></div></div></div></div></div></div></div></div>
</div>