fime/data.py
2020-02-24 16:54:28 +01:00

219 lines
7.2 KiB
Python

import os
import json
import base64
import atexit
from datetime import datetime, date, time, timedelta
from threading import Thread, Event
from collections.abc import MutableMapping
from PySide2 import QtCore
data_dir_path = os.path.join(QtCore.QStandardPaths.writableLocation(QtCore.QStandardPaths.AppDataLocation),
"fimefracking")
tasks_path = os.path.join(data_dir_path, "tasks.json")
data_path = os.path.join(data_dir_path, "data_{}.json")
#save_delay = 3 * 60
save_delay = 3
class Tasks:
def __init__(self):
if not os.path.exists(data_dir_path):
os.mkdir(data_dir_path)
if os.path.exists(tasks_path):
with open(tasks_path, "r") as f:
encoded_tasks = json.loads(f.read())
self._tasks = list(map(lambda x: base64.b64decode(x.encode("utf-8")).decode("utf-8"), encoded_tasks))
else:
self._tasks = []
@property
def tasks(self):
return self._tasks
@tasks.setter
def tasks(self, tasks):
self._tasks = tasks
self._save()
def _save(self):
print("... saving tasks ...")
encoded_tasks = list(map(lambda x: base64.b64encode(x.encode("utf-8")).decode("utf-8"), self._tasks))
with open(tasks_path, "w+") as f:
f.write(json.dumps(encoded_tasks))
class Data(MutableMapping):
def __init__(self):
if not os.path.exists(data_dir_path):
os.mkdir(data_dir_path)
self._cache = {}
self._hot_keys = []
self._trunning = False
self._tevent = Event()
self._thread = None
def cleanup():
self._trunning = False
self._tevent.set()
if self._thread:
self._thread.join()
atexit.register(cleanup)
def __getitem__(self, key):
dpath = data_path.format(key)
if key not in self._cache and os.path.exists(dpath):
with open(dpath, "r") as f:
self._cache[key] = json.loads(f.read())
return self._cache[key]
def __setitem__(self, key, value):
self._cache[key] = value
self._hot_keys.append(key)
self._schedule_save()
def _schedule_save(self):
if self._trunning:
return
self._trunning = True
self._thread = Thread(target=self._executor, daemon=True)
self._thread.start()
def _executor(self):
while self._trunning:
self._tevent.wait(save_delay)
self._save()
def _save(self):
for key in self._hot_keys:
print(f"... saving dict {key} ...")
to_write = self._cache[key] # apparently thread-safe
with open(data_path.format(key), "w+") as f:
f.write(json.dumps(to_write))
self._hot_keys = []
self._saving = False
def __delitem__(self, key):
return NotImplemented
def __iter__(self):
return NotImplemented
def __len__(self):
# TODO use glob?
return NotImplemented
def __repr__(self):
return f"{type(self).__name__}({self._cache})"
class Log:
def __init__(self):
self._data = Data()
def cleanup():
self.log("End")
atexit.register(cleanup)
def log(self, task, ptime=None):
if ptime is None:
ptime = datetime.now()
# round to nearest minute
round_min = timedelta(minutes=round(ptime.second/60))
ptime = ptime - timedelta(seconds=ptime.second) + round_min
# month dance necessary to trigger Data.__setitem__
month = self._data.setdefault(ptime.strftime("%Y-%m"), {})
month.setdefault(ptime.strftime("%d"), [])\
.append(f"{ptime.strftime('%H:%M')} {base64.b64encode(task.encode('utf-8')).decode('utf-8')}")
self._data[ptime.strftime("%Y-%m")] = month
def last_log(self, pdate=None):
if pdate is None:
pdate = date.today()
if pdate.strftime("%Y-%m") not in self._data \
or pdate.strftime("%d") not in self._data[pdate.strftime("%Y-%m")] \
or len(self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")]) == 0:
return None
last = base64.b64decode(
self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")][-1].split()[1].encode("utf-8")).decode("utf-8")
if last == "End":
month = self._data[pdate.strftime("%Y-%m")]
del month[pdate.strftime("%d")][-1]
self._data[pdate.strftime("%Y-%m")] = month
last = base64.b64decode(
self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")][-1].split()[1].encode("utf-8")).decode("utf-8")
return last
def report(self, pdate=None):
if pdate is None:
pdate = date.today()
return Report(self._data, pdate)
class Report:
def __init__(self, data, pdate):
self._data = data
self._date = pdate
def report(self):
tmp = []
for e in self._data[self._date.strftime("%Y-%m")][self._date.strftime("%d")]:
tstr, b64str = e.split()
task = base64.b64decode(b64str.encode("utf-8")).decode("utf-8")
start_time = datetime.combine(self._date, datetime.strptime(tstr, "%H:%M").time())
tmp.append((task, start_time))
if self._date == date.today():
tmp.append(("End", datetime.now()))
ret = []
dsum = timedelta()
for i, t in enumerate(tmp):
task, start_time = t
if i < len(tmp) - 1:
end_time = tmp[i+1][1]
duration = end_time - start_time
dsum += duration
dhours, rem = divmod(duration.seconds, 3600)
dmins, _ = divmod(rem, 60)
ret.append((task, start_time.strftime("%H:%M"), f"{dhours:02d}:{dmins:02d}"))
else:
ret.append((task, start_time.strftime("%H:%M"), ""))
ret.append(("", "", ""))
dhours, rem = divmod(dsum.seconds, 3600)
dmins, _ = divmod(rem, 60)
ret.append(("Sum", "", f"{dhours:02d}:{dmins:02d}"))
return ret
def save(self, report):
if self._date == date.today():
report = report[:-3] # cut off sum display and end time
else:
report = report[:-2] # cut off sum display
save_list = []
for tstr, ttime, _ in report:
b64str = base64.b64encode(tstr.encode("utf-8")).decode("utf-8")
save_string = f"{ttime} {b64str}"
save_list.append(save_string)
# month dance necessary to trigger Data.__setitem__
month = self._data[self._date.strftime("%Y-%m")]
if month[self._date.strftime("%d")] == save_list: # no changes
return
month[self._date.strftime("%d")] = save_list
self._data[self._date.strftime("%Y-%m")] = month
def prev_next_avail(self):
prev = (self._date - timedelta(days=1)).strftime("%d") in self._data[self._date.strftime("%Y-%m")]
_next = (self._date + timedelta(days=1)).strftime("%d") in self._data[self._date.strftime("%Y-%m")]
return prev, _next
def previous(self):
self._date = self._date - timedelta(days=1)
def next(self):
self._date = self._date + timedelta(days=1)