Skip to content
Draft

. #148

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 84 additions & 5 deletions cafy_pytest/cafy_gta.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@
import json
from utils.cafybase import CafyBase
from .cafygta_config import CafyGTA_Configs
import subprocess
import builtins

class TimeCollectorPlugin:
def __init__(self):
self.original_sleep = time.sleep
self.original_subprocess_run = subprocess.run
self.original_exec = builtins.exec
self.granular_time_testcase_dict = dict()
self.test_case_name = None
self.total_sleep_time = 0
self.total_set_command_time = 0
self.total_get_command_time = 0
self.command_list = ['set_command','get_command','sleep']
self.total_bash_time = 0
self.total_exec_time = 0
self.command_list = ['set_command','get_command','sleep','bash', 'exec']

def update_granular_time_testcase_dict(self, current_test, event, method_name, elapsed_time, feature_type):
"""
Expand Down Expand Up @@ -134,6 +140,55 @@ def patch_set_or_get_methods_for_test_instance(self, item):
if item.cls is not None:
setattr(item.cls, '_decorated', True)

def patch_subprocess_run(self,*args, **kwargs):
"""
Monkey patch subprocess.run to measure the execution time of bash commands.
and updating bash
"""
bash_command = args[0]
if isinstance(bash_command, str):
kwargs["shell"] = True
start_time = time.perf_counter()
result = self.original_subprocess_run(*args, **kwargs)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
elapsed_time_seconds = end_time - start_time
elapsed_time_microseconds = elapsed_time_seconds * 1000000
elapsed_time = '%.2f' % (elapsed_time_microseconds)
caller_frame = inspect.currentframe().f_back
caller_method_name = caller_frame.f_code.co_name
caller_class = caller_frame.f_locals.get('self').__class__
if caller_class is not None:
caller_method = getattr(caller_class, caller_method_name, None)
if caller_method is not None:
current_test = self.test_case_name
feature_type = self.get_method_type(caller_method)
self.update_granular_time_testcase_dict(current_test, "bash", ".".join([caller_class.__name__, caller_method.__name__,"bash"]), elapsed_time, feature_type)
return result

def patch_exec(self, *args, **kwargs):
"""
Monkey patch exec to measure the execution time of dynamic Python code.
"""
code = args[0]
start_time = time.perf_counter()
result = self.original_exec(*args, **kwargs)
end_time = time.perf_counter()
elapsed_time_seconds = end_time - start_time
elapsed_time_microseconds = elapsed_time_seconds * 1000000
elapsed_time = '%.2f' % (elapsed_time_microseconds)
caller_frame = inspect.currentframe().f_back
caller_method_name = caller_frame.f_code.co_name
caller_class = caller_frame.f_locals.get('self').__class__
if caller_class is not None:
caller_method = getattr(caller_class, caller_method_name, None)
if caller_method is not None:
# Update the granular time test case dictionary with the exec command execution time
current_test = self.test_case_name
feature_type = self.get_method_type(caller_method)
self.update_granular_time_testcase_dict(current_test, "exec", ".".join([caller_class.__name__, caller_method.__name__, "exec"]), elapsed_time, feature_type)
return result

def pytest_runtest_protocol(self, item, nextitem):
'''
Method pytest_runtest_protocol : it will Monkey patch sleep , subprocess run etc
Expand All @@ -146,6 +201,8 @@ def pytest_runtest_protocol(self, item, nextitem):
time.sleep = self.measure_sleep_time
# Monkey patch 'set' methods for all classes in the module
self.patch_set_or_get_methods_for_test_instance(item)
subprocess.run = self.patch_subprocess_run
builtins.exec = self.patch_exec
# get class name of the test case method
base_class_name = ""
if item.cls:
Expand All @@ -169,7 +226,7 @@ def update_gta_dict(self,current_test, command):
if hasattr(CafyLog,"gta_dict") and command in CafyLog.gta_dict:
for key, value in CafyLog.gta_dict[command].items():
self.granular_time_testcase_dict[current_test][command][key] = value

def update_CafyLog_gta_dict(self, current_test):
"""
Update the CafyLog Granular Time Accounting (GTA) dictionary with timing information for the current test
Expand Down Expand Up @@ -211,6 +268,10 @@ def get_time_data(self,data, event):
self.total_set_command_time = self.total_set_command_time + total_sum
elif event == 'get_command':
self.total_get_command_time = self.total_get_command_time + total_sum
elif event == 'bash':
self.total_bash_time = self.total_bash_time + total_sum
elif event == 'exec':
self.total_exec_time = self.total_exec_time + total_sum
tmp_dict[command] = ["{:.2f}".format(total_sum), length, feature_type]
else:
tmp_dict[command] = timings_list
Expand All @@ -236,21 +297,33 @@ def collect_granular_time_accouting_report(self):
time_report[test_case]['get_command'] = self.get_time_data(events["get_command"],'get_command')
else:
time_report[test_case]['get_command'] = dict()

if 'bash' in events:
time_report[test_case]['bash'] = self.get_time_data(events["bash"],'bash')
else:
time_report[test_case]['bash'] = dict()
if 'exec' in events:
time_report[test_case]['exec'] = self.get_time_data(events["exec"],'exec')
else:
time_report[test_case]['exec'] = dict()
time_report[test_case]['total_sleep_time'] = "{:.2f}".format(self.total_sleep_time)
time_report[test_case]['total_set_command_time'] = "{:.2f}".format(self.total_set_command_time)
time_report[test_case]['total_get_command_time'] = "{:.2f}".format(self.total_get_command_time)
time_report[test_case]['total_time'] = "{:.2f}".format(self.total_sleep_time+self.total_set_command_time+self.total_get_command_time)
time_report[test_case]['total_bash_time'] = "{:.2f}".format(self.total_bash_time)
time_report[test_case]['total_exec_time'] = "{:.2f}".format(self.total_exec_time)
time_report[test_case]['total_time'] = "{:.2f}".format(self.total_sleep_time+self.total_set_command_time+self.total_get_command_time+self.total_bash_time+self.total_exec_time)
self.total_sleep_time = 0
self.total_set_command_time = 0
self.total_get_command_time = 0
self.total_bash_time = 0
self.total_exec_time = 0
return time_report

def add_gta_data_into_db(self, time_report,aggregated_gta_data,run_id='local_run'):
'''
add_gta_data_into_db
:param time_report: gta report json data
'''

try:
URL = CafyGTA_Configs.get_gta_url()
API_KEY = CafyGTA_Configs.get_api_key()
Expand Down Expand Up @@ -300,17 +373,23 @@ def get_aggregated_gta_data(self,gta_data,run_time):
'total_sleep_time': 0,
'total_set_command_time': 0,
'total_get_command_time': 0,
'total_bash_time':0,
'total_exec_time':0,
'total_event_time': 0,
'total_run_time': run_time
}
for test,value in gta_data.items():
sleep_time = gta_data[test]['totals']['total_sleep_time']
set_command_time = gta_data[test]['totals']['total_set_command_time']
get_command_time = gta_data[test]['totals']['total_get_command_time']
bash_time = gta_data[test]['totals']['total_bash_time']
exec_time = gta_data[test]['totals']['total_exec_time']
aggregated_data['total_sleep_time'] = aggregated_data['total_sleep_time'] + sleep_time
aggregated_data['total_set_command_time'] = aggregated_data['total_set_command_time'] + set_command_time
aggregated_data['total_get_command_time'] = aggregated_data['total_get_command_time'] + get_command_time
aggregated_data['total_event_time'] = aggregated_data['total_event_time'] + sleep_time + get_command_time + set_command_time
aggregated_data['total_bash_time'] = aggregated_data['total_bash_time'] + bash_time
aggregated_data['total_exec_time'] = aggregated_data['total_exec_time'] + bash_time
aggregated_data['total_event_time'] = aggregated_data['total_event_time'] + sleep_time + get_command_time + set_command_time + bash_time + exec_time
return aggregated_data

def pytest_terminal_summary(self, terminalreporter):
Expand Down