ok it works

Michael Zhang 2023-11-25 02:05:20 +00:00
parent a373fd4b83
commit ca96bf8b2d
10 changed files with 3312262 additions and 126 deletions

@@ -15,6 +15,7 @@ RUN apt update -y && apt install -y --no-install-recommends \
git \
libomp-dev \
libopenmpi-dev \
libfmt-dev \
openmpi-bin \
pandoc \
pkg-config \

@@ -1,7 +1,13 @@
.PHONY: run
.PHONY: run clean
lpa: lpa.c
	mpicc -o $@ -g $<
CFLAGS += -DFMT_HEADER_ONLY
LDFLAGS += $(shell pkg-config --libs fmt)
clean:
	rm lpa
lpa: lpa.cpp
	mpic++ $(CFLAGS) $(LDFLAGS) -o $@ -g $<
run:
	watchexec -c clear 'make lpa && mpirun -n 4 ./lpa dataset/1000.txt'
	watchexec -c clear 'make lpa && mpirun -n 4 ./lpa dataset/both_1000.txt'
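
The new flags build against the {fmt} library in header-only mode: -DFMT_HEADER_ONLY compiles the formatting code inline, while pkg-config --libs fmt still supplies the linker flags (the Dockerfile change above installs libfmt-dev to provide the headers). A minimal, self-contained sketch of the fmt calls this enables, mirroring how lpa.cpp formats its per-rank output; the file name demo.cpp is hypothetical:

// Illustrative sketch; compile with the Makefile's flags, e.g.
//   mpic++ -DFMT_HEADER_ONLY demo.cpp $(pkg-config --libs fmt)
#include <iostream>
#include <vector>

#include <fmt/format.h>
#include <fmt/ranges.h>

int main() {
  std::vector<int> labels = {0, 0, 2, 2};
  // fmt::format builds the string; fmt::join renders a container with a separator.
  std::cout << fmt::format("[{}] labels: {}", 0, fmt::join(labels, ", "))
            << std::endl;
  return 0;
}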

Four file diffs suppressed because they are too large.

@@ -0,0 +1,5 @@
4 4
0 1
1 0
2 3
3 2

@@ -0,0 +1,17 @@
import sys

with open(sys.argv[1]) as f:
    num_nodes, num_edges = map(int, f.readline().strip().split(" "))
    all_edges = set()
    for _ in range(num_edges):
        from_edge, to_edge = map(int, f.readline().strip().split(" "))
        all_edges.add((from_edge, to_edge))
        all_edges.add((to_edge, from_edge))

all_edges = sorted(list(all_edges))
new_num_edges = len(all_edges)

with open(sys.argv[2], "w") as f:
    f.write(f"{num_nodes} {new_num_edges}\n")
    for from_edge, to_edge in all_edges:
        f.write(f"{from_edge} {to_edge}\n")
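
This helper symmetrizes an edge list: for every edge (u, v) it also records (v, u), de-duplicates, sorts, and writes the result with an updated edge count. Presumably this is how dataset/both_1000.txt, now referenced by the Makefile's run target, was produced from dataset/1000.txt; a hypothetical invocation (the script's file name is not shown in this view) would be python3 <script> dataset/1000.txt dataset/both_1000.txt. The five-line sample above shows the resulting format: a "num_nodes num_edges" header followed by one directed edge per line, with both directions present. The symmetrization matters because lpa.cpp below only inserts edge.fst -> edge.snd into its adjacency map, so it relies on the input already listing both directions of every undirected edge.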

@@ -1,122 +0,0 @@
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
typedef struct {
int fst;
int snd;
} pair;
void init_pair_type(MPI_Datatype *out);
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
int rank, p;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &p);
MPI_Datatype IntPairType;
init_pair_type(&IntPairType);
// One process reads the file and distributes the data to the other processes
// using a 1D decomposition (each rank gets approx same number of vertices).
#pragma region
FILE *fp;
char *line = NULL;
size_t len;
ssize_t read;
pair params;
if (rank == 0) {
printf("Hello\n");
fp = fopen(argv[1], "r");
if ((read = getline(&line, &len, fp)) != -1)
sscanf(line, "%d %d", &params.fst, &params.snd);
}
// Send this pair
MPI_Bcast(&params, 1, IntPairType, 0, MPI_COMM_WORLD);
int num_nodes = params.fst;
int num_edges = params.snd;
int max_num_my_edges = (num_edges / p) + p;
pair my_edges[max_num_my_edges];
// Read the edges
pair edges[num_edges];
int my_count;
int counts[p], displs[p];
if (rank == 0) {
line = NULL;
for (int i = 0; i < num_edges; ++i) {
getline(&line, &len, fp);
sscanf(line, "%d %d", &edges[i].fst, &edges[i].snd);
}
int step = num_edges / p;
for (int i = 0; i < p; ++i) {
int start = i * step;
int end = i == p - 1 ? num_edges : start + step;
int count = end - start;
counts[i] = count;
displs[i] = start;
}
}
MPI_Scatter(counts, 1, MPI_INT, &my_count, 1, MPI_INT, 0, MPI_COMM_WORLD);
printf("[%d] #: %d\n", rank, my_count);
MPI_Scatterv(edges, counts, displs, IntPairType, my_edges, my_count,
IntPairType, 0, MPI_COMM_WORLD);
if (rank == 0) {
fclose(fp);
if (line)
free(line);
}
#pragma endregion
// Each process analyzes the non-local edges that are contained in its portion
// of the graph.
#pragma region
#pragma endregion
// Each process determines which processes store the non-local vertices
// corresponding to the non-local edges.
#pragma region
#pragma endregion
// All the processes are communicating to figure out which process needs to
// send what data to the other processes.
#pragma region
#pragma endregion
// The processes perform the transfers of non-local labels and updates of
// local labels until convergence.
#pragma region
#pragma endregion
// The results are gathered to a single process, which writes them to the
// disk.
#pragma region
#pragma endregion
MPI_Finalize();
return 0;
}
void init_pair_type(MPI_Datatype *out) {
int blocklengths[2] = {1, 1};
MPI_Datatype types[2] = {MPI_INT, MPI_INT};
MPI_Aint offsets[2];
offsets[0] = offsetof(pair, fst);
offsets[1] = offsetof(pair, snd);
MPI_Type_create_struct(2, blocklengths, offsets, types, out);
MPI_Type_commit(out);
}
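
The deleted C version distributed edges with a counts/displacements MPI_Scatterv over a custom MPI pair datatype; the C++ rewrite below replaces this with explicit point-to-point sends. A minimal, self-contained sketch of the Scatterv pattern the old code used, with made-up edges instead of the input file (pair_t here stands in for the pair struct above):

// Sketch of the MPI_Scatter/MPI_Scatterv distribution the old lpa.c used.
// The edge data is made up; the real code reads it from the dataset file.
#include <cstddef> // offsetof
#include <vector>
#include <mpi.h>

struct pair_t {
  int fst;
  int snd;
};

static void init_pair_type(MPI_Datatype *out) {
  int blocklengths[2] = {1, 1};
  MPI_Datatype types[2] = {MPI_INT, MPI_INT};
  MPI_Aint offsets[2];
  offsets[0] = offsetof(pair_t, fst);
  offsets[1] = offsetof(pair_t, snd);
  MPI_Type_create_struct(2, blocklengths, offsets, types, out);
  MPI_Type_commit(out);
}

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, p;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &p);

  MPI_Datatype IntPairType;
  init_pair_type(&IntPairType);

  int num_edges = 10; // made-up total
  std::vector<pair_t> edges;
  std::vector<int> counts(p), displs(p);
  if (rank == 0) {
    edges.resize(num_edges);
    for (int i = 0; i < num_edges; ++i)
      edges[i] = {i, (i + 1) % num_edges};
    // Same partitioning as the old code: equal steps, last rank takes the rest.
    int step = num_edges / p;
    for (int i = 0; i < p; ++i) {
      int start = i * step;
      int end = i == p - 1 ? num_edges : start + step;
      counts[i] = end - start;
      displs[i] = start;
    }
  }

  // Each rank first learns its own count, then receives its slice of edges.
  int my_count = 0;
  MPI_Scatter(counts.data(), 1, MPI_INT, &my_count, 1, MPI_INT, 0,
              MPI_COMM_WORLD);
  std::vector<pair_t> my_edges(my_count);
  MPI_Scatterv(edges.data(), counts.data(), displs.data(), IntPairType,
               my_edges.data(), my_count, IntPairType, 0, MPI_COMM_WORLD);

  MPI_Type_free(&IntPairType);
  MPI_Finalize();
  return 0;
}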

assignments/03/lpa.cpp Normal file

@@ -0,0 +1,357 @@
#include <algorithm>
#include <array>
#include <cstddef> // offsetof (used by init_pair_type)
#include <cstring>
#include <functional>
#include <iostream> // std::cout
#include <limits>
#include <map>
#include <set>
#include <vector>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <utility>
#include <fmt/format.h>
#include <fmt/ranges.h>
#define TAG_SEND_NUM_EDGES 1001
#define TAG_SEND_EDGES 1002
#define TAG_SEND_FINAL_RESULT 1003
typedef struct {
int fst;
int snd;
} pair;
void init_pair_type(MPI_Datatype *out);
int main(int argc, char **argv) {
MPI::Init(argc, argv);
int rank = MPI::COMM_WORLD.Get_rank(), p = MPI::COMM_WORLD.Get_size();
MPI_Datatype IntPairType;
init_pair_type(&IntPairType);
// One process reads the file and distributes the data to the other processes
// using a 1D decomposition (each rank gets approx same number of vertices).
#pragma region
FILE *fp;
char *line = NULL;
size_t len;
ssize_t read;
pair params;
if (rank == 0) {
printf("Hello\n");
fp = fopen(argv[1], "r");
if ((read = getline(&line, &len, fp)) != -1)
sscanf(line, "%d %d", &params.fst, &params.snd);
}
// Send the params
MPI_Bcast(&params, 1, IntPairType, 0, MPI::COMM_WORLD);
int total_num_nodes = params.fst;
int total_num_edges = params.snd;
int each_num_nodes = total_num_nodes / p;
// Calculate exactly how many nodes my current process holds
int num_my_nodes =
rank == p - 1 ? total_num_nodes - rank * each_num_nodes : each_num_nodes;
int my_nodes[num_my_nodes];
std::function<std::pair<int, int>(int)> node_range =
[p, total_num_nodes, each_num_nodes](int process) {
int start = process * each_num_nodes;
int end = process == p - 1 ? total_num_nodes : start + each_num_nodes;
return std::make_pair(start, end);
};
// Read the edges
int num_my_edges;
pair *my_edges;
int counts[p], displs[p];
if (rank == 0) {
line = NULL;
pair all_edges[total_num_edges];
// For the current process, what's the last node we're expecting to see?
int current_process = 0;
std::pair<int, int> current_node_range = node_range(current_process);
int edge_counter = 0;
for (int i = 0; i < total_num_edges; ++i) {
getline(&line, &len, fp);
int fst, snd;
sscanf(line, "%d %d", &fst, &snd);
if (fst >= current_node_range.second) {
if (current_process == 0) {
num_my_edges = edge_counter;
my_edges = (pair *)calloc(num_my_edges, sizeof(pair));
memcpy(my_edges, all_edges, edge_counter * sizeof(pair));
} else {
MPI_Send(&edge_counter, 1, MPI_INT, current_process,
TAG_SEND_NUM_EDGES, MPI::COMM_WORLD);
MPI_Send(all_edges, edge_counter, IntPairType, current_process,
TAG_SEND_EDGES, MPI::COMM_WORLD);
}
// We're starting on the next process
current_process += 1;
current_node_range = node_range(current_process);
edge_counter = 0;
}
all_edges[edge_counter].fst = fst;
all_edges[edge_counter].snd = snd;
edge_counter += 1;
}
// We have to send the last one again here, since it didn't get caught in
// the loop above
MPI_Send(&edge_counter, 1, MPI_INT, current_process, TAG_SEND_NUM_EDGES,
MPI::COMM_WORLD);
MPI_Send(all_edges, edge_counter, IntPairType, current_process,
TAG_SEND_EDGES, MPI::COMM_WORLD);
// int step = num_edges / p;
// for (int i = 0; i < p; ++i) {
// int start = i * step;
// int end = i == p - 1 ? num_edges : start + step;
// int count = end - start;
// counts[i] = count;
// displs[i] = start;
// }
} else {
MPI_Recv(&num_my_edges, 1, MPI_INT, 0, TAG_SEND_NUM_EDGES, MPI::COMM_WORLD,
MPI_STATUS_IGNORE);
my_edges = (pair *)calloc(num_my_edges, sizeof(pair));
MPI_Recv(my_edges, num_my_edges, IntPairType, 0, TAG_SEND_EDGES,
MPI::COMM_WORLD, MPI_STATUS_IGNORE);
}
char *buf = (char *)calloc(sizeof(char), 1000);
int offset = 0; // Keep track of the current position in the buffer
for (int i = 0; i < std::min(num_my_edges, 5); i++) {
offset +=
sprintf(buf + offset, "(%d, %d)", my_edges[i].fst, my_edges[i].snd);
if (i < std::min(num_my_edges, 5) - 1) {
// Add a separator (e.g., comma or space) if it's not the last
offset += sprintf(buf + offset, " ");
}
}
if (rank == 0) {
fclose(fp);
if (line)
free(line);
}
#pragma endregion
// Each process analyzes the non-local edges that are contained in its portion
// of the graph.
#pragma region
std::map<int, int> node_label_assignment;
std::pair<int, int> my_node_range = node_range(rank);
// Initial node assignment
for (int i = my_node_range.first; i < my_node_range.second; ++i) {
node_label_assignment[i] = i;
}
std::map<int, std::set<int>> adj;
std::set<int> non_local_nodes;
std::set<std::pair<int, int>> non_local_edges;
for (int i = 0; i < num_my_edges; ++i) {
pair edge = my_edges[i];
adj[edge.fst].insert(edge.snd);
if (!(my_node_range.first <= edge.fst && edge.fst < my_node_range.second)) {
non_local_nodes.insert(edge.fst);
non_local_edges.insert(std::make_pair(edge.snd, edge.fst));
}
if (!(my_node_range.first <= edge.snd && edge.snd < my_node_range.second)) {
non_local_nodes.insert(edge.snd);
non_local_edges.insert(std::make_pair(edge.fst, edge.snd));
}
}
#pragma endregion
// Each process determines which processes store the non-local vertices
// corresponding to the non-local edges.
#pragma region
std::map<int, std::set<int>> send_map;
std::map<int, std::set<int>> recv_map;
for (auto entry : non_local_edges) {
int local_node = entry.first, remote_node = entry.second;
int corresponding_process = remote_node / each_num_nodes;
// The last process gets some extra nodes
if (corresponding_process >= p)
corresponding_process = p - 1;
send_map[corresponding_process].insert(local_node);
recv_map[corresponding_process].insert(remote_node);
}
#pragma endregion
// All the processes are communicating to figure out which process needs to
// send what data to the other processes.
#pragma region
#pragma endregion
// The processes perform the transfers of non-local labels and updates of
// local labels until convergence.
#pragma region
while (true) {
// First, exchange the data that needs to be exchanged
std::vector<int> sendbuf;
std::vector<int> send_counts;
std::vector<int> send_displs;
std::vector<int> recv_counts;
std::vector<int> recv_displs;
int recv_total;
{
int offset = 0;
for (int i = 0; i < p; ++i) {
int count = send_map[i].size();
// std::sort(send_map[i].begin(), send_map[i].end());
for (auto k : send_map[i]) {
sendbuf.push_back(node_label_assignment[k]);
}
send_counts.push_back(count);
send_displs.push_back(offset);
offset += count;
}
offset = 0;
for (int i = 0; i < p; ++i) {
int count = recv_map[i].size();
// std::sort(recv_map[i].begin(), recv_map[i].end());
recv_counts.push_back(count);
recv_displs.push_back(offset);
offset += count;
}
recv_total = offset;
}
std::vector<int> recvbuf(recv_total, 0);
// std::cout << fmt::format("[{}] {} \t|| \t{}", rank,
// fmt::join(send_counts, ", "),
// fmt::join(recv_counts, ", "))
// << std::endl;
MPI::COMM_WORLD.Alltoallv(sendbuf.data(), send_counts.data(),
send_displs.data(), MPI_INT, recvbuf.data(),
recv_counts.data(), recv_displs.data(), MPI_INT);
std::map<int, int> total_node_label_assignment(node_label_assignment);
for (int i = 0; i < p; ++i) {
std::vector<int> ouais(recv_map[i].begin(), recv_map[i].end());
for (int j = 0; j < recv_counts[i]; ++j) {
int remote_node = ouais[j];
int remote_value = recvbuf[recv_displs[i] + j];
total_node_label_assignment[remote_node] = remote_value;
}
}
// For each local node, determine the minimum label out of its neighbors
std::map<int, int> new_labels;
for (int i = my_node_range.first; i < my_node_range.second; ++i) {
int current_value = total_node_label_assignment[i];
int min = current_value;
for (auto neighbor : adj[i]) {
if (total_node_label_assignment[neighbor] < min)
min = total_node_label_assignment[neighbor];
}
if (min < current_value) {
new_labels[i] = min;
}
}
// std::cout << fmt::format("[{}] Helloge {}", rank,
// fmt::join(new_labels, ", "))
// << std::endl;
// Have there been any changes in the labels?
int num_changes = new_labels.size();
int total_changes;
MPI::COMM_WORLD.Allreduce(&num_changes, &total_changes, 1, MPI_INT,
MPI::SUM);
std::cout << fmt::format("[{}] # updates: {} ({})", rank, num_changes,
total_changes)
<< std::endl;
if (total_changes == 0) {
break;
}
// Update the original node assignment
for (auto entry : new_labels) {
node_label_assignment[entry.first] = entry.second;
}
}
#pragma endregion
// The results are gathered to a single process, which writes them to the
// disk.
#pragma region
if (rank == 0) {
std::vector<int> all_assignments(total_num_nodes);
std::map<int, int> label_count;
int ctr = 0;
for (int i = 0; i < p; ++i) {
std::pair<int, int> this_node_range = node_range(i);
int count = this_node_range.second - this_node_range.first;
if (i == 0) {
for (int j = 0; j < count; ++j) {
all_assignments[this_node_range.first + j] =
node_label_assignment[this_node_range.first + j];
label_count[all_assignments[this_node_range.first + j]]++;
}
} else {
MPI::COMM_WORLD.Recv(&all_assignments[this_node_range.first], count,
MPI::INT, i, TAG_SEND_FINAL_RESULT);
for (int j = 0; j < count; ++j) {
label_count[all_assignments[this_node_range.first + j]]++;
}
}
}
std::cout << "Done! " << label_count.size() << std::endl;
} else {
std::vector<int> flat_assignments;
for (int i = my_node_range.first; i < my_node_range.second; ++i) {
flat_assignments.push_back(node_label_assignment[i]);
}
MPI::COMM_WORLD.Send(flat_assignments.data(), flat_assignments.size(),
MPI::INT, 0, TAG_SEND_FINAL_RESULT);
}
#pragma endregion
MPI::Finalize();
return 0;
}
void init_pair_type(MPI_Datatype *out) {
int blocklengths[2] = {1, 1};
MPI_Datatype types[2] = {MPI_INT, MPI_INT};
MPI_Aint offsets[2];
offsets[0] = offsetof(pair, fst);
offsets[1] = offsetof(pair, snd);
MPI_Type_create_struct(2, blocklengths, offsets, types, out);
MPI_Type_commit(out);
}
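
For reference, the 1D decomposition above gives every rank total_num_nodes / p consecutive vertices, with the last rank absorbing the remainder; ownership of a remote vertex is then recovered as remote_node / each_num_nodes, clamped to p - 1, as in the send_map/recv_map construction. A small single-process sketch of that arithmetic with made-up sizes:

// Block decomposition arithmetic from lpa.cpp, with made-up sizes (10 nodes, 4 ranks).
#include <cstdio>
#include <utility>

int main() {
  int total_num_nodes = 10, p = 4;
  int each_num_nodes = total_num_nodes / p; // 2

  // Same rule as the node_range lambda: the last rank takes the remainder.
  auto node_range = [&](int process) {
    int start = process * each_num_nodes;
    int end = process == p - 1 ? total_num_nodes : start + each_num_nodes;
    return std::make_pair(start, end);
  };

  // Owner lookup, mirroring how send_map/recv_map pick a destination rank.
  auto owner_of = [&](int node) {
    int process = node / each_num_nodes;
    return process >= p ? p - 1 : process;
  };

  for (int r = 0; r < p; ++r) {
    std::pair<int, int> range = node_range(r);
    std::printf("rank %d owns [%d, %d)\n", r, range.first, range.second);
  }
  // Nodes 8 and 9 fall in the oversized last block, hence the clamp.
  std::printf("node 9 -> rank %d\n", owner_of(9)); // prints 3
  return 0;
}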
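
Inside the convergence loop, boundary labels move in a single MPI_Alltoallv call driven by per-rank counts and displacements. A stripped-down sketch of that exchange pattern with dummy payloads (each rank sends exactly one value to every rank), to show how the count and displacement arrays line up:

// Alltoallv exchange pattern from the convergence loop, with dummy payloads.
#include <cstdio>
#include <vector>
#include <mpi.h>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, p;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &p);

  // One value per destination; in lpa.cpp the counts come from send_map/recv_map
  // and the payload is the current label of each boundary node.
  std::vector<int> sendbuf(p), send_counts(p, 1), send_displs(p);
  std::vector<int> recvbuf(p), recv_counts(p, 1), recv_displs(p);
  for (int i = 0; i < p; ++i) {
    sendbuf[i] = rank * 100 + i; // made-up "label" addressed to rank i
    send_displs[i] = i;
    recv_displs[i] = i;
  }

  MPI_Alltoallv(sendbuf.data(), send_counts.data(), send_displs.data(), MPI_INT,
                recvbuf.data(), recv_counts.data(), recv_displs.data(), MPI_INT,
                MPI_COMM_WORLD);

  // recvbuf[i] now holds the value that rank i addressed to this rank.
  for (int i = 0; i < p; ++i)
    std::printf("[%d] got %d from rank %d\n", rank, recvbuf[i], i);

  MPI_Finalize();
  return 0;
}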
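
The update rule itself is label propagation toward the minimum: each vertex repeatedly takes the smallest label among itself and its neighbors, and the loop stops once the Allreduce reports zero changes, leaving every connected component labeled with its smallest vertex id (which is what the final label_count tally counts). A single-process sketch of the same fixed point on the four-node sample graph above:

// Sequential sketch of the min-label propagation that lpa.cpp runs in parallel.
// The graph is the small sample dataset: edges (0,1) and (2,3), both directions.
#include <cstdio>
#include <map>
#include <set>
#include <utility>
#include <vector>

int main() {
  int num_nodes = 4;
  std::vector<std::pair<int, int>> edges = {{0, 1}, {1, 0}, {2, 3}, {3, 2}};

  std::map<int, std::set<int>> adj;
  for (auto e : edges)
    adj[e.first].insert(e.second);

  // Every node starts out labeled with its own id.
  std::vector<int> label(num_nodes);
  for (int i = 0; i < num_nodes; ++i)
    label[i] = i;

  // Iterate until no label changes, mirroring the Allreduce-on-change-count test.
  while (true) {
    int changes = 0;
    for (int i = 0; i < num_nodes; ++i) {
      int min = label[i];
      for (int n : adj[i])
        if (label[n] < min)
          min = label[n];
      if (min < label[i]) {
        label[i] = min;
        ++changes;
      }
    }
    if (changes == 0)
      break;
  }

  for (int i = 0; i < num_nodes; ++i)
    std::printf("node %d -> component %d\n", i, label[i]);
  // Expected: nodes 0 and 1 end up with label 0, nodes 2 and 3 with label 2.
  return 0;
}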