scala - Spark: Counting co-occurrence - Algorithm for efficient multi-pass filtering of huge collections


There is a table with two columns, books and readers of these books, where books and readers are book and reader ids, respectively:

   books readers
1:     1      30
2:     2      10
3:     3      20
4:     1      20
5:     1      10
6:     2      30

The record book = 1, reader = 30 means that the book with id = 1 was read by the user with id = 30. For each book pair I need to count the number of readers who read both of these books, with this algorithm:

for each book
  for each reader of the book
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)

The advantage of using this algorithm is that it requires a small number of operations compared to counting every combination of two books directly.
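To make the loop concrete, here is a minimal plain-Scala sketch of it on local collections (illustration only, not part of the program below). The lookup maps readersOfBook and booksOfReader are hypothetical and built by hand from the sample table:

// Illustration only: the counting loop over the sample table, on local collections.
// readersOfBook and booksOfReader are hypothetical lookup maps built from the table above.
val readersOfBook = Map(1 -> Seq(30, 20, 10), 2 -> Seq(10, 30), 3 -> Seq(20))
val booksOfReader = Map(30 -> Seq(1, 2), 10 -> Seq(2, 1), 20 -> Seq(3, 1))

val commonReaderCount = scala.collection.mutable.Map.empty[(Int, Int), Int].withDefaultValue(0)
for {
  (book, readers) <- readersOfBook
  reader          <- readers
  otherBook       <- booksOfReader(reader)
  if otherBook != book
} commonReaderCount((book, otherBook)) += 1
// For the sample table this gives, e.g., commonReaderCount((1, 2)) == 2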

To implement the above algorithm I organize the data in two groups: 1) keyed by book, an RDD containing the readers of each book, and 2) keyed by reader, an RDD containing the books read by each reader, as in the following program:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Small {

  case class Book(book: Int, reader: Int)
  case class BookPair(book1: Int, book2: Int, cnt: Int)

  val recs = Array(
    Book(book = 1, reader = 30),
    Book(book = 2, reader = 10),
    Book(book = 3, reader = 20),
    Book(book = 1, reader = 20),
    Book(book = 1, reader = 10),
    Book(book = 2, reader = 30))

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // Set up the environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(recs)

    val bookMap = data.map(r => (r.book, r))
    val bookGrps = bookMap.groupByKey

    val readerMap = data.map(r => (r.reader, r))
    val readerGrps = readerMap.groupByKey

    // *** Calculate book pairs
    // Iterate over the book groups
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match {
      case (book, recIter) =>
        // Iterate over the readers of this book
        recIter.toList.map(rec => {
          // A reader of this book
          val aReader = rec.reader
          // Find all books (including this one) that this reader read
          // (this filters the readerGrps RDD from inside another RDD's transformation)
          val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match {
            case (reader2, recIter2) => reader2 == aReader
          })
          val bookPairs = allReaderBooks.map(readerTuple => readerTuple match {
            case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1))
          })
          bookPairs
        })
    })
    val x = allBookPairs.flatMap(identity)
    val y = x.map(rdd => rdd.first)
    val z = y.flatMap(identity)
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2)
    val result = p.map(bookPair => bookPair match {
      case ((book1, book2), cnt) => BookPair(book1, book2, cnt)
    })

    val resultCsv = result.map(pair => resultToStr(pair))
    resultCsv.saveAsTextFile("./result.csv")
  }

  def resultToStr(pair: BookPair): String = {
    val sep = "|"
    pair.book1 + sep + pair.book2 + sep + pair.cnt
  }
}

This implementation in fact results in a different, inefficient algorithm!:

for each book
  find each reader of the book, scanning all readers every time!
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)

This contradicts the main goal of the algorithm discussed above, because instead of reducing the number of operations it increases it: finding the books of a user requires filtering all users for every book. The number of operations is ~ N * M, where N is the number of users and M is the number of books.
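For contrast, here is a rough sketch (my illustration, not the program above) of how the pair counting can be driven from the reader-keyed grouping alone, with no per-book filter over the reader collection. It assumes the same data RDD of Book(book, reader) records and counts each unordered pair once, ordered as (lower_id, higher_id):

// Sketch only: group books by reader once, emit a pair for every two books in a
// reader's list, then sum the counts -- no filtering of readerGrps per book.
val pairCounts = data
  .map(r => (r.reader, r.book))      // key by reader
  .groupByKey()                      // books read by each reader
  .flatMap { case (_, books) =>
    val bs = books.toList
    for (b1 <- bs; b2 <- bs if b1 < b2) yield ((b1, b2), 1)
  }
  .reduceByKey(_ + _)                // common-reader count per book pair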

Questions:

  1. Is there any way to implement the original algorithm in Spark without filtering the complete reader collection for every book?
  2. Are there any other algorithms to compute book pair counts efficiently?
  3. Also, when actually running this code I get a filter exception, the reason for which I cannot figure out. Any ideas?

Please see the exception log below:

15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/29 18:24:10 INFO Remoting: Starting remoting
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4)
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at Small$$anonfun$4.apply(Small.scala:54)
    at Small$$anonfun$4.apply(Small.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Update:

This code:

val df = sc.parallelize(Array((1,30),(2,10),(3,20),(1,10),(2,30))).toDF("books","readers")
val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy($"books", $"r_books")
 .agg($"books", $"r_books", count($"readers"))

gives the following result:

books r_books count(readers)
1     2       2

So the count here is the number of times the two books (here 1 and 2) were read together, i.e. the count of common readers for the pair.

This kind of thing is a lot easier if you convert the original RDD to a DataFrame:

// assumes import sqlContext.implicits._ is in scope so that toDF is available
val df = sc.parallelize(
  Array((1,30),(2,10),(3,20),(1,10),(2,30))
).toDF("books","readers")

Once you do that, just do a self-join on the DataFrame to make book pairs, then count how many readers have read each book pair:

val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", count($"readers")
)

As additional explanation of the join, note that I am joining df onto itself -- a self-join: df.join(df.select(...), ...). What you are looking to do is to stitch up book #1 -- $"books" -- with a second book -- $"r_books" -- read by the same reader -- $"readers" === $"r_readers". But if you joined only on $"readers" === $"r_readers", the same book would get joined back onto itself. Instead, I also use $"books" < $"r_books" to ensure the ordering in the book pairs is always (<lower_id>,<higher_id>).

Once you do the join, you get a DataFrame with a line for every reader of every book pair. The groupBy and agg functions do the actual counting of the number of readers per book pairing.
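For example, running just the join without the aggregation on the sample data should give one row per common reader of a pair; with the data above, readers 30 and 10 each read books 1 and 2 (row order may vary):

// Illustration only: the intermediate result of the self-join, before groupBy/agg
df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).show()
// books readers r_books r_readers
//     1      30       2        30
//     1      10       2        10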

Incidentally, if a reader read the same book twice, I believe you would end up double-counting, which may or may not be what you want. If that's not what you want, just change count($"readers") to countDistinct($"readers").
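A minimal variant of the query above with the distinct count would look like this (same df assumed):

// Same query, but each reader is counted at most once per book pair
val distinctResults = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", countDistinct($"readers")
)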

If you want to know more about the agg functions count() and countDistinct() and a bunch of other fun stuff, check out the scaladoc for org.apache.spark.sql.functions.
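For reference, those aggregate functions are brought into scope like this:

// count and countDistinct (used in agg above) live in org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{count, countDistinct}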

