#include <mpi.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <map>
#include <set>
#include <utility>
#include <vector>

#ifdef FMT_HEADER_ONLY
// {fmt} is only pulled in for header-only builds; the exact header names here
// are assumed.
#include <fmt/core.h>
#include <fmt/ranges.h>
#endif

#define TAG_SEND_NUM_EDGES 1001
#define TAG_SEND_EDGES 1002
#define TAG_SEND_FINAL_RESULT 1003

#define MIN(a, b)                                                              \
  ({                                                                           \
    __typeof__(a) _a = (a);                                                    \
    __typeof__(b) _b = (b);                                                    \
    _a < _b ? _a : _b;                                                         \
  })

typedef struct {
  int fst;
  int snd;
} pair;

void init_pair_type(MPI_Datatype *out);

struct pair_vector {
  pair *ptr;
  int cap;
  int len;
};

void pair_vector_init(struct pair_vector *);
void pair_vector_clear(struct pair_vector *);
void pair_vector_push(struct pair_vector *v, int fst, int snd);

pair compute_node_range(int p, int total_num_nodes, int each_num_nodes,
                        int process);
int lookup_assignment(int *base_node_assignment, pair my_node_range,
                      std::map<int, std::set<int>> recv_map, int *recvbuf,
                      int *recv_counts, int *recv_displs, int each_num_nodes,
                      int rank, int node_number);

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);

  int rank, p;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &p);

  MPI_Datatype IntPairType;
  init_pair_type(&IntPairType);

  // One process reads the file and distributes the data to the other processes
  // using a 1D decomposition (each rank gets approx same number of vertices).
#pragma region
  FILE *fp;
  char *line = NULL;
  size_t len = 0;
  ssize_t read;
  pair params;

  if (rank == 0) {
    fp = fopen(argv[1], "r");

    // Read the first line: "<num_nodes> <num_edges>"
    if (getline(&line, &len, fp) != -1)
      sscanf(line, "%d %d", &params.fst, &params.snd);
  }

  // Send the params to everyone
  MPI_Bcast(&params, 1, IntPairType, 0, MPI_COMM_WORLD);

  int total_num_nodes = params.fst;
  int total_num_edges = params.snd;
  int each_num_nodes = total_num_nodes / p;

  // Calculate exactly how many nodes my current process holds (the last rank
  // picks up the remainder)
  int num_my_nodes =
      rank == p - 1 ? total_num_nodes - rank * each_num_nodes : each_num_nodes;
  int my_nodes[num_my_nodes];

  pair node_ranges[p];
  for (int i = 0; i < p; ++i)
    node_ranges[i] = compute_node_range(p, total_num_nodes, each_num_nodes, i);

  // Read the edges
  int num_my_edges;
  pair *my_edges;
  int counts[p], displs[p];

  if (rank == 0) {
    line = NULL;
    // pair all_edges[total_num_edges];
    struct pair_vector all_edges;
    pair_vector_init(&all_edges);

    // For the current process, what's the last node we're expecting to see?
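    // NOTE (observed assumption): the streaming pass below ships a batch as
    // soon as a source vertex crosses the current range boundary and never
    // revisits a process, so it relies on the edge list being sorted by its
    // first endpoint and on every rank's vertex range appearing as a source
    // at least once.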
    int current_process = 0;
    pair current_node_range = node_ranges[current_process];
    int edge_counter = 0;

    for (int i = 0; i < total_num_edges; ++i) {
      if (getline(&line, &len, fp) == -1)
        break;

      int fst, snd;
      sscanf(line, "%d %d", &fst, &snd);

      if (fst >= current_node_range.snd) {
        // The source vertex belongs to the next range, so ship the batch
        // accumulated for the current process
        if (current_process == 0) {
          num_my_edges = edge_counter;
          my_edges = (pair *)calloc(num_my_edges, sizeof(pair));
          memcpy(my_edges, all_edges.ptr, edge_counter * sizeof(pair));
        } else {
          MPI_Send(&edge_counter, 1, MPI_INT, current_process,
                   TAG_SEND_NUM_EDGES, MPI_COMM_WORLD);
          MPI_Send(all_edges.ptr, edge_counter, IntPairType, current_process,
                   TAG_SEND_EDGES, MPI_COMM_WORLD);
        }

        // We're starting on the next process
        current_process += 1;
        current_node_range = node_ranges[current_process];
        edge_counter = 0;
        pair_vector_clear(&all_edges);
      }

      pair_vector_push(&all_edges, fst, snd);
      edge_counter += 1;
    }

    // We have to send the last batch here, since it didn't get caught in the
    // loop above
    if (current_process == 0) {
      num_my_edges = edge_counter;
      my_edges = (pair *)calloc(num_my_edges, sizeof(pair));
      memcpy(my_edges, all_edges.ptr, edge_counter * sizeof(pair));
    } else {
      MPI_Send(&edge_counter, 1, MPI_INT, current_process, TAG_SEND_NUM_EDGES,
               MPI_COMM_WORLD);
      MPI_Send(all_edges.ptr, edge_counter, IntPairType, current_process,
               TAG_SEND_EDGES, MPI_COMM_WORLD);
    }

    free(all_edges.ptr);
  } else {
    MPI_Recv(&num_my_edges, 1, MPI_INT, 0, TAG_SEND_NUM_EDGES, MPI_COMM_WORLD,
             MPI_STATUS_IGNORE);
    my_edges = (pair *)calloc(num_my_edges, sizeof(pair));
    MPI_Recv(my_edges, num_my_edges, IntPairType, 0, TAG_SEND_EDGES,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  }

  if (rank == 0) {
    fclose(fp);
    if (line)
      free(line);
  }
#pragma endregion

  if (rank == 0)
    printf("Params: p=%d, |V|=%d, |E|=%d\n", p, total_num_nodes,
           total_num_edges);

  // STEP 2 TIMER STARTS HERE
  MPI_Barrier(MPI_COMM_WORLD);
  double step_2_start_time;
  if (rank == 0)
    step_2_start_time = MPI_Wtime();

  // Each process analyzes the non-local edges that are contained in its
  // portion of the graph.
#pragma region
  int node_label_assignment_vec[num_my_nodes];
  pair my_node_range = node_ranges[rank];

  // Initial node assignment: each node is labeled with its own id
  for (int idx = 0; idx < num_my_nodes; ++idx) {
    node_label_assignment_vec[idx] = my_node_range.fst + idx;
  }

  std::map<int, std::set<int>> adj;
  std::set<int> non_local_nodes;
  std::set<std::pair<int, int>> non_local_edges;

  for (int i = 0; i < num_my_edges; ++i) {
    pair edge = my_edges[i];
    adj[edge.fst].insert(edge.snd);

    if (!(my_node_range.fst <= edge.fst && edge.fst < my_node_range.snd)) {
      non_local_nodes.insert(edge.fst);
      // Stored as (local endpoint, remote endpoint)
      non_local_edges.insert(std::make_pair(edge.snd, edge.fst));
    }
    if (!(my_node_range.fst <= edge.snd && edge.snd < my_node_range.snd)) {
      non_local_nodes.insert(edge.snd);
      non_local_edges.insert(std::make_pair(edge.fst, edge.snd));
    }
  }
#pragma endregion

  // Each process determines which process stores the non-local vertices
  // corresponding to its non-local edges.
#pragma region
  std::map<int, std::set<int>> send_map;
  std::map<int, std::set<int>> recv_map;

  for (auto entry : non_local_edges) {
    int local_node = entry.first, remote_node = entry.second;
    int remote_process = remote_node / each_num_nodes;
    // The last process gets some extra nodes
    if (remote_process >= p)
      remote_process = p - 1;

    send_map[remote_process].insert(local_node);
    recv_map[remote_process].insert(remote_node);
  }
#pragma endregion

  // All the processes are communicating to figure out which process needs to
  // send what data to the other processes.
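  // Hedged sketch only: if the per-peer counts had to be negotiated
  // explicitly, a single MPI_Alltoall over the locally known sizes would be
  // enough (variable names illustrative; the maps built above are what the
  // exchange below actually uses):
  //
  //   int out_counts[p], in_counts[p];
  //   for (int i = 0; i < p; ++i) out_counts[i] = send_map[i].size();
  //   MPI_Alltoall(out_counts, 1, MPI_INT, in_counts, 1, MPI_INT,
  //                MPI_COMM_WORLD);
  //   // in_counts[i] is now the number of labels rank i will send to us.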
#pragma region
#pragma endregion

  // STEP 5 TIMER STARTS HERE
  MPI_Barrier(MPI_COMM_WORLD);
  double step_5_start_time;
  if (rank == 0) {
    step_5_start_time = MPI_Wtime();
  }

  // The processes perform the transfers of non-local labels and updates of
  // local labels until convergence.
#pragma region
  while (true) {
    // First, exchange the data that needs to be exchanged
    int send_counts[p];
    int send_displs[p];
    int recv_counts[p];
    int recv_displs[p];
    std::map<int, int> remote_labels;

    if (p > 1) {
      int send_total, recv_total;
      {
        int offset = 0;
        for (int i = 0; i < p; ++i) {
          send_counts[i] = send_map[i].size();
          send_displs[i] = offset;
          offset += send_counts[i];
        }
        send_total = offset;

        offset = 0;
        for (int i = 0; i < p; ++i) {
          recv_counts[i] = recv_map[i].size();
          recv_displs[i] = offset;
          offset += recv_counts[i];
        }
        recv_total = offset;
      }

      // Pack the outgoing labels contiguously, in the same (sorted) set order
      // in which the receiving rank decodes them below
      int sendbuf[send_total];
      for (int i = 0; i < p; ++i) {
        int k = 0;
        for (auto local_node : send_map[i]) {
          sendbuf[send_displs[i] + k] =
              node_label_assignment_vec[local_node - my_node_range.fst];
          ++k;
        }
      }

      int recvbuf[recv_total];
      MPI_Alltoallv(sendbuf, send_counts, send_displs, MPI_INT, recvbuf,
                    recv_counts, recv_displs, MPI_INT, MPI_COMM_WORLD);

      // Cache the received labels, keyed by remote node id
      for (int i = 0; i < p; ++i) {
        std::vector<int> processor_nodes(recv_map[i].begin(),
                                         recv_map[i].end());
        for (int j = 0; j < recv_counts[i]; ++j) {
          int remote_node = processor_nodes[j];
          int remote_value = recvbuf[recv_displs[i] + j];
          remote_labels[remote_node] = remote_value;
        }
      }
    }

    // For each local node, determine the minimum label out of its neighbors
    std::map<int, int> new_labels;
    for (int i = 0; i < num_my_nodes; ++i) {
      int node = my_node_range.fst + i;
      // int current_value = total_node_label_assignment[i];
      int current_value = node_label_assignment_vec[i];
      int min = current_value;

      for (auto neighbor : adj[node]) {
        int neighbor_value;
        if (my_node_range.fst <= neighbor && neighbor < my_node_range.snd) {
          neighbor_value =
              node_label_assignment_vec[neighbor - my_node_range.fst];
        } else {
          neighbor_value = remote_labels[neighbor];
        }
        // = lookup_assignment(
        //     node_label_assignment_vec, my_node_range, recv_map,
        //     recvbuf.data(), recv_counts.data(), recv_displs.data(),
        //     each_num_nodes, rank, neighbor);
        min = MIN(min, neighbor_value);
      }

      if (min < current_value) {
        new_labels[i] = min;
      }
    }

    // Have there been any changes in the labels?
    int num_changes = new_labels.size();
    int total_changes;
    MPI_Allreduce(&num_changes, &total_changes, 1, MPI_INT, MPI_SUM,
                  MPI_COMM_WORLD);

    if (total_changes == 0) {
      break;
    }

    // Update the original node assignment
    for (auto entry : new_labels) {
      node_label_assignment_vec[entry.first] = entry.second;
    }

    if (rank == 0)
      printf("total changes: %d\n", total_changes);
  }
#pragma endregion

  // END TIMERS
  MPI_Barrier(MPI_COMM_WORLD);
  double end_time;
  if (rank == 0)
    end_time = MPI_Wtime();

  if (rank == 0) {
    printf("2-5 Time: %0.04fs\n", end_time - step_2_start_time);
    printf("5 Time: %0.04fs\n", end_time - step_5_start_time);
  }

  // The results are gathered to a single process, which writes them to the
  // disk.
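  // Rank 0 writes its own slice directly, receives every other rank's slice in
  // rank order so the output stays in global vertex order (one label per
  // line), and finally prints the number of distinct labels, which for a
  // symmetric edge list is the number of connected components.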
#pragma region
  if (rank == 0) {
    FILE *fp = fopen(argv[2], "w");
    std::map<int, int> label_count;

    for (int process_idx = 0; process_idx < p; ++process_idx) {
      pair this_node_range = node_ranges[process_idx];
      int count = this_node_range.snd - this_node_range.fst;

      if (process_idx == 0) {
        for (int j = 0; j < count; ++j) {
          fprintf(fp, "%d\n", node_label_assignment_vec[j]);
          label_count[node_label_assignment_vec[j]]++;
        }
      } else {
        int recvbuf[count];
        MPI_Recv(recvbuf, count, MPI_INT, process_idx, TAG_SEND_FINAL_RESULT,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        for (int j = 0; j < count; ++j) {
          fprintf(fp, "%d\n", recvbuf[j]);
          label_count[recvbuf[j]]++;
        }
      }
    }

    fclose(fp);
    printf("%zu\n", label_count.size());
  } else {
    MPI_Send(node_label_assignment_vec, num_my_nodes, MPI_INT, 0,
             TAG_SEND_FINAL_RESULT, MPI_COMM_WORLD);
  }
#pragma endregion

  MPI_Finalize();
  return 0;
}

void init_pair_type(MPI_Datatype *out) {
  int blocklengths[2] = {1, 1};
  MPI_Datatype types[2] = {MPI_INT, MPI_INT};
  MPI_Aint offsets[2];
  offsets[0] = offsetof(pair, fst);
  offsets[1] = offsetof(pair, snd);

  MPI_Type_create_struct(2, blocklengths, offsets, types, out);
  MPI_Type_commit(out);
}

void pair_vector_init(struct pair_vector *v) {
  const int INITIAL = 100;
  v->ptr = (pair *)malloc(INITIAL * sizeof(pair));
  v->cap = INITIAL;
  v->len = 0;
}

void pair_vector_clear(struct pair_vector *v) { v->len = 0; }

void pair_vector_push(struct pair_vector *v, int fst, int snd) {
  if (v->len == v->cap) {
    // Grow the backing array by doubling its capacity
    v->cap *= 2;
    pair *new_loc = (pair *)malloc(v->cap * sizeof(pair));
    for (int i = 0; i < v->len; ++i) {
      new_loc[i].fst = v->ptr[i].fst;
      new_loc[i].snd = v->ptr[i].snd;
    }
    free(v->ptr);
    v->ptr = new_loc;
  }

  v->ptr[v->len].fst = fst;
  v->ptr[v->len].snd = snd;
  v->len++;
}

pair compute_node_range(int p, int total_num_nodes, int each_num_nodes,
                        int process) {
  int start = process * each_num_nodes;
  int end = process == p - 1 ? total_num_nodes : start + each_num_nodes;
  return {.fst = start, .snd = end};
}

int lookup_assignment(int *base_node_assignment, pair my_node_range,
                      std::map<int, std::set<int>> recv_map, int *recvbuf,
                      int *recv_counts, int *recv_displs, int each_num_nodes,
                      int rank, int node_number) {
  int process_from = node_number / each_num_nodes;

  // Just return from local if local
  if (process_from == rank)
    return base_node_assignment[node_number - my_node_range.fst];

  int count = recv_counts[process_from];
  int displs = recv_displs[process_from];

  // Determine what index this node is
  int index = -1, ctr = 0;
  std::vector<int> inner(recv_map[process_from].begin(),
                         recv_map[process_from].end());
  {
    // Use binary search over the sorted node ids
    int lo = 0, hi = count;
    while (lo < hi) {
      int mid = (lo + hi) / 2;
      int midk = inner[mid];
      if (node_number < midk)
        hi = mid;
      else if (node_number > midk)
        lo = mid + 1;
      else {
        index = mid;
        break;
      }
    }
  }
  // for (int i = 0; i < count; ++i) {
  //   int remote_node = inner[i];
  //   if (node_number == remote_node) {
  //     index = ctr;
  //     break;
  //   }
  //   ctr++;
  // }

  // Pull the corresponding value from the receive buffer
  return recvbuf[recv_displs[process_from] + index];
}
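// ---------------------------------------------------------------------------
// Usage sketch (assumptions: an MPI C++ toolchain such as mpicxx/mpirun; the
// file names below are examples only):
//
//   mpicxx -O2 -o label_prop label_prop.cpp
//   mpirun -np 4 ./label_prop graph.txt labels.txt
//
// As read above, the input starts with a line "<num_vertices> <num_edges>"
// followed by one "<u> <v>" edge per line, ordered by the first endpoint. The
// output file contains one label per vertex, in vertex order.
// ---------------------------------------------------------------------------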