scala - Spark: Counting co-occurrence - Algorithm for efficient multi-pass filtering of huge collections
There is a table with two columns, books and readers, where books and readers are book and reader ids, respectively:

        books  readers
    1:      1       30
    2:      2       10
    3:      3       20
    4:      1       20
    5:      1       10
    6:      2       30
The record book = 1, reader = 30 means that the book with id = 1 was read by the user with id = 30.
For each book pair I need to count the number of readers who read both of these books, using this algorithm:

    for each book
      for each reader of the book
        for each other_book in books of the reader
          increment common_reader_count ((book, other_book), cnt)
The advantage of using this algorithm is that it requires a small number of operations compared to counting every book combination by two.
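To make the intended counting concrete, here is a plain Scala (non-Spark) sketch of the algorithm applied to the table above; the names and the exclusion of same-book pairs are just illustrative choices on my part:

    // Plain Scala sketch of the counting algorithm (no Spark), for illustration only.
    object CountSketch extends App {
      val recs = Seq((1, 30), (2, 10), (3, 20), (1, 20), (1, 10), (2, 30))

      // Group readers by book and books by reader.
      val readersOfBook = recs.groupBy(_._1).map { case (b, rs) => b -> rs.map(_._2) }
      val booksOfReader = recs.groupBy(_._2).map { case (r, bs) => r -> bs.map(_._1) }

      // for each book, for each reader of the book, for each other_book of that reader,
      // increment the counter for (book, other_book).
      val counts = scala.collection.mutable.Map[(Int, Int), Int]().withDefaultValue(0)
      for {
        (book, readers) <- readersOfBook
        reader          <- readers
        otherBook       <- booksOfReader(reader)
        if otherBook != book          // drop (book, book) pairs; optional
      } counts((book, otherBook)) += 1

      counts.foreach(println)          // ((1,2),2), ((2,1),2), ((1,3),1), ((3,1),1)
    }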
To implement the above algorithm I organize this data into two groups: 1) keyed by book, an RDD containing the readers of each book, and 2) keyed by reader, an RDD containing the books read by each reader, as in the following program:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Small {

  case class Book(book: Int, reader: Int)
  case class BookPair(book1: Int, book2: Int, cnt: Int)

  val recs = Array(
    Book(book = 1, reader = 30),
    Book(book = 2, reader = 10),
    Book(book = 3, reader = 20),
    Book(book = 1, reader = 20),
    Book(book = 1, reader = 10),
    Book(book = 2, reader = 30))

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // Set up the environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(recs)
    val bookMap = data.map(r => (r.book, r))
    val bookGrps = bookMap.groupByKey
    val readerMap = data.map(r => (r.reader, r))
    val readerGrps = readerMap.groupByKey
    // *** Calculate book pairs
    // Iterate over book groups
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match {
      case (book, recIter) =>
        // Iterate over the records of this book
        recIter.toList.map(rec => {
          // Take a reader of the book
          val aReader = rec.reader
          // Find all books (including this one) that the reader has read
          val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match {
            case (reader2, recIter2) => reader2 == aReader
          })
          val bookPairs = allReaderBooks.map(readerTuple => readerTuple match {
            case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1))
          })
          bookPairs
        })
    })
    val x = allBookPairs.flatMap(identity)
    val y = x.map(rdd => rdd.first)
    val z = y.flatMap(identity)
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2)
    val result = p.map(bookPair => bookPair match {
      case ((book1, book2), cnt) => BookPair(book1, book2, cnt)
    })

    val resultCsv = result.map(pair => resultToStr(pair))
    resultCsv.saveAsTextFile("./result.csv")
  }

  def resultToStr(pair: BookPair): String = {
    val sep = "|"
    pair.book1 + sep + pair.book2 + sep + pair.cnt
  }
}
This implementation in fact results in a different, inefficient algorithm:
    for each book
      find each reader of the book            <-- by scanning all readers every time!
        for each other_book in books of the reader
          increment common_reader_count ((book, other_book), cnt)
This contradicts the main goal of the algorithm discussed above, because instead of reducing the number of operations it increases it: finding the books of a user requires filtering all users for every book, so the number of operations is ~ N * M, where N is the number of users and M is the number of books.
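Conceptually, what I am after is something more like the following (just a rough, unverified sketch that reuses data, the Book records, and the SparkContext._ import from the program above): group by reader once and emit book pairs per reader, so the reader collection is never filtered per book.

    // Rough sketch: key by reader once, emit all book pairs per reader, then reduce.
    val pairCounts = data
      .map(r => (r.reader, r.book))
      .groupByKey()                         // reader -> all books read by that reader
      .flatMap { case (_, books) =>
        val bs = books.toList
        for (b1 <- bs; b2 <- bs if b1 != b2) yield ((b1, b2), 1)
      }
      .reduceByKey(_ + _)                   // ((book, other_book), common_reader_count)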
Questions:

- Is there a way to implement the original algorithm in Spark without filtering the complete reader collection for every book?
- Are there any other algorithms to compute book pair counts efficiently?
- Also, when actually running this code I get a filter exception whose cause I cannot figure out. Any ideas?

Please see the exception log below:
15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/29 18:24:10 INFO Remoting: Starting remoting
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4)
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at Small$$anonfun$4.apply(Small.scala:54)
    at Small$$anonfun$4.apply(Small.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
UPDATE:

This code:
val df = sc.parallelize(
  Array((1,30), (2,10), (3,20), (1,10), (2,30))
).toDF("books", "readers")

val results = df.join(
    df.select($"books" as "r_books", $"readers" as "r_readers"),
    $"readers" === $"r_readers" and $"books" < $"r_books"
  )
  .groupBy($"books", $"r_books")
  .agg($"books", $"r_books", count($"readers"))
gives the following result:
    books  r_books  count(readers)
        1        2               2
So count here is the number of times two books (here 1 and 2) were read together, i.e. the count of pairs.
This kind of thing is a lot easier if you convert the original RDD to a DataFrame:
val df = sc.parallelize(
  Array((1,30), (2,10), (3,20), (1,10), (2,30))
).toDF("books", "readers")
Once you do that, just do a self-join on the DataFrame to make book pairs, then count how many readers have read each book pair:
val results = df.join(
    df.select($"books" as "r_books", $"readers" as "r_readers"),
    $"readers" === $"r_readers" and $"books" < $"r_books"
  )
  .groupBy($"books", $"r_books")
  .agg($"books", $"r_books", count($"readers"))
As additional explanation of the join, note that we are joining df back onto itself -- a self-join: df.join(df.select(...), ...). What we are looking to do is to stitch together book #1 -- $"books" -- with a second book -- $"r_books" -- read by the same reader -- $"readers" === $"r_readers". If we joined only on $"readers" === $"r_readers", every book would also be joined back onto itself. Instead, the extra condition $"books" < $"r_books" ensures that the ordering in the book pairs is always (<lower_id>,<higher_id>).
Once you do the join, you get a DataFrame with one line for every reader of every book pair. The groupBy and agg functions then do the actual counting of the number of readers per book pairing.
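For completeness, this is roughly what the whole thing looks like with the imports the snippets above assume; it is only a sketch against a 1.x-style SQLContext, and the object name, app name and local master setting are placeholders of mine:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._

    object BookPairsDF {                                  // placeholder name
      def main(args: Array[String]): Unit = {
        // local[*] is only for running the sketch locally
        val sc = new SparkContext(new SparkConf().setAppName("BookPairsDF").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._                     // enables toDF and the $"..." syntax

        val df = sc.parallelize(
          Array((1, 30), (2, 10), (3, 20), (1, 10), (2, 30))
        ).toDF("books", "readers")

        // Self-join on reader, keep only ordered pairs, then count readers per pair.
        val results = df.join(
            df.select($"books" as "r_books", $"readers" as "r_readers"),
            $"readers" === $"r_readers" and $"books" < $"r_books"
          )
          .groupBy($"books", $"r_books")
          .agg($"books", $"r_books", count($"readers"))

        // With this sample data the single row should be: books=1, r_books=2, count(readers)=2
        results.show()
      }
    }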
Incidentally, if a reader read the same book twice, I believe you would end up double-counting, which may or may not be what you want. If that's not what you want, just change count($"readers") to countDistinct($"readers").
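For example, assuming the same df and imports as above, only the aggregation changes (sketch):

    // Same self-join, but each reader is counted at most once per book pair.
    val distinctResults = df.join(
        df.select($"books" as "r_books", $"readers" as "r_readers"),
        $"readers" === $"r_readers" and $"books" < $"r_books"
      )
      .groupBy($"books", $"r_books")
      .agg($"books", $"r_books", countDistinct($"readers"))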
If you want to know more about the agg functions count() and countDistinct(), plus a bunch of other fun stuff, check out the scaladoc for org.apache.spark.sql.functions.