<div dir="ltr">ATTENZIONE! segue post lungo e pipposo<br><br>Salve lista,<br><br>questa settimana mi sono imbattuto in una parte di codice "legacy" che si occupa di analizzare <br>un flusso di dati ed estrarre degli eventi che hanno un inizio e una fine (tra le altre cose);<br>a questo codice andava aggiunto un controllo per verificare che l'evento fosse stato <br>registrato in maniera corretta a causa di un bug della sorgente dati. Per incasinare<br>ancora di più la situazione, la verifica dell'evento richiede l'analisi di dati registrati <br>dopo la fine dell'evento. Provo a spiegare meglio il flusso con del codice di esempio:<br><br># pseudo codice "originale" senza analisi dell'evento<br>def extract_events(data_store, start_date, end_date):<br> datagrams = data_store.get_datagrams(start_date, end_date)<br><br> for event in extract_events_from_datagrams(datagrams):<br> yield clean_event(event)<br><br>def extract_events_from_datagrams(datagrams):<br> for d in datagrams:<br> # logica di estrazione dell'evento molto complessa riassunta con<br> if not interesting_data(d):<br> continue<br> if event_start_detected:<br> event = Event(d)<br> if questo and quel_altro:<br> accumulate_event_data(event, d)<br> elif altre_condizioni: # ripetere per molte altre condizioni<br> cancel_event(event)<br> event = None<br> if event_end_detected:<br> yield event<br><br><br># pseudo codice con l'analisi a posteriori per la verifica dell'evento<br>def extract_events(data_store, start_date, end_date, post_process_check=False):<br> # in questo caso si legge tutto il dataset in memoria<br> datagrams = list(data_store.get_datagrams(start_date, end_date))<br><br> for event in extract_events_from_datagrams(datagrams):<br> if post_process_check:<br> event = post_process(event, datagram)<br> if event is None:<br> continue<br> event = clean_event(event)<br> if event:<br> yield event<br><br>def extract_events_from_datagrams(datagrams):<br> # si tiene traccia dell'indice di ogni dato per usarlo poi come<br> # punto di partenza nel post processing dell'evento<br> for idx, d in enumerate(datagrams):<br> # logica di estrazione dell'evento molto complessa riassunta con<br> if not interesting_data(d):<br> continue<br> if event_start_detected:<br> # si crea l'evento tenendo traccia dell'indice del primo dato<br> # associabile all'evento<br> event = Event(d, first_datagram=idx)<br> if questo and quel_altro:<br> accumulate_event_data(event, d)<br> if event_end_detected:<br> yield event<br><br># verifica la presenza di alcuni tipo di datagram che si possono essere <br># presentati poco dopo l'inizio dell'evento e poco dopo la fine dell'evento<br># restituisce None se l'evento va scartato altrimenti restituisce l'evento stesso<br>def post_process(event, datagrams):<br> inital_check_slot_start = even.start_timestamp + timedelta(seconds=50)<br> inital_check_slot_end = even.start_timestamp + timedelta(seconds=100)<br><br> final_check_slot_start = even.end_timestamp - timedelta(seconds=50)<br> final_check_slot_end = even.end_timestamp + timedelta(seconds=50)<br><br> for d in datagrams[event.first_datagram_idx: ]:<br><br> if initial_check_slot_start <= d.recorded_at <= initial_check_slot_end:<br> # si verifica la presenza di varie condizioni legate al dato<br> if should_discard_event_intial_slot(d):<br> return None<br><br> if final_check_slot_start <= d.recorded_at <= final_check_slot_end:<br> # si verifica la presenza di varie condizioni legate al dato<br> if should_discard_event_final_slot(d):<br> return None<br><br> if d.recorded_at > final_check_slot_end:<br> break<br><br> return event<br><br><br>Questo approccio funziona solo perché il dataset di partenza ha una dimensione<br>ragionevole, nell'ordine di qualche decina di megabyte, e perché questo post <br>processing viene richiamato manualmente solo in certe occasioni quindi il rischio di<br>esaurire la memoria del processo generale, a causa di molteplici chiamate in parallelo,<br>è molto basso.<br><br>Con questo finiamo il mega pippone introduttivo e passiamo al algoritmo in oggetto per il <br>quale sono alla ricerca di un design pattern che lo descriva (sono sicuro che esiste ma<br>vai e trovalo...)<br><br>Ora assumiamo che il dataset iniziale sia molto grande (nell'ordine di decine di gigabyte) <br>o che i dati siano presi da uno stream e che quindi non sia possibile mettere tutto in<br>ram, una soluzione veloce, assumendo di poter recuperare i dati storici da un db, <br>potrebbe essere quella di recuperare i dati dell'evento e oltre con una query del tipo:<br>SELECT * FROM data_table WHERE recorded_at >= initial_check_slot_start AND recorded_at <= final_check_slot_end<br><br>in questo modo la pressione sulla ram diminuisce ma la si sposta sul database o, nel caso<br>che questi dati siano presi da una risorsa in rete ad es. API REST, sul network.<br><br>Finalmente passo a descrivere il funzionamento di sto benedetto algoritmo, applicandolo<br>al problema descritto in precedenza<br><br>il corpo di extract_events_from_datagrams diventa più o meno<br><br>caching_iterator = CachingIterator(datagrams)<br>for d in caching_iterator:<br> if is_event_start(d):<br> event = create_event(d)<br> # da questo momento ogni dato letto da datasource viene messo in una cache<br> # che può essere consumata iterando su out_of_stream_iterator<br> out_of_stream_iterator = caching_iterator.start_caching_from(d) <br><br> # ...altra roba che arricchisce l'evento e poi<br> if is_event_end(d):<br> yield event, out_of_stream_iterator<br><br><br>mentre il corpo di post_process rimane essenzialmente invariato iterando su<br>out_if_stream_iterator il quale, una volta esaurita la cache, mette i nuovi dati in<br>un nuovo buffer che verrà usato da caching_iterator; una volta che il controllo<br>ritorna al loop di caching_iterator questo leggerà prima dal buffer interno e,<br>esaurito questo, caching_iterator ritornerà a chiedere dati al data source<br><br>Provo a riscrivere il comportamento del caching_iterator:<br><br>- leggi dal buffer privato, se vuoto leggi da datasource<br>- se è stato richiesto di tenere traccia dei dati alimenta la cache<br>- restituisci dato<br><br>mentre out_of_stream_iterator:<br><br>- leggi dalla cache<br>- se cache esaurita leggi da data_source e alimenta una cache locale<br>- restituisci dato<br>- quando si è finito passa la cache locale a caching_iterator nel su buffer privato)<br>in modo che questo possa riprendere da dove si è fermato<br><br>un implementazione in Python potrebbe essere quella in allegato<br><br>Se siete arrivati sino a qui complimenti per la pazienza! :)<br><br>Ora torniamo alla domanda iniziale, esiste un design pattern che descrive un approccio di questo tipo?<br clear="all"><br>-- <br><div class="gmail_signature"><div dir="ltr"><div><div dir="ltr"><div><div dir="ltr"><div><div dir="ltr"><div><div dir="ltr">"Unix IS user friendly. It's just selective about who its friend are"<br><br>"Nevertheless I still think it’s a bad idea to make things harder for ourselves
if we can avoid it."<br><span><br>"C is quirky, flawed, and an enormous success."<br> -- Dennis Ritchie<br></span><br><span>"Shipping is a feature. A really important feature. Your product must have it."<br><br>"</span><span>There is no such a thing as a temporary change or workaround: In most cases, workarounds are tech debt."<br><br></span></div><div>"La gatta frettolosa ha fatto i gattini ciechi"<br><br></div></div></div></div></div></div></div></div></div></div>
</div>