Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Use SLF4J for logging in kernel-api #2305

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,13 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
Test / javaOptions ++= Seq("-ea"),
libraryDependencies ++= Seq(
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "2.0.9",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.slf4j" % "slf4j-log4j12" % "2.0.9" % "test"
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
Expand All @@ -251,7 +253,8 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13" % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.slf4j" % "slf4j-log4j12" % "2.0.9" % "test"
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@

import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Logging;

public class TableImpl implements Table, Logging {
public class TableImpl implements Table {
public static Table forPath(TableClient tableClient, String path)
throws TableNotFoundException {
// Resolve the path to fully qualified table path using the `TableClient` APIs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.util.Logging;
import io.delta.kernel.internal.util.Tuple2;

/**
Expand All @@ -56,7 +55,7 @@
* - {@link #getAddFilesAsColumnarBatches}: return all active (not tombstoned) AddFiles as
* {@link ColumnarBatch}s
*/
public class LogReplay implements Logging {
public class LogReplay {

/** Read schema when searching for the latest Protocol and Metadata. */
public static final StructType PROTOCOL_METADATA_READ_SCHEMA = new StructType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.Snapshot;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
Expand All @@ -35,15 +38,15 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Logging;
import io.delta.kernel.internal.util.Tuple2;
import static io.delta.kernel.internal.fs.Path.getName;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;

public class SnapshotManager
implements Logging {
public class SnapshotManager {
public SnapshotManager() {}

private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);

/**
* - Verify the versions are contiguous.
* - Verify the versions start with `expectedStartVersion` if it's specified.
Expand Down Expand Up @@ -98,7 +101,7 @@ private CloseableIterator<FileStatus> listFrom(
TableClient tableClient,
long startVersion)
throws IOException {
logDebug(String.format("startVersion: %s", startVersion));
logger.debug("startVersion: {}", startVersion);
return tableClient
.getFileSystemClient()
.listFrom(FileNames.listingPrefix(logPath, startVersion));
Expand Down Expand Up @@ -158,7 +161,7 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
TableClient tableClient,
long startVersion,
Optional<Long> versionToLoad) {
logDebug(String.format("startVersion: %s, versionToLoad: %s", startVersion, versionToLoad));
logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad);

return listFromOrNone(
logPath,
Expand Down Expand Up @@ -225,9 +228,9 @@ private SnapshotImpl createSnapshot(
TableClient tableClient) {
final String startingFromStr = initSegment
.checkpointVersionOpt
.map(v -> String.format(" starting from checkpoint version %s.", v))
.map(v -> String.format("starting from checkpoint version %s.", v))
.orElse(".");
logInfo(() -> String.format("Loading version %s%s", initSegment.version, startingFromStr));
logger.info("Loading version {} {}", initSegment.version, startingFromStr);

return new SnapshotImpl(
logPath,
Expand Down Expand Up @@ -324,13 +327,13 @@ protected Optional<LogSegment> getLogSegmentForVersion(
// recursive call to [[getLogSegmentForVersion]] below (same as before the refactor).
newFiles = Collections.emptyList();
}
logDebug(() ->
logger.atDebug().setMessage(() ->
String.format(
"newFiles: %s",
Arrays.toString(newFiles.stream()
.map(x -> new Path(x.getPath()).getName()).toArray())
)
);
).log();

if (newFiles.isEmpty() && !startCheckpointOpt.isPresent()) {
// We can't construct a snapshot because the directory contained no usable commit
Expand All @@ -356,31 +359,32 @@ protected Optional<LogSegment> getLogSegmentForVersion(
final List<FileStatus> checkpoints = checkpointsAndDeltas._1;
final List<FileStatus> deltas = checkpointsAndDeltas._2;

logDebug(() ->
logger.atDebug().setMessage(() ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this .setMessage API? I googled around but couldn't find it. Could you link to it? Does it just take in a supplier?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String.format(
"\ncheckpoints: %s\ndeltas: %s",
Arrays.toString(checkpoints.stream().map(
x -> new Path(x.getPath()).getName()).toArray()),
Arrays.toString(deltas.stream().map(
x -> new Path(x.getPath()).getName()).toArray())
)
);
).log();

// Find the latest checkpoint in the listing that is not older than the versionToLoad
final CheckpointInstance maxCheckpoint = versionToLoadOpt.map(CheckpointInstance::new)
.orElse(CheckpointInstance.MAX_VALUE);
logDebug(String.format("lastCheckpoint: %s", maxCheckpoint));
logger.debug("lastCheckpoint: {}", maxCheckpoint);

final List<CheckpointInstance> checkpointFiles = checkpoints
.stream()
.map(f -> new CheckpointInstance(f.getPath()))
.collect(Collectors.toList());
logDebug(() ->
String.format("checkpointFiles: %s", Arrays.toString(checkpointFiles.toArray())));
logger.atDebug().setMessage(() ->
String.format("checkpointFiles: %s", Arrays.toString(checkpointFiles.toArray()))
).log();

final Optional<CheckpointInstance> newCheckpointOpt =
Checkpointer.getLatestCompleteCheckpointFromList(checkpointFiles, maxCheckpoint);
logDebug(String.format("newCheckpointOpt: %s", newCheckpointOpt));
logger.debug("newCheckpointOpt: {}", newCheckpointOpt);

final long newCheckpointVersion = newCheckpointOpt
.map(c -> c.version)
Expand Down Expand Up @@ -415,7 +419,7 @@ protected Optional<LogSegment> getLogSegmentForVersion(

return -1L;
});
logDebug(String.format("newCheckpointVersion: %s", newCheckpointVersion));
logger.debug("newCheckpointVersion: {}", newCheckpointVersion);

// TODO: we can calculate deltasAfterCheckpoint and deltaVersions more efficiently
// If there is a new checkpoint, start new lineage there. If `newCheckpointVersion` is -1,
Expand All @@ -427,23 +431,24 @@ protected Optional<LogSegment> getLogSegmentForVersion(
new Path(fileStatus.getPath())) > newCheckpointVersion)
.collect(Collectors.toList());

logDebug(() ->
logger.atDebug().setMessage(() ->
String.format(
"deltasAfterCheckpoint: %s",
Arrays.toString(deltasAfterCheckpoint.stream().map(
x -> new Path(x.getPath()).getName()).toArray())
)
);
).log();

// todo again naming confusing (specify after checkpoint?)
final LinkedList<Long> deltaVersionsAfterCheckpoint = deltasAfterCheckpoint
.stream()
.map(fileStatus -> FileNames.deltaVersion(new Path(fileStatus.getPath())))
.collect(Collectors.toCollection(LinkedList::new));

logDebug(() ->
logger.atDebug().setMessage(() ->
String.format("deltaVersions: %s",
Arrays.toString(deltaVersionsAfterCheckpoint.toArray())));
Arrays.toString(deltaVersionsAfterCheckpoint.toArray()))
).log();

// We may just be getting a checkpoint file after the filtering
if (!deltaVersionsAfterCheckpoint.isEmpty()) {
Expand Down

This file was deleted.

44 changes: 44 additions & 0 deletions kernel/kernel-api/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (2023) The Delta Lake Project Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need two licenses here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied these from connectors; I'm assuming they were originally copied from Spark.

# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Set everything to be logged to the file target/unit-tests.log
test.appender=file
log4j.rootCategory=info, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Tests that launch java subprocesses can set the "test.appender" system property to
# "console" to avoid having the child process's logs overwrite the unit test's
# log file.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t: %m%n
44 changes: 44 additions & 0 deletions kernel/kernel-defaults/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (2023) The Delta Lake Project Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Set everything to be logged to the file target/unit-tests.log
test.appender=file
log4j.rootCategory=info, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Tests that launch java subprocesses can set the "test.appender" system property to
# "console" to avoid having the child process's logs overwrite the unit test's
# log file.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t: %m%n