Vai al contenuto

Apache Storm: Guida Completa

  • di

Storm è un sistema distribuito, scalabile e fault tolerant, per data processing in tempo reale di grandi volumi di dati prodotti ad alta velocità sia stateless che stateful. Le API di default sono fornite in Java ma supporta anche altri diversi linguaggi di programmazione (Clojure, Java, Ruby, Python). Supporta tutte e 3 le semantiche di comunicazione: at-most-once, at-least-once, exactly-once.

Architettura

Storm presenta un’architettura master/worker nella quale il nodo principale (master), chiamato Nimbus node è incaricato della gestione e del monitoraggio di job di ingestion e calcolo. I nodi workers sono chiamati Supervisor e si occupano dell’esecuzione dei task assegnati dal nodo master, per mezzo di processi worker da essi gestiti. Il coordinamento tra nodo principale e nodi Supervisor è mantenuto attraverso Zookeper, che è la componente di Hadoop che si occupa della sincronizzazione delle configurazioni all’interno del cluster, coordinando le componenti che ne fanno uso.

A livello logico esistono tre concetti fondamentali all’interno dell’architettura Storm, la topology, gli spout e i bolt.

La Topology è l’insieme delle componenti di base che, collegate tra loro, danno luogo al processo di ingestion. Tali componenti sono gli Spout, che rappresentano le sorgenti di dati e i Bolt che hanno il compito di ricevere uno o più stream di dati, processarli ed all’occorrenza salvare i dati in una destinazione. Uno Spout può essere collegato a più Bolt e un Bolt può ricevere dati da più Spout o anche da altri Bolt. Questa flessibilità può dar luogo Topology molto complesse.

Infine, uno stream di dati è composto da Tuple, che sono insiemi ordinati di valori e rappresentano l’unità di processing di Spout e Bolt.

Implementazione della Topology

Dal punto di vista implementativo, realizzare di una topologia significa sviluppare classi Java che implementino determinate interfacce specifiche di ciascun oggetto (Spout o Bolt). In generale il programma Java conterrà oltre alle classi per ciascun Spout e Bolt, anche una classe main con la definizione della Topology e la relativa operazione di submit sul cluster Storm. Nelle classi Spout il metodo nextTuple contiene la logica di estrazione dei dati dalla sorgente; essi sono posti nello stream di Storm tramite il metodo emit dell’oggetto SpoutOutputCollector. Nei Bolt, invece, il metodo execute riceve una Tupla e la processa, rimettendo in circolo il risultato tramite il metodo emit dell’oggetto OutputCollector. Esistono già alcuni Spout e Bolt implementati in Storm che implementano le connessioni ai vari framework, come per esempio HdfsSpout e HdfsBolt, che rispettivamente sono in grado di leggere e scrivere su HDFS.

Storm mette a disposizione diversi tipi di operazioni di raggruppamento con le quali possiamo definire come lo stream di dati venga partizionato tra i vari task. La strategia di raggruppamento è definita nella fase di costruzione della topologia. Le opzioni utilizzabili sono:

  • Shuffle grouping: le tuple sono distribuite casualmente tra i task, in modo che ciascun Bolt processi lo stesso numero di righe.
  • Fields grouping: lo stream è partizionato in base a uno o più campi specificati. Le tuple che presentano gli stessi valori per tali campi, sono assegnate allo stesso task.
  • Partial Key grouping: funziona come il precedente, anche se vi è un bilanciamento del carico tra più Bolt, in modo da garantire un carico simile, soprattutto in caso di distribuzioni dei dati sbilanciate.
  • All grouping: lo stream è replicato su tutti i task.
  • Global grouping: l’intero stream è convogliato verso un singolo task.
  • Direct grouping: in questo caso, per ogni tupla è indicato il task che la processerà.

Quando si crea la topologia bisogna indicare il grado di parallelismo per essa, una volta creata solo alcuni aspetti del parallelismo possono essere modificati in fase di esecuzione:

  • Si possono cambiare il numero di executor (thread).
  • NON si può cambiare il numero di task, che rimangono statici durante la vita della topologia.

Trident

Trident è una API che lavora ad un livello di astrazione più elevato, consentendo la creazione di topologie in modo più rapido e conciso. Ciò evita l’implementazione di dettagli di basso livello, che vengo gestiti automaticamente. In Trident possono essere implementate in modo semplice le operazioni di join tra stream, filtri e aggregazioni. Inoltre vi è la possibilità di creare funzioni custom e applicarle alle Tuple dello stream.

Trident implementa anche il modello di delivery exactly-one processing (ogni Tupla è processata e salvata sempre e solo una volta), che con l’API standard è realizzabile solamente scrivendo del codice custom.

Gli scenari di utilizzo di Trident sono proprio quelli in cui è necessario l’exactly once processing, questa semantica introduce inevitabilmente un’elevata latenza, difatti è meglio utilizzare l’API standard negli scenari in cui è richiesta un’elevata performance di ingestion dei dati.

Trident infatti essendo uno strato aggiuntivo di software, semplifica di molto lo svilupo ma aggiunge alcune inefficienze nell’esecuzione.

Elaborazioni Stateful

Per la maggior parte dei casi d’uso, la memorizzazione dello stato in Storm viene lasciata al programmatore.

Se il vostro Bolt mantiene uno stato e va in crash dopo aver accumulato 3 settimane di dati aggregati che non avete memorizzato da nessuna parte, bhè questo è un bel problema perché non avete nessun modo per recuperarli.

Un esempio pratico: Word Count

Come l'”Hello World” sta a ai linguaggi di programmazione il “Word Count” sta ai framework di processamento, di seguito faremo l’esempio di una semplicissima topologia per il conteggio delle parole che possiamo definirlo come il passo base per approcciare ai framewor di processamento 😀

Input: un flusso di parole
Output: numero di occorrenze per ogni parola

In questo esempio ci sono tre unità di elaborazione:

  • RandomSentenceSpout: genera frasi casuali.
  • SplitSentenceBolt: riceve le frasi e le divide in parole, le singole parole vengono emesse nel bolt WordCount.
  • WordCountBolt: riceve le parole e incrementa il contatore associato ad ogni parola in una mappa chiave-valore dove la chiave è la parola stessa e il valore il numero di occorrenze.

Aprite il vostro IDE e create un nuovo progetto Java con gestore delle dipendenze Maven

Aggiungete nelle dipendenze Maven nel file pom.xml la seguente dipendenza storm:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.4.0</version>
    <scope>${provided.scope}</scope>
</dependency>

Ora creiamo le classi:

RandomSentenceSpout

public class RandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentenceBolt

public static class SplitSentence extends BaseBasicBolt {
    /** 
    * The bolt will only emit the field "word" 
    */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** 
    * The bolt will receive the line from the 
    * words file and process it to Normalize this line 
    * The normalize will be put the words in lower case 
    * and split the line to get all words in this 
    **/
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String sentence = tuple.getStringByField("sentence");
        String words[] = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if(!word.isEmpty()){ 
                word = word.toLowerCase(); 
                //Emit the word 
                basicOutputCollector.emit(new Values(word));
            }
        }
        // Acknowledge the tuple 
        basicOutputCollector.ack(input);
    }
}

WordCount Bolt

public static class WordCount extends BaseBasicBolt {
    private Map<String, Integer> counters;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        /** 
        * If the word dosn't exist in the map we will create 
        * this, if not We will add 1 
        **/ 
        if(!counters.containsKey(word)){ 
            counters.put(str, 1); 
        } else { 
            Integer c = counters.get(word) + 1; 
            counters.put(word, c); 
        }
        //Set the tuple as Acknowledge 
        collector.ack(input);
        collector.emit(new Values(word, count));
    }

    /** 
    * On create 
    **/ 
    @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
        this.counters = new HashMap(); 
        this.collector = collector;                                                                 
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /** * At the end of the spout (when the cluster is shutdown 
    * We will show the word counters 
    **/ 
    @Override 
    public void cleanup() { 
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue()); 
        } 
    }
}

Ora mettiamo tutto assieme costruendo la topologia!

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 2);
    builder.setBolt("split", new SplitSentence(), 2)
        .shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        conf.setNumWorkers(1);

        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

Siamo pronti per lanciare il tutto facendo partire il main()!

Alcune informazioni utili:

Una topologia elabora tuple all’infinito, ovvero fin quando non viene ucciso il processo.

Storm consente due modalità di esecuzione: locale, cluster

Modalità locale: la topologia viene eseguita sulla macchina locale all’interno di una singola JVM. Questa modalità viene utilizzata per lo sviluppo, il test e il debug, perché è il modo più semplice per vedere tutti i componenti della topologia che lavorano insieme.

Modalità cluster: la topologia viene distribuita da Storm su
più nodi. La modalità cluster non mostra informazioni di debug, motivo per cui è considerata modalità di produzione.

  • La modalità cluster dovrebbe essere utilizzata per eseguire la nostra applicazione sull’insieme di dati reali
  • Sfrutta il parallelismo
  • Il codice dell’applicazione è distribuito in modo trasparente
  • La topologia è gestita e monitorata a tempo di esecuzione

Nel nostro caso abbiamo lanciato la topologia in modalità locale e quindi a singolo nodo, se vogliamo invece testare un deploy in modalità cluster possiamo farlo utilizzando Docker Compose;