Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for runBackground mutex and log management #3971

Merged
merged 15 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions example/scalalib/web/1-todo-webapp/test/src/WebAppTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8181, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8181")
finally server.stop()
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8182, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8182")
finally server.stop()
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8184, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8184")
finally server.stop()
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import utest._
object WebAppTests extends TestSuite {
def withServer[T](example: cask.main.Main)(f: String => T): T = {
val server = io.undertow.Undertow.builder
.addHttpListener(8081, "localhost")
.addHttpListener(8185, "localhost")
.setHandler(example.defaultHandler)
.build
server.start()
val res =
try f("http://localhost:8081")
try f("http://localhost:8185")
finally server.stop()
res
}
Expand Down
113 changes: 50 additions & 63 deletions main/client/src/mill/main/client/ServerLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public static class Result {
public Path serverDir;
}

static final int tailerRefreshIntervalMillis = 2;
final int serverProcessesLimit = 5;
final int serverInitWaitMillis = 10000;

Expand Down Expand Up @@ -120,75 +119,63 @@ public Result acquireLocksAndRun(String outDir) throws Exception {
}

int run(Path serverDir, boolean setJnaNoSys, Locks locks) throws Exception {
try (final FileToStreamTailer stdoutTailer = new FileToStreamTailer(
new java.io.File(serverDir + "/" + ServerFiles.stdout),
stdout,
tailerRefreshIntervalMillis);
final FileToStreamTailer stderrTailer = new FileToStreamTailer(
new java.io.File(serverDir + "/" + ServerFiles.stderr),
stderr,
tailerRefreshIntervalMillis); ) {
stdoutTailer.start();
stderrTailer.start();
String serverPath = serverDir + "/" + ServerFiles.runArgs;
try (OutputStream f = Files.newOutputStream(Paths.get(serverPath))) {
f.write(System.console() != null ? 1 : 0);
Util.writeString(f, BuildInfo.millVersion);
Util.writeArgs(args, f);
Util.writeMap(env, f);
}

if (locks.processLock.probe()) {
initServer(serverDir, setJnaNoSys, locks);
}

while (locks.processLock.probe()) Thread.sleep(3);

String socketName = ServerFiles.pipe(serverDir.toString());
AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName));
String serverPath = serverDir + "/" + ServerFiles.runArgs;
try (OutputStream f = Files.newOutputStream(Paths.get(serverPath))) {
f.write(System.console() != null ? 1 : 0);
Util.writeString(f, BuildInfo.millVersion);
Util.writeArgs(args, f);
Util.writeMap(env, f);
}

long retryStart = System.currentTimeMillis();
Socket ioSocket = null;
Throwable socketThrowable = null;
while (ioSocket == null && System.currentTimeMillis() - retryStart < serverInitWaitMillis) {
try {
ioSocket = AFUNIXSocket.connectTo(addr);
} catch (Throwable e) {
socketThrowable = e;
Thread.sleep(10);
}
}
if (locks.processLock.probe()) {
initServer(serverDir, setJnaNoSys, locks);
}

if (ioSocket == null) {
throw new Exception("Failed to connect to server", socketThrowable);
}
while (locks.processLock.probe()) Thread.sleep(3);

InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outPumperThread = new Thread(outPumper, "outPump");
outPumperThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
inThread.setDaemon(true);
outPumperThread.start();
inThread.start();

if (forceFailureForTestingMillisDelay > 0) {
Thread.sleep(forceFailureForTestingMillisDelay);
throw new Exception("Force failure for testing: " + serverDir);
}
outPumperThread.join();
String socketName = ServerFiles.pipe(serverDir.toString());
AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName));

long retryStart = System.currentTimeMillis();
Socket ioSocket = null;
Throwable socketThrowable = null;
while (ioSocket == null && System.currentTimeMillis() - retryStart < serverInitWaitMillis) {
try {
return Integer.parseInt(
Files.readAllLines(Paths.get(serverDir + "/" + ServerFiles.exitCode))
.get(0));
ioSocket = AFUNIXSocket.connectTo(addr);
} catch (Throwable e) {
return Util.ExitClientCodeCannotReadFromExitCodeFile();
} finally {
ioSocket.close();
socketThrowable = e;
Thread.sleep(10);
}
}

if (ioSocket == null) {
throw new Exception("Failed to connect to server", socketThrowable);
}

InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outPumperThread = new Thread(outPumper, "outPump");
outPumperThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
inThread.setDaemon(true);
outPumperThread.start();
inThread.start();

if (forceFailureForTestingMillisDelay > 0) {
Thread.sleep(forceFailureForTestingMillisDelay);
throw new Exception("Force failure for testing: " + serverDir);
}
outPumperThread.join();

try {
return Integer.parseInt(
Files.readAllLines(Paths.get(serverDir + "/" + ServerFiles.exitCode)).get(0));
} catch (Throwable e) {
return Util.ExitClientCodeCannotReadFromExitCodeFile();
} finally {
ioSocket.close();
}
}
}
15 changes: 13 additions & 2 deletions main/util/src/mill/util/Jvm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mill.util

import mill.api.Loose.Agg
import mill.api._
import mill.main.client.ServerFiles
import os.{ProcessOutput, SubProcess}

import java.io._
Expand Down Expand Up @@ -117,8 +118,18 @@ object Jvm extends CoursierSupport {
mainArgs,
workingDir,
if (!background) None
else if (runBackgroundLogToConsole) Some((os.Inherit, os.Inherit))
else Jvm.defaultBackgroundOutputs(ctx.dest),
else if (runBackgroundLogToConsole) {
val pwd0 = os.Path(java.nio.file.Paths.get(".").toAbsolutePath)
// Hack to forward the background subprocess output to the Mill server process
// stdout/stderr files, so the output will get properly slurped up by the Mill server
// and shown to any connected Mill client even if the current command has completed
Some(
(
os.PathAppendRedirect(pwd0 / ".." / ServerFiles.stdout),
os.PathAppendRedirect(pwd0 / ".." / ServerFiles.stderr)
)
)
} else Jvm.defaultBackgroundOutputs(ctx.dest),
useCpPassingJar
)
}
Expand Down
2 changes: 1 addition & 1 deletion mill
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
set -e

if [ -z "${DEFAULT_MILL_VERSION}" ] ; then
DEFAULT_MILL_VERSION=0.11.12
DEFAULT_MILL_VERSION=0.12.2
fi

if [ -z "$MILL_VERSION" ] ; then
Expand Down
3 changes: 0 additions & 3 deletions runner/src/mill/runner/MillBuildBootstrap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ class MillBuildBootstrap(
}

def evaluateRec(depth: Int): RunnerState = {
mill.main.client.DebugLog.println(
"MillBuildBootstrap.evaluateRec " + depth + " " + targetsAndParams.mkString(" ")
)
// println(s"+evaluateRec($depth) " + recRoot(projectRoot, depth))
val prevFrameOpt = prevRunnerState.frames.lift(depth)
val prevOuterFrameOpt = prevRunnerState.frames.lift(depth - 1)
Expand Down
129 changes: 67 additions & 62 deletions runner/src/mill/runner/MillMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.io.{PipedInputStream, PrintStream}
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.util.Locale
import scala.jdk.CollectionConverters._
import scala.jdk.CollectionConverters.*
import scala.util.Properties
import mill.java9rtexport.Export
import mill.api.{MillException, SystemStreams, WorkspaceRoot, internal}
Expand Down Expand Up @@ -33,7 +33,7 @@ object MillMain {
err.println(e.getCause.getMessage())
(false, onError)
case NonFatal(e) =>
err.println("An unexpected error occurred")
err.println("An unexpected error occurred " + e)
throw e
(false, onError)
}
Expand Down Expand Up @@ -221,69 +221,74 @@ object MillMain {
while (repeatForBsp) {
repeatForBsp = false

val (isSuccess, evalStateOpt) = Watching.watchLoop(
ringBell = config.ringBell.value,
watch = config.watch.value,
streams = streams,
setIdle = setIdle,
evaluate = (prevState: Option[RunnerState]) => {
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)

withOutLock(
config.noBuildLock.value || bspContext.isDefined,
config.noWaitForBuildLock.value,
out,
targetsAndParams,
streams
) {
val logger = getLogger(
streams,
config,
mainInteractive,
enableTicker =
config.ticker
.orElse(config.enableTicker)
.orElse(Option.when(config.disableTicker.value)(false)),
printLoggerState,
serverDir,
colored = colored,
colors = colors
)
Using.resource(logger) { _ =>
try new MillBuildBootstrap(
projectRoot = WorkspaceRoot.workspaceRoot,
output = out,
home = config.home,
keepGoing = config.keepGoing.value,
imports = config.imports,
env = env,
threadCount = threadCount,
targetsAndParams = targetsAndParams,
prevRunnerState = prevState.getOrElse(stateCache),
logger = logger,
disableCallgraph = config.disableCallgraph.value,
needBuildFile = needBuildFile(config),
requestedMetaLevel = config.metaLevel,
config.allowPositional.value,
systemExit = systemExit,
streams0 = streams0
).evaluate()
Using.resource(new TailManager(serverDir)) { tailManager =>
val (isSuccess, evalStateOpt) = Watching.watchLoop(
ringBell = config.ringBell.value,
watch = config.watch.value,
streams = streams,
setIdle = setIdle,
evaluate = (prevState: Option[RunnerState]) => {
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)

withOutLock(
config.noBuildLock.value || bspContext.isDefined,
config.noWaitForBuildLock.value,
out,
targetsAndParams,
streams
) {
Using.resource(getLogger(
streams,
config,
mainInteractive,
enableTicker =
config.ticker
.orElse(config.enableTicker)
.orElse(Option.when(config.disableTicker.value)(false)),
printLoggerState,
serverDir,
colored = colored,
colors = colors
)) { logger =>
SystemStreams.withStreams(logger.systemStreams) {
tailManager.withOutErr(logger.outputStream, logger.errorStream) {
new MillBuildBootstrap(
projectRoot = WorkspaceRoot.workspaceRoot,
output = out,
home = config.home,
keepGoing = config.keepGoing.value,
imports = config.imports,
env = env,
threadCount = threadCount,
targetsAndParams = targetsAndParams,
prevRunnerState = prevState.getOrElse(stateCache),
logger = logger,
disableCallgraph = config.disableCallgraph.value,
needBuildFile = needBuildFile(config),
requestedMetaLevel = config.metaLevel,
config.allowPositional.value,
systemExit = systemExit,
streams0 = streams0
).evaluate()
}
}
}
}
}
},
colors = colors
)
bspContext.foreach { ctx =>
repeatForBsp =
BspContext.bspServerHandle.lastResult == Some(
BspServerResult.ReloadWorkspace
)
streams.err.println(
s"`$bspCmd` returned with ${BspContext.bspServerHandle.lastResult}"
},
colors = colors
)
}
bspContext.foreach { ctx =>
repeatForBsp =
BspContext.bspServerHandle.lastResult == Some(
BspServerResult.ReloadWorkspace
)
streams.err.println(
s"`$bspCmd` returned with ${BspContext.bspServerHandle.lastResult}"
)
}

loopRes = (isSuccess, evalStateOpt)
loopRes = (isSuccess, evalStateOpt)
}
} // while repeatForBsp
bspContext.foreach { ctx =>
streams.err.println(
Expand Down
Loading
Loading