python - Set delay between tasks in group in Celery


I have a Python app where a user can initiate a task.

The whole purpose of the task is to execute a given number of POST/GET requests to a given URL at a particular interval.

So the user gives n - the number of requests, and v - the number of requests per second.

How is it best to design such a task, taking into account that due to I/O latency the actual requests/second speed could be bigger or smaller?

First of all I decided to use Celery with eventlet, because otherwise I would need dozens of workers, which is not acceptable.

My naive approach:

  • the client starts the task using task.delay()
  • inside the task we do this:

    @task
    def task(number_of_requests, time_period):
        for _ in range(number_of_requests):
            start = time.time()
            params_for_concrete_subtask = ...
            # .... do the io with the monkey-patched (eventlet) requests library
            elapsed = time.time() - start
            # if we completed the subtask fast, sleep off the rest of the slot
            if elapsed < time_period / number_of_requests:
                eventlet.sleep(time_period / number_of_requests - elapsed)

A working example is here.

If we are fast, we try to wait to keep the desired speed. If we are slow, that's OK from the client's perspective: we do not violate the requests/second requirement. But will this resume correctly if I restart Celery?
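The pacing logic of this naive approach can be sketched in plain Python, using time.sleep instead of eventlet.sleep and a hypothetical do_request callable standing in for the monkey-patched HTTP call:

```python
import time

def paced_requests(number_of_requests, time_period, do_request):
    """Call do_request() number_of_requests times, spread evenly over
    time_period seconds (a sketch of the naive pacing loop above)."""
    # each request gets an equal slot of the overall period
    interval = time_period / number_of_requests
    for _ in range(number_of_requests):
        start = time.time()
        do_request()                      # stand-in for the real HTTP call
        elapsed = time.time() - start
        # sleep only the remainder of the slot; slow requests
        # simply skip the sleep instead of pushing the schedule back
        if elapsed < interval:
            time.sleep(interval - elapsed)
```

In the real task the sleep would be eventlet.sleep so that the worker's other greenlets keep running while this one waits.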

I think this should work, but I thought there might be a better way. In Celery I can define a task with a particular rate limit, which would almost match my needs and guarantee the rate. So I could use Celery's group feature and write:

@task(rate_limit=...)
def task(...):
    # ...

task_executor = task.s(number_of_requests, time_period)
group(task_executor(params_for_concrete_task)
      for params_for_concrete_task in ...).delay()

But here I hardcode the rate_limit, which should be dynamic, and I do not see a way of changing it. I saw an example like:

  task.s(....).set(... params ...) 

But I tried to pass rate_limit to the set method and it did not work.

Another, maybe better, idea is to use Celery's periodic task scheduler. But with the default implementation the periods and the tasks to be executed periodically are fixed.

I need to be able to dynamically create tasks which run periodically a given number of times with a specific rate limit. Maybe I need to run my own scheduler which will take tasks from a DB? I do not see any documentation around this.
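One tick of such a self-made scheduler could be sketched like this; scheduler_tick is a hypothetical helper, and jobs is a list of dicts with the field names 'next_run', 'interval' and 'remaining' as they might be loaded from a DB (all names are assumptions, not Celery API):

```python
def scheduler_tick(jobs, now):
    """Return the jobs that are due at time `now` and update their
    bookkeeping. 'interval' is in seconds, 'remaining' is how many
    runs the job has left."""
    due = []
    for job in jobs:
        if job['remaining'] > 0 and now >= job['next_run']:
            due.append(job)
            job['remaining'] -= 1
            job['next_run'] = now + job['interval']
    return due
```

A loop would call this roughly once per second, fire task.delay() for each due job, and persist the updated rows back to the DB so the schedule survives a restart.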

Another approach was to try the chain function, but I could not figure out whether there is a parameter for the delay between tasks.

If you want to adjust the rate_limit dynamically, you can do it using the following code. It also creates the chain() at runtime. Run it and you will see that we successfully override the rate_limit of 5/sec down to 0.5/sec.

test_tasks.py

from celery import Celery, signature, chain
import datetime as dt

app = Celery('test_tasks')
app.config_from_object('celery_config')

@app.task(bind=True, rate_limit=5)
def test_1(self):
    print(dt.datetime.now())

app.control.broadcast('rate_limit',
                      arguments={'task_name': 'test_tasks.test_1',
                                 'rate_limit': 0.5})

test_task = signature('test_tasks.test_1').set(immutable=True)

l = [test_task] * 100

chain = chain(*l)
res = chain()

I also tried to override the attribute within the class, but IMO the rate_limit is set when the task is registered by the worker, which is why .set() has no effect. I'm speculating here, though; one would have to check the source code.
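For intuition, the per-worker rate limit behaves like a token bucket: a minimal stdlib sketch of that idea (illustrative only, not Celery's actual implementation):

```python
import time

class TokenBucket:
    """Toy token bucket: allows roughly `rate` acquisitions per second."""
    def __init__(self, rate):
        self.rate = rate             # tokens replenished per second
        self.tokens = 1.0            # start with one token available
        self.last = time.monotonic()

    def try_acquire(self):
        now = time.monotonic()
        # refill proportionally to elapsed time, capped at one token
        self.tokens = min(1.0, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

The important consequence is that the limit lives in the worker's consumer loop, not in the task signature, which fits the observation that only a worker-side control command (the broadcast) changes it.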

solution 2

Implement your own waiting mechanism using the end time of the previous call; in a chain the return value of one function is passed to the next one.

So it would look like this:

from celery import Celery, signature, chain
import datetime as dt
import time

app = Celery('test_tasks')
app.config_from_object('celery_config')

@app.task(bind=True)
def test_1(self, prev_endtime=dt.datetime.now(), wait_seconds=5):
    wait = dt.timedelta(seconds=wait_seconds)
    print(dt.datetime.now() - prev_endtime)
    wait = wait - (dt.datetime.now() - prev_endtime)
    wait = wait.total_seconds()
    print(wait)
    time.sleep(max(0, wait))
    now = dt.datetime.now()
    print(now)
    return now

#app.control.rate_limit('test_tasks.test_1', '0.5')
test_task = signature('test_tasks.test_1')

l = [test_task] * 100

chain = chain(*l)
res = chain()

I think this is more reliable than the broadcast.
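The wait computation from solution 2 can be pulled out into a pure function and unit-tested without a broker. compute_wait is a hypothetical helper; note it uses total_seconds() so a negative remainder cleanly becomes zero, whereas timedelta.seconds would wrap around for negative deltas:

```python
import datetime as dt

def compute_wait(prev_endtime, wait_seconds, now):
    """Seconds to sleep so this run starts wait_seconds after the
    previous run ended; never negative."""
    remaining = dt.timedelta(seconds=wait_seconds) - (now - prev_endtime)
    return max(0.0, remaining.total_seconds())
```

The task body would then reduce to time.sleep(compute_wait(prev_endtime, wait_seconds, dt.datetime.now())) followed by returning the new end time.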

