Ответ 1
Читайте мой пост здесь:
Последовательность в postgresql с блокировкой и выбор для обновления
Если вы используете транзакцию и LOCK TABLE, у вас не будет проблем.
У меня типичная проблема производителя и потребителя:
Несколько приложений-производителей записывают запросы на задания в таблицу заданий в базе данных PostgreSQL.
Запросы заданий имеют поле состояния, которое начинается с QUEUED при создании.
Есть несколько потребительских приложений, которые уведомляются по правилу, когда производитель вставляет новую запись:
CREATE OR REPLACE RULE "jobrecord.added" AS
ON INSERT TO jobrecord DO
NOTIFY "jobrecordAdded";
Они попытаются зарезервировать новую запись, установив ее состояние в положение RESERVED. Конечно, только на потребителя должно получиться. Все остальные потребители не должны резервировать одну и ту же запись. Вместо этого они должны резервировать другие записи с состоянием = QUEUED.
Пример: некоторые производители добавили следующие записи в таблицу jobrecord:
id state owner payload
------------------------
1 QUEUED null <data>
2 QUEUED null <data>
3 QUEUED null <data>
4 QUEUED null <data>
теперь два потребителя A, B хотят их обработать. Они начинают работать одновременно. Нужно зарезервировать идентификатор 1, другой должен зарезервировать id 2, тогда первый, кто закончит, должен зарезервировать id 3 и т.д.
В чистом многопоточном мире я бы использовал мьютекс для контроля доступа к очереди заданий, но потребители - это разные процессы, которые могут выполняться на разных машинах. Они получают доступ только к одной базе данных, поэтому вся синхронизация должна выполняться через базу данных.
Я прочитал много документации о параллельном доступе и блокировке в PostgreSQL, например. http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Выберите разблокированную строку в Postgresql PostgreSQL и блокировка
Из этих тем я узнал, что следующий SQL-оператор должен делать то, что мне нужно:
UPDATE jobrecord
SET owner= :owner, state = :reserved
WHERE id = (
SELECT id from jobrecord WHERE state = :queued
ORDER BY id LIMIT 1
)
RETURNING id; // will only return an id when they reserved it successfully
К сожалению, когда я запускаю это в нескольких потребительских процессах, примерно в 50% случаев они по-прежнему сохраняют одну и ту же запись, обрабатывая ее и переписывая изменения другой.
Что мне не хватает? Как мне написать инструкцию SQL, чтобы несколько пользователей не оставляли одну и ту же запись?
Читайте мой пост здесь:
Последовательность в postgresql с блокировкой и выбор для обновления
Если вы используете транзакцию и LOCK TABLE, у вас не будет проблем.
Я использую postgres для очереди FIFO. Первоначально я использовал ACCESS EXCLUSIVE, что дает правильные результаты в высоком concurrency, но имеет неприятный эффект от взаимного исключения с pg_dump, который получает блокировку ACCESS SHARE во время ее выполнения. Это приводит к тому, что моя функция next() блокируется очень долго (длительность pg_dump). Это было неприемлемо, так как мы являемся магазином 24x7, и клиентам не нравилось мертвое время в очереди посреди ночи.
Я полагал, что должен быть менее ограничивающий замок, который по-прежнему будет одновременно устойчивым, а не заблокирован, пока работает pg_dump. Мой поиск привел меня к этому сообщению SO.
Затем я сделал некоторые исследования.
Для функции NEXT() очереди FIFO достаточно использовать следующие режимы, которые будут обновлять статус задания из очереди в очередь без каких-либо concurrency сбоев, а также не блокировать pg_dump:
SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE
Query:
begin;
lock table tx_test_queue in exclusive mode;
update
tx_test_queue
set
status='running'
where
job_id in (
select
job_id
from
tx_test_queue
where
status='queued'
order by
job_id asc
limit 1
)
returning job_id;
commit;
Результат выглядит так:
UPDATE 1
job_id
--------
98
(1 row)
Ниже приведена оболочка script, которая проверяет весь режим блокировки при высоком concurrency (30).
#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock FAIL
# accessShare FAIL
# rowShare FAIL
# rowExclusive FAIL
# shareUpdateExclusive SUCCESS
# share FAIL+DEADLOCKS
# shareRowExclusive SUCCESS
# exclusive SUCCESS
# accessExclusive SUCCESS, but LOCKS against pg_dump
#config
strategy="exclusive"
db=postgres
dbuser=postgres
queuecount=100
concurrency=30
# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &
echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
"noLock") strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessShare") strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowShare") strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowExclusive") strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"share") strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareRowExclusive") strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"exclusive") strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessExclusive") strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
*) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
Код также здесь, если вы хотите изменить: https://gist.github.com/1083936
Я обновляю свое приложение, чтобы использовать режим EXCLUSIVE, так как он является самым ограничительным режимом, который a) правильный, и b) не конфликтует с pg_dump. Я выбрал наиболее ограничительный, поскольку он кажется наименее рискованным с точки зрения изменения приложения от ACCESS EXCLUSIVE, не являясь экспертом uber при блокировке postgres.
Я чувствую себя довольно комфортно с моей тестовой установкой и с общими идеями, стоящими за ответом. Я надеюсь, что совместное использование помогает решить эту проблему для других.
Не нужно делать целую блокировку таблицы для этого: \.
Блокировка строк, созданная с помощью for update
, отлично работает.
См. https://gist.github.com/mackross/a49b72ad8d24f7cefc32 для изменения, которое я сделал для ответа apinstein, и подтвердил, что он все еще работает.
Конечный код
update
tx_test_queue
set
status='running'
where
job_id in (
select
job_id
from
tx_test_queue
where
status='queued'
order by
job_id asc
limit 1 for update
)
returning job_id;
как насчет просто выбрать?
SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;
https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE
Вы можете посмотреть, как это делает queue_classic. https://github.com/ryandotsmith/queue_classic
Код довольно короткий и понятный.
Откроем PgQ вместо того, чтобы изобретать колесо.
Хорошо, вот решение, которое работает для меня, основано на ссылке от Йордани. Поскольку некоторые из моих проблем были в том, как работает Qt-SQL, я включил Qt-код:
QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(
"UPDATE jobrecord "
" SET \"owner\"= :owner, state = :reserved "
" WHERE id = ( "
" SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
" ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED);
bool result = query.exec();
Чтобы проверить, если несколько пользователей обрабатывают одно и то же задание, я добавил правило и таблицу журналов:
CREATE TABLE serverjobrecord_log
(
serverjobrecord_id integer,
oldowner text,
newowner text
) WITH ( OIDS=FALSE );
CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1
DO INSERT INTO jobrecord_log (id, oldowner, newowner)
VALUES (new.id, old.owner, new.owner);
Без оператора LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE;
таблица журналов заполняет случайные записи, если один из пользователей перезаписал значения другого, но с помощью оператора LOCK таблица журналов остается пустой: -)