Group by Key

/** A person's name, e.g. `Name(List("John"), "Doe")`.
  *
  * @param fnames the first name(s), kept as a list so several can be held together
  * @param lname  the last name
  */
case class Name(
  fnames: List[String],
  lname: String
)

/** Companion of [[Name]]; places a Spark `Encoder[Name]` in the type's implicit scope. */
object Name {

  import org.apache.spark.sql.{Encoder, Encoders}

  /** Encoder for [[Name]] obtained from `Encoders.kryo`.
    *
    * NOTE(review): a call site that also imports a session's `implicits._` may
    * resolve a different `Encoder[Name]` before this one is considered — confirm
    * which instance is actually selected where it matters.
    */
  implicit val encoder: Encoder[Name] = Encoders.kryo[Name]
}
/** Entry point: builds a small in-memory `Dataset[Name]`, merges the entries that
  * share a last name into a single [[Name]], and prints the merged rows.
  *
  * An explicit `main` method is used instead of extending `scala.App`: the Spark
  * documentation advises against `scala.App` for applications, and its delayed
  * initialization makes field-initialization order easy to get wrong.
  */
object Main {

  import org.apache.spark.sql.Dataset
  import Spark.sparkSession.implicits._

  /** Runs the grouping job and prints one merged [[Name]] per line.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Resolve the session before entering `try`, so a failure while creating it
    // is reported as-is rather than being masked by the `finally` block.
    val session = Spark.sparkSession

    try {
      val people: Dataset[Name] =
        List(
          Name(List("John"), "Doe"),
          Name(List("Jane"), "Doe"),
          Name(List("Alice"), "Bob")
        )
        .toDS()

      val families: Dataset[Name] =
        people
          // Key every record by its last name.
          .groupByKey {
            case Name(_, lname) => lname
          }
          // Within a key, concatenate the first-name lists; the last name is the
          // same for both operands, so the left one is kept.
          .reduceGroups {
            (n1, n2) => Name(n1.fnames ++ n2.fnames, n1.lname)
          }
          // reduceGroups yields (key, value) pairs; only the merged value is needed.
          .map {
            case (_, n) => n
          }

      println {
        families
          .collect()
          .mkString("\n")
      }
    } finally {
      // Always release the session, even when the job above throws.
      session.close()
    }
  }
}