1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 # Algorithm
7 1. Construct the layout consisting of a number of consecutive slots with a
8    duration between them.
2. Fill the slots with snapshots that exist using a "best fit" algorithm.
10 
11 # Best fit
The best fitting snapshot is the one with the lowest difference between the
bucket's time and the snapshot's actual time.
14 */
15 module dsnapshot.layout;
16 
17 import logger = std.experimental.logger;
18 import std.algorithm : joiner, map, filter;
19 import std.array : appender, empty;
20 import std.datetime : SysTime, Duration, dur, Clock, Interval;
21 import std.range : repeat, enumerate;
22 import std.typecons : Nullable;
23 
24 public import dsnapshot.types : Name;
25 
26 version (unittest) {
27     import unit_threaded.assertions;
28 }
29 
30 @safe:
31 
/// A snapshot as seen by the layout: when it was taken and what it is called.
struct Snapshot {
    /// The time the snapshot was taken.
    SysTime time;
    /// Name of the snapshot. This is used to locate it.
    Name name;
}
38 
/// Represents an empty position in the snapshot layout, i.e. a bucket that
/// does not hold a `Snapshot`.
struct Empty {
}
42 
// TODO: replace bucket with an alias to the internal sumtype.
/// One slot of a `Layout`: it is either `Empty` or holds a `Snapshot`.
struct Bucket {
    import std.range : isOutputRange;
    import sumtype;

    /// What the slot currently holds.
    SumType!(Empty, Snapshot) value;

    /// Returns: `value` formatted as a string.
    string toString() @safe const {
        import std.array : appender;

        auto sink = appender!string();
        this.toString(sink);
        return sink.data;
    }

    /// Writes `value`, formatted with `%s`, to `w`.
    void toString(Writer)(ref Writer w) const if (isOutputRange!(Writer, char)) {
        import std.format : formattedWrite;

        // `cast()` strips the `const` qualifier from `value` before it is
        // formatted (presumably because formatting the const SumType is not
        // supported — TODO confirm).
        w.formattedWrite("%s", cast() value);
    }
}
66 
/**
 * Measure how far apart two points in time are.
 *
 * The result is the magnitude of `b - a`, so it is never negative and the
 * order of the arguments does not matter. The closer to zero the better fit.
 */
Duration fitness(const SysTime a, const SysTime b) {
    auto delta = b - a;
    return delta.isNegative ? -delta : delta;
}
74 
/**
 * Find the interval in `candidates` that encloses `time`.
 *
 * A time matches an interval when `begin < time <= end`. `Interval.contains`
 * is not used because it includes `begin` and excludes `end`; the edges are
 * inverted here so that a time lying exactly on a border ends up in the
 * interval whose `end` it equals, i.e. the one "closest to now".
 *
 * Returns: the index of the first enclosing interval, or null if there is none.
 */
Nullable!size_t bestFitInterval(const SysTime time, const Interval!SysTime[] candidates) @safe pure nothrow {
    foreach (idx, const ref iv; candidates) {
        if (iv.begin < time && time <= iv.end)
            return typeof(return)(idx);
    }
    return typeof(return).init;
}
88 
@("shall find the interval that contains the time")
unittest {
    import std.array : array;
    import std.range : iota;

    const base = Clock.currTime;
    const offset = 5.dur!"minutes";

    // candidates[i] spans from `base - (i + 1)` hours to `base - i` hours.
    // bestFitInterval treats each of them as open at the beginning and
    // closed at the end.
    //
    // |---|---|---|---|---|---|
    // 0   1   2   3   4   5   6
    auto candidates = iota(0, 10).map!(a => Interval!SysTime(base - (a + 1).dur!"hours",
            base - a.dur!"hours")).array;

    bestFitInterval(base - offset, candidates).get.should == 0;
    bestFitInterval(base - 4.dur!"hours" - offset, candidates).get.should == 4;
    // Derived from `base` rather than a fresh `Clock.currTime` so the expected
    // index does not depend on how much time elapsed since `base` was read.
    bestFitInterval(base - 5.dur!"hours" - offset, candidates).get.should == 5;

    // test edge case where the times are exactly on the borders
    bestFitInterval(base, candidates).get.should == 0;
    bestFitInterval(base - 1.dur!"hours", candidates).get.should == 1;
    bestFitInterval(base - 4.dur!"hours", candidates).get.should == 4;
    bestFitInterval(base - 5.dur!"hours", candidates).get.should == 5;

    // times outside of every interval do not match anything
    assert(bestFitInterval(base + offset, candidates).isNull);
    assert(bestFitInterval(base - 10.dur!"hours", candidates).isNull);
}
112 
/**
 * Find the candidate that lies closest in time to `time`.
 *
 * Closeness is measured with `fitness`. Only a strictly better fitness
 * replaces the current best, so on a tie the earliest candidate wins.
 *
 * Returns: the index of the best candidate, or null when no candidate has a
 * fitness below `Duration.max` (e.g. when `candidates` is empty).
 */
Nullable!size_t bestFitTime(const SysTime time, const SysTime[] candidates) {
    Nullable!size_t best;
    Duration bestFit = Duration.max;

    foreach (i, const ref candidate; candidates) {
        const f = fitness(time, candidate);
        if (f >= bestFit)
            continue;
        best = i;
        bestFit = f;
    }

    return best;
}
128 
@("shall find the candidate that has the best fitness for the specific time")
unittest {
    import std.array : array;
    import std.range : iota;

    const base = Clock.currTime;
    const offset = 5.dur!"minutes";

    // candidates[i] is the point in time `base - i` hours.
    //
    // |---|---|---|---|---|---|
    // 0   1   2   3   4   5   6
    auto candidates = iota(0, 10).map!(a => base - a.dur!"hours").array;

    bestFitTime(base - offset, candidates).get.should == 0;
    bestFitTime(base - 4.dur!"hours" - offset, candidates).get.should == 4;
    // Derived from `base` rather than a fresh `Clock.currTime` so the expected
    // index does not depend on how much time elapsed since `base` was read.
    bestFitTime(base - 5.dur!"hours" - offset, candidates).get.should == 5;

    // nothing to choose from
    assert(bestFitTime(base, null).isNull);
}
145 
/**
 * Organizes snapshots into a fixed set of time buckets.
 *
 * At construction it is configured with how the snapshots should be organized
 * into buckets: how many there are and the span of time each of them covers.
 *
 * It is then updated with the current layout on the storage medium.
 *
 * The only data that it relies on is the basename of the paths that are pushed
 * to it.
 *
 * It operates in two passes.
 * The first pass (`put`) is basically a histogram. It finds the bucket
 * interval that *contains* the snapshot. It then checks whether the candidate
 * is closer to the end of the interval than the one currently in the bucket.
 * If so it replaces it. This means that each bucket contains the latest
 * snapshot that fits it. A snapshot that does not fit in any bucket, or that
 * is replaced, is moved to the discarded list.
 *
 * The second pass (`finalize`) goes through those that have been discarded
 * and tries to move each of them into the bucket following the one that
 * encloses it, if that bucket is empty or the snapshot fits it better. This
 * is repeated until the discarded list stops changing.
 */
struct Layout {
    import sumtype;

    /// The config which can be used to regenerate the layout.
    LayoutConfig conf;

    /// The buckets, index-aligned with `times`.
    Bucket[] buckets;
    /// The interval of each bucket which a snapshot should try to match.
    /// Index 0 ends at the start time given to the constructor and every
    /// following interval lies directly before the previous one.
    const(Interval!SysTime)[] times;

    /// Snapshots that have been discarded because they are not the best fit for any bucket.
    Snapshot[] discarded;

    /**
     * Create the empty buckets described by `conf`, going back in time from
     * `start`.
     *
     * Params:
     *  start = the end of the first (newest) bucket interval.
     *  conf = the spans to create buckets for; copied into `this.conf`.
     */
    this(const SysTime start, const LayoutConfig conf) {
        this.conf = LayoutConfig(conf.spans.dup);
        auto begin = start.toUTC;
        auto end = start.toUTC;
        auto app = appender!(Interval!SysTime[])();
        foreach (const a; conf.spans.map!(a => repeat(a.space, a.nr)).joiner) {
            try {
                end = begin;
                begin -= a;
                app.put(Interval!SysTime(begin, end));
            } catch (Exception e) {
                // `Interval` throws when `begin` ends up after `end` (e.g. a
                // negative span). Log it and skip that bucket.
                logger.warning(e.msg);
                logger.infof("Tried to create a bucket with time span %s -> %s from span interval %s",
                        begin, end, a);
            }
        }
        times = app.data;
        buckets.length = times.length;
    }

    /// Returns: a copy of the layout, including its config, whose arrays are
    /// independent of this layout's arrays.
    Layout dup() @safe pure nothrow const {
        Layout rval;
        rval.conf = LayoutConfig(conf.spans.dup);
        rval.buckets = buckets.dup;
        rval.times = times.dup;
        rval.discarded = discarded.dup;
        return rval;
    }

    /// Returns: true if there is at least one bucket and the first one is empty.
    bool isFirstBucketEmpty() @safe pure nothrow const @nogc {
        if (buckets.length == 0)
            return false;
        return buckets[0].value.match!((Empty a) => true, (Snapshot a) => false);
    }

    /// Returns: the snapshot in the non-empty bucket with the lowest index,
    /// i.e. the one whose interval is closest to the start time, or null if
    /// every bucket is empty.
    Nullable!Snapshot firstFullBucket() const {
        typeof(return) rval;
        foreach (a; buckets) {
            bool done;
            a.value.match!((Empty a) {}, (Snapshot a) { done = true; rval = a; });
            if (done)
                break;
        }

        return rval;
    }

    /// Returns: a snapshot that can be used to resume, which is the first
    /// discarded snapshot whose name ends with `snapshotInProgressSuffix`.
    Nullable!Snapshot resume() @safe pure nothrow const @nogc {
        import std.algorithm : endsWith;
        import dsnapshot.types : snapshotInProgressSuffix;

        typeof(return) rval;

        foreach (s; discarded.filter!(a => a.name.value.endsWith(snapshotInProgressSuffix))) {
            rval = s;
            break;
        }

        return rval;
    }

    /// Returns: true when the layout has no buckets.
    bool empty() const {
        return buckets.length == 0;
    }

    /// Returns: the time of the snapshot that is in bucket `idx`, or null if
    /// `idx` is out of range or the bucket is empty.
    Nullable!SysTime snapshotTimeInBucket(size_t idx) @safe pure nothrow const @nogc {
        typeof(return) rval;
        if (idx >= buckets.length)
            return rval;

        buckets[idx].value.match!((Empty a) {}, (Snapshot a) { rval = a.time; });
        return rval;
    }

    /// Returns: the snapshot in the bucket whose interval encloses `time`, or
    /// null if no interval encloses it or that bucket is empty.
    Nullable!Snapshot bestFitBucket(const SysTime time) @safe const {
        typeof(return) rval;

        const fitIdx = bestFitInterval(time, times);
        if (!fitIdx.isNull) {
            buckets[fitIdx.get].value.match!((Empty a) {}, (Snapshot a) {
                rval = a;
            });
        }

        return rval;
    }

    /**
     * First pass: add `s` to the bucket whose interval encloses its time.
     *
     * If that bucket already holds a snapshot, the one closest to the
     * bucket's `end` is kept and the other one is appended to `discarded`.
     * A snapshot that no bucket encloses is appended to `discarded`.
     */
    void put(const Snapshot s) {
        if (buckets.length == 0) {
            discarded ~= s;
            return;
        }

        const fitIdx = bestFitInterval(s.time, times);
        if (fitIdx.isNull) {
            discarded ~= s;
            return;
        }

        const idx = fitIdx.get;
        const bucketTime = times[idx];
        auto tmp = cast() buckets[idx].value.match!((Empty a) => s, (Snapshot a) {
            // Replace the snapshot in the bucket if the new one `s` is a better fit.
            // Using `.end` on the assumption that the latest snapshot for
            // each bucket is the most interesting. This also means that when a
            // snapshot trickles over to a new bucket it will most probably
            // replace the old one right away because the old one is closer to
            // the `.begin` than `.end`.
            if (fitness(bucketTime.end, s.time) < fitness(bucketTime.end, a.time)) {
                discarded ~= a;
                return s;
            }
            discarded ~= s;
            return a;
        });

        () @trusted { buckets[idx].value = tmp; }();
    }

    /**
     * Second pass over the snapshots that `put` discarded.
     *
     * Each discarded snapshot is offered to the bucket after the one whose
     * interval encloses it (bucket n+1). It takes that bucket if the bucket is
     * empty or if the snapshot is closer to the bucket's `end` than the
     * current occupant, which is then discarded instead. A replaced snapshot
     * may in turn fit into the bucket after its own, so the sweep is repeated
     * until the discarded list stops changing.
     *
     * Snapshots that can not be moved anywhere remain in `discarded`.
     */
    void finalize() {
        if (buckets.empty || discarded.empty)
            return;

        // One sweep over `candidates`. Returns those that did not end up in a
        // bucket, in the order they were processed.
        // Assume that this only need to handle the cases when a snapshot is
        // about to expire from its current bucket.
        auto iterate(Snapshot[] candidates) {
            Snapshot[] spare;

            foreach (s; candidates) {
                const fitIdx = bestFitInterval(s.time, times);
                if (fitIdx.isNull) {
                    // Outside of every bucket interval; it stays discarded.
                    spare ~= s;
                    continue;
                }

                const idx = fitIdx.get + 1;
                if (idx >= buckets.length) {
                    // Already in the last bucket; there is no bucket n+1.
                    spare ~= s;
                    continue;
                }

                const bucketTime = times[idx];
                auto tmp = buckets[idx].value.match!((Empty a) => s, (Snapshot a) {
                    if (fitness(bucketTime.end, s.time) < fitness(bucketTime.end, a.time)) {
                        spare ~= a;
                        return s;
                    }
                    spare ~= s;
                    return a;
                });

                () @trusted { buckets[idx].value = tmp; }();
            }

            return spare;
        }

        // A snapshot replacing another one in a bucket may trigger a cascade,
        // so sweep until nothing changes anymore.
        Snapshot[] prev;
        do {
            prev = discarded;
            discarded = iterate(discarded);
        }
        while (prev != discarded);
    }

    @("shall keep snapshots that can not be moved to a later bucket as discarded")
    unittest {
        const base = Clock.currTime.toUTC;
        auto layout = Layout(base, LayoutConfig([Span(2, 4.dur!"hours")]));

        // both fit the last bucket; the one closest to its end is kept.
        layout.put(Snapshot(base - 5.dur!"hours", "keep".Name));
        layout.put(Snapshot(base - 7.dur!"hours", "last".Name));
        // older than every bucket.
        layout.put(Snapshot(base - 9.dur!"hours", "old".Name));

        layout.finalize;

        assert(layout.snapshotTimeInBucket(0).isNull);
        (base - layout.snapshotTimeInBucket(1).get).total!"hours".shouldEqual(5);
        layout.discarded.length.shouldEqual(2);
        (base - layout.discarded[0].time).total!"hours".shouldEqual(7);
        (base - layout.discarded[1].time).total!"hours".shouldEqual(9);
    }

    import std.range : isOutputRange;

    /// Returns: a human readable dump of the buckets and the discarded snapshots.
    string toString() @safe const {
        import std.array : appender;

        auto buf = appender!string;
        toString(buf);
        return buf.data;
    }

    /// Write a human readable dump of the buckets and the discarded snapshots to `w`.
    void toString(Writer)(ref Writer w) const if (isOutputRange!(Writer, char)) {
        import std.format : formattedWrite;
        import std.range : enumerate, put;
        import std.ascii : newline;

        put(w, "Bucket Nr: Interval\n");
        foreach (a; buckets.enumerate) {
            formattedWrite(w, "%9s: %s - %s\n%11s", a.index,
                    times[a.index].begin, times[a.index].end, "");
            a.value.value.match!((Empty a) { put(w, "empty"); }, (Snapshot a) {
                formattedWrite(w, "%s", a.time);
            });
            put(w, newline);
        }

        if (discarded.length != 0)
            put(w, "Discarded\n");
        foreach (a; discarded.enumerate)
            formattedWrite(w, "%s: %s name:%s\n", a.index, a.value.time, a.value.name.value);
    }
}
383 
/// Configuration for a span of snapshots in a layout.
struct Span {
    /// Number of consecutive buckets in this span.
    uint nr;
    /// The length of time each of those buckets covers.
    Duration space;
}
389 
/// Configuration of a layout consisting of a number of span configs.
///
/// The spans are laid out in order, going back in time from the layout's
/// start time.
struct LayoutConfig {
    /// The spans; the first one is the closest to the start time.
    Span[] spans;
}
394 
@("shall be a layout of 15 snapshots with increasing time between them when configured with three spans")
unittest {
    import std.conv : to;

    const now = Clock.currTime.toUTC;

    // 5 buckets of 4 hours, then 5 of 1 day, then 5 of 1 week.
    auto layout = Layout(now, LayoutConfig([
                Span(5, 4.dur!"hours"), Span(5, 1.dur!"days"),
                Span(5, 7.dur!"days")
            ]));

    // One snapshot per hour over the whole range covered by the layout,
    // which completely fills it up.
    immutable hoursCovered = 5 * 4 + 5 * 24 + 5 * 24 * 7;
    foreach (h; 0 .. hoursCovered)
        layout.put(Snapshot(now - h.dur!"hours", h.to!string.Name));

    layout.buckets.length.should == 15;
    layout.discarded.length.shouldEqual(hoursCovered - 15);

    // Distance in hours from `now` back to the beginning of the first and
    // the last bucket of every span.
    static immutable size_t[] bucketIdx = [0, 4, 5, 9, 10, 14];
    static immutable long[] expectedHours = [
        4, 4 * 5, 4 * 5 + 24, 4 * 5 + 24 * 5, 4 * 5 + 24 * 5 + 24 * 7,
        4 * 5 + 24 * 5 + 24 * 7 * 5
    ];
    foreach (i, idx; bucketIdx)
        (now - layout.times[idx].begin).total!"hours".shouldEqual(expectedHours[i]);
}
425 
@("shall move the about to expire snapshot from its current bucket to bucket n+1")
unittest {
    const base = Clock.currTime.toUTC;

    auto conf = LayoutConfig([Span(5, 4.dur!"hours")]);
    auto layout = Layout(base, conf);

    // init bucket 0
    layout.put(Snapshot(base - 3.dur!"hours", "bucket 1".Name));
    layout.put(Snapshot(base - 3.dur!"hours" - 10.dur!"minutes", "bucket 1".Name));
    layout.put(Snapshot(base - 7.dur!"hours", "bucket 2".Name));
    // replace bucket 0 with a better candidate
    layout.put(Snapshot(base - 1.dur!"hours", "bucket 0".Name));

    layout.finalize;

    // `snapshotTimeInBucket` returns a Nullable; unwrap it explicitly rather
    // than relying on Nullable's implicit conversion.
    (base - layout.snapshotTimeInBucket(0).get).total!"hours".shouldEqual(1);

    (base - layout.snapshotTimeInBucket(1).get).total!"hours".shouldEqual(3);
    // should be the one with -10 minutes.
    (base - layout.snapshotTimeInBucket(1).get).total!"minutes".shouldEqual(190);

    (base - layout.snapshotTimeInBucket(2).get).total!"hours".shouldEqual(7);

    // The -3 hours snapshot lost bucket 1 to the -3 hours 10 minutes one and
    // is the only snapshot left over.
    layout.discarded.length.shouldEqual(1);
    (base - layout.discarded[0].time).total!"minutes".shouldEqual(180);
}