The term stealing can produce a bit of confusion when you are looking for the best Java Executor suited for your needs, so let's clear one for all the idea with few examples.
Despite how the name suggest, when a Thread in a ForkJoinPool starts performing a task, it is the only one that will execute the job, even if it becomes blocked or stalled during the execution. This is exactly how standard Java threads work: they will perform their run()
method from start until the end.
However, what changes between threads and tasks is the general semantic of the join()
: if a thread of a work-stealing pool joins to another ready task on the same executor, it can perform it, stealing the work associated to the task (which could have been scheduled to other threads, in the meanwhile).
This because tasks can be seen as unit of work, and join()
does not always imply a passive wait (as is for Java threads) but depends on the contextual Executor.
We can show this behavior with a simple scenario based on more single-threaded executor services: if the thread that is running on the executor appends new tasks to the queue and performs a join, it will run them to compute their results only if the executor is a work-stealing pool. Otherwise, it ends with a deadlock.
import org.junit.Test;
import java.util.List;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
public class ExecutorTest {
private final static Logger LOG = Logger.getLogger(ExecutorTest.class.getName());
@Test(timeout = 1000)
public void testWorkStealing() throws Exception {
ExecutorService executorService = Executors.newWorkStealingPool(1);
spawnTasks(executorService);
}
@Test(timeout = 1000)
public void testCachedPool() throws Exception {
ExecutorService executorService = newCachedThreadPool(1);
spawnTasks(executorService);
}
@Test(timeout = 1000)
public void testSingleThread() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
spawnTasks(executorService);
}
@Test(timeout = 1000)
public void testScheduledThread() throws Exception {
ExecutorService executorService = Executors.newScheduledThreadPool(1);
spawnTasks(executorService);
}
private void spawnTasks(ExecutorService executorService) throws InterruptedException, ExecutionException {
LOG.log(Level.INFO, "Thread executing the test: \"{0}\"", Thread.currentThread().getName());
executorService.submit(() -> {
String fatherThread = Thread.currentThread().getName();
LOG.log(Level.INFO, "Thread executing the main task: \"{0}\"", fatherThread);
List<Future> childrenTasks = Stream.generate(() -> executorService.submit(() -> {
String childThread = Thread.currentThread().getName();
LOG.log(Level.INFO, "Thread executing the child: \"{0}\"", childThread);
// Assert that the father performs the tasks of the children
assertEquals(fatherThread, childThread);
})).limit(3).collect(Collectors.toList());
for (Future childrenTask : childrenTasks) {
try {
childrenTask.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
LOG.log(Level.INFO, "All children completed! Thread: \"{0}\"", Thread.currentThread().getName());
}).get();
}
private static ExecutorService newCachedThreadPool(int maxThreadSize) {
return new ThreadPoolExecutor(0, 1,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}
ExecutorTest.testSingleThread [error]
ott 15, 2019 1:58:20 PM ExecutorTest spawnTasks
INFO: Thread executing the test: "Time-limited test"
ott 15, 2019 1:58:20 PM ExecutorTest lambda$spawnTasks$2
INFO: Thread executing the main task: "pool-1-thread-1"
org.junit.runners.model.TestTimedOutException: test timed out after 1000 milliseconds
ExecutorTest.testCachedPool [failed]
ott 15, 2019 1:58:21 PM ExecutorTest spawnTasks
INFO: Thread executing the test: "Time-limited test"
ott 15, 2019 1:58:21 PM ExecutorTest lambda$spawnTasks$2
INFO: Thread executing the main task: "pool-2-thread-1"
ott 15, 2019 1:58:21 PM ExecutorTest lambda$spawnTasks$0
INFO: Thread executing the child: "pool-2-thread-2"
ott 15, 2019 1:58:21 PM ExecutorTest lambda$spawnTasks$0
INFO: Thread executing the child: "pool-2-thread-4"
ott 15, 2019 1:58:21 PM ExecutorTest lambda$spawnTasks$0
INFO: Thread executing the child: "pool-2-thread-3"
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.junit.ComparisonFailure:
Expected :
pool-2-thread-[1]
Actual :
pool-2-thread-[2]
ExecutorTest.testScheduledThread [error]
ott 15, 2019 1:58:21 PM ExecutorTest spawnTasks
INFO: Thread executing the test: "Time-limited test"
ott 15, 2019 1:58:21 PM ExecutorTest lambda$spawnTasks$2
INFO: Thread executing the main task: "pool-3-thread-1"
org.junit.runners.model.TestTimedOutException: test timed out after 1000 milliseconds
ExecutorTest.testWorkStealing [passed]
ott 15, 2019 1:58:22 PM ExecutorTest spawnTasks
INFO: Thread executing the test: "Time-limited test"
ott 15, 2019 1:58:22 PM ExecutorTest lambda$spawnTasks$2
INFO: Thread executing the main task: "ForkJoinPool-1-worker-3"
ott 15, 2019 1:58:22 PM ExecutorTest lambda$spawnTasks$0
INFO: Thread executing the child: "ForkJoinPool-1-worker-3"
ott 15, 2019 1:58:22 PM ExecutorTest lambda$spawnTasks$0
INFO: Thread executing the child: "ForkJoinPool-1-worker-3"
ott 15, 2019 1:58:22 PM ExecutorTest lambda$spawnTasks$0
INFO: Thread executing the child: "ForkJoinPool-1-worker-3"
ott 15, 2019 1:58:22 PM ExecutorTest lambda$spawnTasks$2
INFO: All children completed! Thread: "ForkJoinPool-1-worker-3"