-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTeraSort.cpp
More file actions
129 lines (99 loc) · 3.93 KB
/
TeraSort.cpp
File metadata and controls
129 lines (99 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#ifndef TERASORT_CPP
#define TERASORT_CPP
#include "TeraSort.h"
/**
 * Opens the input/output files and builds the mapper, partitioner and reducer
 * objects used by execute().
 *
 * The input is treated as a sequence of fixed-size 100-byte records. The
 * records are split evenly across the mappers; the last mapper additionally
 * receives the remainder that does not divide evenly.
 *
 * On any failure (files cannot be opened, zero mappers/reducers, file size
 * cannot be determined) an error is reported and map_engines, reduce_engines
 * and partitioner are left NULL.
 *
 * @param p_input_file_name  path of the file to read records from
 * @param p_output_file_name path of the file handed to the partitioner
 * @param p_mappers          number of Mapper objects to create (must be > 0)
 * @param p_reducers         number of partitions / Reducer objects (must be > 0)
 * @param p_sample_size      sample size forwarded unchanged to every Mapper
 */
template <class T, typename S>
TeraSort<T,S>::TeraSort (char * p_input_file_name,char * p_output_file_name,
uint16_t p_mappers, uint16_t p_reducers,uint64_t p_sample_size)
{
    input = fopen (p_input_file_name,"rb");
    output = fopen (p_output_file_name,"wb");
    file_byte_size = 0 ;
    file_record_size = 0 ;
    mappers = p_mappers;
    reducers = p_reducers;
    sample_size = p_sample_size;
    map_engines = NULL;
    reduce_engines = NULL; // always initialised so later code never sees a garbage pointer
    partitioner = NULL;

    if ( input == NULL || output == NULL)
    {
        perror("opening input or output files has failed");
        return;
    }

    // Both counts are used as divisors below, and map_engines[mappers-1]
    // would index out of bounds with zero mappers.
    if ( mappers == 0 || reducers == 0)
    {
        fprintf(stderr, "TeraSort requires at least one mapper and one reducer\n");
        return;
    }

    // Determine the file size by seeking to the end and reading the offset.
    if ( fseek(input,0,SEEK_END) != 0)
    {
        perror("determining input file size has failed");
        return;
    }
    const long end_offset = ftell(input);
    if ( end_offset < 0)
    {
        // ftell reports failure as -1; storing that in an unsigned size would
        // produce an enormous bogus record count.
        perror("determining input file size has failed");
        return;
    }
    rewind(input);

    file_byte_size = static_cast<uint64_t>(end_offset);
    file_record_size = 100; // each record (line) is 100 bytes

    const uint64_t records_count = file_byte_size / file_record_size;
    const uint64_t records_per_mapper = records_count / mappers;
    const uint64_t leftover_records = records_count % mappers; // given to the last mapper
    const uint64_t partition_expected_size = records_count / reducers;

    partitioner = new Partitioner <T,S> (output, reducers, partition_expected_size);
    map_engines = new Mapper <T,S> *[mappers];
    reduce_engines = new Reducer <T,S> *[reducers];

    for (uint16_t m = 0; m < mappers; m++)
    {
        // Every mapper starts at m * records_per_mapper; only the last one
        // also takes the records left over by the integer division.
        const bool is_last = (m == mappers - 1);
        const uint64_t count = is_last ? records_per_mapper + leftover_records
                                       : records_per_mapper;
        map_engines[m] = new Mapper <T,S> (input, m * records_per_mapper, count,
                                           sample_size, partitioner);
    }

    for (uint16_t r = 0; r < reducers; r++)
    {
        reduce_engines[r] = new Reducer <T,S> (partitioner->getPartition(r));
    }
}
/**
 * Runs the whole sort: two mapper passes, then the reducers, then dumps
 * every partition.
 *
 * Sequence:
 *   1. start one thread per mapper and wait for all of them;
 *   2. ask the partitioner to compute its cutpoints from the mappers' results;
 *   3. start the mappers a second time (phase 2) and wait again;
 *   4. start one thread per reducer and wait for all of them;
 *   5. call dump() on each partition.
 *
 * If the constructor failed to set up the engines (they are left NULL in that
 * case) the function reports the problem and returns without doing any work.
 *
 * NOTE(review): each std::thread is heap-allocated and handed to the engine
 * via setThread(); presumably the engine takes ownership and releases it --
 * confirm in Mapper/Reducer, especially for the second setThread() call on
 * the same mapper.
 */
template <class T, typename S>
void TeraSort<T,S>::execute()
{
    printf ("Terasort Start Execute\n");

    // The constructor leaves these NULL when it could not build the engines;
    // dereferencing them below would crash.
    if ( map_engines == NULL || partitioner == NULL)
    {
        fprintf(stderr, "Terasort cannot execute: engines were not initialised\n");
        return;
    }

    // Phase 1: run every mapper once.
    for ( uint16_t i = 0 ; i < mappers; i++)
    {
        std::thread * th = new std::thread(Mapper<T,S>::start, map_engines[i]);
        map_engines[i]->setThread(th);
    }
    for ( uint16_t i = 0 ; i < mappers; i++)
    {
        map_engines[i]->waitForRunToFinish();
    }

    // Cutpoints must be known before phase 2 begins.
    partitioner->calcCutpointsAverage(mappers);

    // Phase 2: run every mapper again, now that the cutpoints are available.
    for ( uint16_t i = 0 ; i < mappers; i++)
    {
        std::thread * th = new std::thread(Mapper<T,S>::start, map_engines[i]);
        map_engines[i]->setThread(th);
    }
    for ( uint16_t i = 0 ; i < mappers; i++)
    {
        map_engines[i]->waitForRunToFinish();
    }

    // Reduce: one thread per reducer.
    for ( uint16_t j = 0 ; j < reducers; j++)
    {
        std::thread * th = new std::thread(Reducer<T,S>::start, reduce_engines[j]);
        reduce_engines[j]->setThread(th);
    }
    for ( uint16_t j = 0 ; j < reducers; j++)
    {
        reduce_engines[j]->waitForRunToFinish();
    }

    // Emit the partitions in index order.
    for ( uint16_t j = 0 ; j < reducers; j++)
    {
        partitioner->getPartition(j)->dump();
    }
}
/**
 * Closes the input/output files and releases the mapper, reducer and
 * partitioner objects created by the constructor.
 */
template <class T, typename S>
TeraSort<T,S>::~TeraSort()
{
    if ( input != NULL) fclose (input);
    if ( output != NULL) fclose (output);
    if ( map_engines != NULL)
    {
        for ( uint16_t i = 0 ; i < mappers; i++)
            delete map_engines[i];
        // The array was allocated with new[], so it must be released with
        // delete[]; calling free() on it is undefined behaviour.
        delete[] map_engines;

        // The constructor allocates reduce_engines in the same branch as
        // map_engines, so it is only touched when map_engines is non-NULL.
        // NOTE(review): assumes Reducer does not itself delete the partition
        // it was given (the partitioner is deleted below) -- confirm.
        for ( uint16_t j = 0 ; j < reducers; j++)
            delete reduce_engines[j];
        delete[] reduce_engines;
    }
    if ( partitioner != NULL ) delete partitioner;
}
#endif