raymondtay.github.io

Blogging site

Journey into the IO Monad Part 2

Continuing from my previous post, I want to document one interpretation of a solution to logging with cats-effect, and how another open source project from the cats-effect ecosystem has helped me further. I believe it could help you too.

First, let’s frame the context and the problem. I’ve been using the cats library and its ecosystem of libraries (and contributing to cats) for a number of years. I gave a talk in 2017 to share my experience with the Singapore Scala community and evangelise cats (follow this if you like). A common problem I have ended up solving several times is logging.

The Writer monad is what I reach for whenever I want to implement logging while traversing computations and state, because it’s easy to use, so I started exploring ways to integrate the Writer monad into the IO monad. When you are learning something new, it’s perfectly normal to experiment, and you should not be afraid to, but first let’s establish a baseline to study. The code snippet below computes the Fibonacci sequence in the IO monad:

import cats.effect.{ContextShift, IO}
import cats.implicits._

def fib(n: Int, a: Long = 0, b: Long = 1)
       (implicit cs: ContextShift[IO]): IO[Long] =
  IO.suspend {
    if (n == 0) IO.pure(a) else {
      val next = fib(n - 1, b, a + b)
      // Every 100 cycles, introduce a logical thread fork
      if (n % 100 == 0)
        cs.shift *> next
      else
        next
    }
  }

It’s a lazily evaluated computation (via suspend), and every 100 steps it yields so that another thread can pick up the rest of the work (via shift, which moves the remaining evaluation onto a thread managed by the ContextShift). So how can I trace this computation? That is, I want to capture any significant observations I make in the logic.

Here’s one rendition:

def fibW(n: Int, a: Long = 0, b: Long = 1)
        (implicit cs: ContextShift[IO], W: IO[Writer[List[String],Long]]): IO[Writer[List[String],Long]] =
  IO.suspend {
    if (n == 0) W >>= { writer => IO{ Writer(writer.written :+ s"=> Done $a", a) } } else {

      def next(WW: IO[Writer[List[String], Long]]) =
        WW >>= {writer => fibW(n - 1, b, a + b)(cs, IO{writer.tell(List(s"=> Next $a"))})}

      // Every 10 cycles, introduce a logical thread fork
      if (n % 10 == 0)
        W >>= { writer => cs.shift *> next(IO{writer.tell(List(s"=> Context Shift !"))}) }
      else
        W >>= { writer => next(IO{writer.tell(List(s"=> regular "))}) }
    }
  }

If you would like to read the full source code, follow this.


What is happening here is that the Writer monad is passed along from step to step, and each step is recorded (you can see this in the writer.tell(...) calls). You might wonder what the essential difference is between this and, say, IO(println("some log statement")) or even a plain println("some log statement"). The key difference is that this approach lets the developer collect all the logs in sequence as a value, and leaves them the flexibility to push those logs to any data consumer downstream.
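To make the "collect in sequence" point concrete outside of IO, here is a minimal sketch using only cats' Writer. The object name WriterSketch and the helper double are hypothetical and exist only for illustration; they are not part of the code above.

```scala
import cats.data.Writer
import cats.implicits._

object WriterSketch {
  // Hypothetical step: doubles a number and records what it did.
  def double(n: Long): Writer[List[String], Long] =
    Writer(List(s"doubled $n"), n * 2)

  // Each flatMap appends the step's log to the log accumulated so far.
  val program: Writer[List[String], Long] =
    for {
      a <- double(1)
      b <- double(a)
      _ <- Writer.tell(List("done"))
    } yield b

  // run yields the log and the value together; nothing has been printed.
  val (logs, result) = program.run
}
```

Nothing is written to the console here: the log is an ordinary List[String] that can be inspected, filtered, or forwarded later.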


Coming back: that wasn’t too hard. If I want both the value and the logs that were written, I would run:

val writer = fibW(100).unsafeRunSync() // run the IO once
writer.value   // the Fibonacci result
writer.written // the accumulated log lines

Naturally, you can then pass those logs to a Scala logger implementation so that a decentralized logging framework, e.g. Logstash or Scalyr, can pick them up for analysis.
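One way that hand-off could look is sketched below, assuming cats-effect 2 as in the rest of this post. The names FlushSketch, runAndFlush and sink are hypothetical: sink stands in for whatever logger call you actually use, for example a function wrapping logger.info.

```scala
import cats.data.Writer
import cats.effect.IO
import cats.implicits._

object FlushSketch {
  // Runs the task, sends every collected line to the sink in order,
  // and then returns the computed value.
  // `sink` is a hypothetical stand-in for a real logger call.
  def runAndFlush[A](task: IO[Writer[List[String], A]])(sink: String => IO[Unit]): IO[A] =
    task.flatMap { w =>
      val (logs, value) = w.run
      logs.traverse_(sink).as(value)
    }
}
```

Because the sink is just a function, the same collected log can go to a file, a test buffer, or a real logger without touching the computation itself.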

This approach seemed good, and it gave me flexibility in one direction. I was also interested in whether there was another approach that would let me use a Scala logger implementation directly, preferably something open source, since this is a common problem.

It turns out Chris Davenport created the log4cats project, which builds on existing Scala logging backends. Awesome! Next, I started looking at what I could do with log4cats. Here’s my rendition, and you can follow the full source code:

// Describe the fibonacci with logging.
// Note: Logger[F] summons an implicit Logger[F], which must be in scope.
def fib[F[_]:Sync:ContextShift](n: Int, a: Long = 0, b: Long = 1): F[Long] = 
 Sync[F].suspend {
  if (n == 0) Logger[F].info(s"=> Done $a") *> Sync[F].pure(a)
  else {
   val next = Logger[F].info(s"=> Next $a") *> fib(n - 1, b, a + b)
  
   // Every 100 cycles, introduce a logical thread fork
   if (n % 100 == 0)
    Logger[F].info(s"Context Shift!") *> ContextShift[F].shift *> next
   else next
  }
 }
// create an IO task
def fibL[F[_]:Sync:ContextShift](n: Int, a:Long = 0, b:Long = 1) =
  Logger[F].info("Logging Started.") *>
  fib(n, a, b) >>=
    { result =>
        Logger[F].info("Logging Ended.") *> Sync[F].pure(result)
    }
// run the IO task
fibL[IO](100).unsafeRunSync

This work by log4cats is awesome because it gives developers another way to flush logs downstream directly, if they choose to do so.
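The snippet calls Logger[F] without showing where the instance comes from. A minimal sketch of one way to provide it follows, assuming the log4cats-slf4j module and the io.chrisdavenport package names used by the releases that target cats-effect 2 (newer releases may use different package names). The object name Logging is hypothetical.

```scala
import cats.effect.Sync
import io.chrisdavenport.log4cats.Logger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger

object Logging {
  // Assumption: an SLF4J-backed logger; any other Logger[F] would work as well.
  implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
}
```

With import Logging._ in scope, Logger[F].info(...) should resolve for any F that has a Sync instance, including IO.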

