Numba Tutorial: Accelerating Python Scientific Computing (Part 2)
Unless you are an advanced user, you should not read this advanced-usage part! It will only blur your focus. Master basic usage first, since it already covers more than seventy percent of real-world use cases; for basic usage, see Part 1 of this tutorial.
Accelerating Computation with CUDA
Optimizing for CUDA is more involved than optimizing for the CPU, where adding a decorator is enough: you must write CUDA-specific functions, and the resulting code then only runs on a GPU. For that reason the author has not written any yet. The basic cautions still apply: watch out for I/O, and workloads that are too small are not worth sending to CUDA.
After some research, the author's view is that Numba is not the right tool for driving CUDA. Choose the right tool for the job: since the code loses portability anyway, you might as well use a package dedicated to CUDA optimization. According to this reddit discussion, CuPy, a library built specifically for CUDA, is a solid choice. Along the way I also found a newer package, Taichi, which can target CUDA as well; skimming its docs, its strengths appear to be physical particle simulation and support for automatic differentiation. Its team has also published a benchmark article, and based on those results there is little reason to use Numba for CUDA.
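For completeness, here is a minimal CuPy sketch (assuming cupy is installed and an NVIDIA GPU is available). CuPy's array API intentionally mirrors NumPy, so porting is mostly a matter of changing the import:

import cupy as cp

x = cp.random.random(1_000_000)  # allocated and filled on the GPU
y = cp.random.random(1_000_000)
z = cp.sqrt(x + y)               # executed on the GPU
result = cp.asnumpy(z)           # copy the result back to host memory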
Passing Parameters with a Dictionary
In numerical simulation we inevitably face functions with a very large number of parameters. Numba actually supports passing parameters in a (typed) dictionary.
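A minimal sketch using numba.typed.Dict (the API is from the official docs; the parameter names dt and gravity are illustrative). One Numba constraint to keep in mind: all keys must share one type and all values another.

from numba import njit, types
from numba.typed import Dict

# All keys share one type and all values another (a typed-dict constraint)
params = Dict.empty(key_type=types.unicode_type, value_type=types.float64)
params["dt"] = 0.01
params["gravity"] = 9.81

@njit
def step(params):
    # The typed dict can be read (and written) inside nopython code
    return params["gravity"] * params["dt"]

print(step(params))  # 0.0981...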
Signature
A signature explicitly tells Numba the input and output types; some features require one. The syntax is a list of the form [output1(input1A, input1B), output2(input2A, input2B)]. float64[:] denotes a one-dimensional array, and float64[:,:] a two-dimensional array.
A simple Numba signature example
import numpy as np
import numba as nb

# With an explicit signature
@nb.jit("float64[:](float64[:], float64[:])", nopython=True)
def add_and_sqrt(x, y):
    result = np.empty_like(x)
    for i in range(len(x)):
        result[i] = np.sqrt(x[i] + y[i])
    return result

# Without an explicit signature
@nb.jit(nopython=True)
def add_and_sqrt_no_sig(x, y):
    result = np.empty_like(x)
    for i in range(len(x)):
        result[i] = np.sqrt(x[i] + y[i])
    return result

if __name__ == "__main__":
    x = np.random.random(100000)
    y = np.random.random(100000)

    # Warm-up calls to trigger compilation
    _ = add_and_sqrt(x, y)
    _ = add_and_sqrt_no_sig(x, y)

    result_with_sig = add_and_sqrt(x, y)
    result_without_sig = add_and_sqrt_no_sig(x, y)
    print(f"Same results: {np.allclose(result_with_sig, result_without_sig)}")
    # Same results: True
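A practical difference between the two variants: with an explicit signature the function is compiled eagerly, at decoration time, and accepts only the declared types; without one, Numba compiles lazily on the first call and specializes for whatever argument types it encounters. The speed of the compiled code is essentially the same either way, as the benchmark at the end of this article also shows.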
A more complex Numba signature example
This example demonstrates a function with multiple inputs and outputs that also supports more than one input/output dimensionality.
from numba import njit, float64, types
import numpy as np

# Two signatures: one for scalars, one for 1-D arrays
@njit(
    [
        types.Tuple((float64, float64))(float64, float64),
        types.Tuple((float64[:], float64[:]))(float64[:], float64[:]),
    ]
)
def cosd(angle1, angle2):
    result1 = np.cos(np.radians(angle1))
    result2 = np.cos(np.radians(angle2))
    return result1, result2
# Test with single values
angle1 = 45.0
angle2 = 90.0
result1, result2 = cosd(angle1, angle2)
print(f"Results for single values: {result1}, {result2}")
# Test with arrays
angles1 = np.array([0.0, 45.0, 90.0])
angles2 = np.array([30.0, 60.0, 90.0])
results1, results2 = cosd(angles1, angles2)
print(f"Results for arrays:\n{results1}\n{results2}")
Other Decorators
Commonly used decorators include the following (the first two are covered in depth below; a brief sketch of jitclass and stencil follows this list):
- vectorize
- guvectorize
- jitclass
- stencil
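Since jitclass and stencil are not benchmarked in this article, here is a minimal, hedged sketch of both (the accumulator class and the smoothing kernel are illustrative examples, not from the original article):

from numba import njit, stencil, float64
from numba.experimental import jitclass
import numpy as np

# stencil: describe an element-wise neighborhood computation;
# out-of-bounds neighbors are treated as zero by default
@stencil
def moving_average(a):
    return (a[-1] + a[0] + a[1]) / 3

@njit
def smooth(a):
    return moving_average(a)

print(smooth(np.arange(10.0)))

# jitclass: compile a whole class; every attribute needs a declared type
@jitclass([("value", float64)])
class Accumulator:
    def __init__(self):
        self.value = 0.0

    def add(self, x):
        self.value += x
        return self.value

acc = Accumulator()
print(acc.add(1.5))  # 1.5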
vectorize
The vectorize decorator requires that inputs, outputs, and every operation inside the function be scalars, never arrays; it then vectorizes that scalar operation, and the resulting function can be used as a NumPy ufunc. The official documentation spends most of its space on how easily this creates NumPy ufuncs, since the traditional route requires writing C. On performance, the docs offer just one breezy, understated sentence:
Numba will generate the surrounding loop (or kernel) allowing efficient iteration over the actual inputs.
The official site has a dedicated article on optimizing jit, yet on vectorize performance this one sentence is all we get; performance seems not to be this decorator's selling point. And yet in this article, vectorize ran about 20 times faster than jit! According to the author's explanation, vectorize passes extra information to LLVM, which LLVM can then use to emit the CPU's SIMD vector instructions.
Below is a basic syntax example; we defer the performance test to guvectorize, the souped-up version covered next:
# Adapted from: https://github.com/numba/numba/blob/main/numba/tests/doc_examples/test_examples.py
# test_vectorize_multiple_signatures
from numba import vectorize
import numpy as np

# The signature format is a list: [output1(input1A, input1B), output2(input2A, input2B)]
@vectorize(["int32(int32, int32)",
            "int64(int64, int64)",
            "float32(float32, float32)",
            "float64(float64, float64)"])
def f(x, y):
    return x + y  # inside the definition, only scalar operations are allowed

a = np.arange(6)
result = f(a, a)  # at call time, array inputs are allowed
# result == array([ 0,  2,  4,  6,  8, 10])
correct = np.array([0, 2, 4, 6, 8, 10])
np.testing.assert_array_equal(result, correct)

a = np.linspace(0, 1, 6)
result = f(a, a)
# Now, result == array([0. , 0.4, 0.8, 1.2, 1.6, 2. ])
correct = np.array([0., 0.4, 0.8, 1.2, 1.6, 2.])
np.testing.assert_allclose(result, correct)

a = np.arange(12).reshape(3, 4)
# a == array([[ 0,  1,  2,  3],
#             [ 4,  5,  6,  7],
#             [ 8,  9, 10, 11]])

# As a ufunc, f also supports reduce and accumulate
result1 = f.reduce(a, axis=0)
print(result1)
# result1 == array([12, 15, 18, 21])

result2 = f.reduce(a, axis=1)
print(result2)
# result2 == array([ 6, 22, 38])

result3 = f.accumulate(a)
print(result3)
# result3 == array([[ 0,  1,  2,  3],
#                   [ 4,  6,  8, 10],
#                   [12, 15, 18, 21]])

result4 = f.accumulate(a, axis=1)
print(result4)
# result4 == array([[ 0,  1,  3,  6],
#                   [ 4,  9, 15, 22],
#                   [ 8, 17, 27, 38]])
guvectorize
The generalized universal vectorizer is a beefed-up vectorize: unlike vectorize, its kernels can operate on an arbitrary number of elements of the input arrays, and inputs and outputs may have arbitrary shapes. The black magic strikes again: the official docs once more say nothing about performance, yet in this article guvectorize turns out to be even faster, beating vectorize by another factor of six.
Here are a syntax example and performance tests:
- Syntax demo
- Matrix multiplication benchmark
- Logistic regression benchmark
- Arc length benchmark
# Demonstrating guvectorize syntax with matrix multiplication
from numba import guvectorize, prange
import numpy as np
import time

# Signature format: each list entry is "input1, input2, output"; this example accepts four input dtypes
# Layout: declare the dimensions as (m,n),(n,p)->(m,p); they may also be left blank for unspecified; the function must not contain a return
# Arguments: the last argument is the result to be "returned"; guvectorize does not return but writes results directly into that output array
@guvectorize(
    [
        "float64[:,:], float64[:,:], float64[:,:]",
        "float32[:,:], float32[:,:], float32[:,:]",
        "int64[:,:], int64[:,:], int64[:,:]",
        "int32[:,:], int32[:,:], int32[:,:]",
    ],
    "(m,n),(n,p)->(m,p)",
)
def matrix_multiply(A, B, C):
    m, n = A.shape
    n, p = B.shape
    for i in range(m):
        for j in range(p):
            C[i, j] = 0
            for k in range(n):
                C[i, j] += A[i, k] * B[k, j]
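# Usage note: like any gufunc, the output argument is optional at call
# time; matrix_multiply(A, B) would allocate and return C automatically.
# The benchmark below preallocates C instead.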
# guvectorize with prange
# testing the nopython and parallel options
@guvectorize(
    [
        "float64[:,:], float64[:,:], float64[:,:]",
        "float32[:,:], float32[:,:], float32[:,:]",
        "int32[:,:], int32[:,:], int32[:,:]",
        "int64[:,:], int64[:,:], int64[:,:]",
    ],
    "(m,n),(n,p)->(m,p)",
    nopython=True,
    target="parallel",
)
def matrix_multiply_prange(A, B, C):
    m, n = A.shape
    n, p = B.shape
    for i in prange(m):
        for j in range(p):
            C[i, j] = 0
            for k in range(n):
                C[i, j] += A[i, k] * B[k, j]
def run_benchmark():
    n_runs = 1
    N = 100
    A = np.random.rand(N, N).astype(np.float64)
    B = np.random.rand(N, N).astype(np.float64)
    res_python = A @ B

    start = time.time()
    for _ in range(n_runs):
        C = np.empty((N, N), dtype=np.float64)
        matrix_multiply(A, B, C)
        print("Are the results the same?", np.allclose(C, res_python))
    end = time.time()
    print("Without prange: Total time for {} runs: {:.4f} seconds".format(n_runs, end - start))

    start = time.time()
    for _ in range(n_runs):
        C_prange = np.empty((N, N), dtype=np.float64)
        matrix_multiply_prange(A, B, C_prange)
        print("Are the results the same?", np.allclose(C_prange, res_python))
    end = time.time()
    print("With prange: Total time for {} runs: {:.4f} seconds".format(n_runs, end - start))

run_benchmark()
# Are the results the same? True
# Without prange: Total time for 1 runs: 0.0012 seconds
# Are the results the same? True
# With prange: Total time for 1 runs: 0.0012 seconds
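A caveat on this benchmark: with n_runs = 1 and 100×100 matrices the whole computation finishes in about a millisecond, so it mainly demonstrates the syntax; at this size the parallel target's threading overhead roughly offsets its gains, which is presumably why both versions report the same time. Larger matrices and more runs would be needed for a fair comparison.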
The following code is adapted from Dask + Numba for Efficient In-Memory Model Scoring.
# Benchmarking the model-scoring example
import numpy as np
import time
from numba import njit, guvectorize, prange

n = 250000
x = np.random.poisson(lam=5, size=n)
y, z = np.random.normal(size=(n, 2)).T
overlay = False
n_runs = 100
res = np.zeros((n, 15))

# Original Python version
def python_func(x, y, z, overlay=False):
    out = np.zeros((x.shape[0], 15))
    adj = 1.5 if overlay else 1.0
    for t in range(15):
        out[:, t] = t * x**2 + y - 2 * z - 2 * t
    return adj * out

# @njit version
@njit
def jitted_func(x, y, z, overlay=False):
    out = np.zeros((x.shape[0], 15))
    adj = 1.5 if overlay else 1.0
    for t in range(15):
        out[:, t] = t * x**2 + y - 2 * z - 2 * t
    return adj * out

# @njit + signature version
@njit("float64[:,:](int64[:], float64[:], float64[:], boolean)")
def jitted_func_with_signature(x, y, z, overlay=False):
    out = np.zeros((x.shape[0], 15))
    adj = 1.5 if overlay else 1.0
    for t in range(15):
        out[:, t] = t * x**2 + y - 2 * z - 2 * t
    return adj * out

# @guvectorize version: scalars in, one row of length 15 out;
# the dummy argument `_` only serves to fix the output dimension
@guvectorize(
    "i8, f8, f8, b1, f8[:], f8[:]",
    "(), (), (), (), (specifySameDimension) -> (specifySameDimension)",
)
def fast_predict_over_time(x, y, z, overlay, _, out):
    adj = 1.5 if overlay else 1.0
    for t in range(len(out)):
        out[t] = adj * (t * x**2 + y - 2 * z - 2 * t)

# Warm-up calls to trigger compilation
res_python = python_func(x, y, z, overlay)
res_jitted = jitted_func(x, y, z, overlay)
res_jitted_signature = jitted_func_with_signature(x, y, z, overlay)
res_fast_pred = fast_predict_over_time(x, y, z, overlay, res)
# 1. Benchmark the original Python version
start_time = time.time()
for _ in range(n_runs):
    _ = python_func(x, y, z, overlay)
end_time = time.time()
print(f"Time: {(end_time - start_time) / n_runs:.6f} seconds, pure Python")

# 2. Benchmark the @njit version
start_time = time.time()
for _ in range(n_runs):
    _ = jitted_func(x, y, z, overlay)
end_time = time.time()
print(f"Time: {(end_time - start_time) / n_runs:.6f} seconds, pure @njit")

# 3. Benchmark the @njit + signature version
start_time = time.time()
for _ in range(n_runs):
    _ = jitted_func_with_signature(x, y, z, overlay)
end_time = time.time()
print(f"Time: {(end_time - start_time) / n_runs:.6f} seconds, @njit with signature")

# 4. Benchmark the @guvectorize version
start_time = time.time()
for _ in range(n_runs):
    _ = fast_predict_over_time(x, y, z, overlay, res)
end_time = time.time()
print(f"Time: {(end_time - start_time) / n_runs:.6f} seconds, @guvectorize")

print("Are the results the same?", np.allclose(res_python, res_fast_pred))
print("Are the results the same?", np.allclose(res_python, res_jitted_signature))
# Time: 0.039077 seconds, pure Python
# Time: 0.027496 seconds, pure @njit
# Time: 0.027633 seconds, @njit with signature
# Time: 0.005587 seconds, @guvectorize
# Are the results the same? True
# Are the results the same? True
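A plausible reading of the roughly fivefold gap, consistent with the SIMD remark quoted in the vectorize section: the @njit versions evaluate whole-array expressions that allocate temporary arrays on every loop iteration, while the guvectorize kernel fills a preallocated output one scalar at a time, leaving LLVM a tight inner loop it can vectorize.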