1
1
package com .jobreadyprogrammer .spark ;
2
2
3
- import org .apache .spark .api .java .function .MapFunction ;
4
3
import org .apache .spark .sql .Dataset ;
5
4
import org .apache .spark .sql .Encoders ;
6
5
import org .apache .spark .sql .Row ;
7
6
import org .apache .spark .sql .SparkSession ;
8
7
9
- import com .jobreadyprogrammer .pojos .Line ;
8
+ import com .jobreadyprogrammer .mappers .LineMapper ;
9
+
10
+ import breeze .linalg .Options .Value ;
10
11
11
12
public class WordCount {
12
13
@@ -25,26 +26,27 @@ public void start() {
25
26
df .show (5 );
26
27
df .printSchema ();
27
28
28
- Dataset <Line > houseDS = df .map (
29
- new MapFunction <Row , Line >(){
30
-
31
- private static final long serialVersionUID = -2L ;
32
-
33
- @ Override
34
- public Line call (Row value ) throws Exception {
35
- String [] words = value .toString ().split (" " );
36
- Line l = new Line ();
37
- l .setWords (words );
38
-
39
- return l ;
40
- }
41
-
42
- },
43
-
44
- Encoders .bean (Line .class ));
29
+ Dataset <String > lineDS = df .flatMap (
30
+ new LineMapper (), Encoders .STRING ());
31
+
45
32
46
- houseDS .printSchema ();
47
- houseDS .show (10 , 50 );
33
+ lineDS .printSchema ();
34
+ lineDS .show (10 , 200 );
35
+
36
+ String boringWords = "( 'a', 'an', 'and', 'are', 'as', 'at', 'be', 'but', 'by',\r \n " +
37
+ " 'for', 'if', 'in', 'into', 'is', 'it',\r \n " +
38
+ " 'no', 'not', 'of', 'on', 'or', 'such',\r \n " +
39
+ " 'that', 'the', 'their', 'then', 'there', 'these',\r \n " +
40
+ " 'they', 'this', 'to', 'was', 'will', 'with', 'he', 'she')" ;
41
+
42
+ Dataset <Row > df2 = lineDS .toDF ();
43
+ df2 = df2 .groupBy ("value" ).count ();
44
+ df2 = df2 .filter ("lower(value) NOT IN" + boringWords );
45
+ df2 = df2 .orderBy (df2 .col ("count" ).desc ());
46
+
47
+
48
+ df2 .printSchema ();
49
+ df2 .show (100 );
48
50
}
49
51
50
52
0 commit comments