Tuesday, March 1, 2011

Callable,Futures,Executors and others...

In this posting we will go through how unbounded thread creation lead us to designing thread pool which is enhanced further by java.util.concurrent package  by providing Executor,ExecutorService,Callable ,Futures and others.

Problem with unbounded thread creation

We all would have observed that while creating threads we rarely think about thread creation,managebility and others.Most of the time our energy or thought process goes towards synchronization and preventing concurrent modification of state data.

Imagine a use case where you have created a task where as soon as salary comes in your bank account,you want to pay your electricity bills,car emi bills,home loan and others.One way is to serially execute the payment transactions one by one.Other way is to spawn thread for each activity so that there is no dependency and failure in one transaction should not effect others in the queue.So we could end up in code as follows:-

PaymentProcessor.java  having methods for doing payment
package com.kunaal.parallel.payment;

import com.kunaal.parallel.domain.PaymentInfo;

/**
 * Payment processor class which has logic for doing payment
 * Currently it just have sysout,can be plugged with implemenation 
 * for real scenarios 
 * @author Kunaal A Trehan
 *
 */
public class PaymentProcessor {
 /**
  * Thread Local variable containing payment information details
  */
 private static ThreadLocal<PaymentInfo> PAYMENT_INFO=new ThreadLocal<PaymentInfo>(){
   protected PaymentInfo initialValue() {
          return new PaymentInfo();
      }
 };
 
 /**
  * Setter method for paymentInfo
  * @param pymtInfo
  */
 public void setPaymentInfo(PaymentInfo pymtInfo){
  PAYMENT_INFO.set(pymtInfo);
 }
 
 /**
  * Cleanup paymentInfo thread local variable
  */
 public void removePaymentInfo(){
  PAYMENT_INFO.remove();
 }

 /**
  * Dummy implementation for making payment
  */
 public void makePayment(){
  System.out.println("Payment details are:- "+ PAYMENT_INFO.get());
 }
 
 /**
  * Dummy implementation for making payment and getting acknowledgement 
  * for the same.
  * @return
  */
 public String makePaymentGetAck(){
  PaymentInfo paymentInfo = PAYMENT_INFO.get();
  StringBuffer buffer=new StringBuffer();
  buffer.append("ACK-")
   .append(paymentInfo.getAcctNum())
   .append("-")
   .append(paymentInfo.getAmt())
   .append("-")
   .append("Y");
  return buffer.toString();
 }
 

 /**
  * Default Constructor
  */
 public PaymentProcessor() {
  super();
 }
}


RunImpl.java setting threadLocal  variable containing payment information and invoking payment processor
package com.kunaal.parallel.runnable;

import com.kunaal.parallel.Constants;
import com.kunaal.parallel.domain.PaymentInfo;
import com.kunaal.parallel.payment.PaymentProcessor;

/**
 * Run implementation for thread
 * @author Kunaal A Trehan
 *
 */
public class RunImpl implements Runnable {
 
 private PaymentProcessor pymtProcessor;

 /* 
  * Run implementation which creates a payment information details
  * depending upon thread name.
  */
 @Override
 public void run() {
  String name = Thread.currentThread().getName();
  PaymentInfo pymtInfo=null;
  
  if (name.equals(Constants.CAR_LOAN_EMI)){
   pymtInfo=new PaymentInfo(new Double(650), "Acct-1", "Bank-1", "City-1");
  }else if (name.equals(Constants.ELEC_BILL)){
   pymtInfo=new PaymentInfo(new Double(50), "Acct-2", "Bank-2", "City-2");
  }else if (name.equals(Constants.EXECUTIVE_MBA)){
   pymtInfo=new PaymentInfo(new Double(150), "Acct-3", "Bank-3", "City-3");
  }else if (name.equals(Constants.HOME_LOAN_EMI)){
   pymtInfo=new PaymentInfo(new Double(1050), "Acct-4", "Bank-4", "City-4");
  }else if (name.equals(Constants.KID_SCHOOL_FEES)){
   pymtInfo=new PaymentInfo(new Double(70), "Acct-5", "Bank-5", "City-5");
  }else if (name.equals(Constants.WATER_BILL)){
   pymtInfo=new PaymentInfo(new Double(30), "Acct-6", "Bank-6", "City-6");
  }else if (name.equals(Constants.FUND_FOR_OLD_AGE_HOME)){
   pymtInfo=new PaymentInfo(new Double(20), "Acct-7", "Bank-7", "City-7");
  }else if (name.equals(Constants.FUND_FOR_ORPHANAGE)){
   pymtInfo=new PaymentInfo(new Double(30), "Acct-8", "Bank-8", "City-8");
  }else{
   System.out.println("Nothing to pay-no info provided");
  }
  
  //Here we do thread local setting into payment processor
  //and after performing the activity 
  //We clear the thread local variable
  pymtProcessor.setPaymentInfo(pymtInfo);
  pymtProcessor.makePayment();
  pymtProcessor.removePaymentInfo();
  
  try {
   Thread.currentThread().sleep(10*1000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }

 /**
  * @param pymtProcessor
  */
 public RunImpl(PaymentProcessor pymtProcessor) {
  super();
  this.pymtProcessor = pymtProcessor;
 }

 /**
  * @return the pymtProcessor
  */
 public PaymentProcessor getPymtProcessor() {
  return pymtProcessor;
 }

 /**
  * @param pymtProcessor the pymtProcessor to set
  */
 public void setPymtProcessor(PaymentProcessor pymtProcessor) {
  this.pymtProcessor = pymtProcessor;
 }

}

ThreadExample.java spawning new thread for each payment instruction
package com.kunaal.parallel;

import com.kunaal.parallel.payment.PaymentProcessor;
import com.kunaal.parallel.runnable.RunImpl;

/**
 * Code demonstrating thread spawned per activity use case 
 * There is a thread spawned for paying 
 * -Car Loan 
 * -Electricity Bill
 * -Executive MBA
 * -Old Age home fund
 * -Orphange fund
 * -Home loan fund
 * -Kid school fees
 * -Water bill 
 * 
 * @author Kunaal A Trehan
 *
 */
public class ThreadExample {

 /**
  * @param args
  */
 public static void main(String[] args) {
  PaymentProcessor pymtProcessor=new PaymentProcessor();
  RunImpl runImpl=new RunImpl(pymtProcessor);
  
  Thread carLoanThread=new Thread(runImpl,Constants.CAR_LOAN_EMI);
  Thread elecBillThread=new Thread(runImpl,Constants.ELEC_BILL);
  Thread executiveMbaThread=new Thread(runImpl,Constants.EXECUTIVE_MBA);
  Thread oldAgeHomeThread=new Thread(runImpl,Constants.FUND_FOR_OLD_AGE_HOME);
  Thread orphanageThread=new Thread(runImpl,Constants.FUND_FOR_ORPHANAGE);
  Thread homeLoanThread=new Thread(runImpl,Constants.HOME_LOAN_EMI);
  Thread kidFeesThread=new Thread(runImpl,Constants.KID_SCHOOL_FEES);
  Thread waterThread=new Thread(runImpl,Constants.WATER_BILL);
  
  long startTime = System.currentTimeMillis();
  System.out.println("Before starting the threads:-" + startTime);
  
  carLoanThread.start();
  elecBillThread.start();
  executiveMbaThread.start();
  oldAgeHomeThread.start();
  orphanageThread.start();
  homeLoanThread.start();
  kidFeesThread.start();
  waterThread.start();
  
  long timeAfterStarting = System.currentTimeMillis();
  
  System.out.println("After starting the threads:-" + timeAfterStarting);
  try {
   carLoanThread.join();
   elecBillThread.join();
   executiveMbaThread.join();
   oldAgeHomeThread.join();
   orphanageThread.join();
   homeLoanThread.join();
   kidFeesThread.join();
   waterThread.join();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  long timeEnd = System.currentTimeMillis();
  
  System.out.println("Time taken for threads to finish-"+ ((timeEnd-timeAfterStarting)/1000) + " seconds");
 }
}

Constants.java
package com.kunaal.parallel;

/**
 * Constants class
 * @author Kunaal A Trehan
 *
 */
public class Constants {
 
 public static final String ELEC_BILL="Electricity Bill";
 
 public static final String WATER_BILL="Water Bill";
 
 public static final String KID_SCHOOL_FEES="Kid School Fees";
 
 public static final String HOME_LOAN_EMI="Home Loan EMI";
 
 public static final String CAR_LOAN_EMI="Car Loan EMI";
 
 public static final String EXECUTIVE_MBA="Executive MBA fees";
 
 public static final String FUND_FOR_ORPHANAGE="Orphanage fund";
 
 public static final String FUND_FOR_OLD_AGE_HOME="Old Age Home fund";

}

Results of ThreadExample.java
Before starting the threads:-1298979143734
After starting the threads:-1298979143734
Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-8, bankName=Bank-8, city=City-8]
Payment details are:- PaymentInfo [amt=150.0, acctNum=Acct-3, bankName=Bank-3, city=City-3]
Payment details are:- PaymentInfo [amt=650.0, acctNum=Acct-1, bankName=Bank-1, city=City-1]
Payment details are:- PaymentInfo [amt=50.0, acctNum=Acct-2, bankName=Bank-2, city=City-2]
Payment details are:- PaymentInfo [amt=70.0, acctNum=Acct-5, bankName=Bank-5, city=City-5]
Payment details are:- PaymentInfo [amt=1050.0, acctNum=Acct-4, bankName=Bank-4, city=City-4]
Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-6, bankName=Bank-6, city=City-6]
Payment details are:- PaymentInfo [amt=20.0, acctNum=Acct-7, bankName=Bank-7, city=City-7]
Time taken for threads to finish-10 seconds


Above example shows thread per activity,it looks nice.However it has following issues.These are:-
  • For each activity we are creating a thread.Creation of thread is expensive and lets suppose there are 100 activities we will end up having 100 threads.Context switching between them and manging so many threads could be problematic.
  • Main thread has to keep track of threads spawned and as we have shown in above example.We have to keep some mechanism(join as shown above) to make sure each and every thread has finished the job assigned to it before main thread exits.
  • Moreover lets suppose you want to fetch result from each activity which thread does and perform some computation on results .It would be quite difficult  and messy as run() method does not return any thing.We have to plug some code which may not be clean.

ThreadPool
So we have seen what unbounded thread creation can do and what are its drawbacks.So it takes us to threadpool.

Threadpool is a collection of threads which are provided chunks of work to perform.Since threads are already initialized so thread creation is avoided.Moreover once the thread has finished the task assigned to it.It can pick other task from the queue.How many threads should be there in a thread pool depends upon the type of task and load that particular thread pool needs to cater.

Prior to jdk1.5 developers used to code thread pool implementations.However with the introduction of java.util.concurrent package ,Java has  provided  utility classes for thread pool implementation which will be discussed in next section.

Executor,ExecutorService and Executors

Executor is an interface having one method for executing runnables.

ExecutorService  is the sub interface of executor having methods for shutdown,submitting callable,invoking collection of callables and others.

Executors is the utility class having factory methods for thread pool,executor service and others.

Lets take the same use case involving threadpool.

 ThreadPoolExecutorEx.java using executors to create the thread pool and passing runnable implementations to the thread pool
package com.kunaal.parallel;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.kunaal.parallel.payment.PaymentProcessor;
import com.kunaal.parallel.runnable.ExecutorRunImpl;

/**
 * @author Kunaal A Trehan
 *
 */
public class ThreadPoolExecutorEx {

 /**
  * @param args
  */
 public static void main(String[] args) {
  
  ExecutorService threadPoolExecutor=Executors.newFixedThreadPool(4);
  PaymentProcessor pymtProcessor=new  PaymentProcessor();
  
  ExecutorRunImpl carLoanExecutor=new ExecutorRunImpl(Constants.CAR_LOAN_EMI, pymtProcessor);
  ExecutorRunImpl elecBillExecutor=new ExecutorRunImpl(Constants.ELEC_BILL, pymtProcessor);
  ExecutorRunImpl mbaExecutor=new ExecutorRunImpl(Constants.EXECUTIVE_MBA, pymtProcessor);
  ExecutorRunImpl oldAgeExecutor=new ExecutorRunImpl(Constants.FUND_FOR_OLD_AGE_HOME, pymtProcessor);
  ExecutorRunImpl orphanageExecutor=new ExecutorRunImpl(Constants.FUND_FOR_ORPHANAGE, pymtProcessor);
  ExecutorRunImpl homeLoanExecutor=new ExecutorRunImpl(Constants.HOME_LOAN_EMI, pymtProcessor);
  ExecutorRunImpl kidSchoolExecutor=new ExecutorRunImpl(Constants.KID_SCHOOL_FEES, pymtProcessor);
  ExecutorRunImpl waterBillExecutor=new ExecutorRunImpl(Constants.WATER_BILL, pymtProcessor);
  
  long startTime = System.currentTimeMillis();
  System.out.println("Before starting the threads:-" + startTime);
  
  threadPoolExecutor.execute(carLoanExecutor);
  threadPoolExecutor.execute(elecBillExecutor);
  threadPoolExecutor.execute(mbaExecutor);
  threadPoolExecutor.execute(oldAgeExecutor);
  threadPoolExecutor.execute(orphanageExecutor);
  threadPoolExecutor.execute(homeLoanExecutor);
  threadPoolExecutor.execute(kidSchoolExecutor);
  threadPoolExecutor.execute(waterBillExecutor);
  
  long timeAfterStarting = System.currentTimeMillis();  
  System.out.println("After starting the threads:-" + timeAfterStarting);
  
  threadPoolExecutor.shutdown();
  while(!threadPoolExecutor.isTerminated()){
   
  }
  long timeEnd = System.currentTimeMillis();  
  System.out.println("Time taken for threads to finish-"+ ((timeEnd-timeAfterStarting)/1000) + " seconds");
 }

}


ExecutorRunImpl.java overriding run() method
package com.kunaal.parallel.runnable;

import com.kunaal.parallel.Constants;
import com.kunaal.parallel.domain.PaymentInfo;
import com.kunaal.parallel.payment.PaymentProcessor;

/**
 * @author Kunaal A Trehan
 *
 */
public class ExecutorRunImpl implements Runnable {
 
 private String info;
 private PaymentProcessor pymtProcessor;

 /**
  * @param info
  */
 public ExecutorRunImpl(String info,PaymentProcessor pymtProcessor) {
  this.info = info;
  this.pymtProcessor=pymtProcessor;
 }

 /* (non-Javadoc)
  * @see java.lang.Runnable#run()
  */
 @Override
 public void run() {
  PaymentInfo pymtInfo=null;
  
  if (info.equals(Constants.CAR_LOAN_EMI)){
   pymtInfo=new PaymentInfo(new Double(650), "Acct-1", "Bank-1", "City-1");
  }else if (info.equals(Constants.ELEC_BILL)){
   pymtInfo=new PaymentInfo(new Double(50), "Acct-2", "Bank-2", "City-2");
  }else if (info.equals(Constants.EXECUTIVE_MBA)){
   pymtInfo=new PaymentInfo(new Double(150), "Acct-3", "Bank-3", "City-3");
  }else if (info.equals(Constants.HOME_LOAN_EMI)){
   pymtInfo=new PaymentInfo(new Double(1050), "Acct-4", "Bank-4", "City-4");
  }else if (info.equals(Constants.KID_SCHOOL_FEES)){
   pymtInfo=new PaymentInfo(new Double(70), "Acct-5", "Bank-5", "City-5");
  }else if (info.equals(Constants.WATER_BILL)){
   pymtInfo=new PaymentInfo(new Double(30), "Acct-6", "Bank-6", "City-6");
  }else if (info.equals(Constants.FUND_FOR_OLD_AGE_HOME)){
   pymtInfo=new PaymentInfo(new Double(20), "Acct-7", "Bank-7", "City-7");
  }else if (info.equals(Constants.FUND_FOR_ORPHANAGE)){
   pymtInfo=new PaymentInfo(new Double(30), "Acct-8", "Bank-8", "City-8");
  }else{
   System.out.println("Nothing to pay-no info provided");
  }
  
  pymtProcessor.setPaymentInfo(pymtInfo);
  pymtProcessor.makePayment();
  pymtProcessor.removePaymentInfo();
  
  try {
   Thread.currentThread().sleep(10*1000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }

 /**
  * @return the info
  */
 public String getInfo() {
  return info;
 }

 /**
  * @param info the info to set
  */
 public void setInfo(String info) {
  this.info = info;
 }

}

Results of ThreadPoolExecutorEx.java
Before starting the threads:-1298978911765
After starting the threads:-1298978911765
Payment details are:- PaymentInfo [amt=650.0, acctNum=Acct-1, bankName=Bank-1, city=City-1]
Payment details are:- PaymentInfo [amt=150.0, acctNum=Acct-3, bankName=Bank-3, city=City-3]
Payment details are:- PaymentInfo [amt=50.0, acctNum=Acct-2, bankName=Bank-2, city=City-2]
Payment details are:- PaymentInfo [amt=20.0, acctNum=Acct-7, bankName=Bank-7, city=City-7]
Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-8, bankName=Bank-8, city=City-8]
Payment details are:- PaymentInfo [amt=1050.0, acctNum=Acct-4, bankName=Bank-4, city=City-4]
Payment details are:- PaymentInfo [amt=70.0, acctNum=Acct-5, bankName=Bank-5, city=City-5]
Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-6, bankName=Bank-6, city=City-6]
Time taken for threads to finish-20 seconds

Though this implementation avoids creation of threads and uses the existing threads for other activity once job assigned to it is finished.It suffers from one issue.There is no easy way to track results from thread as run method does not have any return type.

So it takes us to callables and futures.

Before moving to callables and futures,lets see what all variants of thread pool implementation is provided by Executors.These are as follows:-
  • FixedThreadPool
    In this thread pool we define number of threads to be created at the startup.When we have tasks more than number of threads.Then other tasks wait in the queue till the time some thread is free to perform the task.

    In Executors we have utility methods like newFixedThreadPool(int noOfThreads) ,newFixedThreadPool(int nThreads, ThreadFactory threadFactory) for creating the same.
  • CachedThreadPool
    This thread pool  creates as many threads as required for parallel processing.However if there is a thread which is free,it will try to re use that thread rather than creating a new thread.However if a thread has not performed any activity for last 60 seconds,it is terminated and removed from the cache.

    In Executors we have utility methods like  newCachedThreadPool(ThreadFactory threadFactory), newCachedThreadPool() for creating the same.

  • SingleThreadPool
    This thread pool contains only one thread.

  • ScheduledThreadPool
    This threadpool schedule commands to run after a given delay, or to execute periodically.


What is Callable and Future?
Callable is similar to runnable interface except that it can return value.Future is used in conjunction with callable to store result of asynchronously run task.Callable was specifically introduced in jdk1.5 to provide alternate mechanism incase we want to return something from thread execution.


So lets revisit the same use case of funds transfer,but this time we will use Callable and Futures to hold the acknowledgement.


CallableImpl.java storing the return value from call() in the FutureList
package com.kunaal.parallel.callable;

import java.util.concurrent.Callable;

import com.kunaal.parallel.Constants;
import com.kunaal.parallel.domain.PaymentInfo;
import com.kunaal.parallel.payment.PaymentProcessor;

/**
 * Callable implementation with String as thr return value
 * String contains acknowledgement of the payment transaction.
 * 
 * @author Kunaal A Trehan
 *
 */
public class CallableImpl implements Callable<String> {
 private String info;
 private PaymentProcessor pymtProcessor;

 /**
  * @param info
  */
 public CallableImpl(String info,PaymentProcessor pymtProcessor) {
  this.info = info;
  this.pymtProcessor=pymtProcessor;
 }

 /**
  * Over riden call method with String as return type
  */
 @Override
 public String call() throws Exception {
  PaymentInfo pymtInfo=null;
  String acknowledgement=null;
  
  if (info.equals(Constants.CAR_LOAN_EMI)){
   pymtInfo=new PaymentInfo(new Double(650), "Acct-1", "Bank-1", "City-1");
  }else if (info.equals(Constants.ELEC_BILL)){
   pymtInfo=new PaymentInfo(new Double(50), "Acct-2", "Bank-2", "City-2");
  }else if (info.equals(Constants.EXECUTIVE_MBA)){
   pymtInfo=new PaymentInfo(new Double(150), "Acct-3", "Bank-3", "City-3");
  }else if (info.equals(Constants.HOME_LOAN_EMI)){
   pymtInfo=new PaymentInfo(new Double(1050), "Acct-4", "Bank-4", "City-4");
  }else if (info.equals(Constants.KID_SCHOOL_FEES)){
   pymtInfo=new PaymentInfo(new Double(70), "Acct-5", "Bank-5", "City-5");
  }else if (info.equals(Constants.WATER_BILL)){
   pymtInfo=new PaymentInfo(new Double(30), "Acct-6", "Bank-6", "City-6");
  }else if (info.equals(Constants.FUND_FOR_OLD_AGE_HOME)){
   pymtInfo=new PaymentInfo(new Double(20), "Acct-7", "Bank-7", "City-7");
  }else if (info.equals(Constants.FUND_FOR_ORPHANAGE)){
   pymtInfo=new PaymentInfo(new Double(30), "Acct-8", "Bank-8", "City-8");
  }else{
   System.out.println("Nothing to pay-no info provided");
  }
  
  pymtProcessor.setPaymentInfo(pymtInfo);
  acknowledgement=pymtProcessor.makePaymentGetAck();
  pymtProcessor.removePaymentInfo();
  
  try {
   Thread.currentThread().sleep(10*1000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }

  return acknowledgement;
 }

}


CallableExecutor.java using callable for the execution and future for displaying the acknowledgements
package com.kunaal.parallel;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.kunaal.parallel.callable.CallableImpl;
import com.kunaal.parallel.payment.PaymentProcessor;

/**
 * Usecase with callable and future implementation
 * Here we submit callable references in the thread pool 
 * which executes it and store the result in the future
 * 
 * @author Kunaal A Trehan
 *
 */
public class CallableExecutor {

 /**
  * @param args
  * @throws ExecutionException 
  * @throws InterruptedException 
  */
 public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService threadPoolExecutor=Executors.newFixedThreadPool(4);
  PaymentProcessor pymtProcessor=new  PaymentProcessor();
  List<Future<String>> futureList= new ArrayList<Future<String>>();
  
  CallableImpl carLoanExecutor=new CallableImpl(Constants.CAR_LOAN_EMI, pymtProcessor);
  CallableImpl elecBillExecutor=new CallableImpl(Constants.ELEC_BILL, pymtProcessor);
  CallableImpl mbaExecutor=new CallableImpl(Constants.EXECUTIVE_MBA, pymtProcessor);
  CallableImpl oldAgeExecutor=new CallableImpl(Constants.FUND_FOR_OLD_AGE_HOME, pymtProcessor);
  CallableImpl orphanageExecutor=new CallableImpl(Constants.FUND_FOR_ORPHANAGE, pymtProcessor);
  CallableImpl homeLoanExecutor=new CallableImpl(Constants.HOME_LOAN_EMI, pymtProcessor);
  CallableImpl kidSchoolExecutor=new CallableImpl(Constants.KID_SCHOOL_FEES, pymtProcessor);
  CallableImpl waterBillExecutor=new CallableImpl(Constants.WATER_BILL, pymtProcessor);
  
  futureList.add(threadPoolExecutor.submit(carLoanExecutor));
  futureList.add(threadPoolExecutor.submit(elecBillExecutor));
  futureList.add(threadPoolExecutor.submit(mbaExecutor));
  futureList.add(threadPoolExecutor.submit(oldAgeExecutor));
  futureList.add(threadPoolExecutor.submit(orphanageExecutor));
  futureList.add(threadPoolExecutor.submit(homeLoanExecutor));
  futureList.add(threadPoolExecutor.submit(kidSchoolExecutor));
  futureList.add(threadPoolExecutor.submit(waterBillExecutor));
  
  threadPoolExecutor.shutdown();
  
  for(Future<String> future:futureList){
   System.out.println(future.get());
  }
 }

}

CallableExecutor.java results
ACK-Acct-1-650.0-Y
ACK-Acct-2-50.0-Y
ACK-Acct-3-150.0-Y
ACK-Acct-7-20.0-Y
ACK-Acct-8-30.0-Y
ACK-Acct-4-1050.0-Y
ACK-Acct-5-70.0-Y
ACK-Acct-6-30.0-Y


So in the above code we used callable and future to capture the results.However it can still be improved.But where is the problem.......

Problem lies in the iterator of futureList which is used to print the acknowledgements.
Imagine that the first task took good amount of time.By that time other tasks have finished executing.But still we can't see the results of those tasks unless first task gets completed.

So what we should do,java has provided CompletionService which we will discuss in next section which overcomes this issue.

CompletionService
CompletionService is the interface provided by java.util.concurrent package which isolates producer from submitting the task to consumer who retrieves the result irrespective of the order in which task is submitted.Consumer takes the result in the order it is completed.



ExecutorCompletionService is concrete implementation of CompletionService.

CompletionExecutor.java showing implementation of CompletionService
package com.kunaal.parallel;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * CompletionExecutor showing the use of CompletionService
 * which adds the result as it comes from the thread rather than 
 * maintaining the order in which threads were spawned
 * 
 * @author Kunaal A Trehan
 *
 */
public class CompletionExecutor {

 /**
  * @param args
  * @throws ExecutionException 
  * @throws InterruptedException 
  */
 public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService threadPoolExecutor=Executors.newFixedThreadPool(4);
  CompletionService<String> completionService=new ExecutorCompletionService<String>(threadPoolExecutor);
  
  CallImpl impl1=new CallImpl(20);
  CallImpl impl2=new CallImpl(10);
  CallImpl impl3=new CallImpl(5);
  CallImpl impl4=new CallImpl(3);
  
  completionService.submit(impl1);
  completionService.submit(impl2);
  completionService.submit(impl3);
  completionService.submit(impl4);
  
  threadPoolExecutor.shutdown();
  
  for(int i=0;i <4;i++){
   String result = completionService.take().get();
   System.out.println(result);
  }
 }

}

/**
 * Inner class for Callable implementation 
 * where threads paused for n seconds before completing the 
 * execution block.
 * 
 * @author Kunaal A Trehan
 *
 */
class CallImpl implements Callable<String>{

 private int waitTime;
 
 /**
  * @param waitTime
  */
 public CallImpl(int waitTime) {
  super();
  this.waitTime = waitTime;
 }

 @Override
 public String call() throws Exception {
  long startTime = System.currentTimeMillis();
  long endTime =startTime +(waitTime*1000);
  
  while(System.currentTimeMillis()<endTime){
   
  }
  return "Thread paused for " + waitTime + " seconds";
 }
 
}


Program console showing how completion service adds the result of the callables in the order it is completed
Thread paused for 3 seconds
Thread paused for 5 seconds
Thread paused for 10 seconds
Thread paused for 20 seconds

2 comments: