Низкие записи InnoDB за секунду - AWS EC2 для MySQL RDS с использованием Python

У меня есть около 60 ГБ файлов JSON, которые я обрабатываю с помощью Python, а затем вставляю их в базу данных MySQL с помощью Python-MySQL Connector. Каждый файл JSON составляет около 500 МБ

Я использовал экземпляр AWS r3.xlarge EC2 с дополнительным томом для хранения 60 ГБ данных JSON.

Затем я использую экземпляр AWS RDS r3.xlarge MySQL. Эти экземпляры находятся в одном регионе и зоне доступности. Экземпляр EC2 использует следующий Python script для загрузки JSON, его анализа и вставки в MySQL RDS. Мой питон:

import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os

os.chdir("./json_data")

for file in glob.glob("*.json"):
    with open(file, 'rU') as data_file:
        results = json.load(data_file)
        print('working on file:', file)

    cnx = mysql.connector.connect(user='', password='',
        host='')

    cursor = cnx.cursor(buffered=True)

    DB_NAME = 'DB'

    def create_database(cursor):
        try:
            cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            exit(1)

    try:
        cnx.database = DB_NAME    
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            create_database(cursor)
            cnx.database = DB_NAME
        else:
            print(err)
            exit(1)

    add_overall_data = ("INSERT INTO master" 
        "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
        "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")

    add_polyline = ("INSERT INTO polyline"
        "(Overview_polyline, request_no)"
        "VALUES (%(Overview_polyline)s, %(request_no)s)")

    add_summary = ("INSERT INTO summary"
        "(summary, request_no)"
        "VALUES (%(summary)s, %(request_no)s)")

    add_warnings = ("INSERT INTO warnings"
        "(warnings, request_no)"
        "VALUES (%(warnings)s, %(request_no)s)")

    add_waypoint_order = ("INSERT INTO waypoint_order"
        "(waypoint_order, request_no)"
        "VALUES (%(waypoint_order)s, %(request_no)s)")

    add_leg_data = ("INSERT INTO leg_data"
        "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)" 
        "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
    error_messages = []
    for result in results:
        if result["status"] == "OK":
            for leg in result['routes'][0]['legs']:
                try: 
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": leg['dtf']['value'],
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": leg['start_address'],
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": leg['end_address']
                    }
                    cursor.execute(add_overall_data, params)
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
                except KeyError, e:
                    error_messages.append(e)
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": "000",
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": 'unknown',
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": 'unknown'
                    }
                    cursor.execute(add_overall_data, params)
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
            for overview_polyline in result['routes']:
                params = {
                "request_no": request_no,
                "Overview_polyline": overview_polyline['overview_polyline']['points']
                }
                cursor.execute(add_polyline, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for summary in result['routes']:
                params = {
                "request_no": request_no,
                "summary": summary['summary']
                }
                cursor.execute(add_summary, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for warnings in result['routes']:
                params = {
                "request_no": request_no,
                "warnings": str(warnings['warnings'])
                }
                cursor.execute(add_warnings, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for waypoint_order in result['routes']:
                params = {
                "request_no": request_no,
                "waypoint_order": str(waypoint_order['waypoint_order'])
                }
                cursor.execute(add_waypoint_order, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for steps in result['routes'][0]['legs'][0]['steps']:
                params = {
                "request_no": request_no,
                "leg_dt": steps['dt']['value'],
                "leg_ds": steps['ds']['value'],
                "leg_O_l": steps['start_location']['lat'],
                "leg_O_ln": steps['start_location']['lng'],
                "leg_D_l": steps['end_location']['lat'],
                "leg_D_ln": steps['end_location']['lng'],
                "leg_html_inst": steps['html_instructions'],
                "leg_polyline": steps['polyline']['points'],
                "leg_travel_mode": steps['travel_mode']
                }
                cursor.execute(add_leg_data, params)
        cnx.commit()
    print('error messages:', error_messages)
    cursor.close()
    cnx.close()
    print('finished' + file)

Используя htop на экземпляре Linux, я вижу следующее: htop процесса python

Что касается базы данных MySQL, используя MySQL Workbench, я могу видеть, что:

MySQL WorkBench Output

Этот python script длится несколько дней, но я только вставил около 20% данных в MySQL.

Мои вопросы - как я могу определить узкое место? Это Python script? Кажется, он использует небольшой объем памяти - могу ли я увеличить его? Я проверил размер пула InnoDB в соответствии с (Как повысить скорость записи InnoDB в секунду в MySQL DB) и обнаружил, что он большой:

SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
|               11674845184 |
+---------------------------+

Поскольку я использую экземпляр RDS и EC2 в том же регионе, я не думаю, что есть узкое место в сети. Указатели, на которых я должен искать самую большую экономию, будут очень рады!

ИЗМЕНИТЬ

Думаю, я, возможно, наткнулся на эту проблему. Для эффективности во время разбора я пишу каждый уровень JSON отдельно. Тем не менее, я должен выполнить запрос для соответствия вложенной части JSON с ее более высоким уровнем. При использовании небольших баз данных этот запрос имеет небольшие накладные расходы. Ive заметил, что скорость вставок резко уменьшилась на этом дБ. Это связано с тем, что он должен искать более крупный и постоянно растущий db для правильного подключения данных JSON.

Я не уверен, как решить эту проблему, кроме как ждать ее.

Ответы

Ответ 1

Я не вижу никаких определений таблиц в Python script.... Но когда мы пытаемся выполнять большие операции с данными - мы всегда будем отключать любые индексы базы данных при загрузке в MySQL - также, если у вас есть какие-либо ограничения /Foreign Ключевое принуждение - это также должно быть отключено при загрузке.

Autocommit отключается по умолчанию при подключении через Connector/Python.

Но я не вижу никаких коммандов в представленном вами коде

Подводя итоги

Отключить/удалить (для загрузки)

- Индексы
 - Ограничения  - Иностранные ключи  - Триггеры

В программе загрузки

- отключить автообновление - фиксировать все n записей (N будет зависеть от вашего размера буфера)

Ответ 2

мой англист плохой

Если я сделаю эту работу, я буду

  • использовать python конвертировать json в txt

  • использовать инструмент mysq imp, импортировать txt в mysql

если вы должны выполнить python + mysql allinone, я предлагаю использовать

insert table values(1),value(2)...value(xxx)  

почему 'SELECT request_no FROM master'multiple появление, следует читать из json

мой englist очень плохой...

Ответ 3

Учитывая эту информацию, похоже, что оба script и БД в основном неактивны. Тонкая настройка на уровне MySQL будет преждевременной.

Вам нужно больше информации о том, что делает ваша программа.

Начните с регистрации того, сколько времени занимает каждый из ваших запросов, сколько ошибок вы получаете и т.д.

Этим SELECTs может потребоваться добавление индекса для выполнения наилучшего, если это проблема вообще.