大規模データを扱う場合に限るけど、sortコマンドを使うと、データを処理するプロセスとソートするプロセスを分離できて便利。大体はシェルスクリプトでつなげば済むけど、複雑なデータ処理をしたい場合などは、Javaからsortしたいときもあったりする。
まずはシングルスレッドで。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.util.Random; public class Main { public static void main(String[] args) { Random rgen = new Random(); ProcessBuilder pb = new ProcessBuilder("sort", "-nu", "-k2"); try { Process process = pb.start(); try (PrintStream out = new PrintStream(process.getOutputStream())) { for (int i = 0; i < 10000; i++) { String str = String.format("%d\t%d", rgen.nextInt(1000), rgen.nextInt(1000)); out.println(str); } } int count = 0; try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = in.readLine()) != null) { System.out.println(line); count++; } } System.out.println("lines: "+ count); } catch (IOException e) { e.printStackTrace(); } } }
これだけだと、Arrays.sortとかCollections.sort使えばよいわけで、わざわざ外部プロセスを使うメリットは少ない。そこで次に、マルチスレッドで書き込んでみる。
package pipe; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.util.Random; import java.util.stream.IntStream; public class Main { public static void main(String[] args) { ProcessBuilder pb = new ProcessBuilder("sort"); try { Process process = pb.start(); try (PrintStream out = new PrintStream(process.getOutputStream())) { IntStream.range(0, 10).parallel().forEach(n -> { Random rgen = new Random(n); for (int i = 0; i < 1000; i++) { String str = String.format("%d\t%d", rgen.nextInt(1000), rgen.nextInt(1000)); out.println(str); } }); } int count = 0; try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = in.readLine()) != null) { System.out.println(line); count++; } } System.out.println("lines: "+ count); } catch (IOException e) { e.printStackTrace(); } } }
PrintStreamは内部でsyncronizedかけているので、複数プロセスから荒っぽく書き込んでも全く問題ない。さらに、処理がおわったデータを保持し続けなくてもよくて、Javaプロセス上のメモリから一時的に追い出すことができる。(sortはディスクでのソートもサポートしているのでメモリに乗りきらなくてもソートできる。)
また、sortプロセスのOutputStreamを閉じるのは、全データを書き込んでからでないといけない。(全データを書き込む前に閉じると一部データがロストする)上記コードでは、ParallelStreamを使って並列化してるから、すべての書き込みを終えてからsortプロセスのOutputStreamが閉じているが、ForkJoinPoolなどで直接並列化するような場合は、意識的にすべてのスレッドでの書き込みを終えてから(Joinしてから)sortプロセスのOutputStreamを閉じる必要がある。