java.util.concurrent: Executors, Thread Pools(Cache, Fixed, Scheduled), ExecutorCompletionService – A tutorial

I received a request from one of my blog readers Kunal inquiring if I was planning to post something on FutureTask and ExecutorCompletionService. Since he put the seed of a possible blog post in my mind, I decided to go ahead and write a post on the two concurrency classes. Additionally to make the coverage complete the post intends to cover the Executors class as well.

Let’s start with creating a class which spawns a thread. Java provides two approaches for creating threads, one create a Thread sub class or implement Runnable interface. Subclassing Thread is a bad idea because it constraints the developer from developing a domain specific inheritance hierarchy. The class can only extend Thread. Instead implementing Runnable interface is a more suitable option. So let’s create a new class Activity class which implements Runnable interface. Refer the code below for the class source.

package com.concurrency;

public class Activity implements Runnable {

	@Override
	public void run() {
		System.out.println("Running concurrent activity!");
	}

}

To spawn a Activity thread, we create an Thread instance with the Activity instance as the constructor argument. Look at the source code of ThreadSpawner class.

package com.concurrency.test;

import com.concurrency.Activity;

public class ThreadSpawner {

	public static void main(String[] args) {
		Thread t = new Thread(new Activity());
		t.start();
	}

}

This is the pre Java 5 days way of creating a java thread. In post Java 5 days, Java has introduced new classes. One of the concurrency classes added in Java 5 is the Executors class. This class provided a number of utility methods. One of the method is defaultThreadFactory which creates a ThreadFactory class instance which can be used spawn new threads. An alternative approach to creating a new thread is explained in ThreadFactoryTest class. Here’s the class source code:

package com.concurrency.tf.test;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import com.concurrency.Activity;

public class ThreadFactoryTest {

	public static void main(String[] args) {
		ThreadFactory tf = Executors.defaultThreadFactory();
		Thread t = tf.newThread(new Activity());
		t.start();		
	}

}

Using Executors’s defaultThreadFactory, a ThreadFactory instance is created. To the ThreadFactory instance’s newThread method a Runnable instance whose thread needs to be spawned is passed. I have used the same Activity class to help understand the similarities and differences in the two approaches: the pre Java 5 and post Java 5 approach.So now the most important question is what’s the benefit of using Executors and ThreadFactory classes? I was able to figure out that using the Java 5 concurrency framework provides the newly created thread with an automatically assigned thread group name, thread pool name and thread name. Beyond that I am not sure if there are any additional benefits the Executors framework provides.

Parallel computing or parallelization is an important programming paradigm which speeds up a sequentially executing program. Java does not allow developers to spawn new processes; it allows spawning of threads. Breaking a task into sub activities which can be ran in parallel helps reduce the overall task processing time. However one needs to keep in mind that such an approach if uncontrolled can lead to spawning of large number of threads. Threads in large quantities can put a strain on JVM resources (CPU, memory). Therefore it is necessary to put in place a method to limit the number of threads to a threshold, the Executors framework provides such a mechanism.

The Executors framework allows the creation of thread pool.Executors supports creation of a cachable, fixed and scheduled thread pool. Let’s try and understand each thread pool type with an example. To begin with let’s create a Runnable implementation which can be reused for all the pools. The class we create is PoolRunnable and below is its source code.

package com.concurrency.pool;

import java.text.SimpleDateFormat;

import com.concurrency.utils.ConcurrencyUtils;

public class PoolRunnable implements Runnable {

	private int delayTime = 0;
	
	public PoolRunnable(int delayTime) {
		this.delayTime = delayTime;
	}
	
	public PoolRunnable() {
		
	}
	
	@Override
	public void run() {
		Thread curThread = Thread.currentThread();
		SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss");
		System.out.println(df.format(new java.util.Date()) + " Starting thread " + curThread.getName());
		try {
			Thread.sleep(delayTime * 1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("@@@@@@@@@@@@@@@@@");
		System.out.println(this.toString());
		System.out.println(df.format(new java.util.Date()) + " Ending thread " + curThread.getName());
		System.out.println("#################");
	}

	@Override
	public String toString() {
		return ConcurrencyUtils.getThreadInfo();
	}
		
}

The goal of the PoolRunnable class is to spawn a thread which is to replicate a typical application behavior by sleeping for a user defined time duration. The end user configures the PoolRunnable sleep duration by passing the interval as a constructor argument. PoolRunnable also utilizes a utility class ConcurrencyUtils for its toString implementation. Here’s the source code.

package com.concurrency.utils;

import java.text.SimpleDateFormat;

public class ConcurrencyUtils {
	
	public static String getThreadInfo() {
		StringBuilder sb = new StringBuilder();
		Thread curThread = Thread.currentThread();
		sb.append("Thread Id: ");
		sb.append(curThread.getId());
		sb.append("\n");
		sb.append("Name: ");
		sb.append(curThread.getName());
		sb.append("\n");
		sb.append("Group: ");
		sb.append(curThread.getThreadGroup().getName());
		sb.append("\n");
		return sb.toString();
	}

	public static String getShortThreadInfo() {
		StringBuilder sb = new StringBuilder();
		Thread curThread = Thread.currentThread();
		sb.append("Thread Id: ");
		sb.append(curThread.getId());
		sb.append(", ");
		sb.append("Name: ");
		sb.append(curThread.getName());
		sb.append(", ");
		sb.append("Group: ");
		sb.append(curThread.getThreadGroup().getName());
		sb.append("\n");
		return sb.toString();
	}

	public static String retrieveCurrentDate(){
		SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss");
		return df.format(new java.util.Date());
	}
		
}

To understand the cached thread pool functioning consider the following CachedPoolTest implementation:

package com.concurrency.pool.ctp.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

import com.concurrency.pool.PoolRunnable;

public class CachedPoolTest {

	public static void main(String[] args) {
		ExecutorService svc = Executors.newCachedThreadPool();
			
		Runnable r1 = new PoolRunnable();
		svc.execute(r1);
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		Runnable r2 = new PoolRunnable(1);
		svc.execute(r2);

		try {
			Thread.sleep(1200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		Runnable r3 = new PoolRunnable(2);
		svc.execute(r3);

		try {
			Thread.sleep(2100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		svc.shutdown();
	}

}

The line 13 creates a ExecutorService which retains a handle to the cached thread pool. By default, a pool having minimum size zero and max size equivalent of maximum integer value is created.An unused thread is retained in the cached pool for a duration of 60 seconds; beyond which the thread is removed from the pool.

The test class CachedPoolTest executes three Runnable instances of PoolRunnable class each provided a sleep duration of 0, 1 and 2 seconds respectively. The instances are individually submitted to execute method of the ExecutorService. For proper testing it is essential that the first thread is allowed to complete before triggering the subsequent thread. This is achieved by introducing a delay(1 sec, 1.2 sec, 2.1 sec) between each execute method invocation. Here’s the test class run output:

2011-05-13 at 14:52:15 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 14:52:15 Ending thread pool-1-thread-1
#################
2011-05-13 at 14:52:16 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 14:52:17 Ending thread pool-1-thread-1
#################
2011-05-13 at 14:52:17 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 14:52:19 Ending thread pool-1-thread-1
#################

I have used concurrency utils to print thread id, name and pool information. From the output it is clear that all reuse the same thread. One more thing to note is the line 42. Invocation of shutdown method on ExecutorService is important else the Java process will not terminate.

I have enhanced to add a fourth Runnable instance which will be executed after 61 seconds. This is to verify if a new thread is created after the expiration of the cache thread retention duration. The test class run output proves me correct. Below is the enhanced class and its output.

package com.concurrency.pool.ctp.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

import com.concurrency.pool.PoolRunnable;

public class CachedPoolTest {

	public static void main(String[] args) {
		ExecutorService svc = Executors.newCachedThreadPool();
			
		Runnable r1 = new PoolRunnable();
		svc.execute(r1);
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		Runnable r2 = new PoolRunnable(1);
		svc.execute(r2);

		try {
			Thread.sleep(1200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		Runnable r3 = new PoolRunnable(2);
		svc.execute(r3);

		try {
			Thread.sleep(2100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		try {
			Thread.sleep(61000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		Runnable r4 = new PoolRunnable();
		svc.execute(r4);

		
		svc.shutdown();
	}

}
2011-05-13 at 15:00:22 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 15:00:22 Ending thread pool-1-thread-1
#################
2011-05-13 at 15:00:23 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 15:00:24 Ending thread pool-1-thread-1
#################
2011-05-13 at 15:00:24 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 15:00:26 Ending thread pool-1-thread-1
#################
2011-05-13 at 15:01:27 Starting thread pool-1-thread-2
@@@@@@@@@@@@@@@@@
Thread Id: 9
Name: pool-1-thread-2
Group: main

2011-05-13 at 15:01:27 Ending thread pool-1-thread-2
#################

So we have looked at cached thread pool. Now let’s focus on the other types. First let’s look at the fixed thread pool. I have created the FixedThreadPoolTest class for explaining fixed thread pool. Refer the source code below:

package com.concurrency.pool.fp;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.concurrency.pool.PoolRunnable;

public class FixedThreadPoolTest {

	public static void main(String[] args) {
		ExecutorService svc = Executors.newFixedThreadPool(2);
		
		Runnable r1 = new PoolRunnable(2);
		svc.execute(r1);
				
		Runnable r2 = new PoolRunnable(3);
		svc.execute(r2);

		Runnable r3 = new PoolRunnable(3);
		svc.execute(r3);
		
		svc.shutdown();
		
	}
}

The line 11 in the test class creates a fixed thread pool containing 2 threads. I then execute three threads at nearly the same time with individual thread sleep durations 2 seconds, 3 seconds and 3 seconds respectively. the output observed is as below.

2011-05-13 at 15:14:26 Starting thread pool-1-thread-1
2011-05-13 at 15:14:26 Starting thread pool-1-thread-2
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 15:14:28 Ending thread pool-1-thread-1
#################
2011-05-13 at 15:14:28 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 9
Name: pool-1-thread-2
Group: main

2011-05-13 at 15:14:29 Ending thread pool-1-thread-2
#################
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 15:14:31 Ending thread pool-1-thread-1
#################

The test class run output shows that two threads are started simultaneously, one thread completes its run after 2 seconds. On termination of this thread, the third thread is spawned which terminates after 3 seconds. The test clearly shows that the 2 thread upper limit defined during the thread pool creation is honored and any new thread requests beyond the threshold limit are queued until existing pool thread(s) become available.

Sometimes during programming we have a requirement of triggering a specific functionality or a set of
functionalities after a designated period of time. The Executors framework provides a facility to create scheduled thread pool. Refer the source code of the test class ScheduleConcurrencyTest

package com.concurrency.schedule.test;

import java.text.SimpleDateFormat;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.concurrency.Calculator;
import com.concurrency.pool.PoolRunnable;

public class ScheduleConcurrencyTest {

	public static void main(String[] args) {
		SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss");
		ScheduledExecutorService svc = Executors.newScheduledThreadPool(2);
		System.out.println(df.format(new java.util.Date()) + " Time Runnable");
		svc.schedule(new PoolRunnable(), 2, TimeUnit.SECONDS);
		System.out.println(df.format(new java.util.Date()) + " Time Callable");
		ScheduledFuture<Integer> sf = svc.schedule(new Calculator(1, 3, 
			Calculator.OPERATION_ADD), 5, TimeUnit.SECONDS);
		try {
			System.out.println("Waiting for value.");
			Integer val = sf.get();
			System.out.println(df.format(new java.util.Date()) + " Time Callable Retrieve");
			System.out.println("Computed Value: " + val);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		
		svc.shutdown();
		
	}
}

Refer line 17 of ScheduleConcurrencyTest. Here we create a scheduled thread pool and get a handle to the underlying pool via the ScheduledExecutorService interface. Next look at line 19. Here we schedule a PoolRunnable instance to be run after a delay of 2 seconds. Runnable implementation basically constitutes functionality which needs to be executed in parallel with the invoking application. Some times we expect the thread to do some processing and return some value. Java 5 has introduced the Callable construct. This allows threads to return values to the invoking application.

The ScheduledExecutorService allows execution of Callable interface after a defined time interval. I create a
calculator implementation of Callable interface. The Calculator is expected to add two integer values and respond back with the sum value. The source code of Calculator class handling Callable idiosyncrasies is shown below:

package com.concurrency;

import java.util.concurrent.Callable;

public class Calculator implements Callable<Integer> {
	
	public static final int OPERATION_ADD = 1;
	
	private int param1 = 0;
	private int param2 = 0;
	private int operation = 0;
	
	public Calculator(int param1, int param2, int operation) {
		this.param1 = param1;
		this.param2 = param2;
		this.operation = operation;
	}

	public int getParam1() {
		return param1;
	}

	public void setParam1(int param1) {
		this.param1 = param1;
	}

	public int getParam2() {
		return param2;
	}

	public void setParam2(int param2) {
		this.param2 = param2;
	}

	public int getOperation() {
		return operation;
	}

	public void setOperation(int operation) {
		this.operation = operation;
	}
	
	@Override
	public Integer call() throws Exception {
		int retValue = 0;
		switch (this.operation) {
		case 1:
			retValue = this.param1 + this.param2;
			break;
		// Additional cases can be similarly added for 
		//other mathematical operations
			
		default:
			retValue = 0;
			break;
		}
		return Integer.valueOf(retValue) ;
	}
}

Refer line 21 in the ScheduleConcurrencyTest. Here we schedule Calculator for execution after passage of 5 seconds. In line 25 we get the return value from the Calculator which can be used for further processing. The test run output is shown below.

2011-05-13 at 16:48:17 Time Runnable
2011-05-13 at 16:48:17 Time Callable
Waiting for value.
2011-05-13 at 16:48:19 Starting thread pool-1-thread-1
@@@@@@@@@@@@@@@@@
Thread Id: 8
Name: pool-1-thread-1
Group: main

2011-05-13 at 16:48:19 Ending thread pool-1-thread-1
#################
2011-05-13 at 16:48:22 Time Callable Retrieve
Computed Value: 4

Clearly line 4 and 12 of the output show that PoolRunnable instance is invoked after a delay of 2 seconds, and Calculator is invoked after completion of 5 seconds. The schedule example used above is a case where the schedule task/job is to processed on a one time basis only. However in real life scenarios, we might need the ability to trigger a task at regular intervals in time or after a fixed period after the completion of the specific task. The Executors framework provides two methods namely scheduleAtFixedRate and scheduleWithFixedDelay to implement the same.

Let’s take the first example of scheduleAtFixedRate. Refer the test class ScheduleIntervalTest’s source code:

package com.concurrency.schedule.test;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.concurrency.ScheduleJob;

public class ScheduleIntervalTest {

	public static void main(String[] args) {
		ScheduledExecutorService svc = Executors.newScheduledThreadPool(2);
		svc.scheduleAtFixedRate(new ScheduleJob(), 2, 2, TimeUnit.SECONDS);

	}
}

To understand the functionality better, I have created a new Runnable implementation ScheduleJob, here’s
ScheduleJob’s source code:

package com.concurrency;

import java.text.SimpleDateFormat;

public class ScheduleJob implements Runnable {
	
	private static int counter = 0;
	
	public ScheduleJob() {
		
	}

	@Override
	public void run() {
		try {
			if (counter == 5) {
				System.out.println("Exception is thrown!");
				throw new RuntimeException("Job cannot be processed.");
			}
			SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss");
			System.out.println(df.format(new java.util.Date()) + " job ran.");
			counter++;
		} catch(Throwable t) {
			t.printStackTrace();
			counter = 0;
		}
	}
}

The ScheduleJob has been programmed to throw a RuntimeException on its sixth run. The idea is to determine if the schedule job is affected by the exception or not. Here’s the test output run:

2011-05-13 at 17:13:05 job ran.
2011-05-13 at 17:13:07 job ran.
2011-05-13 at 17:13:09 job ran.
2011-05-13 at 17:13:11 job ran.
2011-05-13 at 17:13:13 job ran.
Exception is thrown!
java.lang.RuntimeException: Job cannot be processed.
	at com.concurrency.ScheduleJob.run(ScheduleJob.java:18)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(Unknown Source)
	at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(Unknown Source)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(Unknown Source)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
2011-05-13 at 17:13:17 job ran.
2011-05-13 at 17:13:19 job ran.
:
:
:

The job continues running every 2nd second as defined during the scheduleAtFixedRate method invocation.
Now moving on to the scheduleWithFixedDelay method. For understanding this, I am creating a new Runnable
implementation ScheduleDelayJob. During the thread run, the thread sleeps for a duration which is incremented by one for each subsequent invocation.

package com.concurrency;

import java.text.SimpleDateFormat;

public class ScheduleDelayJob implements Runnable {

	private static int counter = 1;
	
	@Override
	public void run() {
		SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss");
		System.out.println(df.format(new java.util.Date()) + " Starting run.");
		
		try {
			Thread.sleep(counter * 1000);
			counter++;
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(df.format(new java.util.Date()) + " Ending run.");
	}
}

Refer the ScheduleDelayTest test class:

package com.concurrency.schedule.test;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.concurrency.ScheduleDelayJob;

public class ScheduleDelayTest {

	public static void main(String[] args) {
		ScheduledExecutorService svc = Executors.newScheduledThreadPool(2);
		svc.scheduleWithFixedDelay(new ScheduleDelayJob(), 2, 2, TimeUnit.SECONDS);
		
	}
}

The following thread is scheduled to run 2 seconds after completion of the incumbent thread. Here’s the test run output:

2011-05-13 at 17:27:16 Starting run.
2011-05-13 at 17:27:17 Ending run.
2011-05-13 at 17:27:19 Starting run.
2011-05-13 at 17:27:21 Ending run.
2011-05-13 at 17:27:23 Starting run.
2011-05-13 at 17:27:26 Ending run.
2011-05-13 at 17:27:28 Starting run.
2011-05-13 at 17:27:32 Ending run.
2011-05-13 at 17:27:34 Starting run.
2011-05-13 at 17:27:39 Ending run.
2011-05-13 at 17:27:41 Starting run.
:
:
:

Note that the difference between “Starting Run” and “Ending Run” time stamps is increasing incremently by 1 second and that new thread execution is starting exactly 2 seconds after completion of the incumbent thread run.

Finally let’s move on to ExecutorCompletionService. To understand the significance of this class, let’s use a real life scenario. Consider a search engine, a user types in a word to search, the engine queries its database and retrieves matches and displays the end results. Now to speed up this match retrieval process, let’s say that instead of querying a single large database, the engine queries hundreds of small databases. To acheive this in an optimum fashion, instead of one thread running a sequential query on each database, it would make sense to parallelise the individual database query process and consolidate the results sequentially. That is precisely what ExecutorCompletionService endeavors to achieve.

Let’s create SearchFragmentResult class which replicates the individual database search thread. The
SearchFragmentResult class implements Callable interface and is expected to return an integer value representing number of matches. Here’s the source code of the SearchFragmentResult class.

package com.concurrency.exec.comp;

import java.util.concurrent.Callable;

import com.concurrency.utils.ConcurrencyUtils;

public class SearchResultFragment implements Callable<Integer> {

	private boolean throwException = false;
	private int delayValue = 0;
	
	public SearchResultFragment(int delay) {
		this.delayValue = delay;
	}
	
	public SearchResultFragment(int delay, boolean exceptionFlag) {
		this.throwException = exceptionFlag;
		this.delayValue = delay;
	}

	@Override
	public Integer call() throws Exception {
		System.out.println(ConcurrencyUtils.retrieveCurrentDate() + 
			": Starting " + ConcurrencyUtils.getShortThreadInfo());
		if (this.throwException) {
			throw new RuntimeException("Check exception handling.");
		}
		int val = 0;
		if (this.delayValue > 5) {
			val = 10 - this.delayValue;
		} else {
			val = this.delayValue*2;
		}
		Thread.sleep(val * 1000);
		System.out.println(ConcurrencyUtils.retrieveCurrentDate() + 
				": Completing " + ConcurrencyUtils.getShortThreadInfo());
		return this.delayValue;
	}

}

The SearchFragmentResult has two overloaded constructor implementations, one accepting a delay value as input and the other accepting an exception flag value as input along with delay value. The boolean flag is used to inform the class to throw RuntimeException during its thread run. The delay value puts the thread to sleep for a computed duration time. This is just to simulate real life behavior of response time differences. Logic has been added in the run method implementation to ensure varied sleep durations.

Now let’s move on to the test class ExecutorCompletionSvcTest. Refer the source code below:

package com.concurrency.exec.comp.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.concurrency.exec.comp.SearchResultFragment;

public class ExecutorCompletionSvcTest {

	public static void main(String[] args) {
		ExecutorService svc = Executors.newFixedThreadPool(2);
		
		ExecutorCompletionService<Integer> compSvc 
			= new ExecutorCompletionService<Integer>(svc);
		
		List<SearchResultFragment> inputs = new ArrayList<SearchResultFragment>();
		for (int i=0; i<10; i++){
			SearchResultFragment frag = new SearchResultFragment(i);
			inputs.add(frag);
		}
		
		List<Future<Integer>> results = new ArrayList<Future<Integer>>();
		for (SearchResultFragment searchResultFragment : inputs) {
			Future<Integer> future = compSvc.submit(searchResultFragment);
			results.add(future);
		}
		System.out.println("Done submitting all requests!");
		
		boolean notDone = true;
		int result = 0;
		
		while(notDone) {
			boolean status = false;
			for (Future<Integer> future : results) {
				if (!future.isDone()) {
					status = true;
				}
			}
			if (!status) {
				notDone = false;
			} else {
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		
		for (Future<Integer> future : results) {
			int interimVal = 0;
			try {
				interimVal = future.get();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
			System.out.println("Interim Val: " + interimVal); 
			result += interimVal;

		}
		System.out.println("Result Value: " + result);
		svc.shutdown();
	}
}

The test code spawns 10 threads. These threads are submitted to ExecutorCompletionService, refer line 29 in the code. The service processes them and returns the match results as integer value. The value are returned back by the individual thread using the Future interface, refer line . The Future’s state is checked to validate if the processing is complete. This is achieved by invoking Future’s isDone() method, refer line . If completed, the value is retrieved using the get() method. The returned values are added to get the total match count. Here’s the test class run output:

Done submitting all requests!
2011-05-13 at 18:33:56: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:33:56: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:33:56: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:33:56: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:33:58: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:33:58: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:34:00: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:00: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:04: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:34:04: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:34:08: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:08: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:12: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:12: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:14: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:34:14: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:34:15: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:15: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:34:16: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:34:16: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

Interim Val: 0
Interim Val: 1
Interim Val: 2
Interim Val: 3
Interim Val: 4
Interim Val: 5
Interim Val: 6
Interim Val: 7
Interim Val: 8
Interim Val: 9
Result Value: 45

The run output clearly shows that we created a thread pool of 2, which was honored and the threshold was not exceeded. Once all the threads completed their processing, individual values were sought and added together.

There is an alternative and albeit the right way of getting values from Future interfaces. Refer the source code for ExecutorCompletionSvc2Test.

package com.concurrency.exec.comp.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.concurrency.exec.comp.SearchResultFragment;

public class ExecutorCompletionSvc2Test {

	public static void main(String[] args) {
		ExecutorService svc = Executors.newFixedThreadPool(2);
		
		ExecutorCompletionService<Integer> compSvc 
			= new ExecutorCompletionService<Integer>(svc);
		
		List<SearchResultFragment> inputs = new ArrayList<SearchResultFragment>();
		for (int i=0; i<10; i++){
			SearchResultFragment frag = new SearchResultFragment(i);
			inputs.add(frag);
		}
		
		List<Future<Integer>> results = new ArrayList<Future<Integer>>();
		for (SearchResultFragment searchResultFragment : inputs) {
			Future<Integer> future = compSvc.submit(searchResultFragment);
			results.add(future);
		}
		System.out.println("Done submitting all requests!");
		
		int result = 0;
		int counter = 0;
		
		while(counter != 10) {
			try {
				Future<Integer> response = compSvc.take();
				try {
					int interimVal = response.get();
					result += interimVal;
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			counter++;
		}
		
		System.out.println("Result Value: " + result);
		svc.shutdown();
	}
}

Here we use the take() method on ExecutorCompletionService to retrieve the Future reference of the currently completed thread, refer line 39.

So now what happens if an exception is thrown? Refer the source code of two test classes

ExecutorCompletionSvcExceptionTest and ExecutorCompletionSvcException2Test. ExecutorCompletionSvcException2Test is the right way to do things.

package com.concurrency.exec.comp.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.concurrency.exec.comp.SearchResultFragment;

public class ExecutorCompletionSvcExceptionTest {

	public static void main(String[] args) {
		ExecutorService svc = Executors.newFixedThreadPool(2);
		
		ExecutorCompletionService<Integer> compSvc 
			= new ExecutorCompletionService<Integer>(svc);
		
		List<SearchResultFragment> inputs = new ArrayList<SearchResultFragment>();
		for (int i=0; i<10; i++){
			SearchResultFragment frag = null;
			if (i == 5) {
				frag = new SearchResultFragment(i, true);
			} else {
				frag = new SearchResultFragment(i);
			}
			inputs.add(frag);
		}
		
		List<Future<Integer>> results = new ArrayList<Future<Integer>>();

		for (SearchResultFragment searchResultFragment : inputs) {
			Future<Integer> future = compSvc.submit(searchResultFragment);
			results.add(future);
		}
		System.out.println("Done submitting all requests!");
		boolean notDone = true;
		int result = 0;
		
		while(notDone) {
			boolean status = false;
			for (Future<Integer> future : results) {
				if (!future.isDone()) {
					status = true;
				}
			}
			if (!status) {
				notDone = false;
			} else {
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		
		for (Future<Integer> future : results) {
			int interimVal = 11;
			try {
				interimVal = future.get();
				System.out.println("Interim Val: " + interimVal); 
				result += interimVal;

			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				System.out.println("Execution Val: " + interimVal);
				e.printStackTrace();
			}

		}
		System.out.println("Result Value: " + result);
		svc.shutdown();
	}
}
package com.concurrency.exec.comp.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.concurrency.exec.comp.SearchResultFragment;

public class ExecutorCompletionSvcException2Test {

	public static void main(String[] args) {
		ExecutorService svc = Executors.newFixedThreadPool(2);
		
		ExecutorCompletionService<Integer> compSvc 
			= new ExecutorCompletionService<Integer>(svc);
		
		List<SearchResultFragment> inputs = new ArrayList<SearchResultFragment>();
		for (int i=0; i<10; i++){
			SearchResultFragment frag = null;
			if (i == 5) {
				frag = new SearchResultFragment(i, true);
			} else {
				frag = new SearchResultFragment(i);
			}
			inputs.add(frag);
		}
		
		List<Future<Integer>> results = new ArrayList<Future<Integer>>();

		for (SearchResultFragment searchResultFragment : inputs) {
			Future<Integer> future = compSvc.submit(searchResultFragment);
			results.add(future);
		}
		System.out.println("Done submitting all requests!");
		
		int result = 0;
		int counter = 0;
		
		while(counter != 10) {
			try {
				Future<Integer> response = compSvc.take();
				try {
					int interimVal = response.get();
					result += interimVal;
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			counter++;
		}
		
		System.out.println("Result Value: " + result);
		svc.shutdown();
	}
}

Here’s the run output:

Done submitting all requests!
2011-05-13 at 18:43:47: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:43:47: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:47: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:43:47: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:43:49: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:49: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:51: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:43:51: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:43:55: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:55: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:55: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:59: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:59: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:43:59: Starting Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:43:59: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:44:01: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:44:01: Starting Thread Id: 8, Name: pool-1-thread-1, Group: main

2011-05-13 at 18:44:02: Completing Thread Id: 9, Name: pool-1-thread-2, Group: main

2011-05-13 at 18:44:02: Completing Thread Id: 8, Name: pool-1-thread-1, Group: main

Interim Val: 0
Interim Val: 1
Interim Val: 2
Interim Val: 3
Interim Val: 4
Execution Val: 11
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Check exception handling.
	at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
	at java.util.concurrent.FutureTask.get(Unknown Source)
	at com.concurrency.exec.comp.test.ExecutorCompletionSvcExceptionTest.main

(ExecutorCompletionSvcExceptionTest.java:63)
Caused by: java.lang.RuntimeException: Check exception handling.
	at com.concurrency.exec.comp.SearchResultFragment.call(SearchResultFragment.java:26)
	at com.concurrency.exec.comp.SearchResultFragment.call(SearchResultFragment.java:1)
	at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Interim Val: 6
Interim Val: 7
Interim Val: 8
Interim Val: 9
Result Value: 40

As long as the main thread handles the exception, we lose only that worker thread’s computational value, the rest work fine and remain uninterrupted. As a devloper one might need to consider this possibility and design code accordingly.

That’s all for the moment. Kunal, I hope this helps.

Advertisements

15 thoughts on “java.util.concurrent: Executors, Thread Pools(Cache, Fixed, Scheduled), ExecutorCompletionService – A tutorial

  1. http vkontakte audio vk skaver vkontakte ru welcome vk 960 игры vkontakte

    vk сервер
    vk ip
    vk cc kf9qg
    http vk com login php
    vk gifts

  2. It is really nice…..
    I have one situation.Could you please help me in this if you can.
    I have 2 threads, one threads read data from socket and offers into BlockingQueue.This threads creates another thread.2nd Thread polls Queue and parses data..2nd thread continuously parses data inside infinite while loop…
    So problem is if I put Thread.sleep() inside while loop CPU utilization is very lass.Otherwise it is very high.(used P4 processor).
    How can I optimize CPU utilization without using thread.sleep().
    I got some suggestions that thread.sleep() is bad design at all.

    Could you help me to optimize CPU utilization…

    Here is sample code….

    public class SocketThread extends Thread {
    private Set blockinQueue= new HashSet(1, 1);
    private BlockingQueue<Set> bqSocketOutput;

    ProcessThread pThread;

    @Override
    public void run() {
    pThread = new ProcessThread(blockinQueue);
    pThread.start();
    for(long i=0; i<= 30000; i++) {
    System.out.println("SocketThread – Testing" + i);
    blockinQueue.put(setSocketOutput);
    }
    }

    }

    public class ProcessThread extends Thread {

    public ProcessThread(BlockingQueue<Set> blockinQueue) {
    System.out.println(“ProcessThread – Constructor”);
    }

    @Override
    public void run() {
    System.out.println(“ProcessThread – Exectution”);
    while (true) {
    blockinQueue.poll();
    // some parsing work

    //This should be changed..
    try {
    Thread.sleep(1); // FIXME: must need to redesign this logic
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }

    1. Hi Charan,

      As I understand it, you are looking to optimize your CPU utilization. Look at using a Semaphore to control the number of concurrent threads at a given point in time. Refer my blog post for a sample.

  3. This is a most excellent tutorial on the Future, Executors, ExecutorService, ExecutorCompletionService and ExecutionException classes. I just spent the morning going over everything you’ve illustrated here and I have to confess you’ve changed the way I’ll be doing management of Threads here and now and well into the future.

  4. public class ExecutorCompletionSvcException2TestII {

    public static void main(String[] args) {
    ExecutorService svc = Executors.newFixedThreadPool(4);

    ExecutorCompletionService compSvc
    = new ExecutorCompletionService(svc);

    for (int i=0; i<10; i++)
    compSvc.submit(new SearchResultFragment(i, (i==5)));

    System.out.println("Done submitting all requests!");

    int result = 0;
    int counter = 0;

    while(counter != 10) {
    try {
    Future response = compSvc.take();
    try {
    int interimVal = response.get();
    result += interimVal;
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    counter++;
    }

    System.out.println(“Result Value: ” + result);
    svc.shutdown();
    }
    }

  5. Ребенок уговорил приобрести котика домой.
    В результате пришлось его дарить соседям.
    Вдруг оказалось, что у сына на кошачью шерсть аллергия!

  6. Комната 10 м кв в 3-к квартире на 1 этаже 12-этажного панельного дома. За 15000 рублей.
    Комната 18 метров в трехкомнатной квартире за 20000 рублей. 89032968765
    В квартире есть телевизор, интернет, холодильник, плита электрическая, стиральная машина автомат, туалет и ванна раздельные, кухня 8 кв.м. Мебель имеется.
    Предлагаю Вам снять комнату в трехкомнатной квартире с хозяйкой. Проезд возможен от метро Киевская на электричке до станции Толстопальцево, дальше две остановки на автобусе, либо на маршрутном автобусе 510 от метро Юго Западная. Цена указана с коммунальными услугами включительно. Официальная временная регистрация также возможна. Аэропорт Внуково находится неподалеку. Ходит автобус городской и от него.

  7. I enjoyed wasting time with my family, eating a great dinner, and exchanging presents with your family.|No one likes to feel like they are being lectured to in the presentation. Why do concentrate too much of it as an entity, a thing, a proper noun, that might include a capital C? It attacked her lymph glands, her bone, her brains.|You can use fundamental, technical, or sentimental analysis. Remember practice will make you more comfortable. If you put precise numbers personal graphs, you’re making a blunder.|This means that costs can fall and rise steeply and quickly. To accomplish this you have to include enough information to answer all concerns of a journalist. When you have conversations, your words flow without chemicals.|Unbelievably that there is, however, some good news available here. These are things good for your health to realise. Delicious lead to high bills in the final.|Many best-sellers were originally self-published and later on picked up by big houses.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s