1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 The design follows a basic actor system for handling the threads.
7 */
8 module dsnapshot.cmdgroup.watch;
9 
10 import core.thread : Thread;
11 import core.time : dur;
12 import logger = std.experimental.logger;
13 import std.algorithm : filter, map;
14 import std.array : empty;
15 import std.concurrency : spawn, spawnLinked, Tid, thisTid, send, receive,
16     receiveTimeout, receiveOnly, OwnerTerminated;
17 import std.exception : collectException;
18 
19 import sumtype;
20 
21 import dsnapshot.config : Config;
22 import dsnapshot.types;
23 
24 import dsnapshot.backend;
25 import dsnapshot.console;
26 import dsnapshot.exception;
27 import dsnapshot.from;
28 import dsnapshot.layout;
29 import dsnapshot.layout_utils;
30 
31 version (unittest) {
32     import unit_threaded.assertions;
33 }
34 
35 @safe:
36 
/** Entry point of the watch command.
 *
 * The first snapshot configuration whose name equals `cwatch.name.value` is
 * watched. Three linked actors are spawned for it (create snapshot, filter and
 * trigger sync, watch) and wired together with messages. The calling thread
 * then blocks until `cwatch.maxSnapshots` `CreateSnapshotDone` messages have
 * been received.
 *
 * Params:
 *  cglobal = global configuration, not read by this function
 *  cwatch = name of the snapshot to watch and the number of snapshots to wait for
 *  snapshots = all known snapshot configurations
 *
 * Returns: 0 when the requested number of snapshots were created. 1 when no
 * configuration matched the name or when watching failed with an exception.
 */
int cli(const Config.Global cglobal, const Config.Watch cwatch, SnapshotConfig[] snapshots) {
    void act(const SnapshotConfig s) @trusted {
        auto createSnapshotTid = spawnLinked(&actorCreateSnapshot, cast(immutable) s);
        auto filterAndTriggerSyncTid = spawnLinked(&actorFilterAndTriggerSync,
                cast(immutable) s, createSnapshotTid);
        auto watchTid = spawnLinked(&actorWatch, cast(immutable) s, filterAndTriggerSyncTid);

        // Register who should be told when a snapshot is done: the filter
        // actor and this thread. The last message ends the registration.
        send(createSnapshotTid, filterAndTriggerSyncTid);
        send(createSnapshotTid, thisTid);
        send(createSnapshotTid, RegisterListenerDone.value);

        // The filter actor forwards "snapshot done" to the watch actor.
        send(filterAndTriggerSyncTid, watchTid);

        send(watchTid, Start.value);

        ulong countSnapshots;
        while (countSnapshots < cwatch.maxSnapshots) {
            // TODO: should this be triggered? maybe on ctrl+c?
            receiveOnly!CreateSnapshotDone;
            countSnapshots++;
        }
    }

    foreach (const s; snapshots.filter!(a => cwatch.name.value == a.name)) {
        logger.info("# Watching ", s.name);
        scope (exit)
            logger.info("# Done watching ", s.name);

        try {
            act(s);
            return 0;
        } catch (SnapshotException e) {
            e.errMsg.match!(a => a.print);
            logger.error(e.msg);
        } catch (Exception e) {
            logger.error(e.msg);
        }

        // Only reached when act() threw. Report the failure to the caller
        // instead of claiming success.
        return 1;
    }

    logger.info("No snapshot configuration named ", cwatch.name.value);
    return 1;
}
80 
81 private:
82 
// Message tags exchanged between the actors in this module. Each one is a
// distinct type with a single member so that the receiving side can dispatch
// on the type of the message.

/// Sent by the watch actor when the watched source may have changed.
enum FilesystemChange { value }

/// Sent by the filter actor to request that a new snapshot is created.
enum CreateSnapshot { value }

/// Sent to the registered listeners after a snapshot attempt has finished.
enum CreateSnapshotDone { value }

/// Request to stop an actor.
enum Shutdown { value }

/// Sent by `cli` to the watch actor to let it begin watching.
enum Start { value }

/// Sent by `cli` to end the listener registration of the create-snapshot actor.
enum RegisterListenerDone { value }
106 
/** Watch a path for changes on the filesystem.
 *
 * After receiving `Start` the actor runs in one of two modes:
 *
 * $(UL
 * $(LI watch mode, when the sync backend reports a local source path: the path
 *   is polled for filesystem events every 200 ms.)
 * $(LI fallback mode, when there is no local path: a change is reported
 *   unconditionally every 10 seconds.)
 * )
 *
 * In both modes a change is reported by sending `FilesystemChange` to
 * `onFsChange` and then waiting for `CreateSnapshotDone`.
 *
 * The actor idles on its mailbox rather than sleeping. This lets it notice
 * that the owner has terminated even when no filesystem event ever arrives.
 *
 * Params:
 *  snapshot = configuration of the snapshot whose source is watched
 *  onFsChange = actor that is told about changes
 */
void actorWatch(immutable SnapshotConfig snapshot, Tid onFsChange) nothrow {
    import std.datetime : Duration;
    import fswatch : FileWatch, FileChangeEvent, FileChangeEventType;

    // Report a non-empty batch of events once, wait for the snapshot to be
    // done and then trace every event of the batch.
    void eventHandler(FileChangeEvent[] events) @safe {
        if (!events.empty) {
            () @trusted {
                send(onFsChange, FilesystemChange.value);
                receiveOnly!CreateSnapshotDone;
            }();
        }

        foreach (event; events) {
            final switch (event.type) with (FileChangeEventType) {
            case createSelf:
                logger.trace("Observable path created");
                break;
            case removeSelf:
                logger.trace("Observable path deleted");
                break;
            case create:
                logger.tracef("'%s' created", event.path);
                break;
            case remove:
                logger.tracef("'%s' removed", event.path);
                break;
            case rename:
                logger.tracef("'%s' renamed to '%s'", event.path, event.newPath);
                break;
            case modify:
                logger.tracef("'%s' contents modified", event.path);
                break;
            }
        }
    }

    // Returns: the local source path of the snapshot, or null when the flow
    // has no local source or the backend could not be created.
    string extractPath() @trusted nothrow {
        try {
            auto syncBe = makeSyncBackend(cast() snapshot);
            return syncBe.flow.match!((None a) => null,
                    (FlowLocal a) => a.src.value.Path.toString, (FlowRsyncToLocal a) => null,
                    (FlowLocalToRsync a) => a.src.value.Path.toString);
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
        return null;
    }

    // Idle for `period` on the mailbox instead of sleeping. Going through
    // the mailbox is what makes a terminated owner visible while idle.
    // The handler re-throws so the loops at the end of the function see it.
    void idle(const Duration period) @trusted {
        receiveTimeout(period, (OwnerTerminated e) { throw e; });
    }

    void actFallback(const Duration poll) @trusted {
        send(onFsChange, FilesystemChange.value);
        receiveOnly!CreateSnapshotDone;
        idle(poll);
    }

    void actNormal(string path, const Duration poll) @trusted {
        auto watcher = FileWatch(path, true);
        while (true) {
            eventHandler(watcher.getEvents());
            idle(poll);
        }
    }

    auto path = extractPath;
    const poll = () {
        if (path.empty) {
            logger.info("No local path to watch for changes. Falling back to polling.")
                .collectException;
            return 10.dur!"seconds";
        } else {
            logger.infof("Watching %s for changes", path).collectException;
            // arbitrarily chosen a timeout that is hopefully fast enough but not too fast.
            return 200.dur!"msecs";
        }
    }();

    try {
        () @trusted { receiveOnly!Start; }();
    } catch (OwnerTerminated) {
        // The owner is reported as terminated only once. If it were ignored
        // here the loops below could never observe it and would not stop.
        return;
    } catch (Exception e) {
        // Best effort: any other problem with the handshake is ignored.
        logger.warning(e.msg).collectException;
    }

    if (path.empty) {
        while (true) {
            try {
                actFallback(poll);
            } catch (OwnerTerminated) {
                break;
            } catch (Exception e) {
                logger.warning(e.msg).collectException;
            }
        }

    } else {
        while (true) {
            try {
                actNormal(path, poll);
            } catch (OwnerTerminated) {
                break;
            } catch (Exception e) {
                logger.warning(e.msg).collectException;
            }
        }
    }
}
209 
/** Collect filesystem events to trigger a new snapshot when the first bucket
 * is empty in the layout.
 *
 * The first message must be the `Tid` of the actor that is to be told when a
 * snapshot is done. After that every `FilesystemChange` leads to one
 * `CreateSnapshot` request to `onSync`. The request is delayed until the
 * trigger time has passed if the change arrived too early.
 *
 * Params:
 *  snapshot_ = configuration of the snapshot
 *  onSync = actor that creates the snapshot
 */
void actorFilterAndTriggerSync(immutable SnapshotConfig snapshot_, Tid onSync) nothrow {
    import std.datetime : Clock, SysTime, Duration;

    // Keeps track of the layout and the earliest time for the next snapshot.
    static struct Process {
    @safe:

        SyncBackend syncBe;
        SnapshotConfig sconf;
        Layout layout;

        SysTime triggerAt;

        this(SyncBackend syncBe, SnapshotConfig sconf) {
            this.syncBe = syncBe;
            this.sconf = sconf;
            this.updateTrigger();
        }

        // Refresh the layout from the backend and recompute `triggerAt`.
        void updateTrigger() {
            this.layout = syncBe.update(Layout(Clock.currTime, sconf.layout.conf));

            if (layout.isFirstBucketEmpty) {
                triggerAt = Clock.currTime;
            } else {
                triggerAt = Clock.currTime + (layout.snapshotTimeInBucket(0)
                        .get - layout.times[0].begin);
            }
        }

        // Time left until `triggerAt`. May be negative once it has passed.
        Duration timeout() {
            if (layout.isFirstBucketEmpty) {
                return Duration.zero;
            }
            return triggerAt - Clock.currTime;
        }

        bool trigger() {
            return Clock.currTime > triggerAt;
        }
    }

    void act(ref Process process, Tid onSnapshotDone) @trusted {
        bool tooEarly;
        receive((FilesystemChange a) { tooEarly = !process.trigger(); });

        if (tooEarly) {
            // Read the remaining time once so the logged and the waited
            // duration are the same value.
            const wait = process.timeout;
            logger.trace("Too early, sleeping for ", wait);
            // Wait on the mailbox instead of sleeping. This surfaces a
            // terminated owner during a possibly long wait. A negative
            // duration returns at once, which Thread.sleep does not accept.
            receiveTimeout(wait, (OwnerTerminated e) { throw e; });
        }

        send(onSync, CreateSnapshot.value);
        receiveOnly!CreateSnapshotDone;
        process.updateTrigger();

        send(onSnapshotDone, CreateSnapshotDone.value);

        logger.info("Next snapshot at the earliest in ", process.timeout);
    }

    try {
        Tid onSnapshotDone = () @trusted { return receiveOnly!Tid; }();

        auto snapshot = () @trusted { return cast() snapshot_; }();

        auto backend = makeSyncBackend(snapshot);

        auto crypt = makeCrypBackend(snapshot.crypt);
        open(crypt, backend.flow);
        scope (exit)
            crypt.close;

        auto process = Process(backend, snapshot);

        while (true) {
            act(process, onSnapshotDone);
        }
    } catch (OwnerTerminated) {
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }
}
294 
/** Actor that creates a snapshot each time it receives `CreateSnapshot`.
 *
 * It starts by collecting the `Tid` of every listener that is sent to it until
 * `RegisterListenerDone` arrives. From then on it serves `CreateSnapshot`
 * requests. After each attempt, whether it succeeded or threw, every listener
 * is sent `CreateSnapshotDone`.
 *
 * Params:
 *  snapshot = configuration of the snapshot to create
 */
void actorCreateSnapshot(immutable SnapshotConfig snapshot) nothrow {
    import std.datetime : Clock;

    // Sync, publish and clean up one snapshot for `conf`.
    static void makeSnapshot(SnapshotConfig conf) @safe {
        auto syncBe = makeSyncBackend(conf);

        auto cryptBe = makeCrypBackend(conf.crypt);
        open(cryptBe, syncBe.flow);
        scope (exit)
            cryptBe.close;

        auto currentLayout = syncBe.update(conf.layout);

        // The name is the current UTC time plus the "in progress" suffix.
        const name = Clock.currTime.toUTC.toISOExtString ~ snapshotInProgressSuffix;

        syncBe.sync(currentLayout, conf, name);

        syncBe.publishSnapshot(name);
        syncBe.removeDiscarded(currentLayout);
    }

    // Phase one: register listeners.
    Tid[] listeners;
    try {
        bool registering = true;
        while (registering) {
            () @trusted {
                receive((Tid listener) { listeners ~= listener; },
                        (RegisterListenerDone a) { registering = false; });
            }();
        }
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return;
    }

    // Wait for one request and always notify the listeners afterwards, even
    // when receiving or creating the snapshot threw.
    void serveOne() @trusted {
        try {
            receive((CreateSnapshot a) { makeSnapshot(cast() snapshot); });
        } finally {
            foreach (listener; listeners)
                send(listener, CreateSnapshotDone.value);
        }
    }

    // Phase two: serve requests until the owner terminates.
    for (;;) {
        try {
            serveOne();
        } catch (OwnerTerminated) {
            break;
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
    }
}