1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 The design follows a basic actor system for handling the threads.
7 */
8 module dsnapshot.cmdgroup.watch;
9 
10 import core.thread : Thread;
11 import core.time : dur;
12 import logger = std.experimental.logger;
13 import std.algorithm : filter, map;
14 import std.array : empty;
15 import std.concurrency : spawn, spawnLinked, Tid, thisTid, send, receive,
16     receiveTimeout, receiveOnly, OwnerTerminated;
17 import std.exception : collectException;
18 
19 import sumtype;
20 
21 import dsnapshot.config : Config;
22 import dsnapshot.types;
23 
24 import dsnapshot.backend;
25 import dsnapshot.console;
26 import dsnapshot.exception;
27 import dsnapshot.from;
28 import dsnapshot.layout;
29 import dsnapshot.layout_utils;
30 
31 version (unittest) {
32     import unit_threaded.assertions;
33 }
34 
35 @safe:
36 
37 int cli(const Config.Global cglobal, const Config.Watch cwatch, SnapshotConfig[] snapshots) {
38     void act(const SnapshotConfig s) @trusted {
39         auto createSnapshotTid = spawnLinked(&actorCreateSnapshot, cast(immutable) s);
40         auto filterAndTriggerSyncTid = spawnLinked(&actorFilterAndTriggerSync,
41                 cast(immutable) s, createSnapshotTid);
42         auto watchTid = spawnLinked(&actorWatch, cast(immutable) s, filterAndTriggerSyncTid);
43 
44         send(createSnapshotTid, RegisterCreateSnapshotListener(filterAndTriggerSyncTid));
45         send(createSnapshotTid, RegisterNewSnapshotListener(thisTid));
46         send(createSnapshotTid, RegisterListenerDone.value);
47 
48         send(filterAndTriggerSyncTid, watchTid);
49 
50         send(watchTid, Start.value);
51 
52         ulong countSnapshots;
53         while (countSnapshots < cwatch.maxSnapshots) {
54             // TODO: should this be triggered? maybe on ctrl+c?
55             receiveOnly!NewSnapshotReply;
56             countSnapshots++;
57         }
58     }
59 
60     foreach (const s; snapshots.filter!(a => cwatch.name.value == a.name)) {
61         logger.info("# Watching ", s.name);
62         scope (exit)
63             logger.info("# Done watching ", s.name);
64 
65         try {
66             act(s);
67         } catch (SnapshotException e) {
68             e.errMsg.match!(a => a.print);
69             logger.error(e.msg);
70         } catch (Exception e) {
71             logger.error(e.msg);
72         }
73 
74         return 0;
75     }
76 
77     logger.info("No snapshot configuration named ", cwatch.name.value);
78     return 1;
79 }
80 
81 private:
82 
83 enum FilesystemChange {
84     value,
85 }
86 
87 enum CreateSnapshot {
88     value,
89 }
90 
91 enum CreateSnapshotReply {
92     value,
93 }
94 
95 enum NewSnapshotReply {
96     value,
97 }
98 
99 enum Shutdown {
100     value,
101 }
102 
103 enum Start {
104     value,
105 }
106 
107 enum RegisterListenerDone {
108     value,
109 }
110 
111 /// If the signal to create a snapshot has been received.
112 struct RegisterCreateSnapshotListener {
113     Tid value;
114 }
115 
116 /// If a snapshot has successfully been created.
117 struct RegisterNewSnapshotListener {
118     Tid value;
119 }
120 
121 /** Watch a path for changes on the filesystem.
122  */
123 void actorWatch(immutable SnapshotConfig snapshot, Tid onFsChange) nothrow {
124     import std.datetime : Duration;
125     import fswatch : FileWatch, FileChangeEvent, FileChangeEventType;
126 
127     void eventHandler(FileChangeEvent[] events) @safe {
128         if (!events.empty) {
129             () @trusted {
130                 send(onFsChange, FilesystemChange.value);
131                 receiveOnly!CreateSnapshotReply;
132             }();
133         }
134 
135         foreach (event; events) {
136             final switch (event.type) with (FileChangeEventType) {
137             case createSelf:
138                 logger.trace("Observable path created");
139                 break;
140             case removeSelf:
141                 logger.trace("Observable path deleted");
142                 break;
143             case create:
144                 logger.tracef("'%s' created", event.path);
145                 break;
146             case remove:
147                 logger.tracef("'%s' removed", event.path);
148                 break;
149             case rename:
150                 logger.tracef("'%s' renamed to '%s'", event.path, event.newPath);
151                 break;
152             case modify:
153                 logger.tracef("'%s' contents modified", event.path);
154                 break;
155             }
156         }
157     }
158 
159     string extractPath() @trusted nothrow {
160         try {
161             auto syncBe = makeSyncBackend(cast() snapshot);
162             return syncBe.flow.match!((None a) => null,
163                     (FlowLocal a) => a.src.value.Path.toString, (FlowRsyncToLocal a) => null,
164                     (FlowLocalToRsync a) => a.src.value.Path.toString);
165         } catch (Exception e) {
166             logger.warning(e.msg).collectException;
167         }
168         return null;
169     }
170 
171     void actFallback(const Duration poll) @trusted {
172         send(onFsChange, FilesystemChange.value);
173         receiveOnly!CreateSnapshotReply;
174         Thread.sleep(poll);
175     }
176 
177     void actNormal(string path, const Duration poll) @trusted {
178         auto watcher = FileWatch(path, true);
179         while (true) {
180             eventHandler(watcher.getEvents());
181             Thread.sleep(poll);
182         }
183     }
184 
185     auto path = extractPath;
186     const poll = () {
187         if (path.empty) {
188             logger.info("No local path to watch for changes. Falling back to polling.")
189                 .collectException;
190             return 10.dur!"seconds";
191         } else {
192             logger.infof("Watching %s for changes", path).collectException;
193             // arbitrarily chosen a timeout that is hopefully fast enough but not too fast.
194             return 200.dur!"msecs";
195         }
196     }();
197 
198     () @trusted { receiveOnly!Start.collectException; }();
199 
200     if (path.empty) {
201         while (true) {
202             try {
203                 actFallback(poll);
204             } catch (OwnerTerminated) {
205                 break;
206             } catch (Exception e) {
207                 logger.warning(e.msg).collectException;
208             }
209         }
210 
211     } else {
212         while (true) {
213             try {
214                 actNormal(path, poll);
215             } catch (OwnerTerminated) {
216                 break;
217             } catch (Exception e) {
218                 logger.warning(e.msg).collectException;
219             }
220         }
221     }
222 }
223 
224 /** Collect filesystem events to trigger a new snapshot when the first bucket
225  * is empty in the layout.
226  */
227 void actorFilterAndTriggerSync(immutable SnapshotConfig snapshot_, Tid onSync) nothrow {
228     import std.datetime : Clock, SysTime, Duration;
229 
230     static struct Process {
231     @safe:
232 
233         SyncBackend syncBe;
234         SnapshotConfig sconf;
235         Layout layout;
236 
237         SysTime triggerAt;
238 
239         this(SyncBackend syncBe, SnapshotConfig sconf) {
240             this.syncBe = syncBe;
241             this.sconf = sconf;
242             this.updateTrigger();
243         }
244 
245         void updateTrigger() {
246             layout = syncBe.update(sconf.layout);
247 
248             if (layout.isFirstBucketEmpty) {
249                 triggerAt = Clock.currTime;
250             } else {
251                 triggerAt = Clock.currTime + (layout.snapshotTimeInBucket(0)
252                         .get - layout.times[0].begin);
253             }
254         }
255 
256         Duration timeout() {
257             if (layout.isFirstBucketEmpty) {
258                 return Duration.zero;
259             }
260             return triggerAt - Clock.currTime;
261         }
262 
263         bool trigger() {
264             return Clock.currTime > triggerAt;
265         }
266     }
267 
268     void act(ref Process process, Tid onSnapshotDone) @trusted {
269         bool tooEarly;
270         receive((FilesystemChange a) { tooEarly = !process.trigger(); });
271 
272         if (tooEarly) {
273             logger.trace("Too early, sleeping for ", process.timeout);
274             Thread.sleep(process.timeout);
275         }
276 
277         send(onSync, CreateSnapshot.value);
278         receiveOnly!CreateSnapshotReply;
279         process.updateTrigger();
280 
281         send(onSnapshotDone, CreateSnapshotReply.value);
282 
283         logger.info("Next snapshot at the earliest in ", process.timeout);
284     }
285 
286     try {
287         Tid onSnapshotDone = () @trusted { return receiveOnly!Tid; }();
288 
289         auto snapshot = () @trusted { return cast() snapshot_; }();
290 
291         auto backend = makeSyncBackend(snapshot);
292 
293         auto crypt = makeCrypBackend(snapshot.crypt);
294         open(crypt, backend.flow);
295         scope (exit)
296             crypt.close;
297 
298         auto process = Process(backend, snapshot);
299 
300         while (true) {
301             act(process, onSnapshotDone);
302         }
303     } catch (OwnerTerminated) {
304     } catch (Exception e) {
305         logger.error(e.msg).collectException;
306     }
307 }
308 
309 void actorCreateSnapshot(immutable SnapshotConfig snapshot) nothrow {
310     import std.datetime : Clock;
311 
312     static void act(SnapshotConfig snapshot) @safe {
313         auto backend = makeSyncBackend(snapshot);
314 
315         auto crypt = makeCrypBackend(snapshot.crypt);
316         open(crypt, backend.flow);
317         scope (exit)
318             crypt.close;
319 
320         auto layout = backend.update(snapshot.layout);
321 
322         const newSnapshot = () {
323             return Clock.currTime.toUTC.toISOExtString ~ snapshotInProgressSuffix;
324         }();
325 
326         backend.sync(layout, snapshot, newSnapshot);
327 
328         backend.publishSnapshot(newSnapshot);
329         backend.removeDiscarded(layout);
330     }
331 
332     Tid[] onSnapshotDone;
333     Tid[] onNewSnapshot;
334     try {
335         bool running = true;
336         while (running) {
337             () @trusted {
338                 receive((RegisterCreateSnapshotListener a) {
339                     onSnapshotDone ~= a.value;
340                 }, (RegisterNewSnapshotListener a) { onNewSnapshot ~= a.value; },
341                         (RegisterListenerDone a) { running = false; });
342             }();
343         }
344     } catch (Exception e) {
345         logger.error(e.msg).collectException;
346         return;
347     }
348 
349     while (true) {
350         try {
351             () @trusted {
352                 scope (exit)
353                     () {
354                     foreach (t; onSnapshotDone)
355                         send(t, CreateSnapshotReply.value);
356                 }();
357                 receive((CreateSnapshot a) {
358                     act(cast() snapshot);
359                     foreach (t; onNewSnapshot)
360                         send(t, NewSnapshotReply.value);
361                 });
362             }();
363         } catch (OwnerTerminated) {
364             break;
365         } catch (Exception e) {
366             logger.warning(e.msg).collectException;
367         }
368     }
369 }