Writing a high-quality data pipeline for master data with Apache Spark – Part 3

Published on: October 14, 2019

Applying our best practices in a Spark application

In a previous article, we explored a number of best practices for building a data pipeline. We then followed up with an article detailing which technologies and/or frameworks can help us adhere to these principles. In this last article of the series, we show how you can build Apache Spark pipelines in Scala while adhering to the proposed principles.

Obviously, this will be the most technical article of the series. We will not try to write yet another basic Spark tutorial here; the internet is already full of those. By the way, if we may give you one piece of advice for learning Spark: don’t just start reading. Instead, get your hands dirty with a hands-on tutorial like this one.

This is not a Scala tutorial either. For the basics, this one already provides more than you need; to go beyond the basics, this video will get you started. You will need some Scala knowledge to follow this article.

How do we model a job?

A job is a piece of logic that transforms a number of inputs into a number of outputs (datasets, a trained model, a visualization). A job could simply be a standalone program, which is perfectly doable, and we could then use Luigi, Airflow or Oozie to chain jobs together. But remember the importance of composability and reuse of logic.

A better way to model a job is as a plain Scala function with inputs and outputs. Functions are composable, and if we still want a standalone program, it’s easy to wrap the function in one.
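To make this concrete, here is a minimal sketch, assuming plain Scala collections instead of Spark Datasets and a hypothetical `House` type that only holds an inhabitant count. Two small jobs are composed with `andThen`, and a `main` method turns the composed function into a standalone program:

```scala
// Minimal sketch: plain Seqs stand in for Spark Datasets, and all names
// (JobsAsFunctions, House, occupiedHouses, ...) are hypothetical.
object JobsAsFunctions {
  final case class House(inhabitants: Int)

  // each job is just a function from inputs to outputs
  val occupiedHouses: Seq[House] => Seq[House] = _.filter(_.inhabitants > 0)
  val totalInhabitants: Seq[House] => Int = _.map(_.inhabitants).sum

  // composing two jobs gives a new job
  val totalInOccupiedHouses: Seq[House] => Int = occupiedHouses.andThen(totalInhabitants)

  // wrapping the composed job as a standalone program takes only a few lines
  def main(args: Array[String]): Unit = {
    val houses = args.toSeq.map(arg => House(arg.toInt))
    println(totalInOccupiedHouses(houses))
  }
}
```

Running this program with the arguments `4 0 6` would print `10`. The function stays the unit of reuse; the `main` wrapper is only a thin entry point.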

Take the example below, which computes the population of each city:

  import org.apache.spark.sql.{Dataset, SparkSession}

  case class Point(x: Int, y: Int)
  case class Polygon(coordinates: Seq[Point])
  case class City(name: String, geometry: Polygon)
  case class House(location: Point, inhabitants: Int)
  case class CityPopulation(city: City, nrOfInhabitants: Int, nrOfHouses: Int)

  def getCityPopulation(spark: SparkSession, cities: Dataset[City],
                        houses: Dataset[House]): Dataset[CityPopulation] = {
    // do some fancy spatial join here
    val ds: Dataset[CityPopulation] = ???
    ds
  }
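The spatial join itself is left open above. As a rough sketch of the logic only, assuming plain Scala collections and the even-odd (ray casting) point-in-polygon rule, which the article does not prescribe, the per-city aggregation could look like this. Houses lying exactly on a city border are not handled consistently by this rule, and in Spark the same logic would still have to be expressed as a join between the two Datasets:

```scala
// Sketch on plain collections; the case classes mirror the article's, and
// contains/getCityPopulation here are hypothetical stand-ins for a Spark join.
object CityPopulationSketch {
  final case class Point(x: Int, y: Int)
  final case class Polygon(coordinates: Seq[Point])
  final case class City(name: String, geometry: Polygon)
  final case class House(location: Point, inhabitants: Int)
  final case class CityPopulation(city: City, nrOfInhabitants: Int, nrOfHouses: Int)

  // even-odd rule: count the polygon edges crossed by a horizontal ray starting
  // at p and going right; an odd number of crossings means p is inside
  def contains(polygon: Polygon, p: Point): Boolean = {
    val pts = polygon.coordinates
    pts.indices.foldLeft(false) { (inside, i) =>
      val a = pts(i)
      val b = pts((i + 1) % pts.size)
      // the division only runs when a.y != b.y, thanks to short-circuiting &&
      val crosses = (a.y > p.y) != (b.y > p.y) &&
        p.x < (b.x - a.x).toDouble * (p.y - a.y) / (b.y - a.y) + a.x
      if (crosses) !inside else inside
    }
  }

  def getCityPopulation(cities: Seq[City], houses: Seq[House]): Seq[CityPopulation] =
    cities.map { city =>
      val inCity = houses.filter(h => contains(city.geometry, h.location))
      CityPopulation(city, inCity.map(_.inhabitants).sum, inCity.size)
    }
}
```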

Data validation

We could just document the constraints our function expects from its inputs, but why do that if we can just as well express these constraints in code and enforce them? We will use Amazon Deequ for this, but it’s easy enough to use something else, or even to roll your own, if you don’t want to use Deequ or aren’t using Apache Spark at all.

import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.{Failure, Success, Try}

case class ValidationError(result: VerificationResult) extends Exception

def getCityPopulation(spark: SparkSession, cities: Dataset[City],
                      houses: Dataset[House]): Try[Dataset[CityPopulation]] = {
  import spark.implicits._
  // fail early if two cities share the same name
  val result = VerificationSuite()
    .onData(cities.toDF())
    .addCheck(Check(CheckLevel.Error, "City validation").isUnique("name"))
    .run()
  if (result.status == CheckStatus.Error) {
    Failure(ValidationError(result))
  } else {
    // do some fancy spatial join here
    val ds: Dataset[CityPopulation] = ???
    Success(ds)
  }
}

In the above example, we now check that city names are unique. It’s easy to see how some convenience functions could make this even more readable, and readability matters: if the code is not readable enough, we will need to add documentation that repeats what the code already says.

Let’s try to add a Unique annotation:

import scala.annotation.StaticAnnotation

// marker annotations describing constraints on case class fields
sealed trait ValidationAnnotation extends StaticAnnotation
final class Unique extends ValidationAnnotation
final class NotNull extends ValidationAnnotation

// ...
case class City(@Unique name: String, geometry: Polygon)

Now we can write a tool that automatically builds a VerificationSuite for a dataset. We could use reflection, or generic programming with the frameless library; the example below uses reflection to find the annotated fields of a case class:

import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import org.apache.spark.sql.Dataset
import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}

case class VerificationError(result: VerificationResult) extends Exception

object validation {

  // names of the primary-constructor fields of case class T annotated with A
  def getFieldsWithAnnotation[T: TypeTag, A <: ValidationAnnotation: TypeTag]: Seq[String] =
    typeOf[T].decl(termNames.CONSTRUCTOR).asMethod.paramLists.flatten.collect {
      case p if p.annotations.exists(_.tree.tpe <:< typeOf[A]) => p.name.toString
    }

  // one Deequ check per annotated field
  def getAutoValidationChecks[T: TypeTag]: Seq[Check] = {
    val uniqueFields = getFieldsWithAnnotation[T, Unique].map(n =>
      Check(CheckLevel.Error, "Uniqueness of " + n).isUnique(n))
    val nonNullFields = getFieldsWithAnnotation[T, NotNull].map(n =>
      Check(CheckLevel.Error, "Not null constraint on " + n).isComplete(n))
    uniqueFields ++ nonNullFields
  }

  // runs the annotation-derived checks plus any extra ones on ds
  def verify[T: TypeTag](ds: Dataset[T], extraVerifications: Seq[Check] = Seq()): Try[Dataset[T]] = {
    val result = VerificationSuite()
      .onData(ds.toDF())
      .addChecks(getAutoValidationChecks[T] ++ extraVerifications)
      .run()
    if (result.status == CheckStatus.Error) Failure(VerificationError(result))
    else Success(ds)
  }
}

This syntactic sugar simplifies our job a bit:

import validation._

def getCityPopulation(spark: SparkSession, cities: Dataset[City],
                      houses: Dataset[House]): Try[Dataset[CityPopulation]] = {
  import spark.implicits._
  for {
    c <- verify(cities)
    h <- verify(houses)
    population = { // or <- instead of = if the join itself returns a Try
      // do some fancy spatial join of c and h here
      val ds: Dataset[CityPopulation] = ???
      ds
    }
  } yield population
}
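Runtime reflection is not the only way to derive these checks. As an alternative sketch (not the frameless approach mentioned above), a type class instance per record type can list the constrained fields, and a generic `verify` can turn that list into checks. Plain Seqs stand in for Datasets, and `Constraints`, `verify` and `totalInhabitants` are hypothetical names:

```scala
import scala.util.{Failure, Success, Try}

// Sketch: a type class lists, per record type, which fields must be unique or
// non-null; verify derives the checks from it. Plain Seqs stand in for Spark
// Datasets, and all names here are hypothetical.
object DerivedValidation {
  final case class VerificationError(messages: Seq[String])
    extends Exception(messages.mkString("; "))

  trait Constraints[T] {
    def unique: Seq[(String, T => Any)] = Seq()
    def notNull: Seq[(String, T => Any)] = Seq()
  }

  def verify[T](rows: Seq[T])(implicit c: Constraints[T]): Try[Seq[T]] = {
    val uniqueErrors = c.unique.collect {
      case (name, field) if rows.map(field).distinct.size != rows.size => s"$name is not unique"
    }
    val nullErrors = c.notNull.collect {
      case (name, field) if rows.exists(row => field(row) == null) => s"$name contains nulls"
    }
    val errors = uniqueErrors ++ nullErrors
    if (errors.isEmpty) Success(rows) else Failure(VerificationError(errors))
  }

  final case class City(name: String)
  final case class House(address: String, inhabitants: Int)

  implicit val cityConstraints: Constraints[City] = new Constraints[City] {
    override val unique: Seq[(String, City => Any)] = Seq(("name", (c: City) => c.name))
  }
  implicit val houseConstraints: Constraints[House] = new Constraints[House] {
    override val notNull: Seq[(String, House => Any)] = Seq(("address", (h: House) => h.address))
  }

  // a toy job: both inputs are verified before anything is computed
  def totalInhabitants(cities: Seq[City], houses: Seq[House]): Try[Int] =
    for {
      _ <- verify(cities)
      h <- verify(houses)
    } yield h.map(_.inhabitants).sum
}
```

The trade-off is that the constraints live next to, rather than on, the case class, but the checks are resolved at compile time and no reflection is needed.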

Now that we have a job, let’s go on to model a pipeline.

How can we model a pipeline?

Scala allows us to chain jobs together using for comprehensions, which are concise and readable:

def loadCities(spark: SparkSession): Try[Dataset[City]] = ???
def loadHouses(spark: SparkSession): Try[Dataset[House]] = ???

for {
  cities <- loadCities(spark)
  houses <- loadHouses(spark)
  population <- getCityPopulation(spark, cities, houses)
} yield population

This way, our pipeline definition is completely separate from our job definitions. If we now want to run the pipeline on a test dataset, or on the cities of another country, that is no problem: we just change the for comprehension.
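As a small, Spark-free sketch of this idea (the types, loaders and names below are hypothetical, and plain Seqs replace Datasets), the pipeline takes its loaders as parameters, so running it on test data or on a single country only changes what we pass in:

```scala
import scala.util.Try

// Sketch: the pipeline is a for comprehension over Try whose inputs are passed
// in as loader functions. Plain Seqs stand in for Datasets; all names are hypothetical.
object SwappableInputs {
  final case class City(name: String, country: String)
  final case class House(city: String, inhabitants: Int)

  // the job: inhabitants per city, ignoring houses outside the given cities
  def population(cities: Seq[City], houses: Seq[House]): Try[Map[String, Int]] = Try {
    val names = cities.map(_.name).toSet
    houses.filter(h => names.contains(h.city))
      .groupBy(_.city)
      .map { case (city, hs) => city -> hs.map(_.inhabitants).sum }
  }

  // the pipeline: only the loaders change between production and test runs
  def pipeline(loadCities: () => Try[Seq[City]],
               loadHouses: () => Try[Seq[House]]): Try[Map[String, Int]] =
    for {
      cities <- loadCities()
      houses <- loadHouses()
      result <- population(cities, houses)
    } yield result
}
```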

However, there is still a problem left to solve. Suppose the “houses” dataset takes ages to compute and doesn’t change every minute, so we want to store a version of it somewhere (a database table? a flat file?) to speed up our pipeline.

Let’s introduce a function cacheInHadoop. When there is no valid cache, it stores its input and returns it unchanged; when a valid cache is present, it ignores its input and returns the stored data instead:

import org.apache.spark.sql.{Dataset, SparkSession}
import scala.concurrent.duration._
import scala.util.Try

def loadCities(spark: SparkSession): Try[Dataset[City]] = ???
def loadHouses(spark: SparkSession): Try[Dataset[House]] = ???
def cacheInHadoop[T](spark: SparkSession, input: Dataset[T], filename: String, duration: Duration): Try[Dataset[T]] = ???

for {
  cities <- loadCities(spark)
  houses <- loadHouses(spark)
  cache_houses <- cacheInHadoop(spark, houses, "hdfs:///tmp/houses.orc", 1.days)
  population <- getCityPopulation(spark, cities, cache_houses)
} yield population

Our cache job still needs the “houses” dataset as an input, in case the cache is invalid. However, this creates a problem: even if the cache is valid, our Dataset[House] will still be computed, since cacheInHadoop needs it as an argument.
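A small, Spark-free sketch makes the problem visible. It assumes an in-memory map instead of HDFS and passes the current time explicitly; `EagerCache`, `cache` and `loadHouses` are hypothetical names. Because `input` is an ordinary (by-value) parameter, the houses are computed before `cache` even gets to check whether its stored copy is still valid:

```scala
import scala.collection.mutable
import scala.util.{Success, Try}

// Sketch of cacheInHadoop's behaviour with an in-memory map standing in for
// HDFS and an explicit timestamp; all names are hypothetical.
object EagerCache {
  private val store = mutable.Map[String, (Long, Any)]()

  // `input` is by-value: the caller has already computed it
  def cache[T](input: Try[T], key: String, maxAgeMillis: Long, now: Long): Try[T] =
    store.get(key) match {
      case Some((writtenAt, value)) if now - writtenAt <= maxAgeMillis =>
        Success(value.asInstanceOf[T]) // valid cache: the input is ignored
      case _ =>
        input.foreach(v => store.update(key, (now, v))) // no valid cache: store the input
        input
    }

  // counts how often the "expensive" houses computation actually runs
  var houseComputations = 0
  def loadHouses(): Try[Seq[Int]] = Try { houseComputations += 1; Seq(4, 6, 1, 8) }
}
```

Calling `cache(loadHouses(), "houses", 1000L, 0L)` and then `cache(loadHouses(), "houses", 1000L, 500L)` returns the same houses twice, but `houseComputations` ends up at 2: the second call is answered from the cache, yet its input was still computed.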

Then again, most operations on Spark datasets are lazy: they are only computed when their result is consumed. So this might not be a problem at all: the “houses” dataset is defined but never consumed, so Spark does no work to compute it.

This is not the case for all Spark operations, however. Iterative algorithms, for instance, usually trigger computations as they go, so they are not lazy. We still need a solution.

Introducing laziness

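In the above code, we use Try objects to model computations that may have failed. We now have to “box” this Try into something that doesn’t get executed right away. In functional programming, such wrappers are typically monads: types with map and flatMap operations, which is exactly what Scala’s for comprehensions need to chain them.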
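The difference is easy to see in plain Scala: a Try runs its computation as soon as it is constructed, whereas a function returning a Try (a thunk) only runs it when called. `BoxingATry` and `expensiveHouses` are hypothetical names used for illustration:

```scala
import scala.util.Try

// Sketch contrasting an eager Try with a boxed (deferred) one; names are hypothetical.
object BoxingATry {
  var runs = 0
  def expensiveHouses(): Seq[Int] = { runs += 1; Seq(4, 6, 1, 8) }

  // constructing this Try immediately runs expensiveHouses
  def eager(): Try[Seq[Int]] = Try(expensiveHouses())

  // constructing the box runs nothing; calling it does
  def boxed(): () => Try[Seq[Int]] = () => Try(expensiveHouses())
}
```

A thunk on its own cannot be chained in a for comprehension, which is why the SJob class adds map and flatMap on top of the same idea.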

We can use existing libraries for this: Cats Effect has IO, there is ZIO, and Monix has Task. Below, we define a class SJob loosely based on the (rather simple) Job class from frameless:

import org.apache.spark.sql.SparkSession
import scala.concurrent.duration.Duration
import scala.util.{Failure, Try}
import scala.util.control.NonFatal

// non-functional requirements for running this job (e.g. max cache age)
case class NFRequirements(maxOutputAge: Duration, disableValidations: Boolean = false,
                          other: Map[String, String] = Map())

abstract class SJob[A](implicit spark: SparkSession) {
  self =>

  // nothing is computed until run is called
  def run(requirements: NFRequirements): Try[A]

  def map[B](fn: A => B): SJob[B] = new SJob[B] {
    override def run(requirements: NFRequirements): Try[B] = {
      try {
        // Try.map already turns non-fatal exceptions thrown by fn into a Failure
        self.run(requirements).map(fn)
      } catch {
        // guards against a run implementation that throws instead of returning a Failure
        case NonFatal(t) => Failure(t)
      }
    }
  }

  // only needed for for-loops without yield; left unimplemented here
  def foreach(fn: A => Unit): Unit = ???

  def flatMap[B](fn: A => SJob[B]): SJob[B] = new SJob[B] {
    override def run(requirements: NFRequirements): Try[B] =
      // stop at the first Failure; otherwise run the next job with the same requirements
      self.run(requirements).flatMap(value => fn(value).run(requirements))
  }
}

object SJob {
  // wraps a by-name computation: it is (re)evaluated each time run is called
  def apply[A](a: => A)(implicit spark: SparkSession): SJob[A] = new SJob[A]() {
    override def run(requirements: NFRequirements): Try[A] = Try(a)
  }
}
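To experiment with SJob without a Spark cluster, here is a Spark-free sketch of the same structure. It assumes the implicit SparkSession can be dropped, keeps only `maxOutputAge` in the requirements, and renames the class to the hypothetical `LazyJob`. Composing jobs runs nothing, and running the result stops at the first failure:

```scala
import scala.concurrent.duration.Duration
import scala.util.{Failure, Try}
import scala.util.control.NonFatal

// Spark-free sketch of SJob: no SparkSession, fewer requirements, hypothetical names.
object LazyJobSketch {
  final case class NFRequirements(maxOutputAge: Duration)

  abstract class LazyJob[A] { self =>
    // nothing runs until run is called
    def run(req: NFRequirements): Try[A]

    def map[B](f: A => B): LazyJob[B] = new LazyJob[B] {
      def run(req: NFRequirements): Try[B] =
        try self.run(req).map(f) catch { case NonFatal(t) => Failure(t) }
    }

    // the next job only runs if this one succeeded
    def flatMap[B](f: A => LazyJob[B]): LazyJob[B] = new LazyJob[B] {
      def run(req: NFRequirements): Try[B] = self.run(req).flatMap(a => f(a).run(req))
    }
  }

  object LazyJob {
    // the by-name argument is evaluated on every run, inside a Try
    def apply[A](a: => A): LazyJob[A] = new LazyJob[A] {
      def run(req: NFRequirements): Try[A] = Try(a)
    }
  }
}
```

For example, `for { h <- houses; c <- failing } yield h.sum + c` builds a new job without loading anything; only calling `run` on it loads the houses and then surfaces the failure of the second job.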

And now we can use it in the new version of our jobs:

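import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import scala.concurrent.duration._
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

implicit val spark: SparkSession = ...

def loadCities(spark: SparkSession): SJob[Dataset[City]] = ???
def loadHouses(spark: SparkSession): SJob[Dataset[House]] = ???
def cacheInHadoop[T: Encoder](spark: SparkSession, input: SJob[Dataset[T]], filename: String,
                              duration: Duration): SJob[Dataset[T]] = ???

// validates the dataset produced by ds at run time; failed checks become a Failure
// (validation.verify needs a TypeTag, not an Encoder)
def verifyJob[T: TypeTag](ds: SJob[Dataset[T]])(implicit spark: SparkSession): SJob[Dataset[T]] =
  ds.flatMap { x =>
    new SJob[Dataset[T]]() {
      override def run(requirements: NFRequirements): Try[Dataset[T]] = validation.verify(x)
    }
  }

def getCityPopulation(spark: SparkSession, citiesT: SJob[Dataset[City]],
                      housesT: SJob[Dataset[House]]): SJob[Dataset[CityPopulation]] = {
  // the spark parameter shadows the implicit defined above, so expose it implicitly again
  implicit val sparkSession: SparkSession = spark
  import spark.implicits._
  // a for comprehension over SJobs already yields an SJob, so no extra SJob { } wrapper
  for {
    houses <- verifyJob(housesT)
    cities <- verifyJob(citiesT)
    population = {
      // do some fancy spatial join here, resulting in a Dataset[CityPopulation]
      val ds: Dataset[CityPopulation] = ???
      ds
    }
  } yield population
}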

We introduce a verifyJob function here that validates the dataset produced by an SJob and turns a success into a failure if the data doesn’t pass validation.
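Because validation now happens inside run, it can take the run-time requirements into account. verifyJob above does not look at the disableValidations flag of NFRequirements; the Spark-free sketch below shows one possible way to honour it. `Job`, this `verifyJob` and the `check` function are simplified, hypothetical stand-ins:

```scala
import scala.util.{Failure, Success, Try}

// Sketch: validation runs inside run, so it can consult the requirements that
// are only known at run time. All names here are hypothetical.
object ValidatingJob {
  final case class NFRequirements(disableValidations: Boolean = false)

  trait Job[A] { def run(req: NFRequirements): Try[A] }

  // check returns Some(error) when the rows are invalid
  def verifyJob[A](input: Job[Seq[A]])(check: Seq[A] => Option[String]): Job[Seq[A]] =
    new Job[Seq[A]] {
      def run(req: NFRequirements): Try[Seq[A]] = input.run(req).flatMap { rows =>
        if (req.disableValidations) Success(rows)
        else check(rows) match {
          case Some(error) => Failure(new IllegalArgumentException(error))
          case None        => Success(rows)
        }
      }
    }
}
```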

Using the new job definition, we can rewrite our pipeline:

val cities = loadCities(spark)
val houses = loadHouses(spark)
val cache_houses = cacheInHadoop(spark, houses, "hdfs:///tmp/houses.orc", 1.days)
val population = getCityPopulation(spark, cities, cache_houses)

// now run the pipeline
val result = population.run(NFRequirements(...))

Something fundamental has changed here: we don’t pass datasets or computed values around anymore. Instead, we pass functions (wrapped in an SJob) that can compute the datasets if needed.
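This is what solves the caching problem: a cache that receives a job instead of a dataset can decide not to run it at all. Below is a Spark-free sketch of such a cache, assuming an in-memory map instead of HDFS, an injected clock, and a maximum age taken from the run-time requirements; `Job` and `cached` are hypothetical names, not the article’s cacheInHadoop:

```scala
import scala.collection.mutable
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Sketch of a lazy cache; an in-memory map stands in for HDFS and all names are hypothetical.
object LazyCache {
  final case class NFRequirements(maxOutputAge: Duration)

  trait Job[A] { def run(req: NFRequirements): Try[A] }

  private val store = mutable.Map[String, (Long, Any)]()

  // returns a job that only runs `input` when the stored copy is missing or too old
  def cached[A](input: Job[A], key: String, now: () => Long): Job[A] = new Job[A] {
    def run(req: NFRequirements): Try[A] = {
      val t = now()
      store.get(key) match {
        case Some((writtenAt, value)) if (t - writtenAt).millis <= req.maxOutputAge =>
          Success(value.asInstanceOf[A]) // fresh cache: the input job never runs
        case _ =>
          val result = input.run(req)
          result.foreach(v => store.update(key, (t, v)))
          result
      }
    }
  }
}
```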

When we compose multiple SJobs, we get a new SJob. The for comprehension in the getCityPopulation function above does exactly this: its result is an SJob.

We have now constructed a pipeline in a very lightweight way:

  • We only introduced a couple of (very light) classes; there is no new DAG library to learn.
  • We have high-level building blocks (getCityPopulation instead of leftJoin or filter or map)
  • The compiler ensures that the datatypes are compatible between the jobs
  • We have data validation built in.

Unit testing

Writing unit tests for this code is straightforward. Their goal is to catch failures in code the compiler cannot typecheck, such as Spark SQL queries. We can use ScalaTest for this:

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite
import scala.util.{Failure, Success}
import ...

class TestComputeCities extends FunSuite {
  val someHouses = Seq(
    House(Point(1, 1), 4),
    House(Point(1, 3), 6),
    House(Point(3, 1), 1),
    House(Point(3, 3), 8)
  )
  // two adjacent rectangular cities, each containing two of the houses
  val someCities = Seq(
    City("city1", Polygon(Seq(Point(0, 0), Point(0, 2), Point(4, 2), Point(4, 0), Point(0, 0)))),
    City("city2", Polygon(Seq(Point(0, 2), Point(4, 2), Point(4, 4), Point(0, 4), Point(0, 2))))
  )

  test("compute city population") {
    implicit val spark: SparkSession = ...
    import spark.implicits._
    val houses = SJob { someHouses.toDS }
    val cities = SJob { someCities.toDS }
    val pop = getCityPopulation(spark, cities, houses).run(NFRequirements(...))
    pop match {
      case Success(data) =>
        val populations = data.collect()
        assert(someHouses.map(_.inhabitants).sum >= populations.map(_.nrOfInhabitants).sum,
          "total population should be at least the sum of the city populations")
      case Failure(e) =>
        fail("could not compute city population", e)
    }
  }
}

Here we did one basic check, but we could also check what happens when we feed invalid data to our job (e.g. overlapping cities), or we could check the correctness of the results for some known situations.

In practice

At Kapernikov, we use a slightly more elaborate SJob variant. The main differences from the version in this article are:

  • We (optionally) keep some metadata, allowing us to know the age of a dataset, or the high-level computation graph.
  • We added a lot of convenience functions and syntactic sugar, e.g. to quickly add input/output validation to an existing SJob without validation, or to remove boilerplate for SJob[Dataset] and SJob[DataFrame].

I still don’t want to call it a library: it is a very thin wrapper around Spark objects. For me, the principles we applied matter more than the way they were applied.

Author

Frank Dekervel

Frank is one of the founders of Kapernikov. Together with his partner, Rein Lemmens, Frank started a web services agency in 2004. But with the addition of a third partner, Jan-Fred ...