Google Interview Question
Software Engineers
Interview Type: Phone Interview
Does this work? Maybe I'm missing something.
t1->{t2, t3, t4}
t2->{t4}
t3->{t4}
t4->{t5}
t4 gets executed before t5, even though t4 depends on t5.
You're absolutely right, there's an issue with the way the algorithm increments the counts. I'll update my answer with a fix.
To be honest, during my interview I took a completely different approach, which was a simple extension of the initial algorithm relying on concurrent sets and latches.
Original code (flawed):
void mark_tasks(vec_task_t & tasks) {
    for (Task * t : tasks) {
        mark_map_t::iterator it;
        if ((it = mark_map.find(t)) == mark_map.end()) {
            mark_map.insert({ t, 0 });
        } else {
            it->second++;
        }
        mark_tasks(t->getDependencies());
    }
}
Improved code:
void mark_tasks(vec_task_t & tasks, vec_task_t::size_type d = 0) {
    for (Task * t : tasks) {
        mark_map_t::iterator it;
        if ((it = mark_map.find(t)) == mark_map.end()) {
            mark_map.insert({ t, d });
        } else {
            it->second += d;
        }
        mark_tasks(t->getDependencies(), d+1);
    }
}
A slightly more optimized version (it only traverses a task's dependencies when that task's count increased, and the counters grow more slowly):
void mark_tasks(vec_task_t & tasks, vec_task_t::size_type d = 1) {
    for (Task * t : tasks) {
        mark_map_t::iterator it;
        if ((it = mark_map.find(t)) == mark_map.end()) {
            it = mark_map.insert({ t, 0 }).first;
        }
        if (it->second < d) {
            it->second = d;
            mark_tasks(t->getDependencies(), d+1);
        }
    }
}
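To check the third version against the counter-example from the top of the thread (t1->{t2, t3, t4}, t2->{t4}, t3->{t4}, t4->{t5}), here is a minimal self-contained sketch. The Task struct is a hypothetical stand-in that holds only an id and its dependencies, mark_map is passed as a parameter instead of being a global, and counter_example_marks is a helper made up for this illustration.

```cpp
#include <cstddef>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for the thread's Task class: an id plus its dependencies.
struct Task {
    int id;
    std::vector<Task*> deps;
};

using mark_map_t = std::unordered_map<Task*, std::size_t>;

// Same logic as the third version above: a task's mark ends up as the length
// of the longest dependency chain leading to it, and its dependencies are
// traversed again only when the mark actually grows.
void mark_tasks(const std::vector<Task*>& tasks, mark_map_t& mark_map, std::size_t d = 1) {
    for (Task* t : tasks) {
        auto it = mark_map.find(t);
        if (it == mark_map.end()) {
            it = mark_map.insert({t, 0}).first;
        }
        if (it->second < d) {
            it->second = d;
            mark_tasks(t->deps, mark_map, d + 1);
        }
    }
}

// Builds the counter-example graph and returns the marks indexed by task id
// (index 0 is unused).
std::vector<std::size_t> counter_example_marks() {
    Task t5{5, {}};
    Task t4{4, {&t5}};
    Task t3{3, {&t4}};
    Task t2{2, {&t4}};
    Task t1{1, {&t2, &t3, &t4}};
    mark_map_t marks;
    mark_tasks({&t1}, marks);
    std::vector<std::size_t> by_id(6, 0);
    for (const auto& entry : marks) {
        by_id[static_cast<std::size_t>(entry.first->id)] = entry.second;
    }
    return by_id;
}
```

With these marks (t1 = 1, t2 = 2, t3 = 2, t4 = 3, t5 = 4), executing from the highest mark down runs t5 first, then t4, then t2 and t3 together, and t1 last, so t4 no longer runs before t5.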
Yes, I think the second version solves the problem. I don't see how the first is supposed to though.
But there is still one problem: *Every* task at level d has to wait for *every* task in level d+1 to finish. This greatly reduces the potential parallelism. Have a look at my implementation which takes a different approach and doesn't have this problem.
The first version represents what I intended to implement with the flawed code. For my test inputs it generates the same ordering as the second version, but under certain conditions it might fragment the ordering more than the second version does; I haven't looked into that. Performance of the second version is definitely better.
I like your approach, and I can definitely see that it achieves a higher level of parallelism (optimal, even). Very cool!
Example set for readers:
1 -> { 2,3,4,5 }
2 -> { 6 }
3 -> { 4 }
4 -> { 6 }
5 -> { 7 }
6 -> { 7 }
We can create a dependency graph, run a topological sort on it, and then run the tasks in parallel.
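As a rough illustration of the topological-sort step, here is a sketch using Kahn's algorithm. Numbering the tasks 0..n-1 and passing the graph as a vector of dependency lists are assumptions made for this example, as is the function name topological_order.

```cpp
#include <cstddef>
#include <queue>
#include <vector>

// deps[i] lists the tasks that task i depends on (each dependency listed once).
// Returns the tasks in an order where every dependency precedes its dependents,
// or an empty vector if the graph contains a cycle.
std::vector<int> topological_order(const std::vector<std::vector<int>>& deps) {
    const std::size_t n = deps.size();
    std::vector<std::vector<int>> dependents(n);  // reverse edges
    std::vector<std::size_t> waiting_on(n);       // unfinished dependencies per task
    std::queue<int> ready;                        // tasks with nothing left to wait for
    for (std::size_t i = 0; i < n; ++i) {
        waiting_on[i] = deps[i].size();
        if (deps[i].empty()) {
            ready.push(static_cast<int>(i));
        }
        for (int d : deps[i]) {
            dependents[static_cast<std::size_t>(d)].push_back(static_cast<int>(i));
        }
    }
    std::vector<int> order;
    while (!ready.empty()) {
        int t = ready.front();
        ready.pop();
        order.push_back(t);
        // Finishing t may release the tasks that were waiting on it.
        for (int p : dependents[static_cast<std::size_t>(t)]) {
            if (--waiting_on[static_cast<std::size_t>(p)] == 0) {
                ready.push(p);
            }
        }
    }
    if (order.size() != n) {
        order.clear();  // some tasks never became ready: there is a cycle
    }
    return order;
}
```

The flat order alone does not say which tasks may overlap. Any tasks that sit in the ready queue at the same time are independent of each other, which is what a parallel runner would exploit.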
Here is a different implementation from the OP's. It has the advantage that all threads are kept as busy as possible.
The basic idea is to create a table (std::unordered_multimap) which maps a task to all the parent tasks which depend on it. In addition, a separate table (std::unordered_map) tracks how many dependencies a task is still waiting for (in real life, this would probably be part of the Task interface). A queue holds all the tasks which are currently available to run.
The thread code follows the Producer-Consumer model, using a std::condition_variable. As soon as a Task's dependencies are completed, it is added to the queue, any waiting threads are signaled, and the task is run when a thread is available.
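#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <initializer_list>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <vector>

class Task {
public:
    virtual void run() = 0;
    virtual std::vector<Task *> & getDependencies() = 0;
};

class TestTask : public Task {
public:
    TestTask(int id, std::initializer_list<Task*> deps = {}) : mId(id), mDeps(deps) {}
    void run() override { std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 1000)); }
    std::vector<Task *> & getDependencies() override { return mDeps; }
    int id() { return mId; }
private:
    int mId;
    std::vector<Task*> mDeps;
};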
std::mutex mMutex;
std::condition_variable mCond;
//Tasks waiting to run, whose dependencies have been fulfilled
std::queue<Task*> mAvailableTasks;
//Maps a task to all the tasks which depend on it
std::unordered_multimap<Task*, Task*> mUnlockLists;
//Maps a task to the number of tasks it is waiting on
std::unordered_map<Task*, int> mLockCounts;
int mTasksToProcess; //Remaining task count
void ThreadFunc(int id)
{
    std::unique_lock<std::mutex> mtx(mMutex);
    while (true)
    {
        //Wait for a task to become available. If finished
        //(mTasksToProcess == 0), exit thread
        while (mAvailableTasks.empty() && mTasksToProcess)
        {
            std::cout << "Thread " << id << " waiting\n";
            mCond.wait(mtx);
        }
        if (!mTasksToProcess)
            break;
        //Grab next available task, unlock mutex, and run it
        Task* pTask = mAvailableTasks.front();
        mAvailableTasks.pop();
        mTasksToProcess--;
        std::cout << "Thread " << id << " running task "
                  << static_cast<TestTask*>(pTask)->id() << '\n';
        mtx.unlock();
        pTask->run();
        mtx.lock();
        std::cout << "Thread " << id << " finished task "
                  << static_cast<TestTask*>(pTask)->id() << '\n';
        //Find all the tasks that depend on the finished task and
        //decrement their lock counts. If a count reaches 0, add
        //that task to the queue and signal all threads
        auto unlocks = mUnlockLists.equal_range(pTask);
        for (; unlocks.first != unlocks.second; unlocks.first++)
        {
            auto count = mLockCounts.find((*unlocks.first).second);
            if (!--count->second)
            {
                mAvailableTasks.push(count->first);
                mCond.notify_all();
            }
        }
    }
    std::cout << "Thread " << id << " finished\n";
}
void PreProcessTask(Task* pTask)
{
    if (mLockCounts.find(pTask) == mLockCounts.end())
    {
        //Add dependency count of task to lockcount table.
        //if it is 0, add it to the waiting Q
        std::vector<Task*> &deps = pTask->getDependencies();
        mLockCounts[pTask] = deps.size();
        if (!deps.size())
            mAvailableTasks.push(pTask);
        //For each dependency, add it to the
        //dependency->dependents mapping, and preprocess it
        for (Task*pDep : deps)
        {
            mUnlockLists.insert({ pDep, pTask });
            PreProcessTask(pDep);
        }
        mTasksToProcess++;
    }
}
int main()
{
    //Each task is constructed with the list of tasks it depends on
    TestTask* t7 = new TestTask(7, {});
    TestTask* t6 = new TestTask(6, {});
    TestTask* t5 = new TestTask(5, {t6});
    TestTask* t4 = new TestTask(4, {t5});
    TestTask* t3 = new TestTask(3, {t6, t7});
    TestTask* t2 = new TestTask(2, {t6});
    TestTask* t1 = new TestTask(1, {t2, t3, t4, t7});
    PreProcessTask(t1);
    //Start three worker threads and wait for all of them to finish
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; i++)
        threads.emplace_back(ThreadFunc, i + 1);
    for (auto& t : threads)
        t.join();
    return 0;
}
Sample output:
Thread 1 running task 6
Thread 2 running task 7
Thread 3 waiting
Thread 1 finished task 6
Thread 1 running task 2
Thread 3 running task 5
Thread 2 finished task 7
Thread 2 running task 3
Thread 3 finished task 5
Thread 3 running task 4
Thread 1 finished task 2
Thread 1 waiting
Thread 2 finished task 3
Thread 2 waiting
Thread 3 finished task 4
Thread 3 running task 1
Thread 2 finished
Thread 1 finished
Thread 3 finished task 1
Thread 3 finished
This is an extended topological sort which keeps track of the different "levels" in the graph. Here's a quick implementation in Python as a generator:
def parallel_schedule(data):
    while True:
        ordered = set(item for item, dep in data.items() if not dep)
        if not ordered:
            break
        yield ' '.join(sorted(ordered))
        data = {item: (dep - ordered) for item, dep in data.items()
                if item not in ordered}
Note that data here is a dictionary which maps each task to a set of its dependencies. I'm also assuming that a task with no dependencies occurs in data as mapped to the empty set (e.g., data = {'A': set(), 'B': set(['A']), ...}).
This works by grabbing those tasks with no dependencies, executing them in parallel, removing them from the graph, and then repeating the process.
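For comparison with the C++ answers in this thread, the same level-by-level idea could be sketched in C++ roughly as follows. The Graph alias and the choice to return the batches as a vector (instead of yielding them one at a time) are assumptions of this sketch, not part of the Python version.

```cpp
#include <map>
#include <set>
#include <string>
#include <vector>

// Maps each task name to the set of task names it depends on. As in the
// Python version, a task without dependencies maps to the empty set.
using Graph = std::map<std::string, std::set<std::string>>;

// Each inner vector is one batch of tasks that can run in parallel.
// The graph is taken by value because it is consumed while scheduling.
// Like the generator, this stops early when no remaining task is free of
// dependencies (for example when the remaining tasks form a cycle).
std::vector<std::vector<std::string>> parallel_schedule(Graph data) {
    std::vector<std::vector<std::string>> batches;
    while (true) {
        // Collect every task that no longer waits on anything.
        std::vector<std::string> ready;
        for (const auto& entry : data) {
            if (entry.second.empty()) {
                ready.push_back(entry.first);
            }
        }
        if (ready.empty()) {
            break;
        }
        // Remove the batch from the graph and from all dependency sets.
        for (const auto& name : ready) {
            data.erase(name);
        }
        for (auto& entry : data) {
            for (const auto& name : ready) {
                entry.second.erase(name);
            }
        }
        batches.push_back(ready);
    }
    return batches;
}
```

For the example set posted earlier in the thread (1 -> {2, 3, 4, 5}, 2 -> {6}, 3 -> {4}, 4 -> {6}, 5 -> {7}, 6 -> {7}, with 7 mapped to the empty set), the batches come out as {7}, {5, 6}, {2, 4}, {3}, {1}. Each batch still has to finish completely before the next one starts.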
I am not sure if my approach is correct, but here is the idea. Since multiple threads are executing, I use an integer releaseCount to check whether all resources have been acquired. If they have, or if the task is not at the top of the stack (according to the problem, all of its dependencies should be executed first), the current task should wait. When its turn comes, it first schedules and executes all of its dependencies, then executes itself. After executing the task, it increments releaseCount so that another task can acquire the resource.
public class TaskSchedulerParallel {
    Set<Task> executed;
    Stack<Task> scheduler;
    int releaseCount;

    //number of parallel nodes
    public TaskSchedulerParallel(int N){
        executed = new HashSet<Task>();
        scheduler = new Stack<Task>();
        releaseCount = N;
    }

    public synchronized void schedule(Task t) throws InterruptedException {
        scheduler.push(t);
        for(Task dep : t.GetDependencies()){
            if(!executed.contains(dep) && !scheduler.contains(dep))
                schedule(dep);
        }
        if(releaseCount == 0 || (!scheduler.isEmpty() && scheduler.peek() != t))
            t.wait();
        releaseCount--;
        scheduler.pop();
        t.Run();
        executed.add(t);
        releaseCount++;
    }
}
There are two solutions.
The first solution is top-down. If task A depends on task B, we say A is a child of B. Preprocess all the tasks to construct the task trees, then make the roots of all the trees children of a virtual root (Task Init). We start from execute(Task Init):
void execute(Task A)
{
    waitForFinish(A.dependencies());
    A.run();
    foreach(Task C in A.children())
        new thread(execute(C));
}
The second solution is the bottom-up way, and needs no preprocessing:
foreach(Task A in Set)
    new thread(execute(A));

void execute(Task A)
{
    foreach(Task B in A.dependencies())
        new thread(execute(B));
    waitForFinish(A.dependencies());
    A.run();
}
NOTE: the check that prevents a task from being executed twice is omitted, since it is trivial to add with a lock or an atomic flag.
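One way the bottom-up pseudocode might look in real C++, including the omitted re-execution check, is sketched below. Several things here are assumptions rather than part of the answer: std::async with shared futures replaces the raw "new thread" calls, the Launcher class and the stand-in Task struct are hypothetical, execute is only ever called from one thread (so the table of started tasks needs no lock), and the graph is assumed to be acyclic.

```cpp
#include <array>
#include <atomic>
#include <future>
#include <map>
#include <vector>

// Hypothetical stand-in task that records how often it ran and whether it
// was started before one of its dependencies had run.
struct Task {
    std::vector<Task*> deps;
    std::atomic<int> runs{0};
    std::atomic<bool> ran_too_early{false};
    void run() {
        for (Task* d : deps) {
            if (d->runs.load() == 0) ran_too_early = true;
        }
        ++runs;
    }
};

class Launcher {
public:
    // Returns a future that becomes ready once t and all of its dependencies have run.
    std::shared_future<void> execute(Task* t) {
        auto it = started_.find(t);
        if (it != started_.end()) return it->second;  // re-execution check
        std::vector<std::shared_future<void>> waits;
        for (Task* d : t->deps) waits.push_back(execute(d));
        std::shared_future<void> done = std::async(std::launch::async, [t, waits] {
            for (const auto& w : waits) w.wait();  // waitForFinish(A.dependencies())
            t->run();
        }).share();
        started_.emplace(t, done);
        return done;
    }
private:
    std::map<Task*, std::shared_future<void>> started_;  // one future per launched task
};

// Runs the thread's example set and reports whether every task ran exactly
// once and only after its dependencies.
bool run_example() {
    std::array<Task, 8> t;  // t[1]..t[7] are used, t[0] is unused
    t[1].deps = {&t[2], &t[3], &t[4], &t[5]};
    t[2].deps = {&t[6]};
    t[3].deps = {&t[4]};
    t[4].deps = {&t[6]};
    t[5].deps = {&t[7]};
    t[6].deps = {&t[7]};
    Launcher launcher;
    launcher.execute(&t[1]).wait();
    for (int i = 1; i <= 7; ++i) {
        if (t[i].runs != 1 || t[i].ran_too_early) return false;
    }
    return true;
}
```

Storing one shared future per task is what stops a task from running twice: tasks 4, 6 and 7 are reached through several parents here, but each is launched only once. Like the pseudocode, this uses one thread per task, so it trades thread count for simplicity.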
This one's a little bit trickier. Let's start out with the same C++ abstract class as before.
And the original solution to this problem:
Now the tricky thing is of course that you can't just slap a thread on the run call and be done. You have to take into account that all dependencies should be executed first. That's not actually an issue, but it requires you to generalize the marked set into something that can be used to order the tasks by how heavily they are depended upon.
The set allows the property task -> bool or task -> [0, 1]. A map however allows the property task -> [0, n].
A map would need to be used instead of the set to get a dependency count. Use a hash map (unordered_map), because it allows relatively fast lookup.
However, this map doesn't allow ordering. To order the tasks by their dependency count you could use a multimap, which allows multiple values for the same key. This is necessary because you could have tasks with the same dependency count. For example, a set of tasks with no dependencies will have the same dependency count.
After ordering the tasks you can batch them up by executing same-keyed tasks in parallel and awaiting their completion, which takes as long as the longest running task for that dependency count. Starting with the highest key you'll execute all tasks which have no dependencies and are the most depended upon. Key 0 will map to tasks that are never depended upon.
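One possible reading of the batching step, as a sketch rather than the original solution: the tasks are re-keyed by count in a std::multimap sorted in descending order, and each group of equal keys is launched with std::async and awaited before the next group starts. Identifying tasks by plain int ids, the run callback, and the name run_in_batches are all assumptions made for this illustration.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <future>
#include <map>
#include <unordered_map>
#include <vector>

// counts maps a task id to its dependency count. run is invoked once per
// task and must be safe to call from several threads at the same time.
// Returns the batches in execution order (ids sorted within each batch).
std::vector<std::vector<int>> run_in_batches(
        const std::unordered_map<int, std::size_t>& counts,
        const std::function<void(int)>& run) {
    // Re-key by count. std::greater puts the highest count first, that is,
    // the tasks that are the most depended upon.
    std::multimap<std::size_t, int, std::greater<std::size_t>> by_count;
    for (const auto& entry : counts) {
        by_count.insert({entry.second, entry.first});
    }

    std::vector<std::vector<int>> batches;
    for (auto it = by_count.begin(); it != by_count.end();) {
        auto range = by_count.equal_range(it->first);  // all tasks sharing this count
        std::vector<int> batch;
        std::vector<std::future<void>> running;
        for (auto t = range.first; t != range.second; ++t) {
            batch.push_back(t->second);
            running.push_back(std::async(std::launch::async, run, t->second));
        }
        for (auto& f : running) {
            f.get();  // barrier: the batch takes as long as its slowest task
        }
        std::sort(batch.begin(), batch.end());
        batches.push_back(batch);
        it = range.second;
    }
    return batches;
}
```

As an illustrative input, take hand-assigned counts for the five-task example at the top of the thread: {1: 1, 2: 2, 3: 2, 4: 3, 5: 4}. The batches are then {5}, {4}, {2, 3}, {1}, so only tasks 2 and 3 run at the same time.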
- John March 07, 2015