python - Set delay between tasks in group in Celery
I have a Python app where a user can initiate a task. The whole purpose of the task is to execute a given number of POST/GET requests with a particular interval to a given URL.

The user supplies n, the number of requests, and v, the number of requests per second. How is it better to design such a task, taking into account that due to I/O latency the actual requests/second rate may turn out bigger or smaller?

First of all I decided to use Celery with eventlet, because otherwise I would need dozens of workers, which is not acceptable.
My naive approach:
- The client starts the task using task.delay().
- Inside the task I do something like this:
@task
def task(number_of_requests, time_period):
    for _ in range(number_of_requests):
        start = time.time()
        params_for_concrete_subtask = ...
        # .... do the I/O with the monkey-patched eventlet requests library
        elapsed = time.time() - start
        # if we completed the subtask fast enough, sleep off the remainder
        # of the per-request interval to keep the desired rate
        if elapsed < time_period / number_of_requests:
            eventlet.sleep(time_period / number_of_requests - elapsed)
A working example is here.

If we are too fast, we wait to keep the desired speed. If we are too slow, that's OK from the client's perspective: we do not violate the requests/second requirement. But will this resume correctly if I restart Celery?
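One refinement worth considering for the loop above: sleeping off the remainder each iteration still lets small timing errors accumulate over many requests. Pacing against an absolute schedule avoids that drift. A minimal sketch in plain Python, where do_request is a hypothetical callable standing in for the actual request; with eventlet you would swap time.sleep for eventlet.sleep:

import time

def paced_requests(number_of_requests, time_period, do_request):
    # interval between request starts needed to hit the target rate
    interval = time_period / number_of_requests
    start = time.time()
    for i in range(number_of_requests):
        do_request()
        # sleep until the scheduled start of the next request, so one slow
        # request does not push every later request off schedule
        next_start = start + (i + 1) * interval
        time.sleep(max(0.0, next_start - time.time()))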
I think this should work, but I thought there would be a better way. In Celery I can define a task with a particular rate limit, which would almost match my needs and give a guarantee. So I could use the Celery group feature and write:
@task(rate_limit=...)
def task(...):
    # ...

task_executor = task.s(number_of_requests, time_period)
group(task_executor(params_for_concrete_task)
      for params_for_concrete_task in ...).delay()
But here the rate_limit is hardcoded, and since I need it to be dynamic I do not see a way of changing it. I saw an example:

task.s(....).set(... params ...)

but when I tried to pass rate_limit to the set method it did not work.
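For what it is worth, .set() does accept the standard execution options such as countdown and eta, even though rate_limit apparently is not among them. If the goal is simply a fixed delay between the tasks in a group, one option is to stagger each signature with an increasing countdown. A sketch under assumptions: my_task, params_list and the one-second spacing are all hypothetical here, and this staggers start times rather than enforcing a true rate limit:

from celery import group

spacing = 1.0  # assumed: seconds between task starts
job = group(
    my_task.s(params).set(countdown=i * spacing)
    for i, params in enumerate(params_list)
)
result = job.delay()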
Another, maybe better, idea was to use Celery's periodic task scheduler. With the default implementation the periods and the tasks to be executed periodically are fixed. I need to be able to dynamically create tasks that run periodically a given number of times with a specific rate limit. Maybe I need to run my own scheduler which would take tasks from a DB? I do not see any documentation around this.
Another approach was to try to use the chain function, but I could not figure out whether there is a parameter for the delay between tasks.
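As far as I can tell, chain itself has no delay parameter, but each link is an ordinary signature, so a countdown can be attached per link with .set(); that countdown should then take effect when the previous task finishes and the next message is sent. A hedged sketch, with my_task, args_list and the two-second spacing as hypothetical placeholders:

from celery import chain

# .si() makes each link immutable so the previous return value is not
# prepended to the next task's arguments; countdown=2.0 is an assumed spacing
job = chain(my_task.si(arg).set(countdown=2.0) for arg in args_list)
result = job.delay()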
If you want to adjust the rate_limit dynamically, you can do it using the following code. It also creates the chain() at runtime. If you run it you will see that we successfully override the rate_limit of 5/sec down to 0.5/sec.
test_tasks.py
from celery import Celery, signature, chain
import datetime as dt

app = Celery('test_tasks')
app.config_from_object('celery_config')

@app.task(bind=True, rate_limit=5)
def test_1(self):
    print(dt.datetime.now())

app.control.broadcast('rate_limit',
                      arguments={'task_name': 'test_tasks.test_1',
                                 'rate_limit': 0.5})
test_task = signature('test_tasks.test_1').set(immutable=True)
l = [test_task] * 100
task_chain = chain(*l)  # renamed so we don't shadow the imported chain
res = task_chain()
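As an aside, the same broadcast is also available as a dedicated control method, which reads a bit more clearly (same effect, assuming the workers have remote control enabled):

# equivalent shortcut for the broadcast above
app.control.rate_limit('test_tasks.test_1', 0.5)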
I also tried to override the attribute from within the class, but IMO the rate_limit is set when the task is registered by the worker, which is why the .set() call has no effect. I'm speculating here; one would have to check the source code.
Solution 2
Implement your own waiting mechanism using the end time of the previous call; in a chain, the return value of one function is passed to the next one. So something like this:
from celery import Celery, signature, chain
import datetime as dt
import time

app = Celery('test_tasks')
app.config_from_object('celery_config')

@app.task(bind=True)
def test_1(self, prev_endtime=dt.datetime.now(), wait_seconds=5):
    wait = dt.timedelta(seconds=wait_seconds)
    print(dt.datetime.now() - prev_endtime)
    wait = wait - (dt.datetime.now() - prev_endtime)
    # total_seconds() handles a negative delta correctly; .seconds would
    # wrap around to a large positive number when we are already late
    wait = wait.total_seconds()
    print(wait)
    time.sleep(max(0, wait))
    end_time = dt.datetime.now()
    print(end_time)
    return end_time

#app.control.rate_limit('test_tasks.test_1', '0.5')
test_task = signature('test_tasks.test_1')
l = [test_task] * 100
task_chain = chain(*l)
res = task_chain()
I think this is more reliable than the broadcast.
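One caveat about the sketch above: the prev_endtime=dt.datetime.now() default is evaluated only once, at import time, so the first link of every chain measures its wait against whenever the module was loaded. The usual Python idiom is a None sentinel; a hedged variant of the task:

@app.task(bind=True)
def test_1(self, prev_endtime=None, wait_seconds=5):
    # None sentinel: a dt.datetime.now() default would be frozen at
    # import time instead of being evaluated per call
    if prev_endtime is None:
        prev_endtime = dt.datetime.now()
    wait = dt.timedelta(seconds=wait_seconds) - (dt.datetime.now() - prev_endtime)
    time.sleep(max(0, wait.total_seconds()))
    return dt.datetime.now()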