Source code for ladybug.sql

# coding=utf-8
"""Module for parsing EnergyPlus SQLite result files into Ladybug DataCollections"""
from __future__ import division

import os
import sqlite3
from collections import OrderedDict

import ladybug.datatype
from ladybug.datatype.generic import GenericType
from .dt import DateTime, datetime
from .location import Location
from .analysisperiod import AnalysisPeriod
from .header import Header
from .datacollection import HourlyContinuousCollection, DailyCollection, \
    MonthlyCollection


[docs]class SQLiteResult(object): """Object for parsing EnergyPlus SQLite result files into Ladybug DataCollections. Args: file_path: Full path to an SQLite file that was generated by EnergyPlus. Properties: * file_path * location * reporting_frequency * run_periods * run_period_names * run_period_indices * available_outputs * available_outputs_info * zone_cooling_sizes * zone_heating_sizes * component_sizes * component_types """ _interval_codes = ('Timestep', 'Hourly', 'Daily', 'Monthly', 'Annual', 'Annual') def __init__(self, file_path): """Initialize SQLiteResult""" assert os.path.isfile(file_path), 'No file was found at {}'.format(file_path) assert file_path.endswith(('.sql', '.db', '.sqlite')), \ '{} is not an SQL file ending in .sql or .db.'.format(file_path) self._file_path = file_path # values to be computed as soon as they are requested self._location = None self._reporting_frequency = None self._run_periods = None self._run_period_names = None self._run_period_indices = None self._available_outputs = None self._available_outputs_info = None self._zone_cooling_sizes = None self._zone_heating_sizes = None self._component_sizes = None @property def file_path(self): """Get the path to the .sql file.""" return self._file_path @property def location(self): """Get a Ladybug Location object derived from the SQL data. This will be None if there was no AllSummary report in the SQL file. """ if not self._location: self._extract_location() return self._location @property def reporting_frequency(self): """Get text for the output reporting frequency. Will be one of the following. * Annual * Monthly * Daily * Hourly * An integer for the number of steps per hour """ if not self._reporting_frequency: self._extract_available_outputs() if 'Timestep' in self._reporting_frequency: # get timesteps per hour self._reporting_frequency = self._extract_timestep() return self._reporting_frequency @property def run_periods(self): """Get an array of Ladybug AnalysisPeriod objects for the simulation run periods. This will be None if there was no AllSummary report in the SQL file. """ if not self._run_periods: self._extract_full_run_periods() return tuple(self._run_periods) @property def run_period_names(self): """Get an array of text for the names of the run periods. These will align with the run_periods property. This will be None if there was no AllSummary report in the SQL file. """ if not self._run_period_names: self._extract_full_run_periods() return tuple(self._run_period_names) @property def run_period_indices(self): """Get an array integers used to identify the run periods in the time table. These will align with the run_periods property. """ if not self._run_period_indices: self._extract_full_run_period_indices() return self._run_period_indices @property def available_outputs(self): """Get a list of strings for available timeseries outputs that can be requested. Any of these outputs when input to data_collections_by_output_name will yield a result with data collections. """ if not self._available_outputs: self._extract_available_outputs() return self._available_outputs @property def available_outputs_info(self): """Get a list of dictionaries with outputs within the SQL and their metadata. Each dictionary is formatted with the keys below. .. code-block:: python { "output_name": "Zone Ideal Loads Supply Air Total Cooling Energy", "object_type": "Zone", "units": "kWh", "data_type ": Energy # this is a ladybug DataType object } """ if not self._available_outputs_info: self._extract_available_outputs() return self._available_outputs_info @property def zone_cooling_sizes(self): """Get a list of ZoneSize objects for all conditioned zones in the model. Each ZoneSize object contains several properties regarding the outcome of the cooling sizing calculation. """ if not self._zone_cooling_sizes: self._zone_cooling_sizes = self._extract_zone_sizes('Cooling') return self._zone_cooling_sizes @property def zone_heating_sizes(self): """Get a list of ZoneSize objects for all conditioned zones in the model. Each ZoneSize object contains several properties regarding the outcome of the heating sizing calculation. """ if not self._zone_heating_sizes: self._zone_heating_sizes = self._extract_zone_sizes('Heating') return self._zone_heating_sizes @property def component_sizes(self): """Get a list of ComponentSize objects for all HVAC components in the results. Each ComponentSize object contains several properties regarding the outcome of the sizing calculation. """ if not self._component_sizes: self._component_sizes = self._extract_component_sizes() return self._component_sizes @property def component_types(self): """Get a list of text for all component types contained in the results.""" if not self._component_sizes: self._component_sizes = self._extract_component_sizes() _comp_types = set() for comp in self._component_sizes: _comp_types.add(comp.component_type) return list(_comp_types)
[docs] def component_sizes_by_type(self, component_type): """Get a list of ComponentSize objects for a specific type of HVAC component. This can be much faster than using the component_sizes property when there are a lot of components in the model and only one type of component is needed. Args: component_type: Text for the type of component to be retrieved. (eg. 'ZoneHVAC:IdealLoadsAirSystem') """ return self._extract_component_sizes(component_type)
[docs] def data_collections_by_output_name(self, output_name): """Get an array of Ladybug DataCollections for a specified output. Args: output_name: The name of an EnergyPlus output to be retrieved from the SQLite result file. This can also be an array of output names for which all data collections should be retrieved. Returns: An array of data collections of the requested output type. This will be an empty list if no output of the requested name was found in the file. """ conn = sqlite3.connect(self.file_path) try: # extract all indices in the ReportDataDictionary with the output_name c = conn.cursor() cols = 'ReportDataDictionaryIndex, IndexGroup, KeyValue, Name, ' \ 'ReportingFrequency, Units' if isinstance(output_name, str): # assume it's a single output query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols) c.execute(query, (output_name,)) elif len(output_name) == 1: # assume it's a list query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols) c.execute(query, (output_name[0],)) else: # assume it is a list of outputs c.execute('SELECT {} FROM ReportDataDictionary WHERE Name IN {}'.format( cols, tuple(output_name))) header_rows = c.fetchall() # if nothing was found, return an empty list if len(header_rows) == 0: conn.close() # ensure connection is always closed return [] # remove any data not of the same frequency freq = header_rows[0][4] header_rows = [row for row in header_rows if row[4] == freq] # extract all data of the relevant type from ReportData rel_indices = tuple(row[0] for row in header_rows) if len(rel_indices) == 1: c.execute('SELECT Value, TimeIndex FROM ReportData WHERE ' 'ReportDataDictionaryIndex=? ORDER BY ' 'TimeIndex', rel_indices) else: c.execute('SELECT Value, TimeIndex FROM ReportData WHERE ' 'ReportDataDictionaryIndex IN {} ORDER BY ' 'TimeIndex'.format(rel_indices)) data = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) # get the analysis period and the reporting frequency from the time table st_time, end_time = data[0][1], data[-1][1] run_period, report_frequency, mult = self._extract_run_period(st_time, end_time) if mult: # there are multiple analysis periods; get them all run_period = self._extract_all_run_period( report_frequency, run_period.timestep, run_period.is_leap_year) # create the header objects to be used for the resulting data collections units = header_rows[0][-1] if header_rows[0][-1] != 'J' else 'kWh' data_type, units = self._data_type_from_unit(units, header_rows[0][3]) meta_datas = [] for row in header_rows: obj_type = row[1] if 'Surface' not in output_name else 'Surface' meta_datas.append({'type': row[3], obj_type: row[2]}) headers = [] if report_frequency == 'Annual': pass elif isinstance(run_period, list): # multiple run periods for runper in run_period: for m_data in meta_datas: headers.append(Header(data_type, units, runper, m_data)) else: # just one run period for m_data in meta_datas: headers.append(Header(data_type, units, run_period, m_data)) # format the data such that we have one list for each of the header rows if isinstance(run_period, list): # multiple run periods if report_frequency == 'Monthly': chunks = [len(runper.months_int) for runper in run_period] elif report_frequency == 'Daily': chunks = [len(runper.doys_int) for runper in run_period] else: chunks = [len(runper) for runper in run_period] if units == 'kWh': all_values = self._partition_and_convert_timeseries_chunks(data, chunks) else: all_values = self._partition_timeseries_chunks(data, chunks) else: # just one run period n_lists = len(header_rows) if units == 'kWh': all_values = self._partition_and_convert_timeseries(data, n_lists) else: all_values = self._partition_timeseries(data, n_lists) # create the final data collections data_colls = [] if report_frequency == 'Hourly' or isinstance(report_frequency, int): for head, values in zip(headers, all_values): data_colls.append(HourlyContinuousCollection(head, values)) elif report_frequency == 'Daily': for head, values in zip(headers, all_values): data_colls.append(DailyCollection( head, values, head.analysis_period.doys_int)) elif report_frequency == 'Monthly': for head, values in zip(headers, all_values): data_colls.append(MonthlyCollection( head, values, head.analysis_period.months_int)) else: # Annual data; just return the values as they are return [val[0] for val in all_values] # ensure all imported data gets marked as valid; this increases speed elsewhere for data in data_colls: data._validated_a_period = True return data_colls
[docs] def data_collections_by_output_name_run_period(self, output_name, run_period_index): """Get an array of Ladybug DataCollections for an output and a run period index. Args: output_name: The name of an EnergyPlus output to be retrieved from the SQLite result file as a string. run_period_index: An integer taken from the run_period_indices property of this object, which will be used to select out data collections for just one run period in the SQL file. Returns: An array of data collections of the requested output type. This will be an empty list if no output of the requested name was found in the file. """ conn = sqlite3.connect(self.file_path) try: # extract all indices in the ReportDataDictionary with the output_name c = conn.cursor() cols = 'ReportDataDictionaryIndex, IndexGroup, KeyValue, Name, ' \ 'ReportingFrequency, Units' query = 'SELECT {} FROM ReportDataDictionary WHERE Name=?'.format(cols) c.execute(query, (output_name,)) header_rows = c.fetchall() # if nothing was found, return an empty list if len(header_rows) == 0: conn.close() # ensure connection is always closed return [] # remove any data not of the same frequency freq = header_rows[0][4] header_rows = [row for row in header_rows if row[4] == freq] # extract all data of the relevant type from ReportData rel_indices = tuple(row[0] for row in header_rows) query = 'SELECT ReportData.Value, ReportData.TimeIndex ' \ 'FROM ReportData ' \ 'INNER JOIN Time ON ReportData.TimeIndex=Time.TimeIndex ' \ 'WHERE ReportData.ReportDataDictionaryIndex IN {} AND ' \ 'Time.EnvironmentPeriodIndex=?'.format(rel_indices) c.execute(query, (run_period_index,)) data = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) # get the analysis period and the reporting frequency from the time table st_time, end_time = data[0][1], data[-1][1] run_period, report_frequency, mult = self._extract_run_period(st_time, end_time) # create the header objects to be used for the resulting data collections units = header_rows[0][-1] if header_rows[0][-1] != 'J' else 'kWh' data_type, units = self._data_type_from_unit(units, header_rows[0][3]) headers = [] for row in header_rows: obj_type = row[1] if 'Surface' not in output_name else 'Surface' m_data = {'type': row[3], obj_type: row[2]} headers.append(Header(data_type, units, run_period, m_data)) # format the data such that we have one list for each of the header rows all_values = self._partition_and_convert_timeseries(data, len(header_rows)) if \ units == 'kWh' else self._partition_timeseries(data, len(header_rows)) # create the final data collections data_colls = [] if report_frequency == 'Hourly' or isinstance(report_frequency, int): for head, values in zip(headers, all_values): data_colls.append(HourlyContinuousCollection(head, values)) elif report_frequency == 'Daily': for head, values in zip(headers, all_values): data_colls.append(DailyCollection( head, values, head.analysis_period.doys_int)) elif report_frequency == 'Monthly': for head, values in zip(headers, all_values): data_colls.append(MonthlyCollection( head, values, head.analysis_period.months_int)) else: # Annual data; just return the values as they are return all_values # ensure all imported data gets marked as valid; this increases speed elsewhere for data in data_colls: data._validated_a_period = True return data_colls
[docs] def tabular_data_by_name(self, table_name, j_to_kwh=True, report_name=None): """Get all the data within a table of a Summary Report using the table name. Args: table_name: Text string for the name of a table within a summary report. (eg. 'General'). j_to_kwh: Boolean to note if any data in MJ or GJ should be converted to kWh upon import of the table. This will also mean that any area-normalized values will also be converted to kWh/m2. report_name: An optional text string to indicate the report name from which the table should be pulled. This is useful in cases where tables have the same name in different reports. If None, data from all available tables will be returned. (Default: None). Returns: An ordered dictionary representing a matrix (list of lists), where the keys of the dictionary represent the row names and each value is a row of the table. The output should mirror how the table appears in the HTML output. """ conn = sqlite3.connect(self.file_path) try: # get the cursor and list of fields to be extracted c = conn.cursor() fields_to_extract = ['RowName', 'Value'] if j_to_kwh: fields_to_extract.append('Units') fields_to_extract_str = ', '.join(fields_to_extract) # extract the fields using a query if report_name is None: query_str = 'SELECT %s FROM TabularDataWithStrings ' \ 'WHERE TableName=?' % fields_to_extract_str c.execute(query_str, (table_name,)) else: query_str = 'SELECT %s FROM TabularDataWithStrings ' \ 'WHERE TableName=? AND ReportName=?' % fields_to_extract_str c.execute(query_str, (table_name, report_name)) table_data = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) # convert all of the extracted data into a tabular format table_dict = OrderedDict() if j_to_kwh: for item in table_data: try: val = float(item[1]) if 'GJ' in item[2]: val = val / 0.0036 elif 'MJ' in item[2]: val = val / 3.6 except ValueError: # not a number val = item[1] try: table_dict[item[0]].append(val) except KeyError: table_dict[item[0]] = [val] else: for item in table_data: try: val = float(item[1]) except ValueError: # not a number val = item[1] try: table_dict[item[0]].append(val) except KeyError: table_dict[item[0]] = [val] return table_dict
[docs] def tabular_column_names(self, table_name, report_name=None): """Get the names of the columns for a table of a Summary Report. Args: table_name: Text string for the name of a table within a summary report. (eg. 'General'). report_name: An optional text string to indicate the report name from which the table should be pulled. This is useful in cases where tables have the same name in different reports. If None, data from all available tables will be returned. (Default: None). Returns: A list of the column names of the table """ conn = sqlite3.connect(self.file_path) try: # extract the data from the General table in AllSummary c = conn.cursor() if report_name is None: c.execute('SELECT ColumnName FROM TabularDataWithStrings ' 'WHERE TableName=?', (table_name,)) else: c.execute( 'SELECT ColumnName FROM TabularDataWithStrings ' 'WHERE TableName=? AND ReportName=?', (table_name, report_name) ) table_col_names = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) return list(OrderedDict.fromkeys([item[0] for item in table_col_names]))
def _extract_location(self): """Extract a Location object from the SQLite file.""" # extract all of the data from the General table in AllSummary table_dict = self.tabular_data_by_name('General', False) general = list(table_dict.values()) if general == []: return # convert the extracted data into a Location object split_id = general[2][0].split(' ') city = ' '.join(split_id[:-2]) source = split_id[-2] station_id = split_id[-1].split('=')[-1] self._location = Location( city=city, source=source, station_id=station_id, latitude=general[3][0], longitude=general[4][0], time_zone=general[6][0], elevation=general[5][0]) def _extract_full_run_periods(self): """Extract all of the RunPeriod objects from the SQLite file.""" # extract all of the data from the General table in AllSummary table_dict = self.tabular_data_by_name('Environment', False) sim_env = list(table_dict.values()) if sim_env == []: return # convert the extracted data into a AnalysisPeriod objects self._run_periods = [] self._run_period_names = [] for row in sim_env: st_date = [int(digit) for digit in row[2].split('/')] end_date = [int(digit) for digit in row[3].split('/')] if len(st_date) == 3: # it's a true run period lp_yr = True if st_date[-1] % 4 == 0 else False aper = AnalysisPeriod(st_date[0], st_date[1], 0, end_date[0], end_date[1], 23, is_leap_year=lp_yr) else: # it's a design day period aper = AnalysisPeriod(st_date[0], st_date[1], 0, end_date[0], end_date[1], 23) self._run_periods.append(aper) self._run_period_names.append(row[0]) def _extract_full_run_period_indices(self): """Extract all RunPeriod indices from the Time table of the SQLite file.""" conn = sqlite3.connect(self.file_path) try: # extract all of the data from the Time table c = conn.cursor() c.execute('SELECT EnvironmentPeriodIndex FROM Time ' 'GROUP BY EnvironmentPeriodIndex') e_periods = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) self._run_period_indices = tuple(ind[0] for ind in e_periods) def _extract_available_outputs(self): """Extract the list of all available outputs from the SQLite file.""" conn = sqlite3.connect(self.file_path) try: # extract all indices in the ReportDataDictionary c = conn.cursor() c.execute('SELECT Name, IndexGroup, Units, ReportingFrequency ' 'FROM ReportDataDictionary') outputs = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) unique_outputs = set(outputs) self._available_outputs = tuple(outp[0] for outp in unique_outputs) self._available_outputs_info = [] for outp in unique_outputs: outp_dict = {} outp_dict['output_name'] = outp[0] outp_dict['object_type'] = outp[1] outp_dict['units'] = outp[2] if outp[2] != 'J' else 'kWh' outp_dict['data_type'], outp_dict['units'] = \ self._data_type_from_unit(outp_dict['units'], outp[0]) self._available_outputs_info.append(outp_dict) self._reporting_frequency = outp[3] def _extract_timestep(self): """Extract an integer for the timestep of the data. This is done by checking the first entry within the Time table. """ conn = sqlite3.connect(self.file_path) try: # extract the start and end times from the Time table c = conn.cursor() c.execute('SELECT Interval FROM Time') min_per_step = c.fetchone() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) return int(60 / min_per_step[0]) def _extract_zone_sizes(self, load_type): """Get all of the ZoneSize objects of a certain load type. Args: load_type: Text for the type of load to retrieve. This must be either 'Cooling' or 'Heating'. """ conn = sqlite3.connect(self.file_path) try: # extract the data from the ZoneSizes table c = conn.cursor() c.execute('SELECT * FROM ZoneSizes WHERE LoadType=?', (load_type,)) table_data = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) return [ZoneSize(table_row) for table_row in table_data] def _extract_component_sizes(self, component_type=None): """Get all of the ComponentSize objects of a certain type in the model. Args: component_type: Text for the type of component to be retrieved. (eg. 'ZoneHVAC:IdealLoadsAirSystem') """ conn = sqlite3.connect(self.file_path) try: # extract the data from the ZoneSizes table c = conn.cursor() if component_type: c.execute('SELECT * FROM ComponentSizes WHERE CompType=?', (component_type,)) else: c.execute('SELECT * FROM ComponentSizes') table_data = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) # group the rows by component name table_dict = {} for prop in table_data: comp_name = prop[2] try: table_dict[comp_name].append(prop) except KeyError: table_dict[comp_name] = [prop] # create the ComponentSize objects return [ComponentSize(table_rows) for table_rows in table_dict.values()] def _extract_run_period(self, st_time, end_time): """Extract the run period object and frequency from the SQLite file. Args: st_time: Index for the start time of the data. end_time: Index for the end time of the data. Returns: A tuple with run_period, reporting_frequency, and a boolean for whether the data was for a design day. """ conn = sqlite3.connect(self.file_path) try: # extract the start and end times from the Time table query_str = 'SELECT Year, Month, Day, Interval, IntervalType, ' \ 'EnvironmentPeriodIndex FROM Time WHERE TimeIndex=?' c = conn.cursor() c.execute(query_str, (st_time,)) start = c.fetchone() c.execute(query_str, (end_time,)) end = c.fetchone() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) # check whether the data was for a design day multiple_period = True if start[5] != end[5] else False # set the reporting frequency by the interval type interval_typ = start[4] if interval_typ <= 1: min_per_step = start[3] aper_timestep = int(60 / min_per_step) reporting_frequency = aper_timestep else: reporting_frequency = self._interval_codes[interval_typ] aper_timestep = 1 min_per_step = 60 # convert the extracted data into an AnalysisPeriod object leap_year = True if end[0] != 0 and end[0] % 4 == 0 else False if reporting_frequency == 'Annual': return None, reporting_frequency, multiple_period if reporting_frequency == 'Monthly': st_date = DateTime(start[1], 1, 0) else: st_date = DateTime(start[1], start[2], 0) end_date = DateTime(end[1], end[2], 0) end_date = end_date.add_minute(1440 - min_per_step) run_period = AnalysisPeriod( st_date.month, st_date.day, st_date.hour, end_date.month, end_date.day, end_date.hour, aper_timestep, leap_year) return run_period, reporting_frequency, multiple_period def _extract_all_run_period(self, reporting_frequency, timestep, leap_year): """Extract all run period objects the Time table in the SQLite file. Args: reporting_frequency: Text for the reporting frequency of the data. timestep: Integer to note the timestep of the analysis periods. By the time this function is running, this value should have been gotten from the _extract_run_period method leap_year: Boolean to note whether the analysis periods are for a leap year. Returns: A list of AnalysisPeriods for all periods that could be obtained from the Time table. """ conn = sqlite3.connect(self.file_path) try: # extract all of the data from the Time table c = conn.cursor() c.execute('SELECT Month, Day, EnvironmentPeriodIndex FROM Time') timeseries = c.fetchall() conn.close() # ensure connection is always closed except Exception as e: conn.close() # ensure connection is always closed raise Exception(str(e)) min_per_step = int(60 / timestep) # extract information about the first run period if reporting_frequency == 'Monthly': st_date = DateTime(timeseries[0][0], 1, 0) else: st_date = DateTime(timeseries[0][0], timeseries[0][1], 0) env_period = timeseries[0][2] # build up the analysis period objects run_periods = [] for i, time_row in enumerate(timeseries): if time_row[2] != env_period: # start of a new run period # create the run period end = timeseries[i - 1] end_date = DateTime(end[0], end[1], 0) end_date = end_date.add_minute(1440 - min_per_step) run_period = AnalysisPeriod( st_date.month, st_date.day, st_date.hour, end_date.month, end_date.day, end_date.hour, timestep, leap_year) run_periods.append(run_period) # reset the tracking variables if reporting_frequency == 'Monthly': st_date = DateTime(time_row[0], 1, 0) else: st_date = DateTime(time_row[0], time_row[1], 0) env_period = time_row[2] # create the last run period object and return all run periods end_date = DateTime(time_row[0], time_row[1], 0) end_date = end_date.add_minute(1440 - min_per_step) run_period = AnalysisPeriod( st_date.month, st_date.day, st_date.hour, end_date.month, end_date.day, end_date.hour, timestep, leap_year) run_periods.append(run_period) return run_periods @staticmethod def _data_type_from_unit(from_unit, data_name=''): """Get a Ladybug DataType object instance from a unit abbreviation. The returned object will be the base type (eg. Temperature, Energy, etc.). """ if from_unit == '': # dimensionless data type return ladybug.datatype.TYPESDICT['Fraction'](), 'fraction' for key in ladybug.datatype.UNITS: if from_unit in ladybug.datatype.UNITS[key]: return ladybug.datatype.TYPESDICT[key](), from_unit # no units are specified; the values are dimensionless or fractional return GenericType(data_name, from_unit), from_unit @staticmethod def _partition_timeseries(data, n_lists): """Partition timeseries data that has been retrieved from the SQL file. Args: n_lists: An integer for the number of lists to partition the data into. """ all_values = [] for i in range(0, len(data), n_lists): all_values.append([val[0] for val in data[i:i + n_lists]]) return zip(*all_values) @staticmethod def _partition_and_convert_timeseries(data, n_lists): """Partition data that retrieved from the SQL file + convert it to kWh. Args: n_lists: An integer for the number of lists to partition the data into. """ all_values = [] for i in range(0, len(data), n_lists): all_values.append([val[0] / 3600000. for val in data[i:i + n_lists]]) return zip(*all_values) @staticmethod def _partition_timeseries_chunks(data, chunks): """Partition timeseries data based on a chunking pattern. Args: chunks: A list of integers for the chunking pattern (eg. [24, 24, 8760]). """ n_lists = int(len(data) / sum(chunks)) zero_cum_chunks = [0] + SQLiteResult._accumulate(chunks) all_values = [] for j, chunk in enumerate(chunks): start = zero_cum_chunks[j] * n_lists day_vals = [] for i in range(0, chunk * n_lists, n_lists): day_vals.append([val[0] for val in data[start + i:start + i + n_lists]]) all_values.extend(zip(*day_vals)) return all_values @staticmethod def _partition_and_convert_timeseries_chunks(data, chunks): """Partition timeseries data based on a chunking pattern + convert it to kWh. Args: chunks: A list of integers for the chunking pattern (eg. [24, 24, 8760]). """ n_lists = int(len(data) / sum(chunks)) zero_sum_chunks = [0] + SQLiteResult._accumulate(chunks) all_values = [] for j, chunk in enumerate(chunks): start = zero_sum_chunks[j] * n_lists day_vals = [] for i in range(0, chunk * n_lists, n_lists): day_vals.append([val[0] / 3600000. for val in data[start + i:start + i + n_lists]]) all_values.extend(zip(*day_vals)) return all_values @staticmethod def _accumulate(chunks): """Cumulatively sum a list of numbers.""" cum_chunks = [] total = 0 for x in chunks: total += x cum_chunks.append(total) return cum_chunks
[docs] def ToString(self): """Overwrite .NET ToString.""" return self.__repr__()
def __repr__(self): return 'Energy SQLiteResult: {}'.format(self.file_path)
[docs]class ZoneSize(object): """Object for holding the sizing results of an individual zone. Args: sql_table_row: A list that represents a row of the SQLite ZoneSizes table. This row contains all of the sizing information for a single conditioned zone (either heating or cooling). Properties: * zone_name * load_type * calculated_design_load * final_design_load * calculated_design_flow * final_design_flow * design_day_name * peak_date_time * peak_temperature * peak_humidity_ratio * peak_outdoor_air_flow """ __slots__ = ('_zone_name', '_load_type', '_calculated_design_load', '_final_design_load', '_calculated_design_flow', '_final_design_flow', '_design_day_name', '_peak_date_time', '_peak_temperature', '_peak_humidity_ratio', '_peak_outdoor_air_flow') def __init__(self, sql_table_row): """Initialize ZoneSize""" self._zone_name = str(sql_table_row[1]) self._load_type = str(sql_table_row[2]) self._calculated_design_load = sql_table_row[3] self._final_design_load = sql_table_row[4] self._calculated_design_flow = sql_table_row[5] self._final_design_flow = sql_table_row[6] self._design_day_name = str(sql_table_row[7]) self._peak_temperature = sql_table_row[9] self._peak_humidity_ratio = sql_table_row[10] self._peak_outdoor_air_flow = sql_table_row[11] try: date_str = sql_table_row[8] if '24:00:00' not in sql_table_row[8] else \ sql_table_row[8].replace('24:00:00', '00:00:00') pyd = datetime.strptime(date_str, '%m/%d %H:%M:%S') self._peak_date_time = DateTime(pyd.month, pyd.day, pyd.hour, pyd.minute) except Exception: # likely a zone with no cooling; a peak value of 0 self._peak_date_time = None
[docs] @classmethod def from_dict(cls, data): """Create a ZoneSize from a dictionary. Args: data: ZoneSize dictionary following the format below. .. code-block:: python { "type": "ZoneSize", "zone_name": str, "load_type": str, "calculated_design_load": float, "final_design_load": float, "calculated_design_flow": float, "final_design_flow": float, "design_day_name": str, "peak_date_time": str, "peak_temperature": float, "peak_humidity_ratio": float, "peak_outdoor_air_flow": float } """ assert data['type'] == 'ZoneSize', \ 'Expected ZoneSize. Got {}.'.format(data['type']) return cls( [None, data['zone_name'], data['load_type'], data['calculated_design_load'], data['final_design_load'], data['calculated_design_flow'], data['final_design_flow'], data['design_day_name'], data['peak_date_time'], data['peak_temperature'], data['peak_humidity_ratio'], data['peak_outdoor_air_flow']])
[docs] def to_dict(self): """Get ZoneSize as a dictionary.""" pk_time = None if self.peak_date_time is None else \ self.peak_date_time.strftime('%m/%d %H:%M:%S') return { 'type': 'ZoneSize', 'zone_name': self.zone_name, 'load_type': self.load_type, 'calculated_design_load': self.calculated_design_load, 'final_design_load': self.final_design_load, 'calculated_design_flow': self.calculated_design_flow, 'final_design_flow': self.final_design_flow, 'design_day_name': self.design_day_name, 'peak_date_time': pk_time, 'peak_temperature': self.peak_temperature, 'peak_humidity_ratio': self.peak_humidity_ratio, 'peak_outdoor_air_flow': self.peak_outdoor_air_flow }
@property def zone_name(self): """Get the name of the zone to which this sizing information corresponds.""" return self._zone_name @property def load_type(self): """Get a text string that is either "Cooling" or "Heating".""" return self._load_type @property def calculated_design_load(self): """Get the peak load of the Zone computed by the EnergyPlus sizing calculation. Values are always in Watts. """ return self._calculated_design_load @property def final_design_load(self): """Get the peak load of the Zone that is ultimately used to size the equipment. Values are always in Watts. This accounts for the heating_factor and cooling_factor of the specified in the SizingParameter of the SimulationParameter object. """ return self._final_design_load @property def calculated_design_flow(self): """Get the peak flow of the Zone computed by the EnergyPlus sizing calculation. Values are always in m3/s. """ return self._calculated_design_flow @property def final_design_flow(self): """Get the peak flor of the Zone that is ultimately used to size the equipment. Values are always in m3/s. This accounts for the heating_factor and cooling_factor of the specified in the SizingParameter of the SimulationParameter object. """ return self._final_design_flow @property def design_day_name(self): """Get the name of the design day on which the peak load occurred.""" return self._design_day_name @property def peak_date_time(self): """Get a DateTime for the time at which the peak occurred.""" return self._peak_date_time @property def peak_temperature(self): """Get the outdoor air temperature at the time of the peak load (C).""" return self._peak_temperature @property def peak_humidity_ratio(self): """Get the outdoor humidity ratio at the time of the peak load (fractional).""" return self._peak_humidity_ratio @property def peak_outdoor_air_flow(self): """Get the outdoor air flow into the zone at the time of the peak load (m3/s).""" return self._peak_outdoor_air_flow
[docs] def ToString(self): """Overwrite .NET ToString.""" return self.__repr__()
def __repr__(self): return '{} Zone Size: {} W'.format(self.load_type, self.calculated_design_load)
[docs]class ComponentSize(object): """Object for holding the sizing results of an individual HVAC components. Args: sql_table_rows: A list of list where each sub-list represents a row of the SQLite ComponentSizes table. These rows are all expected to have the same component name. Properties: * component_type * component_name * descriptions * properties * values * units * properties_dict """ __slots__ = ('_component_type', '_component_name', '_properties', '_values', '_units', '_properties_dict') def __init__(self, sql_table_rows): """Initialize ComponentSize""" self._component_type = str(sql_table_rows[0][1]) self._component_name = str(sql_table_rows[0][2]) self._properties = tuple(str(table_row[3]) for table_row in sql_table_rows) self._values = tuple(table_row[4] for table_row in sql_table_rows) self._units = tuple(str(table_row[5]) for table_row in sql_table_rows)
[docs] @classmethod def from_dict(cls, data): """Create a ComponentSize from a dictionary. Args: data: ComponentSize dictionary following the format below. .. code-block:: python { "type": "ComponentSize", "component_type": str, "component_name": str, "properties": [], "values": [], "units": [] } """ assert data['type'] == 'ComponentSize', \ 'Expected ComponentSize. Got {}.'.format(data['type']) mtx = [] for prop, val, unit in zip(data['properties'], data['values'], data['units']): row = [None, data['component_type'], data['component_name'], prop, val, unit] mtx.append(row) return cls(mtx)
[docs] def to_dict(self): """Get ComponentSize as a dictionary.""" return { 'type': 'ComponentSize', 'component_type': self.component_type, 'component_name': self.component_name, 'properties': self.properties, 'values': self.values, 'units': self.units }
@property def component_type(self): """Get text for the type of component that this object represents.""" return self._component_type @property def component_name(self): """Get text for the name of component that this object represents.""" return self._component_name @property def descriptions(self): """Get a tuple with text descriptions for all component properties. Descriptions are formatted as component_name-property-unit. They are aligned with the values on this object. """ return tuple('{}-{} [{}]'.format(self._component_name, prop, unit) for prop, unit in zip(self._properties, self._units)) @property def properties(self): """Get a tuple with text for all component properties. They are aligned with the values on this object. """ return self._properties @property def values(self): """Get a tuple with numbers for all component property values. They are aligned with the properties on this object. """ return self._values @property def units(self): """Get a tuple with text for all component property units. They are aligned with the values on this object. """ return self._units @property def properties_dict(self): """Get a dictionary with the properties as keys and values as the values.""" return {prop: val for prop, val in zip(self._properties, self._values)}
[docs] def ToString(self): """Overwrite .NET ToString.""" return self.__repr__()
def __repr__(self): return 'Component Size: {}'.format(self.component_name)