Javaからsortコマンドを使ってソートしてみる

大規模データを扱う場合に限るけど、sortコマンドを使うと、データを処理するプロセスとソートするプロセスを分離できて便利。大体はシェルスクリプトでつなげば済むけど、複雑なデータ処理をしたい場合などは、Javaからsortしたいときもあったりする。

まずはシングルスレッドで。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.Random;

public class Main {
	public static void main(String[] args) {
		Random rgen = new Random();
		ProcessBuilder pb = new ProcessBuilder("sort", "-nu", "-k2");
		try {
			Process process = pb.start();
			try (PrintStream out = new PrintStream(process.getOutputStream())) {
				for (int i = 0; i < 10000; i++) {
					String str = String.format("%d\t%d", rgen.nextInt(1000), rgen.nextInt(1000));
					out.println(str);
				}
			}
			int count = 0;
			try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
				String line;
				while ((line = in.readLine()) != null) {
					System.out.println(line);
					count++;
				}
			}
			System.out.println("lines: "+ count);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

これだけだと、Arrays.sortとかCollections.sort使えばよいわけで、わざわざ外部プロセスを使うメリットは少ない。そこで次に、マルチスレッドで書き込んでみる。

package pipe;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.Random;
import java.util.stream.IntStream;

public class Main {
	public static void main(String[] args) {
		ProcessBuilder pb = new ProcessBuilder("sort");
		try {
			Process process = pb.start();
			try (PrintStream out = new PrintStream(process.getOutputStream())) {
				IntStream.range(0, 10).parallel().forEach(n -> {
					Random rgen = new Random(n);
					for (int i = 0; i < 1000; i++) {
						String str = String.format("%d\t%d", rgen.nextInt(1000), rgen.nextInt(1000));
						out.println(str);
					}
				});
			}
			int count = 0;
			try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
				String line;
				while ((line = in.readLine()) != null) {
					System.out.println(line);
					count++;
				}
			}
			System.out.println("lines: "+ count);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

PrintStreamは内部でsyncronizedかけているので、複数プロセスから荒っぽく書き込んでも全く問題ない。さらに、処理がおわったデータを保持し続けなくてもよくて、Javaプロセス上のメモリから一時的に追い出すことができる。(sortはディスクでのソートもサポートしているのでメモリに乗りきらなくてもソートできる。)

また、sortプロセスのOutputStreamを閉じるのは、全データを書き込んでからでないといけない。(全データを書き込む前に閉じると一部データがロストする)上記コードでは、ParallelStreamを使って並列化してるから、すべての書き込みを終えてからsortプロセスのOutputStreamが閉じているが、ForkJoinPoolなどで直接並列化するような場合は、意識的にすべてのスレッドでの書き込みを終えてから(Joinしてから)sortプロセスのOutputStreamを閉じる必要がある。