This is my first time doing such a thing intentionally. I believe everyone who has written shell scripts has already done multiprocessing in some form, for example: command &.
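Before moving to coprocesses, here is a minimal sketch of that classic form of parallelism: launch jobs with & and collect them with wait. The file paths and messages are just illustrative.

```shell
#!/bin/bash
# Launch two background jobs with '&', record their PIDs,
# then block until both have finished with 'wait'.
(sleep 0.2; echo "job 1 done") > /tmp/job1.out &
p1=$!
(sleep 0.1; echo "job 2 done") > /tmp/job2.out &
p2=$!
wait "$p1" "$p2"
cat /tmp/job1.out /tmp/job2.out
```

This gives parallelism but no easy two-way communication with the jobs, which is what the coproc approach below adds.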
Here is example output from the script I am about to show you:
# Creating 10 test files at /tmp/test{0..9}.bin, each is 10MB
$ for ((i=0;i<10;i++)); do head -c 10m /dev/urandom > "/tmp/test${i}.bin"; done

# The single process
$ time for f in /tmp/test?.bin; do md5sum "$f"; done
a928dc064f3b0f68386ff8e8ae8c3d8e  /tmp/test0.bin
59a2940703258a750a6895efbfead10e  /tmp/test1.bin
77dc3bb2b0d70ada17174f73d9b8ba5b  /tmp/test2.bin
e8be270104dc99d7fc842f6b1a8ed622  /tmp/test3.bin
dedd45d0f8168ed3c9ecbf4e7458ab87  /tmp/test4.bin
efaaa7064a849ab4f4dbd69153fcc11b  /tmp/test5.bin
961520ac959d156a71d80628d001a20b  /tmp/test6.bin
110185133ecc6b538b0c383295f3a137  /tmp/test7.bin
3f1901a68e828c7dfe16f1a84805dedc  /tmp/test8.bin
a4032ebc7417b844fc58a841557c73a4  /tmp/test9.bin

real    0m0.426s
user    0m0.338s
sys     0m0.066s

# Multiprocessing with three processes to work on data
$ time ./mp.sh
W0: W0 started to work...
W1: W1 started to work...
W2: W2 started to work...
W0: a928dc064f3b0f68386ff8e8ae8c3d8e  /tmp/test0.bin
W1: 59a2940703258a750a6895efbfead10e  /tmp/test1.bin
W2: 77dc3bb2b0d70ada17174f73d9b8ba5b  /tmp/test2.bin
W0: e8be270104dc99d7fc842f6b1a8ed622  /tmp/test3.bin
W1: dedd45d0f8168ed3c9ecbf4e7458ab87  /tmp/test4.bin
W2: efaaa7064a849ab4f4dbd69153fcc11b  /tmp/test5.bin
W0: 961520ac959d156a71d80628d001a20b  /tmp/test6.bin
W1: 110185133ecc6b538b0c383295f3a137  /tmp/test7.bin
W0: a4032ebc7417b844fc58a841557c73a4  /tmp/test9.bin
W2: 3f1901a68e828c7dfe16f1a84805dedc  /tmp/test8.bin

real    0m0.265s
user    0m0.342s
sys     0m0.072s
The script mp.sh:
#!/bin/bash

MAX_WORKERS=3

worker () {
  echo "$1 started to work..."
  while read cmd; do
    # if it receives "exit", do any finishing jobs and quit
    [[ "$cmd" == "exit" ]] && break
    md5sum "$cmd"
  done
}

get_next () {
  (( q_id >= ${#queue[@]} )) && next='' && return 0
  next="${queue[q_id]}"
  ((q_id++))
  return
}

for ((i=0;i<MAX_WORKERS;i++)); do
  # bring up the workers; the redirection mutes this warning:
  # ./mp.sh: line 27: warning: execute_coproc: coproc [22652:W0] still exists
  eval "coproc W$i { worker W$i; }" &>/dev/null
done

queue=(/tmp/test?.bin)   # glob directly instead of parsing ls output
q_id=0

while :; do
  for ((i=0;i<MAX_WORKERS;i++)); do
    w_stdout="W$i[0]"
    w_stdin="W$i[1]"
    read data <&${!w_stdout}
    if [[ -n "$data" ]]; then
      echo "W$i: $data"
      get_next
      [[ -z "$next" ]] && break
      echo "$next" >&${!w_stdin}
    fi
  done
  [[ -z "$next" ]] && break
done

# clean up
for ((i=0;i<MAX_WORKERS;i++)); do
  w_stdout="W$i[0]"
  w_stdin="W$i[1]"
  echo "exit" >&${!w_stdin}
  # drain the rest of the data
  while read data; do
    echo "W$i: $data"
  done <&${!w_stdout}
  w_pid="W${i}_PID"
  wait ${!w_pid}
done
worker() is the data processor; the main program feeds it items from the queue array. The main loop checks whether a worker has returned data, and if so, tries to fetch a new item for the worker that just finished processing.
I use coproc to create a subshell for worker(). Notice that I use eval, because coproc does not accept parameter expansion for the co-process name. We need to name them W0, W1, and so on, since you don't want to hard-code a fixed number of workers into the program.
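A stripped-down sketch of the eval trick, assuming bash 4+; the worker body here (a pure-bash echo loop) is illustrative, not the md5sum worker from mp.sh:

```shell
#!/bin/bash
# 'coproc NAME { ... }' needs a literal NAME, so when the name is
# computed at run time (W0, W1, ...) we build the statement with eval.
# The 2>/dev/null hides the "coproc still exists" warning bash prints
# when a second coproc starts while the first is alive.
for i in 0 1; do
  eval "coproc W$i { while read -r line; do echo \"W$i got: \$line\"; done; }" 2>/dev/null
done

echo "hello" >&"${W0[1]}"
read -r reply <&"${W0[0]}"
echo "$reply"
```

Without eval, `coproc W$i { ...; }` would not create co-processes named W0 and W1 as intended.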
In the main loop, you can see w_stdout and w_stdin, which use indirect expansion[1]; we need it to get the value of W#[0], where # is a digit. When using coproc, the co-process name is the key to its standard input/output and process ID: they are ${NAME[1]}, ${NAME[0]}, and ${NAME_PID}, respectively. You can use read data <&${NAME[0]} ; echo "$data" to get the co-process's output, and echo "blah blah blah" >&${NAME[1]} to feed the co-process data.
If you design a better protocol for communicating with the workers, this technique can do a lot more. You can even change a worker's command(s) whenever you need. Currently, the workers only accept exit, so they can quit gracefully.
[1] data=123 ; point_to=data ; echo "${!point_to}" echoes "123". You use ${!varname} to perform this indirect expansion.
Really cool stuff. I am going to utilize it in my bash-written screen scraper tool, which will benefit from multiprocessing!