Skip to content

Commit

Permalink
Read password using Beam FileSystems (allows gs://)
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Aug 6, 2018
1 parent 882bbae commit d8b28fc
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ lazy val dbeamCore = project
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-simple" % slf4jVersion,
"org.apache.beam" % "beam-runners-direct-java" % beamVersion,
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
"org.postgresql" % "postgresql" % "42.2.+",
"mysql" % "mysql-connector-java" % "5.1.+",
"com.google.cloud.sql" % "postgres-socket-factory" % "1.0.5",
"com.google.cloud.sql" % "mysql-socket-factory" % "1.0.4",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.5",
"com.google.auto.value" % "auto-value" % autoValueVersion % "provided",
"com.google.auto.value" % "auto-value" % autoValueVersion % Provided,
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.h2database" % "h2" % "1.4.196" % "test",
"com.typesafe.slick" %% "slick" % "3.2.0" % "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package com.spotify.dbeam.options

import java.nio.channels.Channels

import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.options.Validation.Required
import org.apache.beam.sdk.options.{Default, Description, PipelineOptions}
import org.slf4j.{Logger, LoggerFactory}

import scala.io.Source

Expand Down Expand Up @@ -114,8 +118,16 @@ trait OutputOptions extends PipelineOptions {
}

object PipelineOptionsUtil {
val log: Logger = LoggerFactory.getLogger(PipelineOptionsUtil.getClass)
def readPassword(options: DBeamPipelineOptions): Option[String] = {
Option(options.getPasswordFile).map(Source.fromFile(_).mkString.stripLineEnd)
FileSystems.setDefaultPipelineOptions(options)
Option(options.getPasswordFile)
.map(FileSystems.matchSingleFileSpec)
.map{m =>
log.info("Reading password from file: {}", m.resourceId().toString)
Channels.newInputStream(FileSystems.open(m.resourceId()))
}
.map(Source.fromInputStream(_).mkString.stripLineEnd)
.orElse(Option(options.getPassword))
}
}

0 comments on commit d8b28fc

Please sign in to comment.