Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…elasticsearch into toyama0919-master
  • Loading branch information
sakama committed Nov 30, 2015
2 parents c8f9ad6 + ca2892d commit f3c06f1
Showing 1 changed file with 55 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.embulk.output.elasticsearch;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -16,18 +18,30 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.embulk.config.*;
import org.embulk.spi.*;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.Exec;
import org.embulk.spi.OutputPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.type.Types;
import org.slf4j.Logger;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.inject.Inject;

public class ElasticsearchOutputPlugin
implements OutputPlugin
Expand Down Expand Up @@ -94,18 +108,6 @@ public ConfigDiff transaction(ConfigSource config, Schema schema,
try (Client client = createClient(task)) {
}

// check that id is included in the schema or not if the id is not null.
if (task.getId().isPresent()) {
String id = task.getId().get();
boolean found = false;
for (Column column : schema.getColumns()) {
if (column.equals(id)) {
found = true;
}
}
checkState(found, "id is not included in column names of the Schema.");
}

try {
control.run(task.dump());
} catch (Exception e) {
Expand Down Expand Up @@ -195,7 +197,6 @@ public TransactionalPageOutput open(TaskSource taskSource, Schema schema,
int processorIndex)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);

Client client = createClient(task);
BulkProcessor bulkProcessor = newBulkProcessor(task, client);
ElasticsearchPageOutput pageOutput = new ElasticsearchPageOutput(task, client, bulkProcessor);
Expand All @@ -211,6 +212,7 @@ public static class ElasticsearchPageOutput implements TransactionalPageOutput
private BulkProcessor bulkProcessor;

private PageReader pageReader;
private Column idColumn;

private final String index;
private final String type;
Expand All @@ -231,6 +233,7 @@ public ElasticsearchPageOutput(PluginTask task, Client client, BulkProcessor bul
void open(final Schema schema)
{
pageReader = new PageReader(schema);
idColumn = (id == null) ? null : schema.lookupColumn(id);
}

@Override
Expand Down Expand Up @@ -329,17 +332,41 @@ public void timestampColumn(Column column) {
});

contextBuilder.endObject();
bulkProcessor.add(newIndexRequest().source(contextBuilder));
bulkProcessor.add(newIndexRequest(getIdValue(idColumn)).source(contextBuilder));

} catch (IOException e) {
Throwables.propagate(e); // TODO error handling
}
}
}

private IndexRequest newIndexRequest()
/**
* @param inputColumn
* @return
*/
private String getIdValue(Column inputColumn) {
if (inputColumn == null) return null;
if (pageReader.isNull(inputColumn)) return null;
String idValue = null;
if (Types.STRING.equals(inputColumn.getType())) {
idValue = pageReader.getString(inputColumn);
} else if (Types.BOOLEAN.equals(inputColumn.getType())) {
idValue = pageReader.getBoolean(inputColumn) + "";
} else if (Types.DOUBLE.equals(inputColumn.getType())) {
idValue = pageReader.getDouble(inputColumn) + "";
} else if (Types.LONG.equals(inputColumn.getType())) {
idValue = pageReader.getLong(inputColumn) + "";
} else if (Types.TIMESTAMP.equals(inputColumn.getType())) {
idValue = pageReader.getTimestamp(inputColumn).toString();
} else {
idValue = null;
}
return idValue;
}

private IndexRequest newIndexRequest(String idValue)
{
return Requests.indexRequest(index).type(type).id(id);
return Requests.indexRequest(index).type(type).id(idValue);
}

@Override
Expand Down

0 comments on commit f3c06f1

Please sign in to comment.