本文最后更新于:2022年8月1日 下午
1. 遍历每一个数,判断是否是质数
朴素方法 是否可以被整除
我们判断\(N\) 是否是质数时,不选要考虑\(\leq N\) 的所有情况,只需要考虑\(N\) 是否可以被小于等于\(\sqrt{N}\) 的数整除就可以了,因此代码如下(此时不考虑并行情况)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 bool ifprime (int n) { for (int k = 2 ; k <= sqrt (n); k++) { if (n % k == 0 ) return false ; } return true ; }void getPrime (vector<long > &prime) { for (int i = 2 ; i <= SIZE; i++) { if (ifprime(i)) prime.emplace_back (i); } }int main () { vector<long > prime; double t = omp_get_wtime (); getPrime (prime); t = omp_get_wtime () - t; printf ("time %f %ld\n" , t, long (SIZE)); printf ("\nsize %ld\n" , prime.size ()); }
此时运行时间为
time=0.477831
时间后的1000000时参数SIZE,含义是\(\leq
1000000\) 中有78498个质数
202203171647230
而在使用openmp并行后,从2-SIZE,每个线程分配一部分,判断是否为质数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 bool ifprime (int n) { for (int k = 2 ; k <= sqrt (n); k++) { if (n % k == 0 ) return false ; } return true ; }void getPrime (vector<long > &prime) { #pragma omp parallel { vector<int > vec_private;#pragma omp for nowait schedule(static) for (int i = 2 ; i <= SIZE; i++) { if (ifprime(i)) vec_private.emplace_back (i); }#pragma omp critical prime.insert (prime.end (), vec_private.begin (), vec_private.end ()); } }
此时运行时间为
time=0.033235
202203171651834
优化判断质数函数
首先,我们应该知道一个关于质数分布的规律:大于等于5的质数一定和6的倍数相邻 。例如5和7,11和13,17和19,反之是不一定成立的
证明:令x≥1,将大于等于5的自然数表示如下:···
6x-1,6x,6x+1,6x+2,6x+3,6x+4,6x+5,6(x+1),6(x+1)+1 ···
可以看到,不和6的倍数相邻的数为6x+2,6x+3,6x+4,由于2(3x+1),3(2x+1),2(3x+2),所以它们一定不是素数,再除去6x本身,显然,素数要出现只可能出现在6x的相邻两侧。因此在5到\(\sqrt{n}\) 中每6个数只判断2个,时间复杂度O(\(\frac{\sqrt{n}}{3}\) ),C++代码如下
1 2 3 4 5 6 7 8 9 10 11 bool ifprime (int n) { if (n == 2 || n == 3 ) return 1 ; if (n % 6 != 1 && n % 6 != 5 ) return 0 ; for (int k = 5 ; k <= floor (sqrt (n)); k += 6 ) if (n % k == 0 || n % (k + 2 ) == 0 ) return 0 ; return 1 ; }
串行 此时运行时间为
time=0.156363
202203171659060
并行 此时运行时间为
time=0.015401
202203171713716
2. 使用筛法
我们可以标记所有合数,然后求质数
1 2 3 4 5 6 7 8 9 10 11 long sign[SIZE + 1 ];void getPrime (vector<long > &prime) { int N = sqrt (SIZE); for (int i = 2 ; i <= N; i++) { if (!sign[i]) for (int j = i; j <= SIZE / i; j++) sign[i * j] = true ; }
sign[i]=true表示i为合数,当一个数i是质数时,我们标记\(i*j\) 为合数,\(i*j\leq SIZE\)
串行
此时运行时间为
time=0.021131
202203171712942
在遍历时并行
1 2 3 4 5 6 7 8 9 int N = sqrt (SIZE); #pragma omp parallel for num_threads(NUM_THREADS) for (int i = 2 ; i <= N; i++) { if (!sign[i]) for (int j = i; j <= SIZE / i; j++) sign[i * j] = true ; }
此时运行时间为
time=0.012204
202203171716363
在寻找合数时并行
1 2 3 4 5 6 7 8 9 int N = sqrt (SIZE); for (int i = 2 ; i <= N; i++) { if (!sign[i]) #pragma omp parallel for num_threads(NUM_THREADS) for (int j = i; j <= SIZE / i; j++) sign[i * j] = true ; }
此时运行时间为
time=0.006109
202203171718941
3. 时间对比
朴素方法 串行
0.477831
朴素方法 并行
0.033235
优化判断质数 串行
0.156363
优化判断质数 并行
0.015401
筛法 串行
0.021131
筛法 遍历时并行
0.012204
筛法 寻找合数时并行
0.006109
4. 多线程写入vector
在完成上述问题时,遇到过循环中多线程无法对同一个vector进行写操作,有以下几种解决方法
1 2 3 4 5 6 7 8 9 10 11 std::vector<int > vec;#pragma omp parallel { std::vector<int > vec_private; #pragma omp for nowait for (int i=0 ; i<100 ; i++) { vec_private.push_back (i); } #pragma omp critical vec.insert (vec.end (), vec_private.begin (), vec_private.end ()); }
OpenMP 4.0允许使用reduction
1 2 3 4 5 6 #pragma omp declare reduction (merge : std::vector<int> : omp_out.insert(omp_out.end(), omp_in.begin(), omp_in.end())) std::vector<int > vec;#pragma omp parallel for reduction(merge: vec) for (int i=0 ; i<100 ; i++) vec.push_back (i);
较为详细的可以这样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 std::vector<int > vec;#pragma omp parallel { std::vector<int > vec_private; #pragma omp for nowait schedule(static) for (int i=0 ; i<N; i++) { vec_private.push_back (i); } #pragma omp for schedule(static) ordered for (int i=0 ; i<omp_get_num_threads (); i++) { #pragma omp ordered vec.insert (vec.end (), vec_private.begin (), vec_private.end ()); } }