`
`Exhibit J
`
`
`
`
US007590620B1
`
(12) United States Patent
Pike et al.

(10) Patent No.: US 7,590,620 B1
(45) Date of Patent: Sep. 15, 2009
`
(54) SYSTEM AND METHOD FOR ANALYZING DATA RECORDS

(75) Inventors: Robert C. Pike, Menlo Park, CA (US); Sean Quinlan, Menlo Park, CA (US); Sean M. Dorward, Martinsville, NJ (US); Jeffrey Dean, Palo Alto, CA (US); Sanjay Ghemawat, Mountain View, CA (US)

(73) Assignee: Google Inc., Mountain View, CA (US)

(*) Notice: Subject to any disclaimer, the term of this patent is extended or adjusted under 35 U.S.C. 154(b) by 395 days.

(21) Appl. No.: 10/954,692

(22) Filed: Sep. 29, 2004

Related U.S. Application Data

(63) Continuation-in-part of application No. 10/871,244, filed on Jun. 18, 2004.

(51) Int. Cl.
    G06F 17/30 (2006.01)
(52) U.S. Cl. 707/3; 707/7
(58) Field of Classification Search 707/1-206; 709/201; 717/149
    See application file for complete search history.

(56) References Cited

U.S. PATENT DOCUMENTS

4,876,643 A * 10/1989 McNeill et al. 710/110
5,345,584 A * 9/1994 Hill 711/170
5,414,849 A * 5/1995 Yamamoto 717/149
5,414,899 A * 5/1995 Raetz 16/380
5,471,622 A * 11/1995 Eadline 707/3
5,590,319 A * 12/1996 Cohen et al. 707/4
5,806,059 A * 9/1998 Tsuchida et al. 707/2
5,819,251 A * 10/1998 Kremer et al. 707/1
5,870,743 A * 2/1999 Cohen et al. 707/8
5,884,299 A * 3/1999 Ramesh et al. 707/2
5,884,303 A * 3/1999 Brown 707/3
`
5,920,854 A * 7/1999 Kirsch et al. 707/3
5,956,704 A * 9/1999 Gautam et al. 707/1
5,963,954 A * 10/1999 Burrows 707/102
6,006,224 A * 12/1999 McComb et al. 707/5
6,026,394 A * 2/2000 Tsuchida et al. 707/3
6,182,061 B1* 1/2001 Matsuzawa et al. 707/2
6,226,635 B1* 5/2001 Katariya 707/4
6,256,621 B1* 7/2001 Tsuchida et al. 707/2

(Continued)

OTHER PUBLICATIONS

Andrea C. Arpaci-Dusseau et al., "High-Performance Sorting on Networks of Workstations", Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, May 1997, Tucson, Arizona.

(Continued)

Primary Examiner-Hung Q Pham
Assistant Examiner-Hubert Cheung
(74) Attorney, Agent, or Firm-Morgan, Lewis & Bockius LLP

(57) ABSTRACT

A method and system for analyzing data records includes allocating groups of records to respective processes of a first plurality of processes executing in parallel. In each respective process of the first plurality of processes, for each record in the group of records allocated to the respective process, a query is applied to the record so as to produce zero or more values. Zero or more emit operators are applied to each of the zero or more produced values so as to add corresponding information to an intermediate data structure. Information from a plurality of the intermediate data structures is aggregated to produce output data.

30 Claims, 9 Drawing Sheets

[Front-page figure: Input Files, Map Task Queue, Reduce Task Queue, Status Table(s)]
`
`
`
`
U.S. PATENT DOCUMENTS

6,301,574 * 10/2001 Thomas et al. 707/1
6,366,904 * 4/2002 BenHadda et al. 707/3
6,408,292 * 6/2002 Bakalash et al. 707/2
6,556,988 * 4/2003 Tsuchida et al. 707/3
6,567,806 * 5/2003 Tsuchida et al. 707/7
6,741,992 * 5/2004 McFadden 707/10
6,910,070 * 6/2005 Mishra et al. 709/224
6,961,723 * 11/2005 Faybishenko et al. 707/3
6,983,322 * 1/2006 Tripp et al. 709/225
7,099,871 * 8/2006 Faybishenko et al. 707/10
7,103,590 * 9/2006 Murthy et al. 707/3
7,146,365 * 12/2006 Allen et al. 707/8
7,430,549 * 9/2008 Zane et al. 707/3
7,433,863 * 10/2008 Zane et al. 707/2
2002/0120917 A1* 8/2002 Abrari et al. 717/110
2002/0147708 A1* 10/2002 Thomas et al. 707/3
2002/0198872 A1* 12/2002 MacNicol et al. 707/3
2003/0033279 A1* 2/2003 Gibson et al. 707/1
2003/0217033 A1* 11/2003 Sandler et al. 707/1
2004/0034639 A1* 2/2004 McFadden 707/10
2004/0111410 A1* 6/2004 Burgoon et al. 707/4
2004/0148273 A1* 7/2004 Allen et al. 707/2
2004/0220960 A1* 11/2004 Ojeil et al. 707/102
2005/0027701 A1* 2/2005 Zane et al. 707/3
2005/0028134 A1* 2/2005 Zane et al. 717/106
2005/0049996 A1* 3/2005 Srinivasan et al. 707/1
2005/0050030 A1* 3/2005 Gudbjartsson et al. 707/3
2005/0097108 A1* 5/2005 Wang et al. 707/100
2005/0131893 A1* 6/2005 Von Glan 707/5
2005/0138222 A1* 6/2005 Chari et al. 710/1
2005/0262045 A1* 11/2005 Tsuchida et al. 707/2
`OTHER PUBLICATIONS
Arash Baratloo et al., "Charlotte: Metacomputing on the Web", Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
Luiz André Barroso et al., "Web Search for a Planet: The Google Cluster Architecture", IEEE Micro, Mar.-Apr. 2003, pp. 22-28.
Guy E. Blelloch et al., "Scans as Primitive Parallel Operations", IEEE Transactions on Computers, C-38(11), Nov. 1989.
Sanjay Ghemawat et al., "The Google File System", 19th Symposium on Operating Systems Principles, Lake George, New York, 2003, pp. 29-43.
Sergei Gorlatch, "Systematic Efficient Parallelization of Scan and Other List Homomorphisms", Euro-Par '96, Parallel Processing, Lecture Notes in Computer Science, 1996, pp. 401-408.
Jim Gray, "Year 2004 Results", Sort Benchmark Home Page, http://research.microsoft.com/barc/SortBenchmark/.
William Gropp et al., Using MPI: Portable Parallel Programming with the Message-Passing Interface, MIT Press, Cambridge, MA, 1999.
Richard E. Ladner et al., "Parallel Prefix Computation", Journal of the ACM, vol. 27, No. 4, Oct. 1980, pp. 831-838.
Erik Riedel et al., "Active Disks for Large-Scale Data Processing", IEEE Computer, Jun. 2001, pp. 68-74.
Douglas Thain et al., "Distributed Computing in Practice: The Condor Experience", Concurrency and Computation: Practice and Experience, 2004.
Jim Wyllie, "SPsort: How to Sort a Terabyte Quickly", http://alme1.almaden.ibm.com/cs/spsort.pdf.
Leslie G. Valiant, "A Bridging Model for Parallel Computation", Communications of the ACM, vol. 33, No. 8, Aug. 1990, pp. 103-111.
"AT&T Labs Research Hancock Project", http://www.research.att.com/~kfisher/hancock/.
`* cited by examiner
`
`
`
`
[Figure 1 (Sheet 1 of 9): Large-Scale Data Processing Model, showing Input Data and Operations]
`
`
`
[Figure 2 (Sheet 2 of 9): MapReduce Binary; Input Files (Split 0, Split 1, ..., Split N-1); Intermediate File A 206a; Intermediate File B 206b]
`
`
`
[Figure 3 (Sheet 3 of 9): Input Files (Split 0, Split 1, ..., Split N-1); Map Reduce Binary 312; Work Queue Master 314; Master Process(es); Intermediate Files A, B, C (Local Database) 306c; Reduce Task Queue 324; Status Table(s) 326; Output 1 310a]
`
`
`
[Figure 4 (Sheet 4 of 9): Large-Scale Data Processing System 400: CPU(s) 402, User Interface 410, Network Interface, File System (FS) 446, and Memory 412 containing Operating System, Network Communication Module, System Initialization Module, Application Software (Map, Combiner, Reduce, and Partition Operators), Map Functions, Reduce Functions, Partition Function, Worker Process(es), Intermediate Files, Master Process(es), and Master Status Table(s) (reference numbers 416-444)]
`
`
`
[Figure 5 (Sheet 5 of 9): Datacenter 1 (DC1) 502 through Datacenter 4 (DC4), with racks and CPUs 510a, ...]
`
`
`
[Figure 6 (Sheet 6 of 9): task assignment flow: Task Failure, Remedial Action, Wait For All Tasks To Complete, Any Waiting Tasks, Task Completion, Wait For Idle Process, Assign A Waiting Task To An Idle Process and Update Status Table(s) (steps 610, 612)]
`
`
`
[Figure 7A (Sheet 7 of 9): Task Status Table with Task ID, Status, Process, Input Files, and Output Files columns; Figure 7B: Process Status Table with Process ID, Status, Location, and Current Task columns]
`
`
`
[Figure 8 (Sheet 8 of 9): Large-Scale Data Processing System 800: Memory 412 containing Operating System, Network Communication Module, System Initialization Module, Application Software, Application Script, Application Library (Collection Procedures, Emit Operator, Reduce Operator), Function Library (Map Functions, Reduce Functions, Partition Function), Worker Process(es), Intermediate Files, Master Process(es), and Master Status Table(s); User Interface with Keyboard 410; Network Interface; File System (FS) 446]
`
`
`
[Figure 9 (Sheet 9 of 9): method for analyzing data records: Records 905; Records Allocated to Processes 910; Process 1 915 through Process N 945; Record 920; Query/Extract 925; Value(s) 930; Emit Operator(s) 935; Intermediate Data Structure(s) 940, 950; Combine/Aggregate 955; Output Data 960; Emit Operators Library 965 with Top 982, Histogram 983, Quantile 984, Unique 985]
`
`
`
SYSTEM AND METHOD FOR ANALYZING DATA RECORDS
`
`CROSS REFERENCE TO RELATED
`APPLICATION
`
This application is a continuation-in-part of U.S. application Ser. No. 10/871,244, filed Jun. 18, 2004, the disclosure of which is hereby incorporated by reference.
`
`TECHNICAL FIELD
`
The disclosed embodiments relate generally to data processing systems and methods, and in particular to a framework for simplifying large-scale data processing and analyzing data records.
`
`
`BACKGROUND
`
Large-scale data processing involves extracting data of interest from raw data in one or more datasets and processing it into a useful data product. The implementation of large-scale data processing in a parallel and distributed processing environment typically includes the distribution of data and computations among multiple disks and processors to make efficient use of aggregate storage space and computing power.

Various languages and systems provide application programmers with tools for querying and manipulating large datasets. These conventional languages and systems, however, fail to provide support for automatically parallelizing these operations across multiple processors in a distributed and parallel processing environment. Nor do these languages and systems automatically handle system faults (e.g., processor failures) and I/O scheduling. Nor do these languages and systems efficiently handle the analysis of data records.
`
`SUMMARY
`
A method and system for analyzing data records includes allocating groups of records to respective processes of a first plurality of processes executing in parallel. In each respective process of the first plurality of processes, for each record in the group of records allocated to the respective process, a query is applied to the record so as to produce zero or more values. Zero or more emit operators are applied to each of the zero or more produced values so as to add corresponding information to an intermediate data structure. Information from a plurality of the intermediate data structures is aggregated to produce output data.
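
As an illustrative sketch only, and not the patented implementation, the summarized flow can be expressed in Python roughly as follows; the record format (a "bytes" field to be totalled), the query, and the emit operator shown here are hypothetical examples.

    # Hypothetical sketch of the summarized flow: allocate groups of records to
    # parallel processes, apply a query and emit operator(s) per record, then
    # aggregate the intermediate data structures into output data.
    from multiprocessing import Pool

    def query(record):
        # Apply a query to a record, producing zero or more values.
        return [record["bytes"]] if "bytes" in record else []

    def emit_sum(value, table):
        # Emit operator: add corresponding information to an intermediate structure.
        table["total"] = table.get("total", 0) + value
        table["count"] = table.get("count", 0) + 1

    def process_group(group):
        # One process of the first plurality: query each allocated record and
        # apply the emit operator to each produced value.
        table = {}
        for record in group:
            for value in query(record):
                emit_sum(value, table)
        return table

    def analyze(records, num_processes=4):
        # Allocate groups of records to processes executing in parallel, then
        # aggregate the intermediate data structures to produce the output data.
        groups = [records[i::num_processes] for i in range(num_processes)]
        with Pool(num_processes) as pool:
            intermediates = pool.map(process_group, groups)
        output = {"total": 0, "count": 0}
        for table in intermediates:
            output["total"] += table.get("total", 0)
            output["count"] += table.get("count", 0)
        return output

    if __name__ == "__main__":
        print(analyze([{"bytes": 10}, {"bytes": 4}, {}]))  # {'total': 14, 'count': 2}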
`
`BRIEF DESCRIPTION OF THE DRAWINGS
`
`FIG. 1 is a block diagram of a large-scale data processing
`model.
`FIG. 2 is a block diagram of a large-scale data processing
`system.
`FIG. 3 is a block diagram of a large-scale data processing
`system, including a master process for managing tasks.
`FIG. 4 is a block diagram of a computer system for the data
`processing systems shown in FIGS. 2 and 3.
`FIG. 5 is a block diagram of a data distribution network for
`large-scale data processing.
`FIG. 6 is a flow diagram of an embodiment of a process for
`assigning tasks to processes.
`FIG. 7A is a block diagram of an exemplary task status
`table.
`
FIG. 7B is a block diagram of an exemplary process status table.
FIG. 8 is a block diagram of an exemplary system for analyzing data records.
FIG. 9 is a flow diagram illustrating an exemplary method for analyzing data records.
Like reference numerals refer to corresponding parts throughout the several views of the drawings.

DESCRIPTION OF EMBODIMENTS
`
`Large-Scale Data Processing Model
`
FIG. 1 is a block diagram of a large-scale data processing model 100. The model 100 generally includes mapping operations 102 and reduction operations 104. The mapping operations 102 apply one or more mapping operations to a set of input data αi (e.g., text files, records, logs, sorted maps, etc.) to provide a set of intermediate data values βi. The reduction operations 104 apply one or more reduction operations to the set of intermediate data values βi to provide a set of output data φi (e.g., tables, sorted maps, record I/O, etc.). In some embodiments, the mapping operations 102 are implemented by one or more application-specific mapping functions, which map a set of input data αi to a set of intermediate data values βi. The intermediate data values βi, or information corresponding to the intermediate data values, are stored in one or more intermediate data structures. Some examples of intermediate data structures include, without limitation, files, buffers, histograms, count tables and any other suitable data structure or device for storing digital information. In some embodiments, the intermediate data values βi are processed by the reduction operations 104, which are implemented by one or more application-specific reduction functions, which reduce the set of intermediate data values βi to a set of output data φi. In some embodiments, the intermediate data values βi are processed by one or more application-independent statistical information processing functions, which reduce the set of intermediate data values βi to a set of output data φi.
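
As a hedged illustration that is not taken from the patent, the model of FIG. 1 can be sketched in Python with hypothetical application-specific map and reduce functions; the word-count choice below is only an example.

    # Hypothetical sketch of the FIG. 1 model: mapping operations turn input
    # data into intermediate values held in a count-table-like structure, and
    # reduction operations turn those intermediate values into output data.
    from collections import defaultdict

    def map_fn(alpha):
        # Application-specific mapping: one input record -> (key, value) pairs.
        for word in alpha.split():
            yield word, 1

    def reduce_fn(key, values):
        # Application-specific reduction: all values for a key -> output value.
        return key, sum(values)

    def run_model(input_data):
        # Mapping operations 102: input data -> intermediate data values.
        intermediate = defaultdict(list)
        for alpha in input_data:
            for key, value in map_fn(alpha):
                intermediate[key].append(value)
        # Reduction operations 104: intermediate data values -> output data.
        return dict(reduce_fn(k, vs) for k, vs in intermediate.items())

    print(run_model(["the cat", "the dog"]))  # {'the': 2, 'cat': 1, 'dog': 1}
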
`Distributed Data Processing System
`
`In order to explain the operation of the large scale data
`processing system, it is helpful to consider an exemplary
`distributed data processing system in which the large scale
`data processing is performed. In general, the embodiments
described here can be performed by a set of interconnected processors that are interconnected by one or more communication networks.
`FIG. 5 is a block diagram of an exemplary distributed data
processing system 500. It should be appreciated that the layout of the system 500 is merely exemplary and the system 500 may take on any other suitable layout or configuration. The
`system 500 is used to store data, perform computational tasks,
`and transmit data between datacenters DC1-DC4. The system
`may include any number of data centers DCX, and thus the
`number of data centers shown in FIG. 5 is only exemplary.
`The system 500 may include dedicated optical links or other
dedicated communication channels, as well as supporting hardware such as modems, bridges, routers, switches, wireless antennas and towers, and the like. In some embodiments,
`the system 500 includes one or more wide area networks
`(WANs) as well as multiple local area networks (LANs). In
some embodiments, the system 500 utilizes a private network, i.e., the system and its interconnections are designed and operated exclusively for a particular company or customer. Alternatively, a public network may be used.
`
`
`
`
Some of the datacenters DC1-DC4 may be located geographically close to each other, and others may be located far from the other datacenters. In some embodiments, each datacenter includes multiple racks. For example, datacenter 502 (DC1) includes multiple racks 508a, ..., 508n. The racks 508 can include frames or cabinets into which components are mounted. Each rack can include one or more processors (CPUs) 510. For example, the rack 508a includes CPUs 510a, ..., 510n (slaves 1-16) and the nth rack 508n includes multiple CPUs 510 (CPUs 17-32). The processors 510 can include data processors, network attached storage devices, and other computer controlled devices. In some embodiments, at least one of processors 510 operates as a master processor, and controls the scheduling and data distribution tasks performed throughout the network system 500. In some embodiments, one or more processors 510 may take on one or more roles, such as a master and/or slave. A rack can include storage (e.g., one or more network attached disks) that is shared by the one or more processors 510.

In some embodiments, the processors 510 within each rack 508 are interconnected to one another through a rack switch 506. Furthermore, all racks 508 within each datacenter 502 are also interconnected via a datacenter switch 504. As noted above, the present invention can be implemented using other arrangements of multiple interconnected processors.

Further details regarding the distributed system 500 of FIG. 5 can be found in U.S. patent application Ser. No. 10/613,626, entitled "System and Method For Data Distribution," filed Jul. 3, 2003, which application is incorporated by reference herein in its entirety.

In another embodiment, the processors shown in FIG. 5 are replaced by a single large-scale multiprocessor. In this embodiment, map and reduce operations are automatically assigned to processes running on the processors of the large-scale multiprocessor.

Large-Scale Data Processing System I

FIG. 2 is a block diagram of a large-scale data processing system 200. The system 200 provides application programmers with an application-independent framework for writing data processing software that can run in parallel across multiple different machines on a distributed network. The system 200 is typically a distributed system having multiple processors, possibly including network attached storage nodes, that are interconnected by one or more communication networks. FIG. 2 provides a logical view of a system 200, which in some embodiments may be implemented on a system having the physical structure shown in FIG. 5. In one embodiment, the system 200 operates within a single data center of the system 500 shown in FIG. 5, while in another embodiment, the system 200 operates over two or more data centers of the system 500.

As shown in FIG. 2, a set of input files 202 are processed by a first set of processes 204, herein called map processes, to produce a set of intermediate data, represented here by files 206. The intermediate data 206 is processed by a second set of processes 208, herein called reduce processes, to produce output data 210. Generally each "map process" is a process configured (or configurable) to perform map functions and to execute an application-specific map operator. Each "reduce process" is a process configured (or configurable) to perform reduce functions and to execute an application-specific reduce operator. In some embodiments, the application-specific reduce operator includes or is replaced by one or more application-independent statistical information processing functions. A control or supervisory process, herein called the
`
work queue master 214, controls the set of processing tasks. As described in more detail below, the work queue master 214 determines how many map tasks to use, how many reduce tasks to use, which processes and processors to use to perform those tasks, where to store the intermediate data and output data, how to respond to any processing failures, and so on.

It should be noted that the work queue master 214 assigns tasks to processes, and that multiple processes may be executed by each of the processors in the group of processors that are available to do the work assigned by the work queue master 214. In the context of FIG. 5 or any other multiple processor system, the set of processes controlled by the work queue master 214 may be a subset of the full set of processes executed by the system, and furthermore the set of processors available to do the work assigned by the work queue master 214 may be fewer than the full set of processors in the system. Some of the resources of the system may be used for other tasks, such as tasks that generate the input data 202, or that utilize the output data 210. However, in some embodiments, some or all of the tasks that generate the input data 202 or utilize the output data 210 may also be controlled or supervised by the work queue master 214. In addition, in some embodiments processors can be added or removed from the processing system during the execution of a map-reduce operation. The work queue master 214 keeps track of the processors in the system and the available processes executing on those processors.

Application programmers are provided with a restricted set of application-independent operators for reading input data and generating output data. The operators invoke library functions that automatically handle data partitioning, parallelization of computations, fault tolerance (e.g., recovering from process and machine failures) and I/O scheduling. In some embodiments, to perform a specific data processing operation on a set of input files, the only information that must be provided by an application programmer is: information identifying the input file(s), information identifying or specifying the output files to receive output data, and two application-specific data processing operators, hereinafter referred to as map() and reduce(). Generally, the map() operator specifies how input data is to be processed to produce intermediate data and the reduce() operator specifies how the intermediate data values are to be merged or otherwise combined. Note that the disclosed embodiments are not limited to any particular type or number of operators. Other types of operators (e.g., data filters) can be provided, as needed, depending upon the system 200 architecture and the data processing operations required to produce the desired, application-specific results. In some embodiments, the application programmers provide a partition operator, in addition to the map() and reduce() operators. The partition() operator specifies how the intermediate data is to be partitioned over a set of intermediate files.
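
As an illustrative sketch under the assumptions above, and not the patent's API, the following Python fragment shows the only application-specific pieces a programmer would supply: the input/output file identifications and the map(), reduce(), and optional partition() operators. The job dictionary, file names, and function names here are hypothetical.

    # Hypothetical minimal job specification of the kind described above.
    def map_op(record):
        # map(): how input data is processed into intermediate (key, value) pairs.
        for word in record.split():
            yield word, 1

    def reduce_op(key, values):
        # reduce(): how intermediate values for a key are merged or combined.
        return sum(values)

    def partition_op(key, num_partitions):
        # partition(): how intermediate data is divided over intermediate files.
        return hash(key) % num_partitions

    job = {
        "input_files": ["logs-00000-of-00002", "logs-00001-of-00002"],
        "output_files": ["wordcount-00000-of-00001"],
        "map": map_op,
        "reduce": reduce_op,
        "partition": partition_op,  # optional in some embodiments
    }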
`
To perform large-scale data processing, a set of input files 202 are split into multiple data blocks 0, ..., N-1 of either a specified or predefined size (e.g., 64 MB). Alternately, in some embodiments the input files 202 have a predefined maximum size (e.g., 1 GB), and the individual files are the data blocks. A data block is a subset of data that is retrieved during processing. In some embodiments, the data blocks are distributed across multiple storage devices (e.g., magnetic or optical disks) in a data distribution network to fully utilize the aggregate storage space and disk bandwidth of the data processing system.
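
For illustration only, and not taken from the patent, one way to compute such data blocks as byte ranges, assuming the file lengths are known, is sketched below.

    # Hypothetical sketch: split input files into fixed-size data blocks.
    BLOCK_SIZE = 64 * 1024 * 1024  # e.g., 64 MB

    def split_into_blocks(file_sizes, block_size=BLOCK_SIZE):
        # file_sizes: {file name: length in bytes}. Returns data blocks 0..N-1
        # as (file name, start offset, end offset) tuples.
        blocks = []
        for name, size in file_sizes.items():
            for start in range(0, size, block_size):
                blocks.append((name, start, min(start + block_size, size)))
        return blocks

    print(split_into_blocks({"input-0": 150 * 1024 * 1024}))  # 3 blocks: 64 + 64 + 22 MB
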
Referring to FIGS. 2 and 5, in some embodiments the input data files 202 are stored in one or more data centers DC1-DC4. Ideally, the work queue master 214 assigns tasks to
`
processors 510 in datacenters where the input files are stored so as to minimize network traffic whenever possible. In some embodiments, the work queue master 214 uses input file information received from a file system to determine the appropriate processor or process for executing a task, using a hierarchical decision process. When a process in a processor in a datacenter DC1-DC4 is idle, it requests a task from the work queue master 214. The work queue master 214 searches the input file information received from the file system (e.g., FS 446, FIG. 4), for an unprocessed data block on the machine assigned to process the task. If none are available, the work queue master 214 searches the file information for an unprocessed data block on the same rack 508 as the machine assigned to process the task. If none are available, the work queue master 214 searches the file information for an unprocessed data block in the same datacenter as the machine assigned to process the task. If none are available, the work queue master 214 will search for unprocessed blocks in other datacenters.

By using a hierarchical assignment scheme, data blocks can be processed quickly without requiring large volumes of data transfer traffic on the system 500. This in turn allows more tasks to be performed without straining the limits of the system 500.
`process to be restarted or replaced by a new process. In some
`embodiments, the work queue master may further detect
`when such remedial measures fail and then update its process
`status table to indicate such failures. In addition, in some
`embodiments, when a map task fails and is restarted in a new
`process, all processes executing reduce tasks are notified of
`the re-execution so that any reduce task that has not already
`read the data produced by the failed process will read the data
`produced by the new process.
`FIG. 7A shows an exemplary task status table for keeping
`track of the status of map and reduce tasks. In some embodi
`ments, each task (e.g., map, reduce) is assigned task ID, a
`status, a process, and one or more input files and output files.
`In some embodiments, the input files field may specify a
`portion of an input file (e.g., where the portion comprises a
`data block) to be processed by the task, or this field may
`specify portions of two of more input files. The status field
`indicates the current status of the task (e.g., waiting,
`in-progress, completed, or failed), which is being performed
`by the assigned process identified in the process field. The
`process retrieves data from one or more input files (or the one
`or more input file portions) identified in the input file field and
`
or instructions that are application-independent. Only the actual map and reduce operators, which produce intermediate data values from the input data and that produce output data from the intermediate data values, respectively, are application-specific. These application-specific operators are invoked by the map and reduce tasks assigned to processes in step 610. By making a clear boundary between the application-independent aspects and application-specific aspects of performing a large scale data processing operation, the application-independent aspects can be optimized, thereby making the entire large scale data processing operation very efficient. As noted above, in some embodiments, the application-specific reduce operator is replaced by one or more application-independent statistical information processing functions.

The process 600 begins by determining if there are tasks waiting to be assigned to a process (step 606). If there are no tasks waiting, then the process 600 waits for all the tasks to complete (step 604). If there are tasks waiting, then the process 600 determines if there are any idle processes (step 608). If there are idle processes, then the process 600 assigns a waiting task to an idle process (step 610) and returns to step 606. If there are no idle processes, the process 600 waits for an idle process (step 614). Whenever a process completes a task, the process sends a corresponding message to the work queue master 214, which updates the process and task status tables (step 612). The work queue master 214 may then assign a new task to the idle process, if it has any unassigned tasks waiting for processing resources. For reduce tasks, the work queue master 214 may defer assigning any particular reduce task to an idle process until such time that the intermediate data to be processed by the reduce task has, in fact, been generated by the map tasks. Some reduce tasks may be started long before the last of the map tasks are started if the intermediate data to be processed by those reduce tasks is ready for reduce processing.
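
As a rough sketch of the assignment loop described above, and not a faithful implementation of process 600, the following Python fragment uses simple queues and a dictionary as stand-ins for the work queue master's state.

    # Hypothetical sketch: assign waiting tasks to idle processes and update the
    # status table; stop when either no task is waiting or no process is idle.
    from collections import deque

    def assign_tasks(waiting_tasks, idle_processes, status_table):
        assignments = []
        while waiting_tasks and idle_processes:
            task = waiting_tasks.popleft()
            proc = idle_processes.popleft()
            status_table[task] = ("in-progress", proc)
            assignments.append((task, proc))
        return assignments

    status = {}
    print(assign_tasks(deque(["Map0000", "Map0001"]), deque(["P0000"]), status))
    print(status)  # Map0001 remains waiting until another process becomes idle
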
In some embodiments, whenever a process fails, which may be discovered by the work queue master 214 using any of a number of known techniques, the work queue master 214 (A) determines what task was running in the failed process, if any, (B) assigns that task to a new process, waiting if necessary until an idle process becomes available, and (C) updates its process and task status tables accordingly. In some embodiments, the work queue master 214 may undertake remedial measures (step 602), such as causing the failed process to be restarted or replaced by a new process. In some embodiments, the work queue master may further detect when such remedial measures fail and then update its process status table to indicate such failures. In addition, in some embodiments, when a map task fails and is restarted in a new process, all processes executing reduce tasks are notified of the re-execution so that any reduce task that has not already read the data produced by the failed process will read the data produced by the new process.
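
A simplified sketch of steps (A) through (C) follows; it is not the patent's mechanism, the table layouts are hypothetical, and waiting for an idle process is omitted.

    # Hypothetical sketch of failure handling by a work-queue-master-like role.
    def handle_process_failure(failed_proc, process_table, task_table, idle_processes):
        # (A) Determine what task, if any, was running in the failed process.
        task = process_table.get(failed_proc, {}).get("current_task")
        process_table[failed_proc] = {"status": "failed", "current_task": None}
        if task is None or not idle_processes:
            return None  # simplification: no waiting for an idle process here
        # (B) Assign that task to a new (idle) process.
        new_proc = idle_processes.pop()
        # (C) Update the process and task status tables accordingly.
        process_table[new_proc] = {"status": "busy", "current_task": task}
        task_table[task] = {"status": "in-progress", "process": new_proc}
        return new_proc

    procs = {"P0011": {"status": "busy", "current_task": "Map0104"}}
    tasks = {"Map0104": {"status": "in-progress", "process": "P0011"}}
    print(handle_process_failure("P0011", procs, tasks, ["P0034"]))  # P0034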
`
FIG. 7A shows an exemplary task status table for keeping track of the status of map and reduce tasks. In some embodiments, each task (e.g., map, reduce) is assigned a task ID, a status, a process, and one or more input files and output files. In some embodiments, the input files field may specify a portion of an input file (e.g., where the portion comprises a data block) to be processed by the task, or this field may specify portions of two or more input files. The status field indicates the current status of the task (e.g., waiting, in-progress, completed, or failed), which is being performed by the assigned process identified in the process field. The process retrieves data from one or more input files (or the one or more input file portions) identified in the input file field and