Как выйти из генератора в определенное время?
Я читаю твиты из Twitter Streaming API. После подключения к API я получаю генератор.
Я просматриваю каждый полученный твит, но я хочу выйти из итератора, скажем, в 18 вечера. После получения каждого твита я проверяю, будет ли это позже указанной отметки времени и остановится.
Проблема в том, что я не получаю твиты достаточно часто. Итак, я мог бы получить один в 17:50 и следующий в 19 вечера. Это, когда я узнаю, что время прошло, и мне нужно остановиться.
Есть ли способ принудительно остановить остановку в 18PM?
Здесь представлен высокий уровень моего кода:
def getStream(tweet_iter):
for tweet in tweet_iter:
#do stuff
if time_has_passed():
return
tweet_iter = ConnectAndGetStream()
getStream(tweet_iter)
Ответы
Ответ 1
Ваша проблема может быть решена путем разделения функциональности вашего проекта на два отдельных процесса:
- Процесс twitter, который действует как обертка для API Twitter и
- Процесс мониторинга, который может завершить процесс twitter, когда достигнуто время выхода.
Следующий фрагмент кода прототипов описанных выше функций с использованием модуля многопроцессорности Python:
import multiprocessing as mp
import time
EXIT_TIME = '12:21' #'18:00'
def twitter():
while True:
print 'Twittttttttttt.....'
time.sleep(5)
def get_time():
return time.ctime().split()[3][:5]
if __name__ == '__main__':
# Execute the function as a process
p = mp.Process( target=twitter, args=() )
p.start()
# Monitoring the process p
while True:
print 'Checking the hour...'
if get_time() == EXIT_TIME:
p.terminate()
print 'Current time:', time.ctime()
print 'twitter process has benn terminated...'
break
time.sleep(5)
Конечно, вы можете использовать p.join(TIMEOUT) вместо того, чтобы использовать True, представленный в моем примере, как указано здесь.
Ответ 2
Вот пример с потоковым планировщиком и питоном:
import threading
import time
import os
import schedule
def theKillingJob():
print("Kenny and Cartman die!")
os._exit(1)
schedule.every().day.at("18:00").do(theKillingJob,'It is 18:00')
def getStream(tweet_iter):
for tweet in tweet_iter:
#do stuff
def kenny():
while True:
print("Kenny alive..")
schedule.run_pending()
time.sleep(1)
def cartman():
while True:
print("Cartman alive..")
tweet_iter = ConnectAndGetStream()
getStream(tweet_iter)
# You can change whenever you want to check for tweets by changing sleep time here
time.sleep(1)
if __name__ == '__main__':
daemon_kenny = threading.Thread(name='kenny', target=kenny)
daemon_cartman = threading.Thread(name='cartman', target=cartman)
daemon_kenny.setDaemon(True)
daemon_cartman.setDaemon(True)
daemon_kenny.start()
daemon_cartman.start()
daemon_kenny.join()
daemon_cartman.join()
Ответ 3
Создайте отдельный поток для производителя и используйте Queue
для связи. Я также должен был использовать threading.Event
для остановки производителя.
import itertools, queue, threading, time
END_TIME = time.time() + 5 # run for ~5 seconds
def time_left():
return END_TIME - time.time()
def ConnectAndGetStream(): # stub for the real thing
for i in itertools.count():
time.sleep(1)
yield "tweet {}".format(i)
def producer(tweets_queue, the_end): # producer
it = ConnectAndGetStream()
while not the_end.is_set():
tweets_queue.put(next(it))
def getStream(tweets_queue, the_end): # consumer
try:
while True:
tweet = tweets_queue.get(timeout=time_left())
print('Got', tweet)
except queue.Empty:
print('THE END')
the_end.set()
tweets_queue = queue.Queue() # you might wanna use the maxsize parameter
the_end = threading.Event()
producer_thread = threading.Thread(target=producer,
args=(tweets_queue, the_end))
producer_thread.start()
getStream(tweets_queue, the_end)
producer_thread.join()