Month: March 2016

Installing Apache Spark in local mode on Windows 8

In this post I will walk through the process of downloading and running Apache Spark on Windows 8 x64 in local mode on a single computer.

Prerequisites

  1. Java Development Kit (JDK 7 or 8). I installed it at ‘C:\Program Files\Java\jdk1.7.0_67’.
  2. Scala 2.11.7 (optional). I installed it at ‘C:\Program Files (x86)\scala’.
  3. After installation, we need to set the following environment variables (example commands follow this list):
    1. JAVA_HOME, whose value is the JDK path.
      In my case it is ‘C:\Program Files\Java\jdk1.7.0_67’.
      Then append ‘%JAVA_HOME%\bin’ to the PATH environment variable.
    2. SCALA_HOME, whose value is the Scala path.
      In my case it is ‘C:\Program Files (x86)\scala’.
      Then append ‘%SCALA_HOME%\bin’ to the PATH environment variable.
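If you prefer the command line over the System Properties dialog, both variables can be set with the standard Windows setx command (the paths below are the ones from my installation; replace them with yours, and add the \bin entries to PATH through the Environment Variables dialog):

setx JAVA_HOME "C:\Program Files\Java\jdk1.7.0_67"
setx SCALA_HOME "C:\Program Files (x86)\scala"

Open a new Command Prompt afterwards and verify with echo %JAVA_HOME%.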

Downloading and installing Spark

  1. It is easy to follow the instructions on http://spark.apache.org/docs/latest/ and download Spark 1.6.0 (Jan 04 2016) with the “Pre-built for Hadoop 2.6 and later” package type from http://spark.apache.org/downloads.html

[Screenshot: Spark download page with the release and package type selected]

2. Extract the zipped file to D:\Spark.

3. Spark has two shells; they are located in the ‘D:\Spark\bin\’ directory:

       a. Scala shell (D:\Spark\bin\spark-shell.cmd).
       b. Python shell (D:\Spark\bin\pyspark.cmd).
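For example, to start the Scala shell in local mode with two worker threads (the --master flag is optional here, since spark-shell runs in local mode by default):

D:\Spark\bin\spark-shell.cmd --master local[2]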

4. You can run either one of them, and you will see the following exception:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

This issue is caused by the missing winutils.exe file, which Spark needs in order to initialize the Hive context; the Hive context in turn depends on Hadoop, which requires native libraries to work properly on Windows. Unfortunately, this happens even if you are using Spark in local mode without using any of the HDFS features directly.

[Screenshot: the winutils.exe exception in spark-shell]

To resolve this problem, you need to:

a. Download the 64-bit winutils.exe (106 KB).

b. Copy the downloaded winutils.exe into a folder like D:\hadoop\bin (or D:\spark\hadoop\bin).

c. Set the environment variable HADOOP_HOME to point to the above directory, but without \bin. For example:

  • if you copied the winutils.exe to D:\hadoop\bin, set HADOOP_HOME=D:\hadoop
  • if you copied the winutils.exe to D:\spark\hadoop\bin, set HADOOP_HOME=D:\spark\hadoop

d. Double-check that the environment variable HADOOP_HOME is set properly by opening the Command Prompt and running echo %HADOOP_HOME%

e. You will also notice that when starting spark-shell.cmd, Hive will create a C:\tmp\hive folder. If you receive any errors related to the permissions of this folder, use the following commands to set the permissions on it:

  • List current permissions: %HADOOP_HOME%\bin\winutils.exe ls \tmp\hive
  • Set permissions: %HADOOP_HOME%\bin\winutils.exe chmod 777 \tmp\hive
  • List updated permissions: %HADOOP_HOME%\bin\winutils.exe ls \tmp\hive

5. Re-run spark-shell; it should now work as expected.

Text search sample

[Screenshot: the text search program running in spark-shell]
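As a rough sketch of this kind of text search (the file path and search term below are placeholder values I chose for illustration, not the ones from the screenshot), you can run the following in spark-shell:

// Run inside spark-shell, where the SparkContext is already available as sc
val lines = sc.textFile("D:/Spark/README.md")

// Keep only the lines that contain the search term
val matches = lines.filter(line => line.contains("Spark"))

// Print the number of matching lines, then show the first five of them
println("Lines containing 'Spark': " + matches.count())
matches.take(5).foreach(println)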

Hope that helps!

 


Bulk Insert Java API of Elasticsearch

The bulk API allows you to index and delete several documents in a single request. Here is a sample usage.

accounts.json (the JSON file that needs to be inserted into Elasticsearch)

accounts (download the file from the given link)

Sample data present in the file:

{"index":{"_id":"1"}}
{"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL"}

Java File

package com.elasticsearch.index;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class IndexData {

    public static void main(String[] args) throws ParseException, IOException {

        // configuration settings for the transport client
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "test-cluster").build();
        TransportClient client = TransportClient.builder().settings(settings).build();

        String hostname = "<Your-Hostname>";
        int port = 9300;
        client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostname), port));

        // bulk API
        BulkRequestBuilder bulkBuilder = client.prepareBulk();

        long bulkBuilderLength = 0;
        String readLine = "";
        String index = "testindex";
        String type = "testtype";
        String _id = null;

        BufferedReader br = new BufferedReader(new InputStreamReader(
                new DataInputStream(new FileInputStream("accounts.json"))));
        JSONParser parser = new JSONParser();

        while ((readLine = br.readLine()) != null) {
            // skip the action/metadata lines that are part of the bulk file format
            if (readLine.startsWith("{\"index")) {
                continue;
            } else {

                Object json = parser.parse(readLine);
                if (((JSONObject) json).get("account_number") != null) {
                    _id = String.valueOf(((JSONObject) json).get("account_number"));
                    System.out.println(_id);
                }

                // _id is the unique document identifier in Elasticsearch
                JSONObject jsonObject = (JSONObject) json;
                bulkBuilder.add(client.prepareIndex(index, type, String.valueOf(_id)).setSource(jsonObject));
                bulkBuilderLength++;

                try {
                    // flush the bulk request every 100 documents
                    if (bulkBuilderLength % 100 == 0) {
                        System.out.println("##### " + bulkBuilderLength + " data indexed.");
                        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
                        if (bulkRes.hasFailures()) {
                            System.out.println("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
                        }
                        bulkBuilder = client.prepareBulk();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        br.close();

        // index whatever is left over from the last partial batch
        if (bulkBuilder.numberOfActions() > 0) {
            System.out.println("##### " + bulkBuilderLength + " data indexed.");
            BulkResponse bulkRes = bulkBuilder.execute().actionGet();
            if (bulkRes.hasFailures()) {
                System.out.println("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
            }
            bulkBuilder = client.prepareBulk();
        }

        client.close();
    }

}

Maven dependencies:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1</version>
</dependency>

Hit the Elasticsearch URL to check the number of records inserted:

curl -XGET 'http://<ip-address>:9200/testindex/testtype/_count'

Response:

{
  "count" : 1000,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "failed" : 0
  }
}
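To spot-check an individual document, you can also fetch it directly by its _id (id 1 corresponds to the sample record shown earlier):

curl -XGET 'http://<ip-address>:9200/testindex/testtype/1?pretty'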

 

ElasticSearch cluster setup on AWS

Elasticsearch is a distributed search server offering powerful search functionality over schema-free documents in (near) real time. All of this functionality is exposed via a RESTful JSON API. It’s built on top of Apache Lucene and like all great projects it’s open source.

I’ll assume you already have an Amazon AWS account, and can navigate yourself around the AWS console. If not – get started over at aws.amazon.com.

Fire up instances with your favorite AMI. I chose 3 instances of m3.xlarge running the latest Ubuntu, but whatever works for you.

Steps to set up the Elasticsearch cluster:

1. Download a copy of Elasticsearch 1.5.2

wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.5.2.zip

2. Unzip the zipped folder

unzip elasticsearch-1.5.2.zip

3. Install the cloud-aws plugin so you can take advantage of features like automatic node discovery:

– Go to the bin folder of ELASTICSEARCH_HOME and type:

./plugin install elasticsearch/elasticsearch-cloud-aws/2.5.0

– You can check the installed plugins with:

./plugin --list

4. Configure Elasticsearch by editing its config/elasticsearch.yml file:

cluster.name: test-cluster
node.name: "AWS-ES-1"
node.master: true
node.data: true
index.number_of_shards: 3
index.number_of_replicas: 0
plugin.mandatory: cloud-aws
network.host: _ec2_
discovery.type: ec2
discovery.zen.ping.unicast.hosts: ["52.71.255.37", "54.165.19.167", "52.91.6.18"]
cloud.aws.access_key: xxxxxxxxxxxxxxx
cloud.aws.secret_key: xxxxxxxxxxxxxxx

Note:

  • node.name will be different for each node, while cluster.name must be the same on all of them.
  • The values above (shard and replica counts, AWS credentials, host list) should be adjusted to your own setup.
  • The rest of the configuration remains the same on all the nodes.

5. Run elasticsearch

./bin/elasticsearch
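If you want the process to keep running after you disconnect from the instance, you can instead start it as a daemon with the standard Elasticsearch 1.x -d flag:

./bin/elasticsearch -d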

Your node is now running and is accessible (with the default configuration) on port 9200:

curl -XGET 'http://<EC2 instance ip>:9200/_cluster/health?pretty=true'
{
  "cluster_name" : "test-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 3,
  "active_shards" : 3,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100
}

6. Repeat the same steps on the rest of the nodes in your cluster.
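The main value that changes from node to node is node.name; for example, on the second instance the relevant line of elasticsearch.yml would simply become (the name itself is arbitrary):

node.name: "AWS-ES-2"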


That’s all!