For the past couple of years a good deal of my work has involved parsing large text datasets with Python and extracting business logic.
It always starts the same: dig through a couple days' worth of text data for something interesting. Then process a week's worth, a month's worth, a year's worth...processing time grows linearly 'til it becomes a real burden. In some cases you have a distributed system (like a Hadoop cluster), but more often you have a multi-core Linux system. Today's multi-core systems often have substantial power, but only if you make use of the available cores. The scope of this post will focus on utilizing 'make' to parallelize the sequential processing of a dataset.
The fork-join model takes a sequence of jobs, executes them in parallel, awaits completion of those jobs and then joins them into a final result.
A common case, and the one we'll be focusing on, is processing a series of files with a single utility, each file independent of the rest.
Let's first set the stage with the sequential base case. We have a utility, processFile, which takes a list of files to process as command line arguments and outputs some result for each file to stdout (note: the result needs to be independent for each file, not a cumulative result).
$ processFile file1.txt file2.txt file3.txt ... fileN.txt
Running sequentially like this will likely execute on a single core, and depending on your system you may find CPU and memory underutilized as a result. Processing time grows from a cup of coffee, to a lunch break, to overnight...etc. as your file list grows.
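If you want to exercise the pipeline before wiring in your real tool, a throwaway stand-in for processFile works fine. The script below is a hypothetical example (the word-count "analysis" is just a placeholder, not the real utility): it takes a list of files and writes one independent result line per file to stdout. Make it executable (chmod +x processFile) and put it on your PATH.
$ cat processFile
#!/bin/sh
# Stand-in for the real utility: one independent result line per input file.
# Real parsing logic would go here; a word count keeps the demo cheap.
for f in "$@"; do
    printf '%s %s\n' "$f" "$(wc -w < "$f")"
done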
In order to parallelize this form of execution we need to:
1) define our file list
2) split the file list into sublists
3) execute multiple instances of the processFile utility on each sublist simultaneously
4) wait for all sublists to be processed
5) join the output results into a single output
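Before bringing make into the picture, here's a rough shell sketch of the same fork-join flow, hard-coded for 2 jobs and assuming the split produces exactly two chunks; make will give us the same thing with less bookkeeping and a tunable job count:
$ find /var/tmp/data -name "*.txt" > flist.txt
$ split -l $(( $(wc -l < flist.txt) / 2 )) flist.txt
$ processFile `cat xaa` > xaa.out &
$ processFile `cat xab` > xab.out &
$ wait
$ cat x??.out > run.out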
The following Makefile accomplishes these steps, and we'll walk through them. But first, if you're not already aware: specifying parallel jobs in make is done by supplying the '-j' flag with a number representing the preferred number of jobs. For example, make -j2 asks to run 2 simultaneous jobs. This will be our interface for specifying the number of parallel tasks.
$ cat -n Makefile
1 .PHONY: split all clean
2
3 PID := $(shell cat /proc/$$$$/status | grep PPid | awk '{print $$2}')
4 JOBS := $(shell ps -p ${PID} -f | tail -n1 | grep -oP '\-j *\d+' | sed 's/-j//')
5 ifeq "${JOBS}" ""
6 JOBS := 1
7 endif
8
9 SPLIT=$(wildcard x??)
10 OUTFILES=$(addsuffix .out,$(SPLIT))
11
12 all: split
13 ${MAKE} run.out
14
15 run.out: ${OUTFILES}
16 ${SH} cat ${OUTFILES} >> $@
17
18 %.out: %
19 $(SH) stdbuf -o0 processFile `cat $?` > $@
20
21 split: flist.txt
22 $(SH) split -l $$((`wc -l flist.txt | cut -f 1 -d ' '` / ${JOBS} )) $<
23
24 flist.txt:
25 ${SH} find /var/tmp/data -name "*.txt" > $@
26
27 clean:
28 ${RM} flist.txt x?? *.out
For purposes that should become clear, our Makefile creates an flist.txt file containing the file list we will then feed to the processing utility. The flist.txt target specified on line 24 is populated by issuing a find for all *.txt files in the /var/tmp/data directory. The result is a list of files separated by newline characters. This accomplishes our 'define our file list' step. Easy peasy. Saddle up, our next step is a bit more complicated.
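Run by hand, that find produces the newline-separated list everything downstream consumes; the paths below are purely illustrative:
$ find /var/tmp/data -name "*.txt" > flist.txt
$ head -3 flist.txt
/var/tmp/data/file1.txt
/var/tmp/data/file2.txt
/var/tmp/data/file3.txt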
In order to accomplish our 'split the file list into sublists' step, the objective is to take the flist.txt file and divide it into J sublists, where J is the number of parallel tasks. We could simply hard-code this variable within the Makefile (e.g. JOBS=2), which would make the solution easier but certainly less robust. Lines 3-7 accomplish the assignment of the JOBS variable. Unfortunately, because the '-j' argument value isn't made directly available within the Makefile, getting the value is done in a bit of a round-about manner: we grab the process id of make itself and scrape the jobs number from its command line, defaulting to 1 if no jobs number is provided. With the JOBS value assigned, we next want to take the flist.txt file and split it into sublists. This is done by the split target on lines 21-22. The syntax is a bit dense, but all it's really doing is finding the number of lines in flist.txt and dividing it by JOBS. Specifying 'make -j 2' will split flist.txt into 2 files using the split command. The output files from split are generated as a series of x?? named files (e.g. xaa, xab). The use of the '-l' argument to split ensures we preserve the integrity of the lines: no splits within a line of the file. The result of JOBS=2 should be 2 files, but may be 3 if the number of lines in the file is odd. It matters little; it's common to get a smaller final file.
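As an example, with a hypothetical flist.txt of 101 files and 'make -j 2', the arithmetic works out to 'split -l 50 flist.txt', which yields two 50-line chunks plus a 1-line straggler:
$ wc -l < flist.txt
101
$ split -l $((101 / 2)) flist.txt
$ wc -l x??
  50 xaa
  50 xab
   1 xac
 101 total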
Two steps down...in the final stretch.
Lines 9-10 and 18-19 take care of our 'execute multiple instances of the processFile utility' step. Line 9 gathers the x?? sublist file names and line 10 appends a .out suffix to each. On line 15, this results in a dependency list for the run.out target of the form xaa.out xab.out ... xzz.out. The make dependency engine then satisfies the generation of these files by executing the pattern rule on lines 18-19 in parallel. Make awaits the completion of all the x??.out files, which satisfies our 'wait for all sublists to be processed' objective.
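Concretely, with the two-chunk split from the example above, the dependency expands to 'run.out: xaa.out xab.out' and, under 'make -j 2', both recipe instances are launched at once, roughly (command echo abridged):
stdbuf -o0 processFile `cat xaa` > xaa.out
stdbuf -o0 processFile `cat xab` > xab.out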
Finally, line 16 joins all the x??.out files into a single run.out file, satisfying our final 'join the output results into a single output' step.
Now you've got a means of parallel processing on a Ramen noodle budget. Simply specify the number of jobs/processes you want via the -j command line arg.
$ make -j 1 all
$ make -j 2 all
...
$ make -j 16 all
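To compare wall-clock times across job counts, clean between runs so the sublists and outputs are regenerated:
$ make clean && time make -j 1 all
$ make clean && time make -j 8 all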
Ahh, one last thing. You may have noticed the sub-call to make on line 13. Since the split command generates the x?? files during the make run, the wildcard on lines 9-10 matches nothing when the Makefile is first parsed; re-invoking make after the split forces it to be re-evaluated. I attempted to use secondary expansion to get around the sub-call but couldn't find a workable solution.
I've been using this means of cheap parallelism for a few days now on 16-core servers with wonderful results. Processing tasks that once took 2 hours are now done in 15 minutes if I'm greedy with the CPU/core usage.