題目描述 給兩個字串 $X, \; Y$,在兩個字串中都有出現且最長的子序列 (subsequence),就是最長共同子字串 。
輸入格式 有多組測資,每組測資有兩行字串 $X, \; Y$,$X, \; Y$ 只由 A
T
C
G
四個字母構成。
$1 \le |X|, |Y| \le 60000$
輸出格式 針對每一組測資,輸出一行 $X, \; Y$ 的最長共同子字串長度。
範例輸入
範例輸出
Solution 由於同學在課堂中提及這個應用,然後實作的效能稍微感到懷疑,再加上這一題的難度跟演算法上相當簡單,難的地方在於如何解決 DP 平行。
一般平行化 從遞迴公式來看,
$$\begin{align*}
dp[x][y] = \left\{\begin{matrix}
	dp[x-1][y-1] + 1 & \text{if } A_x = B_x \\
	\max(dp[x-1][y],dp[x][y-1]) & \text{otherwise}\\
\end{matrix}\right.
\end{align*}$$
唯一的解決方案就是把紀錄矩陣旋轉 45 度,並且搭配滾動數組,需要保留三排進行完成遞迴轉移。
然而,在旋轉 45 度後,平行度在每一列是呈現波形,逐漸變大後又逐漸變小,這導致快取效果不是很好,當分配在不同的 CPU 上時,他們之間先前代入的資料可能不是這次所需,因為變大變小的原因,分配的區段不與之前重疊,傳到另一個 CPU 上平行的效率非常低落,若是在數個 core 在同一個 CPU 上,就能不掉出 L3 cache,但平行度也因此受到限制。
在我們實驗主機上,一個 CPU 總共有 6 個 core,加速最多 6 倍。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <cstdio>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <omp.h>
using namespace std ;
const int MAXN = 65536 ;
static char A[MAXN], B[MAXN];
#define DP_TYPE unsigned short
int lcs_len (const char *A, int na, const char *B, int nb, int dpf[]) {
if (na > nb)
swap(A, B), swap(na, nb);
static DP_TYPE dp[3 ][MAXN<<1 ];
for (int i = 0 ; i < 3 ; i++)
memset (dp[i], 0 , sizeof (DP_TYPE)*(nb+na+2 ));
memset (dpf, 0 , sizeof (DP_TYPE)*(nb+1 ));
omp_set_num_threads(4 );
int last_l = 0 , last_r = 0 ;
for (int i = 0 ; i < na+nb-1 ; i++) {
int l = max(0 , i-na+1 ), r = min(i, nb-1 );
#pragma omp parallel for schedule(static) firstprivate(na, A, B)
for (int j = l; j <= r; j++) {
int x = i-j, y = j;
if (A[x] == B[y])
dp[2 ][j+1 ] = dp[0 ][j]+1 ;
else
dp[2 ][j+1 ] = dp[1 ][j] > dp[1 ][j+1 ] ? dp[1 ][j] : dp[1 ][j+1 ];
}
if (i-l == na-1 )
dpf[l+1 ] = dp[2 ][l+1 ];
memcpy (dp[0 ]+last_l+1 , dp[1 ]+last_l+1 , sizeof (DP_TYPE)*(last_r-last_l+1 ));
memcpy (dp[1 ]+l+1 , dp[2 ]+l+1 , sizeof (DP_TYPE)*(r-l+1 ));
last_l = l, last_r = r;
}
return dpf[nb];
}
int main () {
int dp[MAXN];
while (scanf ("%s %s" , A, B) == 2 ) {
string a = A, b = B;
int len = lcs_len(a.c_str(), strlen (A), b.c_str(), strlen (B), dp);
printf ("%d\n" , len);
}
return 0 ;
}
高速平行化
參考論文 An Efficient Parallel Algorithm for Longest Common Subsequence Problem on GPUs
感謝 R04922075 古耕竹同學提供相關資訊,將這一題越推越快。
在論文中,他將原先的 LCS 的遞迴公式改變,多一個額外的輔助數組,其輔助數組的空間大小為 $O(|\alpha|\cdot N)$ 。仍然 $dp[i][j]$ 為字串 $A$ 前 $i$ 個字元與字串 $B$ 前 $j$ 個字元的最常共同子字串長度。藉由輔助數組 $P[c][j]$ 為上一個字串 $c$ 出現在位置 $j$ 之前的位置在哪。
遞迴公式改為 (這裡我們先忽略邊界條件):
$$dp[x][y] =
	\max(dp[x-1][y],dp[x-1][P[A[x]][y]-1]+1)$$
從最優子結構分析,分成最後 $A[x]$ 是否與 $B[y]$ 匹配,在不匹配的情況下選擇 $dp[x-1][y]$,匹配的情況下選擇 $dp[x-1][P[A[x]][y]-1]+1$。
下述提及的特殊陣列宣告方式,採用 linux 常用的 -std=c99 標準贊助
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <string.h>
#include <omp.h>
#define MAXN 60005
#define CHARSET 4
typedef unsigned short uint16;
static char A[MAXN], B[MAXN];
int lcs_len (const char *A, int na, const char *B, int nb, int dpf[]) {
static int c2i[128 ] = {['A' ] = 0 , ['C' ] = 1 , ['G' ] = 2 , ['T' ] = 3 };
static char i2c[CHARSET] = {'A' , 'C' , 'G' , 'T' };
static int P[CHARSET][MAXN] = {};
static uint16 dp[2 ][MAXN];
A--, B--;
#pragma omp parallel for
for (int i = 0 ; i < CHARSET; i++) {
for (int j = 1 ; j <= nb; j++)
P[i][j] = (B[j] == i2c[i])? j : P[i][j-1 ];
}
for (int i = 0 ; i < 2 ; i++)
memset (dp[i], 0 , sizeof (uint16)*(nb+1 ));
#pragma omp parallel
for (int i = 1 ; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
uint16 *dpIn = dp[i&1 ^1 ];
uint16 *dpOut = dp[i&1 ];
#pragma omp for
for (int j = 1 ; j <= nb; j++) {
int last_match = Pv[j];
uint16 t1 = last_match ? dpIn[last_match-1 ]+1 : 0 ;
uint16 t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
}
for (int i = 0 ; i <= nb; i++)
dpf[i] = dp[na&1 ][i];
return dpf[nb];
}
int main () {
int dp[MAXN];
while (scanf ("%s %s" , A, B) == 2 ) {
int len = lcs_len(A, strlen (A), B, strlen (B), dp);
printf ("%d\n" , len);
}
return 0 ;
}
最終優化
這部分由高等編譯器課程獨家贊助 Advanced Compiler Support
由於 OpenMP 有很多 shared memory 存取,導致在平行區塊儘管開了 -O2
優化,很多表達式都要重複計算,沒辦法像一般序列化程式,可以讓編譯器理解到要做 Strength reduction 或者是 Common subexpression elimination,因此由程序員自己將 C 寫得接近組合語言,這樣效能才會達到最好。
因此在遞迴公式中,經常存取二維陣列,假設不會重疊或相同的情況,直接用兩個指針維護,意即在 inner loop 常常呼叫 ... += dp[i][j] ...
的話,改成 ... += dpI[j] ...
,就可以減少 i * SIZE2D
的 offset 計算。同理找到一些 loop invariant,如 A[x]
,將變數往前提,就能減少 share memory 存取次數。
那為了處理邊界條件,可能會需要一些 branch 完成,這裡採用前處理和失敗指針的方式避開 branch 發生,這樣效能又有小幅度地上升。若在 intel CPU 上,可以透過 intel Parallel Studio XE 2016 下的 Analyzers 中其中一個 VTune Amplifier 調校看出。
這樣還不是極限,甚至可以加入 static __thread
或者利用 #pragma omp threadprivate(var)
進行 copy optimization 來提高資料局部性 (data local locality),但這會牽涉到 cache size,屬於 machine dependency 的相關優化,加或不加要看機器硬體情況。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <stdio.h>
#include <string.h>
#include <omp.h>
#define MAXN 60005
#define CHARSET 4
typedef unsigned short uint16;
static char A[MAXN], B[MAXN];
int lcs_len (const char *A, int na, const char *B, int nb, int dpf[]) {
static int c2i[128 ] = {['A' ] = 0 , ['C' ] = 1 , ['G' ] = 2 , ['T' ] = 3 };
static char i2c[CHARSET] = {'A' , 'C' , 'G' , 'T' };
static int P[CHARSET][MAXN] = {};
static uint16 dp[2 ][MAXN];
A--, B--;
#pragma omp parallel for
for (int i = 0 ; i < CHARSET; i++) {
P[i][0 ] = nb+1 ;
for (int j = 1 ; j <= nb; j++)
P[i][j] = (B[j] == i2c[i]) ? j-1 : P[i][j-1 ];
}
for (int i = 0 ; i < 2 ; i++) {
memset (dp[i], 0 , sizeof (uint16)*(nb+1 ));
dp[i][nb+1 ] = -1 ;
}
#pragma omp parallel
for (int i = 1 ; i <= na; i++) {
int *Pv = P[c2i[A[i]]];
uint16 *dpIn = dp[i&1 ^1 ];
uint16 *dpOut = dp[i&1 ];
#pragma omp for
for (int j = 1 ; j <= nb; j++) {
uint16 t1 = dpIn[Pv[j]]+1 ;
uint16 t2 = dpIn[j];
dpOut[j] = t1 > t2 ? t1 : t2;
}
}
for (int i = 0 ; i <= nb; i++)
dpf[i] = dp[na&1 ][i];
return dpf[nb];
}
int main () {
int dp[MAXN];
while (scanf ("%s %s" , A, B) == 2 ) {
int len = lcs_len(A, strlen (A), B, strlen (B), dp);
printf ("%d\n" , len);
}
return 0 ;
}