thread_split.cpp

*/

#include <mpi.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h> // strcasecmp
#include <stdio.h>
#include <vector>
#include <string>
#include <utility>
#include <assert.h>
#include <sys/time.h>

// Choose threading model:
enum { THR_OPENMP = 1, THR_POSIX = 2, THR_NONE = 0 } threading = THR_POSIX;

template <typename T> MPI_Datatype get_mpi_type();
template <> MPI_Datatype get_mpi_type<char>() { return MPI_CHAR; }
template <> MPI_Datatype get_mpi_type<int>() { return MPI_INT; }
template <> MPI_Datatype get_mpi_type<float>() { return MPI_FLOAT; }
template <> MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; }

int main_threaded(int argc, char **argv);

template <typename T>
bool work_portion_2(T *in, size_t count, size_t niter, int rank, int nranks)
{
    memset(in, 0, sizeof(T) * count);
    return true; // fix: the function is declared bool but had no return value
}

template <typename T>
bool work_portion_1(T *in, size_t count, size_t niter, int rank, int nranks);

template <> bool work_portion_1<char>(char *in, size_t count, size_t niter, int rank, int nranks) { return true; }

template <> bool work_portion_1<int>(int *in, size_t count, size_t niter, int rank, int nranks)
{
    for (size_t i = 0; i < count; i++) {
        in[i] = (int)(niter * (rank + 1) * i);
    }
    return true;
}

template <typename T>
bool work_portion_3(T *in, size_t count, size_t niter, int rank, int nranks);

template <> bool work_portion_3<char>(char *out, size_t count, size_t niter, int rank, int nranks) { return true; }

template <> bool work_portion_3<int>(int *out, size_t count, size_t niter, int rank, int nranks)
{
    bool result = true;
    for (size_t i = 0; i < count; i++) {
        result = (result && (out[i] == (int)(niter * nranks * (nranks + 1) * i / 2)));
    }
    return result;
}

int main_threaded_openmp(int argc, char **argv);
int main_threaded_posix(int argc, char **argv);

int main(int argc, char **argv)
{
    if (argc > 1) {
        if (!strcasecmp(argv[1], "openmp")) threading = THR_OPENMP;
        if (!strcasecmp(argv[1], "posix")) threading = THR_POSIX;
        if (!strcasecmp(argv[1], "none")) threading = THR_NONE;
    }
    if (threading == THR_OPENMP) {
        main_threaded_openmp(argc, argv);
        return 0;
    } else if (threading == THR_POSIX) {
        main_threaded_posix(argc, argv);
        return 0;
    }
    printf("No threading\n");

    int rank, nranks;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);

    typedef int type;
    size_t count = 1024 * 1024;
    int niter = 100;
    type *in = (type *)malloc(count * sizeof(type));
    type *out = (type *)malloc(count * sizeof(type));

    for (int j = 1; j < niter + 1; j++) {
        work_portion_1<type>(in, count, j, rank, nranks);
        work_portion_2<type>(out, count, j, rank, nranks);
        MPI_Allreduce(in, out, count, get_mpi_type<type>(), MPI_SUM, MPI_COMM_WORLD);
        assert(work_portion_3<type>(out, count, j, rank, nranks));
        MPI_Barrier(MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}

#include <omp.h>

void omp_aware_barrier(MPI_Comm &comm, int thread)
{
    assert(thread != 0 || comm != MPI_COMM_NULL);
#pragma omp barrier
    if (thread == 0)
        MPI_Barrier(comm);
#pragma omp barrier
}

struct offset_and_count { size_t offset; size_t count; };

int main_threaded_openmp(int argc, char **argv)
{
    printf("OpenMP\n");

    int rank, nranks, provided = 0;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    assert(provided == MPI_THREAD_MULTIPLE);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);

    typedef int type;
    size_t count = 1024 * 1024;
    int niter = 100;
    type *in = (type *)malloc(count * sizeof(type));
    type *out = (type *)malloc(count * sizeof(type));

    // Divide the workload between threads:
    // save an (offset, count) pair for each piece.
    size_t nthreads = 8;
    if (argc > 2) {
        nthreads = atoi(argv[2]);
    }
    size_t nparts = (count > nthreads) ? nthreads : count;
    // Use nparts below: it might be less than nthreads
    size_t base = count / nparts;
    size_t rest = count % nparts;
    size_t base_off = 0;
    std::vector<offset_and_count> offs_and_counts(nparts);
    for (size_t i = 0; i < nparts; i++) {
        offs_and_counts[i].offset = base_off; // offset
        base_off += (offs_and_counts[i].count = base + (i < rest ? 1 : 0)); // size
    }

    // Duplicate a communicator for each thread
    std::vector<MPI_Comm> comms(nparts, MPI_COMM_NULL);
    for (size_t i = 0; i < nparts; i++) {
        MPI_Comm &new_comm = comms[i];
        MPI_Comm_dup(MPI_COMM_WORLD, &new_comm);
    }

    // Go into the parallel region; use the precalculated (offset, count) pairs
    // to split the workload, the per-thread communicators from comms[],
    // and omp_aware_barrier instead of a plain MPI_COMM_WORLD barrier.
#pragma omp parallel num_threads(nparts)
    {
        int thread = omp_get_thread_num();
        offset_and_count &offs = offs_and_counts[thread];
        MPI_Comm &comm = comms[thread];
        for (int j = 1; j < niter + 1; j++) {
            if (!offs.count) { omp_aware_barrier(comm, thread); continue; }
            work_portion_1<type>(in + offs.offset, offs.count, j, rank, nranks);
            work_portion_2<type>(out + offs.offset, offs.count, j, rank, nranks);
            MPI_Allreduce(in + offs.offset, out + offs.offset, offs.count,
                          get_mpi_type<type>(), MPI_SUM, comm);
            assert(work_portion_3<type>(out + offs.offset, offs.count, j, rank, nranks));
            omp_aware_barrier(comm, thread);
        }
    }
    MPI_Finalize();
    return 0;
}

#include <pthread.h>
#include <sys/time.h>
#include <sched.h>

void pthreads_aware_barrier(MPI_Comm &comm, pthread_barrier_t &barrier, int thread)
{
    assert(thread != 0 || comm != MPI_COMM_NULL);
    pthread_barrier_wait(&barrier);
    if (thread == 0)
        MPI_Barrier(comm);
    pthread_barrier_wait(&barrier);
}

struct global_data {
    typedef int type;
    type *in, *out;
    int niter;
    size_t count;
    int rank, nranks;
    pthread_barrier_t barrier;
};

struct thread_local_data {
    size_t offset;
    size_t count;
    int thread_id;
    MPI_Comm *comm;
    global_data *global;
};

void *worker(void *arg_ptr)
{
    thread_local_data &thr_local = *((thread_local_data *)arg_ptr);
    global_data &global = *(thr_local.global);
    global_data::type *in = global.in;
    global_data::type *out = global.out;
    int &niter = global.niter;
    int &rank = global.rank;
    int &nranks = global.nranks;
    pthread_barrier_t &barrier = global.barrier;
    size_t &offset = thr_local.offset;
    size_t &count = thr_local.count;
    int &thread = thr_local.thread_id;
    MPI_Comm &comm = *(thr_local.comm);

    // Pin each worker thread to its own core
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(thread, &mask);
    int res = sched_setaffinity(0, sizeof(mask), &mask);
    if (res == -1)
        printf("sched_setaffinity() failed\n");

    for (int j = 1; j < niter + 1; j++) {
        if (!thr_local.count) { pthreads_aware_barrier(comm, barrier, thread); continue; }
        work_portion_1<global_data::type>(in + offset, count, j, rank, nranks);
        work_portion_2<global_data::type>(out + offset, count, j, rank, nranks);
        MPI_Allreduce(in + offset, out + offset, count,
                      get_mpi_type<global_data::type>(), MPI_SUM, comm);
        assert(work_portion_3<global_data::type>(out + offset, count, j, rank, nranks));
        pthreads_aware_barrier(comm, barrier, thread);
    }
    return NULL; // fix: worker() is declared void* and must return a value
}

int main_threaded_posix(int argc, char **argv)
{
    printf("POSIX\n");

    int provided = 0;
    global_data global;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    assert(provided == MPI_THREAD_MULTIPLE);
    MPI_Comm_rank(MPI_COMM_WORLD, &global.rank);
    MPI_Comm_size(MPI_COMM_WORLD, &global.nranks);

    global.count = 1024 * 1024;
    global.niter = 100;
    global.in = (global_data::type *)malloc(global.count * sizeof(global_data::type));
    global.out = (global_data::type *)malloc(global.count * sizeof(global_data::type));

    // Divide the workload between threads:
    // save an (offset, count) pair for each piece.
    size_t nthreads = 8;
    if (argc > 2) {
        nthreads = atoi(argv[2]);
    }
    size_t nparts = (global.count > nthreads) ? nthreads : global.count;
    pthread_barrier_init(&global.barrier, NULL, nparts);
    // Use nparts below: it might be less than nthreads
    size_t base = global.count / nparts;
    size_t rest = global.count % nparts;
    size_t base_off = 0;
    std::vector<thread_local_data> thr_local(nparts);
    for (size_t i = 0; i < nparts; i++) {
        thr_local[i].offset = base_off; // offset
        base_off += (thr_local[i].count = base + (i < rest ? 1 : 0)); // size
        thr_local[i].thread_id = i;
    }

    // Duplicate a communicator for each thread and label it
    // with the "thread_id" info key.
    std::vector<MPI_Comm> comms(nparts);
    MPI_Info info;
    MPI_Info_create(&info);
    char s[16];
    for (size_t i = 0; i < nparts; i++) {
        MPI_Comm &new_comm = comms[i];
        MPI_Comm_dup(MPI_COMM_WORLD, &new_comm);
        snprintf(s, sizeof s, "%d", (int)i); // fix: i is size_t, cast for %d
        MPI_Info_set(info, "thread_id", s);
        MPI_Comm_set_info(new_comm, info);
        thr_local[i].comm = &new_comm;
        thr_local[i].global = &global;
    }

    // Start parallel POSIX threads
    std::vector<pthread_t> pids(nparts);
    for (size_t i = 0; i < nparts; i++) {
        pthread_create(&pids[i], NULL, worker, (void *)&thr_local[i]);
    }

    // Wait for all POSIX threads to complete
    for (size_t i = 0; i < nparts; i++) {
        pthread_join(pids[i], NULL);
    }

    MPI_Info_free(&info);
    MPI_Finalize();
    return 0;
}
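
Usage note: the sample selects the threading model from its first command-line argument (openmp, posix, or none; the default is posix) and the number of threads per rank from its second argument (default 8). For example, assuming a standard mpirun launcher, a run such as mpirun -n 2 ./thread_split posix 4 exercises the POSIX variant with four threads per rank.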

See Also

Code Change Guide