Thursday, August 13, 2015

Read big XML file in Java

Tool:

LTFViewer

Maven:

<!-- NOTE(review): the artifactId suggests this artifact is the StAX2 API only. The Java code
     below casts the factory to XMLInputFactory2, so confirm that a StAX2 implementation
     (e.g. Woodstox) is also on the runtime classpath. -->
<dependency>
        <groupId>org.codehaus.woodstox</groupId>
        <artifactId>stax2-api</artifactId>
        <version>3.1.2</version>
</dependency>

Java:

public void parseFile(String filename, Long jobId) throws Exception {
        XMLStreamReader2 xmlr = null;
        FileInputStream is = null;
        List<Future> trackingTask = null;

        try {
            XMLInputFactory2 xmlif = ((XMLInputFactory2) XMLInputFactory.newInstance());
            xmlif.configureForSpeed();

            // Start init executor service
            trackingTask = initExecutorService();

            is = new FileInputStream(filename);
            xmlr = (XMLStreamReader2) xmlif.createXMLStreamReader(is);

            // Parse into typed objects
            JAXBContext ctx = JAXBContext.newInstance(PGWSSchedulePOJO.class, PGWSChannelPOJO.class...);
            Unmarshaller um = ctx.createUnmarshaller();
            while (xmlr.hasNext()) {
                xmlr.next();
                if (xmlr.isStartElement()) {
                    if ((xmlr.getLocalName().equals("publishedTitles"))) {
                        NlpgwsTitleInfoPOJO ti = um.unmarshal(xmlr, NlpgwsTitleInfoPOJO.class).getValue();
                        if (ti != null) {
                            //Add to queue "parsedTitleInfo"
                            parsedTitleInfo.add(ti);
                        }
                    } else if ((xmlr.getLocalName().equals("providers"))) {
                        NlpgwsProviderPOJO pr = um.unmarshal(xmlr, NlpgwsProviderPOJO.class).getValue();
                        if (pr != null) {
                            //Add to queue "parsedProviders"
                            parsedProviders.add(pr);
                        }
                    }
                }
            }

        } catch (XMLStreamException ex) {
            LOGGER.error(ex.getMessage(), ex);
            throw ex;
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
            throw ex;
        } finally {
            if (xmlr != null) {
                try {
                    xmlr.close();
                } catch (Exception ex) {
                    LOGGER.error(ex.getMessage());
                }
            }

            if (is != null) {
                try {
                    is.close();
                } catch (Exception ex) {
                    LOGGER.error(ex.getMessage());
                }
            }
        }

        // Waiting for all task to completed and shutdown the executor server
        try {
            if (trackingTask != null) {
                for (Future future : trackingTask) {
                    future.get();
                }
            }
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
        } finally {
            // clean job
        }
    }

With:
// Shared queue of provider objects: parseFile() adds to it, OthersBatchInsertThread polls from it.
protected static BlockingQueue<NlpgwsProviderPOJO> parsedProviders = new LinkedBlockingDeque<NlpgwsProviderPOJO>();
    // Shared queue of title-info objects filled by parseFile().
    protected static BlockingQueue<NlpgwsTitleInfoPOJO> parsedTitleInfo = new LinkedBlockingDeque<NlpgwsTitleInfoPOJO>();

Algorithm here:
- Read the XML file as a stream
- Put each parsed data object onto a BlockingQueue
- Init an Executor -> create worker threads that take objects from the BlockingQueue and run the business logic

Declare executor:
// Executor that runs the batch-insert worker tasks.
protected ExecutorService ingestionBatchInsert;
// Threads created by this factory are named "dls-batch-insert-<n>".
ThreadFactory batchInsertThreadFactory = new ThreadFactoryBuilder().setNameFormat("dls-batch-insert-%d").build();
// The pool holds one thread more than totalInsertThread.
// NOTE(review): presumably the extra thread is reserved for another task - confirm.
ingestionBatchInsert = Executors.newFixedThreadPool(totalInsertThread + 1, batchInsertThreadFactory);

Submit the worker tasks and add each returned Future to the tracking list:
// Submit the worker once per insert thread and keep every Future so the caller can wait on it.
for (int submitted = 0; submitted < totalInsertThread; submitted++) {
      trackingTask.add(ingestionBatchInsert.submit(othersBatchInsertThread));
}

OthersBatchInsertThread thread:
public class OthersBatchInsertThread implements Runnable {
        public void run() {
              //Get out object from BlockingQueue
             DLSFileParserService.parsedProviders.poll()
             //process
        }
}

Check whether a file is open (locked) in Java

// Probe whether the file is locked by another process by trying to lock it ourselves.
RandomAccessFile rf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = rf.getChannel();
FileLock lock = null;
try {
    // tryLock() returns null if another process already holds an overlapping lock
    LOGGER.info("Trying to acquire lock");
    lock = fileChannel.tryLock();
    if (lock != null) {
        success = true;
    }
} catch (Exception ex) {
    LOGGER.error(ex.getMessage(), ex);
} finally {
    // Nested finally blocks: a failure while releasing the lock or closing the channel
    // must not skip the remaining cleanup, otherwise the file handle leaks.
    try {
        if (lock != null) {
            lock.release();
        }
    } finally {
        try {
            fileChannel.close();
        } finally {
            rf.close();
        }
    }
}

Watch a folder and pick up files when they become available


public class MMTServerStartListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    YourMonitorListenerImpl fileMonitor;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent context) {
        try {
            String filePath = "<file path>";
            startMonitor(filePath, fileMonitor);
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
        }
    }

    private void startMonitor(String filePath, YourMonitorListenerImpl fileMonitorImpl) {
        try {
            if (filePath != null && filePath.length() > 0) {

                final File directory = new File(filePath.trim());
                FileAlterationObserver fao = new FileAlterationObserver(directory);
                fao.addListener(fileMonitorImpl);

                final FileAlterationMonitor monitor = new FileAlterationMonitor();
                monitor.addObserver(fao);

                LOGGER.info("Starting monitor. CTRL+C to stop.");
                monitor.start();

                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    public void run() {
                        try {
                            LOGGER.info("Stopping monitor.");
                            monitor.stop();
                        } catch (Exception ignored) {
                            LOGGER.error(ignored.getMessage(), ignored);
                        }
                    }
                }));
            } else {
                LOGGER.error("Invalid input the monitor folder");
            }
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
        }
    }
}

Here "YourMonitorListenerImpl" is a class that implements the "FileAlterationListener" interface.

Wednesday, August 12, 2015

Thread with ExecutorService

// Fixed-size pool whose threads are named "thread-name-<n>"
ThreadFactory buildCache = new ThreadFactoryBuilder().setNameFormat("thread-name-%d").build();
ExecutorService executorService = Executors.newFixedThreadPool(totalThreads, buildCache);

List<Future<?>> trackingTask = new ArrayList<Future<?>>();

try {
    for (int index = 0; index < totalThreads; index++) {
        trackingTask.add(executorService.submit(new Runnable() {
            @Override
            public void run() {
                //run your code
            }
        }));
    }

    // Wait for every task to finish; get() rethrows a task failure as ExecutionException
    for (Future<?> task : trackingTask) {
        task.get();
    }
} finally {
    // Shut the executor down even when a task failed or the wait was interrupted,
    // otherwise the pool threads stay alive.
    executorService.shutdownNow();
}

Using Redis cache

Installation:

Follow the instructions at: Install Redis cache in linux - DigitalOcean

Using Redis cache:

Create bean:
<!-- Jedis connection factory: host and port come from the atm.cache.redis.* placeholders;
     pooling is enabled and uses the "jedisPoolConfig" bean, which is not defined in this snippet. -->
<bean id="jedisConnFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:hostName="${atm.cache.redis.host}"
          p:port="${atm.cache.redis.port}"
          p:poolConfig-ref="jedisPoolConfig"
          p:usePool="true"/>
 <!-- Redis template on top of the factory above; keys are serialized by the
      "stringRedisSerializer" bean, which is not defined in this snippet. -->
 <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connectionFactory-ref="jedisConnFactory" p:keySerializer-ref="stringRedisSerializer"/>

Autowire the template in the Java implementation:
// Template used for all cache operations in the snippets below.
// NOTE(review): presumably this is wired to the "redisTemplate" bean declared in the XML - confirm.
@Autowired
private RedisTemplate<String, ProgramBasicInfo> programRedisTemplate;

Delete the whole cache: (in this case, it gets a connection and then flushes all data)
programRedisTemplate.getConnectionFactory().getConnection().flushAll();

Delete cache:
programRedisTemplate.opsForHash().getOperations()
                                    .delete(Constants.CACHE_PROGRAM_GUIDE_KEY);

Put data to cache:
programRedisTemplate.opsForHash().put(Constants.CACHE_PROGRAM_GUIDE_KEY, program.getProgramId(),       programBasicInfo);

Get data from cache:
(ProgramBasicInfo) programRedisTemplate.opsForHash().get(Constants.CACHE_PROGRAM_GUIDE_KEY, programId);

References:

http://blog.joshuawhite.com/java/caching-with-spring-data-redis/
http://caseyscarborough.com/blog/2014/12/18/caching-data-in-spring-using-redis/


Tuesday, August 11, 2015

Synchronize SimpleDateFormat object in Java

A SimpleDateFormat object is not thread-safe: when one instance is shared between threads, it may produce a wrong date when formatting or parsing. The simplest safe approach is to synchronize access to it.

// Shared formatter. It is final because it doubles as the lock object: if the field could be
// reassigned, two threads might synchronize on different instances.
private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

/**
 * Formats a date as {@code yyyy-MM-dd}.
 *
 * <p>SimpleDateFormat is not thread-safe, so every use of the shared instance is serialized.
 *
 * @param d the date to format
 * @return the formatted date string
 */
public String formatDate(Date d) {
    synchronized (sdf) {
        return sdf.format(d);
    }
}
 
Hope that helps.

Sunday, August 9, 2015

Check if a class is loaded and lib file location in JVM

The example below checks whether the JVM is using ojdbc6 version 11 or version 12.

// Scan the top-level classes visible to the context class loader. A class under
// "oracle.jdbc.babelfish" is treated as the marker for ojdbc version 12.
final ClassLoader loader = Thread.currentThread().getContextClassLoader();
ClassPath clazzPath = ClassPath.from(loader);
Set<ClassInfo> classes = clazzPath.getTopLevelClasses();
for (final ClassPath.ClassInfo classInfo : classes) {
    final String className = classInfo.getName();
    if (className.startsWith("oracle") && className.contains("oracle.jdbc.babelfish")) {
        isVersion12 = true;
        // One match is enough; no need to walk the rest of the classpath
        break;
    }
}

if (isVersion12) {
    LOGGER.info("Ojdbc version 12");
} else {
    LOGGER.info("Ojdbc version 11");
}

// Log where the driver class was loaded from (the jar or directory URL)
Class<?> klass = OracleConnection.class;
URL location = klass.getResource('/' + klass.getName().replace('.', '/') + ".class");
if (location != null) {
    LOGGER.info(location.toString());
} else {
    // getResource() returns null when the class file cannot be found as a resource
    LOGGER.info("Could not locate class file for " + klass.getName());
}