Dynamic RAG with LangChain and LangServe

May 15, 2024

What do you mean, dynamic?

Most tutorials I've seen online load documents into a vector store from a document directory at the very start of the process. They assume a set of documents that is already available, which isn't always the case: you may not know in advance what context your LangChain application will need.

Luckily, it's a short step to make this dynamic.

Setup

I won't be going through the setup of the app. This guide is best suited for apps created via LangServe, so assuming you already have an app set up with LangServe (and perhaps Poetry), I'll start directly with the route handlers.

We'll start with a very basic route that takes in a file uploaded by the user:

from fastapi import FastAPI, UploadFile

from load_and_process import load_doc

app = FastAPI()


@app.post("/load_and_process")
async def load_and_process(file: UploadFile):
    await load_doc(file)

Don't forget to add relevant validation on the client side as well as on the server for the type of file you'll be uploading. I'm assuming a PDF format and will load and split the document accordingly.
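
A minimal server-side check might look like this (the 415 status code and the application/pdf content type are one reasonable choice, not the only one; call it at the top of the route before reading the file):

from fastapi import HTTPException, UploadFile


def validate_pdf(file: UploadFile):
    # Reject anything that doesn't declare itself as a PDF
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=415, detail="Only PDF uploads are supported")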

from tempfile import NamedTemporaryFile
from langchain_community.document_loaders import UnstructuredPDFLoader
from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector
from langchain_experimental.text_splitter import SemanticChunker
from fastapi import UploadFile
from config import PG_COLLECTION_NAME, PG_CONNECTION_STRING
import os


async def load_doc(file: UploadFile):
    docs = None
    # Loaders expect a file path, so write the upload to a temporary file first
    with NamedTemporaryFile(suffix=".pdf") as tmp_file:
        tmp_file.write(await file.read())
        tmp_file.flush()  # make sure the bytes hit disk before the loader reads them
        tmp_file_path = tmp_file.name
        loader = UnstructuredPDFLoader(tmp_file_path)
        docs = loader.load()
    # The temp file is deleted at this point; docs live in memory from here on
    embeddings = get_embeddings()
    chunks = split_text(docs, embeddings)
    create_vectorstore(chunks, embeddings)


def get_embeddings():
    openai_embeddings = OpenAIEmbeddings(
        api_key=os.getenv("OPENAI_API_KEY"),
    )
    return openai_embeddings


def split_text(docs, embeddings):
    # SemanticChunker splits on embedding-similarity boundaries rather than fixed sizes
    text_splitter = SemanticChunker(embeddings=embeddings)
    chunks = text_splitter.split_documents(docs)
    return chunks


def create_vectorstore(chunks, embeddings):
    # pre_delete_collection=True wipes the collection on every upload;
    # drop it if you want to accumulate documents instead
    PGVector.from_documents(
        documents=chunks,
        embedding=embeddings,
        collection_name=PG_COLLECTION_NAME,
        pre_delete_collection=True,
        connection=PG_CONNECTION_STRING,
    )

Going over the code in detail:

  1. Since most LangChain document loaders (PDF or otherwise) expect a file path, we use NamedTemporaryFile to save the upload to a temporary location
  2. We then use the temporary path to load the PDF and extract the documents from it. (Note: if you'd like to cite sources for your queries, or later delete documents from the vectorstore, it's recommended to map the file's actual name to its temporary path, since the temporary path is what gets recorded as the source in the metadata; see the sketch after this list)
  3. Once the PDF is loaded, you can move outside the with block (which deletes the temporary file), split the text with a splitter that suits your needs, create embeddings matched to the model you're using, and store the chunks in a vector database
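
As a rough sketch of the note in step 2 (tag_with_filename is a hypothetical helper, not part of LangChain), you could overwrite the source metadata right after loading:

def tag_with_filename(docs, original_name):
    # Hypothetical helper: replace the temp-file path recorded as "source"
    # with the user-facing filename so citations and deletions make sense
    for doc in docs:
        doc.metadata["source"] = original_name
    return docs


# Inside load_doc:
#     docs = tag_with_filename(loader.load(), file.filename)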

from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough


def format_docs(docs):
    # Join the retrieved chunks into one context string for the prompt
    return "\n\n".join(doc.page_content for doc in docs)


def invoke():
    llm = get_llm()
    prompt = hub.pull("rlm/rag-prompt")
    vectorstore = get_vector_store()
    retriever = vectorstore.as_retriever()
    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )
    return rag_chain
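
The chain also references get_llm and get_vector_store, which aren't shown above. A minimal sketch, assuming an OpenAI chat model (gpt-4o-mini is just a placeholder choice) and the same Postgres collection the upload route writes to:

from langchain_openai import ChatOpenAI


def get_llm():
    # Any chat model works here
    return ChatOpenAI(model="gpt-4o-mini", api_key=os.getenv("OPENAI_API_KEY"))


def get_vector_store():
    # Connect to the same collection load_doc populated
    return PGVector(
        embeddings=get_embeddings(),
        collection_name=PG_COLLECTION_NAME,
        connection=PG_CONNECTION_STRING,
    )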

Now you can go ahead and serve the chain returned by invoke() via your routes.

LangServe neatly adds routes for streaming, invoking, and other useful operations to your API.

from langserve import add_routes

add_routes(app, invoke(), path="/query")
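
For instance, a Python client could call the chain through LangServe's RemoteRunnable (the localhost URL assumes the default uvicorn port):

from langserve import RemoteRunnable

rag = RemoteRunnable("http://localhost:8000/query")
print(rag.invoke("Summarize the uploaded document"))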

If you're using this in a browser or frontend code, allow CORS via FastAPI as well

(Replace the *s with specific domains, methods, and headers before going to production to keep your application secure.)

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)

And that's it

You now have a very basic RAG app that can ingest PDFs dynamically. If you want to handle multiple PDFs or delete documents from the vectorstore collection, you can adapt the code to create the vectorstore once and then add and remove documents via API routes, as sketched below.
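
As a rough sketch (assuming you drop pre_delete_collection=True and reuse one PGVector instance across requests):

import uuid


def add_docs(vectorstore, chunks):
    # Track IDs so individual documents can be deleted later
    ids = [str(uuid.uuid4()) for _ in chunks]
    vectorstore.add_documents(chunks, ids=ids)
    return ids


def remove_docs(vectorstore, ids):
    vectorstore.delete(ids=ids)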

