2015-03-30 11 views
2

Я написал небольшой скрипт Bash, который использует инструменты inotify и интерфейс inotify. Моя проблема заключается в том, что одна из команд в этой функции может блокировать выполнение до его завершения. Таким образом, функция застревает.inotifywait с fifo queue в Bash

Чтобы решить эту проблему, я хотел бы поставить в очередь обнаруженные файлы (по событию close) и прочитать очередь из другой функции. Кто-нибудь знает, как это сделать в Bash?

Следующие переменные являются простыми строками для поиска каталогов или для назначения имен файлов.

inotifywait -mrq -e close --format %w%f /some/dir/ | while read FILE 
do 
    NAME=$(echo $CAP)_"`date +"%F-%H-%M-%S"`.pcap" 
    logger -i "$FILE was just closed" 
    # cp "$FILE" "$DATA/$CAP/$ENV/$NAME" 
    rsync -avz --stats --log-file=/root/rsync.log "$FILE" "$DATA/$CAP/$ENV/$NAME" >> /root/rsync_stats.log 
    RESULT=$? 
    if [ $RESULT -eq 0 ] ; then 
     logger -i "Success: $FILE copied to SAN $DATA/$CAP/$ENV/$NAME, code $RESULT" 
    else 
     logger -i "Fail: $FILE copy failed to SAN for $DATA/$CAP/$ENV/$NAME, code $RESULT" 
    fi 

    rm "$FILE" 
    RESULT=$? 
    if [ $RESULT -eq 0 ] ; then 
     logger -i "Success: deletion successfull for $FILE, code $RESULT" 
    else 
     logger -i "Fail: deletion failed for $FILE on SSD, code $RESULT" 
    fi 

    do_something() 
    logger -i "$NAME was handled" 
    # for stdout 
    echo "`date`: Moved file" 
done 

Я копирую файлы в том SAN, который иногда имеет отклонения во времени ответа. Вот почему эта функция может застрять некоторое время. Я заменил cp на Rsync, потому что мне нужна статистика пропускной способности. Cp (от coreutils), по-видимому, этого не делает.

ответ

2

Пара идей:

1) Вы можете использовать именованный канал в виде очереди ограниченного размера:

mkfifo pipe 

your_message_source | while read MSG 
do 
    #collect files in a pipe 
    echo "$MSG" >> pipe 
done & 

while read MSG 
do 
#Do your blocking work here 
done < pipe 

Это заблокирует на echo "$MSG" >> pipe, когда буфер трубы заполняется (вы можете получить . размер этого буфера с ulimit -p (путем умножения на 512) Это может быть достаточно для некоторых случаев

2) Вы можете использовать файл в виде очереди сообщений и файл блокировки его на каждой операции:.

#Feeder part 
    your_message_source | while read MSG  
     do 
      (
      flock 9 
      echo "$MSG" >> file_based_queue 
      ) 9> file_based_queue 
     done & 

    # Worker part 
    while : 
    do 
    #Lock shared queue and cut'n'paste it's content to the worker's private queue 
    (
     flock 9 
     cp file_based_queue workers_queue 
     truncate -s0 file_based_queue 
    ) 9> file_based_queue 

    #process private queue 
    while read MSG 
    do 
    #Do your blocking work here 
    done < workers_queue 
    done 

Вы блокируете inotifywait только в том случае, если вы находитесь в рабочем цикле в подклассе (flock ...) 9> file_based_queue и после команды flock одновременно. У вас могут быть очереди в RAMdisk (/ dev/shm), чтобы свести к минимуму время, проведенное вами, чтобы вы не пропустили события FS.

3) Или вы можете использовать некоторый интерфейс bash для (или запускать скрипты на языках с интерфейсом) в очереди сообщений с поддержкой базы данных или в очередь сообщений SysV.

2

Это пример использования файла в виде очереди FIFO, имеет неограниченный размер, постоянный при перезагрузке системы и позволяет использовать несколько читателей и писателей.

#!/bin/bash 

# manages a FIFO queue on a system file. 
# every message is a line of text. 
# supports multiple writers and multiple readers. 
# 
# Requires: bash, inotify-tools: /usr/bin/inotifywait, 
# ed, util-linux: /usr/bin/flock 

set -e 

# Retrieves one element 
# param: 
# pipe_name 
# writes to stdout: 
# element_string 
# returns: 
# true on succes, false on error or end of data 
_pipe_pull() { 
    local pipe="${1}" 
    local msg pid 

_pipe_pop() { 
    local fd1 
    (if ! flock --timeout 1 --exclusive ${fd1}; then 
     echo "Error: _pipe_pop can't get a lock." >&2 
     return 1 
    fi 
     [ ! -s "${pipe}" ] || \ 
      ed -s "${pipe}" <<< $'1p\n1d\nw' 
    ) {fd1}< "${pipe}" 
    : 
} 

    msg="" 
    while [ -z "${msg}" ]; do 
     if [ ! -s "${pipe}" ]; then 
      inotifywait -e modify "${pipe}" > /dev/null 2>&1 & 
      pid="${!}" 
      wait "${pid}" || return 1 
     fi 

     msg="$(_pipe_pop)" || \ 
      return 1 

     if [ "${msg}" = $'\x04' ]; then 
      echo "_pipe_pull: end of data." >&2 
      return 1 
     fi 
    done 
    printf '%s\n' "${msg}" 
    : 
} 

# Adds multiple elements at once 
# param: 
# pipe_name elem_1 ... elem_N 
# returns: 
# true on succes, false on error 
_pipe_push() { 
    local pipe="${1}" 
    local fd1 
    shift 

    (if ! flock --timeout 10 --exclusive ${fd1}; then 
     echo "Error: _pipe_push can't get a lock." >&2 
     return 1 
    fi 
     printf '%s\n' "${@}" >> "${pipe}" 
    ) {fd1}< "${pipe}" 
} 

pipe_file_1="$(mktemp /tmp/pipe-XXXXXX.txt)" 

# submit first reader process 
while msg="$(_pipe_pull "${pipe_file_1}")"; do 
    printf 'Reader 1:%s\n' "${msg}" 
done & 

# submit second reader process 
while msg="$(_pipe_pull "${pipe_file_1}")"; do 
    printf 'Reader 2:%s\n' "${msg}" 
done & 

# submit first writer process 
for i in {1..10}; do 
    _pipe_push "${pipe_file_1}" "Next comes ${i}" "${i}" 
done & 
pid1="${!}" 

# submit second writer process 
for i in {11..20}; do 
    _pipe_push "${pipe_file_1}" "${i}" "Previous was ${i}" 
done & 
pid2="${!}" 

# submit third writer process 
for i in {21..30}; do 
    _pipe_push "${pipe_file_1}" "${i}" 
done & 
pid3="${!}" 

# waiting for the end of writer processes 
wait ${pid1} ${pid2} ${pid3} 

# signal end of data to two readers 
_pipe_push "${pipe_file_1}" $'\x04' $'\x04' 

# waiting for the end of reader processes 
wait 

# cleaning 
rm -vf "${pipe_file_1}" 
: