c# - Creating a file pickup process with a Blocking Collection -


what have got @ moment timer fires every 5000 ms:

static timer _atimer = new system.timers.timer();      static void main(string[] args)     {         _atimer.elapsed += new elapsedeventhandler(ontimedevent);          _atimer.interval = 5000;         _atimer.enabled = true;          console.writeline("press \'q\' quit sample.");         while (console.read() != 'q') ;      } 

on fire sets queues processing files:

        private static void ontimedevent(object source, elapsedeventargs e)     {         // stop timer dont reprocess files have in queue         stoptimer();          // setup list of queues         var lists = new list<incomingorderqueue>();         //get accounts in files looking in         var accounts = new list<string>() { "account1", "account2" };         //loop through accounts , set queue          foreach (var acc in accounts)         {             // create queue             var tmp = new incomingorderqueue();             // each file in folders add processed in queue             foreach (var orderfile in orderfiles(acc))             {                 tmp.enqueuesweep(new queuevariables() { account = acc, file = orderfile });             }             // add queue list of queues             lists.add(tmp);         }         // each of queues consume contents of them         parallel.foreach(lists, l => l.consume());          //start timer again because have finished files have in current queue         starttimer();     }          public static void stoptimer()     {         console.writeline("stop timer");         _atimer.stop();         _atimer.enabled = false;     }      public static void starttimer()     {         console.writeline("start timer");         _atimer.enabled = true;         _atimer.start();     } 

the blocking queue self:

 public class incomingorderqueue  {     blockingcollection<queuevariables> _orderq = new blockingcollection<queuevariables>();      public void enqueuesweep(queuevariables incoming)     {         // add items queue         _orderq.add(incoming);     }      public void consume()     {         // stop been adding queue         _orderq.completeadding();         // consume objects in blocking collection         parallel.foreach(_orderq.getconsumingenumerable(), processor.order.object);     }      public int queuecount     {                 {             return _orderq.count;         }     } } 

what have works how should, start timer -> stop timer -> trigger process collecting files within folders -> process files -> restart timer.

i cant think there better way im doing when number of queues going created accounts 200 - 400.

thanks

i think don't need stop , start producers , consumers. blockingcollection can block producers if reaches maximum capacity , block consumers if empty.

i'd start of single blockingcollection, until profiling shows need one. depending on relative speed of producers , consumers, may need tweak numbers. if io bound, should asynchronous , can have many, if cpu bound won't need more number of processors available.

i've redone example assuming io bound producers , consumers, hope gives ideas. fires off producers on 10 second intervals , can keep going until cancel production via canellationtoken. after have cancelled , completed production completeadding release blocked consumers.

public class queuevariables {     public string account {get;set;}     public string file {get;set;} }  public static concurrentqueue<string> getaccounts() {     return new concurrentqueue<string>(new []         {         "account1",         "account2",         "account3",         "account4",         "account5",         "account6",         "account7",         "account8",         "account9",         "account10",         "account11",         "account12",     }); }  public static list<string> getfiles(string acct) {     return new list<string>     {         "file1",         "file2",         "file3",         "file4",         "file5",         "file6",         "file7",         "file8",         "file9",         "file10",         "file11",         "file12",     }; }  public static async task startperiodicproducers(int numproducers, timespan period, cancellationtoken ct) {     while(!ct.iscancellationrequested)     {         var producers = startproducers(numproducers, ct);          // wait production finish         await task.whenall(producers.toarray());          // wait before running again         console.writeline("***waiting " + period);         await task.delay(period, ct);     } }  public static list<task> startproducers(int numproducers, cancellationtoken ct) {     list<task> producingtasks = new list<task>();     var accounts = getaccounts();      (int = 0; < numproducers; i++)     {         producingtasks.add(task.run(async () =>         {             string acct;             while(accounts.trydequeue(out acct) && !ct.iscancellationrequested)             {                 foreach (var file in getfiles(acct))                 {                     _orderq.add(new userquery.queuevariables{ account = acct, file = file });                     console.writeline("produced account:{0} file:{1}", acct, file);                     await task.delay(50, ct); // simulate production delay                 }             }              console.writeline("finished producing");         }));     }      return producingtasks; }  public static list<task> startconsumers(int numconsumers) {     list<task> consumingtasks = new list<task>();      (int j = 0; j < numconsumers; j++)     {         consumingtasks.add(task.run(async () =>         {             try             {                 while(true)                 {                     var queuevar = _orderq.take();                     console.writeline("consumed account:{0} file:{1}", queuevar.account, queuevar.file);                     await task.delay(200); // simulate consumption delay                 }             }             catch(invalidoperationexception)             {                 console.writeline("finished consuming");             }         }));     }      return consumingtasks; }  private static async task mainasync() {     cancellationtokensource cts = new cancellationtokensource();     var periodicproducers = startperiodicproducers(2, timespan.fromseconds(10), cts.token);     var consumingtasks = startconsumers(4);      await task.delay(timespan.fromseconds(120));      // stop production     cts.cancel();      try     {         // wait producers finish producing         await periodicproducers;     }     catch(operationcanceledexception)     {         // operation cancelled     }      // complete adding release blocked consumers     _orderq.completeadding();      // wait consumers finish consuming     await task.whenall(consumingtasks.toarray()); }  // maximum size 10, after capaicity reached producers block private static blockingcollection<queuevariables> _orderq = new blockingcollection<queuevariables>(10);  void main() {     mainasync().wait();     console.readline(); }  // define other methods , classes here 

Comments

Popular posts from this blog

Java 3D LWJGL collision -

spring - SubProtocolWebSocketHandler - No handlers -

methods - python can't use function in submodule -