Использование Tweepy для прослушивания потока и поиска твитов. Как остановить предыдущий поиск и прослушать только новый поток?
Я использую Flask и Tweepy для поиска живых твитов. На интерфейсе у меня есть текстовый ввод пользователя и кнопка "Поиск". В идеале, когда пользователь вводит поисковый запрос во вход и нажимает кнопку "Поиск", Tweepy должен прослушивать новый поисковый запрос и останавливать предыдущий поток в поисковых терминах. При нажатии кнопки "Поиск" выполняется эта функция:
@app.route('/search', methods=['POST'])
# gets search-keyword and starts stream
def streamTweets():
search_term = request.form['tweet']
search_term_hashtag = '#' + search_term
# instantiate listener
listener = StdOutListener()
# stream object uses listener we instantiated above to listen for data
stream = tweepy.Stream(auth, listener)
if stream is not None:
print "Stream disconnected..."
stream.disconnect()
stream.filter(track=[search_term or search_term_hashtag], async=True)
redirect('/stream') # execute '/stream' sse
return render_template('index.html')
Маршрут /stream
, который выполняется во второй-последней строке в приведенном выше коде, выглядит следующим образом:
@app.route('/stream')
def stream():
# we will use Pub/Sub process to send real-time tweets to client
def event_stream():
# instantiate pubsub
pubsub = red.pubsub()
# subscribe to tweet_stream channel
pubsub.subscribe('tweet_stream')
# initiate server-sent events on messages pushed to channel
for message in pubsub.listen():
yield 'data: %s\n\n' % message['data']
return Response(stream_with_context(event_stream()), mimetype="text/event-stream")
Мой код работает отлично, в том смысле, что он запускает новый поток и ищет заданный термин всякий раз, когда нажимается кнопка "Поиск", но не останавливает предыдущий поиск. Например, если мой первый поисковый запрос был "NYC", а затем я хотел найти другой термин, скажем, "Лос-Анджелес", он даст мне результаты как для "NYC", так и для "Лос-Анджелеса", что не то, что я хотеть. Я хочу, чтобы меня обыскали только "Лос-Анджелес". Как это исправить? Другими словами, как остановить предыдущий поток? Я просмотрел другие предыдущие потоки, и я знаю, что мне нужно использовать stream.disconnect()
, но я не уверен, как реализовать это в своем коде. Любая помощь или ввод были бы весьма полезными. Большое спасибо!
Ответы
Ответ 1
Ниже приведен код, который отменяет старые потоки при создании нового потока. Он работает, добавляя новые потоки в глобальный список и затем вызывающий stream.disconnect()
во всех потоках в списке всякий раз, когда создается новый поток.
diff --git a/app.py b/app.py
index 1e3ed10..f416ddc 100755
--- a/app.py
+++ b/app.py
@@ -23,6 +23,8 @@ auth.set_access_token(access_token, access_token_secret)
app = Flask(__name__)
red = redis.StrictRedis()
+# Add a place to keep track of current streams
+streams = []
@app.route('/')
def index():
@@ -32,12 +34,18 @@ def index():
@app.route('/search', methods=['POST'])
# gets search-keyword and starts stream
def streamTweets():
+ # cancel old streams
+ for stream in streams:
+ stream.disconnect()
+
search_term = request.form['tweet']
search_term_hashtag = '#' + search_term
# instantiate listener
listener = StdOutListener()
# stream object uses listener we instantiated above to listen for data
stream = tweepy.Stream(auth, listener)
+ # add this stream to the global list
+ streams.append(stream)
stream.filter(track=[search_term or search_term_hashtag],
async=True) # make sure stream is non-blocking
redirect('/stream') # execute '/stream' sse
То, что это не решает, - проблема управления сеансом. При вашей текущей настройке поиск одним пользователем повлияет на поиск всех пользователей. Этого можно избежать, предоставляя вашим пользователям некоторый идентификатор и сохраняя свои потоки вместе с их идентификатором. Самый простой способ сделать это, скорее всего, будет использовать поддержку Flask session. Вы также можете сделать это с помощью requestId
, как предложил Пьер. В любом случае вам также понадобится код, чтобы заметить, когда пользователь закрыл страницу и закрыл ее поток.
Ответ 2
Отказ от ответственности: я ничего не знаю о Tweepy, но это, похоже, проблема дизайна.
Вы пытаетесь добавить состояние в RESTful API? У вас может возникнуть проблема с дизайном.
Как ответил JRichardSnape, ваш API не должен заботиться об отмене запроса; это должно быть сделано в интерфейсе. Я имею в виду здесь, в javascript/AJAX/etc, вызывающем эту функцию, добавление другого вызова в новую функцию
@app.route('/cancelSearch', methods=['POST'])
С помощью "POST", в котором есть условия поиска. Пока у вас нет состояния, вы не можете сделать это безопасно в асинхронном вызове: Представьте, что кто-то другой делает один и тот же поиск в то же время, а затем отменяет его, отменяет оба (помните, у вас нет состояния, чтобы вы не знали, кого вы отменяете). Возможно, вам нужно состояние с вашим дизайном.
Если вы должны продолжать использовать это и не против нарушать правило "без гражданства", добавьте "состояние" к вашему запросу. В этом случае это не так уж плохо, потому что вы можете запустить поток и называть его с помощью userId, а затем убить поток каждый новый поиск
def streamTweets():
search_term = request.form['tweet']
userId = request.form['userId'] # If your limit is one request per user at a time. If multiple windows can be opened and you want to follow this limit, store userId in a cookie.
#Look for any request currently running with this ID, and cancel them
В качестве альтернативы вы можете вернуть requestId
, который вы оставите в интерфейсе, можете вызвать cancelSearch?requestId=$requestId
. В cancelSearch вам нужно будет найти ожидающий запрос (звучит как в tweepy, так как вы не используете свои собственные потоки) и отключите его.
Из любопытства я просто смотрел, что происходит при поиске в Google, и он использует запрос GET. Посмотрите (инструменты отладки → Сеть, затем введите текст и посмотрите автозаполнение). Google использует токен, отправленный с каждым запросом (каждый раз, когда вы вводите что-то)). Это не значит, что он используется для этого, но это в основном то, что я описал. Если вы не хотите сеанса, используйте уникальный идентификатор.
Ответ 3
Ну, я решил это, используя метод таймера. Но все же я ищу питоновский путь.
from streamer import StreamListener
def stream():
hashtag = input
#assign each user an ID ( for pubsub )
StreamListener.userid = random_user_id
def handler(signum, frame):
print("Forever is over")
raise Exception("end of time")
def main_stream():
stream = tweepy.Stream(auth, StreamListener())
stream.filter(track=track,async=True)
redirect(url_for('map_stream'))
def close_stream():
# this is for closing client list in redis but don't know it working
obj = redis.client_list(tweet_stream)
redis_client_list = obj[0]['addr']
redis.client_kill(redis_client_list)
stream = tweepy.Stream(auth, StreamListener())
stream.disconnect()
import signal
signal.signal(signal.SIGALRM, handler)
signal.alarm(300)
try:
main_stream()
except Exception:
close_stream()
print("function terminate")