c# - Creating a file pickup process with a Blocking Collection -
what have got @ moment timer fires every 5000 ms:
static timer _atimer = new system.timers.timer(); static void main(string[] args) { _atimer.elapsed += new elapsedeventhandler(ontimedevent); _atimer.interval = 5000; _atimer.enabled = true; console.writeline("press \'q\' quit sample."); while (console.read() != 'q') ; }
on fire sets queues processing files:
private static void ontimedevent(object source, elapsedeventargs e) { // stop timer dont reprocess files have in queue stoptimer(); // setup list of queues var lists = new list<incomingorderqueue>(); //get accounts in files looking in var accounts = new list<string>() { "account1", "account2" }; //loop through accounts , set queue foreach (var acc in accounts) { // create queue var tmp = new incomingorderqueue(); // each file in folders add processed in queue foreach (var orderfile in orderfiles(acc)) { tmp.enqueuesweep(new queuevariables() { account = acc, file = orderfile }); } // add queue list of queues lists.add(tmp); } // each of queues consume contents of them parallel.foreach(lists, l => l.consume()); //start timer again because have finished files have in current queue starttimer(); } public static void stoptimer() { console.writeline("stop timer"); _atimer.stop(); _atimer.enabled = false; } public static void starttimer() { console.writeline("start timer"); _atimer.enabled = true; _atimer.start(); }
the blocking queue self:
public class incomingorderqueue { blockingcollection<queuevariables> _orderq = new blockingcollection<queuevariables>(); public void enqueuesweep(queuevariables incoming) { // add items queue _orderq.add(incoming); } public void consume() { // stop been adding queue _orderq.completeadding(); // consume objects in blocking collection parallel.foreach(_orderq.getconsumingenumerable(), processor.order.object); } public int queuecount { { return _orderq.count; } } }
what have works how should, start timer -> stop timer -> trigger process collecting files within folders -> process files -> restart timer.
i cant think there better way im doing when number of queues going created accounts 200 - 400.
thanks
i think don't need stop , start producers , consumers. blockingcollection
can block producers if reaches maximum capacity , block consumers if empty.
i'd start of single blockingcollection
, until profiling shows need one. depending on relative speed of producers , consumers, may need tweak numbers. if io bound, should asynchronous , can have many, if cpu bound won't need more number of processors available.
i've redone example assuming io bound producers , consumers, hope gives ideas. fires off producers on 10 second intervals , can keep going until cancel production via canellationtoken
. after have cancelled , completed production completeadding
release blocked consumers.
public class queuevariables { public string account {get;set;} public string file {get;set;} } public static concurrentqueue<string> getaccounts() { return new concurrentqueue<string>(new [] { "account1", "account2", "account3", "account4", "account5", "account6", "account7", "account8", "account9", "account10", "account11", "account12", }); } public static list<string> getfiles(string acct) { return new list<string> { "file1", "file2", "file3", "file4", "file5", "file6", "file7", "file8", "file9", "file10", "file11", "file12", }; } public static async task startperiodicproducers(int numproducers, timespan period, cancellationtoken ct) { while(!ct.iscancellationrequested) { var producers = startproducers(numproducers, ct); // wait production finish await task.whenall(producers.toarray()); // wait before running again console.writeline("***waiting " + period); await task.delay(period, ct); } } public static list<task> startproducers(int numproducers, cancellationtoken ct) { list<task> producingtasks = new list<task>(); var accounts = getaccounts(); (int = 0; < numproducers; i++) { producingtasks.add(task.run(async () => { string acct; while(accounts.trydequeue(out acct) && !ct.iscancellationrequested) { foreach (var file in getfiles(acct)) { _orderq.add(new userquery.queuevariables{ account = acct, file = file }); console.writeline("produced account:{0} file:{1}", acct, file); await task.delay(50, ct); // simulate production delay } } console.writeline("finished producing"); })); } return producingtasks; } public static list<task> startconsumers(int numconsumers) { list<task> consumingtasks = new list<task>(); (int j = 0; j < numconsumers; j++) { consumingtasks.add(task.run(async () => { try { while(true) { var queuevar = _orderq.take(); console.writeline("consumed account:{0} file:{1}", queuevar.account, queuevar.file); await task.delay(200); // simulate consumption delay } } catch(invalidoperationexception) { console.writeline("finished consuming"); } })); } return consumingtasks; } private static async task mainasync() { cancellationtokensource cts = new cancellationtokensource(); var periodicproducers = startperiodicproducers(2, timespan.fromseconds(10), cts.token); var consumingtasks = startconsumers(4); await task.delay(timespan.fromseconds(120)); // stop production cts.cancel(); try { // wait producers finish producing await periodicproducers; } catch(operationcanceledexception) { // operation cancelled } // complete adding release blocked consumers _orderq.completeadding(); // wait consumers finish consuming await task.whenall(consumingtasks.toarray()); } // maximum size 10, after capaicity reached producers block private static blockingcollection<queuevariables> _orderq = new blockingcollection<queuevariables>(10); void main() { mainasync().wait(); console.readline(); } // define other methods , classes here
Comments
Post a Comment