Как загрузить данные навалом в хранилище приложений? Старые методы не работают
Это должно быть довольно распространенным требованием и простым процессом: загрузка данных навалом в хранилище данных appengine.
Однако ни одно из старых решений, упомянутых в stackoverflow (ссылки ниже *), похоже, больше не работает. Метод bulkloader, который был самым разумным решением при загрузке в хранилище данных с использованием API DB, не работает с API NDB
И теперь метод bulkloader, кажется, устарел, а старые ссылки, которые все еще присутствуют в документах, приводят к неправильной странице. Вот пример
https://developers.google.com/appengine/docs/python/tools/uploadingdata
Эта ссылка выше по-прежнему присутствует на этой странице: https://developers.google.com/appengine/docs/python/tools/uploadinganapp
Каков рекомендуемый метод для одновременной загрузки данных?
Двумя возможными альтернативами кажутся 1) использование remote_api или 2) запись CSV файла в ведро GCS и чтение из него. У кого-нибудь есть опыт успешного использования любого метода?
Любые указатели будут очень благодарны. Спасибо!
[* Решения, предлагаемые по приведенным ниже ссылкам, более не действительны]
[1] Как отправить данные навалом в хранилище Google appengine?
[2] Как вставить массовые данные в хранилище данных Google App Engine?
Ответы
Ответ 1
Некоторые из вас могут быть в моей ситуации: я не могу использовать утилиту импорта/экспорта хранилища данных, потому что мои данные должны быть преобразованы, прежде чем попасть в хранилище данных.
В итоге я использовал apache-beam (поток данных Google Cloud).
Вам нужно всего лишь написать несколько строк кода "луч", чтобы
- прочитайте свои данные (например, размещены в облачном хранилище) - вы получаете
PCollection
строк, - выполняйте любое преобразование, которое вы хотите (так что вы получаете
PCollection
объектов хранилища данных), - сбрасывать их в раковину хранилища данных.
См. Раздел Как ускорить массовый импорт в облачный хранилище Google с несколькими рабочими? для конкретного варианта использования.
Я мог писать со скоростью 800 единиц в секунду в моем хранилище данных с 5 рабочими. Это позволило мне завершить задачу импорта (с 16 миллионами строк) примерно через 5 часов. Если вы хотите сделать это быстрее, используйте больше работников: D
Ответ 2
Способ 1: Использовать remote_api
Как: напишите файл bulkloader.yaml и запустите его напрямую, используя команду appcfg.py upload_data из терминала
Я не рекомендую этот метод по нескольким причинам: 1. огромная латентность 2. отсутствие поддержки NDB
Метод 2: GCS и использование mapreduce
Загрузка файла данных в GCS:
Используйте " storage-file-transfer-json-python" проект github (chunked_transfer.py) для загрузки файлов в gcs из вашей локальной системы.
Обязательно создайте файл "client-secrets.json" из консоли администратора приложения.
MapReduce:
Используйте проект appengine-mapreduce "github. Скопируйте папку" mapreduce" в папку верхнего уровня вашего проекта.
Добавьте строку ниже в файл app.yaml:
includes:
- mapreduce/include.yaml
Ниже приведен ваш файл main.py
import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader
def testmapperFunc(newRequest):
f = StringIO.StringIO(newRequest)
reader = csv.reader(f, delimiter=',')
for row in reader:
newEntry = DataStoreModel(attr1=row[0], link=row[1])
yield op.db.Put(newEntry)
class TestGCSReaderPipeline(base_handler.PipelineBase):
def run(self, filename):
yield mapreduce_pipeline.MapreducePipeline(
"test_gcs",
"testgcs.testmapperFunc",
"mapreduce.input_readers.FileInputReader",
mapper_params={
"files": [filename],
"format": 'lines'
},
shards=1)
class tempTestRequestGCSUpload(webapp2.RequestHandler):
def get(self):
bucket_name = os.environ.get('BUCKET_NAME',
app_identity.get_default_gcs_bucket_name())
bucket = '/gs/' + bucket_name
filename = bucket + '/' + 'tempfile.csv'
pipeline = TestGCSReaderPipeline(filename)
pipeline.with_params(target="mapreducetestmodtest")
pipeline.start()
self.response.out.write('done')
application = webapp2.WSGIApplication([
('/gcsupload', tempTestRequestGCSUpload),
], debug=True)
Чтобы помнить:
- Проект Mapreduce использует устаревший "API файлов облачных хранилищ Google". Поэтому поддержка в будущем не гарантируется.
- Снижение карты добавляет небольшие накладные расходы на чтение и запись хранилища данных.
Метод 3: Клиентская библиотека GCS и GCS
- Загрузите файл csv/text в gcs, используя вышеупомянутый метод передачи файлов.
- Используйте библиотеку клиентов gcs (скопируйте папку "cloudstorage" в папку верхнего уровня приложения).
Добавьте приведенный ниже код в файл main.py приложения.
import cgi
import webapp2
import logging
import jinja2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel
class UploadGCSData(webapp2.RequestHandler):
def get(self):
bucket_name = os.environ.get('BUCKET_NAME',
app_identity.get_default_gcs_bucket_name())
bucket = '/' + bucket_name
filename = bucket + '/tempfile.csv'
self.upload_file(filename)
def upload_file(self, filename):
gcs_file = gcs.open(filename)
datareader = csv.reader(gcs_file)
count = 0
entities = []
for row in datareader:
count += 1
newProd = DataStoreModel(attr1=row[0], link=row[1])
entities.append(newProd)
if count%50==0 and entities:
ndb.put_multi(entities)
entities=[]
if entities:
ndb.put_multi(entities)
application = webapp2.WSGIApplication([
('/gcsupload', UploadGCSData),
], debug=True)
Ответ 3
Удаленный API-метод, как показано в вашей ссылке [1], все еще работает нормально - хотя он очень медленный, если у вас более нескольких сотен строк.
Я успешно использовал GCS совместно с картой MapReduce, чтобы загружать, а не загружать содержимое хранилища данных, но принципы должны быть одинаковыми. См. Документацию mapreduce: на самом деле вам нужен только шаг mapper, поэтому вы можете определить простую функцию, которая принимает строку из вашего CSV и создает datastore из этих данных.
Ответ 4
По состоянию на 2018 год лучший способ сделать это - использовать новые возможности импорта/экспорта.