

Mapreduce using Hadoop + pig/hive on AWS EC2 hadoop cluster

This article discusses running MapReduce jobs with the Apache tools Pig and Hive. Before we can process the data, we need to upload the files to be processed to HDFS or S3. We recommend uploading the working data to HDFS and keeping a copy of the important files in S3 as a backup. S3 is easily accessible from the command line using tools like s3cmd. HDFS is a fault-tolerant cluster filesystem that replicates data across nodes, which protects it against the failure of individual instances.

Mapreduce

MapReduce is a programming model and an associated implementation for processing and generating large data sets. We can specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.
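To make the model concrete, here is a minimal single-process sketch of a word count in Python. It is only an illustration of the map, group-by-key and reduce phases; the names `map_fn`, `reduce_fn` and `run_mapreduce` are made up for this example and are not part of Hadoop, which distributes the same phases across many machines.

```python
from collections import defaultdict


def map_fn(_key, line):
    """Map: emit an intermediate (word, 1) pair for every word in the line."""
    for word in line.split():
        yield word.lower(), 1


def reduce_fn(word, counts):
    """Reduce: merge all intermediate values that share the same key."""
    return word, sum(counts)


def run_mapreduce(records, mapper, reducer):
    """Single-process stand-in for the framework: map, group by key, reduce."""
    grouped = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            grouped[out_key].append(out_value)
    return dict(reducer(k, v) for k, v in grouped.items())


# Input records are (line number, line text) pairs.
lines = [(0, "pig hive pig"), (1, "hive hadoop pig")]
word_counts = run_mapreduce(lines, map_fn, reduce_fn)
print(word_counts)
```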

The main entities involved when Hadoop runs a job are

  1. The client, which submits the MapReduce job.
  2. The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
  3. The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
  4. The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.

Hadoop Map/Reduce is very powerful, but

o   Requires a Java Programmer.

o   Harder to write and also time consuming.

o   Difficult to update frequently.

A solution is to run jobs using Pig (Pig Latin) or Hive (HiveQL).

Pig

• An engine for executing programs on top of Hadoop

• It provides a language, Pig Latin, to specify these programs

Pig has two main parts:

– A high level language to express data analysis

– Compiler to generate mapreduce programs (which can run on top of Hadoop)

Pig Latin is the high-level language in which Pig scripts are written. Pig also provides an interactive shell, called Grunt, for executing simple commands. Pig runs on top of Hadoop: it reads the data to be processed from the Hadoop HDFS filesystem and submits the jobs to the Hadoop MapReduce system.

A sample MapReduce job (like a Hello World program) using Pig is given below.

It is assumed that you are on one of the machines that is part of a Hadoop cluster with a NameNode/DataNode as well as a JobTracker/TaskTracker setup.

We will be executing Pig Latin commands using the Grunt shell. Switch to the hadoop user first.

Suppose we have a file 'users' on our local filesystem that contains the data to be processed. First upload it to HDFS, then start Pig in MapReduce mode:

# pig -x mapreduce

This command takes you to the Grunt shell. Pig Latin statements are generally organized in the following manner: a LOAD statement reads data from the file system, a series of statements processes the data, and a STORE statement writes the output to the file system. A DUMP statement displays the output on the screen instead.

grunt> Users = load 'users' as (name, age);

grunt> Fltrd = filter Users by age >= 18 and age <= 25;

grunt> Pages = load 'pages' as (user, url);

grunt> Jnd = join Fltrd by name, Pages by user;

grunt> Grpd = group Jnd by url;

grunt> Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;

grunt> Srtd = order Smmd by clicks desc;

grunt> Top5 = limit Srtd 5;

grunt> store Top5 into 'top5sites';
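For readers new to Pig Latin, the same pipeline can be sketched on a few in-memory rows in plain Python. The sample rows below are hypothetical stand-ins for the 'users' and 'pages' files, and the sketch only mirrors what each statement computes; Pig itself compiles the statements into MapReduce jobs that run on the cluster.

```python
from collections import Counter

# Hypothetical sample rows standing in for the 'users' and 'pages' files.
users = [("alice", 22), ("bob", 31), ("carol", 19)]
pages = [("alice", "a.com"), ("carol", "a.com"), ("carol", "b.com"), ("bob", "b.com")]

# Fltrd = filter Users by age >= 18 and age <= 25;
fltrd = [(name, age) for name, age in users if 18 <= age <= 25]

# Jnd = join Fltrd by name, Pages by user;
jnd = [
    (name, age, user, url)
    for name, age in fltrd
    for user, url in pages
    if name == user
]

# Grpd = group Jnd by url;
# Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;
smmd = Counter(url for _name, _age, _user, url in jnd)

# Srtd = order Smmd by clicks desc;
# Top5 = limit Srtd 5;
top5 = sorted(smmd.items(), key=lambda item: item[1], reverse=True)[:5]
print(top5)
```

With these sample rows, bob is filtered out by age, so only the clicks of alice and carol are counted.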

We can also view the progress of the job through the web interface http://<ipaddress of jobtracker machine>:50030.

Tools like PigPen (an Eclipse plugin) are available that help us create Pig scripts, test them using the example generator and then submit them to a Hadoop cluster.

Another tool is Oozie, a server-based workflow engine specialized in running workflow jobs with actions that run Hadoop Map/Reduce and Pig jobs.

Pig tasks can be modeled as a workflow in Oozie. Workflows are deployed to the Oozie server using a command-line utility. Once deployed, they can be started and manipulated as necessary using the same utility. Once a workflow is started, Oozie runs through each step of the flow. The web console of the Oozie server can be used to monitor the progress of the workflow jobs being managed by the server.

Hive

 

Pig was causing some slowdowns at Facebook, as business intelligence users needed training to get up to speed with it. So the development team decided to write Hive, which has an SQL-like syntax.

Apache Hive is a data warehouse infrastructure built on top of Apache Hadoop. It provides tools for querying and analysis of large data sets stored in Hadoop files. Hive defines a simple SQL-like query language, called HiveQL, that enables users familiar with SQL to query the data. Also it allows custom mappers and reducers to perform more sophisticated analysis that may not be supported by the built-in capabilities of the language.

Some HiveQL queries are given below; they are very similar to SQL.

# show tables;

# describe <tablename>;

# SELECT * FROM <tablename> LIMIT 10;

#  CREATE TABLE table_name

#  ALTER TABLE table_name RENAME TO new_table_name

#  DROP TABLE table_name

NoSQL databases like Cassandra provide support for hadoop. Cassandra supports running Hadoop MapReduce jobs against the Cassandra cluster. With proper cluster configuration, MapReduce jobs can retrieve data from Cassandra and then output results either back into Cassandra, or into a file system.

Cassandra Cluster on AWS EC2 with Cassandra 0.7.x and Ubuntu 10.04

Cassandra is a highly scalable, eventually consistent, distributed, structured key-value store. Cassandra brings together  Dynamo’s fully distributed design  and Bigtable’s ColumnFamily-based data model.

In a cluster, Cassandra nodes exchange information about one another using a mechanism called gossip. For this to work, the nodes in a cluster need to know about one another. Nodes designated as "seeds" are the centre of this communication mechanism. It's customary to pick a small number of relatively stable nodes to serve as your seeds. Make sure that each seed also knows of at least one other seed. Having two seed nodes is the preferred setup.

Let's have a look at how we can bring up a Cassandra cluster with Cassandra 0.7.x on Ubuntu 10.04.

First of all, you have to install Java (the JDK). As that is out of scope for this discussion, please do it on your own, and let's start with Cassandra.

Add the following repositories to your apt sources list

vim /etc/apt/sources.list.d/cassandra.list

[bash]deb http://www.apache.org/dist/cassandra/debian 07x main
deb-src http://www.apache.org/dist/cassandra/debian 07x main[/bash]

Import the following keys and add them to apt-key

[bash]

gpg --keyserver keyserver.ubuntu.com --recv-keys 4BD736A82B5C1B00

gpg --export --armor 4BD736A82B5C1B00 | sudo apt-key add -

gpg --keyserver keyserver.ubuntu.com --recv-keys F758CE318D77295D

gpg --export --armor F758CE318D77295D | sudo apt-key add -

[/bash]

Execute

[bash]apt-get update[/bash]

and make sure that there are no errors while accessing the packages.

Install Cassandra on all the nodes (machines) with which we intend to build the cluster.

[bash]apt-get install cassandra --yes[/bash]

Now edit the configuration file for Cassandra

vim /etc/cassandra/cassandra.yaml

Here I will discuss the important directives that have to be edited for the cluster configuration to take effect.

initial_token:

eg:  initial_token:  136112946768375385385349842972707284582

This parameter determines the position of each node in the Cassandra ring. The initial token for the first seed node should be 0. Here is a simple Python script that helps to calculate the token values.

[bash]

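#!/usr/bin/python

import sys

# raw_input exists only in Python 2; fall back to input on Python 3.
try:
    read_line = raw_input
except NameError:
    read_line = input

if len(sys.argv) > 1:
    num = int(sys.argv[1])
else:
    num = int(read_line("How many nodes are in your cluster? "))

# Space the tokens evenly across the 0 .. 2**127 token range.
for i in range(num):
    print('node %d: %d' % (i, i * (2 ** 127) // num))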

[/bash]

Run without arguments, this script prompts you for the number of nodes in your cluster. It then outputs the initial token for each node.

For example, for a 2-node cluster the tokens will be

node 0: 0

node 1: 85070591730234615865843651857942052864
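To see why evenly spaced tokens matter, the sketch below shows how a token picks its owner on the ring: each node is responsible for the range that ends at its own token, and anything above the highest token wraps around to the first node. The helper names are made up for this illustration, and `key_token` is only a simplified stand-in for the partitioner's hash, not Cassandra's exact implementation.

```python
import bisect
import hashlib

RING_SIZE = 2 ** 127


def initial_tokens(num_nodes):
    """Evenly spaced tokens, the same formula as the script above."""
    return [i * RING_SIZE // num_nodes for i in range(num_nodes)]


def key_token(key):
    """Illustrative hash only: MD5 of the key folded into the token range."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % RING_SIZE


def owner(tokens, token):
    """Index of the first node whose token is >= the given token (wraps to node 0)."""
    index = bisect.bisect_left(tokens, token)
    return index % len(tokens)


tokens = initial_tokens(4)
for key in ("alice", "bob", "carol"):
    print("%s -> node %d" % (key, owner(tokens, key_token(key))))
```

With evenly spaced tokens each node owns an equal share of the token range, so a well-distributed hash spreads the keys roughly evenly over the nodes.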

auto_bootstrap: false

You can set this to false as we are just going to start the cluster for the first time.

seeds:

    - <ip address>

As mentioned earlier, the seeds listed here are the centre of the communication between the nodes.

Here you can give the IPs of the two nodes to which you assigned the first two initial tokens generated by the script above.

Example:

seeds:

    - 192.168.1.10

    - 192.168.1.13

These seed entries should be the same on all nodes of the cluster.

listen_address:

&

rpc_address:

You can leave both empty.
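Since only initial_token differs from node to node, the per-node settings can be generated rather than typed by hand. The following is a rough sketch under a few assumptions: the helper `node_settings` is made up for this example, the third IP address is hypothetical, and the output is only the handful of directives discussed above, to be merged into each node's cassandra.yaml by hand.

```python
RING_SIZE = 2 ** 127


def node_settings(node_ips, seed_count=2):
    """Return {ip: yaml fragment} with the directives discussed above."""
    seeds = node_ips[:seed_count]  # the first nodes double as seeds
    fragments = {}
    for index, ip in enumerate(node_ips):
        token = index * RING_SIZE // len(node_ips)
        lines = [
            "initial_token: %d" % token,
            "auto_bootstrap: false",
            "seeds:",
        ]
        lines += ["    - %s" % seed for seed in seeds]
        lines += ["listen_address:", "rpc_address:"]  # left empty, as above
        fragments[ip] = "\n".join(lines)
    return fragments


# 192.168.1.15 is a hypothetical third node added for illustration.
fragments = node_settings(["192.168.1.10", "192.168.1.13", "192.168.1.15"])
for ip, fragment in fragments.items():
    print("# %s\n%s\n" % (ip, fragment))
```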

Starting Cassandra

To start Cassandra you can use either the init script or the command "cassandra". Here I will use the second option.

As the Cassandra service was started during the installation, some values will already be stored in the /var/lib/cassandra/data directory. So before starting Cassandra, follow these steps.

[bash]

/etc/init.d/cassandra stop

rm -rf /var/lib/cassandra/data

mkdir /var/lib/cassandra/data

[/bash]

After completing these steps on all the nodes, run the following command to start Cassandra on each node, beginning with the first seed node

[bash]# cassandra &[/bash]

After starting Cassandra on all the nodes you can check the cluster status using the following command

[bash]nodetool -h <ip of the node> -p 8080 ring[/bash]

or

[bash]nodetool -h localhost -p 8080 ring[/bash]
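If you want to run the same check against every node, the command can be wrapped in a small script. This is a minimal sketch assuming nodetool is on the PATH of the machine running it; the function names are made up for this example, and the output is returned as raw text rather than parsed, since its layout may differ between versions.

```python
import subprocess


def ring_command(host="localhost", jmx_port=8080):
    """Build the nodetool invocation shown above as an argument list."""
    return ["nodetool", "-h", host, "-p", str(jmx_port), "ring"]


def show_ring(host="localhost", jmx_port=8080):
    """Run nodetool and return its raw output (requires nodetool on PATH)."""
    return subprocess.check_output(ring_command(host, jmx_port)).decode()


# Print the command that would be run for one of the seed nodes.
print(" ".join(ring_command("192.168.1.10")))
```

Calling `show_ring("192.168.1.10")` on a cluster node would then return the ring listing as seen from that host.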