reactive programming - RxJava: retrying map actions
I have an Observable where each item is transformed in a way that may result in an exception, but the transformation can be retried. I don't want failures to break the stream, since each item represents an independent transaction. The best solution I can come up with is this:
    final AtomicLong errCount = new AtomicLong();
    Observable.from(ImmutableList.of(1L, 2L, 3L)).flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long aLong) {
            return Observable.from(ImmutableList.of(aLong)).map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    if (aLong == 2 && errCount.getAndIncrement() < 1) {
                        throw new RuntimeException("retryable error");
                    }
                    return aLong * 100;
                }
            }).retry(2);
        }
    }).forEach(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println(aLong);
        }
    });
    // desired output: 100, 200, 300 (not 100, 100, 200, 300)
Problems:
- The retry logic is verbose.
- If an item fails after 2 retries, the stream is broken (no more items are processed). I'd like a clean way to return both exceptions and results, like Finagle's Try, so I can process all the exceptions.
"The retry logic is verbose."
You can avoid using ImmutableList entirely by switching to the Observable.just(t1, t2, t3) constructor. It does the same thing but is less verbose.
I see that you are flatMapping in order to convert each value into its own Observable. This prevents an onError encountered while mapping a single value from unsubscribing the entire chain: when the operator throws, only that one value's inner observable chain is unsubscribed and resubscribed. Otherwise the error would cause an unsubscribe from, and a resubscribe to, the main outer observable.
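To make the scoping concrete without depending on RxJava, here is a minimal plain-Java sketch of the same idea: the retry wraps one item's transformation, so a failure re-runs only that item. The names `PerItemRetry`, `withRetry`, `transform`, and `processAll` are hypothetical and chosen for illustration; none of them is RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class PerItemRetry {
    // Counts failures so that item 2 fails only on its first attempt, as in the question.
    static final AtomicLong errCount = new AtomicLong();

    static Long transform(Long value) {
        if (value == 2 && errCount.getAndIncrement() < 1) {
            throw new RuntimeException("retryable error");
        }
        return value * 100;
    }

    // Applies fn to a single item, re-running it up to maxRetries extra times.
    // Assumes maxRetries >= 0; rethrows the last failure once attempts run out.
    static <T, R> R withRetry(T item, Function<T, R> fn, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return fn.apply(item);
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    // The retry is scoped per item, so earlier items are never processed twice.
    static List<Long> processAll(List<Long> items) {
        List<Long> out = new ArrayList<>();
        for (Long item : items) {
            out.add(withRetry(item, PerItemRetry::transform, 2));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList(1L, 2L, 3L))); // prints [100, 200, 300]
    }
}
```

As in the flatMap version, an item that still fails after its retries ends the whole run here, because `withRetry` rethrows the last exception.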
I can think of 2 choices if you want to maintain this behavior but reduce the boilerplate (aside from the obvious switch to Java 8 lambdas).
Firstly, resubscribe and dedupe the data after the retry. If your value has a good hashCode and equals implementation, you can use a filter that appends to a stateful set and only calls onNext when the set does not already contain the value.
    Observable.<Long> just(1L, 2L, 3L)
        .map(new Func1<Long, Long>() {
            @Override
            public Long call(Long aLong) {
                if (aLong == 2 && errCount.getAndIncrement() < 1) {
                    throw new RuntimeException("retryable error");
                }
                return aLong * 100;
            }})
        .retry(2)
        .filter(new Func1<Long, Boolean>() {
            // values already passed downstream; created lazily on first use
            Set<Long> state = null;
            @Override
            public Boolean call(Long a) {
                if (state == null)
                    state = new HashSet<Long>();
                if (!state.contains(a)) {
                    state.add(a);
                    return true;
                }
                return false;
            }})
        .forEach(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                System.out.println(aLong);
            }});
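The stateful filter can be tried in isolation. The following is a small plain-Java sketch, not RxJava code, that feeds it the sequence a single resubscription would produce in the question's example (100, then 100, 200, 300 again). `DedupeAfterRetry`, `firstOccurrenceOnly`, and `dedupe` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class DedupeAfterRetry {
    // Returns a stateful predicate that lets each distinct value through once.
    // Relies on the value type having consistent equals() and hashCode().
    // Not thread-safe: intended for sequential use only.
    static <T> Predicate<T> firstOccurrenceOnly() {
        Set<T> seen = new HashSet<>();
        return seen::add; // Set.add returns false if the value was already present
    }

    static List<Long> dedupe(List<Long> replayed) {
        return replayed.stream()
                .filter(firstOccurrenceOnly())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Simulated emissions after one retry: 100 was emitted before the failure
        // and again after the resubscription.
        System.out.println(dedupe(Arrays.asList(100L, 100L, 200L, 300L))); // prints [100, 200, 300]
    }
}
```

One consequence of this design is that the filter cannot tell a replayed value from a legitimate repeat, so it only suits streams whose values are expected to be unique.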
Secondly, switch to an observable that resumes from where it left off when it is resubscribed to. Note that you can have problems with data loss when using operators that buffer (observeOn, merge, flatMap). This is because they continue to consume from the producer in a manner that is decoupled from the downstream consumer. So you will want to make sure that you do not buffer before the retry. There are other considerations if you are implementing an observable source that supports back-pressure.
    // should resume right where it left off
    resumableObservable.map(...).retry(2).observeOn()

    // don't do this. observeOn will buffer values and the resume will lose data.
    resumableObservable.map(...).observeOn().retry(2)

    // bad if running async observables. merging buffers, so you'll have data loss.
    Observable.merge(resumableObservable.map(...)).retry(2)
"If an item fails after 2 retries, the stream is broken (no more items are processed). I'd like a clean way to return both exceptions and results, like Finagle's Try, so I can process all the exceptions."
You could change the unreliable map from Long -> Long to Long -> Tuple<Long, List<Exception>>. Since that is quite a mouthful of generics and quickly becomes cumbersome, I'd recommend using a different variant of the retry operator, namely retryWhen(Func1<Observable<Throwable>, Observable<?>>). Here is an example of how to use it in your code.
    }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){
        @Override
        public Observable<?> call(Observable<? extends Throwable> o) {
            final AtomicInteger count = new AtomicInteger();
            return o.filter(new Func1<Throwable, Boolean>() {
                @Override
                public Boolean call(Throwable t) {
                    return t instanceof RuntimeException || count.getAndIncrement() < 5;
                }}).delay(1, TimeUnit.SECONDS, Schedulers.immediate());
        }})
The benefit of using retryWhen is that you can easily implement a delayed retry after an amount of time in a non-blocking style.
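For comparison, the Long -> Tuple<Long, List<Exception>> alternative mentioned above can be sketched in plain Java, without RxJava. This is only an illustration under assumptions: `AttemptSketch`, `Attempt`, `attempt`, and `processAll` are hypothetical names, and the type is a rough stand-in for something like Finagle's Try rather than a port of it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class AttemptSketch {
    // Outcome of one item: a value on success, plus every exception seen on the way.
    static final class Attempt<R> {
        final boolean success;
        final R value;                // meaningful only when success is true
        final List<Exception> errors; // one entry per failed attempt

        Attempt(boolean success, R value, List<Exception> errors) {
            this.success = success;
            this.value = value;
            this.errors = Collections.unmodifiableList(errors);
        }
    }

    // Tries fn on one item up to maxRetries + 1 times and never throws for
    // RuntimeExceptions; failures are collected instead of propagated.
    static <T, R> Attempt<R> attempt(T item, Function<T, R> fn, int maxRetries) {
        List<Exception> errors = new ArrayList<>();
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return new Attempt<>(true, fn.apply(item), errors);
            } catch (RuntimeException e) {
                errors.add(e);
            }
        }
        return new Attempt<>(false, null, errors);
    }

    // A failed item yields a failed Attempt; later items are still processed.
    static List<Attempt<Long>> processAll(List<Long> items, Function<Long, Long> fn, int maxRetries) {
        List<Attempt<Long>> results = new ArrayList<>();
        for (Long item : items) {
            results.add(attempt(item, fn, maxRetries));
        }
        return results;
    }

    public static void main(String[] args) {
        // Hypothetical transform: item 2 always fails, other items succeed.
        Function<Long, Long> flaky = v -> {
            if (v == 2) throw new RuntimeException("retryable error");
            return v * 100;
        };
        for (Attempt<Long> a : processAll(Arrays.asList(1L, 2L, 3L), flaky, 2)) {
            System.out.println(a.success
                    ? "ok " + a.value
                    : "failed after " + a.errors.size() + " attempts");
        }
    }
}
```

With 2 retries, the always-failing item is attempted 3 times, its exceptions are kept for later inspection, and item 3 is still processed.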