批改娘 20021. Dynamic Range Sum

contents

  1. 1. 題目描述
    1. 1.1. main.c (測試用)
    2. 1.2. DRS.h
    3. 1.3. DRS.c
  2. 2. 測資限制
  3. 3. 範例輸入
  4. 4. 範例輸出
  5. 5. 編譯參數
  6. 6. Solution

題目描述

在二維平面上有 $N$ 個點,每一個點 $p_i(x_i, y_i)$各自帶權重 $w_i$,這些點不時會移動和改變權重。

現在 Morris 希望你幫忙撰寫線性搜索的函數,好讓他專注數據結構上的調整。函數詢問包含

  • $N$ 個點的資訊 (以 SoA (Structure of Array) 的方式儲存,以達到最好的快取使用率)
  • 詢問的矩形 $\text{Rect}$ (正交於兩軸)

函數必須回傳在矩形內部的點權重和。

1
int32_t search_range(Rect rect, int32_t x[], int32_t y[], int32_t w[], int32_t n);

main.c (測試用)

  • 一開始,在二維空間 $[0, R) \times [0, R)$ 之間產生 $N$ 個點的資訊
  • 接著,模擬 $M$ 次點的變化,並且呼叫 search_range
  • 最後,將所有答案 HASH 輸出一個值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include "DRS.h"

static uint32_t seed = 0;
static void p_srand(uint32_t x) { seed = x;}
static uint32_t p_rand() {return seed = (seed*9301 + 49297);}

static void swap(int *x, int *y) {
int tmp = *x;
*x = *y;
*y = tmp;
}
static inline Rect rand_rect(int R) {
Rect r;
r.lx = p_rand()%R;
r.ly = p_rand()%R;
r.rx = p_rand()%R;
r.ry = p_rand()%R;
if (r.lx > r.rx) swap(&r.lx, &r.rx);
if (r.ly > r.ry) swap(&r.ly, &r.ry);
return r;
}

static void init(int N, int R, int32_t x[], int32_t y[], int32_t w[]) {
for (int i = 0; i < N; i++) {
x[i] = p_rand()%R;
y[i] = p_rand()%R;
w[i] = p_rand()%R;
}
}

static void tick(int N, int R, int32_t x[], int32_t y[], int32_t w[]) {
for (int i = 0; i < 5; i++) {
int idx = p_rand()%N;
x[idx] = p_rand()%R;
y[idx] = p_rand()%R;
}
for (int i = 0; i < 5; i++) {
int idx = p_rand()%N;
w[idx] = p_rand()%R;
}
}

#define MAXN 1048576
int main() {
p_srand(0);
static int32_t x[MAXN], y[MAXN], w[MAXN];
int N = 1000, M = 10000, R = 100;

init(N, R, x, y, w);

int32_t hash = 0;
for (int it = 0; it < M; it++) {
Rect rect = rand_rect(R);
int32_t ret = search_range(rect, x, y, w, N);
hash ^= ret;
tick(N, R, x, y, w);
}
printf("%" PRIi32 "\n", hash);
return 0;
}

DRS.h

1
2
3
4
5
6
7
8
9
10
11
12
13
#ifndef __DRS_H
#define __DRS_H

#include <stdint.h>

typedef struct Rect {
int32_t lx, ly, rx, ry;
} Rect;

int32_t search_range(Rect rect, int32_t x[], int32_t y[],
int32_t w[], int32_t n);

#endif

DRS.c

你的目標是要加速下述函數的計算,通過最低要求加速 2 倍以上。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include "DRS.h"

int32_t search_range(Rect rect, int32_t x[], int32_t y[],
int32_t w[], int32_t n) {
int32_t ret = 0;
for (int i = 0; i < n; i++) {
if (rect.lx <= x[i] && x[i] <= rect.rx &&
rect.ly <= y[i] && y[i] <= rect.ry) {
ret += w[i];
}
}
return ret;
}

測資限制

  • $N \le 131072$
  • $R \le 32768$

範例輸入

no input

範例輸出

1
8967

編譯參數

1
2
3
4
5
all: main

main: main.c
gcc -std=c99 -Ofast -march=native DRS.c -c -o DRS.o
gcc -std=c99 -Ofast -march=native main.c DRS.o -o main

Solution

一般的線性作法,透過 rect.lx <= x[i] && x[i] <= rect.rx && rect.ly <= y[i] && y[i] <= rect.ry 操作,翻譯成組合語言時,四次的 branch & jump,在管線處理上的效能易受到影響。為了使這種 branch 減少而不使管線處理的效能下降,通常會使用查表法來完成,藉由并行的數值計算,在最後一步才進行 load/store 完成指定區塊的指令。

在這一題中,唯有條件式皆成立才執行,由於分枝操作上只有一種情況,故查表法就不適用於此。我們仍可以并行數個矩形判斷,並且存在至少一個矩形成立再運行 load 加總所需的資料,將會給予效能上的大幅提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include "DRS.h"

#include <x86intrin.h>
/*
int32_t search_range(Rect rect, int32_t x[], int32_t y[],
int32_t w[], int32_t n) {
int32_t ret = 0;
for (int i = 0; i < n; i++) {
if (rect.lx <= x[i] && x[i] <= rect.rx &&
rect.ly <= y[i] && y[i] <= rect.ry) {
ret += w[i];
}
}
return ret;
}
*/

int32_t search_range(Rect rect, int32_t x[], int32_t y[], int32_t w[], int32_t n) {
__m128i ret = _mm_set_epi32(0, 0, 0, 0);
rect.lx--, rect.ly--;
rect.rx++, rect.ry++;
__m128i lx = _mm_broadcastd_epi32(*((__m128i *) &rect.lx));
__m128i ly = _mm_broadcastd_epi32(*((__m128i *) &rect.ly));
__m128i rx = _mm_broadcastd_epi32(*((__m128i *) &rect.rx));
__m128i ry = _mm_broadcastd_epi32(*((__m128i *) &rect.ry));
__m128i zo = _mm_set_epi32(0, 0, 0, 0);
__m128i ic = _mm_set_epi32(3, 2, 1, 0);

for (int i = 0; i+4 <= n; i += 4) {
__m128i sx = _mm_load_si128((__m128i *) (x+i));
__m128i sy = _mm_load_si128((__m128i *) (y+i));
__m128i c1 = _mm_and_si128(_mm_cmplt_epi32(lx, sx), _mm_cmplt_epi32(sx, rx));
__m128i c2 = _mm_and_si128(_mm_cmplt_epi32(ly, sy), _mm_cmplt_epi32(sy, ry));
if (_mm_testz_si128(c1, c2) == 0) {
__m128i cc = _mm_and_si128(c1, c2);
__m128i vi = _mm_add_epi32(ic, _mm_set_epi32(i, i, i, i));
__m128i rs = _mm_mask_i32gather_epi32(zo, w+i, ic, cc, 4);
ret = _mm_add_epi32(ret, rs);
}
}

int32_t sum = 0;
for (int i = (n>>2)<<2; i < n; i++) {
if (rect.lx <= x[i] && x[i] <= rect.rx &&
rect.ly <= y[i] && y[i] <= rect.ry) {
sum += w[i];
}
}
static int32_t tmp[4] __attribute__ ((aligned (16)));
_mm_store_si128((__m128i*) &tmp[0], ret);
sum += tmp[0] + tmp[1] + tmp[2] + tmp[3];
return sum;
}