/// Core interfaces of the library: disposables, observers, observables,
/// subjects, connectable observables and event patterns.
module rxd.rx.base_interfaces;

/// A handle that can be disposed.
interface IDisposable
{
    /// Disposes the handle.
    ///
    /// Returns: a success flag. This interface does not define its meaning;
    /// in the `IObservable` example of this module `true` means "this call
    /// released the subscription" and `false` means "it was already released".
    bool dispose();
}

/// A receiver of values of type `T`, plus error and completion notifications.
interface IObserver(T)
{
    /// Receives the next value.
    void onNext(T value);

    /// Receives an error notification.
    void onError(Exception error);

    /// Receives the completion notification.
    void onCompleted();
}

/// An observer that sums the values it receives and counts the error and
/// completion notifications.
unittest
{
    int n, e, c;
    class NumberObserver : IObserver!int
    {
        void onNext(int x) { n += x; }
        void onError(Exception err) { e++; }
        void onCompleted() { c++; }
    }

    auto o = new NumberObserver();

    foreach (i; 0 .. 5)
        o.onNext(i);

    o.onCompleted();

    assert (n == 10);
    assert (e == 0);
    assert (c == 1);
}

/// Variant of `IObserver` whose callbacks return a `Result` instead of `void`.
interface IObserver(T, Result)
{
    /// Receives the next value.
    Result onNext(T value);

    /// Receives an error notification.
    Result onError(Exception error);

    /// Receives the completion notification.
    Result onCompleted();
}

/// A source of values of type `T` that observers can subscribe to.
interface IObservable(T)
{
    /// Subscribes `observer` to this observable.
    ///
    /// Returns: an `IDisposable` for the subscription. In the example below,
    /// disposing it stops further notifications to `observer`.
    IDisposable subscribe(IObserver!T observer);
}

/// An observable integer: every mutation is pushed to the subscribed
/// observers, and disposing a ticket stops the notifications.
unittest
{
    class ObservableNumber : IObservable!int
    {
        private int number;

        // Slots are never removed. A disposed subscription leaves a null hole
        // so that the indices stored in the other tickets stay valid.
        private IObserver!int[] observers;

        private void publish()
        {
            foreach (o; observers)
            {
                // Skip the holes left behind by disposed tickets.
                if (o !is null)
                    o.onNext(this.number);
            }
        }

        int opUnary(string op)()
        {
            this.number = mixin (op ~ "this.number");
            publish();
            return this.number;
        }

        int opBinary(string op, T)(T other) const
            if (is(T : int))
        {
            return mixin ("this.number" ~ op ~ "other");
        }

        int opAssign(T)(T other) if (is(T : int))
        {
            this.number = other;
            publish();
            return this.number;
        }

        int opOpAssign(string op, T)(T other) if (is(T : int))
        {
            // Routed through opBinary and opAssign, so the publish happens
            // exactly once, inside opAssign.
            mixin ("this = this " ~ op ~ " other;");
            return this.number;
        }

        class Ticket : IDisposable
        {
            this(size_t idx) { this.idx = idx; }
            private const size_t idx;

            bool dispose()
            {
                if (this.outer.observers[idx] !is null)
                {
                    this.outer.observers[idx] = null;
                    return true;
                }
                else
                    return false;
            }
        }

        IDisposable subscribe(IObserver!int o)
        {
            observers ~= o;
            // A new subscriber immediately receives the current value.
            o.onNext(this.number);
            return new Ticket(observers.length - 1);
        }
    }

    class NumberObserver : IObserver!int
    {
        int[] result;
        void onNext(int x) { result ~= x; }
        void onError(Exception error) { assert (0); }
        void onCompleted() { assert (0); }
    }

    auto observable = new ObservableNumber;
    auto observer = new NumberObserver;

    auto ticket = observable.subscribe(observer);
    assert ((cast(ObservableNumber.Ticket)ticket).idx == 0);
    assert (observable.observers.length == 1);
    assert (observer.result == [ 0 ]);

    observable++;
    observable += 3;
    observable *= 2;

    assert (observable.number == 8);
    assert (observer.result == [ 0, 1, 4, 8 ]);

    // Only the first dispose releases the subscription.
    assert (ticket.dispose());
    assert (!ticket.dispose());

    // Publishing after a dispose must skip the released slot.
    observable++;
    assert (observable.number == 9);
    assert (observer.result == [ 0, 1, 4, 8 ]);
}

/// An object that is both an observer of `I` and an observable of `O`.
interface ISubject(I, O) : IObserver!I, IObservable!O
{
}

/// A subject whose input and output types are the same.
interface ISubject(T) : ISubject!(T, T)
{
}

/// An observable that additionally exposes `connect`.
interface IConnectableObservable(T) : IObservable!T
{
    /// Connects the observable.
    ///
    /// Returns: an `IDisposable` for the connection.
    /// NOTE(review): presumably disposing it disconnects again; confirm
    /// against the implementations.
    IDisposable connect();
}

/// Accessors for an event's sender and its arguments.
interface IEventPattern(Sender, EventArgs)
{
    /// The sender of the event.
    Sender sender();

    /// The arguments of the event.
    EventArgs eventArgs();
}