Amazon Interview Question
Software Engineer / Developers
Country: India
Interview Type: In-Person
I think initially all threads other than the first should be suspended. As a thread enters the method to read a word, it suspends the previous thread, and when it exits it resumes the next thread:
Keep an array of the threads[8] = {threadId1, threadId2, ..., threadId8};
Thread.suspend(threads[1]);
Thread.suspend(threads[2]);
...
Thread.suspend(threads[7]);
synchronized void readWord(int index)
{
if(index == 0)
Thread.suspend(threads[7]);
else
Thread.suspend(threads[index-1]);
// procedure to read word
if(index == 7)
Thread.resume(threads[0]);
else
Thread.resume(threads[index+1]);
}
Please let me know your views on this.
It is not good practice to suspend threads (in Java, Thread.suspend() and Thread.resume() are deprecated because they are deadlock-prone). It is better to use a synchronization mechanism:
// m semaphores (one for each thread)
semaphore sem[m];
// semaphores for all but 0th thread are initially blocked
sem[0].init(1);
sem[1..m-1].init(0);
int word_count = 0; // shared variable to keep the no of printed words so far
// parallel code
TID = thread_id(); // TID = 0..m-1
while(1) {
sem[TID].wait(); // wait on semaphore
if(word_count < n) { // this code is guaranteed to execute by one thread
print(word[word_count]); // hence no need for critical section
word_count++;
}
sem[(TID+1)%m].signal(); // signal the next thread in a chain
if(word_count == n) // exit gracefully when all words are printed
break;
}
@asm,
Your pseudo code really helped me understand this. I have two doubts about what you wrote:
1) Exit gracefully - Shouldn't we suspend or kill all threads as well in the end?
2) While signalling the next thread, shouldn't the current thread's semaphore be re-initialized to 0, so that next time it will wait at the start of the loop?
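On the second doubt, a small sketch may help, assuming the usual counting-semaphore semantics (here Python's threading.Semaphore stands in for the pseudocode's semaphore; the names first and second are only for illustration). A successful wait already takes the count back to 0, so the thread blocks on its next wait without any re-initialization:

```python
import threading

# Starts at 0, like sem[1..m-1] in the pseudocode above.
sem = threading.Semaphore(0)

sem.release()                          # signal(): count goes 0 -> 1
first = sem.acquire(blocking=False)    # wait(): succeeds, count goes 1 -> 0
second = sem.acquire(blocking=False)   # count is 0 again, a blocking wait would sleep here

print(first, second)
```

On the first doubt, each thread in the pseudocode leaves its loop by itself once all words are printed, so under that reading nothing needs to be killed from outside.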
One possible solution:
- create 'm' synchronization primitives (say, AutoResetEvent), one for each thread
- call it, say, h1, h2, h3..hm
- Initially all the synchronization objects are set to the non-signaled state, except h1. So Thread1 can print the first word.
- Each thread has to wait on one synchronization object. Ex: thread1 will always wait on h1.
- Just before exiting, each thread has to perform this:
* Each thread will signal the next thread's synchronization object. Ex: Once thread1 completes its task, it signals h2, so the next thread can get access.
* It resets its own synchronization object. Ex: thread1 will reset h1 (if not AutoResetEvent)
- Hence, each thread should get 2 synchronization objects:
* one object to wait on
* the next synchronization object to signal. Ex: Thread1 will get (h1 and h2); it waits on h1 and signals h2.
- Thread m will wait on hm, and once it completes printing it signals h1, so the first thread can gain access.
- Each thread will pick the next word and print it. Make this code thread-safe (Ex: use a monitor/critical section).
- A thread is finished when there are no more words to print. So each worker thread has to wait for its synchronization object; once it acquires it, it can check if there is a word to print. If not, it can signal the next object and come out of the loop.
- The main thread will wait for the worker threads to complete their tasks (Join).
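The steps above could be sketched roughly as follows. This is only an illustration in Python, not the .NET AutoResetEvent version the post has in mind: threading.Event is a manual-reset object, so each thread clears its own event itself. The names print_in_turns, state and printed are made up for the sketch.

```python
import threading

def print_in_turns(words, m):
    """Return (thread_index, word) pairs in the order they were printed."""
    events = [threading.Event() for _ in range(m)]   # h1..hm
    events[0].set()                                   # only h1 starts signaled
    state = {"next": 0}                               # index of the next word
    printed = []

    def worker(tid):
        mine, nxt = events[tid], events[(tid + 1) % m]
        while True:
            mine.wait()        # wait on this thread's own object
            mine.clear()       # manual reset, since Event is not auto-reset
            if state["next"] >= len(words):
                nxt.set()      # no words left: let the next thread notice too
                return
            word = words[state["next"]]
            state["next"] += 1
            printed.append((tid, word))
            print(tid, word)
            nxt.set()          # hand the turn to the next thread

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(m)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()               # main thread waits for the workers
    return printed

result = print_in_turns("the quick brown fox jumps".split(), 3)
```

In this sketch only the thread whose event is set ever touches the shared state, so the hand-off itself serializes access; an explicit lock around the word pick-up would still be a reasonable safeguard.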
I like asm's solution. pthread pseudo code is below.
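#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#define NUM_THREADS 8
sem_t sem[NUM_THREADS]; /* global, so each thread only needs its index */
void *runme(void *arg);
int main()
{
    pthread_t thread[NUM_THREADS];
    /* initialise every semaphore before any thread can post to it */
    for(int i=0; i<NUM_THREADS; i++) {
        sem_init(&sem[i], 0, !i); /* pshared = 0; only sem[0] starts at 1 */
    }
    for(int i=0; i<NUM_THREADS; i++) {
        pthread_create(&thread[i], NULL, runme, (void *)(intptr_t)i);
    }
    for(int i=0; i<NUM_THREADS; i++) {
        pthread_join(thread[i], NULL);
    }
    return 0;
}
void *runme(void *arg) {
    int i = (int)(intptr_t)arg;
    sem_wait(&sem[i]);
    /* pseudocode: if (!end of file) read and print word; */
    sem_post(&sem[(i+1)%NUM_THREADS]); /* wrap around so thread 7 signals thread 0 */
    return NULL;
}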
void *runme(void *arg) {
<sem, i> =get_params(arg);
while(true) {
sem_wait(&sem[i]);
if(end of file) break;
read and print word;
sem_post(&sem[(i+1)%NUM_THREADS]);
}
}
But I have read that mutexes are used to synchronise threads and semaphores are used to synchronise processes. You are dealing with threads here, so you should use mutexes. Am I correct? Please correct me if I am wrong.
Your code will result in deadlock: once the file has been read, you break out of the loop while multiple threads are still waiting in their sem_wait(). To avoid this, you should use a flag, say bEnd=true. When one thread finds the end of file, it should set this flag to false and do a sem_post for all the remaining threads. When the other threads also find the EOF, they should check the value of bEnd and, if it is set to false, return.
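A rough Python sketch of that fix, under some assumptions: the "file" is just a list of words, the flag is called finished here (with the opposite polarity of bEnd), and run_ring, state and out are illustrative names, not anything from the pthread code above.

```python
import threading

def run_ring(words, num_threads):
    sems = [threading.Semaphore(1 if i == 0 else 0) for i in range(num_threads)]
    state = {"pos": 0, "finished": False}    # "finished" plays the role of the flag
    out = []

    def runme(i):
        while True:
            sems[i].acquire()
            if state["finished"]:
                return                        # woken up only to exit
            if state["pos"] == len(words):    # "end of file"
                state["finished"] = True
                for s in sems:                # wake every thread still waiting
                    s.release()
                return
            out.append((i, words[state["pos"]]))
            state["pos"] += 1
            sems[(i + 1) % num_threads].release()

    threads = [threading.Thread(target=runme, args=(i,)) for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=5)                     # a hang would show up as a live thread
    return out, all(not t.is_alive() for t in threads)

out, all_exited = run_ring(["a", "b", "c"], 8)
print(out, all_exited)
```

With 8 threads and 3 words, threads 4 to 7 never print anything; they exit only because the thread that detects the end posts every semaphore.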
In this question, even though there are multiple threads, it is still a sequential program. So we can use a token to guarantee the order. Here is a Java implementation, which I have tested:
public class PrintThread {
boolean [] start; // an array that passes the token (a single int could represent it instead)
volatile int pointer ; // the current position in the paragraph; volatile so updates are visible to all threads
String [] para; // the paragraph
int speed ; // the waiting time for each thread before printing
ReadThread []rt; // the threads that print out the words
public PrintThread (int n, int speed){
start = new boolean [n];
for (int i = 1 ; i < start.length; i++){
start[i]= false;
}
start [0] = true;
pointer = 0;
generatePara(30);
this.speed= speed;
rt = new ReadThread [n];
for (int i = 0 ; i < start.length ; i++){
rt[i] = new ReadThread (i);
}
}
// generate a paragraph for testing
private void generatePara(int n) {
para = new String [n];
for (int i = 0 ; i < n ; i++){
para [i] = " "+i;
}
}
// start the simulation
public void startSim (){
for (int i = 0 ; i < start.length ; i++){
rt[i].start();
}
return;
}
// each thread that can print out the words
public class ReadThread extends Thread {
int index ; // the thread id
public ReadThread (int n){
index = n;
}
@Override
public void run() {
while (true){
try {
Thread.sleep(speed); // sleep for a while before printing
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if (pointer == para.length){ // if it is the end of the para just stop
return;
}
if (start[index]){ // if the token is passed to me
if (pointer != para.length){ // test again if the current pointer is out of bound
// because it is possible that pointer is increased between this test and last test
System.out.println("I'm "+index+" : "+para[pointer]); // print out
}
pointer ++; // increase the pointer
int next;
if (index == start.length -1){
next = 0;
} else {
next = index +1;
}
start [index] = false; // pass the token to next
start [next] = true;
}
}
}
}
public static void main (String args []){
PrintThread pt = new PrintThread (5, 1000);
pt.startSim();
}
}
Can we not just have a queue for each thread? When a thread is 'done' it calls some 'ScheduleNextEvent()' method that enqueues the next task for the next thread. The ScheduleNextEvent() and the enqueue methods need to be threadsafe, which is quite easy to achieve.
I agree this isn't very efficient, but it's pretty clean. Besides, the question itself doesn't make much practical sense and so, I think the solution doesn't really need to be practical either.
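A minimal sketch of the queue idea, with assumptions spelled out: each task is simply the index of the next word, schedule_next_event is a hypothetical helper named after the post, and None is used as a stop marker (the post does not say how the threads should finish). Python's queue.Queue is already thread-safe, which covers the enqueue requirement.

```python
import queue
import threading

def print_with_queues(words, m):
    queues = [queue.Queue() for _ in range(m)]    # one task queue per thread
    out = []

    def schedule_next_event(tid, word_index):
        # Enqueue the next task on the next thread's queue.
        queues[(tid + 1) % m].put(word_index)

    def worker(tid):
        while True:
            word_index = queues[tid].get()        # block until a task arrives
            if word_index is None or word_index >= len(words):
                queues[(tid + 1) % m].put(None)   # pass the stop marker along
                return
            out.append((tid, words[word_index]))
            print(tid, words[word_index])
            schedule_next_event(tid, word_index + 1)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(m)]
    for t in threads:
        t.start()
    queues[0].put(0)                              # first task goes to the first thread
    for t in threads:
        t.join()
    return out

order = print_with_queues(["one", "two", "three", "four"], 3)
```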
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Words implements Runnable{
private ArrayBlockingQueue<String>paragraph=new ArrayBlockingQueue<String>(10);
private CountDownLatch latch;
public Words(CountDownLatch latch)
{
this.latch=latch;
String [] s="This is the paragraph in order".split(" ");
for(String i : s)
paragraph.add(i);
}
public void run()
{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(!paragraph.isEmpty())
System.out.println(paragraph.poll()+" "+Thread.currentThread());
latch.countDown();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String [] args) throws InterruptedException
{
Scanner scan=new Scanner(System.in);
ExecutorService execute=Executors.newFixedThreadPool(4);
CountDownLatch latch=new CountDownLatch(4);
final Words cards=new Words(latch);
for(int i=0;i<cards.paragraph.size();i++)
execute.submit(cards);
execute.shutdown();
}
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class Grace {
public static void main(String[] args) {
List<String> lst = Arrays.asList("first","second","third");
int noOfThread = 2;
Semaphore s = new Semaphore(1);
ExecutorService executor=Executors.newFixedThreadPool(noOfThread);
for(int i=0;i<lst.size(); i++){
CyclePrint t3 = new CyclePrint(s,lst);
executor.execute(t3);
}
executor.shutdown();
}
}
class CyclePrint extends Thread{
Semaphore s;
List<String> lst;
static AtomicInteger cnt = new AtomicInteger(0);
static AtomicBoolean done = new AtomicBoolean(false);
public CyclePrint(Semaphore s, List<String> lst) {
this.s = s;
this.lst = lst;
}
public void run(){
if(!done.get()){
try {
s.acquire();
System.out.println("Thread "+Thread.currentThread().getName()+" "+lst.get(cnt.getAndIncrement()));
if(cnt.get()==lst.size()) done.set(true);
s.release();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class Grace {
public static void main(String[] args) {
List<String> lst = Arrays.asList("1","2","3","4","5","6","7","8","9","10");
int noOfThread = 2;
Semaphore s = new Semaphore(1);
// ExecutorService executor=Executors.newFixedThreadPool(noOfThread);
// for(int i=0;i<lst.size(); i++){
// CyclePrint t3 = new CyclePrint(s,lst);
// executor.execute(t3);
// }
// executor.shutdown();
CountDownLatch latch = new CountDownLatch(noOfThread);
ExecutorService executor=Executors.newFixedThreadPool(noOfThread);
for(int i=0;i<lst.size(); i++){
CyclePrintLatch t3 = new CyclePrintLatch(latch,lst);
executor.execute(t3);
}
executor.shutdown();
}
}
class CyclePrintLatch extends Thread{
CountDownLatch s;
List<String> lst;
static AtomicInteger cnt = new AtomicInteger(0);
static AtomicBoolean done = new AtomicBoolean(false);
public CyclePrintLatch(CountDownLatch s, List<String> lst) {
this.s = s;
this.lst = lst;
}
public void run(){
if(!done.get()){
try {
System.out.println("Thread "+Thread.currentThread().getName()+" "+lst.get(cnt.getAndIncrement()));
if(cnt.get()==lst.size()) done.set(true);
s.countDown();
s.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
class CyclePrint extends Thread{
Semaphore s;
List<String> lst;
static AtomicInteger cnt = new AtomicInteger(0);
static AtomicBoolean done = new AtomicBoolean(false);
public CyclePrint(Semaphore s, List<String> lst) {
this.s = s;
this.lst = lst;
}
public void run(){
if(!done.get()){
try {
s.acquire();
System.out.println("Thread "+Thread.currentThread().getName()+" "+lst.get(cnt.getAndIncrement()));
if(cnt.get()==lst.size()) done.set(true);
s.release();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
#include<iostream>
#include<mutex>
#include<string>
#include<thread>
#include<vector>
#include<condition_variable>
using namespace std;
mutex mu;
int current = 0;
condition_variable cond;
int i=0;
void printWord(int num, int M, int N, vector<string> str)
{
unique_lock<mutex> lock(mu);
while(i<N) {
if ( num == current)
{
cout<<"Printing by Thread :"<<num<<' '<<str[i]<<endl;
current = (current+1) % M;
i++;
}
else
cond.wait(lock);
cond.notify_all();
}
}
int main()
{
int M, N;
cin>>M;
cin>>N;
vector<string> str;
for(int j=0;j<N;j++)
{
string s;
cin>>s;
str.push_back(s);
}
vector<thread> threads; // a variable-length array of std::thread is not standard C++
for(int i=0;i<M;i++)
threads.emplace_back(printWord, i, M, N, str);
for(int i=0; i<M;i++)
threads[i].join();
cout<<"Task completed : Back to main"<<endl;
return 0;
}
import threading


class PrintPara:
    def __init__(self, paragraph, m):
        self.paragraph_words = paragraph.split(' ')
        self.n = len(self.paragraph_words)
        self.idx = 0
        self.m = m
        # one semaphore per thread; only thread 0's starts released
        self.semaphores = [threading.Semaphore(0) for _ in range(m)]
        self.semaphores[0].release()

    def print_word(self):
        TID = int(threading.current_thread().name)
        while True:
            self.semaphores[TID].acquire()
            if self.idx < self.n:
                print(self.paragraph_words[self.idx] + " printed by " + str(TID))
                self.idx += 1
            done = self.idx >= self.n
            # always signal the next thread, even when finished, so that no
            # thread stays blocked in acquire() after the last word
            self.semaphores[(TID + 1) % self.m].release()
            if done:
                break


if __name__ == '__main__':
    m = 3
    para_obj = PrintPara("I am helping myself to get rid off these imaginary world", m)
    for i in range(m):
        t = threading.Thread(target=para_obj.print_word, name=str(i))
        t.start()
With Java you cannot command or make a particular thread run. You can only use thread communication tools like synchronized blocks, locks, semaphores etc. and shared objects. So what I would do is:
- suchi August 15, 2012
1) initialize each thread (or runnable) with a unique number, e.g. T1 - 1, T2 - 2 etc.
2) make all the threads wait on a common lock L
3) set a shared integer to a desired value, eg. i = 3
4) release all of the threads at once
5) have each thread check what's the current number. If it is the same as the thread's number, carry on, if not - sleep again (wait on the lock)
Be sure to synchronize access to i properly.
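These steps might look like the sketch below. It is written in Python rather than Java to stay short, and it assumes threads are numbered from 0 and the shared integer starts at 0; run_in_turns, state and out are illustrative names. threading.Condition plays the role of the common lock L, and notify_all releases all waiting threads at once.

```python
import threading

def run_in_turns(words, m):
    cond = threading.Condition()        # the common lock L
    state = {"turn": 0, "pos": 0}       # the shared integer, plus the word position
    out = []

    def worker(number):
        with cond:                      # all access to state happens under the lock
            while True:
                # Sleep until it is this thread's turn or nothing is left to print.
                cond.wait_for(lambda: state["turn"] == number
                              or state["pos"] >= len(words))
                if state["pos"] >= len(words):
                    return
                out.append((number, words[state["pos"]]))
                print(number, words[state["pos"]])
                state["pos"] += 1
                state["turn"] = (number + 1) % m
                cond.notify_all()       # wake everyone; only one number will match

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(m)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out

turns = run_in_turns(["a", "b", "c", "d", "e"], 2)
```

Compared with one semaphore per thread, this wakes every waiter on each turn, which is simpler to reason about but does more work per word.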