diff --git a/build.sbt b/build.sbt
index 674f6ed0..7e86a902 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,7 @@
 val sparkVersion = "2.2.0"
-val catsCoreVersion = "1.0.0-MF"
-val catsEffectVersion = "0.4"
-val catsMtlVersion = "0.0.2"
+val catsCoreVersion = "1.0.1"
+val catsEffectVersion = "0.7"
+val catsMtlVersion = "0.2.2"
 val scalatest = "3.0.3"
 val shapeless = "2.3.2"
 val scalacheck = "1.13.5"
@@ -27,11 +27,12 @@ lazy val cats = project
     scalacOptions += "-Ypartial-unification"
   )
   .settings(libraryDependencies ++= Seq(
-    "org.typelevel" %% "cats-core" % catsCoreVersion,
-    "org.typelevel" %% "cats-effect" % catsEffectVersion,
-    "org.typelevel" %% "cats-mtl-core" % catsMtlVersion,
-    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
-    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"))
+    "org.typelevel" %% "cats-core" % catsCoreVersion,
+    "org.typelevel" %% "cats-effect" % catsEffectVersion,
+    "org.typelevel" %% "cats-mtl-core" % catsMtlVersion,
+    "org.typelevel" %% "alleycats-core" % catsCoreVersion,
+    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"))
   .dependsOn(dataset % "test->test;compile->compile")
 
 lazy val dataset = project
diff --git a/cats/src/main/scala/frameless/cats/implicits.scala b/cats/src/main/scala/frameless/cats/implicits.scala
index f8ca73a5..f5de3576 100644
--- a/cats/src/main/scala/frameless/cats/implicits.scala
+++ b/cats/src/main/scala/frameless/cats/implicits.scala
@@ -1,21 +1,45 @@
 package frameless
 package cats
 
-import _root_.cats.implicits._
 import _root_.cats._
+import _root_.cats.kernel.{CommutativeMonoid, CommutativeSemigroup}
+import _root_.cats.implicits._
+import alleycats.Empty
 
 import scala.reflect.ClassTag
 import org.apache.spark.rdd.RDD
 
 object implicits extends FramelessSyntax with SparkDelayInstances {
   implicit class rddOps[A: ClassTag](lhs: RDD[A]) {
-    def csum(implicit m: Monoid[A]): A = lhs.reduce(_ |+| _)
-    def cmin(implicit o: Order[A]): A = lhs.reduce(_ min _)
-    def cmax(implicit o: Order[A]): A = lhs.reduce(_ max _)
+    def csum(implicit m: CommutativeMonoid[A]): A =
+      lhs.fold(m.empty)(_ |+| _)
+    def csumOption(implicit m: CommutativeSemigroup[A]): Option[A] =
+      lhs.aggregate[Option[A]](None)(
+        (acc, a) => Some(acc.fold(a)(_ |+| a)),
+        (l, r) => l.fold(r)(x => r.map(_ |+| x) orElse Some(x))
+      )
+
+    def cmin(implicit o: Order[A], e: Empty[A]): A = {
+      if (lhs.isEmpty) e.empty
+      else lhs.reduce(_ min _)
+    }
+    def cminOption(implicit o: Order[A]): Option[A] =
+      csumOption(new CommutativeSemigroup[A] {
+        def combine(l: A, r: A) = l min r
+      })
+
+    def cmax(implicit o: Order[A], e: Empty[A]): A = {
+      if (lhs.isEmpty) e.empty
+      else lhs.reduce(_ max _)
+    }
+    def cmaxOption(implicit o: Order[A]): Option[A] =
+      csumOption(new CommutativeSemigroup[A] {
+        def combine(l: A, r: A) = l max r
+      })
   }
 
   implicit class pairRddOps[K: ClassTag, V: ClassTag](lhs: RDD[(K, V)]) {
-    def csumByKey(implicit m: Monoid[V]): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
+    def csumByKey(implicit m: CommutativeSemigroup[V]): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
     def cminByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ min _)
     def cmaxByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ max _)
   }
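The heart of the change is `csumOption`'s `Option` accumulator: `aggregate` starts from `None`, so an empty RDD yields `None` rather than the `UnsupportedOperationException` the old `reduce`-based versions threw. A minimal local sketch of the same two functions on plain Scala collections (no Spark; `sumOption` and `merge` are illustrative names, not part of this change):

```scala
import cats.implicits._
import cats.kernel.CommutativeSemigroup

// seqOp: fold one element into the Option accumulator; the first element
// replaces the initial None, later ones are combined with |+|.
def sumOption[A: CommutativeSemigroup](xs: List[A]): Option[A] =
  xs.foldLeft(Option.empty[A])((acc, a) => Some(acc.fold(a)(_ |+| a)))

// combOp: merge two per-partition partial results; None acts as the identity.
def merge[A: CommutativeSemigroup](l: Option[A], r: Option[A]): Option[A] =
  l.fold(r)(x => r.map(_ |+| x) orElse Some(x))

sumOption(List(1, 2, 3))   // Some(6)
sumOption(List.empty[Int]) // None: no element ever reaches |+|
merge(Some(3), None)       // Some(3)
```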
diff --git a/cats/src/test/scala/frameless/cats/FramelessSyntaxTests.scala b/cats/src/test/scala/frameless/cats/FramelessSyntaxTests.scala
index 7df912ce..cd4eabd2 100644
--- a/cats/src/test/scala/frameless/cats/FramelessSyntaxTests.scala
+++ b/cats/src/test/scala/frameless/cats/FramelessSyntaxTests.scala
@@ -40,7 +40,7 @@ class FramelessSyntaxTests extends TypedDatasetSuite {
       def pure[A](x: A): ReaderT[IO, SparkSession, A] = ReaderT.pure(x)
       def handleErrorWith[A](fa: ReaderT[IO, SparkSession, A])(f: Throwable => ReaderT[IO, SparkSession, A]): ReaderT[IO, SparkSession, A] = ReaderT(r => fa.run(r).handleErrorWith(e => f(e).run(r)))
-      def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.lift(IO.raiseError(e))
+      def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.liftF(IO.raiseError(e))
       def flatMap[A, B](fa: ReaderT[IO, SparkSession, A])(f: A => ReaderT[IO, SparkSession, B]): ReaderT[IO, SparkSession, B] = fa.flatMap(f)
       def tailRecM[A, B](a: A)(f: A => ReaderT[IO, SparkSession, Either[A, B]]): ReaderT[IO, SparkSession, B] =
         ReaderT.catsDataMonadForKleisli[IO, SparkSession].tailRecM(a)(f)
diff --git a/cats/src/test/scala/frameless/cats/test.scala b/cats/src/test/scala/frameless/cats/test.scala
index 288dc8d1..2aa2d11e 100644
--- a/cats/src/test/scala/frameless/cats/test.scala
+++ b/cats/src/test/scala/frameless/cats/test.scala
@@ -1,19 +1,23 @@
 package frameless
 package cats
 
+import _root_.cats.Foldable
 import _root_.cats.implicits._
+
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
-import org.scalatest.Matchers
-import org.scalacheck.Arbitrary
-import Arbitrary._
 import org.apache.spark.rdd.RDD
-import org.scalatest._
-import prop._
 import org.apache.spark.{SparkConf, SparkContext => SC}
+
 import org.scalatest.compatible.Assertion
 import org.scalactic.anyvals.PosInt
+import org.scalatest.Matchers
+import org.scalacheck.Arbitrary
+import org.scalatest._
+import Arbitrary._
+import prop._
+import scala.collection.immutable.SortedMap
 import scala.reflect.ClassTag
 
 trait SparkTests {
@@ -74,25 +78,52 @@ class Test extends PropSpec with Matchers with PropertyChecks with SparkTests {
     }
   }
 
-  property("rdd simple numeric monoid example") {
+  property("rdd simple numeric commutative semigroup") {
     import frameless.cats.implicits._
-    val theSeq = 1 to 20
-    val toy = theSeq.toRdd
-    toy.cmin shouldBe 1
-    toy.cmax shouldBe 20
-    toy.csum shouldBe theSeq.sum
+
+    forAll { seq: List[Int] =>
+      val expectedSum = if (seq.isEmpty) None else Some(seq.sum)
+      val expectedMin = if (seq.isEmpty) None else Some(seq.min)
+      val expectedMax = if (seq.isEmpty) None else Some(seq.max)
+
+      val rdd = seq.toRdd
+
+      rdd.cmin shouldBe expectedMin.getOrElse(0)
+      rdd.cminOption shouldBe expectedMin
+
+      rdd.cmax shouldBe expectedMax.getOrElse(0)
+      rdd.cmaxOption shouldBe expectedMax
+
+      rdd.csum shouldBe expectedSum.getOrElse(0)
+      rdd.csumOption shouldBe expectedSum
+    }
   }
 
-  property("rdd Map[Int,Int] monoid example") {
+  property("rdd of SortedMap[Int,Int] commutative monoid") {
     import frameless.cats.implicits._
-    val rdd: RDD[Map[Int, Int]] = 1.to(20).zip(1.to(20)).toRdd.map(Map(_))
-    rdd.csum shouldBe 1.to(20).zip(1.to(20)).toMap
+    forAll { seq: List[SortedMap[Int, Int]] =>
+      val rdd = seq.toRdd
+      rdd.csum shouldBe Foldable[List].fold(seq)
+    }
+  }
+
+  property("rdd tuple commutative semigroup example") {
+    import frameless.cats.implicits._
+    forAll { seq: List[(Int, Int)] =>
+      val expectedSum = if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
+      val rdd = seq.toRdd
+
+      rdd.csum shouldBe expectedSum.getOrElse(0 -> 0)
+      rdd.csumOption shouldBe expectedSum
+    }
   }
 
-  property("rdd tuple monoid example") {
+  property("pair rdd numeric commutative semigroup example") {
     import frameless.cats.implicits._
-    val seq = Seq( (1,2), (2,3), (5,6) )
+    val seq = Seq( ("a",2), ("b",3), ("d",6), ("b",2), ("d",1) )
     val rdd = seq.toRdd
-    rdd.csum shouldBe seq.reduce(_|+|_)
+    rdd.cminByKey.collect.toSeq should contain theSameElementsAs Seq( ("a",2), ("b",2), ("d",1) )
+    rdd.cmaxByKey.collect.toSeq should contain theSameElementsAs Seq( ("a",2), ("b",3), ("d",6) )
+    rdd.csumByKey.collect.toSeq should contain theSameElementsAs Seq( ("a",2), ("b",5), ("d",7) )
   }
 }
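Why the constraints tighten from `Monoid`/`Semigroup` to `CommutativeMonoid`/`CommutativeSemigroup`: Spark folds each partition independently and merges the partial results in no guaranteed order, so only a commutative combine gives a deterministic answer. This is presumably also why the tests (and the docs below) switch from `Map` to `SortedMap`, which carries the commutative instances in cats. A toy model of the two-level fold, not Spark's actual execution:

```scala
// Fold each "partition", then merge the partial results, mimicking the
// two-level shape of RDD#fold / RDD#aggregate. The merge order is arbitrary
// in Spark, so we exercise two different orders here.
def foldPartitions[A](parts: List[List[A]])(combine: (A, A) => A): A =
  parts.map(_.reduce(combine)).reduce(combine)

// Int addition is commutative: every partitioning and merge order agrees.
foldPartitions(List(List(1, 2), List(3, 4)))(_ + _) // 10
foldPartitions(List(List(3, 4), List(1, 2)))(_ + _) // 10

// String concatenation is associative but not commutative: the result
// depends on merge order, which Spark does not guarantee.
foldPartitions(List(List("a", "b"), List("c")))(_ + _) // "abc"
foldPartitions(List(List("c"), List("a", "b")))(_ + _) // "cab"
```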
diff --git a/docs/src/main/tut/Cats.md b/docs/src/main/tut/Cats.md
index df35e6b2..d485b4fd 100644
--- a/docs/src/main/tut/Cats.md
+++ b/docs/src/main/tut/Cats.md
@@ -27,7 +27,7 @@ implicit val sync: Sync[ReaderT[IO, SparkSession, ?]] = new Sync[ReaderT[IO, Spa
   def pure[A](x: A): ReaderT[IO, SparkSession, A] = ReaderT.pure(x)
   def handleErrorWith[A](fa: ReaderT[IO, SparkSession, A])(f: Throwable => ReaderT[IO, SparkSession, A]): ReaderT[IO, SparkSession, A] = ReaderT(r => fa.run(r).handleErrorWith(e => f(e).run(r)))
-  def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.lift(IO.raiseError(e))
+  def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.liftF(IO.raiseError(e))
   def flatMap[A, B](fa: ReaderT[IO, SparkSession, A])(f: A => ReaderT[IO, SparkSession, B]): ReaderT[IO, SparkSession, B] = fa.flatMap(f)
   def tailRecM[A, B](a: A)(f: A => ReaderT[IO, SparkSession, Either[A, B]]): ReaderT[IO, SparkSession, B] =
     ReaderT.catsDataMonadForKleisli[IO, SparkSession].tailRecM(a)(f)
@@ -101,7 +101,7 @@ And now, we can set the description for the computation being run:
 val resultWithDescription: Action[(Seq[(Int, String)], Long)] = for {
   r <- result.withDescription("fancy cats")
   session <- ReaderT.ask[IO, SparkSession]
-  _ <- ReaderT.lift {
+  _ <- ReaderT.liftF {
     IO {
       println(s"Description: ${session.sparkContext.getLocalProperty("spark.job.description")}")
     }
@@ -131,6 +131,21 @@ println(data.cmax)
 println(data.cmin)
 ```
 
+In case the RDD is empty, `csum`, `cmax` and `cmin` use the default value for the RDD's
+element type. Each of these operations also has a counterpart with an `Option` return type
+that handles the case of an empty RDD:
+
+```tut:book
+val data: RDD[(Int, Int, Int)] = sc.emptyRDD
+
+println(data.csum)
+println(data.csumOption)
+println(data.cmax)
+println(data.cmaxOption)
+println(data.cmin)
+println(data.cminOption)
+```
+
 The following example aggregates all the elements with a common key.
 
 ```tut:book
@@ -148,9 +163,11 @@ totalPerUser.collectAsMap
 
 The same example would work for more complex keys.
 
 ```tut:book
+import scala.collection.immutable.SortedMap
+
 val allDataComplexKeu =
-  sc.makeRDD( ("Bob", Map("task1" -> 10)) ::
-    ("Joe", Map("task1" -> 1, "task2" -> 3)) :: ("Bob", Map("task1" -> 10, "task2" -> 1)) :: ("Joe", Map("task3" -> 4)) :: Nil )
+  sc.makeRDD( ("Bob", SortedMap("task1" -> 10)) ::
+    ("Joe", SortedMap("task1" -> 1, "task2" -> 3)) :: ("Bob", SortedMap("task1" -> 10, "task2" -> 1)) :: ("Joe", SortedMap("task3" -> 4)) :: Nil )
 
 val overalTasksPerUser = allDataComplexKeu.csumByKey
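For reference, `csumByKey` on this docs example reduces to a per-key `|+|` merge of the `SortedMap` values. A local sketch of the expected result on plain collections (`allData` and `perUser` are illustrative names), assuming cats' `SortedMap` monoid, which adds the inner values for shared keys:

```scala
import scala.collection.immutable.SortedMap
import cats.implicits._

val allData = List(
  ("Bob", SortedMap("task1" -> 10)),
  ("Joe", SortedMap("task1" -> 1, "task2" -> 3)),
  ("Bob", SortedMap("task1" -> 10, "task2" -> 1)),
  ("Joe", SortedMap("task3" -> 4))
)

// Group by key, then merge each user's maps with |+| (task counts add up),
// mirroring what csumByKey computes via reduceByKey on the RDD.
val perUser = allData.groupBy(_._1).map { case (user, rows) =>
  user -> rows.map(_._2).reduce(_ |+| _)
}

println(perUser("Bob")) // Bob: task1 -> 20, task2 -> 1
println(perUser("Joe")) // Joe: task1 -> 1, task2 -> 3, task3 -> 4
```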