Scalable RAG applications on GCP with Serverless architecture - Part 2
12 Nov 2023 by dzlab

In a previous article, we built a serverless data pipeline to make an entire dataset searchable using plain English. In this second part, we will build a query answering pipeline, represented by Steps 7 to 10 in the diagram above. We will leverage pgvector's cosine similarity operator to filter documents from our dataset, and Vertex AI with LangChain to generate a final answer based on the selected subset of documents.
Serverless Query Answering
In this section, we define a set of helper functions to implement each component of the Query Answering pipeline in the lib.py file.
Let’s first define the dependencies in a requirements.txt file:
cloudevents
functions-framework==3.*
asyncio==3.4.3
asyncpg==0.27.0
cloud-sql-python-connector[asyncpg]==1.2.3
pgvector==0.1.8
langchain==0.0.196
transformers==4.30.1
google-cloud-aiplatform==1.26.0
google-cloud-storage
Embed the query
Next, we define a helper function to generate the vector embedding for the user query.
from langchain.embeddings import VertexAIEmbeddings

# Generate the embedding vector for a user query
def embed_query(user_query):
    embeddings_service = VertexAIEmbeddings()
    return embeddings_service.embed_query(user_query)
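As a quick sanity check, here is a hypothetical usage example (it assumes Vertex AI credentials are available in the environment). The default textembedding-gecko model behind VertexAIEmbeddings returns a 768-dimensional vector:

# Hypothetical example: embed a question and inspect the result
query_embedding = embed_query("What is serverless computing?")
print(len(query_embedding))  # 768 with the default textembedding-gecko model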
To learn more about using Vertex AI for retrieval, check the following article.
Retrieve the documents
To filter documents from our dataset, we use pgvector's cosine similarity search operator. The following helper function establishes a connection to PostgreSQL, then submits a retrieval query that uses the cosine distance operator <=>:
import os
import asyncio
import asyncpg
from pgvector.asyncpg import register_vector
from google.cloud.sql.connector import Connector, IPTypes

# Cloud SQL instance connection settings
db_host = os.environ["INSTANCE_CONNECTION_NAME"]  # e.g. 'project:region:instance'
db_user = os.environ["DB_USER"]  # e.g. 'my-db-user'
db_pass = os.environ["DB_PASS"]  # e.g. 'my-db-password'
db_name = os.environ["DB_NAME"]  # e.g. 'my-database'
ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC

# Find the documents most closely related to the input query.
async def retrieve(query_embedding, similarity_threshold=0.8, num_matches=3):
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to the Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            db_host,
            "asyncpg",
            user=db_user,
            password=db_pass,
            db=db_name,
            ip_type=ip_type,
        )
        # Register the pgvector type so the embedding can be passed as a query parameter
        await register_vector(conn)
        # Use cosine similarity search to find documents
        results = await conn.fetch(
            """
            SELECT content
            FROM document_embeddings
            WHERE 1 - (embedding <=> $1) > $2
            LIMIT $3
            """,
            query_embedding, similarity_threshold, num_matches,
        )
        await conn.close()
        # Return the raw text of each matching document
        return [r["content"] for r in results]
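Because retrieve is an async coroutine, it has to be awaited, or driven with asyncio.run when called from synchronous code. Here is a hypothetical standalone usage that reuses the embed_query helper defined above:

import asyncio

# Hypothetical example: embed a question and fetch the closest documents
query_embedding = embed_query("What is serverless computing?")
matches = asyncio.run(retrieve(query_embedding, similarity_threshold=0.7, num_matches=5))
for content in matches:
    print(content[:80])  # preview the beginning of each matching document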
Answer user query
Next, we use LangChain to answer the user query. After filtering the dataset down to the relevant documents with pgvector, the next step is to add them to the prompt for the Vertex AI LLM and ask the model to answer the user query. This is simply done with LangChain's Question Answering chain as follows:
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
from langchain.docstore.document import Document
from langchain.llms import VertexAI

# Answer the question using the matching documents as context
def qa(matches, question):
    llm = VertexAI()
    chain = load_qa_with_sources_chain(llm)
    # The QA-with-sources chain expects each document to carry a 'source' in its metadata
    documents = [Document(page_content=text, metadata={"source": str(i)}) for i, text in enumerate(matches)]
    inputs = {"input_documents": documents, "question": question}
    outputs = chain(inputs, return_only_outputs=True)["output_text"]
    return outputs
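For illustration, here is a hypothetical call to qa with two hard-coded snippets standing in for retrieved documents:

# Hypothetical example: answer a question from two in-memory snippets
matches = [
    "Cloud Run is a managed compute platform for running containers on Google Cloud.",
    "Cloud SQL for PostgreSQL supports the pgvector extension for similarity search.",
]
print(qa(matches, "What is Cloud Run?"))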
To learn more about using LangChain for Question Answering, check the following article.
All together
Finally, we create a simple Flask-based API to answer user queries. Upon receiving a request with a question, we embed the question, use the embedding to filter out non-relevant documents, then pass the selected documents along with the user query to an LLM to generate a response.
import asyncio
from flask import Flask, request
from lib import embed_query, retrieve, qa

app = Flask(__name__)

@app.route('/answer', methods=['POST'])
def answer():
    # Get the user query from the request body
    user_query = request.get_data(as_text=True)
    # Embed the user query
    query_embedding = embed_query(user_query)
    # Retrieve similar documents
    matches = asyncio.run(retrieve(query_embedding))
    # Answer the query given the found matches
    response = qa(matches, user_query)
    return response

if __name__ == "__main__":
    app.run(port=8000, host='0.0.0.0', debug=True)
Deploy to Cloud Run
Finally, we can package everything and deploy it to GCP. We will use Cloud Run to deploy our service as follows, making sure to also pass the database settings that lib.py reads from the environment:
gcloud run deploy qa-function \
  --source . \
  --execution-environment gen2 \
  --service-account fs-identity \
  --set-env-vars INSTANCE_CONNECTION_NAME=$PROJECT_ID:$REGION:$INSTANCE_NAME,DB_USER=$DB_USER,DB_PASS=$DB_PASS,DB_NAME=$DB_NAME \
  --allow-unauthenticated
After deployment finishes, we can test the service with the following curl command:
curl -X POST -H "Content-Type: text/plain" \
  -d "Tell me a joke" \
  https://my-service-abcdef-uc.a.run.app/answer
That’s all folks
In a previous article, we saw how to leverage Google Cloud managed services to build a serverless large-scale data pipeline to ingest and embed documents. In this article, we implemented a scalable retrieval system on top of the previously indexed documents with Vertex AI and Cloud SQL for Postgres.
I hope you enjoyed this article, feel free to leave a comment or reach out on twitter @bachiirc.