multithreading - How to implement a bounded buffer in Dart?
How can I solve the producer-consumer problem in Dart?
Is it possible to implement the same thing as the following Java class in Dart?
    // https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class BoundedBuffer {
      final Lock lock = new ReentrantLock();
      final Condition notFull = lock.newCondition();
      final Condition notEmpty = lock.newCondition();

      final Object[] items = new Object[100];
      int putptr, takeptr, count;

      public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
          // Block the producer while the buffer is full.
          while (count == items.length)
            notFull.await();
          items[putptr] = x;
          if (++putptr == items.length) putptr = 0;
          ++count;
          notEmpty.signal();
        } finally {
          lock.unlock();
        }
      }

      public Object take() throws InterruptedException {
        lock.lock();
        try {
          // Block the consumer while the buffer is empty.
          while (count == 0)
            notEmpty.await();
          Object x = items[takeptr];
          if (++takeptr == items.length) takeptr = 0;
          --count;
          notFull.signal();
          return x;
        } finally {
          lock.unlock();
        }
      }
    }
Since you can't block a function that is trying to add, you need the callers to respect the abstraction. Here [add] returns a future, and if you call it again before that future completes, you are considered a different producer. The buffer stores up to capacity + number of producers entries. This is equivalent to a buffer of size capacity and "number of producers" blocked threads, each with one value waiting to be added.
    import "dart:async";
    import "dart:collection" show Queue;

    class BoundedAsyncBuffer<T> {
      // State is one of:
      // - One or more readers waiting, no elements in the queue.
      // - Between 0 and (capacity - 1) elements in the queue.
      // - (capacity - 1 + n) elements in the queue, n producers waiting.
      final int _capacity;
      final Queue<T> _buffer = Queue<T>(); // Unconsumed elements.
      final Queue<Completer> _waiting = Queue<Completer>(); // "Blocked" producers or consumers.

      BoundedAsyncBuffer(int capacity) : _capacity = capacity;

      Future<void> add(T element) {
        // A reader is already waiting: hand the element over directly.
        if (_buffer.isEmpty && _waiting.isNotEmpty) {
          _waiting.removeFirst().complete(element);
          return Future.value();
        }
        _buffer.add(element);
        // Buffer is full: the producer "blocks" until a reader removes an element.
        if (_buffer.length >= _capacity) {
          var c = Completer<void>();
          _waiting.add(c);
          return c.future;
        }
        return Future.value();
      }

      Future<T> remove() {
        // Nothing to read: the reader "blocks" until a producer adds an element.
        if (_buffer.isEmpty) {
          var c = Completer<T>();
          _waiting.add(c);
          return c.future;
        }
        var result = _buffer.removeFirst();
        // Release the producer that has been waiting the longest, if any.
        if (_waiting.isNotEmpty) {
          _waiting.removeFirst().complete();
        }
        return Future.value(result);
      }
    }
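To see how callers are expected to respect the abstraction, here is a rough usage sketch rather than a definitive implementation. It repeats a condensed copy of the buffer so it can run as a single file. The names `produce`, `consume` and `runDemo` are made up for illustration, and the capacity of 2 and item count of 5 are arbitrary assumptions.

```dart
import 'dart:async';
import 'dart:collection' show Queue;

// Condensed copy of the buffer from the answer, so this file stands alone.
class BoundedAsyncBuffer<T> {
  final int _capacity;
  final Queue<T> _buffer = Queue<T>();
  final Queue<Completer> _waiting = Queue<Completer>();

  BoundedAsyncBuffer(this._capacity);

  Future<void> add(T element) {
    if (_buffer.isEmpty && _waiting.isNotEmpty) {
      _waiting.removeFirst().complete(element);
      return Future.value();
    }
    _buffer.add(element);
    if (_buffer.length >= _capacity) {
      var c = Completer<void>();
      _waiting.add(c);
      return c.future;
    }
    return Future.value();
  }

  Future<T> remove() {
    if (_buffer.isEmpty) {
      var c = Completer<T>();
      _waiting.add(c);
      return c.future;
    }
    var result = _buffer.removeFirst();
    if (_waiting.isNotEmpty) {
      _waiting.removeFirst().complete();
    }
    return Future.value(result);
  }
}

// Hypothetical producer: it awaits every add(), so it pauses while the
// buffer is full instead of piling up more elements.
Future<void> produce(BoundedAsyncBuffer<int> buffer, int count) async {
  for (var i = 0; i < count; i++) {
    await buffer.add(i);
  }
}

// Hypothetical consumer: it awaits every remove(), so it pauses while the
// buffer is empty.
Future<void> consume(
    BoundedAsyncBuffer<int> buffer, int count, List<int> out) async {
  for (var i = 0; i < count; i++) {
    out.add(await buffer.remove());
  }
}

// Runs one producer and one consumer against a buffer of capacity 2 and
// returns the elements in the order the consumer received them.
Future<List<int>> runDemo() async {
  final buffer = BoundedAsyncBuffer<int>(2);
  final received = <int>[];
  await Future.wait([
    produce(buffer, 5),
    consume(buffer, 5, received),
  ]);
  return received;
}
```

Calling `runDemo()` from an async `main` and printing the result should show `[0, 1, 2, 3, 4]`, since the buffer hands elements out in FIFO order. The important part is the `await` in `produce`: a producer that ignores the returned future is never slowed down, and the buffer grows without bound.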