Tool:
LTFViewerMaven:
<dependency><groupId>org.codehaus.woodstox</groupId>
<artifactId>stax2-api</artifactId>
<version>3.1.2</version>
</dependency>
Java:
public void parseFile(String filename, Long jobId) throws Exception {XMLStreamReader2 xmlr = null;
FileInputStream is = null;
List<Future> trackingTask = null;
try {
XMLInputFactory2 xmlif = ((XMLInputFactory2) XMLInputFactory.newInstance());
xmlif.configureForSpeed();
// Start init executor service
trackingTask = initExecutorService();
is = new FileInputStream(filename);
xmlr = (XMLStreamReader2) xmlif.createXMLStreamReader(is);
// Parse into typed objects
JAXBContext ctx = JAXBContext.newInstance(PGWSSchedulePOJO.class, PGWSChannelPOJO.class...);
Unmarshaller um = ctx.createUnmarshaller();
while (xmlr.hasNext()) {
xmlr.next();
if (xmlr.isStartElement()) {
if ((xmlr.getLocalName().equals("publishedTitles"))) {
NlpgwsTitleInfoPOJO ti = um.unmarshal(xmlr, NlpgwsTitleInfoPOJO.class).getValue();
if (ti != null) {
//Add to queue "parsedTitleInfo"
parsedTitleInfo.add(ti);
}
} else if ((xmlr.getLocalName().equals("providers"))) {
NlpgwsProviderPOJO pr = um.unmarshal(xmlr, NlpgwsProviderPOJO.class).getValue();
if (pr != null) {
//Add to queue "parsedProviders"
parsedProviders.add(pr);
}
}
}
}
} catch (XMLStreamException ex) {
LOGGER.error(ex.getMessage(), ex);
throw ex;
} catch (Exception ex) {
LOGGER.error(ex.getMessage(), ex);
throw ex;
} finally {
if (xmlr != null) {
try {
xmlr.close();
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
}
}
if (is != null) {
try {
is.close();
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
}
}
}
// Waiting for all task to completed and shutdown the executor server
try {
if (trackingTask != null) {
for (Future future : trackingTask) {
future.get();
}
}
} catch (Exception ex) {
LOGGER.error(ex.getMessage(), ex);
} finally {
// clean job
}
}
With:
protected static BlockingQueue<NlpgwsProviderPOJO> parsedProviders = new LinkedBlockingDeque<NlpgwsProviderPOJO>();
protected static BlockingQueue<NlpgwsTitleInfoPOJO> parsedTitleInfo = new LinkedBlockingDeque<NlpgwsTitleInfoPOJO>();
Algorithm here:
- Read XML file
- Put data object to a BlockingQueue
- Init Executor -> create thread to scan the Blocking queue to do the business logic
Declare executor:
protected ExecutorService ingestionBatchInsert;
ThreadFactory batchInsertThreadFactory = new ThreadFactoryBuilder().setNameFormat("dls-batch-insert-%d").build();
ingestionBatchInsert = Executors.newFixedThreadPool(totalInsertThread + 1, batchInsertThreadFactory);
Add thread in trackingList:
for (int count = 0; count < totalInsertThread; count++) {
Future task = ingestionBatchInsert.submit(othersBatchInsertThread);
trackingTask.add(task);
}
OthersBatchInsertThread thread:
public class OthersBatchInsertThread implements Runnable {
public void run() {
//Get out object from BlockingQueue
DLSFileParserService.parsedProviders.poll()
//process
}
}
No comments:
Post a Comment