Bulk Insert Java API of Elasticsearch

The bulk API allows one to index and delete several documents in a single request. Here is a sample usage

accounts.json (Json file which needs to be inserted in elasticsearch)

acccounts (download file from the given link)

Sample data present in file:

{“index”:{“_id”:”1″}}
{“account_number”:1,”balance”:39225,”firstname”:”Amber”,
“lastname”:”Duke”,”age”:32,”gender”:”M”,”address”:”880 Holmes Lane”,”employer”:”Pyrami”,”email”:”amberduke@pyrami.com”,”city”:”Brogan”,
“state”:”IL”}

Java File

package com.elasticsearch.index;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class IndexData {

public static void main(String[] args) throws ParseException, IOException {

// configuration setting
Settings settings = Settings.settingsBuilder()
.put(“cluster.name”, “test-cluster”).build();
TransportClient client = TransportClient.builder().settings(settings).build();

String hostname = “<Your-Hostname>”;
int port = 9300;
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostname),port));

// bulk API
BulkRequestBuilder bulkBuilder = client.prepareBulk();

long bulkBuilderLength = 0;
String readLine = “”;
String index = “testindex”;
String type = “testtype”;
String _id = null;

BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream(“accounts.json”))));
JSONParser parser = new JSONParser();

while((readLine = br.readLine()) != null){
// it will skip the metadata line which is supported by bulk insert format
if (readLine.startsWith(“{\”index”)){
continue;
} else {

Object json = parser.parse(readLine);
if(((JSONObject)json).get(“account_number”)!=null){
_id = String.valueOf(((JSONObject)json).get(“account_number”));
System.out.println(_id);
}

//_id is unique field in elasticsearch
JSONObject jsonObject = (JSONObject) json;
bulkBuilder.add(client.prepareIndex(index, type, String.valueOf(_id)).setSource(jsonObject));
bulkBuilderLength++;

try {
if(bulkBuilderLength % 100== 0){
System.out.println(“##### ” + bulkBuilderLength + ” data indexed.”);
BulkResponse bulkRes = bulkBuilder.execute().actionGet();
if(bulkRes.hasFailures()){
System.out.println(“##### Bulk Request failure with error: ” +                                              bulkRes.buildFailureMessage());
}
bulkBuilder = client.prepareBulk();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
br.close();

if(bulkBuilder.numberOfActions() > 0){
System.out.println(“##### ” + bulkBuilderLength + ” data indexed.”);
BulkResponse bulkRes = bulkBuilder.execute().actionGet();
if(bulkRes.hasFailures()){
System.out.println(“##### Bulk Request failure with error: ” + bulkRes.buildFailureMessage());
}
bulkBuilder = client.prepareBulk();
}
}

}

Maven dependencies:

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>

Hit the elasticsearch URL to check the number of records inserted :

curl XGET https://<ip-address&gt;:9200/corsearch/unified/_count

Response : 
{
count: 1000,
_shards:{
total: 3,
successful: 3,
failed: 0
}
}

 

Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s