#include <mpi.h>

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <set>
#include <utility>
#include <vector>
// #include <fmt/format.h>
// #include <fmt/ranges.h>

#define TAG_SEND_NUM_EDGES 1001
#define TAG_SEND_EDGES 1002
#define TAG_SEND_FINAL_RESULT 1003

typedef struct {
  int fst;
  int snd;
} pair;

void init_pair_type(MPI_Datatype *out);

int main(int argc, char **argv) {
  MPI::Init(argc, argv);

  int rank = MPI::COMM_WORLD.Get_rank(), p = MPI::COMM_WORLD.Get_size();

  MPI_Datatype IntPairType;
  init_pair_type(&IntPairType);

  // One process reads the file and distributes the data to the other processes
  // using a 1D decomposition (each rank gets approximately the same number of
  // vertices).
#pragma region
  FILE *fp;
  char *line = NULL;
  size_t len = 0;
  ssize_t read;
  pair params;

  if (rank == 0) {
    printf("Hello\n");
    fp = fopen(argv[1], "r");
    if ((read = getline(&line, &len, fp)) != -1)
      sscanf(line, "%d %d", &params.fst, &params.snd);
  }

  // Send the params
  MPI_Bcast(&params, 1, IntPairType, 0, MPI::COMM_WORLD);

  int total_num_nodes = params.fst;
  int total_num_edges = params.snd;
  int each_num_nodes = total_num_nodes / p;

  // Calculate exactly how many nodes my current process holds
  int num_my_nodes = rank == p - 1 ? total_num_nodes - rank * each_num_nodes
                                   : each_num_nodes;
  int my_nodes[num_my_nodes];

  std::function<std::pair<int, int>(int)> node_range =
      [p, total_num_nodes, each_num_nodes](int process) {
        int start = process * each_num_nodes;
        int end = process == p - 1 ? total_num_nodes : start + each_num_nodes;
        return std::make_pair(start, end);
      };

  // Read the edges
  int num_my_edges;
  pair *my_edges;

  int counts[p], displs[p];

  if (rank == 0) {
    line = NULL;
    pair all_edges[total_num_edges];

    // For the current process, what's the last node we're expecting to see?
    int current_process = 0;
    std::pair<int, int> current_node_range = node_range(current_process);
    int edge_counter = 0;

    for (int i = 0; i < total_num_edges; ++i) {
      getline(&line, &len, fp);

      int fst, snd;
      sscanf(line, "%d %d", &fst, &snd);

      if (fst >= current_node_range.second) {
        if (current_process == 0) {
          num_my_edges = edge_counter;
          my_edges = (pair *)calloc(num_my_edges, sizeof(pair));
          memcpy(my_edges, all_edges, edge_counter * sizeof(pair));
        } else {
          MPI_Send(&edge_counter, 1, MPI_INT, current_process,
                   TAG_SEND_NUM_EDGES, MPI::COMM_WORLD);
          MPI_Send(all_edges, edge_counter, IntPairType, current_process,
                   TAG_SEND_EDGES, MPI::COMM_WORLD);
        }

        // We're starting on the next process
        current_process += 1;
        current_node_range = node_range(current_process);
        edge_counter = 0;
      }

      all_edges[edge_counter].fst = fst;
      all_edges[edge_counter].snd = snd;
      edge_counter += 1;
    }

    // We have to send the last one again here, since it didn't get caught in
    // the loop above
    MPI_Send(&edge_counter, 1, MPI_INT, current_process, TAG_SEND_NUM_EDGES,
             MPI::COMM_WORLD);
    MPI_Send(all_edges, edge_counter, IntPairType, current_process,
             TAG_SEND_EDGES, MPI::COMM_WORLD);

    // int step = num_edges / p;
    // for (int i = 0; i < p; ++i) {
    //   int start = i * step;
    //   int end = i == p - 1 ?
    //       num_edges : start + step;
    //   int count = end - start;
    //   counts[i] = count;
    //   displs[i] = start;
    // }
  } else {
    MPI_Recv(&num_my_edges, 1, MPI_INT, 0, TAG_SEND_NUM_EDGES, MPI::COMM_WORLD,
             MPI_STATUS_IGNORE);
    my_edges = (pair *)calloc(num_my_edges, sizeof(pair));
    MPI_Recv(my_edges, num_my_edges, IntPairType, 0, TAG_SEND_EDGES,
             MPI::COMM_WORLD, MPI_STATUS_IGNORE);
  }

  char *buf = (char *)calloc(sizeof(char), 1000);
  int offset = 0; // Keep track of the current position in the buffer
  for (int i = 0; i < std::min(num_my_edges, 5); i++) {
    offset +=
        sprintf(buf + offset, "(%d, %d)", my_edges[i].fst, my_edges[i].snd);
    if (i < std::min(num_my_edges, 5) - 1) {
      // Add a separator (e.g., comma or space) if it's not the last
      offset += sprintf(buf + offset, " ");
    }
  }

  if (rank == 0) {
    fclose(fp);
    if (line)
      free(line);
  }
#pragma endregion

  // STEP 2 TIMER STARTS HERE
  MPI::COMM_WORLD.Barrier();
  double step_2_start_time = MPI::Wtime();

  // Each process analyzes the non-local edges that are contained in its
  // portion of the graph.
#pragma region
  std::map<int, int> node_label_assignment;
  std::pair<int, int> my_node_range = node_range(rank);

  // Initial node assignment
  for (int i = my_node_range.first; i < my_node_range.second; ++i) {
    node_label_assignment[i] = i;
  }

  std::map<int, std::set<int>> adj;
  std::set<int> non_local_nodes;
  std::set<std::pair<int, int>> non_local_edges;

  for (int i = 0; i < num_my_edges; ++i) {
    pair edge = my_edges[i];
    adj[edge.fst].insert(edge.snd);

    if (!(my_node_range.first <= edge.fst &&
          edge.fst < my_node_range.second)) {
      non_local_nodes.insert(edge.fst);
      non_local_edges.insert(std::make_pair(edge.snd, edge.fst));
    }
    if (!(my_node_range.first <= edge.snd &&
          edge.snd < my_node_range.second)) {
      non_local_nodes.insert(edge.snd);
      non_local_edges.insert(std::make_pair(edge.fst, edge.snd));
    }
  }
#pragma endregion

  // Each process determines which processes store the non-local vertices
  // corresponding to the non-local edges.
#pragma region
  std::map<int, std::set<int>> send_map;
  std::map<int, std::set<int>> recv_map;

  for (auto entry : non_local_edges) {
    int local_node = entry.first, remote_node = entry.second;
    int corresponding_process = remote_node / each_num_nodes;
    // The last process gets some extra nodes
    if (corresponding_process >= p)
      corresponding_process = p - 1;
    send_map[corresponding_process].insert(local_node);
    recv_map[corresponding_process].insert(remote_node);
  }
#pragma endregion

  // All the processes are communicating to figure out which process needs to
  // send what data to the other processes.
#pragma region
#pragma endregion

  // STEP 5 TIMER STARTS HERE
  MPI::COMM_WORLD.Barrier();
  double step_5_start_time = MPI::Wtime();

  // The processes perform the transfers of non-local labels and updates of
  // local labels until convergence.
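  // Note on the exchange in the loop below (explanatory comment; the
  // two-process example is illustrative, not taken from any particular input):
  // in every round this rank packs the current labels of the vertices in
  // send_map[i], in the ascending order that std::set iterates in, into the
  // block of sendbuf destined for rank i, and the Alltoallv call delivers back
  // one label per vertex in recv_map[i]. The unpacking relies on both sides
  // enumerating their sets in the same ascending vertex order, which in turn
  // appears to assume the input lists each undirected edge in both directions,
  // so that rank i's send_map entry for this rank mirrors this rank's
  // recv_map entry for rank i.
  //
  // For example, with p = 2, if rank 0 has send_map[1] = {3, 7}, it sends the
  // labels of its local vertices 3 and 7 as a block of two ints; rank 1,
  // holding recv_map[0] = {3, 7}, reads them back in that same order starting
  // at recv_displs[0].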
#pragma region
  while (true) {
    // First, exchange the data that needs to be exchanged
    std::vector<int> sendbuf;
    std::vector<int> send_counts;
    std::vector<int> send_displs;
    std::vector<int> recv_counts;
    std::vector<int> recv_displs;
    int recv_total;

    {
      int offset = 0;
      for (int i = 0; i < p; ++i) {
        int count = send_map[i].size();
        // std::sort(send_map[i].begin(), send_map[i].end());
        for (auto k : send_map[i]) {
          sendbuf.push_back(node_label_assignment[k]);
        }
        send_counts.push_back(count);
        send_displs.push_back(offset);
        offset += count;
      }

      offset = 0;
      for (int i = 0; i < p; ++i) {
        int count = recv_map[i].size();
        // std::sort(recv_map[i].begin(), recv_map[i].end());
        recv_counts.push_back(count);
        recv_displs.push_back(offset);
        offset += count;
      }
      recv_total = offset;
    }

    std::vector<int> recvbuf(recv_total, 0);

    // std::cout << fmt::format("[{}] {} \t|| \t{}", rank,
    //                          fmt::join(send_counts, ", "),
    //                          fmt::join(recv_counts, ", "))
    //           << std::endl;

    MPI::COMM_WORLD.Alltoallv(sendbuf.data(), send_counts.data(),
                              send_displs.data(), MPI_INT, recvbuf.data(),
                              recv_counts.data(), recv_displs.data(), MPI_INT);

    std::map<int, int> total_node_label_assignment(node_label_assignment);
    for (int i = 0; i < p; ++i) {
      std::vector<int> ouais(recv_map[i].begin(), recv_map[i].end());
      for (int j = 0; j < recv_counts[i]; ++j) {
        int remote_node = ouais[j];
        int remote_value = recvbuf[recv_displs[i] + j];
        total_node_label_assignment[remote_node] = remote_value;
      }
    }

    // For each local node, determine the minimum label out of its neighbors
    std::map<int, int> new_labels;
    for (int i = my_node_range.first; i < my_node_range.second; ++i) {
      int current_value = total_node_label_assignment[i];
      int min = current_value;
      for (auto neighbor : adj[i]) {
        if (total_node_label_assignment[neighbor] < min)
          min = total_node_label_assignment[neighbor];
      }
      if (min < current_value) {
        new_labels[i] = min;
      }
    }

    // std::cout << fmt::format("[{}] Helloge {}", rank,
    //                          fmt::join(new_labels, ", "))
    //           << std::endl;

    // Have there been any changes in the labels?
    int num_changes = new_labels.size();
    int total_changes;
    MPI::COMM_WORLD.Allreduce(&num_changes, &total_changes, 1, MPI_INT,
                              MPI::SUM);

    // std::cout << fmt::format("[{}] # updates: {} ({})", rank, num_changes,
    //                          total_changes)
    //           << std::endl;

    if (total_changes == 0) {
      break;
    }

    // Update the original node assignment
    for (auto entry : new_labels) {
      node_label_assignment[entry.first] = entry.second;
    }
  }
#pragma endregion

  // END TIMERS
  MPI::COMM_WORLD.Barrier();
  double end_time = MPI::Wtime();
  printf("2-5 Time: %0.04fs\n", end_time - step_2_start_time);
  printf("5 Time: %0.04fs\n", end_time - step_5_start_time);

  // The results are gathered to a single process, which writes them to the
  // disk.
#pragma region
  if (rank == 0) {
    std::vector<int> all_assignments(total_num_nodes);
    std::map<int, int> label_count;
    int ctr = 0;
    for (int i = 0; i < p; ++i) {
      std::pair<int, int> this_node_range = node_range(i);
      int count = this_node_range.second - this_node_range.first;
      if (i == 0) {
        for (int j = 0; j < count; ++j) {
          all_assignments[this_node_range.first + j] =
              node_label_assignment[this_node_range.first + j];
          label_count[all_assignments[this_node_range.first + j]]++;
        }
      } else {
        MPI::COMM_WORLD.Recv(&all_assignments[this_node_range.first], count,
                             MPI::INT, i, TAG_SEND_FINAL_RESULT);
        for (int j = 0; j < count; ++j) {
          label_count[all_assignments[this_node_range.first + j]]++;
        }
      }
    }

    std::cout << "Done! " << label_count.size() << std::endl;
" << label_count.size() << std::endl; } else { std::vector flat_assignments; for (int i = my_node_range.first; i < my_node_range.second; ++i) { flat_assignments.push_back(node_label_assignment[i]); } MPI::COMM_WORLD.Send(flat_assignments.data(), flat_assignments.size(), MPI::INT, 0, TAG_SEND_FINAL_RESULT); } #pragma endregion MPI::Finalize(); return 0; } void init_pair_type(MPI_Datatype *out) { int blocklengths[2] = {1, 1}; MPI_Datatype types[2] = {MPI_INT, MPI_INT}; MPI_Aint offsets[2]; offsets[0] = offsetof(pair, fst); offsets[1] = offsetof(pair, snd); MPI_Type_create_struct(2, blocklengths, offsets, types, out); MPI_Type_commit(out); }