From d8b28fca2a462273c3b7efc557670cdf62e32855 Mon Sep 17 00:00:00 2001 From: Luis Bianchin Date: Mon, 6 Aug 2018 08:40:10 +0200 Subject: [PATCH] Read password using Beam FileSystems (allows gs://) --- build.sbt | 3 ++- .../dbeam/options/DBeamPipelineOptions.scala | 14 +++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index b83d5a0c..15e739a1 100644 --- a/build.sbt +++ b/build.sbt @@ -113,12 +113,13 @@ lazy val dbeamCore = project libraryDependencies ++= Seq( "org.slf4j" % "slf4j-simple" % slf4jVersion, "org.apache.beam" % "beam-runners-direct-java" % beamVersion, + "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion, "org.postgresql" % "postgresql" % "42.2.+", "mysql" % "mysql-connector-java" % "5.1.+", "com.google.cloud.sql" % "postgres-socket-factory" % "1.0.5", "com.google.cloud.sql" % "mysql-socket-factory" % "1.0.4", "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.5", - "com.google.auto.value" % "auto-value" % autoValueVersion % "provided", + "com.google.auto.value" % "auto-value" % autoValueVersion % Provided, "org.scalatest" %% "scalatest" % "3.0.5" % "test", "com.h2database" % "h2" % "1.4.196" % "test", "com.typesafe.slick" %% "slick" % "3.2.0" % "test" diff --git a/dbeam-core/src/main/scala/com/spotify/dbeam/options/DBeamPipelineOptions.scala b/dbeam-core/src/main/scala/com/spotify/dbeam/options/DBeamPipelineOptions.scala index 46c27109..c465fac2 100644 --- a/dbeam-core/src/main/scala/com/spotify/dbeam/options/DBeamPipelineOptions.scala +++ b/dbeam-core/src/main/scala/com/spotify/dbeam/options/DBeamPipelineOptions.scala @@ -17,8 +17,12 @@ package com.spotify.dbeam.options +import java.nio.channels.Channels + +import org.apache.beam.sdk.io.FileSystems import org.apache.beam.sdk.options.Validation.Required import org.apache.beam.sdk.options.{Default, Description, PipelineOptions} +import org.slf4j.{Logger, LoggerFactory} import scala.io.Source @@ -114,8 +118,16 @@ trait 
OutputOptions extends PipelineOptions {
 }
 
 object PipelineOptionsUtil {
+  val log: Logger = LoggerFactory.getLogger(PipelineOptionsUtil.getClass)
+  /** Password read from getPasswordFile via Beam FileSystems (e.g. gs://), else getPassword. */
   def readPassword(options: DBeamPipelineOptions): Option[String] = {
-    Option(options.getPasswordFile).map(Source.fromFile(_).mkString.stripLineEnd)
+    FileSystems.setDefaultPipelineOptions(options)
+    Option(options.getPasswordFile).map(FileSystems.matchSingleFileSpec)
+      .map { m =>
+        log.info("Reading password from file: {}", m.resourceId().toString)
+        val in = Channels.newInputStream(FileSystems.open(m.resourceId())) // closed by finally
+        try Source.fromInputStream(in).mkString.stripLineEnd finally in.close()
+      }
       .orElse(Option(options.getPassword))
   }
 }