批改娘 10087. Sparse Matrix Multiplication (OpenMP)

contents

  1. 1. 題目描述
    1. 1.1. COO of Matrix $A$
    2. 1.2. COO of Matrix $B$
  2. 2. 輸入格式
  3. 3. 輸出格式
  4. 4. 範例輸入
  5. 5. 範例輸出
  6. 6. 範例解釋
  7. 7. Solution
    1. 7.1. 傳統處理
      1. 7.1.1. AOS
      2. 7.1.2. SOA
    2. 7.2. 額外空間
      1. 7.2.1. AOS
      2. 7.2.2. SOA

題目描述

稀疏矩陣為大部份元素皆為零的矩陣,在科學與工程領域中求解線性模型時經常出現大型的稀疏矩陣。現在給予最常見的 Coordinate Format (簡稱 COO 格式),請問兩個矩陣相乘結果為何。

給予矩陣 $A_{n, m}$$B_{m, r}$,請計算稀疏矩陣相乘。

$$A_{4,4} = \begin{bmatrix} 0 & 0 & 0 & 0 \\ 5 & 8 & 0 & 0 \\ 0 & 0 & 3 & 0 \\ 0 & 6 & 0 & 0 \\ \end{bmatrix}, \qquad B_{4,4} = \begin{bmatrix} 0 & 0 & 1 & 3 \\ 0 & 5 & 2 & 0 \\ 3 & 5 & 0 & 0 \\ 0 & 2 & 0 & 0 \\ \end{bmatrix}$$

根據 COO 格式,分別轉換矩陣 $A$ 和 $B$ 的所有非零元素,如下表所示

COO of Matrix $A$

row_index col_index value
1 0 5
1 1 8
2 2 3
3 1 6

COO of Matrix $B$

row_index col_index value
0 2 1
0 3 3
1 1 5
1 2 2
2 0 3
2 1 5
3 1 2

輸入格式

測資只有一組,第一行會有三個整數 $N, \; M, \; R$,分別表示矩陣 $A, \; B$ 的大小。下一行會有兩個整數 $N_A, \; N_B$,接下來會有 $N_A$ 行,每一行表示矩陣 $A$ 的 COO 格式,隨後會有 $N_B$ 行,每一行表示矩陣 $B$ 的 COO 格式。

  • $0 < N, \; M, \; R \le 10^6$
  • $0 < N_A, \; N_B \le 10^6$

給予 COO 格式時,先按照 row_index 由小到大,相同情況由 col_index 由小到大的方式給定,保證不會有重複,每一個元素值 $v$ 介於 $1$ 到 $2^{31}-1$ 之間。

輸出格式

輸出 $C_{n, r} = A_{n, m} \times B_{m, r}$ 的雜湊值。

定義 $\mathit{hash}(C_{n, r}) = \sum\nolimits_{e_{i, j} \in C \text{ and } e_{i, j} \neq 0} \mathit{encrypt}((i+1)*(j+1), e_{i, j})$,實際運作的 流程 可參考以下作法,當然你沒辦法宣告 $N \times M$ 的空間使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}
#define MAXN 1024
uint32_t A[MAXN][MAXN], B[MAXN][MAXN];
int main() {
int x, y, v;
int N, M, R, nA, nB;
scanf("%d %d %d", &N, &M, &R);
scanf("%d %d", &nA, &nB);
for (int i = 0; i < nA; i++)
scanf("%d %d %d", &x, &y, &v), A[x][y] = v;
for (int i = 0; i < nB; i++)
scanf("%d %d %d", &x, &y, &v), B[x][y] = v;
uint32_t hash = 0;
for (int i = 0; i < N; i++) {
for (int j = 0; j < R; j++) {
uint32_t sum = 0;
for (int k = 0; k < M; k++)
sum += A[i][k] * B[k][j];
if (sum)
hash += encrypt((i+1)*(j+1), sum);
}
}
printf("%u\n", hash);
return 0;
}

範例輸入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
4 4 4
4 7
1 0 5
1 1 8
2 2 3
3 1 6
0 2 1
0 3 3
1 1 5
1 2 2
2 0 3
2 1 5
3 1 2

範例輸出

1
13093438

範例解釋

$$A_{n, m} \times B_{m, r} = C_{4,4}=\begin{bmatrix} 0 & 0 & 0 & 0 \\ 0 & 40 & 21 & 15 \\ 9 & 15 & 0 & 0 \\ 0 & 30 & 12 & 0 \\ \end{bmatrix}$$

Solution

  • 傳統處理 Array of Structure: Accepted (14964 ms, 29104 KB)
  • 傳統處理 Structure of Array: Accepted (12537 ms, 45568 KB)
  • 額外空間 Array of Structure: Accepted (5711 ms, 29152 KB)
  • 額外空間 Structure of Array: Accepted (5202 ms, 29056 KB)

Array of Structure 簡稱 AOS,而 Structure of Array 簡稱 SOA,這兩種寫法的差異在於運用 structure 時的寫法,根據 cache line 的設計,有時候一個 struct 下會有很多變數,然而我們只需要使用部分的變數,這時候沒有使用的變數仍會跟著一起進入 cache,很容易造成 cache 效率偏低,因此回歸原始寫出 SOA 的版本。

傳統處理

如果是兩個稀疏方陣,用 COO 格式需要先對後面那個矩陣轉置,接著在三欄 $(i, j, v)$ 的表格下,按照字典順序排序。此時會有兩個三欄 $A(i, j, v), \; B(i, j, v)$ (別忘記 $B$ 矩陣已經轉置過),要計算 $A \times B = C$,為得到 $C_{i, j}$$C_{i, j} = \sum A(i, k, v) B(j, k, v)$,由於已經排序好,那麼找到共同的 $k$ 不是難事,按照合併排序的手法,能在 $\mathcal{O}(N)$ 完成。

因此在最稠密的矩陣下,仍需要 $\mathcal{O}(N^3)$ 才能完成。

AOS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <algorithm>
using namespace std;
#define MAXN 1048576
#define MAXL (MAXN<<2)
static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}
typedef struct ELE {
int i, j;
uint32_t v;
} ELE;
ELE LA[MAXL], LB[MAXL];
void SpMV(int N, int M, int R, ELE LA[], int na, ELE LB[], int nb) {
int ma = 0, mb = 0;
static int aoff[MAXN], boff[MAXN];
for (int i = 0, p = -1; i < na; i++) {
if (LA[i].i != p)
aoff[ma++] = i, p = LA[i].i;
}
for (int i = 0, p = -1; i < nb; i++) {
if (LB[i].i != p)
boff[mb++] = i, p = LB[i].i;
}
uint32_t hash = 0;
aoff[ma] = na, boff[mb] = nb;
#pragma omp parallel for reduction(+: hash)
for (int i = 0; i < ma; i++) {
for (int j = 0; j < mb; j++) {
int idx1 = aoff[i], idx2 = boff[j];
int top1 = aoff[i+1], top2 = boff[j+1];
uint32_t sum = 0;
int r = LA[idx1].i, c = LB[idx2].i;
while (idx1 < top1 && idx2 < top2) {
if (LA[idx1].j == LB[idx2].j)
sum += LA[idx1].v * LB[idx2].v, idx1++, idx2++;
else if (LA[idx1].j < LB[idx2].j)
idx1++;
else
idx2++;
}
if (sum)
hash += encrypt((r+1)*(c+1), sum);
}
}
printf("%u\n", hash);
}
bool cmp(const ELE &a, const ELE &b) {
if (a.i != b.i)
return a.i < b.i;
return a.j < b.j;
}
int main() {
int N, M, R, nA, nB;
while (scanf("%d %d %d", &N, &M, &R) == 3) {
scanf("%d %d", &nA, &nB);
for (int i = 0; i < nA; i++)
scanf("%d %d %d", &LA[i].i, &LA[i].j, &LA[i].v);
for (int i = 0; i < nB; i++)
scanf("%d %d %d", &LB[i].j, &LB[i].i, &LB[i].v);
sort(LB, LB+nB, cmp);
SpMV(N, M, R, LA, nA, LB, nB);
}
return 0;
}

SOA

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <algorithm>
using namespace std;
#define MAXN 1048576
#define MAXL (MAXN<<2)
static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}
typedef struct ELE {
int i[MAXL], j[MAXL];
uint32_t v[MAXL];
} ELE;
ELE LA, LB, LT;
int D[MAXL];
void SpMV(int N, int M, int R, ELE &LA, int na, ELE &LB, int nb) {
int ma = 0, mb = 0;
static int aoff[MAXN], boff[MAXN];
for (int i = 0, p = -1; i < na; i++) {
if (LA.i[i] != p)
aoff[ma++] = i, p = LA.i[i];
}
for (int i = 0, p = -1; i < nb; i++) {
if (LB.i[i] != p)
boff[mb++] = i, p = LB.i[i];
}
uint32_t hash = 0;
aoff[ma] = na, boff[mb] = nb;
#pragma omp parallel for reduction(+: hash)
for (int i = 0; i < ma; i++) {
for (int j = 0; j < mb; j++) {
int idx1 = aoff[i], idx2 = boff[j];
int top1 = aoff[i+1], top2 = boff[j+1];
uint32_t sum = 0;
int r = LA.i[idx1], c = LB.i[idx2];
while (idx1 < top1 && idx2 < top2) {
if (LA.j[idx1] < LB.j[idx2])
idx1++;
else if (LA.j[idx1] > LB.j[idx2])
idx2++;
else
sum += LA.v[idx1] * LB.v[idx2], idx1++, idx2++;
}
if (sum)
hash += encrypt((r+1)*(c+1), sum);
}
}
printf("%u\n", hash);
}
bool cmp(int a, int b) {
if (LT.i[a] != LT.i[b])
return LT.i[a] < LT.i[b];
return LT.j[a] < LT.j[b];
}
int main() {
int N, M, R, nA, nB;
while (scanf("%d %d %d", &N, &M, &R) == 3) {
scanf("%d %d", &nA, &nB);
for (int i = 0; i < nA; i++)
scanf("%d %d %d", &LA.i[i], &LA.j[i], &LA.v[i]);
for (int i = 0; i < nB; i++)
scanf("%d %d %d", <.j[i], <.i[i], <.v[i]);
for (int i = 0; i < nB; i++)
D[i] = i;
sort(D, D+nB, cmp);
for (int i = 0; i < nB; i++)
LB.i[i] = LT.i[D[i]], LB.j[i] = LT.j[D[i]], LB.v[i] = LT.v[D[i]];
SpMV(N, M, R, LA, nA, LB, nB);
}
return 0;
}

額外空間

若採用額外空間處理,一開始就不用對 $B$ 轉置,$C_{i, j} = \sum A(i, k, v) B(k, j, v)$。每一次解決 $C$ 的第 $i$ 列上的所有元素值,此時 $A(i, k, v)$ 按照 $k$ 遞增排列,$B(k, j, v)$ 要配對 $A(i, k, v)$,會恰好掃描完整的 $B$,累加元素值會不按照順序累加到 $C_{i, j}$

因此在最稠密的矩陣下,仍需要 $\mathcal{O}(N^3)$ 才能完成。

AOS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}
#define MAXN 1048576
#define MAXL 1048576
typedef struct ELE {
int i, j;
uint32_t v;
} ELE;
ELE LA[MAXL], LB[MAXL];
void SpMV(int N, int M, int R, ELE LA[], int na, ELE LB[], int nb) {
int ma = 0;
static int aoff[MAXN];
for (int i = 0, p = -1; i < na; i++) {
if (LA[i].i != p)
aoff[ma++] = i, p = LA[i].i;
}
aoff[ma] = na;
uint32_t hash = 0;
int chunk = 8;
#pragma omp parallel for schedule(dynamic, chunk) reduction(+: hash)
for (int i = 0; i < ma; i++) {
int idx1 = aoff[i], top1 = aoff[i+1];
int r = LA[idx1].i;
uint32_t *C = (uint32_t *) calloc(R, sizeof(uint32_t));
for (int idx2 = 0; idx2 < nb && idx1 < top1; ) {
if (LA[idx1].j == LB[idx2].i) {
C[LB[idx2].j] += LA[idx1].v * LB[idx2].v;
idx2++;
}
if (idx2 < nb && LA[idx1].j != LB[idx2].i) {
if (LA[idx1].j < LB[idx2].i)
idx1++;
else
idx2++;
}
}
for (int j = 0; j < R; j++) {
if (C[j])
hash += encrypt((r+1)*(j+1), C[j]);
}
free(C);
}
printf("%u\n", hash);
}
int main() {
uint32_t N, M, R, nA, nB;
while (scanf("%d %d %d", &N, &M, &R) == 3) {
scanf("%d %d", &nA, &nB);
for (int i = 0; i < nA; i++)
scanf("%d %d %d", &LA[i].i, &LA[i].j, &LA[i].v);
for (int i = 0; i < nB; i++)
scanf("%d %d %d", &LB[i].i, &LB[i].j, &LB[i].v);
SpMV(N, M, R, LA, nA, LB, nB);
}
return 0;
}

SOA

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}
#define MAXN 1048576
#define MAXL 1048576
typedef struct ELE {
int i[MAXL], j[MAXL];
uint32_t v[MAXL];
} ELE;
ELE LA, LB;
void SpMV(int N, int M, int R, ELE &LA, int na, ELE &LB, int nb) {
int ma = 0;
static int aoff[MAXN];
for (int i = 0, p = -1; i < na; i++) {
if (LA.i[i] != p)
aoff[ma++] = i, p = LA.i[i];
}
aoff[ma] = na;
uint32_t hash = 0;
int chunk = 8;
#pragma omp parallel for schedule(dynamic, chunk) reduction(+: hash)
for (int i = 0; i < ma; i++) {
int idx1 = aoff[i], top1 = aoff[i+1];
int r = LA.i[idx1];
uint32_t *C = (uint32_t *) calloc(R, sizeof(uint32_t));
for (int idx2 = 0; idx2 < nb && idx1 < top1; ) {
if (LA.j[idx1] == LB.i[idx2]) {
C[LB.j[idx2]] += LA.v[idx1] * LB.v[idx2];
idx2++;
}
if (idx2 < nb && LA.j[idx1] != LB.i[idx2]) {
if (LA.j[idx1] < LB.i[idx2])
idx1++;
else
idx2++;
}
}
for (int j = 0; j < R; j++) {
if (C[j])
hash += encrypt((r+1)*(j+1), C[j]);
}
free(C);
}
printf("%u\n", hash);
}
int main() {
uint32_t N, M, R, nA, nB;
while (scanf("%d %d %d", &N, &M, &R) == 3) {
scanf("%d %d", &nA, &nB);
for (int i = 0; i < nA; i++)
scanf("%d %d %d", &LA.i[i], &LA.j[i], &LA.v[i]);
for (int i = 0; i < nB; i++)
scanf("%d %d %d", &LB.i[i], &LB.j[i], &LB.v[i]);
SpMV(N, M, R, LA, nA, LB, nB);
}
return 0;
}