Thursday, September 17, 2015

ReactiveX in Java

Dependency:

<!-- RxJava 1.x artifact used by the Observable/Subscriber example below -->
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.0.14</version>
</dependency>

Code:

Observable:

// Pool that backs the scheduler: 3 core threads, at most 5, idle extra threads
// are reclaimed after 3000 ms. The queue is typed as Runnable instead of raw.
// NOTE(review): the executor is never shut down in this snippet; its core
// threads keep the JVM alive until executor.shutdown() is called somewhere.
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

// Observable that emits the single value "item" and then completes.
Observable<ChannelItemType> myObservable = Observable.create(
        new Observable.OnSubscribe<ChannelItemType>() {
            @Override
            public void call(Subscriber<? super ChannelItemType> sub) {
                // Do not emit to a subscriber that has already unsubscribed.
                if (sub.isUnsubscribed()) {
                    return;
                }
                sub.onNext(item);
                // The subscriber may have unsubscribed while handling onNext().
                if (!sub.isUnsubscribed()) {
                    sub.onCompleted();
                }
            }
        }
).subscribeOn(Schedulers.from(executor)); // run the emission on an executor thread
myObservable.subscribe(new ChannelConsumer(channelRepo));

In the snippet above, we create an Observable that sends the item to the consumer (in this case ChannelConsumer) through onNext(). One interesting thing here is that the work is moved onto other threads by using an executor. The executor is configured with 3 core threads, and subscribeOn(Schedulers.from(executor)) makes the Observable automatically pick a thread from that executor to send the data.

Consumer:
public class ChannelConsumer extends Subscriber<ChannelItemType> {
    @Override
    public void onCompleted() {
        //Clean up your process when complete
    }
    @Override
    public void onError(Throwable throwable) {

    }
    @Override
    public void onNext(ChannelItemType channelItem) {
        //put your code here
    }
}

Thursday, August 20, 2015

Monday, August 17, 2015

Make Virtualbox display full Linux client


References:
https://help.ubuntu.com/community/VirtualBox/GuestAdditions
http://www.binarytides.com/vbox-guest-additions-ubuntu-14-04/

Also: try going to Software and Updates -> Additional Drivers and choosing the "x86 virtualization solution" option.

Finally, remember to select: View -> Auto-resize Guest Display

Batch insert with JDBC and Spring

JDBC:

// Inserts all employees with one PreparedStatement, sending the rows to the
// database in batches of batchSize.
String sql = "insert into employee (name, city, phone) values (?, ?, ?)";
final int batchSize = 1000;

// getConnection() is a method call, not a constructor, so there is no "new".
Connection connection = getConnection();
try {
    PreparedStatement ps = connection.prepareStatement(sql);
    try {
        int count = 0;
        for (Employee employee : employees) {
            ps.setString(1, employee.getName());
            ps.setString(2, employee.getCity());
            ps.setString(3, employee.getPhone());
            ps.addBatch();

            // Flush every batchSize rows so the pending batch stays bounded.
            if (++count % batchSize == 0) {
                ps.executeBatch();
            }
        }
        // Insert remaining records; skipped when the last flush already sent everything.
        if (count % batchSize != 0) {
            ps.executeBatch();
        }
    } finally {
        // Close the statement even when setting parameters or executing fails.
        ps.close();
    }
} finally {
    // Always release the connection, including on failure.
    connection.close();
}

Spring JDBC (JdbcTemplate):

public class EmployeeRepository extends JdbcDaoSupport { 

    public void insertEmployeeBatch(List<Employee> employees) {

        try {
            String sql = "
insert into employee (name, city, phone) values (?, ?, ?)";

            getJdbcTemplate().batchUpdate(sql, new BatchPreparedStatementSetter() {
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                     Employee employee = employees.get(i);                    

                     ps.setString(1, employee.getName());
                     ps.setString(2, employee.getCity());
                     ps.setString(3, employee.getPhone());
                }

                public int getBatchSize() {
                    return
employees.size();
                }
            });
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
        }
    } 

}

Thursday, August 13, 2015

Read big XML file in Java

Tool:

LTFViewer

Maven:

<!-- StAX2 API used by the parser below (XMLInputFactory2, XMLStreamReader2). -->
<!-- NOTE(review): presumably a StAX2 implementation such as Woodstox is also needed at runtime; confirm. -->
<dependency>
        <groupId>org.codehaus.woodstox</groupId>
        <artifactId>stax2-api</artifactId>
        <version>3.1.2</version>
</dependency>

Java:

public void parseFile(String filename, Long jobId) throws Exception {
        XMLStreamReader2 xmlr = null;
        FileInputStream is = null;
        List<Future> trackingTask = null;

        try {
            XMLInputFactory2 xmlif = ((XMLInputFactory2) XMLInputFactory.newInstance());
            xmlif.configureForSpeed();

            // Start init executor service
            trackingTask = initExecutorService();

            is = new FileInputStream(filename);
            xmlr = (XMLStreamReader2) xmlif.createXMLStreamReader(is);

            // Parse into typed objects
            JAXBContext ctx = JAXBContext.newInstance(PGWSSchedulePOJO.class, PGWSChannelPOJO.class...);
            Unmarshaller um = ctx.createUnmarshaller();
            while (xmlr.hasNext()) {
                xmlr.next();
                if (xmlr.isStartElement()) {
                    if ((xmlr.getLocalName().equals("publishedTitles"))) {
                        NlpgwsTitleInfoPOJO ti = um.unmarshal(xmlr, NlpgwsTitleInfoPOJO.class).getValue();
                        if (ti != null) {
                            //Add to queue "parsedTitleInfo"
                            parsedTitleInfo.add(ti);
                        }
                    } else if ((xmlr.getLocalName().equals("providers"))) {
                        NlpgwsProviderPOJO pr = um.unmarshal(xmlr, NlpgwsProviderPOJO.class).getValue();
                        if (pr != null) {
                            //Add to queue "parsedProviders"
                            parsedProviders.add(pr);
                        }
                    }
                }
            }

        } catch (XMLStreamException ex) {
            LOGGER.error(ex.getMessage(), ex);
            throw ex;
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
            throw ex;
        } finally {
            if (xmlr != null) {
                try {
                    xmlr.close();
                } catch (Exception ex) {
                    LOGGER.error(ex.getMessage());
                }
            }

            if (is != null) {
                try {
                    is.close();
                } catch (Exception ex) {
                    LOGGER.error(ex.getMessage());
                }
            }
        }

        // Waiting for all task to completed and shutdown the executor server
        try {
            if (trackingTask != null) {
                for (Future future : trackingTask) {
                    future.get();
                }
            }
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
        } finally {
            // clean job
        }
    }

With:
protected static BlockingQueue<NlpgwsProviderPOJO> parsedProviders = new LinkedBlockingDeque<NlpgwsProviderPOJO>();
    protected static BlockingQueue<NlpgwsTitleInfoPOJO> parsedTitleInfo = new LinkedBlockingDeque<NlpgwsTitleInfoPOJO>();

The algorithm:
- Read the XML file
- Put each parsed data object into a BlockingQueue
- Initialize the executor, which creates the threads that scan the BlockingQueue and run the business logic

Declare executor:
// Executor that runs the batch-insert worker tasks.
protected ExecutorService ingestionBatchInsert;
// Thread factory built with the name format "dls-batch-insert-%d"
// (presumably Guava's ThreadFactoryBuilder; confirm against the imports).
ThreadFactory batchInsertThreadFactory = new ThreadFactoryBuilder().setNameFormat("dls-batch-insert-%d").build();
// Fixed-size pool of totalInsertThread + 1 threads.
// NOTE(review): the reason for the extra thread is not visible here; confirm.
ingestionBatchInsert = Executors.newFixedThreadPool(totalInsertThread + 1, batchInsertThreadFactory);

Submit the worker tasks and add their futures to trackingTask:
for (int count = 0; count < totalInsertThread; count++) {
      Future task = ingestionBatchInsert.submit(othersBatchInsertThread);
      trackingTask.add(task);
}

OthersBatchInsertThread thread:
public class OthersBatchInsertThread implements Runnable {
        public void run() {
              //Get out object from BlockingQueue
             DLSFileParserService.parsedProviders.poll()
             //process
        }
}

Check whether a file is open (locked) in Java

// Probes whether the file can be locked exclusively; "success" is set to true
// only when the lock was obtained. The lock is released again before returning.
RandomAccessFile rf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = rf.getChannel();
FileLock lock = null;
try {
        // Try to get a lock. If another process already holds an exclusive lock
        // on the file, tryLock() returns null instead of blocking.
        LOGGER.info("Trying to acquire lock");
        lock = fileChannel.tryLock();
        if (lock != null) {
              success = true;
        }
} catch (Exception ex) {
         LOGGER.error(ex.getMessage(), ex);
} finally {
         // Nested finally blocks: a failure in one cleanup step must not skip the others.
         try {
               if (lock != null) {
                     lock.release();
               }
         } finally {
               try {
                     fileChannel.close();
               } finally {
                     rf.close();
               }
         }
}

Observe folder to pick file when it's available


public class MMTServerStartListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    YourMonitorListenerImpl fileMonitor;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent context) {
        try {
            String filePath = "<file path>";
            startMonitor(filePath, fileMonitor);
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
        }
    }

    private void startMonitor(String filePath, YourMonitorListenerImpl fileMonitorImpl) {
        try {
            if (filePath != null && filePath.length() > 0) {

                final File directory = new File(filePath.trim());
                FileAlterationObserver fao = new FileAlterationObserver(directory);
                fao.addListener(fileMonitorImpl);

                final FileAlterationMonitor monitor = new FileAlterationMonitor();
                monitor.addObserver(fao);

                LOGGER.info("Starting monitor. CTRL+C to stop.");
                monitor.start();

                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    public void run() {
                        try {
                            LOGGER.info("Stopping monitor.");
                            monitor.stop();
                        } catch (Exception ignored) {
                            LOGGER.error(ignored.getMessage(), ignored);
                        }
                    }
                }));
            } else {
                LOGGER.error("Invalid input the monitor folder");
            }
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
        }
    }
}

Here, "YourMonitorListenerImpl" is a class that implements the "FileAlterationListener" interface.