PVAnalytics QA Process: Temperature#
import json
from statistics import mode

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd

import pvanalytics
from pvanalytics.quality import data_shifts as ds
from pvanalytics.quality import gaps
from pvanalytics.quality.outliers import zscore

matplotlib.rcParams.update({'font.size': 12,
                            'figure.figsize': [4.5, 3],
                            'lines.markeredgewidth': 0,
                            'lines.markersize': 2})
In the following example, we assess the data quality of a temperature data stream, illustrating how several PVAnalytics functions can be used in sequence.
First, we download and import the temperature data stream from a PV installation in the 2023 Solar Data Prize data set. This data set is publicly available via the PVDAQ database in the DOE Open Energy Data Initiative (OEDI) (https://data.openei.org/submissions/4568), under system ID 2107. The data is timezone-localized.
with open('./data/2107_system_metadata.json', 'r') as f:
    metadata = json.load(f)
tz = metadata['System']['timezone_code']


def load_csv(file_path):
    """Read a CSV file into a DataFrame with a parsed datetime index."""
    df = pd.read_csv(
        file_path,
        index_col=0,
        parse_dates=True,
    )
    return df


df_env = load_csv("./data/2107_environment_data.csv")
# Localize the timestamps; ambiguous=True treats DST fall-back duplicates
# as DST times rather than raising an error.
df_env = df_env.tz_localize(tz, ambiguous=True)
latitude = metadata['Site']['latitude']
longitude = metadata['Site']['longitude']
# Get temperature column and turn it into a series
temp_time_series = df_env['ambient_temperature_o_149575']
# Identify the temperature data stream type (this affects the type of
# checks we do)
data_stream_type = "ambient"
temp_time_series.plot(title="Original Time Series")
plt.xlabel("Date")
plt.ylabel("Temperature")
plt.show()

Run Basic Data Checks#
Now, let’s run basic data checks to identify stale and abnormal/outlier data in the time series. Basic data checks include the following steps:
Flatlined/stale data periods (pvanalytics.quality.gaps.stale_values_round()).
“Abnormal” data periods, which fall outside temperature limits of -40 to 185 degrees; this deliberately wide range covers plausible readings in either Celsius or Fahrenheit before the units are identified. Additional threshold checks are applied depending on the type of temperature sensor (ambient or module) (pvanalytics.quality.weather.temperature_limits()).
Outliers, which are defined as values more than 4 standard deviations away from the mean (pvanalytics.quality.outliers.zscore()).
Additionally, we identify the units of the temperature stream as either Celsius or Fahrenheit.
# REMOVE STALE DATA
stale_data_mask = gaps.stale_values_round(temp_time_series,
                                          window=3,
                                          decimals=2,
                                          mark="tail")

# FIND ABNORMAL PERIODS
temperature_limit_mask = pvanalytics.quality.weather.temperature_limits(
    temp_time_series, limits=(-40, 185))
temperature_limit_mask = temperature_limit_mask.reindex(
    index=temp_time_series.index,
    method='ffill',
    fill_value=False)

# FIND OUTLIERS (Z-SCORE FILTER)
zscore_outlier_mask = zscore(temp_time_series,
                             zmax=4,
                             nan_policy='omit')

# PERFORM ADDITIONAL CHECKS, INCLUDING CHECKING UNITS (CELSIUS OR FAHRENHEIT)
temperature_mean = temp_time_series.mean()
if temperature_mean > 35:
    temp_units = 'F'
else:
    temp_units = 'C'

print("Estimated Temperature units: " + str(temp_units))

# Run additional checks based on temperature sensor type.
if data_stream_type == 'module':
    if temp_units == 'C':
        module_limit_mask = (temp_time_series <= 85)
        temperature_limit_mask = (temperature_limit_mask & module_limit_mask)

if data_stream_type == 'ambient':
    ambient_limit_mask = pvanalytics.quality.weather.temperature_limits(
        temp_time_series, limits=(-40, 120))
    temperature_limit_mask = (temperature_limit_mask & ambient_limit_mask)
    if temp_units == 'C':
        ambient_limit_mask_2 = (temp_time_series <= 50)
        temperature_limit_mask = (temperature_limit_mask &
                                  ambient_limit_mask_2)

# Get the percentage of data flagged for each issue, so it can later be logged
pct_stale = round(len(temp_time_series[stale_data_mask].dropna())
                  / len(temp_time_series.dropna()) * 100, 1)
pct_erroneous = round(len(temp_time_series[~temperature_limit_mask].dropna())
                      / len(temp_time_series.dropna()) * 100, 1)
pct_outlier = round(len(temp_time_series[zscore_outlier_mask].dropna())
                    / len(temp_time_series.dropna()) * 100, 1)
# Visualize all of the time series issues (stale, abnormal, outlier)
temp_time_series.plot()
labels = ["Temperature"]
if any(stale_data_mask):
    temp_time_series.loc[stale_data_mask].plot(ls='',
                                               marker='o',
                                               color="green")
    labels.append("Stale")
if any(~temperature_limit_mask):
    temp_time_series.loc[~temperature_limit_mask].plot(ls='',
                                                       marker='o',
                                                       color="yellow")
    labels.append("Abnormal")
if any(zscore_outlier_mask):
    temp_time_series.loc[zscore_outlier_mask].plot(ls='',
                                                   marker='o',
                                                   color="purple")
    labels.append("Outlier")
plt.legend(labels=labels)
plt.title("Time Series Labeled for Basic Issues")
plt.xticks(rotation=20)
plt.xlabel("Date")
plt.ylabel(f"Temperature ({temp_units})")
plt.tight_layout()
plt.show()
Estimated Temperature units: F

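The checks above leave the series in its native units. If a downstream step (for example, temperature-dependent performance modeling) expects Celsius, the detected units can be used to convert the series. A minimal sketch, assuming the unit heuristic above is correct; the converted series is kept separate so the rest of this pipeline is unaffected:
# Illustrative only: convert to Celsius when the units heuristic reports
# Fahrenheit, leaving Celsius data untouched.
if temp_units == 'F':
    temp_time_series_celsius = (temp_time_series - 32) * 5 / 9
else:
    temp_time_series_celsius = temp_time_series.copy()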
Now, let’s filter out any of the data flagged by the basic temperature checks (stale, abnormal, or outlier data).
# Filter the time series, taking out all of the issues
issue_mask = ((~stale_data_mask) & (temperature_limit_mask) &
              (~zscore_outlier_mask))
temp_time_series = temp_time_series[issue_mask].copy()
Daily Completeness Check#
We filter the time series based on its daily completeness score. This filtering scheme requires at least 25% of data to be present for each day to be included.
# Get the time frequency of the time series
freq_minutes = mode(temp_time_series.index.to_series().diff().dt.seconds / 60)
data_freq = str(freq_minutes) + "min"
temp_time_series = temp_time_series.asfreq(data_freq)

# Calculate the daily data completeness score
data_completeness_score = gaps.completeness_score(temp_time_series)

# Visualize data completeness score as a time series.
data_completeness_score.plot()
plt.xlabel("Date")
plt.ylabel("Daily Completeness Score (Fractional)")
plt.axhline(y=0.25, color='r', linestyle='-',
            label='Daily Completeness Cutoff')
plt.legend()
plt.tight_layout()
plt.show()
# Trim the series based on daily completeness score
trim_series_mask = pvanalytics.quality.gaps.trim_incomplete(
    temp_time_series,
    minimum_completeness=.25,
    freq=data_freq)
temp_time_series = temp_time_series[trim_series_mask]
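Before or after trimming, it can be useful to see how many days fall below the completeness cutoff. A minimal sketch using the score computed above, collapsed to one value per day (by default, completeness_score repeats each day's score across that day's timestamps):
# Collapse the per-timestamp completeness score to one value per day and
# count the days below the 25% cutoff.
daily_score = data_completeness_score.resample('D').first()
incomplete_days = int((daily_score < 0.25).sum())
print(f"{incomplete_days} days fall below the completeness cutoff")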

Data Shifts#
Next, we check the time series for any abrupt data shifts. We take the longest continuous part of the time series that is free of data shifts. We use pvanalytics.quality.data_shifts.detect_data_shifts() to detect data shifts in the time series.
# Resample the time series to daily mean
temp_time_series_daily = temp_time_series.resample('D').mean()
data_shift_start_date, data_shift_end_date = \
    ds.get_longest_shift_segment_dates(temp_time_series_daily)
data_shift_period_length = (data_shift_end_date -
                            data_shift_start_date).days

# Detect data shifts in the daily time series
data_shift_mask = ds.detect_data_shifts(temp_time_series_daily)
# Get the dates where shifts occur
shift_dates = list(temp_time_series_daily[data_shift_mask].index)
shift_found = len(shift_dates) > 0
print("Shift Found: ", shift_found)

# Visualize the data shifts for the daily time series
edges = ([temp_time_series_daily.index[0]] + shift_dates +
         [temp_time_series_daily.index[-1]])
fig, ax = plt.subplots()
for (st, ed) in zip(edges[:-1], edges[1:]):
    ax.plot(temp_time_series_daily.loc[st:ed])
plt.title("Daily Time Series Labeled for Data Shifts")
plt.xlabel("Date")
plt.ylabel(f"Temperature ({temp_units})")
plt.show()
Shift Found: False

Final Filtered Temperature Series#
Finally, we filter the time series to only include the longest shift-free period. We then visualize the final time series post-QA filtering.
temp_time_series = temp_time_series[
    (temp_time_series.index >=
     data_shift_start_date.tz_convert(temp_time_series.index.tz)) &
    (temp_time_series.index <=
     data_shift_end_date.tz_convert(temp_time_series.index.tz))]
# Plot the final filtered time series.
temp_time_series.plot(title="Final Filtered Time Series")
plt.xlabel("Date")
plt.ylabel(f"Temperature ({temp_units})")
plt.show()
temp_time_series.to_pickle("./data/ambient_temperature_o_149575.pkl")

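The pickled series can be read back by later analysis steps with pandas; a minimal sketch:
# Reload the QA-filtered temperature series from disk.
filtered_temp = pd.read_pickle("./data/ambient_temperature_o_149575.pkl")
print(filtered_temp.head())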
Generate a dictionary output for the QA assessment of this data stream, including the percentages of stale, erroneous, and outlier data detected, whether any data shifts were found (and their dates), and the detected temperature units for the data stream.
qa_check_dict = {"temperature_units": temp_units,
                 "pct_stale": pct_stale,
                 "pct_erroneous": pct_erroneous,
                 "pct_outlier": pct_outlier,
                 "data_shifts": shift_found,
                 "shift_dates": shift_dates}
print("QA Results:")
print(qa_check_dict)
QA Results:
{'temperature_units': 'F', 'pct_stale': 75.4, 'pct_erroneous': 0.0, 'pct_outlier': 0.0, 'data_shifts': False, 'shift_dates': []}
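Because the flagged-data percentages were computed so they can later be logged, the dictionary can also be written out as JSON. A minimal sketch, using a hypothetical output path; the pandas Timestamps in shift_dates are converted to ISO strings so the dictionary serializes cleanly:
# Convert any Timestamps to ISO-format strings for JSON serialization.
qa_check_serializable = dict(qa_check_dict)
qa_check_serializable["shift_dates"] = [
    ts.isoformat() for ts in qa_check_dict["shift_dates"]]

# Hypothetical output location for the QA log.
with open("./data/2107_temperature_qa_results.json", "w") as f:
    json.dump(qa_check_serializable, f, indent=2)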