A Semantic Code Search Solution for TrakCare Using IRIS Vector Search
This article presents a potential solution for semantic code search in TrakCare using IRIS Vector Search.
Here's a brief overview of results from the TrakCare Semantic code search for the query: "Validation before database object save".
-
Code Embedding model
There are numerous embedding models designed for sentences and paragraphs, but they are not ideal for code-specific embeddings.
Three code-specific embedding models were evaluated: voyage-code-2, CodeBERT, and GraphCodeBERT. While none of these models were pre-trained on the ObjectScript language, they still outperformed general-purpose embedding models in this context.
CodeBERT was chosen as the embedding model for this solution, offering reliable performance without the need for an API key. 😁
class GraphCodeBERTEmbeddingModel:
    """Wrap a RoBERTa-style code model and expose one embedding vector per input.

    Despite the class name, the default checkpoint is
    ``microsoft/codebert-base`` (CodeBERT); pass a different ``model_name``
    to load another checkpoint.
    """

    def __init__(self, model_name="microsoft/codebert-base"):
        """Load the tokenizer and the model weights for ``model_name``."""
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)

    def get_embedding(self, text):
        """
        Generate a CodeBERT embedding for the given text.

        :param text: The code snippet or natural-language query to embed.
        :return: The first-token embedding as a NumPy array, with all
            size-1 dimensions squeezed away.
        """
        # Inputs longer than 512 tokens are truncated; shorter ones are padded to 512.
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding="max_length",
        )
        # Inference only, so gradient tracking is switched off.
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Use the [CLS] token embedding (sequence position 0) for the representation
        cls_embedding = outputs.last_hidden_state[:, 0, :].squeeze().numpy()
        return cls_embedding
-
IRIS Vector database
A table is defined with a VECTOR-typed column to store the embeddings. Please note that a COLUMNAR index is not supported on VECTOR-typed columns.
CodeBERT embeddings have 768 dimensions. The model can process texts with a maximum length of 512 tokens.
-- One row per embedded code chunk.
CREATE TABLE TrakCareCodeVector (
    file VARCHAR(150),                -- source file / description of the chunk
    codes VARCHAR(2000),              -- the code chunk text
    codes_vector VECTOR(DOUBLE,768)   -- embedding of the chunk (768 dimensions)
)
-
Python DB-API
The Python DB-API is used to establish a connection with the IRIS instance and execute the SQL statements.
- Build the vector database for the TrakCare source code.
- Retrieve the top_K most similar code embeddings (ranked by VECTOR_COSINE) from the IRIS vector database.
# build IRIS vector database
import iris
import os
from dotenv import load_dotenv
load_dotenv()
class IrisConn:
    """Connection with an IRIS instance used to execute SQL statements.

    Connection settings are read from the environment variables
    ``CONNECTION_STRING``, ``IRISUSERNAME`` and ``PASSWORD``.
    """

    def __init__(self) -> None:
        """Open the connection and create the cursor shared by all methods."""
        connection_string = os.getenv("CONNECTION_STRING")
        username = os.getenv("IRISUSERNAME")
        password = os.getenv("PASSWORD")
        self.connection = iris.connect(
            connectionstr=connection_string,
            username=username,
            password=password,
            timeout=10000,
        )
        self.cursor = self.connection.cursor()

    def insert(self, params: list) -> bool:
        """Insert one row into ``TrakCareCodeVector``.

        :param params: Values bound to the statement placeholders, in order:
            file, codes, and the vector rendered as text for ``TO_VECTOR``.
        :return: ``True`` if the statement executed, ``False`` if it raised.
            Failures are printed rather than propagated, so one bad row does
            not abort a bulk load; callers can use the flag to count them.
        """
        sql = "INSERT INTO TrakCareCodeVector (file, codes, codes_vector) VALUES (?, ?, TO_VECTOR(?,double))"
        try:
            self.cursor.execute(sql, params)
        except Exception as ex:
            # Deliberately best-effort: report the error and keep going.
            print(ex)
            return False
        return True

    def fetch_query(self, query: str):
        """Execute ``query`` and return all result rows from the cursor."""
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def close_db(self):
        """Close the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Release the connection even when closing the cursor fails.
            self.connection.close()
from transformers import AutoTokenizer, AutoModel, RobertaTokenizer, RobertaModel, logging
import torch
import numpy as np
import os
from db import IrisConn
from GraphcodebertEmbeddings import MethodEmbeddingGenerator
from IRISClassParser import parse_directory
import sys, getopt
class GraphCodeBERTEmbeddingModel:
    """Wrap a RoBERTa-style code model and expose one embedding vector per input.

    Despite the class name, the default checkpoint is
    ``microsoft/codebert-base`` (CodeBERT); pass a different ``model_name``
    to load another checkpoint.
    """

    def __init__(self, model_name="microsoft/codebert-base"):
        """Load the tokenizer and the model weights for ``model_name``."""
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)

    def get_embedding(self, text):
        """
        Generate a CodeBERT embedding for the given text.

        :param text: The code snippet or natural-language query to embed.
        :return: The first-token embedding as a NumPy array, with all
            size-1 dimensions squeezed away.
        """
        # Inputs longer than 512 tokens are truncated; shorter ones are padded to 512.
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding="max_length",
        )
        # Inference only, so gradient tracking is switched off.
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Use the [CLS] token embedding (sequence position 0) for the representation
        cls_embedding = outputs.last_hidden_state[:, 0, :].squeeze().numpy()
        return cls_embedding
class IrisVectorDB:
    """Store and query code embeddings in the ``TrakCareCodeVector`` table."""

    def __init__(self, vector_dim):
        """
        Initialize the IRIS vector database.

        :param vector_dim: Embedding dimensionality. It is stored on the
            instance but not enforced by this class.
        """
        self.conn = IrisConn()
        self.vector_dim = vector_dim

    def insert(self, description: str, codes: str, vector):
        """Insert one code chunk and its embedding.

        :param description: Value stored in the ``file`` column.
        :param codes: The code chunk text.
        :param vector: Embedding exposing ``tolist()`` (e.g. a NumPy array);
            it is rendered as text for ``TO_VECTOR``.
        """
        params = [description, codes, str(vector.tolist())]
        self.conn.insert(params)

    def search(self, query_vector, top_k=5):
        """Return the ``top_k`` rows most similar to ``query_vector``.

        :param query_vector: Embedding exposing ``tolist()``.
        :param top_k: Number of rows to return; must be convertible to int.
        :return: Rows of ``(file, codes)`` ordered by descending cosine similarity.
        :raises ValueError: If ``top_k`` is not a valid integer.
        """
        # top_k is spliced into the SQL text, so force it to a plain integer
        # first; anything that is not a number fails here instead of reaching SQL.
        top_k = int(top_k)
        query_vector_str = query_vector.tolist()
        query = (
            f"SELECT TOP {top_k} file,codes FROM TrakCareCodeVector "
            f"ORDER BY VECTOR_COSINE(codes_vector, TO_VECTOR('{query_vector_str}',double)) DESC"
        )
        results = self.conn.fetch_query(query)
        return results
# Chatbot for code retrieval
class CodeRetrieveChatbot:
    """Tie an embedding model to a vector store for indexing and lookup."""

    def __init__(self, embedding_model, vector_db):
        """
        :param embedding_model: Object providing ``get_embedding(text)``.
        :param vector_db: Object providing ``insert(description, codes, vector)``
            and ``search(query_vector, top_k)``.
        """
        self.embedding_model = embedding_model
        self.vector_db = vector_db

    def add_to_database(self, description, code_snippet, embedding=None):
        """Store a code snippet, embedding it first when no vector is supplied.

        :param description: Label stored alongside the snippet.
        :param code_snippet: The code text to store.
        :param embedding: Optional precomputed embedding; when ``None`` it is
            computed from ``code_snippet``.
        """
        if embedding is None:
            embedding = self.embedding_model.get_embedding(code_snippet)
        self.vector_db.insert(description, code_snippet, embedding)

    def retrieve_code(self, query, top_k=5):
        """
        Retrieve the most relevant code snippets for the given query.

        :param query: The search text to embed.
        :param top_k: Maximum number of results requested from the vector store.
        :return: Whatever ``vector_db.search`` returns for the query embedding.
        """
        query_embedding = self.embedding_model.get_embedding(query)
        results = self.vector_db.search(query_embedding, top_k)
        return results
Code Chunks
Since CodeBERT can process texts with a maximum length of 512 tokens, large classes and methods have to be chunked into smaller parts. Each chunk is then embedded and stored in the vector database.
from transformers import AutoTokenizer, AutoModel, RobertaTokenizer, RobertaModel
import torch
from IRISClassParser import parse_directory
class MethodEmbeddingGenerator:
    """Chunk method source code and embed each chunk with CodeBERT."""

    def __init__(self, model_name="microsoft/codebert-base"):
        """
        Initialize the embedding generator with CodeBERT.

        :param model_name: The name of the pretrained CodeBERT model.
        """
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)
        self.max_tokens = self.tokenizer.model_max_length  # Typically 512 for CodeBERT

    def chunk_method(self, method_implementation):
        """
        Split method implementation into chunks based on lines of code that approximate the token limit.

        Lines are never split: a single line that exceeds the budget on its
        own becomes its own chunk (and is truncated later at encoding time).

        :param method_implementation: The method implementation as a string.
        :return: A list of non-empty chunks; an empty list for empty input.
        """
        # Two tokens are held back from the limit, presumably for the
        # start/end special tokens the tokenizer adds when encoding.
        budget = self.max_tokens - 2
        chunks = []
        current_chunk = []
        current_length = 0
        for line in method_implementation.splitlines():
            # Estimate tokens of the line
            line_token_estimate = len(self.tokenizer.tokenize(line))
            # Joining with "\n" adds a separator the per-line count misses,
            # so charge one extra token for every line after the first.
            cost = line_token_estimate + (1 if current_chunk else 0)
            if current_length + cost <= budget:
                current_chunk.append(line)
                current_length += cost
                continue
            # Flush the chunk built so far. The guard avoids emitting an
            # empty chunk when the very first line is already over budget.
            if current_chunk:
                chunks.append("\n".join(current_chunk))
            current_chunk = [line]
            current_length = line_token_estimate
        # Add the last chunk if it has content
        if current_chunk:
            chunks.append("\n".join(current_chunk))
        return chunks

    def get_embeddings(self, method_implementation):
        """
        Generate embeddings for a method implementation, handling large methods by chunking.

        :param method_implementation: The method implementation as a string.
        :return: A dictionary mapping each chunk's text to its embedding
            (a NumPy array). Identical chunks share one entry.
        """
        embeddings = {}
        for chunk in self.chunk_method(method_implementation):
            inputs = self.tokenizer(
                chunk,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=self.max_tokens,
            )
            # Inference only, so gradient tracking is switched off.
            with torch.no_grad():
                outputs = self.model(**inputs)
            # Use the [CLS] token embedding (index 0) as the representation
            cls_embedding = outputs.last_hidden_state[:, 0, :].squeeze(0)
            embeddings[chunk] = cls_embedding.numpy()
        return embeddings

    def process_methods(self, methods):
        """
        Process a list of methods to generate embeddings for each.

        :param methods: A list of dictionaries with "name" and "implementation" keys.
        :return: A dictionary with method names as keys and, as values, the
            chunk-to-embedding dictionaries from ``get_embeddings``. A repeated
            method name overwrites the earlier entry.
        """
        method_embeddings = {}
        for method in methods:
            method_name = method["name"]
            implementation = method["implementation"]
            print(f"Processing method embedding: {method_name}")
            method_embeddings[method_name] = self.get_embeddings(implementation)
        return method_embeddings
UI - The Angular APP
The stack uses Angular as the frontend and Python (Flask) as the backend.
-
Future Directions
The search results are not perfect because the embedding model is not pre-trained on ObjectScript.