Thursday, December 11, 2014

MongoDB Replica Sets and Oplog Streaming

This blog post covers MongoDB replication and streaming changes from the oplog.

MongoDB instances (mongod) can be configured for replication. One instance acts as the primary while the others act as secondaries. The primary handles reads and writes to the mongo cluster; secondaries sync data written to the primary using the oplog (explained later in this post).

The steps below create three mongod instances running on different ports.

Steps to create the replica set:

mkdir -p /data/rs1 /data/rs2 /data/rs3
Create three mongod instances running on ports 27017, 27018, and 27019:
mongod --replSet m101 --logpath "1.log" --dbpath /data/rs1 --port 27017 --smallfiles --oplogSize 64 --fork

mongod --replSet m101 --logpath "2.log" --dbpath /data/rs2 --port 27018 --smallfiles --oplogSize 64 --fork

mongod --replSet m101 --logpath "3.log" --dbpath /data/rs3 --port 27019 --smallfiles --oplogSize 64 --fork
At this point all the instances are running independently; no replica set is configured yet.

Connect to one of the mongod instances:

mongo --port 27017

Once you get the mongo shell, enter the configuration below:

config = { _id: "m101", members:[
          { _id : 0, host : "localhost:27017"},
          { _id : 1, host : "localhost:27018"},
          { _id : 2, host : "localhost:27019"} ]
};
rs.initiate(config);

Now, one of the instances will act as PRIMARY and the remaining ones as SECONDARY.


  • By default, read / write operations are allowed only on the primary. To perform read operations on a secondary, run rs.slaveOk() on it; write operations are never permitted on a secondary.
  • Secondaries work as a replica set, meaning writes on the primary are synced to the secondaries in near real time.
  • Secondaries can be used for read load distribution, which makes read operations horizontally scalable.

Once the replica set is configured, each mongod instance has a local database containing the oplog.rs collection. oplog.rs is a capped collection, and it logs every operation (insert / update / delete) on any collection. For example, when we insert a document into any collection, a corresponding entry is written to oplog.rs.

For example, the steps below insert a document into the customer collection and show its corresponding entry in the oplog.rs collection.

It is possible to create a tailable cursor on the oplog.rs collection. (Note: a tailable cursor blocks the current thread on the cursor.hasNext() method.)
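The blocking behavior of a tailable cursor can be pictured with a toy Python sketch (no real MongoDB connection involved): a blocking queue stands in for the oplog, and the consumer thread blocks on `get()` the same way a tailable cursor blocks on `cursor.hasNext()` until a new entry arrives.

```python
import queue
import threading

# Toy stand-in for tailing the oplog: Queue.get() blocks the consumer
# thread until a new entry is appended, just as cursor.hasNext() blocks
# on a tailable cursor once it reaches the end of the capped collection.
oplog = queue.Queue()
results = []

def consume(n):
    for _ in range(n):
        results.append(oplog.get())  # blocks until an entry is available

t = threading.Thread(target=consume, args=(3,))
t.start()
for i in range(3):
    # producer side: each write to the primary appends an oplog entry
    oplog.put({"op": "i", "ns": "test.customer", "o": {"x": i}})
t.join()
print(len(results))  # 3
```

The queue names and entry shapes are illustrative only; a real consumer would use a driver-level tailable cursor against oplog.rs.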

The oplog rolls over once it reaches its specified size: the oldest entries are overwritten.
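A rough way to picture this capped, roll-over behavior in Python (a toy model, not MongoDB itself) is a fixed-length deque: once full, appending a new entry silently drops the oldest one, just as the oplog overwrites its oldest entries.

```python
from collections import deque

# deque(maxlen=N) models a capped collection: a fixed-size buffer that
# overwrites its oldest entries. N=3 is an arbitrary toy size.
oplog = deque(maxlen=3)
for ts in range(5):
    oplog.append({"ts": ts, "op": "i", "ns": "test.customer"})

print([e["ts"] for e in oplog])  # [2, 3, 4], entries ts 0 and 1 rolled off
```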

In one of my projects we used oplog.rs to read document events (insert / update / delete) for further processing. We also stored a checkpoint (the oplog entry timestamp) so that when the system restarted it would resume processing mongo documents from that point onward.
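The checkpoint idea can be sketched as follows. This is a simplified illustration of the resume logic, not the project's actual code: `process_from_checkpoint` and the plain-integer timestamps are invented for the example (real oplog entries carry a BSON Timestamp in the `ts` field).

```python
def process_from_checkpoint(oplog, checkpoint_ts):
    """Replay only the entries newer than the stored checkpoint and
    return them along with the new checkpoint (last ts processed)."""
    processed = []
    for entry in oplog:
        if entry["ts"] > checkpoint_ts:
            processed.append(entry)       # hand off for further processing
            checkpoint_ts = entry["ts"]   # persist this after each entry
    return processed, checkpoint_ts

# five oplog entries; the system went down after processing ts=3
oplog = [{"ts": t, "op": "i"} for t in (1, 2, 3, 4, 5)]
done, ckpt = process_from_checkpoint(oplog, checkpoint_ts=3)
print(len(done), ckpt)  # 2 5
```

On restart, only entries with ts 4 and 5 are reprocessed, and the checkpoint advances to 5.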

A program (using the Java mongo driver) connected to any one node learns about all the replica set members. If the primary goes down, the application transparently writes to whichever mongod instance comes up as the new PRIMARY.

It is possible to add / remove members in the replica set without restarting mongo instances.

Now, let's try to understand sharding. Sharding adds scalability to the mongo architecture. Shards are mongo instances, and each shard can have replica set members to provide HA. A shard key is defined, and mongos instances work as routers, sending read / write queries to the correct shard. Be aware that mongos communicates with the PRIMARY mongod of each shard.

CRUD operations in mongo and their corresponding oplog.rs entries

For example, if we run the insert statement below, we can find the corresponding entry in oplog.rs:


m101:PRIMARY> use test
m101:PRIMARY> db.customer.insert({"x":1})
m101:PRIMARY> use local
m101:PRIMARY> db.oplog.rs.find().sort({"$natural":-1}).limit(1).pretty()
{
   "ts" : Timestamp(1418846623, 1),
   "h" : NumberLong("-5919409785401989933"),
   "v" : 2,
   "op" : "i",
   "ns" : "test.customer",
   "o" : {
      "_id" : ObjectId("5491e19f02d16dfe03bb04f6"),
      "x" : 1
   }
}


ns is the namespace, which is dbname.collectionname; o is the object being written to the namespace; and op is the type of operation being performed, in our case "i", which stands for insert.
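These fields are easy to pull apart programmatically. A small hedged sketch in Python (the `describe` helper is an invention for this post, not a driver API) splits the namespace and names the operation type of an entry like the one above:

```python
def describe(entry):
    """Split the namespace into (db, collection) and name the op type."""
    db, collection = entry["ns"].split(".", 1)  # e.g. "test.customer"
    op_names = {"i": "insert", "u": "update", "d": "delete"}
    return db, collection, op_names[entry["op"]]

entry = {"op": "i", "ns": "test.customer", "o": {"x": 1}}
print(describe(entry))  # ('test', 'customer', 'insert')
```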


When we perform an update operation as shown below, the op field will be of type "u":


m101:PRIMARY> db.customer.update({"x":1},{"$set":{"y":1}});
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
m101:PRIMARY> use local
switched to db local
m101:PRIMARY> db.oplog.rs.find().sort({"$natural":-1}).limit(1).pretty()
{
   "ts" : Timestamp(1418847060, 1),
   "h" : NumberLong("-187714577821739110"),
   "v" : 2,
   "op" : "u",
   "ns" : "test.customer",
   "o2" : {
      "_id" : ObjectId("5491e19f02d16dfe03bb04f6")
   },
   "o" : {
      "$set" : {
          "y" : 1
      }
   }
}


When a document is removed from any collection, the op value is "d":


m101:PRIMARY> db.customer.remove({"x":1})
WriteResult({ "nRemoved" : 1 })
m101:PRIMARY> use local
switched to db local
m101:PRIMARY> db.oplog.rs.find().sort({"$natural":-1}).limit(1).pretty()
{
   "ts" : Timestamp(1418847272, 1),
   "h" : NumberLong("-849370824550739777"),
   "v" : 2,
   "op" : "d",
   "ns" : "test.customer",
   "b" : true,
   "o" : {
      "_id" : ObjectId("5491e19f02d16dfe03bb04f6")
   }
}


Now let's consider the scenario where we update an existing value: the oplog.rs entry keeps the same form as the update entry explained above.
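The three entries shown above (insert, update, delete) can be replayed against a plain dictionary to see how a consumer, or a secondary, applies the oplog. This is a toy Python sketch under stated assumptions: `apply_entry` and the dict-backed store are illustrative inventions, and only the `$set` update form from the example is handled.

```python
def apply_entry(store, entry):
    """Apply one oplog entry to a dict-backed copy of a collection,
    roughly what replaying the primary's oplog does."""
    docs = store.setdefault(entry["ns"], {})
    if entry["op"] == "i":                  # insert: add the document by _id
        docs[entry["o"]["_id"]] = dict(entry["o"])
    elif entry["op"] == "u":                # update: o2 holds the _id, o the $set
        docs[entry["o2"]["_id"]].update(entry["o"].get("$set", {}))
    elif entry["op"] == "d":                # delete: remove by _id
        docs.pop(entry["o"]["_id"], None)

store = {}
oid = "5491e19f02d16dfe03bb04f6"  # the _id from the examples above
entries = [
    {"op": "i", "ns": "test.customer", "o": {"_id": oid, "x": 1}},
    {"op": "u", "ns": "test.customer", "o2": {"_id": oid}, "o": {"$set": {"y": 1}}},
    {"op": "d", "ns": "test.customer", "o": {"_id": oid}},
]
for e in entries[:2]:
    apply_entry(store, e)
print(store["test.customer"][oid])  # the document with both x and y set
apply_entry(store, entries[2])
print(store["test.customer"])  # {} after the delete
```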


