csci5451/assignments/02/qs_mpi commented.c

655 lines
22 KiB
C
Raw Permalink Normal View History

2023-10-31 04:37:41 +00:00
#include <errno.h>
#include <limits.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
// https://stackoverflow.com/a/75458495
//
// check_mpi_error(code) inspects the return code of an MPI call. On anything
// other than MPI_SUCCESS it reports the MPI error text together with the
// source location of the check and then aborts every process in
// MPI_COMM_WORLD with that code. On MPI_SUCCESS it does nothing.
#define check_mpi_error(n) __check_mpi_error(__FILE__, __LINE__, n)
void __check_mpi_error(const char *file, const int line, const int n) {
  if (n == MPI_SUCCESS)
    return;

  char errbuffer[MPI_MAX_ERROR_STRING];
  int errlen = 0;
  // If the code cannot be translated, still print something meaningful
  // instead of the uninitialised buffer.
  if (MPI_Error_string(n, errbuffer, &errlen) != MPI_SUCCESS)
    snprintf(errbuffer, sizeof errbuffer, "unknown MPI error code %d", n);

  // Diagnostics go to stderr: stdout may be block-buffered, and the text
  // could be lost when MPI_Abort tears the process down.
  fprintf(stderr, "MPI-error: %s\n", errbuffer);
  fprintf(stderr, "Location: %s:%i\n", file, line);
  MPI_Abort(MPI_COMM_WORLD, n);
}
// Direction tags. NOTE(review): not referenced by the code visible in this
// file - confirm before removing.
#define ORDER_FORWARDS 1
#define ORDER_BACKWARDS 2
// Number of ints in one control record exchanged between ranks. The records
// built in recursive_quicksort are laid out as
// [count, from_global_start, to_local_start, from_local_start].
#define CTL_SIZE 4
// Rank that gathers samples / results.
#define ROOT_RANK 0
// Untyped max/min. Each argument may be evaluated more than once, so do not
// pass expressions with side effects.
#define GENERIC_MAX(x, y) ((x) > (y) ? (x) : (y))
#define GENERIC_MIN(x, y) ((x) < (y) ? (x) : (y))
// Compile-time type guards: the expansion only compiles when the argument has
// exactly the named type (no other _Generic association exists).
#define ENSURE_int(i) _Generic((i), int : (i))
#define ENSURE_float(f) _Generic((f), float : (f))
// Type-checked max/min: MAX(int, a, b) / MIN(float, x, y). The whole
// expansion is parenthesised so it behaves as a single primary expression
// (e.g. `sizeof MIN(int, a, b)` parses as intended).
#define MAX(type, x, y)                                                        \
  ((type)GENERIC_MAX(ENSURE_##type(x), ENSURE_##type(y)))
#define MIN(type, x, y)                                                        \
  ((type)GENERIC_MIN(ENSURE_##type(x), ENSURE_##type(y)))
// Resets `len` control records (CTL_SIZE ints each) stored back to back in
// `ctl`: the first int of every record becomes 0, the remaining ones -1.
void init_ctl(int *ctl, int len);
// Sorts arr[lo, hi) in ascending order, in place; `hi` is exclusive.
void local_quicksort(int *arr, int lo, int hi);
// Debug helper that renders a list as text. NOTE: its definition is currently
// commented out in this file, so it must not be called until that is restored.
char *string_of_list(int *arr, int len);
// Parallel quicksort over all ranks of `comm`; see the definition for the
// meaning of each argument.
void recursive_quicksort(int *integers, int total_elems, int segment_capac,
                         int segment_len, int *integers_out, MPI_Comm comm);
int main(int argc, char **argv) {
int rank, p;
MPI_Init(&argc, &argv);
int n = atoi(argv[1]);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &p);
// Generate integers
int n_over_p = n / p;
int integers[n_over_p];
// Minor implementation detail: srand(0) is specially handled by glibc to
// behave as if it was called with srand(1). To get around this, I'm seeding
// with rank + 1
//
// See more: https://stackoverflow.com/a/27386563
srand(rank + 1);
for (int i = 0; i < n_over_p; ++i) {
integers[i] = rand();
// printf(" - %d\n", integers[i]);
}
// printf("[%d,9999999999] GENERATED INTEGERS: %s\n", rank,
// string_of_list(integers, n_over_p));
int new_integers[n_over_p];
recursive_quicksort(integers, n, n_over_p, n_over_p, new_integers,
MPI_COMM_WORLD);
// sleep(1);
// printf("[%d] after: %s\n", rank, string_of_list(integers, n_over_p));
// The first node is responsible for collecting all the data and then
// printing it out to the file MPI_Gather(const void *sendbuf, int
// sendcount, MPI_INT, void *recvbuf,
// int recvcount, MPI_INT, 0, MPI_COMM_WORLD);
int recvbuf[n];
MPI_Gather(new_integers, n_over_p, MPI_INT, recvbuf, n_over_p, MPI_INT, 0,
MPI_COMM_WORLD);
if (rank == 0) {
FILE *fp = fopen(argv[2], "w");
// printf("integers: %s\n", string_of_list(recvbuf, n));
// printf("[%d,-1] ==== FINAL ====\n", rank);
for (int i = 0; i < n; i += 1) {
fprintf(fp, "%d\n", recvbuf[i]);
// printf("[%d,-1] %s\n", rank,
// string_of_list(&recvbuf[i * n_over_p], n_over_p));
}
fclose(fp);
}
MPI_Finalize();
// printf("Done.\n");
return 0;
}
// Exchanges the two ints pointed to.
static void local_quicksort_swap(int *a, int *b) {
  const int tmp = *a;
  *a = *b;
  *b = tmp;
}

// Sorts arr[lo, hi) in ascending order, in place. `hi` is NOT inclusive.
// Does nothing when the range is empty or `lo` is negative.
//
// The pivot is the median of the first, middle and last element, and only the
// smaller partition is handled recursively (the larger one is processed by the
// loop). This keeps the recursion depth at O(log n) and makes already-sorted
// input - which the parallel sort feeds in regularly - an O(n log n) case
// instead of an O(n^2) one with O(n) stack depth. Inputs made of many equal
// values can still take quadratic time, but not deep recursion.
void local_quicksort(int *arr, int lo, int hi) {
  if (lo < 0)
    return;

  while (hi > lo && hi - lo > 1) {
    const int last = hi - 1;
    const int mid = lo + (hi - lo) / 2;

    // Median-of-three: leave the smallest of the three at `lo` and move the
    // median into `last`, where the partition step expects the pivot.
    if (arr[mid] < arr[lo])
      local_quicksort_swap(&arr[mid], &arr[lo]);
    if (arr[last] < arr[lo])
      local_quicksort_swap(&arr[last], &arr[lo]);
    if (arr[mid] < arr[last])
      local_quicksort_swap(&arr[mid], &arr[last]);
    const int pivot = arr[last];

    // Lomuto partition: everything strictly below the pivot moves left.
    int store = lo;
    for (int j = lo; j < last; ++j) {
      if (arr[j] < pivot) {
        local_quicksort_swap(&arr[j], &arr[store]);
        ++store;
      }
    }
    local_quicksort_swap(&arr[store], &arr[last]);

    // Recurse into the smaller side, keep looping on the larger one.
    if (store - lo < hi - (store + 1)) {
      local_quicksort(arr, lo, store);
      lo = store + 1;
    } else {
      local_quicksort(arr, store + 1, hi);
      hi = store;
    }
  }
}
// char *string_of_list(int *arr, int len) {
// char *buffer = calloc(sizeof(char), 1000);
// int offset = 0; // Keep track of the current position in the buffer
// for (int i = 0; i < len; i++) {
// offset += sprintf(buffer + offset, "%d", arr[i]);
// if (i < len - 1) {
// // Add a separator (e.g., comma or space) if it's not the last element
// offset += sprintf(buffer + offset, " ");
// }
// }
// return buffer;
// }
void recursive_quicksort(int *integers, int total_elems, int segment_capac,
int segment_len, int *integers_out, MPI_Comm comm) {
int err, rank, p;
MPI_Comm_size(comm, &p);
MPI_Comm_rank(comm, &rank);
// printf(
// "[%d,%d] recursive_quicksort([%s], total=%d, capac=%d, len=%d)
// {p=%d}\n", rank, total_elems, string_of_list(integers, segment_len),
// total_elems, segment_capac, segment_len, p);
if (p <= 1) {
// Recursion base case: just sort it serially
local_quicksort(integers, 0, total_elems);
for (int i = 0; i < total_elems; ++i) {
integers_out[i] = integers[i];
}
// printf("Quicksorted: %s\n", string_of_list(integers, total_elems));
return;
}
// sleep(1);
// printf("\n\n");
// int segment_capac = (total_elems + p - 1) / p;
// int segment_len = total_elems / p;
// if (rank == ROOT_RANK)
// segment_len += total_elems - p * segment_len;
// printf("[%d,%d] capac: %d, len: %d\n", rank, total_elems, segment_capac,
// segment_len);
// printf(
// "[%d] :::::::::::::::::::::::::::: RECURSIVE QUICKSORT (n=%d,
// n/p=%d)\n", rank, n, n_over_p);
// Locally sort
// printf("[%d] Numbers before: %s\n", rank,
// string_of_list(integers, n_over_p));
local_quicksort(integers, 0, segment_len);
// printf("[%d] Numbers after first sort: %s\n", rank,
// string_of_list(integers, segment_len));
// Select a pivot.
// This pivot is broadcasted to all nodes
int pivot;
{
// First, select a random element
int rand_el = integers[rand() % segment_len];
// Gather it
int rand_els[p];
MPI_Gather(&rand_el, 1, MPI_INT, rand_els, 1, MPI_INT, ROOT_RANK, comm);
// Get the median
if (rank == ROOT_RANK) {
// Sort
local_quicksort(rand_els, 0, p);
// printf("[%d,%d] Local quicksort for pivot: %s\n", rank, total_elems,
// string_of_list(rand_els, p));
// Get the middle element
pivot = rand_els[p / 2];
}
MPI_Bcast(&pivot, 1, MPI_INT, ROOT_RANK, comm);
}
// printf("[%d,%d] Broadcasted pivot: %d\n", rank, total_elems, pivot);
// Determine where the boundary between S (lower) and L (higher) lies
int boundary = 0;
for (int i = 0; i < segment_len; ++i) {
if (integers[i] >= pivot) {
boundary = i;
break;
}
}
// printf("[%d,%d] boundary: %d\n", rank, total_elems, boundary);
int S_lo = 0, S_hi = boundary;
int L_lo = boundary, L_hi = segment_len;
int S_size = S_hi - S_lo, L_size = L_hi - L_lo;
// printf("[%d,%d] S: [%d - %d] (%d), L: [%d - %d] (%d)\n", rank, total_elems,
// S_lo, S_hi, S_size, L_lo, L_hi, L_size);
// Perform global arrangement
int S_global_end = -1, L_reverse_end = -1, S_global_max_end = -1;
MPI_Scan(&S_size, &S_global_end, 1, MPI_INT, MPI_SUM, comm);
MPI_Scan(&L_size, &L_reverse_end, 1, MPI_INT, MPI_SUM, comm);
int index;
MPI_Scan(&segment_len, &index, 1, MPI_INT, MPI_SUM, comm);
// printf("[%d] bruh %d\n", rank, S_global_end);
// Get the boundary element between S and L
MPI_Allreduce(&S_global_end, &S_global_max_end, 1, MPI_INT, MPI_MAX, comm);
int S_global_start = S_global_end - S_size,
L_reverse_start = L_reverse_end - L_size,
L_global_start = total_elems - L_reverse_end,
L_global_end = total_elems - L_reverse_start;
// printf("[%d,%d] Prefixed S: [%d - %d) (%d), Prefixed L: [%d - %d) (%d)\n",
// rank, total_elems, S_global_start, S_global_end, S_size,
// L_global_start, L_global_end, L_size);
// Determine which process S's and L's destination will start in, respectively
int S_starting_process, L_starting_process;
int p_of_split, split_point;
// int split_point = S_global_max_end % segment_len;
int indexes[p];
{
MPI_Allgather(&index, 1, MPI_INT, indexes, 1, MPI_INT, comm);
for (int i = 0; i < p; ++i) {
int lo = i == 0 ? 0 : indexes[i - 1];
int hi = indexes[i];
if (S_global_start >= lo && S_global_start < hi)
S_starting_process = i;
if (L_global_start >= lo && L_global_start < hi)
L_starting_process = i;
if (S_global_max_end >= lo && S_global_max_end < hi) {
p_of_split = i;
split_point = S_global_max_end - lo;
}
}
// err = MPI_Bcast(&S_starting_process, 1, MPI_INT, ROOT_RANK, comm);
// check_mpi_error(err);
// err = MPI_Bcast(&L_starting_process, 1, MPI_INT, ROOT_RANK, comm);
// check_mpi_error(err);
}
// printf("[%d,%d] indexes: %s\n", rank, total_elems,
// string_of_list(indexes, p));
// printf("[%d,%d] S=%d starts at %d , L=%d starts at %d , indexes: %s\n",
// rank,
// total_elems, S_global_start, S_starting_process, L_global_start,
// L_starting_process, string_of_list(indexes, p));
// S_starting_process = S_global_start / segment_len;
// L_starting_process = L_global_start / segment_len;
int S_offset = S_global_start % segment_len,
L_offset = L_global_start % segment_len;
int S_ctl[p * CTL_SIZE];
int L_ctl[p * CTL_SIZE];
int S_send_ctl[p * CTL_SIZE];
int L_send_ctl[p * CTL_SIZE];
int ctl_send_counts[p];
int ctl_send_displs[p];
int send_counts[p];
int send_displs[p];
int recv_counts[p];
int recv_displs[p];
init_ctl(S_ctl, p);
init_ctl(L_ctl, p);
init_ctl(S_send_ctl, p);
init_ctl(L_send_ctl, p);
int SPACE = segment_capac;
for (int i = 0; i < p; ++i) {
send_counts[i] = SPACE;
send_displs[i] = i * SPACE;
ctl_send_counts[i] = CTL_SIZE;
ctl_send_displs[i] = i * CTL_SIZE;
recv_counts[i] = CTL_SIZE;
recv_displs[i] = i * CTL_SIZE;
}
// Send S to the correct target
if (S_size) {
for (int i = S_lo, dest_pos = S_global_start,
processor = S_starting_process;
i < S_hi;) {
int next_break =
MIN(int, S_global_end,
MIN(int, dest_pos + (S_hi - S_lo),
(dest_pos / segment_len) * segment_len + segment_len));
int count = next_break - dest_pos;
int from_local_start = i, from_local_end = i + count;
int from_global_start = rank * segment_len + from_local_start,
from_global_end = from_global_start + count;
int to_global_start = dest_pos, to_global_end = dest_pos + count;
int to_local_start = to_global_start - processor * segment_len,
to_local_end = to_global_end - processor * segment_len;
// printf("[%d] S ->> (count=%d), from local [%d..%d] {%d..%d} -to-> "
// "p#%d [%d..%d] {%d..%d}\n",
// rank, count, from_local_start, from_local_end,
// from_global_start, from_global_end, processor, to_local_start,
// to_local_end, to_global_start, to_global_end);
S_send_ctl[processor * CTL_SIZE] = count;
S_send_ctl[processor * CTL_SIZE + 1] = from_global_start;
S_send_ctl[processor * CTL_SIZE + 2] = to_local_start;
S_send_ctl[processor * CTL_SIZE + 3] = from_local_start;
i += count;
dest_pos += count;
processor += 1;
}
}
MPI_Alltoallv(S_send_ctl, ctl_send_counts, ctl_send_displs, MPI_INT, S_ctl,
recv_counts, recv_displs, MPI_INT, comm);
// Send L to the correct target
if (L_size) {
for (int i = L_lo, dest_pos = L_global_start,
processor = L_starting_process;
i < L_hi;) {
int next_break =
MIN(int, L_global_end,
MIN(int, dest_pos + (L_hi - L_lo),
(dest_pos / segment_len) * segment_len + segment_len));
int count = next_break - dest_pos;
int from_local_start = i, from_local_end = i + count;
int from_global_start = rank * segment_len + from_local_start,
from_global_end = from_global_start + count;
int to_global_start = dest_pos, to_global_end = dest_pos + count;
int to_local_start = to_global_start - processor * segment_len,
to_local_end = to_global_end - processor * segment_len;
// printf("[%d] L ->> (count=%d), from local [%d..%d] {%d..%d} -to-> "
// "p#%d [%d..%d] {%d..%d}\n",
// rank, count, from_local_start, from_local_end,
// from_global_start, from_global_end, processor, to_local_start,
// to_local_end, to_global_start, to_global_end);
L_send_ctl[processor * CTL_SIZE] = count;
L_send_ctl[processor * CTL_SIZE + 1] = from_global_start;
L_send_ctl[processor * CTL_SIZE + 2] = to_local_start;
L_send_ctl[processor * CTL_SIZE + 3] = from_local_start;
i += count;
dest_pos += count;
processor += 1;
}
}
MPI_Alltoallv(L_send_ctl, ctl_send_counts, ctl_send_displs, MPI_INT, L_ctl,
recv_counts, recv_displs, MPI_INT, comm);
// After sending S and L information
for (int i = 0; i < p; ++i) {
recv_counts[i] = segment_len;
recv_displs[i] = i * segment_len;
}
// printf("[%d,%d] S CTL INFO\n", rank, total_elems);
// for (int i = 0; i < p; ++i) {
// printf("[%d,%d] [p=%d] (ct=%d)\n", rank, total_elems, i,
// S_send_ctl[i * CTL_SIZE]);
// }
// MPI_Alltoallv(integers, send_counts, send_displs, MPI_INT,
// integers_recv_buf,
// recv_counts, recv_displs, MPI_INT, MPI_COMM_WORLD);
// MPI_Allgather(integers, n_over_p, MPI_INT, integers_recv_buf, n_over_p,
// MPI_INT, comm);
// printf("[%d] ints: %s\n", rank, string_of_list(integers_recv_buf, n));
// Scheme for all send
int integers_recv_2[segment_capac];
int integers_recv_3[segment_capac];
for (int i = 0; i < segment_len; ++i) {
integers_recv_2[i] = -1;
integers_recv_3[i] = integers[i];
}
for (int host_p = 0; host_p < p; ++host_p) {
if (rank == host_p) {
// Your {S,L}_ctl is a mapping from source_processor -> ctl
// Everyone already knows who needs to send to who now
for (int sender_p = 0; sender_p < p; ++sender_p) {
int S_count = S_ctl[sender_p * CTL_SIZE];
if (S_count > 0) {
int to_local_start = S_ctl[sender_p * CTL_SIZE + 2];
int from_local_start = S_ctl[sender_p * CTL_SIZE + 3];
if (sender_p == host_p) {
for (int k = 0; k < S_count; ++k) {
integers_recv_3[to_local_start + k] =
integers[from_local_start + k];
}
continue;
}
// printf("[%d] - S inbound from host %d to [%d..%d] (%d)\n", rank,
// sender_p, to_local_start, to_local_start + S_count,
// S_count);
err = MPI_Recv(&integers_recv_2[to_local_start], S_count, MPI_INT,
sender_p, 124, comm, MPI_STATUS_IGNORE);
check_mpi_error(err);
for (int k = 0; k < S_count; ++k) {
integers_recv_3[to_local_start + k] =
integers_recv_2[to_local_start + k];
}
}
}
} else {
// Your {S,L}_send_ctl contains a mapping from dest_processor -> ctl
for (int dest_p = 0; dest_p < p; ++dest_p) {
int S_count = S_send_ctl[dest_p * CTL_SIZE];
if (S_count > 0 && dest_p == host_p) {
int from_local_start = S_send_ctl[dest_p * CTL_SIZE + 3];
// printf("[%d] - S outbound to host %d from [%d..%d] (%d)\n", rank,
// dest_p, from_local_start, from_local_start + S_count,
// S_count);
MPI_Send(&integers[from_local_start], S_count, MPI_INT, dest_p, 124,
comm);
}
}
}
}
for (int host_p = 0; host_p < p; ++host_p) {
if (rank == host_p) {
// Your {S,L}_ctl is a mapping from source_processor -> ctl
// Everyone already knows who needs to send to who now
for (int sender_p = 0; sender_p < p; ++sender_p) {
int L_count = L_ctl[sender_p * CTL_SIZE];
if (L_count > 0) {
int to_local_start = L_ctl[sender_p * CTL_SIZE + 2];
int from_local_start = L_ctl[sender_p * CTL_SIZE + 3];
if (sender_p == host_p) {
for (int k = 0; k < L_count; ++k) {
integers_recv_3[to_local_start + k] =
integers[from_local_start + k];
}
continue;
}
// printf("[%d] - L inbound from host %d to [%d..%d] (%d)\n", rank,
// sender_p, to_local_start, to_local_start + L_count,
// L_count);
err = MPI_Recv(&integers_recv_2[to_local_start], L_count, MPI_INT,
sender_p, 125, comm, MPI_STATUS_IGNORE);
check_mpi_error(err);
for (int k = 0; k < L_count; ++k) {
integers_recv_3[to_local_start + k] =
integers_recv_2[to_local_start + k];
}
}
}
} else {
// Your {S,L}_send_ctl contains a mapping from dest_processor -> ctl
for (int dest_p = 0; dest_p < p; ++dest_p) {
int L_count = L_send_ctl[dest_p * CTL_SIZE];
if (L_count > 0 && dest_p == host_p) {
int from_local_start = L_send_ctl[dest_p * CTL_SIZE + 3];
// printf("[%d] - L outbound to host %d from [%d..%d] (%d)\n", rank,
// dest_p, from_local_start, from_local_start + L_count,
// L_count);
MPI_Send(&integers[from_local_start], L_count, MPI_INT, dest_p, 125,
comm);
}
}
}
}
// printf("[%d,%d] after: %s\n", rank, total_elems,
// string_of_list(integers_recv_3, segment_len));
// printf("[%d,%d] -------------------------------------\n", rank,
// total_elems); for (int i = 0; i < segment_len; ++i) {
// integers[i] = integers_recv_3[i];
// }
// ###################################################################################
// SUBDIVIDING
// Now, determine which processes should be responsible for taking the S and L
// arrays
// Specifically, the part where it's split, break the tie to see if it goes
// down or up
int child_len = segment_len;
int difference = segment_len - split_point;
int transfer[split_point];
// printf("[%d,%d] p_of_split = %d, split_point = %d => (child_len = %d)\n",
// rank, total_elems, p_of_split, split_point, child_len);
int has_split = 0;
if (p_of_split == 0 || p_of_split == p - 1) {
// Super unfortunate, bad pivot
} else if (split_point == 0) {
// Super lucky, it's split evenly!
} else {
has_split = 1;
// Let's just say that if there's any split, the block itself counts as L
// and then add the rest to the previous block
if (rank == p_of_split - 1) {
child_len += split_point;
err = MPI_Recv(transfer, split_point, MPI_INT, p_of_split, 126, comm,
MPI_STATUS_IGNORE);
check_mpi_error(err);
} else if (rank == p_of_split) {
child_len = difference;
err = MPI_Send(integers, split_point, MPI_INT, p_of_split - 1, 126, comm);
check_mpi_error(err);
}
}
// Which group is this child going into?
int color;
if (rank < p_of_split)
color = 100;
else
color = 200;
// printf("[%d,%d] split color = %d, split lenth = %d\n", rank, total_elems,
// color, child_len);
MPI_Comm child_comm;
MPI_Comm_split(comm, color, rank, &child_comm);
// Figure out what the max is
int max_child_buf_len, total_child_elems;
err = MPI_Allreduce(&child_len, &max_child_buf_len, 1, MPI_INT, MPI_MAX,
child_comm);
check_mpi_error(err);
err = MPI_Allreduce(&child_len, &total_child_elems, 1, MPI_INT, MPI_SUM,
child_comm);
check_mpi_error(err);
// printf("[%d] [color=%d] max length = %d, total child elems = %d\n", rank,
// color, max_child_buf_len, total_child_elems);
// Copy into a new buf
int new_buf[max_child_buf_len];
int whichCase = 999;
for (int i = 0; i < max_child_buf_len; ++i) {
if (has_split && rank == p_of_split - 1) {
whichCase = 1001;
if (i < segment_len)
new_buf[i] = integers_recv_3[i];
else if (i < segment_len + split_point)
new_buf[i] = transfer[i - segment_len];
else
new_buf[i] = -1;
} else if (has_split && rank == p_of_split) {
whichCase = 1002;
if (i < difference)
new_buf[i] = integers_recv_3[i + split_point];
else
new_buf[i] = -1;
} else {
whichCase = 1003;
if (i < child_len)
new_buf[i] = integers_recv_3[i];
else
new_buf[i] = -1;
}
}
// printf("[%d,%d] orig integers: %s\n", rank, total_elems,
// string_of_list(integers, segment_len));
// printf("[%d,%d] new buf = %s (has_split = %d, segment_len = %d, case = %d,
// "
// "child_elems = %d)\n",
// rank, total_elems, string_of_list(new_buf, max_child_buf_len),
// has_split, segment_len, whichCase, child_len);
// printf("[%d,%d] \n", rank, total_elems);
int integers_out_buf[total_child_elems];
recursive_quicksort(new_buf, total_child_elems, max_child_buf_len, child_len,
integers_out_buf, child_comm);
// Ok now copy the new items back
switch (whichCase) {
case 1001:
// In this case, p is right before the split, so it got extra elements
// To reverse this, we can send the elements back to the second
for (int i = 0; i < total_child_elems; ++i) {
if (i < segment_len)
integers_out[i] = integers_out_buf[i];
else
transfer[i - segment_len] = integers_out_buf[i];
}
MPI_Send(transfer, split_point, MPI_INT, p_of_split, 127, comm);
break;
case 1002:
MPI_Recv(transfer, split_point, MPI_INT, p_of_split - 1, 127, comm,
MPI_STATUS_IGNORE);
for (int i = 0; i < split_point; ++i) {
integers_out[i] = transfer[i];
}
for (int i = 0; i < total_child_elems; ++i) {
integers_out[i + split_point] = integers_out_buf[i];
}
break;
case 1003:
for (int i = 0; i < total_child_elems; ++i) {
integers_out[i] = integers_out_buf[i];
}
break;
}
MPI_Comm_free(&child_comm);
}
// Puts `len` consecutive control records (CTL_SIZE ints each) into their
// "nothing to transfer" state: the leading int of a record is set to 0 and
// every other int of the record to -1.
void init_ctl(int *ctl, int len) {
  int *record = ctl;
  for (int remaining = len; remaining > 0; --remaining) {
    record[0] = 0;
    for (int field = CTL_SIZE - 1; field >= 1; --field) {
      record[field] = -1;
    }
    record += CTL_SIZE;
  }
}