Basic Threads
- Create Thread object & execute threadObject.start()
- Care must be taken to call myThread.start() and NOT myThread.run() – which calls the run method in-line, no new thread starts.
- Static method Thread.sleep(millis) sends the current thread to sleep.
Option A (preferred) – Runnable Interface
public class MyRunnable implements Runnable{
public static void main(String[] args){
Thread myThread = new Thread(new MyRunnable());
myThread.start();
}
@Override
public void run() {
// Do Thread Stuff
}
}
Option B – Extend Thread
public class MyThread extends Thread{
public static void main(String[] args){
Thread myThread = new MyThread();
myThread.start();
}
@Override
public void run() {
// Do Thread Stuff
}
}
Executors
The Executors factory creates ExecutorServices that can be used to manage concurrent activity.
Methods for creating executor services thread types
newSingleThreadExecutor() | Single thread |
newFixedThreadPool(threadCnt) | Pool of multiple threads – fixed number of threads |
newCacheThreadPool() | Creates new threads & reuses dormant ones, unmanaged creation, not for long-lived activity (can explode) |
newScheduledThreadPool() | For managing future scheduled / repeating activity |
Types of Activity
If a Callable is executed it could be some time before V is available. Interacting with callables normally involves accepting a Future<V> as a return type. You can
- call myFuture.get() to wait until V become available (current thread hangs indefinitely)
- call myFuture.get(x, TimeUnit.SECONDS) to wait x seconds then throw TimeoutException if V is not yet available
When dealing with executor services
- Execute (Runnable only) – places the Runnable activity on the queue
- Submit (Runnable or Callable) – places the activity on the queue, returns a Future<V> to retrieve output (eventually)
- InvokeAll (Callable only) – tries to perform activity, returns when all complete.
- InvokeAny (Callable only) – tries to perform activity, returns when first completes, tries to cancel remainder.
To halt an executor service call shutdown() service. It will stop accepting requests, tries to finish currently executing, isShutdown() will immediately be true, isTerminated() will be false. When last activity stops, isTerminated() will be true.
ScheduledExecutor
ScheduledExecutorService can be used in a number of ways
- schedule() – call in x secs/mins – returns ScheduledFuture
- scheduleAtFixedRate() – call every x secs/mins, starting in y secs/mins
- scheduleAtFixedDelay() – call x secs/mins after previous execution finishes, starting in y secs/mins
Race Conditions
Two threads executing the same code with shared memory can trample over each other. When multiple threads access the same memory location, need to co-ordinate to ensure only one accesses the resource at the one time and completes its work before other threads get access.
Atomic Objects
Atomic Objects (e.g. AtomicBoolean, AtomicInteger, AtomicLong) have selected protected processing methods intended to use with multiple concurrent threads.
Synchronized Blocks
MyClass myObject = new MyClass()
synchronized(myObject){
… only one thread has access to this code at one time...
}
Threads will queue for access to the synchronized block. Can be slow.
Reentrant Lock
Lock lock = new ReentrantLock();
try{
lock.lock();
… protected processing…
}finally{
lock.unlock()
}
Allows threads to check for a lock before waiting using the lock.tryLock() method.
Cyclic Barrier
var myCyclicBarrier = new CyclicBarrier(3);
myCyclicBarrier.await();
...further processing…
in this case 3 threads have to have called await() before any progresses to the further processing. A variant is available that performs a function when the barrier is broken.
Concurrent Collections
There are a number of specialist concurrent collections. The performance & memory requirements will probably be worse than non-threadsafe equivalents.
- ConcurrentHashMap
- ConcurrentLinkedQueue
- ConcurrentSkipListMap
- ConcurrentSkipListSet
- CopyOnWriteArrayList
- CopyOnWriteArraySet
- LinkedBlockingQueue
SkipList – sorted, like TreeMap, TreeSet
CopyOnWrite – data is copied when references change, original data remains available and unchanged
Synchronize Collections
Can use utility methods on the Collections object to create synchronized versions of common collections.
Set syncSet = Collections.synchronizedSet(basicSet);
Set syncMap = Collections.synchronizedMap(basicMap);
Note iteration is not stable with multiple threads (concurrent modification exception).
Streams
Can convert streams to a parallel stream.
Stream myParallelStream = myStream.parallel();
// convert any stream to parallel stream
Stream myParallelStream = myCollection.parallelStream();
// create parallel stream direct from collections
Reduce & collect only work well for parallel streams if “accumulator” and “combine” operations can operate in any order. If output depends on order of processing then results unpredictable.
In general, where possible stream processing should not include operations that involve state / order of processing.
x → myList.add(x); // should not be used as a stream lamda function. Order is uncertain in parallel operation.
Use stream native approaches instead:
myStream.collect(Collectors.toList());