TWS-4: Gossip protocol: Epidemics and rumors to the rescue


Having successfully completed a grueling yet enjoyable ‘Cloud Computing Concepts’ course at Coursera, from the University of Illinois at  Urbana-Champaign,  by Prof Indranil Gupta, I continue on my “Thinking Web Scale (TWS)” series of posts. In this post, I would like to dwell on Gossip Protocol.

Gossip protocol finds its way into distributed systems from epidemiology, the branch of science that studies and models how diseases and rumors spread through society. The gossip protocol disseminates information the way diseases and rumors spread in society, or the way a computer virus is able to infect large networks very rapidly.

Gossip protocol is particularly relevant in large distributed systems with hundreds and hundreds of servers spread across multiple data centers, e.g. the systems behind Google, Facebook or Twitter. The servers that power Google’s search, or the Facebook or Twitter engine, are made up of hundreds of commercial off-the-shelf (COTS) computers. This is another way of saying that the designers of these systems should fold extremely high failure rates of the servers into their design. In other words, “failures will be the norm and not the exception”.

As mentioned in my earlier post, in these large distributed systems servers will fail and new servers will continuously join the system. The distributed system must be able to accommodate servers joining or leaving the system. There is no global clock and each server has its own clock. To handle server failures, data is replicated over many servers, which leads to the problem of maintaining data consistency between the replicas.

A well-designed distributed system must include in its design key properties of

  1. Availability – Data should be available when you want it
  2. Consistency – Data should be consistent across multiple copies
  3. Should be fault tolerant
  4. Should be scalable
  5. Handle servers joining or leaving the systems transparently

One interesting aspect of Distributed Systems much like Operating System (OS) is the fact that a lot of the design choices are based on engineering judgments. The design choices are usually a trade-off of slightly different performance characteristics. Some of them are obvious and some not so obvious.

Why Gossip protocol? What makes it attractive?

Here are some approaches

  1. Centralized Server:

Let us assume that in a network of servers a server (Server A) has some piece of information which it needs to spread to the other servers. One way is to have this server send the message to all the servers. While this would work, there are 2 obvious deficiencies with this approach

  1. The Server A will hog the bandwidth in transmitting the information to all other servers
  2. Server A will be a hot spot besides also being a Single Point of Failure

Cons: In other words if we have a central server always disseminating information then we run into the issue of ‘Single point of Failure’ of this central  server.

  2. Directed Graph

Assuming that we construct a directed overlay graph over the network of servers, we could transmit the message from Server A to all other servers. This approach has the advantage of less traffic, as each server node will typically have only around 1 to 3 children, which results in lower bandwidth utilization. However, the disadvantage is that when an intermediate non-leaf node fails, the information will not reach the children of the failed node.

 Cons: Does not handle failures of non-leaf nodes well

  3. Ring Architecture

In this architecture we could have Server A, pass the message round the ring till it gets to the desired server. Clearly each node has one predecessor and one successor. Like the previous example this has the drawback that if one or more servers of the ring fail then the message does not get to its destination.

Cons: Does not handle failures of nodes in the ring well

Note: These engineering choices only make sense in certain circumstances. For example, the directed graph and the ring structure discussed above have deficiencies in the distributed system case, but they are accepted design patterns in computer networking, e.g. the Token Ring (IEEE 802.5) and the graph of nodes in a network. Hierarchical trees are the norm in telecom networks, where international calls reach the main trunk exchange, then the central office and finally the local office, in a route that goes from root to non-leaf to leaf.

  4. Gossip protocol

Enter the Gossip protocol (here is a good summary on gossip protocol). In the gossip protocol each server sends the message to ‘b’ random peers. The value ‘b’, typically a small number, is called the fan-out. Server A, which has the data, is assumed to be ‘infected’. In the beginning only Server A is infected while all other servers are ‘susceptible’. Each server receiving the message is now considered to be infected, and each infected server in turn transmits to ‘b’ other servers. It is possible that the receiving server is already infected, in which case it will drop the message.

In many ways this is similar to the spread of a disease through a virus. The disease spreads when an infected person comes in contact with another person.

The nice part about the gossip protocol is that it is lightweight and can infect the entire set of N servers in O(log N) rounds.

This is fairly intuitive: in each round every infected server sends the message to ‘b’ other servers, where ‘b’ is the fan-out, so the number of infected servers can grow by a factor of up to (b + 1) per round.
The computation is as follows

Let x0 = n (initial state, all un-infected) and y0 = 1 (1 infected server) at time t = 0,
with x + y = n + 1 at all times.

Let β be the contact rate between the ‘susceptible’ (x) and the ‘infected’ (y) servers. The rate of infection is proportional to the number of possible contacts x*y and can be represented as
dx/dt = -βxy

The negative sign indicates that the number of ‘non-infected’ servers will decrease over time
(It is amazing how we can capture the entire essence of the spread of disease through a simple, compact equation)

The solution to the above equation (which I have taken in good faith, as my knowledge of differential equations is a faint memory. Hope to refresh my memory when I get the chance, though!) is
x = n(n+1) / (n + e^(β(n+1)t))      ... (1)
y = (n+1) / (1 + n e^(-β(n+1)t))    ... (2)

Solution (1) clearly shows that the number ‘x’ of un-infected servers at time ‘t’ rapidly tends to 0, as the denominator grows exponentially with t. From (2), the number of infected servers ‘y’ tends to n+1 as t increases, or in other words all servers get infected.
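To get a feel for how quickly the epidemic saturates, the two closed-form solutions can be evaluated numerically. The following is only a sketch: the function names and the values chosen for n and β are assumptions made for illustration and do not come from the course material.

```javascript
// Closed-form solution of dx/dt = -beta * x * y with x + y = n + 1,
// x(0) = n un-infected servers and y(0) = 1 infected server.
function susceptible(n, beta, t) {
  return (n * (n + 1)) / (n + Math.exp(beta * (n + 1) * t));
}

function infected(n, beta, t) {
  return (n + 1) / (1 + n * Math.exp(-beta * (n + 1) * t));
}

// Illustrative values only: 1000 un-infected servers and an assumed contact rate.
const n = 1000;
const beta = 0.001;
for (const t of [0, 5, 10, 20]) {
  const x = susceptible(n, beta, t);
  const y = infected(n, beta, t);
  console.log(`t=${t}  un-infected=${x.toFixed(2)}  infected=${y.toFixed(2)}`);
}
```

At every time step the two values add up to n + 1, which is a handy sanity check on the formulas.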

This method where infected server sends a message to ‘b’ servers is known as the ‘push’ approach.
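The push approach is easy to simulate. The sketch below is a toy, single-process model and not a networked implementation: the function name, the round-based timing and the choice of 1000 servers with a fan-out of 2 are all assumptions made for illustration.

```javascript
// Simulate push gossip: in every round each infected server sends the
// message to `fanout` peers chosen uniformly at random.
function simulatePushGossip(numServers, fanout, random = Math.random, maxRounds = 1000) {
  const infected = new Array(numServers).fill(false);
  infected[0] = true; // Server A starts out infected
  let infectedCount = 1;
  let rounds = 0;

  while (infectedCount < numServers && rounds < maxRounds) {
    // Snapshot the senders so that servers infected in this round
    // only start sending in the next round.
    const senders = [];
    for (let i = 0; i < numServers; i++) {
      if (infected[i]) senders.push(i);
    }
    for (const sender of senders) {
      for (let k = 0; k < fanout; k++) {
        const target = Math.floor(random() * numServers);
        // A message to oneself or to an already infected server is dropped.
        if (target !== sender && !infected[target]) {
          infected[target] = true;
          infectedCount++;
        }
      }
    }
    rounds++;
  }
  return { rounds, infectedCount };
}

const result = simulatePushGossip(1000, 2);
console.log(`${result.infectedCount} of 1000 servers infected after ${result.rounds} rounds`);
```

Running this a few times shows the number of rounds staying small even as the number of servers is increased, which is the logarithmic behaviour described earlier. The last few un-infected servers take the longest to reach, because most random targets are already infected by then.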

Pros: The Gossip protocol is clearly more resilient to servers failing, as the gossip message is sent to ‘b’ random targets rather than along a fixed path.
Cons: There is a possibility that the ‘b’ random targets selected for infection are already infected, in which case the infection can die out rapidly if these infected servers fail.

The solution for the above is to have a ‘pull’ approach where after a time ‘t’ the un-infected servers pull the data from random servers. This way the un-infected servers will also get infected if they pull the data from already infected servers

A third approach is to have a combination of a push-pull approach.
Gossip is used extensively in the Cassandra NoSQL database, which originated at Facebook and is now an Apache project. Amazon’s Dynamo and the Riak NoSQL DB also use forms of the gossip protocol.

Failure detection: Gossip protocol has been used extensively in detecting failures. The failed servers are removed from the membership list and this list is gossiped so that all servers have a uniform view of the set of live servers. However, this approach is prone to a high rate of false positives, where a server is marked as ‘failed’ even though it was only unreachable because of a temporary network failure. Moreover, the network load of epidemic-style membership lists is also high.

One method to handle false positives is to initially place a server that appears to have failed under ‘suspicion’. When the number of messages attributing failure to this server rises above a threshold ‘t’, the server is assumed to have failed and is removed from the membership list.
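The suspicion mechanism can be sketched as a small bookkeeping class. This is a minimal illustration and not how any particular system implements it: the class name, the method names, the decision to count distinct reporters, and the rule that a heartbeat clears the suspicion are all assumptions.

```javascript
// Suspicion-based failure detection: a server is declared failed only after
// more than `threshold` distinct peers have reported it as unreachable.
class SuspicionTracker {
  constructor(members, threshold) {
    this.members = new Set(members); // current membership list
    this.threshold = threshold;
    this.reports = new Map();        // suspect -> Set of reporting peers
  }

  // Record that `reporter` could not reach `suspect`.
  // Returns true if this report pushed the suspect over the threshold.
  reportFailure(reporter, suspect) {
    if (!this.members.has(suspect)) return false;
    if (!this.reports.has(suspect)) this.reports.set(suspect, new Set());
    const reporters = this.reports.get(suspect);
    reporters.add(reporter);
    if (reporters.size > this.threshold) {
      this.members.delete(suspect);
      this.reports.delete(suspect);
      return true;
    }
    return false;
  }

  // Assumption: hearing from a suspected server clears the suspicion.
  heardFrom(server) {
    this.reports.delete(server);
  }
}

const tracker = new SuspicionTracker(['A', 'B', 'C', 'D'], 2);
tracker.reportFailure('A', 'D'); // D is only suspected
tracker.reportFailure('B', 'D'); // still only suspected
console.log(tracker.reportFailure('C', 'D')); // third distinct report removes D
console.log([...tracker.members]);
```

With a threshold of 2, a single peer that temporarily loses its link to D cannot evict it on its own, which is exactly the false positive the suspicion state is meant to avoid.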

Cassandra uses a failure ‘accrual’ mechanism to detect failures in the distributed NoSQL database.

Epidemic protocols, like the gossip protocol are particularly useful in large scale distributed systems where servers leave and join the system.

One interesting application of the epidemic protocol is simply to collect the overall state of the system. Consider an information exchange (from the book Distributed Systems: Principles and Paradigms by Andrew Tanenbaum and Maarten van Steen) where every node i holds an internal value xi, with

xi = 1 if i = 1 (infected), or 0 if i > 1
If the nodes gossip this value and compute the average (xi + xj) /2, then after a period of time this value will tend towards 1/N where N is the total number of nodes in the system. Hence all the servers in the system will become aware of the total size of the system.
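The averaging exchange can be tried out in a few lines. The sketch below simulates all nodes inside one process; the function name, the random pairing of nodes and the number of exchanges are assumptions made for illustration.

```javascript
// Gossip-based size estimation: node 1 starts with the value 1, every other
// node with 0. Two random nodes repeatedly replace their values with the
// average of the two. The sum of all values stays 1, so every value drifts
// towards 1/N and each node can estimate N as 1 / (its own value).
function estimateSystemSize(numNodes, exchanges, random = Math.random) {
  const x = new Array(numNodes).fill(0);
  x[0] = 1;
  for (let step = 0; step < exchanges; step++) {
    const i = Math.floor(random() * numNodes);
    let j = Math.floor(random() * (numNodes - 1));
    if (j >= i) j++; // pick a partner different from i
    const average = (x[i] + x[j]) / 2;
    x[i] = average;
    x[j] = average;
  }
  return x.map(value => 1 / value);
}

const estimates = estimateSystemSize(16, 5000);
console.log(estimates[0], estimates[15]); // both should be very close to 16
```

Note that no node ever sees a membership list here. Each node learns the size of the system purely from the value it ends up holding.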

Conclusion: Gossip protocol has widespread application in the distributed systems of today, from spreading information and maintaining membership to failure detection, monitoring and alarming. It is really interesting to note that the theory of epidemics, which models how diseases spread through society, has become so important in a field of computer science.


Thinking Web Scale (TWS-3): Map-Reduce – Bring compute to data


In the last decade and a half, there has arisen a class of problems that is becoming very critical in the computing domain. These problems deal with computing in highly distributed environments. A key characteristic of this domain is the need to grow elastically with increasing workloads while tolerating failures without missing a beat. In short, I would like to refer to this as ‘Web Scale Computing’, where the number of servers exceeds several hundred and the data size is of the order of a few hundred terabytes to several exabytes.

There are several features that are unique to large scale distributed systems

  1. The servers used are not specialized machines but regular commodity, off-the-shelf servers
  2. Failures are not the exception but the norm. The design must be resilient to failures
  3. There is no global clock. Each individual server has its own internal clock with its own skew and drift rates. Algorithms exist that can create a notion of a global clock
  4. Operations happen at these machines concurrently. The order of the operations, things like causality and concurrency, can be evaluated through special algorithms like Lamport or Vector clocks
  5. The distributed system must be able to handle failures where servers crash, disk fails or there is a network problem. For this reason data is replicated across servers, so that if one server fails the data can still be obtained from copies residing on other servers.
  6. Since data is replicated there are associated issues of consistency. Algorithms exist that ensure that the replicated data is either ‘strongly’ consistent or ‘eventually’ consistent. Trade-offs are often considered when choosing one of the consistency mechanisms
  7. Leaders are elected democratically.  Then there are dictators who get elected through ‘bully’ing.

In some ways distributed systems behave like a murmuration of starlings (or a school of fish),  where a leader is elected on the fly (pun unintended) and the starlings or fishes change direction based on a few (typically 6) closest neighbors.

This series of posts, Thinking Web Scale (TWS) ,  will be about Web Scale problems and the algorithms designed to address this.  I would like to keep these posts more essay-like and less pedantic.

In the early days, computing used to be done on a single monolithic machine with its own CPU, RAM and disk. This situation was fine for a long time, as technology promptly kept its date with Moore’s Law, which stated that “computing power and memory capacity” will double every 18 months. However, this situation changed drastically as the data generated from machines grew exponentially, whether it was call detail records, records from retail stores, click streams, or the tweets and status updates of the social networks of today.

These massive amounts of data cannot be handled by a single machine. We need to ‘divide’ and ‘conquer’ this data for processing. Hence there is a need for hundreds of servers, each handling a slice of the data.

The first post is about the fairly recent computing paradigm “Map-Reduce”. Map-Reduce is a product of Google Research and was developed to solve their need to create an Inverted Index of Web pages, to compute the Page Rank etc. The algorithm was initially described in a white paper published by Google. The Page Rank algorithm now powers Google’s search, which is now almost indispensable in our daily lives.

Map-Reduce assumes that these servers are not perfect, failure-proof machines. Rather, Map-Reduce folds into its design the assumption that the servers are regular, commodity servers, each performing a part of the task. The hundreds of terabytes of data are split into 16MB to 64MB chunks and distributed into a file system known as a ‘Distributed File System (DFS)’. There are several implementations of the Distributed File System. Each chunk is replicated across servers. One of the servers is designated as the ‘Master’. This ‘Master’ allocates tasks to ‘worker’ nodes. The Master node also keeps track of the location of the chunks and their replicas.

When the Map or Reduce has to process data, the process is started on the server in which the chunk of data resides.

The data is not transferred to the application from another server. The Compute is brought to the data and not the other way around. In other words the process is started on the server where the data, intermediate results reside

The reason for this is that it is more expensive to transmit data. Besides the latencies associated with data transfer can become significant with increasing distances

Map-Reduce had its genesis in the Lisp constructs of the same names, where one could apply a common operation over a list of elements (map) and then combine the resulting list of elements with a reduce operation.
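The same idea of mapping an operation over a list and then reducing the result exists in most languages today. Here is a small illustration using JavaScript's built-in array methods rather than Lisp. The list of numbers is made up for the example.

```javascript
// map: apply the same operation (squaring) to every element of the list
const numbers = [1, 2, 3, 4];
const squares = numbers.map(n => n * n);

// reduce: combine the resulting list into a single value (the sum)
const sumOfSquares = squares.reduce((acc, value) => acc + value, 0);

console.log(squares);      // [ 1, 4, 9, 16 ]
console.log(sumOfSquares); // 30
```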

Map-Reduce was originally created by Google to solve the Page Rank problem. Now Map-Reduce is used across a wide variety of problems.

The main components of Map-Reduce are the following

  1. Mapper: Convert all d ∈ D to (key (d), value (d))
  2. Shuffle: Moves all (k, v) and (k’, v’) with k = k’ to same machine.
  3. Reducer: Transforms {(k, v1), (k, v2), ...} to an output D’k = f(v1, v2, ...)
  4. Combiner: If one machine has multiple (k, v1), (k, v2) with same k then it can perform part of Reduce before Shuffle

A schematic of Map-Reduce is included below

[Figure: Map-Reduce schematic]

Map-Reduce is usually a perfect fit for problems that have an inherent property of parallelism. For this class of problems, the Map-Reduce paradigm can be applied simultaneously to large sets of data. The “Hello World” equivalent of Map-Reduce is the word count problem. Here we simultaneously count the occurrences of words in millions of documents.

The map operation scans the documents in parallel and outputs a key-value pair. The key is the word and the value is the number of occurrences of the word. E.g. In this case ‘map’ will scan each word and emit the word and the value 1 for the key-value pair

So, if the document contained

“All men are equal. Some men are more equal than others”

Map would output

(all,1), (men,1), (are,1), (equal,1), (some,1), (men,1), (are,1), (more,1), (equal,1), (than,1), (others,1)

The Reduce phase will take the above output and sum the values of all key-value pairs with the same key

(all,1), (men,2), (are,2), (equal,2), (some,1), (more,1), (than,1), (others,1)

So we get to count all the words in the document
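The word count example can be written out as explicit map, shuffle and reduce steps. This is a single-machine sketch meant only to show the data flow; the function names are my own, and a real Map-Reduce framework would run the map and reduce steps on different servers.

```javascript
// Map: emit a (word, 1) pair for every word in a document.
function mapWords(document) {
  return document
    .toLowerCase()
    .split(/[^a-z]+/)
    .filter(word => word.length > 0)
    .map(word => [word, 1]);
}

// Shuffle: group all values that share the same key.
function shuffle(pairs) {
  const groups = new Map();
  for (const [key, value] of pairs) {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  }
  return groups;
}

// Reduce: sum the values collected for one key.
function reduceCounts(values) {
  return values.reduce((sum, v) => sum + v, 0);
}

function wordCount(documents) {
  const pairs = documents.flatMap(mapWords);
  const counts = {};
  for (const [word, values] of shuffle(pairs)) {
    counts[word] = reduceCounts(values);
  }
  return counts;
}

console.log(wordCount(["All men are equal. Some men are more equal than others"]));
```

Passing several documents in the array gives the combined count across all of them, since the shuffle step groups pairs by word regardless of which document they came from.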

In Map-Reduce the Master node assigns tasks to Worker nodes, which process the data on the individual chunks

[Figure: Master node assigning tasks to Worker nodes]

Map-Reduce also makes short work of dealing with large matrices and can crunch matrix operations like matrix addition, subtraction, multiplication etc.

Matrix-Vector multiplication

As an example, consider a Matrix-Vector multiplication (taken from the book Mining of Massive Datasets by Jure Leskovec, Anand Rajaraman et al).

Suppose we have an n x n matrix M, with the value mij in the ith row and jth column, and we need to multiply it by a vector v of length n whose jth element is vj. The matrix-vector product is then the vector x whose ith element xi is given by

xi = Σj mij vj   (summing over j = 1 to n)

Here the product mij x vj can be performed by the map function and the summation can be performed by a reduce operation. The obvious question is, what if the vector v or the matrix M did not fit into memory? In such a situation the vector and matrix are divided into equal-sized slices and the multiplication is performed across machines. The application would then have to consolidate the partial results.
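The split between map and reduce for the matrix-vector product can be sketched as follows. This runs on a single machine and assumes the whole vector fits in memory; the function names and the 2 x 2 example are made up for illustration.

```javascript
// Map: every matrix element m_ij emits the pair (i, m_ij * v_j).
function mapElement(i, j, mij, v) {
  return [i, mij * v[j]];
}

// Reduce: sum all the partial products that share the same row index i.
function matrixVectorProduct(M, v) {
  const pairs = [];
  M.forEach((row, i) => {
    row.forEach((mij, j) => pairs.push(mapElement(i, j, mij, v)));
  });
  const x = new Array(M.length).fill(0);
  for (const [i, product] of pairs) {
    x[i] += product;
  }
  return x;
}

console.log(matrixVectorProduct([[1, 2], [3, 4]], [5, 6])); // [ 17, 39 ]
```

The row index i plays the role of the key, so all the products needed for one element of the result end up at the same reducer.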

Fortunately, several problems in Machine Learning, Computer Vision, Regression and Analytics require large matrix operations, and Map-Reduce can be used very effectively for such matrix manipulation. Computation of Page Rank itself involves such matrix operations, which was one of the triggers for the Map-Reduce paradigm.

Handling failures: As mentioned earlier, the Map-Reduce implementation must be resilient to failures, where failures are the norm and not the exception. To handle this, the ‘master’ node periodically checks the health of the ‘worker’ nodes by pinging them. If the ping response does not arrive, the master marks the worker as ‘failed’ and restarts the task allocated to that worker on a server that is accessible, so that the output can still be generated.

Stragglers: Executing a job in parallel brings to mind the famous saying ‘A chain is only as strong as its weakest link’. So if there is one node which is a straggler and is delayed in its computation, for example due to disk errors, the Master Node starts a backup worker and monitors the progress. When either the straggler or the backup completes, the master kills the other process.

Mining Social Networks, Sentiment Analysis of Twitterverse also utilize Map-Reduce.

However, Map-Reduce is not a panacea for all of the industry’s computing problems (see To Hadoop, or not to Hadoop)

But the Map-Reduce is a very critical paradigm in the distributed computing domain as it is able to handle mountains of data, can handle multiple simultaneous failures, and is blazingly fast.


A Cloud medley with IBM Bluemix, Cloudant DB and Node.js


Published as a tutorial in IBM Cloudant – Bluemix tutorial and demos

Here is an interesting Cloud medley based on IBM’s Bluemix PaaS platform, Cloudant DB and Node.js. This application creates a Webserver using Node.js and uses REST APIs to perform CRUD operations on a Cloudant DB. Cloudant DB is a NoSQL Database as a Service (DBaaS) that can handle a wide variety of data types like JSON, full text and geo-spatial data. The documents are stored, indexed and distributed across an elastic datastore spanning racks and datacenters, with replication of data across datacenters. Cloudant allows one to work with self-describing JSON documents through RESTful APIs, making every document in the Cloudant database accessible as JSON via a URL.

This application on Bluemix uses REST APIs to perform the operations of inserting, updating, deleting and listing documents on the Cloudant DB. The code can be forked from Devops at bluemix-cloudant. The code can also be cloned from GitHub at bluemix-cloudant.

1) Once the code is forked the application can be deployed on to Bluemix using

cf login -a https://api.ng.bluemix.net
cf push bm-cloudant -p . -m 512M

2) After this is successful, go to the Bluemix dashboard and add the Cloudant DB service. The CRUD operations can be performed by invoking REST API calls using an appropriate REST client like SureUtils or Postman in the browser of your choice.

Here are the details of the Bluemix-Cloudant application

3) Once the Cloudant DB service has been added to the Web started Node.js application we need to parse the process.env variable to obtain the URL of the Cloudant DB and the port and host to be used for the Web server.

The Node.js Webserver is started based on the port and host values obtained from process.env

//Set up the DB connection
// Running on Bluemix. Parse for the port and host that we've been assigned.
// This is done outside the request handler so that listen() below can see host and port.
if (process.env.VCAP_SERVICES) {
    var env = JSON.parse(process.env.VCAP_SERVICES);
    var host = process.env.VCAP_APP_HOST;
    var port = process.env.VCAP_APP_PORT;
    ....
}
require('http').createServer(function(req, res) {
    ....
    // Perform CRUD operations through REST APIs
    // Insert document
    if(req.method == 'POST') {
        insert_records(req,res);
    }
    // List documents
    else if(req.method == 'GET') {
        list_records(req,res);
    }
    // Update a document
    else if(req.method == 'PUT') {
        update_records(req,res);
    }
    // Delete a document
    else if(req.method == 'DELETE') {
        delete_record(req,res);
    }
}).listen(port, host);

Access to the Cloudant DB is obtained as follows

if (process.env.VCAP_SERVICES) {
// Running on Bluemix. Parse the port and host that we've been assigned.
var env = JSON.parse(process.env.VCAP_SERVICES);
var host = process.env.VCAP_APP_HOST;
var port = process.env.VCAP_APP_PORT;
console.log('VCAP_SERVICES: %s', process.env.VCAP_SERVICES);
// Also parse Cloudant settings.
var cloudant = env['cloudantNoSQLDB'][0]['credentials'];
}
var db = new pouchdb('books'),
remote = cloudant.url + '/books',
opts = {
continuous: true
};
// Replicate the DB to remote
console.log(remote);
db.replicate.to(remote, opts);
db.replicate.from(remote, opts);

Access to the Cloudant DB is through the cloudant.url shown above
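The lookup of the Cloudant URL can also be pulled into a small helper that copes with the service not being bound, for example when running locally. This is a sketch only: the helper name and the sample VCAP_SERVICES value (including the example URL) are hypothetical, while the 'cloudantNoSQLDB' key and the credentials field are the ones used in the code above.

```javascript
// Extract the Cloudant URL from the JSON string held in VCAP_SERVICES.
// Returns null when the variable is missing or the service is not bound.
function cloudantUrl(vcapServicesJson) {
  if (!vcapServicesJson) return null;
  const services = JSON.parse(vcapServicesJson);
  const instances = services['cloudantNoSQLDB'];
  if (!Array.isArray(instances) || instances.length === 0) return null;
  const credentials = instances[0].credentials;
  return credentials && credentials.url ? credentials.url : null;
}

// Hypothetical VCAP_SERVICES value, for illustration only.
const sample = JSON.stringify({
  cloudantNoSQLDB: [
    { credentials: { url: 'https://user:password@example.cloudant.com' } }
  ]
});

console.log(cloudantUrl(sample) + '/books');
console.log(cloudantUrl(process.env.VCAP_SERVICES)); // null when not on Bluemix
```

Checking for null before building the remote URL avoids the error that would otherwise occur when the application is started outside Bluemix.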

Once the access to the DB is set up we can perform CRUD operations. There are many options for the backend DB. In this application I have used PouchDB.

4) Inserting a document: To insert documents into the Cloudant DB based on Pouch DB we need to do the following

var insert_records = function(req, res) {
//Parse the process.env for the port and host that we've been assigned
if (process.env.VCAP_SERVICES) {
// Running on Bluemix. Parse the port and host that we've been assigned.
var env = JSON.parse(process.env.VCAP_SERVICES);
var host = process.env.VCAP_APP_HOST;
var port = process.env.VCAP_APP_PORT;
console.log('VCAP_SERVICES: %s', process.env.VCAP_SERVICES);
// Also parse Cloudant settings.
var cloudant = env['cloudantNoSQLDB'][0]['credentials'];
}
var db = new pouchdb('books'),
remote = cloudant.url + '/books',
opts = {
continuous: true
};
// Replicate the DB to remote
console.log(remote);
db.replicate.to(remote, opts);
db.replicate.from(remote, opts);
// Put 3 documents into the DB
db.put({
author: 'John Grisham',
Title : 'The Firm'
}, 'book1', function (err, response) {
console.log(err || response);
});
...
...
res.writeHead(200, {'Content-Type': 'text/plain'});
res.write("3 documents is inserted");
res.end();
}; // End insert_records

The nice part about Cloudant DB is that you can access your database through its URL. The steps are shown below. Once your application is running, click on your application. You should see a screen like the one below.

[Screenshot 1: the application in the Bluemix dashboard]

Click on Cloudant as shown by the arrow.

Next click on the “Launch” icon

[Screenshot 2: the “Launch” icon]

This should bring up the Cloudant dashboard. The database will be empty.

[Screenshot 3: the empty Cloudant dashboard]

If you use a REST API client to send a POST API call, the application will insert 3 documents.

[Screenshot 4: the POST call in the REST client]

The documents inserted can be seen by sending the GET REST API call.

[Screenshot 5: the GET call listing the documents]

If you refresh the Cloudant dashboard you should see the “books” database added. Clicking this database, you should see the 3 documents that have been added

[Screenshot 6: the 3 documents in the “books” database]

If you click “Edit doc” you should see the details of the document

[Screenshot 7: details of a document]

5) Updating a document

The process to update a document in the database is shown below

// Update book3
db.get('book3', function(err, response) {
    if (err) {
        console.log("error " + err);
        return;
    }
    console.log(response);
    // Write the document back with the current revision and the new title
    db.put({
        _id: 'book3',
        _rev: response._rev,
        author: response.author,
        Title : 'The da Vinci Code'
    }, function(err, response) {
        if (err) {
            console.log("error " + err);
        } else {
            console.log("Success " + JSON.stringify(response));
        }
    });
});

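This is performed with a PUT REST API call

[Screenshot 8: the PUT call in the REST client]

The updated list is shown below

[Screenshot 9: the updated list of documents]

This can be further verified in the Cloudant DB dashboard for book3.

[Screenshot 10: book3 in the Cloudant dashboard]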

6) Deleting a document

The code to delete a document in PouchDB is shown below

//Deleting document book1
db.get('book1', function(err, doc) {
db.remove(doc, function(err, response) {
console.log(err || response);
});
});

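The REST call to delete a document and the result are shown below

[Screenshot 11: the DELETE call in the REST client]

[Screenshot 12: the result of the DELETE call]

Checking the Cloudant dashboard we see that only book2 & book3 are present and book1 has been deleted

[Screenshot 13: the Cloudant dashboard after the delete]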

7) Displaying documents in the database

The code for displaying the list of documents is shown below

db.allDocs(function(err, response) {
    var val = response.total_rows;
    var details = "";
    var j = 0;
    for (var i = 0; i < val; i++) {
        db.get(response.rows[i].id, function (err, doc) {
            j++;
            details = details + JSON.stringify(doc.Title) + " by  " + JSON.stringify(doc.author) + "\n";
            // Kludge because of Node.js asynchronous handling. To be fixed - T V Ganesh
            // The response is written only after the last db.get callback has fired.
            if (j == val) {
                res.writeHead(200, {'Content-Type': 'text/plain'});
                res.write(details);
                res.end();
                console.log(details);
            }
        }); // End db.get
    } //End for
}); // End db.allDocs

If you happened to notice, I had to use a kludge to work around Node.js’ idiosyncrasy of handling asynchronous calls. I was fooled by the remarkable similarity of Node.js, and hence JavaScript, to the C language into thinking that functions within functions would work sequentially. However, I had to undergo much grief trying to get Node.js to work sequentially. I wanted to avoid the ‘async’ module but was unsuccessful in trying to code callbacks. So the kludge! I will work this out eventually but this workaround will have to do for now!
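One possible way to avoid the counter is to ask PouchDB for the document bodies in the same query, using the include_docs option of allDocs(), so that no per-document db.get() is needed. The sketch below is not the code used in this application: the function name listRecords and the stand-in database object are assumptions, added only so that the example runs without PouchDB or Cloudant.

```javascript
// List every book with a single query instead of one db.get() per document.
// `db` is expected to be a PouchDB database; include_docs asks allDocs()
// to return each document body in row.doc.
function listRecords(db, callback) {
  db.allDocs({ include_docs: true }, function (err, response) {
    if (err) {
      callback(err);
      return;
    }
    const details = response.rows
      .map(row => JSON.stringify(row.doc.Title) + " by " + JSON.stringify(row.doc.author))
      .join("\n");
    callback(null, details);
  });
}

// Stand-in for PouchDB so that the sketch runs on its own (hypothetical data).
const fakeDb = {
  allDocs(options, cb) {
    cb(null, {
      total_rows: 1,
      rows: [{ id: 'book1', doc: { author: 'John Grisham', Title: 'The Firm' } }]
    });
  }
};

listRecords(fakeDb, (err, details) => console.log(err || details));
```

In the web server, the callback passed to listRecords would write the details to the response, which keeps the HTTP handling separate from the database query.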

As always you can use the “Files and Logs” in the Bluemix dashboard to get any output that is written to stdout.

Note: As always, I can’t overstate how useful the command
'cf logs <application name> --recent' is for debugging.

Hope you enjoyed this Cloud Medley of Bluemix, Cloudant and Node.js!

Disclaimer: This article represents the author’s viewpoint only and doesn’t necessarily represent IBM’s positions, strategies or opinions


Technological hurdles: 2012 and beyond


Published in Telecom Asia, Jan 11,2012 – Technological hurdles – 2012 and beyond

You must have heard it all by now – the technological trends for 2012 and the future. The predictions range over BigData, cloud computing, internet of things, LTE, semantic web, social commerce and so on.

In this post, I thought I should focus on what seem to be significant hurdles as we advance to the future. So for a change, I wanted to play the doomsayer rather than the soothsayer. The positive trends are bound to continue and in our exuberance we may lose sight of the hurdles before us. Besides, “problems are usually opportunities in disguise”. So here is my list of the top issues that are facing the industry now.

Bandwidth shortage: A key issue of the computing infrastructure of today is data affinity, which is the result of the dual issues of data latency and the economics of data transfer. Jim Gray (Turing Award, 1998), in his paper “Distributed Computing Economics”, states that programs need to be migrated to the data on which they operate rather than transferring large amounts of data to the programs. In this paper Jim Gray tells us that the economics of today’s computing depends on four factors, namely computation, networking, database storage and database access. He then equates $1 as follows

One dollar equates to

$1 ≈ 1 GB sent over the WAN
   ≈ 10 Tops (tera cpu operations)
   ≈ 8 hours of cpu time
   ≈ 1 GB disk space
   ≈ 10 M database accesses
   ≈ 10 TB of disk bandwidth
   ≈ 10 TB of LAN bandwidth

As can be seen from the above breakup, there is a disproportionate contribution by the WAN bandwidth in comparison to the others. In other words, while the processing power of CPUs and storage capacities have multiplied, accompanied by dropping prices, the cost of bandwidth has remained high. Moreover, the available bandwidth is insufficient to handle the explosion of data traffic.

In fact it has been found that the “cheapest and fastest way to move a Terabyte cross country is sneakernet” (i.e. the transfer of electronic information, especially computer files, by physically carrying removable media such as magnetic tape, compact discs, DVDs, USB flash drives, or external drives from one computer to another).

With the burgeoning of bandwidth-hungry applications it is obvious that we are going to face a bandwidth shortage. The industry will have to come up with innovative solutions to provide what I would like to refer to as “bandwidth-on-demand”.

The Spectrum Crunch: Powerful smartphones, extremely fast networks, content-rich applications, and increasing user awareness have together resulted in a virtual explosion of mobile broadband data usage. There are 2 key drivers behind this phenomenal growth in mobile data. One is the explosion of devices: smartphones, tablet PCs, e-readers, and laptops with wireless access. All these devices deliver high-speed content and web browsing on the move. The second is video. Over 30% of overall mobile data traffic is video streaming, which is extremely bandwidth hungry. The rest of the traffic is web browsing, file downloads, and email.

The growth in mobile data traffic has been exponential. According to a report by Ericsson, mobile data is expected to double annually till 2015. Mobile broadband will see a billion subscribers this year (2011), and possibly touch 5 billion by 2015.

According to a report by IDATE (a consulting firm), the total mobile data will exceed 127 exabytes (an exabyte is 10^18 bytes, or 1 mn terabytes) by 2020, an increase of over 33 times from 2010.

Given the current usage trends, coupled with the theoretical limits of available spectrum, the world will run out of available spectrum for the growing army of mobile users. The current spectrum availability cannot support the surge in mobile data traffic indefinitely, and demand for wireless capacity will outstrip spectrum availability by the middle of this decade or by 2014.

This is a really serious problem. In fact, it is a serious enough issue to have the White House raise a memo titled “Unleashing the Wireless Broadband Revolution”. Now the US Federal Communications Commission (FCC) has taken the step to meet the demand by letting wireless users access content via unused airwaves on the broadcast spectrum known as “White Spaces”. Google and Microsoft are already working on this technology, which will allow laptops, smartphones and other wireless devices to transfer in GB instead of MB through Wi-Fi.

But spectrum shortage is a pressing problem that needs to be addressed immediately.

IPv4 exhaustion: IPv4 address space exhaustion has been looming for quite some time and warrants serious attention in the not too distant future. This problem may be even more serious than the Y2K problem. The issue is that IPv4 can address only 2^32 or about 4.3 billion devices. Already the pool has been exhausted because of new technologies like IMS, which uses an all-IP core, and the Internet of Things, with more devices and sensors connected to the internet, each identified by an IP address. The solution to this problem was worked out long ago and requires that the Internet adopt the IPv6 addressing scheme. IPv6 uses 128-bit addresses and allows 3.4 x 10^38, or 340 trillion, trillion, trillion, unique addresses. However, the conversion to IPv6 is not happening at the required pace and pretty soon it will have to be adopted on a war footing. It is clear that while the transition takes place both IPv4 and IPv6 will co-exist, so there will be an additional requirement for devices on the internet to be able to convert from one to the other.
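
The arithmetic behind these address-space figures can be checked with a few lines of Python. This is just a quick sanity check of the numbers quoted above; the variable names are my own.

```python
# Address-space arithmetic for IPv4 (32-bit) and IPv6 (128-bit) addresses.
IPV4_BITS = 32
IPV6_BITS = 128

ipv4_addresses = 2 ** IPV4_BITS   # 4,294,967,296
ipv6_addresses = 2 ** IPV6_BITS   # roughly 3.4 x 10^38

print(f"IPv4: {ipv4_addresses:,} addresses (~{ipv4_addresses / 1e9:.1f} billion)")
print(f"IPv6: {ipv6_addresses:.2e} addresses")
print(f"IPv6 has 2^{IPV6_BITS - IPV4_BITS} addresses for every IPv4 address")
```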

We are bound to run into a wall if organizations and enterprises do not upgrade their devices to be able to handle IPv6.

Conclusion: These are some of the technological hurdles that confront the computing industry.  Given mankind’s ability to come up with innovative solutions we may find new industries being spawned in solving these bottlenecks.


Technologies to watch: 2012 and beyond


Published in Telecom Asia – Technologies to watch:2012 and beyond

Published in Telecoms Europe – Hot technologies for 2012 and beyond

A keen observer of the technological firmament today will observe a grand spectacle of diverse technological events. Some technological trends will blaze a trail and become trend setters while others will vanish without a trace. The factors that make certain technologies endure in comparison to others could be many, ranging from pure necessity to a coolness factor, from innovativeness to a cost factor. This article looks at some of the technologies that are certain to be trail blazers in the years to come.

Software Defined Networks (SDNs): Software Defined Networks (SDNs) are based on the path-breaking paradigm of separating the control of a network flow from the actual flow of data. SDN is the result of pioneering effort by Stanford University and University of California, Berkeley, is based on the OpenFlow protocol, and represents a paradigm shift in the way networking elements operate. SDN decouples the routing and switching of the data flows and moves the control of the flow to a separate network element, namely the Flow Controller. The motivation for this is that the flow of data packets through the network can then be controlled in a programmatic manner. The OpenFlow protocol has 3 components: the Flow Controller that controls the flows, the OpenFlow switch with its Flow Table, and a secure connection between the Flow Controller and the OpenFlow switch. Software Defined Networks also include the ability to virtualize the network resources. Virtualized network resources are known as a “network slice”. A slice can span several network elements including the network backbone, routers and hosts. The ability to control multiple traffic flows programmatically puts enormous flexibility and power in the hands of users. SDNs are bound to be the network elements of the future.
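
To make the separation of control and data a little more concrete, here is a minimal sketch in Python. It is a toy model and not the OpenFlow wire protocol: the class names, the policy dictionary and the port numbers are all assumptions made purely for illustration. The switch only looks up its flow table, while the controller decides what goes into that table.

```python
from dataclasses import dataclass, field


@dataclass
class FlowSwitch:
    """Toy data plane: forwards purely by looking up its flow table."""
    flow_table: dict = field(default_factory=dict)  # destination -> output port

    def forward(self, destination):
        # Returns the output port, or None when there is no matching flow entry.
        return self.flow_table.get(destination)


class FlowController:
    """Toy control plane: holds the policy and programs the switches."""

    def __init__(self, policy):
        self.policy = dict(policy)

    def install_flows(self, switch):
        # Push the controller's policy into the switch's flow table.
        switch.flow_table.update(self.policy)

    def handle_miss(self, switch, destination, default_port):
        # On a table miss the controller decides and installs a new entry.
        switch.flow_table[destination] = default_port
        return default_port


controller = FlowController({"10.0.0.1": 1, "10.0.0.2": 2})
switch = FlowSwitch()
controller.install_flows(switch)

port = switch.forward("10.0.0.3")
if port is None:  # unknown flow: ask the controller
    port = controller.handle_miss(switch, "10.0.0.3", default_port=3)
print(port)
```

Changing the behaviour of the network here means changing the controller's policy, not the switch, which is the programmability the paragraph above refers to.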

Smart Grids: The energy industry is delicately poised for a complete transformation with the evolution of the smart grid concept. There is now an urgent need for increased efficiency in power generation, transmission and distribution, coupled with a reduction of energy losses. In this context many leading players in the energy industry are coming up with a connected end-to-end digital grid to smartly manage energy transmission and distribution. The digital grid will have smart meters, sensors and other devices distributed throughout the grid, capable of sensing, collecting, analyzing and distributing the data to devices that can take action on them. The huge volume of collected data will be sent to intelligent devices which will use wireless 3G networks to transmit the data. Appropriate action like alternate routing and optimal energy distribution would then happen. Smart Grids are a certainty given that this technology addresses the dire need for efficient energy management. Besides managing energy efficiently, Smart Grids also save costs by preventing inefficiency and energy losses.

The NoSQL Paradigm: In large web applications where performance and scalability are key concerns, a non-relational (NoSQL) database is a better choice than the more traditional relational databases. There are several examples of such databases; the better known are Google’s BigTable, HBase, Amazon’s Dynamo, CouchDB and MongoDB. These databases partition the data horizontally and distribute it among many regular commodity servers. Accesses to the data are based on get(key) or set(key, value) type APIs, with the key mapped to a server through a consistent hashing scheme, for example the Distributed Hash Table (DHT) method. The ability to distribute data and queries to one of several servers provides the key benefit of scalability. Clearly, having a single database handle an enormous number of transactions will result in performance degradation as the number of transactions increases. Applications that have to frequently access and manage petabytes of data will clearly have to move to the NoSQL paradigm of databases.
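
A minimal sketch of how a get/set store might map keys to servers with consistent hashing is shown below. This is an illustration under my own assumptions (MD5 as the hash, 100 virtual points per server, in-process dictionaries standing in for the servers) and not the scheme of any particular product.

```python
import bisect
import hashlib


def _hash(value: str) -> int:
    """Map a string to a large integer position on the hash ring."""
    return int(hashlib.md5(value.encode("utf-8")).hexdigest(), 16)


class ConsistentHashStore:
    """Toy key-value store that spreads keys over servers on a hash ring."""

    def __init__(self, servers, points_per_server=100):
        # Each server owns several points on the ring to even out the load.
        self._ring = sorted(
            (_hash(f"{server}#{i}"), server)
            for server in servers
            for i in range(points_per_server)
        )
        self._points = [point for point, _ in self._ring]
        self._data = {server: {} for server in servers}  # stand-in for real servers

    def server_for(self, key):
        # First ring point clockwise from the key's hash, wrapping around at the end.
        index = bisect.bisect(self._points, _hash(key)) % len(self._ring)
        return self._ring[index][1]

    def set(self, key, value):
        self._data[self.server_for(key)][key] = value

    def get(self, key):
        return self._data[self.server_for(key)].get(key)


store = ConsistentHashStore(["server-a", "server-b", "server-c"])
store.set("user:42", {"name": "Ada"})
print(store.server_for("user:42"), store.get("user:42"))
```

The appeal of the ring is that adding or removing a server only remaps the keys adjacent to that server's points rather than reshuffling the whole data set.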

Near Field Communications (NFC): Near Field Communications (NFC) is a technology whose time has come. Mobile phones enabled with NFC technology can be used for a variety of purposes. One such purpose is integrating credit card functionality into mobile phones using NFC. Already the major players in mobile are integrating NFC into their newer versions of mobile phones including Apple’s iPhone, Google’s Android, and Nokia. We will never again have to carry a stack of credit cards in our wallets. Our mobile phone will double up as a Visa, MasterCard, etc. NFC also allows retail stores to send promotional coupons to subscribers who are in the vicinity of the shopping mall. Posters or trailers of movies running in a theatre can be sent as multi-media clips when travelling near a movie hall. Besides this, NFC allows friends to exchange contact lists when they are in close proximity.

The Other Suspects: Besides the above we have other usual suspects

Long Term Evolution (LTE): LTE is the latest wireless technology and enables wireless access speeds of up to 56 Mbps. With the burgeoning interest in tablets and smartphones with their countless apps, LTE will be used heavily as we move along. For a vision of where telecom is headed, do read my post “The Future of Telecom”.

Cloud Computing: Cloud Computing is the other technology that is bound to gain momentum in the years ahead. Besides obviating the need for upfront capital expenditure the cloud enables quick and easy deployment of applications. Moreover the elasticity of the cloud will make it irresistible to large enterprises and corporations.

The above is a list of technologies to watch as they create new paths and blaze new trails. All these technologies are bound to transform the world as we know it and make our lives easier, better and more comfortable. These are the technologies that we need to focus on as we move bravely into our future. Do read my post for the year 2011, “Technology Trends – 2011 and beyond”.


Cache-22


If you want performance you need to partition data. If you partition data you will not get performance! That sounded clever but is it true? Well, it can be if the architecture of your application is naïve.

The problem I am describing here is when there is a need to partition data across multiple geographical regions. Partitioning data essentially spreads data among several servers resulting in fast accesses. But when the data is spread across large geographical distances then this will result in significant network latencies. This is something that cannot be avoided.

Memcached is a common technique to store frequently read data in in-memory caches, preventing frequent trips to the database. Memcached accesses data through “gets” and updates data through “sets”. Data is accessed based on a key which is hashed to one of several participating servers. Thus memcached distributes the data among the participating servers in a server list. Reads and writes are of the order of O(1) and extremely fast. This works fine as long as the servers belong to a single region or the data centers are in the same region. The network latencies will be low and the latency of the application will not be severely affected.
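
The key-to-server mapping can be pictured with the small Python sketch below. It is not the memcached client API: the class, the CRC32-modulo hashing and the server names are assumptions for illustration, and real clients may use other hashing schemes.

```python
import zlib


class ShardedCache:
    """Toy cache: each key is hashed to exactly one server in the server list."""

    def __init__(self, servers):
        self.servers = list(servers)
        # One dictionary per server stands in for that server's memory.
        self._stores = {server: {} for server in self.servers}

    def server_for(self, key):
        # Hash the key and pick a server; an O(1) operation.
        return self.servers[zlib.crc32(key.encode("utf-8")) % len(self.servers)]

    def set(self, key, value):
        self._stores[self.server_for(key)][key] = value

    def get(self, key):
        # Returns None on a cache miss.
        return self._stores[self.server_for(key)].get(key)


cache = ShardedCache(["cache-1", "cache-2", "cache-3"])
cache.set("session:abc", "logged-in")
print(cache.server_for("session:abc"), cache.get("session:abc"))
```

Note that the hash decides the server without any regard to where that server is located, which is exactly what causes trouble once the server list spans regions.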

Now consider a situation where the memcached servers have to be distributed to multi-region data centers. While this is an excellent scheme for Disaster Recovery (DR) it introduces its own set of attendant problems.

Memcached will hash the entire data set and distribute it over the entire server list. Now “gets” of data from one geographical region to another will have significant latency. Since the laws of physics mandate that nothing can exceed the speed of light, we will be stuck with appreciable latencies for inter-region reads and writes.  So while a multi-region deployment provides for geographical resiliency it does introduce issues of latency and degraded throughput.
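
A back-of-the-envelope calculation shows the floor that physics puts on inter-region latency. The distances below are illustrative assumptions and not measurements, and I assume that light in optical fiber travels at roughly two-thirds of its speed in vacuum. Real round trips will be higher because of routing, queuing and processing.

```python
SPEED_OF_LIGHT_KM_PER_S = 299_792.458
FIBER_FACTOR = 2 / 3  # assumption: signal speed in fiber is about 2/3 of c


def min_round_trip_ms(distance_km, medium_factor=FIBER_FACTOR):
    """Lower bound on round-trip time in milliseconds over a given distance."""
    one_way_s = distance_km / (SPEED_OF_LIGHT_KM_PER_S * medium_factor)
    return 2 * one_way_s * 1000


for distance_km in (100, 5_000, 10_000):  # illustrative distances only
    print(f"{distance_km:>6} km: at least {min_round_trip_ms(distance_km):.1f} ms round trip")
```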

So what is the solution? One possible solution is to replicate the data in all the regions. One technique that I can think of is to have the application implement “local reads & global writes”. This technique provides for the AP part of the CAP Theorem. The CAP theorem states that it is impossible for a distributed application to simultaneously provide all three of Consistency, Availability and Partition tolerance. The “local reads & global writes” method will assure availability and partition tolerance while providing for eventual consistency.

In this technique, updates are applied on the local servers and written asynchronously to all the other data centers. The writes are hence global in nature. The updates will not wait for all writes to complete before moving along. Reads, however, will be local: since data is read based on proximity, the latency will be really low.
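
The idea can be sketched as below. This is a single-process simulation under my own assumptions: the class and method names are hypothetical, the pending queue stands in for asynchronous network replication, and conflict resolution between concurrent writers is ignored.

```python
from collections import deque


class MultiRegionCache:
    """Toy model of 'local reads & global writes' across regions."""

    def __init__(self, regions):
        self.stores = {region: {} for region in regions}
        self.pending = deque()  # queued (target_region, key, value) replications

    def write(self, local_region, key, value):
        # Apply the update locally and return without waiting for other regions.
        self.stores[local_region][key] = value
        for region in self.stores:
            if region != local_region:
                self.pending.append((region, key, value))

    def read(self, local_region, key):
        # Reads never leave the local region, so latency stays low.
        return self.stores[local_region].get(key)

    def replicate(self):
        # Stand-in for the asynchronous writes eventually arriving.
        while self.pending:
            region, key, value = self.pending.popleft()
            self.stores[region][key] = value


cache = MultiRegionCache(["us-east", "eu-west", "ap-south"])
cache.write("us-east", "price:book-1", 10)
print(cache.read("us-east", "price:book-1"))  # local read sees the write at once
print(cache.read("eu-west", "price:book-1"))  # remote region has not caught up yet
cache.replicate()
print(cache.read("eu-west", "price:book-1"))  # now consistent
```

The window between the write and the replication is the period during which a remote region can read stale data.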

Since writes are asynchronous the data will tend to be “eventually consistent” rather than “strongly consistent”, but this is a tradeoff that can be taken into account. Ideally the quorum protocol should be implemented along with the “local reads & global writes” technique to ensure that you read your writes.

The application could have a modified quorum protocol such that R + W > N, where R is the number of servers consulted on a read, W is the number of servers that a write must reach, and N is the total number of servers in the memcached server list.
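
The quorum condition is easy to check in code. In the sketch below I take N as the number of servers holding a copy of the data, R as the number consulted on a read and W as the number that must accept a write; the function name is my own. When R + W > N, every read set overlaps every write set in at least one server, so a read will see the latest write.

```python
def quorum_overlaps(n, r, w):
    """Return True when any R servers read must overlap any W servers written."""
    if not (1 <= r <= n and 1 <= w <= n):
        raise ValueError("r and w must each be between 1 and n")
    return r + w > n


print(quorum_overlaps(n=3, r=2, w=2))  # True: read and write sets must share a server
print(quorum_overlaps(n=3, r=1, w=1))  # False: a read may miss the latest write
print(quorum_overlaps(n=3, r=1, w=3))  # True: write to all, read from any one
```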

A similar technique has been used in Cassandra, CouchDB, etc.

With the “local reads & global writes” technique it is possible to keep the latencies within reasonable limits since data reads will be based on proximity. Replicating the data to all regions will also ensure that eventually all regions have a consistent view of the data.


Eliminating the Performance Drag


Nothing is more important to designers and architects of web applications than performance. Everybody dreams of applications that can roar and spit fire! The attributes most sought after in large scale web applications are low latency and high throughput. Performance is money. So it is really important to work the design and churn out the best possible one.

Squeezing performance from your application is truly an art. The challenges and excitement are somewhat similar to what a race car designer or an aeronautical engineer would experience in reducing the drag on the machine while increasing the thrust of the engine. Understanding the physics of program execution is truly a rare art.

This post will attempt to touch upon some key aspects that have to be looked at a little more closely to wring the maximum performance from your application.

The Store Clerk Pattern: This is an oft-quoted analogy to describe the relation between latency and throughput. In this example the application can be likened to a retail store with store clerks checking out customers who queue with their purchases. Assume that there are 5 store clerks and each takes 1 sec to process a customer. If there are 5 customers the response time for each customer will be 1 sec and a total throughput of 5 customers/sec can be achieved. However, if a 6th customer enters then he/she will have to wait 1 sec in the queue while the 5 customers are being handled by the store clerks. For this customer the response time will be 1 sec (waiting) + 1 sec (processing), or a total response time of 2 secs. It can be readily seen that as more customers queue up the wait times will increase and the latency will keep increasing. Also it has to be noted that the throughput will plateau at 5 customers/sec and cannot go above that.
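
The numbers in this analogy can be reproduced with a small calculation. The sketch assumes, as in the example, that all customers arrive at the same instant and that every clerk takes the same fixed time per customer; the function names are my own.

```python
def response_time(position, clerks=5, service_time=1.0):
    """Response time for the customer at a 1-based queue position.

    Assumes all customers arrive together and every clerk takes
    exactly service_time seconds per customer.
    """
    if position < 1:
        raise ValueError("position starts at 1")
    rounds_waited = (position - 1) // clerks  # full rounds served before this customer
    return rounds_waited * service_time + service_time


def max_throughput(clerks=5, service_time=1.0):
    """Customers per second once every clerk is busy."""
    return clerks / service_time


for position in (1, 5, 6, 11):
    print(f"customer {position}: {response_time(position):.0f} sec")
print(f"throughput plateaus at {max_throughput():.0f} customers/sec")
```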

The first point of attack in improving performance is to identify the store clerk pattern in your application. Identify where your application has a queue of incoming requests and a thread pool that processes these requests. The latency and throughput are governed by the number of parallel threads (store clerks) that process requests and by the wait times in the queue. One naïve technique is to increase the number of threads in the pool or increase the number of pools. However, this may be limited by the CPU and other system limitations. What is extremely important is to identify what factors contribute to the processing of each request. While processing, do threads need to access and retrieve data? Do they have to make API or SQL calls? Identify the worst case performance of the thread and determine if this worst case can be improved by a different algorithm. So the key in the store clerk pattern is a) to optimize the threads in the pool and b) to improve the worst case performance of processing the request.

Resource Contention: This is another area of the application that needs to be looked at very closely. It is quite likely that data is being shared by many threads. Access to shared data is going to involve locks and waits. Identify and determine the worst case wait for threads. Is your application read-heavy and write-light or write-heavy and read-light? In the former situation it may be worthwhile to use a Reader-Writer locking algorithm in which any number of readers can simultaneously read data by updating a semaphore. A write, which happens occasionally, will lock the resource and cause all reader threads to wait. If the application is write-heavy then other alternatives like message-based locking could be used. Clearly thread waits can be a drain on performance.
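
A Reader-Writer lock can be sketched in a few lines of Python. This is a minimal readers-preference variant written for illustration, with a reader count in place of a semaphore; the class and its try_acquire_write helper are my own, and in this simple form a steady stream of readers can starve a writer.

```python
import threading


class ReadWriteLock:
    """Minimal readers-preference lock: many readers or one writer."""

    def __init__(self):
        self._readers = 0
        self._readers_lock = threading.Lock()   # protects the reader count
        self._resource_lock = threading.Lock()  # held by the writer or the reader group

    def acquire_read(self):
        with self._readers_lock:
            self._readers += 1
            if self._readers == 1:
                # The first reader locks writers out on behalf of all readers.
                self._resource_lock.acquire()

    def release_read(self):
        with self._readers_lock:
            self._readers -= 1
            if self._readers == 0:
                # The last reader lets writers in again.
                self._resource_lock.release()

    def acquire_write(self):
        self._resource_lock.acquire()

    def try_acquire_write(self):
        # Non-blocking attempt; returns False while readers or a writer hold the resource.
        return self._resource_lock.acquire(blocking=False)

    def release_write(self):
        self._resource_lock.release()


lock = ReadWriteLock()
lock.acquire_read()
lock.acquire_read()              # a second reader gets in without waiting
print(lock.try_acquire_write())  # False while readers are active
lock.release_read()
lock.release_read()
print(lock.try_acquire_write())  # True once the readers have left
lock.release_write()
```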

Algorithmic changes: If there are modules that perform an enormous number of insertions, updates or deletions on data in memory then this has to be looked at closely. Determine the type of data structures or STL containers being used. The solution is to re-organize the data so that the operation happens much more efficiently, ideally approaching O(1). Maybe the data needs to be organized as a hash map of lists or a hash map pointing to n-ary trees instead of a list of lists. This will really require deep thought and careful analysis to identify the approach that provides the least possible time for the most common operation.
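
As a small illustration of such a re-organization, the sketch below stores the same made-up records first as a list of lists, where a lookup scans every row, and then as a hash map of lists, where a lookup is O(1) on average. The data and function names are hypothetical.

```python
from collections import defaultdict

records = [("alice", "order-1"), ("bob", "order-2"), ("alice", "order-3")]

# List of lists: finding a customer's orders means scanning every row, O(n).
orders_as_lists = [["alice", "order-1", "order-3"], ["bob", "order-2"]]


def find_in_lists(customer):
    for row in orders_as_lists:
        if row[0] == customer:
            return row[1:]
    return []


# Hash map of lists: the customer name hashes straight to its orders.
orders_by_customer = defaultdict(list)
for customer, order in records:
    orders_by_customer[customer].append(order)


def find_in_map(customer):
    return orders_by_customer.get(customer, [])


print(find_in_lists("alice"))
print(find_in_map("alice"))
```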

From Relational to NoSQL: Though the transition from an RDBMS to a NoSQL database like Cassandra, CouchDB, etc. would really be driven by scalability, the ability to partition data horizontally and hash the key makes accesses, updates and deletions really fast, and is an avenue that is worth looking into.

Caching: This is a widely used technique to reduce frequent SQL queries to the database. Data that is commonly used can be cached in-memory. One such technique is to use memcached. Memcached caches data across several servers. Access to data is through simple get and put methods in which the key is hashed to identify the server on which the data is stored, and is of the order of O(1). If there is a miss in the memcached servers then the data is fetched through a SQL query.
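
The read path described here, cache first and database only on a miss, can be sketched as follows. The dictionaries stand in for the memcached servers and the SQL database, and the class name and the query counter are assumptions added for illustration.

```python
class CachedReader:
    """Toy read path: look in the cache first, query the database on a miss."""

    def __init__(self, database):
        self._database = database  # stand-in for a SQL database
        self._cache = {}           # stand-in for the memcached servers
        self.db_queries = 0        # counts how often the database is hit

    def _query_database(self, key):
        self.db_queries += 1
        return self._database.get(key)

    def get(self, key):
        if key in self._cache:
            return self._cache[key]          # cache hit: no database access
        value = self._query_database(key)    # cache miss: fall back to the database
        if value is not None:
            self._cache[key] = value         # populate the cache for next time
        return value


reader = CachedReader({"user:1": "Alice", "user:2": "Bob"})
reader.get("user:1")  # miss, goes to the database
reader.get("user:1")  # hit, served from the cache
print(reader.db_queries)
```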

Profiling: The judicious use of profiling tools is extremely important in optimizing performance. Tools like valgrind truly help in identifying bottlenecks. Other tools also help in monitoring thread pools and identifying where resource contention is taking place. It may also be worthwhile to timestamp different modules and collect data over several thousand runs, average them and pin-point trouble spots.
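
The timestamp-and-average approach can be sketched with the Python standard library. The helper below and the sample function it times are my own illustrations; a real profiler will give far more detail.

```python
import time


def average_runtime(func, runs=1000):
    """Average wall-clock time of func() in seconds over the given number of runs."""
    total = 0.0
    for _ in range(runs):
        start = time.perf_counter()
        func()
        total += time.perf_counter() - start
    return total / runs


def sample_module():
    # Hypothetical stand-in for the module being measured.
    return sum(i * i for i in range(1000))


avg = average_runtime(sample_module, runs=2000)
print(f"sample_module: {avg * 1e6:.1f} microseconds per call on average")
```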

These are some techniques that can be used for optimizing performance. However, improving performance beyond a point will really depend on being able to visualize the application in execution and divining problem hot spots.


To Hadoop, or not to Hadoop


Published in Telecom Asia, Jul 23, 2012 – To Hadoop or not to Hadoop

To Hadoop, or not to Hadoop: that is the question. In many of my discussions I find that Hadoop with its implementation of Map-Reduce crops up time and time again. To many, Map-Reduce is the panacea for all kinds of performance evils. It appears that somehow using Map-Reduce in your application will magically transform it into a high performing, screaming application.

The fact is that the Map-Reduce algorithm is applicable only to a certain class of problems. Ideally it is suited to what is commonly referred to as the “embarrassingly parallel” class of problems. These are problems that are inherently parallel, for example the creation of inverted indices from web crawled documents.

Map-Reduce is an algorithm that was popularized by Google. The term map-reduce actually originates from Lisp, in which the “map” function takes a list of arguments and performs the same operation on each of them. The “reduce” then applies a common criterion to pick a reduced set of values from this list. Google uses Map-Reduce to create an inverted index. An inverted index basically provides a mapping from a word to the list of documents in which it occurs. This typically happens in two stages. A set of parallel “map” tasks take documents as input, parse them and emit a sequence of (word, document id) pairs. In other words, the map takes as input a key value pair (k1, v1) and maps it into an intermediate (k2, v2) pair. The reduce tasks take the (word, document id) pairs, reduce them, and emit a (word, list {document id}). Clearly applications like the inverted index make sense for the Map-Reduce algorithm as several mapping tasks can work in parallel on separate documents. Another typical application is counting the occurrences of words in documents or the number of times a web URL has been hit in a traffic log.
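
The two stages can be illustrated with a small single-process sketch in Python. This is not Hadoop or Google's implementation: the documents and function names are made up, and the map calls that would run in parallel on separate machines simply run one after another here.

```python
from collections import defaultdict


def map_document(doc_id, text):
    """Map stage: emit a (word, document id) pair for every word in a document."""
    return [(word, doc_id) for word in text.lower().split()]


def reduce_pairs(pairs):
    """Reduce stage: group the pairs into word -> sorted list of document ids."""
    index = defaultdict(set)
    for word, doc_id in pairs:
        index[word].add(doc_id)
    return {word: sorted(doc_ids) for word, doc_ids in index.items()}


documents = {1: "the quick brown fox", 2: "the lazy dog", 3: "quick dog"}

# Each document can be mapped independently, which is what makes the job parallel.
pairs = [
    pair
    for doc_id, text in documents.items()
    for pair in map_document(doc_id, text)
]
inverted_index = reduce_pairs(pairs)
print(inverted_index["quick"])
```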

The key point in this class of problems is that the problem can be handled in parallel. Tasks that are inherently parallel and can execute independently are eminently suitable for Hadoop processing. These tasks also work on extraordinarily large data sets, which is another criterion for Hadoop-worthy applications.

Hadoop uses a large number of commodity servers to execute the algorithm. A complementary technology along with Hadoop is the Hadoop Distributed File System (HDFS). The HDFS is a storage system in which the input data is partitioned across several servers. Google uses the Google File System (GFS) for its inverted index and page ranking algorithm.

Typical applications that are prime candidates for Hadoop are those that have to operate on terabytes of data. The additional requirement is that the application can run some sort of transformation or “map” algorithm on the data independently and produce an intermediate result for the “reduce” part of the algorithm. The “reduce” essentially applies some criteria on the intermediate sets to produce zero or one output.

However, several real world applications do not fall into this category where we can parallelize the execution of the application. For example, an e-retail application which allows users to search for books and electronic products, add them to a shopping cart and finally make the purchase is, in my opinion, not really suited for Hadoop, as each individual transaction is separate and typically has its own unique sequential flow. An ad serving application is also not ideally suited for Hadoop. Each individual transaction has its own individual flow in time.

However, on a closer look we can see that there are certain aspects of such applications that are conducive to a Hadoop based Map-Reduce algorithm, namely where the application needs to search through large data sets. For example, the e-retail application will have tens of thousands of electronic products and books from different vendors, each with their own product id. We could use Hadoop to pre-process these large amounts of data, classify them and create smaller data sets which the e-retail or other application can use. Hadoop is a clear winner when large data sets have to be searched, sorted or have some subset selected from them.

In these kinds of applications Hadoop has a clear edge over other approaches as it can really crunch data. Hadoop is also resilient to failures and is based on the principle of “data locality”, which allows the “map” or “reduce” to use data stored locally on its server or on a neighboring machine.

Hence, while Hadoop is no silver bullet for all types of applications, if due diligence is performed we can identify aspects of the application which can be crunched by Hadoop.


The Future is C-cubed: Computing, Communication and the Cloud


We are on the verge of the next great stage of technological evolution. The trickle of different trends clearly points to what I would like to term C-cubed (C3), representing the merger of computing technologies, communication advances and the cloud.

There are no surprises in this assessment. Clearly it does not fall into the category of chaos theory’s “butterfly effect”, where a seemingly unrelated cause has a far-reaching effect; the typical example is the fluttering of a butterfly in Puerto Rico being enough to cause an earthquake in China.

The C-cubed future that seems very probable is based on the advances in mobile broadband, advances in communication and the emergence of cloud computing. A couple of years back Scott McNealy of Sun Microsystems believed that the “network is the computer”. Now with the introduction of Google’s Chromebook this trend will soon catch on. In fact I can easily visualize a ubiquitous device which I would like to call the “cloudbook”.

The cloudbook would be a device that would resemble a tablet like the iPad, Playbook, etc. but would carry little or no hard disk. Local storage will be through USB devices or SD cards, which these days come with large storage in the range of 80GB and above. The cloudbook would have no operating system. It would simply have a bootstrap program which will allow the user to choose from several different Operating Systems (OS), namely Windows, Linux, Solaris, Mac, etc., which will execute on the cloud. All applications will be executed directly on the cloud. The user will also store all his programs and data on the cloud. Some amount of offline storage will be possible in portable storage devices like the memory stick, SD card, etc.

The cloudbook will be a ubiquitous device. It will access the internet through mobile broadband. The access could be through a GPRS, WCDMA or an LTE connection. With the blazing speeds of 56 Mbps promised by LTE, the ability to access the public cloud for executing programs and for storing data is extremely feasible. Access should be almost instantaneous. Using mobile broadband for access and the cloud for computing and storage will be the trend in the future.

Besides its use for computing, the cloudbook will also be used for making voice or video calls. This is the promise of IP Multimedia Subsystem (IMS) technology. IMS is a technology that has been in the wings for quite some time. IMS technology envisages an all-IP Core Network that will be used for transporting voice, data and video. As the speeds of the IP pipes become faster and the algorithms to iron out QoS issues are worked out, the complete magnificence of the vision of IMS will become a reality and high speed video applications will become commonplace.

The cloudbook will use the WCDMA (3G) network to make voice and video calls to others. The 3G RNC or the 4G eNodeBs will enable the transmission and reception of voice, data or video to and from the Core Network. LTE networks will use either Circuit Switched Fall Back (CSFB) or VoLTE (Voice over LTE) to transfer voice and video over either the 3G network or the Evolved Packet Core (EPC). In the future high speed video based calls and applications will be extremely prevalent and a device like the cloudbook will improve the user experience manifold.

Besides, IMS also envisions Application Servers (AS) spread across the network providing other services like Video-on-Demand and real-time multi-player gaming. It is clear that these AS may actually be instances running on the public cloud.

Hence the future clearly points to a marriage of computing, communication and the cloud where each will have a symbiotic relationship with the others, each reinforcing the others. The network can be visualized as one large ambient network of IMS Call Session Control Function servers (CSCFs), virtualized servers on the cloud and Application Servers (AS).

Mobile broadband will become commonplace and all computing and communication will be through 3G or 4G networks.

The future is almost here and the future is C-cubed (C3)!!!

Published in Telecom Asia, Jul 8 2011 – The Future is C-cubed
