Linkedin Interview Question
Software EngineersCountry: India
Interview Type: Phone Interview
class MultiPutBlockingBoundedQueueImpl<E> implements MultiPutBlockingBoundedQueue<E> {
private E[] buffer;
private int position;
@Override
public void init(int capacity) throws Exception {
if (buffer != null) {
throw new IllegalStateException("Already initialized");
} else if (capacity <= 0) {
throw new IllegalArgumentException("Capacity must be positive number");
} else {
buffer = (E[]) new Object[capacity];
}
}
@Override
public E get() throws Exception {
if (buffer == null) {
throw new IllegalStateException("Not initialized");
}
synchronized (buffer) {
while (position == 0) {
buffer.wait();
}
position--;
return buffer[position + 1];
}
}
@Override
public void put(E obj) throws Exception {
multiput(new ArrayList<E>(){{add(obj);}});
}
@Override
public void multiput(List<E> objs) throws Exception {
if (buffer == null) {
throw new IllegalStateException("Not initialized");
}
synchronized (buffer) {
while (position >= buffer.length - 1 - objs.size()) {
buffer.wait();
}
for (E obj : objs) {
position++;
buffer[position] = obj;
}
buffer.notifyAll();
}
}
}
class MultiPutBlockingBoundedQueueImpl<E> implements MultiPutBlockingBoundedQueue<E> {
private E[] buffer;
private int position;
@Override
public void init(int capacity) throws Exception {
if (buffer != null) {
throw new IllegalStateException("Already initialized");
} else if (capacity <= 0) {
throw new IllegalArgumentException("Capacity must be positive number");
} else {
buffer = (E[]) new Object[capacity];
}
}
@Override
public E get() throws Exception {
if (buffer == null) {
throw new IllegalStateException("Not initialized");
}
synchronized (buffer) {
while (position == 0) {
buffer.wait();
}
position--;
return buffer[position + 1];
}
}
@Override
public void put(E obj) throws Exception {
multiput(new ArrayList<E>(){{add(obj);}});
}
@Override
public void multiput(List<E> objs) throws Exception {
if (buffer == null) {
throw new IllegalStateException("Not initialized");
}
synchronized (buffer) {
while (position >= buffer.length - 1 - objs.size()) {
buffer.wait();
}
for (E obj : objs) {
position++;
buffer[position] = obj;
}
buffer.notifyAll();
}
}
}
public class MultiPutBoundedQueue {
private LinkedList list;
private int maxCapacity;
private final Lock lock = new ReentrantLock();
private final Condition readCondition = lock.newCondition();
private final Condition writeCondition = lock.newCondition();
private final Condition writeCondition2 = lock.newCondition();
private volatile int countDownLatch;
private volatile boolean spaceAvailable = true;
public void init(int capacity) {
assertCapacity(capacity);
list = new LinkedList();
maxCapacity = capacity;
}
private void assertCapacity(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException("Capacity must be positive number");
}
public Object get() throws InterruptedException {
assertInitialized();
lock.lock();
try {
while (list.size() == 0) {
readCondition.await();
}
Object o = list.remove();
if (countDownLatch > 0) {
countDownLatch--;
}
System.out.println("[-1] Read 1, " + list.size());
if (countDownLatch == 0) {
writeCondition2.signalAll();
}
return o;
} finally {
lock.unlock();
}
}
public void put(Object obj) throws InterruptedException {
multiput(Arrays.asList(obj));
}
public void multiput(List objs) throws InterruptedException {
assertInitialized();
assertOverCapacity(objs); // if objs.size() > maxCapacity
lock.lock();
try {
while (!spaceAvailable) {
writeCondition.await();
}
if (list.size() + objs.size() > maxCapacity) {
countDownLatch = list.size() + objs.size() - maxCapacity;
while (countDownLatch > 0) {
spaceAvailable = false;
writeCondition2.await();
}
spaceAvailable = true;
}
list.addAll(objs);
System.out.println(String.format("[%s] Added %s, ", Thread.currentThread().getName(), objs.size()) + list.size());
writeCondition.signalAll();
readCondition.signalAll();
} finally {
lock.unlock();
}
}
private void assertOverCapacity(List objs) {
if (objs.size() > maxCapacity) {
throw new IllegalArgumentException("Max number of items is: " + maxCapacity);
}
}
private void assertInitialized() {
if (list == null) {
throw new IllegalStateException("MultiPutBlockingBoundedQueue has not been initialized with .init(...)");
}
}
public static void main(String[] args) throws Exception {
final MultiPutBoundedQueue multiPutBoundedQueue = new MultiPutBoundedQueue();
multiPutBoundedQueue.init(10);
Thread put = new Thread(new Runnable() {
@Override
public void run() {
try {
int counter = 0;
while (true) {
Thread.currentThread().setName("+1/" + counter++);
multiPutBoundedQueue.put("");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread mput = new Thread(new Runnable() {
@Override
public void run() {
try {
int counter = 0;
while (true) {
Thread.currentThread().setName("+5/" + counter++);
multiPutBoundedQueue.multiput(Arrays.asList(1, 2, 3, 4, 5));
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread read = new Thread(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("-1");
try {
while (true) {
multiPutBoundedQueue.get();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
put.start();
mput.start();
read.start();
}
}
Looked like a simplified version of java.util.concurrent.ArrayBlockingQueue which implements java.util.concurrent.BlockingQueue will suffice. Except there is a twist of multi-put that must be atomic.
Here is an ad-hoc test driver to test (note that driver will hang at the end, because consumer threads will be blocked for producers to insert more elements, but producer threads would have ended):
- scgupta January 31, 2016