1 module rxd.rx.base_interfaces;
2 
/**
 * Handle for something that can be released by calling `dispose`.
 *
 * `IObservable.subscribe` and `IConnectableObservable.connect` in this
 * module hand out values of this type.
 */
interface IDisposable
{
    /**
     * Releases whatever this handle stands for.
     *
     * Returns: a `bool` whose meaning is left to the implementation.
     *          NOTE(review): the example in this module returns `true` when
     *          the call actually released something and `false` when it was
     *          already released - confirm this is the intended contract.
     */
    bool dispose();
}
9 
/**
 * Receiver of notifications carrying values of type `T`.
 *
 * The three callbacks return nothing; see the two-parameter
 * `IObserver(T, Result)` overload for a variant that returns a value.
 *
 * Params:
 *     T = type of the values delivered through `onNext`
 */
interface IObserver(T)
{
    /// Called with a new `value`.
    void onNext(T value);

    /// Called with the `Exception` describing a failure.
    /// NOTE(review): presumably terminal, as in Rx - not enforced here.
    void onError(Exception error);

    /// Called without arguments to signal completion.
    /// NOTE(review): presumably terminal, as in Rx - not enforced here.
    void onCompleted();
}
22 
///
unittest
{
    // Counters written by the observer below:
    // n - sum of all values seen, e - number of errors, c - number of completions.
    int n, e, c;

    // Minimal IObserver!int that records every notification in the counters.
    class NumberObserver : IObserver!int
    {
        void onNext(int x) { n += x; }
        void onError(Exception err) { e++; }
        void onCompleted() { c++; }
    }

    auto o = new NumberObserver();

    // Feed 0, 1, 2, 3, 4 (sum = 10), then signal completion once.
    foreach (i; 0 .. 5)
        o.onNext(i);

    o.onCompleted();

    assert (n == 10);
    assert (e == 0);
    assert (c == 1);
}
46 
/**
 * Variant of `IObserver` whose callbacks hand a `Result` back to the caller.
 *
 * Params:
 *     T      = type of the values delivered through `onNext`
 *     Result = type returned by each of the three callbacks
 */
interface IObserver(T, Result)
{
    /// Called with a new `value`; returns a `Result`.
    Result onNext(T value);

    /// Called with the `Exception` describing a failure; returns a `Result`.
    Result onError(Exception error);

    /// Called without arguments to signal completion; returns a `Result`.
    Result onCompleted();
}
59 
/**
 * Source of values of type `T` that `IObserver!T` instances can subscribe to.
 *
 * Params:
 *     T = type of the values accepted by the subscribed observers
 */
interface IObservable(T)
{
    /**
     * Registers `observer` with this observable.
     *
     * Params:
     *     observer = the observer to register
     *
     * Returns: an `IDisposable` tied to this subscription.
     *          NOTE(review): presumably disposing it cancels the
     *          subscription - confirm against the implementations.
     */
    IDisposable subscribe(IObserver!T observer);
}
65 
///
unittest
{
    // Example observable: wraps an int and pushes every newly assigned value
    // to all subscribed observers.
    class ObservableNumber : IObservable!int
    {
        private int number;

        // One slot per subscription, in subscription order. A disposed
        // subscription keeps its slot but the slot is set to null, so the
        // indices stored in outstanding tickets stay valid.
        private IObserver!int[] observers;

        // Sends the current number to every live subscriber.
        private void publish()
        {
            foreach (o; observers)
            {
                // Slots of disposed subscriptions are null; calling onNext
                // on them would dereference null.
                if (o !is null)
                    o.onNext(this.number);
            }
        }

        // Applies the unary operator to the number, publishes and returns the result.
        int opUnary(string op)()
        {
            this.number = mixin (op ~ "this.number");
            publish();
            return this.number;
        }

        // Pure computation: neither modifies the number nor publishes.
        int opBinary(string op, T)(T other) const
            if (is(T : int))
        {
            return mixin ("this.number" ~ op ~ "other");
        }

        // Stores the new value, publishes it and returns it.
        int opAssign(T)(T other) if (is(T : int))
        {
            this.number = other;
            publish();
            return this.number;
        }

        // `x op= y` is expressed as `x = x op y`, so publishing happens
        // exactly once, inside opAssign.
        int opOpAssign(string op, T)(T other) if (is(T : int))
        {
            mixin ("this = this " ~ op ~ " other;");
            return this.number;
        }

        // Subscription handle: remembers which slot of `observers` it owns.
        class Ticket : IDisposable
        {
            this(size_t idx) { this.idx = idx; }
            private const size_t idx;

            // Clears the slot. Returns true if the subscription was still
            // active, false if it had already been disposed.
            bool dispose()
            {
                if (this.outer.observers[idx] !is null)
                {
                    this.outer.observers[idx] = null;
                    return true;
                }
                else
                    return false;
            }
        }

        // Appends the observer, immediately sends it the current number and
        // returns a ticket for the slot it occupies.
        IDisposable subscribe(IObserver!int o)
        {
            observers ~= o;
            o.onNext(this.number);
            return new Ticket(observers.length - 1);
        }
    }

    // Observer that records every value it receives; errors and completion
    // are not expected in this example.
    class NumberObserver : IObserver!int
    {
        int[] result;
        void onNext(int x) { result ~= x; }
        void onError(Exception error) { assert (0); }
        void onCompleted() { assert (0); }
    }

    auto observable = new ObservableNumber;
    auto observer = new NumberObserver;

    // Subscribing delivers the current value (0) right away.
    auto ticket = observable.subscribe(observer);
    assert ((cast(ObservableNumber.Ticket)ticket).idx == 0);
    assert (observable.observers.length == 1);
    assert (observer.result == [ 0 ]);

    observable++;       // 0 -> 1
    observable += 3;    // 1 -> 4
    observable *= 2;    // 4 -> 8

    assert (observable.number == 8);
    assert (observer.result == [ 0, 1, 4, 8 ]);

    // The first dispose cancels the subscription, the second is a no-op.
    assert (ticket.dispose());
    assert (!ticket.dispose());

    // Publishing after a dispose must skip the cleared slot: the number still
    // changes, but the former subscriber receives nothing more.
    observable++;       // 8 -> 9
    assert (observable.number == 9);
    assert (observable.observers.length == 1);
    assert (observer.result == [ 0, 1, 4, 8 ]);
}
153 
/**
 * Combination of an observer and an observable: accepts values of type `I`
 * through the `IObserver!I` methods and can be subscribed to as an
 * `IObservable!O`. Declares no members of its own.
 *
 * Params:
 *     I = type of the values it observes
 *     O = type of the values its subscribers accept
 */
interface ISubject(I, O) : IObserver!I, IObservable!O
{
}
158 
/**
 * Shorthand for `ISubject!(T, T)`: a subject whose input and output types
 * are the same. Declares no members of its own.
 *
 * Params:
 *     T = type used for both the observed and the published values
 */
interface ISubject(T) : ISubject!(T, T)
{
}
163 
/**
 * An `IObservable!T` that additionally exposes a `connect` operation.
 *
 * Params:
 *     T = type of the values accepted by the subscribed observers
 */
interface IConnectableObservable(T) : IObservable!T
{
    /**
     * Connects this observable.
     *
     * NOTE(review): presumably, as in Rx, values only start flowing to
     * subscribers after this call and disposing the result disconnects
     * again - confirm against the implementations.
     *
     * Returns: an `IDisposable` tied to the connection.
     */
    IDisposable connect();
}
169 
/**
 * Pairs a sender with its event arguments.
 *
 * Params:
 *     Sender    = type returned by `sender`
 *     EventArgs = type returned by `eventArgs`
 */
interface IEventPattern(Sender, EventArgs)
{
    /// Returns the `Sender` part of the pair.
    Sender sender();

    /// Returns the `EventArgs` part of the pair.
    EventArgs eventArgs();
}