Подпроцесс Python в .exe
Я создаю python script, который скопирует файлы и папки по сети. это кросс-платформенный, поэтому я создаю файл .exe с помощью cx_freeze
Я использовал метод Popen модуля подпроцесса
если я запустил файл .py, он работает как ожидалось, но когда я создаю подпроцесс .exe не создается в системе
Я просмотрел всю документацию модуля subprocess, но я не нашел никакого решения
все остальное (я использую Tkinter, который также отлично работает) работает в подпроцессе accept.exe.
любая идея, как я могу вызвать подпроцесс в .exe.file?
Этот файл вызывает другой .py файл
def start_scheduler_action(self, scheduler_id, scheduler_name, list_index):
scheduler_detail=db.get_scheduler_detail_using_id(scheduler_id)
for detail in scheduler_detail:
source_path=detail[2]
if not os.path.exists(source_path):
showerror("Invalid Path","Please select valid path", parent=self.new_frame)
return
self.forms.new_scheduler.start_scheduler_button.destroy()
#Create stop scheduler button
if getattr(self.forms.new_scheduler, "stop_scheduler_button", None)==None:
self.forms.new_scheduler.stop_scheduler_button = tk.Button(self.new_frame, text='Stop scheduler', width=10, command=lambda:self.stop_scheduler_action(scheduler_id, scheduler_name, list_index))
self.forms.new_scheduler.stop_scheduler_button.grid(row=11, column=1, sticky=E, pady=10, padx=1)
scheduler_id=str(scheduler_id)
# Get python paths
if sys.platform == "win32":
proc = subprocess.Popen(['where', "python"], env=None, stdout=subprocess.PIPE)
else:
proc = subprocess.Popen(['which', "python"], env=None,stdout=subprocess.PIPE)
out, err = proc.communicate()
if err or not out:
showerror("", "Python not found", parent=self.new_frame)
else:
try:
paths = out.split(os.pathsep)
# Create python path
python_path = (paths[len(paths) - 1]).split('\n')[0]
cmd = os.path.realpath('scheduler.py')
#cmd='scheduler.py'
if sys.platform == "win32":
python_path=python_path.splitlines()
else:
python_path=python_path
# Run the scheduler file using scheduler id
proc = subprocess.Popen([python_path, cmd, scheduler_id], env=None, stdout=subprocess.PIPE)
message="Started the scheduler : %s" %(scheduler_name)
showinfo("", message, parent=self.new_frame)
#Add process id to scheduler table
process_id=proc.pid
#showinfo("pid", process_id, parent=self.new_frame)
def get_process_id(name):
child = subprocess.Popen(['pgrep', '-f', name], stdout=subprocess.PIPE, shell=False)
response = child.communicate()[0]
return [int(pid) for pid in response.split()]
print(get_process_id(scheduler_name))
# Add the process id in database
self.db.add_process_id(scheduler_id, process_id)
# Add the is_running status in database
self.db.add_status(scheduler_id)
except Exception as e:
showerror("", e)
И этот файл называется:
def scheduler_copy():
date= strftime("%m-%d-%Y %H %M %S", localtime())
logFile = scheduler_name + "_"+scheduler_id+"_"+ date+".log"
#file_obj=open(logFile, 'w')
# Call __init__ method of xcopy file
xcopy=XCopy(connection_ip, username , password, client_name, server_name, domain_name)
check=xcopy.connect()
# Cretae a log file for scheduler
file_obj=open(logFile, 'w')
if check is False:
file_obj.write("Problem in connection..Please check connection..!!")
return
scheduler_next_run=schedule.next_run()
scheduler_next_run="Next run at: " +str(scheduler_next_run)
# If checkbox_value selected copy all the file to new directory
if checkbox_value==1:
new_destination_path=xcopy.create_backup_directory(share_folder, destination_path, date)
else:
new_destination_path=destination_path
# Call backup method for coping data from source to destination
try:
xcopy.backup(share_folder, source_path, new_destination_path, file_obj, exclude)
file_obj.write("Scheduler completed successfully..\n")
except Exception as e:
# Write the error message of the scheduler to log file
file_obj.write("Scheduler failed to copy all data..\nProblem in connection..Please check connection..!!\n")
# #file_obj.write("Error while scheduling")
# return
# Write the details of scheduler to log file
file_obj.write("Total skipped unmodified file:")
file_obj.write(str(xcopy.skipped_unmodified_count))
file_obj.write("\n")
file_obj.write("Total skipped file:")
file_obj.write(str(xcopy.skipped_file))
file_obj.write("\n")
file_obj.write("Total copied file:")
file_obj.write(str(xcopy.copy_count))
file_obj.write("\n")
file_obj.write("Total skipped folder:")
file_obj.write(str(xcopy.skipped_folder))
file_obj.write("\n")
# file_obj.write(scheduler_next_run)
file_obj.close()
Ответы
Ответ 1
В исходном коде есть неловкость, но я не буду тратить на это время. Например, если вы хотите найти source_path, лучше использовать цикл for
с break
/else
:
for detail in scheduler_detail:
source_path = detail[2]
break # found
else:
# not found: raise an exception
...
Некоторые советы:
- Попробуйте отделить код пользовательского интерфейса и подпроцесс, не смешивая их.
- Использовать исключения и обработчики исключений.
- Если вам нужен переносимый код: избегайте системного вызова (в Windows нет
pgrep
).
Поскольку ваше приложение упаковано в virtualenv (я делаю предположение, что cx_freeze делает подобное), у вас нет доступа к общесистемному Python. У вас даже нет этого в Windows. Поэтому вам нужно использовать упакованный Python (в любом случае это лучшая практика).
Если вы хотите вызвать Python script как подпроцесс, это означает, что у вас есть два упакованных приложения: вам нужно создать exe
для основного приложения и для scheduler.py
script. Но с этим нелегко общаться.
Другим решением является использование multiprocessing
для создания нового процесса Python. Поскольку вы не хотите дожидаться окончания обработки (что может быть длинным), вам необходимо создать процессы демона. Способ сделать это объясняется в модуле multiprocessing
.
В принципе:
import time
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.daemon = True
p.start()
# let it live and die, don't call: `p.join()`
time.sleep(1)
Конечно, нам нужно адаптировать это к вашей проблеме.
Вот как бы я это сделал (я удалил код, связанный с UI):
import scheduler
class SchedulerError(Exception):
pass
class YourClass(object):
def start_scheduler_action(self, scheduler_id, scheduler_name, list_index):
scheduler_detail = db.get_scheduler_detail_using_id(scheduler_id)
for detail in scheduler_detail:
source_path = detail[2]
break
else:
raise SchedulerError("Invalid Path", "Missing source path", parent=self.new_frame)
if not os.path.exists(source_path):
raise SchedulerError("Invalid Path", "Please select valid path", parent=self.new_frame)
p = Process(target=scheduler.scheduler_copy, args=('source_path',))
p.daemon = True
p.start()
self.db.add_process_id(scheduler_id, p.pid)
Чтобы проверить, все ли работает ваш процесс, я рекомендую использовать psutil. Это действительно отличный инструмент!
Вы можете определить свой scheduler.py
script следующим образом:
def scheduler_copy(source_path):
...
Многопроцессорное взаимодействие с потоковым Python
Цитируя этот ответ: fooobar.com/questions/15727/...
Модуль threading
использует потоки, модуль multiprocessing
использует процессы. Разница в том, что потоки выполняются в одном и том же пространстве памяти, тогда как процессы имеют отдельную память. Это затрудняет обмен объектами между процессами с многопроцессорной обработкой. Поскольку потоки используют одну и ту же память, необходимо принять меры предосторожности или одновременно записать два потока в одну и ту же память. Для этого используется глобальная блокировка интерпретатора.
Здесь преимущество многопроцессорности в многопоточности заключается в том, что вы можете убить (или прекратить) процесс; вы не можете убить нить. Вам может понадобиться psutil для этого.
Ответ 2
Это не точное решение, которое вы ищете, но следующее предложение должно быть предпочтительным по двум причинам.
- Это больше pythonic way
-
subprocess
немного дороже.
Предложения, которые вы можете рассмотреть
-
Не используйте подпроцесс для извлечения системного пути. Попробуйте проверить os.getenv('PATH')
, чтобы получить переменную env и попытаться найти, находится ли путь python. Для окон нужно вручную добавить Python-путь, иначе вы можете напрямую проверить Program Files
Я думаю
-
Для проверки идентификатора процесса вы можете попробовать psutils
. Замечательный ответ представлен здесь как получить список процессов на Python?
-
Вызов другого script из python script. Это не выглядит круто. Неплохо, но я бы этого не хотел.
-
В приведенном выше коде строка - if sys.platform == "win32":
имеет такое же значение в if
и else
condition == > вам не нужна условная инструкция здесь.
Вы написали очень хороший рабочий код, чтобы рассказать вам. Продолжайте кодирование!
Ответ 3
Если вы хотите запустить подпроцесс в exe файле, вы можете использовать
import subprocess
program=('example')
arguments=('/command')
subprocess.call([program, arguments])