Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mproch committed Jul 15, 2021
1 parent ecb6285 commit 5dbc01d
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package pl.touk.nussknacker.engine.management

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationInt

case class FlinkConfig(jobManagerTimeout: Option[FiniteDuration],
case class FlinkConfig(jobManagerTimeout: FiniteDuration = 1 minute,
queryableStateProxyUrl: Option[String],
restUrl: String,
shouldVerifyBeforeDeploy: Option[Boolean])
shouldVerifyBeforeDeploy: Boolean = true)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.concurrent.{Await, Future}

class FlinkRestManager(config: FlinkConfig, modelData: ModelData, mainClassName: String)
(implicit backend: SttpBackend[Future, Nothing, NothingT])
extends FlinkProcessManager(modelData, config.shouldVerifyBeforeDeploy.getOrElse(true), mainClassName) with LazyLogging {
extends FlinkProcessManager(modelData, config.shouldVerifyBeforeDeploy, mainClassName) with LazyLogging {

protected lazy val jarFile: File = new FlinkModelJar().buildJobJar(modelData)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import sttp.model.StatusCode

import java.io.File
import java.util.concurrent.TimeoutException
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}

class HttpFlinkClient(config: FlinkConfig)(implicit backend: SttpBackend[Future, Nothing, NothingT], ec: ExecutionContext) extends FlinkClient with LazyLogging {
Expand Down Expand Up @@ -105,7 +104,7 @@ class HttpFlinkClient(config: FlinkConfig)(implicit backend: SttpBackend[Future,
}

//FIXME: get rid of sleep, refactor?
def waitForSavepoint(jobId: ExternalDeploymentId, savepointId: String, timeoutLeft: Long = config.jobManagerTimeout.getOrElse(1 minute).toMillis): Future[SavepointResult] = {
def waitForSavepoint(jobId: ExternalDeploymentId, savepointId: String, timeoutLeft: Long = config.jobManagerTimeout.toMillis): Future[SavepointResult] = {
val start = System.currentTimeMillis()
if (timeoutLeft <= 0) {
return Future.failed(new Exception(s"Failed to complete savepoint in time for $jobId and trigger $savepointId"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SimpleInfluxClient(config: InfluxConfig)(implicit backend: SttpBackend[Fut
def addAuth[T, S](req: Request[T, S]): RequestT[Identity, T, S] = (for {
user <- config.user
password <- config.password
} yield req .auth.basic(user, password)).getOrElse(req)
} yield req.auth.basic(user, password)).getOrElse(req)

addAuth(basicRequest.get(uri.params("db" -> config.database, "q" -> query)))
.response(asJson[InfluxResponse])
Expand Down

0 comments on commit 5dbc01d

Please sign in to comment.