From ca2892d60bde05896fbf12a3b000cb91b5396590 Mon Sep 17 00:00:00 2001 From: toyama0919 Date: Fri, 18 Sep 2015 13:43:31 +0900 Subject: [PATCH] fix id setting. So that the column define into _id. --- .../ElasticsearchOutputPlugin.java | 73 +++++++++++-------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPlugin.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPlugin.java index 0b4fb21..4a4ed3c 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPlugin.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPlugin.java @@ -1,9 +1,10 @@ package org.embulk.output.elasticsearch; -import com.google.common.base.Optional; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.inject.Inject; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; @@ -12,38 +13,33 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.embulk.config.TaskReport; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; +import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Column; +import org.embulk.spi.ColumnVisitor; import org.embulk.spi.Exec; import org.embulk.spi.OutputPlugin; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; -import org.embulk.spi.ColumnVisitor; import org.embulk.spi.TransactionalPageOutput; +import org.embulk.spi.type.Types; import org.slf4j.Logger; -import java.io.IOException; -import java.util.Date; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.inject.Inject; public class ElasticsearchOutputPlugin implements OutputPlugin @@ -110,18 +106,6 @@ public ConfigDiff transaction(ConfigSource config, Schema schema, try (Client client = createClient(task)) { } - // check that id is included in the schema or not if the id is not null. - if (task.getId().isPresent()) { - String id = task.getId().get(); - boolean found = false; - for (Column column : schema.getColumns()) { - if (column.equals(id)) { - found = true; - } - } - checkState(found, "id is not included in column names of the Schema."); - } - try { control.run(task.dump()); } catch (Exception e) { @@ -208,7 +192,6 @@ public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int processorIndex) { final PluginTask task = taskSource.loadTask(PluginTask.class); - Client client = createClient(task); BulkProcessor bulkProcessor = newBulkProcessor(task, client); ElasticsearchPageOutput pageOutput = new ElasticsearchPageOutput(task, client, bulkProcessor); @@ -224,6 +207,7 @@ public static class ElasticsearchPageOutput implements TransactionalPageOutput private BulkProcessor bulkProcessor; private PageReader pageReader; + private Column idColumn; private final String index; private final String type; @@ -244,6 +228,7 @@ public ElasticsearchPageOutput(PluginTask task, Client client, BulkProcessor bul void open(final Schema schema) { pageReader = new PageReader(schema); + idColumn = (id == null) ? null : schema.lookupColumn(id); } @Override @@ -342,7 +327,7 @@ public void timestampColumn(Column column) { }); contextBuilder.endObject(); - bulkProcessor.add(newIndexRequest().source(contextBuilder)); + bulkProcessor.add(newIndexRequest(getIdValue(idColumn)).source(contextBuilder)); } catch (IOException e) { Throwables.propagate(e); // TODO error handling @@ -350,9 +335,33 @@ public void timestampColumn(Column column) { } } - private IndexRequest newIndexRequest() + /** + * @param inputColumn + * @return + */ + private String getIdValue(Column inputColumn) { + if (inputColumn == null) return null; + if (pageReader.isNull(inputColumn)) return null; + String idValue = null; + if (Types.STRING.equals(inputColumn.getType())) { + idValue = pageReader.getString(inputColumn); + } else if (Types.BOOLEAN.equals(inputColumn.getType())) { + idValue = pageReader.getBoolean(inputColumn) + ""; + } else if (Types.DOUBLE.equals(inputColumn.getType())) { + idValue = pageReader.getDouble(inputColumn) + ""; + } else if (Types.LONG.equals(inputColumn.getType())) { + idValue = pageReader.getLong(inputColumn) + ""; + } else if (Types.TIMESTAMP.equals(inputColumn.getType())) { + idValue = pageReader.getTimestamp(inputColumn).toString(); + } else { + idValue = null; + } + return idValue; + } + + private IndexRequest newIndexRequest(String idValue) { - return Requests.indexRequest(index).type(type).id(id); + return Requests.indexRequest(index).type(type).id(idValue); } @Override