我一直在尝试使用GNU Parallel并行化以下脚本,特别是三个FOR循环实例中的每个实例,但未能做到。 FOR循环中包含的4个命令按顺序运行,每个循环大约需要10分钟。

#!/bin/bash

kar='KAR5'
runList='run2 run3 run4'
mkdir normFunc
for run in $runList
do 
  fsl5.0-flirt -in $kar"deformed.nii.gz" -ref normtemp.nii.gz -omat $run".norm1.mat" -bins 256 -cost corratio -searchrx -90 90 -searchry -90 90 -searchrz -90 90 -dof 12 
  fsl5.0-flirt -in $run".poststats.nii.gz" -ref $kar"deformed.nii.gz" -omat $run".norm2.mat" -bins 256 -cost corratio -searchrx -90 90 -searchry -90 90 -searchrz -90 90 -dof 12 
  fsl5.0-convert_xfm -concat $run".norm1.mat" -omat $run".norm.mat" $run".norm2.mat"
  fsl5.0-flirt -in $run".poststats.nii.gz" -ref normtemp.nii.gz -out $PWD/normFunc/$run".norm.nii.gz" -applyxfm -init $run".norm.mat" -interp trilinear

  rm -f *.mat
done


#1 楼



foo () {
    local run=
    fsl5.0-flirt -in $kar"deformed.nii.gz" -ref normtemp.nii.gz -omat $run".norm1.mat" -bins 256 -cost corratio -searchrx -90 90 -searchry -90 90 -searchrz -90 90 -dof 12 
    fsl5.0-flirt -in $run".poststats.nii.gz" -ref $kar"deformed.nii.gz" -omat $run".norm2.mat" -bins 256 -cost corratio -searchrx -90 90 -searchry -90 90 -searchrz -90 90 -dof 12 
    fsl5.0-convert_xfm -concat $run".norm1.mat" -omat $run".norm.mat" $run".norm2.mat"
    fsl5.0-flirt -in $run".poststats.nii.gz" -ref normtemp.nii.gz -out $PWD/normFunc/$run".norm.nii.gz" -applyxfm -init $run".norm.mat" -interp trilinear
}

for run in $runList; do foo "$run" & done


如果不清楚,重要部分在这里:

for run in $runList; do foo "$run" & done
                                   ^


使该函数在后台的派生shell中执行。这是平行的。

评论


就像魅力一样。谢谢。如此简单的实现(现在让我感到如此愚蠢!)。

–拉夫诺尔·吉尔(Ravnoor S Gill)
2013年12月5日21:24

如果我有8个文件可以并行运行,但只有4个内核,那么是否可以将其集成到这样的设置中,或者需要Job Scheduler?

–拉夫诺尔·吉尔(Ravnoor S Gill)
2013年12月5日21:27

在这种情况下,这并不重要;系统具有比核心更多的活动进程是正常的。如果您有许多简短的任务,理想情况下,您将输入由数量或工作线程<内核数服务的队列。我不知道用shell脚本真正完成该操作的频率(在这种情况下,它们不是线程,而是独立的进程),但是相对较少的长任务却毫无意义。操作系统调度程序将照顾它们。

–金锁
2013年12月5日21:50



您可能还想在末尾添加一个等待命令,以便在所有后台作业都退出之前,主脚本不会退出。

–psusi
2015年11月19日,0:22

我还限制了并发进程的数量,这很有用:我的每个进程都使用100%的内核时间(约25分钟)。这是在具有16个核心的共享服务器上,其中许多人正在运行作业。我需要运行该脚本的23个副本。如果我同时运行它们,那么我将淹没服务器,并使其在一两个小时内对其他所有人无用(负载增加到30,其他所有东西都变慢了)。我想这可以很好地完成,但是我不知道它是否会完成。

–naught101
2015年11月26日23:00

#2 楼

示例任务

task(){
   sleep 0.5; echo "";
}


顺序运行

for thing in a b c d e f g; do 
   task "$thing"
done


并行运行

for thing in a b c d e f g; do 
  task "$thing" &
done


并行运行N个进程

N=4
(
for thing in a b c d e f g; do 
   ((i=i%N)); ((i++==0)) && wait
   task "$thing" & 
done
)


还可以将FIFO用作信号量,并使用它们来确保产生新的进程为尽快运行,并且最多只能同时运行N个进程。但是它需要更多代码。

基于FIFO的信号量的N个进程:

# initialize a semaphore with a given number of tokens
open_sem(){
    mkfifo pipe-$$
    exec 3<>pipe-$$
    rm pipe-$$
    local i=
    for((;i>0;i--)); do
        printf %s 000 >&3
    done
}

# run the given command asynchronously and pop/push tokens
run_with_lock(){
    local x
    # this read waits until there is something to read
    read -u 3 -n 3 x && ((0==x)) || exit $x
    (
     ( "$@"; )
    # push the return code of the command to the semaphore
    printf '%.3d' $? >&3
    )&
}

N=4
open_sem $N
for thing in {a..g}; do
    run_with_lock task $thing
done 


解释:通过推(= printf)和弹出(= read)令牌('000'),我们将文件描述符3用作信号量。通过推送已执行任务的返回代码,我们可以在出现问题时中止操作。

评论


等待的行基本上允许所有进程运行,直到到达第n个进程,然后等待所有其他进程完成运行,对吗?

–naught101
2015年11月26日23:03



如果我为零,请致电等待。零测试后增加i。

– PSkocik
2015年11月26日23:08



@ naught101是的。 w / no arg等待所有孩子。这有点浪费。基于管道的信号量方法为您提供了更流畅的并发性(一段时间以来,我一直在将其与-nt / -ot检查一起成功地用于基于定制Shell的构建系统中)

– PSkocik
18 Mar 10 '18 at 20:02

@ BeowulfNode42您不必退出。只要在任务进程退出/崩溃后将状态(或具有该字节长度的内容)写回到fifo,任务的返回状态就不会损害信号量的一致性。

– PSkocik
18/12/17在10:27

仅供参考,mkfifo pipe-$$命令需要对当前目录具有适当的写访问权限。因此,我更喜欢指定完整路径,例如/ tmp / pipe-$$,因为它很可能具有当前用户可用的写访问权限,而不是依赖于当前目录是什么。是的,替换所有3次出现的pipe-$$。

– BeowulfNode42
19年7月29日在3:13

#3 楼

for stuff in things
do
( something
  with
  stuff ) &
done
wait # for all the something with stuff


它是否真正起作用取决于您的命令;我对他们不熟悉。如果rm *.mat并行运行,则看起来容易产生冲突。

评论


这也运行得很好。没错,我必须将rm * .mat更改为rm $ run“ .mat”之类的东西,才能使其正常工作,而一个过程不会干扰另一个过程。谢谢。

–拉夫诺尔·吉尔(Ravnoor S Gill)
2013年12月5日21:38



@RavnoorSGill欢迎来到Stack Exchange!如果此答案解决了您的问题,请勾选它旁边的对勾,将其标记为已接受。

–吉尔斯'所以-不再是邪恶的'
2013年12月5日23:54

+1等待,我忘了。

–金锁
2013年12月6日在12:13

如果有大量的“事物”,这将不会启动大量的过程吗?最好同时启动多个合理的进程,对吗?

–大卫·多里亚(David Doria)
15年3月20日在15:17

非常有用的提示!在这种情况下如何设置线程数?

–张大东
19年5月7日在18:12

#4 楼

for stuff in things
do
sem -j+0 "something; \
  with; \
  stuff"
done
sem --wait


这将使用信号量,并行化与可用核心数量一样多的迭代(-j +0表示将并行化N + 0个作业,其中N是可用核心数量)。

sem --wait告诉您等待直到for循环中的所有迭代都终止执行,然后再执行连续的代码行。

注意:您将需要“ parallel”来自GNU并行项目(sudo apt-get install parallel)。

评论


有可能超过60岁吗?我的抛出一个错误,指出文件描述符不足。

–碎肉
15年11月27日在7:47

如果这也是由于括号引起的语法错误,请查看moritzschaefer的答案。

–尼古拉
19年7月18日在14:17

在debian稳定版上,我在4附近遇到语法错误:script.sh:语法错误:“(”意外(期望“完成”,像工作方向一样的子外壳将成为杀手er功能)

–雷神召唤师
20-2-11在21:44

@ThorSummoner我根据此答案调整了代码

–lev
20-2-12在10:53

#5 楼

我经常使用的一种非常简单的方法:

cat "args" | xargs -P $NUM_PARALLEL command


这将运行命令,并并行传递“ args”文件的每一行,最多运行$同时NUM_PARALLEL。

,如果需要在不同位置替换输入参数,也可以查看xargs的-I选项。

评论


如果您有(例如)要处理的文件名列表。但是,如果您是书呆子,那实际上不是for循环。不过,我觉得解决方案很优雅。

– AdamKalisz
20-4-17的7:42

#6 楼

在最多N个进程并发中并行执行
只是一个普通的bash脚本-无需外部lib / app。

评论


对于没有外部库的每个人来说,这都是可以理解和简单的。谢谢

–voxter
20 Mar 10 '20 at 7:51

优秀的解决方案!只是一件小事(或者我错过了什么):在以N = 3运行示例时,“ ... -ge $ N”似乎比“ -gt $ N”更准确

– n1ghtm4n4g3r
20年9月7日在14:17

@Thenightmanager-是的,您是对的,谢谢! (固定示例)

– TomaszHławiczka
20/09/08在12:42

#7 楼

看来fsl作业彼此依赖,因此这4个作业无法并行运行。但是,这些运行可以并行运行。

使bash函数运行一次运行并并行运行该函数:

#!/bin/bash

myfunc() {
    run=
    kar='KAR5'
    mkdir normFunc
    fsl5.0-flirt -in $kar"deformed.nii.gz" -ref normtemp.nii.gz -omat $run".norm1.mat" -bins 256 -cost corratio -searchrx -90 90 -searchry -90 90 -searchrz -90 90 -dof 12 
    fsl5.0-flirt -in $run".poststats.nii.gz" -ref $kar"deformed.nii.gz" -omat $run".norm2.mat" -bins 256 -cost corratio -searchrx -90 90 -searchry -90 90 -searchrz -90 90 -dof 12 
    fsl5.0-convert_xfm -concat $run".norm1.mat" -omat $run".norm.mat" $run".norm2.mat"
    fsl5.0-flirt -in $run".poststats.nii.gz" -ref normtemp.nii.gz -out $PWD/normFunc/$run".norm.nii.gz" -applyxfm -init $run".norm.mat" -interp trilinear
}

export -f myfunc
parallel myfunc ::: run2 run3 run4


要了解更多信息,请观看介绍性视频:https://www.youtube.com/playlist?list=PL284C9FF2488BC6D1并花一个小时来浏览该教程http://www.gnu.org/software/parallel/parallel_tutorial.html命令行会为此而爱您。

评论


如果您使用的是非bash shell,则还需要在并行运行之前导出SHELL = / bin / bash。否则,您将收到类似以下的错误:未知命令'myfunc arg'

–安德鲁·哈维(AndrewHarvey)
2015年7月31日在3:39



@AndrewHarvey:那不是shebang的目的吗?

–naught101
2015年11月26日23:02

#8 楼

我非常喜欢@lev的答案,因为它以非常简单的方式提供了对最大进程数的控制。但是,如手册中所述,sem不适用于括号。
for stuff in things
do
sem -j +0 "something; \
  with; \
  stuff"
done
sem --wait

执行此工作。并行运行这么多作业。对于计算密集型作业,-j +0很有用,因为它将同时运行CPU核心数作业。
-j -N从CPU核心数中减去N。并行运行这么多作业。如果评估的数字小于1,则将使用1。另请参见--use-cpus-instead-of-cores。


#9 楼

就我而言,我无法使用信号量(我在Windows上处于git-bash中),因此我想出了一种通用方法,可以在N个工作程序开始之前将其分配给N个工作程序。

如果任务花费大致相同的时间,则效果很好。缺点是,如果其中一个工人花费很长时间来完成其工作,其他已经完成的工人将无济于事。

将工作分配给N个工人(每个核心1个) )

# array of assets, assuming at least 1 item exists
listAssets=( {a..z} ) # example: a b c d .. z
# listAssets=( ~/"path with spaces/"*.txt ) # could be file paths

# replace with your task
task() { #  = idWorker,  = asset
  echo "Worker : Asset '' START!"
  # simulating a task that randomly takes 3-6 seconds
  sleep $(( ($RANDOM % 4) + 3 ))
  echo "    Worker : Asset '' OK!"
}

nVirtualCores=$(nproc --all)
nWorkers=$(( $nVirtualCores * 1 )) # I want 1 process per core

worker() { #  = idWorker
  echo "Worker  GO!"
  idAsset=0
  for asset in "${listAssets[@]}"; do
    # split assets among workers (using modulo); each worker will go through
    # the list and select the asset only if it belongs to that worker
    (( idAsset % nWorkers ==  )) && task  "$asset"
    (( idAsset++ ))
  done
  echo "    Worker  ALL DONE!"
}

for (( idWorker=0; idWorker<nWorkers; idWorker++ )); do
  # start workers in parallel, use 1 process for each
  worker $idWorker &
done
wait # until all workers are done


#10 楼

我在@PSkocik的解决方案上遇到了麻烦。我的系统没有作为软件包提供GNU Parallel,并且在我手动构建和运行它时sem引发了异常。然后,我也尝试了FIFO信号量示例,该示例还引发了其他一些有关通信的错误。 )。 >
function _jobs_get_count_e {
   jobs -r | wc -l | tr -d " "
}

function _jobs_set_max_parallel {
   g_jobs_max_jobs=
}

function _jobs_get_max_parallel_e {
   [[ $g_jobs_max_jobs ]] && {
      echo $g_jobs_max_jobs

      echo 0
   }

   echo 1
}

function _jobs_is_parallel_available_r() {
   (( $(_jobs_get_count_e) < $g_jobs_max_jobs )) &&
      return 0

   return 1
}

function _jobs_wait_parallel() {
   # Sleep between available jobs
   while true; do
      _jobs_is_parallel_available_r &&
         break

      sleep 0.1s
   done
}

function _jobs_wait() {
   wait
}


示例用法:

#!/bin/bash

source "_lib_jobs.sh"

_jobs_set_max_parallel 3

# Run 10 jobs in parallel with varying amounts of work
for a in {1..10}; do
   _jobs_wait_parallel

   # Sleep between 1-2 seconds to simulate busy work
   sleep_delay=$(echo "scale=1; $(shuf -i 10-20 -n 1)/10" | bc -l)

   ( ### ASYNC
   echo $a
   sleep ${sleep_delay}s
   ) &
done

# Visualize jobs
while true; do
   n_jobs=$(_jobs_get_count_e)

   [[ $n_jobs = 0 ]] &&
      break

   sleep 0.1s
done