批改娘 10080. Fast Matrix Multiplication (pthread)

題目描述

相信不少人都已經實作所謂的矩陣乘法,計算兩個方陣大小為 $N \times N$ 的矩陣 $A, B$。為了方便起見,提供一個偽隨機數的生成,減少在輸入處理浪費的時間,同時也減少上傳測資的辛苦。

根據種子 $c = S1$ 生成矩陣 $A$,種子 $c = S2$ 生成矩陣 $B$,計算矩陣相乘 $A \times B$,為了方便起見,使用 hash 函數進行簽章,最後輸出一個值。由於會牽涉到 overflow 問題,此題作為快取實驗就不考慮這個,overflow 問題都會相同。

matrix.h

1
void multiply(int N, unsigned long A[][2048], unsigned long B[][2048], unsigned long C[][2048]);

matrix.c

1
2
3
4
5
6
7
8
9
10
11
12
#include "matrix.h"
void multiply(int N, unsigned long A[][2048], unsigned long B[][2048], unsigned long C[][2048]) {
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
unsigned long sum = 0; // overflow, let it go.
for (int k = 0; k < N; k++)
sum += A[i][k] * B[k][j];
C[i][j] = sum;
}
}
}

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include "matrix.h"
// #define DEBUG
#define UINT unsigned long
#define MAXN 2048
void rand_gen(UINT c, int N, UINT A[][MAXN]) {
UINT x = 2, n = N*N;
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
x = (x * x + c + i + j)%n;
A[i][j] = x;
}
}
}
void print_matrix(int N, UINT A[][MAXN]) {
for (int i = 0; i < N; i++) {
fprintf(stderr, "[");
for (int j = 0; j < N; j++)
fprintf(stderr, " %u", A[i][j]);
fprintf(stderr, " ]\n");
}
}
UINT hash(UINT x) {
return (x * 2654435761LU);
}
UINT signature(int N, UINT A[][MAXN]) {
UINT h = 0;
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++)
h = hash(h + A[i][j]);
}
return h;
}
static UINT A[MAXN][MAXN], B[MAXN][MAXN], C[MAXN][MAXN];
int main() {
int N, S1, S2;
while (scanf("%d %d %d", &N, &S1, &S2) == 3) {
rand_gen(S1, N, A);
rand_gen(S2, N, B);
multiply(N, A, B, C);
#ifdef DEBUG
print_matrix(N, A);
print_matrix(N, B);
print_matrix(N, C);
#endif
printf("%u\n", signature(N, C));
}
return 0;
}

輸入格式

多組測資,每組測資一行三個整數 $N, S1, S2$

  • $1 \le N \le 1000$
  • $0 \le S1, S2 \le 65536$

輸出格式

每組測資輸出一行,一個整數 signature 的結果。

範例輸入

1
2
2 2 5
2 2 5

範例輸出

1
2
573770929
573770929

解釋

範例輸入產生 $2 \times 2$ 的矩陣。

$$A = \begin{bmatrix} 2 & 3\\ 0 & 0 \end{bmatrix} , B = \begin{bmatrix} 1 & 3\\ 3 & 0 \end{bmatrix} , A \times B = \begin{bmatrix} 11 & 6\\ 0 & 0 \end{bmatrix}$$

Makefile

1
2
matrix.out: matrix.h matrix.c main.c
gcc -std=c99 -O2 matrix.h matrix.c main.c -o matrix -lpthread -lm

Solution

針對矩陣乘法平行很簡單,在 CPU 上由於需要搬運資料,不一定開的 thread 數量接近 core 數量就一定最好。而在平行的部分要把握資料局部性的加速,以下實作根據最終答案進行 row-based 策略切割,每一個 thread 計算那一個 row 的所有答案。

matrix.h

1
2
3
#define UINT unsigned long
void multiply(int N, unsigned long A[][2048], unsigned long B[][2048], unsigned long C[][2048]);

matrix.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include "matrix.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define LOOP_UNROLL 8
#define MAXTHREAD 8
#define MAXN 2048
typedef struct Argu {
int l, r, N;
UINT *A, *B, *C;
} Argu;
static void transpose(int N, UINT A[][2048]) {
UINT t;
for (int i = 0; i < N; i++) {
for (int j = i+1; j < N; j++)
t = A[i][j], A[i][j] = A[j][i], A[j][i] = t;
}
}
void* thread_multiply(void *arg) {
Argu data = * ((Argu *) arg);
int l = data.l, r = data.r;
int N = data.N;
UINT *A = data.A, *B = data.B, *C = data.C;
UINT stkA[MAXN];
for (int i = l; i <= r; i++) {
memcpy(stkA, A + (i * MAXN), sizeof(UINT) * N);
for (int j = 0; j < N; j++) {
UINT sum = 0;
// UINT *a = A + (i * MAXN), *b = B + (j * MAXN);
UINT *a = stkA, *b = B + (j * MAXN);
int k;
for (k = 0; k+LOOP_UNROLL < N; k += LOOP_UNROLL) {
sum += *a * *b, a++, b++;
sum += *a * *b, a++, b++;
sum += *a * *b, a++, b++;
sum += *a * *b, a++, b++;
sum += *a * *b, a++, b++;
sum += *a * *b, a++, b++;
sum += *a * *b, a++, b++;
sum += *a * *b, a++, b++;
}
for (; k < N; k++)
sum += *a * *b, a++, b++;
*(C + (i * MAXN + j)) = sum;
}
}
return NULL;
}
void multiply(int N, UINT A[][2048], UINT B[][2048], UINT C[][2048]) {
transpose(N, B);
pthread_t *threads;
threads = (pthread_t *) malloc(MAXTHREAD * sizeof(pthread_t));
for (int i = 0; i < MAXTHREAD; i++) {
Argu *data = (Argu *) malloc(sizeof(Argu));
data->l = i * N / MAXTHREAD;
data->r = (i+1) * N / MAXTHREAD - 1;
data->N = N;
data->A = &A[0][0], data->B = &B[0][0], data->C = &C[0][0];
pthread_create(&threads[i], NULL, thread_multiply, (void *) (data));
}
for (int i = 0; i < MAXTHREAD; i++)
pthread_join(threads[i], NULL);
return ;
}
Read More +

批改娘 10086. Red/Blue Computation

題目描述

模擬工作流程,在一個 $N \times N$ 的網格 (左邊界可以通到右邊界,上邊界可以通到下邊界) 上有三種狀態紅 $R$, 藍 $B$, 空格 $W$,每次模擬分成兩個步驟

  • 第一步,只有紅 $R$ 可移動,紅 $R$ 只能往右移動一格到空白 $W$ 的位置,否則仍在原處。
  • 第二步,只有藍 $B$ 可移動,藍 $B$ 只能往下移動一格到空白 $W$ 的位置,否則仍在原處。

請問模擬 $M$ 次後盤面為何?

輸入格式

輸入只有一組測資,第一行有兩個整數 $N, \; M$,分別為盤面大小以及模擬次數。接下來會有 $N$ 行,每一行上會有 $N$ 個字元。

  • $1 \le N, M \le 1000$

輸出格式

輸出 $N \times N$ 盤面。

範例輸入

1
2
3
4
5
4 1
RRWR
WWBW
BWRW
WWWW

範例輸出

1
2
3
4
RWRR
WWWW
WWBR
BWWW

備註






Solution

模擬工作流程,平行計算下一個狀態,利用兩個滾動數組的方式交替使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define MAXN 1024
#define C_WHITE 0
#define C_RED 1
#define C_BLUE 2
char g1[MAXN][MAXN], g2[MAXN][MAXN];
void moveRed(int n) {
#pragma omp parallel for schedule(static)
for (int i = 0; i < n; i++)
memset(g2[i], C_WHITE, sizeof(char) * n);
#pragma omp parallel for schedule(static)
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++) {
if (g1[i][j] == C_BLUE) {
g2[i][j] = C_BLUE;
} else if (g1[i][j] == C_RED) {
int next = j+1 == n ? 0 : j+1;
if (g1[i][next] == C_WHITE)
g2[i][next] = C_RED;
else
g2[i][j] = C_RED;
}
}
}
}
void moveBlue(int n) {
#pragma omp parallel for schedule(static)
for (int i = 0; i < n; i++)
memset(g1[i], C_WHITE, sizeof(char) * n);
#pragma omp parallel for schedule(static)
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++) {
if (g2[i][j] == C_RED) {
g1[i][j] = C_RED;
} else if (g2[i][j] == C_BLUE) {
int next = i+1 == n ? 0 : i+1;
if (g2[next][j] == C_WHITE)
g1[next][j] = C_BLUE;
else
g1[i][j] = C_BLUE;
}
}
}
}
static inline int toIndex(char c) {
if (c == 'W') return C_WHITE;
if (c == 'R') return C_RED;
if (c == 'B') return C_BLUE;
fprintf(stderr, "[WARNING] %s %d\n", __FILE__, __LINE__);
}
static void printBoard(int n) {
char color[4] = "WRB";
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++)
putchar(color[g1[i][j]]);
putchar('\n');
}
}
int main() {
int n, m;
while (scanf("%d %d", &n, &m) == 2) {
for (int i = 0; i < n; i++) {
scanf("%s", &g1[i]);
for (int j = 0; j < n; j++)
g1[i][j] = toIndex(g1[i][j]);
}
for (int it = 0; it < m; it++) {
moveRed(n);
moveBlue(n);
}
printBoard(n);
}
return 0;
}
/*
4 1
RRWR
WWBW
BWRW
WWWW
*/
Read More +

數據分割 區間分塊小技巧

前言

運行平行程式之前,要將資料切成很多塊,每一塊之間獨立。從邏輯上看來有 row-based, column-based, matrix-based, tree-based 等平行方法。在許多方法中,最容易入手的就是 row-based。

假設有 $n$ 個元素,編號介於 $[0, n-1]$ 之間,要分成 $m$ 組。每一組得編號都要連續,以增加資料局部性 (data locality) 並且每一塊大小盡可能一樣,請問要怎麼分割才好?這樣的數學問題要做出來並不是件難事,但強迫症的人來說,在程式中零碎的判斷相當折騰。

數學分析

直觀

若有 $n$ 個元素分成 $m$ 組,每一個組至少有 $\lfloor n / m \rfloor$ 個元素,其中 $n \mod m$ 組會額外多一個元素。因此,對於第 $i$ 組滿足 $i < n \mod m$ 都會多一個元素。

1
2
3
4
5
6
7
8
9
10
void method0(int n, int m) {
printf("In %s\n", __func__);
int bsz = n/m, rm = n%m, sz;
for (int i = 0, L = 0, R; i < m; i++) {
sz = bsz + (i < rm);
R = L + sz - 1;
printf("%d len([%d, %d]) = %d\n", i, L, R, (R - L + 1));
L = R + 1;
}
}

程序流

這方式屬於懶惰程序員,根據整數的性質會得到刻度,在計算第 $i$ 個刻度採用 $\lfloor \frac{in}{m} \rfloor$,自然而然地可以與下一個刻度形成一個區間。不幸地,若 $m > n$ 時,有些刻度會重疊,導致重複計算的情況發生。

1
2
3
4
5
6
7
void method1(int n, int m) {
printf("In %s\n", __func__);
for (int i = 0, L, R; i < m; i++) {
L = i*n/m, R = (i+1)*n/m - 1;
printf("%d len([%d, %d]) = %d\n", i, L, R, (R - L + 1));
}
}

ceil

從《具體數學》第三章節的介紹,得到幾個簡單的數學公式

$$\begin{align} n = \left \lceil \frac{n}{m} \right \rceil + \left \lceil \frac{n-1}{m} \right \rceil + \cdots + \left \lceil \frac{n-m-1}{m} \right \rceil \end{align}$$

$i$ 團 ($0 \le i < m$),要處理 $\left \lceil \frac{n-i}{m} \right \rceil$ 個元素。

1
2
3
4
5
6
7
8
9
void method2(int n, int m) {
printf("In %s\n", __func__);
for (int i = 0, L = 0, R, sz; i < m; i++) {
sz = (n + m-i-1) / m;
R = L + sz-1;
printf("%d len([%d, %d]) = %d\n", i, L, R, sz);
L = R + 1;
}
}

floor

又或者使用 floor 表示法

$$\begin{align} n = \left \lfloor \frac{n}{m} \right \rfloor + \left \lfloor \frac{n+1}{m} \right \rfloor + \cdots + \left \lfloor \frac{n+m-1}{m} \right \rfloor \end{align}$$

$i$ 團 ($0 \le i < m$),要處理 $\left \lceil \frac{n+i}{m} \right \rceil$ 個元素。

1
2
3
4
5
6
7
8
9
void method3(int n, int m) {
printf("In %s\n", __func__);
for (int i = 0, L = 0, R, sz; i < m; i++) {
sz = (n + i) / m;
R = L + sz-1;
printf("%d len([%d, %d]) = %d\n", i, L, R, sz);
L = R + 1;
}
}

補充流

不管怎樣,前 $i$ 就拿取 $\lceil \frac{n}{m} \rceil$,最後一組拿少一點。

1
2
3
4
5
6
7
8
9
void method4(int n, int m) {
printf("In %s\n", __func__);
int bsz = (n + m-1)/m, sz;
for (int i = 0, L = 0, R; i < m; i++) {
L = i*bsz, R = L + bsz - 1;
if (R >= n) R = n-1;
printf("%d len([%d, %d]) = %d\n", i, L, R, (R - L + 1));
}
}

測試

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <bits/stdc++.h>
using namespace std;
void method0(int n, int m) {
printf("In %s\n", __func__);
int bsz = n/m, rm = n%m, sz;
for (int i = 0, L = 0, R; i < m; i++) {
sz = bsz + (i < rm);
R = L + sz - 1;
printf("%d len([%d, %d]) = %d\n", i, L, R, (R - L + 1));
L = R + 1;
}
}
void method1(int n, int m) {
printf("In %s\n", __func__);
for (int i = 0, L, R; i < m; i++) {
L = i*n/m, R = (i+1)*n/m - 1;
printf("%d len([%d, %d]) = %d\n", i, L, R, (R - L + 1));
}
}
void method2(int n, int m) {
printf("In %s\n", __func__);
for (int i = 0, L = 0, R, sz; i < m; i++) {
sz = (n + m-i-1) / m;
R = L + sz-1;
printf("%d len([%d, %d]) = %d\n", i, L, R, sz);
L = R + 1;
}
}
void method3(int n, int m) {
printf("In %s\n", __func__);
for (int i = 0, L = 0, R, sz; i < m; i++) {
sz = (n + i) / m;
R = L + sz-1;
printf("%d len([%d, %d]) = %d\n", i, L, R, sz);
L = R + 1;
}
}
void method4(int n, int m) {
printf("In %s\n", __func__);
int bsz = (n + m-1)/m, sz;
for (int i = 0, L = 0, R; i < m; i++) {
L = i*bsz, R = L + bsz - 1;
if (R >= n) R = n-1;
printf("%d len([%d, %d]) = %d\n", i, L, R, (R - L + 1));
}
}
int main() {
void (*FUNC[])(int, int) = {method0, method1, method2, method3, method4};
int n, m;
while (scanf("%d %d", &n, &m) == 2) {
for (int i = 0; i < sizeof(FUNC)/sizeof(FUNC[0]); i++)
(*FUNC[i])(n, m);
}
return 0;
}
Read More +

批改娘 10085. Parallel Count (debug)

題目描述

這有一份由 pthread 撰寫的序列總和計算,假設不開任何的優化參數,在快取處理會有嚴重缺失。

main.c

輸入序列長度 $n$,計算 $m$ 次經由 $\text{key}, \; \text{key} + 1, \; \text{key} + 2, \; \cdots, \; \text{key} + m-1$ 加密的序列總和。這部份將不提供修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <stdio.h>
#include <stdlib.h>
#include "utils.h"
int main() {
int cases = 0;
int n, m, key;
scanf("%d %d %d", &n, &m, &key);
for (int it = 0; it < m; it++) {
int ret = run(n, key);
printf("Case #%d: %d\n", ++cases, ret);
key++;
}
return 0;
}

utils.h

計算工作交由 void f(int n, int key, int *p1, int *p2, int *p3, int *p4); 完成最後四個參數,將會由 4 個 thread 計算分別儲存在位址 p1, p2, p3, p4 中。

1
2
3
4
5
6
7
#ifndef __UTILS_H
#define __UTILS_H
void f(int n, int key, int *p1, int *p2, int *p3, int *p4);
int run(int n, int key);
#endif

sum.c

平行計算序列總和,特別注意到 void* subtask(void* argu) 中存取的記憶體位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdlib.h>
#include <pthread.h>
#include <stdint.h>
#include "utils.h"
typedef struct Argu {
int *result;
int L, R, key;
} Argu;
static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}
static void* subtask(void* argu) {
Argu *ptr = (Argu *) argu;
*(ptr->result) = 0;
for (int i = ptr->L; i <= ptr->R; i++)
*(ptr->result) += encrypt(i, ptr->key);
}
void f(int n, int key, int *p1, int *p2, int *p3, int *p4) {
pthread_t threads[4];
int *output[4] = {p1, p2, p3, p4};
for (int i = 0; i < 4; i++) {
Argu *argu = (Argu *) malloc(sizeof(Argu));
argu->result = output[i];
argu->L = i * (n / 4) + 1;
argu->R = argu->L + n / 4 - 1;
argu->key = key;
if (i == 3) argu->R = n;
pthread_create(&threads[i], NULL, subtask, argu);
}
for (int i = 0; i < 4; i++)
pthread_join(threads[i], NULL);
}

job.c

你的工作要改善下方代碼的效能。

1
2
3
4
5
6
7
8
9
10
#include "utils.h"
int ret[128];
int run(int n, int key) {
int sum = 0;
f(n, key, ret, ret+1, ret+2, ret+3);
for (int i = 0; i < 4; i++)
sum += ret[i];
return sum;
}

輸入格式

輸入只有一行三個整數 $n, \; m, \; \text{key}$

輸出格式

輸出 $m$ 序列總和結果 (無視 overflow)。

範例輸入

1
10000000 10 514

範例輸出

1
2
3
4
5
6
7
8
9
10
Case #1: 1397862656
Case #2: 1970821632
Case #3: -1178356736
Case #4: 1113221120
Case #5: 1401409536
Case #6: 1977786368
Case #7: -1164427264
Case #8: 1145914243
Case #9: 645957382
Case #10: 1308383748

編譯環境

Makefile

1
2
3
4
5
6
CFLAG=-std=c99 -pthread
all: main.c sum.c job.c
gcc $(CFLAG) main.c -c
gcc $(CFLAG) sum.c -c
gcc $(CFLAG) main.o sum.o job.c -o job

reference

Solution

由於茵可大神給予我參考的簡報 Mind Cache 中提到不少的快取實驗,其中一部份就是在 thread 使用上,於是就拿來出題測試測試一番。

通常講到數據局部性 (Data Locality) 都希望資料盡量集中,但一不小心會犯下錯誤,就是在多個 thread 儲存答案時,雖然不會共用同一個位址,但相鄰的位址在另一個核 (core) 使用,由於彼此之間相互修改,兩個相鄰位址若放在同一個 cache line,dirty bit 勢必要讓他們從 cache line 掉到 L1 -> L2 -> L3,進行資料同步。

目前 cache line 普遍上設計大小為 64 Bytes,相當於間隔 16 個 4 bytes Integer 的差距,所以儲存答案間隔遠一點會比近一點好。當然,編譯器優化參數開到 O2 以上時,似乎就直接先存放到暫存器裡頭,不會不斷地存取 global memory 部份,如此一來,就不會發生 cache miss 問題。實驗結果效能可以差到四倍左右。

1
2
3
4
5
6
7
8
9
#include "utils.h"
int ret[128];
int run(int n, int key) {
int sum = 0;
f(n, key, ret+0, ret+16, ret+32, ret+48);
sum = ret[0] + ret[16] + ret[32] + ret[48];
return sum;
}
Read More +

批改娘 10084. Prefix Sum (pthread)

題目描述

給予序列 $A \left[ 1 \cdots n \right]$,請計算前綴和序列 $S \left[ 1 \cdots n \right]$,其中 $$S \left[ i \right] = \sum_{k=1}^{i} A \left[ k \right]$$

為了檢測方便,移除輸入和輸出時間,序列 $A$ 由一個簡單加密 $\textit{key}$ 得到序列 $$A \left[ i \right] = \textit{encrypt}(i, \textit{key})$$以及輸出部分直接呼叫$\textit{output}(\textit{S}, n)$。注意$S\left[ 0 \right] = 0$,儲存答案的範圍為$S\left[ 1 \cdots n \right]$。

utils.h

可以直接 #include "utils.h" 在你的程式中。

1
2
3
4
5
6
7
8
9
10
11
#ifndef _UTILS_H
#define _UTILS_H
#include <stdint.h>
static inline uint32_t rotate_left(uint32_t x, uint32_t n) {
return (x << n) | (x >> (32-n));
}
static inline uint32_t encrypt(uint32_t m, uint32_t key) {
return (rotate_left(m, key&31) + key)^key;
}
void output(uint32_t presum[], int n);
#endif

prefixsum-seq.c

請修改這份程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include "utils.h"
#define MAXN 10000005
#define MAX_THREAD 4
uint32_t prefix_sum[MAXN];
int main() {
int n;
uint32_t key;
while (scanf("%d %" PRIu32, &n, &key) == 2) {
uint32_t sum = 0;
for (int i = 1; i <= n; i++) {
sum += encrypt(i, key);
prefix_sum[i] = sum;
}
output(prefix_sum, n);
}
return 0;
}

secret.c (測試用)

實際情況會用助教寫好的平行方式進行計算且雜湊公式不同。

1
2
3
4
5
6
7
8
9
10
11
12
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include "utils.h"
void output(uint32_t presum[], int n) {
uint32_t hash = 0;
for (int i = 1; i <= n; i++)
hash += presum[i] * i;
printf("%" PRIu32 "\n", hash);
}

輸入格式

輸入有多組測資,每組測資一行兩個整數 $n$, $\textit{key}$,分別為序列長度 $n$,加密金鑰 $\textit{key}$

  • $1 \le n \le 10^7$
  • $0 \le key < 2^{32}$

輸出格式

對於每一組測資呼叫 output(uint32_t presum[], int n),隨後會輸出一個數值。

範例輸入

1
2
3 2
10 5

範例輸出

1
2
100
54560

範例解釋

  • $(n, \textit{key})=(3, 2)$,得 $A \left[ 1 \cdots 3\right] = \left[ 4, 8, 12 \right]$$S \left[ 1 \cdots 3\right] = \left[ 4, 12, 24 \right]$$\text{hash} = 4 + 12 \times 2 + 24 \times 3 = 100$

編譯方式

1
gcc -std=c99 -O2 -pthread prefixsum-seq.c secret.c -o prefixsum-seq

測試主機資訊

推薦使用 4 個執行緒解決此問題,平行效能接近 2 倍改進。若使用過多的執行緒,將因為要排到另一個處理器上運行而變慢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
processor : 0
vendor_id : GenuineIntel
cpu family : 6
model : 62
model name : Intel(R) Xeon(R) CPU E5-2620 v2 @ 2.10GHz
stepping : 4
microcode : 0x415
cpu MHz : 1200.000
cache size : 15360 KB
physical id : 0
siblings : 12
core id : 0
cpu cores : 6
apicid : 0
initial apicid : 0
fpu : yes
fpu_exception : yes
cpuid level : 13
wp : yes
processor : 1
vendor_id : GenuineIntel
cpu family : 6
model : 62
model name : Intel(R) Xeon(R) CPU E5-2620 v2 @ 2.10GHz
stepping : 4
microcode : 0x415
cpu MHz : 1200.000
cache size : 15360 KB
physical id : 0
siblings : 12
core id : 1
cpu cores : 6
apicid : 2
initial apicid : 2
fpu : yes
fpu_exception : yes
cpuid level : 13
wp : yes

Solution

利用 $P$ 的 thread,分開計算區間 $[1, N/P], \; [N/P+1, 2N/P], \cdots$ 的前綴和,由於前綴和只有利用加法計算,加法具有結合律,隨後傳遞前 $i$ 段的和,可以循序 $\mathcal{O}(P)$ 完成,或者利用樹形平行 (tree-based) 的方式在 $\mathcal{O}(\log P)$,在實務上直接循序即可。

由於採用分治方式,需要平行兩次,時間複雜度為 $\mathcal{O}(2 \frac{N}{P} + P)$

  • $P = 3$ 時,只獲得 1.5 倍的加速
  • $P = 4$ 時,獲得 2 倍加速
  • $P > 4$ 時,要將記憶體的部分傳到另一顆 CPU 上,假設搬運時間是線性正比,計算複雜度也是線性時間,那麼搬運時間不如在同一顆 CPU 上運行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <pthread.h>
#include "utils.h"
typedef struct Argu {
int l, r;
uint32_t *psum;
uint32_t key;
} Argu;
int min(int x, int y) {
return x < y ? x : y;
}
void* subtask1(void *argu) {
Argu data = *((Argu *) argu);
int l = data.l, r = data.r;
uint32_t *psum = data.psum;
uint32_t sum = 0, key = data.key;
for (int i = l, j = 0; i <= r; i++, j++) {
sum += encrypt(i, key);
psum[j] = sum;
}
free(argu);
}
void* subtask2(void *argu) {
Argu data = *((Argu *) argu);
int l = data.l, r = data.r;
uint32_t *psum = data.psum;
uint32_t base = data.key;
for (int i = l, j = 0; i <= r; i++, j++)
psum[j] += base;
free(argu);
}
#define MAXN 10000005
#define MAX_THREAD 4
uint32_t prefix_sum[MAXN];
int main() {
int n;
uint32_t key;
while (scanf("%d %" PRIu32, &n, &key) == 2) {
int BLOCK = (n+MAX_THREAD-1) / MAX_THREAD, m = 0;
pthread_t threads[MAX_THREAD];
for (int i = 1; i <= n; ) {
Argu *argu = (Argu *) malloc(sizeof(Argu));
argu->l = i, argu->r = min(n, i + BLOCK - 1);
argu->psum = prefix_sum + i, argu->key = key;
i += BLOCK;
pthread_create(&threads[m], NULL, subtask1, argu), m++;
}
for (int i = 0; i < m; i++)
pthread_join(threads[i], NULL);
m = 0;
uint32_t block_sum = 0;
for (int i = 1; i <= n; i) {
uint32_t tmp = block_sum;
block_sum += prefix_sum[min(n, i+BLOCK-1)];
Argu *argu = (Argu *) malloc(sizeof(Argu));
argu->l = i, argu->r = min(n, i + BLOCK - 1);
argu->psum = prefix_sum + i, argu->key = tmp;
i += BLOCK;
pthread_create(&threads[m], NULL, subtask2, argu), m++;
}
for (int i = 0; i < m; i++)
pthread_join(threads[i], NULL);
output(prefix_sum, n);
}
return 0;
}
Read More +

pbrt-v2 加速結構 BVH-contract 改寫

Description of implementation approach and comments

從一般 BVH 架構中,一般都是用 full binary tree,子節點要不 0 個要不 2 個。若有 $N$ 個 primitive 物件,則表示有 $N$ 個葉節點放置這些 primitive 物件,而有 $N-1$ 個內部節點紀錄 Bounding Box 的部分。在測試交點和遍歷走訪的使用上,最慘有一半是多餘計算和走訪,而另一半屬於加速結構。

在論文 Ray Specialized Contraction on Bounding Volume Hierarchies 中,企圖想要利用 generic tree 取代一般使用 full binary tree 實作,在不更動 BVH 的效能下,減少運行上較沒用的內部節點,用以提升遍歷走訪效能,以及提升內存快取的效率。

降低內部節點步驟如下所示:

  1. 利用原生方法建立 full binary tree 的 BVH (利用各種分割策略完成)
  2. 進行坍倒 (flatten),將二元樹不連續的記憶體分布調整成線性分布,加速遍歷走訪的內存快取效率。
  3. 靜態調整成 generic tree 版本,藉由啟發式收縮 (Contract),利用節點與其子節點的 Bounding Box 表面積比例,評估浪費的走訪節點。
  4. 動態調整部分,採用隨機取樣,根據取樣 ray,取樣走訪次數,將比較容易打到的節點盡可能收縮到接近根節點。

若要收縮節點 $N$,假設 Bounding box 計算交點花費為 $C_B$,穿過節點 $N$ 的 Bounding box 射線機率 $\alpha_N$,得到收縮前後的計算差值 $\delta(N)$,如下所示。

$$\begin{align} \delta(N) &= n_{N.\text{child}} \; C_B - (\alpha_N (1+n_{N.\text{child}}) +(1 - \alpha_N)) \; C_B \\ & = ((1 - \alpha_N) \; n_{N.\text{child}} - 1) \; C_B \end{align}$$

目標讓 $\delta(N) < 0$,得到

$\alpha(N) > 1 - \frac{1}{n_{N.\text{child}}}$

計算 $\delta(N)$ 顯得要有效率,但又沒辦法全局考量,需要提供一個猜測算法,藉由部分取樣以及步驟 2. 的表面積總和比例進行收縮。

實作部分

從實作中,在步驟 2. 約略可以減少 $25\%$ 的節點,在記憶體方面的影響量沒有太大影響,節點紀錄資訊也增加 (sizeof(struct Node) 相對地大上許多)。

在步驟 3. 中,根據 pbrt-v2 的架構,加速結構能取得的場景資訊並不容易竄改,大部分的類別函數都是 const function(),意即無法變動 object member 的值,但針對指向位址的值可以改變。這類寫法,猜想所有加速結構都是靜態決定,在多核心運行時較不會存在同步 overhead 的影響。

在此,換成另外一種修改方案,在 pbrt-v2/core/scene.hbool Scene::Intersect(...) 函數中加入 aggregate->tick();。利用 aggregate->tick(); 這個函數,大部分呼叫都沒有更動樹狀結構。當呼叫次數 $T$ 達到一定次數時,加速結構會進行大規模的結構調整。

根據 pbrt rendering 的步驟,儘管不斷地測試或者是估計 $T$ 要設定的值,都無法接近全局的取樣評估,其中最大的原因是取樣順序和局部取樣調整,從理論上得知不可能會有比較好的結果。這樣的寫法提供簡便的方案去統計 pbrt 運算時較有可能的 ray 從哪裡射出,不用挖掘所有的不同類型 ray 進行取樣。

最後,修改檔案如下:

修改檔案清單

1
2
3
4
5
6
7
8
9
.
├── accelerators
│ ├── bvhcontract.cpp
│ └── bvhcontract.h
└── core
├── api.cpp
├── primitive.cpp
├── primitive.h
└── scene.h

core/api.cpp

1
2
3
4
5
6
7
8
Primitive *MakeAccelerator(const string &name,
const vector<Reference<Primitive> > &prims,
const ParamSet &paramSet) {
...
else if (name == "bvhcontract")
accel = CreateBVHContractAccelerator(prims, paramSet);
...
}

core/primitive.h

1
2
3
4
5
6
7
8
9
class Primitive : public ReferenceCounted {
public:
...
// MORRIS ADD
virtual void tick();
...
protected:
...
};

core/scene.h

1
2
3
4
5
6
7
8
9
class Scene {
public:
// Scene Public Methods
bool Intersect(const Ray &ray, Intersection *isect) const {
...
aggregate->tick();
...
}
};

Generic Tree 設計與走訪測試

在 Full binary tree style - BVH 實作中,利用前序走訪分配節點編號範圍 $[0, 2N-1]$,因此節點編號 $u$ 的左子樹的編號為 $u+1$,只需要紀錄右子樹編號 secondChildOffset,這種寫法在空間和走訪時的快取都有效能改善。在標準程序中也單用迭代方式即可完成,不採用遞迴走訪,減少 push stack 的指令。

在 Generic tree 版本中,基礎節點紀錄架構如下:

1
2
3
4
5
6
7
8
9
10
11
struct LinearTreeNode {
// node link information
uint32_t parentOffset;
uint32_t childOffsetHead, childOffsetTail;
uint32_t siblingOffsetNext, siblingOffsetPrev;
// faster record
uint32_t numChild;
// node data
TreeData e;
uint32_t visitCount;
};

在原生 BVH 求交點寫法中,根據節點的 Split-Axis 以及 Ray 的方向決定先左子樹還是先右子樹走訪,藉以獲得較高的剪枝機率。但全部先左子樹走訪的快取優勢比較高 (因為前序分配節點編號),反之在 Split-Axis 有一半的機率走會走快取優勢不高的情況,在權衡兩者之間。

然而在 Generic Tree 實作上,若要提供 Split-Axis 則需要提供 childOffsetTailsiblingOffsetPrev 兩個指針,則多了兩個紀錄欄位,單一節點大小從 sizeof(LinearBVHNode) = 32拓展到 sizeof(LinearBVHContractNode) = 60,記憶體用量整整接近兩倍。從 Contract 算法中,節點個數保證無法減少一半,推論得到在 Contract 後記憶體用量會多一些。

走訪實作上分成遞迴和迭代兩種版本,遞迴在效能上會卡在 push esp, argument 等資訊上,而在迭代版本省了 call function overhead 和空間使用,但增加計算次數。而在我撰寫的版本中,還必須 access 父節點的資訊決定要往 siblingOffsetNext 還是 siblingOffsetprev,因此快取效能從理論上嚴重下滑。

遞迴和迭代走訪寫法如下:

遞迴版本走訪

1
2
3
4
5
6
7
8
9
10
void recursiveTraverse(LinearTreeNode *node, LinearTreeNode *_mem) {
// proess
uint32_t offset = node->childOffsetHead;
if (offset == -1)
return ;
for (LinearTreeNode *u; offset != -1; offset = u->siblingOffsetNext) {
u = &_mem[offset];
recursiveTraverse(u, _mem);
}
}

迭代版本走訪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void iteratorTraverse(uint32_t offset, LinearTreeNode *_mem) {
bool process = true;
while (offset != -1) {
LinearTreeNode *node = &_mem[offset];
if (process) {
// process
}
if (node->childOffsetHead != -1 && process) {
offset = node->childOffsetHead;
process = true;
} else if (node->siblingOffsetNext != -1) {
offset = node->siblingOffsetNext;
process = true;
} else {
offset = node->parentOffset;
process = false;
}
}
}

節點大小對走訪效能影響

從實驗數據看來,遞迴版本比迭代版本在越大節點情況效能普遍性地好,預估是在遞迴版本造成的快取效能好上許多。

sizeof(LinearTreeNode) bytes \ Traversal Recursive Loop
32 6.049s 5.628s
44 6.651s 6.817s
60 7.460s 6.888s
92 9.361s 9.271s
156 16.844s 16.694s
220 25.294s 27.031s
284 28.181s 30.900s
540 28.560s 33.707s

實驗結果

由於 tick() 效果並不好,調整參數後與原生的作法差距仍然存在,單靠表面積提供的啟發式收縮,效率比加上動態結果還好。但從實驗結果中得知,實作方面存在一些尚未排除的效能障礙,在多核心部分效能差距就非常明顯,預計是在求交點時同步資料造成的 overhead 時間。

而在減少的節點數量,光是啟發是的表面收縮就減少 $25\%$ 節點量,而在動態收縮處理,儘管已經探訪 $10^7$ 收縮點數量近乎微乎其微 (不到一兩百個節點)。

sences \ BVH policy Native Contract(loop) Contract(recursive) Node Reduce
sibenik.pbrt 7.000s 10.502s 9.411s 99576 / 131457
yeahright.pbrt 12.297s 14.638s 14.210s 288707 / 376317
sponza-fog.pbrt 16m36.037s 21m09.960s 20m12.012s 91412 / 121155

Test sences

Reference paper

Ray Specialized Contraction on Bounding Volume Hierarchies, Yan Gu Yong He Guy E. Blelloch

Read More +

pbrt-v2 加速結構 Environment Lights 改寫

Median Cut Algorithm

根據論文 P. Debevec, A Median Cut Algorithm for Light Probe Sampling, SIGGRAPH 2006 Sketches and Applications. 中,預期要將 pbrt/lights/infinite.cpp 中的 class InfiniteAreaLight 用數個點光源取代 Infinite Area Light 的寫法,提升均勻取樣的效能,而 Median Cut Algorithm 在預處理非常快,根據用多少量的點光源將影響品質,若在品質不用太好的 rendering 環境下,這是一個不錯的降質提升速度的方案。

在 Median Cut Algorithm 取樣 64 equal-energy region 的速度在不同取樣個數差異比較

Env-Light.pbrt



Env-Light-new.pbrt



在 Median Cut Algorithm 不同數量 equal-energy region 在 256 samples 速度

Median Cut Algorithm equal-energy region



算法的步驟如下:

  1. 將入射光場影像 (light probe image) 切成好幾個矩形區域,每一個區域將取用一個點光源代替。將入射光場影像轉換成灰階亮度 $Y$,如論文中所提及的方案 $Y = 0.2125 R + 0.7154 G + 0.0721 B$ 這類型的轉換。
  2. 對於每一個區域將會根據最長維度切割成兩個子區域。切割成兩個子區域的亮度總和越接近越好。
  3. 若切割區域數量不到目標的數量,則重複步驟 2。
  4. 最後將每一個區域算出代表的點光源,並加總區域內的亮度和,隨後取樣根據亮度和作為取樣根據 (在 Spectrum MedianCutEnvironmentLight::Sample_L(const Point&, float, const LightSample&, float, Vector&, float*, VisibilityTester*) 中使用),用每一個區域內部的 pixel 位置和亮度計算重心作為代表的點光源。

算法類似於 k-d Tree,但特別的是每次選擇區域維度最長的那一段進行切割,而不是像 k-d Tree 則是採用輪替維度進行。

Median Cut Algorithm 需要 $\mathcal{O}(HW)$ 時間 $\mathcal{O}(HW)$ 空間來預處理亮度資訊。若要切割成 $N$ 個區域,需要 $\mathcal{O}(\log N)$ 次迭代,每一次迭代增加兩倍區域數量。將一個區域切割時,針對維度最長的那一軸二分搜尋,二分搜尋計算其中一個區域的亮度和是否是整個區域亮度的一半,由於預處理完成的表格,可以在 $\mathcal{O}(1)$ 時間內計算任意平行兩軸的矩形內部元素總和。

維護 sumTable[i][j] 計算左上角 $(0, 0)$ 右下角 $(i, j)$ 的亮度和,計算任意平行兩軸矩形內部和只需要 $\mathcal{O}(1)$ 時間。

1
2
3
4
5
6
7
float getEnergy(float sumTable[], int width, int height) {
float e = sumTable[VERT(ru, rv)];
if (lu) e -= sumTable[VERT(lu-1, rv)];
if (lv) e -= sumTable[VERT(ru, lv-1)];
if (lu && lv) e += sumTable[VERT(lu-1, lv-1)];
return e;
}

另一種方案,可以從 pbrt 原生的 class MIPMap 取代上述寫法,MIPMap 的寫法採用分層式的架構,每一層將原圖長寬各縮小一半。計算某矩形元素總和時,藉由兩層的總和估計進行內插。理論上,這種寫法雖然不夠精準,但提供很好的快取優勢。

重心計算

Centroid Formula 1

一般重心計算採用以下公式:

$$X_c = \frac{\sum^{}_{(x, y) \in \mathit{region}} L_{(x, y)} \; x}{\sum^{}_{(x, y) \in \mathit{region}} L_{(x, y)}} \\ Y_c = \frac{\sum^{}_{(x, y) \in \mathit{region}} L_{(x, y)} \; y}{\sum^{}_{(x, y) \in \mathit{region}} L_{(x, y)}}$$

經由 Median-Cut Algorithm 在 Texmap 1 運行後,代表的點光源明顯地偏離亮區,因此為了讓代表的點光源更靠近亮區,我們將其重心公式修改成 Centroid Formula 2。

Centroid Formula 1

若以 $\mathit{Energy} \propto L^2_{(x, y)}$,能量與亮度二次成正比,則計算出的重心會更靠近亮區。

$$X_c = \frac{\sum^{}_{(x, y) \in \mathit{region}} L^2_{(x, y)} \; x}{\sum^{}_{(x, y) \in \mathit{region}} L^2_{(x, y)}} \\ Y_c = \frac{\sum^{}_{(x, y) \in \mathit{region}} L^2_{(x, y)} \; y}{\sum^{}_{(x, y) \in \mathit{region}} L^2_{(x, y)}}$$

比較結果如下:

由於 CSS 問題,詳細內容請到 http://morris821028.github.io/hw-rendering/homework3/submission/index.html 觀看。

Read More +

pbrt-v2 加速結構 Realistic Camera 改寫

Ray Sphere Intersection

計算射線和球體的交點,可以參照 /pbrt-v2/shapes/sphere.cppbool Shpere::Intersect() 的算法。

假設球體圓心座標 $O$,射線單位向量 $I$ 的起點座標 $C$,且最近目標交點座標 $P$,原半徑 $\mathrm{radius}$。射線走單位時間 $t$ 會到達球面上。

  • $\overrightarrow{OC} + \overrightarrow{I} \times t = \overrightarrow{OP}$
  • $|\overrightarrow{OP}| = \text{radius}$
  • $|\overrightarrow{OC} + \overrightarrow{I} \times t| = |\overrightarrow{OP}|$
  • $|\overrightarrow{I}|^2 t^2 + 2 \; (\overrightarrow{OC} \cdot \overrightarrow{I}) \; t + |\overrightarrow{OC}|^2 - \text{radius}^2 = 0$

解一元二次方程式之後可以得到 $t$ 得值,並得到交點座標 $P$

Snell’s Law

根據物理法則斯乃爾定律計算折射的方向向量,課程中提供三種做法。

變數解釋:入射介面材質 $\eta_1$,折射介面材質 $\eta_2$,入射單位向量 $\overrightarrow{I}$,交點面法向量 $\overrightarrow{N}$,折射方向向量 $\overrightarrow{T}$

特別小心 ray->d = Normalize(ray->d) 的處理,Heckbert’s Method 計算維持在單位圓上,故不用做最後的正規化計算。

Whitted’s Method

$\sqrt{}$ $/$ $\times$ $+$ compute
1 $n = \eta_1 / \eta_2$
3 3 2 $I' = I / (-I \cdot N)$
3 $J = I' + N$
1 1 8 5 $\alpha = 1 / \sqrt{n^2(I' \cdot I') - (J \cdot J)}$
3 3 $T' = \alpha J - N$
1 3 3 2 $T' = T' / \| T' \|$
2 8 17 15 TOTAL

Heckbert’s Method

$\sqrt{}$ $/$ $\times$ $+$ compute
1 $\eta = \eta_1 / \eta_2$
3 2 $c_1 = - I \cdot N$
1 3 2 $c_2 = \sqrt{1 - \eta^2(1 - c_1^2)}$
7 4 $T = \eta I + (\eta c_1 - c_2) N$
1 1 13 8 TOTAL

Other Method

$\sqrt{}$ $/$ $\times$ $+$ compute
1 $n = \eta_2 / \eta_1$
3 2 $c_1 = - I \cdot N$
1 2 3 $\beta = c_1 \sqrt{n^2 - 1 + c_1^2}$
3 3 3 $T = (I + \beta N ) / n$
1 4 8 8 TOTAL

其中以 Heckbert’s Method 消耗最少計算數。如果除法速度快於乘法,則使用 Other Method,原則上很少有機器運算除法比乘法快。

RasterToCamera

這部分處理後得到 Transform RasterToCamera。若計算錯誤,會造成一片黑或者圖片顯示的大小問題。座標轉換處理細節可以參考實際的例子,如下圖所示:

http://www.scratchapixel.com/lessons/3d-basic-rendering/ray-tracing-generating-camera-rays/generating-camera-rays

  • Raster-To-NDC 轉換矩陣 $A=\text{Scale}(scale, scale, 1)$,進行縮放。
  • NDC-To-Camera 轉換矩陣 $B=\text{Translate}(Vector(0, 0, filedistance)) \times \text{Translate}(Vector(X, -Y, 0))$,進行位移。
  • 最後得到 RasterToCamera 轉換矩陣 $C= B \times A \times \text{Scale}(-1, 1, 1)$,最後將 x 軸顛倒以符合成像問題。

Ray Weight

作業要求經過 float GenerateRay() 回傳 $\mathrm{weight} = \frac{\cos^4 \theta'}{Z^2}$,這麼設置會過暗,根據論文 A Realistic Camera Model for Computer Graphics 中寫道

If the exit pupil subtends a small solid angle from $x'$, $\theta'$ can be assumed to be constant and equal the angle between $x'$ and the center of the disk. This allows us to simplify
$E(x') = \int_{x'' \in D} L(x'', x') \frac{\cos \theta' \cos \theta''}{\| x'' - x'\|} dA''$<span>$to:$</span><!-- Has MathJax -->$E(x’) = L \frac{A}{Z^2} \cos^4 \theta’$
where $Z$ is the axial distance from the film plane to the dist and $A$ is the area of the disk.

A Realistic Camera Model for Computer Graphics: Figure 6

因此需要額外乘上常數 $A$,其中 $A$ 是最裡層的透鏡面積,因為我們是根據最裡層的透鏡面做均勻取樣,得到 $A = \mathit{backLens.radius}^2 \pi$

Sampling

單位圓均勻取樣方法有以下兩種,而非均勻取樣的寫法可參照 Sample 3 (錯誤的做法參照) 出來的效果看出。

Sampling 1

採用內建函數 CencentricSampleDisk(),採用 A Low Distortion Map Between Disk and Square 論文中提到的方案,將一個正方形壓縮到一個圓形中。參照作法如下圖所示

其中給定 $a, b$ 均勻分布 $[0, 1]$,則得到 $r = a, \; \phi = \frac{\pi}{4} \frac{b}{a}$,最後計算得到座標 $x = r \cos \phi, \; y = r \sin \phi$

Sampling 2

採用教科書上提供,其中給定 $a, b$ 均勻分布 $[0, 1]$,令 $r = \sqrt{a}, \; \phi = 2 \pi b$,最後計算得到座標 $x = r \cos \phi, \; y = r \sin \phi$

Sampling 3

分布 $[0, 1]$,令 $r = a, \; \phi = 2 \pi b$,最後計算得到座標 $x = r \cos \phi, \; y = r \sin \phi$。這種寫法在相同半徑下,角度均勻分布,不同半徑下的周長與 $r$ 成正比,導致不同半徑的取樣點不均勻,越靠近中心點取樣越密集,意即容易造成中心點看起來較亮。

Reference

Final Images Rendered Compare 4 samples

Gauss Lens








My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Fisheye Lens








My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Telephoto Lens








My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Wide-Angle Lens








My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Final Images Rendered Compare 512 samples

Gauss Lens









Reference



My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Fisheye Lens









Reference



My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Telephoto Lens









Reference



My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Wide-Angle Lens









Reference



My Implementation (Sampling 1)



My Implementation (Sampling 2)



My Implementation (Sampling 3)


Read More +

pbrt-v2 加速結構 Height Field 改寫

前情提要 pbrt

一套數位影像生成軟體,盡可能以物理性質為基底打造的,目前已經到了 pbrt-v3。pbrt 主要的工作在於 modeling 後,可以在場景內放置以幾何描述的物體、光源、拍攝曝光 … 等設定,藉由模擬光線走訪得到呈現在螢幕上每一個 pixel 的顏色。

目標工作

Height Field 是一個高度場,測資中以山脈構造為主,等間隔取樣每一平面上的每一點高度的場景,當間隔越小越接近真實場景。原生寫法是將點放置在三維空間,轉換成一堆三角網格,將其丟入 KD-tree 或者是 BVH 等加速結構,加快光線模擬找交點 (三維空間中,一射線和一多面體的交點) 的效能。而現在挑戰用 3D-DDA 的方式進行交點查找。

實作採用 3D-DDA 算法降至 2D Grid 上進行交點測試。Uniform Grid 將會切割成 $N \times M$ 的格子,在 2D 平面上,先找到 ray 進入的格子點,接著使用增量算法找到下一個格子,算法只測試投影到 x-y 平面上時,在 ray 鄰近的格子點做交點測試,時間複雜度 $O(\sqrt{N \times M})$,與預設的其他算法所需要的 traversal 複雜度差不多。

原本預設的 Height Field 拆成好幾個 Triangle,做法類似 Triangle Mesh,接著將這幾個 Triangle 利用 BVH 或者是 KD-tree 的架構進行,原本的做法會考慮三個維度,使用投影的方式只考慮兩個維度下的情況,接著再針對有相交的格子測試,最後才進行 3D Triangle 是否與 ray 相交。

實作 Height Field 時,考慮快取和再計算之間的好壞,則有兩種方法

  1. 預先將預處理每個 3D Triangle 座標 (消耗空間)
  2. 需要時,再創建 Triangle 出來測試 (消耗時間)

由於要找 ray 碰度到 uniform grid 的第一個格子,需要預處理 Height Field Bound Box,計算 Bound Box 需要 $O(N \times M)$,若不預處理會造成 3D-DDA 與暴力法 $O(N \times M)$ 無異。

若對 Height Field 進行 2D-DDA 好處在於一定是數個三角形構成,因此不用等到 ray.maxT 小於下一格交點的 NextCrossingT[stepAxis] 才能結束,一與三角形有交點就可以離開,因為每一格之間都有獨立性。

實作與測試細節

測試環境

一開始為了熟悉 pbrt 架構環境只實作求出交點的函數,

1
2
3
4
bool Heightfield2::Intersect(const Ray &r, float *tHit, float *rayEpsilon,
DifferentialGeometry *dg) const {
// 3D DDA
}

而直接忽略單純詢問交點是否存在,這會導致畫面一片黑,因為每打到一個交點會嘗試從物體表面的交點連到點光源。若下述函數恆真,則不存在光源到交點的路徑,因此產出來的圖片是一片黑。

1
2
3
bool Heightfield2::IntersectP(const Ray &r) const {
return true;
}

紋理模糊

找出紋理的兩個參數 $(u, v)$ 時,從 trianglemesh.cpp 複製過來的 class Trianglebool Triangle::Intersect() 函數得到的 $u, v$ 都是相對於三角形的 $0 \le u, v \le 1$ (直接 rendering 會看起來很多小格子點),因此得到相對的 $u, v$ 和交點 $p_{hit}$ (Object coordinate),則把 $p_{hit}$ 再次投影到 x-y 平面上,由於 height field 在 x-y 平面上長寬都是 1,投影到平面後得到 $p_{projection}$,對於整個 height field $u = p_{projection}.x, \; v = p_{projection}.y$

若測試 texture.pbrt 時,發生過模糊情況,其主要原因在於過多次座標轉換,如 Object -> World -> Object -> World,求出來的 $u, v$ 誤差就會放大。

將 height field 存成好幾個 triangle 可以像 class TriangleMesh 利用 index 標籤來壓縮記憶體,但實作為方便仍然每一個三角形都有實體的 Point,儲存三角形每一個點座標採用 object coordinate,先把 $ray$WorldToObject 得到 $ray'$,得到的交點 $p_{hit}$ 再進行 ObjectToWorld,如此一來模糊情況就會消失。

Phong 圓滑化

Phong interpolation 方法在三角形上面操作時,需要得知三個頂點的法向量,給予 height field 的資訊,並不曉得每一個頂點的法向量為何。得到每一個頂點的法向量,採用預先處理所有頂點的法向量並儲存起來,因為不斷地正規化是很耗時間的 (float sqrt(float x) 儘管採用 fast inverse square root 算法,也要極力避免多餘的運算)。

再計算頂點法向量時,根據影像處理中常用的一次差分 (differentiation) 遮罩

Case 1 邊緣
$$N_x = \begin{bmatrix} 0 & 0 & 0 \\ -1 & 1 & 0 \\ 0 & 0 & 0 \\ \end{bmatrix}, \; N_y = \begin{bmatrix} 0 & 0 & 0 \\ 0 & 1 & 0 \\ 0 & -1 & 0 \\ \end{bmatrix}$$
Case 2 非邊緣
$$N_x = \begin{bmatrix} 0 & 0 & 0 \\ -1 & 0 & 1 \\ 0 & 0 & 0 \\ \end{bmatrix}, \; N_y = \begin{bmatrix} 0 & 1 & 0 \\ 0 & 0 & 0 \\ 0 & -1 & 0 \\ \end{bmatrix}$$

上圖矩陣的中心座標為 $(x, y)$,將相鄰座標的高度值 $z$ 相減得到兩個方向的向量 $N_x, \; N_y$,外積得到座標 $(x, y)$ 的法向量 $N_{x, y} = \mathrm{Cross}(N_x, N_y)$

從另一個角度來看,將 height field 三角化後,每一個頂點原則上有 6 個鄰居,可以對每一個鄰居進行差分,接著順時針或者逆時針將兩兩相臨的向量外積,得到 6 個法向量平均後得到 $N_{x, y}$,這做法看起來比上述的差分來得穩定,但初始化消耗的時間會比較長,且撰寫的外積順序不對容易造成零向量的正規化錯誤。

Final Images Rendered with my implementation of heightfield.cpp

 






























hftest.pbrt (without Phong
interpolation)

Timings:


  • Original: 0.125 seconds

  • My implementation: 0.130 seconds (104% original)



landsea-0.pbrt (without Phong
interpolation)

Timings:


  • Original: 0.815 seconds

  • My implementation: 1.050 seconds (128% original)



landsea-1.pbrt (without Phong
interpolation)

Timings:


  • Original: 0.850 seconds

  • My implementation: 1.020 seconds (120% original)



landsea-2.pbrt (without Phong
interpolation)

Timings:


  • Original: 0.780 seconds

  • My implementation: 0.870 seconds (115% original)



texture.pbrt (without Phong
interpolation)

Timings:


  • Original: 0.450 seconds

  • My implementation: 0.520 seconds (120% original)





landsea-big.pbrt (without Phong
interpolation)


Timings:


  • Original: 6.200 seconds

  • My implementation: 2.200 seconds (35% original)


Final Images Rendered with my implementation of heightfield.cpp

 






























hftest.pbrt (with Phong
interpolation)

Timings:


  • My implementation: 0.135 seconds



landsea-0.pbrt (with Phong
interpolation)

Timings:


  • My implementation: 1.100 seconds



landsea-1.pbrt (with Phong
interpolation)

Timings:


  • My implementation: 1.050 seconds



landsea-2.pbrt (with Phong
interpolation)

Timings:


  • My implementation: 0.900 seconds



texture.pbrt (with Phong
interpolation)

Timings:


  • My implementation: 0.530 seconds





landsea-big.pbrt (with Phong interpolation)


Timings:


  • My implementation: 2.300 seconds


Read More +

即時系統 加入排程器

目標

修改 linux kernel,在 kernel/sched.} 相關 scheduler 部分增加 Weighted Round-Robin scheduler、Shortest Job First scheduler 以及 Rate-monotonic scheduler。最後藉由 Project 1 的練習撰寫測試程序驗證 scheduler 是否符合預期結果。

修改檔案列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
.
├── README.md
├── arch
│ └── x86
│ ├── include
│ │ └── asm
│ │ ├── unistd_32.h
│ │ └── unistd_64.h
│ └── kernel
│ └── syscall_table_32.S
├── include
│ └── linux
│ ├── sched.h
│ └── syscalls.h
└── kernel
├── sched.c
├── sched_fair.c
├── sched_rt.c
└── sched_weighted_rr.c

特別注意到,若編譯環境為 x86_64 則需要額外修改 arch/x86/include/asm/unistd_64.h 複製 arch/x86/include/asm/unistd_32.h 增加的 syscall。 同理,增加自己定義的 syscall 時,需要修改 arch/x86 下的那三個檔案 (如果發現編譯 kernel 時,出現 WARNING: syscall NOT IMPLEMENTION. 大多數都是這個造成)。

include/linux/sched.h 中,增加 task 的成員變數,提供接下來針對 task 操作需要的參數,特別的是在 struct sched_rt_entity; 中,相較於一般資料結構的寫法,linux 採用倒過來將節點訊息放在資料內部中,利用 #define container_of(ptr, type, member) 取得物件訊息,這種寫法方便解決 task 在數個 scheduler 中轉換。

因為 syscall 的方式設定 task weighted,增加一個全區變數 int weighted_rr_time_slice;,每一次 syscall 去設定這一個全區變數值,然後 task 建立時,經過 sched_fork,直接利用被修改全區變數作為這一個 task 的 weight。若要特別針對 task 設定 weighted round-robin,在建立前都要呼叫 syscall 設定全區變數。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct sched_rt_entity {
...
//+ RTS Proj2: weighted_rr
struct list_head weighted_rr_list_item;
...
};
...
struct task_struct {
...
//+ RTS Proj2: weighted_rr
unsigned int task_time_slice;
//+ RTS Proj2: weighted_rr_prio
unsigned int weighted_time_slice;
//+ RTS Proj2: bonus RMS
int period;
...
};
...
//+ RTS Proj2: weighted_rr
extern int weighted_rr_time_slice;

Part 1. Weighted Round-Robin Scheduler

  • enqueue_task_weighted_rr()
    函數將會給予 task 和 rq,將 task 根據 rq 的資料結構重新將 task 放入。若按照 list 結構,直接模擬 queue 即可,由於 linux 提供 list 本身為雙向串列,直接加入尾端只需要 $\mathcal{O}(1)$ 時間。並且運行 rq->weighted_rr.nr_running++; 增加 rq 中的計數,隨後用來在回報該 scheduler 中有沒有存在 task 需要運行,方便在 $\mathcal{O}(1)$ 時間內完成,由於 Weighted round-robin 只需要確認 list 是否為空,那麼也可以利用 linux 提供的 list_empty() 確認。

  • dequeue_task_weighted_rr()
    當 task 完成任務時,都要呼叫 update_curr_weighted_rr 進行統計運行多久,並且更新與 task 相關的排程權重。接著將 task 從 rq 串列中移除,並且更新 scheduler 的 task 計數 (rq->weighted_rr.nr_running--;)。時間複雜度 $\mathcal{O}(1)$

  • requeue_task_weighted_rr()
    將一個 task 出列,在 Weighted round-robin 只需要直接對將 task.weighted_rr_list_item 移到串列最尾端,由於採用雙向串列,直接跟 scheduler 取得 list head (linux list 的 head 是不存放任何資訊) 直接加入尾端即可。時間複雜度 $\mathcal{O}(1)$

  • yield_task_weighted_rr()
    直接運行 \lstinline{requeue} 即可。

  • pick_next_task_weighted_rr()
    當最上層分配一段時間給 scheduler 運行,運行時會調用這個函數,並拿取要執行的 task,但並不用將其移除串列,並執行 next->se.exec_start = rq->clock; 紀錄 task 的開始運行時間,再呼叫 void update_curr_weighted_rr(struct rq *)。若不更新時間,則計算的相對運行時間會錯誤。

  • task_tick_weighted_rr()
    當 scheduler 運行時,每隔一段固定的時間會呼叫此函數。若程序執行量超過 scheduler 的設定,則需要更新串列,特別注意到 if (p->task_time_slice && --p->task_time_slice) 在原本預設是 unsigned int 型態,處理不當會造成 overflow,另一種情況會是一開始 quantum 設定為 0 所導致。
    需根據不同的 task 要補充不同的量 p->weighted_time_slice,不只要讓這支程式重進進入串列,同時需要呼叫 set_tsk_need_resched(p) (參考 kernel/sched_rt.c 代碼),藉以重新呼叫 pick_next_task_weighted_rr(struct rq*),否則這支程序會運行到結束。

Part 2. Shortest Job First Scheduler

  • enqueue_task_weighted_rr()
    與 Weighted round-robin 類似,利用 list 做插入排序,可以在最慘 $\mathcal{O}(n)$ 時間內維護一個優先權由高至低的 list。需要 task 時,直接將串列的首元素移除。

  • dequeue_task_weighted_rr()
    類同 Weighted round-robin。

  • requeue_task_weighted_rr()
    在最短工作優先排程中,由於 task 優先權會變動,不方便確定執行過一陣子的 task 要移動到哪裡,最簡單的實作方式採用 dequeue 後再一次 enqueue。時間複雜度 $\mathcal{O}(n)$

  • yield_task_weighted_rr()
    直接運行 \lstinline{requeue} 即可。

  • pick_next_task_weighted_rr()
    類同 Weighted round-robin。

  • task_tick_weighted_rr()
    在最短工作優先排程中若按照 Weighted round-robin 的寫法且不補充 quantum,則會變成 non-preemptive SJF。相反地,若要實作 preemptive SJF,需要在 check_preempt_curr_weighted_rr(struct rq*, struct task_struct*, int) 檢查執行的程序是不是串列的首元素,意即是不是最高優先權,如果不是最高優先權,則呼叫 resched_task() 即可完成 (詳見 Bonus RMS 的寫法)。

Bonus RMS

額外增加 syscall,修改三個檔案 unistd_32.h, unistd_64.h, syscall_table_32.S。增加 Rate-monotonic scheduler 的額外週期 (period) 和 syscall 函數主題,修改兩個檔案 sched.h, sched.c

程式碼細節 sched_weighted_rr.c

  • enqueue_task_weighted_rr
    使用與最短工作優先排程相同寫法,但利用週期 (period) 進行由優先權高到低排序。
  • check_preempt_curr_weighted_rr
    週期程式可能會搶之前正在運行程式,若發現運行程式的優先權低於要進入排程器的程式優先權,呼叫 resched_task() 重新排程。
  • 其餘類同 SJF。

測試程序 test_rms.c

參考 Project 1 的寫法,linux 針對每一個 thread 有各自計時的計時器,當 thread 被 context-switch 時,計時器也會隨之停止。

利用 main thread 產生週期性程式,pthread_create 並不會馬上將 thread 立即執行,迫使 main thread 的方式使用 sleep(1),以秒為單位產生週期性程式。由於要考慮週期程式的執行時間,又能在輸出緩衝區觀察順序關係,利用 thread 的計時器,盡可能接近 1ms 才進行一次印出,但這也會造成幾毫秒的誤差,對於一支程序一秒大約印出 1000 個字元,測試程式中,將連續相同字元計數,若相同字元取用四捨五入到秒才進行印出,計時誤差部分就不予輸出。

Github

https://github.com/morris821028/hw-realtime-system

Read More +