カスタムのジョブタイプ
ジョブシステムの最下位レベルのジョブは、JobsUtility のいずれかの Schedule
関数を呼び出すことでスケジュールが設定されます。すべての既存の ジョブタイプ では、これらの関数を使用しますが、同じ API を使用して特殊なジョブタイプを作成することもできます。
これらの API は安全性の低いコードを使用しており、不要な競合状態が生じやすいため慎重に作成する必要があります。独自のジョブタイプを追加する場合は、テストカバレッジ 100% を目標にすることを強くお勧めします。
例として、カスタムのジョブタイプ IJobParallelForBatch
を用意しています (参照ファイル: /Packages/com.unity.collections/Unity.Collections/Jobs/IJobParallelForBatch.cs)。
これは IJobParallelFor と同じように動作しますが、インデックスごとに実行関数を 1 つ呼び出すのではなく、実行対象のバッチごとに実行関数を 1 つ実行します。1 度に複数の項目に、同時並行で処理を実行する必要がある場合に便利です。このジョブタイプを使用するシナリオとして一般的なのは、一時配列を作成する必要があり、その配列内の項目を 1 つずつ作成するのは避けたい場合などです。IJobParallelFor を使用することで、バッチごとに一時配列を 1 つ作成することができます。
IJobParallelForBatch の例では、ジョブのスケジュールが実際に設定されるエントリーポイントは以下のようになります。
unsafe static public JobHandle ScheduleBatch<T>(this T jobData, int arrayLength, int minIndicesPerJobCount, JobHandle dependsOn = new JobHandle()) where T : struct, IJobParallelForBatch
{
var scheduleParams = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf(ref jobData), ParallelForBatchJobStruct<T>.Initialize(), dependsOn, ScheduleMode.Batched);
return JobsUtility.ScheduleParallelFor(ref scheduleParams, arrayLength, minIndicesPerJobCount);
}
最初の行で、スケジュール設定パラメーターを含む構造体が作成されます。その作成時に、ジョブにコピーされるデータへのポインターを設定する必要があります。ポインターを設定する必要があるのは、これを使用するネイティブコードが型を認識していないためです。 また、以下を呼び出して、作成される JobReflectionData へのポインターを渡す必要もあります。
JobsUtility.CreateJobReflectionData(typeof(T), JobType.ParallelFor, (ExecuteJobFunction)Execute);
JobReflection は、ジョブに含まれる NativeContainers や、ジョブのスケジュール設定時に必要なパッチの適用方法など、ジョブのデータとともに構造体に関する情報を保管します。これはエンジンのネイティブ側に保管され、マネージコードは、型に関する情報なしで、ポインター経由でのみアクセスできます。JobReflectionData を作成する際は、ジョブを実装する構造体の型、JobType、ジョブの実行時に呼び出されるメソッドを指定する必要があります。JobReflectionData は、スケジュールを設定した構造体内のデータではなく、構造体の型のみに依存します。そのため、同じインターフェースを実装するすべてのジョブで 1 回だけ作成する必要があります。現時点では、ジョブタイプは Single と ParallelFor の 2 つのみです。Single の場合、ジョブは 1 回だけ呼び出されます。ParallelFor の場合は、ジョブを処理するために複数回呼び出しが行われ、各呼び出しの対象は、処理するインデックスの範囲のサブセットのみに制限されます。どのジョブタイプを選ぶかが、呼び出しが許可されるスケジュール関数に影響します。
JobsUtility.JobScheduleParameters の 3 つ目のパラメーターは、スケジュールが設定されたジョブで使用する必要がある JobHandle です。
最後のパラメーターはスケジュールモードです。スケジュールモードは、Run と Batched のどちらかを選択します。Batched の場合は、処理を実行するために 1 つ以上のジョブのスケジュールが設定されます。Run の場合は、Schedule が返される前にメインスレッドで処理が実行されます。
スケジュールパラメーターが作成されたら、実際にジョブのスケジュールを設定します。ジョブのスケジュールは、タイプに応じて 3 つの方法で設定できます。
JobHandle Schedule(ref JobScheduleParameters parameters);
JobHandle ScheduleParallelFor(ref JobScheduleParameters parameters, int arrayLength, int innerLoopBatchCount);
JobHandle ScheduleParallelForTransform(ref JobScheduleParameters parameters, IntPtr transfromAccessArray);
Schedule は、JobType.Single で ScheduleParameters が作成された場合のみ使用できます。他の 2 つのスケジュール関数では、JobType.ParallelFor が必要です。 ScheduleParallelFor に渡される arrayLength と innerLoopBatchCount パラメーターは、ジョブが処理する必要があるインデックスの数と、内部ループで処理する必要があるインデックスの数を特定するために使用されます (内部ループのカウントの詳細については、実行と JobRanges のセクションを参照してください)。 ScheduleParallelForTransform は ScheduleParallelFor と同様ですが、GameObjects の Transform コンポーネントを変更できる TransformAccessArray にもアクセスします。インデックスの数とバッチのサイズは、TransformAccessArray から推測されます。
実行と JobRanges
ジョブのスケジュールを設定したら、ネイティブ側から直接指定したエントリーポイントが呼び出されます。これは、MonoBehaviours で Update が呼び出される動作と同様ですが、ジョブ内部から呼び出される点が異なります。ジョブごとに呼び出しが 1 回だけ実行され、ジョブが 1 つか、ワーカースレッドごとにジョブが 1 つのいずれかになります (ParallelFor の場合)。
Execute で使用されるシグネチャは以下のとおりです。
public delegate void ExecuteJobFunction(ref T data, IntPtr additionalPtr, IntPtr bufferRangePatchData, ref JobRanges ranges, int jobIndex);
Single ジョブの場合、必要なのはデータのみとなり、処理を即座に実行できますが、ParallelFor ジョブの場合は、インデックスの処理を開始する前に、いくつかの追加作業が必要になります。インデックスを、各ジョブによって同時並行で処理される複数の連続サブセットに分割する必要があります。これにより、同じ対象を 2 回処理することがなくなり、すべてを確実にカバーできるようになります。メモリのレイアウトによってインデックスの順番が決まります。
JobRanges には、ParallelFor によって処理されることになっているバッチとインデックスが含まれます。インデックスは、バッチのサイズに基づいてバッチに分割されます。各ジョブがメモリの連続するセクションで繰り返し処理を実行できるように、バッチは各ジョブに均等に配分されます。ParallelFor ジョブは以下を呼び出す必要があります。
JobsUtility.GetWorkStealingRange(ref ranges, jobIndex, out begin, out end)
これは false
が返されるまで続き、呼び出し後は、インデックスが begin から end までのすべての項目を処理します。
ジョブが処理する全項目のセットではなく、項目のバッチを取得するのは、Unity ではあるジョブが他のジョブよりも早く完了した場合に ワークスティーリング が適用されるためです。ここでいうワークスティーリングとは、完了済みのジョブが実行中の他のジョブを見て、大量の処理が残っているジョブがないかチェックすることを指します。未完了のジョブが検出された場合は、まだ開始されていない一部のバッチを取り、処理を動的に配分し直します。
ParallelFor ジョブは、項目の処理を開始する前に、処理対象の項目の範囲で NativeContainers への書き込みアクセスを制限する必要があります。そうしないと、複数のジョブが同一のインデックスに書き込み、競合状態が生じる可能性があります。制限する必要がある NativeContainers がジョブに渡され、それにパッチを適用する関数があります。これは、適切な範囲にない項目にはアクセスできないようにするためです。こうした処理を行うためのコードは以下のようになります。
# if ENABLE_UNITY_COLLECTIONS_CHECKS
JobsUtility.PatchBufferMinMaxRanges(bufferRangePatchData, UnsafeUtility.AddressOf(ref jobData), begin, end - begin);
# endif
カスタムの NativeContainers
ジョブを作成する際に特に対処するのが難しいのは、ジョブ間のデータ通信です。NativeArray を使用するだけでは限界があります。NativeQueue、NativeHashMap、NativeMultiHashMap、およびその Concurrent バージョンも使用することで、ほとんどのシナリオに対応できます。
それ以外のシナリオでは、独自にカスタムの NativeContainers を作成できます。 スレッド同期化 用のカスタムのコンテナを作成する際は、適切なコードを記述することが非常に大事です。新しいコンテナを追加する際は、テストカバレッジ 100% にすることをお勧めします。
これの非常に簡単な例として、NativeCounter.Concurrent を介して ParallelFor ジョブで増分できる NativeCounter を作成し、以降のジョブかメインスレッドで読み取ります。
では基本的なコンテナタイプから始めます。
// この構造体を NativeContainer としてマークします。これは通常、コンテナ用のジェネリック構造体ですが、カウンターをジェネリックにする必要はありません
// TODO - このカウンターをジェネリックにする必要がないのはなぜでしょうか。その理由として、引数のことを説明してください。
[StructLayout(LayoutKind.Sequential)]
[NativeContainer]
unsafe public struct NativeCounter
{
// 割り当てられたカウントへの実際のポインターは、このコンテナでジョブのスケジュールを設定できるように、制限を緩和する必要があります
[NativeDisableUnsafePtrRestriction]
int* m_Counter;
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle m_Safety;
// 破棄番兵はメモリリークを追跡します。これはマネージタイプなので、ジョブのスケジュール設定時にクリアされて null になります
// このジョブはコンテナを破棄することはできません。ジョブが実行されるまで、コンテナを破棄できるものは他にもないため、渡さなくても問題ありません
// この属性は必須です。この属性がないと、ジョブのアクセス権がマネージオブジェクトに付与されるため、この NativeContainer をジョブに渡すことができません
[NativeSetClassTypeToNullOnSchedule]
DisposeSentinel m_DisposeSentinel;
# endif
// このメモリが割り当てられた場所を追跡します
Allocator m_AllocatorLabel;
public NativeCounter(Allocator label)
{
// Blittable 型の int を常に使用するため、このチェックは不要です。
// ジェネリック型の型の正しさをチェックする方法の一例です。
# if ENABLE_UNITY_COLLECTIONS_CHECKS
if (!UnsafeUtility.IsBlittable<int>())
throw new ArgumentException(string.Format("{0} used in NativeQueue<{0}> must be blittable", typeof(int)));
# endif
m_AllocatorLabel = label;
// 単精度整数用のネイティブメモリを割り当てます
m_Counter = (int*)UnsafeUtility.Malloc(UnsafeUtility.SizeOf<int>(), 4, label);
// メモリリークを追跡するための破棄番兵を作成します。これにより AtomicSafetyHandle も作成されます
# if ENABLE_UNITY_COLLECTIONS_CHECKS
DisposeSentinel.Create(out m_Safety, out m_DisposeSentinel, 0);
# endif
// 非初期化データを回避するために、カウントを 0 に初期化します
Count = 0;
}
public void Increment()
{
// 呼び出し元にこのデータの書き込み権限があるか確認します。
// これは競合状態に対する防止策です。これらのチェックがないと AtomicSafetyHandle が機能しません
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
# endif
(*m_Counter)++;
}
public int Count
{
get
{
// 呼び出し元にこのデータの読み取り権限があるか確認します。
// これは競合状態に対する防止策です。これらのチェックがないと AtomicSafetyHandle が機能しません
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
# endif
return *m_Counter;
}
set
{
// 呼び出し元にこのデータに対する書き込み権限があるか確認します。これは競合状態に対する防止策です。これらのチェックがないと AtomicSafetyHandle が機能しません
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
# endif
*m_Counter = value;
}
}
public bool IsCreated
{
get { return m_Counter != null; }
}
public void Dispose()
{
// 破棄番兵がメモリリークを報告しないように、データが解放されたことを認識できるようにします
# if ENABLE_UNITY_COLLECTIONS_CHECKS
DisposeSentinel.Dispose(m_Safety, ref m_DisposeSentinel);
# endif
UnsafeUtility.Free(m_Counter, m_AllocatorLabel);
m_Counter = null;
}
}
これにより、カウントを get、set、増分できる簡単な NativeContainer が作成されます。このコンテナはジョブに渡すことができますが、NativeArray と同じ制限が適用されるため、書き込み権限で ParallelFor ジョブに渡すことはできません。
次のステップは、ParallelFor で利用できるようにすることです。競合状態を避けるため、ParallelFor による書き込み中は、他の要素が読み取ることができないようにします。そのために、複数の書き込み元を処理できるものの、読み取り元は処理できない Concurrent という内部構造体を別途作成します。NativeCounter.Concurrent は、NativeCounter 外に別個に保管することはできないため、通常の NativeCounter 内から割り当てられるようにします。
[NativeContainer]
// この属性によって、ParallelFor ジョブで NativeCounter.Concurrent を利用できるようになります
[NativeContainerIsAtomicWriteOnly]
unsafe public struct Concurrent
{
// 完全な NativeCounter からのポインターのコピー
[NativeDisableUnsafePtrRestriction]
int* m_Counter;
// 完全な NativeCounter からの AtomicSafetyHandle のコピー。この内部構造体はメモリを保有せず、解放する役割も担っていないため、破棄番兵はコピーされません。
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle m_Safety;
# endif
// これにより、NativeCounter から NativeCounter.Concurrent への割り当てが可能になります
public static implicit operator NativeCounter.Concurrent (NativeCounter cnt)
{
NativeCounter.Concurrent concurrent;
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(cnt.m_Safety);
concurrent.m_Safety = cnt.m_Safety;
AtomicSafetyHandle.UseSecondaryVersion(ref concurrent.m_Safety);
# endif
concurrent.m_Counter = cnt.m_Counter;
return concurrent;
}
public void Increment()
{
// Increment は引き続き書き込み権限をチェックする必要があります
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
# endif
// 実際の増分はアトミックを使用して実装されます。複数のスレッドによって同時に増分できるためです
Interlocked.Increment(ref *m_Counter);
}
}
この設定により、以下のように内部の Concurrent 構造体を介して、NativeCounter への書き込み権限で ParallelFor のスケジュールを設定できます。
struct CountZeros : IJobParallelFor
{
[ReadOnly]
public NativeArray<int> input;
public NativeCounter.Concurrent counter;
public void Execute(int i)
{
if (input[i] == 0)
{
counter.Increment();
}
}
}
var counter = new NativeCounter(Allocator.Temp);
var jobData = new CountZeros();
jobData.input = input;
jobData.counter = counter;
counter.Count = 0;
var handle = jobData.Schedule(input.Length, 8);
handle.Complete();
Debug.Log("The array countains " + counter.Count + " zeros");
counter.Dispose();
キャッシュの使用方法の改善
前のセクションの NativeCounter は、カウンターを適切に機能するよう実装したものですが、ParallelFor のすべてのジョブが値を増分するために同じアトミックにアクセスします。これは、すべてのスレッドで同じキャッシュ行が使用されるため、最適とは言えません。 NativeContainers におけるこの問題の一般的な解決方法は、別々のキャッシュ行に保存されるローカルキャッシュをワーカースレッドごとに用意することです。
[NativeSetThreadIndex] 属性は、ワーカースレッドのインデックスを挿入できます。このインデックスは、ParallelFor jobs ジョブから NativeContainer へのアクセス時に、必ず一意になります。
そのような最適化を行うためには、いくつかの変更が必要です。まず、データのレイアウトを変更する必要があります。パフォーマンス上の理由から、偽共有 を避けるため、単一の int ではなく、ワーカースレッドごとに完全なキャッシュ行が 1 つずつ必要です。
最初にキャッシュ行の int 数の定数を追加します。
public const int IntsPerCacheLine = JobsUtility.CacheLineSize / sizeof(int);
次に、割り当てられるメモリの量を変更します。
// 考えられるワーカーインデックスごとに、完全なキャッシュ行 (cacheline あたりの整数 * 整数のサイズ) が 1 つ (JobsUtility.MaxJobThreadCount)
m_Counter = (int*)UnsafeUtility.Malloc(UnsafeUtility.SizeOf<int>()*IntsPerCacheLine*JobsUtility.MaxJobThreadCount, 4, label);
メインの非コンカレントのバージョンからカウンターにアクセスする際は、書き込み元が 1 つだけの場合もあるため、増加関数で新しいメモリレイアウトに対応できます。
count
の get
と set
では、考えられるすべてのワーカーインデックス全体でループさせる必要があります。
public int Count
{
get
{
// 呼び出し元がこのデータの読み取り権限を持っているか確認します。
// これは競合状態に対する防止策です。これらのチェックがないと AtomicSafetyHandle が機能しません
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
# endif
int count = 0;
for (int i = 0; i < JobsUtility.MaxJobThreadCount; ++i)
count += m_Counter[IntsPerCacheLine * i];
return count;
}
set
{
// 呼び出し元がこのデータの読み取り権限を持っているか確認します。
// これは競合状態に対する防止策です。これらのチェックがないと AtomicSafetyHandle が機能しません
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
# endif
// ローカルにキャッシュされたカウントをすべて消去します。
// 最初のカウントを必須の値に設定します
for (int i = 1; i < JobsUtility.MaxJobThreadCount; ++i)
m_Counter[IntsPerCacheLine * i] = 0;
*m_Counter = value;
}
}
最後の変更は、ワーカーインデックスを挿入する必要がある内部の Concurrent 構造体です。各ワーカーは 1 度に 1 つずつジョブを実行するため、ローカルのカウントにアクセスするだけの場合では、アトミックを使用する必要はなくなります。
[NativeContainer]
[NativeContainerIsAtomicWriteOnly]
// 現在のワーカーインデックスをこのコンテナに挿入する必要があることをジョブシステムに通知します
unsafe public struct Concurrent
{
[NativeDisableUnsafePtrRestriction]
int* m_Counter;
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle m_Safety;
# endif
// 現在のワーカースレッドインデックス。これは挿入されるため、まったく同じ名前を使用する必要があります
[NativeSetThreadIndex]
int m_ThreadIndex;
public static implicit operator NativeCacheCounter.Concurrent (NativeCacheCounter cnt)
{
NativeCacheCounter.Concurrent concurrent;
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(cnt.m_Safety);
concurrent.m_Safety = cnt.m_Safety;
AtomicSafetyHandle.UseSecondaryVersion(ref concurrent.m_Safety);
# endif
concurrent.m_Counter = cnt.m_Counter;
concurrent.m_ThreadIndex = 0;
return concurrent;
}
public void Increment()
{
# if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
# endif
// ローカルのカウントを増分するだけなので、アトミックは不要です
++m_Counter[IntsPerCacheLine*m_ThreadIndex];
}
}
この方法で NativeCounter を作成することで、複数のスレッドで書き込みを行うオーバーヘッドを大幅に削減できます。ただしデメリットもあり、メインスレッドのカウントを取得するコストが大幅に増加します。ローカルのキャッシュをすべてチェックし、集計する必要があるためです。このことを踏まえ、戻り値を確実にキャッシュできるのであれば、多くの場合は効果的な方法になりますが、データ構造の制限を把握しておく必要があります。そのため、パフォーマンス上の特性を文書化しておくことを強くお勧めします。
テスト
NativeCounter の作成はこれで終わりではありません。残すはテストを追加し、適切に動作しているか、今後も破綻することがないかを確認することだけです。テストを作成する際は、通常とは異なるシナリオをできるだけ多くカバーする必要があります。すべてを検出できる可能性は低いですが、ジョブを使用して一種のストレステストを実施し、競合状態を検出する方法も効果的です。NativeCounter API は非常に容量が小さいため、必要なテスト数はそれほど多くなりません。
- 両方のバージョンのカウンターの例は、/Assets/NativeCounterDemo で参照できます。
- これらのテストは /Assets/NativeCounterDemo/Editor/NativeCounterTests.cs で確認できます。
使用可能な属性
NativeCounter は多数の属性を使用しますが、別のタイプのコンテナでは、利用できる属性が他にもいくつかあります。NativeContainer 構造体で利用できる属性のリストは以下のとおりです。
- NativeContainer - 構造体を、すべてのネイティブコンテナの NativeContainer.Required としてマークします。
- NativeContainerSupportsMinMaxWriteRestriction - NativeContainer が書き込み可能な範囲を最小と最大のインデックスの間に制限することを示します。これは、処理対象ではないインデックスにジョブが書き込みを行わないようにするためにコンテナを IJobParallelFor に渡す際に使用します。これを使用するためには、NativeContainer に、メンバーの int m_Length、int m_MinIndex、int m_MaxIndex がこの順番で、他のメンバーを間に挟まずに含まれている必要があります。コンテナは、最小値と最大値の範囲外の書き込みに対して例外をスローする必要もあります。
- NativeContainerIsAtomicWriteOnly - NativeContainer がアトミックの書き込みを使用し、アトミックの読み取りは使用しないことを示します。これを追加することで、書き込み先のインデックスを制限することなく、書き込み可能として NativeContainer を IJobParallelFor に渡すことができます。
- NativeContainerSupportsDeallocateOnJobCompletion - NativeContainer を DeallocateOnJobCompletion で使用できるようにします。これを使用するためには、NativeContainer に、m_Buffer の単一のアロケーションと、m_AllocatorLabel のアロケーターラベル、m_DisposeSentinel の破棄番兵が必要です。
- NativeSetThreadIndex - ジョブのスレッドインデックスを使用して int にパッチを適用します。
ネイティブコンテナの構造体自体のこうした属性以外にも、ネイティブコンテナのメンバーで使用できる属性がいくつかあります。
- NativeDisableUnsafePtrRestriction - ポインターが含まれていても、NativeContainer をジョブに渡せるようにします (通常はポインターが含まれていると渡すことができません)。
- NativeSetClassTypeToNullOnSchedule - マネージオブジェクトが含まれていても、NativeContainer をジョブに渡せるようにします。マネージオブジェクトは、ジョブに渡されるコピーで
null
に設定されます。