Advanced MapReduce

Use MapReduce sparingly

In Riak KV, MapReduce is the primary method for non-primary-key-based querying. Although useful for tasks such as batch processing jobs, MapReduce operations can be very computationally expensive, to the extent that they can degrade performance in production clusters operating under load. Because of this potential for performance degradation, we recommend running MapReduce operations in a controlled, rate-limited fashion and never for realtime querying purposes.

MapReduce, the data processing paradigm popularized by Google, is provided by Riak KV to aggregate results as background batch processes.


In Riak KV, MapReduce is one of the primary methods for non-primary-key-based querying alongside secondary indexes. Riak KV allows you to run MapReduce jobs using Erlang or JavaScript.

Deprecation Warning

JavaScript MapReduce is deprecated and will be removed in a future version.

Why Do We Use MapReduce for Querying Riak KV?

Key/value stores like Riak KV generally do not offer the kinds of complex querying capabilities found in other data storage systems, such as relational databases. MapReduce enables you to perform powerful queries over the data stored in Riak KV but should be used with caution.

The main goal of MapReduce is to spread the processing of a query across many systems to take advantage of parallel processing power. This is generally done by dividing the query into several steps, dividing the dataset into several chunks, and then running those step/chunk pairs on separate physical hosts. Riak KV’s MapReduce has an additional goal: increasing data locality. When processing a large dataset, it’s often much more efficient to take the computation to the data than it is to bring the data to the computation.

“Map” and “Reduce” are phases in the query process. Map functions take one piece of data as input and produce zero or more results as output. If you’re familiar with mapping over a list in functional programming languages, you’re already familiar with the “Map” steps in a MapReduce query.
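As a local analogy only (no Riak involved), the sketch below treats a plain Erlang fun as a "map function": it receives one input and returns a list of zero or more results, and lists:flatmap/2 concatenates those lists. The module name map_analogy is hypothetical.

```erlang
-module(map_analogy).
-export([evens_doubled/1]).

%% Each element is passed to MapFun, which returns a list of zero or
%% more results (here: one result for even numbers, none for odd ones);
%% lists:flatmap/2 concatenates all of those lists.
evens_doubled(Inputs) ->
    MapFun = fun(N) when N rem 2 =:= 0 -> [2 * N];
                (_) -> []
             end,
    lists:flatmap(MapFun, Inputs).
```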

MapReduce caveats

MapReduce should generally be treated as a fallback rather than a standard part of an application. There are often ways to model data such that dynamic queries become single key retrievals, which are dramatically faster and more reliable in Riak KV, and tools such as Riak search and 2i are simpler to use and may place less strain on a cluster.


One consequence of Riak KV’s processing model is that MapReduce queries have an effective R value of 1. The queries are distributed to a representative sample of the cluster where the data is expected to be found, and if one server lacks a copy of data it’s supposed to have, a MapReduce job will not attempt to look for it elsewhere.

For more on the value of R, see our documentation on replication properties.

Key lists

Asking Riak KV to generate a list of all keys in a production environment is generally a bad idea. It’s an expensive operation.

Attempting to constrain that operation to a bucket (e.g., mapred_bucket as used below) does not help because Riak KV must still pull all keys from storage to determine which ones are in the specified bucket.

If at all possible, run MapReduce against a list of known keys.

Code distribution

As we’ll discuss in this document, the functions invoked from Erlang MapReduce must be available on all servers in the cluster unless using the client library from an Erlang shell.

Security restrictions

If Riak’s security functionality is enabled, there are two restrictions on MapReduce that come into play:

  • The riak_kv.mapreduce permission must be granted to the user (or via the user’s groups)
  • Other than the module riak_kv_mapreduce, any Erlang modules distributed with Riak KV will not be accessible to custom MapReduce code unless made available via the add_path mechanism documented in Installing Custom Code.

How Riak KV’s MapReduce Queries Are Specified

MapReduce queries in Riak KV have two components: (1) a list of inputs and (2) a list of “steps,” or “phases.”

Each element of the input list is an object location, as specified by bucket type, bucket, and key. This location may also be annotated with “key-data,” which will be passed as an argument to a map function when evaluated on the object stored under that bucket-key pair.

Each element of the phases list is a description of a map function, a reduce function, or a link function. The description includes where to find the code for the phase function (for map and reduce phases), static data passed to the function every time it is executed during that phase, and a flag indicating whether or not to include the results of that phase in the final output of the query.

The phase list describes the chain of operations through which each input will flow. That is, the initial inputs will be fed to the first phase in the list and the output of that phase will be fed as input to the next phase in the list. This stream will continue through the final phase.
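To make the chaining concrete, here is a minimal single-process simulation of a phase list. It is a sketch under simplifying assumptions, not how Riak KV executes jobs: it ignores Keep flags, key data, and distribution across vnodes, and the module name phase_chain is hypothetical. Map phases call an arity-3 fun once per input; reduce phases call an arity-2 fun on the whole list.

```erlang
-module(phase_chain).
-export([run/2]).

%% Phases are {map, Fun, Arg} (Fun/3) or {reduce, Fun, Arg} (Fun/2).
%% The output of each phase becomes the input of the next one.
run(Inputs, Phases) ->
    lists:foldl(fun apply_phase/2, Inputs, Phases).

%% A map phase calls Fun once per input (KeyData is always undefined
%% in this simulation) and concatenates the returned lists.
apply_phase({map, Fun, Arg}, Inputs) ->
    lists:flatmap(fun(Input) -> Fun(Input, undefined, Arg) end, Inputs);
%% A reduce phase calls Fun once with the whole input list.
apply_phase({reduce, Fun, Arg}, Inputs) ->
    Fun(Inputs, Arg).
```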

How Phases Work

Map Phase

The input list to a map phase must be a list of (possibly annotated) bucket-key pairs. For each pair, Riak KV will send the request to evaluate the map function to the partition that is responsible for storing the data for that bucket-key. The vnode hosting that partition will look up the object stored under that bucket-key and evaluate the map function with the object as an argument. The other arguments to the function will be the annotation included with the bucket-key (if any) and the static data for the phase, as specified in the query.


Be aware that most Riak KV clusters will retain deleted objects for some period of time (3 seconds by default), and the MapReduce framework does not conceal these from submitted jobs. These tombstones can be recognized and filtered out by looking for X-Riak-Deleted in the object metadata with a value of true.
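A map function can skip tombstones before doing any other work. The sketch below assumes the metadata is a dict (as returned by riak_object:get_metadata/1 for an object without siblings) and that the deleted marker is stored as the string "true"; both are assumptions to verify against your Riak KV version. To keep it runnable without a cluster, the object is simplified to a hypothetical {Key, Metadata} pair, and the module name tombstone_filter is hypothetical.

```erlang
-module(tombstone_filter).
-export([is_tombstone/1, map_live_key/3]).

-define(MD_DELETED, <<"X-Riak-Deleted">>).

%% MD is an object's metadata dict. ASSUMPTION: the marker value is "true".
is_tombstone(MD) ->
    case dict:find(?MD_DELETED, MD) of
        {ok, "true"} -> true;
        _ -> false
    end.

%% Map-function-shaped sketch. ASSUMPTION: the object is simplified to
%% {Key, MD}; a real map function receives a riak_object instead.
map_live_key({Key, MD}, _KeyData, _Arg) ->
    case is_tombstone(MD) of
        true -> [];     % deleted object: emit nothing
        false -> [Key]
    end.
```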

Reduce Phase

Reduce phases accept any list of data as input, and produce any list of data as output. They also receive a phase-static value, specified in the query definition.

The most important thing to understand is that the function defining the reduce phase may be evaluated multiple times, and the input of later evaluations will include the output of earlier evaluations.

For example, a reduce phase may implement the set-union function. In that case, the first set of inputs might be [1,2,2,3], and the output would be [1,2,3]. When the phase receives more inputs, say [3,4,5], the function will be called with the concatenation of the two lists: [1,2,3,3,4,5].
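A set-union reduce that tolerates re-reduce can be as small as lists:usort/1, because its output is itself a valid input. The sketch below (module name union_reduce is hypothetical) replays the two calls from the example.

```erlang
-module(union_reduce).
-export([set_union/2]).

%% Sorted, duplicate-free output has the same shape as the input (a flat
%% list of values), so the function can be re-applied to a list that
%% mixes its earlier output with new inputs.
set_union(Values, _Arg) ->
    lists:usort(Values).
```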

Other systems refer to the second application of the reduce function as a “re-reduce.” There are at least a few reduce-query implementation strategies that work with Riak KV’s model.

One strategy is to implement the phase preceding the reduce phase such that its output is “the same shape” as the output of the reduce phase. This is how the examples in this document are written, and the way that we have found produces the cleanest code.

An alternative strategy is to make the output of a reduce phase recognizable such that it can be extracted from the input list on subsequent applications. For example, if inputs from the preceding phase are numbers, outputs from the reduce phase could be objects or strings. This would allow the function to find the previous result and apply new inputs to it.
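The alternative strategy can be sketched as a summing reduce that wraps its result as {sum, Total}. Inputs from the preceding phase are assumed to be plain numbers, so a tagged tuple in the input list can only be an earlier reduce result. The module and tag names are hypothetical.

```erlang
-module(tagged_reduce).
-export([sum/2]).

%% Plain numbers come from the preceding phase; {sum, Prev} entries are
%% earlier outputs of this reduce and are folded into the new total.
sum(Values, _Arg) ->
    Total = lists:foldl(fun({sum, Prev}, Acc) -> Acc + Prev;
                           (N, Acc) when is_number(N) -> Acc + N
                        end, 0, Values),
    [{sum, Total}].
```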

Link Phase

Link phases find links matching patterns specified in the query definition. The patterns specify which buckets and tags links must have.

“Following a link” means adding it to the output list of this phase. The output of this phase is often most useful as input to a map phase or to another reduce phase.
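Link matching can be illustrated with a local sketch. It assumes links are represented as {{Bucket, Key}, Tag} tuples and uses the atom '_' as a wildcard for either the bucket or the tag, mirroring the link phase syntax described later in this document. The module name link_match is hypothetical, and this is not Riak KV's implementation.

```erlang
-module(link_match).
-export([follow/3]).

%% ASSUMPTION: each link is {{Bucket, Key}, Tag}. Matching links are
%% "followed" by emitting their {Bucket, Key} target, in input order.
follow(Links, BucketPat, TagPat) ->
    [{B, K} || {{B, K}, T} <- Links,
               matches(BucketPat, B),
               matches(TagPat, T)].

%% '_' matches anything; otherwise the pattern must equal the value.
matches('_', _) -> true;
matches(Pat, Value) -> Pat =:= Value.
```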

Invoking MapReduce

To illustrate some key ideas, we’ll define a simple module that implements a map function to return the key value pairs contained in a bucket and use it in a MapReduce query via Riak KV’s HTTP API.

Here is our example MapReduce function:

-module(mr_example).

-export([get_keys/3]).

% Returns bucket and key pairs from a map phase
get_keys(Value,_Keydata,_Arg) ->
  [{riak_object:bucket(Value),riak_object:key(Value)}].

Save this file as mr_example.erl and proceed to compiling the module.

Note on the Erlang Compiler

You must use the Erlang compiler (erlc) associated with the Riak KV installation or the version of Erlang used when compiling Riak KV from source.

Compiling the module is a straightforward process:

erlc mr_example.erl

Successful compilation will result in a new .beam file, mr_example.beam.

Send this file to your operator, or read about installing custom code on your Riak KV nodes. Once your file has been installed, all that remains is to try the custom function in a MapReduce query. For example, let’s return keys contained within a bucket named messages (please pick a bucket which contains keys in your environment).

curl -XPOST localhost:8098/mapred \
  -H 'Content-Type: application/json'   \
  -d '{"inputs":"messages","query":[{"map":{"language":"erlang","module":"mr_example","function":"get_keys"}}]}'

The result should be a JSON map of bucket and key names expressed as key/value pairs.

Be sure to install the MapReduce function as described above on all of the nodes in your cluster to ensure proper operation.

Phase functions

MapReduce phase functions have the same properties, arguments, and return values whether you write them in JavaScript or Erlang.

Map phase functions

Map functions take three arguments (in Erlang, arity-3 is required). Those arguments are:

  1. Value: the value found at a key. This will be a Riak object, which in Erlang is defined and manipulated by the riak_object module. In JavaScript, a Riak object looks like this:

     {
      "bucket_type" : BucketTypeAsString,
      "bucket" : BucketAsString,
      "key" : KeyAsString,
      "vclock" : VclockAsString,
      "values" : [
                  {
                   "metadata" : {
                                 "Links":[...List of link objects],
                                 // ...other metadata...
                                },
                   "data" : ObjectData
                  },
                  // ...other metadata/data values (siblings)...
                 ]
     }

  2. KeyData : key data that was submitted with the inputs to the query or phase.

  3. Arg : a static argument for the entire phase that was submitted with the query.

A map phase should produce a list of results. You will see errors if the output of your map function is not a list. Return the empty list if your map function chooses not to produce output. If your map phase is followed by another map phase, the output of the function must be compatible with the input to a map phase: a list of bucket-key pairs or bucket-key-keydata triples.

Map function examples

This map function returns the value (data) of the object being mapped:

fun(Value, _KeyData, _Arg) ->
  [riak_object:get_value(Value)]
end.

This map function filters its inputs based on the arg, keeping only objects whose key is longer than Arg bytes, and returns bucket-key pairs for a subsequent map phase:

fun(Value, _KeyData, Arg) ->
  Key = riak_object:key(Value),
  Bucket = riak_object:bucket(Value),
  case erlang:byte_size(Key) of
    L when L > Arg ->
      [{Bucket, Key}];
    _ -> []
  end
end.

Reduce phase functions

Reduce functions take two arguments. Those arguments are:

  1. ValueList: the list of values produced by the preceding phase in the MapReduce query.
  2. Arg : a static argument for the entire phase that was submitted with the query.

A reduce function should produce a list of values, but it must also be true that the function is commutative, associative, and idempotent. That is, if the input list [a,b,c,d] is valid for a given F, then all of the following must produce the same result:

  F([a,b,c,d])
  F([a,d] ++ F([c,b]))
  F(F([a]) ++ F([c]) ++ F([b]) ++ F([d]))
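These equalities can be spot-checked for a concrete input. The hypothetical helper below takes an arity-1 fun (wrap a real reduce function as fun(Vs) -> Reduce(Vs, Arg) end), evaluates all three forms, and compares the results after sorting, since the order of values is not guaranteed. A single input proves nothing in general; it only catches obvious violations.

```erlang
-module(reduce_laws).
-export([check/2]).

%% Returns true if F gives the same (sorted) result when applied to the
%% whole list, in two stages, and to single-element partial results.
check(F, [A, B, C, D]) ->
    R1 = F([A, B, C, D]),
    R2 = F([A, D] ++ F([C, B])),
    R3 = F(F([A]) ++ F([C]) ++ F([B]) ++ F([D])),
    lists:sort(R1) =:= lists:sort(R2) andalso
        lists:sort(R2) =:= lists:sort(R3).
```

For example, a summing reduce passes, while a reduce that counts its inputs fails, because re-applying it counts earlier results as single inputs.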

Reduce function examples

This reduce function assumes the values in the input are numbers and sums them:

fun(Values, _Arg) ->
  [lists:foldl(fun erlang:'+'/2, 0, Values)]
end.

This reduce function sorts its inputs:

fun(Values, _Arg) ->
  lists:sort(Values)
end.
MapReduce Examples

Riak KV supports describing MapReduce queries in Erlang syntax through the Protocol Buffers API. This section demonstrates how to do so using the Erlang client.

Distributing Erlang MapReduce Code

Any modules and functions you use in your Erlang MapReduce calls must be available on all nodes in the cluster. Please read about installing custom code.

Erlang Example

Before running some MapReduce queries, let’s create some objects to run them on. Unlike the first example when we compiled mr_example.erl and distributed it across the cluster, this time we’ll use the Erlang client library and shell.

1> {ok, Client} = riakc_pb_socket:start("127.0.0.1", 8087).
2> Mine = riakc_obj:new(<<"groceries">>, <<"mine">>,
                        term_to_binary(["eggs", "bacon"])).
3> Yours = riakc_obj:new(<<"groceries">>, <<"yours">>,
                         term_to_binary(["bread", "bacon"])).
4> riakc_pb_socket:put(Client, Yours, [{w, 1}]).
5> riakc_pb_socket:put(Client, Mine, [{w, 1}]).

Now that we have a client and some data, let’s run a query that counts how many times each grocery item occurs.

6> Count = fun(G, undefined, none) ->
             [dict:from_list([{I, 1}
              || I <- binary_to_term(riak_object:get_value(G))])]
           end.
7> Merge = fun(Gcounts, none) ->
             [lists:foldl(fun(G, Acc) ->
                            dict:merge(fun(_, X, Y) -> X+Y end,
                                       G, Acc)
                          end,
                          dict:new(),
                          Gcounts)]
           end.
8> {ok, [{1, [R]}]} = riakc_pb_socket:mapred(
                         Client,
                         [{<<"groceries">>, <<"mine">>},
                          {<<"groceries">>, <<"yours">>}],
                         [{map, {qfun, Count}, none, false},
                          {reduce, {qfun, Merge}, none, true}]).
9> L = dict:to_list(R).

Riak Object Representations

Note how the riak_object module is used in the MapReduce function but the riakc_obj module is used on the client. Riak objects are represented differently internally to the cluster than they are externally.

Given the lists of groceries we created, the sequence of commands above would result in L being bound to [{"bread",1},{"eggs",1},{"bacon",2}] (dict:to_list/1 does not guarantee the order of the pairs).
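The same Count and Merge logic can be exercised without a cluster by feeding it the term_to_binary/1 payloads directly in place of riak_object:get_value/1. This is a local sketch only (module name grocery_local is hypothetical); it sorts the final list so the result does not depend on dict ordering.

```erlang
-module(grocery_local).
-export([run/1]).

%% Binaries stand in for the stored object values.
run(Binaries) ->
    %% Map: one dict of {Item, 1} per object.
    Count = fun(Bin) ->
                [dict:from_list([{I, 1} || I <- binary_to_term(Bin)])]
            end,
    %% Reduce: merge all dicts, adding counts for shared items. Its output
    %% ([Dict]) has the same shape as Count's, so it can be re-applied.
    Merge = fun(Gcounts) ->
                [lists:foldl(fun(G, Acc) ->
                                 dict:merge(fun(_, X, Y) -> X + Y end, G, Acc)
                             end,
                             dict:new(),
                             Gcounts)]
            end,
    [R] = Merge(lists:flatmap(Count, Binaries)),
    lists:sort(dict:to_list(R)).
```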

Erlang Query Syntax

riakc_pb_socket:mapred/3 takes a client and two lists as arguments. The first list contains bucket-key pairs. The second list contains the phases of the query.

riakc_pb_socket:mapred_bucket/3 replaces the first list of bucket-key pairs with the name of a bucket; see the warnings above about using this in a production environment.


The mapred/3 input objects are given as a list of tuples in the format {Bucket, Key} or {{Bucket, Key}, KeyData}. Bucket and Key should be binaries, and KeyData can be any Erlang term. The former form is equivalent to {{Bucket,Key},undefined}.
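The equivalence between the two input forms can be made explicit with a small normalizing helper. This is purely illustrative (mapred/3 accepts both forms as they are), and the module name mr_inputs is hypothetical.

```erlang
-module(mr_inputs).
-export([normalize/1]).

%% Rewrites every input into the long form {{Bucket, Key}, KeyData};
%% the short form {Bucket, Key} gets KeyData = undefined.
normalize(Inputs) ->
    lists:map(fun({{B, K}, KD}) when is_binary(B), is_binary(K) ->
                      {{B, K}, KD};
                 ({B, K}) when is_binary(B), is_binary(K) ->
                      {{B, K}, undefined}
              end, Inputs).
```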


The query is given as a list of map, reduce and link phases. Map and reduce phases are each expressed as tuples in the following form:

{Type, FunTerm, Arg, Keep}

Type is an atom, either map or reduce. Arg is a static argument (any Erlang term) to pass to each execution of the phase. Keep is either true or false and determines whether results from the phase will be included in the final value of the query. Riak KV assumes that the final phase will return results.

FunTerm is a reference to the function that the phase will execute and takes any of the following forms:

  • {modfun, Module, Function} where Module and Function are atoms that name an Erlang function in a specific module
  • {qfun, Fun} where Fun is a callable fun term (closure or anonymous function)
  • {jsfun, Name} where Name is a binary that, when evaluated in JavaScript, points to a built-in JavaScript function
  • {jsanon, Source} where Source is a binary that, when evaluated in JavaScript, is an anonymous function
  • {jsanon, {Bucket, Key}} where the object at {Bucket, Key} contains the source for an anonymous JavaScript function

qfun Note

Using qfun in compiled applications can be a fragile operation. Please keep the following points in mind:

  1. The module in which the function is defined must be present and exactly the same version on both the client and Riak KV nodes.

  2. Any modules and functions used by this function (or any function in the resulting call stack) must also be present on the Riak KV nodes.

Errors about failures to ensure both 1 and 2 are often surprising, usually seen as opaque missing-function or function-clause errors. Especially in the case of differing module versions, this can be difficult to diagnose without expecting the issue and knowing of Module:info/0.

When using the Erlang shell, anonymous MapReduce functions can be defined and sent to Riak KV instead of deploying them to all servers in advance, but condition #2 above still holds.

Link phases are expressed in the following form:

{link, Bucket, Tag, Keep}

Bucket is either a binary name of a bucket to match, or the atom '_', which matches any bucket. Tag is either a binary tag to match, or the atom '_', which matches any tag. Keep has the same meaning as in map and reduce phases.
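To check a query by hand before sending it, the shapes described in this section can be encoded as a small validator. This hypothetical sketch (module name phase_spec) only checks term structure, including qfun arity (3 for map, 2 for reduce); it cannot tell whether a named module or function is actually installed on the cluster.

```erlang
-module(phase_spec).
-export([valid/1]).

%% {Type, FunTerm, Arg, Keep} for map and reduce phases.
valid({Type, FunTerm, _Arg, Keep})
  when (Type =:= map orelse Type =:= reduce), is_boolean(Keep) ->
    valid_funterm(Type, FunTerm);
%% {link, Bucket, Tag, Keep} for link phases.
valid({link, Bucket, Tag, Keep}) when is_boolean(Keep) ->
    is_pattern(Bucket) andalso is_pattern(Tag);
valid(_) ->
    false.

valid_funterm(_Type, {modfun, Module, Function}) ->
    is_atom(Module) andalso is_atom(Function);
valid_funterm(map, {qfun, Fun}) ->
    is_function(Fun, 3);
valid_funterm(reduce, {qfun, Fun}) ->
    is_function(Fun, 2);
valid_funterm(_Type, {jsfun, Name}) ->
    is_binary(Name);
valid_funterm(_Type, {jsanon, {Bucket, Key}}) ->
    is_binary(Bucket) andalso is_binary(Key);
valid_funterm(_Type, {jsanon, Source}) ->
    is_binary(Source);
valid_funterm(_Type, _Other) ->
    false.

%% A link pattern is a binary or the wildcard atom '_'.
is_pattern('_') -> true;
is_pattern(Pattern) -> is_binary(Pattern).
```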

There are a small group of prebuilt Erlang MapReduce functions available with Riak KV. Check them out on GitHub.

Bigger Data Examples

Loading Data

This Erlang script will load historical stock-price data for Google (ticker symbol “GOOG”) into your existing Riak KV cluster so we can use it. Paste the code below into a file called load_data.erl inside the dev directory (or download it below).

#!/usr/bin/env escript
%% -*- erlang -*-
main([]) ->
    io:format("Requires one argument: filename with the CSV data~n");
main([Filename]) ->
    {ok, Data} = file:read_file(Filename),
    Lines = tl(re:split(Data, "\r?\n", [{return, binary},trim])),
    lists:foreach(fun(L) -> LS = re:split(L, ","), format_and_insert(LS) end, Lines).

format_and_insert(Line) ->
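    JSON = io_lib:format("{\"Date\":\"~s\",\"Open\":~s,\"High\":~s,\"Low\":~s,\"Close\":~s,\"Volume\":~s,\"Adj. Close\":~s}", Line),
    %% Stores each row in the goog bucket, keyed by its date, over the HTTP API
    %% (assumes a node listening on localhost:8098, as in the curl examples above)
    Command = io_lib:format("curl -XPUT http://localhost:8098/buckets/goog/keys/~s -d '~s' -H 'content-type: application/json'", [hd(Line),JSON]),
    io:format("Inserting: ~s~n", [hd(Line)]),
    os:cmd(Command).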

Make the script executable:

chmod +x load_data.erl

Download the CSV file of stock data linked below and place it in the dev directory where we’ve been working.

Now load the data into Riak KV.

./load_data.erl goog.csv

Map only: find the days on which the high was over $600.00

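From the Erlang shell with the client library loaded and a client connection bound to the variable Riak (for example, via {ok, Riak} = riakc_pb_socket:start("127.0.0.1", 8087)), let’s define a function that checks each value in our goog bucket to see whether the stock’s high for the day was above $600.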

> HighFun = fun(O, _, LowVal) ->
>   {struct, Map} = mochijson2:decode(riak_object:get_value(O)),
>   High = proplists:get_value(<<"High">>, Map, -1.0),
>   case High > LowVal of
>      true -> [riak_object:key(O)];
>      false -> []
> end end.

Now we’ll use mapred_bucket/3 to send that function to the cluster.

> riakc_pb_socket:mapred_bucket(Riak, <<"goog">>, [{map, {qfun, HighFun}, 600, true}]).

Map only: find the days on which the close is lower than open

This example is slightly more complicated: instead of comparing a single field against a fixed value, we’re looking for days when the stock declined.

> CloseLowerFun = fun(O, _, _) ->
>    {struct, Map} = mochijson2:decode(riak_object:get_value(O)),
>    Close = proplists:get_value(<<"Close">>, Map, -1.0),
>    Open = proplists:get_value(<<"Open">>, Map, -2.0),
>    case Close < Open of
>       true -> [riak_object:key(O)];
>       false -> []
> end end.

> riakc_pb_socket:mapred_bucket(Riak, <<"goog">>, [{map, {qfun, CloseLowerFun}, none, true}]).

Map and Reduce: find the maximum daily variance in price by month

Here things start to get tricky. We’ll use map to determine each day’s rise or fall, and our reduce phase will identify each month’s largest variance.

DailyMap = fun(O, _, _) ->
   {struct, Map} = mochijson2:decode(riak_object:get_value(O)),
   Date = binary_to_list(proplists:get_value(<<"Date">>, Map, <<"0000-00-00">>)),
   High = proplists:get_value(<<"High">>, Map, 0.0),
   Low = proplists:get_value(<<"Low">>, Map, 0.0),
   Month = string:substr(Date, 1, 7),
   [{Month, abs(High - Low)}]
end.

MonthReduce = fun(List, _) ->
    {Highs, _} = lists:foldl(
      fun({Month, _Value}=Item, {Accum, PrevMonth}) ->
              case Month of
                  PrevMonth ->
                      %% Highest value is always first in the list, so
                      %% skip over this one
                      {Accum, PrevMonth};
                  _ ->
                      {[Item] ++ Accum, Month}
              end
      end,
      {[], ""},
      %% Sort descending so each month's largest variance comes first
      lists:reverse(lists:sort(List))),
    Highs
end.
> riakc_pb_socket:mapred_bucket(Riak, <<"goog">>, [{map, {qfun, DailyMap}, none, false}, {reduce, {qfun, MonthReduce}, none, true}]).
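The map and reduce logic above can be tested locally before it is sent to the cluster. In this sketch (module name goog_local is hypothetical), daily_map/1 receives an already-decoded proplist instead of a Riak object, and the sample rows in the check are made-up values, not real GOOG prices. Because month_reduce/2 returns {Month, Variance} tuples, the same shape as its input, it can be re-applied to a mix of earlier output and new rows.

```erlang
-module(goog_local).
-export([daily_map/1, month_reduce/2]).

%% Map step on a decoded row (in the cluster this proplist would come
%% from mochijson2:decode(riak_object:get_value(O))).
daily_map(Map) ->
    Date = binary_to_list(proplists:get_value(<<"Date">>, Map, <<"0000-00-00">>)),
    High = proplists:get_value(<<"High">>, Map, 0.0),
    Low = proplists:get_value(<<"Low">>, Map, 0.0),
    [{string:substr(Date, 1, 7), abs(High - Low)}].

%% Reduce step: after a descending sort, the first entry seen for each
%% month is its largest variance; later entries for that month are skipped.
month_reduce(List, _Arg) ->
    {Highs, _} =
        lists:foldl(fun({Month, _Value}, {Accum, Month}) ->
                            {Accum, Month};
                       ({Month, _Value} = Item, {Accum, _PrevMonth}) ->
                            {[Item | Accum], Month}
                    end,
                    {[], ""},
                    lists:reverse(lists:sort(List))),
    Highs.
```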

A MapReduce Challenge

Here is a scenario involving the data you already have loaded.

MapReduce Challenge: Find the largest day for each month in terms of dollars traded, and subsequently the largest overall day.

Hint: You will need at least one each of map and reduce phases.

Streaming MapReduce

Because Riak KV distributes the map phases across the cluster to increase data locality, you can gain access to the results of those individual computations as they finish via streaming. Streaming can be very helpful when getting access to results from a high-latency MapReduce job that only contains map phases. Streaming of results from reduce phases isn’t as useful, but if your map phases return data (keep: true), they will be returned to the client even if the reduce phases haven’t executed. This lets you use streaming with a reduce phase to collect the results of the map phases while the job runs and then receive the result of the reduce phase at the end.

Streaming via the HTTP API

You can enable streaming with MapReduce jobs submitted to the /mapred resource by adding ?chunked=true to the URL. The response will be sent using HTTP/1.1 chunked transfer encoding with Content-Type: multipart/mixed. Be aware that if you are streaming a set of serialized objects (like JSON objects), the chunks are not guaranteed to be separated along the same boundaries that your serialized objects are. For example, a chunk may end in the middle of a string representing a JSON object, so you will need to decode and parse your responses appropriately in the client.
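One way to handle chunk boundaries is to buffer incoming bytes and only hand off parts that are followed by a complete delimiter. The sketch below assumes you already know the delimiter bytes derived from the boundary in the Content-Type header, and it leaves stripping each part's own headers to the caller. The module name chunk_buffer is hypothetical.

```erlang
-module(chunk_buffer).
-export([feed/3]).

%% Appends Chunk to Buffer, returns every part that is followed by
%% Delimiter, and keeps the trailing (possibly incomplete) piece as the
%% new buffer, so a JSON document split across chunks is never decoded
%% before it is complete.
feed(Buffer, Chunk, Delimiter) ->
    Parts = binary:split(<<Buffer/binary, Chunk/binary>>, Delimiter, [global]),
    {Complete, [Rest]} = lists:split(length(Parts) - 1, Parts),
    {Complete, Rest}.
```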

Streaming via the Erlang API

You can use streaming with Erlang via the Riak KV local client or the Erlang Protocol Buffers API. In either case, you will provide the call to mapred_stream with a Pid that will receive the streaming results.
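A receiving process might look like the loop below. The message shapes ({ReqId, {mapred, Phase, Results}}, {ReqId, done}, and {ReqId, {error, Reason}}) are an assumption based on the riakc Protocol Buffers client and should be checked against the client version you use; the module name stream_collect is hypothetical.

```erlang
-module(stream_collect).
-export([collect/2]).

%% Gathers streamed results for ReqId until the done message arrives,
%% an error is reported, or no message arrives within Timeout ms.
collect(ReqId, Timeout) ->
    collect(ReqId, Timeout, []).

collect(ReqId, Timeout, Acc) ->
    receive
        {ReqId, {mapred, Phase, Results}} ->
            collect(ReqId, Timeout, [{Phase, Results} | Acc]);
        {ReqId, done} ->
            {ok, lists:reverse(Acc)};
        {ReqId, {error, Reason}} ->
            {error, Reason}
    after Timeout ->
        {error, timeout}
    end.
```

We assume it would be used along the lines of {ok, ReqId} = riakc_pb_socket:mapred_stream(Riak, Inputs, Query, self()), followed by stream_collect:collect(ReqId, 60000) in the same process.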

For examples, see MapReduce pbstream.erl

Troubleshooting MapReduce, illustrated

The most important advice: when developing Erlang MapReduce against Riak KV, prototype against a development environment using the Erlang shell. The shell allows for rapid feedback and iteration; once code needs to be deployed to a server for production use, changing it is more time-consuming.

Module not in path

$ curl -XPOST localhost:8098/mapred \
>   -H 'Content-Type: application/json'   \
>   -d '{"inputs":"messages","query":[{"map":{"language":"erlang","module":"mr_example","function":"get_keys"}}]}'

{"phase":0,"error":"invalid module named in PhaseSpec function:\n must be a valid module name (failed to load mr_example: nofile)"}

Node in process of starting

$ curl -XPOST localhost:8098/mapred   -H 'Content-Type: application/json'     -d '{"inputs":"messages","query":[{"map":{"language":"erlang","module":"mr_example","function":"get_keys"}}]}'

<html><head><title>500 Internal Server Error</title></head><body><h1>Internal Server Error</h1>The server encountered an error while processing this request:<br><pre>{error,{error,function_clause,

Erlang errors

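Suppose DailyFun is a variant of DailyMap that calls string:substr(Date, 0, 7) instead of string:substr(Date, 1, 7):

> riakc_pb_socket:mapred_bucket(Riak, <<"goog">>, [{map, {qfun, DailyFun}, none, true}]).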

The Erlang shell truncates error messages; when using MapReduce, typically the information you need is buried more deeply within the stack.

We can get a longer error message this way:

> {error, ErrorMsg} = riakc_pb_socket:mapred_bucket(Riak, <<"goog">>, [{map, {qfun, DailyFun}, none, true}]).

> io:format("~p~n", [ErrorMsg]).

Still truncated, but this provides enough context to see the problem: string,substr,[\\\"2009-06-10\\\",0,7]. Erlang’s string:substr function starts indexing strings at 1, not 0.
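The indexing rule is easy to confirm in a shell or in a scratch module like the hypothetical one below: starting at position 1 returns the year and month, while starting at 0 fails with function_clause, consistent with the error above.

```erlang
-module(substr_check).
-export([month/1, bad_month/1]).

%% string:substr/3 counts from 1, so "YYYY-MM" is characters 1..7.
month(Date) ->
    string:substr(Date, 1, 7).

%% Start position 0 matches no clause of string:substr/3; catch turns
%% the resulting function_clause error into an {'EXIT', ...} tuple.
bad_month(Date) ->
    catch string:substr(Date, 0, 7).
```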

Exceptional tip

When experimenting with MapReduce from the Erlang shell, it is helpful to avoid breaking the connection to Riak KV when an exception is trapped by the shell. Use catch_exception:

> catch_exception(true).