Category: Elasticsearch

Bulk Insert Java API of Elasticsearch

The bulk API allows you to index and delete several documents in a single request. Here is a sample usage.

accounts.json (JSON file which needs to be inserted into Elasticsearch)

accounts (download the file from the given link)

Sample data present in file:

{"index":{"_id":"1"}}
{"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL"}
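Each document in the bulk file is a pair of lines: an action/metadata line (here an index action carrying the _id) followed by the document source itself. As a minimal sketch of how that metadata line is shaped, the _id can be pulled out with a naive string scan (a hypothetical helper assuming exactly the quoting shown above; real code should use a JSON parser such as json-simple):

```java
public class BulkLineParser {
    // Extract the _id value from a bulk action/metadata line such as
    // {"index":{"_id":"1"}}. This naive scan assumes the simple quoting
    // shown in the sample above; production code should parse real JSON.
    public static String extractId(String metadataLine) {
        String marker = "\"_id\":\"";
        int start = metadataLine.indexOf(marker);
        if (start < 0) {
            return null; // not an action line, or no _id present
        }
        start += marker.length();
        int end = metadataLine.indexOf('"', start);
        return end < 0 ? null : metadataLine.substring(start, end);
    }

    public static void main(String[] args) {
        System.out.println(extractId("{\"index\":{\"_id\":\"1\"}}")); // prints 1
    }
}
```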

Java File

package com.elasticsearch.index;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class IndexData {

    public static void main(String[] args) throws ParseException, IOException {

        // configuration settings
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "test-cluster").build();
        TransportClient client = TransportClient.builder().settings(settings).build();

        String hostname = "<Your-Hostname>";
        int port = 9300;
        client.addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName(hostname), port));

        // bulk API
        BulkRequestBuilder bulkBuilder = client.prepareBulk();

        long bulkBuilderLength = 0;
        String readLine;
        String index = "testindex";
        String type = "testtype";
        String _id = null;

        BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream("accounts.json")));
        JSONParser parser = new JSONParser();

        while ((readLine = br.readLine()) != null) {
            // skip the action/metadata lines of the bulk file format
            if (readLine.startsWith("{\"index")) {
                continue;
            } else {
                JSONObject jsonObject = (JSONObject) parser.parse(readLine);

                // _id must be unique in Elasticsearch; use account_number as _id
                if (jsonObject.get("account_number") != null) {
                    _id = String.valueOf(jsonObject.get("account_number"));
                    System.out.println(_id);
                }

                bulkBuilder.add(client.prepareIndex(index, type, _id).setSource(jsonObject));
                bulkBuilderLength++;

                try {
                    // flush the bulk request every 100 documents
                    if (bulkBuilderLength % 100 == 0) {
                        System.out.println("##### " + bulkBuilderLength + " data indexed.");
                        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
                        if (bulkRes.hasFailures()) {
                            System.out.println("##### Bulk Request failure with error: "
                                    + bulkRes.buildFailureMessage());
                        }
                        bulkBuilder = client.prepareBulk();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        br.close();

        // index whatever is left in the last partial batch
        if (bulkBuilder.numberOfActions() > 0) {
            System.out.println("##### " + bulkBuilderLength + " data indexed.");
            BulkResponse bulkRes = bulkBuilder.execute().actionGet();
            if (bulkRes.hasFailures()) {
                System.out.println("##### Bulk Request failure with error: "
                        + bulkRes.buildFailureMessage());
            }
        }
        client.close();
    }
}
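The flush-every-100-actions pattern in the loop above can be isolated into a small helper (hypothetical, not part of the Elasticsearch API), which makes the batching logic easy to reason about without a running cluster:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper isolating the flush-every-N batching pattern from
// the indexing loop above. Instead of executing a BulkRequestBuilder,
// flush() just counts, so the logic can run without a cluster.
public class BulkBatcher {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private int flushes = 0;

    public BulkBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Queue one action; flush automatically once the batch is full,
    // mirroring the (bulkBuilderLength % 100 == 0) check above.
    public void add(String action) {
        pending.add(action);
        if (pending.size() == batchSize) {
            flush();
        }
    }

    // In IndexData this is bulkBuilder.execute().actionGet() followed by
    // a fresh client.prepareBulk(); here we only count completed flushes.
    public void flush() {
        if (!pending.isEmpty()) {
            flushes++;
            pending.clear();
        }
    }

    public int flushCount() { return flushes; }
    public int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        BulkBatcher batcher = new BulkBatcher(100);
        for (int i = 0; i < 250; i++) {
            batcher.add("doc-" + i);
        }
        batcher.flush(); // final partial batch, like the numberOfActions() > 0 check
        System.out.println(batcher.flushCount()); // prints 3
    }
}
```

The final explicit flush corresponds to the trailing `numberOfActions() > 0` block in the program above, which indexes the leftover documents that never filled a complete batch of 100.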

Maven dependencies:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1</version>
</dependency>

Hit the Elasticsearch URL to check the number of records inserted:

curl -XGET http://<ip-address>:9200/testindex/testtype/_count

Response:
{
  "count": 1000,
  "_shards": {
    "total": 3,
    "successful": 3,
    "failed": 0
  }
}


Elasticsearch cluster setup on AWS

Elasticsearch is a distributed search server offering powerful search functionality over schema-free documents in (near) real time. All of this functionality is exposed via a RESTful JSON API. It’s built on top of Apache Lucene and like all great projects it’s open source.

I’ll assume you already have an Amazon AWS account, and can navigate yourself around the AWS console. If not – get started over at aws.amazon.com.

Fire up instances with your favorite AMI. I chose 3 instances of m3.xlarge running the latest Ubuntu, but use whatever works for you.

Steps to set up an Elasticsearch cluster:

1. Download a copy of Elasticsearch 1.5.2

wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.5.2.zip

2. Unzip the zipped folder

unzip elasticsearch-1.5.2.zip

3. Install the cloud-aws plugin so you can take advantage of features like automatic node discovery.

– Go to the bin folder of ELASTICSEARCH_HOME and type:

./plugin install elasticsearch/elasticsearch-cloud-aws/2.5.0

– You can check the installed plugins with:

./plugin --list

4. Configure Elasticsearch by editing its elasticsearch.yml file:

cluster.name: test-cluster
node.name: "AWS-ES-1"
node.master: true
node.data: true
index.number_of_shards: 3
index.number_of_replicas: 0
plugin.mandatory: cloud-aws
network.host: _ec2_
discovery.type: ec2
discovery.zen.ping.unicast.hosts: ["52.71.255.37", "54.165.19.167", "52.91.6.18"]
cloud.aws.access_key: xxxxxxxxxxxxxxx
cloud.aws.secret_key: xxxxxxxxxxxxxxx

Note:

  • Values such as cluster.name, the shard/replica counts, and the AWS access and secret keys can be changed as per requirement.
  • node.name will be different for each node, while cluster.name should be the same across the cluster.
  • The remaining settings (plugin.mandatory, network.host, discovery.type, discovery.zen.ping.unicast.hosts) will remain the same for all the nodes.
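For example, a second node's elasticsearch.yml would differ from the one above only in node.name (the IPs and keys shown are placeholders, as above):

```yaml
cluster.name: test-cluster        # same on every node
node.name: "AWS-ES-2"             # unique per node
node.master: true
node.data: true
index.number_of_shards: 3
index.number_of_replicas: 0
plugin.mandatory: cloud-aws
network.host: _ec2_
discovery.type: ec2
discovery.zen.ping.unicast.hosts: ["52.71.255.37", "54.165.19.167", "52.91.6.18"]
cloud.aws.access_key: xxxxxxxxxxxxxxx
cloud.aws.secret_key: xxxxxxxxxxxxxxx
```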

5. Run Elasticsearch

./bin/elasticsearch

Your node is now running and is accessible (with the default configuration) on port 9200:

curl -XGET 'http://<EC2 instance ip>:9200/_cluster/health?pretty=true'
{
  "cluster_name" : "test-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 3,
  "active_shards" : 3,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100
}

6. Repeat the same steps for the rest of the nodes in your cluster. Once all nodes are up, the _cluster/health check above should report "number_of_nodes" : 3.


That's all!