キャッシュを考慮したコードを書いてみよう!
どうもこんにちは、勤続3年目にして業界3年目のプログラマーです。
今回はプログラムのお話をしていこうかと思います。
プログラムを行っていく上で最適化というものが必要になるケースが度々あると思いますが、
その最適化の手法の一つにキャッシュ・ブロッキングという方法があります。
キャッシュ・ブロッキングとは?
キャッシュ・ブロッキングはキャッシュに収まるようにデータ構造をブロック化します。
ブロック化の目的は、複数回使用されるデータをキャッシュに保持することでデータの再利用の機会を活用することにあります。
詳しい話はググってもらうとして、今日は実際にキャッシュブロッキングを考慮したプログラムを紹介していこうと思います。
実際のプログラム例
わかりやすい例が行列積の計算なので行列積のプログラムを紹介します。
というのも1年以上前にキャッシュブロッキングを使った行列積のコードを書いたのを思い出したのでこれをネタにしたブログを書いたというのもありますが;;
ちなみに実行環境は
- CPU: core i7-6700K
- OS: ubuntu LTS16.04
- コンパイラ: g++ 5.40
- 最適化オプション: -march=native -mtune=native -O3
となっています。
だいたい2年前ぐらいの環境です。
最適化なしの行列積プログラム
#include <iostream>
#include <chrono>
#include <iomanip>
#include <limits>
// N x Nの行列を扱う
static constexpr int N = 1024;
// 行列の積を計算する
void matrix_multiply(double* A, double* B, double* C)
{
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
double c = C[i + N * j];
for (int k = 0; k < N; k++) {
c += A[i + N * k] * B[k + N * j];
}
C[i + N * j] = c;
}
}
}
// メイン関数
int main(void)
{
// 行列を定義
static double matrixA[N * N];
static double matrixB[N * N];
static double matrixC[N * N];
for (int i = 0; i < N * N; i++) {
matrixA[i] = static_cast<double>(i + 1);
matrixB[i] = static_cast<double>(-i - 1);
matrixC[i] = static_cast<double>(0);
}
auto start = std::chrono::high_resolution_clock::now();
matrix_multiply(matrixA, matrixB, matrixC);
auto end = std::chrono::high_resolution_clock::now();
std::cout << "Calculator time is " << std::chrono::duration<double>(end - start).count() << " seconds." << std::endl;
std::cout << "Calculator result is " << std::setprecision(std::numeric_limits<double>::max_digits10) << matrixC[N * N - 1] << "." << std::endl;
return 0;
}
結果…
Calculator time is 2.51253 seconds.
Calculator result is -563316457472000.
024 * 1024行列同士の乗算プログラムです。
とりあえず10回ほど上のプログラムを回したところ、
平均およそ2.496秒
という感じになりました。
実際には10回という数字はかなり物足りないものだとは思いますが、気になる方は実際に上のコードを実行してみてください。
キャッシュを考慮したプログラム
上で挙げたmatrix_multiply関数にてdo_block関数を呼び出します。
static constexpr int BLOCK_SIZE = 32;
inline void do_block(int si, int sj, int sk, double* A, double* B, double* C)
{
for (int i = si; i < si + BLOCK_SIZE; i++) {
for (int j = sj; j < sj + BLOCK_SIZE; j++) {
double cij = C[i + N * j];
for (int k = sk; k < sk + BLOCK_SIZE; k++) {
cij += A[i + N * k] * B[k + N * j];
}
C[i + N * j] = cij;
}
}
}
// 行列の積を計算する
void matrix_multiply(double* A, double* B, double* C)
{
for (int sj = 0; sj < N; sj += BLOCK_SIZE) {
for (int si = 0; si < N; si += BLOCK_SIZE) {
for (int sk = 0; sk < N; sk += BLOCK_SIZE) {
// BLOCK_SIZE単位で処理を行う
do_block(si, sj, sk, A, B, C);
}
}
}
}
結果…
Calculator time is 1.16565 seconds.
Calculator result is -563316457472000.
BLOCK_SIZE*BLOCK_SIZEの部分行列を扱うことになります。今回は32*32の部分行列です。
double型(=8bytes)の要素を32*32=1024つをもつ行列3つのサイズ合計は24KiBなのでCore i7の32KiBのキャッシュに余裕を残して収まっています。
パタへネ本を参考にして書いたので詳細が気になる方は読んでみてください。
10回ほど上のプログラムを回したところ、
平均1.165秒
という感じになりました。さっきの1/2以下になりましたね。約2.14倍速です。
おまけ(OpenMPを使ったコード)
#pragma omp parallel forの一行を付け足すだけです。
void matrix_multiply(double* A, double* B, double* C)
{
#pragma omp parallel for
for (int sj = 0; sj < N; sj += BLOCK_SIZE) {
for (int si = 0; si < N; si += BLOCK_SIZE) {
for (int sk = 0; sk < N; sk += BLOCK_SIZE) {
// BLOCK_SIZE単位で処理を行う
do_block(si, sj, sk, A, B, C);
}
}
}
}
結果…
Calculator time is 0.248215 seconds.
Calculator result is -563316457472000.
めっちゃ早くなりましたね。約10.1倍速です^^
ちなみにAVXやループ展開などの手法を加えることでさらに早くなります。みなさんも興味があったら試してみてください。