View Javadoc
1   package ca.uhn.hl7v2.testpanel.util;
2   
3   import java.io.File;
4   import java.io.FileNotFoundException;
5   import java.io.FileReader;
6   import java.io.FileWriter;
7   import java.io.IOException;
8   import java.io.Reader;
9   import java.text.NumberFormat;
10  import java.util.ArrayList;
11  import java.util.Collections;
12  import java.util.Comparator;
13  import java.util.Iterator;
14  import java.util.List;
15  
16  import ca.uhn.hl7v2.testpanel.util.IProgressCallback.OperationCancelRequestedException;
17  import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator;
18  
19  /**
20   * Sorts an entire file of HL7 v2 messages
21   */
22  public class Hl7V2FileSorter {
23  
24  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(Hl7V2FileSorter.class);
25  
26  	private boolean myAppendOutputFile;
27  	private Comparator<String> myComparator;
28  	private File myInputFile;
29  	private File myOutputFile;
30  	int myMaxiumumFileSize = 5000;
31  	private IProgressCallback myProgressCallback;
32  	
33  	public void setAppendOutputFile(boolean theAppend) {
34  		myAppendOutputFile = theAppend;
35  	}
36  
37  	/**
38  	 * @param theComparator the comparator to set
39  	 */
40  	public void setComparator(Comparator<String> theComparator) {
41  		myComparator = theComparator;
42  	}
43  
44  	/**
45  	 * @param theInputFile
46  	 *            the inputFile to set
47  	 */
48  	public void setInputFile(File theInputFile) {
49  		myInputFile = theInputFile;
50  	}
51  
52  	/**
53  	 * @param theOutputFile the outputFile to set
54  	 */
55  	public void setOutputFile(File theOutputFile) {
56  		myOutputFile = theOutputFile;
57  	}
58  
59  	/**
60  	 * @param theProgressCallback the progressCallback to set
61  	 */
62  	public void setProgressCallback(IProgressCallback theProgressCallback) {
63  		myProgressCallback = theProgressCallback;
64  	}
65  
66  	/**
67  	 * Performs the actual sort
68  	 */
69  	public void sort() throws Exception {
70  
71  		myProgressCallback.activityStarted();
72  		File inputFile = myInputFile;
73  		File outputFile = myOutputFile;
74  
75  		try {
76  			doSort(inputFile, outputFile, 0.0, 1.0);
77  		} finally {
78  			myProgressCallback.activityStopped();
79  		}
80  		
81  	}
82  
83  	private void doSort(File inputFile, File outputFile, double theProgressStart, double theProgressEnd) throws FileNotFoundException, OperationCancelRequestedException, IOException {
84  		double progressHalfWidth = (theProgressEnd - theProgressStart) * 0.5;
85  
86  		List<File> tempInputFiles = new ArrayList<File>();
87  		List<String> currentFileMessages = new ArrayList<String>();
88  		Reader reader = new FileReader(inputFile);
89  		CharCountingReaderWrapper inputCharCountingReader = new CharCountingReaderWrapper(reader);
90  		FileWriter outputWriter = null;
91  		try {
92  
93  			long maximumFileSize = inputFile.length() / 5;
94  			
95  			ourLog.info("Starting to read input file");
96  			Hl7InputStreamMessageStringIterator iter = new Hl7InputStreamMessageStringIterator(inputCharCountingReader);
97  			int totalNumMessages = 0;
98  			int count = 0;
99  			long currentFileByteCountStart = 0;
100 			while (iter.hasNext()) {
101 
102 				if (count++ % 100 == 0) {
103 					long currentCount = inputCharCountingReader.getCount();
104 					long currentTotal = inputFile.length();
105 					double currentProgress = theProgressStart + (((double) currentCount) / currentTotal) * progressHalfWidth;
106 					myProgressCallback.progressUpdate(currentProgress);
107 				}
108 
109 				currentFileMessages.add(iter.next());
110 
111 				ourLog.debug("Temp buffer has " + currentFileMessages.size() + " msgs");
112 
113 				long currentFileByteCount = inputCharCountingReader.getCount() - currentFileByteCountStart;
114 				if (currentFileByteCount > maximumFileSize || !iter.hasNext()) {
115 
116 					totalNumMessages += currentFileMessages.size();
117 					ourLog.info("Sorting next batch of messages and writing them to a file, now have " + totalNumMessages + " msgs in " + (tempInputFiles.size() + 1) + " files");
118 
119 					Collections.sort(currentFileMessages, myComparator);
120 
121 					File tempFile = File.createTempFile("temp_hl7_sorter", ".hl7");
122 					tempFile.deleteOnExit();
123 					tempInputFiles.add(tempFile);
124 
125 					FileWriter writer = new FileWriter(tempFile);
126 					for (String message : currentFileMessages) {
127 						writer.append(message);
128 						writer.append('\r');
129 					}
130 					writer.close();
131 					currentFileMessages.clear();
132 
133 					ourLog.info("Done writing to temporary file: " + tempFile.getAbsolutePath());
134 
135 					currentFileByteCountStart = inputCharCountingReader.getCount();
136 					
137 				}
138 
139 			}
140 
141 			ourLog.info("Have " + tempInputFiles.size() + " input files, now going to merge-sort them");
142 
143 			/*
144 			 * We now have a bunch of split up files which contain a portion of
145 			 * the input messages. These files have been sorted within
146 			 * themselves, but obviously not yet sorted relative to each other.
147 			 */
148 
149 			List<PushBackIterator<String>> inputIters = new ArrayList<PushBackIterator<String>>();
150 			for (File nextInputFile : tempInputFiles) {
151 				inputIters.add(new PushBackIterator<String>(new Hl7InputStreamMessageStringIterator(new FileReader(nextInputFile))));
152 			}
153 
154 			outputWriter = new FileWriter(outputFile, myAppendOutputFile);
155 
156 			boolean foundAtLeastOneMessage;
157 			int totalOutput = 0;
158 			do {
159 				foundAtLeastOneMessage = false;
160 
161 				// Get one message from each file
162 				ArrayList<String> inputMessages = new ArrayList<String>();
163 				for (Iterator<PushBackIterator<String>> iterIter = inputIters.iterator(); iterIter.hasNext();) {
164 					PushBackIterator<String> nextIter = iterIter.next();
165 					if (nextIter.hasNext()) {
166 						inputMessages.add(nextIter.next());
167 						foundAtLeastOneMessage = true;
168 					}
169 				}
170 
171 				ArrayList<String> sortedInputMessages = new ArrayList<String>(inputMessages);
172 				Collections.sort(sortedInputMessages, myComparator);
173 
174 				// Only write the message from the file with the message which
175 				// was
176 				// first overall
177 				// The rest get pushed back
178 				String nextMessage = sortedInputMessages.size() > 0 ? sortedInputMessages.remove(0) : null;
179 				if (nextMessage != null) {
180 					outputWriter.append(nextMessage);
181 					outputWriter.append('\n');
182 					totalOutput++;
183 
184 					for (PushBackIterator<String> next : inputIters) {
185 						if (next.previous() != null && sortedInputMessages.contains(next.previous())) {
186 							next.pushBack();
187 						}
188 					}
189 
190 					if (totalOutput % 100 == 0) {
191 						double percent = (((double) totalOutput / (double) totalNumMessages));
192 						
193 						double progress = theProgressStart + progressHalfWidth + (percent * progressHalfWidth);
194 						myProgressCallback.progressUpdate(progress);
195 						
196 						ourLog.info("Now written " + NumberFormat.getPercentInstance().format(percent) + ", " + totalOutput + " messages");
197 					}
198 
199 				}
200 
201 			} while (foundAtLeastOneMessage);
202 
203 		} finally {
204 			if (outputWriter != null) {
205 				outputWriter.close();
206 			}
207 			inputCharCountingReader.close();
208 			for (File next : tempInputFiles) {
209 				ourLog.info("Deleting temporary file: {}", next.getName());
210 				next.delete();
211 			}
212 			ourLog.info("Done sorting {} into {}", inputFile.getName(), outputFile.getName());
213 		}
214 	}
215 
216 }