"""Persistence layer: the task list, a month-wise JSON log store and daily reports.

All files live below ``data_dir_path``.  Task names are stored base64-encoded;
a log entry is the string ``"HH:MM <base64 task name>"``.
"""
import os
import json
import base64
import atexit
from datetime import datetime, date, time, timedelta
from threading import Thread, Event, Lock
from collections.abc import MutableMapping

from PySide2 import QtCore

data_dir_path = os.path.join(
    QtCore.QStandardPaths.writableLocation(QtCore.QStandardPaths.AppDataLocation),
    "fimefracking",
)
tasks_path = os.path.join(data_dir_path, "tasks.json")
data_path = os.path.join(data_dir_path, "data_{}.json")
save_delay = 3 * 60  # seconds the background saver waits between flushes

# File-name parts around the key placeholder, used to list stored keys.
_DATA_PREFIX, _DATA_SUFFIX = os.path.basename(data_path).split("{}")


def _ensure_data_dir():
    """Create the data directory, including missing parents, if necessary."""
    os.makedirs(data_dir_path, exist_ok=True)


def _write_json(path, obj):
    """Write *obj* as JSON to *path* without ever leaving a truncated file.

    The object is serialised before any file is opened and the result is
    moved into place with ``os.replace``, so a failure keeps the old content.
    """
    payload = json.dumps(obj)
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload)
    os.replace(tmp_path, path)


def _encode(text):
    """Return *text* as base64 text (UTF-8 based)."""
    return base64.b64encode(text.encode("utf-8")).decode("utf-8")


def _decode(b64text):
    """Inverse of :func:`_encode`."""
    return base64.b64decode(b64text.encode("utf-8")).decode("utf-8")


def _format_entry(clock, task):
    """Build the stored log entry for *task* starting at *clock* ("HH:MM")."""
    return f"{clock} {_encode(task)}"


def _parse_entry(entry):
    """Split a stored log entry into ``(clock, task)``.

    An entry whose task name is empty is stored as ``"HH:MM "``; it yields an
    empty task instead of failing.
    """
    parts = entry.split(None, 1)
    clock = parts[0] if parts else ""
    b64text = parts[1].strip() if len(parts) > 1 else ""
    return clock, _decode(b64text)


def _format_duration(delta):
    """Format the seconds component of *delta* as ``HH:MM`` (days are ignored)."""
    hours, rem = divmod(delta.seconds, 3600)
    return f"{hours:02d}:{rem // 60:02d}"


class Tasks:
    """The list of known task names, persisted in ``tasks_path``."""

    def __init__(self):
        """Load the stored task list, or start with an empty one."""
        _ensure_data_dir()
        try:
            with open(tasks_path, "r", encoding="utf-8") as f:
                encoded_tasks = json.load(f)
        except FileNotFoundError:
            encoded_tasks = []
        self._tasks = [_decode(x) for x in encoded_tasks]

    @property
    def tasks(self):
        """The current list of task names."""
        return self._tasks

    @tasks.setter
    def tasks(self, tasks):
        # Assigning a new list persists it immediately.
        self._tasks = tasks
        self._save()

    def _save(self):
        """Write the task list to disk, base64-encoding every name."""
        print("... saving tasks ...")
        _write_json(tasks_path, [_encode(x) for x in self._tasks])


class Data(MutableMapping):
    """Mapping of key -> JSON-serialisable value, one file per key.

    Values are cached in memory and loaded from disk on first access.
    Assigning to a key marks it as changed; changed keys are written by a
    background thread every ``save_delay`` seconds and once more at exit.
    Mutating a value in place does NOT mark it as changed - assign it back.
    Keys are expected to be strings.
    """

    def __init__(self):
        _ensure_data_dir()
        self._cache = {}
        self._hot_keys = []  # keys assigned since the last save
        self._lock = Lock()  # guards _hot_keys and the save loop
        self._trunning = False
        self._tevent = Event()
        self._thread = None

        def cleanup():
            # Stop the saver thread, then flush whatever is still pending:
            # a key assigned right before exit may have started a thread that
            # saw _trunning == False and left without saving anything.
            self._trunning = False
            self._tevent.set()
            if self._thread:
                self._thread.join()
            self._save()

        atexit.register(cleanup)

    def __getitem__(self, key):
        """Return the value for *key*, loading it from disk if not cached.

        Raises:
            KeyError: if the key is neither cached nor stored on disk.
        """
        try:
            return self._cache[key]
        except KeyError:
            pass
        try:
            with open(data_path.format(key), "r", encoding="utf-8") as f:
                value = json.load(f)
        except FileNotFoundError:
            raise KeyError(key) from None
        self._cache[key] = value
        return value

    def __setitem__(self, key, value):
        """Store *value* under *key* and schedule it for saving."""
        with self._lock:
            self._cache[key] = value
            self._hot_keys.append(key)
        self._schedule_save()

    def _schedule_save(self):
        """Start the background saver thread unless it is already running."""
        if self._trunning:
            return
        self._trunning = True
        self._thread = Thread(target=self._executor, daemon=True)
        try:
            self._thread.start()
        except RuntimeError:
            # No thread can be started (e.g. the interpreter is shutting
            # down): save synchronously instead of losing the change.
            self._trunning = False
            self._thread = None
            self._save()

    def _executor(self):
        """Body of the saver thread: wait, save, repeat until stopped."""
        while self._trunning:
            self._tevent.wait(save_delay)
            try:
                self._save()
            except Exception as exc:
                # Keep the thread alive; the failed keys were re-queued by
                # _save and are retried on the next round / at exit.
                print(f"... saving failed: {exc!r} ...")

    def _save(self):
        """Write every changed key to its file.

        If writing fails, the keys not yet written are re-queued and the
        exception is re-raised.
        """
        with self._lock:
            pending = list(dict.fromkeys(self._hot_keys))  # ordered, unique
            self._hot_keys = []
            for pos, key in enumerate(pending):
                if key not in self._cache:
                    continue  # deleted after it was marked as changed
                print(f"... saving dict {key} ...")
                try:
                    _write_json(data_path.format(key), self._cache[key])
                except Exception:
                    self._hot_keys.extend(pending[pos:])
                    raise

    def __delitem__(self, key):
        """Remove *key* from the cache and delete its file.

        Raises:
            KeyError: if the key is neither cached nor stored on disk.
        """
        with self._lock:
            was_cached = key in self._cache
            self._cache.pop(key, None)
            self._hot_keys = [k for k in self._hot_keys if k != key]
            try:
                os.remove(data_path.format(key))
                was_stored = True
            except FileNotFoundError:
                was_stored = False
        if not (was_cached or was_stored):
            raise KeyError(key)

    def _keys(self):
        """Return all keys that are cached or have a file on disk."""
        keys = set(self._cache)
        min_len = len(_DATA_PREFIX) + len(_DATA_SUFFIX)
        for name in os.listdir(data_dir_path):
            if (len(name) > min_len
                    and name.startswith(_DATA_PREFIX)
                    and name.endswith(_DATA_SUFFIX)):
                keys.add(name[len(_DATA_PREFIX):len(name) - len(_DATA_SUFFIX)])
        return sorted(keys, key=str)

    def __iter__(self):
        return iter(self._keys())

    def __len__(self):
        return len(self._keys())

    def __repr__(self):
        return f"{type(self).__name__}({self._cache})"


class Log:
    """Time log: for every day a list of "HH:MM <task>" entries, stored by month."""

    def __init__(self):
        self._data = Data()

        def cleanup():
            # Mark the end of the session so the last task gets a duration.
            self.log("End")

        atexit.register(cleanup)

    def log(self, task, ptime=None):
        """Append an entry for *task* starting at *ptime* (default: now).

        The time is rounded to the nearest minute; exactly 30 seconds rounds
        down because ``round(0.5) == 0``.
        """
        if ptime is None:
            ptime = datetime.now()
        # round to nearest minute
        ptime = (ptime.replace(second=0, microsecond=0)
                 + timedelta(minutes=round(ptime.second / 60)))

        # month dance necessary to trigger Data.__setitem__
        month_key = ptime.strftime("%Y-%m")
        month = self._data.setdefault(month_key, {})
        month.setdefault(ptime.strftime("%d"), []).append(
            _format_entry(ptime.strftime("%H:%M"), task))
        self._data[month_key] = month

    def last_log(self, pdate=None):
        """Return the last task logged on *pdate* (default: today), or None.

        Trailing "End" markers are removed from the stored day first, so a
        day that only contains "End" markers yields None.
        """
        if pdate is None:
            pdate = date.today()
        month_key = pdate.strftime("%Y-%m")
        if month_key not in self._data:
            return None
        month = self._data[month_key]
        entries = month.get(pdate.strftime("%d"))
        if not entries:
            return None

        removed = False
        while entries and _parse_entry(entries[-1])[1] == "End":
            del entries[-1]
            removed = True
        if removed:
            # reassign to trigger Data.__setitem__
            self._data[month_key] = month
        if not entries:
            return None
        return _parse_entry(entries[-1])[1]

    def report(self, pdate=None):
        """Return a :class:`Report` for *pdate* (default: today)."""
        if pdate is None:
            pdate = date.today()
        return Report(self._data, pdate)


class Report:
    """View on the log entries of a single day that can be moved day by day."""

    def __init__(self, data, pdate):
        self._data = data
        self._date = pdate

    def _month(self, pdate):
        """Return the stored month dict containing *pdate*, or None."""
        month_key = pdate.strftime("%Y-%m")
        if month_key not in self._data:
            return None
        return self._data[month_key]

    def _has_day(self, pdate):
        """Tell whether the store has an entry list for *pdate*."""
        month = self._month(pdate)
        return month is not None and pdate.strftime("%d") in month

    def report(self):
        """Return the table rows ``[task, start "HH:MM", duration "HH:MM"]``.

        For today a synthetic "End" row at the current time closes the last
        task.  The last task row has an empty duration.  The table ends with
        an empty row and a "Sum" row.  A day without data yields no task
        rows (apart from today's "End" row).
        """
        month = self._month(self._date)
        entries = [] if month is None else month.get(self._date.strftime("%d"), [])

        timeline = []
        for entry in entries:
            clock, task = _parse_entry(entry)
            start_time = datetime.combine(
                self._date, datetime.strptime(clock, "%H:%M").time())
            timeline.append((task, start_time))
        if self._date == date.today():
            timeline.append(("End", datetime.now()))

        rows = []
        total = timedelta()
        for idx, (task, start_time) in enumerate(timeline):
            if idx + 1 < len(timeline):
                duration = timeline[idx + 1][1] - start_time
                total += duration
                rows.append([task, start_time.strftime("%H:%M"),
                             _format_duration(duration)])
            else:
                rows.append([task, start_time.strftime("%H:%M"), ""])

        rows.append(["", "", ""])
        rows.append(["Sum", "", _format_duration(total)])
        return rows

    def save(self, report):
        """Store an (edited) table as returned by :meth:`report`.

        The trailing display rows are cut off; nothing is written when the
        remaining entries equal what is already stored.
        """
        if self._date == date.today():
            report = report[:-3]  # cut off sum display and end time
        else:
            report = report[:-2]  # cut off sum display

        save_list = [_format_entry(ttime, tstr) for tstr, ttime, _ in report]

        month_key = self._date.strftime("%Y-%m")
        day_key = self._date.strftime("%d")
        month = self._month(self._date)
        if month is None:
            month = {}
        if month.get(day_key, []) == save_list:
            # no changes
            return
        # month dance necessary to trigger Data.__setitem__
        month[day_key] = save_list
        self._data[month_key] = month

    def prev_next_avail(self):
        """Return ``(prev, next)``: whether the neighbouring days have data.

        Each neighbour is looked up in its own month, so the check is also
        right on the first and last day of a month.
        """
        one_day = timedelta(days=1)
        return self._has_day(self._date - one_day), self._has_day(self._date + one_day)

    def previous(self):
        """Move the report to the previous day."""
        self._date = self._date - timedelta(days=1)

    def next(self):
        """Move the report to the next day."""
        self._date = self._date + timedelta(days=1)

    def date(self):
        """Return the report's date as ``YYYY-MM-DD``."""
        return self._date.strftime("%Y-%m-%d")