Source code for server.web.error_log_clustering

import nltk
from nltk import corpus, sent_tokenize, word_tokenize
from nltk.stem.porter import PorterStemmer
from pandas import DataFrame
import sklearn.feature_extraction.text as ext


def cluster_error_logs(df: DataFrame) -> DataFrame:
    """Cluster error logs using unsupervised learning.

    Every message in ``df["error"]`` is tokenized, stemmed and TF-IDF weighted.
    The mean of a message's non-zero TF-IDF weights becomes its
    ``error_score``, and rows sharing exactly the same score are collapsed
    into one group.

    Args:
        df: Error log rows. Must provide the columns ``error``,
            ``task_instance_id``, ``task_id``, ``workflow_run_id``,
            ``workflow_id`` and ``error_time``. The frame itself is left
            unmodified.

    Returns:
        A new frame indexed by ``error_score`` with one row per group and the
        columns ``group_instance_count``, ``task_instance_ids``, ``task_ids``,
        ``sample_error``, ``workflow_run_id``, ``workflow_id`` and
        ``first_error_time``, sorted by ``group_instance_count`` descending.
        ``sample_error``, the workflow ids and ``first_error_time`` come from
        the first row of each group in input order.

    Note:
        Downloads the NLTK ``stopwords`` and ``punkt`` resources on each call
        (quietly). NOTE(review): the vectorizer is expected to reject input
        that yields an empty vocabulary (e.g. an empty frame) -- confirm how
        callers want that case handled.
    """
    nltk.download("stopwords", quiet=True)
    nltk.download("punkt", quiet=True)

    # English stop words, plus extras that otherwise produce warnings.
    stopwords = corpus.stopwords.words("english")
    stopwords.extend(["could", "might", "must", "need", "sha", "wo", "would"])
    # The vectorizer is handed the list; the tokenizer uses a set so each
    # membership test is O(1) instead of a scan over the whole list.
    stopword_set = frozenset(stopwords)

    # One stemmer is shared by every document instead of rebuilding it per call.
    stemmer = PorterStemmer()

    def log_tokenizer(text: str) -> list[str]:
        # Tokenize by sentence, then by word.
        words = [
            word for sent in sent_tokenize(text) for word in word_tokenize(sent)
        ]
        # Keep purely alphabetic words only, lower-cased.
        lowered = [word.lower() for word in words if word.isalpha()]
        # Strip morphological affixes, leaving only the word stem. The second
        # isalpha() check is intentional: lower-casing can introduce
        # non-alphabetic code points for some Unicode letters.
        return [
            stemmer.stem(token)
            for token in lowered
            if token.isalpha() and token not in stopword_set
        ]

    # Learn the vocabulary dictionary and build the document-term matrix.
    count_vectorizer = ext.CountVectorizer(
        tokenizer=log_tokenizer, stop_words=stopwords, token_pattern=None
    )
    doc_matrix = count_vectorizer.fit_transform(df["error"])

    # Learn the idf vector (global term weights), then the TF-IDF weighted
    # document-term matrix.
    log_scores = ext.TfidfTransformer().fit_transform(doc_matrix).toarray()

    # Score of a log = mean of its non-zero TF-IDF weights (0 when no term
    # survived tokenization).
    per_log_score = []
    for row in log_scores:
        nonzero_count = len(row.nonzero()[0])
        per_log_score.append(row.sum() / nonzero_count if nonzero_count > 0 else 0)

    # assign() returns a copy, so the caller's frame does not silently gain an
    # "error_score" column.
    scored = df.assign(error_score=per_log_score)

    grouped = scored.groupby(["error_score"]).agg(
        {
            "error_score": "count",
            "task_instance_id": lambda x: list(set(x)),
            "task_id": lambda x: list(set(x)),
            "error": "first",
            "workflow_run_id": "first",
            "workflow_id": "first",
            "error_time": "first",
        }
    )
    grouped = grouped.rename(
        columns={
            "error": "sample_error",
            "error_score": "group_instance_count",
            "task_id": "task_ids",
            "error_time": "first_error_time",
            "task_instance_id": "task_instance_ids",
        }
    )
    return grouped.sort_values(by=["group_instance_count"], ascending=False)