Batching Writes in a Storm Topology

A common use case for Storm is to process some data by transforming streams of tuples. At Shareablee, we use Storm for collecting data from various APIs and one of our needs is to write denormalized data to Amazon S3 so that it can be processed later using Hadoop. If we stored each tuple we would end up with a huge amount of tiny files (which would take forever in Hadoop). How do you accumulate data (state!) so you can batch the writes to S3 in a stream processing framework like Storm?

Coming to terms with state

In general, keeping your topologies free from state will save you lots of grief. The key to stateful bolts is to remember the following properties of Storm bolts:

  • Prepared bolts allow you to maintain state for a bolt
  • Prepared bolts process one tuple at a time
  • They can be tried more than once

We can safely accumulate state inside a bolt without worrying that some other process will change it right out from under us. State is local to the process and since it will only process one tuple at a time, it is safe to change the state within the bolt. 

Since bolts can be retried (see the guaranteed message processing in the Storm wiki), we need to model our problem in a way that is safe to do more than once. Since our problem is side effected (we want to write to files and S3), anything that relies on this data must know that there may be duplicates. This was not an issue for our use case (we remove duplicates map reduce side), but you may want to stick to writing to transactional databases if it is. 

Tick tock clock spout

A simple way to model our problem is to create a spout that emits every n seconds. We assume that in the time between ticks we will be accumulating data. The accumulator bolt will be listening to the clock stream and perform the writes (in batch) only when it receives a tick from the clock spout. Here’s a simple clock spout that can be configured in the Storm config:

;; Interval is set in the topology config clock.interval.seconds
;; In the event of a failure, waits until the interval before emitting
(defspout clock-spout ["timestamp"]
  [conf context collector]
  (let [interval (* 1000 (Integer. (get conf "clock.interval.seconds")))]
    (spout
     (nextTuple []
                   (Thread/sleep interval)
                   (log-message "Tick tock")
                   (emit-spout! collector [(System/currentTimeMillis)]))
     (ack [id]))))

It does a simple task which is to emit the current time from every n seconds. By default it waits before emitting, so in the event of a failure you don’t send premature ticks.

Accumulator bolt for batch writes

The accumulator bolt that will batch writes will perform two tasks; accumulate data and export it periodically. It will be a prepared bolt (stateful) that accepts two different streams (you must specify this when declaring the topology). Here’s a skeleton:

(defn gen-tmp-file
  "Generates a temporary file with the given prefix and suffix.
   Prefix should have a trailing underscore. Returns a file path.

   Example:
   (gen-tmp-file \"my_data_\" \".csv\")"
  [prefix suffix]
  (let [tmp-file (java.io.File/createTempFile prefix suffix)]
    (.getAbsolutePath tmp-file)))
    
(defbolt accumulate-data ["f1" "f2"] {:prepare true}
  [conf context collector]
  ;; State is captured in a single atom, adjust as you need
  (let [state (atom {:last-tick (System/currentTimeMillis)
                     :tmp-file (gen-tmp-file "my_tmp_" ".csv"})]
    (bolt
     (execute
      [tuple]
      ;; This bolt can take two tuples, one which is a timestamp from
      ;; the clock and the other which is a data tuple
      (if (:timestamp tuple)
        ;; We can assume that the IO is safe as each bolt is a
        ;; separate process and will not run more than 1 tuple at a
        ;; time with the given state
        (output-tmp-file (:tmp-file @state))
        (do
          (log-message "Recieved clock tick: " (:timestamp tuple))
          (log-message "Last tick: " (:last-tick @state))
          ;; Set the datetime since last tick in case you want to check time since to decide whether to write out
          (swap! state assoc :last-tick (:timestamp tuple))
          ;; Do something with your accumulated file
          (output-tmp-file (:tmp-file @state)
          (swap! state dissoc-in [:tmp-files k])))
        
        ;; If this is not a tick from the clock spout its data to accumulate
        (do
          (log-message "Appending to file " (tmp-file @state))
          (spit (tmp-file @state) content :append true)))
      (ack! collector tuple)))))

Accumulating the data by appending it to temp files allows us to keep the data on disk rather than in memory. All we are storing in the state is the temporary file’s path. We use the JVM’s built in utilities to generate unique temporary file names to avoid collisions with other processes that may be accumulating to file. The tick is merely a signal to perform the export of the accumulated data. The storing of the last tick timestamp is not necessary in this simplified example, but would be useful if you wanted to only write if the file gets large enough or a certain threshold of time has passed.

Wrapping up

Using this method you can easily parallelize the accumulation and batch writing of data for your topology. Since we are using a clock we are guaranteed to have all batch writes to performed after n seconds where n is the interval from the clock spout. If we were to decide when to export based on the file size we could get into a situation where data sits there forever. 

We used files to accumulate data, but that may not make sense for your situation. If this were to batch writes to a database I would be more inclined to store the data in memory (in the state atom) and have a shorter interval from the clock spout so we don’t use up too much memory. Writing to files can be error prone (what if you forget a newline!) and keeping the data structures in memory is much simpler. 

In the event of a failure, the tuple could be tried again (for the clock or the data stream) so keep that in mind when modeling your problem. It is also possible that a failure that crashes the topology (Storm has a fail fast design) you will lose data that is accumulated in memory. Writing to temporary files could potentially allow you to recover from that, but I haven’t needed to design around that.