Running Spacy on Spark/Scala with Jep

Jep is an open source library that makes it possible to invoke Python code from within the JVM, letting Java/Scala code leverage third-party Python libraries.

This is very interesting in the case of Spark/Scala, as it allows us to combine the Python machine learning ecosystem with the comfort of the JVM and the powerful distributed capabilities of Spark.
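To make this concrete, here is a minimal sketch of what embedding Python in Scala looks like with Jep (it assumes Jep is installed and its jar is on the classpath, which we set up below; the object name is just for illustration):

import jep.SharedInterpreter

object JepHello extends App {
  // Start an embedded CPython interpreter inside the JVM
  val interp = new SharedInterpreter()
  try {
    // Execute an arbitrary Python statement
    interp.exec("x = 40 + 2")
    // Read the Python variable back as a Java object
    println(interp.getValue("x")) // prints 42
  } finally {
    interp.close()
  }
}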

In this article, we will see how to use Spacy to perform Named Entity Recognition (NER) from a Spark program and combine the power of both to solve a machine learning problem at scale. The code for this tutorial can be found here.

First, we need to install the Jep and Spacy Python modules, and download Spacy's small English model:

$ pip install jep
$ pip install spacy
$ python -m spacy download en_core_web_sm

We need to locate the Jep installation path, as we will need to point the JVM at Jep's native library and add the Jep jar to our classpath.

$ pip show jep
Name: jep
Version: 3.9.0
Summary: Jep embeds CPython in Java
Home-page: https://github.com/ninia/jep
Author: Jep Developers
Author-email: jep-project@googlegroups.com
License: zlib/libpng
Location: /usr/local/share/conda/envs/py3/lib/python3.9/site-packages
Requires: 
Required-by: 

Once we have located the Jep installation folder, we need to expose it to the JVM using the JAVA_LIBRARY_PATH environment variable:

$ export JAVA_LIBRARY_PATH=/usr/local/share/conda/envs/py3/lib/python3.9/site-packages/jep
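The Jep jar itself must also be on the JVM classpath. With sbt this can be done through a regular dependency, since Jep is published to Maven Central; here is a minimal build.sbt sketch (the project name and the Scala/Spark versions are assumptions, adjust them to your setup):

// build.sbt -- minimal sketch, versions are assumptions
name := "spark-spacy-example"

scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.1.2",
  "black.ninia" % "jep" % "3.9.0" // matches the pip-installed version
)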

Now, we are ready to run Jep code. Let's define a Python script to run Spacy on an input text; for every token, the ner function returns its text, part-of-speech tag, and dependency label:

import spacy

# Load the small English model downloaded earlier
nlp = spacy.load("en_core_web_sm")

def ner(text):
  # Run the Spacy pipeline on the input text
  doc = nlp(text)
  result = []
  for token in doc:
    # Collect each token's text, part-of-speech tag and dependency label
    result.append((token.text, token.pos_, token.dep_))
  return result

To test that we can run this from the JVM, we will use a simple Scala snippet that creates a Jep interpreter, loads the previous Python script, then calls it against a test text:

package dzlab

import collection.JavaConverters._
import java.util.ArrayList
import jep.Jep

object ScalaSpacyExample extends App {

// Create a Jep interpreter
  val jep = new Jep()

  // Run the python script inside Jep interpreter
  jep.runScript("src/main/python/spacy_ner.py")

  // define a test text
  val text = "The red fox jumped over the lazy dog."

  // Call the Python function inside the Jep interpreter to perform NER
  jep.eval(s"result = ner('$text')")

  // Get the value of the `result` variable from the Jep interpreter
  val result = jep.getValue("result")

  // Convert the java object returned by Jep and print it out
  println(result.asInstanceOf[ArrayList[Object]].asScala
      .map(_.asInstanceOf[java.util.List[String]].asScala.mkString(", "))
      .mkString("|"))
}

Running this Scala snippet gives us an output that looks like this; each token is printed as text, POS tag, dependency label, with tokens separated by |:

$ sbt "runMain dzlab.ScalaSpacyExample"
The, DET, det|red, ADJ, amod|fox, NOUN, nsubj|jumped, VERB, ROOT|over, ADP, prep|the, DET, det|lazy, ADJ, amod|dog, NOUN, pobj|., PUNCT, punct
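One caveat before scaling this up: building the Python statement by string interpolation, as in jep.eval(s"result = ner('$text')"), breaks as soon as the input text contains a quote. Jep provides safer ways to pass data in; here is a minimal sketch (reusing the jep and text values from the snippet above):

// Option 1: bind the value to a Python variable and reference it by name
jep.set("text", text)
jep.eval("result = ner(text)")

// Option 2: invoke the Python function directly and get its return value
val result = jep.invoke("ner", text)

We will use the first option in the Spark version below, since our input lines are arbitrary StackOverflow titles that may contain quotes.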

Now that we have validated that our Python script with Spacy can be invoked from a Scala program, we can go ahead and call it from within a Spark program. The main idea is to create a Jep interpreter on every partition of the Spark RDD, load the Python Spacy script into it, then invoke the NER function on every text instance.

package dzlab

import collection.JavaConverters._
import java.util.ArrayList
import jep.SharedInterpreter
import org.apache.spark.{SparkConf, SparkContext}

object SparkSpacyExample extends App {

  // Create a spark config
  val conf = new SparkConf()
    .setAppName("Spark Job")
    .setIfMissing("spark.master", "local[*]")

  // Create a spark context
  val sc = new SparkContext(conf)

  // Read the text file into a Spark RDD
  val textFile = sc.textFile("data/title_StackOverflow.txt")

  // Run a Jep interpreter inside every Spark partition
  val resultRDD = textFile.mapPartitions{input =>
    // Create a Jep interpreter
    val jep = new SharedInterpreter()

    // Run the python script inside Jep interpreter
    val scriptFile = "src/main/python/spacy_ner.py"
    val script = scala.io.Source.fromFile(scriptFile).mkString
    jep.exec(script)

    // Process every line with Spacy NER inside the Jep interpreter
    val output = input.map { text =>
      // Bind the text to a Python variable instead of interpolating it
      // into a code string, so quotes in the input cannot break the call
      jep.set("text", text)
      jep.eval("result = ner(text)")
      val result = jep.getValue("result")
      // Convert the Jep result into printable output
      result.asInstanceOf[ArrayList[Object]].asScala
        .map(_.asInstanceOf[java.util.List[String]].asScala.mkString(", "))
        .mkString("|")
    }.toList
    // Materialize the partition before closing the interpreter, since the
    // mapped iterator is lazy and Jep must be used on its creating thread
    jep.close()
    output.iterator
  }
  println(resultRDD.collect().mkString("\n"))
}

Running this Spark job will give us an output that looks like this:

$ sbt "runMain dzlab.SparkSpacyExample"
. . .
21/08/22 17:55:20 INFO DAGScheduler: Job 0 finished: collect at SparkSpacyExample.scala:33, took 3.434171 s
How, ADV, advmod|do, AUX, aux|I, PRON, nsubj|fill, VERB, ROOT|a, DET, det|DataSet, PROPN, dobj|or, CCONJ, cc|a, DET, det|DataTable, PROPN, conj|from, ADP, prep|a, DET, det|LINQ, ADJ, amod|query, NOUN, compound|resultset, NOUN, pobj|?, PUNCT, punct
. . .
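Note that we ran Spark in local mode here. To submit this job to a real cluster, Python, Spacy, and Jep must be installed on every worker node, and each executor JVM must be able to find the Jep native library. A sketch of what the submission could look like (the paths and jar name are assumptions tied to the setup above):

$ spark-submit \
    --class dzlab.SparkSpacyExample \
    --conf spark.driver.extraLibraryPath=/usr/local/share/conda/envs/py3/lib/python3.9/site-packages/jep \
    --conf spark.executor.extraLibraryPath=/usr/local/share/conda/envs/py3/lib/python3.9/site-packages/jep \
    target/scala-2.12/spark-spacy-example.jar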