/** A name made of a list of first names and a single last name.
  *
  * @param fnames first names; concatenated with `++` when two names are merged elsewhere in this file
  * @param lname  last name; used elsewhere in this file as the grouping key
  */
case class Name(fnames: List[String], lname: String)

/** Companion of [[Name]] that hosts its implicit Spark encoder. */
object Name {
  import org.apache.spark.sql.{Encoder, Encoders}

  /** Encoder for [[Name]] built with `Encoders.kryo`.
    *
    * Placed in the companion so it is part of the implicit scope of `Encoder[Name]`.
    * NOTE(review): call sites that also import `sparkSession.implicits._` bring other
    * encoder implicits into lexical scope; confirm which encoder is actually selected there.
    */
  implicit val encoder: Encoder[Name] = Encoders.kryo[Name]
}
/** Demo entry point: builds a small `Dataset[Name]`, merges the first names of all
  * entries that share a last name, prints the merged rows and closes the Spark session.
  */
object Main {
  import org.apache.spark.sql.Dataset

  /** Runs the demo job.
    *
    * An explicit `main` method is used rather than `extends App`: the Spark documentation
    * advises against `scala.App` because its delayed initialisation may not work correctly.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Bind the session once so the implicits import and the final close() use the same value.
    val session = Spark.sparkSession
    import session.implicits._

    try {
      val people: Dataset[Name] =
        List(
          Name(List("John"), "Doe"),
          Name(List("Jane"), "Doe"),
          Name(List("Alice"), "Bob")
        ).toDS()

      // One Name per last name, carrying the concatenated first names of its group.
      val families: Dataset[Name] =
        people
          .groupByKey { case Name(_, lname) => lname }
          .reduceGroups { (n1, n2) => Name(n1.fnames ++ n2.fnames, n1.lname) }
          // reduceGroups yields (key, reduced value) pairs; keep only the value.
          .map { case (_, n) => n }

      println {
        families.collect().mkString("\n")
      }
    } finally {
      // Close the session even when the job above fails.
      session.close()
    }
  }
}