C# 非同步的批量執行

日常生活中很多事物我們會一起做,例如買水果不會一次只買一種、也不會一次只買一顆,去水果攤一趟就是來回的時間成本。在程式中也一樣,我們透過任何形式接口、儲存體也不會一次只拿一筆資料,難道取得 100 筆資料會嘗試呼叫 100 次接口嗎?肯定是不會的。

另一種情境是反過來,接口允許批量呼叫,但想執行的量實在太大了,於是改成小批量執行,這也是今天想分享的內容。

在原本的程式中因為貪圖快速,想要進行分批執行時,很自然地使用了無限迴圈來做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static void RunInfLoop(int[] bigBatch)
{
const int batchSize = 10;
var skipCount = 0;

while (true)
{
var smallBatch = bigBatch.Skip(skipCount).Take(batchSize).ToArray();
if (smallBatch.Length == 0) break;

// DoMethod(smallBatch);

skipCount += BatchSize;
}
}

首先,這段程式是能夠運作沒錯,不過算是不夠嚴謹,當外界需要終止時,沒有人能夠控制這段無限迴圈直到它做完,所以外部被呼叫的儲存體或模組停止服務時,就只能等它噴錯噴到死了。比較正確的方式是加上 CancellationToken 來控制,不太清楚的也可以參考之前文章 CancellationToken 非同步取消工作

其次這樣執行方法很容易因為執行邏輯沒包裝好,導致整個程式進入無窮迴圈。

最後是這個方法不夠通用,若有多個地方要使用這個批次概念,就要寫很多份重複的 code。

於是我就自行調整了內容,加入了非同步呼叫、泛型、委派、參數、CancellationToken:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static async Task BatchExecute<T>(T[] bigBatch, Func<T[], Task> func, CancellationToken cancellationToken)
{
const int batchSize = 10;
var skipCount = 0;

while (!cancellationToken.IsCancellationRequested)
{
var smallBatch = bigBatch.Skip(skipCount).Take(batchSize).ToArray();

if (smallBatch.Length == 0) break;

await func(smallBatch);

skipCount += BatchSize;
}
}

這個方法委派了 func 去執行真正內容,每次丟進去的就是分好小批量的 smallBatch

呼叫方式差不多就是這樣,可以自行變通:

1
2
3
4
5
6
await BatchExecute( new[] { 1, 2, 3 }, async sb =>
{
// do what you want with parameters sb
// await method(sb);

}, cancellationToken);

在做類似東西時,曾有前輩提醒我關於設計架構觀念。

批次執行應該是由被呼叫方(提供服務者)來做。

假設我們是提供服務者,我們無法保證呼叫方會怎麼樣使用這個接口(像是呼叫頻率、每次量大小),因此在提供服務端做批次是比較安全可靠的,且提供服務者可以透過壓測與硬體設備等知道自己的極限來設計最佳 BatchSize。


C# 演進速度之快,省掉很多重功的程式碼,讓大家在維護上會更方便。但這些技巧、語法對於沒碰過的人剛開始會比較辛苦(我是說沒碰過、不是指新手喔),看習慣後就上手了。

Reference

  • 作者: MingYi Chou
  • 版權聲明: 轉載不用問,但請註明出處!本網誌均採用 BY-NC-SA 許可協議。