Thursday, 31 May 2012

"Emulating" C#' using keyword in Scala

Not quiet the same but better than dealing with closing the resource manually every time I'm using a DB connection or input stream or the like...

package utils.using
import java.sql.Connection
import javax.persistence.EntityManager
/**
* Declares an object that can be used.
*/
trait Usable[+T] {
def use[R](body: (T) => R): R
}
/**
* Defines a generic container for objects that can be used and closed.
* When use is invoked it will execute the body and close the object afterwards.
*/
abstract case class Closeable[+T](content: T) extends Usable[T] {
override def use[R](body: (T) => R): R =
try {
body(content)
}
finally {
close
}
def close: Unit
}
/**
* Package object which provides the "using" method.
* Also contains some predefined implicit conversions for common types:
* - java.sql.Connection
* - java.io.Closeable
* - scala.io.BufferedSource
* - java.sql.PreparedStatement
* - javax.persistence.EntityManager
*/
object `package` {
/**
* Sets auto commit to false and executes the body.
* On successful execution the transaction is committed
* otherwise it will be rolled back.
* The connection is closed after use.
*/
implicit def SQLConnection2Useable[X <: Connection](instance: X) = new Usable[X] {
def use[R](body: (X) => R): R = {
try {
instance.setAutoCommit(false)
val result = body(instance)
instance.commit()
result
}
catch {
case e =>
instance.rollback()
throw e
}
finally {
instance.close
}
}
}
implicit def IOCloseable2Useable[X <: java.io.Closeable](instance: X) = new Closeable(instance) {
def close = instance.close
}
implicit def scalaioBufferedSource2Useable[X <: scala.io.BufferedSource](instance: X) = new Closeable(instance) {
def close = instance.close
}
implicit def javasqlPreparedStatement2Useable[X <: java.sql.PreparedStatement](instance: X) = new Closeable(instance) {
def close = instance.close
}
/**
* Starts a transaction or fails if it is already started.
* When the body is executed successfully the transaction is committed
* otherwise it will be rolled back.
* The entity manager is closed after use.
*/
implicit def javaxpersistenceEntityManager2Useable[X <: EntityManager](instance: X) = new Usable[X] {
def use[R](body: (X) => R): R = {
val tx = instance.getTransaction
if (tx.isActive())
throw new Exception("Transaction is active. Unable to nest transactions.")
try {
tx.begin()
val result = body(instance)
tx.commit()
result
}
catch {
case e =>
tx.rollback()
throw e
}
finally {
instance.close
}
}
}
/**
* Uses the useable with the given body.
*/
def using[W, T](useables: Usable[W])(body: (W) => T): T = useables.use(body)
/**
* Nested use of all given useables:
*
* <pre>
* val result: Int = using(u1,u2) {
* println("body")
* 123
* }
* </pre>
*
* Will effectively do:
*
* <pre>
* val result: Int = using(u1) {
* using(u2) {
* println("body")
* 123
* }
* }
* </pre>
*/
def using[T](useables: Usable[_]*)(body: => T): T = {
// the deepest (last element in useables) "use" method should invoke "body" - "body _" passes "body" without invoking it
val callChain = useables.foldRight(body _) {
case (useable, nestedCall) =>
// returns a function that calls the current usable "use" method with in turn executes the "nestedCall"
() => useable.use(_ => nestedCall())
}
// invoke the top function of the nested call chain
callChain()
}
}
view raw package.scala hosted with ❤ by GitHub

And here the most convoluted way of outputting a string:

import java.io.BufferedReader
import java.io.ByteArrayInputStream
import java.io.InputStreamReader
import utils.using._
object Main extends App {
// the "used" instance is passed to the body i.e. "in" is of type BufferedReader
using(new BufferedReader(new InputStreamReader(new ByteArrayInputStream("Testing using".getBytes)))) { in =>
println(in.readLine)
}
val i1 = new BufferedReader(new InputStreamReader(new ByteArrayInputStream("Testing".getBytes)))
val i2 = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(" using".getBytes)))
// here the instances are not passed to the body
// couldn't think of a way preserving the type of each instance
using(i1, i2) {
print(i1.readLine)
println(i2.readLine)
}
// will throw java.io.IOException: Stream closed
i1.readLine
// slightly more useful
val emf = Persistence.createEntityManagerFactory("persistence_unit")
using(emf.createEntityManager) { em =>
// use em here...
}
}
view raw using.scala hosted with ❤ by GitHub

Saturday, 26 May 2012

Akka 2 and setReceiveTimeout

The Akka documentation states that setReceiveTimeout is
"A timeout mechanism can be used to receive a message when no initial message is received within a certain time."
I was unsure about the "initial message" part and turns out that the underlying actor receives a timeout every time there hasn't been a message within the specified timeframe i.e. not just when no initial message has been received.

Small code sample to show the behaviour of setReceiveTimeout:

import java.util.Date
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.util.duration._
class MyActor extends Actor {
context.setReceiveTimeout(5 seconds)
def receive = {
case x: String => println(new Date + " / Recieved: " + x)
case ReceiveTimeout => println(new Date + " / No message received since 5 seconds")
}
}
object Main extends App {
val sys = ActorSystem("test")
val a = sys.actorOf(Props[MyActor])
println(new Date + " / Started")
Thread.sleep(15000) // expect two timeout messages, one after 5 seconds, next after 10
// send ten msg's (1 every second) just when we should receive the third timeout
1 to 10 foreach { x =>
a ! "Message: " + x
Thread.sleep(1000)
}
// the next timeout message should be received 5 seconds after the loop finishes
}

Output:


Sun May 27 13:23:57 EST 2012 / Started
Sun May 27 13:24:02 EST 2012 / No message received since 5 seconds
Sun May 27 13:24:07 EST 2012 / No message received since 5 seconds
Sun May 27 13:24:12 EST 2012 / Recieved: Message: 1
Sun May 27 13:24:13 EST 2012 / Recieved: Message: 2
Sun May 27 13:24:14 EST 2012 / Recieved: Message: 3
Sun May 27 13:24:15 EST 2012 / Recieved: Message: 4
Sun May 27 13:24:16 EST 2012 / Recieved: Message: 5
Sun May 27 13:24:17 EST 2012 / Recieved: Message: 6
Sun May 27 13:24:18 EST 2012 / Recieved: Message: 7
Sun May 27 13:24:19 EST 2012 / Recieved: Message: 8
Sun May 27 13:24:20 EST 2012 / Recieved: Message: 9
Sun May 27 13:24:21 EST 2012 / Recieved: Message: 10
Sun May 27 13:24:26 EST 2012 / No message received since 5 seconds
Sun May 27 13:24:31 EST 2012 / No message received since 5 seconds