Home About Contact
C# , .NET

C# .NET で Task を使って非同期に処理を実行(その2)

なんとなく Task の使い方がわかったので、 今回は Javaの newSingleThreadScheduledExecotor() の ExectorService と似た作動をするコードを C# で実装してみます。 実行するタスク(アクション)は Fire-and-forget 形式。

Javaで実装する場合、雰囲気としてこんな感じのコード:

public class MyTask implements Runnable {
    private String it;
    public MyTask(String it){
        this.it = it;
    }
    @Override
    public void run(){
        System.out.println("Hello, World! ("+it+")");
    }
};

ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.submit(new MyTask("1"));
executor.submit(new MyTask("2"));
executor.submit(new MyTask("3"));

// タスクの処理の完了を待つ.
executor.shutdown();
while( !executor.isTerminated() ){
    Thread.sleep(1000);
}

これを C# で実現したい。

環境

今回は Windows 11 にインストールした .NET 6 を WSL Ubuntu 20.04 上で実行しています。

$ dotnet.exe --version
6.0.101

HelloThread プロジェクトの作成

$ dotnet.exe new console -n HelloThread
$ cd HelloThread
$ dotnet.exe run
Hello, World!

前回同様に Hello, World! (1) のように挨拶のあとに番号を出力するアクションをつくります。 Program.cs を以下のように編集:

namespace HelloThread
{
    class Program
    {
        private static Action createAction(string it)
        {
            return ()=>{
                Console.WriteLine($"Hello, World! ({it})");
            };
        }

        static void Main(string[] args)
        {
            var list = new List<string>{"1","2","3","4","5"};
            for(int i=0; i<list.Count; i++)
            {
                var action = createAction( list[i] );
                action();
            }
        }
    }
}

とりあえずそのままアクションを実行するだけのコードです。

実行してみます。

$ dotnet.exe run
Hello, World! (1)
Hello, World! (2)
Hello, World! (3)
Hello, World! (4)
Hello, World! (5)

次に、 アクションを非同期で実行するように変更。 action() する代わりに、 Task.Run(action) するだけです。

namespace HelloThread
{
    class Program
    {
        private static Action createAction(string it)
        {
            return ()=>{
                Console.WriteLine($"Hello, World! ({it})");
            };
        }

        static void Main(string[] args)
        {
            var list = new List<string>{"1","2","3","4","5"};
            for(int i=0; i<list.Count; i++)
            {
                var action = createAction( list[i] );
                //action();
                Task.Run(action);
            }

            Thread.Sleep(1000);
        }
    }
}

実行します:

$ dotnet.exe run
Hello, World! (3)
Hello, World! (1)
Hello, World! (4)
Hello, World! (2)
Hello, World! (5)

非同期で実行すると、アクションの実行順は不定です。実行するたびに毎回実行順は変わります。 これを Java の SingleThreadScheduledExecutor のようにアクションの登録順に非同期にアクションを実行するようにしてみます。

SingleThreadScheduledExector の振る舞いに合わせる

このアクションの登録順に実行するための管理クラスをつくります。 名前を SingleThreadActionExecutor として Submit(Action) すれば非同期に実行かつ Submit した順に実行できるようにします。

方針としては:

ということにします。 つまり、具体的には:

という振る舞いをするように実装します。

//SingleThreadActionExector.cs

namespace HelloThread
{
    class SingleThreadActionExecutor
    {
        private delegate void ActionFinishListener();
    
        private object lockObject = new object();
        private int actionCount = 0;
        private Queue<Action> actionQueue = new Queue<Action>();

        public void Submit(Action action){
            bool isTaskCountZero = false;
            lock(lockObject){
                isTaskCountZero = (actionCount==0);
                actionCount += 1;
    
                if(!isTaskCountZero){
                    actionQueue.Enqueue(action);
                }
            }
    
            if(isTaskCountZero){
                Run(action, DoActionFinishListener);
            }
        }
    
        private void Run(Action action, ActionFinishListener listener)
        {
            Task.Run( ()=>
            {
                action();
                listener();
            });
        }
    
        private void DoActionFinishListener()
        {
            Action? nextAction = null;
            lock(lockObject){
                actionCount += -1;
    
                if(actionCount>0 && actionQueue.Count>0){
                    nextAction = actionQueue.Dequeue();
                }
            }
    
            if( nextAction!=null ){
                Run(nextAction, DoActionFinishListener);
            }
        }
    
        public int UncompletedActionCount()
        {
            lock(lockObject){
                return actionCount;
            }
        }
    }
}

actionCount, actionQueue の操作や値の取得時には lock(lockObject) {} 内で処理するようにしています。

Program.cs は次のように変更します。

namespace HelloThread
{
    class Program
    {
        private static Action createAction(string it)
        {
            return ()=>{
                Console.WriteLine($"Hello, World! ({it})");
            };
        }

        static void Main(string[] args)
        {
            var executor = new SingleThreadActionExecutor();

            var list = new List<string>{"1","2","3","4","5"};
            for(int i=0; i<list.Count; i++)
            {
                var action = createAction( list[i] );
                //Task.Run(action);
                executor.Submit(action);
            }

            Thread.Sleep(1000);
        }
    }
}

変更点は、あらかじめ executor を生成しておき、 Task.Run(action) する代わりに executor.Submit(action) するだけです。

実行すると、アクションの登録順に非同期で実行できています。

$ dotnet.exe run
Hello, World! (1)
Hello, World! (2)
Hello, World! (3)
Hello, World! (4)
Hello, World! (5)

executor.UncompletedActionCount() で未処理のアクション数がわかるので、 Thread.Sleep(1000); ではなく、以下のようにして登録したアクションが全部処理されるまで 確実に メインスレッドをブロックしておくことができます。

while( executor.UncompletedActionCount()>0 )
{
    Thread.Sleep(100);
}

アプリ起動中に次々に Fire-and-forget タイプのアクション(たとえば保存処理とか)を非同期で実行して、 アプリ終了時に、それらが全部処理完了したのを確認してからアプリを終了する、という振る舞いをつくるときに 使えるのではないかと。

なんとなく、そのうち「実は そういう ExecutorService 的なものが C# にあった」それを使え、となる気がしないでもない。

以上です。

追伸

SingleThreadActionExecutor.cs の改良版 SingleThreadActionExecutor2.cs。 delegate を使わないでインタフェースで実装した。Javaっぽい実装。 この方が腑に落ちる気がするが、インタフェースにした結果、 無駄に Finish メソッドを public にするという副作用あり。

namespace HelloThread
{
    interface IActionListener
    {
        void Finish();
    }

    class SingleThreadActionExecutor2: IActionListener
    {
        private object lockObject = new object();
        private int actionCount = 0;
        private Queue<Action> actionQueue = new Queue<Action>();

        public void Submit(Action action){
            bool isTaskCountZero = false;
            lock(lockObject){
                isTaskCountZero = (actionCount==0);
                actionCount += 1;
    
                if(!isTaskCountZero){
                    actionQueue.Enqueue(action);
                }
            }
    
            if(isTaskCountZero){
                Run(action, this);
            }
        }
    
        private void Run(Action action, IActionListener listener)
        {
            Task.Run( ()=>
            {
                action();
                listener.Finish();
            });
        }
    
        public void Finish()
        {
            Action? nextAction = null;
            lock(lockObject){
                actionCount += -1;
    
                if(actionCount>0 && actionQueue.Count>0){
                    nextAction = actionQueue.Dequeue();
                }
            }
    
            if( nextAction!=null ){
                Run(nextAction, this);
            }
        }
    
        public int UncompletedActionCount()
        {
            lock(lockObject){
                return actionCount;
            }
        }
    }
}