Swarm: Transparent scalability through portable continuations

James Earl Douglas

September 24, 2011

What is Swarm for?

Swarm was started by Ian Clarke, who observed common issues with scalable software.

With Swarm, we can address these issues.

One approach to scalability: MapReduce

Pros

Cons

void map(K key, V value, Mapper context)
void reduce(K key, Iterable values, Reducer context)

With Swarm, there is another way

Scala continuations

reset {
  shift { k: (Int => Int) =>
    k(21)
  } * 2
}

Scala continuations are portable

println(a())
println(b())
println(c())

Send a closure

def transport(f: (Unit => Bee), dest: Location) {
  dest match {
    case InetLocation(address, port) =>
      val skt = new java.net.Socket(address, port);
      val oos = new ObjectOutputStream(skt.getOutputStream())
      oos.writeObject(f)
      oos.close()
  }
}

Receive a closure

def listen(port: Short)(implicit tx: Transporter) {
  _local = Some(new InetLocation(localHost, port))

  val server = new java.net.ServerSocket(port);

  var runnable = new Runnable() {
    override def run() = {
      while (true) {
        val socket = server.accept()
        val ois = new ObjectInputStream(socket.getInputStream())
        val bee = ois.readObject().asInstanceOf[(Unit => Bee)]
        Swarm.continue(bee)
      }
    }
  }
  Swarm.executor.execute(runnable)
}

Route the continuation

Depending on the type of Bee instance: execute the continuation, send the continuation somewhere else, or do nothing.

def execute(bee: Bee)(implicit tx: Transporter) {
  bee match {
    case RefBee(f, ref) if (tx.isLocal(ref.location)) =>
      if (!Store.exists(ref.uid)) {
        val newRef = Store.relocated(ref.uid)
        ref.relocate(newRef.uid, newRef.location)
        tx.transport(f, ref.location)
      } else {
        Swarm.continue(f)
      }
    case RefBee(f, ref) => tx.transport(f, ref.location)
    case IsBee(f, dest) if (tx.isLocal(dest)) =>
      Swarm.continue(f)
    case IsBee(f, dest) => tx.transport(f, dest)
    case NoBee() =>
  }
}

Explicit relocation

The moveTo function provides the bottom-level mechanism for relocating an execution.

def moveTo(dest: Location) = shift {
  c: (Unit => Bee) =>
    IsBee(c, dest)
}

def explicitMoveTo: Bee @swarm = {
  val name = readLine("Enter your name: ")
  Swarm.moveTo(new InetLocation(getLocalHost, 9997))

  val age = parseInt(readLine(“Enter your age: "))
  Swarm.moveTo(new InetLocation(getLocalHost, 9998))

  println("Hello " + name + ", you are " + age + " years old!")
  NoBee()
}

Location-transparent variable

The Ref class uses moveTo to store and dereference a variable.

class Ref[A](val typeClass: Class[A],
             initLoc: Location,
             initUid: Long) {
  def apply(): A@swarm = {
    Swarm.dereference(this)
    Store(typeClass, uid).getOrElse(throw new RefNotFound())
  }

Local vs remote Ref

Refs can be explicitly defined on a given Swarm node.

val local = new InetLocation(getLocalHost, localPort)
val remote = new InetLocation(getLocalHost, remotePort)

val a = Ref(local, "bumble bee")
val b = Ref(local, "honey bee")
val c = Ref(remote, "stingless bee")

Dereference a Ref

Use moveTo when necessary to relocate the execution to the node with the data.

def dereference(ref: Ref[_]) = shift {
  c: (Unit => Bee) =>
    RefBee(c, ref)
}

println(a())
println(b())
println(c())

CPS collections

Richer data structures than Ref need collections compatible with CPS data types.

type swarm = cpsParam[Bee, Bee]

implicit def cpsIterable[A, Repr]
  (xs: IterableLike[A, Repr]) = new {
  def cps = new {
    def foreach[B](f: A => Any @swarm): Unit @swarm = {
      val it = xs.iterator
      while(it.hasNext) f(it.next)
    }
    def map[B, That](f: A => B @swarm)
      (implicit cbf: CanBuildFrom[Repr, B, That]):
      That @swarm = {
        val b = cbf(xs.repr)
        foreach(b += f(_))
        b.result
    }

Ref-based data structures

Build on Ref and CPS collections to make Swarm-aware data structures.

class RefMap[A](typeClass: Class[A], refMapKey: String)
        extends Serializable {

  private[this] val map =
    new HashMap[String, Tuple3[Class[A], Location, Long]]()

  def get(key: String): Option[A]@swarm = {
    if (map.contains(key)) {
      val tuple = map(key)
      val ref = new Ref(tuple._1, tuple._2, tuple._3)
      Some(ref())
    } else {
      None
    }
  }

Swarm Twitter

Continuation result rendezvous

def remember(f: => Any@swarm)
      (implicit tx: Transporter, local: Location): String = {
  val uuid = UUID.randomUUID.toString

  Swarm.spawn {
    val x = f
    Swarm.moveTo(local)
    Swarm.saveFutureResult(uuid, x)
  }

  uuid
}

The future of Swarm