Mar 2, 2014

Quick start MongoDB MapReduce with Java

In this tutorial we will see how to use the MapReduce feature of MongoDB with Java. First we will look at a few MongoDB commands. Then we will connect to the database with the Java driver and run a basic MapReduce job, followed by a slightly more complex MapReduce function. So first, let's start with MongoDB.
Download the latest version of MongoDB from http://www.mongodb.org/downloads. The one I used for this tutorial is mongodb-win32-i386-2.4.6.zip. Extract this file on your local machine.
Go to %MONGODB_HOME%/bin and execute the command mongod --help. This shows the options for starting an instance of MongoDB.



We will start an instance by executing the following command, passing the location where the data will be stored with the --dbpath option.



You can also see the files created at this location.



Now, in another window, start the MongoDB console by executing the command mongo as follows.



Next we will use the following three commands: show databases lists the databases present in MongoDB, use <db_name> selects a particular database, and show tables lists the collections (MongoDB's equivalent of tables) in the selected database.




Now download the Java driver for MongoDB from http://central.maven.org/maven2/org/mongodb/mongo-java-driver/2.9.3/mongo-java-driver-2.9.3.jar and add the jar to your Eclipse project. Note that the MongoClient class used below was introduced in version 2.10.0 of the driver, so you will need 2.10.0 or later for this code to compile. We will also create a utility class that connects to MongoDB and creates a DBCollection (the object that holds documents in MongoDB). Our utility class is as follows:
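package com.techcielo.mongodb.util;

import java.net.UnknownHostException;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class MongoDBUtil {

    // MongoClient keeps an internal connection pool, so create it once and reuse it
    // instead of opening a new client on every call.
    private static MongoClient client;

    private static synchronized MongoClient getClient() throws UnknownHostException {
        if (client == null) {
            // Default constructor: connects to localhost on the default port 27017.
            client = new MongoClient();
        }
        return client;
    }

    /**
     * Returns the collection with the given name in the "test" database.
     * An existing collection is reused unless forceRecreate is true, in which case
     * it is dropped and created again as a capped collection.
     */
    public static DBCollection createCollection(String name, boolean forceRecreate) {
        try {
            DB db = getClient().getDB("test");
            if (db.collectionExists(name)) {
                if (forceRecreate) {
                    db.getCollection(name).drop();
                } else {
                    return db.getCollection(name);
                }
            }
            // Capped collection with a maximum size of 200,000,000 bytes.
            DBObject options = BasicDBObjectBuilder.start()
                    .add("capped", true)
                    .add("size", 200000000L)
                    .get();
            return db.createCollection(name, options);
        } catch (UnknownHostException e) {
            e.printStackTrace();
            return null;
        }
    }
}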

Since we have only one instance to connect to, we don't pass a server list, and since it runs on the local machine (localhost), we don't pass a host name either: the default MongoClient constructor connects to localhost on port 27017. We then call getDB on the MongoClient to get the test database.
Once this is done, we will insert some data into this collection. To get the data, we run the following query against the Northwind sample database:
SELECT od.OrderID, ProductID, UnitPrice, Quantity, CustomerId, EmployeeID, ShipCountry
FROM order_details od
JOIN orders o ON o.orderId = od.orderId
LIMIT 0 , 100
Here I will just create one class that returns a list of beans with the appropriate data, and leave it to you how to load that data, e.g. by querying the database or by reading a flat file. We are interested in the end result. Our class is as follows:

package com.techcielo.mongodb.main;

import java.util.List;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.techcielo.mongodb.bean.OrderBean;
import com.techcielo.mongodb.util.DBUtil;
import com.techcielo.mongodb.util.MongoDBUtil;

public class Main {

    public static void main(String[] args) {
        insertData();
    }

    public static void insertData() {
        // DBUtil.getOrderData() is your own loader (Northwind query, flat file, ...).
        List<OrderBean> orderlist = DBUtil.getOrderData();
        System.out.println(orderlist.size());

        // Drop and recreate "orderdata" so every run starts from an empty collection.
        DBCollection coll = MongoDBUtil.createCollection("orderdata", true);
        for (OrderBean order : orderlist) {
            // One MongoDB document per order line.
            BasicDBObject obj = new BasicDBObject();
            obj.append("orderId", order.getOrderid());
            obj.append("productId", order.getProductid());
            obj.append("custId", order.getCustid());
            obj.append("quantity", order.getQuantity());
            obj.append("unitPrice", order.getUnitprice());
            obj.append("shipCountry", order.getShipctry());
            coll.insert(obj);
        }
        System.out.println("Total Count:" + coll.getCount());
    }
}
When this code is executed, it prints the following output:

Total Count:99
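Main depends on OrderBean and DBUtil, which are not shown here. The following is a minimal sketch of what OrderBean might look like, assuming field types that fit the getters used above (the types are my assumption, not the original code). It also includes a hypothetical parseCsv method as one possible stand-in for DBUtil.getOrderData(), reading the flat-file option mentioned earlier in a simple comma-separated format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: field names follow the getters used in Main,
// but the field types are assumptions.
public class OrderBean {
    private final long orderid;   // long: stored as a 64-bit integer, read back as a Long
    private final long productid;
    private final String custid;
    private final int quantity;
    private final double unitprice;
    private final String shipctry;

    public OrderBean(long orderid, long productid, String custid,
                     int quantity, double unitprice, String shipctry) {
        this.orderid = orderid;
        this.productid = productid;
        this.custid = custid;
        this.quantity = quantity;
        this.unitprice = unitprice;
        this.shipctry = shipctry;
    }

    public long getOrderid() { return orderid; }
    public long getProductid() { return productid; }
    public String getCustid() { return custid; }
    public int getQuantity() { return quantity; }
    public double getUnitprice() { return unitprice; }
    public String getShipctry() { return shipctry; }

    // Hypothetical flat-file loader (a stand-in for DBUtil.getOrderData()).
    // Each line is "orderId,productId,custId,quantity,unitPrice,shipCountry".
    // Blank lines are skipped; quoted fields and embedded commas are not supported.
    public static List<OrderBean> parseCsv(List<String> lines) {
        List<OrderBean> orders = new ArrayList<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] f = line.split(",");
            orders.add(new OrderBean(
                    Long.parseLong(f[0].trim()),
                    Long.parseLong(f[1].trim()),
                    f[2].trim(),
                    Integer.parseInt(f[3].trim()),
                    Double.parseDouble(f[4].trim()),
                    f[5].trim()));
        }
        return orders;
    }
}
```

Storing orderId as a long keeps it consistent with reading `_id` back as a Long in the MapReduce results below.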

Now that we have inserted the data, let's query it with MapReduce and find the total order value for each order ID.
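As a reference for what the MapReduce job should return, here is the same calculation in plain Java: group line items by order ID and sum quantity × unit price per group. This is a sketch for cross-checking, not part of the tutorial's code. LineItem is a hypothetical minimal class holding only the fields the job reads, and the stream API requires Java 8 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class OrderTotals {

    // Hypothetical minimal line item: only the fields the MapReduce job uses.
    public static final class LineItem {
        final long orderId;
        final int quantity;
        final double unitPrice;

        public LineItem(long orderId, int quantity, double unitPrice) {
            this.orderId = orderId;
            this.quantity = quantity;
            this.unitPrice = unitPrice;
        }
    }

    // Plain-Java equivalent of the job: "map" each item to
    // (orderId, quantity * unitPrice), then "reduce" by summing per orderId.
    // A TreeMap keeps the order IDs sorted for readable output.
    public static Map<Long, Double> totalsByOrder(List<LineItem> items) {
        return items.stream().collect(Collectors.groupingBy(
                item -> item.orderId,
                TreeMap::new,
                Collectors.summingDouble(item -> item.quantity * item.unitPrice)));
    }

    public static void main(String[] args) {
        List<LineItem> items = Arrays.asList(
                new LineItem(1L, 2, 10.0),
                new LineItem(1L, 1, 5.0),
                new LineItem(2L, 3, 4.0));
        System.out.println(totalsByOrder(items)); // prints {1=25.0, 2=12.0}
    }
}
```

Collectors.summingDouble may use a more careful summation than the simple running total in a JavaScript reduce, so the last digits of some totals can differ slightly between the two approaches.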

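// Additional imports needed in Main for this method:
// import com.mongodb.DBObject;
// import com.mongodb.MapReduceCommand;
// import com.mongodb.MapReduceOutput;
public static void getMapReduceData() {
    // Reuse the existing "orderdata" collection (forceRecreate = false, so it is not dropped).
    DBCollection coll = MongoDBUtil.createCollection("orderdata", false);

    // map: for each order line, emit orderId -> { cost: quantity * unitPrice }
    String map = "function() { emit(this.orderId, {cost: this.quantity * this.unitPrice}); }";

    // reduce: sum the costs for one orderId. It must return the same shape ({cost: ...})
    // that map emits, because MongoDB may pass reduce's output back into reduce.
    String reduce = "function(key, values) { "
            + "var sum = 0; "
            + "values.forEach(function(doc) { sum += doc.cost; }); "
            + "return {cost: sum}; }";

    // No output collection (results are returned inline) and no query filter.
    MapReduceCommand cmd = new MapReduceCommand(coll, map, reduce, null,
            MapReduceCommand.OutputType.INLINE, null);

    MapReduceOutput out = coll.mapReduce(cmd);
    for (DBObject o : out.results()) {
        long orderid = ((Number) o.get("_id")).longValue();
        // Every value is now a {cost: ...} document, whether or not reduce ran for
        // that key (MongoDB skips reduce for keys that have a single value).
        DBObject value = (DBObject) o.get("value");
        double cost = ((Number) value.get("cost")).doubleValue();
        System.out.println(orderid + ":" + cost);
    }
}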
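MongoDB may call reduce more than once for the same key, passing earlier reduce results back in as values, and it does not call reduce at all for a key that has only one value. A reduce that returns a bare number therefore breaks in two ways. On a re-reduce, doc.cost is undefined on a number, so the sum becomes NaN. Keys with a single value also keep the {cost: ...} shape from map, so results come back in two different shapes. The following plain-Java simulation (no MongoDB involved; modeling documents as Java maps is my own assumption) shows that a reduce returning {cost: sum} gives the same answer in one pass or in two.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ReReduceDemo {

    // Models a document such as { cost: 12.5 } as an immutable Java map.
    public static Map<String, Object> costDoc(double cost) {
        return Collections.singletonMap("cost", (Object) cost);
    }

    // Mirrors the JavaScript reduce: reads "cost" from every value and returns
    // a value of the SAME shape, { cost: sum }, so its output can be reduced again.
    public static Map<String, Object> reduce(List<Map<String, Object>> values) {
        double sum = 0;
        for (Map<String, Object> doc : values) {
            sum += (Double) doc.get("cost");
        }
        return costDoc(sum);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> emitted =
                Arrays.asList(costDoc(10.0), costDoc(20.0), costDoc(5.0));

        // One reduce over all emitted values for a key.
        double single = (Double) reduce(emitted).get("cost");

        // Re-reduce: reduce two batches separately, then reduce the partial results.
        Map<String, Object> partA = reduce(emitted.subList(0, 2));
        Map<String, Object> partB = reduce(emitted.subList(2, 3));
        double rereduced = (Double) reduce(Arrays.asList(partA, partB)).get("cost");

        System.out.println(single + " " + rereduced); // prints 35.0 35.0
    }
}
```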

Call this method from your main class after loading the data from the Northwind database. The method produces the following output:

10248:440.0
10249:1863.4
10250:1813.0
10251:670.8
10252:3730.0
10253:1444.8000000000002
10254:625.2
10255:2490.5
10256:517.8
10257:1119.9
10258:2018.6
10259:100.8
10260:1746.2
10261:448.0
10262:624.8
10263:2464.8
10264:724.5
10265:1176.0
10266:364.79999999999995
10267:4031.0
10268:1101.2
10269:676.0
10270:1376.0
10271:48.0
10272:1456.0
10273:2142.4
10274:538.6
10275:307.2
10276:420.0
10277:1200.8
10278:1488.8
10279:468.0
10280:613.2
10281:86.5
10282:155.39999999999998
10283:1414.8000000000002
10284:1452.0
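Some totals above, such as 1444.8000000000002 and 364.79999999999995, show binary floating-point rounding. Both JavaScript numbers and Java doubles are IEEE 754 binary64 values, which cannot represent most decimal fractions exactly. One way to present these as currency is to round on the Java side. CostFormat.toCurrency below is a hypothetical helper that does this with BigDecimal, rounding half-up to two decimal places.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class CostFormat {

    // Rounds a cost returned by the MapReduce job to two decimal places (half-up).
    // BigDecimal.valueOf(double) starts from the double's decimal string form,
    // e.g. "1444.8000000000002", rather than its exact binary value.
    public static BigDecimal toCurrency(double cost) {
        return BigDecimal.valueOf(cost).setScale(2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(toCurrency(1444.8000000000002)); // prints 1444.80
        System.out.println(toCurrency(364.79999999999995)); // prints 364.80
        System.out.println(toCurrency(440.0));              // prints 440.00
    }
}
```

This only tidies the display. If exact arithmetic on amounts matters, storing prices as integer cents is an alternative design, at the cost of converting on insert and on output.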