Showing posts with label classification. Show all posts
Showing posts with label classification. Show all posts

Wednesday, September 20, 2017

Trident ML Indexing Sentiment Classification results with ElasticSearch

Trident ML Indexing Sentiment Classification results with ElasticSearch


In Storm, we may have scenarios in which we like to index results obtained from real-time processing or machine learning into a search and analytics engine. For example, we may have some text streaming in from Kafka messaging system which will go through a TwitterSentimentClassifier (which is available in Trident-ML). After that, we may wish to save the text together with the classified sentiment label as an indexed document in ElasticSearch. This post shows one way to realize such an implementation.

First create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="es-create-index"), the complete source code of the project can be downloaded from the link below:

https://dl.dropboxusercontent.com/u/113201788/storm/es-create-index.tar.gz

Configure pom.xml and libraries to be used

Before we proceed, I would like to discuss how to write a elasticsearch client which is compatible with Trident-ML as we will be using both in this project. Traditionally an elasticsearch java client can be implemented using native code such as this:

import static org.elasticsearch.node.NodeBuiler.*;

Node node=nodeBuilder().clusterName("elasticsearch").node();
Client client=node.getClient();

//TODO HERE: put document, delete document, etc using the client

node.close();

The above code requires the following dependency in pom.xml:

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.4.0</version>
</dependency>

However, as the pom and coding of this library has dependency on lucene-core [version=3.6.0] that is an older version that is not compatible with lucene-analyzers [version=3.6.2] which is currently one of Trident-MLs dependency (The TwitterTokenizer in TwitterSentimentClassifier uses this library). As a result, the elasticsearch library above cannot be used if the TwitterSentimentClassifier in Trident-ML is to be used in this project.

Since the above java code and elastic library cannot be used in this project, the project uses httpclient [version=4.3] from org.apache.httpcomponents in its place to communicate with elasticsearch via RESTful api. The httpclient provides CloseableHttpClient and operators such as HttpGet, HttpPut, HttpDelete,

The dependencies section of the pom for this project looks like the following:

<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>com.github.pmerienne</groupId>
<artifactId>trident-ml</artifactId>
<version>0.0.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3</version>
</dependency>

Spout

Once the pom.xml is properly updated, we can move to implement the code for the Storm spout used in this project. The spout, named TweetCommentSpout, reads tweets from "src/test/resources/twitter-sentiment.csv" and emits them in batch to the Trident topology. the implementation of the spout is shown below:

package com.memeanalytics.es_create_index;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.TextInstance;
import com.github.pmerienne.trident.ml.preprocessing.EnglishTokenizer;
import com.github.pmerienne.trident.ml.preprocessing.TextTokenizer;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class TweetCommentSpout implements IBatchSpout {

private static final long serialVersionUID = 1L;
private static List<List<Object>> data=new ArrayList<List<Object>>();

private int batchIndex;
private int batchSize=10;

static{
BufferedReader br=null;
FileInputStream is=null;
String filePath="src/test/resources/twitter-sentiment.csv";
try {
is=new FileInputStream(filePath);
br=new BufferedReader(new InputStreamReader(is));
String line=null;
while((line=br.readLine())!=null)
{
String[] values = line.split(",");
Integer label=Integer.parseInt(values[0]);
String text=values[1];
// TextTokenizer tokenizer=new EnglishTokenizer();
// List<String> tokens = tokenizer.tokenize(text);
// TextInstance<Integer> instance=new TextInstance<Integer>(label, tokens);
data.add(new Values(text, label));
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}catch(IOException ex)
{
ex.printStackTrace();
}

}
public void open(Map conf, TopologyContext context) {
// TODO Auto-generated method stub

}

public void emitBatch(long batchId, TridentCollector collector) {
// TODO Auto-generated method stub
int maxBatchCount = data.size() / batchSize;
if(maxBatchCount > 0 && batchIndex < maxBatchCount)
{
for(int i=(batchSize * batchIndex); i < data.size() && i < (batchIndex+1) * batchSize; ++i)
{
collector.emit(data.get(i));
}
batchIndex++;
}
}

public void ack(long batchId) {
// TODO Auto-generated method stub

}

public void close() {
// TODO Auto-generated method stub

}

public Map getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

public Fields getOutputFields() {
// TODO Auto-generated method stub
return new Fields("text", "label");
}

}

The tuples emitted by the spout contains two fields: "text" and "label", the label is ignored, we are going to have the Trident-MLs TweetSentimentClassifier predict the sentiment label for us instead.

Trident operation for ElasticSearch

Next we are going to implement a BaseFilter, named CreateESIndex, which is a Trident operation that create an indexed document in ElasticSearch from each tweet text and its predicted sentiment label. The implementation of the Trident operation is shown below:

package com.memeanalytics.es_create_index;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;

import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class CreateESIndex extends BaseFilter{

private static final long serialVersionUID = 1L;
private int esIndex=1;
private String wsUrl="http://127.0.0.1:9200";
private String indexName="twittersentiment"; //must be lowercase
private String typeName="trident";
private CloseableHttpClient client;
private String lastIndexedDocumentIdQueryJson="{"query": { "match_all": {}}, "size": 1,"+
""sort": ["+
"{"+
""_timestamp": {"+
""order": "desc""+
"}"+
"}"+
"]"+
"}";

public boolean isKeep(TridentTuple tuple) {
// TODO Auto-generated method stub
Boolean prediction =tuple.getBooleanByField("prediction");
String comment=tuple.getStringByField("text");
System.out.println(comment + " >> " + prediction);

if(client != null)
{
HttpPut method=new HttpPut(wsUrl+"/"+indexName+"/"+typeName+"/"+esIndex);

Date currentTime= new Date();
SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat format2 = new SimpleDateFormat("HH:mm:ss");
String dateString = format1.format(currentTime)+"T"+format2.format(currentTime);

CloseableHttpResponse response=null;
try{
String json = "{"text":""+comment+"", "prediction":""+prediction+"", "postTime":""+dateString+""}";
System.out.println(json);
StringEntity params=new StringEntity(json);
params.setContentType(new BasicHeader(HTTP.CONTENT_TYPE, "application/json"));

method.setEntity(params);

method.addHeader("Accept", "application/json");
method.addHeader("Content-type", "application/json");

response = client.execute(method);

HttpEntity entity=response.getEntity();
String responseText=EntityUtils.toString(entity);
System.out.println(responseText);
}catch(IOException ex) {
ex.printStackTrace();
}finally {
method.releaseConnection();
}
esIndex++;
}

return true;
}

@Override
public void prepare(Map conf, TridentOperationContext context) {

client=HttpClients.custom().setRetryHandler(new MyRetryHandler()).build();

CloseableHttpResponse response=null;
HttpDelete method=new HttpDelete(wsUrl+"/"+indexName);
try{
response = client.execute(method);
HttpEntity entity=response.getEntity();
String responseBody=EntityUtils.toString(entity);
System.out.println(responseBody);
}catch(IOException ex)
{
ex.printStackTrace();
}
}

private class MyRetryHandler implements HttpRequestRetryHandler {

public boolean retryRequest(IOException arg0, int arg1, HttpContext arg2) {
// TODO Auto-generated method stub
return false;
}
}

@Override
public void cleanup() {
try {
client.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

In the prepare() method of the CreateESIndex, a RESTful DELETE call is performed to delete all indexed documents under twittersentiment/trident in ElasticSearch. This is to ensure that no data will be under twittersentiment/trident when the bolt is run. Now in its isKeep() method, the tweet text and its associated predicted sentiment label is serialized to a json and sent to elasticsearch via a http PUT call. The CloseableHttpClient object is closed in its cleanup() method.

Trident topology

Now we have the neccessary spout and trident operation, we can define a simple Trident topology which stream tweets-> classified by TwitterSentimentClassifier -> indexed by ElasticSearch. Below is the implementation in the main class:

package com.memeanalytics.es_create_index;

import com.github.pmerienne.trident.ml.nlp.TwitterSentimentClassifier;

import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App
{
public static void main( String[] args )
{
LocalCluster cluster=new LocalCluster();
Config config=new Config();

cluster.submitTopology("TridentWriteToESDemo", config, buildTopology());

try{
Thread.sleep(10000);
}catch(InterruptedException ex)
{

}

cluster.killTopology("TridentWriteToESDemo");
cluster.shutdown();
}

private static StormTopology buildTopology()
{
TridentTopology topology=new TridentTopology();

TweetCommentSpout spout=new TweetCommentSpout();

topology.newStream("classifyAndIndex", spout).each(new Fields("text"), new TwitterSentimentClassifier(), new Fields("prediction")).each(new Fields("text", "prediction"), new CreateESIndex());

return topology.build();
}
}

Once it is completed, run the following command in the project root folder:

> mvn compile exec:java

download file now

Read more »

Wednesday, September 13, 2017

Trident ML Text Classification using KLD

Trident ML Text Classification using KLD


This post shows some very basic example of how to use the Kullback-Leibler Distance text classification algorithm in Trident-ML to process data from Storm Spout.

Firstly create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-text-classifier-kld"). The complete source codes of the project can be downloaded from the link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-text-classifier-kld.tar.gz

For the start we need to configure the pom.xml file in the project.

Configure pom.xml:

Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
<scope>provided</scope>
</dependency>

Next we need to add the strident-ml dependency to the dependencies section (for text classification):

<dependency>
<groupId>com.github.pmerienne</groupId>
<artifactId>trident-ml</artifactId>
<version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for execute the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_text_classifier_kld.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packacging the Maven project to jar for submitting to Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can move to implement the ReuterNewsSpout which is the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_text_classifier_kld;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class ReuterNewsSpout implements IBatchSpout {

private static final long serialVersionUID = 1L;
private List<List<Object>> trainingData=new ArrayList<List<Object>>();
private static Map<Integer, List<Object>> testingData=new HashMap<Integer, List<Object>>();

private int batchSize=10;
private int batchIndex=0;

public ReuterNewsSpout()
{
try{
loadReuterNews();
}catch(FileNotFoundException ex)
{
ex.printStackTrace();
}catch(IOException ex)
{
ex.printStackTrace();
}
}

public static List<List<Object>> getTestingData()
{
List<List<Object>> result=new ArrayList<List<Object>>();
for(Integer topic_index : testingData.keySet())
{
result.add(testingData.get(topic_index));
}

return result;
}

private void loadReuterNews() throws FileNotFoundException, IOException
{
Map<String, Integer> topics=new HashMap<String, Integer>();
String filePath="src/test/resources/reuters.csv";
FileInputStream inputStream=new FileInputStream(filePath);
BufferedReader reader= new BufferedReader(new InputStreamReader(inputStream));
String line;
while((line = reader.readLine())!=null)
{
String topic = line.split(",")[0];
if(!topics.containsKey(topic))
{
topics.put(topic, topics.size());
}
Integer topic_index=topics.get(topic);

int index = line.indexOf(" - ");
if(index==-1) continue;

String text=line.substring(index, line.length()-1);

if(testingData.containsKey(topic_index))
{
List<Object> values=new ArrayList<Object>();
values.add(topic_index);
values.add(text);
trainingData.add(values);
}
else
{
testingData.put(topic_index, new Values(topic_index, text));
}
}
reader.close();
}
public void open(Map conf, TopologyContext context) {
// TODO Auto-generated method stub

}

public void emitBatch(long batchId, TridentCollector collector) {
// TODO Auto-generated method stub

int maxBatchIndex = (trainingData.size() / batchSize);

if(trainingData.size() > batchSize && batchIndex < maxBatchIndex)
{
for(int i=batchIndex * batchSize; i < trainingData.size() && i < (batchIndex+1) * batchSize; ++i)
{
collector.emit(trainingData.get(i));
}


batchIndex++;

//System.out.println("Progress: "+batchIndex +" / "+maxBatchIndex);
}
}

public void ack(long batchId) {
// TODO Auto-generated method stub

}

public void close() {
// TODO Auto-generated method stub

}

public Map getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

public Fields getOutputFields() {
// TODO Auto-generated method stub
return new Fields("label", "text");
}

}


As can be seen above, the ReuterNewsSpout is derived from IBatchSpout, and emits a batch of 10 tuples at one time, each tuple is a new article containing the fields ("label", "text"). The "label" field is integer value (represents the topic of the news article), while "text" field is a string which is text of the news article. the training records are obtained in such a way that the correct prediction learned from the text classification should be predicting the topic of a news article given the text of the news article.

KLD Text Classification in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology which uses the training data to create a class label for each of the data record using KLD classifier algorithm in Trident-ML. This is implemented in the main class shown below:

package com.memeanalytics.trident_text_classifier_kld;

import java.util.List;

import com.github.pmerienne.trident.ml.nlp.ClassifyTextQuery;
import com.github.pmerienne.trident.ml.nlp.KLDClassifier;
import com.github.pmerienne.trident.ml.nlp.TextClassifierUpdater;
import com.github.pmerienne.trident.ml.preprocessing.TextInstanceCreator;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;


public class App
{
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
{
LocalDRPC drpc=new LocalDRPC();

LocalCluster cluster=new LocalCluster();

Config config=new Config();

cluster.submitTopology("KLDDemo", config, buildTopology(drpc));

try{
Thread.sleep(20000);
}catch(InterruptedException ex)
{
ex.printStackTrace();
}

List<List<Object>> testingData = ReuterNewsSpout.getTestingData();

for(int i=0; i < testingData.size(); ++i)
{
List<Object> testingDataRecord=testingData.get(i);
String drpc_args="";
for(Object val : testingDataRecord){
if(drpc_args.equals(""))
{
drpc_args+=val;
}
else
{
drpc_args+=(","+val);
}
}
System.out.println(drpc.execute("predict", drpc_args));
}

cluster.killTopology("KLDDemo");
cluster.shutdown();

drpc.shutdown();
}

private static StormTopology buildTopology(LocalDRPC drpc)
{
ReuterNewsSpout spout=new ReuterNewsSpout();

TridentTopology topology=new TridentTopology();

TridentState classifierModel = topology.newStream("training", spout).each(new Fields("label", "text"), new TextInstanceCreator<Integer>(), new Fields("instance")).partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new TextClassifierUpdater("newsClassifier", new KLDClassifier(9)));

topology.newDRPCStream("predict", drpc).each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance")).stateQuery(classifierModel, new Fields("instance"), new ClassifyTextQuery("newsClassifier"), new Fields("prediction"));
return topology.build();
}
}

package com.memeanalytics.trident_text_classifier_kld;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.TextInstance;
import com.github.pmerienne.trident.ml.preprocessing.EnglishTokenizer;
import com.github.pmerienne.trident.ml.preprocessing.TextTokenizer;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction{

private static final long serialVersionUID = 1L;

public void execute(TridentTuple tuple, TridentCollector collector) {
// TODO Auto-generated method stub
String drpc_args=tuple.getString(0);
String[] args=drpc_args.split(",");
Integer label=Integer.parseInt(args[0]);

String text=args[1];

TextTokenizer textAnalyzer=new EnglishTokenizer();
List<String> tokens=textAnalyzer.tokenize(text);


TextInstance<Integer> instance=new TextInstance<Integer>(label, tokens);

collector.emit(new Values(instance));
}

}

As can be seen above, the Trident topology has a TextInstanceCreator<Integer> trident operation which convert raw ("label", "text") tuple into an TextInstance<Integer> object which can be consumed by TextClassifierUpdater. The TextClassifierUpdater object from Trident-ML updates the underlying classifierModel via KLDClassifier training algorithm.

The DRPCStream allows user to pass in a new testing instance to the classifierModel which will then return a "predict" field, that contains the predicted label of the testing instance. The DRPCArgsToInstance is a BaseFunction operation which converts the arguments passed into the LocalDRPC.execute() into an TextInstance<Integer> (Note you can set the label to null in DRPCArgsToInstance.execute() method as the label will be predicted instead) which can be passed into the ClassifyTextQuery which then uses KLD and classifierModel to determine the predicted label.

Once the coding is completed, we can run the project by navigating to the project root folder and run the following commands:

> .mvn compile exec:java

download file now

Read more »

Friday, August 18, 2017

Trident ML Classification using Perceptron

Trident ML Classification using Perceptron


This post shows some very basic example of how to use the perceptron classification algorithm in Trident-ML to process data from Storm Spout.

Firstly create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-classifier-perceptron"). The complete source codes of the project can be downloaded from the link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-classifier-perceptron.tar.gz

For the start we need to configure the pom.xml file in the project.

Configure pom.xml:
Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
<scope>provided</scope>
</dependency>

Next we need to add the strident-ml dependency to the dependencies section (for perceptron classification):

<dependency>
<groupId>com.github.pmerienne</groupId>
<artifactId>trident-ml</artifactId>
<version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for execute the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_classifier_perceptron.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packacging the Maven project to jar for submitting to Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can move to implement the NANDSpout which is the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_classifier_perceptron;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class NANDSpout implements IBatchSpout {

private int batchSize=10;

public void open(Map conf, TopologyContext context) {
// TODO Auto-generated method stub

}

public void emitBatch(long batchId, TridentCollector collector) {
// TODO Auto-generated method stub
final Random rand=new Random();
for(int i=0; i < batchSize; ++i)
{
boolean x0=rand.nextBoolean();
boolean x1=rand.nextBoolean();
boolean label = !(x0 && x1);
List<Object> values=new ArrayList<Object>();
values.add(label);
values.add(x0 ? 1.0 : 0.0);
values.add(x1 ? 1.0 : 0.0);
//values.add(x0 ? 1.0 + noise(rand) : 0.0 + noise(rand));
//values.add(x1 ? 1.0 + noise(rand) : 0.0 + noise(rand));
collector.emit(values);
}
}

public static double noise(Random rand)
{
return rand.nextDouble()* 0.0001 - 0.00005;
}

public void ack(long batchId) {
// TODO Auto-generated method stub

}

public void close() {
// TODO Auto-generated method stub

}

public Map getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

public Fields getOutputFields() {
// TODO Auto-generated method stub
return new Fields("label", "x0", "x1");
}

}

As can be seen above, the NANDSpout is derived from IBatchSpout, and emits a batch of 10 tuples at one time, each tuple is a training record containing the fields ("label", "x0", "x1"). The label is boolean value, while x0, x1 are double values which are either 1 (true) or 0 (false). the training records are obtained in such a way that the correct prediction should be a NAND gate from the classification.

Perceptron Classification in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology which uses the training data to create a class label for each of the data record using perceptron classifier algorithm in Trident-ML. This is implemented in the main class shown below:

package com.memeanalytics.trident_classifier_perceptron;

import java.util.Random;

import com.github.pmerienne.trident.ml.classification.ClassifierUpdater;
import com.github.pmerienne.trident.ml.classification.ClassifyQuery;
import com.github.pmerienne.trident.ml.classification.PerceptronClassifier;
import com.github.pmerienne.trident.ml.preprocessing.InstanceCreator;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
{
LocalDRPC drpc=new LocalDRPC();

LocalCluster cluster=new LocalCluster();
Config config=new Config();

cluster.submitTopology("PerceptronDemo", config, buildTopology(drpc));

try{
Thread.sleep(10000);
}catch(InterruptedException ex)
{
ex.printStackTrace();
}

for(int i=0; i < 10; ++i)
{
String drpc_args=createDRPCTestingSample();
System.out.println(drpc.execute("predict", drpc_args));
try{
Thread.sleep(1000);
}catch(InterruptedException ex)
{
ex.printStackTrace();
}
}

cluster.killTopology("PerceptronDemo");
cluster.shutdown();

drpc.shutdown();
}

private static String createDRPCTestingSample()
{
String drpc_args="";

final Random rand=new Random();

boolean bit_x0=rand.nextBoolean();
boolean bit_x1=rand.nextBoolean();
boolean label = !(bit_x0 && bit_x1);

double x0=bit_x0 ? 1.0 + NANDSpout.noise(rand) : 0.0 + NANDSpout.noise(rand);
double x1=bit_x1 ? 1.0 + NANDSpout.noise(rand) : 0.0 + NANDSpout.noise(rand);

drpc_args+=label;
drpc_args+=(","+x0);
drpc_args+=(","+x1);

return drpc_args;
}

private static StormTopology buildTopology(LocalDRPC drpc)
{
TridentTopology topology=new TridentTopology();
NANDSpout spout=new NANDSpout();
TridentState classifierModel = topology.newStream("training", spout).shuffle().each(new Fields("label", "x0", "x1"), new InstanceCreator<Boolean>(), new Fields("instance")).partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new ClassifierUpdater<Boolean>("perceptron", new PerceptronClassifier()));

topology.newDRPCStream("predict", drpc).each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance")).stateQuery(classifierModel, new Fields("instance"), new ClassifyQuery<Boolean>("perceptron"), new Fields("predict"));

return topology.build();

}
}
package com.memeanalytics.trident_classifier_perceptron;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.Instance;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction {

private static final long serialVersionUID = 1L;

public void execute(TridentTuple tuple, TridentCollector collector) {
// TODO Auto-generated method stub
String drpc_args=tuple.getString(0);
String[] args=drpc_args.split(",");
boolean label=Boolean.parseBoolean(args[0]);

double[] features=new double[args.length-1];
for(int i=1; i < args.length; ++i)
{
features[i-1]=Double.parseDouble(args[i]);
}

Instance<Boolean> instance=new Instance<Boolean>(label, features);

collector.emit(new Values(instance));
}

}

As can be seen above, the Trident topology has a InstanceCreator<Boolean> trident operation which convert raw ("label", "x0", "x1") tuple into an Instance<Boolean> object which can be consumed by ClassifierUpdater. The ClassifierUpdater object from Trident-ML updates the underlying classifierModel via perceptron training algorithm.

The DRPCStream allows user to pass in a new testing instance to the classifierModel which will then return a "predict" field, that contains the predicted label of the testing instance. The DRPCArgsToInstance is a BaseFunction operation which converts the arguments passed into the LocalDRPC.execute() into an Instance<Boolean> which can be passed into the ClassifyQuery which then uses perceptron and classifierModel to determine the predicted label.

Once the coding is completed, we can run the project by navigating to the project root folder and run the following commands:

> .mvn compile exec:java

download file now

Read more »