Java: Concurrency

Basic Threads

  • Create Thread object & execute threadObject.start()
  • Call myThread.start() and NOT myThread.run() – run() simply executes the method in-line on the calling thread, so no new thread starts.
  • Static method Thread.sleep(millis) pauses the current thread for the given number of milliseconds.
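
The difference between start() and run() is easy to see by recording which thread actually executes the task. The sketch below is illustrative only: the class name StartVsRunDemo and the thread names are made up for this example.

```java
// Hypothetical demo class; names are illustrative only.
class StartVsRunDemo {

    // Runs one task via run() and another via start();
    // returns the name of the thread that executed each.
    static String[] observeThreadNames() throws InterruptedException {
        String[] names = new String[2];

        Thread inline = new Thread(
                () -> names[0] = Thread.currentThread().getName(), "worker-inline");
        inline.run();    // plain method call: executes on the calling thread

        Thread started = new Thread(
                () -> names[1] = Thread.currentThread().getName(), "worker-started");
        started.start(); // run() is executed on a new thread
        started.join();  // wait for it to finish before reading names[1]

        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] names = observeThreadNames();
        System.out.println("run()   executed on: " + names[0]);
        System.out.println("start() executed on: " + names[1]);
    }
}
```

The first name is whatever thread called observeThreadNames(); only the second is the newly created worker.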

Option A (preferred) – Runnable Interface

public class MyRunnable implements Runnable{

	public static void main(String[] args){
		Thread myThread = new Thread(new MyRunnable());
		myThread.start();
	}

	@Override
	public void run() {
		// Do Thread Stuff
	}
}

Option B – Extend Thread

public class MyThread extends Thread{

	public static void main(String[] args){
		Thread myThread = new MyThread();
		myThread.start();
	}

	@Override
	public void run() {
		// Do Thread Stuff
	}
}

Executors

The Executors factory creates ExecutorServices that can be used to manage concurrent activity.

Factory methods and the kind of thread pool each creates

newSingleThreadExecutor() – Single thread
newFixedThreadPool(threadCnt) – Pool of multiple threads, fixed number of threads
newCachedThreadPool() – Creates new threads & reuses dormant ones, unmanaged creation, not for long-lived activity (can explode)
newScheduledThreadPool(corePoolSize) – For managing future scheduled / repeating activity

Types of Activity

  • Runnable interface: “run” method, returns void
  • Callable<V> interface: “call” method, returns V and may throw a checked exception

A Callable may take some time to produce V, so submitting one normally gives you a Future<V> rather than the value itself. You can

  • call myFuture.get() to wait until V becomes available (the current thread blocks with no time limit)
  • call myFuture.get(x, TimeUnit.SECONDS) to wait up to x seconds; it throws TimeoutException if V is not yet available
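
A minimal sketch of the timed get() follows. The class name FutureDemo, the helper resultOrFallback and the value 42 are assumptions made for illustration; the executor and Future calls are the standard java.util.concurrent API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names are illustrative only.
class FutureDemo {

    // Submits a Callable that takes taskMillis to produce 42 and waits at most
    // timeoutMillis for it. Returns the fallback if the result is not ready in time.
    static int resultOrFallback(long taskMillis, long timeoutMillis, int fallback)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                Thread.sleep(taskMillis); // simulate slow work
                return 42;
            };
            Future<Integer> future = executor.submit(task);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task, we no longer want the result
                return fallback;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(resultOrFallback(10, 2000, -1));  // task finishes well inside the timeout
        System.out.println(resultOrFallback(2000, 50, -1));  // task is too slow, fallback is returned
    }
}
```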

When dealing with executor services

  • execute (Runnable only) – places the Runnable activity on the queue, returns nothing
  • submit (Runnable or Callable) – places the activity on the queue, returns a Future<V> to retrieve output (eventually)
  • invokeAll (collection of Callables) – runs all the activities, blocks until all have completed, returns a List<Future<V>>
  • invokeAny (collection of Callables) – runs the activities, blocks until the first one completes successfully, returns its result (not a Future) and cancels the remainder
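
The two bulk methods can be sketched as below. SubmissionDemo, sampleTasks and the pool size of 3 are assumptions for this example, not anything prescribed by the API.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names are illustrative only.
class SubmissionDemo {

    // Three trivial tasks returning 1, 2 and 3.
    static List<Callable<Integer>> sampleTasks() {
        return List.of(() -> 1, () -> 2, () -> 3);
    }

    // invokeAll blocks until every task is done, so each get() returns at once.
    static int sumWithInvokeAll(List<Callable<Integer>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            int sum = 0;
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } finally {
            executor.shutdown();
        }
    }

    // invokeAny returns the value of whichever task completes successfully first.
    static int firstWithInvokeAny(List<Callable<Integer>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            return executor.invokeAny(tasks);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("invokeAll sum: " + sumWithInvokeAll(sampleTasks()));
        System.out.println("invokeAny result: " + firstWithInvokeAny(sampleTasks()));
    }
}
```

Which value invokeAny returns depends on scheduling, so callers should only rely on it being the result of one of the tasks.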

To halt an executor service, call its shutdown() method. The service stops accepting new requests but still runs the activities already submitted. isShutdown() is true immediately, while isTerminated() stays false until the last activity stops, at which point it becomes true.
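
The sequence of states can be observed with a task that is deliberately kept running. In this sketch the class ShutdownDemo is hypothetical, and the CountDownLatch and awaitTermination() call are additions used only to make the timing deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class ShutdownDemo {

    // Returns { isShutdown() right after shutdown(),
    //           isTerminated() while a task is still running,
    //           isTerminated() after the task has finished }.
    static boolean[] shutdownStates() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // The task blocks until we release it, so it is still active at shutdown().
        executor.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.shutdown();
        boolean shutdownNow = executor.isShutdown();
        boolean terminatedEarly = executor.isTerminated();

        release.countDown();                             // let the task finish
        executor.awaitTermination(5, TimeUnit.SECONDS);  // wait for the service to wind down
        boolean terminatedLate = executor.isTerminated();

        return new boolean[] { shutdownNow, terminatedEarly, terminatedLate };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] states = shutdownStates();
        System.out.println("isShutdown after shutdown():  " + states[0]);
        System.out.println("isTerminated while running:   " + states[1]);
        System.out.println("isTerminated after finishing: " + states[2]);
    }
}
```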

ScheduledExecutor

ScheduledExecutorService can be used in a number of ways

  • schedule() – call once in x secs/mins – returns ScheduledFuture
  • scheduleAtFixedRate() – call every x secs/mins, starting in y secs/mins
  • scheduleWithFixedDelay() – call x secs/mins after previous execution finishes, starting in y secs/mins
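
A short sketch of a one-shot and a repeating schedule is given below. SchedulingDemo and its two helper methods are made-up names, and the 5 second cap on waiting is an arbitrary choice for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class SchedulingDemo {

    // One-shot: runs the task once after delayMillis and returns its result.
    static String runOnce(long delayMillis) throws InterruptedException, ExecutionException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } finally {
            scheduler.shutdown();
        }
    }

    // Repeating: returns true if the task ran at least `runs` times within 5 seconds.
    static boolean runRepeatedly(int runs, long periodMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            CountDownLatch remaining = new CountDownLatch(runs);
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                    remaining::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            boolean completed = remaining.await(5, TimeUnit.SECONDS);
            handle.cancel(false); // a repeating task runs until cancelled or the service stops
            return completed;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("one-shot result: " + runOnce(50));
        System.out.println("ran 3 times: " + runRepeatedly(3, 20));
    }
}
```

Swapping scheduleAtFixedRate for scheduleWithFixedDelay (same argument list) measures the gap from the end of one run to the start of the next instead of from start to start.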

Race Conditions

Two threads executing the same code against shared memory can trample over each other. When multiple threads access the same memory location, they need to co-ordinate so that only one accesses the resource at a time and completes its work before the other threads get access.

Atomic Objects

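Atomic objects (e.g. AtomicBoolean, AtomicInteger, AtomicLong) provide methods such as incrementAndGet() and compareAndSet() that perform a read-modify-write as a single indivisible step, so they are safe to call from multiple concurrent threads without extra locking.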
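
The sketch below puts a plain int and an AtomicInteger through the same concurrent increments. CounterRaceDemo and countWith are hypothetical names; the plain counter is there only to show the race and its final value is not predictable.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
class CounterRaceDemo {

    // Returns { final value of the plain counter, final value of the atomic counter }.
    static int[] countWith(int threadCount, int incrementsPerThread) throws InterruptedException {
        int[] plain = new int[1];                  // unprotected shared state
        AtomicInteger atomic = new AtomicInteger();

        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    plain[0]++;               // read, add, write: updates can be lost
                    atomic.incrementAndGet(); // one indivisible step
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return new int[] { plain[0], atomic.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] totals = countWith(4, 100_000);
        System.out.println("plain int:     " + totals[0] + " (may be less than 400000)");
        System.out.println("AtomicInteger: " + totals[1]);
    }
}
```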

Synchronized Blocks

MyClass myObject = new MyClass();
synchronized(myObject){
  // ... only one thread at a time can execute this block while holding myObject's lock ...
}

Threads queue for access to the synchronized block, so heavy contention can make it slow.
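
As a rough illustration, a counter can guard its state with a private lock object. The class SynchronizedCounter and its methods are invented for this sketch.

```java
// Hypothetical demo class; names are illustrative only.
class SynchronizedCounter {

    private final Object monitor = new Object(); // private lock object
    private int count;

    void increment() {
        synchronized (monitor) { // only one thread at a time gets past this point
            count++;
        }
    }

    int get() {
        synchronized (monitor) { // reads take the same lock so they see the latest value
            return count;
        }
    }

    // Increments one shared counter from several threads and returns the total.
    static int countWith(int threadCount, int incrementsPerThread) throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("total: " + countWith(4, 100_000));
    }
}
```

Locking on a private object rather than on this keeps outside code from taking the same lock by accident.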

Reentrant Lock

Lock lock = new ReentrantLock();
lock.lock();  // acquire before the try, so finally never unlocks a lock that was not obtained
try{
  // ... protected processing ...
}finally{
  lock.unlock();
}

Unlike a synchronized block, a thread does not have to wait: lock.tryLock() acquires the lock only if it is free and otherwise returns false immediately.
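
A small sketch of tryLock() from a second thread while the lock is held, and again after it is released. TryLockDemo and tryWhileHeld are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; names are illustrative only.
class TryLockDemo {

    // Returns { tryLock() result while another thread holds the lock,
    //           tryLock() result once the lock is free }.
    static boolean[] tryWhileHeld() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] results = new boolean[2];

        lock.lock();
        try {
            // A different thread asks for the lock while this thread holds it.
            Thread other = new Thread(() -> {
                results[0] = lock.tryLock(); // does not wait
                if (results[0]) {
                    lock.unlock();
                }
            });
            other.start();
            other.join();
        } finally {
            lock.unlock();
        }

        // The lock is free now, so tryLock() succeeds.
        results[1] = lock.tryLock();
        if (results[1]) {
            lock.unlock();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] results = tryWhileHeld();
        System.out.println("acquired while held: " + results[0]);
        System.out.println("acquired when free:  " + results[1]);
    }
}
```

There is also a timed form, tryLock(time, unit), which waits up to the given time before giving up.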

Cyclic Barrier

var myCyclicBarrier = new CyclicBarrier(3);

myCyclicBarrier.await();

// ... further processing ...

In this case 3 threads must have called await() before any of them proceeds to the further processing. A second constructor, CyclicBarrier(parties, barrierAction), additionally runs a Runnable once each time the barrier is tripped, before the waiting threads are released.
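
The barrier-action variant could look like the sketch below. BarrierDemo, runParties and the counters are illustrative names introduced for the example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
class BarrierDemo {

    // Returns { number of arrivals seen by the barrier action,
    //           number of threads that got past the barrier }.
    static int[] runParties(int parties) throws InterruptedException {
        AtomicInteger arrived = new AtomicInteger();
        AtomicInteger seenByAction = new AtomicInteger(-1);
        AtomicInteger passed = new AtomicInteger();

        // The action runs once, after the last party arrives and before any is released.
        CyclicBarrier barrier =
                new CyclicBarrier(parties, () -> seenByAction.set(arrived.get()));

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                arrived.incrementAndGet();
                try {
                    barrier.await();          // blocks until all parties have arrived
                    passed.incrementAndGet(); // further processing
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return new int[] { seenByAction.get(), passed.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = runParties(3);
        System.out.println("arrivals seen by barrier action: " + result[0]);
        System.out.println("threads past the barrier:        " + result[1]);
    }
}
```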

Concurrent Collections

There are a number of specialist concurrent collections. In single-threaded use their performance & memory requirements will usually be worse than those of the non-threadsafe equivalents.

  • ConcurrentHashMap
  • ConcurrentLinkedQueue
  • ConcurrentSkipListMap
  • ConcurrentSkipListSet
  • CopyOnWriteArrayList
  • CopyOnWriteArraySet
  • LinkedBlockingQueue

SkipList – kept sorted, like TreeMap and TreeSet

CopyOnWrite – every modification copies the underlying array; iterators keep reading the original data, which remains available and unchanged
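
Both behaviours can be checked with a few lines. The class ConcurrentCollectionsDemo and its methods are hypothetical, and the example is single-threaded on purpose so that the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; names are illustrative only.
class ConcurrentCollectionsDemo {

    // Adds to the list while iterating over it.
    // Returns { elements visited by the loop, size of the list afterwards }.
    static int[] iterateWhileAdding() {
        List<String> list = new CopyOnWriteArrayList<>(List.of("a", "b", "c"));
        int visited = 0;
        for (String item : list) {    // the iterator works on a snapshot of the array
            list.add(item + "-copy"); // an ArrayList would throw ConcurrentModificationException
            visited++;
        }
        return new int[] { visited, list.size() };
    }

    // A skip-list set keeps its elements in sorted order.
    static List<Integer> sortedView(List<Integer> values) {
        return new ArrayList<>(new ConcurrentSkipListSet<>(values));
    }

    public static void main(String[] args) {
        int[] result = iterateWhileAdding();
        System.out.println("visited " + result[0] + ", final size " + result[1]);
        System.out.println(sortedView(List.of(3, 1, 2)));
    }
}
```

The loop sees only the three original elements even though the list ends up with six, which is the snapshot behaviour described above.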

Synchronized Collections

Static utility methods on the Collections class create synchronized wrappers around common collections.

Set syncSet = Collections.synchronizedSet(basicSet);
Map syncMap = Collections.synchronizedMap(basicMap);

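Note that iteration is not protected by the wrapper. Iterate inside a synchronized block on the wrapped collection itself, otherwise a modification by another thread can cause a ConcurrentModificationException.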
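
A sketch of safe iteration over a synchronized wrapper while another thread is still writing to it. SynchronizedIterationDemo, sum and fillAndSum are names invented for the example.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class; names are illustrative only.
class SynchronizedIterationDemo {

    // Iterates while holding the wrapper's own lock, so writers are kept out.
    static long sum(Set<Integer> syncSet) {
        long total = 0;
        synchronized (syncSet) {
            for (int value : syncSet) {
                total += value;
            }
        }
        return total;
    }

    // A writer thread adds 1..n while this thread iterates; returns the final sum.
    static long fillAndSum(int n) throws InterruptedException {
        Set<Integer> syncSet = Collections.synchronizedSet(new HashSet<>());
        Thread writer = new Thread(() -> {
            for (int i = 1; i <= n; i++) {
                syncSet.add(i); // individual calls are already synchronized by the wrapper
            }
        });
        writer.start();
        sum(syncSet);   // safe even though the writer may still be adding
        writer.join();
        return sum(syncSet);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sum of 1..1000: " + fillAndSum(1000));
    }
}
```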

Streams

Any stream can be converted to a parallel stream.

Stream myParallelStream = myStream.parallel();
      // convert any stream to parallel stream
Stream myParallelStream = myCollection.parallelStream();
      // create parallel stream direct from collections

Reduce & collect only work well for parallel streams if the “accumulator” and “combiner” operations give the same result in whatever order they are applied (they must be associative and stateless). If the output depends on the order of processing, the results are unpredictable.

In general, where possible stream processing should not include operations that involve state / order of processing.

x -> myList.add(x); // should not be used as a stream lambda function. In parallel operation the order is uncertain and a non-threadsafe list can be corrupted.

Use stream native approaches instead:

myStream.collect(Collectors.toList());
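
A minimal sketch of the stream-native approach on a parallel stream. ParallelStreamDemo and its two methods are hypothetical names; squaring and summing are arbitrary example operations.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; names are illustrative only.
class ParallelStreamDemo {

    // The collector gathers the results itself; no shared list is mutated by the lambda.
    static List<Integer> squares(List<Integer> input) {
        return input.parallelStream()
                .map(x -> x * x)
                .collect(Collectors.toList());
    }

    // Integer::sum is associative and stateless, so it is safe to reduce in parallel.
    static int sum(List<Integer> input) {
        return input.parallelStream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);
        System.out.println(squares(input));
        System.out.println(sum(input));
    }
}
```

Because the source list is ordered, collect(Collectors.toList()) returns the results in the source order even though the work was split across threads.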