#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// https://stackoverflow.com/a/75458495
#define check_mpi_error(n) __check_mpi_error(__FILE__, __LINE__, n)
void __check_mpi_error(const char *file, const int line, const int n) {
  char errbuffer[MPI_MAX_ERROR_STRING];
  int errlen;
  if (n != MPI_SUCCESS) {
    MPI_Error_string(n, errbuffer, &errlen);
    printf("MPI-error: %s\n", errbuffer);
    printf("Location: %s:%i\n", file, line);
    MPI_Abort(MPI_COMM_WORLD, n);
  }
}

#define ORDER_FORWARDS 1
#define ORDER_BACKWARDS 2
#define CTL_SIZE 4
#define ROOT_RANK 0

#define GENERIC_MAX(x, y) ((x) > (y) ? (x) : (y))
#define GENERIC_MIN(x, y) ((x) < (y) ? (x) : (y))
#define ENSURE_int(i) _Generic((i), int : (i))
#define ENSURE_float(f) _Generic((f), float : (f))
#define MAX(type, x, y) (type) GENERIC_MAX(ENSURE_##type(x), ENSURE_##type(y))
#define MIN(type, x, y) (type) GENERIC_MIN(ENSURE_##type(x), ENSURE_##type(y))

void init_ctl(int *ctl, int len);
void local_quicksort(int *arr, int lo, int hi);
char *string_of_list(int *arr, int len);
void recursive_quicksort(int *integers, int n, int segment_capac,
                         int segment_len, int *integers_out, MPI_Comm comm);

int main(int argc, char **argv) {
  int rank, p;
  MPI_Init(&argc, &argv);

  int n = atoi(argv[1]);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &p);

  // Generate integers
  int n_over_p = n / p;
  int integers[n_over_p];

  // Minor implementation detail: srand(0) is specially handled by glibc to
  // behave as if it was called with srand(1). To get around this, I'm seeding
  // with rank + 1
  //
  // See more: https://stackoverflow.com/a/27386563
  srand(rank + 1);

  for (int i = 0; i < n_over_p; ++i) {
    integers[i] = rand();
    // printf(" - %d\n", integers[i]);
  }

  // printf("[%d,9999999999] GENERATED INTEGERS: %s\n", rank,
  //        string_of_list(integers, n_over_p));

  int new_integers[n_over_p];
  recursive_quicksort(integers, n, n_over_p, n_over_p, new_integers,
                      MPI_COMM_WORLD);

  // sleep(1);
  // printf("[%d] after: %s\n", rank, string_of_list(integers, n_over_p));

  // The first node is responsible for collecting all the data and then
  // printing it out to the file
  // MPI_Gather(const void *sendbuf, int sendcount, MPI_INT, void *recvbuf,
  //            int recvcount, MPI_INT, 0, MPI_COMM_WORLD);
  int recvbuf[n];
  MPI_Gather(new_integers, n_over_p, MPI_INT, recvbuf, n_over_p, MPI_INT, 0,
             MPI_COMM_WORLD);

  if (rank == 0) {
    FILE *fp = fopen(argv[2], "w");
    // printf("integers: %s\n", string_of_list(recvbuf, n));
    // printf("[%d,-1] ==== FINAL ====\n", rank);
    for (int i = 0; i < n; i += 1) {
      fprintf(fp, "%d\n", recvbuf[i]);
      // printf("[%d,-1] %s\n", rank,
      //        string_of_list(&recvbuf[i * n_over_p], n_over_p));
    }
    fclose(fp);
  }

  MPI_Finalize();
  // printf("Done.\n");
  return 0;
}

// hi not inclusive
void local_quicksort(int *arr, int lo, int hi) {
  int temp;
  if (lo >= hi || lo < 0)
    return;

  // Lomuto-style partition around the last element
  int pivot = arr[hi - 1];
  int pivot_idx = lo - 1;
  for (int j = lo; j < hi; ++j) {
    if (arr[j] < pivot) {
      pivot_idx += 1;

      temp = arr[j];
      arr[j] = arr[pivot_idx];
      arr[pivot_idx] = temp;
    }
  }
  pivot_idx += 1;

  // Move the pivot into its final position
  temp = arr[hi - 1];
  arr[hi - 1] = arr[pivot_idx];
  arr[pivot_idx] = temp;

  // Recursive call
  local_quicksort(arr, lo, pivot_idx);
  local_quicksort(arr, pivot_idx + 1, hi);
}

// char *string_of_list(int *arr, int len) {
//   char *buffer = calloc(sizeof(char), 1000);
//   int offset = 0; // Keep track of the current position in the buffer
//   for (int i = 0; i < len; i++) {
//     offset += sprintf(buffer + offset, "%d", arr[i]);
//     if (i < len - 1) {
//       // Add a separator (e.g., comma or space) if it's not the last element
//       offset += sprintf(buffer + offset, " ");
//     }
//   }
//   return buffer;
// }
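// Overview of the parallel step implemented by recursive_quicksort() below
// (a descriptive summary of the code as written, not a specification):
//
//   1. Each rank sorts its local segment with local_quicksort().
//   2. A pivot is chosen by gathering one random sample per rank at the root,
//      sorting the samples, and broadcasting the median sample.
//   3. Each rank splits its sorted segment into S (< pivot) and L (>= pivot).
//   4. Prefix sums (MPI_Scan) over the S and L sizes give every block its
//      destination range in the globally rearranged order: all S blocks are
//      packed at the front, all L blocks at the back.
//   5. Control records describing the moves are exchanged with MPI_Alltoallv,
//      then the actual elements are moved with point-to-point sends/receives.
//   6. The communicator is split at the S/L boundary and the two halves
//      recurse independently until a sub-communicator has a single rank.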
void recursive_quicksort(int *integers, int total_elems, int segment_capac,
                         int segment_len, int *integers_out, MPI_Comm comm) {
  int err, rank, p;
  MPI_Comm_size(comm, &p);
  MPI_Comm_rank(comm, &rank);

  // printf(
  //     "[%d,%d] recursive_quicksort([%s], total=%d, capac=%d, len=%d) {p=%d}\n",
  //     rank, total_elems, string_of_list(integers, segment_len),
  //     total_elems, segment_capac, segment_len, p);

  if (p <= 1) {
    // Recursion base case: just sort it serially
    local_quicksort(integers, 0, total_elems);
    for (int i = 0; i < total_elems; ++i) {
      integers_out[i] = integers[i];
    }
    // printf("Quicksorted: %s\n", string_of_list(integers, total_elems));
    return;
  }

  // sleep(1);
  // printf("\n\n");
  // int segment_capac = (total_elems + p - 1) / p;
  // int segment_len = total_elems / p;
  // if (rank == ROOT_RANK)
  //   segment_len += total_elems - p * segment_len;
  // printf("[%d,%d] capac: %d, len: %d\n", rank, total_elems, segment_capac,
  //        segment_len);
  // printf(
  //     "[%d] :::::::::::::::::::::::::::: RECURSIVE QUICKSORT (n=%d, n/p=%d)\n",
  //     rank, n, n_over_p);

  // Locally sort
  // printf("[%d] Numbers before: %s\n", rank,
  //        string_of_list(integers, n_over_p));
  local_quicksort(integers, 0, segment_len);
  // printf("[%d] Numbers after first sort: %s\n", rank,
  //        string_of_list(integers, segment_len));

  // Select a pivot.
  // This pivot is broadcasted to all nodes
  int pivot;
  {
    // First, select a random element
    int rand_el = integers[rand() % segment_len];

    // Gather it
    int rand_els[p];
    MPI_Gather(&rand_el, 1, MPI_INT, rand_els, 1, MPI_INT, ROOT_RANK, comm);

    // Get the median
    if (rank == ROOT_RANK) {
      // Sort
      local_quicksort(rand_els, 0, p);
      // printf("[%d,%d] Local quicksort for pivot: %s\n", rank, total_elems,
      //        string_of_list(rand_els, p));

      // Get the middle element
      pivot = rand_els[p / 2];
    }

    MPI_Bcast(&pivot, 1, MPI_INT, ROOT_RANK, comm);
  }
  // printf("[%d,%d] Broadcasted pivot: %d\n", rank, total_elems, pivot);
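  // Worked example of the placement logic that follows (illustrative numbers,
  // not taken from a real run): with p = 2 and pivot = 50, suppose rank 0
  // holds {12, 40, 71} and rank 1 holds {8, 64, 90}. Then S_size is 2 on
  // rank 0 and 1 on rank 1, so the inclusive prefix sums (MPI_Scan) give
  // S_global_end = 2 and 3; S therefore occupies global slots [0, 3), and L
  // occupies [3, 6), counted from the back via the L prefix sums.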
  // Determine where the boundary between S (lower) and L (higher) lies.
  // The segment is already sorted, so everything before the first element
  // that is >= pivot belongs to S; if no such element exists, all of it does.
  int boundary = segment_len;
  for (int i = 0; i < segment_len; ++i) {
    if (integers[i] >= pivot) {
      boundary = i;
      break;
    }
  }
  // printf("[%d,%d] boundary: %d\n", rank, total_elems, boundary);

  int S_lo = 0, S_hi = boundary;
  int L_lo = boundary, L_hi = segment_len;
  int S_size = S_hi - S_lo, L_size = L_hi - L_lo;
  // printf("[%d,%d] S: [%d - %d] (%d), L: [%d - %d] (%d)\n", rank, total_elems,
  //        S_lo, S_hi, S_size, L_lo, L_hi, L_size);

  // Perform global arrangement
  int S_global_end = -1, L_reverse_end = -1, S_global_max_end = -1;
  MPI_Scan(&S_size, &S_global_end, 1, MPI_INT, MPI_SUM, comm);
  MPI_Scan(&L_size, &L_reverse_end, 1, MPI_INT, MPI_SUM, comm);
  int index;
  MPI_Scan(&segment_len, &index, 1, MPI_INT, MPI_SUM, comm);
  // printf("[%d] bruh %d\n", rank, S_global_end);

  // Get the boundary element between S and L
  MPI_Allreduce(&S_global_end, &S_global_max_end, 1, MPI_INT, MPI_MAX, comm);

  int S_global_start = S_global_end - S_size,
      L_reverse_start = L_reverse_end - L_size,
      L_global_start = total_elems - L_reverse_end,
      L_global_end = total_elems - L_reverse_start;
  // printf("[%d,%d] Prefixed S: [%d - %d) (%d), Prefixed L: [%d - %d) (%d)\n",
  //        rank, total_elems, S_global_start, S_global_end, S_size,
  //        L_global_start, L_global_end, L_size);

  // Determine which process S's and L's destination will start in, respectively
  int S_starting_process, L_starting_process;
  int p_of_split, split_point;
  // int split_point = S_global_max_end % segment_len;
  int indexes[p];
  {
    MPI_Allgather(&index, 1, MPI_INT, indexes, 1, MPI_INT, comm);
    for (int i = 0; i < p; ++i) {
      int lo = i == 0 ? 0 : indexes[i - 1];
      int hi = indexes[i];
      if (S_global_start >= lo && S_global_start < hi)
        S_starting_process = i;
      if (L_global_start >= lo && L_global_start < hi)
        L_starting_process = i;
      if (S_global_max_end >= lo && S_global_max_end < hi) {
        p_of_split = i;
        split_point = S_global_max_end - lo;
      }
    }
    // err = MPI_Bcast(&S_starting_process, 1, MPI_INT, ROOT_RANK, comm);
    // check_mpi_error(err);
    // err = MPI_Bcast(&L_starting_process, 1, MPI_INT, ROOT_RANK, comm);
    // check_mpi_error(err);
  }
  // printf("[%d,%d] indexes: %s\n", rank, total_elems,
  //        string_of_list(indexes, p));
  // printf("[%d,%d] S=%d starts at %d , L=%d starts at %d , indexes: %s\n",
  //        rank, total_elems, S_global_start, S_starting_process,
  //        L_global_start, L_starting_process, string_of_list(indexes, p));

  // S_starting_process = S_global_start / segment_len;
  // L_starting_process = L_global_start / segment_len;
  int S_offset = S_global_start % segment_len,
      L_offset = L_global_start % segment_len;

  int S_ctl[p * CTL_SIZE];
  int L_ctl[p * CTL_SIZE];
  int S_send_ctl[p * CTL_SIZE];
  int L_send_ctl[p * CTL_SIZE];
  int ctl_send_counts[p];
  int ctl_send_displs[p];
  int send_counts[p];
  int send_displs[p];
  int recv_counts[p];
  int recv_displs[p];
  init_ctl(S_ctl, p);
  init_ctl(L_ctl, p);
  init_ctl(S_send_ctl, p);
  init_ctl(L_send_ctl, p);

  int SPACE = segment_capac;

  for (int i = 0; i < p; ++i) {
    send_counts[i] = SPACE;
    send_displs[i] = i * SPACE;
    ctl_send_counts[i] = CTL_SIZE;
    ctl_send_displs[i] = i * CTL_SIZE;
    recv_counts[i] = CTL_SIZE;
    recv_displs[i] = i * CTL_SIZE;
  }
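  // Layout of each CTL_SIZE-wide control record, as the loops below fill it
  // in: [0] element count, [1] global index of the first source element,
  // [2] local start index at the destination rank, [3] local start index at
  // the source rank. One record per destination rank is exchanged with
  // MPI_Alltoallv so every rank learns what it will receive before any data
  // actually moves.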
  // Send S to the correct target
  if (S_size) {
    for (int i = S_lo, dest_pos = S_global_start,
             processor = S_starting_process;
         i < S_hi;) {
      int next_break =
          MIN(int, S_global_end,
              MIN(int, dest_pos + (S_hi - S_lo),
                  (dest_pos / segment_len) * segment_len + segment_len));
      int count = next_break - dest_pos;

      int from_local_start = i, from_local_end = i + count;
      int from_global_start = rank * segment_len + from_local_start,
          from_global_end = from_global_start + count;
      int to_global_start = dest_pos, to_global_end = dest_pos + count;
      int to_local_start = to_global_start - processor * segment_len,
          to_local_end = to_global_end - processor * segment_len;

      // printf("[%d] S ->> (count=%d), from local [%d..%d] {%d..%d} -to-> "
      //        "p#%d [%d..%d] {%d..%d}\n",
      //        rank, count, from_local_start, from_local_end,
      //        from_global_start, from_global_end, processor, to_local_start,
      //        to_local_end, to_global_start, to_global_end);

      S_send_ctl[processor * CTL_SIZE] = count;
      S_send_ctl[processor * CTL_SIZE + 1] = from_global_start;
      S_send_ctl[processor * CTL_SIZE + 2] = to_local_start;
      S_send_ctl[processor * CTL_SIZE + 3] = from_local_start;

      i += count;
      dest_pos += count;
      processor += 1;
    }
  }

  MPI_Alltoallv(S_send_ctl, ctl_send_counts, ctl_send_displs, MPI_INT, S_ctl,
                recv_counts, recv_displs, MPI_INT, comm);

  // Send L to the correct target
  if (L_size) {
    for (int i = L_lo, dest_pos = L_global_start,
             processor = L_starting_process;
         i < L_hi;) {
      int next_break =
          MIN(int, L_global_end,
              MIN(int, dest_pos + (L_hi - L_lo),
                  (dest_pos / segment_len) * segment_len + segment_len));
      int count = next_break - dest_pos;

      int from_local_start = i, from_local_end = i + count;
      int from_global_start = rank * segment_len + from_local_start,
          from_global_end = from_global_start + count;
      int to_global_start = dest_pos, to_global_end = dest_pos + count;
      int to_local_start = to_global_start - processor * segment_len,
          to_local_end = to_global_end - processor * segment_len;

      // printf("[%d] L ->> (count=%d), from local [%d..%d] {%d..%d} -to-> "
      //        "p#%d [%d..%d] {%d..%d}\n",
      //        rank, count, from_local_start, from_local_end,
      //        from_global_start, from_global_end, processor, to_local_start,
      //        to_local_end, to_global_start, to_global_end);

      L_send_ctl[processor * CTL_SIZE] = count;
      L_send_ctl[processor * CTL_SIZE + 1] = from_global_start;
      L_send_ctl[processor * CTL_SIZE + 2] = to_local_start;
      L_send_ctl[processor * CTL_SIZE + 3] = from_local_start;

      i += count;
      dest_pos += count;
      processor += 1;
    }
  }

  MPI_Alltoallv(L_send_ctl, ctl_send_counts, ctl_send_displs, MPI_INT, L_ctl,
                recv_counts, recv_displs, MPI_INT, comm);

  // After sending S and L information
  for (int i = 0; i < p; ++i) {
    recv_counts[i] = segment_len;
    recv_displs[i] = i * segment_len;
  }

  // printf("[%d,%d] S CTL INFO\n", rank, total_elems);
  // for (int i = 0; i < p; ++i) {
  //   printf("[%d,%d] [p=%d] (ct=%d)\n", rank, total_elems, i,
  //          S_send_ctl[i * CTL_SIZE]);
  // }

  // MPI_Alltoallv(integers, send_counts, send_displs, MPI_INT,
  //               integers_recv_buf, recv_counts, recv_displs, MPI_INT,
  //               MPI_COMM_WORLD);
  // MPI_Allgather(integers, n_over_p, MPI_INT, integers_recv_buf, n_over_p,
  //               MPI_INT, comm);
  // printf("[%d] ints: %s\n", rank, string_of_list(integers_recv_buf, n));

  // Scheme for all send
  int integers_recv_2[segment_capac];
  int integers_recv_3[segment_capac];
  for (int i = 0; i < segment_len; ++i) {
    integers_recv_2[i] = -1;
    integers_recv_3[i] = integers[i];
  }
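  // The element exchange below walks the destination ranks one at a time
  // (host_p). The receiving rank posts an MPI_Recv for every sender that has
  // a nonzero count in its S_ctl record, while every other rank issues the
  // matching MPI_Send from its S_send_ctl record; the same pattern is then
  // repeated for L. Tag 124 is used for S traffic and tag 125 for L traffic
  // so the two phases cannot be confused.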
  for (int host_p = 0; host_p < p; ++host_p) {
    if (rank == host_p) {
      // Your {S,L}_ctl is a mapping from source_processor -> ctl
      // Everyone already knows who needs to send to whom now
      for (int sender_p = 0; sender_p < p; ++sender_p) {
        int S_count = S_ctl[sender_p * CTL_SIZE];
        if (S_count > 0) {
          int to_local_start = S_ctl[sender_p * CTL_SIZE + 2];
          int from_local_start = S_ctl[sender_p * CTL_SIZE + 3];

          if (sender_p == host_p) {
            // Local move: no message needed
            for (int k = 0; k < S_count; ++k) {
              integers_recv_3[to_local_start + k] =
                  integers[from_local_start + k];
            }
            continue;
          }

          // printf("[%d] - S inbound from host %d to [%d..%d] (%d)\n", rank,
          //        sender_p, to_local_start, to_local_start + S_count,
          //        S_count);
          err = MPI_Recv(&integers_recv_2[to_local_start], S_count, MPI_INT,
                         sender_p, 124, comm, MPI_STATUS_IGNORE);
          check_mpi_error(err);

          for (int k = 0; k < S_count; ++k) {
            integers_recv_3[to_local_start + k] =
                integers_recv_2[to_local_start + k];
          }
        }
      }
    } else {
      // Your {S,L}_send_ctl contains a mapping from dest_processor -> ctl
      for (int dest_p = 0; dest_p < p; ++dest_p) {
        int S_count = S_send_ctl[dest_p * CTL_SIZE];
        if (S_count > 0 && dest_p == host_p) {
          int from_local_start = S_send_ctl[dest_p * CTL_SIZE + 3];
          // printf("[%d] - S outbound to host %d from [%d..%d] (%d)\n", rank,
          //        dest_p, from_local_start, from_local_start + S_count,
          //        S_count);
          MPI_Send(&integers[from_local_start], S_count, MPI_INT, dest_p, 124,
                   comm);
        }
      }
    }
  }

  for (int host_p = 0; host_p < p; ++host_p) {
    if (rank == host_p) {
      // Your {S,L}_ctl is a mapping from source_processor -> ctl
      // Everyone already knows who needs to send to whom now
      for (int sender_p = 0; sender_p < p; ++sender_p) {
        int L_count = L_ctl[sender_p * CTL_SIZE];
        if (L_count > 0) {
          int to_local_start = L_ctl[sender_p * CTL_SIZE + 2];
          int from_local_start = L_ctl[sender_p * CTL_SIZE + 3];

          if (sender_p == host_p) {
            // Local move: no message needed
            for (int k = 0; k < L_count; ++k) {
              integers_recv_3[to_local_start + k] =
                  integers[from_local_start + k];
            }
            continue;
          }

          // printf("[%d] - L inbound from host %d to [%d..%d] (%d)\n", rank,
          //        sender_p, to_local_start, to_local_start + L_count,
          //        L_count);
          err = MPI_Recv(&integers_recv_2[to_local_start], L_count, MPI_INT,
                         sender_p, 125, comm, MPI_STATUS_IGNORE);
          check_mpi_error(err);

          for (int k = 0; k < L_count; ++k) {
            integers_recv_3[to_local_start + k] =
                integers_recv_2[to_local_start + k];
          }
        }
      }
    } else {
      // Your {S,L}_send_ctl contains a mapping from dest_processor -> ctl
      for (int dest_p = 0; dest_p < p; ++dest_p) {
        int L_count = L_send_ctl[dest_p * CTL_SIZE];
        if (L_count > 0 && dest_p == host_p) {
          int from_local_start = L_send_ctl[dest_p * CTL_SIZE + 3];
          // printf("[%d] - L outbound to host %d from [%d..%d] (%d)\n", rank,
          //        dest_p, from_local_start, from_local_start + L_count,
          //        L_count);
          MPI_Send(&integers[from_local_start], L_count, MPI_INT, dest_p, 125,
                   comm);
        }
      }
    }
  }

  // printf("[%d,%d] after: %s\n", rank, total_elems,
  //        string_of_list(integers_recv_3, segment_len));
  // printf("[%d,%d] -------------------------------------\n", rank,
  //        total_elems);
  // for (int i = 0; i < segment_len; ++i) {
  //   integers[i] = integers_recv_3[i];
  // }

  // ###########################################################################
  // SUBDIVIDING
  // Now, determine which processes should be responsible for taking the S and
  // L arrays. Specifically, at the block where the split falls, break the tie
  // to decide whether it goes down or up.
  int child_len = segment_len;
  int difference = segment_len - split_point;
  int transfer[split_point];

  // printf("[%d,%d] p_of_split = %d, split_point = %d => (child_len = %d)\n",
  //        rank, total_elems, p_of_split, split_point, child_len);

  int has_split = 0;
  if (p_of_split == 0 || p_of_split == p - 1) {
    // Super unfortunate, bad pivot
  } else if (split_point == 0) {
    // Super lucky, it's split evenly!
  } else {
    has_split = 1;

    // Let's just say that if there's any split, the block itself counts as L
    // and then add the rest to the previous block
    if (rank == p_of_split - 1) {
      child_len += split_point;
      err = MPI_Recv(transfer, split_point, MPI_INT, p_of_split, 126, comm,
                     MPI_STATUS_IGNORE);
      check_mpi_error(err);
    } else if (rank == p_of_split) {
      child_len = difference;
      err = MPI_Send(integers, split_point, MPI_INT, p_of_split - 1, 126, comm);
      check_mpi_error(err);
    }
  }
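  // From here on, ranks below p_of_split keep working on the S half and ranks
  // from p_of_split upward keep working on the L half: MPI_Comm_split() below
  // builds one sub-communicator per half (colors 100 and 200), and
  // recursive_quicksort() is called again inside each of them. The recursion
  // terminates once a sub-communicator contains a single rank (the p <= 1
  // base case at the top of this function).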
  // Which group is this child going into?
  int color;
  if (rank < p_of_split)
    color = 100;
  else
    color = 200;
  // printf("[%d,%d] split color = %d, split length = %d\n", rank, total_elems,
  //        color, child_len);

  MPI_Comm child_comm;
  MPI_Comm_split(comm, color, rank, &child_comm);

  // Figure out what the max is
  int max_child_buf_len, total_child_elems;
  err = MPI_Allreduce(&child_len, &max_child_buf_len, 1, MPI_INT, MPI_MAX,
                      child_comm);
  check_mpi_error(err);
  err = MPI_Allreduce(&child_len, &total_child_elems, 1, MPI_INT, MPI_SUM,
                      child_comm);
  check_mpi_error(err);
  // printf("[%d] [color=%d] max length = %d, total child elems = %d\n", rank,
  //        color, max_child_buf_len, total_child_elems);

  // Copy into a new buf
  int new_buf[max_child_buf_len];
  int whichCase = 999;
  for (int i = 0; i < max_child_buf_len; ++i) {
    if (has_split && rank == p_of_split - 1) {
      whichCase = 1001;
      if (i < segment_len)
        new_buf[i] = integers_recv_3[i];
      else if (i < segment_len + split_point)
        new_buf[i] = transfer[i - segment_len];
      else
        new_buf[i] = -1;
    } else if (has_split && rank == p_of_split) {
      whichCase = 1002;
      if (i < difference)
        new_buf[i] = integers_recv_3[i + split_point];
      else
        new_buf[i] = -1;
    } else {
      whichCase = 1003;
      if (i < child_len)
        new_buf[i] = integers_recv_3[i];
      else
        new_buf[i] = -1;
    }
  }

  // printf("[%d,%d] orig integers: %s\n", rank, total_elems,
  //        string_of_list(integers, segment_len));
  // printf("[%d,%d] new buf = %s (has_split = %d, segment_len = %d, "
  //        "case = %d, child_elems = %d)\n",
  //        rank, total_elems, string_of_list(new_buf, max_child_buf_len),
  //        has_split, segment_len, whichCase, child_len);
  // printf("[%d,%d] \n", rank, total_elems);

  int integers_out_buf[total_child_elems];
  recursive_quicksort(new_buf, total_child_elems, max_child_buf_len, child_len,
                      integers_out_buf, child_comm);

  // Ok now copy the new items back
  switch (whichCase) {
  case 1001:
    // In this case, p is right before the split, so it got extra elements.
    // To reverse this, we can send the extra elements back to the second
    // block of the split.
    for (int i = 0; i < total_child_elems; ++i) {
      if (i < segment_len)
        integers_out[i] = integers_out_buf[i];
      else
        transfer[i - segment_len] = integers_out_buf[i];
    }
    MPI_Send(transfer, split_point, MPI_INT, p_of_split, 127, comm);
    break;
  case 1002:
    MPI_Recv(transfer, split_point, MPI_INT, p_of_split - 1, 127, comm,
             MPI_STATUS_IGNORE);
    for (int i = 0; i < split_point; ++i) {
      integers_out[i] = transfer[i];
    }
    for (int i = 0; i < total_child_elems; ++i) {
      integers_out[i + split_point] = integers_out_buf[i];
    }
    break;
  case 1003:
    for (int i = 0; i < total_child_elems; ++i) {
      integers_out[i] = integers_out_buf[i];
    }
    break;
  }

  MPI_Comm_free(&child_comm);
}

void init_ctl(int *ctl, int len) {
  for (int i = 0; i < len; ++i) {
    ctl[i * CTL_SIZE] = 0;
    for (int j = 1; j < CTL_SIZE; ++j) {
      ctl[i * CTL_SIZE + j] = -1;
    }
  }
}
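// Example usage (an assumed invocation, not part of the program: the wrapper
// and launcher names depend on the local MPI installation, and the file name
// pqsort.c is a placeholder). The two required arguments come from main():
// argv[1] is the total element count n, argv[2] is the output file.
//
//   mpicc -o pqsort pqsort.c
//   mpirun -np 4 ./pqsort 1000000 sorted.txt
//
// n is assumed to be divisible by the number of ranks, since main() computes
// n_over_p = n / p and gathers exactly n_over_p elements from every rank.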