Swarm

Transparent Scalability through Portable Continuations

Continuation Workshop 2011, Tokyo, Japan

James Douglas
@jearldouglas

What is Swarm for?

Swarm was started by Ian Clarke, who observed common issues with scalable software.

With Swarm, we can address these issues.

One approach to scalability: MapReduce

Pros

Cons

void map(K key, V value, Mapper context)
void reduce(K key, Iterable values, Reducer context)

With Swarm, there is another way

Scala continuations

reset {
  shift { k: (Int => Int) =>
    k(21)
  } * 2
}

Scala continuations are portable

println(a())
println(b())
println(c())

Send a closure

def transport(f: (Unit => Bee), dest: Location) {
  dest match {
    case InetLocation(address, port) =>
      val skt = new java.net.Socket(address, port);
      val oos = new ObjectOutputStream(skt.getOutputStream())
      oos.writeObject(f)
      oos.close()
  }
}

Receive a closure

def listen(port: Short)(implicit tx: Transporter) {
  _local = Some(new InetLocation(localHost, port))

  val server = new java.net.ServerSocket(port);

  var runnable = new Runnable() {
    override def run() = {
      while (true) {
        val socket = server.accept()
        val ois = new ObjectInputStream(socket.getInputStream())
        val bee = ois.readObject().asInstanceOf[(Unit => Bee)]
        Swarm.continue(bee)
      }
    }
  }
  Swarm.executor.execute(runnable)
}

Route the continuation

Depending on the type of Bee instance: execute the continuation, send the continuation somewhere else, or do nothing.

def execute(bee: Bee)(implicit tx: Transporter) {
  bee match {
    case RefBee(f, ref) if (tx.isLocal(ref.location)) =>
      if (!Store.exists(ref.uid)) {
        val newRef = Store.relocated(ref.uid)
        ref.relocate(newRef.uid, newRef.location)
        tx.transport(f, ref.location)
      } else {
        Swarm.continue(f)
      }
    case RefBee(f, ref) => tx.transport(f, ref.location)
    case IsBee(f, dest) if (tx.isLocal(dest)) =>
      Swarm.continue(f)
    case IsBee(f, dest) => tx.transport(f, dest)
    case NoBee() =>
  }
}

Explicit relocation

The moveTo function provides the bottom-level mechanism for relocating an execution.

def moveTo(dest: Location) = shift {
  c: (Unit => Bee) =>
    IsBee(c, dest)
}
def explicitMoveTo: Bee @swarm = {
  val name = readLine("Enter your name: ")
  Swarm.moveTo(new InetLocation(getLocalHost, 9997))

  val age = parseInt(readLine(“Enter your age: "))
  Swarm.moveTo(new InetLocation(getLocalHost, 9998))

  println("Hello " + name + ", you are " + age + " years old!")
  NoBee()
}

Location-transparent variable

The Ref class uses moveTo to store and dereference a variable.

class Ref[A](val typeClass: Class[A],
             initLoc: Location,
             initUid: Long) {
  def apply(): A@swarm = {
    Swarm.dereference(this)
    Store(typeClass, uid).getOrElse(throw new RefNotFound())
  }

Local vs remote Ref

Refs can be explicitly defined on a given Swarm node.

val local = new InetLocation(getLocalHost, localPort)
val remote = new InetLocation(getLocalHost, remotePort)

val a = Ref(local, "bumble bee")
val b = Ref(local, "honey bee")
val c = Ref(remote, "stingless bee")

Dereference a Ref

Use moveTo when necessary to relocate the execution to the node with the data.

def dereference(ref: Ref[_]) = shift {
  c: (Unit => Bee) =>
    RefBee(c, ref)
}

println(a())
println(b())
println(c())

CPS collections

Richer data structures than Ref need collections compatible with CPS data types.

type swarm = cpsParam[Bee, Bee]
implicit def cpsIterable[A, Repr]
  (xs: IterableLike[A, Repr]) = new {
  def cps = new {
    def foreach[B](f: A => Any @swarm): Unit @swarm = {
      val it = xs.iterator
      while(it.hasNext) f(it.next)
    }
    def map[B, That](f: A => B @swarm)
      (implicit cbf: CanBuildFrom[Repr, B, That]): 
      That @swarm = {
        val b = cbf(xs.repr)
        foreach(b += f(_))
        b.result
    }

Ref-based data structures

Build on Ref and CPS collections to make Swarm-aware data structures.

class RefMap[A](typeClass: Class[A], refMapKey: String)
        extends Serializable {

  private[this] val map =
    new HashMap[String, Tuple3[Class[A], Location, Long]]()

  def get(key: String): Option[A]@swarm = {
    if (map.contains(key)) {
      val tuple = map(key)
      val ref = new Ref(tuple._1, tuple._2, tuple._3)
      Some(ref())
    } else {
      None
    }
  }

Swarm Twitter

Continuation result rendezvous

def remember(f: => Any@swarm)
      (implicit tx: Transporter, local: Location): String = {
  val uuid = UUID.randomUUID.toString

  Swarm.spawn {
    val x = f
    Swarm.moveTo(local)
    Swarm.saveFutureResult(uuid, x)
  }

  uuid
}

The future of Swarm