1 /** 2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 6 The design follows a basic actor system for handling the threads. 7 */ 8 module dsnapshot.cmdgroup.watch; 9 10 import core.thread : Thread; 11 import core.time : dur; 12 import logger = std.experimental.logger; 13 import std.algorithm : filter, map; 14 import std.array : empty; 15 import std.concurrency : spawn, spawnLinked, Tid, thisTid, send, receive, 16 receiveTimeout, receiveOnly, OwnerTerminated; 17 import std.exception : collectException; 18 19 import sumtype; 20 21 import dsnapshot.config : Config; 22 import dsnapshot.types; 23 24 import dsnapshot.backend; 25 import dsnapshot.console; 26 import dsnapshot.exception; 27 import dsnapshot.from; 28 import dsnapshot.layout; 29 import dsnapshot.layout_utils; 30 31 version (unittest) { 32 import unit_threaded.assertions; 33 } 34 35 @safe: 36 37 int cli(const Config.Global cglobal, const Config.Watch cwatch, SnapshotConfig[] snapshots) { 38 void act(const SnapshotConfig s) @trusted { 39 auto createSnapshotTid = spawnLinked(&actorCreateSnapshot, cast(immutable) s); 40 auto filterAndTriggerSyncTid = spawnLinked(&actorFilterAndTriggerSync, 41 cast(immutable) s, createSnapshotTid); 42 auto watchTid = spawnLinked(&actorWatch, cast(immutable) s, filterAndTriggerSyncTid); 43 44 send(createSnapshotTid, RegisterCreateSnapshotListener(filterAndTriggerSyncTid)); 45 send(createSnapshotTid, RegisterNewSnapshotListener(thisTid)); 46 send(createSnapshotTid, RegisterListenerDone.value); 47 48 send(filterAndTriggerSyncTid, watchTid); 49 50 send(watchTid, Start.value); 51 52 ulong countSnapshots; 53 while (countSnapshots < cwatch.maxSnapshots) { 54 // TODO: should this be triggered? maybe on ctrl+c? 55 receiveOnly!NewSnapshotReply; 56 countSnapshots++; 57 } 58 } 59 60 foreach (const s; snapshots.filter!(a => cwatch.name.value == a.name)) { 61 logger.info("# Watching ", s.name); 62 scope (exit) 63 logger.info("# Done watching ", s.name); 64 65 try { 66 act(s); 67 } catch (SnapshotException e) { 68 e.errMsg.match!(a => a.print); 69 logger.error(e.msg); 70 } catch (Exception e) { 71 logger.error(e.msg); 72 } 73 74 return 0; 75 } 76 77 logger.info("No snapshot configuration named ", cwatch.name.value); 78 return 1; 79 } 80 81 private: 82 83 enum FilesystemChange { 84 value, 85 } 86 87 enum CreateSnapshot { 88 value, 89 } 90 91 enum CreateSnapshotReply { 92 value, 93 } 94 95 enum NewSnapshotReply { 96 value, 97 } 98 99 enum Shutdown { 100 value, 101 } 102 103 enum Start { 104 value, 105 } 106 107 enum RegisterListenerDone { 108 value, 109 } 110 111 /// If the signal to create a snapshot has been received. 112 struct RegisterCreateSnapshotListener { 113 Tid value; 114 } 115 116 /// If a snapshot has successfully been created. 117 struct RegisterNewSnapshotListener { 118 Tid value; 119 } 120 121 /** Watch a path for changes on the filesystem. 122 */ 123 void actorWatch(immutable SnapshotConfig snapshot, Tid onFsChange) nothrow { 124 import std.datetime : Duration; 125 import fswatch : FileWatch, FileChangeEvent, FileChangeEventType; 126 127 void eventHandler(FileChangeEvent[] events) @safe { 128 if (!events.empty) { 129 () @trusted { 130 send(onFsChange, FilesystemChange.value); 131 receiveOnly!CreateSnapshotReply; 132 }(); 133 } 134 135 foreach (event; events) { 136 final switch (event.type) with (FileChangeEventType) { 137 case createSelf: 138 logger.trace("Observable path created"); 139 break; 140 case removeSelf: 141 logger.trace("Observable path deleted"); 142 break; 143 case create: 144 logger.tracef("'%s' created", event.path); 145 break; 146 case remove: 147 logger.tracef("'%s' removed", event.path); 148 break; 149 case rename: 150 logger.tracef("'%s' renamed to '%s'", event.path, event.newPath); 151 break; 152 case modify: 153 logger.tracef("'%s' contents modified", event.path); 154 break; 155 } 156 } 157 } 158 159 string extractPath() @trusted nothrow { 160 try { 161 auto syncBe = makeSyncBackend(cast() snapshot); 162 return syncBe.flow.match!((None a) => null, 163 (FlowLocal a) => a.src.value.Path.toString, (FlowRsyncToLocal a) => null, 164 (FlowLocalToRsync a) => a.src.value.Path.toString); 165 } catch (Exception e) { 166 logger.warning(e.msg).collectException; 167 } 168 return null; 169 } 170 171 void actFallback(const Duration poll) @trusted { 172 send(onFsChange, FilesystemChange.value); 173 receiveOnly!CreateSnapshotReply; 174 Thread.sleep(poll); 175 } 176 177 void actNormal(string path, const Duration poll) @trusted { 178 auto watcher = FileWatch(path, true); 179 while (true) { 180 eventHandler(watcher.getEvents()); 181 Thread.sleep(poll); 182 } 183 } 184 185 auto path = extractPath; 186 const poll = () { 187 if (path.empty) { 188 logger.info("No local path to watch for changes. Falling back to polling.") 189 .collectException; 190 return 10.dur!"seconds"; 191 } else { 192 logger.infof("Watching %s for changes", path).collectException; 193 // arbitrarily chosen a timeout that is hopefully fast enough but not too fast. 194 return 200.dur!"msecs"; 195 } 196 }(); 197 198 () @trusted { receiveOnly!Start.collectException; }(); 199 200 if (path.empty) { 201 while (true) { 202 try { 203 actFallback(poll); 204 } catch (OwnerTerminated) { 205 break; 206 } catch (Exception e) { 207 logger.warning(e.msg).collectException; 208 } 209 } 210 211 } else { 212 while (true) { 213 try { 214 actNormal(path, poll); 215 } catch (OwnerTerminated) { 216 break; 217 } catch (Exception e) { 218 logger.warning(e.msg).collectException; 219 } 220 } 221 } 222 } 223 224 /** Collect filesystem events to trigger a new snapshot when the first bucket 225 * is empty in the layout. 226 */ 227 void actorFilterAndTriggerSync(immutable SnapshotConfig snapshot_, Tid onSync) nothrow { 228 import std.datetime : Clock, SysTime, Duration; 229 230 static struct Process { 231 @safe: 232 233 SyncBackend syncBe; 234 SnapshotConfig sconf; 235 Layout layout; 236 237 SysTime triggerAt; 238 239 this(SyncBackend syncBe, SnapshotConfig sconf) { 240 this.syncBe = syncBe; 241 this.sconf = sconf; 242 this.updateTrigger(); 243 } 244 245 void updateTrigger() { 246 layout = syncBe.update(sconf.layout); 247 248 if (layout.isFirstBucketEmpty) { 249 triggerAt = Clock.currTime; 250 } else { 251 triggerAt = Clock.currTime + (layout.snapshotTimeInBucket(0) 252 .get - layout.times[0].begin); 253 } 254 } 255 256 Duration timeout() { 257 if (layout.isFirstBucketEmpty) { 258 return Duration.zero; 259 } 260 return triggerAt - Clock.currTime; 261 } 262 263 bool trigger() { 264 return Clock.currTime > triggerAt; 265 } 266 } 267 268 void act(ref Process process, Tid onSnapshotDone) @trusted { 269 bool tooEarly; 270 receive((FilesystemChange a) { tooEarly = !process.trigger(); }); 271 272 if (tooEarly) { 273 logger.trace("Too early, sleeping for ", process.timeout); 274 Thread.sleep(process.timeout); 275 } 276 277 send(onSync, CreateSnapshot.value); 278 receiveOnly!CreateSnapshotReply; 279 process.updateTrigger(); 280 281 send(onSnapshotDone, CreateSnapshotReply.value); 282 283 logger.info("Next snapshot at the earliest in ", process.timeout); 284 } 285 286 try { 287 Tid onSnapshotDone = () @trusted { return receiveOnly!Tid; }(); 288 289 auto snapshot = () @trusted { return cast() snapshot_; }(); 290 291 auto backend = makeSyncBackend(snapshot); 292 293 auto crypt = makeCrypBackend(snapshot.crypt); 294 open(crypt, backend.flow); 295 scope (exit) 296 crypt.close; 297 298 auto process = Process(backend, snapshot); 299 300 while (true) { 301 act(process, onSnapshotDone); 302 } 303 } catch (OwnerTerminated) { 304 } catch (Exception e) { 305 logger.error(e.msg).collectException; 306 } 307 } 308 309 void actorCreateSnapshot(immutable SnapshotConfig snapshot) nothrow { 310 import std.datetime : Clock; 311 312 static void act(SnapshotConfig snapshot) @safe { 313 auto backend = makeSyncBackend(snapshot); 314 315 auto crypt = makeCrypBackend(snapshot.crypt); 316 open(crypt, backend.flow); 317 scope (exit) 318 crypt.close; 319 320 auto layout = backend.update(snapshot.layout); 321 322 const newSnapshot = () { 323 return Clock.currTime.toUTC.toISOExtString ~ snapshotInProgressSuffix; 324 }(); 325 326 backend.sync(layout, snapshot, newSnapshot); 327 328 backend.publishSnapshot(newSnapshot); 329 backend.removeDiscarded(layout); 330 } 331 332 Tid[] onSnapshotDone; 333 Tid[] onNewSnapshot; 334 try { 335 bool running = true; 336 while (running) { 337 () @trusted { 338 receive((RegisterCreateSnapshotListener a) { 339 onSnapshotDone ~= a.value; 340 }, (RegisterNewSnapshotListener a) { onNewSnapshot ~= a.value; }, 341 (RegisterListenerDone a) { running = false; }); 342 }(); 343 } 344 } catch (Exception e) { 345 logger.error(e.msg).collectException; 346 return; 347 } 348 349 while (true) { 350 try { 351 () @trusted { 352 scope (exit) 353 () { 354 foreach (t; onSnapshotDone) 355 send(t, CreateSnapshotReply.value); 356 }(); 357 receive((CreateSnapshot a) { 358 act(cast() snapshot); 359 foreach (t; onNewSnapshot) 360 send(t, NewSnapshotReply.value); 361 }); 362 }(); 363 } catch (OwnerTerminated) { 364 break; 365 } catch (Exception e) { 366 logger.warning(e.msg).collectException; 367 } 368 } 369 }