diff --git a/src/fime/data.py b/src/fime/data.py index 4609f8f..70d4b14 100644 --- a/src/fime/data.py +++ b/src/fime/data.py @@ -5,7 +5,7 @@ import os from collections.abc import MutableMapping from datetime import datetime, date, timedelta from threading import Thread, Event -from typing import List +from typing import List, Tuple, Dict try: from PySide6 import QtCore @@ -16,66 +16,6 @@ save_delay = 3 * 60 max_jira_tasks = 50 -class Tasks: - def __init__(self, data): - self._data = data - if "tasks" in self._data: - self._tasks = list(map(lambda x: base64.b64decode(x.encode("utf-8")).decode("utf-8"), self._data["tasks"])) - else: - self._tasks = [] - if "jira_tasks" in self._data: - self._jira_tasks_usage = dict() - for k, v in self._data["jira_tasks"].items(): - key = base64.b64decode(k.encode("utf-8")).decode("utf-8") - self._jira_tasks_usage[key] = datetime.fromisoformat(v) - self._jira_tasks = sorted(self._jira_tasks_usage.keys(), key=lambda x: self._jira_tasks_usage[x]) - else: - self._jira_tasks_usage = dict() - self._jira_tasks = [] - - @property - def tasks(self) -> List[str]: - return self._tasks - - @tasks.setter - def tasks(self, tasks): - self._tasks = tasks - encoded_tasks = list(map(lambda x: base64.b64encode(x.encode("utf-8")).decode("utf-8"), self._tasks)) - self._data["tasks"] = encoded_tasks - - @property - def jira_tasks(self): - return self._jira_tasks - - def add_jira_task(self, task_name): - if task_name in self._jira_tasks_usage: - self._jira_tasks.remove(task_name) # move to end, to make visible again - self._jira_tasks.append(task_name) - self._jira_tasks_usage[task_name] = datetime.now() - if len(self._jira_tasks_usage) > max_jira_tasks: - sorted_tasks = sorted(self._jira_tasks_usage.keys(), key=lambda x: self._jira_tasks_usage[x]) - overhang_tasks = sorted_tasks[:len(sorted_tasks) - max_jira_tasks] - for task in overhang_tasks: - del self._jira_tasks_usage[task] - self._save_jira_tasks() - - def update_jira_task_usage(self, task_name): - if task_name in self._jira_tasks_usage: - self._jira_tasks_usage[task_name] = datetime.now() - self._save_jira_tasks() - - def _save_jira_tasks(self): - serialized = dict() - for k, v in self._jira_tasks_usage.items(): - key = base64.b64encode(k.encode("utf-8")).decode("utf-8") - serialized[key] = datetime.isoformat(v) - self._data["jira_tasks"] = serialized - - @property - def all_tasks(self): - return self.tasks + self.jira_tasks - - class Data(MutableMapping): def __init__(self): data_dir_path = QtCore.QStandardPaths.writableLocation(QtCore.QStandardPaths.AppDataLocation) @@ -143,8 +83,169 @@ class Data(MutableMapping): return f"{type(self).__name__}({self._cache})" +class Tasks: + def __init__(self, data: Data): + self._data = data + if "tasks" in self._data: + self._tasks = list(map(lambda x: base64.b64decode(x.encode("utf-8")).decode("utf-8"), self._data["tasks"])) + else: + self._tasks = [] + if "jira_tasks" in self._data: + self._jira_tasks_usage = dict() + for k, v in self._data["jira_tasks"].items(): + key = base64.b64decode(k.encode("utf-8")).decode("utf-8") + self._jira_tasks_usage[key] = datetime.fromisoformat(v) + self._jira_tasks = sorted(self._jira_tasks_usage.keys(), key=lambda x: self._jira_tasks_usage[x]) + else: + self._jira_tasks_usage = dict() + self._jira_tasks = [] + + @property + def tasks(self) -> List[str]: + return self._tasks + + @tasks.setter + def tasks(self, tasks): + self._tasks = tasks + encoded_tasks = list(map(lambda x: base64.b64encode(x.encode("utf-8")).decode("utf-8"), self._tasks)) + self._data["tasks"] = encoded_tasks + + @property + def jira_tasks(self): + return self._jira_tasks + + def add_jira_task(self, task_name): + if task_name in self._jira_tasks_usage: + self._jira_tasks.remove(task_name) # move to end, to make visible again + self._jira_tasks.append(task_name) + self._jira_tasks_usage[task_name] = datetime.now() + if len(self._jira_tasks_usage) > max_jira_tasks: + sorted_tasks = sorted(self._jira_tasks_usage.keys(), key=lambda x: self._jira_tasks_usage[x]) + overhang_tasks = sorted_tasks[:len(sorted_tasks) - max_jira_tasks] + for task in overhang_tasks: + del self._jira_tasks_usage[task] + self._save_jira_tasks() + + def update_jira_task_usage(self, task_name): + if task_name in self._jira_tasks_usage: + self._jira_tasks_usage[task_name] = datetime.now() + self._save_jira_tasks() + + def _save_jira_tasks(self): + serialized = dict() + for k, v in self._jira_tasks_usage.items(): + key = base64.b64encode(k.encode("utf-8")).decode("utf-8") + serialized[key] = datetime.isoformat(v) + self._data["jira_tasks"] = serialized + + @property + def all_tasks(self): + return self.tasks + self.jira_tasks + + +class LogCommentsData: + _log_key = "log" + _comments_key = "comments" + _init_day = {_log_key: [], _comments_key: {}} + + # Data.__setitem__ only gets triggered, when there is a direct assignment (in contrast to assignment down the tree) + # this necessitates reassigning the whole month, when triggering a save is intended + def __init__(self, data: Data): + self._data = data + self._converted = set() + + def _ensure_format(self, pdate: date): + month_str = pdate.strftime("%Y-%m") + if month_str in self._converted: + return + if month_str not in self._data: + return + day_str = pdate.strftime("%d") + for day, log in self._data[month_str].items(): + if type(log) is list: + self._data[month_str][day] = { + self._log_key: log, + self._comments_key: {} + } + self._converted.add(month_str) + + def get_log(self, pdate: date) -> List[Tuple[datetime, str]]: + self._ensure_format(pdate) + month_str = pdate.strftime("%Y-%m") + day_str = pdate.strftime("%d") + if month_str not in self._data: + return [] + if day_str not in self._data[month_str]: + return [] + log_data = self._data[month_str][day_str][self._log_key] + ret = [] + for entry in log_data: + tstr, b64str = entry.split() + start_time = datetime.combine(pdate, datetime.strptime(tstr, "%H:%M").time()) + task = base64.b64decode(b64str.encode("utf-8")).decode("utf-8") + ret.append((start_time, task)) + return ret + + def set_log(self, log: List[Tuple[datetime, str]]): + if not log: + return + pdate = log[0][0].date() + self._ensure_format(pdate) + encoded = [] + for entry in log: + tstr = entry[0].strftime("%H:%M") + b64str = base64.b64encode(entry[1].encode("utf-8")).decode("utf-8") + encoded.append(f"{tstr} {b64str}") + month_str = pdate.strftime("%Y-%m") + day_str = pdate.strftime("%d") + month = self._data.setdefault(month_str, {}) + month.setdefault(day_str, self._init_day)[self._log_key] = encoded + self._data[month_str] = month + + def add_log_entry(self, task: str): + now = datetime.now() + self._ensure_format(now) + tstr = now.strftime("%H:%M") + b64str = base64.b64encode(task.encode("utf-8")).decode("utf-8") + encoded = f"{tstr} {b64str}" + month_str = now.strftime("%Y-%m") + day_str = now.strftime("%d") + month = self._data.setdefault(month_str, {}) + month.setdefault(day_str, self._init_day)[self._log_key].append(encoded) + self._data[month_str] = month + + def get_comments(self, pdate: date) -> Dict[str, str]: + self._ensure_format(pdate) + month_str = pdate.strftime("%Y-%m") + day_str = pdate.strftime("%d") + if month_str not in self._data: + return dict() + if day_str not in self._data[month_str]: + return dict() + comment_data = self._data[month_str][day_str][self._comments_key] + ret = dict() + for k, v in comment_data: + k_dec = base64.b64decode(k.encode("utf-8")).decode("utf-8") + v_dec = base64.b64decode(v.encode("utf-8")).decode("utf-8") + ret[k_dec] = v_dec + return ret + + def set_comments(self, pdate: date, comments: Dict[str, str]): + self._ensure_format(pdate) + encoded = dict() + for k, v in comments: + k_enc = base64.b64encode(k.encode("utf-8")).decode("utf-8") + v_enc = base64.b64encode(v.encode("utf-8")).decode("utf-8") + encoded[k_enc] = v_enc + month_str = pdate.strftime("%Y-%m") + day_str = pdate.strftime("%d") + month = self._data.setdefault(month_str, {}) + month.setdefault(day_str, self._init_day)[self._comments_key] = encoded + self._data[month_str] = month + + class Log: - def __init__(self, data): + def __init__(self, data: LogCommentsData): self._data = data def cleanup(): @@ -152,41 +253,30 @@ class Log: atexit.register(cleanup) - def log(self, task, ptime=None): - if ptime is None: - ptime = datetime.now() - # round to nearest minute - round_min = timedelta(minutes=round(ptime.second/60)) - ptime = ptime - timedelta(seconds=ptime.second) + round_min - # month dance necessary to trigger Data.__setitem__ - month = self._data.setdefault(ptime.strftime("%Y-%m"), {}) - month.setdefault(ptime.strftime("%d"), [])\ - .append(f"{ptime.strftime('%H:%M')} {base64.b64encode(task.encode('utf-8')).decode('utf-8')}") - self._data[ptime.strftime("%Y-%m")] = month + def log(self, task): + self._data.add_log_entry(task) - def last_log(self, pdate=None): - if pdate is None: - pdate = date.today() - if pdate.strftime("%Y-%m") not in self._data \ - or pdate.strftime("%d") not in self._data[pdate.strftime("%Y-%m")] \ - or len(self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")]) == 0: + def last_log(self): + log = self._data.get_log(date.today()) + if not log: return None - last = base64.b64decode( - self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")][-1].split()[1].encode("utf-8")).decode("utf-8") - if last == "End": - month = self._data[pdate.strftime("%Y-%m")] - del month[pdate.strftime("%d")][-1] - self._data[pdate.strftime("%Y-%m")] = month - if len(self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")]) == 0: + if log[-1] == "End": + del log[-1] + self._data.set_log(log) + if not log: return None - last = base64.b64decode( - self._data[pdate.strftime("%Y-%m")][pdate.strftime("%d")][-1].split()[1].encode("utf-8")).decode("utf-8") - return last + return log[-1] - def report(self, pdate=None): - if pdate is None: - pdate = date.today() - return Report(self._data, pdate) + def report(self): + return Report(self._data, date.today()) + + def worklog(self): + return Worklog(self._data) + + +class Worklog: + def __init__(self, data): + self._data = data class Report: