Things have a funny way of working out this way. A couple features were pushed back from a previous release and some last minute improvements were thrown in, and suddenly we found ourselves dragging out a lot more fresh code in our release than usual. All this the night before one of our heavy API users was launching something of their own. They were expecting to hit us thousands of times a second and most of their calls touched some piece of code that hadn’t been tested in the wild. Ordinarily, we would soft launch and put the system through its paces. But now we had no time for that. We really wanted to hammer the entire stack, yesterday, and so we couldn’t rely on internal compute resources.

Typically, people turn to a service for this sort of thing, but for the load we wanted, they charge many hundreds of dollars. It was also short notice and after hours, so there was going to be some sort of premium to boot.

At first, I thought we should use something like JMeter from some EC2 machines. However, we didn’t have anything set up for that. What were we going to do with the stats? How would we synchronize starting and stopping? It just seemed like this path was going to take a while.

I wanted to go to bed. Soon.

We routinely run Hadoop jobs on EC2, so everything was already baked to launch a bunch of machines and run jobs. Initially, it seemed like a silly idea, but when the hammer was sitting right there, I saw nails. I Googled it to see if anyone had tried. Of course not. Why would they? And if they did, why admit it? Perhaps nobody else found themselves on the far right side of the Sleep vs Sensitivity-to-Shame scale.

[Chart: Sleep vs Sensitivity to Shame]

So, it was settled — Hadoop it is! I asked my colleagues to assemble the list of test URLs. Along with some static stuff (no big deal) and a couple dozen basic pages, we had a broad mixture of API requests and AJAX calls we needed to simulate. For the AJAX stuff, we simply grabbed URLs from the Firebug console. Everything else was already in tests or right on the surface, so we’d have our new test set in less than an hour. I figured a few dozen lines of Ruby using Hadoop Streaming could probably do what I had in mind.

I’ve read quite a few post-mortems that start off like this, but read on, it turns out alright.

Hadoop Streaming

Hadoop Streaming is a utility that ships with Hadoop and works with pretty much any language. Any executable that reads from stdin and writes to stdout can be a mapper or reducer. By default, records are read line by line, with the bytes up to the first tab character treated as the key and any remainder as the value. It bridges the power of Hadoop with the ease of quick scripts, and we use it a lot when we need to scale out otherwise simple jobs.
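
To make the convention concrete, here is a minimal, purely illustrative mapper and reducer pair in Ruby (not part of our job; the script names are made up) that counts occurrences of each key:

#!/usr/bin/env ruby
# toy_mapper.rb (hypothetical): emit each line's first tab-delimited field
# as the key and a literal 1 as the value.
STDIN.each_line do |line|
  key = line.chomp.split("\t", 2).first
  puts "#{key}\t1"
end

#!/usr/bin/env ruby
# toy_reducer.rb (hypothetical): input arrives sorted by key, so we can sum
# counts for one key at a time and emit the total whenever the key changes.
last_key, count = nil, 0
STDIN.each_line do |line|
  key, val = line.chomp.split("\t", 2)
  if last_key && key != last_key
    puts "#{last_key}\t#{count}"
    count = 0
  end
  last_key = key
  count += val.to_i
end
puts "#{last_key}\t#{count}" if last_key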

Designing Our Job

We had just two basic requirements for our job:

  1. Visit URLs quickly
  2. Produce response time stats

Input File

The only input in this project is a list of URLs — only keys, no values. The job would have to run millions of URLs through the process to sustain the desired load for the desired time, but we only had hundreds of distinct calls that needed testing. First, we wanted to skew the URL frequency towards the most common calls, so we simply listed those URLs multiple times. Then we wanted to shuffle them for better distribution. Finally, we just needed lots of copies.

for i in {1..10000}; do sort -R < sample_urls_list.txt; done > full_urls_list.txt
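
For illustration only (these URLs are made up), the hand-built sample_urls_list.txt skews toward the hot calls simply by listing them more than once:

http://api.example.com/v1/items.json?limit=50
http://api.example.com/v1/items.json?limit=50
http://api.example.com/v1/items.json?limit=50
http://www.example.com/ajax/recent_activity
http://www.example.com/about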

Mapper

The mapper was going to do most of the work for us. It needed to fetch URLs as quickly as possible and record the elapsed time for each request. Hadoop processes definitely have overhead and even though each EC2 instance could likely be fetching hundreds of URLs at once, it couldn’t possibly run hundreds of mappers. To get past this issue, we had two options: 1) just launch more machines and under-utilize them or 2) fetch lots of URLs concurrently with each mapper. We’re trying not to needlessly waste money, so #1 is out.

I had used the curb gem (libcurl bindings for ruby) on several other projects and it worked really well. It turns out that it was going to be especially helpful here since it has a Multi class which can run concurrent requests each with blocks that function essentially as callbacks. With a little hackery, it could be turned into a poor/lazy/sleep-deprived man’s thread pool.

The main loop:

@curler.perform do
  flush_buffer!
  STDIN.take(@concurrent_requests-@curler.requests.size).each do |url|
    add_url(url.chomp)
  end
end

Blocks for success and failure:

curl.on_success do
  if errorlike_content?(curl.body_str)
    log_error(curl.url,'errorlike content')
  else
    @response_buffer<<[curl.url,Time.now-start_time]
  end
end
curl.on_failure do |curl_obj,args|
  error_type = args.first.to_s if args.is_a?(Array)
  log_error(curl.url,error_type)
end

As you can see, each completion calls a block that outputs the URL and the number of seconds it took to process the request. A little healthy paranoia about thread safety resulted in the extra steps of buffering and flushing output — this would ensure we don’t interleave output coming from multiple callbacks.

If there is an error, the mapper just emits the URL without an elapsed time as a hint to the stats aggregator. In addition, it uses Ruby’s warn method to write a line to stderr. Hadoop Streaming watches stderr for messages in the following format and increments a built-in counter for each one:

reporter:counter:[group],[counter],[amount]

in this case the line is:

reporter:counter:error,[errortype],1

This is a handy way to report what’s happening while a job is in progress and is surfaced through the standard Hadoop job web interface.
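
In Ruby, emitting one of these is just a write to stderr; for example (the counter name here is made up):

# Bumps the "timeout" counter in the "error" group by one.
warn "reporter:counter:error,timeout,1"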

Mapper-Only Implementation

The project could actually be done here if all we wanted was raw data to analyze via some stats software or a database. One could simply cat together the part files from HDFS and start crunching.

In this case, the whole job would look like this:

hadoop  jar $HADOOP_HOME/hadoop-streaming.jar                                  \
-D mapred.map.tasks=100                                                        \
-D mapred.reduce.tasks=0                                                       \
-D mapred.job.name="Stress Test - Mapper Only"                                 \
-D mapred.speculative.execution=false                                          \
-input  "/mnt/hadoop/urls.txt"                                                 \
-output "/mnt/hadoop/stress_output"                                            \
-mapper "$MY_APP_PATH/samples/stress/get_urls.rb 100"

and then when it finishes:

hadoop dfs -cat /mnt/hadoop/stress_output/part* > my_combined_data.txt
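
For instance, if you wanted percentiles rather than just the min/max/average the reducer below produces, a short script over that raw file would do. This is only a sketch (the script name and the 95th-percentile choice are arbitrary), relying on the mapper’s output format of URL, a tab, and elapsed seconds (blank for errors):

#!/usr/bin/env ruby
# p95.rb (hypothetical): read "url<TAB>seconds" lines and print the count and
# 95th-percentile response time per URL.
# Usage: cat my_combined_data.txt | ./p95.rb
times = Hash.new { |h, k| h[k] = [] }
STDIN.each_line do |line|
  url, secs = line.chomp.split("\t", 2)
  times[url] << secs.to_f unless secs.nil? || secs.empty?
end
times.each do |url, list|
  sorted = list.sort
  p95 = sorted[(sorted.size * 0.95).ceil - 1]
  puts [url, sorted.size, p95].join("\t")
end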

Reducer

In our case, I wanted to use the reducer to compute the stats as part of the job. In Hadoop Streaming, a reducer receives lines through stdin sorted by key, and a given key will never be split across multiple reducers. That means the reducer code never has to track state for more than one key at a time; it can simply do whatever it does to the values associated with a key (e.g. aggregate) and move on when a new key arrives. This is a good time to mention Hadoop’s aggregate package, which could be used as the reducer to accumulate stats. Here, though, I wanted to control my mapper output as well as retain the flexibility to skip the reducer altogether and just get raw data.
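
Concretely, the reducer’s stdin looks something like this (URLs and timings invented), with every line for a given key arriving together and a blank value marking one of the mapper’s error hints:

http://api.example.com/v1/items.json?limit=50	0.084
http://api.example.com/v1/items.json?limit=50	0.127
http://api.example.com/v1/items.json?limit=50	
http://www.example.com/about	0.062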

The streaming job command looks like this:

hadoop  jar $HADOOP_HOME/hadoop-streaming.jar                                  \
-D mapred.map.tasks=100                                                        \
-D mapred.reduce.tasks=8                                                       \
-D mapred.job.name="Stress Test - Full"                                 \
-D mapred.speculative.execution=false                                          \
-input  "/mnt/hadoop/urls.txt"                                                 \
-output "/mnt/hadoop/stress_output"                                            \
-mapper "$MY_APP_PATH/samples/stress/get_urls.rb 100"                          \
-reducer "$MY_APP_PATH/samples/stress/stats_aggregator.rb --reducer"

For each key (URL) and value (elapsed time), the reducer updates the following variables:

  1. sum – total time elapsed for all requests
  2. min – fastest response
  3. max – slowest response
  4. count – number of requests processed

The core of the reducer loop (excerpted from the full source below) looks like this:
key,val = l.chomp.split("\t",2)
if last_key.nil? || last_key!=key
  write_stats(curr_rec) unless last_key.nil?
  curr_rec = STATS_TEMPLATE.dup.update(:key=>key)
  last_key=key
end
 
if val && val!=''
  val=val.to_f
  curr_rec[:sum]+=val
  curr_rec[:count]+=1
  curr_rec[:min]=val if curr_rec[:min].nil? || val<curr_rec[:min]
  curr_rec[:max]=val if curr_rec[:max].nil? || val>curr_rec[:max]
else
  curr_rec[:errors]+=1
end

Finally, as we flush, we compute the overall average for the key.

def write_stats(stats_hash)
  stats_hash[:average]=stats_hash[:sum]/stats_hash[:count] if stats_hash[:count]>0
  puts stats_hash.values_at(*STATS_TEMPLATE.keys).join("\t")
end

Final Stats (optional)

When the job completes, it produces one part file per reducer. Before this data can be loaded into, say, a spreadsheet, it needs to be merged and converted into a friendlier format. A few more lines of code get us a CSV file that can be dropped into your favorite spreadsheet/charting software:

hadoop dfs -cat /mnt/hadoop/stress_output/part* | $MY_APP_PATH/samples/stress/stats_aggregator.rb --csv > final_stats.csv

Our CSV converter looks like this:

class CSVConverter
  def print_stats
    puts STATS_TEMPLATE.keys.to_csv
    STDIN.each_line do |l|
      puts l.chomp.split("\t").to_csv
    end
  end
end
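
The output is one row per URL with the STATS_TEMPLATE fields in order, along these lines (URLs and values invented):

key,sum,average,max,min,errors,count
http://api.example.com/v1/items.json?limit=50,311.7,0.19,2.41,0.06,4,1640
http://www.example.com/about,42.8,0.09,0.88,0.03,0,475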

Source Code

The mapper, get_urls.rb:

#!/usr/bin/env ruby1.9
require 'rubygems'
require 'curb'
 
class MultiCurler
  DEFAULT_CONCURRENT_REQUESTS = 100
  def initialize(opts={})
    @concurrent_requests = opts[:concurrent_requests] || DEFAULT_CONCURRENT_REQUESTS
    @curler = Curl::Multi.new
    @response_buffer=[]
  end
  # Read URLs from stdin in batches and keep fetching until input is exhausted.
  def start
    while !STDIN.eof?
      STDIN.take(@concurrent_requests).each do |url|
        add_url(url.chomp)
      end
      run
    end
  end
 
  private
 
  # Drive the multi handle; as requests complete, flush results and top the
  # pool back up with fresh URLs from stdin.
  def run
    @curler.perform do
      flush_buffer!
      STDIN.take(@concurrent_requests-@curler.requests.size).each do |url|
        add_url(url.chomp)
      end
    end
    flush_buffer!
  end
 
  def flush_buffer!
    while output = @response_buffer.pop
      puts output.join("\t")
    end
  end
 
  # Queue a single URL on the multi handle with success/failure callbacks.
  def add_url(u)
 
    #skip really obvious input errors
    return log_error(u,'missing url') if u.nil?
    return log_error(u,'invalid url') unless u=~/^http:\/\//i
 
    c = Curl::Easy.new(u) do|curl|
      start_time = Time.now
      curl.follow_location = true
      curl.enable_cookies=true
      curl.on_success do
        if errorlike_content?(curl.body_str)
          log_error(curl.url,'errorlike content')
        else
          @response_buffer<<[curl.url,Time.now-start_time]
        end
      end
      curl.on_failure do |curl_obj,args|
        error_type = args.first.to_s if args.is_a?(Array)
        log_error(curl.url,error_type)
      end
    end
    @curler.add(c)
  end
 
  def errorlike_content?(page_body)
    page_body.nil? || page_body=='' || page_body=~/(unexpected error|something went wrong|Api::Error)/i
  end
 
  # Buffer the URL with no elapsed time and bump a Hadoop counter via stderr.
  def log_error(url,error_type)
    @response_buffer<<[url,nil]
    warn "reporter:counter:error,#{error_type||'unknown'},1"
  end
 
end
 
 
concurrent_requests = ARGV.first ? ARGV.first.to_i : nil
 
runner=MultiCurler.new(:concurrent_requests=>concurrent_requests)
runner.start
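
Before spending any EC2 time, the mapper can be sanity-checked locally by piping a handful of URLs straight into it (the file name and concurrency of 20 are just examples):

head -50 sample_urls_list.txt | ./get_urls.rb 20 > local_results.txt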

The reducer and postprocessing script, stats_aggregator.rb:

#!/usr/bin/env ruby1.9
require 'rubygems'
require 'csv'
 
 
module Stats
  STATS_TEMPLATE={:key=>nil,:sum=>0,:average=>nil,:max=>nil,:min=>nil,:errors=>0,:count=>0}
 
  class Reducer
    # Accumulate per-key stats from stdin lines, which arrive sorted by key.
    def run
 
      last_key=nil
      curr_rec=nil
 
      STDIN.each_line do |l|
        key,val = l.chomp.split("\t",2)
        if last_key.nil? || last_key!=key
          write_stats(curr_rec) unless last_key.nil?
          curr_rec = STATS_TEMPLATE.dup.update(:key=>key)
          last_key=key
        end
 
        if val && val!=''
          val=val.to_f
          curr_rec[:sum]+=val
          curr_rec[:count]+=1
          curr_rec[:min]=val if curr_rec[:min].nil? || val<curr_rec[:min]
          curr_rec[:max]=val if curr_rec[:max].nil? || val>curr_rec[:max]
        else
          curr_rec[:errors]+=1
        end
      end
      write_stats(curr_rec) if curr_rec
    end
 
    private
 
    def write_stats(stats_hash)
      stats_hash[:average]=stats_hash[:sum]/stats_hash[:count] if stats_hash[:count]>0
      puts stats_hash.values_at(*STATS_TEMPLATE.keys).join("\t")
    end
  end
 
 
  class CSVConverter
    def print_stats
      puts STATS_TEMPLATE.keys.to_csv
      STDIN.each_line do |l|
        puts l.chomp.split("\t").to_csv
      end
    end
  end
 
 
end
 
 
mode = ARGV.shift
 
case mode
  when '--reducer' #hadoop mode
    Stats::Reducer.new.run
  when '--csv' #csv converter; run with: hadoop dfs -cat /mnt/hadoop/stress_output/part* | stats_aggregator.rb --csv
    Stats::CSVConverter.new.print_stats
  else
    abort 'Invalid mode specified for stats aggregator.  Valid options are --reducer, --csv'
end
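
And because Hadoop Streaming is ultimately just stdin and stdout, the whole job can be rehearsed locally, with sort standing in for the shuffle phase (again, the file names and concurrency are illustrative):

head -200 sample_urls_list.txt | ./get_urls.rb 20 | sort | ./stats_aggregator.rb --reducer | ./stats_aggregator.rb --csv > local_stats.csv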

Reckoning (Shame Computation)

In a moment of desperation, we used Hadoop to solve a problem for which it is very tenuously appropriate, but it actually turned out great. I wrote very little code, it worked with almost no iteration, and a couple of up-front hours bought us an easily repeatable process. Our last-minute stress test exposed a few issues that we were able to quickly correct, and the launch went smoothly. All this for about $10 of EC2 time.

Hadoop Streaming is a powerful tool that every Hadoop shop, even the pure Java shops, should consider part of their toolbox. Lots of big data jobs are actually simple except for scale, so your local python, ruby, bash, perl, or whatever coder along with some EC2 dollars can give you access to some pretty powerful stuff.