Running Spacy on Spark/Scala with Jep
21 Aug 2021 by dzlabJep is an open source library which makes it possible to invoke Python code from within the JVM, thus letting Java/Scala code to leaverage 3rd party libraries.
This is very interesting in the case of Spark/Scala as it allows us to leverage the Python machine learning eco-system from the confort for the JVM and the powerful distributed capabilities of Spark.
In this article, we will see how to use Spacy to perfrom Named Entity Recognition (NER) from a Spark program and combine the power of both to solve a Machine Learning problem at scale. The code for this tutorial can be found here.
First, we need to install Jep and Spacy (as well as download the NER model) python modules
$ pip install jep
$ pip install spacy
$ python -m spacy download en_core_web_sm
We need to locate the Jep installation path as we will need to provide the Jep jars to our JVM.
$ pip show jep
Name: jep
Version: 3.9.0
Summary: Jep embeds CPython in Java
Home-page: https://github.com/ninia/jep
Author: Jep Developers
Author-email: jep-project@googlegroups.com
License: zlib/libpng
Location: /usr/local/share/conda/envs/py3/lib/python3.9/site-packages
Requires:
Required-by:
Once we located the Jep installation folder, we need to expose it using the JAVA_LIBRARY_PATH
environment variable
$ export JAVA_LIBRARY_PATH=/usr/local/share/conda/envs/py3/lib/python3.9/site-packages/jep
Now, we are ready to run Jep code. Let’s define a Python script to run Spacy NER on an input text:
import spacy
nlp = spacy.load("en_core_web_sm")
def ner(text):
doc = nlp(text)
result = []
for token in doc:
result.append((token.text, token.pos_, token.dep_))
return result
To test that we can run this from JVM, we will use a simple scala code that creates a Jep interpreter, loads the previous Python script, then call it against a test text:
import jep.Jep
object ScalaSpacyExample extends App {
// Create a Jen interpreter
val jep = new Jep()
// Run the python script inside Jep interpreter
jep.runScript("src/main/python/spacy_ner.py")
// define a test text
val text = "The red fox jumped over the lazy dog."
// Call the Python function inside the Jep interpreter to perform NER
jep.eval(s"result = ner('$text')")
// Get the value of the `result` variable from the Jep interpreter
val result = jep.getValue("result")
// Convert the java object returned by Jep and print it out
println(result.asInstanceOf[ArrayList[Object]].asScala
.map(_.asInstanceOf[java.util.List[String]].asScala.mkString(", "))
.mkString("|"))
}
Running this scala snippet will give us an output that looks like this:
$ sbt "runMain dzlab.ScalaSpacyExample"
The, DET, det|red, ADJ, amod|fox, NOUN, nsubj|jumped, VERB, ROOT|over, ADP, prep|the, DET, det|lazy, ADJ, amod|dog, NOUN, pobj|., PUNCT, punct
Now, as we validate that our Python script with Spacy can be invoked from a scala program, we can go a head and try to run called from within a Spark program. The main idea is to load a Jep interpreter on every partition of the Spark RDD, and load the Python Spacy script on it then invoke the NER function on every text instance.
import collection.JavaConverters._
import java.util.ArrayList
import jep.{Jep, SharedInterpreter}
import org.apache.spark.{SparkConf, SparkContext}
object SparkSpacyExample extends App {
// Create a spark config
val conf = new SparkConf()
.setAppName("Spark Job")
.setIfMissing("spark.master", "local[*]")
// Create a spark context
val sc = new SparkContext(conf)
// Read the text file into a Spark RDD
val textFile = sc.textFile("data/title_StackOverflow.txt")
// Run a Jep interpreter inside every Spark partition
val resultRDD = textFile.mapPartitions{input =>
// Create a Jen interpreter
val jep = new SharedInterpreter()
// Run the python script inside Jep interpreter
val scriptFile = "src/main/python/spacy_ner.py"
val script = scala.io.Source.fromFile(scriptFile).mkString
jep.exec(script)
// Process every line with Spacy NER inside the Jep interpreter
val output = input.map(text=>{
jep.eval(s"result = ner('$text')")
val result = jep.getValue("result")
// Convert the Jep result into printable output
result.asInstanceOf[ArrayList[Object]].asScala
.map(_.asInstanceOf[java.util.List[String]].asScala.mkString(", "))
.mkString("|")
})
output
}
println(resultRDD.collect().mkString("\n"))
}
Running this Spark job will give us an output that looks like this:
$ sbt "runMain dzlab.SparkSpacyExample"
. . .
21/08/22 17:55:20 INFO DAGScheduler: Job 0 finished: collect at SparkSpacyExample.scala:33, took 3.434171 s
How, ADV, advmod|do, AUX, aux|I, PRON, nsubj|fill, VERB, ROOT|a, DET, det|DataSet, PROPN, dobj|or, CCONJ, cc|a, DET, det|DataTable, PROPN, conj|from, ADP, prep|a, DET, det|LINQ, ADJ, amod|query, NOUN, compound|resultset, NOUN, pobj|?, PUNCT, punct
. . .