fime/data.py
2020-03-03 23:20:36 +01:00

261 lines
8.8 KiB
Python

import os
import json
import base64
import atexit
from datetime import datetime, date, time, timedelta
from threading import Thread, Event
from collections.abc import MutableMapping
from PySide2 import QtCore
data_dir_path = os.path.join(QtCore.QStandardPaths.writableLocation(QtCore.QStandardPaths.AppDataLocation),
"fimefracking")
tasks_path = os.path.join(data_dir_path, "tasks.json")
data_path = os.path.join(data_dir_path, "data_{}.json")
save_delay = 3 * 60
class Tasks:
def __init__(self):
if not os.path.exists(data_dir_path):
os.mkdir(data_dir_path)
if os.path.exists(tasks_path):
with open(tasks_path, "r") as f:
encoded_tasks = json.loads(f.read())
self._tasks = list(map(lambda x: base64.b64decode(x.encode("utf-8")).decode("utf-8"), encoded_tasks))
else:
self._tasks = []
@property
def tasks(self):
return self._tasks
@tasks.setter
def tasks(self, tasks):
self._tasks = tasks
self._save()
def _save(self):
print("... saving tasks ...")
encoded_tasks = list(map(lambda x: base64.b64encode(x.encode("utf-8")).decode("utf-8"), self._tasks))
with open(tasks_path, "w+") as f:
f.write(json.dumps(encoded_tasks))
class Data(MutableMapping):
def __init__(self):
if not os.path.exists(data_dir_path):
os.mkdir(data_dir_path)
self._cache = {}
self._hot_keys = set()
self._trunning = False
self._tevent = Event()
self._thread = None
def cleanup():
self._trunning = False
self._tevent.set()
if self._thread:
self._thread.join()
atexit.register(cleanup)
def __getitem__(self, key):
dpath = data_path.format(key)
if key not in self._cache and os.path.exists(dpath):
with open(dpath, "r") as f:
self._cache[key] = json.loads(f.read())
return self._cache[key]
def __setitem__(self, key, value):
self._cache[key] = value
self._hot_keys.add(key)
self._schedule_save()
def _schedule_save(self):
if self._trunning:
return
self._trunning = True
self._thread = Thread(target=self._executor, daemon=True)
self._thread.start()
def _executor(self):
while self._trunning:
self._tevent.wait(save_delay)
self._save()
def _save(self):
for key in self._hot_keys:
print(f"... saving dict {key} ...")
to_write = self._cache[key] # apparently thread-safe
with open(data_path.format(key), "w+") as f:
f.write(json.dumps(to_write))
self._hot_keys = set()
self._saving = False
def __delitem__(self, key):
return NotImplemented
def __iter__(self):
return NotImplemented
def __len__(self):
# TODO use glob?
return NotImplemented
def __repr__(self):
return f"{type(self).__name__}({self._cache})"
class Log:
def __init__(self):
self._data = Data()
def cleanup():
self.log("End")
atexit.register(cleanup)
def log(self, task, ptime=None):
if ptime is None:
ptime = datetime.now()
# round to nearest minute
round_min = timedelta(minutes=round(ptime.second/60))
ptime = ptime - timedelta(seconds=ptime.second) + round_min
# month dance necessary to trigger Data.__setitem__
month = self._data.setdefault(ptime.strftime("%Y-%m"), {})
month.setdefault(ptime.strftime("%d"), [])\
.append(f"{ptime.strftime('%H:%M')} {base64.b64encode(task.encode('utf-8')).decode('utf-8')}")
self._data[ptime.strftime("%Y-%m")] = month
def last_log(self, pdate=None):
if pdate is None:
pdate = date.today()
if pdate.strftime("%Y-%m") not in self._data \
or pdate.strftime("%d") not in self._data[pdate.strftime("%Y-%m")] \
or len(self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")]) == 0:
return None
last = base64.b64decode(
self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")][-1].split()[1].encode("utf-8")).decode("utf-8")
if last == "End":
month = self._data[pdate.strftime("%Y-%m")]
del month[pdate.strftime("%d")][-1]
self._data[pdate.strftime("%Y-%m")] = month
if len(self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")]) == 0:
return None
last = base64.b64decode(
self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")][-1].split()[1].encode("utf-8")).decode("utf-8")
return last
def report(self, pdate=None):
if pdate is None:
pdate = date.today()
return Report(self._data, pdate)
class Report:
def __init__(self, data, pdate):
self._data = data
self._date = pdate
self._sum_len = 0
self._prev = None
self._next = None
self._update_prev_next()
def report(self):
tmp = []
if self._date.strftime("%Y-%m") in self._data \
and self._date.strftime("%d") in self._data[self._date.strftime("%Y-%m")]:
for e in self._data[self._date.strftime("%Y-%m")][self._date.strftime("%d")]:
tstr, b64str = e.split()
task = base64.b64decode(b64str.encode("utf-8")).decode("utf-8")
start_time = datetime.combine(self._date, datetime.strptime(tstr, "%H:%M").time())
tmp.append((task, start_time))
if self._date == date.today():
tmp.append(("End", datetime.now()))
ret = []
tasks_sums = {}
total_sum = timedelta()
for i, t in enumerate(tmp):
task, start_time = t
if i < len(tmp) - 1:
end_time = tmp[i+1][1]
duration = end_time - start_time
if task != "Pause":
task_sum = tasks_sums.setdefault(task, timedelta())
task_sum += duration
tasks_sums[task] = task_sum
total_sum += duration
dhours, rem = divmod(duration.seconds, 3600)
dmins, _ = divmod(rem, 60)
ret.append([task, start_time.strftime("%H:%M"), f"{dhours:02d}:{dmins:02d}"])
else:
ret.append([task, start_time.strftime("%H:%M"), ""])
ret.append(["", "", ""])
ret.append(["", "Sums", ""])
for k, v in tasks_sums.items():
dhours, rem = divmod(v.seconds, 3600)
dmins, _ = divmod(rem, 60)
ret.append([k, "", f"{dhours:02d}:{dmins:02d}"])
dhours, rem = divmod(total_sum.seconds, 3600)
dmins, _ = divmod(rem, 60)
ret.append(["Total sum", "", f"{dhours:02d}:{dmins:02d}"])
self._sum_len = 3 + len(tasks_sums)
if self._date == date.today():
self._sum_len += 1
return ret, len(ret) - (4 + len(tasks_sums))
def save(self, report):
report = report[:-self._sum_len]
if not report:
return
save_list = []
for tstr, ttime, _ in report:
b64str = base64.b64encode(tstr.encode("utf-8")).decode("utf-8")
save_string = f"{ttime} {b64str}"
save_list.append(save_string)
# month dance necessary to trigger Data.__setitem__
month = self._data[self._date.strftime("%Y-%m")]
if month[self._date.strftime("%d")] == save_list: # no changes
return
month[self._date.strftime("%d")] = save_list
self._data[self._date.strftime("%Y-%m")] = month
def _update_prev_next(self):
self._prev = None
self._next = None
for i in range(1, 32):
new_date = self._date - timedelta(days=i)
if new_date.strftime("%Y-%m") not in self._data:
break
if new_date.strftime("%d") in self._data[new_date.strftime("%Y-%m")]:
self._prev = new_date
break
for i in range(1, 32):
new_date = self._date + timedelta(days=i)
if new_date > date.today():
break
if new_date.strftime("%Y-%m") not in self._data:
break
if new_date.strftime("%d") in self._data[new_date.strftime("%Y-%m")]:
self._next = new_date
break
def prev_next_avail(self):
return self._prev is not None, self._next is not None
def previous(self):
self._date = self._prev
self._update_prev_next()
def next(self):
self._date = self._next
self._update_prev_next()
def date(self):
return self._date.strftime("%Y-%m-%d")