Использование именованных каналов с bash - проблема с потерей данных
Был ли поиск в Интернете, найдены простые "учебные пособия" для использования именованных каналов. Однако, когда я делаю что-либо с фоновыми заданиями, я, кажется, теряю много данных.
[[Изменить: найдено гораздо более простое решение, см. ответ на сообщение. Итак, вопрос, который я задал, теперь академичен - в случае, если вам нужен сервер заданий]]
Использование Ubuntu 10.04 с Linux 2.6.32-25-generiС# 45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux
GNU bash, версия 4.1.5 (1) -release (x86_64-pc-linux-gnu).
Моя функция bash:
function jqs
{
pipe=/tmp/__job_control_manager__
trap "rm -f $pipe; exit" EXIT SIGKILL
if [[ ! -p "$pipe" ]]; then
mkfifo "$pipe"
fi
while true
do
if read txt <"$pipe"
then
echo "$(date +'%Y'): new text is [[$txt]]"
if [[ "$txt" == 'quit' ]]
then
break
fi
fi
done
}
Я запускаю это в фоновом режиме:
> jqs&
[1] 5336
И теперь я его кормлю:
for i in 1 2 3 4 5 6 7 8
do
(echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done
Выход несовместим.
Я часто не получаю отклика на успех.
Я получаю как можно больше новых текстовых эхо, так как успех эха, иногда меньше.
Если я удалю '&' из "feed", похоже, работает, но я заблокирован до тех пор, пока не будет прочитан вывод. Поэтому я хочу, чтобы субпроцессы блокировались, но не основной процесс.
Цель состоит в том, чтобы написать простой контроль над заданиями script, чтобы я мог запускать, скажем, 10 заданий, в большинстве случаев, и оставлять остальное для последующей обработки, но надежно знать, что они выполняются.
Полный менеджер вакансий ниже:
function jq_manage
{
export __gn__="$1"
pipe=/tmp/__job_control_manager_"$__gn__"__
trap "rm -f $pipe" EXIT
trap "break" SIGKILL
if [[ ! -p "$pipe" ]]; then
mkfifo "$pipe"
fi
while true
do
date
jobs
if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
then
echo "Waiting for new job"
if read new_job <"$pipe"
then
echo "new job is [[$new_job]]"
if [[ "$new_job" == 'quit' ]]
then
break
fi
echo "In group $__gn__, starting job $new_job"
eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
fi
else
sleep 3
fi
done
}
function jq
{
# __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
# __jN__ = second parameter to this function, the maximum of job numbers to run concurrently
export __gn__="$1"
shift
export __jN__="$1"
shift
export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
if (($__jq__ '<' 1))
then
eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
fi
pipe=/tmp/__job_control_manager_"$__gn__"__
echo [email protected] >$pipe
}
Вызов
jq <name> <max processes> <command>
jq abc 2 sleep 20
запустит один процесс.
Эта часть отлично работает. Начните второй, отлично.
Кажется, что один за другим работает нормально.
Но начало 10 в петле, похоже, потеряло систему, как в более простом примере выше.
Любые подсказки относительно того, что я могу сделать для решения этой очевидной потери данных МПК, будут с благодарностью.
С уважением,
Ален.
Ответы
Ответ 1
Ваша проблема в if
ниже:
while true
do
if read txt <"$pipe"
....
done
Что происходит, так это то, что ваш сервер очереди задач открывает и закрывает канал каждый раз вокруг цикла. Это означает, что некоторые из клиентов получают ошибку "сломанной трубы", когда они пытаются записать в трубу, то есть читатель трубы уходит после того, как автор открывает его.
Чтобы исправить это, измените свой цикл на сервере, откройте его один раз для всего цикла:
while true
do
if read txt
....
done < "$pipe"
Проделайте этот путь, труба открывается один раз и остается открытой.
Вам нужно быть осторожным с тем, что вы запускаете внутри цикла, поскольку вся обработка внутри цикла будет иметь stdin, прикрепленный к именованному каналу. Вы хотите, чтобы вы перенаправляли stdin всех ваших процессов внутри цикла из другого места, иначе они могут потреблять данные из канала.
Изменить: если проблема заключается в том, что вы получаете EOF при чтении, когда последний клиент закрывает трубу, вы можете использовать метод jilles для обнуления файловых дескрипторов, или вы можете просто убедиться, что вы тоже клиент, и сохраняйте сторона записи трубки открыта:
while true
do
if read txt
....
done < "$pipe" 3> "$pipe"
Это приведет к тому, что сторона записи откроется на fd 3. Такая же оговорка применима к этому файловому дескриптору, как и к stdin. Вам нужно будет закрыть его, чтобы любые дочерние процессы не наследовали его. Это, вероятно, имеет меньшее значение, чем для stdin, но было бы более чистым.
Ответ 2
Как сказано в других ответах, вы должны постоянно держать fifo открытым во избежание потери данных.
Однако, как только все авторы ушли после того, как fifo был открыт (так появился писатель), сразу же возвращается результат (и poll()
возвращает POLLHUP
). Единственный способ очистить это состояние - это снова открыть fifo.
POSIX не дает решения для этого, но, по крайней мере, Linux и FreeBSD: если чтение начинается с сбоя, снова откройте fifo, сохраняя исходный дескриптор открытым. Это работает, потому что в Linux и FreeBSD состояние "зависания" является локальным для конкретного описания открытого файла, а в POSIX оно глобально относится к fifo.
Это можно сделать в оболочке script следующим образом:
while :; do
exec 3<tmp/testfifo
exec 4<&-
while read x; do
echo "input: $x"
done <&3
exec 4<&3
exec 3<&-
done
Ответ 3
Как camh и Деннис Уильямсон говорят, не сломайте трубку.
Теперь у меня меньше примеров, прямо в командной строке:
Сервер:
(
for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
do
if read s;
then echo ">>$i--$s//";
else
echo "<<$i";
fi;
done < tst-fifo
)&
Клиент:
(
for i in {%a,#b}{1,2}{0,1};
do
echo "Test-$i" > tst-fifo;
done
)&
Можно заменить строку клавиш:
(echo "Test-$i" > tst-fifo&);
Все данные клиента, отправленные в трубу, считываются, хотя с помощью второй опции клиента может потребоваться запустить сервер пару раз, прежде чем все данные будут прочитаны.
Но хотя чтение ожидает данных в трубе для начала, после того, как данные были нажаты, он читает пустую строку навсегда.
Как остановить это?
Спасибо за любые идеи снова.
Ответ 4
Для тех, кому может быть интересно, [[отредактировано]] после комментариев от camh и jilles, вот две новые версии тестового сервера script.
Обе версии теперь работают точно так, как надеялись.
версия camh для управления трубами:
function jqs # Job queue manager
{
pipe=/tmp/__job_control_manager__
trap "rm -f $pipe; exit" EXIT TERM
if [[ ! -p "$pipe" ]]; then
mkfifo "$pipe"
fi
while true
do
if read -u 3 txt
then
echo "$(date +'%Y'): new text is [[$txt]]"
if [[ "$txt" == 'quit' ]]
then
break
else
sleep 1
# process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
fi
fi
done 3< "$pipe" 4> "$pipe" # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}
jille версия для управления трубами:
function jqs # Job queue manager
{
pipe=/tmp/__job_control_manager__
trap "rm -f $pipe; exit" EXIT TERM
if [[ ! -p "$pipe" ]]; then
mkfifo "$pipe"
fi
exec 3< "$pipe"
exec 4<&-
while true
do
if read -u 3 txt
then
echo "$(date +'%Y'): new text is [[$txt]]"
if [[ "$txt" == 'quit' ]]
then
break
else
sleep 1
# process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
fi
else
# Close the pipe and reconnect it so that the next read does not end up returning EOF
exec 4<&3
exec 3<&-
exec 3< "$pipe"
exec 4<&-
fi
done
}
Спасибо всем за вашу помощь.
Ответ 5
С одной стороны, проблема хуже, чем я думал:
Теперь, как представляется, в моем более сложном примере (jq_manage), где одни и те же данные снова и снова считываются из канала (даже если новые записи не записываются в него).
С другой стороны, я нашел простое решение (отредактированное после комментария Денниса):
function jqn # compute the number of jobs running in that group
{
__jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}
function jq
{
__groupn__="$1"; shift # job group name (the pool within which to allocate $__jmax__ jobs)
__jmax__="$1"; shift # maximum of job numbers to run concurrently
jqn
while (($__jqty__ '>=' $__jmax__))
do
sleep 1
jqn
done
eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; [email protected]) &"
}
Работает как шарм.
Нет сокета или трубы.
Простой.
Ответ 6
run говорит, что 10 заданий в большинстве случаев и очереди остальных для последующей обработки, но надежно знают, что они запускают
Вы можете сделать это с помощью GNU Parallel. Вам не понадобится этот скриптинг.
http://www.gnu.org/software/parallel/man.html#options
Вы можете установить max-procs "Количество мест занятости". Выполнять параллельное выполнение N заданий ". Существует возможность установить количество ядер процессора, которые вы хотите использовать. Вы можете сохранить список выполненных заданий в файл журнала, но это бета-функция.