<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Workshop</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
/*
This document has been created with Marked.app <http://markedapp.com>, Copyright 2011 Brett Terpstra
Please leave this notice in place, along with any additional credits below.
---------------------------------------------------------------
Title: GitHub
Author: Brett Terpstra
Description: Github README style. Includes theme for Pygmentized code blocks.
*/
html,body{color:black}*{margin:0;padding:0}body{font:13.34px helvetica,arial,freesans,clean,sans-serif;-webkit-font-smoothing:antialiased;line-height:1.4;padding:3px;background:#fff;border-radius:3px;-moz-border-radius:3px;-webkit-border-radius:3px}p{margin:1em 0}a{color:#4183c4;text-decoration:none}#wrapper{background-color:#fff;border:3px solid #eee!important;padding:0 30px;margin:15px}#wrapper{font-size:14px;line-height:1.6}#wrapper>*:first-child{margin-top:0!important}#wrapper>*:last-child{margin-bottom:0!important}h1,h2,h3,h4,h5,h6{margin:0;padding:0}h1{margin:15px 0;padding-bottom:2px;font-size:24px;border-bottom:1px solid #eee}h2{margin:20px 0 10px 0;font-size:18px}h3{margin:20px 0 10px 0;padding-bottom:2px;font-size:14px;border-bottom:1px solid #ddd}h4{font-size:14px;line-height:26px;padding:18px 0 4px;font-weight:bold;text-transform:uppercase}h5{font-size:13px;line-height:26px;padding:14px 0 0;font-weight:bold;text-transform:uppercase}h6{color:#666;font-size:14px;line-height:26px;padding:18px 0 0;font-weight:normal;font-variant:italic}hr{background:transparent url() repeat-x 0 0;border:0 none;color:#ccc;height:4px;margin:20px 0;padding:0}#wrapper>h2:first-child,#wrapper>h1:first-child,#wrapper>h1:first-child+h2{border:0;margin:0;padding:0}#wrapper>h3:first-child,#wrapper>h4:first-child,#wrapper>h5:first-child,#wrapper>h6:first-child{margin:0;padding:0}h4+p,h5+p,h6+p{margin-top:0}li p.first{display:inline-block}ul,ol{margin:15px 0 15px 25px}ul li,ol li{margin-top:7px;margin-bottom:7px}ul li>*:last-child,ol li>*:last-child{margin-bottom:0}ul li>*:first-child,ol li>*:first-child{margin-top:0}#wrapper>ul,#wrapper>ol{margin-top:21px;margin-left:36px}dl{margin:0;padding:20px 0 0}dl dt{font-size:14px;font-weight:bold;line-height:normal;margin:0;padding:20px 0 0}dl dt:first-child{padding:0}dl dd{font-size:13px;margin:0;padding:3px 0 0}blockquote{margin:14px 0;border-left:4px solid #ddd;padding-left:11px;color:#555}table{border-collapse:collapse;margin:20px 0 
0;padding:0}table tr{border-top:1px solid #ccc;background-color:#fff;margin:0;padding:0}table tr:nth-child(2n){background-color:#f8f8f8}table tr th,table tr td{border:1px solid #ccc;text-align:left;margin:0;padding:6px 13px}img{max-width:100%;height:auto}code,tt{margin:0 2px;padding:2px 5px;white-space:nowrap;border:1px solid #ccc;background-color:#f8f8f8;border-radius:3px;-moz-border-radius:3px;-webkit-border-radius:3px;font-size:12px}pre>code{margin:0;padding:0;white-space:pre;border:0;background:transparent;font-size:13px}.highlight pre,pre{background-color:#f8f8f8;border:1px solid #ccc;font-size:13px;line-height:19px;overflow:auto;padding:6px 10px;border-radius:3px;-moz-border-radius:3px;-webkit-border-radius:3px}#wrapper>pre,#wrapper>div.highlight{margin:10px 0 0}pre code,pre tt{background-color:transparent;border:0}#wrapper{background-color:#fff;border:1px solid #cacaca;padding:30px}.poetry pre{font-family:Georgia,Garamond,serif!important;font-style:italic;font-size:110%!important;line-height:1.6em;display:block;margin-left:1em}.poetry pre code{font-family:Georgia,Garamond,serif!important}sup,sub,a.footnote{font-size:1.4ex;height:0;line-height:1;vertical-align:super;position:relative}sub{vertical-align:sub;top:-1px}@media print{body{background:#fff}img,pre,blockquote,table,figure{page-break-inside:avoid}#wrapper{background:#fff;border:0}code{background-color:#fff;color:#444!important;padding:0 .2em;border:1px solid #dedede}pre code{background-color:#fff!important;overflow:visible}pre{background:#fff}}@media screen{body.inverted,.inverted #wrapper,.inverted hr .inverted p,.inverted td,.inverted li,.inverted h1,.inverted h2,.inverted h3,.inverted h4,.inverted h5,.inverted h6,.inverted th,.inverted .math,.inverted caption,.inverted dd,.inverted dt,.inverted blockquote{color:#eee!important;border-color:#555}.inverted td,.inverted th{background:#333}.inverted pre,.inverted code,.inverted tt{background:#444!important}.inverted h2{border-color:#555}.inverted 
hr{border-color:#777;border-width:1px!important}::selection{background:rgba(157,193,200,.5)}h1::selection{background-color:rgba(45,156,208,.3)}h2::selection{background-color:rgba(90,182,224,.3)}h3::selection,h4::selection,h5::selection,h6::selection,li::selection,ol::selection{background-color:rgba(133,201,232,.3)}code::selection{background-color:rgba(0,0,0,.7);color:#eee}code span::selection{background-color:rgba(0,0,0,.7)!important;color:#eee!important}a::selection{background-color:rgba(255,230,102,.2)}.inverted a::selection{background-color:rgba(255,230,102,.6)}td::selection,th::selection,caption::selection{background-color:rgba(180,237,95,.5)}.inverted{background:#0b2531}.inverted #wrapper,.inverted{background:rgba(37,42,42,1)}.inverted a{color:rgba(172,209,213,1)}}.highlight .c{color:#998;font-style:italic}.highlight .err{color:#a61717;background-color:#e3d2d2}.highlight .k{font-weight:bold}.highlight .o{font-weight:bold}.highlight .cm{color:#998;font-style:italic}.highlight .cp{color:#999;font-weight:bold}.highlight .c1{color:#998;font-style:italic}.highlight .cs{color:#999;font-weight:bold;font-style:italic}.highlight .gd{color:#000;background-color:#fdd}.highlight .gd .x{color:#000;background-color:#faa}.highlight .ge{font-style:italic}.highlight .gr{color:#a00}.highlight .gh{color:#999}.highlight .gi{color:#000;background-color:#dfd}.highlight .gi .x{color:#000;background-color:#afa}.highlight .go{color:#888}.highlight .gp{color:#555}.highlight .gs{font-weight:bold}.highlight .gu{color:#800080;font-weight:bold}.highlight .gt{color:#a00}.highlight .kc{font-weight:bold}.highlight .kd{font-weight:bold}.highlight .kn{font-weight:bold}.highlight .kp{font-weight:bold}.highlight .kr{font-weight:bold}.highlight .kt{color:#458;font-weight:bold}.highlight .m{color:#099}.highlight .s{color:#d14}.highlight .na{color:#008080}.highlight .nb{color:#0086b3}.highlight .nc{color:#458;font-weight:bold}.highlight .no{color:#008080}.highlight .ni{color:#800080}.highlight 
.ne{color:#900;font-weight:bold}.highlight .nf{color:#900;font-weight:bold}.highlight .nn{color:#555}.highlight .nt{color:#000080}.highlight .nv{color:#008080}.highlight .ow{font-weight:bold}.highlight .w{color:#bbb}.highlight .mf{color:#099}.highlight .mh{color:#099}.highlight .mi{color:#099}.highlight .mo{color:#099}.highlight .sb{color:#d14}.highlight .sc{color:#d14}.highlight .sd{color:#d14}.highlight .s2{color:#d14}.highlight .se{color:#d14}.highlight .sh{color:#d14}.highlight .si{color:#d14}.highlight .sx{color:#d14}.highlight .sr{color:#009926}.highlight .s1{color:#d14}.highlight .ss{color:#990073}.highlight .bp{color:#999}.highlight .vc{color:#008080}.highlight .vg{color:#008080}.highlight .vi{color:#008080}.highlight .il{color:#099}.highlight .gc{color:#999;background-color:#eaf2f5}.type-csharp .highlight .k{color:#00F}.type-csharp .highlight .kt{color:#00F}.type-csharp .highlight .nf{color:#000;font-weight:normal}.type-csharp .highlight .nc{color:#2b91af}.type-csharp .highlight .nn{color:#000}.type-csharp .highlight .s{color:#a31515}.type-csharp .highlight .sc{color:#a31515}
</style>
</head>
<body class="normal">
<div id="wrapper">
<h1 id="scaldingworkshop">Scalding Workshop</h1>
<p><em>Copyright (C) 2010-2014 Think Big Analytics, Inc. All Rights Reserved.</em></p>
<p><a href="http://thestrangeloop.com">StrangeLoop 2012</a><br/>
<strong>Dean Wampler, Think Big Analytics</strong><br/>
<a href="mailto:dean@deanwampler.com">dean@deanwampler.com</a><br/>
<a href="https://twitter.com/deanwampler">@deanwampler</a><br/>
<a href="http://thinkbiganalytics.com">Hire Us!</a></p>
<p>This workshop/tutorial takes you through the basic principles of writing data analysis applications with <a href="https://github.com/twitter/scalding">Scalding</a>, a Scala API that wraps <a href="http://www.cascading.org/">Cascading</a>. I first went through this workshop at <a href="http://thestrangeloop.com">StrangeLoop 2012</a>. It took about 3 hours, but we didn’t do all the <em>mini-exercises</em>, so it may take you a bit longer if you do them all.</p>
<p>These instructions walk you through a series of exercises. Each exercise has a corresponding Scalding script (a Scala source file), and a number suffix in the script name indicates the order of the exercises. Where noted, some exercises are adapted from the Tutorial examples that are part of the Scalding GitHub repo.</p>
<p>This document explains many features of Scalding and Cascading. The scripts themselves contain additional details. The Scalding and Cascading documentation has more information than we can cover here:</p>
<ul>
<li><a href="http://www.cascading.org/documentation/">Cascading Documentation</a>, especially the <a href="http://www.cascading.org/documentation/">Cascading User Guide</a> and the <a href="http://docs.cascading.org/cascading/2.0/javadoc/">Javadocs</a>.</li>
<li><a href="https://github.com/twitter/scalding/wiki">Scalding Wiki</a>.</li>
<li>Scalding Scaladocs are not online, but they can be built from the <a href="https://github.com/twitter/scalding">Scalding Repo</a>. For convenience, we have included these files in the workshop as <code>api.zip</code>. Unzip the file and open the <a href="api/index.html">index</a>.</li>
<li><a href="http://blog.echen.me/2012/02/09/movie-recommendations-and-more-via-mapreduce-and-scalding/">Movie Recommendations</a> is a fantastic blog post with detailed, non-trivial examples using Scalding.</li>
<li><a href="https://github.com/snowplow/scalding-example-project">Scalding Example Project</a> is a full example designed to run on Hadoop, specifically on Amazon’s EMR (Elastic MapReduce) platform.</li>
</ul>
<h2 id="adisclaimer">A Disclaimer…</h2>
<p>I’m not a Scalding or Cascading expert. Feedback welcome! <a href="https://github.com/ThinkBigAnalytics/scalding-workshop">Fork me</a>.</p>
<h1 id="basiccascadingconcepts">Basic Cascading Concepts</h1>
<p>Let’s start with a very brief synopsis of key Cascading concepts useful for understanding Scalding. Not all Cascading features are wrapped with Scalding APIs. In some cases, equivalent Scala idioms are used, even though the implementations may delegate to Cascading equivalents. </p>
<p>See the Cascading User Guide for more details.</p>
<h2 id="tuple">Tuple</h2>
<p>A common data structure in many programming languages, a tuple is a grouping of a fixed number of fields. Each field has a specific type, the types of different fields can differ, and the fields can have names. It is analogous to a SQL record, a <code>struct</code> in C, and an object in object-oriented languages.</p>
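<p>As a minimal sketch in plain Scala (not Cascading’s own <code>Tuple</code> class), the same idea can be written either as a positional tuple or as a case class with named fields. The names <code>TupleSketch</code> and <code>Employee</code> are made up for illustration.</p>
<pre><code>// Plain Scala, not Scalding or Cascading: two ways to model a fixed set of typed fields.
object TupleSketch {
  // A positional tuple (name, age); fields are accessed by position.
  val positional: (String, Int) = ("Ada", 36)

  // A case class gives the same fields names, much like a SQL record or a C struct.
  case class Employee(name: String, age: Int)
  val named: Employee = Employee("Ada", 36)
}
</code></pre>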
<h2 id="tap">Tap</h2>
<p>A <strong>Tap</strong> is a data <em>source</em> (for reading) or <em>sink</em> (for writing), corresponding to a file on the local file system, <em>Hadoop Distributed File System</em> (HDFS), or Amazon S3. You instantiate an <code>Hfs</code> instance for Hadoop or S3 file systems, and a <code>FileTap</code> instance for the local file system. There are also more specialized versions for particular scenarios, like specifying a “template” for file or directory naming conventions.</p>
<h2 id="scheme">Scheme</h2>
<p>The <strong>Scheme</strong> encapsulates the file format. There are several supported natively by Cascading. The corresponding Java class names are used in the following subsections.</p>
<h3 id="textline">TextLine</h3>
<p>When read, each line of text in the file is returned, with no attempt to tokenize it into fields. The position of the line in the file (its byte offset or line number) is also returned. In the Hadoop model of key-value data pairs, the offset is the key and the line is the value.</p>
<p>When written, tuple fields are serialized to text and separated by tabs.</p>
<p>Available for both local file systems and Hadoop.</p>
<h3 id="textdelimited">TextDelimited</h3>
<p>Handles data where each line is a tuple with fields separated by delimiters, such as tabs and commas. Quoted strings with nested delimiters and files with column headers can be handled. Fields can be cast to primitive types.</p>
<p>Available for both local file systems and Hadoop.</p>
<h3 id="sequencefile">SequenceFile</h3>
<p>A binary, Hadoop-only data format.</p>
<h3 id="writablesequencefile">WritableSequenceFile</h3>
<p>A more efficient implementation of SequenceFile.</p>
<h2 id="pipe">Pipe</h2>
<p><strong>Pipes</strong> are units of processing through which tuples are streamed. They are composed into <strong>Assemblies</strong>. Pipes are provided to merge and join streams, split them into separate streams, group them, filter them, etc.</p>
<h2 id="flow">Flow</h2>
<p>A <strong>Flow</strong> is created whenever a Tap is connected to a Pipe. Flows can also be composed.</p>
<h2 id="cascade">Cascade</h2>
<p>A <strong>Cascade</strong> joins flows and supports a model where a flow is only executed if the target output doesn’t exist or is older than the input data, analogous to build tools like <code>make</code>.</p>
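<p>The <code>make</code>-like rule can be sketched as a small predicate in plain Scala. This is only an illustration of the idea using file modification times; it is not how Cascading implements the check, and <code>StaleCheckSketch</code> and <code>isStale</code> are hypothetical names.</p>
<pre><code>import java.io.File

// Illustration only: decide whether an output needs to be rebuilt.
object StaleCheckSketch {
  // The sink is stale if it is missing, or if the source was modified after it.
  def isStale(source: File, sink: File): Boolean =
    !sink.exists || source.lastModified > sink.lastModified
}
</code></pre>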
<h1 id="theworkshop">The Workshop</h1>
<p>Each section introduces one or more features for data manipulation, most of which are analogous to features found in SQL, Pig (the Hadoop <em>data flow</em> tool), and other systems.</p>
<h2 id="sanitycheck">Sanity Check</h2>
<p>First, the <a href="README.html">README</a> tells you to run a <code>SanityCheck0.scala</code> Scalding script as a sanity check to verify that your environment is ready to go.</p>
<p>Using <code>bash</code>: </p>
<pre><code> cd $HOME/fun/scalding-workshop
./run.rb scripts/SanityCheck0.scala
</code></pre>
<p>On Windows:</p>
<pre><code> cd C:\fun\scalding-workshop
ruby run.rb scripts/SanityCheck0.scala
</code></pre>
<p>From now on, we’ll assume you are working in the <code>scalding-workshop</code> directory, unless otherwise noted. Also, we’ll just show the <code>bash</code> versions of the subsequent <code>run.rb</code> commands. Finally, because we’re lazy, we’ll sometimes drop the <code>.scala</code> extension from script names when we discuss them in the text.</p>
<p>Run these commands again and verify that they run without error. The output is written to <code>output/SanityCheck0.txt</code>. What’s in that file?</p>
<p>It contains the contents of <code>SanityCheck0.scala</code>, but each line is now numbered.</p>
<blockquote>
<p>By default, when you create a new field in a <strong>pipeline</strong>, Cascading adds the field to the existing fields. All the fields together constitute a <strong>tuple</strong>.</p>
</blockquote>
<p>Loading the file added the line number as an additional field.</p>
<h2 id="projectingfields">Projecting Fields</h2>
<p>When you write a SQL <code>SELECT</code> statement like the following, you are <em>projecting</em> out the fields/columns or calculated values that you want, and discarding the rest of the fields. </p>
<pre><code>SELECT name, age FROM employees;
</code></pre>
<p>Scalding also has a <code>project</code> method for the same purpose. Let’s modify <code>SanityCheck0</code> to project out just the line we read from the file, discarding the line number. <code>scripts/Project1.scala</code> has this change near the end of the file:</p>
<pre><code>in
.read
.project('line)
.write(out)
</code></pre>
<p>This expression is a sequence of Cascading <a href="http://docs.cascading.org/cascading/2.0/javadoc/cascading/pipe/Pipe.html">Pipes</a>. However, there is no <code>write</code> method defined on the <code>Pipe</code> class. Scalding uses Scala’s <em>implicit conversion</em> feature to wrap <code>Pipe</code> with a Scalding-specific <code>com.twitter.scalding.RichPipe</code> type that provides most of the methods we’ll actually use.</p>
<blockquote>
<p>There are also comments in this and other scripts about specific Scalding and Cascading features that we won’t cover in these notes.</p>
</blockquote>
<p>Run the script thusly:</p>
<pre><code> ./run.rb scripts/Project1.scala
</code></pre>
<p>Now, if you look at the output in <code>output/Project1.txt</code>, you’ll see just the original lines from <code>scripts/Project1.scala</code>. That is, running a <code>diff</code> command on the input and output files should show no differences.</p>
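<p>The effect of reading numbered lines and then projecting out only the line text can be imitated with plain Scala collections. This is a sketch, not Scalding code: <code>NumberedLine</code>, <code>number</code>, and <code>projectLine</code> are hypothetical names, and starting the numbering at 0 is an assumption of the sketch.</p>
<pre><code>// Plain Scala stand-in for "read numbered lines, then project the line field".
object ProjectSketch {
  // Hypothetical record: a line number plus the line text.
  case class NumberedLine(num: Int, line: String)

  // Attach a number to each line (0-based here, which is an assumption).
  def number(lines: Seq[String]): Seq[NumberedLine] =
    lines.zipWithIndex.map { case (line, num) => NumberedLine(num, line) }

  // "Project" the line field: keep the text and discard the number.
  def projectLine(rows: Seq[NumberedLine]): Seq[String] = rows.map(_.line)
}
</code></pre>
<p>Projecting after numbering returns exactly the input lines, which is why the input and output files show no differences.</p>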
<h2 id="flatmapandgroupby-implementingwordcount">FlatMap and GroupBy - Implementing Word Count</h2>
<p>This exercise introduces several new concepts and implements the famous <em>hello world!</em> of Hadoop programming: <em>word count</em>. In word count, a corpus of documents is read, the content is tokenized into words, and the total count for each word over the entire corpus is computed.</p>
<p>First, we’ll use two new invocation command options:</p>
<ul>
<li><code>--input file</code> specifies the input file.</li>
<li><code>--output file</code> specifies the output file.</li>
</ul>
<blockquote>
<p>Unlike Hadoop’s HDFS API, Hive, and Pig, when you run using <code>--local</code> mode, you can’t specify a directory for the input, where all files will be read, or for the output, where one or more files will be written. You have to specify input and output files.</p>
</blockquote>
<p>Run the script like this, where we have wrapped the lines and used <code>\</code> to indicate the line breaks:</p>
<pre><code>./run.rb scripts/WordCount2.scala \
--input data/shakespeare/plays.txt \
--output output/shakespeare-wc.txt
</code></pre>
<p>The output should be identical to the contents of <code>data/shakespeare-wc/simple/wc.txt</code>. Running a <code>diff</code> command should show no differences:</p>
<pre><code>diff data/shakespeare-wc/simple/wc.txt output/shakespeare-wc.txt
</code></pre>
<p>The script uses two new data transformation features to compute the word count.</p>
<h3 id="furtherexploration">Further Exploration</h3>
<p>Try these additional “mini-exercises” to explore what Scalding and Cascading are doing.</p>
<h4 id="improvethetokenization">Improve the Tokenization</h4>
<p>Look at the output and you’ll notice that the tokenization is rather poor. How can you improve the value defined in <code>tokenizerRegex</code>? Can you pass in the regular expression as an argument to the program?</p>
<h4 id="projectthenumfield">Project the ’num Field</h4>
<p>Instead of projecting out <code>'line</code>, project out <code>'num</code>, the line number. (The output is boring, but now you know the name of this field!)</p>
<h3 id="flatmap">flatMap</h3>
<p>When you apply a <code>map</code> operation to a collection, each element is passed to a function that returns a new element, perhaps of a completely different type. For example, mapping a collection of integers to a collection of their string representations. A crucial feature of <code>map</code> is that the process is <em>one-to-one</em>. Each input element has a corresponding output element and the sizes of the input and output collections are the same.</p>
<p>The <code>flatMap</code> operation is similar, but now the output of the function called for each element is a collection of zero or more new elements. These output collections from each function call are <em>flattened</em> into a single collection. So, a crucial difference compared to <code>map</code> is that the process is <em>one-to-many</em>, where <em>many</em> could be zero!</p>
<p><code>WordCount2</code> uses <code>flatMap</code> to convert each line of input text into many words:</p>
<pre><code>.flatMap('line -> 'word){
line : String => line.toLowerCase.split(tokenizerRegex)
}
</code></pre>
<p>where <code>tokenizerRegex</code> is <code>"\\s+"</code>.</p>
<p>A bit of Scala syntax: there are <em>two</em> argument lists passed to <code>flatMap</code>. The first, <code>('line -> 'word)</code>, specifies the field(s) in the tuple to pass to the mapping function, shown on the left-hand side of the arrow-like <code>-></code>, and names the output field(s) the function will return, the single <code>'word</code> in this case.</p>
<p>The second function argument list is <code>{ line : String => line.toLowerCase.split(tokenizerRegex)}</code>. Scala lets you substitute curly braces <code>{...}</code> for parentheses <code>(...)</code> for function argument lists, which is most useful when the content of the “block-like” structure is a single <em>function literal</em> (a.k.a. <em>anonymous function</em>). </p>
<p>The <code>line : String</code> is the argument list passed to the anonymous function, a single parameter named <code>line</code> of type <code>String</code>. On the right-hand side of the arrow-like <code>=></code> is the body of the anonymous function. In this case it converts <code>line</code> to lower case and splits it on whitespace into an array of words.</p>
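<p>The difference between <code>map</code> and <code>flatMap</code> can be seen on an ordinary Scala collection. This sketch uses plain <code>Seq</code> values rather than a Scalding pipe, and the sample lines are made up.</p>
<pre><code>// Plain Scala collections, not a Scalding pipe.
object FlatMapSketch {
  val lines = Seq("To be or", "", "not to be")

  // map is one-to-one: three lines in, three lengths out.
  val lengths: Seq[Int] = lines.map(_.length)

  // flatMap is one-to-many: each line yields an array of words,
  // and the arrays are flattened into a single sequence.
  val words: Seq[String] = lines.flatMap(line => line.toLowerCase.split("\\s+"))
}
</code></pre>
<p>Here <code>lengths</code> has three elements, while <code>words</code> has seven: three words from each non-empty line, plus one empty string, because splitting an empty line still returns a single empty token.</p>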
<h3 id="groupby">groupBy</h3>
<p>Once we have a stream of individual words, we want to count the occurrences of each word. To do that, we need to group together all occurrences of the same words. The <code>groupBy</code> operation is used by <code>WordCount2</code> to do this. </p>
<pre><code>.groupBy('word){group => group.size('count)}
</code></pre>
<p>The calling syntax is similar to <code>flatMap</code>. The first argument list specifies one or more fields to group over, forming the “key”. The second argument is a function literal. It takes a single argument of type <code>com.twitter.scalding.GroupBuilder</code> that gives us a hook to the constructed group of words so we can compute what we need from it. In this case, all we care about is the size of the group, which we’ll name <code>'count</code>.</p>
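<p>The same group-then-count idea looks like this on a plain Scala collection. It is a sketch of the semantics only; it does not use <code>GroupBuilder</code>, and <code>GroupBySketch</code> and <code>wordCount</code> are hypothetical names.</p>
<pre><code>// Plain Scala collections: group equal words, then reduce each group to its size.
object GroupBySketch {
  def wordCount(words: Seq[String]): Map[String, Int] =
    words.groupBy(identity).map { case (word, group) => (word, group.size) }
}
</code></pre>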
<h3 id="furtherexploration">Further Exploration</h3>
<p>Try these additional “mini-exercises” to explore what Scalding and Cascading are doing.</p>
<h4 id="removethegroupby">Remove the groupBy</h4>
<p>Comment out the <code>groupBy</code> line so that the raw results of <code>flatMap</code> are written to the output instead of the word count output. Note the fields that are written. </p>
<p>You’ll see the line number, the whole line, and an individual word from the line. Note that the line number and line are repeated for each word in the line.</p>
<h4 id="groupagainbycount">Group Again by Count</h4>
<p>Now restore the <code>groupBy</code> line, and after it, add this line:</p>
<pre><code>.groupBy('count){ group => group.mkString('word -> 'words, "\t") }
</code></pre>
<p>The output lines will be extremely long at the beginning of the file, but very short at the end. This second <code>groupBy</code> regroups the <code>'word</code> and <code>'count</code> output from the previous pipe. It groups by count so we now have all the words with one occurrence on a line, followed by all the words with two occurrences, etc. At the end of the output, which words have the most occurrences? What are those “words”?</p>
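<p>Regrouping by count can be sketched on plain Scala collections as follows. The names are hypothetical, and the words are sorted within each group only to make the sketch’s output deterministic; no particular ordering is claimed for Scalding’s <code>mkString</code>.</p>
<pre><code>// Plain Scala collections: invert a word-count map so that it is keyed by count.
object RegroupSketch {
  def wordsByCount(counts: Map[String, Int]): Map[Int, String] =
    counts.toSeq
      .groupBy { case (_, count) => count }
      .map { case (count, pairs) =>
        // Join all words that share this count with tabs (sorted for determinism).
        (count, pairs.map(_._1).sorted.mkString("\t"))
      }
}
</code></pre>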
<h4 id="improvethewordtokenization">Improve the Word Tokenization</h4>
<p>You probably noticed that simply splitting on whitespace is not very good, as punctuation is not removed. There are several ways it can be improved. First, replacing the definition of <code>tokenizerRegex</code>, which is <code>"\\s+"</code>, with <code>"\\W+"</code> will treat all runs of non-word characters (anything other than letters, digits, and underscores) as word separators. This improves the result considerably (although it’s still not perfect…).</p>
<p>For a more complete tokenizer, refactor <code>line.toLowerCase.split(tokenizerRegex)</code> into a <code>tokenize</code> function. Then implement <code>tokenize</code> to remove punctuation, etc. An example implementation can be found in the <a href="https://github.com/twitter/scalding">Scalding README</a>.</p>
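<p>One possible shape for such a function is sketched below in plain Scala. It is not the implementation from the Scalding README; it simply combines the <code>"\\W+"</code> split with a step that drops empty tokens, and the object name <code>TokenizeSketch</code> is hypothetical.</p>
<pre><code>// A simple tokenizer sketch; not the Scalding README's version.
object TokenizeSketch {
  // Lower-case the line, split on runs of non-word characters, drop empty tokens.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq
}
</code></pre>
<p>Note that this still splits contractions, because the apostrophe is a non-word character: <code>it's</code> becomes <code>it</code> and <code>s</code>.</p>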
<h4 id="eliminateblanklines">Eliminate Blank Lines</h4>
<p>The very first line in the output is an empty word and a count of approximately 49,000! These are blank lines in the text. The implementation removes all other whitespace, but as written, it still returns an empty word for blank lines. Adding a filter clause will remove these lines. We’ll see how below, but you can search for that section now if you want to try it.</p>
<h2 id="inputparsing">Input Parsing</h2>
<p>Let’s do a similar <code>groupBy</code> operation, this time to compute the average of Apple’s (AAPL) closing stock price year over year (so you’ll know what entry points you missed…). Also, in this exercise we’ll solve a common problem: the input data is in an unsupported format.</p>
<p>Oddly enough, while there is a built-in <code>Tsv</code> class for tab-separated values, there is no corresponding <code>Csv</code> class, so we’ll handle that ourselves.</p>
<pre><code>./run.rb scripts/StockAverages3.scala \
--input data/stocks/AAPL.csv \
--output output/AAPL-year-avg.txt
</code></pre>
<p>You should get the following output (the input data ends in early 2010):</p>
<pre><code>1984 80 2.918625000000001 0.17609474544971507
1985 253 2.3041501976284584 0.5150177183048612
1986 253 3.7039130434782592 0.6311900614112455
1987 253 8.90608695652174 1.9436409195268336
1988 253 9.564703557312258 0.5662800839386863
1989 252 9.684563492063495 0.9768865589941155
1990 253 8.826126482213441 1.0871208010962554
1991 253 12.469169960474305 1.66952305050656
1992 254 13.130669291338577 1.7661116441071
1993 253 9.920395256916992 3.1660729659295854
1994 252 8.369880952380953 1.0934696061063884
1995 252 10.075198412698407 1.0061968512619912
1996 254 6.229881889763783 0.8162148485315347
1997 253 4.491818181818182 0.7140447712304852
1998 252 7.641666666666666 1.6581179568203712
1999 252 14.443214285714282 5.433605126282854
2000 252 22.856230158730177 8.415990854209504
2001 248 10.109758064516127 1.2389205523420814
2002 252 9.569920634920635 2.150379256336458
2003 252 9.272619047619047 1.6510305480966423
2004 252 17.763888888888886 6.577299642773537
2005 252 46.67595238095237 11.4046392452893
2006 251 70.81063745019917 9.507687243758655
2007 251 128.2739043824701 35.17547139617391
2008 253 141.9790118577075 33.66549448302255
2009 252 146.81412698412706 39.731840611338804
2010 25 204.7216 7.454055905344417
</code></pre>
<p>Note that as I write this (September 2012), AAPL is currently trading at ~$700/share! <a href="#fn:1" id="fnref:1" title="see footnote" class="footnote">[1]</a></p>
<h3 id="musicalinterlude:comparisonwithhiveandpig">Musical Interlude: Comparison with Hive and Pig</h3>
<p>By the way, here’s the same query written using <em>Hive</em>, assuming there exists a <code>stocks</code> table and we have to select for the stock symbol and exchange:</p>
<pre><code>SELECT year(s.ymd), avg(s.price_close)
FROM stocks s
WHERE s.symbol = 'AAPL' AND s.exchange = 'NASDAQ'
GROUP BY year(s.ymd);
</code></pre>
<p>It’s a little more compact, in part because we can handle record parsing and similar issues when we set up the Hive tables. However, Scalding gives us more flexibility when our SQL dialect and built-in functions aren’t flexible enough for our needs.</p>
<p>Here’s what the corresponding <em>Pig</em> script looks like (see also <code>scripts/StockAverages3.pig</code>):</p>
<pre><code>aapl = load 'data/stocks/AAPL.csv' using PigStorage(',') as (
ymd: chararray,
price_open: float,
price_high: float,
price_low: float,
price_close: float,
volume: int,
price_adj_close: float);
by_year = group aapl by SUBSTRING(ymd, 0, 4);
year_avg = foreach by_year generate group, AVG(aapl.price_close);
-- You always specify output directories:
store year_avg into 'output/AAPL-year-avg-pig';
</code></pre>
<p>If you have <em>Pig</em> installed, you can run this script (from this directory) with the following command:</p>
<pre><code>pig -x local scripts/StockAverages3.pig
</code></pre>
<p>The <code>-x local</code> option means that Pig will treat the paths as references to the local file system, not the Hadoop Distributed File System (HDFS).</p>
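<p>For comparison with the Hive and Pig versions, the parse-then-aggregate logic can also be sketched with plain Scala collections. This is not the contents of <code>StockAverages3.scala</code>. It assumes the column order of the Pig schema above (date first, closing price fifth), dates that begin with a four-digit year, and no header row. Using the population standard deviation is this sketch’s choice and has not been checked against the output listed earlier.</p>
<pre><code>// Plain Scala collections: parse CSV lines, then compute per-year statistics.
object StockAveragesSketch {
  // Parse one CSV line into (year, closing price).
  // Assumed columns: ymd is column 0, price_close is column 4.
  def parse(line: String): (String, Double) = {
    val fields = line.split(",")
    (fields(0).substring(0, 4), fields(4).toDouble)
  }

  // Group by year; reduce each group to (count, mean, population standard deviation).
  def yearlyStats(lines: Seq[String]): Map[String, (Int, Double, Double)] =
    lines.map(parse).groupBy(_._1).map { case (year, rows) =>
      val closes = rows.map(_._2)
      val mean = closes.sum / closes.size
      val variance = closes.map(c => (c - mean) * (c - mean)).sum / closes.size
      (year, (closes.size, mean, math.sqrt(variance)))
    }
}
</code></pre>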
<h3 id="furtherexploration">Further Exploration</h3>
<p>Try these additional “mini-exercises” to learn more.</p>
<h4 id="projectotheraverages">Project Other Averages</h4>
<p>Try projecting averages for one or more other fields.</p>
<h4 id="pig">Pig</h4>
<p>If you have Pig installed, try the Pig script. Compare the performance of the Pig vs. Scalding script, but keep in mind that because we’re running in local mode, the performance comparison won’t mean as much as when you run in a Hadoop cluster.</p>
<h4 id="hive">Hive</h4>
<p>If you have Hive installed, try the Hive query shown above. You’ll need to create a table that uses the data files first. Compare the performance of the Hive vs. Scalding script, keeping in mind the caveats mentioned for Pig.</p>
<h2 id="joins">Joins</h2>
<p>Let’s join stocks and dividend data. To join two data sources, you set up two pipe assemblies and use one of the join operations.</p>
<p><code>scripts/StocksDividendsJoin4</code> performs an <em>inner join</em> of stock and dividend records. Let’s invoke it for Apple data (yes, although Apple only recently announced that it would pay a dividend, it also paid dividends back in the late 80s and early 90s):</p>
<pre><code>./run.rb scripts/StocksDividendsJoin4.scala \
--stocks data/stocks/AAPL.csv \
--dividends data/dividends/AAPL.csv \
--output output/AAPL-stocks-dividends-join.txt
</code></pre>
<p>Note that we need two input sources, so we use the flags <code>--stocks</code> and <code>--dividends</code> for them.</p>
<h3 id="furtherexploration">Further Exploration</h3>
<p>Try these additional “mini-exercises” to learn more.</p>
<h4 id="leftouterjoin">Left Outer Join</h4>
<p>Change <code>joinWithSmaller</code> to <code>leftJoinWithSmaller</code> to perform a left-outer join. (Also change the output file name to something else.) You have to scroll a long way into the output file to find any dividends. See also the next mini-exercise.</p>
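<p>To see what the two join types produce, here is a minimal sketch using plain Scala collections rather than the Scalding API. The object name <code>JoinSketch</code>, its methods, and the sample records are hypothetical, made up for illustration.</p>
<pre><code>object JoinSketch {
  // Made-up sample data: (ymd, closing price) and ymd -> dividend.
  val stocks = Seq(("1988-02-12", 41.0), ("1988-02-15", 42.0))
  val dividends = Map("1988-02-12" -> 0.08)

  // Inner join: keep only stock records that have a matching dividend.
  def innerJoin: Seq[(String, Double, Double)] =
    stocks.flatMap { case (ymd, close) =>
      dividends.get(ymd).map(div => (ymd, close, div))
    }

  // Left outer join: keep every stock record; the dividend is optional.
  def leftJoin: Seq[(String, Double, Option[Double])] =
    stocks.map { case (ymd, close) => (ymd, close, dividends.get(ymd)) }

  def main(args: Array[String]): Unit = {
    innerJoin.foreach(println)
    leftJoin.foreach(println)
  }
}
</code></pre>
<p>In the sketch, the left outer join keeps the stock record that has no dividend, while the inner join drops it.</p>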
<h4 id="filteringbyyear">Filtering by Year</h4>
<p>Sometimes you want to filter records, say to limit the output. Add the following filter clause to limit the records to 1988:</p>
<pre><code>.filter('symd){ ymd: String => ymd.startsWith("1988")}
</code></pre>
<p>Try moving it to different positions in the pipe assembly and see if the execution times change. However, the data set is small enough that you might not notice a difference.</p>
<h4 id="filteringblanklinesfromwordcount2">Filtering Blank Lines from WordCount2</h4>
<p>Recall in the <code>WordCount2</code> exercise that we had thousands of blank lines that got counted. Add a <code>filter</code> before the <code>groupBy</code> that keeps only those words whose lengths are greater than zero.</p>
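<p>The idea can be sketched with plain Scala collections, assuming words are split on whitespace. This is not the <code>WordCount2</code> script itself; the name <code>WordCountSketch</code> and the tokenization are assumptions.</p>
<pre><code>object WordCountSketch {
  // Count words, dropping the empty "words" that blank lines produce.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.trim.split("""\s+""").toSeq)
      .filter(word => word.length > 0) // the filter goes before the grouping
      .groupBy(identity)
      .map { case (word, hits) => (word, hits.size) }

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("to be or not to be", "", "that is the question")))
}
</code></pre>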
<h2 id="cogroups">CoGroups</h2>
<p>CoGroups in Scalding are used internally to implement joins of two pipe assemblies. Clients can also use them to implement joins of three or more pipe assemblies, so-called <em>star joins</em>. You should always use the largest data stream as the first one in the join, because the Cascading implementation is optimized for this scenario. </p>
<p>However, in this exercise, we’ll do a four-way self-join of the data files for the four stock symbols we provided, AAPL, INTC, GE, and IBM. </p>
<p>For this script, the <code>--input</code> flag is used to specify the directory where the stocks files are located.</p>
<pre><code>run.rb scripts/StockCoGroup5.scala \
--input data/stocks \
--output output/AAPL-INTC-GE-IBM.txt
</code></pre>
<p>When you look at the implementation, it is not obvious how to use the CoGroup feature. You could do pair-wise joins, which would perhaps be conceptually easier, but they would offer poor performance in a large MapReduce job, as each pair would require a separate MapReduce job. The CoGroup feature tries to do as many joins at once as possible.</p>
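<p>The difference from chained pair-wise joins can be illustrated with plain Scala collections: every input is indexed by the join key once, and each key is then combined across all inputs in a single step. This is only a conceptual sketch, not how Cascading implements CoGroup. The name <code>CoGroupSketch</code> and the sample prices are made up, and the sketch assumes at least one input and one record per date in each input.</p>
<pre><code>object CoGroupSketch {
  type Rec = (String, Double) // (ymd, closing price)

  // Index every input by the join key once, then emit one row per key
  // that is present in all inputs (an inner join across all of them).
  def coGroup(inputs: Seq[Seq[Rec]]): Map[String, Seq[Double]] = {
    val indexed = inputs.map(_.toMap) // one lookup table per input
    val keys = indexed.map(_.keySet).reduce(_ intersect _)
    keys.map(k => k -> indexed.map(_(k))).toMap
  }

  def main(args: Array[String]): Unit = {
    // Made-up prices for three symbols.
    val aapl = Seq(("2010-01-04", 214.0), ("2010-01-05", 214.4))
    val intc = Seq(("2010-01-04", 20.9))
    val ge = Seq(("2010-01-04", 15.5), ("2010-01-05", 15.5))
    println(coGroup(Seq(aapl, intc, ge)))
  }
}
</code></pre>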
<p>For comparison, here is the equivalent Hive join.</p>
<pre><code>SELECT a.ymd, a.symbol, a.price_close, b.symbol, b.price_close,
       c.symbol, c.price_close, d.symbol, d.price_close
FROM stocks a
JOIN stocks b ON a.ymd = b.ymd
JOIN stocks c ON a.ymd = c.ymd
JOIN stocks d ON a.ymd = d.ymd
WHERE a.symbol = 'AAPL' AND
      b.symbol = 'INTC' AND
      c.symbol = 'GE' AND
      d.symbol = 'IBM';
</code></pre>
<p>Note that because <code>a.ymd</code> appears in all <code>ON</code> clauses, Hive will perform this four-way join in a single MapReduce job.</p>
<h3 id="furtherexploration">Further Exploration</h3>
<h4 id="starjoinsonepairatatime">Star Joins, One Pair at a Time</h4>
<p>Try implementing the same four-way join doing a sequence of pair-wise joins. Compare the complexity of the code and the performance of the join with the CoGroup implementation. The performance would be much slower in MapReduce, where each pair-wise join would require a separate MapReduce job.</p>
<h2 id="splittingapipe">Splitting a Pipe</h2>
<p>This exercise shows how to split a data stream and use various features on the splits, including finding unique values.</p>
<pre><code>run.rb scripts/Twitter6.scala \
--input data/twitter/tweets.tsv \
--uniques output/unique-languages.txt \
--count_star output/count-star.txt \
--count_star_limit output/count-star-limit.txt
</code></pre>
<p>The output in <code>output/unique-languages.txt</code> is the following:</p>
<pre><code>\N
en
es
id
ja
ko
pt
ru
</code></pre>
<p>There are seven languages and an invalid value that looks vaguely like a null! The invalid “language” values actually come from messages in the stream that aren’t tweets, but the results of other user-invoked actions.</p>
<p>The output in <code>output/count-star.txt</code> is a single line with the value 1000, the same as the number of lines in the data file. Similarly, <code>output/count-star-limit.txt</code> should have 100, reflecting the limit to the first 100 lines.</p>
<p>Note that the implementations use <code>groupAll</code>, then count the elements in the single group, via the <code>GroupBuilder</code> object. (The <code>count</code> method requires that we specify a field. We arbitrarily picked <code>tweet_id</code>.) </p>
<p>By the way, this approach is <em>exactly</em> how Pig implements <code>COUNT(*)</code>. For example:</p>
<pre><code>grouped = group tweets all;
count = foreach grouped generate COUNT(tweets);
</code></pre>
<p>Here, <code>tweets</code> would be the equivalent of a Pipe and <code>grouped</code> is the name of a new Pipe created by grouping all records together into one new record. The <code>foreach ... generate</code> statement iterates through this single record and projects the <code>COUNT</code> of the group contents (named <code>tweets</code> after the original relation).</p>
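<p>The same “group everything, then count the group” idea can be sketched with plain Scala collections. The name <code>CountStarSketch</code> is hypothetical, and this is not the Scalding <code>groupAll</code> API itself.</p>
<pre><code>object CountStarSketch {
  // Put every record into one group under a single dummy key,
  // then count the members of that group.
  def countStar[A](records: Seq[A]): Int =
    records.groupBy(_ => ()).values.map(_.size).sum

  def main(args: Array[String]): Unit = {
    val tweets = Seq("t1", "t2", "t3")
    println(countStar(tweets))         // count of all records
    println(countStar(tweets.take(2))) // count after limiting to the first 2
  }
}
</code></pre>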
<p>Finally, note that we commented out the additional example using the <code>limit</code> feature. Unfortunately, there is a bug where running in local mode causes a <em>divide by zero</em> error. As we’ll demonstrate later, this bug doesn’t appear when running with Hadoop.</p>
<h3 id="furtherexploration">Further Exploration</h3>
<h4 id="debugsetting">Debug Setting</h4>
<p>Add the <code>debug</code> pipe to the pipe assembly. How does it change the console output? This is a very useful feature when you’re learning or debugging problems.</p>
<h4 id="filterforbadlanguages">Filter for Bad Languages</h4>
<p>Add a <code>filter</code> method call that removes these “bad” records. <strong>Hint:</strong> You’ll want to remove all tuples where the language value is <code>"""\N"""</code>. Without the triple quotes, you would have to write <code>"\\N"</code>.</p>
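<p>The two spellings of the marker can be checked in plain Scala. The following sketch also shows the filter predicate on an ordinary sequence; the name <code>BadLanguageFilter</code> is hypothetical, and the pipe version would use the Scalding <code>filter</code> method instead.</p>
<pre><code>object BadLanguageFilter {
  // Triple-quoted strings are raw: this is a backslash followed by N.
  val badLanguage = """\N"""

  // Keep only the language values that are not the bad marker.
  def keepValid(languages: Seq[String]): Seq[String] =
    languages.filter(lang => lang != badLanguage)

  def main(args: Array[String]): Unit = {
    println(badLanguage == "\\N") // both spellings are the same string
    println(keepValid(Seq("en", badLanguage, "ja")))
  }
}
</code></pre>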
<h2 id="computengrams">Compute NGrams</h2>
<p>Let’s return to the Shakespeare data to compute <em>context ngrams</em>, a common natural language processing technique, where we provide a prefix of words and find occurrences of the prefix followed by an additional word. The <code>n</code> most common phrases are returned, ranked by frequency. </p>
<pre><code>run.rb scripts/ContextNGrams7.scala \
--input data/shakespeare/plays.txt \
--output output/context-ngrams.txt \
--ngram-prefix "I love" \
--count 10
</code></pre>
<p>Unfortunately, the data set isn’t large enough to find a lot of examples for many possible ngrams.</p>
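<p>To make the technique concrete, here is a minimal sketch of context ngrams over an in-memory string, using plain Scala collections. It is not the code in <code>ContextNGrams7.scala</code>; the name <code>ContextNGramsSketch</code>, the lower-casing, and the tokenization on non-word characters are assumptions.</p>
<pre><code>object ContextNGramsSketch {
  // Find the words that follow `prefix` in `text` and return the
  // `count` most frequent ones with their frequencies.
  def contextNGrams(text: String, prefix: String, count: Int): Seq[(String, Int)] = {
    val words = text.toLowerCase.split("""\W+""").filter(_.nonEmpty).toSeq
    val prefixWords = prefix.toLowerCase.split("""\s+""").toSeq
    val n = prefixWords.size
    words.sliding(n + 1)
      .filter(window => window.size == n + 1)          // skip a short final window
      .filter(window => window.take(n) == prefixWords) // prefix must match
      .map(window => window.last)                      // the word after the prefix
      .toSeq
      .groupBy(identity)
      .map { case (word, hits) => (word, hits.size) }
      .toSeq
      .sortBy { case (word, hits) => (-hits, word) }   // most frequent first
      .take(count)
  }

  def main(args: Array[String]): Unit =
    println(contextNGrams("I love you. I love thee! I love you, I hate you", "I love", 10))
}
</code></pre>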
<h3 id="furtherexploration">Further Exploration</h3>
<h4 id="experimentwithdifferentprefixes">Experiment with Different Prefixes</h4>
<p>Try other prefixes of different lengths. You don’t have to specify a two-word prefix!</p>
<h4 id="tryusingothertextfiles">Try Using Other Text Files</h4>
<p>Run the script on other large text files you have.</p>
<h4 id="ngramdetector">NGram Detector</h4>
<p>Context ngrams are a special case of ngrams. For general ngrams, you just find the most common n-length phrases. Write a script to compute the most common ngrams. </p>
<h2 id="joiningpipes">Joining Pipes</h2>
<p>Let’s revisit the exercise to join stock and dividend records and generalize it to read in multiple sets of data, for different companies, and process them as one stream. A complication is that the data files don’t contain the stock (“instrument”) symbol, so we’ll see another way to add data to tuples.</p>
<pre><code>run.rb scripts/StocksDividendsRevisited8.scala \
--stocks-root-path data/stocks/ \
--dividends-root-path data/dividends/ \
--symbols AAPL,INTC,GE,IBM \
--output output/stocks-dividends-join.txt
</code></pre>
<h1 id="matrixapi">Matrix API</h1>
<p>The Matrix API is relatively new and facilitates many important machine learning algorithms.</p>
<h2 id="jaccardsimilarityandadjacencymatrices">Jaccard Similarity and Adjacency Matrices</h2>
<p><em>Adjacency matrices</em> are used to record the similarities between two things. For example, the “things” might be users who have rated movies and the <em>adjacency</em> might be how many movies they have reviewed in common. Higher adjacency numbers indicate more likely similarity of interests. Note that this simple representation says nothing about whether or not they both rated the movies in a similar way.</p>
<p>Once you have adjacency data, you need a <em>similarity measure</em> to determine how similar two things (e.g., people) really are. One is <em>Jaccard Similarity</em>:</p>
<figure>
<img src="images/JaccardSimilarity.png" alt="" /></figure>
<p>This is set notation; the size of the intersection of two sets over the size of the union. It can be generalized and is similar to the cosine of two vectors normalized by length. Note that the distance would be 1 - similarity.</p>
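<p>For two small sets, the measure can be computed directly. The sketch below uses plain Scala sets rather than the Matrix API; the name <code>JaccardSketch</code> is hypothetical, and treating two empty sets as having similarity 0.0 is an assumption.</p>
<pre><code>object JaccardSketch {
  // Size of the intersection divided by the size of the union.
  // Two empty sets are treated as having similarity 0.0 (an assumption).
  def similarity[A](a: Set[A], b: Set[A]): Double = {
    val unionSize = (a union b).size
    if (unionSize == 0) 0.0 else (a intersect b).size.toDouble / unionSize
  }

  // The corresponding distance is 1 - similarity.
  def distance[A](a: Set[A], b: Set[A]): Double = 1.0 - similarity(a, b)

  def main(args: Array[String]): Unit = {
    // Two users who have rated movies identified by number.
    val user1 = Set(1, 2, 3)
    val user2 = Set(2, 3, 4)
    println(similarity(user1, user2))
    println(distance(user1, user2))
  }
}
</code></pre>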
<p>Run the script this way on a small matrix:</p>
<pre><code>run.rb scripts/MatrixJaccardSimilarity9.scala \
--input data/matrix/graph.tsv \
--output output/jaccardSim.tsv
</code></pre>
<h2 id="termfrequency-inversedocumentfrequencytfidf">Term Frequency-Inverse Document Frequency (TF*IDF)</h2>
<p>TF*IDF is a widely used <em>Natural Language Processing</em> tool to analyze text. It’s useful for indexing documents, e.g., for web search engines. Naively, you might calculate the <em>frequency</em> of words in a corpus of documents and assume that if a word appears more frequently in one document, then that document is probably a “definitive” place for that word, such as the way you search for web pages on a particular topic. Similarly, the most frequent words indicate the primary topics for a document.</p>
<p>There’s a problem, though. Very common words, e.g., articles like “the”, “a”, etc. will appear very frequently, undermining results. So we want to remove them somehow. Fortunately, they tend to appear frequently in <em>every</em> document, so you can reduce the ranking of a particular word if you <em>divide</em> its frequency in a given document by its frequency in <em>all</em> documents. That’s the essence of TF*IDF.</p>
<p>For more information, see the <a href="http://en.wikipedia.org/wiki/Tf*idf">Wikipedia</a> page.</p>
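<p>As a rough illustration, the sketch below computes one common variant of the weighting, <code>tf * log(N / df)</code>, with plain Scala collections. It may differ from what <code>TfIdf10.scala</code> computes; the name <code>TfIdfSketch</code> and the choice of variant are assumptions.</p>
<pre><code>object TfIdfSketch {
  // docs: one sequence of words per document.
  // Returns, per document, a map from word to tf * log(N / df), where
  //   tf = occurrences of the word in that document,
  //   df = number of documents containing the word,
  //   N  = total number of documents.
  def tfIdf(docs: Seq[Seq[String]]): Seq[Map[String, Double]] = {
    val n = docs.size.toDouble
    val df: Map[String, Int] =
      docs.flatMap(_.distinct).groupBy(identity).map { case (w, ws) => (w, ws.size) }
    docs.map { doc =>
      doc.groupBy(identity).map { case (w, ws) =>
        (w, ws.size * math.log(n / df(w)))
      }
    }
  }

  def main(args: Array[String]): Unit =
    tfIdf(Seq(Seq("the", "cat"), Seq("the", "dog"))).foreach(println)
}
</code></pre>
<p>A word such as “the” that appears in every document gets a weight of zero, which is the down-ranking effect described above.</p>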
<p>Run the script this way on a small matrix:</p>
<pre><code>run.rb scripts/TfIdf10.scala \
--input data/matrix/docBOW.tsv \
--output output/featSelectedMatrix.tsv \
--nWords 300
</code></pre>
<h1 id="type-safeapi">Type-Safe API</h1>
<p>So far, we have been using the original <em>Fields-Based API</em>, which emphasizes naming fields and uses a relatively dynamic approach to typing. This is consistent with Cascading’s model.</p>
<p>There is a newer, more experimental <em>Type-Safe API</em> that attempts to more fully exploit the type safety provided by Scala. We won’t discuss it here, but refer you to the <a href="https://github.com/twitter/scalding/wiki/Type-safe-api-reference">Type-Safe API Wiki page</a> for more information.</p>
<h1 id="usingscaldingwithhadoop">Using Scalding with Hadoop</h1>
<p>Now we’ll use the <code>scripts/scald.rb</code> script in the Scalding distribution to run a script as a Hadoop job. For example, assuming that you cloned the Scalding repo in a sister directory of the workshop directory, here is a command to run <code>HadoopTwitter11</code>, which is identical to <code>Twitter6</code> that we ran previously, except that we now use the <code>limit</code> method, which <em>won’t</em> throw an exception when we run with Hadoop:</p>
<pre><code>../scalding/scripts/scald.rb --hdfs-local --host localhost \
scripts/HadoopTwitter11.scala \
--input data/twitter/tweets.tsv \
--uniques output/unique-languages \
--count_star output/count-star \
--count_star_limit output/count-star-limit
</code></pre>
<p>On a laptop configuration using <em>pseudo-distributed</em> mode, use <code>localhost</code> for the Hadoop host name flag. Use the server name for the <em>JobTracker</em> master process when running on a real cluster. Note that the <code>--hdfs-local</code> option actually means use MapReduce, but ignore the <em>Hadoop Distributed File System</em> (HDFS). Instead, use the local file system like we have been doing. If we used the <code>--hdfs</code> option instead, all paths would be interpreted as relative to HDFS. The paths shown would be assumed to be relative to the user’s home directory in HDFS, which is <code>/user/&lt;name&gt;</code>, by default.</p>
<p>Finally, the values specified for output using the <code>--uniques</code>, <code>--count_star</code>, and <code>--count_star_limit</code> flags will be interpreted as <em>directory</em> names, not <em>file</em> names as previously. This follows conventional Hadoop practice, where the parallel processes might result in multiple, concurrently-written output files!</p>
<p>In this case, the <code>limit</code> method doesn’t trigger an exception and each directory will contain two files, a <code>part-00000</code> file (partition number <code>00000</code>) that contains the data and a <code>.part-00000.crc</code> file that contains a CRC of the data file. With a larger data set and running on a real distributed cluster, instead of the <em>pseudo-distributed</em> mode you run on a single machine, there might be multiple files. The CRC file serves two purposes. First, it can be used to check for a corrupt data file and second, when it is written, processes watching the directory <em>know</em> that the process writing the corresponding data file has finished! This is important when sequencing processing tasks.</p>
<h1 id="conclusions">Conclusions</h1>
<h2 id="comparisonswithothertools">Comparisons with Other Tools</h2>
<p>It’s interesting to contrast Scalding with other tools.</p>
<h3 id="cascading">Cascading</h3>
<p>Because Scala is a <em>functional programming</em> language with excellent support for DSL (domain-specific language) creation, using Scalding is much nicer than the Java-based Cascading itself, because Scalding programs are more concise and intuitive.</p>
<h3 id="cascalog">Cascalog</h3>
<p>This Clojure-based DSL written by Nathan Marz also benefits from the functional nature and concision of Clojure. Nathan has also built in logic-programming features from Datalog.</p>
<h3 id="pig">Pig</h3>
<p>Pig has very similar capabilities, with notable advantages and disadvantages.</p>
<h4 id="advantages">Advantages</h4>
<ul>
<li><em>A custom language</em> - A purpose-built language for a particular domain can optimize expressiveness for common scenarios.</li>
<li><em>Type Safety</em> - Although Scala is strongly-typed, Cascading isn’t, at least in the sense that you don’t normally define the types of fields, except where necessary (e.g., to call math routines with numbers). Pig (like Hive) encourages specifying the type of every field.</li>
<li><em>Lazy evaluation</em> - you define the work flow, then Pig compiles, optimizes, and runs it when output is required. Scalding, following Scala, uses eager evaluation; each expression is executed as soon as it’s parsed.</li>
<li><em>Describe</em> - The describe feature is very helpful when learning how each Pig statement defines a new schema. In Scalding, there is an API call on Pipes, <code>fields</code>, to get the field names, but it’s less convenient to use, especially in interactive scenarios.</li>
</ul>
<h4 id="disadvantages">Disadvantages</h4>
<ul>
<li><em>Not Turing complete</em> - You have to write extensions in other languages. By using Scala, Scalding lets you write everything in one language.</li>
<li><em>Slower</em> - At least for local jobs, Scalding (and Cascading) avoid Hadoop APIs completely and therefore run noticeably faster.</li>
</ul>
<h3 id="hive">Hive</h3>
<p>Hive is ideal when your problem fits the SQL model for queries. It’s less useful for complex transformations. Also, like Pig, extensions must be written in another language.</p>
<div class="footnotes">
<hr />
<ol>
<li id="fn:1">
<p>But when I refined these notes in November 2012, the stock had corrected to ~$500/share! <a href="#fnref:1" title="return to article" class="reversefootnote"> ↩</a></p>
</li>
</ol>
</div>
</div>
</body>
</html>