xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (2) - オブザーバーパターン

Rxの根幹となる考え方はオブザーバーパターンです。まずはこのオブザーバーパターンについて見て行きます。

オブザーバーパターンの概要

オブジェクト指向プログラミングにおいては、「あるオブジェクトが変化したとき、その変更に合わせて他のオブジェクトが行動を起こす」というケースが多々あります。よくある例としては「データが変更されたときにUIを更新する」などが当てはまります。このようなとき、「データがUIを知っていて、UIの更新メソッドをそのまま呼び出す」というのは極めて汎用性が低い設計です。データはUIに依存するべきではないからです。「オブジェクトがクラスに依存せずに他のオブジェクトに状態変化を通知するにはどうすればよいか」。この問題をスマートに解決するために考えられたデザインパターンがオブザーバーパターンです。

オブザーバーパターンは監視対象 (サブジェクト/プロバイダー) と観測者 (オブザーバー/リスナー) から成ります。「監視対象のオブジェクトを観測者オブジェクトが監視していて、監視対象が変化したのを観測者が確認したら特定のアクションを起こす」の考え方が基本です。身近な例で言えば、「夜空を見ている人が流れ星が通った数を数える」などでしょうか。しかし実際の実装は逆で、「監視対象のオブジェクトに観測者オブジェクトを登録して、監視対象が変化したときに観測者に変更を通知する」となります。このとき重要なのは、監視対象は観測者 (の型) を直接知らないようにすることです。この抽象化を実現するために、一般的にはインターフェースが利用されます。.NET Framework 4では、監視対象のインターフェースとしてIObservable<T>が、観測者のインターフェースとしてIObserver<T>が導入されました。オブザーバーパターン自体はずっと昔から存在するのに、なぜ今更になって.NET Frameworkにこのインターフェースが導入されたのか。それは、今後Rxが.NET Framework標準で導入されるであろうことが大きく関係しているのではないかと思います。

IObservable<T>

IObservable<T>Subscribeメソッドのみを持ちます。引数として変更の通知先であるIObserver<T>を要求しており、IObservable<T>の具象クラスではその通知先を記憶することが求められます。変更の通知を行う必要がある場合、Subscribeメソッドで登録されたすべてのIObserver<T>に対して特定のメソッドを呼び出します。また、戻り値はIDisposableであり、そのDisposeメソッドを呼び出すことで通知を止められるようにします。ジェネリック型Tには、変更通知に用いる型を指定します。

メソッド 説明
Subscribe 通知先を登録します。

IObserver<T>

IObserver<T>には、IObservable<T>から状態変更を通知してもらうための3つのメソッドがあります。IObservable<T>は自身の状態の変化に応じてこれらを適切に呼び出すことが求められ、IObserver<T>の実装ではそれに適切に応答することが求められます。

メソッド 説明
OnNext 状態が変化したことを通知します
OnCompleted 状態変化の通知が完了したことを通知します
OnError 何らかのエラーが発生したことを通知します

簡単な例題で理解する

説明が悪いこともあって文章だけではイメージが掴みにくいと思うので、簡単な実装をすることで確認していきます。今回は、とあるオンラインニュース放送局とニュースの受信者がいることを想定します。「放送局にユーザー登録を行い、登録しているユーザーに対してニュース配信を行う」というものです。「ニュース配信の終了」や「ユーザー登録を解除する」という考え方も一緒に学びます。

Push

まず、配信するニュースを定義します。ここではフィールドとしてタイトルと内容のみを持つようにします。

namespace Sample01_ObserverPattern
{
    /// <summary>
    /// ニュースを表します。
    /// </summary>
    class News
    {
        /// <summary>
        /// タイトルを取得します。
        /// </summary>
        public string Title{ get; private set; }
 
        /// <summary>
        /// 内容を取得します。
        /// </summary>
        public string Content{ get; private set; }
 
        /// <summary>
        /// コンストラクタ
        /// </summary>
        /// <param name="title">タイトル</param>
        /// <param name="content">内容</param>
        public News(string title, string content)
        {
            this.Title   = title;
            this.Content = content;
        }
    }
}

次に受信者を定義します。複数の受信者を区別できるように名前を設定するようにしています。また、ニュースを受信したときの挙動も記述します。ここでは、コンソール画面にどのメソッドが呼ばれたかを出力するようにしています。

using System;
 
namespace Sample01_ObserverPattern
{
    /// <summary>
    /// 受信者としての機能を提供します。
    /// </summary>
    class Receiver : IObserver<News>
    {
        /// <summary>
        /// 受信者名を取得します。
        /// </summary>
        public string Name{ get; private set; }
 
        /// <summary>
        /// コンストラクタ
        /// </summary>
        /// <param name="name">受信者名</param>
        public Receiver(string name)
        {
            this.Name = name;
        }
 
        /// <summary>
        /// プロバイダーがプッシュベースの通知の送信を完了したことをオブザーバーに通知します。
        /// </summary>
        public void OnCompleted()
        {
            Console.WriteLine("{0} : OnCompleted", this.Name);
        }
 
        /// <summary>
        /// プロバイダーでエラー状態が発生したことをオブザーバーに通知します。
        /// </summary>
        /// <param name="error">エラーに関する追加情報を提供するオブジェクト</param>
        public void OnError(Exception error)
        {
            Console.WriteLine("{0} : OnError", this.Name);
        }
 
        /// <summary>
        /// オブザーバーに新しいデータを提供します。
        /// </summary>
        /// <param name="value">現在の通知情報</param>
        public void OnNext(News value)
        {
            Console.WriteLine("{0} : OnNext [{1} - {2}]", this.Name, value.Title, value.Content);
        }
    }
}

さらに、オンラインニュース放送局を定義します。Subscribeメソッドにより登録された受信者を記憶し、受信対象者に対してニュースを配信するようにしています。配信を終了する機能も付いています。Subscribeメソッドの戻り値を用いてユーザー登録解除を行えるようにします。

using System;
using System.Collections.Generic;
 
namespace Sample01_ObserverPattern
{
    /// <summary>
    /// オンラインニュース配信元としての機能を提供します。
    /// </summary>
    class WebcastingStation : IObservable<News>
    {
        /// <summary>
        /// 購読を解除する機能を提供します。
        /// </summary>
        private class Unsbscriber : IDisposable
        {
            /// <summary>
            /// オブザーバーのコレクションを保持します。
            /// </summary>
            private readonly LinkedList<IObserver<News>> observers = null;
 
            /// <summary>
            /// オブザーバーを保持します。
            /// </summary>
            private readonly IObserver<News> observer = null;
 
            /// <summary>
            /// コンストラクタ
            /// </summary>
            /// <param name="observers">オブザーバーを保持しているコレクション</param>
            /// <param name="observer">オブザーバー</param>
            public Unsbscriber(LinkedList<IObserver<News>> observers, IObserver<News> observer)
            {
                this.observers = observers;
                this.observer  = observer;
            }
 
            /// <summary>
            /// 使用していたリソースを解放します。
            /// </summary>
            public void Dispose()
            {
                if (this.observers.Contains(this.observer))
                    this.observers.Remove(this.observer);
            }
        }
 
        /// <summary>
        /// オブザーバーのコレクションを保持します。
        /// </summary>
        private readonly LinkedList<IObserver<News>> observers = new LinkedList<IObserver<News>>();
 
        /// <summary>
        /// オブザーバーが通知を受け取ることをプロバイダーに通知します。
        /// </summary>
        /// <param name="observer">通知を受け取るオブジェクト</param>
        /// <returns>プロバイダーが通知の送信を完了する前に、オブザーバーが通知の受信を停止できるインターフェイスへの参照</returns>
        public IDisposable Subscribe(IObserver<News> observer)
        {
            if (!this.observers.Contains(observer))
                this.observers.AddLast(observer);
            return new Unsbscriber(this.observers, observer);
        }
 
        /// <summary>
        /// 配信します。
        /// </summary>
        /// <param name="news">配信するニュース</param>
        public void Deliver(News news)
        {
            foreach (var observer in this.observers)
            {
                if (news == null) observer.OnError(new ArgumentNullException());
                else              observer.OnNext(news);
            }
        }
 
        /// <summary>
        /// 配信を終了します。
        /// </summary>
        public void StopDelivering()
        {
            foreach (var observer in this.observers)
                observer.OnCompleted();
            this.observers.Clear();
        }
    }
}

準備が整ったので、下記のように実行してみます。

namespace Sample01_ObserverPattern
{
    class Program
    {
        static void Main()
        {
            var provider     = new WebcastingStation();
            var unsbscriber1 = provider.Subscribe(new Receiver("吉木"));
            var unsbscriber2 = provider.Subscribe(new Receiver("倉木"));
            var unsbscriber3 = provider.Subscribe(new Receiver("鈴木"));
 
            provider.Deliver(new News("Title1", "Content1"));
            unsbscriber1.Dispose();
            provider.Deliver(null);
            provider.Deliver(new News("Title2", "Content2"));
            unsbscriber2.Dispose();
            provider.StopDelivering();
            provider.Deliver(new News("Title3", "Content3"));
            unsbscriber3.Dispose();
        }
    }
}
 
//—– 結果
/*
吉木 : OnNext [Title1 - Content1]
倉木 : OnNext [Title1 - Content1]
鈴木 : OnNext [Title1 - Content1]
倉木 : OnError
鈴木 : OnError
倉木 : OnNext [Title2 - Content2]
鈴木 : OnNext [Title2 - Content2]
鈴木 : OnCompleted
*/

上記の例題は次のように動いています。オブザーバーパターンの基本的な動作が確認できるかと思います。

  1. 放送局に3名の受信者を登録
  2. 放送局からTitle1のニュースが配信される
  3. 全員が正常に受信し、その旨をコンソール画面に表示
  4. 吉木さんがユーザー登録を解除
  5. 放送局が不明なニュースを配信
  6. 残りの2名が正常に受信できなかった旨をコンソール画面に表示
  7. 放送局がTitle2のニュースを配信
  8. 2名が正常に受信できた旨をコンソール画面に表示
  9. 倉木さんがユーザー登録解除
  10. 放送局がニュース配信を終了
  11. 配信終了を受け、ユーザー登録がされている鈴木さんが配信完了の旨をコンソール画面に表示
  12. 配信終了の通知後、放送局は全ユーザー登録を解除
  13. 放送局がTitle3のニュースを配信
  14. 登録されている人がいないのでコンソールには何も表示されない
  15. 鈴木さんのユーザー登録を解除するが、すでに登録解除されているので何も起こらない

参考記事

次の記事は、オブザーバーパターンについてより深く理解するための強力な手助けになると思います。ぜひ参考にしてみてください。

次回予告

今回はオブザーバーパターンの基本について見てきました。すでにお気付きかもしれませんが、これだけのことなら.NET Framework標準で提供されているイベントで十分に対応可能です。イベントはオブザーバーパターンの一実装なので、イベントを使えばこんなまどろっこしい事をする必要はありません。では、IObservable<T>とIObserver<T>を使うメリットは何でしょうか。それこそがRxの優れているところなので、今後追々見ていく予定です。次回はPush型とPull型ついて触れてみたいと思います。