A Sample of Lambda Architecture Project

I would like to present my pet project: A Sample of Lambda Architecture Project, a boilerplate for detecting IoT sensor anomalies using the Lambda architecture. It is based on an Akka/Scala cluster and uses the Cassandra database and the Redis in-memory data store.

The project assumes two data processing layers: a fast one (to meet the SLA requirements for latency) and one based on machine learning (to keep the rate of false positives low). One may picture it as a data flow graph with a single origin (events) splitting into two branches (the processing layers), hence the Lambda architecture (the λ symbol). Note that the classic Lambda architecture is slightly different, but the idea is similar.
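
To make the shape concrete, here is a condensed, hypothetical sketch of the flow (the score function is illustrative only; the project's actual code for each branch follows below):

import smile.classification.RandomForest

//            +--> speed layer: heuristic score (fast, meets the latency SLA)
//   events --+
//            +--> batch layer: ML model score (low false-positive rate)
//                 both merged afterwards by the serving layer
def score(value: Double, history: Seq[Double], model: Option[RandomForest]): Double = {
  val fast = Analyzer.withHeuristic(value, history)         // speed branch
  val ml   = model.map(Analyzer.withTrainedModel(value, _)) // batch branch
  ml.fold(fast)(m => (35.0 * fast + 65.0 * m) / 100.0)      // serving merge (weights from the project)
}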

The details are available on the project page, and I'm going to extend the project with additional functionality (protobufs, real-world clustering, and more benchmarks with the analysis actor in different programming languages). Below are the code examples that show how the Lambda architecture works in the project.

The speed layer is used directly by the analysis Akka actor (with the history of events stored in the Cassandra database). The layer uses a heuristic based on the standard deviation:

\src: Analyzer.scala

/**
 * Calculates the probability of the anomaly with the heuristics.
 *
 * @param value The analyzed value
 * @param history The previous values
 * @return The probability of the anomaly
 */
def withHeuristic(value: Double, history: Iterable[Double]): Double = {
  val size = history.size
  val avg = history.sum / size
  def sqrDiff(x: Double) = (x - avg) * (x - avg)
  val stdDev = math.sqrt((0.0 /: history)(_ + sqrDiff(_)) / size)
  val valueDev = math.abs(value - avg)
  val anomaly = (valueDev - stdDev) / (2 * stdDev)
  // truncate the value to be in the [0, 1] interval
  anomaly.max(0).min(1)
}
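
A quick usage sketch with made-up sample numbers (not from the project):

// Sample numbers for illustration only:
val history = Seq(10.0, 10.2, 9.8, 10.1, 9.9)  // avg = 10.0, stdDev ~ 0.14
Analyzer.withHeuristic(12.0, history)          // deviation 2.0 >> 2 * stdDev => 1.0
Analyzer.withHeuristic(10.1, history)          // deviation 0.1 < stdDev      => 0.0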

The batch layer uses a Random Forest classification model (the machine learning algorithms are provided by the Smile engine). The corresponding actor runs the batch analysis on a schedule and saves the model into the Redis in-memory data store (a sketch of this follows the next code block). The model is later used by the analysis actor:

\src: Analyzer.scala

/**
 * Calculates the probability of the anomaly with the trained model.
 *
 * @param value The analyzed value
 * @param rf The trained model (Random Forest classification)
 * @return The probability of the anomaly
 */
def withTrainedModel(value: Double, rf: RandomForest): Double = {
  val probability = ML.RandomForest(rf).predict(Array(value))
  // anomaly class has the index 1
  probability(1)
}
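
For illustration, here is a minimal sketch of the scheduled batch training described above, assuming classic Akka actors; TrainerActor, fetchHistory, trainRandomForest and saveToRedis are hypothetical placeholders, not the project's API:

import akka.actor.Actor
import scala.concurrent.duration._
import smile.classification.RandomForest

class TrainerActor extends Actor {
  import context.dispatcher

  // hypothetical helpers; the real project wires these to Cassandra, Smile and Redis
  def fetchHistory(): Seq[Double] = ???
  def trainRandomForest(history: Seq[Double]): RandomForest = ???
  def saveToRedis(rf: RandomForest): Unit = ???

  // re-train the model on a schedule (the one-hour period is an arbitrary choice)
  private val tick =
    context.system.scheduler.schedule(0.seconds, 1.hour, self, "train")

  def receive = {
    case "train" =>
      val rf = trainRandomForest(fetchHistory())
      saveToRedis(rf) // the analysis actor later reads the model back
  }

  override def postStop(): Unit = tick.cancel()
}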

The serving layer is represented by the analysis Akka actor. It combines the results of the speed and batch layers using a weighted average:

\src: Analyzer.scala

/**
 * Calculates the probability of the anomaly using both heuristics and trained model
 *
 * @param name Name of the IoT sensor
 * @param entries Sensor history entries
 * @param rf Optionally trained model
 * @return Meta information with the results of the analysis
 */
private def analyze(name: String, entries: Iterable[Entry], rf: Option[RandomForest]) = {
  val values = entries.map(_.value)
  val value = values.head
  val approxAnomaly = Analyzer.withHeuristic(value, values)
  val mlAnomalyOpt = rf.map(Analyzer.withTrainedModel(value, _))
  val avgAnomaly = mlAnomalyOpt match {
    case Some(mlAnomaly) => (35.0 * approxAnomaly + 65.0 * mlAnomaly) / 100.0
    case None => approxAnomaly
  }
  val ts = new Date(System.currentTimeMillis)
  SensorMeta(name, ts, approxAnomaly, mlAnomalyOpt.getOrElse(-1), avgAnomaly)
}
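
For example, if the heuristic gives approxAnomaly = 0.8 and the model gives mlAnomaly = 0.4, the combined score is (35.0 * 0.8 + 65.0 * 0.4) / 100.0 = 0.54, so the ML verdict carries more weight. When no trained model is available yet (rf is None), the actor falls back to the heuristic alone and reports -1 for the ML score.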

The actor can run within a cluster, which provides scalability and the required performance (note that Cassandra and Redis replicas might be needed as well). The project describes a sample cluster configuration (with manual setup), but it could certainly be automated.
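
For illustration, a minimal sketch of how a node might join such an Akka cluster (the system name "lambda", the host and the ports are assumptions, not the project's actual configuration):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Classic Akka remoting/cluster settings (pre-2.6 style); all values are placeholders
val config = ConfigFactory.parseString("""
  akka.actor.provider = "cluster"
  akka.remote.netty.tcp.hostname = "127.0.0.1"
  akka.remote.netty.tcp.port = 2551
  akka.cluster.seed-nodes = [
    "akka.tcp://lambda@127.0.0.1:2551",
    "akka.tcp://lambda@127.0.0.1:2552"
  ]
""")

// every node started with one of the seed ports joins the same cluster
val system = ActorSystem("lambda", config)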

