1 package ca.uhn.hl7v2.testpanel.util;
2
3 import java.io.File;
4 import java.io.FileNotFoundException;
5 import java.io.FileReader;
6 import java.io.FileWriter;
7 import java.io.IOException;
8 import java.io.Reader;
9 import java.text.NumberFormat;
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.Comparator;
13 import java.util.Iterator;
14 import java.util.List;
15
16 import ca.uhn.hl7v2.testpanel.util.IProgressCallback.OperationCancelRequestedException;
17 import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator;
18
19
20
21
22 public class Hl7V2FileSorter {
23
24 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(Hl7V2FileSorter.class);
25
26 private boolean myAppendOutputFile;
27 private Comparator<String> myComparator;
28 private File myInputFile;
29 private File myOutputFile;
30 int myMaxiumumFileSize = 5000;
31 private IProgressCallback myProgressCallback;
32
33 public void setAppendOutputFile(boolean theAppend) {
34 myAppendOutputFile = theAppend;
35 }
36
37
38
39
40 public void setComparator(Comparator<String> theComparator) {
41 myComparator = theComparator;
42 }
43
44
45
46
47
48 public void setInputFile(File theInputFile) {
49 myInputFile = theInputFile;
50 }
51
52
53
54
55 public void setOutputFile(File theOutputFile) {
56 myOutputFile = theOutputFile;
57 }
58
59
60
61
62 public void setProgressCallback(IProgressCallback theProgressCallback) {
63 myProgressCallback = theProgressCallback;
64 }
65
66
67
68
69 public void sort() throws Exception {
70
71 myProgressCallback.activityStarted();
72 File inputFile = myInputFile;
73 File outputFile = myOutputFile;
74
75 try {
76 doSort(inputFile, outputFile, 0.0, 1.0);
77 } finally {
78 myProgressCallback.activityStopped();
79 }
80
81 }
82
83 private void doSort(File inputFile, File outputFile, double theProgressStart, double theProgressEnd) throws FileNotFoundException, OperationCancelRequestedException, IOException {
84 double progressHalfWidth = (theProgressEnd - theProgressStart) * 0.5;
85
86 List<File> tempInputFiles = new ArrayList<File>();
87 List<String> currentFileMessages = new ArrayList<String>();
88 Reader reader = new FileReader(inputFile);
89 CharCountingReaderWrapper inputCharCountingReader = new CharCountingReaderWrapper(reader);
90 FileWriter outputWriter = null;
91 try {
92
93 long maximumFileSize = inputFile.length() / 5;
94
95 ourLog.info("Starting to read input file");
96 Hl7InputStreamMessageStringIterator iter = new Hl7InputStreamMessageStringIterator(inputCharCountingReader);
97 int totalNumMessages = 0;
98 int count = 0;
99 long currentFileByteCountStart = 0;
100 while (iter.hasNext()) {
101
102 if (count++ % 100 == 0) {
103 long currentCount = inputCharCountingReader.getCount();
104 long currentTotal = inputFile.length();
105 double currentProgress = theProgressStart + (((double) currentCount) / currentTotal) * progressHalfWidth;
106 myProgressCallback.progressUpdate(currentProgress);
107 }
108
109 currentFileMessages.add(iter.next());
110
111 ourLog.debug("Temp buffer has " + currentFileMessages.size() + " msgs");
112
113 long currentFileByteCount = inputCharCountingReader.getCount() - currentFileByteCountStart;
114 if (currentFileByteCount > maximumFileSize || !iter.hasNext()) {
115
116 totalNumMessages += currentFileMessages.size();
117 ourLog.info("Sorting next batch of messages and writing them to a file, now have " + totalNumMessages + " msgs in " + (tempInputFiles.size() + 1) + " files");
118
119 Collections.sort(currentFileMessages, myComparator);
120
121 File tempFile = File.createTempFile("temp_hl7_sorter", ".hl7");
122 tempFile.deleteOnExit();
123 tempInputFiles.add(tempFile);
124
125 FileWriter writer = new FileWriter(tempFile);
126 for (String message : currentFileMessages) {
127 writer.append(message);
128 writer.append('\r');
129 }
130 writer.close();
131 currentFileMessages.clear();
132
133 ourLog.info("Done writing to temporary file: " + tempFile.getAbsolutePath());
134
135 currentFileByteCountStart = inputCharCountingReader.getCount();
136
137 }
138
139 }
140
141 ourLog.info("Have " + tempInputFiles.size() + " input files, now going to merge-sort them");
142
143
144
145
146
147
148
149 List<PushBackIterator<String>> inputIters = new ArrayList<PushBackIterator<String>>();
150 for (File nextInputFile : tempInputFiles) {
151 inputIters.add(new PushBackIterator<String>(new Hl7InputStreamMessageStringIterator(new FileReader(nextInputFile))));
152 }
153
154 outputWriter = new FileWriter(outputFile, myAppendOutputFile);
155
156 boolean foundAtLeastOneMessage;
157 int totalOutput = 0;
158 do {
159 foundAtLeastOneMessage = false;
160
161
162 ArrayList<String> inputMessages = new ArrayList<String>();
163 for (Iterator<PushBackIterator<String>> iterIter = inputIters.iterator(); iterIter.hasNext();) {
164 PushBackIterator<String> nextIter = iterIter.next();
165 if (nextIter.hasNext()) {
166 inputMessages.add(nextIter.next());
167 foundAtLeastOneMessage = true;
168 }
169 }
170
171 ArrayList<String> sortedInputMessages = new ArrayList<String>(inputMessages);
172 Collections.sort(sortedInputMessages, myComparator);
173
174
175
176
177
178 String nextMessage = sortedInputMessages.size() > 0 ? sortedInputMessages.remove(0) : null;
179 if (nextMessage != null) {
180 outputWriter.append(nextMessage);
181 outputWriter.append('\n');
182 totalOutput++;
183
184 for (PushBackIterator<String> next : inputIters) {
185 if (next.previous() != null && sortedInputMessages.contains(next.previous())) {
186 next.pushBack();
187 }
188 }
189
190 if (totalOutput % 100 == 0) {
191 double percent = (((double) totalOutput / (double) totalNumMessages));
192
193 double progress = theProgressStart + progressHalfWidth + (percent * progressHalfWidth);
194 myProgressCallback.progressUpdate(progress);
195
196 ourLog.info("Now written " + NumberFormat.getPercentInstance().format(percent) + ", " + totalOutput + " messages");
197 }
198
199 }
200
201 } while (foundAtLeastOneMessage);
202
203 } finally {
204 if (outputWriter != null) {
205 outputWriter.close();
206 }
207 inputCharCountingReader.close();
208 for (File next : tempInputFiles) {
209 ourLog.info("Deleting temporary file: {}", next.getName());
210 next.delete();
211 }
212 ourLog.info("Done sorting {} into {}", inputFile.getName(), outputFile.getName());
213 }
214 }
215
216 }