Использование именованных каналов с bash - проблема с потерей данных

Был ли поиск в Интернете, найдены простые "учебные пособия" для использования именованных каналов. Однако, когда я делаю что-либо с фоновыми заданиями, я, кажется, теряю много данных.

[[Изменить: найдено гораздо более простое решение, см. ответ на сообщение. Итак, вопрос, который я задал, теперь академичен - в случае, если вам нужен сервер заданий]]

Использование Ubuntu 10.04 с Linux 2.6.32-25-generiС# 45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash, версия 4.1.5 (1) -release (x86_64-pc-linux-gnu).

Моя функция bash:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

Я запускаю это в фоновом режиме:

> jqs&
[1] 5336

И теперь я его кормлю:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

Выход несовместим. Я часто не получаю отклика на успех. Я получаю как можно больше новых текстовых эхо, так как успех эха, иногда меньше.

Если я удалю '&' из "feed", похоже, работает, но я заблокирован до тех пор, пока не будет прочитан вывод. Поэтому я хочу, чтобы субпроцессы блокировались, но не основной процесс.

Цель состоит в том, чтобы написать простой контроль над заданиями script, чтобы я мог запускать, скажем, 10 заданий, в большинстве случаев, и оставлять остальное для последующей обработки, но надежно знать, что они выполняются.

Полный менеджер вакансий ниже:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo [email protected] >$pipe
}

Вызов

jq <name> <max processes> <command>
jq abc 2 sleep 20

запустит один процесс. Эта часть отлично работает. Начните второй, отлично. Кажется, что один за другим работает нормально. Но начало 10 в петле, похоже, потеряло систему, как в более простом примере выше.

Любые подсказки относительно того, что я могу сделать для решения этой очевидной потери данных МПК, будут с благодарностью.

С уважением, Ален.

Ответы

Ответ 1

Ваша проблема в if ниже:

while true
do
    if read txt <"$pipe"
    ....
done

Что происходит, так это то, что ваш сервер очереди задач открывает и закрывает канал каждый раз вокруг цикла. Это означает, что некоторые из клиентов получают ошибку "сломанной трубы", когда они пытаются записать в трубу, то есть читатель трубы уходит после того, как автор открывает его.

Чтобы исправить это, измените свой цикл на сервере, откройте его один раз для всего цикла:

while true
do
    if read txt
    ....
done < "$pipe"

Проделайте этот путь, труба открывается один раз и остается открытой.

Вам нужно быть осторожным с тем, что вы запускаете внутри цикла, поскольку вся обработка внутри цикла будет иметь stdin, прикрепленный к именованному каналу. Вы хотите, чтобы вы перенаправляли stdin всех ваших процессов внутри цикла из другого места, иначе они могут потреблять данные из канала.

Изменить: если проблема заключается в том, что вы получаете EOF при чтении, когда последний клиент закрывает трубу, вы можете использовать метод jilles для обнуления файловых дескрипторов, или вы можете просто убедиться, что вы тоже клиент, и сохраняйте сторона записи трубки открыта:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

Это приведет к тому, что сторона записи откроется на fd 3. Такая же оговорка применима к этому файловому дескриптору, как и к stdin. Вам нужно будет закрыть его, чтобы любые дочерние процессы не наследовали его. Это, вероятно, имеет меньшее значение, чем для stdin, но было бы более чистым.

Ответ 2

Как сказано в других ответах, вы должны постоянно держать fifo открытым во избежание потери данных.

Однако, как только все авторы ушли после того, как fifo был открыт (так появился писатель), сразу же возвращается результат (и poll() возвращает POLLHUP). Единственный способ очистить это состояние - это снова открыть fifo.

POSIX не дает решения для этого, но, по крайней мере, Linux и FreeBSD: если чтение начинается с сбоя, снова откройте fifo, сохраняя исходный дескриптор открытым. Это работает, потому что в Linux и FreeBSD состояние "зависания" является локальным для конкретного описания открытого файла, а в POSIX оно глобально относится к fifo.

Это можно сделать в оболочке script следующим образом:

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done

Ответ 3

Как camh и Деннис Уильямсон говорят, не сломайте трубку.

Теперь у меня меньше примеров, прямо в командной строке:

Сервер:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

Клиент:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

Можно заменить строку клавиш:

    (echo "Test-$i" > tst-fifo&);

Все данные клиента, отправленные в трубу, считываются, хотя с помощью второй опции клиента может потребоваться запустить сервер пару раз, прежде чем все данные будут прочитаны.

Но хотя чтение ожидает данных в трубе для начала, после того, как данные были нажаты, он читает пустую строку навсегда.

Как остановить это?

Спасибо за любые идеи снова.

Ответ 4

Для тех, кому может быть интересно, [[отредактировано]] после комментариев от camh и jilles, вот две новые версии тестового сервера script.

Обе версии теперь работают точно так, как надеялись.

версия camh для управления трубами:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

jille версия для управления трубами:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

Спасибо всем за вашу помощь.

Ответ 5

С одной стороны, проблема хуже, чем я думал: Теперь, как представляется, в моем более сложном примере (jq_manage), где одни и те же данные снова и снова считываются из канала (даже если новые записи не записываются в него).

С другой стороны, я нашел простое решение (отредактированное после комментария Денниса):

function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; [email protected]) &"
}

Работает как шарм. Нет сокета или трубы. Простой.

Ответ 6

run говорит, что 10 заданий в большинстве случаев и очереди остальных для последующей обработки, но надежно знают, что они запускают

Вы можете сделать это с помощью GNU Parallel. Вам не понадобится этот скриптинг.

http://www.gnu.org/software/parallel/man.html#options

Вы можете установить max-procs "Количество мест занятости". Выполнять параллельное выполнение N заданий ". Существует возможность установить количество ядер процессора, которые вы хотите использовать. Вы можете сохранить список выполненных заданий в файл журнала, но это бета-функция.