Выполнить методы класса в потоках (python)
В настоящее время я изучаю Python и Classes, и у меня есть основной вопрос, но я не нашел ответа на него. Скажем, у меня есть этот фиктивный класс
class DomainOperations:
def __init__(self, domain):
self.domain = domain
self.domain_ip = ''
self.website_thumbnail = ''
def resolve_domain(self):
#resolve domain to ipv4 and save to self.domain_ip
def generate_website_thumbnail(self):
#generate website thumbnail and save the url to self.website_thumbnail
Я хочу запускать одновременно resol_domain и generate_website_thumbnail, а когда потоки закончены, я хочу напечатать IP и миниатюру.
EDIT: Я знаю, что должен использовать потоки, может быть, что-то вроде этого
r = DomainOperations('google.com')
t1 = threading.Thread(target=r.resolve_domain)
t1.start()
t2 = threading.Thread(target=r.generate_website_thumbnail)
t2.start()
Но следует ли использовать их вне класса? Должен ли я писать другой класс для обработки потоков?
Что такое правильный путь?
Ответы
Ответ 1
Если вы вызываете их из класса, это просто:
import threading
class DomainOperations:
def __init__(self):
self.domain_ip = ''
self.website_thumbnail = ''
def resolve_domain(self):
self.domain_ip = 'foo'
def generate_website_thumbnail(self):
self.website_thumbnail= 'bar'
def run(self):
t1 = threading.Thread(target=self.resolve_domain)
t2 = threading.Thread(target=self.generate_website_thumbnail)
t1.start()
t2.start()
t1.join()
t2.join()
print(self.domain_ip, self.website_thumbnail)
if __name__ == '__main__':
d = DomainOperations()
d.run()
Ответ 2
Вы можете наследовать класс Thread в DomainOperation, таким образом, код будет более понятным и понятным. Вы должны переопределить метод run().
from threading import threading.Thread
class DomainOperations(Thread):
def __init__(self):
self.domain_ip = ''
self.website_thumbnail = ''
def resolve_domain(self):
self.domain_ip = 'foo'
def generate_website_thumbnail(self):
self.website_thumbnail= 'bar'
def run(self):
#domain will be resolved on first thread
self.resolve_domain()
#thumbnail will be resolved on second OR newly created below thread
thread2 = Thread(target=self.generate_website_thumbnail)
thread.start()
# thread1 will wait for thread2
self.join()
# thread2 will wait for thread1, if it late.
thread2.join()
# here it will print ip and thumbnail before exiting first thread
print(self.domain_ip, self.website_thumbnail)
И вы начнете свои темы таким образом.
if __name__ == '__main__':
thread1 = DomainOperations()
thread1.start()
Ответ 3
def post_test(tbid, line_num, response_time):
"""
:param tbid: 参数id
:return:
"""
# 请求参数
data = {'tbId': tbid, 'conditions': [{"key": "", "type": 1}], 'pageNum': 1, 'pageSize': 12}
# 请求启动时间
start = time.time()
# post请求
r = requests.post(url=url, data=json.dumps(data), headers=headers)
# 请求结束时间
end = time.time()
# 保留两位小数
finall_time = float('%.2f' % float(end - start))
text = json.loads(r.text)
# IO写入 只写入200的
with open('text6.csv', 'a', newline='') as csvfile:
if text['statusCode'] == '200':
throughput = line_num * response_time / finall_time
throughput = float('%.2f' % float(throughput))
print('the perf_counter time of %s is %s and the content is %s ,throughput is %s' % (
tbid, finall_time, json.loads(r.text), throughput))
spamwriter = csv.writer(csvfile, dialect='excel')
spamwriter.writerow([tbid] + [finall_time] + [throughput])
def start_thread(csv_name):
tbid, response_time_sort, throughput_sort = read_csv(csv_name)
print(tbid)
line_num = len(tbid)
response_times = 5
for j in range(response_times):
for i in tbid:
t = threading.Thread(target=post_test, args=(i, line_num, response_times))
t.start()
t.join()
Я не знаю, как вызвать метод в классе, особенно если у него есть параметры инициализации, но вы можете попробовать этот метод。 Я пытаюсь использовать несколько процессов для решения этой проблемы, верно。