CSV-копия в Postgres с массивом настраиваемого типа с использованием JDBC
У меня есть настраиваемый тип, определенный в моей базе данных, как
CREATE TYPE address AS (ip inet, port int);
И таблица, которая использует этот тип в массиве:
CREATE TABLE my_table (
addresses address[] NULL
)
У меня есть образец CSV файла со следующим содержимым
{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}
И я использую следующий фрагмент кода для выполнения моей COPY:
Class.forName("org.postgresql.Driver");
String input = loadCsvFromFile();
Reader reader = new StringReader(input);
Connection connection = DriverManager.getConnection(
"jdbc:postgresql://db_host:5432/db_name", "user",
"password");
CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
String copyCommand = "COPY my_table (addresses) " +
"FROM STDIN WITH (" +
"DELIMITER '\t', " +
"FORMAT csv, " +
"NULL '\\N', " +
"ESCAPE '\"', " +
"QUOTE '\"')";
copyManager.copyIn(copyCommand, reader);
Выполнение этой программы создает следующее исключение:
Exception in thread "main" org.postgresql.util.PSQLException: ERROR: malformed record literal: "(10.10.10.1"
Detail: Unexpected end of input.
Where: COPY only_address, line 1, column addresses: "{(10.10.10.1,80),(10.10.10.2,443)}"
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2422)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1114)
at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:963)
at org.postgresql.core.v3.CopyInImpl.endCopy(CopyInImpl.java:43)
at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:185)
at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:160)
Я пробовал с различными комбинациями круглых скобок во входе, но, похоже, не мог заставить COPY работать. Любые идеи, в которых я могу ошибиться?
Ответы
Ответ 1
См. Https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ для проекта с тестом JUnit, который делает то, что вы хотите.
В принципе, вы хотите использовать запятые для двух вещей: для разделения элементов массива и для разделения полей типа, но вы НЕ хотите, чтобы синтаксический анализ CSV интерпретировал запятые как разделители полей.
Так
- вы хотите сказать, что синтаксический анализатор CSV рассматривает всю строку как одну строку, одно поле, которое вы можете сделать, включив его в одинарные кавычки и сообщив парсеру CSV об этом, и
- вы хотите, чтобы анализатор полей PG рассматривал каждый экземпляр типа элемента массива, заключенный в двойную кавычку.
Код:
copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);
Пример DML 1:
COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''
Пример CSV 1:
'{"(10.0.0.1,1)","(10.0.0.2,2)"}'
'{"(10.10.10.1,80)","(10.10.10.2,443)"}'
'{"(10.10.10.3,8080)","(10.10.10.4,4040)"}'
Пример DML 2, избегая двойных кавычек:
COPY my_table (addresses) FROM STDIN WITH CSV
Пример CSV 2, выходящий из двойных кавычек:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"
Полный класс тестирования JUnit:
package io.mikael.poc;
import com.google.common.io.CharStreams;
import org.junit.*;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.testcontainers.containers.PostgreSQLContainer;
import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import static java.nio.charset.StandardCharsets.UTF_8;
public class CopyTest {
private Reader reader;
private Connection connection;
private CopyManager copyManager;
private static final String CREATE_TYPE = "CREATE TYPE address AS (ip inet, port int)";
private static final String CREATE_TABLE = "CREATE TABLE my_table (addresses address[] NULL)";
private String loadCsvFromFile(final String fileName) throws IOException {
try (InputStream is = getClass().getResourceAsStream(fileName)) {
return CharStreams.toString(new InputStreamReader(is, UTF_8));
}
}
@ClassRule
public static PostgreSQLContainer db = new PostgreSQLContainer("postgres:10-alpine");
@BeforeClass
public static void beforeClass() throws Exception {
Class.forName("org.postgresql.Driver");
}
@Before
public void before() throws Exception {
String input = loadCsvFromFile("/data_01.csv");
reader = new StringReader(input);
connection = DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
connection.setAutoCommit(false);
connection.beginRequest();
connection.prepareCall(CREATE_TYPE).execute();
connection.prepareCall(CREATE_TABLE).execute();
}
@After
public void after() throws Exception {
connection.rollback();
}
@Test
public void copyTest01() throws Exception {
copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);
final StringWriter writer = new StringWriter();
copyManager.copyOut("COPY my_table TO STDOUT WITH CSV", writer);
System.out.printf("roundtrip:%n%s%n", writer.toString());
final ResultSet rs = connection.prepareStatement(
"SELECT array_to_json(array_agg(t)) FROM (SELECT addresses FROM my_table) t")
.executeQuery();
rs.next();
System.out.printf("json:%n%s%n", rs.getString(1));
}
}
Выход теста:
roundtrip:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"
json:
[{"addresses":[{"ip":"10.0.0.1","port":1},{"ip":"10.0.0.2","port":2}]},{"addresses":[{"ip":"10.10.10.1","port":80},{"ip":"10.10.10.2","port":443}]},{"addresses":[{"ip":"10.10.10.3","port":8080},{"ip":"10.10.10.4","port":4040}]}]
Ответ 2
В формате CSV, когда вы указываете сепаратор, вы не можете использовать его в качестве символа в своих данных, если только вы не избежите его!
пример файла csv с использованием запятой в качестве разделителя
правильная запись: data1, data2
результаты анализа data1, data2
: [0] => data1 [1] => data2
неверный: data,1, data2
результаты анализа данных 2: [0] => data [1] => 1 [2] => data2
наконец, вам не нужно загружать файл в виде csv, но как простой файл, поэтому замените свой метод loadCsvFromFile();
от
public String loadRecordsFromFile(File file) {
LineIterator it = FileUtils.lineIterator(file, "UTF-8");
StringBuilder sb = new StringBuilder();
try {
while (it.hasNext()) {
sb.append(it.nextLine()).append(System.nextLine);
}
}
finally {
LineIterator.closeQuietly(iterator);
}
return sb.toString();
}
Не забудьте добавить эту зависимость в свой файл pom
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
Или загрузить JAR из commons.apache.org
Ответ 3
1NF
Прежде всего, я думаю, что ваш дизайн таблицы неправильный, потому что он не соответствует 1NF. Каждое поле должно содержать только атомные атрибуты, но это не так. Почему бы не использовать таблицу:
CREATE TABLE my_table (
id,
ip inet,
port int
)
Где id
- номер вашей строки в исходном файле, а ip
/port
один из адресов этой строки? Пример данных:
id | ip | port
-----------------------
1 | 10.10.10.1 | 80
1 | 10.10.10.2 | 443
2 | 10.10.10.3 | 8080
2 | 10.10.10.4 | 4040
...
Следовательно, вы сможете запросить свою базу данных по одному адресу (найти все связанные адреса, вернуть true, если два адреса находятся в одной строке, что бы вы ни захотели...).
Загрузка данных
Но пусть предположим, что вы знаете, что делаете. Основная проблема здесь заключается в том, что ваш файл входных данных находится в специальном формате. Это может быть один CSV файл столбца, но это был бы очень вырожденный CSV файл. В любом случае, вы должны преобразовать строки, прежде чем вставлять их в базу данных. У вас есть два варианта:
- вы читаете каждую строку входного файла, и вы делаете
INSERT
(это может занять некоторое время); - вы конвертируете входной файл в текстовый файл с ожидаемым форматом и используете
COPY
.
Вставить один за другим
Первые варианты кажутся легкими: для первой строки csv файла, {(10.10.10.1,80),(10.10.10.2,443)}
, вы должны запустить запрос:
INSERT INTO my_table VALUES (ARRAY[('10.10.10.1',80),('10.10.10.2',443)]::address[], 4)
Для этого вам просто нужно создать новую строку:
String value = row.replaceAll("\\{", "ARRAY[")
.replaceAll("\\}", "]::address[]")
.replaceAll("\\(([0-9.]+),", "'$1'");
String sql = String.format("INSERT INTO my_table VALUES (%s)", value);
И выполните запрос для каждой строки входного файла (или для лучшей безопасности используйте подготовленный оператор).
Вставить с помощью COPY
Я расскажу о втором варианте. Вы должны использовать в Java-коде:
copyManager.copyIn(sql, from);
Где запрос на копирование является оператором COPY FROM STDIN
а from
читателя. Заявление будет:
COPY my_table (addresses) FROM STDIN WITH (FORMAT text);
Чтобы прокормить диспетчер копирования, вам нужны данные (обратите внимание на кавычки):
{"(10.10.10.1,80)","(10.10.10.2,443)"}
{"(10.10.10.3,8080)","(10.10.10.4,4040)"}
С временным файлом
Более простой способ получить данные в правильном формате - создать временный файл. Вы читаете каждую строку входного файла и заменяете (
на "(
и )
на )"
. Запишите обработанную строку во временный файл. Затем передайте читателю этот файл в диспетчер копирования.
На лету
С двумя потоками вы можете использовать два потока:
-
thread 1 считывает входной файл, обрабатывает строки один за другим и записывает их в PipedWriter
.
-
thread 2 передает PipedReader
подключенный к предыдущему PipedWriter
в диспетчер копирования.
Основная трудность заключается в синхронизировать потоки таким образом, что поток 2 начинает читать PipedReader
до того, как поток 1 начнет записывать данные в PipedWriter
. См. Мой проект для примера.
С помощью пользовательского читателя. from
читателя может быть примером чего-то вроде (наивная версия):
class DataReader extends Reader {
PushbackReader csvFileReader;
private boolean wasParenthese;
public DataReader(Reader csvFileReader) {
this.csvFileReader = new PushbackReader(csvFileReader, 1);
wasParenthese = false;
}
@Override
public void close() throws IOException {
this.csvFileReader.close();
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
// rely on read()
for (int i = off; i < off + len; i++) {
int c = this.read();
if (c == -1) {
return i-off > 0 ? i-off : -1;
}
cbuf[i] = (char) c;
}
return len;
}
@Override
public int read() throws IOException {
final int c = this.csvFileReader.read();
if (c == '(' && !this.wasParenthese) {
this.wasParenthese = true;
this.csvFileReader.unread('(');
return '"'; // add " before (
} else {
this.wasParenthese = false;
if (c == ')') {
this.csvFileReader.unread('"');
return ')'; // add " after )
} else {
return c;
}
}
}
}
(Это наивная версия, потому что правильный способ сделать это - переопределить только public int read(char[] cbuf, int off, int len)
. Но затем вы должны обработать cbuf
чтобы добавить кавычки и сохранить дополнительные символы толкается вправо: это немного утомительно). Теперь, если r
является читателем для файла:
{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}
Просто используйте:
Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
.getConnection("jdbc:postgresql://db_host:5432/db_base", "user", "passwd");
CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)", new DataReader(r));
При массовой загрузке
Если вы загружаете огромный объем данных, не забудьте основные советы: отключите автообновление, удалите индексы и ограничения и используйте TRUNCATE
и ANALYZE
следующим образом:
TRUNCATE my_table;
COPY ...;
ANALYZE my_table;
Это ускорит загрузку.