"""Tempo related functions and classes"""
import logging
import os
import sys
from datetime import date
from typing import Optional
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from jira import JIRA, Issue
from jira.client import ResultList
from tempoapiclient import client as Client
from metrics.date_utils import lookBack, weekdays
from metrics.tempo_config import EUR2SEK, YESTERDAY
[docs]class TempoData:
"""Tempo data class."""
client: Client.Tempo
jira_client: JIRA
raw: pd.DataFrame
data: pd.DataFrame
padded_data: pd.DataFrame
this_year: int
last_year: int
def __init__(
self,
tempo_base_url: str = "https://api.tempo.io/4",
tempo_key: Optional[str] = None,
jira_base_url: str = "https://verifa.atlassian.net",
jira_user: Optional[str] = None,
jira_api_token: Optional[str] = None,
) -> None:
tempo_key = tempo_key or os.environ.get("TEMPO_KEY")
if tempo_key is None:
sys.exit("Tempo key not provided or TEMPO_KEY not set")
jira_user = jira_user or os.environ.get("JIRA_USER")
if jira_user is None:
sys.exit("Jira User not provided or JIRA_USER not set")
jira_api_token = jira_api_token or os.environ.get("JIRA_API_TOKEN")
if jira_api_token is None:
sys.exit("Jira API token not provided or JIRA_API_TOKEN not set")
self.client = Client.Tempo(auth_token=tempo_key, base_url=tempo_base_url)
self.jira_client = JIRA(server=jira_base_url, basic_auth=(jira_user, jira_api_token))
self.raw = pd.DataFrame()
self.data = pd.DataFrame()
self.issues = pd.DataFrame()
[docs] def load(self, from_date: str = "1970-01-01", to_date: str = str(date.today()), crew=pd.DataFrame()) -> None:
"""Fetch and populate data from Tempo for the given date range"""
# Fetch data from tempo
logs = self.client.get_worklogs(dateFrom=from_date, dateTo=to_date)
self.raw = pd.json_normalize(logs)
self.data = self.raw[["issue.id", "timeSpentSeconds", "billableSeconds", "startDate", "author.accountId"]]
self.data.columns = ["IssueId", "Time", "Billable", "Date", "UserId"]
# Merge the data
issues = self.allJiraIssues()
users = self.allJiraUsers()
self.data = self.data.merge(issues, on="IssueId")
self.data = self.data.merge(users, on="UserId")
# Process the data according to our needs
df = pd.DataFrame(self.data.loc[:, ("Key")].str.split("-", n=1).tolist(), columns=["Group", "Number"])
self.data.loc[:, ("Group")] = df["Group"]
self.data.loc[:, ("Date")] = pd.to_datetime(self.data.loc[:, ("Date")], format="%Y-%m-%d")
self.data.loc[:, ("Time")] = self.data.loc[:, ("Time")] / 3600
self.data.loc[:, ("Billable")] = self.data.loc[:, ("Billable")] / 3600
self.data.loc[:, ("Internal")] = self.data.loc[:, ("Time")] - self.data.loc[:, ("Billable")]
self.data.loc[:, ("Year")] = self.data.loc[:, ("Date")].dt.year
self.this_year = self.data["Year"].unique().max()
self.last_year = self.this_year - 1
[docs] def allJiraIssues(self) -> pd.DataFrame:
"""Fetches all the JIRA issues with IssueId and Key columns as DataFrame"""
start_at = 0
res_raw: ResultList[Issue] = ResultList([])
while True:
issues = self.jira_client.search_issues(
jql_str="ORDER BY created DESC", maxResults=1000, startAt=start_at
) # this returns only 100 everytime
if len(issues) == 0:
break
res_raw.extend(issues)
start_at += len(issues)
res = map(lambda r: [int(r.id), r.key], res_raw)
jira = pd.DataFrame(res)
jira.columns = ["IssueId", "Key"]
return jira
[docs] def allJiraUsers(self) -> pd.DataFrame:
"""Fetches all the JIRA users with UserId and User coloums as DataFrame"""
raw_users = self.jira_client.search_users(startAt=0, maxResults=200, query="*")
_res = map(lambda r: [r.displayName, r.accountId], raw_users)
users = pd.DataFrame(_res)
users.columns = ["User", "UserId"]
return users
[docs] def injectRates(self, rates: pd.DataFrame) -> None:
"""Modify data by merging in the given rates data"""
uprated = self.data.merge(rates, on=["Key", "User"], how="left")
uprated["Rate"] = uprated.apply(lambda x: x["Rate"] / EUR2SEK if x["Currency"] == "SEK" else x["Rate"], axis=1)
uprated["Income"] = uprated["Rate"] * uprated["Billable"]
self.data = uprated
[docs] def getUsers(self) -> pd.Series:
"""returns list of users"""
return self.data["User"].drop_duplicates()
[docs] def byGroup(self) -> pd.DataFrame:
"""returns aggregated time and billable time grouped by date, user and group"""
return self.data.groupby(["Date", "User", "Group"], as_index=False)[["Time", "Billable"]].sum()
[docs] def byTimeType(self) -> pd.DataFrame:
"""returns aggregated time and time type grouped by date, user and group"""
newdata = self.data.copy()
newdata["Timetype"] = pd.isna(newdata["Rate"])
newdata["Timetype"] = ["Billable" if not (x) else "Non-billable" for x in newdata["Timetype"]]
newdata["Timetype"] = [
"VeriFriday" if x == "VF" else newdata["Timetype"][idx] for idx, x in enumerate(newdata["Group"])
]
return newdata.groupby(["Date", "Timetype", "Group"], as_index=False)[["Time", "Timetype"]].sum(
numeric_only=True
)
[docs] def byTotalGroup(self, days_back) -> pd.DataFrame:
"""returns aggregated billable time grouped by issue key group and user"""
timed_data = self.data[self.data["Date"] > lookBack(days_back)]
df = timed_data.groupby(["Group", "User"], as_index=False)[["Billable"]].sum()
df["Billable"].replace(0, np.nan, inplace=True)
df.dropna(subset=["Billable"], inplace=True)
return df
[docs] def byEggBaskets(self) -> pd.DataFrame:
"""returns aggregated billable income grouped by issue key group, user and time box (30, 60, 90)"""
baskets = self.data.copy()
baskets["TimeBasket"] = "0"
baskets.loc[baskets["Date"] > lookBack(90), "TimeBasket"] = "60-90 days ago"
baskets.loc[baskets["Date"] > lookBack(60), "TimeBasket"] = "30-60 days ago"
baskets.loc[baskets["Date"] > lookBack(30), "TimeBasket"] = "0-30 days ago"
baskets["TimeBasket"].replace("0", np.nan, inplace=True)
baskets.dropna(subset=["TimeBasket"], inplace=True)
df = baskets.groupby(["Group", "User", "TimeBasket"], as_index=False)[["Income"]].sum()
df["Income"].replace(0, np.nan, inplace=True)
df.dropna(subset=["Income"], inplace=True)
return df
[docs] def byDay(self) -> pd.DataFrame:
"""returns aggregated time and billable time grouped by date, user and issue key"""
return self.data.groupby(["Date", "User", "Key"], as_index=False)[["Time", "Billable"]].sum()
[docs] def firstEntry(self, user, start) -> pd.Timestamp:
if start == "*":
first = self.data[self.data["User"] == user]["Date"].min()
else:
first = start
return pd.Timestamp(first)
[docs] def lastEntry(self, user, stop) -> pd.Timestamp:
if stop == "*":
data = self.data[self.data["Date"] < pd.to_datetime("today")]
last = data[data["User"] == user]["Date"].max()
else:
last = stop
return pd.Timestamp(last)
[docs] def totalHours(self, user, start, stop=None):
data = self.data[self.data["User"] == user]
data = data[data["Date"] >= pd.to_datetime(start)]
if stop is None:
total = data["Time"].sum()
else:
total = data[data["Date"] <= pd.to_datetime(stop)]["Time"].sum()
return total
[docs] def byUser(self, working_hours: pd.DataFrame) -> pd.DataFrame:
"""returns aggregated time and billable time grouped by user"""
user_data = pd.DataFrame()
if not working_hours.empty:
user_data = working_hours[working_hours["Stop"] == "*"]
user_data["Trend"] = 0
user_data["First"] = [self.firstEntry(u, s).date() for u, s in zip(user_data["User"], user_data["Start"])]
user_data["Last"] = [self.lastEntry(u, s).date() for u, s in zip(user_data["User"], user_data["Stop"])]
user_data["Days"] = [weekdays(f, t) for f, t in zip(user_data["First"], user_data["Last"])]
user_data["Expected"] = [days * daily for days, daily in zip(user_data["Days"], user_data["Daily"])]
user_data["Total"] = [
self.totalHours(user, start) for user, start in zip(user_data["User"], user_data["First"])
]
user_data["Delta"] = user_data["Delta"] + [
tot - exp for tot, exp in zip(user_data["Total"], user_data["Expected"])
]
user_data["Last 7 days"] = [
self.totalHours(user, start, stop)
for user, start, stop in zip(
user_data["User"], user_data["Last"] - pd.to_timedelta("6day"), user_data["Last"]
)
]
user_data["Trend"] = [
last_week - 5 * daily for daily, last_week in zip(user_data["Daily"], user_data["Last 7 days"])
]
logging.debug("\n%s", user_data.to_string())
user_data = user_data.drop(["Daily", "Start", "Stop", "First", "Days", "Expected", "Total"], axis="columns")
else:
# Find the first time entry for each user
user_first = self.data.groupby("User", as_index=False)["Date"].min()
# Add a unique column name
user_first.columns = ["User", "First"]
# Convert fime stamp to just date
user_first["First"] = [x.date() for x in user_first["First"]]
# remove all user/dates that are older than First
user_data = pd.merge(self.data, user_first, on="User")
user_data = user_data[user_data["Date"] >= user_data["First"]]
# summarize time and billable
user_data = user_data.groupby("User", as_index=False)[["Time", "Billable"]].sum()
# add the column to the user data
user_data = pd.merge(user_data, user_first, on="User")
# remove today
user_last = self.data[self.data["Date"] < pd.Timestamp("today").strftime("%b %d, %Y")]
user_last = user_last.groupby("User", as_index=False)["Date"].max()
user_last.columns = ["User", "Last"]
user_last["Last"] = [x.date() for x in user_last["Last"]]
user_data = pd.merge(user_data, user_last, on="User")
user_data["Days"] = [weekdays(f, t) for f, t in zip(user_data["First"], user_data["Last"])]
return user_data
[docs] def tableByUser(
self, working_hours, fnTableHeight=None, color_head="paleturquoise", color_cells="lavender"
) -> go.Figure:
table_working_hours = self.byUser(working_hours).sort_values(by="Last").round(2)
if not working_hours.empty:
cell_values = [
table_working_hours["User"],
table_working_hours["Delta"],
table_working_hours["Trend"],
table_working_hours["Last"],
table_working_hours["Last 7 days"],
]
else:
cell_values = [
table_working_hours["User"],
table_working_hours["Time"],
table_working_hours["Billable"],
table_working_hours["First"],
table_working_hours["Last"],
table_working_hours["Days"],
]
fig = go.Figure(
data=[
go.Table(
header=dict(values=list(table_working_hours.columns), fill_color=color_head, align="left"),
cells=dict(
values=cell_values,
fill_color=color_cells,
align="left",
),
)
]
)
fig.update_layout(title="Working Hours")
if fnTableHeight:
fig.update_layout(height=fnTableHeight(table_working_hours))
return fig
# Collect the missing rates and replace with '???'
[docs] def rawRatesTable(self) -> pd.DataFrame:
rate_data = self.data[self.data["Billable"] > 0]
rate_data = rate_data.groupby(["Key", "Rate"], dropna=False, as_index=False).agg(
Hours=("Billable", np.sum), Users=("User", ", ".join)
)
rate_data["Rate"].replace(np.nan, "???", inplace=True)
rate_data["Users"] = rate_data["Users"].str.split(", ").map(set).str.join(", ")
return rate_data
[docs] def ratesTable(self, fnTableHeight=None, color_head="paleturquoise", color_cells="lavender") -> go.Figure:
rate_data = self.rawRatesTable()
fig = go.Figure(
data=[
go.Table(
columnwidth=[50, 50, 50, 400],
header=dict(values=list(rate_data.columns), fill_color=color_head, align="left"),
cells=dict(
values=[rate_data["Key"], rate_data["Rate"], rate_data["Hours"], rate_data["Users"]],
fill_color=color_cells,
align="left",
),
)
]
)
fig.update_layout(title="Rates")
if fnTableHeight:
fig.update_layout(height=fnTableHeight(rate_data))
return fig
# Draw the missing rates table
[docs] def missingRatesTable(self, fnTableHeight=None, color_head="paleturquoise", color_cells="lavender") -> go.Figure:
rate_data = self.rawRatesTable()
rate_data = rate_data[rate_data["Rate"] == "???"]
fig = go.Figure(
data=[
go.Table(
columnwidth=[50, 50, 50, 400],
header=dict(values=list(rate_data.columns), fill_color=color_head, align="left"),
cells=dict(
values=[rate_data["Key"], rate_data["Rate"], rate_data["Hours"], rate_data["Users"]],
fill_color=color_cells,
align="left",
),
)
]
)
fig.update_layout(title="Missing Rates")
if fnTableHeight:
fig.update_layout(height=fnTableHeight(rate_data))
return fig
[docs] def padTheData(self, working_hours: pd.DataFrame) -> None:
"""
creates the self.padded_data padded with zero data
for each User, an entry for the ZP group will be added for each date >= min(Date) && <= max(Date)
Key: ZP-1, Time: 0, Billable: 0, Group: ZP, Internal: 0, Currency: EUR, Rate: 0, Income: 0
"""
self.padded_data = self.data
if not working_hours.empty:
for _, row in working_hours.iterrows():
df_user = pd.DataFrame()
user = row["User"]
if row["Start"] == "*":
start = self.data[self.data["User"] == user]["Date"].min()
else:
start = row["Start"]
if row["Stop"] == "*":
stop = YESTERDAY
else:
stop = row["Stop"]
df_user["Date"] = pd.date_range(start, stop)
df_user["User"] = user
df_user["Key"] = "ZP-1"
df_user["Time"] = 0.0
df_user["Billable"] = 0.0
df_user["Group"] = "ZP"
df_user["Internal"] = 0.0
df_user["Year"] = df_user.loc[:, ("Date")].dt.year
df_user["Currency"] = "EUR"
df_user["Rate"] = 0
df_user["Income"] = 0
self.padded_data = pd.concat([self.padded_data, df_user])
else:
for user in self.data["User"].unique():
df_user = pd.DataFrame()
start = self.data[self.data["User"] == user]["Date"].min()
stop = self.data[self.data["User"] == user]["Date"].max()
df_user["Date"] = pd.date_range(start, stop)
df_user["User"] = user
df_user["Key"] = "ZP-1"
df_user["Time"] = 0.0
df_user["Billable"] = 0.0
df_user["Group"] = "ZP"
df_user["Internal"] = 0.0
df_user["Year"] = df_user.loc[:, ("Date")].dt.year
df_user["Currency"] = "EUR"
df_user["Rate"] = 0
df_user["Income"] = 0
self.padded_data = pd.concat([self.padded_data, df_user])
[docs] def userRolling7(self, to_sum) -> pd.DataFrame:
"""returns rolling 7 day sums for Billable and non Billable time grouped by user"""
daily_sum = self.padded_data.groupby(["Date", "User"], as_index=False)[to_sum].sum()
rolling_sum_7d = (
daily_sum.set_index("Date").groupby(["User"], as_index=False).rolling("7d", min_periods=7)[to_sum].sum()
)
return rolling_sum_7d.reset_index(inplace=False)
[docs] def teamRolling7(self, to_sum) -> pd.DataFrame:
"""returns rolling 7 day sums for Billable and non Billable time grouped by user"""
daily_sum = self.padded_data.groupby(["Date"], as_index=False)[to_sum].sum()
rolling_sum_7d = daily_sum.set_index("Date").rolling("7d", min_periods=7)[to_sum].sum()
return rolling_sum_7d.reset_index(inplace=False)
[docs] def teamRolling7Relative(self, costs: pd.Series) -> pd.DataFrame:
"""returns rolling 7 day sums for Billable and non Billable time grouped by user, relative to the costs"""
daily_sum = self.padded_data.groupby(["Date"], as_index=False)["Income"].sum()
daily_cost = costs
daily_cost["Date"] = daily_cost["Date"].astype("datetime64[M]")
daily_relative = pd.merge(daily_sum, daily_cost, on=["Date"], how="outer")
daily_relative = daily_relative.dropna()
rolling_sum_7d = pd.DataFrame()
rolling_sum_7d["sumIncome"] = daily_relative.set_index("Date").rolling("7d", min_periods=7)["Income"].sum()
rolling_sum_7d["sumExtCost"] = (
daily_relative.set_index("Date").rolling("7d", min_periods=7)["External_cost"].sum()
)
rolling_sum_7d["Diff"] = rolling_sum_7d["sumIncome"] / rolling_sum_7d["sumExtCost"]
return rolling_sum_7d.reset_index(inplace=False)
[docs] def thisYear(self) -> pd.DataFrame:
"""returns a dataFrame with entries for the current year"""
return self.data[self.data["Year"] == float(self.this_year)]
[docs] def lastYear(self) -> pd.DataFrame:
"""returns a dataFrame with entries for the previous year"""
return self.data[self.data["Year"] == float(self.last_year)]
[docs] def getYear(self, year) -> pd.DataFrame:
"""returns a dataFrame with entries for the given year"""
return self.data[self.data["Year"] == float(year)]
[docs] def zeroOutBillableTime(self, keys: pd.DataFrame) -> None:
"""
Sets billable time to zero (0) and actual time to 'internal' for internal project keys
"""
if not keys.empty:
for key in keys["Key"]:
logging.debug("Internal Key: %s", key)
self.data.loc[self.data["Group"] == key, ("Billable")] = 0
self.data.loc[self.data["Group"] == key, ("Internal")] = self.data[self.data["Group"] == key]["Time"]