Как выполнять одновременно две функции
Я запускаю тест, но хочу одновременно запустить 2 функции. У меня есть камера, и я говорю ей, чтобы она двигалась через пеной, затем я регистрируюсь в камере через SSH, чтобы проверить скорость, на которую установлена камера. Когда я проверю скорость, камера остановилась, поэтому скорость не доступна. Есть ли способ, которым я могу заставить эти функции работать одновременно, чтобы проверить скорость камеры. Пример кода ниже:
class VerifyPan(TestAbsoluteMove):
def runTest(self):
self.dest.PanTilt._x=350
# Runs soap move command
threading.Thread(target = SudsMove).start()
self.command = './ptzpanposition -c 0 -u degx10'
# Logs into camera and checks speed
TestAbsoluteMove.Ssh(self)
# Position of the camera verified through Ssh (No decimal point added to the Ssh value)
self.assertEqual(self.Value, '3500')
Теперь я попробовал модуль потоковой передачи, как указано ниже. Нить не синхронизируется с функцией TestAbsoluteMove.Ssh(). Есть ли какой-либо другой код, который мне нужен для выполнения этой работы.
Я рассмотрел вопрос о переносе аргументов в оператор потока, который указывает, что поток выполняется при использовании функции Ssh(). Кто-нибудь знает, что ввести в это утверждение?
Извините, если я не объяснил правильно. Функция "SudsMove" перемещает камеру, и функция "Ssh" записывается в камеру и проверяет скорость движения камеры в данный момент. Проблема в том, что к тому моменту, когда функция "Ssh" регистрируется в камере, она остановлена. Мне нужно, чтобы обе функции выполнялись параллельно, поэтому я могу проверить скорость камеры, пока она все еще движется.
Спасибо
Ответы
Ответ 1
Если вы хотите использовать общую реализацию Python (CPython), вы, безусловно, можете использовать модуль multiprocessing, который творит чудеса (вы можете передавать несогласованные аргументы в подпроцессы, убивать задачи,...), предлагает интерфейс, аналогичный интерфейсу потоков, и не страдает от Global Interpreter Lock.
Недостатком является то, что подпроцессы порождаются, что занимает больше времени, чем создание потоков; это должно быть проблемой, если у вас много и много коротких задач. В ситуациях, когда каждая задача занимает "длинное" время, модуль multiprocessing должен быть большим.
Ответ 2
Импортируйте модуль threading
и запустите SudsMove()
следующим образом:
threading.Thread(target = SudsMove).start()
Это создаст и запустит фоновый поток, который выполняет движение.
ОТВЕТ НА ВОПРОС:
Насколько я понимаю, TestAbsoluteMove.Ssh(self)
проверяет скорость один раз и сохраняет результат в self.Value
?! И вы проверяете ожидаемое конечное наклонение/поворот/положение с помощью self.assertEqual(self.Value, '3500')
?!
Если это правильно, вам следует подождать, пока камера начнет движение. Вероятно, вы можете опросить скорость в определенном интервале:
# Move camera in background thread
threading.Thread(target = SudsMove).start()
# What does this do?
self.command = './ptzpanposition -c 0 -u degx10'
# Poll the current speed in an interval of 250 ms
import time
measuredSpeedsList = []
for i in xrange(20):
# Assuming that this call will put the result in self.Value
TestAbsoluteMove.Ssh(self)
measuredSpeedsList.append(self.Value)
time.sleep(0.25)
print "Measured movement speeds: ", measuredSpeedsList
Скорость движения будет самым большим значением в measuredSpeedsList
(т.е. max(measuredSpeedsList)
). Надеюсь, что это имеет смысл...
Ответ 3
Одновременно может работать только один поток. Об этом было подробно сказано здесь. Одним из решений будет использование двух отдельных процессов. Вышеприведенный ответ дает несколько советов.
Ответ 4
Если вы можете запустить свой код под Jython или IronPython, вы можете запускать несколько потоков одновременно; у них нет этого глупого "Global Interpreter Lock" в CPython.