/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

The design follows a basic actor system for handling the threads.

Three linked actors are spawned by `cli` and communicate by message passing:

 * `actorWatch` sends `FilesystemChange` to `actorFilterAndTriggerSync`.
 * `actorFilterAndTriggerSync` sends `CreateSnapshot` to `actorCreateSnapshot`
   once the layout allows a new snapshot.
 * `actorCreateSnapshot` sends `CreateSnapshotDone` to every registered
   listener (the filter actor and the thread running `cli`).
 * `actorFilterAndTriggerSync` forwards `CreateSnapshotDone` to `actorWatch`
   which then resumes watching.
*/
module dsnapshot.cmdgroup.watch;

import core.thread : Thread;
import core.time : dur;
import logger = std.experimental.logger;
import std.algorithm : filter, map;
import std.array : empty;
import std.concurrency : spawn, spawnLinked, Tid, thisTid, send, receive,
    receiveTimeout, receiveOnly, OwnerTerminated;
import std.exception : collectException;

import sumtype;

import dsnapshot.config : Config;
import dsnapshot.types;

import dsnapshot.backend;
import dsnapshot.console;
import dsnapshot.exception;
import dsnapshot.from;
import dsnapshot.layout;
import dsnapshot.layout_utils;

version (unittest) {
    import unit_threaded.assertions;
}

@safe:

/** Watch the snapshot configuration named `cwatch.name` and create snapshots
 * when the watched source changes.
 *
 * Only the first configuration whose name matches is watched. The call blocks
 * until `cwatch.maxSnapshots` `CreateSnapshotDone` messages have been received.
 *
 * Params:
 *  cglobal = global configuration (not used by this function).
 *  cwatch = configuration of the watch command.
 *  snapshots = all known snapshot configurations.
 *
 * Returns: 0 when the watch finished normally, 1 when no configuration matched
 * or when the watch was aborted by an exception.
 */
int cli(const Config.Global cglobal, const Config.Watch cwatch, SnapshotConfig[] snapshots) {
    // Spawn the actors, wire them together and wait for the requested number
    // of snapshots to be reported as done.
    void act(const SnapshotConfig s) @trusted {
        auto createSnapshotTid = spawnLinked(&actorCreateSnapshot, cast(immutable) s);
        auto filterAndTriggerSyncTid = spawnLinked(&actorFilterAndTriggerSync,
                cast(immutable) s, createSnapshotTid);
        auto watchTid = spawnLinked(&actorWatch, cast(immutable) s, filterAndTriggerSyncTid);

        // register who is to be notified when a snapshot is done.
        send(createSnapshotTid, filterAndTriggerSyncTid);
        send(createSnapshotTid, thisTid);
        send(createSnapshotTid, RegisterListenerDone.value);

        // the filter forwards the "done" notification to the watcher.
        send(filterAndTriggerSyncTid, watchTid);

        send(watchTid, Start.value);

        ulong countSnapshots;
        while (countSnapshots < cwatch.maxSnapshots) {
            // TODO: should this be triggered? maybe on ctrl+c?
            receiveOnly!CreateSnapshotDone;
            countSnapshots++;
        }
    }

    foreach (const s; snapshots.filter!(a => cwatch.name.value == a.name)) {
        logger.info("# Watching ", s.name);
        scope (exit)
            logger.info("# Done watching ", s.name);

        // a failure is logged and reported to the caller via the exit status
        // instead of being silently turned into a success.
        int exitStatus = 0;
        try {
            act(s);
        } catch (SnapshotException e) {
            e.errMsg.match!(a => a.print);
            logger.error(e.msg);
            exitStatus = 1;
        } catch (Exception e) {
            logger.error(e.msg);
            exitStatus = 1;
        }

        return exitStatus;
    }

    logger.info("No snapshot configuration named ", cwatch.name.value);
    return 1;
}

private:

/// Message: the watcher signals that a new snapshot may be needed.
enum FilesystemChange {
    value,
}

/// Message: request to `actorCreateSnapshot` to create a snapshot.
enum CreateSnapshot {
    value,
}

/// Message: a snapshot request has been processed.
enum CreateSnapshotDone {
    value,
}

/// Message type that is not sent or received by any actor in this module.
enum Shutdown {
    value,
}

/// Message: tells `actorWatch` to start watching.
enum Start {
    value,
}

/// Message: ends the listener registration phase of `actorCreateSnapshot`.
enum RegisterListenerDone {
    value,
}

/** Watch a path for changes on the filesystem.
 *
 * When the flow of the snapshot has a local source (`FlowLocal`,
 * `FlowLocalToRsync`) that path is watched and polled for events every 200
 * msecs. Otherwise the actor falls back to unconditionally signaling a change
 * every 10 seconds.
 *
 * The actor waits for a `Start` message before doing anything and terminates
 * when the owner has terminated.
 *
 * Params:
 *  snapshot = the snapshot configuration to derive the watched path from.
 *  onFsChange = actor that receives `FilesystemChange`.
 */
void actorWatch(immutable SnapshotConfig snapshot, Tid onFsChange) nothrow {
    import std.datetime : Duration;
    import fswatch : FileWatch, FileChangeEvent, FileChangeEventType;

    // Signal a change if there are any events, wait for the snapshot to be
    // done and then trace log each event.
    void eventHandler(FileChangeEvent[] events) @safe {
        if (!events.empty) {
            () @trusted {
                send(onFsChange, FilesystemChange.value);
                receiveOnly!CreateSnapshotDone;
            }();
        }

        foreach (event; events) {
            final switch (event.type) with (FileChangeEventType) {
            case createSelf:
                logger.trace("Observable path created");
                break;
            case removeSelf:
                logger.trace("Observable path deleted");
                break;
            case create:
                logger.tracef("'%s' created", event.path);
                break;
            case remove:
                logger.tracef("'%s' removed", event.path);
                break;
            case rename:
                logger.tracef("'%s' renamed to '%s'", event.path, event.newPath);
                break;
            case modify:
                logger.tracef("'%s' contents modified", event.path);
                break;
            }
        }
    }

    // Returns: the local source path of the flow or null if there is none or
    // the backend could not be created.
    string extractPath() @trusted nothrow {
        try {
            auto syncBe = makeSyncBackend(cast() snapshot);
            return syncBe.flow.match!((None a) => null,
                    (FlowLocal a) => a.src.value.Path.toString, (FlowRsyncToLocal a) => null,
                    (FlowLocalToRsync a) => a.src.value.Path.toString);
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
        return null;
    }

    // One iteration of the polling fallback: always signal a change.
    void actFallback(const Duration poll) @trusted {
        send(onFsChange, FilesystemChange.value);
        receiveOnly!CreateSnapshotDone;
        Thread.sleep(poll);
    }

    // Watch `path` until an exception is thrown.
    void actNormal(string path, const Duration poll) @trusted {
        // NOTE(review): the second argument is presumably "recursive" -
        // confirm against the fswatch API.
        auto watcher = FileWatch(path, true);
        while (true) {
            eventHandler(watcher.getEvents());
            Thread.sleep(poll);
        }
    }

    auto path = extractPath;
    const poll = () {
        if (path.empty) {
            logger.info("No local path to watch for changes. Falling back to polling.")
                .collectException;
            return 10.dur!"seconds";
        } else {
            logger.infof("Watching %s for changes", path).collectException;
            // arbitrarily chosen a timeout that is hopefully fast enough but not too fast.
            return 200.dur!"msecs";
        }
    }();

    () @trusted { receiveOnly!Start.collectException; }();

    if (path.empty) {
        while (true) {
            try {
                actFallback(poll);
            } catch (OwnerTerminated) {
                break;
            } catch (Exception e) {
                logger.warning(e.msg).collectException;
            }
        }

    } else {
        while (true) {
            try {
                actNormal(path, poll);
            } catch (OwnerTerminated) {
                break;
            } catch (Exception e) {
                logger.warning(e.msg).collectException;
                // back off before the watcher is re-created. Without the
                // pause a persistent failure would spin in this loop and
                // flood the log.
                () @trusted { Thread.sleep(poll); }();
            }
        }
    }
}

/** Collect filesystem events to trigger a new snapshot when the first bucket
 * is empty in the layout.
 *
 * The first message the actor expects is the `Tid` to forward
 * `CreateSnapshotDone` to. It then processes one `FilesystemChange` at a time
 * until the owner has terminated or an exception is thrown.
 *
 * Params:
 *  snapshot_ = the snapshot configuration to calculate the layout from.
 *  onSync = actor that receives `CreateSnapshot`.
 */
void actorFilterAndTriggerSync(immutable SnapshotConfig snapshot_, Tid onSync) nothrow {
    import std.datetime : Clock, SysTime, Duration;

    // Keeps track of the layout and the earliest time for the next snapshot.
    static struct Process {
    @safe:

        SyncBackend syncBe;
        SnapshotConfig sconf;
        Layout layout;

        /// The earliest point in time a new snapshot is to be created.
        SysTime triggerAt;

        this(SyncBackend syncBe, SnapshotConfig sconf) {
            this.syncBe = syncBe;
            this.sconf = sconf;
            this.updateTrigger();
        }

        /// Refresh the layout from the backend and recalculate `triggerAt`.
        void updateTrigger() {
            this.layout = syncBe.update(Layout(Clock.currTime, sconf.layout.conf));

            if (layout.isFirstBucketEmpty) {
                triggerAt = Clock.currTime;
            } else {
                triggerAt = Clock.currTime + (layout.snapshotTimeInBucket(0)
                        .get - layout.times[0].begin);
            }
        }

        /// Returns: time left until `triggerAt`. It is negative if
        /// `triggerAt` has passed and the first bucket isn't empty.
        Duration timeout() {
            if (layout.isFirstBucketEmpty) {
                return Duration.zero;
            }
            return triggerAt - Clock.currTime;
        }

        /// Returns: true if `triggerAt` has passed.
        bool trigger() {
            return Clock.currTime > triggerAt;
        }
    }

    // Handle one filesystem change: wait until a snapshot is allowed, request
    // it, wait for it to finish and forward the notification.
    void act(ref Process process, Tid onSnapshotDone) @trusted {
        bool tooEarly;
        receive((FilesystemChange a) { tooEarly = !process.trigger(); });

        if (tooEarly) {
            // read the timeout once so the logged and the slept duration are
            // the same.
            const wait = process.timeout;
            logger.trace("Too early, sleeping for ", wait);
            // the clock may have passed triggerAt since trigger() was
            // evaluated. Thread.sleep requires a non-negative duration.
            if (wait > Duration.zero) {
                Thread.sleep(wait);
            }
        }

        send(onSync, CreateSnapshot.value);
        receiveOnly!CreateSnapshotDone;
        process.updateTrigger();

        send(onSnapshotDone, CreateSnapshotDone.value);

        logger.info("Next snapshot at the earliest in ", process.timeout);
    }

    try {
        Tid onSnapshotDone = () @trusted { return receiveOnly!Tid; }();

        auto snapshot = () @trusted { return cast() snapshot_; }();

        auto backend = makeSyncBackend(snapshot);

        auto crypt = makeCrypBackend(snapshot.crypt);
        open(crypt, backend.flow);
        scope (exit)
            crypt.close;

        auto process = Process(backend, snapshot);

        while (true) {
            act(process, onSnapshotDone);
        }
    } catch (OwnerTerminated) {
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }
}

/** Create a snapshot each time a `CreateSnapshot` message is received.
 *
 * The actor starts in a registration phase where every received `Tid` is
 * stored as a listener until `RegisterListenerDone` is received. Thereafter
 * all listeners are sent `CreateSnapshotDone` after each processed request,
 * even if creating the snapshot failed, so the listeners never wait forever.
 *
 * Params:
 *  snapshot = the snapshot configuration to create snapshots for.
 */
void actorCreateSnapshot(immutable SnapshotConfig snapshot) nothrow {
    import std.datetime : Clock;

    // Sync a new snapshot, publish it and remove the discarded snapshots.
    static void act(SnapshotConfig snapshot) @safe {
        auto backend = makeSyncBackend(snapshot);

        auto crypt = makeCrypBackend(snapshot.crypt);
        open(crypt, backend.flow);
        scope (exit)
            crypt.close;

        auto layout = backend.update(snapshot.layout);

        const newSnapshot = () {
            return Clock.currTime.toUTC.toISOExtString ~ snapshotInProgressSuffix;
        }();

        backend.sync(layout, snapshot, newSnapshot);

        backend.publishSnapshot(newSnapshot);
        backend.removeDiscarded(layout);
    }

    Tid[] onSnapshotDone;
    try {
        bool running = true;
        while (running) {
            () @trusted {
                receive((Tid a) => onSnapshotDone ~= a, (RegisterListenerDone a) {
                    running = false;
                });
            }();
        }
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return;
    }

    while (true) {
        try {
            () @trusted {
                // always notify the listeners, also when the snapshot failed.
                scope (exit)
                    () {
                    foreach (t; onSnapshotDone)
                        send(t, CreateSnapshotDone.value);
                }();
                receive((CreateSnapshot a) { act(cast() snapshot); });
            }();
        } catch (OwnerTerminated) {
            break;
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
    }
}