Thursday 25 December 2014

Apache Spark: a brief overview

Apache Spark, a PhD research project at UC Berkeley, came into the limelight when it recently broke the record for sorting a petabyte of data.
Various Sorting Records
Databricks



What is Spark?
Apache Spark™ is a fast cluster computing framework and general engine for large-scale data processing.
Spark goals:
Generality: diverse workloads, operators, job sizes.
Latency: low latency.
Fault tolerance.


Spark works with Hadoop, Amazon S3 and Cassandra, and with cluster management tools like YARN and Mesos. Spark does most of its data processing in memory, as opposed to the disk-based processing used in Hadoop.

Spark stack comes bundled with tools like Spark SQL, MLlib, Spark Streaming and GraphX.


  1. Spark SQL: unified access to structured data, compatibility with Apache Hive, and standard connectivity to tools via JDBC and ODBC.
  2. Spark Streaming: for scalable, fault-tolerant streaming applications; Spark can run in both batch and interactive mode.
  3. MLlib: scalable machine learning library.
  4. GraphX: large-scale graph processing framework.
Learn More about Spark


Spark vs Hadoop

Spark can be up to 100x faster than Hadoop.
The speed can be attributed to the fact that Spark keeps intermediate data cached in the local JVM, whereas Hadoop, in the name of fault tolerance, writes the intermediate data to disk, and disk I/O is expensive.
                                                                                                               *Image from Spark



Spark doesn't replace anything in the Hadoop ecosystem; rather, it offers a readable, testable way to write programs, freeing us from painful MapReduce jobs. The MR model is unsuitable for iterative algorithms, and MR jobs are a pain to program too. There are tools such as Hive and Cascading that reduce the effort of writing MR jobs, but internally they still launch MR jobs, so the performance does not improve.

Spark programming model
The main abstraction for computation in Spark is the Resilient (it can recover automatically from failures) Distributed Dataset (RDD).

What is an RDD?
RDD (from the Spark paper):
RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.


Spark transformations apply the same operation to many data items. This gives better fault tolerance, because only the lineage of transformations is logged rather than the actual data; also, if some RDDs are lost, Spark has enough information about how they were derived from other RDDs to recompute them.
RDDs are created through transformations (map, filter, join). Spark builds a Directed Acyclic Graph (DAG) of these transformations. Once the RDDs are defined through transformations, actions can be applied to them.
Actions are operations that return a value (count, collect and save). In Spark an RDD can also be stored on disk by calling persist.
Since it is just an RDD, it can be queried via the SQL interface, fed to ML algorithms, and so on.

In short:
User program =>
Create a Spark context:
sc = new SparkContext
Create distributed datasets called RDDs.
Perform operations on them.
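Below is a minimal PySpark sketch of that flow. It is only an illustration: it assumes a local Spark installation, and the input file name is a placeholder.

from pyspark import SparkContext

sc = SparkContext("local[*]", "rdd-demo")

# Transformations only build the lineage (the DAG); nothing runs yet.
lines = sc.textFile("input.txt")              # placeholder path
words = lines.flatMap(lambda l: l.split())
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

counts.persist()                              # keep the RDD cached

# Actions trigger the actual computation.
print(counts.count())
print(counts.take(5))

sc.stop()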

Inside Spark, the SparkContext acts as the client and master per application.

Block tracker => tracks what is in memory and what is on disk.
Shuffle => handles shuffle operations like groupBy.

The scheduler talks to the workers through the cluster manager.
Each worker contains a Block Manager for block management.
It receives tasks that run in thread pools.
Tasks can talk to HDFS.

Saturday 4 October 2014

Data Analysis: Yahoo Finance Data

What the script does:
=> Downloads historical stock prices for a company from the Yahoo Finance website.
=> Calculates the 10-day moving average.
=> Generates a graph of the data.


a) Necessary imports:
1. Import pandas: pandas is a data analysis library for Python. It has tools to read and write between in-memory data structures and different file formats, and an efficient DataFrame object for data manipulation with good indexing support.
2. Import datetime: provides classes to manipulate date and time objects and convert between formats.
3. Import matplotlib: a Python 2D plotting library, simple to use, that generates graphs and plots with a few lines of code.
4. Import numpy: a scientific computing package for Python with an N-dimensional array object and linear algebra functions.
5. Import urllib: URL handling module for Python.
import pandas as pd
import pandas.io.data
from datetime import timedelta
import datetime as dt
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import matplotlib as mpl
import urllib.request
import numpy
from datetime import datetime
from matplotlib.pyplot import *
import matplotlib.dates as mdates

b) Create a class called Stock
The __init__ function should accept as parameters the company symbol, the lookback period, the window size, and the end date.
def __init__(self, symbol, lookback_period, window_size, end=dt.date.today()):
Suppose your lookback period is 100; then you want prices for the last 100 days.
So to get prices for 100 days, subtract the lookback period from the end date.
But for that you need both values in the same format.
In this example I have used timedelta. timedelta gives you the start date by specifying a number of days relative to a given date: if your end date is today and you want stock prices for the last 100 days, timedelta does the job.

start = end - timedelta(days=lookback_period)
c) Convert date into required format.
start_date = start.isoformat() 
d) Build the required URL for downloading the data.
url = "http://ichart.finance.yahoo.com/table.csv?s={0}".format(symbol)
url += "&a={0}&b={1}&c={2}".format(start_month,start_day,start_year)
url += "&d={0}&e={1}&f={2}".format(end_month,end_day,end_year)
e) Parse data
df = pd.read_csv(urllib.request.urlopen(url))
#get the adj close from csv
saved_column = df['Adj Close']
#get the matching date
y_data = df['Date']
f) Get the moving average
def movingaverage(self, interval, window_size):
    window = numpy.ones(int(window_size)) / float(window_size)
    return numpy.convolve(interval, window, 'same')

Moving average smooths price fluctuations by removing noise: it computes the average over a sliding subset (window) of the full data set.

Moving Average Wiki 
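To see what the convolution above computes, here is a small standalone sketch with a made-up price series (note that in 'same' mode the first and last values are partial averages because of zero padding at the edges):

import numpy

def movingaverage(interval, window_size):
    window = numpy.ones(int(window_size)) / float(window_size)
    return numpy.convolve(interval, window, 'same')

prices = [10, 12, 11, 13, 15, 14, 16]
print(movingaverage(prices, 3))
# the interior entries are 3-day means, e.g. (10 + 12 + 11) / 3 = 11.0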

g) Generate the graph
 x_data = x_points[0:70]
#get the moving average
y_av = self.movingaverage(saved_column,window_size)
#generate graph
figure("Plot of stocks")
x = [dt.datetime.strptime(d,"%Y-%m-%d").date() for d in x_data]
gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
gca().xaxis.set_major_locator(mdates.DayLocator())
plot(x,y_av,'r')
plot(x,close,'g')
gcf().autofmt_xdate()
xlabel("Date")
ylabel("adjusted close")
show()

GIT

Tuesday 23 September 2014

Hadoop a little deeper


Map Reduce :  Follows Master -  Slave Model 
Master :
NameNode
JobTracker 

Slave :
DataNode
Task Tracker


Dynamo / Cassandra => peer to peer

The client is neither master nor slave. It:
=> submits MapReduce jobs,
=> describes how the data is to be processed,
=> retrieves the results.

Hadoop Components  :

 
NameNode:
=> Files are not stored in the NameNode; it only holds filesystem metadata that maps files to blocks.
=> The metadata also contains information such as disk space, last access time and permissions.
=> The NameNode is rack aware: it knows which rack each DataNode is on.
=> The NameNode coordinates access to the DataNodes.



DataNode:
=> Manages the data.
=> Sends heartbeat messages to the NameNode to say it is alive.
=> Communicates with other DataNodes to replicate, move and copy data around.
=> Stores data as blocks.

JobTracker:
=> Manages jobs and resources in Hadoop.
=> Client applications submit MapReduce requests to the JobTracker.
=> Schedules client jobs and allocates tasks to the TaskTrackers.

 
TaskTracker:
=> Slaves deployed on each machine.
=> They follow the instructions of the JobTracker and run the MapReduce tasks.
=> Handles movement of data between the map and reduce phases.


Secondary NameNode:
=> The name is a misnomer.
=> It does housekeeping tasks for HDFS.
=> The NameNode keeps all filesystem metadata in RAM and logs changes to an edit log; the Secondary NameNode periodically (roughly every hour) pulls the fsimage and edit log from the NameNode and merges them into a new file called a checkpoint.
 



 *Image taken from http://www.gigacom.com


Hadoop 1.x => Hadoop 2.x (limitations of 1.x):

1. Horizontal scaling
2. Single point of failure for the NameNode
3. Impossible to run non-MapReduce tools because of the tight coupling of JobTracker + MR
4. Does not support multitenancy
5. JobTracker overburdened because of too much work




1. Horizontal scaling

All metadata is stored in the RAM of the NameNode.
RAM size is limited, so you cannot scale beyond a certain point.
This becomes a bottleneck after about 4,000 nodes.

2. Single point of failure
There is no backup node if the NameNode fails.

3. Impossible to run non-MapReduce tools because of the tight coupling of JobTracker + MR
Only MapReduce processing can be achieved.
Real-time analytics, MPI and graph processing are difficult or impossible.
You cannot do them on HDFS; you have to move the data out of HDFS.
Only batch processing is possible in 1.x.


4. Multitenancy:
Only one type of job at a time; running different kinds of jobs from different applications is not possible.


Extra components in Hadoop 2.x:
1. Resource Manager
2. NameNode High Availability
3. YARN: Yet Another Resource Negotiator


Instead of having a single NameNode there are multiple NameNodes, independent of each other, each adhering to a specific namespace.

Both the JobTracker and the TaskTracker were removed in Hadoop 2.
The JobTracker's work, resource management and job scheduling, was split into the two components below.

New components in Hadoop 2:


1. Resource Manager:
=> Scheduler that allocates cluster resources to the various running applications.
=> Schedules tasks based on application containers.

2. Application Manager:
=> Launches tasks in containers.
=> Restarts the Application Master container on failure.

3. Node Manager:
=> Runs on each node.
=> Follows the orders of the Resource Manager.
=> Responsible for maintaining containers.
=> Manages the resources of a single node.


Other features:

NameNode HA:
Automatic failover and recovery for the NameNode master service.
2 NameNodes, active and passive: when one fails, the other takes control.

Snapshots:
Point-in-time recovery for backups.

Federation:
Generic block storage layer.

Wednesday 10 September 2014

A* (A Star) Algorithm

A* is a pathfinding / graph traversal algorithm, generally used in game programming, that determines the shortest path to a destination using a best-first search approach. It can be seen as an extension of Dijkstra's algorithm that reduces the number of comparisons while still guaranteeing optimality. A* uses a heuristic to decide which node is the most promising to expand next.






Consider a grid with each square numbered. In the figure above, the cross sign on Sq. 14 represents the start point and the cross sign on Sq. 35 represents the destination. The blue bars mark blocked squares.
Each square is considered a node.
Some terminology used with A*:
Node data
H value: heuristic value
G value: movement cost
F value: G value + H value
Parent: pointer back to the previous node

Lists
Open list: nodes still to be visited
Closed list: nodes already visited

Task: shortest path from 14 to 35.
H value: distance of a node from the destination node, in our case from Sq. 35.
Precompute all H values.

If the H value is zero, A* behaves like Dijkstra's algorithm. For Node 17 the H value is 3, as you can go from 17 to 23 to 29, leading to 35.
For Node 25 the H value is 5, as you go from 25 to 26 to 27 to 28 to 29, leading to 35; blocked nodes are also counted when precomputing H values.

The G value is the movement cost from the start node to another node.
Consider a movement cost of 10 for horizontal and vertical moves and 14 for diagonal moves.
Diagonal > horizontal (by the Pythagorean theorem, the diagonal of a unit square is about 1.4 times its side).
So moving from 14 to 8 costs 10, and moving from 14 to 9 (a diagonal move) costs 14.
Each square has a parent.
For Sq. 14's neighbours we need to set the parent, which is done through the open list and the closed list.
Right now no node is on the closed or open list.

Make 14 the parent of all its neighbouring nodes.
Closed list: 14
Open list: 7, 8, 9, 13, 15, 19, 20, 21
Calculate the G cost for all of them: for Node 14 it is 0, for Node 8 it is 10, and for Node 9 (a diagonal move) it is 14.




The numbers in black represent the H value of each node, i.e. its distance to Node 35.
The numbers in blue represent the G value; Node 8 has a G value of 10.
The F value is the sum of the G value and the H value, written in red for each node (for Node 7 it is 17 => 10 + 7).
We need to use the node with the smallest F value.
In our case that is square 15, as it has F value 15, which is the minimum. Node 20 also has value 15 and could be selected instead.
Take Node 15 and put it in the closed list.
Now repeat the process for Node 15.

Now calculate all the values for Node 15.
Closed list: 14, 15
Open list: 7, 8, 9, 13, 19, 20, 21, 10
Node 14 is already on the closed list, so we do nothing with it.
Consider Node 8: we check whether it is faster to reach 8 from 14 directly or via 15.
Looking at Node 15:
the G value of Node 15 is 10, and the cost of the (diagonal) move from 15 to 8 is 14,
so the total movement cost via 15 is 10 + 14 = 24.
If this total is greater than Node 8's current G value, leave its values unchanged;
otherwise (if the G value via Node 15 is less than Node 8's current G value) re-parent Node 8 to Node 15.

if (current.G + cost(current, neighbour) < neighbour.G) {
    neighbour.G = current.G + cost(current, neighbour)
    neighbour.parent = current
}


This process continues until you are next to the destination node.

Closed: 14, 15, 20
Open: 7, 8, 9, 13, 19, 21, 10, 22, 25, 27
Keep going; when a neighbour is the destination, make the destination's parent that node and you are done.
Trace the path back from destination to source:
35 to 29 to 22 to 15 to 14.
Make sure you change a node's parent whenever you find a new G value smaller than the previously computed one.



The green arrows denote the final parents of the nodes. Red is the path from source to destination.

Shortest Path : 14 ->15->22->29->35
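For completeness, here is a minimal Python sketch of the same idea on a generic grid. It is not the exact blog example: the grid size, the blocked cells and the helper names are illustrative assumptions. It uses the 10/14 straight/diagonal costs described above and re-parents a node whenever a cheaper G value is found.

import heapq

MOVES = [(-1, 0, 10), (1, 0, 10), (0, -1, 10), (0, 1, 10),
         (-1, -1, 14), (-1, 1, 14), (1, -1, 14), (1, 1, 14)]

def heuristic(a, b):
    # distance scaled to the straight-move cost of 10 (admissible here)
    return 10 * max(abs(a[0] - b[0]), abs(a[1] - b[1]))

def a_star(start, goal, blocked, rows, cols):
    open_heap = [(heuristic(start, goal), 0, start)]   # (F, G, node)
    parent = {start: None}
    g = {start: 0}
    closed = set()
    while open_heap:
        _, g_cur, cur = heapq.heappop(open_heap)
        if cur == goal:
            path = []
            while cur is not None:          # trace back via the parents
                path.append(cur)
                cur = parent[cur]
            return path[::-1]
        if cur in closed:
            continue
        closed.add(cur)
        for dr, dc, cost in MOVES:
            nxt = (cur[0] + dr, cur[1] + dc)
            if not (0 <= nxt[0] < rows and 0 <= nxt[1] < cols) or nxt in blocked:
                continue
            new_g = g_cur + cost
            if new_g < g.get(nxt, float("inf")):
                g[nxt] = new_g              # cheaper route found: re-parent
                parent[nxt] = cur
                heapq.heappush(open_heap, (new_g + heuristic(nxt, goal), new_g, nxt))
    return None                             # no path

# example: 6x6 grid with two blocked cells
print(a_star((2, 1), (5, 4), blocked={(3, 2), (3, 3)}, rows=6, cols=6))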

   

 
Useful links for the topic 

Wiki
Stanford Amits Page
Video  
Video
Video
Algo
 

Friday 11 July 2014

Hadoop-HDFS Code Analysis-1

I had a hard time understanding the HDFS code, not only because it is big and complex but also because of the lack of documentation available.

1. Create the FileSystem
FileSystem is an abstract class extending Hadoop's Configured class.
Like a normal file system, it defines all the file operations.
Every file system that Hadoop uses indirectly extends the FileSystem class.
Which FileSystem implementation is invoked depends on the Hadoop configuration files.
Suppose we specify hdfs://localhost:9000; this tells Hadoop to use DistributedFileSystem.
This mapping comes from core-default.xml.
file:/// invokes LocalFileSystem.

2. Client invokes server
Assume mkdirs is called; it goes to DistributedFileSystem's mkdirs:

public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, true);
  }

mkdirs() calls mkdirsInternal, which in turn uses the variable dfs, an object of DFSClient.
Apache's description of DFSClient:

DFSClient can connect to a Hadoop Filesystem and perform basic file tasks. It uses the ClientProtocol to communicate with a NameNode daemon, and connects directly to DataNodes to read / write block data. Hadoop DFS users should obtain an instance of DistributedFileSystem, which uses DFSClient to handle filesystem tasks.

The initialize method is called.

All operations on DistributedFileSystem are delegated to DFSClient.

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {

DFSClient method calls are forwarded to the NameNode through its associated variable rpcNamenode.
ClientProtocol provides all the methods through which calls from the client to the server can be invoked.
All method calls on rpcNamenode come down to the Invoker's invoke() method.
This way the calls are turned into server-side method calls.
Eventually they reach the Server class.
The 3 major components of the server are:
Listener, Responder, Handler.


...

Wednesday 4 June 2014

Consensus Protocols : Overview

Consensus problem: mutually agreeing on a particular value.


Here I consider consensus for both transaction commit and normal operations.


Synchronous System :
Bounded Message Delay
No timeouts

Asynchronous : 
Message delay finite but unbounded
Consensus not always possible

Correctness of a consensus protocol:
Agreement: all nodes agree on a value.
Validity: the value agreed on is valid, i.e. it was proposed by one of the participant nodes.

Non-triviality*: there exists an accessible configuration in which the decision is to commit.
Integrity*: each RM decides at most once.

Termination: all nodes eventually terminate.
*For transaction commit

Agreement can be between a majority of nodes.
A database commit protocol that always says abort is not valid.
Termination limits the messages exchanged so that the protocol is useful.

2PC: Two-Phase Commit:

1) Coordinator: contacts all nodes, suggests a value, gathers responses (in the form of votes).
2) If everyone agrees, it contacts all nodes to commit; otherwise it contacts them to abort.




IMP: The consensus here is not on the value proposed, but on whether to accept or reject that value.

Real-life examples:
Transfer money from one account to another.
Move a file between servers.

To check whether the commit happened, look at the log.

Phase 1: Coordinator request
The coordinator sends a REQUEST message to all nodes.
E.g.: delete a file from a directory.

Nodes receive the request and execute the transaction locally.
They write the request, the result, and VOTE_COMMIT or VOTE_ABORT to their local log,
then send VOTE_COMMIT (or VOTE_ABORT) to the coordinator.

If the coordinator receives any VOTE_ABORT it decides to abort, otherwise to commit;
it writes GLOBAL_ABORT/COMMIT to its global log and sends GLOBAL_ABORT/COMMIT to the participant nodes.
Number of messages: 3n, where n is the number of nodes (n to propose the value, n votes, n commit/abort).
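A toy sketch of the decide logic above, in plain Python with in-memory "nodes" (message passing, logging and failures are left out, so this only illustrates the vote-then-decide flow):

# Toy two-phase commit: the coordinator proposes, participants vote,
# and the coordinator decides GLOBAL_COMMIT only if every vote is VOTE_COMMIT.

def participant_vote(node_state, request):
    # Each participant executes the request locally and votes.
    try:
        node_state.append(request)   # stand-in for local execution
        return "VOTE_COMMIT"
    except Exception:
        return "VOTE_ABORT"

def coordinator(participants, request):
    # Phase 1: send the request and gather votes.
    votes = [participant_vote(p, request) for p in participants]
    # Phase 2: decide and broadcast.
    if all(v == "VOTE_COMMIT" for v in votes):
        return "GLOBAL_COMMIT"
    return "GLOBAL_ABORT"

nodes = [[], [], []]                          # three participants with empty local state
print(coordinator(nodes, "delete file x"))    # -> GLOBAL_COMMIT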

Failures:
1) Fail-stop: nodes crash and never recover.
2) Fail-recover: nodes crash and recover after some time.
3) Byzantine: nodes deviate from the protocol specification.
Example: an army of soldiers whose coordinator is a general; they must agree on one plan or fail, but some soldiers are traitors:
All loyal soldiers should agree.
A small number of traitors should not be able to trick the loyal soldiers.

Case 1: The coordinator crashes before any messages are exchanged. No worries, because the protocol never started.
Case 2: Some proposal messages are sent but not all.

Some nodes have received the proposal and started 2PC, and some have not received any proposal. If the coordinator does not recover in time, the nodes that received the proposal keep waiting for something that will never finish.
The protocol is blocked.

One fix is to have another node become coordinator: when a timeout occurs, that node can finish the task the original coordinator started.

It contacts all nodes to find out how they voted,
so all nodes have to keep their decision on persistent storage.

If another participating node fails before the recovery node has completed the protocol, the outcome cannot be recovered:
the recovery node cannot distinguish between all nodes having voted to commit or to abort.
The coordinator is a participant too, and it may fail as well.

The coordinator logs the result in persistent storage.
Advantage: low message complexity.

Link: 
http://the-paper-trail.org/blog/consensus-protocols-two-phase-commit/


2PC transaction commit:

The TM (Transaction Manager) has the following states: init, preparing, committed and aborted.
Let the transaction coordinator be TMi.

Phase 1: Obtaining a decision
Initiate the TM.
The TM asks all participants to prepare to commit the transaction.
The TM adds the record <prepare> to its recovery log and saves it on stable storage,
then sends prepare to all sites at which T executed.

After receiving the message, each RM (Resource Manager) decides whether it can commit the transaction.
If the transaction can be committed, it adds the record <ready> to its log,
flushes all records to stable storage,
and sends a ready message to the TM.
Otherwise it adds <no> to its log and sends an abort message to the TM.

Phase 2: Recording the decision
T is committed only if the TM receives ready messages from all participants; otherwise it is aborted.
The TM adds the decision <commit> or <abort> to its log.
The TM sends a message to each participant informing it of the decision.
Each RM does the same on receiving the message (stores it in its log).

 
2PC suffers from a single point of failure.


Failure of an RM in 2PC transaction commit:
An RM participating in the transaction fails, then recovers and examines its log:
if the log says <commit>, no action is needed;
if the log says <abort>, no action is needed;
if the log says <ready>, the RM must consult the TM for the fate of T: if committed, redo T and write <commit>,
otherwise write <abort>.

If the site contains no record => the RM failed before responding to prepare; the TM aborts the transaction and the RM should undo T.

Failure of the TM:
If the TM fails before completion, the RMs decide the fate of T.
If the log of any RM contains <commit>, then T is committed; otherwise it is aborted.
If the log of some RM does not contain <ready>, no commit decision can have been taken, so <abort>.
If the RMs have <ready> but no decision, the sites must wait for the TM to recover: the blocking problem.

TOTAL MESSAGES: 1 to initiate the TM, 2n in phase 1, n in phase 2; total: 3n + 1.
4 message delays.
n + 1 writes to stable storage: n by the RMs and 1 by the TM.


_____________________________________________________

3PC
The blocking problem of 2PC is removed by an extra phase.
The idea is to break 2PC's second phase into two phases.
First, a prepare-to-commit phase:
the coordinator sends prepare-to-commit to all nodes once it has received yes from all of them; the nodes store it and then send a message back to the coordinator saying that prepare-to-commit was received.
The purpose is to communicate the result of the vote to every node so that the state can be recovered.

Last phase: if the coordinator receives the acknowledgement of prepare-to-commit from all replicas it commits; if delivery is not confirmed and our system is designed to handle f failures, the coordinator can go ahead once it receives f + 1 acknowledgements.
If the coordinator crashes at any time, a recovery node can take over the transaction and query the state from any remaining node.
If any other node crashes, it can read the state from the other nodes.
Problem:
if the network is divided into two partitions, the two sides can reach different results.
3PC works for fail-stop but not for fail-recover.


Example: the coordinator fails before it has received prepared-to-commit replies from all nodes, and a new coordinator takes over. The recovery coordinator interrogates the nodes to learn the decision and completes it. At the same time the original coordinator recovers, realises it has not received replies from everyone, times them out and sends abort to all; these messages get interleaved with the commit messages of the recovery coordinator, resulting in an inconsistent state.
_____________________________________________________________________

PAXOS
One node acts as the proposer and is responsible for initiating the protocol.
The other nodes are acceptors.
Acceptors accept or reject proposals.
Once a majority has accepted, the protocol can terminate.
The proposer sends a prepare request to the acceptors; the acceptors respond; once the acceptors agree, the proposer sends an accept (commit) request to them.

Paxos adds two main improvements over 2PC:
1) an ordering that determines which proposal can be accepted, and
2) a proposal is considered accepted once a majority of acceptors has accepted it.

Every proposal is tagged with a unique sequence number, which is needed to decide the ordering, i.e. which proposal came first.
When a proposal arrives, an acceptor checks the highest proposal number it has seen so far. If the new proposal's number is higher, the acceptor replies promising that it will not accept any proposal with a number lower than the received sequence number.
All proposers use unique sequence numbers.
If two proposals are each agreed on by a majority, at least one acceptor is common to both majorities.
When a new proposal is made, the new majority is guaranteed to contain either an acceptor that saw both previous proposals or two acceptors that saw one each.

Once it receives promises from a majority, the proposer can go ahead and ask the acceptors to commit to a value.

Consider the case where two proposers propose: the first proposal is accepted by a bare majority, and before replying to the commit request one of those acceptors fails, so there is no longer a majority.
Now a new proposer makes a proposal, it is accepted by a majority, and it commits.
The failed acceptor then wakes up and sends its final accept message for the original proposal, so the first proposer commits too. Correctness is violated.
Solution: both proposers must propose the same value; if a proposer learns of a value accepted under a higher number, it adopts that value.
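A minimal single-decree acceptor sketch in Python (in-memory only, no persistence or networking; the names are illustrative). It shows the two rules an acceptor follows: never promise below a number it has already promised, and report any value it has already accepted so the proposer can adopt it:

class Acceptor:
    def __init__(self):
        self.promised_n = -1        # highest proposal number promised
        self.accepted_n = -1        # number of the proposal accepted so far
        self.accepted_value = None

    def prepare(self, n):
        # Phase 1b: promise not to accept anything numbered below n,
        # and report any previously accepted (number, value) pair.
        if n > self.promised_n:
            self.promised_n = n
            return ("promise", self.accepted_n, self.accepted_value)
        return ("reject", self.promised_n, None)

    def accept(self, n, value):
        # Phase 2b: accept only if no higher number has been promised.
        if n >= self.promised_n:
            self.promised_n = n
            self.accepted_n = n
            self.accepted_value = value
            return ("accepted", n)
        return ("reject", self.promised_n)

# A proposer that sees an already accepted value in the promises must
# propose that value (the rule described above); otherwise it picks its own.
acceptors = [Acceptor() for _ in range(3)]
promises = [a.prepare(1) for a in acceptors]
if sum(p[0] == "promise" for p in promises) > len(acceptors) // 2:   # majority promised
    print([a.accept(1, "commit-txn") for a in acceptors])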


Failure:
If f failures are to be tolerated, the number of acceptors must be 2f + 1.
If the proposer fails, another can take over.
If the original recovers and sees a sequence number higher than its own,
it can agree on that proposal.
Issue: if both proposers keep sending proposals numbered one higher than the previous one on every request, they will never reach consensus.

Acceptors need to record the highest proposal number in stable storage;
if that storage crashes, the acceptor cannot take part (that would be Byzantine behaviour).

Efficiency:
1st phase: send f + 1 messages and receive f + 1 replies.
Repeated over the 4 phases: 4f + 4 messages.
Delay until the protocol completes: 4 message delays.
It can be seen that 2f + 1 acceptors are needed for consensus despite the failure of f.
 

Paxos Commit protocol:
The transaction initiator contacts the TM.
The TM sends prepare to the RMs.
Each RM sends a phase 2a message to the acceptors.
The acceptors send phase 2b messages to the TM.
The TM sends commit to the RMs.
TOTAL MESSAGES: 2F(n + 1) + 3n + 1 messages.
5 message delays.
n + 2F + 1 writes to stable storage.

Analysis:
1 message from the initiator to the TM.
n prepare messages from the TM to the RMs.
n commit messages from the TM to the RMs.
n(2F + 1) phase 2a commit messages from the RMs to the acceptors, as each RM sends to every acceptor.

2F + 1 phase 2b messages.

Total: 2f(n + 1) + 3n + 2.

The transaction is initiated by one of the RMs, so one prepare message can be avoided between that RM and the TM.
If the TM is on the same node as an acceptor, the phase 2b message can be avoided.

If each acceptor is on the same node as an RM and the TM is also on an RM,
f + 1 of the phase 2a and f + 1 of the phase 2b messages can be avoided.

Links:
PaperTrail
Paxos Commit 
Paxos Simple 
 
 
  

Thursday 29 May 2014

Understanding Evaluation Metrics



A confusion matrix is a tabular layout that can be used to measure the performance of an algorithm. Each column represents a predicted value and each row represents an actual value.
Let's take the example of testing a person for cancer.
If the person has cancer, the test should come back positive, and if not, negative.
There are 4 possible outcomes if we consider the test as an experiment:

1) The patient has cancer and the test recognizes them as having cancer: TRUE POSITIVE.
2) The patient has cancer but the test comes back negative: FALSE NEGATIVE.
3) The patient is healthy but is diagnosed as having cancer: FALSE POSITIVE.
4) The patient does not have cancer and the test says the same: TRUE NEGATIVE.

Precision (Positive Predictive Value): the proportion of patients the test reported as having cancer who actually had cancer.

PPV = TP / (TP + FP)

Recall (True Positive Rate): the proportion of patients who actually had cancer that the test diagnosed as having cancer.
TPR = TP / P = TP / (TP + FN)
 
Precision and recall are usually inversely related.

Accuracy:
The ratio of correctly classified instances to the total number of instances.
In layman's terms: the number of times the test was right.
ACC = (TP + TN) / (P + N)

where P and N are the total numbers of actual positive and negative cases.


F1: a measure that combines the precision and recall rates by computing the harmonic mean between them. The F-measure does not take true negatives into account.

F1 = 2TP / (2TP + FP + FN)
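A small self-contained check of these formulas, with made-up counts for the cancer-test example:

# made-up confusion-matrix counts
TP, FP, FN, TN = 40, 10, 5, 45

precision = TP / (TP + FP)                   # 0.8
recall    = TP / (TP + FN)                   # ~0.889
accuracy  = (TP + TN) / (TP + FP + FN + TN)  # 0.85
f1        = 2 * TP / (2 * TP + FP + FN)      # ~0.842, the harmonic mean of the two

print(precision, recall, accuracy, f1)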


ROC Curve
The Receiver Operating Characteristic (ROC) curve is a plot that shows how the performance of a binary classifier changes as the decision threshold changes. It plots the true positive rate (true positives divided by total actual positives) against the false positive rate (false positives divided by total actual negatives). Both values range between 0 and 1.


 

Saturday 3 May 2014

Simplifying the map reduce framework

Google MapReduce

What is MapReduce?
MapReduce is a model for processing large amounts of data in parallel. It lets the user handle the computation aspects while it hides the messy details of parallelization, fault tolerance, data distribution and load balancing.

What is the programming model for MapReduce?
Input <key, value> pairs -> output <key, value> pairs.
There are two functions, Map and Reduce.
Map takes an input pair and produces intermediate key/value pairs.
All intermediate values associated with the same intermediate key k are grouped and passed to the Reduce function.

Reduce function: merges these intermediate values, producing a smaller set of values.
 

What are some of the examples?
Word count: count all occurrences of words in a set of documents.
Map function -> emits each word plus an associated count.
Reduce function -> sums together all counts emitted for a word.

Distributed grep: Map emits a line if it matches a specific pattern;
Reduce is the identity function and just copies the data to the output.

Count of URL access frequency:
Map: processes logs of web page requests and outputs <URL, 1>.
Reduce: adds all values for the same URL and emits <URL, total count>.

Reverse web-link graph:
Map: outputs <target, source>
pairs for each link to a target URL found on a source page.
Reduce: collects all source URLs associated with a target.

Distributed sort.
Machine learning.

What are the steps involved in MapReduce?
1. Split the input file into M pieces of 16 MB or 64 MB, and start many copies of the program on the cluster machines.
2. Follow the master/slave model: 1 master and many slaves (workers). The master assigns idle workers a task from the M map and R reduce tasks.
3. A map worker reads its input split, generates key/value pairs from it, and passes them to the Map function.
4. Map produces intermediate <key, value> pairs, which are buffered in memory.
5. The buffered pairs are written to local disk periodically; the local disk is partitioned into R regions by the partitioning function.
6. The locations of these regions are passed back to the master, who forwards them to the reduce workers.
7. When a reduce worker receives notification of a location, it uses RPC to read the buffered data from the map worker's disk, then sorts the data by intermediate key so that identical keys are grouped.
8. The reduce worker iterates over the sorted intermediate keys, and for each unique key it passes the <key, value list> pair to the Reduce function.
9. The output is written to output files.

Master:
Stores the state (idle, in-progress or completed) and the identity of each worker machine.






Why a Combiner function?
A Map function (e.g. word count) produces output in the form <'the', 1> with a word and its count. This output is then sent over the network to the reduce tasks and merged to produce as many output files as there are reduce functions.
A Combiner function does partial merging of the data before it is sent over the network. The Combiner runs on each machine that runs a Map function.
The Combiner significantly speeds up the MapReduce operation by minimizing the number of <k, v> pairs that have to be sent over the network, thereby saving bandwidth. Combiner code is usually similar to the Reducer.
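A single-machine sketch of the word-count example in Python (no Hadoop involved; map, combine and reduce are ordinary functions here, just to show how the <key, value> pairs flow):

from collections import defaultdict

def map_phase(document):
    # emit <word, 1> for every word in the document
    return [(word, 1) for word in document.split()]

def combine_phase(pairs):
    # partial merge on the map side, before anything would cross the network
    partial = defaultdict(int)
    for word, count in pairs:
        partial[word] += count
    return list(partial.items())

def reduce_phase(all_pairs):
    # sum every count emitted for the same word
    totals = defaultdict(int)
    for word, count in all_pairs:
        totals[word] += count
    return dict(totals)

docs = ["the quick brown fox", "the lazy dog and the fox"]
combined = [pair for d in docs for pair in combine_phase(map_phase(d))]
print(reduce_phase(combined))   # {'the': 3, 'fox': 2, ...}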

Saturday 19 April 2014

Extracting Movie Information from IMDB

A Python script to access movie information from IMDB.
You can look at the script in my Git repository:
GIT
Goal: To extract movie information from IMDB.
Process:
1) For data scraping, the best Python library is Beautiful Soup.
2) Since I use Python 3, Mechanize is not available, so the work is a little more difficult.
3) The "urllib2 module not found" error is resolved by this code:
try:
    import urllib.request as urllib2
except ImportError:
    import urllib2


4) BeautifulSoup extracts information from the HTML code.
5) Search for a particular title on IMDB.
6) Most of the time the first result is the one we are searching for.
7) Use web scraping to extract the movie information.
8) The rating, star cast, critic rating and all the other information is extracted.

rating = soup1.findAll('span',{'itemprop':'ratingValue'})[0].string

extracts the rating from the IMDB page. Here itemprop is an HTML attribute, and you are extracting the first element whose itemprop attribute is "ratingValue".
Beautiful Soup API calls useful for scraping:
string: returns the value of the tag.
text: returns the text associated with h1..h4, div and similar tags.
Another useful trick is extracting a link using

mainlink=soup.findAll('td',{'class':'result_text'})[0].a['href']
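Putting the two fragments together, a minimal sketch might look like this. The search URL, the title-page URL pattern and the selectors are assumptions based on the attributes used above, and IMDB's HTML changes over time, so treat it purely as an illustration of the BeautifulSoup calls:

from bs4 import BeautifulSoup

try:
    import urllib.request as urllib2   # Python 3
except ImportError:
    import urllib2                     # Python 2

def movie_rating(title):
    # 1) search IMDB for the title and take the first result's link
    search_url = "http://www.imdb.com/find?q=" + title.replace(" ", "+")
    soup = BeautifulSoup(urllib2.urlopen(search_url).read(), "html.parser")
    mainlink = soup.findAll('td', {'class': 'result_text'})[0].a['href']

    # 2) open that title page and read the rating from the itemprop markup
    page = urllib2.urlopen("http://www.imdb.com" + mainlink).read()
    soup1 = BeautifulSoup(page, "html.parser")
    return soup1.findAll('span', {'itemprop': 'ratingValue'})[0].string

print(movie_rating("Inception"))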


Wednesday 26 March 2014

Python and NLP

I recently worked on a project titled "Recommending Similar Defects on Apache Hadoop". It is a recommendation system that finds similar defects and then predicts the effort estimate for each defect.
Steps:
1) Extract XML/Excel data from the Apache Hadoop issue tracker:
https://issues.apache.org/jira/browse/HADOOP
2) Convert the extracted data into CSV for persistent storage.
3) Extract the required columns.


Python code:

import csv
import re

def col_selector(table, column_key):
    return [row[column_key] for row in table]

with open("Data/next.csv","r") as csvfile:
    reader = csv.DictReader(csvfile, delimiter=",")
    table = [row for row in reader]
    foo_col = col_selector(table, "Summary")
    bar_col = col_selector(table, "Description")

The above example extracts two columns from the Apache Hadoop issue tracker CSV file. Your program must import the Python csv module:
http://docs.python.org/2/library/csv.html

4) From these columns we will generate a set of words specific to Hadoop.
We will apply various NLP steps to generate words from the summary and description.

5) There are 5 steps in natural language processing:
1. Tokenizing
2. Stemming
3. Stop Word Removal
4. Vector Space Representation
5. Similarity Measures

Step 1: Tokenizing:
The tokenization process involves breaking a stream of characters of text up into words, phrases, symbols or other meaningful elements called tokens. Before indexing, we filter out all common English stopwords. I obtained a list of around 800 stopwords online:
K. Bouge. Stop Word List.
https://sites.google.com/site/kevinbouge/stopwords-lists
The list contained articles, pronouns, verbs etc. I filtered out all those words from the extracted text. After reviewing the list, we felt a stopwords list for a Hadoop database has to be built separately, as numbers and symbols also have to be filtered out.


Step 2: Stemming
Stemming is used to try to identify a base form for each word in the text. Words that carry the same information can be used in different grammatical forms, depending on how the creator of the report wrote it down. This phase removes affixes and other components from each token that resulted from tokenization, so that only the stem of each word remains. For stemming, we used a Python PorterStemmer implementation and passed it the stream of extracted words. Words like caller, called and calling, whose stem is call, were filtered, and only the single word call was kept in the final list. I filtered around 1200 words this way.


Step 3:
Stop word removal and synonym handling
Synonyms are removed and replaced by one common word. I used NLTK's WordNet to perform this.
Second phase: spell checking, where the list is compared with a list of misspelled words.

Step 4:
Vector space representation
After the first 3 steps I had around 5500 words. These words were used to identify tags. Each defect, with its tags, was then represented in a vector space model, using the general method provided by scikit-learn.

Step 5: Similarity measure
Calculated the cosine similarity between each pair of defect vectors.
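A condensed sketch of steps 4 and 5 using scikit-learn and NLTK (the defect texts are placeholders, and the real pipeline also applied the custom Hadoop stopword list and spell checking described above):

from nltk.stem import PorterStemmer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

defects = [
    "Namenode crashes when block report is delayed",
    "Datanode block report delay causes namenode crash",
    "Documentation typo in streaming example",
]

stemmer = PorterStemmer()

def stem_tokens(text):
    # tokenize + stem (stopword removal would happen in the earlier steps)
    return [stemmer.stem(tok) for tok in text.lower().split()]

# Step 4: vector space representation
vectorizer = TfidfVectorizer(tokenizer=stem_tokens)
vectors = vectorizer.fit_transform(defects)

# Step 5: cosine similarity between defect vectors
print(cosine_similarity(vectors[0], vectors))   # first defect vs. all defects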
 

Sunday 2 March 2014

The Curious Case of Leonardo Di Caprio's Oscar: Sentiment Analysis

I was very excited yesterday night for the Oscars, as Leonardo Di Caprio was among the Best Actor nominees. Though he has done some brilliant movies in the past and he is a great actor, I was not confident this movie would get him the award, as I felt he has done much better work in other films. Still, fingers were crossed for a brilliant actor like Leonardo. I was curious to see how Twitter was doing with the Oscars, so I did sentiment analysis on tweets to see what people's point of view on Leonardo was just before the ceremony: how many of them wanted him to win, and how many felt that Leonardo was not the right person for the Oscar and some other actor should win it.
Sentiment Analysis on tweets gave me interesting results.
Steps:
1. Extract tweets with hashtags about Leonardo
2. Generate a CSV of tweets
3. Extract the required information
4. Natural language processing: tokenizing, stemming, etc.
5. Classify them as positive, negative or neutral
6. Apply Naive Bayes.

Positive Tweets
RT @FindingSquishy_: If #Leonardo Di Caprio wins an Oscar tonight, Tumblr will probably break
if #Leonardo di Caprio doesn't win an oscar I am going to scream
RT @Mohammed_Meho: #Leonardo Di Caprio better win an Oscar tonight.
RT @Miralemcc: #The Wolf of the Wall Street and# Leonardo di Caprio for #Oscars2014



Negative Tweet
#Leonardo Di Caprio doesn't deserve and never has deserved an oscar. Deal with it

.............................................

Step 1: Scraping tweets for the required tag. This can be done using the Twitter API, or you can use online sites to search for tweets and extract the search results from them. There are many sites that can give you direct sentiment analysis results, like the NCSU project:
http://www.csc.ncsu.edu/faculty/healey/tweet_viz/tweet_app/
Stanford project:
Sentiment140
http://www.sentiment140.com/
But I chose TwitterSeeker, which just gives you search results without sentiments, and I wanted to do the sentiment analysis myself.
TwitterSeeker generates an Excel sheet with all the tweet information.

You can filter it by selecting the language as English; in the image I applied no filter.
The generated Excel file will have the user name, time of posting, the tweet and many other optional columns. In the current case I am only concerned with the tweet.

STEP 2: Generate a CSV of tweets.
As input to the ML algorithms I used a CSV file. CSV is the Comma Separated Values format, in which each column is separated by a delimiter. After getting the Excel file from TwitterSeeker I converted it into a CSV file.

STEP 3: Extract required information:
This is the step where your knowledge of data mining comes into use. In the present case I am only concerned with one column, the tweet. A tweet is generally of the form
Username @User #tag Link
which can vary quite randomly.
I removed all the unnecessary parts from it: all usernames, tags and links.


#updated every day.

STEP 4: Tag generation.
Generate tags for all tweets.

STEP 5: Sentiment analysis:
For the sentiment analysis I am using the ANEW dataset from the University of Florida.
Our dictionary dataset is composed of 3 main components:

Valence, which is the pleasantness of the stimulus
Arousal, the intensity of the emotion provoked by the stimulus
Dominance, the degree of control exerted by the stimulus


We decided to use the arousal ratings to estimate the polarity of a tweet. The following steps were followed:

  • Generate tags for each tweet.
  • For each word in the tweet that exists in the ANEW dictionary, extract the mean and standard deviation of valence, arousal and dominance.
  • Count the number of tags for each tweet. If there are zero or one, ignore the tweet because there is too little information to estimate the sentiment.
  • To calculate the overall mean and standard deviation of each tweet, numerically average the means and standard deviations of its n tags (a small sketch of this step follows below).
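A minimal sketch of that scoring step (the ANEW lookup here is a tiny made-up dictionary; the real dataset provides the per-word means and standard deviations):

# hypothetical mini ANEW-style lookup: word -> (valence mean, arousal mean)
anew = {"win": (7.9, 6.0), "oscar": (7.0, 5.5), "scream": (3.9, 7.0)}

def tweet_score(tags):
    known = [anew[t] for t in tags if t in anew]
    if len(known) <= 1:                 # zero or one known tag: too little information
        return None
    valence = sum(v for v, _ in known) / len(known)
    arousal = sum(a for _, a in known) / len(known)
    return valence, arousal

print(tweet_score(["leonardo", "win", "oscar"]))   # averages over the known words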
STEP 6: From the list of tweets that I collected, this is what I got.