Python работает на бесконечном процессе

У меня есть многопроцессорный веб-сервер с процессами, которые никогда не заканчиваются, я бы хотел проверить покрытие кода на весь проект в живой среде (не только из тестов).

Проблема заключается в том, что, поскольку процессы никогда не заканчиваются, у меня нет хорошего места для установки крючков cov.start() cov.stop() cov.save().

Поэтому я думал о нерестах потока, который в бесконечном цикле будет сохранять и комбинировать данные покрытия, а затем спать некоторое время, однако этот подход не работает, отчет о покрытии кажется пустым, за исключением линии сна.

Я был бы рад получить любые идеи о том, как получить покрытие моего кода, или любые советы о том, почему моя идея не работает. Вот фрагмент моего кода:

import coverage
cov = coverage.Coverage()
import time
import threading
import os

class CoverageThread(threading.Thread):
    _kill_now = False
    _sleep_time = 2

@classmethod
def exit_gracefully(cls):
    cls._kill_now = True

def sleep_some_time(self):
    time.sleep(CoverageThread._sleep_time)

def run(self):
    while True:
        cov.start()
        self.sleep_some_time()
        cov.stop()
        if os.path.exists('.coverage'):
            cov.combine()
        cov.save()
        if self._kill_now:
            break
    cov.stop()
    if os.path.exists('.coverage'):
        cov.combine()
    cov.save()
    cov.html_report(directory="coverage_report_data.html")
    print "End of the program. I was killed gracefully :)"

Ответы

Ответ 1

По-видимому, невозможно очень хорошо управлять coverage с помощью нескольких Threads. Когда запускается другой поток, остановка объекта coverage остановит весь охват, а start только перезапустит его в "начальном" потоке. Таким образом, ваш код в основном останавливает покрытие через 2 секунды для всех Thread, кроме CoverageThread.

Я немного поиграл с API, и можно получить доступ к измерениям без остановки объекта coverage. Таким образом, вы можете запускать поток, который периодически сохраняет данные покрытия, используя API. Первая реализация будет чем-то вроде этого

import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file

cov = Coverage(config_file=True)
cov.start()


def get_data_dict(d):
    """Return a dict like d, but with keys modified by `abs_file` and
    remove the copied elements from d.
    """
    res = {}
    keys = list(d.keys())
    for k in keys:
        a = {}
        lines = list(d[k].keys())
        for l in lines:
            v = d[k].pop(l)
            a[l] = v
        res[abs_file(k)] = a
    return res


class CoverageLoggerThread(threading.Thread):
    _kill_now = False
    _delay = 2

    def __init__(self, main=True):
        self.main = main
        self._data = CoverageData()
        self._fname = cov.config.data_file
        self._suffix = None
        self._data_files = CoverageDataFiles(basename=self._fname,
                                             warn=cov._warn)
        self._pid = os.getpid()
        super(CoverageLoggerThread, self).__init__()

    def shutdown(self):
        self._kill_now = True

    def combine(self):
        aliases = None
        if cov.config.paths:
            from coverage.aliases import PathAliases
            aliases = PathAliases()
            for paths in self.config.paths.values():
                result = paths[0]
                for pattern in paths[1:]:
                    aliases.add(pattern, result)

        self._data_files.combine_parallel_data(self._data, aliases=aliases)

    def export(self, new=True):
        cov_report = cov
        if new:
            cov_report = Coverage(config_file=True)
            cov_report.load()
        self.combine()
        self._data_files.write(self._data)
        cov_report.data.update(self._data)
        cov_report.html_report(directory="coverage_report_data.html")
        cov_report.report(show_missing=True)

    def _collect_and_export(self):
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
        self._data_files.write(self._data, self._suffix)

        if self.main:
            self.export()

    def run(self):
        while True:
            sleep(CoverageLoggerThread._delay)
            if self._kill_now:
                break

            self._collect_and_export()

        cov.stop()

        if not self.main:
            self._collect_and_export()
            return

        self.export(new=False)
        print("End of the program. I was killed gracefully :)")

Более стабильную версию можно найти в этом GIST. Этот код в основном захватывает информацию, собранную коллекционером, не останавливая ее. Функция get_data_dict принимает словарь в Coverage.collector и выдает доступные данные. Это должно быть достаточно безопасным, чтобы вы не теряли никаких измерений.
Файлы отчетов обновляются каждые _delay секунды.

Но если у вас много процессов, вам нужно добавить дополнительные усилия, чтобы убедиться, что весь процесс запустил CoverageLoggerThread. Это функция patch_multiprocessing, обезьяна, исправленная с патчей обезьян coverage...
Код находится в GIST. Он в основном заменяет исходный процесс специальным процессом, который запускает CoverageLoggerThread непосредственно перед запуском метода run и присоединяется к потоку в конце процесса. script main.py позволяет запускать различные тесты с помощью потоков и процессов.

В этом коде есть 2/3 недостатка, которые нужно соблюдать осторожно:

  • Плохая идея использовать функцию combine одновременно, поскольку она выполняет доступ к чтению/записи/удалению comcurrent в файлы .coverage.*. Это означает, что функция export не является супербезопасной. Это должно быть хорошо, поскольку данные реплицируются несколько раз, но я бы сделал некоторое тестирование, прежде чем использовать его в процессе производства.

  • Как только данные будут экспортированы, он останется в памяти. Поэтому, если база кода огромна, она может съесть некоторые ресурсы. Можно сбросить все данные и перезагрузить их, но я предположил, что если вы хотите регистрировать каждые 2 секунды, вы не хотите каждый раз перезагружать все данные. Если вы задерживаетесь за считанные минуты, я каждый раз создаю новый _data, используя CoverageData.read_file, чтобы перезагрузить предыдущее состояние покрытия для этого процесса.

  • Пользовательский процесс будет ждать _delay до окончания, когда мы присоединяемся к CoverageThreadLogger в конце процесса, поэтому, если у вас много быстрых процессов, вы хотите увеличить степень детализации сна до сможете быстрее обнаружить конец Процесса. Для этого просто нужен специальный цикл сна, который разбивается на _kill_now.

Сообщите мне, если это поможет вам каким-либо образом или если возможно улучшить этот смысл.


ИЗМЕНИТЬ: Кажется, вам не нужно, чтобы обезьяна исправляла многопроцессорный модуль, чтобы автоматически запускать регистратор. Используя .pth в вашей установке python, вы можете использовать переменную окружения, чтобы автоматически запускать ваш журнал на новых процессах:

# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
    import atexit
    from coverage_logger import CoverageLoggerThread
    thread_cov = CoverageLoggerThread(main=False)
    thread_cov.start()
    def close_cov()
        thread_cov.shutdown()
        thread_cov.join()
    atexit.register(close_cov)

Затем вы можете запустить свой журнал регистрации событий с помощью COVERAGE_LOGGER_START=1 python main.y

Ответ 2

Поскольку вы готовы запускать свой код по-разному для теста, почему бы не добавить способ завершить процесс тестирования? Кажется, что это будет проще, чем пытаться взломать покрытие.

Ответ 3

Вы можете напрямую использовать pyrasite, используя следующие две программы.

# start.py
import sys
import coverage

sys.cov = cov = coverage.coverage()
cov.start()

И этот

# stop.py
import sys

sys.cov.stop()
sys.cov.save()
sys.cov.html_report()

Еще один способ - проследить программу, используя lptrace, даже если она только печатает вызовы, которые могут быть полезны.