Support for JSONConverter in sink connector #71

Merged: 15 commits, May 10, 2022
1 change: 1 addition & 0 deletions CHANGES.md
@@ -1,4 +1,5 @@
# UNRELEASED
- [BREAKING CHANGE] Converter support for sink connector has changed. See README.md for details.
- [BREAKING CHANGE] Configuration parameters have changed for url, database, authentication, and last change sequence. See README.md for details.
- [UPGRADED] Connector now supports all authentication types via the `cloudant.auth.type` configuration parameter. When using an authentication type of "iam", the API key is configured via the `cloudant.apikey` configuration parameter.
- [UPGRADED] Upgraded Gradle distribution from 4.5.1 to 7.4
22 changes: 14 additions & 8 deletions README.md
@@ -29,16 +29,22 @@ the [Kafka Connector documentation](http://docs.confluent.io/3.0.1/connect/userg

1. `bootstrap.servers`
2. If using a standalone worker `offset.storage.file.filename`.
3. The following configuration when using the Cloudant connector as either a source or a sink:

Parameter | Value
---:|:---
key.converter|org.apache.kafka.connect.json.JsonConverter
value.converter|org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable|true
value.converter.schemas.enable|true
### Converter configuration

Assume these settings are in the worker configuration file (`connect-standalone.properties` or `connect-distributed.properties`). The Kafka distribution defaults are usually as follows:
```
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
```

#### Converter configuration: sink connector
Contributor Author:
The source connector converter needs covering when we do the PR for that work; my intention is that we support JsonConverter on both source and sink, which simplifies things (as mentioned above, it's the default anyway, so there's no need to explicitly set it in config).


For the sink connector, Kafka keys are currently ignored; the key converter settings are therefore not relevant.

For the sink connector, we assume that the values in Kafka are serialized JSON objects, so `JsonConverter` is supported. If your values contain a schema (`{"schema": {...}, "payload": {...}}`), set `value.converter.schemas.enable=true`; otherwise set `value.converter.schemas.enable=false`. Any other converter that converts the message values into `org.apache.kafka.connect.data.Struct` or `java.util.Map` types should also work. Note, however, that the subsequent serialization of `Map` or `Struct` values to JSON documents in the sink may not match expectations if a schema has not been provided.
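
As an illustration of the two value formats (a minimal sketch, not part of the connector; it uses only the standard `org.apache.kafka.connect.json.JsonConverter` API, with made-up topic and field names):

```
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class ConverterSketch {
    public static void main(String[] args) {
        // schemas.enable=false: values are plain JSON objects, no envelope
        JsonConverter schemaless = new JsonConverter();
        schemaless.configure(Collections.singletonMap("schemas.enable", "false"), false /* isKey */);
        byte[] plain = "{\"_id\":\"doc1\",\"count\":1}".getBytes(StandardCharsets.UTF_8);
        SchemaAndValue withoutSchema = schemaless.toConnectData("example-topic", plain);
        System.out.println(withoutSchema.value()); // a java.util.Map; schema() is null

        // schemas.enable=true: values must carry the {"schema": ..., "payload": ...} envelope
        JsonConverter enveloped = new JsonConverter();
        enveloped.configure(Collections.singletonMap("schemas.enable", "true"), false);
        byte[] envelope = ("{\"schema\":{\"type\":\"struct\","
                + "\"fields\":[{\"field\":\"_id\",\"type\":\"string\"}]},"
                + "\"payload\":{\"_id\":\"doc1\"}}").getBytes(StandardCharsets.UTF_8);
        SchemaAndValue withSchema = enveloped.toConnectData("example-topic", envelope);
        System.out.println(withSchema.value());   // an org.apache.kafka.connect.data.Struct
    }
}
```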

### Authentication

2 changes: 2 additions & 0 deletions build.gradle
@@ -29,6 +29,8 @@ dependencies {
testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '1.6.4'
testImplementation group: 'org.easymock', name: 'easymock', version: '3.4'
testImplementation group: 'com.carrotsearch', name: 'junit-benchmarks', version: '0.7.2'
// for logging output when running tests
testRuntimeOnly group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.36'
}

// Java versions
src/main/java/com/ibm/cloudant/kafka/common/utils/JavaCloudantUtil.java
@@ -15,30 +15,24 @@

import com.ibm.cloud.cloudant.internal.ServiceFactory;
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.BulkDocs;
import com.ibm.cloud.cloudant.v1.model.Document;
import com.ibm.cloud.cloudant.v1.model.DocumentResult;
import com.ibm.cloud.cloudant.v1.model.PostBulkDocsOptions;
import com.ibm.cloud.cloudant.v1.model.PutDatabaseOptions;
import com.ibm.cloud.cloudant.v1.model.*;
import com.ibm.cloud.sdk.core.service.exception.ServiceResponseException;
import com.ibm.cloudant.kafka.common.CloudantConst;
import com.ibm.cloudant.kafka.common.InterfaceConst;
import com.ibm.cloudant.kafka.common.MessageKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class JavaCloudantUtil {

@@ -67,7 +61,7 @@ public class JavaCloudantUtil {
);
}

public static JSONArray batchWrite(Map<String, String> props, JSONArray data)
public static JSONArray batchWrite(Map<String, String> props, List<Map<String, Object>> data)
throws JSONException {
// wrap result to JSONArray
JSONArray result = new JSONArray();
@@ -76,13 +70,7 @@ public static JSONArray batchWrite(Map<String, String> props, JSONArray data)
// get client object
Cloudant service = getClientInstance(props);

List<Document> listOfDocs = new ArrayList<>();
for(int i=0; i < data.length(); i++){
Map<String, Object> docProperties = data.getJSONObject(i).toMap();
Document doc = new Document();
doc.setProperties(docProperties);
listOfDocs.add(doc);
}
List<Document> listOfDocs = data.stream()
        .map(d -> { Document doc = new Document(); doc.setProperties(d); return doc; })
        .collect(Collectors.toList());

// attempt to create database
createTargetDb(service, props.get(InterfaceConst.DB));
@@ -116,6 +104,7 @@ public static JSONArray batchWrite(Map<String, String> props, JSONArray data)
result.put(jsonResult);
}
} catch (Exception e) {
LOG.error("Exception caught in batchWrite()", e);
Contributor Author:
This may need to be revisited in another PR - the worrying thing is that we were just swallowing exceptions from the Cloudant client, which I had managed to trigger with a misconfigured test.

Member:
Agreed, there needs to be a separate look at error handling to conform to the behaviours of the built-in `errors.tolerance=all` and `none` flags (`all` implies silently ignoring bad messages, so I guess that's all we have right now!).

Contributor:
Should we open a ticket specifically for investigating and improving error handling?

Member:
I made a note in my error-handling epic. Strictly speaking, I think we should iterate the results and push each failed document/message to the DLQ (or whatever error handling is configured), but I'm OK with us improving that later.

if(e.getMessage().equals(String.format(ResourceBundleUtil.get(
MessageKey.CLOUDANT_LIMITATION)))){
// try to put items from jsonResult before exception occurred
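One possible shape for the per-record error handling discussed in the thread above (a sketch only, not part of this PR; it assumes a Kafka Connect runtime of 2.6 or later, where `SinkTaskContext.errantRecordReporter()` exists, plus a hypothetical map of per-record failures produced by the bulk write):

```
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.List;
import java.util.Map;

class ErrorReportingSketch {

    // Hypothetical helper: instead of swallowing a failed batch, route each
    // failed record to whatever error handling is configured
    // (errors.tolerance, dead letter queue, ...).
    static void reportFailures(SinkTaskContext context,
                               List<SinkRecord> batch,
                               Map<SinkRecord, Exception> failures) {
        ErrantRecordReporter reporter = context.errantRecordReporter(); // null if no error reporting is configured
        for (SinkRecord record : batch) {
            Exception error = failures.get(record);
            if (error == null) {
                continue; // this record was committed successfully
            }
            if (reporter != null) {
                reporter.report(record, error); // DLQ'd or ignored per errors.tolerance
            } else {
                throw new ConnectException(error);
            }
        }
    }
}
```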
93 changes: 20 additions & 73 deletions src/main/java/com/ibm/cloudant/kafka/connect/CloudantSinkTask.java
@@ -18,20 +18,17 @@
import com.ibm.cloudant.kafka.common.MessageKey;
import com.ibm.cloudant.kafka.common.utils.JavaCloudantUtil;
import com.ibm.cloudant.kafka.common.utils.ResourceBundleUtil;

import com.ibm.cloudant.kafka.schema.ConnectRecordMapper;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

@@ -48,64 +45,33 @@ public class CloudantSinkTask extends SinkTask {
private int taskNumber;
public static String guid_schema = null;
private Boolean replication;
public static volatile JSONArray jsonArray = new JSONArray();

private List<Map<String, Object>> jsonArray = new LinkedList<>();

private static ConnectRecordMapper<SinkRecord> mapper = new ConnectRecordMapper<>();

@Override
public String version() {
return new CloudantSinkConnector().version();
}


//TODO: all sinkRecords in first Thread
@Override
public void put(Collection<SinkRecord> sinkRecords) {

LOG.info("Thread[" + Thread.currentThread().getId() + "].sinkRecords = " + sinkRecords.size());

for (SinkRecord record : sinkRecords) {
JSONObject jsonRecord;

JSONTokener tokener = new JSONTokener(record.value().toString());
jsonRecord = new JSONObject(tokener);

if (jsonRecord.has(CloudantConst.CLOUDANT_REV)) {
jsonRecord.remove(CloudantConst.CLOUDANT_REV);
}

if(jsonRecord.has(CloudantConst.CLOUDANT_DOC_ID)){
if(replication == false) {
//Add archive schema from SinkRecord when available
jsonRecord.put(InterfaceConst.KC_SCHEMA, record.valueSchema());

//Create object id from kafka
jsonRecord.put(CloudantConst.CLOUDANT_DOC_ID,
record.topic() + "_" +
record.kafkaPartition().toString() + "_" +
Long.toString(record.kafkaOffset()) + "_" +
jsonRecord.get(CloudantConst.CLOUDANT_DOC_ID));
}
//OPTION B: IF replication == true => Do Nothing => Create mirror from Cloudant object

//OPTION C (not implemented): generate new id with cloudant
/*else {
LOG.info(MessageKey.GUID_SCHEMA + ": " + guid_schema);
LOG.warn(CloudantConst.CLOUDANT_DOC_ID + "from source database will removed");

//remove Cloudant _id
jsonRecord.remove(CloudantConst.CLOUDANT_DOC_ID);
}*/
}
jsonArray.put(jsonRecord);

if ((jsonArray != null) && (jsonArray.length() >= batch_size)) {

flush(null);

}
}
}

sinkRecords.stream()
.map(mapper) // Convert ConnectRecord to Map
.sequential() // Avoid concurrent access to jsonArray
.forEach(recordValueAsMap -> {
recordValueAsMap.remove(CloudantConst.CLOUDANT_REV); // Remove the _rev
jsonArray.add(recordValueAsMap);
if (jsonArray.size() >= batch_size) {
flush(null);
}
});
}

@Override
public void stop() {
@@ -137,31 +103,12 @@ public void start(Map<String, String> props) {
@Override
public void flush(Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets) {
LOG.debug("Flushing output stream for {" + config.getString(InterfaceConst.URL) + "}");

try {

if ((jsonArray != null) && (jsonArray.length() > 0)) {

JSONArray results = JavaCloudantUtil.batchWrite(config.originalsStrings(), jsonArray);
LOG.info("Committed " + jsonArray.length() + " documents to -> " + config.getString(InterfaceConst.URL));

// The results array has a record for every single document commit
// Processing this is expensive!
if (results != null) {
/*
for (int i = 0; i < results.length(); i++) {
JSONObject result = (JSONObject) results.get(i);
LOG.debug(result.toString());
}
*/
}
}

} catch (JSONException e) {
LOG.error(e.getMessage(), e);
JavaCloudantUtil.batchWrite(config.originalsStrings(), jsonArray);
LOG.info("Committed " + jsonArray.size() + " documents to -> " + config.getString(InterfaceConst.URL));
} finally {
// Release memory (regardless if documents got committed or not)
jsonArray = new JSONArray(); ;
jsonArray = new LinkedList<>();
}
}

103 changes: 103 additions & 0 deletions src/main/java/com/ibm/cloudant/kafka/schema/ConnectRecordMapper.java
@@ -0,0 +1,103 @@
package com.ibm.cloudant.kafka.schema;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ConnectRecordMapper<R extends ConnectRecord<R>> implements Function<ConnectRecord<R>, Map<String, Object>> {

private static Logger LOG = LoggerFactory.getLogger(ConnectRecordMapper.class);

public Map<String, Object> apply(ConnectRecord<R> record) {
// we can convert from a struct or a map - assume a map when a value schema is not provided
Schema.Type schemaType = record.valueSchema() == null ? Schema.Type.MAP : record.valueSchema().type();
Map<String, Object> toReturn = new HashMap<>();
switch (schemaType) {
case MAP:
if (record.value() instanceof Map) {
return convertMap((Map) record.value(), toReturn);
} else {
throw new IllegalArgumentException(String.format("Type %s not supported with schema of type Map (or no schema)",
record.value().getClass()));
}
case STRUCT:
if (record.value() instanceof Struct) {
return convertStruct((Struct) record.value(), toReturn);
} else {
throw new IllegalArgumentException(String.format("Type %s not supported with schema of type Struct",
record.value().getClass()));
}
default:
throw new IllegalArgumentException(String.format("Schema type %s not supported", record.valueSchema().type()));
}
}

// convert struct to map by adding key/values to passed in map, and returning it
private Map<String, Object> convertStruct(Struct struct, Map<String, Object> outMap) {
Schema schema = struct.schema();

// iterate fields and add to map
for (Field f : schema.fields()) {
Object value = struct.get(f);
outMap.put(f.name(), getField(f.schema().type(), value));
}
return outMap;
}

// convert kafka map to map by adding key/values to passed in map, and returning it
private Map<String, Object> convertMap(Map inMap, Map<String, Object> outMap) {

for (Object k : inMap.keySet()) {
if (k instanceof String) {
Object v = inMap.get(k);
if (v instanceof Map) {
outMap.put((String)k, convertMap((Map)v, new HashMap<>()));
} else if (v instanceof Struct) {
outMap.put((String)k, convertStruct((Struct)v, new HashMap<>()));
} else {
// assume that JSON serialiser knows how to deal with it
outMap.put((String)k, v);
}
} else {
throw new IllegalArgumentException("unsupported type in map key " + k.getClass());
}
}
return outMap;
}

// get field value, recursing if necessary for struct types
private Object getField(Type type, Object value) {

switch (type) {
// primitive types: just return value (JSON serialiser will deal with conversion later)
case ARRAY:
case BOOLEAN:
case BYTES:
case FLOAT32:
case FLOAT64:
case INT16:
case INT32:
case INT64:
case INT8:
case STRING:
return value;
// map/struct cases: chain a new map onto this one, as the value, and recursively fill in its contents
case MAP:
return convertMap((Map)value, new HashMap<>());
case STRUCT:
return convertStruct((Struct)value, new HashMap<>());
default:
throw new IllegalArgumentException("unknown type " + type);
}

}

}
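
A usage sketch for the new mapper (illustrative only; the topic and field names are made up), showing a `SinkRecord` carrying a `Struct` value being converted into the `Map` that `batchWrite` now accepts:

```
import com.ibm.cloudant.kafka.schema.ConnectRecordMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Map;

public class ConnectRecordMapperExample {
    public static void main(String[] args) {
        // A value schema with a nested struct, as JsonConverter would produce
        // for an enveloped message when schemas.enable=true.
        Schema addressSchema = SchemaBuilder.struct()
                .field("city", Schema.STRING_SCHEMA)
                .build();
        Schema valueSchema = SchemaBuilder.struct()
                .field("_id", Schema.STRING_SCHEMA)
                .field("age", Schema.INT32_SCHEMA)
                .field("address", addressSchema)
                .build();

        Struct value = new Struct(valueSchema)
                .put("_id", "example-doc")
                .put("age", 42)
                .put("address", new Struct(addressSchema).put("city", "Bristol"));

        SinkRecord record = new SinkRecord("example-topic", 0,
                Schema.STRING_SCHEMA, "ignored-key",
                valueSchema, value, 0L);

        // The mapper walks the Struct, recursing into nested structs and maps,
        // and returns a plain Map suitable for building a Cloudant Document.
        Map<String, Object> doc = new ConnectRecordMapper<SinkRecord>().apply(record);
        System.out.println(doc); // e.g. {_id=example-doc, age=42, address={city=Bristol}} (key order may vary)
    }
}
```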