Ответ 1
По-видимому, невозможно очень хорошо управлять coverage
с помощью нескольких Threads
.
Когда запускается другой поток, остановка объекта coverage
остановит весь охват, а start
только перезапустит его в "начальном" потоке.
Таким образом, ваш код в основном останавливает покрытие через 2 секунды для всех Thread
, кроме CoverageThread
.
Я немного поиграл с API, и можно получить доступ к измерениям без остановки объекта coverage
.
Таким образом, вы можете запускать поток, который периодически сохраняет данные покрытия, используя API.
Первая реализация будет чем-то вроде этого
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file
cov = Coverage(config_file=True)
cov.start()
def get_data_dict(d):
"""Return a dict like d, but with keys modified by `abs_file` and
remove the copied elements from d.
"""
res = {}
keys = list(d.keys())
for k in keys:
a = {}
lines = list(d[k].keys())
for l in lines:
v = d[k].pop(l)
a[l] = v
res[abs_file(k)] = a
return res
class CoverageLoggerThread(threading.Thread):
_kill_now = False
_delay = 2
def __init__(self, main=True):
self.main = main
self._data = CoverageData()
self._fname = cov.config.data_file
self._suffix = None
self._data_files = CoverageDataFiles(basename=self._fname,
warn=cov._warn)
self._pid = os.getpid()
super(CoverageLoggerThread, self).__init__()
def shutdown(self):
self._kill_now = True
def combine(self):
aliases = None
if cov.config.paths:
from coverage.aliases import PathAliases
aliases = PathAliases()
for paths in self.config.paths.values():
result = paths[0]
for pattern in paths[1:]:
aliases.add(pattern, result)
self._data_files.combine_parallel_data(self._data, aliases=aliases)
def export(self, new=True):
cov_report = cov
if new:
cov_report = Coverage(config_file=True)
cov_report.load()
self.combine()
self._data_files.write(self._data)
cov_report.data.update(self._data)
cov_report.html_report(directory="coverage_report_data.html")
cov_report.report(show_missing=True)
def _collect_and_export(self):
new_data = get_data_dict(cov.collector.data)
if cov.collector.branch:
self._data.add_arcs(new_data)
else:
self._data.add_lines(new_data)
self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
self._data_files.write(self._data, self._suffix)
if self.main:
self.export()
def run(self):
while True:
sleep(CoverageLoggerThread._delay)
if self._kill_now:
break
self._collect_and_export()
cov.stop()
if not self.main:
self._collect_and_export()
return
self.export(new=False)
print("End of the program. I was killed gracefully :)")
Более стабильную версию можно найти в этом GIST.
Этот код в основном захватывает информацию, собранную коллекционером, не останавливая ее.
Функция get_data_dict
принимает словарь в Coverage.collector
и выдает доступные данные. Это должно быть достаточно безопасным, чтобы вы не теряли никаких измерений.
Файлы отчетов обновляются каждые _delay
секунды.
Но если у вас много процессов, вам нужно добавить дополнительные усилия, чтобы убедиться, что весь процесс запустил CoverageLoggerThread
. Это функция patch_multiprocessing
, обезьяна, исправленная с патчей обезьян coverage
...
Код находится в GIST. Он в основном заменяет исходный процесс специальным процессом, который запускает CoverageLoggerThread
непосредственно перед запуском метода run
и присоединяется к потоку в конце процесса.
script main.py
позволяет запускать различные тесты с помощью потоков и процессов.
В этом коде есть 2/3 недостатка, которые нужно соблюдать осторожно:
-
Плохая идея использовать функцию
combine
одновременно, поскольку она выполняет доступ к чтению/записи/удалению comcurrent в файлы.coverage.*
. Это означает, что функцияexport
не является супербезопасной. Это должно быть хорошо, поскольку данные реплицируются несколько раз, но я бы сделал некоторое тестирование, прежде чем использовать его в процессе производства. -
Как только данные будут экспортированы, он останется в памяти. Поэтому, если база кода огромна, она может съесть некоторые ресурсы. Можно сбросить все данные и перезагрузить их, но я предположил, что если вы хотите регистрировать каждые 2 секунды, вы не хотите каждый раз перезагружать все данные. Если вы задерживаетесь за считанные минуты, я каждый раз создаю новый
_data
, используяCoverageData.read_file
, чтобы перезагрузить предыдущее состояние покрытия для этого процесса. -
Пользовательский процесс будет ждать
_delay
до окончания, когда мы присоединяемся кCoverageThreadLogger
в конце процесса, поэтому, если у вас много быстрых процессов, вы хотите увеличить степень детализации сна до сможете быстрее обнаружить конец Процесса. Для этого просто нужен специальный цикл сна, который разбивается на_kill_now
.
Сообщите мне, если это поможет вам каким-либо образом или если возможно улучшить этот смысл.
ИЗМЕНИТЬ:
Кажется, вам не нужно, чтобы обезьяна исправляла многопроцессорный модуль, чтобы автоматически запускать регистратор. Используя .pth
в вашей установке python, вы можете использовать переменную окружения, чтобы автоматически запускать ваш журнал на новых процессах:
# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
import atexit
from coverage_logger import CoverageLoggerThread
thread_cov = CoverageLoggerThread(main=False)
thread_cov.start()
def close_cov()
thread_cov.shutdown()
thread_cov.join()
atexit.register(close_cov)
Затем вы можете запустить свой журнал регистрации событий с помощью COVERAGE_LOGGER_START=1 python main.y