批改娘 20021. Dynamic Range Sum

題目描述

在二維平面上有 $N$ 個點,每一個點 $p_i(x_i, y_i)$各自帶權重 $w_i$,這些點不時會移動和改變權重。

現在 Morris 希望你幫忙撰寫線性搜索的函數,好讓他專注數據結構上的調整。函數詢問包含

  • $N$ 個點的資訊 (以 SoA (Structure of Array) 的方式儲存,以達到最好的快取使用率)
  • 詢問的矩形 $\text{Rect}$ (正交於兩軸)

函數必須回傳在矩形內部的點權重和。

1
int32_t search_range(Rect rect, int32_t x[], int32_t y[], int32_t w[], int32_t n);

main.c (測試用)

  • 一開始,在二維空間 $[0, R) \times [0, R)$ 之間產生 $N$ 個點的資訊
  • 接著,模擬 $M$ 次點的變化,並且呼叫 search_range
  • 最後,將所有答案 HASH 輸出一個值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include "DRS.h"

static uint32_t seed = 0;
static void p_srand(uint32_t x) { seed = x;}
static uint32_t p_rand() {return seed = (seed*9301 + 49297);}

static void swap(int *x, int *y) {
int tmp = *x;
*x = *y;
*y = tmp;
}
static inline Rect rand_rect(int R) {
Rect r;
r.lx = p_rand()%R;
r.ly = p_rand()%R;
r.rx = p_rand()%R;
r.ry = p_rand()%R;
if (r.lx > r.rx) swap(&r.lx, &r.rx);
if (r.ly > r.ry) swap(&r.ly, &r.ry);
return r;
}

static void init(int N, int R, int32_t x[], int32_t y[], int32_t w[]) {
for (int i = 0; i < N; i++) {
x[i] = p_rand()%R;
y[i] = p_rand()%R;
w[i] = p_rand()%R;
}
}

static void tick(int N, int R, int32_t x[], int32_t y[], int32_t w[]) {
for (int i = 0; i < 5; i++) {
int idx = p_rand()%N;
x[idx] = p_rand()%R;
y[idx] = p_rand()%R;
}
for (int i = 0; i < 5; i++) {
int idx = p_rand()%N;
w[idx] = p_rand()%R;
}
}

#define MAXN 1048576
int main() {
p_srand(0);
static int32_t x[MAXN], y[MAXN], w[MAXN];
int N = 1000, M = 10000, R = 100;

init(N, R, x, y, w);

int32_t hash = 0;
for (int it = 0; it < M; it++) {
Rect rect = rand_rect(R);
int32_t ret = search_range(rect, x, y, w, N);
hash ^= ret;
tick(N, R, x, y, w);
}
printf("%" PRIi32 "\n", hash);
return 0;
}

DRS.h

1
2
3
4
5
6
7
8
9
10
11
12
13
#ifndef __DRS_H
#define __DRS_H

#include <stdint.h>

typedef struct Rect {
int32_t lx, ly, rx, ry;
} Rect;

int32_t search_range(Rect rect, int32_t x[], int32_t y[],
int32_t w[], int32_t n);

#endif

DRS.c

你的目標是要加速下述函數的計算,通過最低要求加速 2 倍以上。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include "DRS.h"

int32_t search_range(Rect rect, int32_t x[], int32_t y[],
int32_t w[], int32_t n) {
int32_t ret = 0;
for (int i = 0; i < n; i++) {
if (rect.lx <= x[i] && x[i] <= rect.rx &&
rect.ly <= y[i] && y[i] <= rect.ry) {
ret += w[i];
}
}
return ret;
}

測資限制

  • $N \le 131072$
  • $R \le 32768$

範例輸入

no input

範例輸出

1
8967

編譯參數

1
2
3
4
5
all: main

main: main.c
gcc -std=c99 -Ofast -march=native DRS.c -c -o DRS.o
gcc -std=c99 -Ofast -march=native main.c DRS.o -o main

Solution

一般的線性作法,透過 rect.lx <= x[i] && x[i] <= rect.rx && rect.ly <= y[i] && y[i] <= rect.ry 操作,翻譯成組合語言時,四次的 branch & jump,在管線處理上的效能易受到影響。為了使這種 branch 減少而不使管線處理的效能下降,通常會使用查表法來完成,藉由并行的數值計算,在最後一步才進行 load/store 完成指定區塊的指令。

在這一題中,唯有條件式皆成立才執行,由於分枝操作上只有一種情況,故查表法就不適用於此。我們仍可以并行數個矩形判斷,並且存在至少一個矩形成立再運行 load 加總所需的資料,將會給予效能上的大幅提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include "DRS.h"

#include <x86intrin.h>
/*
int32_t search_range(Rect rect, int32_t x[], int32_t y[],
int32_t w[], int32_t n) {
int32_t ret = 0;
for (int i = 0; i < n; i++) {
if (rect.lx <= x[i] && x[i] <= rect.rx &&
rect.ly <= y[i] && y[i] <= rect.ry) {
ret += w[i];
}
}
return ret;
}
*/

int32_t search_range(Rect rect, int32_t x[], int32_t y[], int32_t w[], int32_t n) {
__m128i ret = _mm_set_epi32(0, 0, 0, 0);
rect.lx--, rect.ly--;
rect.rx++, rect.ry++;
__m128i lx = _mm_broadcastd_epi32(*((__m128i *) &rect.lx));
__m128i ly = _mm_broadcastd_epi32(*((__m128i *) &rect.ly));
__m128i rx = _mm_broadcastd_epi32(*((__m128i *) &rect.rx));
__m128i ry = _mm_broadcastd_epi32(*((__m128i *) &rect.ry));
__m128i zo = _mm_set_epi32(0, 0, 0, 0);
__m128i ic = _mm_set_epi32(3, 2, 1, 0);

for (int i = 0; i+4 <= n; i += 4) {
__m128i sx = _mm_load_si128((__m128i *) (x+i));
__m128i sy = _mm_load_si128((__m128i *) (y+i));
__m128i c1 = _mm_and_si128(_mm_cmplt_epi32(lx, sx), _mm_cmplt_epi32(sx, rx));
__m128i c2 = _mm_and_si128(_mm_cmplt_epi32(ly, sy), _mm_cmplt_epi32(sy, ry));
if (_mm_testz_si128(c1, c2) == 0) {
__m128i cc = _mm_and_si128(c1, c2);
__m128i vi = _mm_add_epi32(ic, _mm_set_epi32(i, i, i, i));
__m128i rs = _mm_mask_i32gather_epi32(zo, w+i, ic, cc, 4);
ret = _mm_add_epi32(ret, rs);
}
}

int32_t sum = 0;
for (int i = (n>>2)<<2; i < n; i++) {
if (rect.lx <= x[i] && x[i] <= rect.rx &&
rect.ly <= y[i] && y[i] <= rect.ry) {
sum += w[i];
}
}
static int32_t tmp[4] __attribute__ ((aligned (16)));
_mm_store_si128((__m128i*) &tmp[0], ret);
sum += tmp[0] + tmp[1] + tmp[2] + tmp[3];
return sum;
}
Read More +

批改娘 20020. Dot Product

題目描述

請嘗試使用 SIMD 技術 AVX/SSE/MMX 來加速以下的純數值計算。

main.c

最低需求加速兩倍快

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <stdio.h>
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>

static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}

static uint32_t f(int N, int off, uint32_t key1, uint32_t key2) {
uint32_t sum = 0;
for (int i = 0, j = off; i < N; i++, j++)
sum += encrypt(j, key1) * encrypt(j, key2), i++, j++;
return sum;
}
int main() {
int N;
uint32_t key1, key2;
while (scanf("%d %" PRIu32 " %" PRIu32, &N, &key1, &key2) == 3) {
uint32_t sum = f(N, 0, key1, key2);
printf("%" PRIu32 "\n", sum);
}
return 0;
}

輸入格式

有多組測資,每組一行包含三個整數 $N, \; \text{key1}, \; \text{key2}$,表示向量長度 $N$、向量 $\vec{A}$ 由亂數種子 $\text{key1}$ 產生、向量 $\vec{B}$ 由亂數種子 $\text{key2}$ 產生。

  • $1 \le N \le 16777216$

輸出格式

對於每組測資輸出一行整數,為 $\vec{A} \cdot \vec{B}$ 的 unsigned 32-bit integer 結果。

範例輸入

1
2
16777216 1 2
16777216 3 5

範例輸出

1
2
2885681152
2147483648

編譯參數

1
gcc -std=c99 -O3 -march=native main.c -lm

參考資料

Solution

對於數值計算時,SIMD 能充分地加速程序,每一個元素皆經過一連串的數學函數計算,那麼把一連串的數學式拆分,化成最簡的邏輯計算,並且找出常數向量存放到暫存器中。如在影像處理的程序,即使沒有 GPU 幫忙,搭配 SIMD 也是個不錯的選擇,可以加速 2~4 倍之多。

為了凸顯效能差異,題目設計時必須在計算單一元素結果上複雜些,防止大部分的加速效果是來自於減少 branch 操作 (loop unrolling)。

AVX

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <stdio.h>
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <x86intrin.h>

static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}

static uint32_t SSE(int N, int off, uint32_t key1, uint32_t key2) {
uint32_t sum = 0;
for (int i = (N>>3)<<3; i < N; i++)
sum += encrypt(i+off, key1) * encrypt(i+off, key2);
__m256i s_i = _mm256_set_epi32(off, off+1, off+2, off+3, off+4, off+5, off+6, off+7);
__m256i s_4 = _mm256_set_epi32(8, 8, 8, 8, 8, 8, 8, 8);
__m256i s_k1 = _mm256_set_epi32(key1, key1, key1, key1, key1, key1, key1, key1);
__m256i s_k2 = _mm256_set_epi32(key2, key2, key2, key2, key2, key2, key2, key2);
uint32_t modk1 = key1&31;
uint32_t modk2 = key2&31;
uint32_t cmodk1 = 32 - modk1;
uint32_t cmodk2 = 32 - modk2;
__m256i s_ret = _mm256_set_epi32(0, 0, 0, 0, 0, 0, 0, 0);
N >>= 3;
for (int it = 0; it < N; it++) {
__m256i r_1 = _mm256_or_si256(_mm256_slli_epi32(s_i, modk1), _mm256_srli_epi32(s_i, cmodk1));
r_1 = _mm256_xor_si256(_mm256_add_epi32(r_1, s_k1), s_k1);
__m256i r_2 = _mm256_or_si256(_mm256_slli_epi32(s_i, modk2), _mm256_srli_epi32(s_i, cmodk2));
r_2 = _mm256_xor_si256(_mm256_add_epi32(r_2, s_k2), s_k2);
__m256i r_m = _mm256_mullo_epi32(r_1, r_2);
s_ret = _mm256_add_epi32(s_ret, r_m);
s_i = _mm256_add_epi32(s_i, s_4);
}
{
static int32_t tmp[8] __attribute__ ((aligned (32)));
_mm256_store_si256((__m256i*) &tmp[0], s_ret);
sum += tmp[0] + tmp[1] + tmp[2] + tmp[3];
sum += tmp[4] + tmp[5] + tmp[6] + tmp[7];
}
return sum;
}
int main() {
int N;
uint32_t key1, key2;
while (scanf("%d %" PRIu32 " %" PRIu32, &N, &key1, &key2) == 3) {
uint32_t sum = SSE(N, 0, key1, key2);
printf("%" PRIu32 "\n", sum);

}
return 0;
}

SSE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <stdio.h>
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <x86intrin.h>

static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}

static uint32_t SSE(int N, int off, uint32_t key1, uint32_t key2) {
uint32_t sum = 0;
for (int i = N/4*4; i < N; i++)
sum += encrypt(i+off, key1) * encrypt(i+off, key2);
__m128i s_i = _mm_set_epi32(off, off+1, off+2, off+3);
__m128i s_4 = _mm_set_epi32(4, 4, 4, 4);
__m128i s_k1 = _mm_set_epi32(key1, key1, key1, key1);
__m128i s_k2 = _mm_set_epi32(key2, key2, key2, key2);
uint32_t modk1 = key1&31;
uint32_t modk2 = key2&31;
uint32_t cmodk1 = 32 - modk1;
uint32_t cmodk2 = 32 - modk2;
__m128i s_ret = _mm_set_epi32(0, 0, 0, 0);
N >>= 2;
for (int it = 0; it < N; it++) {
__m128i r_1 = _mm_or_si128(_mm_slli_epi32(s_i, modk1), _mm_srli_epi32(s_i, cmodk1));
r_1 = _mm_xor_si128(_mm_add_epi32(r_1, s_k1), s_k1);
__m128i r_2 = _mm_or_si128(_mm_slli_epi32(s_i, modk2), _mm_srli_epi32(s_i, cmodk2));
r_2 = _mm_xor_si128(_mm_add_epi32(r_2, s_k2), s_k2);
__m128i r_m = _mm_mullo_epi32(r_1, r_2);
s_ret = _mm_add_epi32(s_ret, r_m);
s_i = _mm_add_epi32(s_i, s_4);
}
{
static int32_t tmp[4] __attribute__ ((aligned (16)));
_mm_store_si128((__m128i*) &tmp[0], s_ret);
sum += tmp[0] + tmp[1] + tmp[2] + tmp[3];
}
return sum;
}
int main() {
int N;
uint32_t key1, key2;
while (scanf("%d %" PRIu32 " %" PRIu32, &N, &key1, &key2) == 3) {
uint32_t sum = SSE(N, 0, key1, key2);
printf("%" PRIu32 "\n", sum);

}
return 0;
}
Read More +

批改娘 20018. Square Root of Vector Elements

Background

Intel 公司在 X86 指令集上提供了 AVX (Advanced Vector Extensions)、SSE (Streaming SIMD Extensions) 的特殊指令,這些在 SIMD 指令上提供強力的加速。

Problem

現在給你長度為 $N$ 的 32-bit floating point,分別將其開根號回傳。

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <stdio.h>
#include <memory.h>
#include <math.h>
#include <time.h>
#include "VSQRT.h"


static float a[1048576], b[1048576];
int main() {
int N = 1048576;
for (int i = 0; i < N; ++i)
a[i] = i;

for (int it = 0; it < 20; it++)
{
memcpy(b, a, sizeof(a[0])*N);
clock_t t = clock();
for (int i = 0; i < 10; i++)
sqrt2(b, b+N);
t = clock() - t;
fprintf(stderr, "It took me %f seconds.\n", ((float) t)/CLOCKS_PER_SEC);
float sum = 0;
for (int i = 0; i < N; i++)
sum += b[i];
printf("%f\n", sum);
}
return 0;
}

VSQRT.h

1
2
3
4
5
6
#ifndef __VSQRT_H
#define __VSQRT_H

void sqrt2(float *begin, float *end);

#endif

VSQRT.c

請嘗試加速以下程式碼。

1
2
3
4
5
6
7
8
#include "VSQRT.h"

#include <math.h>

void sqrt2(float *begin, float *end) {
for (; begin != end; begin++)
*begin = sqrt(*begin);
}

Sample Input (stdin)

no input

Sample Output (stdout)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000
1051776.125000

Solution

這裡我們使用 Intel CPU 撰寫 SIMD 指令時,透過編譯參數 -msse -mavx ... 指定相關的指令集進行調適。這些單一指令多資料處理的操作,使得在指定層級得到較高的平行度。

為了使用它們,我們需要較多的前置處理,例如要把資料載入和儲存時,必然要符合對齊的標準,若記憶體位址不是 32 的倍數 (AVX 的 256 bits) 或者 16 的倍數 (SSE 的 128 bits),則系統會產生 trap,進而導致程式運行中斷而發生錯誤。這方面必須特別小心,有時編譯器恰好幫你對齊而正常運行,若增加其他變數時,對齊就有可能跑掉,這時候再去找錯誤會變得相當瑣碎。

通常些 SSE/AVX 加速指令,將會在 gcc -Ofast 中使用,而在 -O3 下仍有可能尚未開啟。而在 LLVM 中的 clang -O2 編譯下就會自動開啟 SSE。無論如何,要明白開啟這些的并行指令的代價,數據小造成運行效能低落,隨著資料量遽增才會明顯加速,若牽涉到浮點數計算,務必注意精準度的需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include "VSQRT.h"

#include <stdint.h>
#include <math.h>
#include <x86intrin.h>

//#undef __AVX__
//#undef __SSE__

#if defined(__AVX__)
void sqrt2(float *a, float *end) {
uint32_t offset = ((void *) a - (void *) 0);
for (; a < end && (offset&31); a++, offset += sizeof(float))
*a = sqrt(*a);
__m256* ptr = (__m256*) a;
for (; a + 8 <= end; a += 8, ptr++)
_mm256_store_ps(a, _mm256_sqrt_ps(*ptr));
for (; a < end; a++)
*a = sqrt(*a);
}
#elif defined(__SSE__)
void sqrt2(float *a, float *end) {
uint32_t offset = ((void *) a - (void *) 0);
for (; a < end && (offset&15); a++, offset += sizeof(float))
*a = sqrt(*a);
__m128* ptr = (__m128*) a;
for (; a + 4 <= end; a += 4, ptr++)
_mm_store_ps(a, _mm_sqrt_ps(*ptr));
for (; a < end; a++)
*a = sqrt(*a);
}
#else
void sqrt2(float *begin, float *end) {
for (; begin != end; begin++)
*begin = sqrtf(*begin);
}
#endif
Read More +

助教生涯-平行總結

從碩班入學開始的這兩年內,助教一職圍繞在身邊許久,不擅長教別人的我卻非得接下這份工作-「教練,我可以不當助教嗎?」一轉眼兩年過去,當了三個學期半的助教,兩年間也不過四個學期,我想這些經歷肯定對於其他人是特別的,甚至會覺得有博班學長跟你一起當助教。不幸地,沒人可以給你經驗學習當助教-「從今天開始,你就是課程助教了!」

關於作業

大部分的課程都會使用的相同作業,助教只需要發下去給同學們寫就好。若加點變化,這就讓助教苦了,那是為什麼呢?一個小小的更動將更新參考答案、增加檢測方法、準備應付學生難以理解題目的諮詢,單兵作業的情況下,要如何做到全面防守便是一場長期消耗戰。

在學校課程中,也只有幾堂課是週週出作業、交作業,那週週改作業是怎麼一回事!那麼就是一個完全的管線處理,一下子從期初忙到期末,作業格式要找老師檢查、課堂中督導學生撰寫、課後收作業批閱、收到補交作業批改、接獲隔週作業需求設計、進行初步實驗和作業規劃、郵件通知學生 … 等,每週這樣子循環,有時候在想要是突然生病或消失,會出什麼事情呢?好想知道,希望是個可以找個人接替的日子,那麼就能好好地做自己的事情。

永無止盡地思考——這樣的設計會不會出事?誰能來提供更好的解法,能不能再設計得更好了一些?你是否也有相同的煩惱呢?

關於考試

考試不像作業有一陣子的緩衝期,出題面向與準備方向需要跟老師確定,一旦沒有喬好,就會在學生和老師間被踢來踢去,要是讓學生準備方向錯誤,那麼一定會件很可怕的事情。檢查再檢查,還是會有一些描述在當下造成學生無法理解的部分。更擔心的是,知道考題內容也沒有附上參考答案和配分方法。當問答題時更為頭痛,總會有那些出乎意料的答題方法,學生考試當下詢問時,這要怎麼辦呢?例外處理要怎麼給分?

考卷批改又是另一回事,由老師改還是助教改呢?改完就要準備讓學生檢討「助教?為什麼這會錯?為什麼他寫那樣就會對,我寫就不對?」有時候,文字描述總是很可怕的,當我們充分理解這位仁兄時,看考卷的觀點會變成「假設你懂」相反地就是「假設你不懂」,通常都要以後者為主,但如果偏向實作課程時,對於那些實作能力很好,卻不想背誦專有名詞定義的高手而感到惋惜。

最好的解決方法,就是助教都認得出來每個學生和表現,這樣也許會好上一些,一個學期能不能認識一個班級的人呢?有些很少出現和表現的同學,到底要怎麼認識啊!

關於環境架設

在程式撰寫方面,替學生進行環境設定相當苦惱,因為有分成標準與不標準,像我這種非正規教育出來的,只知道一堆網路搜索的資訊結果,哪一種解決方法才是最好的,能帶到下一個運行版本下持續運作,長遠性的規劃!只發現以前課程所遺留下來的資料,大多都有一些毛病在,還有一些尚未描述清楚的部分。「咱很笨,能教教我嗎?上上上屆學長們,你們消失到哪了?」

「原來我之前什麼都不明白啊」—《正解的卡多 KADO》

雜言

「無言」-《從零開始的魔法書》

當收到同學問程式,時常忘記給予任何嘗試的足跡,只有一份有錯的程序,接著就要助教幫忙通靈找錯誤,你先可以問問一起修課的同學嗎?每次都這樣子的話,咱已經受不了啊,一年、兩年過去,也疲於應付這些,我也想要屬於我自己的救贖啊!

「」-《為美好的世界獻上祝福》

每週跟課三個多小時,又要確定作業題目、生測資和測試,我自己也想鑽研作業,所追求的作業是沒有底的標準,能更好就更好,多給一點意見,讓我也能從中學習吧,為什麼都只有作業寫好就完事?有點傷心,不甘如此地結束,你能驚艷我的,對吧?

從系統環境架設、考試準備、通知學生和課程時替學生除錯,製作課程簡報,修正幾代學長遺留下來的錯誤,撰寫參考解答,追求更好的效能與算法,行政事項的準備,還有外來的委託架設。該經歷的都經歷了,只是一個人忙不太過來,也一直撐不住這麼多龐大的壓力。

然而,要準備離開學校的當下,信箱不時有一些待申請的大一新生信件,其中還會有外系學生想要使用我們課程中使用的系統來學習。系上的轉系考要我弄環境、考題,看來我的人生很不平靜呢 … 一些邏輯說不通的事情發生在身上時,過得是多麼無力呀。

Read More +

平行優化技巧-基礎篇

在撰寫平行程式時,仍會使用到編譯器的技巧,大幅度地減少指令個數,有時候這些優化甚至會把寫壞的分享記憶體 (shared memory) 存取給移除掉,如果能充分理解這些不好的設計,就不會造成實驗上發生的謬誤,還以為自己跑得很慢,平行根本沒加速到,這樣草草交出的報告,助教也收到煩了 (這裡不討論實驗方法本身就是錯的情況)。

內存篇

首先,來談談 cache line 是什麼吧?現在的機器都有著記憶體階層架構 (memory hierarchy),按照大小排序,其中最小的就是 cache line,接下來才是 L1, L2, L3 cache,最後是最大的主記憶體 (main memory),但某些超級電腦的設計就不是如此,而有些輔助處理器 (如 intel xeon phi) 缺少 L3 cache,而像 GPU 則沒有 cache 設計,但有提供有 bank 存取資料。這些都跟處理問題的類型有關,才導致有那些特殊設計。

Multicore

Cache Size

為什麼 cache line 很重要?因為存取速度最快,但能存放的資料也很小,通常是 64 bytes 或者 32 bytes。在快取設計中,一次把位址連續的資料一起搬靠近 CPU,這樣簡單的設計,導致我們在撰寫程式時,若能將結構定義得好便能讓使用率高,這非常仰賴編成者對於系統架構的了解。例如色彩 RGB,可以宣告成 struct RGB {flaot R, G, B}; 或者 float R[], G[], B[],如果 RGB 三者會共同計算,例如色彩轉換通常是三者交互作用的表達式,因此前者構造方式較為妥當。

例題:批改娘 10087. Sparse Matrix Multiplication (OpenMP)

Cache Hierarchy

然而,有些情況充分使用反而不好,沒錯,就是平行程式,通常平行程式不外乎會有分享記憶體,這時候充分利用反而是一件壞事,因為我們需要讓數個核心 (core) 同時運行,因為每一個 core 都有各自的 cache line,若在不同 core 上同時修改相近的位址時,很容易產生 dirty bit,這導致要掉回 L3 cache (同一個 CPU,但分享在不同的 core 上) 進行同步。通常編譯器能做到 independent 分析,那麼就會利用暫存器配置來避開這問題,如果不行,就要靠自己手動來調整。

例題:批改娘 10085. Parallel Count (debug)

GPU

GPU 的記憶體設計分成四種,經常分成 on-chip 和 off-chip,差別在於是不是內建焊死,因此 on-chip 內建記憶體有比較好的效能,而 off-chip 則是外接記憶體,存取速度相對於 on-chip 慢個三倍以上。特別注意,有好就有壞,提供越快存取速度的記憶體能存放的容量越小,因此非常不容易配置。

在編寫 CUDA 或者是 OpenCL 時,提供 shared memory 的設計,如果請求數量過大,將會自動轉入 global memory,這一點沒有明確告知,只有在執行時期才會發現突然變慢。若採用存取速度較快的寫法,有時候不見得比較快,

例題:批改娘 10101. Fast Game of Life (CUDA)

Read More +

批改娘 10116. Fast Dynamic Programming Computing I (OpenMP)

題目描述

給定一序列矩陣,期望求出相乘這些矩陣的最有效方法的乘法次數。

sequence.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#define MAXN 4096
#define INF (1LL<<60)
int N;
long long dp[MAXN*MAXN], SZ[MAXN+5];
int main() {
while (scanf("%d", &N) == 1) {
for (int i = 0; i <= N; i++)
scanf("%lld", &SZ[i]);
for (int i = 1; i <= N; i++) {
for (int j = 0; j < N-i; j++) {
int l = j, r = j+i;
long long local = INF;
for (int k = l; k < r; k++) {
long long t = dp[l*N+k] + dp[(k+1)*N+r] + SZ[l] * SZ[k+1] * SZ[r+1];
if (t < local)
local = t;
}
dp[l*N+r] = local;
}
}
printf("%lld\n", dp[0*N+N-1]);
}
return 0;
}

輸入格式

有多組測資,每組第一行會有一個整數 $N$ 表示矩陣鏈上有 $N$ 個矩陣,第二行上會有 $N+1$ 個整數 $Z_i$,表示矩陣鏈的每一個行列大小,例如當 $N = 3$ 時,輸入 10 30 5 60 表示矩陣 $A_{10, 30} B_{30, 5} C_{5, 60}$ 相乘。

  • $1 \le N \le 4096$
  • $1 \le Z_i \le 4096$

輸出格式

對於每組測資輸出一行一個整數 $M$ 為最少乘法次數。

範例輸入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
2 2 2

3
10 30 5 60

3
1 5 20 1

3
5 10 20 35

6
30 35 15 5 10 20 25

範例輸出

1
2
3
4
5
8
4500
105
4500
15125

Solution

假設有 $p$ 個處理單元,矩陣大小為 $n \times n$,分析一般平行運算時間 $T_p$ 如下所示:

$$\begin{align*} T_p &= \sum\limits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil \times (n - k) \end{align*}$$

針對地 $k$ 次迭代,將第二層迴圈平行化,每一個執行緒處理 $\left\lceil \frac{k}{p} \right\rceil$ 個狀態計算,每個狀態繼續需要 $n-k$ 次計算。

$$\begin{align*} \sum\limits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil \times k &= 1 \times (1 + 2 + 3 + \cdots + p) + \\ & \qquad 2 \times ((p+1) + (p+2) + (p+3) + \cdots + 2p) + \cdots + \\ & \qquad (n \bmod{p}) \times \left\lceil \frac{n}{p}\right\rceil (\lfloor n/p \rfloor \times p + 1 + \cdots + n) \\ &= \sum_{k=1}^{\lfloor n/p \rfloor} k \cdot \frac{p (2k+1) p}{2} + (n \bmod{p}) \times \left\lceil \frac{n}{p}\right\rceil \frac{(n \bmod{p})(n + \left\lfloor \frac{n}{p}\right\rfloor p + 1)}{2} \\ &= p^2 \left[ \frac{\left\lfloor n/p \right\rfloor(\left\lfloor n/p \right\rfloor+1)(2 \left\lfloor n/p \right\rfloor + 1)}{6} + \frac{\left\lfloor n/p \right\rfloor(\left\lfloor n/p \right\rfloor+1)}{4} \right] + (n \bmod{p})^2 \left\lceil n/p \right\rceil \frac{n + \left\lfloor n/p \right\rfloor p + 1}{2} \\ \sum\limits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil \times n &= n \cdot p \cdot \frac{\left\lfloor n/p \right\rfloor (\left\lfloor n/p \right\rfloor + 1)}{2} + n \cdot (n - p \cdot \left\lfloor n/p \right\rfloor) \left\lceil n/p \right\rceil \end{align*}$$

總結一下 $T_p = \sum\nolimits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil \times n - \sum\nolimits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil \times k = O(n^3 / p)$

針對前半段 $\sum\limits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil$ 有以下兩種推法可供參考。

方法一

$$\begin{align*} \sum\limits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil &= p \cdot 1 + p \cdot 2 + p \cdot 3 + \cdots + p \cdot \left\lfloor n/p \right\rfloor + (n \bmod{p}) \cdot \left\lceil n/p \right\rceil \phantom{0\over0}\\ &= \sum\limits_{k=1}^{\left\lfloor n/p \right\rfloor} k \cdot p + (n - p \cdot \left\lfloor n/p \right\rfloor) \left\lceil n/p \right\rceil \\ &= p \cdot \frac{\left\lfloor n/p \right\rfloor (\left\lfloor n/p \right\rfloor + 1)}{2} + (n - p \cdot \left\lfloor n/p \right\rfloor) \left\lceil n/p \right\rceil && \blacksquare \end{align*}$$

方法二

套用定理如下:(解釋: $n$ 個人分成 $p$,每組人數最多差一。)

$$\begin{align} n = \left\lceil \frac{n}{p} \right\rceil + \left\lceil \frac{n-1}{p} \right\rceil + \cdots + \left\lceil \frac{n-p+1}{p} \right\rceil \end{align}$$

套用上式從 $k = n$ 往回推。

$$\begin{align*} \sum\limits_{k=1}^{n} \left\lceil \frac{k}{p} \right\rceil &= \left\lceil \frac{n}{p} \right\rceil + \left\lceil \frac{n-1}{p} \right\rceil + \cdots + \left\lceil \frac{n-p+1}{p} \right\rceil + \sum_{k=1}^{n-p} \left\lceil \frac{k}{p} \right\rceil \\ &= n + (n - p) + (n - 2p) + \cdots + (n - \lfloor n/p \rfloor \cdot p) + \sum_{k=1}^{n \bmod{p}} \left\lceil \frac{k}{p} \right\rceil \\ &= \sum\limits_{k=0}^{\left\lfloor n/p \right\rfloor - 1} (n - k p) + (n \bmod{p}) \\ &= n \left\lfloor n / p \right\rfloor - \frac{\left\lfloor n/p \right\rfloor (\left\lfloor n/p \right\rfloor - 1)}{2} \times p + (n \bmod{p}) && \blacksquare \end{align*}$$

基礎平行

  • Accepted (50883 ms, 132224 KB)

平行地方呼之欲出,針對第二層迴圈直接平行化即可,下述代碼會帶入一點累贅,其原因在於做了各種實驗所致,但不影響正確性。只做了減少 thread 建立時的 overhead 的處理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <stdio.h>
#include <omp.h>
#define MAXN 4096
#define INF (1LL<<60)
int N;
long long dp[MAXN*MAXN], SZ[MAXN+5];
int main() {
while (scanf("%d", &N) == 1) {
for (int i = 0; i <= N; i++)
scanf("%lld", &SZ[i]);
for (int i = 0; i < N; i++)
dp[i*N+i] = 0;
#pragma omp parallel firstprivate(N)
{
for (int i = 1; i <= N; i++) {
#pragma omp for
for (int j = 0; j < N-i; j++) {
int l = j, r = j+i;
long long local = INF;
dp[l*N+r] = INF;
for (int k = l; k < r; k++) {
long long t = dp[l*N+k] + dp[(k+1)*N+r] + SZ[l] * SZ[k+1] * SZ[r+1];
if (t < local)
local = t;
}
if (local < dp[l*N+r])
dp[l*N+r] = local;
}
}
}
printf("%lld\n", dp[0*N+N-1]);
}
return 0;
}

進階平行

  • Accepted (9890 ms, 136320 KB)

從一般平行實驗結果中,發現明顯地加速倍率不對,明明使用 24 個核心,跑起來不到兩倍加速的原因到底在哪?是的,在第三層存取順序需要 dp[l][k]dp[k+1][r] 隨著 $k$ 變大,在前者存取順序是連續的,而在後者存取順序每一次跳躍長度為 $N$,導致 dp[k+1][r] 常常會發生 cache miss,一旦發生 cache miss,需要數百的 cycle 等待資料被帶進記憶體中。

由於矩陣鍊乘積計算時只用了矩陣的上三角,那複製一份到下三角矩陣去吧!dp[l][r] = dp[r][l],如此一來在第三層迴圈發生 cache miss 的機會就大幅度下降。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <stdio.h>
#include <omp.h>
#define MAXN 4096
#define INF (1LL<<60)
int N, SZ[MAXN+5];
long long dp[MAXN*MAXN];
int main() {
while (scanf("%d", &N) == 1) {
for (int i = 0; i <= N; i++)
scanf("%d", &SZ[i]);
for (int i = 0; i < N; i++)
dp[i*N+i] = 0;
#pragma omp parallel firstprivate(N)
{
for (int i = 1; i <= N; i++) {
#pragma omp for
for (int j = 0; j < N-i; j++) {
int l = j, r = j+i;
long long local = INF, base = 1LL * SZ[l] * SZ[r+1];
long long *dp1 = dp + l*N, *dp2 = dp + r*N;
for (int k = l; k < r; k++) {
long long t = dp1[k] + dp2[k+1] + SZ[k+1] * base;
if (t < local)
local = t;
}
dp1[r] = dp2[l] = local;
}
}
}
printf("%lld\n", dp[0*N+N-1]);
}
return 0;
}

忘卻快取平行

  • Accepted (4264 ms, 134400 KB)

  • 參考論文 High-perofrmance Energy-efficient Recursive Dynamic Programming with Matrix-multiplication-like Flexible Kernels

最主要的精神 cache-oblivious algorithm 設計,利用大方陣切割成數個小方陣,矩陣跟矩陣之間在足夠小的情況下進行合併計算。算法概述如下:

cache-oblivious algorithm

每一個函數參數中的藍色區塊答案算出,而其他參數則是要合併的左矩陣和下矩陣,直到計算大小足夠小時,直接跑序列版本的程式,此時所有陣列可以全部帶進快取,這時候計算變得更加有利。再加上很多層的快取,每一個 CPU 的 cache 重複使用率相當高。

一般的平行度最高只有 $N$,因為我們只針對其中一個迴圈平行,而在 cache-oblivious algorithm 中,平行度最高為 $N^{1.415}$。這些都只是理論分析,實作時為了考慮硬體架構是沒辦法達到理想狀態的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#define MAXN 4096
#define MAXD 6
#define MAXC 64
#define INF (1LL<<62)
int N;
long long dp[MAXN*MAXN], SZ[MAXN+5];
typedef struct MtxHead {
long long *A;
int bx, by, n;
} MtxHead;

MtxHead subMtx(MtxHead X, int i, int j) {
MtxHead T = X;
T.n >>= 1;
if (i == 2) T.bx += T.n;
if (j == 2) T.by += T.n;
return T;
}
static inline int min(int x, int y) {
return x < y ? x : y;
}
static inline int max(int x, int y) {
return x > y ? x : y;
}
static inline long long loop(MtxHead X, MtxHead U, MtxHead V, int tl, int tr, int l, int r) {
long long v = INF, t;
long long comSZ = SZ[l] * SZ[r+1];
for (int k = tl; k < tr; k++) {
t = U.A[l*N+k] + V.A[(k+1)+r*N] + SZ[k+1] * comSZ;
if (t < v) v = t;
}
return v;
}
void Cloop(MtxHead X, MtxHead U, MtxHead V) {
int n = X.n, l, r, tl, tr;
long long v, t;

for (int i = n-1; i >= 0; i--) {
for (int j = 0; j < n; j++) {
l = X.bx + i, r = X.by + j;
v = X.A[l*N+r];

tl = max(U.by, X.bx+i), tr = min(U.by+n, X.by+j);
t = loop(X, U, V, tl, tr, l, r);
if (t < v) v = t;

tl = max(V.bx, X.bx+i), tr = min(V.bx+n, X.by+j);
t = loop(X, U, V, tl, tr, l, r);
if (t < v) v = t;

X.A[l*N+r] = X.A[r*N+l] = v;
}
}
}
void Bloop(MtxHead X, MtxHead U, MtxHead V) {
int n = X.n, l, r, tl, tr;
long long v, t;
for (int i = n-1; i >= 0; i--) {
for (int j = 0; j < n; j++) {
l = X.bx + i, r = X.by + j;
v = X.A[l*N+r];

tl = max(U.by+i, X.bx+i), tr = min(U.by+n, X.by+j);
t = loop(X, U, V, tl, tr, l, r);
if (t < v) v = t;

tl = max(V.bx, X.bx+i), tr = min(V.bx+n, X.by+j);
t = loop(X, U, V, tl, tr, l, r);
if (t < v) v = t;

X.A[l*N+r] = X.A[r*N+l] = v;
}
}
}
void Aloop(MtxHead X) {
int n = X.n;
for (int i = 1; i <= n; i++) {
for (int j = 0; j+i < n; j++) {
int l = X.bx + j, r = X.by + j+i;
long long v = X.A[l*N+r], t;
long long comSZ = SZ[l] * SZ[r+1];
for (int k = l; k < r; k++) {
t = X.A[l*N+k] + X.A[(k+1)+r*N] + SZ[k+1] * comSZ;
if (t < v)
v = t;
}
X.A[l*N+r] = X.A[r*N+l] = v;
}
}
}
void Cpar(MtxHead X, MtxHead U, MtxHead V, int dep) {
if (X.n <= MAXC) {
Cloop(X, U, V);
return ;
}
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 1, 1), subMtx(U, 1, 1), subMtx(V, 1, 1), dep+1);
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 1, 2), subMtx(U, 1, 1), subMtx(V, 1, 2), dep+1);
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 2, 1), subMtx(U, 2, 1), subMtx(V, 1, 1), dep+1);
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 2, 2), subMtx(U, 2, 1), subMtx(V, 1, 2), dep+1);
#pragma omp taskwait
//
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 1, 1), subMtx(U, 1, 2), subMtx(V, 2, 1), dep+1);
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 1, 2), subMtx(U, 1, 2), subMtx(V, 2, 2), dep+1);
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 2, 1), subMtx(U, 2, 2), subMtx(V, 2, 1), dep+1);
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 2, 2), subMtx(U, 2, 2), subMtx(V, 2, 2), dep+1);
#pragma omp taskwait
}
void Bpar(MtxHead X, MtxHead U, MtxHead V, int dep) {
if (X.n <= MAXC) {
Bloop(X, U, V);
return ;
}
Bpar(subMtx(X, 2, 1), subMtx(U, 2, 2), subMtx(V, 1, 1), dep+1);
//
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 1, 1), subMtx(U, 1, 2), subMtx(X, 2, 1), dep+1);
#pragma omp task if (dep < MAXD)
Cpar(subMtx(X, 2, 2), subMtx(X, 2, 1), subMtx(V, 1, 2), dep+1);
#pragma omp taskwait
//
#pragma omp task if (dep < MAXD)
Bpar(subMtx(X, 1, 1), subMtx(U, 1, 1), subMtx(V, 1, 1), dep+1);
#pragma omp task if (dep < MAXD)
Bpar(subMtx(X, 2, 2), subMtx(U, 2, 2), subMtx(V, 2, 2), dep+1);
#pragma omp taskwait
//
Cpar(subMtx(X, 1, 2), subMtx(U, 1, 2), subMtx(X, 2, 2), dep+1);
Cpar(subMtx(X, 1, 2), subMtx(X, 1, 1), subMtx(V, 1, 2), dep+1);
Bpar(subMtx(X, 1, 2), subMtx(U, 1, 1), subMtx(V, 2, 2), dep+1);
}
void Apar(MtxHead X, int dep) {
if (X.n <= MAXC) {
Aloop(X);
return ;
}
#pragma omp task if (dep < MAXD)
Apar(subMtx(X, 1, 1), dep+1);
#pragma omp task if (dep < MAXD)
Apar(subMtx(X, 2, 2), dep+1);
#pragma omp taskwait

Bpar(subMtx(X, 1, 2), subMtx(X, 1, 1), subMtx(X, 2, 2), dep+1);
}
void Psmall(int N) {
for (int i = 0; i < N; i++)
dp[i*N+i] = 0;
#pragma omp parallel
{
for (int i = N-1; i > 0; i--) {
int comN = N-i;
#pragma omp for
for (int j = 0; j < i; j++) {
int l = j, r = comN+j;
long long local = INF;
long long *dp1 = dp+l*N, *dp2 = dp+r*N;
long long comSZ = SZ[l] * SZ[r+1];
for (int k = l; k < r; k++) {
long long t = dp1[k] + dp2[(k+1)] + comSZ * SZ[k+1];
if (t < local)
local = t;
}
dp[l*N+r] = dp[r*N+l] = local;
}
}
}
printf("%lld\n", dp[0*N+N-1]);
}
int main() {
while (scanf("%d", &N) == 1) {
for (int i = 0; i <= N; i++)
scanf("%lld", &SZ[i]);
if (N <= 2048) {
Psmall(N);
continue;
}
int ON = N;
while ((N&(-N)) != N)
N++;
for (int i = 0; i < N; i++) {
for (int j = i+1; j < N; j++)
dp[i*N+j] = INF;
dp[i*N+i] = 0;
}
MtxHead X;
X.n = N, X.bx = X.by = 0, X.A = dp;
#pragma omp parallel
{
#pragma omp single
Apar(X, 0);
}
printf("%lld\n", dp[0*N+ON-1]);
}
return 0;
}
Read More +

批改娘 10113. Longest Common Subsequence II (CUDA)

題目描述

給兩個字串 $X, \; Y$,在兩個字串中都有出現且最長的子序列 (subsequence),就是最長共同子字串

輸入格式

有多組測資,每組測資有兩行字串 $X, \; Y$,$X, \; Y$ 只由 A T C G 四個字母構成。

  • $1 \le |X|, |Y| \le 60000$

輸出格式

針對每一組測資,輸出一行 $X, \; Y$ 的最長共同子字串長度以及任何一組最長共同子字串。

範例輸入

1
2
3
4
TCA
GTA
TGGAC
TATCT

範例輸出

1
2
3
4
2
TA
3
TAC

編譯參數

1
$ nvcc -Xcompiler "-O2 -fopenmp" main.cu -o main

Solution

附錄為舊版設計,可以針對 OpenMP 的設計再重新設計 CUDA 版本。

舊版設計根據 CPU 平行與不平行以及 GPU 三種狀態,根據閥值決定要選擇運行哪一種版本。若加上足夠小的情況下,直接用 $O(N^2)$ 表格直接回溯,和 OpenMP 判斷閥值的公式,效能還能在加速個兩到三倍,但整體而言未必比純 OpenMP 還要快,因為有 GPU 啟動的 overhead,要跑夠多的測資才看得出來。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#include <cstdio>
#include <cstdlib>
#include <algorithm>
#include <omp.h>
using namespace std;
#define MAXN 60005
#define CHARSET 4
#define max(x, y) (x) > (y) ? (x) : (y)
typedef unsigned short uint16;
static char A[MAXN], B[MAXN];
static int c2i[128];
static char i2c[CHARSET+1] = "ACGT";
static int *cuP;
static char *cuB;
static uint16 *cuDP;
__global__ void prepare(int *P, char *B, int nb) {
int i = threadIdx.x;
int *p = P + i*MAXN;
char c = "ACGT"[i];
p[0] = 0;
for (int j = 1; j <= nb; j++)
p[j] = (B[j] == c) ? j : p[j-1];
}
__global__ void run(int nb, uint16 *dpIn, uint16 *dpOut, int *P) {
int j = blockDim.x*blockIdx.x+threadIdx.x+1;
if (j > nb) return ;
int pos = P[j];
uint16 t1 = pos ? dpIn[pos-1]+1 : 0;
uint16 t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
int lcs_len_seq(const char *A, int na, const char *B, int nb, uint16 dpf[]) {
static uint16 dp[2][MAXN];
memset(dp[0], 0, sizeof(uint16)*(nb+1));
dp[1][0] = 0, dpf[0] = 0;
for (int i = 1; i <= na; i++) {
for (int j = 1; j <= nb; j++) {
if (A[i-1] == B[j-1])
dp[1][j] = dp[0][j-1]+1;
else
dp[1][j] = max(dp[1][j-1], dp[0][j]);
}
memcpy(dp[0], dp[1], sizeof(uint16)*(nb+1));
}
for (int i = 0; i <= nb; i++)
dpf[i] = dp[0][i];
return dpf[nb];
}
int lcs_len_omp(const char *A, int na, const char *B, int nb, uint16 dpf[]) {
static int P[CHARSET][MAXN];
static uint16 dp[2][MAXN];
A--, B--;
#pragma omp parallel for
for (int i = 0; i < CHARSET; i++) {
memset(P[i], 0, sizeof(int)*(nb+1));
for (int j = 1; j <= nb; j++)
P[i][j] = (B[j] == i2c[i])? j : P[i][j-1];
}
for (int i = 0; i < 2; i++)
memset(dp[i], 0, sizeof(uint16)*(nb+1));
#pragma omp parallel
for (int i = 1; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
#pragma omp for
for (int j = 1; j <= nb; j++) {
int last_match = Pv[j];
uint16 tmp = last_match ? dp[i&1^1][last_match-1]+1 : 0;
dp[i&1][j] = max(dp[i&1^1][j], tmp);
}
}
for (int i = 0; i <= nb; i++)
dpf[i] = dp[na&1][i];
return dpf[nb];
}
int lcs_len(const char *A, int na, const char *B, int nb, uint16 dpf[]) {
if (max(na, nb) < 2048)
return lcs_len_seq(A, na, B, nb, dpf);
if (nb < 10000)
return lcs_len_omp(A, na, B, nb, dpf);
B--;
cudaMemcpy(cuB, B, sizeof(char)*(nb+1), cudaMemcpyHostToDevice);
cudaMemset(cuDP, 0, sizeof(uint16)*(nb+1));
cudaMemset(cuDP+MAXN, 0, sizeof(uint16)*(nb+1));

int bsz = 1024;
dim3 bb(bsz);
dim3 gg((nb+bsz-1)/bsz);
prepare<<<1, 4>>>(cuP, cuB, nb);
for (int i = 0; i < na; i++) {
int v = c2i[A[i]];
run<<<gg, bb>>>(nb, cuDP+(i&1)*MAXN, cuDP+((i&1)^1)*MAXN, cuP+v*MAXN);
}
cudaMemcpy(dpf, cuDP+(na&1)*MAXN, sizeof(uint16)*(nb+1), cudaMemcpyDeviceToHost);
return dpf[nb];
}
char* alloc_str(int sz) {
return (char *) calloc(sz, sizeof(char));
}
char* substr(const char *s, int pos, int len) {
char *t = alloc_str(len+1);
memcpy(t, s+pos, len);
return t;
}
char* cat(const char *sa, const char *sb) {
int na = strlen(sa), nb = strlen(sb);
char *t = alloc_str(na + nb + 1);
memcpy(t, sa, na);
memcpy(t+na, sb, nb);
return t;
}
char* reverse(const char *s, int len) {
char *t = substr(s, 0, len);
char *l = t, *r = t + len - 1;
char tmp;
while (l < r) {
tmp = *l, *l = *r, *r = tmp;
l++, r--;
}
return t;
}
char* find_lcs(const char *a, int na, const char *b, int nb) {
if (na > nb) {
const char *c; int t;
c = a, a = b, b = c;
t = na, na = nb, nb = t;
}

if (na == 0)
return alloc_str(1);

if (na == 1) {
for (int i = 0; i < nb; i++) {
if (a[0] == b[i])
return substr(a, 0, 1);
}
return alloc_str(1);
}

static uint16 t1[MAXN];
static uint16 t2[MAXN];
int len = lcs_len(a, na, b, nb, t1);
if (len == 0)
return alloc_str(1);
int half_len = na / 2;
char *la = substr(a, 0, half_len);
char *ra = substr(a, half_len, na - half_len);
char *tb = reverse(b, nb);
char *ta = reverse(ra, na - half_len);
lcs_len(la, half_len, b, nb, t1);
lcs_len(ta, na - half_len, tb, nb, t2);

int split = -1;
for (int i = 0; i <= nb; i++) {
if (t1[i] + t2[nb-i] == len) {
split = i;
break;
}
}

char *lb = substr(b, 0, split);
char *rb = substr(b, split, nb - split);
char *sl = find_lcs(la, half_len, lb, split);
char *sr = find_lcs(ra, na - half_len, rb, nb - split);
char *ret = cat(sl, sr);
free(la), free(ra), free(ta);
free(lb), free(rb), free(tb);
free(sl), free(sr);
return ret;
}
int main() {
for (int i = 0; i < CHARSET; i++)
c2i[i2c[i]] = i;
cudaMalloc(&cuB, sizeof(char)*MAXN);
cudaMalloc(&cuP, sizeof(int)*MAXN*4);
cudaMalloc(&cuDP, sizeof(uint16)*MAXN*2);
static uint16 dpf[MAXN];
while (scanf("%s %s", A, B) == 2) {
int na = strlen(A);
int nb = strlen(B);
int len = lcs_len(A, na, B, nb, dpf);
char *str = find_lcs(A, na, B, nb);
printf("%d\n", len);
printf("%s\n", str);
free(str);
}
cudaFree(cuB);
cudaFree(cuP);
cudaFree(cuDP);
return 0;
}
Read More +

批改娘 10112. Longest Common Subsequence (CUDA)

題目描述

給兩個字串 $X, \; Y$,在兩個字串中都有出現且最長的子序列 (subsequence),就是最長共同子字串

輸入格式

有多組測資,每組測資有兩行字串 $X, \; Y$,$X, \; Y$ 只由 A T C G 四個字母構成。

  • $1 \le |X|, |Y| \le 60000$

輸出格式

針對每一組測資,輸出一行 $X, \; Y$ 的最長共同子字串長度。

範例輸入

1
2
3
4
TCA
GTA
TGGAC
TATCT

範例輸出

1
2
2
3

編譯參數

1
$ nvcc -Xcompiler "-O2 -fopenmp" main.cu -o main

Solution

相比於 OpenMP 版本,由於 GPU 的快取設計不如 CPU,在我們使用失敗指針優化 branch 的發生,對 GPU 反而還是慢的,因為要存取 global memory,這使得還不如發生 branch,讓 warp 分多次跑反而比較快,因為有足夠多的 warp 在一個 block。在優化程度上,CPU 和 GPU 還是得分開解決。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <cstdio>
#include <cstdlib>
#define MAXN 60005
#define CHARSET 4
typedef unsigned short uint16;
static char A[MAXN], B[MAXN];
static int c2i[128];
static char i2c[CHARSET+1] = "ACGT";
static int *cuP;
static char *cuB;
static uint16 *cuDP;
__global__ void prepare(int *P, char *B, int nb) {
int i = threadIdx.x;
int *p = P + i*MAXN;
char c = "ACGT"[i];
p[0] = 0;
for (int j = 1; j <= nb; j++)
p[j] = (B[j] == c) ? j : p[j-1];
}
__global__ void run(int nb, uint16 *dpIn, uint16 *dpOut, int *P) {
int j = blockDim.x*blockIdx.x+threadIdx.x+1;
if (j > nb) return ;
int pos = P[j];
uint16 t1 = pos ? dpIn[pos-1]+1 : 0;
uint16 t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
int lcs_len(const char *A, int na, const char *B, int nb, uint16 dpf[]) {
B--;
cudaMemcpy(cuB, B, sizeof(char)*(nb+1), cudaMemcpyHostToDevice);
cudaMemset(cuDP, 0, sizeof(uint16)*(nb+1));
cudaMemset(cuDP+MAXN, 0, sizeof(uint16)*(nb+1));

int bsz = 1024;
dim3 bb(bsz);
dim3 gg((nb+bsz-1)/bsz);
prepare<<<1, 4>>>(cuP, cuB, nb);
for (int i = 0; i < na; i++) {
int v = c2i[A[i]];
run<<<gg, bb>>>(nb, cuDP+(i&1)*MAXN, cuDP+((i&1)^1)*MAXN, cuP+v*MAXN);
}
cudaMemcpy(dpf, cuDP+(na&1)*MAXN, sizeof(uint16)*(nb+1), cudaMemcpyDeviceToHost);
return dpf[nb];
}
int main() {
for (int i = 0; i < CHARSET; i++)
c2i[i2c[i]] = i;
cudaMalloc(&cuB, sizeof(char)*MAXN);
cudaMalloc(&cuP, sizeof(int)*MAXN*4);
cudaMalloc(&cuDP, sizeof(uint16)*MAXN*2);
static uint16 dpf[MAXN];
while (scanf("%s %s", A, B) == 2) {
int len = lcs_len(A, strlen(A), B, strlen(B), dpf);
printf("%d\n", len);
}
cudaFree(cuB);
cudaFree(cuP);
cudaFree(cuDP);
return 0;
}
Read More +

批改娘 10111. Longest Common Subsequence II (OpenMP)

題目描述

給兩個字串 $X, \; Y$,在兩個字串中都有出現且最長的子序列 (subsequence),就是最長共同子字串

輸入格式

有多組測資,每組測資有兩行字串 $X, \; Y$,$X, \; Y$ 只由 A T C G 四個字母構成。

  • $1 \le |X|, |Y| \le 60000$

輸出格式

針對每一組測資,輸出一行 $X, \; Y$ 的最長共同子字串長度以及任何一組最長共同子字串。

範例輸入

1
2
3
4
TCA
GTA
TGGAC
TATCT

範例輸出

1
2
3
4
2
TA
3
TAC

Solution

雖然我們都知道數據小時,由於有 $O(N^2)$ 大小的表可以協助回溯找解,但當 $N$ 非常大時,就無法儲存這張表。因此,可以採用分治算法找到這一張表,也就是所謂的 Hirschberg’s algorithm,相關的 demo 在此。

時間複雜度為 $O(N \log N)$,空間複雜度為 $O(N)$,平行部分可以採用 fine-grain 設計,那麼就會有不斷建立 thread 的 overhead,但更容易處於負載平衡 (workload balance)。另一種則是在遞迴分治下,拆成好幾個 thread 完成各自部分,這樣比較容易觸發 cache 的性質。

從實作中,fine-grain 比 coarse-grain 好上許多。

fine-grain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <omp.h>
#define MAXN 60005
#define MAXSEQ 500
#define CHARSET 4
#define max(x, y) (x) > (y) ? (x) : (y)
static __thread int c2i[128] = {['A'] = 0, ['C'] = 1, ['G'] = 2, ['T'] = 3};
int lcs_len(const char *A, int na, const char *B, int nb, int inv_flag, int dpf[]) {
static int P[CHARSET][MAXN];
int dp[2][MAXN] = {};
A--, B--;
if (!inv_flag) {
#pragma omp parallel for
for (int i = 0; i < CHARSET; i++) {
P[i][0] = nb+1;
for (int j = 1; j <= nb; j++)
P[i][j] = (B[j] == "ACGT"[i]) ? j-1 : P[i][j-1];
}
for (int i = 0; i < 2; i++)
dp[i][nb+1] = -1;
#pragma omp parallel
for (int i = 1; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
int *dpIn = dp[i&1^1];
int *dpOut = dp[i&1];
#pragma omp for
for (int j = 1; j <= nb; j++) {
int t1 = dpIn[Pv[j]]+1;
int t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
}
memcpy(dpf, dp[na&1], sizeof(int)*(nb+1));
dpf[nb+1] = dpf[0] = 0;
return dpf[nb];
}
// inverse version
#pragma omp parallel for
for (int i = 0; i < CHARSET; i++) {
P[i][nb+1] = 0;
for (int j = nb; j >= 1; j--)
P[i][j] = (B[j] == "ACGT"[i]) ? j+1 : P[i][j+1];
}
for (int i = 0; i < 2; i++)
dp[i][0] = -1;
#pragma omp parallel
for (int i = na; i >= 1; i--) {
int *Pv = P[c2i[A[i]]];
int *dpIn = dp[i&1^1];
int *dpOut = dp[i&1];
#pragma omp for
for (int j = 1; j <= nb; j++) {
int t1 = dpIn[Pv[j]]+1;
int t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
}
memcpy(dpf, dp[1&1], sizeof(int)*(nb+1));
dpf[nb+1] = dpf[0] = 0;
return dpf[nb];
}

char* alloc_str(int sz) {
return (char *) calloc(sz, sizeof(char));
}
char* substr(const char *s, int pos, int len) {
char *t = alloc_str(len+1);
memcpy(t, s+pos, len);
return t;
}
char* cat(const char *sa, const char *sb) {
int na = strlen(sa), nb = strlen(sb);
char *t = alloc_str(na + nb + 1);
memcpy(t, sa, na);
memcpy(t+na, sb, nb);
return t;
}
char* find_lcs_seq(const char *A, int na, const char *B, int nb) {
static int P[CHARSET][MAXSEQ];
static char fa[MAXSEQ][MAXSEQ];
int dp[2][MAXSEQ] = {};
A--, B--;
for (int i = 0; i < CHARSET; i++) {
P[i][0] = nb+1;
for (int j = 1; j <= nb; j++)
P[i][j] = (B[j] == "ACGT"[i]) ? j-1 : P[i][j-1];
}
for (int i = 0; i < 2; i++)
dp[i][nb+1] = -1;
for (int i = 1; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
int *dpIn = dp[i&1^1];
int *dpOut = dp[i&1];
for (int j = 1; j <= nb; j++) {
int t1 = dpIn[Pv[j]]+1;
int t2 = dpIn[j];
if (t1 > t2)
dpOut[j] = t1, fa[i][j] = 0;
else
dpOut[j] = t2, fa[i][j] = 1;
}
}
int sz = dp[na&1][nb];
char *ret = alloc_str(sz+1);
for (int i = na, j = nb; sz && i >= 1; i--) {
if (fa[i][j] == 0)
ret[--sz] = A[i], j = P[c2i[A[i]]][j];
}
return ret;
}
char* find_lcs(const char *a, int na, const char *b, int nb) {
if (na > nb) {
const char *c; int t;
c = a, a = b, b = c;
t = na, na = nb, nb = t;
}

if (na == 0)
return alloc_str(1);
if (na < MAXSEQ && nb < MAXSEQ)
return find_lcs_seq(a, na, b, nb);

int t1[MAXN];
int t2[MAXN];
int len = -1;
int half_len = na / 2;
char *la = substr(a, 0, half_len);
char *ra = substr(a, half_len, na - half_len);
lcs_len(la, half_len, b, nb, 0, t1);
lcs_len(ra, na - half_len, b, nb, 1, t2);

int split = -1;
for (int i = 0; i <= nb; i++) {
if (t1[i] + t2[i+1] > len)
split = i, len = t1[i] + t2[i+1];
}
if (len == 0)
return alloc_str(1);
assert(split != -1);
char *lb = substr(b, 0, split);
char *rb = substr(b, split, nb - split);
char *sl = t1[split] ? find_lcs(la, half_len, lb, split) : alloc_str(1);
char *sr = t2[split+1] ? find_lcs(ra, na - half_len, rb, nb - split) : alloc_str(1);
char *ret = cat(sl, sr);
free(la), free(ra);
free(lb), free(rb);
free(sl), free(sr);
return ret;
}
int main() {
static char A[MAXN], B[MAXN];
int dp[MAXN];
while (scanf("%s %s", A, B) == 2) {
int na = strlen(A);
int nb = strlen(B);
char *str = find_lcs(A, na, B, nb);
printf("%d\n", strlen(str));
printf("%s\n", str);
free(str);
}
return 0;
}

coarse-grain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <omp.h>
#define MAXN 60005
#define CHARSET 4
#define max(x, y) (x) > (y) ? (x) : (y)
typedef unsigned short uint16;
int lcs_len_seq(const char *A, int na, const char *B, int nb, int dpf[]) {
uint16 dp[2][MAXN];
memset(dp[0], 0, sizeof(uint16)*(nb+1));
dp[1][0] = 0;
for (int i = 1; i <= na; i++) {
for (int j = 1; j <= nb; j++) {
if (A[i-1] == B[j-1])
dp[1][j] = dp[0][j-1]+1;
else
dp[1][j] = max(dp[1][j-1], dp[0][j]);
}
memcpy(dp[0], dp[1], sizeof(uint16)*(nb+1));
}
for (int i = 0; i <= nb; i++)
dpf[i] = dp[0][i];
return dpf[nb];
}
int lcs_len(const char *A, int na, const char *B, int nb, int dpf[]) {
if (nb < 256)
return lcs_len_seq(A, na, B, nb, dpf);
int c2i[128] = {['A'] = 0, ['C'] = 1, ['G'] = 2, ['T'] = 3};
char i2c[CHARSET] = {'A', 'C', 'G', 'T'};
int P[CHARSET][MAXN];
uint16 dp[2][MAXN];
A--, B--;
for (int i = 0; i < CHARSET; i++) {
P[i][0] = nb+1;
for (int j = 1; j <= nb; j++)
P[i][j] = (B[j] == i2c[i]) ? j-1 : P[i][j-1];
}
for (int i = 0; i < 2; i++) {
memset(dp[i], 0, sizeof(uint16)*(nb+1));
dp[i][nb+1] = -1;
}
for (int i = 1; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
uint16 *dpIn = dp[i&1^1];
uint16 *dpOut = dp[i&1];
for (int j = 1; j <= nb; j++) {
uint16 t1 = dpIn[Pv[j]]+1;
uint16 t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
}
for (int i = 0; i <= nb; i++)
dpf[i] = dp[na&1][i];
return dpf[nb];
}

char* alloc_str(int sz) {
return (char *) calloc(sz, sizeof(char));
}
char* substr(const char *s, int pos, int len) {
char *t = alloc_str(len+1);
memcpy(t, s+pos, len);
return t;
}
char* cat(const char *sa, const char *sb) {
int na = strlen(sa), nb = strlen(sb);
char *t = alloc_str(na + nb + 1);
memcpy(t, sa, na);
memcpy(t+na, sb, nb);
return t;
}
char* reverse(const char *s, int len) {
char *t = substr(s, 0, len);
char *l = t, *r = t + len - 1;
char tmp;
while (l < r) {
tmp = *l, *l = *r, *r = tmp;
l++, r--;
}
return t;
}

char* find_lcs(const char *a, int na, const char *b, int nb, int dep) {
if (na > nb) {
const char *c; int t;
c = a, a = b, b = c;
t = na, na = nb, nb = t;
}

if (na == 0)
return alloc_str(1);

if (na == 1) {
for (int i = 0; i < nb; i++) {
if (a[0] == b[i])
return substr(a, 0, 1);
}
return alloc_str(1);
}

int t1[MAXN];
int t2[MAXN];
int len = -1;
int half_len = na / 2;
char *la = substr(a, 0, half_len);
char *ra = substr(a, half_len, na - half_len);
char *tb = reverse(b, nb);
char *ta = reverse(ra, na - half_len);

#pragma omp task untied shared(t1)
lcs_len(la, half_len, b, nb, t1);
#pragma omp task untied shared(t2)
lcs_len(ta, na - half_len, tb, nb, t2);
#pragma omp taskwait

int split = -1;
for (int i = 0; i <= nb; i++) {
if (t1[i] + t2[nb-i] > len)
split = i, len = t1[i] + t2[nb-i];
}
if (len == 0)
return alloc_str(1);
char *lb = substr(b, 0, split);
char *rb = substr(b, split, nb - split);
char *sl, *sr;

#pragma omp task untied shared(sl)
sl = find_lcs(la, half_len, lb, split, dep+1);
#pragma omp task untied shared(sr)
sr = find_lcs(ra, na - half_len, rb, nb - split, dep+1);
#pragma omp taskwait

char *ret = cat(sl, sr);
free(la), free(ra), free(ta);
free(lb), free(rb), free(tb);
free(sl), free(sr);
return ret;
}
int main() {
static char A[MAXN], B[MAXN];
int dp[MAXN];
while (scanf("%s %s", A, B) == 2) {
int na = strlen(A);
int nb = strlen(B);
char *str;
#pragma omp parallel
{
#pragma omp single
str = find_lcs(A, na, B, nb, 0);
}
printf("%d\n", strlen(str));
printf("%s\n", str);
free(str);
}
return 0;
}
Read More +

批改娘 10110. Longest Common Subsequence (OpenMP)

題目描述

給兩個字串 $X, \; Y$,在兩個字串中都有出現且最長的子序列 (subsequence),就是最長共同子字串

輸入格式

有多組測資,每組測資有兩行字串 $X, \; Y$,$X, \; Y$ 只由 A T C G 四個字母構成。

  • $1 \le |X|, |Y| \le 60000$

輸出格式

針對每一組測資,輸出一行 $X, \; Y$ 的最長共同子字串長度。

範例輸入

1
2
3
4
TCA
GTA
TGGAC
TATCT

範例輸出

1
2
2
3

Solution

由於同學在課堂中提及這個應用,然後實作的效能稍微感到懷疑,再加上這一題的難度跟演算法上相當簡單,難的地方在於如何解決 DP 平行。

一般平行化

從遞迴公式來看,

$$\begin{align*} dp[x][y] = \left\{\begin{matrix} dp[x-1][y-1] + 1 & \text{if } A_x = B_x \\ \max(dp[x-1][y],dp[x][y-1]) & \text{otherwise}\\ \end{matrix}\right. \end{align*}$$

唯一的解決方案就是把紀錄矩陣旋轉 45 度,並且搭配滾動數組,需要保留三排進行完成遞迴轉移。

然而,在旋轉 45 度後,平行度在每一列是呈現波形,逐漸變大後又逐漸變小,這導致快取效果不是很好,當分配在不同的 CPU 上時,他們之間先前代入的資料可能不是這次所需,因為變大變小的原因,分配的區段不與之前重疊,傳到另一個 CPU 上平行的效率非常低落,若是在數個 core 在同一個 CPU 上,就能不掉出 L3 cache,但平行度也因此受到限制。

在我們實驗主機上,一個 CPU 總共有 6 個 core,加速最多 6 倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <cstdio>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <omp.h>
using namespace std;
const int MAXN = 65536;
static char A[MAXN], B[MAXN];
#define DP_TYPE unsigned short
int lcs_len(const char *A, int na, const char *B, int nb, int dpf[]) {
if (na > nb)
swap(A, B), swap(na, nb);
static DP_TYPE dp[3][MAXN<<1];
for (int i = 0; i < 3; i++)
memset(dp[i], 0, sizeof(DP_TYPE)*(nb+na+2));
memset(dpf, 0, sizeof(DP_TYPE)*(nb+1));
omp_set_num_threads(4);
int last_l = 0, last_r = 0;
for (int i = 0; i < na+nb-1; i++) {
int l = max(0, i-na+1), r = min(i, nb-1);
#pragma omp parallel for schedule(static) firstprivate(na, A, B)
for (int j = l; j <= r; j++) {
int x = i-j, y = j;
if (A[x] == B[y])
dp[2][j+1] = dp[0][j]+1;
else
dp[2][j+1] = dp[1][j] > dp[1][j+1] ? dp[1][j] : dp[1][j+1];
}
if (i-l == na-1)
dpf[l+1] = dp[2][l+1];
memcpy(dp[0]+last_l+1, dp[1]+last_l+1, sizeof(DP_TYPE)*(last_r-last_l+1));
memcpy(dp[1]+l+1, dp[2]+l+1, sizeof(DP_TYPE)*(r-l+1));
last_l = l, last_r = r;
}
return dpf[nb];
}
int main() {
int dp[MAXN];
while (scanf("%s %s", A, B) == 2) {
string a = A, b = B;
int len = lcs_len(a.c_str(), strlen(A), b.c_str(), strlen(B), dp);
printf("%d\n", len);
}
return 0;
}

高速平行化

  • 參考論文 An Efficient Parallel Algorithm for Longest Common Subsequence Problem on GPUs

感謝 R04922075 古耕竹同學提供相關資訊,將這一題越推越快。

在論文中,他將原先的 LCS 的遞迴公式改變,多一個額外的輔助數組,其輔助數組的空間大小為 $O(|\alpha|\cdot N)$。仍然 $dp[i][j]$ 為字串 $A$ 前 $i$ 個字元與字串 $B$ 前 $j$ 個字元的最常共同子字串長度。藉由輔助數組 $P[c][j]$ 為上一個字串 $c$ 出現在位置 $j$ 之前的位置在哪。

遞迴公式改為 (這裡我們先忽略邊界條件):

$$dp[x][y] = \max(dp[x-1][y],dp[x-1][P[A[x]][y]-1]+1)$$

從最優子結構分析,分成最後 $A[x]$ 是否與 $B[y]$ 匹配,在不匹配的情況下選擇 $dp[x-1][y]$,匹配的情況下選擇 $dp[x-1][P[A[x]][y]-1]+1$。

下述提及的特殊陣列宣告方式,採用 linux 常用的 -std=c99 標準贊助

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <string.h>
#include <omp.h>
#define MAXN 60005
#define CHARSET 4
typedef unsigned short uint16;
static char A[MAXN], B[MAXN];
int lcs_len(const char *A, int na, const char *B, int nb, int dpf[]) {
static int c2i[128] = {['A'] = 0, ['C'] = 1, ['G'] = 2, ['T'] = 3};
static char i2c[CHARSET] = {'A', 'C', 'G', 'T'};
static int P[CHARSET][MAXN] = {};
static uint16 dp[2][MAXN];
A--, B--;
#pragma omp parallel for
for (int i = 0; i < CHARSET; i++) {
for (int j = 1; j <= nb; j++)
P[i][j] = (B[j] == i2c[i])? j : P[i][j-1];
}
for (int i = 0; i < 2; i++)
memset(dp[i], 0, sizeof(uint16)*(nb+1));
#pragma omp parallel
for (int i = 1; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
uint16 *dpIn = dp[i&1^1];
uint16 *dpOut = dp[i&1];
#pragma omp for
for (int j = 1; j <= nb; j++) {
int last_match = Pv[j];
uint16 t1 = last_match ? dpIn[last_match-1]+1 : 0;
uint16 t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
}
for (int i = 0; i <= nb; i++)
dpf[i] = dp[na&1][i];
return dpf[nb];
}
int main() {
int dp[MAXN];
while (scanf("%s %s", A, B) == 2) {
int len = lcs_len(A, strlen(A), B, strlen(B), dp);
printf("%d\n", len);
}
return 0;
}

最終優化

這部分由高等編譯器課程獨家贊助 Advanced Compiler Support

由於 OpenMP 有很多 shared memory 存取,導致在平行區塊儘管開了 -O2 優化,很多表達式都要重複計算,沒辦法像一般序列化程式,可以讓編譯器理解到要做 Strength reduction 或者是 Common subexpression elimination,因此由程序員自己將 C 寫得接近組合語言,這樣效能才會達到最好。

因此在遞迴公式中,經常存取二維陣列,假設不會重疊或相同的情況,直接用兩個指針維護,意即在 inner loop 常常呼叫 ... += dp[i][j] ... 的話,改成 ... += dpI[j] ...,就可以減少 i * SIZE2D 的 offset 計算。同理找到一些 loop invariant,如 A[x],將變數往前提,就能減少 share memory 存取次數。

那為了處理邊界條件,可能會需要一些 branch 完成,這裡採用前處理和失敗指針的方式避開 branch 發生,這樣效能又有小幅度地上升。若在 intel CPU 上,可以透過 intel Parallel Studio XE 2016 下的 Analyzers 中其中一個 VTune Amplifier 調校看出。

這樣還不是極限,甚至可以加入 static __thread 或者利用 #pragma omp threadprivate(var) 進行 copy optimization 來提高資料局部性 (data local locality),但這會牽涉到 cache size,屬於 machine dependency 的相關優化,加或不加要看機器硬體情況。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <stdio.h>
#include <string.h>
#include <omp.h>
#define MAXN 60005
#define CHARSET 4
typedef unsigned short uint16;
static char A[MAXN], B[MAXN];
int lcs_len(const char *A, int na, const char *B, int nb, int dpf[]) {
static int c2i[128] = {['A'] = 0, ['C'] = 1, ['G'] = 2, ['T'] = 3};
static char i2c[CHARSET] = {'A', 'C', 'G', 'T'};
static int P[CHARSET][MAXN] = {};
static uint16 dp[2][MAXN];
A--, B--;
#pragma omp parallel for
for (int i = 0; i < CHARSET; i++) {
P[i][0] = nb+1;
for (int j = 1; j <= nb; j++)
P[i][j] = (B[j] == i2c[i]) ? j-1 : P[i][j-1];
}
for (int i = 0; i < 2; i++) {
memset(dp[i], 0, sizeof(uint16)*(nb+1));
dp[i][nb+1] = -1;
}
#pragma omp parallel
for (int i = 1; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
uint16 *dpIn = dp[i&1^1];
uint16 *dpOut = dp[i&1];
#pragma omp for
for (int j = 1; j <= nb; j++) {
uint16 t1 = dpIn[Pv[j]]+1;
uint16 t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
}
for (int i = 0; i <= nb; i++)
dpf[i] = dp[na&1][i];
return dpf[nb];
}
int main() {
int dp[MAXN];
while (scanf("%s %s", A, B) == 2) {
int len = lcs_len(A, strlen(A), B, strlen(B), dp);
printf("%d\n", len);
}
return 0;
}
Read More +