Skip to content

Commit

Permalink
[ISSUE #4770] Empty class member cause NullpointerException (#4777)
Browse files Browse the repository at this point in the history
* Add null check in writeOffset method

* delete todo

* Move data.put inside null check in writeOffset method

* simplify if judgement

* remove dev environment

* fix style
  • Loading branch information
scwlkq authored Feb 20, 2024
1 parent 5cc065f commit 1a5e06c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static CloudEvent convertRecordToEvent(ConnectRecord connectRecord) {
public static ConnectRecord convertEventToRecord(CloudEvent event) {
byte[] body = Objects.requireNonNull(event.getData()).toBytes();
LogUtil.info(log, "handle receive events {}", () -> new String(event.getData().toBytes(), Constants.DEFAULT_CHARSET));
// todo: recordPartition & recordOffset

ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), body);
for (String extensionName : event.getExtensionNames()) {
connectRecord.addExtension(extensionName, Objects.requireNonNull(event.getExtension(extensionName)).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ public OffsetStorageWriterImpl(String connectorName, OffsetManagementService off
}

@Override
public void writeOffset(RecordPartition partition, RecordOffset position) {
ConnectorRecordPartition extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition());
data.put(extendRecordPartition, position);
public void writeOffset(RecordPartition partition, RecordOffset offset) {
ConnectorRecordPartition extendRecordPartition;
if (partition != null) {
extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition());
data.put(extendRecordPartition, offset);
}
}

/**
Expand Down

0 comments on commit 1a5e06c

Please sign in to comment.