Трубы и фильтры на уровне СУБД: разделение выходного потока MERGE
Сценарий
У нас есть довольно стандартный процесс импорта данных, в котором мы загружаем
staging
, затем MERGE
в таблицу target
.
Новые требования (зеленый) включают захват подмножества импортированных данных
в отдельную таблицу queue
для полностью несвязанной обработки.
![Схема сценария]()
"Задача"
(1) Подмножество состоит из набора записей: те, которые были
только что вставлен в таблицу target
.
(2) Подмножество является проекцией некоторых вставленных столбцов, но также
по крайней мере, один столбец, который присутствует только в источнике (staging
таблицу).
(3) Оператор MERGE
уже использует предложение OUTPUT..INTO
строго записать $action
, взятый на MERGE
, чтобы мы могли
PIVOT
результат и COUNT
количество вставок, обновлений и
исключений для целей статистики. Нам действительно не нравится буферизация
действия для всего набора данных, подобные этому, и предпочли бы агрегировать
суммы на лету. Излишне говорить, что мы не хотим добавлять больше данных для
это таблица OUTPUT
.
(4) Мы не хотим выполнять соответствующую работу, чтобы MERGE
второй раз по какой-либо причине, даже частично.
target
таблица действительно большая, мы не можем индексировать все, и
операция обычно довольно дорога (минуты, а не секунды).
(5) Мы не рассматриваем возможность округления любых результатов от MERGE
до
клиент, чтобы клиент мог перенаправить его на queue
на
немедленно отправив его обратно. Данные должны оставаться на сервере.
(6) Мы хотим избежать буферизации всего набора данных во временном хранилище
между staging
и queue
.
Каким будет лучший способ этого?
Неудачи
(a) Требование о регистрации только вставленных записей предотвращает нас
от ориентации таблицы queue
непосредственно в предложении OUTPUT..INTO
MERGE
, поскольку это не допускает предложения WHERE
. Мы можем использовать некоторые
CASE
обман, чтобы отметить нежелательные записи для последующего удаления
из queue
без обработки, но это кажется сумасшедшим.
(b) Поскольку некоторые столбцы, предназначенные для queue
, не отображаются в
target
, мы не можем просто добавить триггер ввода в цель
таблицу для загрузки queue
. "Расщепление потока данных" должно произойти раньше.
(c) Поскольку мы уже использовали предложение OUTPUT..INTO
в MERGE
, мы
не может добавить второе предложение OUTPUT
и вставить MERGE
в
INSERT..SELECT
, чтобы загрузить очередь. Это позор, потому что это
чувствует себя как совершенно произвольное ограничение для того, что работает
очень хорошо в противном случае; SELECT
фильтрует только записи с
$action
мы хотим (INSERT
) и INSERT
их в queue
в одном
выражение. Таким образом, СУБД теоретически может избежать буферизации всего
dataset и просто поместите его в queue
. (Примечание: мы не преследовали
и, вероятно, это фактически не оптимизировало план таким образом.)
Ситуация
Мы чувствуем, что исчерпали наши возможности, но решили обратиться к hivemind
быть уверенным. Все, что мы можем придумать, это:
(S1) Создайте таблицу VIEW
таблицы target
, которая также содержит значение NULL
столбцы для данных, предназначенных только для queue
, и
Оператор SELECT
определяет их как NULL
. Затем установите INSTEAD OF
триггеры, которые заполняют таблицу target
и queue
соответственно. Наконец, подключите MERGE
, чтобы настроить таргетинг на представление. Эта
работает, но мы не поклонники конструкции - это определенно
выглядит сложнее.
(S2) Откажитесь, буферизируйте весь набор данных во временной таблице, используя
другой MERGE..OUTPUT
. После MERGE
немедленно скопируйте данные
(снова!) из временной таблицы в queue
.
Ответы
Ответ 1
Я понимаю, что основным препятствием является ограничение предложения OUTPUT
в SQL Server. Он позволяет одному OUTPUT INTO table
и/или одному OUTPUT
возвращать результирующий набор для вызывающего.
Вы хотите сохранить результат оператора MERGE
двумя способами:
- все строки, на которые повлиял
MERGE
для сбора статистики
- только вставленные строки для
queue
Простой вариант
Я бы использовал ваше решение S2. По крайней мере, для начала. Его легко понять и поддерживать, и он должен быть достаточно эффективным, потому что самая ресурсоемкая операция (MERGE
в Target
сама будет выполняться только один раз). Ниже приведен второй вариант, и было бы интересно сравнить их производительность с реальными данными.
Итак:
- Используйте
OUTPUT INTO @TempTable
в MERGE
- Либо
INSERT
все строки из @TempTable
в Stats
или агрегат перед вставкой. Если вам нужна сводная статистика, имеет смысл объединить результаты этой партии и объединить ее в окончательный Stats
вместо копирования всех строк.
-
INSERT
в queue
только "вставленные" строки из @TempTable
.
Я возьму образцы данных из ответа @i-one.
Схема
-- I'll return to commented lines later
CREATE TABLE [dbo].[TestTarget](
-- [ID] [int] IDENTITY(1,1) NOT NULL,
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStaging](
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStats](
[MergeAction] [nvarchar](10) NOT NULL
);
CREATE TABLE [dbo].[TestQueue](
-- [TargetID] [int] NOT NULL,
[foo] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
Примеры данных
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
Объединить
DECLARE @TempTable TABLE (
MergeAction nvarchar(10) NOT NULL,
foo varchar(10) NULL,
baz varchar(10) NULL);
MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;
INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;
INSERT INTO [dbo].[TestQueue]
([foo]
,[baz])
SELECT
T.foo
,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
Результат
TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A | AA |
| B | BB |
| C | CC |
+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C | CCC |
+-----+-----+
Второй вариант
Протестировано на SQL Server 2014 Express.
OUTPUT
может отправлять свой результирующий набор в таблицу и вызывающую. Таким образом, OUTPUT INTO
может напрямую войти в Stats
, и если мы завершим оператор MERGE
в хранимую процедуру, то мы можем использовать INSERT ... EXEC
в queue
.
Если вы рассмотрите план выполнения, вы увидите, что INSERT ... EXEC
создает временную таблицу за кулисами (см. также Скрытые затраты INSERT EXEC на
Adam Machanic), поэтому я ожидаю, что общая производительность будет похожа на первый вариант, когда вы создаете временную таблицу явно.
Еще одна проблема для решения: queue
таблица должна иметь только "вставленные" строки, а не все строки. Для этого вы можете использовать триггер в таблице queue
для удаления строк, отличных от "вставленных". Еще одна возможность состоит в том, чтобы определить уникальный индекс с помощью IGNORE_DUP_KEY = ON
и подготовить данные таким образом, чтобы "не вставленные" строки нарушали уникальный индекс и не включались в таблицу.
Итак, я добавлю столбец ID IDENTITY
в таблицу Target
, и я добавлю столбец TargetID
в таблицу queue
. (Раскомментируйте их в script выше).
Кроме того, я добавлю индекс в таблицу queue
:
CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
[TargetID] ASC
) WITH (
PAD_INDEX = OFF,
STATISTICS_NORECOMPUTE = OFF,
SORT_IN_TEMPDB = OFF,
IGNORE_DUP_KEY = ON,
DROP_EXISTING = OFF,
ONLINE = OFF,
ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON)
Важная часть - UNIQUE
и IGNORE_DUP_KEY = ON
.
Вот хранимая процедура для MERGE
:
CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
MERGE INTO dbo.TestTarget AS Dst
USING dbo.TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action INTO dbo.TestStats(MergeAction)
OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID,
inserted.foo,
Src.baz
;
END
Использование
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
([TargetID]
,[foo]
,[baz])
VALUES
(0
,NULL
,NULL);
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
Результат
TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
| 1 | A | AA |
| 2 | B | BB |
| 3 | C | CC |
+----+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+----------+------+------+
| TargetID | foo | baz |
+----------+------+------+
| 0 | NULL | NULL |
| 3 | C | CCC |
+----------+------+------+
Во время INSERT ... EXEC
появится дополнительное сообщение:
Duplicate key was ignored.
если MERGE
обновлено несколько строк. Это предупреждающее сообщение отправляется, когда уникальный индекс отбрасывает некоторые строки во время INSERT
из-за IGNORE_DUP_KEY = ON
.
При добавлении повторяющихся значений ключа появится предупреждающее сообщение в уникальный индекс. Только строки, нарушающие ограничение единственности не удастся.
Ответ 2
Рассмотрим два подхода к решению проблемы:
- Объединить данные в целевые и выходные данные, вставленные в очередь в одном из операторов, и суммировать статистику в триггере, созданном на целевом сервере. Пакетный идентификатор может быть передан в триггер через временную таблицу.
- Объединить данные в целевые и выходные данные, вставленные в очередь в одном операторе, и суммировать статистику сразу же после слияния, используя встроенные возможности отслеживания изменений, вместо того, чтобы делать это в триггере.
Подход 1 (объединить данные и собрать статистику в триггере):
Пример настройки данных (индексы и ограничения опущены для простоты):
create table staging (foo varchar(10), bar varchar(10), baz varchar(10));
create table target (foo varchar(10), bar varchar(10));
create table queue (foo varchar(10), baz varchar(10));
create table stats (batchID int, inserted bigint, updated bigint, deleted bigint);
insert into staging values
('A', 'AA', 'AAA')
,('B', 'BB', 'BBB')
,('C', 'CC', 'CCC')
;
insert into target values
('A', 'A_')
,('B', 'B?')
,('E', 'EE')
;
Триггер для сбора вставленной/обновленной/удаленной статистики:
create trigger target_onChange
on target
after delete, update, insert
as
begin
set nocount on;
if object_id('tempdb..#targetMergeBatch') is NULL
return;
declare @batchID int;
select @batchID = batchID from #targetMergeBatch;
merge into stats t
using (
select
batchID = @batchID,
cntIns = count_big(case when i.foo is not NULL and d.foo is NULL then 1 end),
cntUpd = count_big(case when i.foo is not NULL and d.foo is not NULL then 1 end),
cntDel = count_big(case when i.foo is NULL and d.foo is not NULL then 1 end)
from inserted i
full join deleted d on d.foo = i.foo
) s
on t.batchID = s.batchID
when matched then
update
set
t.inserted = t.inserted + s.cntIns,
t.updated = t.updated + s.cntUpd,
t.deleted = t.deleted + s.cntDel
when not matched then
insert (batchID, inserted, updated, deleted)
values (s.batchID, s.cntIns, s.cntUpd, cntDel);
end
Операторы слияния:
declare @batchID int;
set @batchID = 1;-- or select @batchID = batchID from ...;
create table #targetMergeBatch (batchID int);
insert into #targetMergeBatch (batchID) values (@batchID);
insert into queue (foo, baz)
select foo, baz
from
(
merge into target t
using staging s
on t.foo = s.foo
when matched then
update
set t.bar = s.bar
when not matched then
insert (foo, bar)
values (s.foo, s.bar)
when not matched by source then
delete
output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
;
drop table #targetMergeBatch
Проверьте результаты:
select * from target;
select * from queue;
select * from stats;
Цель:
foo bar
---------- ----------
A AA
B BB
C CC
Queue
foo baz
---------- ----------
C CCC
Статистика:
batchID inserted updated deleted
-------- ---------- --------- ---------
1 1 2 1
Подход 2 (собирайте статистику, используя возможности отслеживания изменений):
Настройка пробных данных такая же, как в предыдущем случае (просто отбросьте все, включая триггер и заново создайте таблицы с нуля), за исключением того, что в этом случае нам нужно иметь PK для цели, чтобы сделать работу образца:
create table target (foo varchar(10) primary key, bar varchar(10));
Включить отслеживание изменений в базе данных:
alter database Test
set change_tracking = on
Включить отслеживание изменений в целевой таблице:
alter table target
enable change_tracking
Слияние данных и статистика захвата сразу после этого, фильтрация по контексту изменения для подсчета только строк, затронутых слиянием:
begin transaction;
declare @batchID int, @chVersion bigint, @chContext varbinary(128);
set @batchID = 1;-- or select @batchID = batchID from ...;
SET @chVersion = change_tracking_current_version();
set @chContext = newid();
with change_tracking_context(@chContext)
insert into queue (foo, baz)
select foo, baz
from
(
merge into target t
using staging s
on t.foo = s.foo
when matched then
update
set t.bar = s.bar
when not matched then
insert (foo, bar)
values (s.foo, s.bar)
when not matched by source then
delete
output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
;
with ch(foo, op) as (
select foo, sys_change_operation
from changetable(changes target, @chVersion) ct
where sys_change_context = @chContext
)
insert into stats (batchID, inserted, updated, deleted)
select @batchID, [I], [U], [D]
from ch
pivot(count_big(foo) for op in ([I], [U], [D])) pvt
;
commit transaction;
Проверьте результаты:
select * from target;
select * from queue;
select * from stats;
Они такие же, как в предыдущем примере.
Цель:
foo bar
---------- ----------
A AA
B BB
C CC
Queue
foo baz
---------- ----------
C CCC
Статистика:
batchID inserted updated deleted
-------- ---------- --------- ---------
1 1 2 1
Ответ 3
Я предлагаю извлечь статистику, используя кодирование с использованием трех независимых триггеров AFTER INSERT / DELETE / UPDATE
по строкам:
create trigger dbo.insert_trigger_target
on [dbo].[target]
after insert
as
insert into dbo.[stats] ([action],[count])
select 'insert', count(1)
from inserted;
go
create trigger dbo.update_trigger_target
on [dbo].[target]
after update
as
insert into dbo.[stats] ([action],[count])
select 'update', count(1) from inserted -- or deleted == after / before image, count will be the same
go
create trigger dbo.delete_trigger_target
on [dbo].[target]
after delete
as
insert into dbo.[stats] ([action],[count])
select 'delete', count(1) from deleted
go
Если вам нужно больше контекста, поместите что-то в CONTEXT_INFO
и вырвите его из триггеров.
Теперь я буду утверждать, что триггеры AFTER не так дороги, но вам нужно проверить это, чтобы быть уверенным.
Имея дело с этим, вы можете использовать предложение OUTPUT
( НЕ OUTPUT INTO
) в MERGE
, а затем использовать его, вложенное в выборку для подмножества данных что вы хотите войти в таблицу queue
.
Обоснование
Из-за необходимости доступа к столбцам из staging
и target
для создания данных для queue
, этот HAS выполняется с помощью параметра OUTPUT
в MERGE
, поскольку ничто иное не имеет доступа к "обеим сторонам".
Затем, если мы убьем предложение OUTPUT
для queue
, как мы можем переделать эту функциональность? Я думаю, что триггеры AFTER
будут работать, учитывая требования к указанной вами статистике. Действительно, статистика может быть довольно сложной, если требуется, с учетом доступных изображений. Я утверждаю, что триггеры AFTER
"не так дороги", поскольку данные до и после должны всегда быть доступны, чтобы транзакция могла быть как COMMITTED ИЛИ ROLLED BACK - да, данные должны быть отсканированы (даже чтобы получить счет), но это не похоже на слишком большую стоимость.
В моем собственном анализе сканирование добавило около 5% к базовой стоимости плана выполнения
Звучит как решение?
Ответ 4
Рассматривали ли вы слияние слияния и просто вставляете, где не существует и обновление? Затем вы можете использовать предложение вывода из вставки для заполнения вашей таблицы очередей.
Ответ 5
Импорт через промежуточную таблицу может быть более эффективным с последовательной, а не ориентированной на заданную обработку. Я хотел бы переписать MERGE
в хранимую процедуру с помощью сканирования курсора. Затем для каждой записи вы можете иметь столько выходов, сколько хотите, плюс любые значения без поворота при общей стоимости одного сканирования таблицы staging
.
Сохраненная процедура может также предоставлять возможности для разделения обработки на более мелкие транзакции, тогда как триггеры на больших наборах данных могут привести к переполнению журнала транзакций.
Ответ 6
Если мне что-то не хватает, простая команда вставки должна соответствовать всем вашим требованиям.
insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging join target on staging.foo = target.boo
where whatever
Это произойдет после слияния в цель.
Только для новых записей сделайте это до слияния
insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging left join target on staging.foo = target.boo
where target.foo = null