Failure type functions#
This notebook contains the helper functions to run failure_type_classifier.ipynb.
import datetime
import pandas as pd
import numpy as np
import boto3
from io import BytesIO
from scipy.signal import convolve2d
from pathlib import Path
Metric Template Functions#
These functions are copied from ../data_sources/TestGrid/metrics_template.ipynb
as a workaround for this issue, where functions imported from notebooks must live in the same directory.
def decode_run_length(x):
"""
Decodes the run length encoded data into an unrolled form.
Returns a list of values.
E.g. takes in [{"value":12, "count":3}, {"value":1, "count":2}]
and gives [12, 12, 12, 1, 1]
"""
lst = []
for run_length in x:
extension = [run_length["value"]] * run_length["count"]
lst.extend(extension)
return lst
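For example, the encoded statuses from the docstring unroll as follows:
decode_run_length([{"value": 12, "count": 3}, {"value": 1, "count": 2}])
# returns [12, 12, 12, 1, 1]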
class CephCommunication:
"""
Class to establish communication with a ceph s3 bucket.
It connects with the bucket and provides methods to read and write data in the parquet format.
"""
def __init__(
self, s3_endpoint_url, aws_access_key_id, aws_secret_access_key, s3_bucket
):
self.s3_endpoint_url = s3_endpoint_url
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.s3_resource = boto3.resource(
"s3",
endpoint_url=self.s3_endpoint_url,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
)
self.bucket = s3_bucket
## TODO: add try/except
def upload_to_ceph(self, dataframe, s3_path, filename):
"""
This helper function takes as input the dataframe to be uploaded, the S3 prefix, and the output filename.
It then saves the dataframe as a parquet file in the defined ceph bucket.
"""
parquet_buffer = BytesIO()
dataframe.to_parquet(parquet_buffer)
s3_obj = self.s3_resource.Object(self.bucket, f"{s3_path}/{filename}")
status = s3_obj.put(Body=parquet_buffer.getvalue())
return status
def read_from_ceph(self, s3_path, filename):
"""
Helper function to read from ceph and see if the saved data is correct.
"""
buffer = BytesIO()
s3_object = self.s3_resource.Object(self.bucket, f"{s3_path}/{filename}")
s3_object.download_fileobj(buffer)
df_temp = pd.read_parquet(buffer)
return df_temp
def save_to_disk(dataframe, path, filename):
"""
Helper function to save the dataframe
as a parquet file to disk.
"""
dataset_base_path = Path(path)
dataset_base_path.mkdir(parents=True, exist_ok=True)
dataframe.to_parquet(f"{path}/{filename}")
return True
def read_from_disk(path, filename):
"""
Helper function to read from disk and see if the saved data is the same.
"""
return pd.read_parquet(f"{path}/{filename}")
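For example, a minimal usage sketch of the Ceph and disk helpers; the environment variable names, the S3 prefix, and the local directory below are illustrative placeholders, not values defined anywhere in this project.
import os
cc = CephCommunication(os.getenv("S3_ENDPOINT"), os.getenv("S3_ACCESS_KEY"), os.getenv("S3_SECRET_KEY"), os.getenv("S3_BUCKET"))
sample_df = pd.DataFrame({"test": ["a", "b"], "status": [1, 12]})
cc.upload_to_ceph(sample_df, "failure-type-examples", "sample.parquet")  # hypothetical prefix
cc.read_from_ceph("failure-type-examples", "sample.parquet")
save_to_disk(sample_df, "/tmp/failure-type-examples", "sample.parquet")  # hypothetical local path
read_from_disk("/tmp/failure-type-examples", "sample.parquet")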
Flakiness detection functions#
These functions are used in the ./testgrid_flakiness_detection.ipynb
notebook to determine optimal flake detection calculations.
def naive_flake_calc(test_row):
"""Naive flakiness measure: the percentage of runs whose status is fail (12) or flaky (13)."""
return (
100
* np.logical_or(np.array(test_row) == 12, np.array(test_row) == 13).sum()
/ len(test_row)
)
def calc_consecutive_failures(test_row, i):
"""This function calculates number of consecutive failures
Parameters
----------
test_row : array
array of test runs with 0, 1, 12 values as not run, pass, fail respectively
i : float, int
index in array i
Returns
----------
result: int
number of consecutive failures from index i
"""
result = 0
while i < len(test_row) and (test_row[i] == 12 or test_row[i] == 0):
if test_row[i] == 12:
result += 1
i += 1
return result
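For example, counting from index 0, not-run cells are skipped and counting stops at the first pass:
calc_consecutive_failures([12, 0, 12, 12, 1, 12], 0)
# returns 3: three failures before the pass at index 4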
def calc_flakiness_score(test_row, ignore_failures_in_a_row=3):
"""This function calculates flakiness score as the number of edges divided by total runs.
At google, If the test is failing three times in a row, then only it reported as real failures;
otherwise, it's considered a flaky test.
(https://testing.googleblog.com/2016/05/flaky-tests-at-google-and-how-we.html)
Hence, we ignored three or more than three consecutive failures
and test cases that are not run while calculating the flakiness score.
We always consider label 13 as an edge.
since currently, each failed test is retry, and if it's passed on a subsequent run it is considered as flaky.
Parameters
----------
test_row : array
array of test runs with 0, 1, 12, 13 values as not run, pass, fail, flaky respectively
Returns
----------
flakiness: int
Flakiness score lies between 0 and 100; 0 is no flakiness, and 100 is maximum flakiness.
"""
flips = 0
i = 0
## ignore not-run instances at the start
while i < len(test_row) and test_row[i] == 0:
i += 1
## initializing the last_passing variable
if i >= len(test_row):
return 0
elif test_row[i] == 1:
last_passing = True
elif test_row[i] == 13:
last_passing = True
flips += 1
elif test_row[i] == 12:
last_passing = False
else:
last_passing = True
considered = 1
i += 1
while i < len(test_row):
## ignoring three or more consecutive failures
## If the test fails consecutively for three or more runs,
## we do not consider those failures as edges.
cf = calc_consecutive_failures(test_row, i)
if cf >= ignore_failures_in_a_row:
i = i + cf
if i >= len(test_row):
break
s = test_row[i]
if s == 1:
## run is pass
considered += 1
last_passing = True
elif s == 0:
## not run
pass
elif s == 13:
## flaky
flips += 1
considered += 1
last_passing = True
elif s == 12:
## run is fail
considered += 1
if last_passing:
flips += 1
last_passing = False
i += 1
if considered == 0:
return 0
## dividing the number of edges by the number of considered runs and multiplying by 100,
## so the flakiness score lies between 0 and 100
flakiness = (flips / considered) * 100
return flakiness
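For example, the naive percentage and the edge-based score disagree on a row with a long failure streak:
row = [1, 12, 1, 1, 12, 12, 12, 12, 1, 1]
naive_flake_calc(row)
# returns 50.0: five of the ten runs are failures
calc_flakiness_score(row)
# returns ~16.7: the streak of four consecutive failures is ignored, so only the single isolated failure counts as an edge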
def calc_flake_edges(test_array):
"""This function calculates the number of edges, the transition of a
particular test case from pass to fail.
Parameters
----------
test_array : numpy array
array of test runs with 0, 1, 12, 13 values as not run, pass, fail, flaky respectively
Returns
----------
flake_edges: numpy array
array where the value are the starting of the edge
"""
flake_edges = [] ## array to store edges
ignore_failures_in_a_row = 3
i = 0
valid = 0
while i < len(test_array) and test_array[i] == 0:
i += 1
## initializing the last_passing variable
if i >= len(test_array):
## the whole row is not run: there are no edges
return []
elif test_array[i] == 1:
last_passing = True
valid = i
elif test_array[i] == 13:
last_passing = True
elif test_array[i] == 12:
last_passing = False
else:
last_passing = True
i += 1
## Finding all the edges in our test runs
while i < len(test_array):
## ignoring more than three consecutive failures
## If the test is consecutively failing for three or more than three runs,
## we do not consider it an edge.
cf = calc_consecutive_failures(test_array, i)
if cf >= ignore_failures_in_a_row:
i = i + cf
if i >= len(test_array):
break
s = test_array[i]
if s == 1:
## run is pass
last_passing = True
valid = i
elif s == 0:
## not run
pass
elif s == 13:
last_passing = True
flake_edges.append(i)
elif s == 12:
## run is fail
if last_passing:
flake_edges.append(valid)
last_passing = False
i = i + 1
return flake_edges
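For example, with only passes and failures in the row, each edge is reported at the index of the last passing run before a failure:
calc_flake_edges([1, 12, 1, 1, 12, 1])
# returns [0, 3]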
def flake_edge_end(test_array, flake_edges, index):
"""This function calculates the end of the edges. Starting of the edge will always be 1 or 13
Parameters
----------
test_array : numpy array
array of test runs with 0, 1, 12, 13 values as not run, pass, fail, flaky respectively
Returns
----------
flake_edges: numpy array
array where the value are the starting of the edge
"""
flake_end = flake_edges[index]
while test_array[flake_end] != 12 and test_array[flake_end] != 13:
flake_end = flake_end + 1
if flake_end >= len(test_array):
break
return flake_end
def calc_optimal_flakiness_score(test_array, threshold=30):
"""Calculate the flakiness score between edges since it will maximize the flakiness score.
Specifically, we calculate the flakiness score between the two farthest edges
which have a flakiness score greater than a threshold.
Parameters
----------
test_array : array
array of test runs with 0, 1, 12, 13 values as not run, pass, fail, flaky respectively
threshold: int, default 30
minimum flakiness score for a window between two edges to be reported
Returns
----------
modified_test_array: numpy array
modified test grid where the value of failure due to flake is 13
flake_edges_dict: dictionary
key of the dictionary is a tuple of time period, and value is the flakiness score between the time period
"""
modified_test_array = test_array.copy()
flake_edges_dict = {}
flake_edges = calc_flake_edges(test_array)
## flakiness score between the two farthest edges
p = 0
q = 0
while p < len(flake_edges):
possible_flake = False
for q in range(p + 1, len(flake_edges)):
flake_end = flake_edge_end(test_array, flake_edges, q)
curr_flake = calc_flakiness_score(
test_array[flake_edges[p] : flake_end + 1]
)
if curr_flake > threshold:
possible_flake = True
max_flake = curr_flake
max_p = flake_edges[p]
max_q = flake_end
else:
break
p = q
if possible_flake:
for k in range(max_p, flake_end + 1):
if modified_test_array[k] == 12:
modified_test_array[k] = 13
curr = k + 1
while curr < len(modified_test_array):
if modified_test_array[curr] == 12:
modified_test_array[curr] = 13
else:
break
arr = []
arr.append(max_p)
arr.append(max_q)
flake_edges_dict[tuple(arr)] = max_flake
if p == len(flake_edges) - 1:
break
return modified_test_array, flake_edges_dict
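For example, a row that alternates between pass and fail gets its failures relabelled as flaky over the widest window whose flakiness score exceeds the threshold:
calc_optimal_flakiness_score([1, 12, 1, 12, 1, 12, 1, 1])
# returns ([1, 13, 1, 13, 1, 13, 1, 1], {(0, 5): 50.0})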
def flake_annotation(val_array, flake_score, flake_score_threshold):
"""Relabel failures (12) as flaky (13) if the flakiness score exceeds the threshold."""
# for illustration purposes, we first remove any existing flaky labels
val_array = [12 if (x == 13) else x for x in val_array]
if flake_score > flake_score_threshold:
return [13 if (x == 12) else x for x in val_array]
else:
return val_array
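For example:
flake_annotation([1, 13, 12, 1], 45, 30)
# returns [1, 13, 13, 1]: above the threshold, every failure is labelled flaky
flake_annotation([1, 13, 12, 1], 10, 30)
# returns [1, 12, 12, 1]: below the threshold, existing flaky labels are reset to plain failures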
Failure type classification functions#
These functions are used in ./failure_type_classifier.ipynb
to detect the different types of flakes and failures.
def detect_infra_flake(data, grid, tab_name, grid_name):
"""
This function takes a 2d numpy array "grid" and uses a diagonal edge detecting
filter to identify time windows in which 'infrastructure flakes' occured.
Returns a list of dates and test indexes
"""
infra_flakes_found = []
# 2d filter that will have its highest value when convolved with a diagonal pattern.
infra_flake_filter = np.array([[-1, 1], [1, -1]])
# Convolve the filter over the grid.
spots = convolve2d(infra_flake_filter, grid, mode="valid")
# Find the spots where the convolution hits its maximum value (4),
# i.e. a perfect diagonal fail pattern across two adjacent tests and two adjacent runs.
infra_flakes = np.where(spots == 4)
dates = data[tab_name][grid_name]["timestamps"]
infra_flake_dates = np.array(dates)[infra_flakes[1]]
infra_flake_dates = [
datetime.date.fromtimestamp(x // 1000) for x in infra_flake_dates
]
infra_flake_tests = list(infra_flakes[0])
infra_flakes_found = list(zip(infra_flake_dates, infra_flake_tests))
return infra_flakes_found
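For example, on a small synthetic grid that is already normalized to 1/-1/0; the tab and grid names below are illustrative placeholders mirroring the TestGrid layout these detectors expect:
timestamps = [1609761600000 - i * 86400000 for i in range(5)]  # five daily runs in milliseconds
data = {"example-tab": {"example-grid": {"timestamps": timestamps}}}
grid = np.array([[-1, 1, 1, 1, 1], [1, -1, 1, 1, 1]])  # adjacent tests failing on adjacent runs
detect_infra_flake(data, grid, "example-tab", "example-grid")
# returns a single (date, test index) pair flagging the diagonal fail pattern in the first two columns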
def detect_install_flake(data, grid, tab_name, grid_name):
"""
If greater than 90% of tests are not run for 2 or more consecutive days,
then we record this period as an install flake.
"""
install_flakes = []
n_rows, n_cols = grid.shape
grid = pd.DataFrame(grid)
not_run_percent = grid.apply(lambda x: sum(x == 0) / n_rows, axis=0)
install_errors = not_run_percent > 0.90
install_error_streaks = run_length_encode(install_errors)
for i in install_error_streaks:
if i[0] is True and i[1] >= 2:
install_flakes.append((i[2] - i[1], i[2]))
dates = data[tab_name][grid_name]["timestamps"]
install_flake_dates = []
if install_flakes:
## note: only the dates bracketing the first detected streak are returned
install_flake_dates = np.array(dates)[list(install_flakes[0])]
install_flake_dates = [
datetime.date.fromtimestamp(x // 1000) for x in install_flake_dates
]
return install_flake_dates
def detect_new_test_failures(data, grid, tab_name, grid_name):
"""
If 6 or more consecutive failures occur, then we will record this period
as a new test failure
"""
grid = pd.DataFrame(grid)
new_test_failures = grid.apply(single_new_test_failure, axis=1)
## keep only the tests that actually had new test failures; iterate by label so
## the test row numbers are preserved
non_empties = new_test_failures[new_test_failures.apply(lambda x: len(x)) > 0]
dates = data[tab_name][grid_name]["timestamps"]
new_test_failures = []
for test_idx, spans in non_empties.items():
span_dates = [np.array(dates)[[x[0], x[1]]] for x in spans]
span_dates = [
(
datetime.date.fromtimestamp(x[0] // 1000),
datetime.date.fromtimestamp(x[1] // 1000),
)
for x in span_dates
]
new_test_failures.append((test_idx, span_dates))
return new_test_failures
def single_new_test_failure(test):
"""given a test as an array of values, uses run length encoding to
find occurences of 6 or moe consecutive failures for a test."""
new_test_failure = []
rle = run_length_encode(test)
end_of_grid = []
## need at least two streaks: the row must end with a not-run streak preceded by failures
if len(rle) >= 2 and rle[-1][0] == 0 and rle[-2][0] == -1:
for i, j in reversed(list(enumerate(rle[:-2]))):
if j[0] == 1:
end_of_grid = rle[i:]
break
count = 0
for streak in end_of_grid:
if streak[0] == -1:
count += streak[1]
if count >= 6:
new_test_failure.append((end_of_grid[0][2], end_of_grid[-1][2]))
return new_test_failure
def detect_flaky_test(data, grid, tab_name, grid_name):
"""
Apply run calc_optimal_flakiness_score to out grid
"""
flaky_tests = []
dates = data[tab_name][grid_name]["timestamps"]
for i, row in enumerate(grid):
# use the calc_optimal_flakiness_score function defined above (from the testgrid_flakiness_detection notebook)
found_flakes = calc_optimal_flakiness_score(row)
if len(found_flakes[1].keys()) > 0:
times = [np.array(dates)[[x[0], x[1]]] for x in found_flakes[1].keys()]
times = [
(
datetime.date.fromtimestamp(x[0] // 1000),
datetime.date.fromtimestamp(x[1] // 1000),
)
for x in times
]
flaky_tests.append((i, found_flakes[1], times))
return flaky_tests
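For example, the flaky-test detector runs on the decoded (un-normalized) statuses, so a row that alternates between 1 and 12 is reported along with the dates of its flaky window; the tab and grid names are again illustrative placeholders:
timestamps = [1609761600000 - i * 86400000 for i in range(8)]
data = {"example-tab": {"example-grid": {"timestamps": timestamps}}}
grid = np.array([[1, 12, 1, 12, 1, 12, 1, 1], [1, 1, 1, 1, 1, 1, 1, 1]])
detect_flaky_test(data, grid, "example-tab", "example-grid")
# returns [(0, {(0, 5): 50.0}, [(date of column 0, date of column 5)])]: test 0 is flaky between columns 0 and 5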
def detect_failures(data, grid, tab_name, grid_name):
"""
This takes in a grid, runs all of our detectors, and outputs a report.
"""
failure_report = {}
# use the decode_run_length function defined above (originally from the TestGrid_EDA notebook)
x = np.array(list(pd.DataFrame(grid).statuses.apply(decode_run_length)))
failure_report["flaky_tests"] = detect_flaky_test(data, x, tab_name, grid_name)
x = pd.DataFrame(x).apply(lambda x: [normalize(y) for y in x])
x = np.array(x)
failure_report["infra_flake"] = detect_infra_flake(data, x, tab_name, grid_name)
failure_report["install_flake"] = detect_install_flake(data, x, tab_name, grid_name)
failure_report["new_test_failure"] = detect_new_test_failures(
data, x, tab_name, grid_name
)
return failure_report
def print_report(results, tab_name, grid_name):
print(
f"Failure Report for: \n\
{tab_name}/{grid_name}",
end="\n\n",
)
print("Flaky Tests:")
for ft in results["flaky_tests"]:
print(f"Test number {ft[0]} had flakes at:")
for i in ft[2]:
print(f"{i[1]} to {i[0]}")
print("\b")
print(
"Infra Flake:",
)
for infr in results["infra_flake"]:
print(f"Test number {infr[1]} had an infra flake at {infr[0]}")
print("\b")
print("Install Flake:")
for inst in results["install_flake"]:
print(f"An install flake started on {inst}")
print("\b")
print("New Test Failures:")
for ntf in results["new_test_failure"]:
print(f"Test number {ntf[0]} had new test failures at:")
for i in ntf[1]:
print(f"{i[1]} to {i[0]}")
# We want to re-map the values so that the output of the convolution will be more interpretable.
def normalize(x):
if x == 1:
return 1
if x == 12:
return -1
if x == 0:
return 0
## assumption: any other status code is treated as not run so the normalized grid stays numeric
return 0
def run_length_encode(x):
"""run length encoding"""
rle = []
count = 1
for i, j in enumerate(x):
key = j
if i == len(x) - 1:
rle.append((key, count, i))
break
if key == x[i + 1]:
count += 1
else:
rle.append((key, count, i))
count = 1
return rle
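For example, the encoder returns (value, run length, end index) triples; this is the format that detect_install_flake and single_new_test_failure above rely on (shown here with normalized 1/-1/0 values):
row = [1, 1, -1, -1, -1, -1, -1, -1, 0, 0]
run_length_encode(row)
# returns [(1, 2, 1), (-1, 6, 7), (0, 2, 9)]
single_new_test_failure(row)
# returns [(1, 9)]: six failures followed by trailing not-run cells, bounded by the end indices of the surrounding streaks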
# Temporary formatting of the results dictionary.
# The results dict contains datetime objects and
# tuple keys that can't be serialized to JSON. At some point
# we'll have to define an output schema.
def format_results(results):
for key in results:
results[key] = str(results[key])
return results
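For example, stringifying the values makes a report safe to dump as JSON, since the tuple keys and date objects inside it otherwise cannot be serialized (the report dict below is a made-up sample):
import json
json.dumps(format_results({"flaky_tests": [(0, {(0, 5): 50.0}, [])]}))
# works because every value has been converted to a plain string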