datainterface module

BaseData

Base class for handling common data operations.

Parameters:

    path (str): The path to the data file. Required.

Attributes:

    path (str): The path to the data file.
    sublabel (str): The sublabel extracted from the file path.
    label (str): The label extracted from the file path.
    type (str): The type extracted from the label.
    data (pd.DataFrame): The loaded data.
    latest_date (pd.Timestamp): The latest date in the data.
    oldest_date (pd.Timestamp): The oldest date in the data.

Source code in multidefusion\datainterface.py
class BaseData:
    """
    Base class for handling common data operations.

    Args:
        path (str): The path to the data file.

    Attributes:
        path (str): The path to the data file.
        sublabel (str): The sublabel extracted from the file path.
        label (str): The label extracted from the file path.
        type (str): The type extracted from the label.
        data (pd.DataFrame): The loaded data.
        latest_date (pd.Timestamp): The latest date in the data.
        oldest_date (pd.Timestamp): The oldest date in the data.
    """

    TIME_INTERVAL_NUM = 1
    TIME_INTERVAL = 'D'

    def __init__(self, path: str) -> None:
        """
        Initializes BaseData object.

        Args:
            path (str): The path to the data file.
        """
        self.path = path
        self.sublabel = None
        self.label = self.add_label_to_data()
        self.type = self.label.split("_")[0]
        self.data = self.load_data()
        self.latest_date = self.data.index.max()
        self.oldest_date = self.data.index.min()

    def load_csv_data(self, header: List[str]) -> pd.DataFrame:
        """
        Loads data from the specified file.

        Args:
            header (List[str]): List of column names.

        Returns:
            pd.DataFrame: Loaded data.
        """
        with open(self.path, 'r') as file:
            first_line = file.readline()
        sep = "\s+|," if "," in first_line else "\s+"
        data = pd.read_csv(self.path, sep=sep, header=None, skiprows=1, names=header, engine="python")
        return data

    def load_data(self) -> pd.DataFrame:
        """
        Abstract method to be implemented by subclasses for loading data.

        Returns:
            pd.DataFrame: Loaded data.
        """
        raise NotImplementedError("Subclasses must implement the load_data method")

    def get_observation(self, row_of_data: Any) -> Any:
        """
        Abstract method to be implemented by subclasses for extracting observations.

        Args:
            row_of_data: A row of data.

        Returns:
            Any: Extracted observation.
        """
        raise NotImplementedError("Subclasses must implement the get_observation method")

    def add_label_to_data(self) -> str:
        """
        Extracts and returns the label and sets the sublabel from the file path.

        Returns:
            str: Extracted label.
        """
        splitted = os.path.split(self.path)[-1].split(".")[0].split("_")
        if len(splitted) == 3:
            self.sublabel = splitted[-1]
            return splitted[0] + "_" + splitted[1]
        elif len(splitted) == 2:
            return splitted[0] + "_" + splitted[1]
        elif len(splitted) == 1:
            return splitted[0]

    def get_data_by_date(self, date: pd.Timestamp, columns_list: List[str] = None) -> Union[pd.DataFrame, None]:
        """
        Gets data for a specific date.

        Args:
            date (pd.Timestamp): The date for which data is requested.
            columns_list (List[str]): List of column names to be returned.

        Returns:
            Union[pd.DataFrame, None]: Data for the specified date (or None if not found).
        """
        if date in self.data.index:
            data_by_date = self.data.loc[date]
            if not data_by_date.isnull().values.any():
                if columns_list:
                    return data_by_date[columns_list]
                else:
                    return data_by_date
        return None

    def process_timestamp_columns(self) -> None:
        """
        Process timestamp columns in the data.

        Converts GNSS, SBAS, and PSI "YYYY", "MM", "DD" columns into a single "timestamp" column,
        sets it as the index, and resamples the data based on the time interval.
        """
        self.data["timestamp"] = pd.to_datetime(self.data[["YYYY", "MM", "DD"]].astype(int).astype(str).apply(" ".join, 1), format="%Y %m %d")
        self.data = self.data.drop(["YYYY", "MM", "DD"], axis=1)
        self.data.set_index(["timestamp"], inplace=True)
        self.data = self.data.resample(self.TIME_INTERVAL).asfreq()

    def convert_range_into_two_timestamps(self) -> None:
        """
        Convert DInSAR range columns into two timestamp columns.

        Converts "YYYY1", "MM1", "DD1", "YYYY2", "MM2", "DD2" columns into
        "timestamp1" and "timestamp2" columns, and sets them as the index.
        """
        self.data["timestamp1"] = pd.to_datetime(self.data[["YYYY1", "MM1", "DD1"]].astype(int).astype(str).apply(" ".join, 1), format="%Y %m %d")
        self.data["timestamp2"] = pd.to_datetime(self.data[["YYYY2", "MM2", "DD2"]].astype(int).astype(str).apply(" ".join, 1), format="%Y %m %d")
        self.data = self.data.drop(["YYYY1", "MM1", "DD1", "YYYY2", "MM2", "DD2"], axis=1)
        self.data.set_index(["timestamp1", "timestamp2"], inplace=True)

    def replace_decimal_sep(self) -> None:
        """
        Replace decimal separators in non-float columns.

        Replaces commas with dots in non-float columns and converts them to float.
        """
        for column in self.data.columns:
            if self.data[column].dtype != float:
                try:
                    self.data[column] = self.data[column].replace(",", ".", regex=True).astype(float)
                except ValueError:
                    continue  # leave columns that cannot be parsed as floats untouched

    def create_projection_matrix_and_error(self, row_of_data: pd.Series) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """
        Abstract method to be implemented by subclasses for creating projection matrix and error matrix.

        Args:
            row_of_data (pd.Series): A row of data.

        Returns:
            Tuple[Optional[np.ndarray], Optional[np.ndarray]]: Projection matrix and error matrix.
        """
        raise NotImplementedError("Subclasses must implement the create_projection_matrix_and_error method")

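For orientation, here is a minimal sketch of how a concrete subclass plugs into BaseData. The LevellingData class, its file name, and its column names are hypothetical, invented for illustration only:

import numpy as np
import pandas as pd

class LevellingData(BaseData):
    # Hypothetical header; real subclasses define their own column layout.
    HEADER = ["YYYY", "MM", "DD", "H", "mH"]

    def load_data(self) -> pd.DataFrame:
        # Reuse the shared CSV reader and timestamp handling from BaseData.
        self.data = self.load_csv_data(self.HEADER)
        self.process_timestamp_columns()
        self.replace_decimal_sep()
        return self.data

    def get_observation(self, row_of_data: pd.Series) -> np.ndarray:
        return row_of_data[["H"]].values

    def create_projection_matrix_and_error(self, row_of_data: pd.Series):
        if row_of_data is not None:
            # Observe a single component of the filter state (layout assumed).
            projection_matrix = np.array([[0, 0, 0, 0, 1, 0]])
            error_matrix = np.diag(row_of_data[["mH"]].values ** 2)
            return projection_matrix, error_matrix
        return None, None

# levelling = LevellingData("LEVELLING_network.csv")  # hypothetical file name
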
__init__(path)

Initializes BaseData object.

Parameters:

    path (str): The path to the data file. Required.

Source code in multidefusion\datainterface.py
def __init__(self, path: str) -> None:
    """
    Initializes BaseData object.

    Args:
        path (str): The path to the data file.
    """
    self.path = path
    self.sublabel = None
    self.label = self.add_label_to_data()
    self.type = self.label.split("_")[0]
    self.data = self.load_data()
    self.latest_date = self.data.index.max()
    self.oldest_date = self.data.index.min()

add_label_to_data()

Extracts and returns the label and sets the sublabel from the file path.

Returns:

    str: Extracted label.

Source code in multidefusion\datainterface.py
def add_label_to_data(self) -> str:
    """
    Extracts and returns the label and sets the sublabel from the file path.

    Returns:
        str: Extracted label.
    """
    splitted = os.path.split(self.path)[-1].split(".")[0].split("_")
    if len(splitted) == 3:
        self.sublabel = splitted[-1]
        return splitted[0] + "_" + splitted[1]
    elif len(splitted) == 2:
        return splitted[0] + "_" + splitted[1]
    elif len(splitted) == 1:
        return splitted[0]

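As a concrete illustration of the parsing (file names are hypothetical):

import os

# "GNSS_station1.csv"   -> label "GNSS_station1", sublabel None,  type "GNSS"
# "DInSAR_asc_pt7.csv"  -> label "DInSAR_asc",    sublabel "pt7", type "DInSAR"
# "SBAS.csv"            -> label "SBAS",          sublabel None,  type "SBAS"
splitted = os.path.split("data/DInSAR_asc_pt7.csv")[-1].split(".")[0].split("_")
print(splitted)  # ['DInSAR', 'asc', 'pt7']
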
convert_range_into_two_timestamps()

Convert DInSAR range columns into two timestamp columns.

Converts "YYYY1", "MM1", "DD1", "YYYY2", "MM2", "DD2" columns into "timestamp1" and "timestamp2" columns, and sets them as the index.

Source code in multidefusion\datainterface.py
def convert_range_into_two_timestamps(self) -> None:
    """
    Convert DInSAR range columns into two timestamp columns.

    Converts "YYYY1", "MM1", "DD1", "YYYY2", "MM2", "DD2" columns into
    "timestamp1" and "timestamp2" columns, and sets them as the index.
    """
    self.data["timestamp1"] = pd.to_datetime(self.data[["YYYY1", "MM1", "DD1"]].astype(int).astype(str).apply(" ".join, 1), format="%Y %m %d")
    self.data["timestamp2"] = pd.to_datetime(self.data[["YYYY2", "MM2", "DD2"]].astype(int).astype(str).apply(" ".join, 1), format="%Y %m %d")
    self.data = self.data.drop(["YYYY1", "MM1", "DD1", "YYYY2", "MM2", "DD2"], axis=1)
    self.data.set_index(["timestamp1", "timestamp2"], inplace=True)

create_projection_matrix_and_error(row_of_data)

Abstract method to be implemented by subclasses for creating projection matrix and error matrix.

Parameters:

    row_of_data (pd.Series): A row of data. Required.

Returns:

    Tuple[Optional[np.ndarray], Optional[np.ndarray]]: Projection matrix and error matrix.

Source code in multidefusion\datainterface.py
def create_projection_matrix_and_error(self, row_of_data: pd.Series) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """
    Abstract method to be implemented by subclasses for creating projection matrix and error matrix.

    Args:
        row_of_data (pd.Series): A row of data.

    Returns:
        Tuple[Optional[np.ndarray], Optional[np.ndarray]]: Projection matrix and error matrix.
    """
    raise NotImplementedError("Subclasses must implement the create_projection_matrix_and_error method")

get_data_by_date(date, columns_list=None)

Gets data for a specific date.

Parameters:

    date (pd.Timestamp): The date for which data is requested. Required.
    columns_list (List[str], optional): List of column names to be returned. Default: None.

Returns:

    Union[pd.DataFrame, None]: Data for the specified date (or None if not found).

Source code in multidefusion\datainterface.py
def get_data_by_date(self, date: pd.Timestamp, columns_list: List[str] = None) -> Union[pd.DataFrame, None]:
    """
    Gets data for a specific date.

    Args:
        date (pd.Timestamp): The date for which data is requested.
        columns_list (List[str]): List of column names to be returned.

    Returns:
        Union[pd.DataFrame, None]: Data for the specified date (or None if not found).
    """
    if date in self.data.index:
        data_by_date = self.data.loc[date]
        if not data_by_date.isnull().values.any():
            if columns_list:
                return data_by_date[columns_list]
            else:
                return data_by_date
    return None

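A short usage sketch (the file name is an assumption; GNSSData is documented below):

import pandas as pd

gnss = GNSSData("GNSS_station.csv")  # hypothetical input file
row = gnss.get_data_by_date(pd.Timestamp("2022-01-01"), columns_list=["N", "E", "U"])
if row is not None:
    print(row)  # N, E, U values for that day
else:
    print("No complete observation for 2022-01-01.")
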
get_observation(row_of_data)

Abstract method to be implemented by subclasses for extracting observations.

Parameters:

    row_of_data (Any): A row of data. Required.

Returns:

    Any: Extracted observation.

Source code in multidefusion\datainterface.py
def get_observation(self, row_of_data: Any) -> Any:
    """
    Abstract method to be implemented by subclasses for extracting observations.

    Args:
        row_of_data: A row of data.

    Returns:
        Any: Extracted observation.
    """
    raise NotImplementedError("Subclasses must implement the get_observation method")

load_csv_data(header)

Loads data from the specified file.

Parameters:

    header (List[str]): List of column names. Required.

Returns:

    pd.DataFrame: Loaded data.

Source code in multidefusion\datainterface.py
def load_csv_data(self, header: List[str]) -> pd.DataFrame:
    """
    Loads data from the specified file.

    Args:
        header (List[str]): List of column names.

    Returns:
        pd.DataFrame: Loaded data.
    """
    with open(self.path, 'r') as file:
        first_line = file.readline()
    sep = "\s+|," if "," in first_line else "\s+"
    data = pd.read_csv(self.path, sep=sep, header=None, skiprows=1, names=header, engine="python")
    return data

load_data()

Abstract method to be implemented by subclasses for loading data.

Returns:

    pd.DataFrame: Loaded data.

Source code in multidefusion\datainterface.py
def load_data(self) -> pd.DataFrame:
    """
    Abstract method to be implemented by subclasses for loading data.

    Returns:
        pd.DataFrame: Loaded data.
    """
    raise NotImplementedError("Subclasses must implement the load_data method")

process_timestamp_columns()

Process timestamp columns in the data.

Converts GNSS, SBAS, and PSI "YYYY", "MM", "DD" columns into a single "timestamp" column, sets it as the index, and resamples the data based on the time interval.

Source code in multidefusion\datainterface.py
def process_timestamp_columns(self) -> None:
    """
    Process timestamp columns in the data.

    Converts GNSS, SBAS, and PSI "YYYY", "MM", "DD" columns into a single "timestamp" column,
    sets it as the index, and resamples the data based on the time interval.
    """
    self.data["timestamp"] = pd.to_datetime(self.data[["YYYY", "MM", "DD"]].astype(int).astype(str).apply(" ".join, 1), format="%Y %m %d")
    self.data = self.data.drop(["YYYY", "MM", "DD"], axis=1)
    self.data.set_index(["timestamp"], inplace=True)
    self.data = self.data.resample(self.TIME_INTERVAL).asfreq()

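The conversion can be reproduced on a toy frame; this mirrors the method body outside the class:

import pandas as pd

df = pd.DataFrame({"YYYY": [2022, 2022], "MM": [1, 1], "DD": [1, 3], "DSP": [0.0, 1.2]})
df["timestamp"] = pd.to_datetime(
    df[["YYYY", "MM", "DD"]].astype(int).astype(str).apply(" ".join, axis=1),
    format="%Y %m %d",
)
df = df.drop(["YYYY", "MM", "DD"], axis=1).set_index("timestamp")
df = df.resample("D").asfreq()  # inserts a NaN row for the missing 2022-01-02
print(df)
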
replace_decimal_sep()

Replace decimal separators in non-float columns.

Replaces commas with dots in non-float columns and converts them to float.

Source code in multidefusion\datainterface.py
def replace_decimal_sep(self) -> None:
    """
    Replace decimal separators in non-float columns.

    Replaces commas with dots in non-float columns and converts them to float.
    """
    for column in self.data.columns:
        if self.data[column].dtype != float:
            try:
                self.data[column] = self.data[column].replace(",", ".", regex=True).astype(float)
            except ValueError:
                continue  # leave columns that cannot be parsed as floats untouched

GNSSData

Bases: BaseData

Class for handling GNSS data operations, inheriting from BaseData.

Parameters:

    path (str): The path to the GNSS data file. Required.

Source code in multidefusion\datainterface.py
class GNSSData(BaseData):
    """
    Class for handling GNSS data operations, inheriting from BaseData.

    Args:
        path (str): The path to the GNSS data file.

    Attributes:
        Inherits attributes from BaseData.
    """

    HEADER_GNSS = ['YYYY', 'MM', 'DD', 'X', 'Y', 'Z', 'mX', 'mY', 'mZ']

    def __init__(self, path: str) -> None:
        """
        Initializes GNSSData object.

        Args:
            path (str): The path to the GNSS data file.
        """
        super().__init__(path)

    def load_data(self) -> pd.DataFrame:
        """
        Loads GNSS data from the specified file.

        Returns:
            pd.DataFrame: Loaded GNSS data.
        """
        header = getattr(GNSSData, "HEADER_" + self.type)
        self.data = self.load_csv_data(header)

        mean_xyz_first_five_epochs, F = self.create_rotation_matrix()
        self.xyz_to_neu(mean_xyz_first_five_epochs, F)

        self.process_timestamp_columns()
        self.replace_decimal_sep()
        return self.data

    def create_projection_matrix_and_error(self, row_of_data: pd.Series) -> Tuple[Union[np.ndarray, None], Union[np.ndarray, None]]:
        """
        Creates a projection matrix and error matrix based on the provided row of data.

        Args:
            row_of_data (pd.Series): A row of GNSS data.

        Returns:
            Tuple[Union[np.ndarray, None], Union[np.ndarray, None]]: Projection matrix and error matrix.
        """
        if row_of_data is not None:
            projection_matrix = np.array([[1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 1, 0]])
            # Alternative: error_matrix = np.diag(row_of_data[["mN", "mE", "mU"]].values ** 2) * 100
            # (inflated variant kept for reference, motivated by the error levels reported by Bernese)
            error_matrix = row_of_data['error']
            return projection_matrix, error_matrix
        return None, None

    def get_observation(self, row_of_data: pd.Series) -> np.ndarray:
        """
        Gets GNSS observation from the provided row of data.

        Args:
            row_of_data (pd.Series): A row of GNSS data.

        Returns:
            np.ndarray: GNSS observation.
        """
        return row_of_data[["N", "E", "U"]].values

    @staticmethod
    def xyz_to_blh(X, Y, Z):
        """
        Converts Cartesian coordinates (X, Y, Z) to geodetic coordinates (latitude, longitude, height).

        Args:
            X (float): Cartesian coordinate in the X direction.
            Y (float): Cartesian coordinate in the Y direction.
            Z (float): Cartesian coordinate in the Z direction.

        Returns:
            Tuple[float, float, float]: Geodetic coordinates (latitude, longitude, height).
        """
        a = 6378137
        b = 6356752.31414
        e2 = ((a * a) - (b * b)) / (a * a)
        elat = 1e-12
        eht = 1e-05
        p = np.sqrt(X ** 2 + Y ** 2)
        lat = np.arctan2(Z, p / (1 - e2))
        h = 0
        dh = 1
        dlat = 1
        i = 0
        while np.any(dlat > elat) or np.any(dh > eht):
            i += 1
            lat0 = lat
            h0 = h
            v = a / np.sqrt(1 - e2 * np.sin(lat) ** 2)
            h = p / np.cos(lat) - v
            lat = np.arctan2(Z, p * (1 - e2 * v / (v + h)))
            dlat = np.abs(lat - lat0)
            dh = np.abs(h - h0)
        lon = np.arctan2(Y, X)
        return lat, lon, h

    def create_rotation_matrix(self):
        """
        Creates a rotation matrix based on the mean coordinates of the first five epochs.

        Returns:
            Tuple[pd.Series, np.ndarray]: Mean coordinates of the first five epochs and the rotation matrix.
        """
        mean_xyz_first_five_epochs = self.data.loc[:, ["X", "Y", "Z"]].head(5).mean(axis=0)
        B, L, h = self.xyz_to_blh(mean_xyz_first_five_epochs["X"], mean_xyz_first_five_epochs["Y"], mean_xyz_first_five_epochs["Z"])
        F = np.array([[-np.sin(B) * np.cos(L), -np.sin(B) * np.sin(L), np.cos(B)],
                      [-np.sin(L), np.cos(L), 0],
                      [np.cos(B) * np.cos(L), np.cos(B) * np.sin(L), np.sin(B)]])
        return mean_xyz_first_five_epochs, F

    def xyz_to_neu(self, mean_xyz_first_five_epochs, F):
        """
        Converts Cartesian coordinates (X, Y, Z) to local coordinates (North, East, Up).

        Args:
            mean_xyz_first_five_epochs (pd.Series): Mean coordinates of the first five epochs.
            F (np.ndarray): Rotation matrix.

        Returns:
            None: Modifies the 'data' attribute in-place by updating coordinates and errors.
        """
        columns_to_modify = ["X", "Y", "Z"]

        self.data[columns_to_modify] -= mean_xyz_first_five_epochs[columns_to_modify]

        def calculate_coordinates_and_errors_for_row(row):
            NEU = np.dot(F, np.array([row["X"], row["Y"], row["Z"]]))
            errors = np.dot(np.dot(F, np.diag([row["mX"] ** 2, row["mY"] ** 2, row["mZ"] ** 2])), F.transpose())
            row["N"], row["E"], row["U"] = NEU
            row["error"] = errors
            return row

        self.data = self.data.apply(calculate_coordinates_and_errors_for_row, axis=1)
        self.data = self.data.drop(["X", "Y", "Z", "mX", "mY", "mZ"], axis=1)

__init__(path)

Initializes GNSSData object.

Parameters:

    path (str): The path to the GNSS data file. Required.

Source code in multidefusion\datainterface.py
def __init__(self, path: str) -> None:
    """
    Initializes GNSSData object.

    Args:
        path (str): The path to the GNSS data file.
    """
    super().__init__(path)

create_projection_matrix_and_error(row_of_data)

Creates a projection matrix and error matrix based on the provided row of data.

Parameters:

    row_of_data (pd.Series): A row of GNSS data. Required.

Returns:

    Tuple[Union[np.ndarray, None], Union[np.ndarray, None]]: Projection matrix and error matrix.

Source code in multidefusion\datainterface.py
def create_projection_matrix_and_error(self, row_of_data: pd.Series) -> Tuple[Union[np.ndarray, None], Union[np.ndarray, None]]:
    """
    Creates a projection matrix and error matrix based on the provided row of data.

    Args:
        row_of_data (pd.Series): A row of GNSS data.

    Returns:
        Tuple[Union[np.ndarray, None], Union[np.ndarray, None]]: Projection matrix and error matrix.
    """
    if row_of_data is not None:
        projection_matrix = np.array([[1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 1, 0]])
        # Alternative: error_matrix = np.diag(row_of_data[["mN", "mE", "mU"]].values ** 2) * 100
        # (inflated variant kept for reference, motivated by the error levels reported by Bernese)
        error_matrix = row_of_data['error']
        return projection_matrix, error_matrix
    return None, None

create_rotation_matrix()

Creates a rotation matrix based on the mean coordinates of the first five epochs.

Returns:

    Tuple[pd.Series, np.ndarray]: Mean coordinates of the first five epochs and the rotation matrix.

Source code in multidefusion\datainterface.py
def create_rotation_matrix(self):
    """
    Creates a rotation matrix based on the mean coordinates of the first five epochs.

    Returns:
        Tuple[pd.Series, np.ndarray]: Mean coordinates of the first five epochs and the rotation matrix.
    """
    mean_xyz_first_five_epochs = self.data.loc[:, ["X", "Y", "Z"]].head(5).mean(axis=0)
    B, L, h = self.xyz_to_blh(mean_xyz_first_five_epochs["X"], mean_xyz_first_five_epochs["Y"], mean_xyz_first_five_epochs["Z"])
    F = np.array([[-np.sin(B) * np.cos(L), -np.sin(B) * np.sin(L), np.cos(B)],
                  [-np.sin(L), np.cos(L), 0],
                  [np.cos(B) * np.cos(L), np.cos(B) * np.sin(L), np.sin(B)]])
    return mean_xyz_first_five_epochs, F

get_observation(row_of_data)

Gets GNSS observation from the provided row of data.

Parameters:

    row_of_data (pd.Series): A row of GNSS data. Required.

Returns:

    np.ndarray: GNSS observation.

Source code in multidefusion\datainterface.py
def get_observation(self, row_of_data: pd.Series) -> np.ndarray:
    """
    Gets GNSS observation from the provided row of data.

    Args:
        row_of_data (pd.Series): A row of GNSS data.

    Returns:
        np.ndarray: GNSS observation.
    """
    return row_of_data[["N", "E", "U"]].values

load_data()

Loads GNSS data from the specified file.

Returns:

    pd.DataFrame: Loaded GNSS data.

Source code in multidefusion\datainterface.py
def load_data(self) -> pd.DataFrame:
    """
    Loads GNSS data from the specified file.

    Returns:
        pd.DataFrame: Loaded GNSS data.
    """
    header = getattr(GNSSData, "HEADER_" + self.type)
    self.data = self.load_csv_data(header)

    mean_xyz_first_five_epochs, F = self.create_rotation_matrix()
    self.xyz_to_neu(mean_xyz_first_five_epochs, F)

    self.process_timestamp_columns()
    self.replace_decimal_sep()
    return self.data

xyz_to_blh(X, Y, Z) staticmethod

Converts Cartesian coordinates (X, Y, Z) to geodetic coordinates (latitude, longitude, height).

Parameters:

    X (float): Cartesian coordinate in the X direction. Required.
    Y (float): Cartesian coordinate in the Y direction. Required.
    Z (float): Cartesian coordinate in the Z direction. Required.

Returns:

    Tuple[float, float, float]: Geodetic coordinates (latitude, longitude, height).

Source code in multidefusion\datainterface.py
@staticmethod
def xyz_to_blh(X, Y, Z):
    """
    Converts Cartesian coordinates (X, Y, Z) to geodetic coordinates (latitude, longitude, height).

    Args:
        X (float): Cartesian coordinate in the X direction.
        Y (float): Cartesian coordinate in the Y direction.
        Z (float): Cartesian coordinate in the Z direction.

    Returns:
        Tuple[float, float, float]: Geodetic coordinates (latitude, longitude, height).
    """
    a = 6378137
    b = 6356752.31414
    e2 = ((a * a) - (b * b)) / (a * a)
    elat = 1e-12
    eht = 1e-05
    p = np.sqrt(X ** 2 + Y ** 2)
    lat = np.arctan2(Z, p / (1 - e2))
    h = 0
    dh = 1
    dlat = 1
    i = 0
    while np.any(dlat > elat) or np.any(dh > eht):
        i += 1
        lat0 = lat
        h0 = h
        v = a / np.sqrt(1 - e2 * np.sin(lat) ** 2)
        h = p / np.cos(lat) - v
        lat = np.arctan2(Z, p * (1 - e2 * v / (v + h)))
        dlat = np.abs(lat - lat0)
        dh = np.abs(h - h0)
    lon = np.arctan2(Y, X)
    return lat, lon, h

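A quick call of the iteration on an arbitrary ECEF point (no particular station is implied), with the result printed in degrees:

import numpy as np

X, Y, Z = 3655333.0, 1407089.0, 5201384.0  # arbitrary point [m]
lat, lon, h = GNSSData.xyz_to_blh(X, Y, Z)
print(np.degrees(lat), np.degrees(lon), h)
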
xyz_to_neu(mean_xyz_first_five_epochs, F)

Converts Cartesian coordinates (X, Y, Z) to local coordinates (North, East, Up).

Parameters:

    mean_xyz_first_five_epochs (pd.Series): Mean coordinates of the first five epochs. Required.
    F (np.ndarray): Rotation matrix. Required.

Returns:

    None: Modifies the 'data' attribute in-place by updating coordinates and errors.

Source code in multidefusion\datainterface.py
def xyz_to_neu(self, mean_xyz_first_five_epochs, F):
    """
    Converts Cartesian coordinates (X, Y, Z) to local coordinates (North, East, Up).

    Args:
        mean_xyz_first_five_epochs (pd.Series): Mean coordinates of the first five epochs.
        F (np.ndarray): Rotation matrix.

    Returns:
        None: Modifies the 'data' attribute in-place by updating coordinates and errors.
    """
    columns_to_modify = ["X", "Y", "Z"]

    self.data[columns_to_modify] -= mean_xyz_first_five_epochs[columns_to_modify]

    def calculate_coordinates_and_errors_for_row(row):
        NEU = np.dot(F, np.array([row["X"], row["Y"], row["Z"]]))
        errors = np.dot(np.dot(F, np.diag([row["mX"] ** 2, row["mY"] ** 2, row["mZ"] ** 2])), F.transpose())
        row["N"], row["E"], row["U"] = NEU
        row["error"] = errors
        return row

    self.data = self.data.apply(calculate_coordinates_and_errors_for_row, axis=1)
    self.data = self.data.drop(["X", "Y", "Z", "mX", "mY", "mZ"], axis=1)

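The matrix F built in create_rotation_matrix is the standard rotation from Earth-centred (ECEF) axes to the local North-East-Up frame at latitude B and longitude L. Being a pure rotation it should be orthonormal, which this stand-alone check illustrates (the latitude and longitude are arbitrary):

import numpy as np

B, L = np.radians(51.0), np.radians(17.0)  # arbitrary latitude and longitude
F = np.array([[-np.sin(B) * np.cos(L), -np.sin(B) * np.sin(L), np.cos(B)],
              [-np.sin(L), np.cos(L), 0],
              [np.cos(B) * np.cos(L), np.cos(B) * np.sin(L), np.sin(B)]])
print(np.allclose(F @ F.T, np.eye(3)))  # True: rows are orthonormal
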
SARData

Bases: BaseData

Class for handling SAR data operations, inheriting from BaseData.

Parameters:

    path (str): The path to the SAR data file. Required.

Source code in multidefusion\datainterface.py
class SARData(BaseData):
    """
    Class for handling SAR data operations, inheriting from BaseData.

    Args:
        path (str): The path to the SAR data file.

    Attributes:
        Inherits attributes from BaseData.
    """

    SENTINEL_WAVELENGTH = 0.055465763
    HEADER_DInSAR = ['YYYY1', 'MM1', 'DD1', 'YYYY2', 'MM2', 'DD2', 'DSP', 'INC_ANG', 'HEAD_ANG']
    HEADER_PSI = ['YYYY', 'MM', 'DD', 'DSP', 'INC_ANG', 'HEAD_ANG']
    HEADER_SBAS = ['YYYY', 'MM', 'DD', 'DSP', 'INC_ANG', 'HEAD_ANG']

    def __init__(self, path: str) -> None:
        """
        Initializes SARData object.

        Args:
            path (str): The path to the SAR data file.
        """
        super().__init__(path)
        self.bias_reduction = False
        self.get_header_from_file()

    def get_header_from_file(self):
        """
        Determines the column header from the first line of the file.

        Returns:
            List[str]: Type-specific header extended with either "COH" or "ERROR".

        Raises:
            ValueError: If neither "COH" nor "ERROR" appears in the first line.
        """
        with open(self.path, 'r') as file:
            first_line = file.readline()
        if 'COH' in first_line:
            return getattr(SARData, "HEADER_" + self.type) + ["COH"]
        elif 'ERROR' in first_line:
            return getattr(SARData, "HEADER_" + self.type) + ["ERROR"]
        else:
            raise ValueError(f"The header cannot be identified in the {self.path} file.")

    def load_data(self) -> pd.DataFrame:
        """
        Loads SAR data from the specified file.

        Returns:
            pd.DataFrame: Loaded SAR data.
        """
        header = self.get_header_from_file()
        self.data = self.load_csv_data(header)
        self.replace_decimal_sep()
        if "COH" in header:
            self.coherence_to_error()
        if self.type == "DInSAR":
            self.convert_range_into_two_timestamps()
        else:
            self.process_timestamp_columns()
        self.expand_dataframe_by_date_range()
        return self.data

    def reduce_bias_to_gnss(self, date: pd.Timestamp):
        """
        Reduces bias in SAR data to GNSS data.

        This method reduces the bias in the SAR data by removing data points up to and
        including the specified date and shifting the 'DSP' values so that the first
        non-null value after the specified date becomes zero.

        Args:
            date (pd.Timestamp): The timestamp used as a reference point for bias reduction.

        Returns:
            None
        """
        self.data = self.data[self.data.index > date]
        first_non_null_value = self.data["DSP"].first_valid_index()
        if first_non_null_value is not None:
            self.data["DSP"] = self.data["DSP"] - self.data.at[first_non_null_value, "DSP"]

    def coherence_to_error(self) -> None:
        """
        Convert coherence column to error column in the data.
        """
        sentinel_wavelength = self.SENTINEL_WAVELENGTH

        def calculate_error(coherence: float) -> float:
            """
            Calculate error from coherence.

            Args:
                coherence (float): Coherence value.

            Returns:
                float: Calculated error.
            """
            # Sum the dilogarithm series Li2(coherence**2) = sum over k of coherence**(2k) / k**2
            li2 = 0.0
            k = 1
            while True:
                term = (coherence ** (2 * k)) / (k ** 2)
                li2 += term
                if term <= 1e-10:
                    break
                k += 1
            phase = pi ** 2 / 3 - pi * asin(coherence) + asin(coherence) ** 2 - 0.5 * li2
            return sqrt(phase) * sentinel_wavelength / (4 * pi)

        self.data["ERROR"] = self.data["COH"].apply(calculate_error)

    def create_projection_matrix_and_error(self, row_of_data: pd.Series) -> Tuple[Optional[np.ndarray], Optional[float]]:
        """
        Creates a projection matrix and error from the provided row of data.

        Args:
            row_of_data (pd.Series): A row of SAR data.

        Returns:
            Tuple[Optional[np.ndarray], Optional[float]]: Projection matrix and error.
        """
        if row_of_data is not None:
            inc_ang = row_of_data["INC_ANG"]
            head_ang = row_of_data["HEAD_ANG"]
            error = row_of_data["ERROR"]
            if self.type == "DInSAR":
                projection_matrix = np.array([[0, sin(inc_ang) * sin(head_ang), 0, -sin(inc_ang) * cos(head_ang), 0, cos(inc_ang)]])
            else:
                projection_matrix = np.array([[sin(inc_ang) * sin(head_ang), 0, -sin(inc_ang) * cos(head_ang), 0, cos(inc_ang), 0]])
            error_matrix = error ** 2
            return projection_matrix, error_matrix
        return None, None

    def get_observation(self, row_of_data: pd.Series) -> Union[float, None]:
        """
        Gets SAR observation from the provided row of data.

        Args:
            row_of_data (pd.Series): A row of SAR data.

        Returns:
            Union[float, None]: SAR observation.
        """
        return row_of_data["DSP"]

    def expand_dataframe_by_date_range(self) -> None:
        """
        Expands the DataFrame by date range for "DInSAR" label.

        For SAR data labeled as "DInSAR", this method adds a temporary column "temp" to the DataFrame
        containing a date range between "timestamp1" and "timestamp2" based on the specified time interval.
        The DataFrame is then exploded based on the "temp" column, duplicates are removed, and unnecessary
        columns are dropped to create a new "timestamp1" index.

        Note:
        - This method is specifically designed for SAR data labeled as "DInSAR".
        - It modifies the existing DataFrame in-place.

        Example:
        If the original DataFrame has a row with "timestamp1" and "timestamp2" as ["2022-01-01", "2022-01-03"],
        and the time interval is set to "1D" (daily), the resulting DataFrame will have individual rows for
        "2022-01-01" and "2022-01-02"; the second acquisition date itself is excluded, and "DSP" is converted
        to a daily rate by dividing by the number of days in the interval.
        """
        if self.type == "DInSAR":
            # Create a new column to store timestamps from the range of the given row
            self.data["temp"] = self.data.apply(
                lambda row: pd.date_range(
                    row.name[0],
                    row.name[1] - timedelta(days=self.TIME_INTERVAL_NUM),
                    freq=self.TIME_INTERVAL
                ),
                axis=1
            )
            # Count the difference in days for each row between the date range
            self.data["day_diff"] = (self.data.index.get_level_values(1) - self.data.index.get_level_values(0)).days
            # Calculate rates of daily changes
            self.data["DSP"] = self.data["DSP"] / self.data["day_diff"]
            # Extend DataFrame to a specific time interval and add NaN where data is missing
            self.data = (
                self.data.explode("temp")
                .reset_index()
                .drop_duplicates(subset="temp", keep="last")
                .drop(columns=["timestamp1", "timestamp2", "day_diff"])
                .rename(columns={"temp": "timestamp1"})
                .set_index("timestamp1")
                .resample(self.TIME_INTERVAL)
                .asfreq()
            )

__init__(path)

Initializes SARData object.

Parameters:

    path (str): The path to the SAR data file. Required.

Source code in multidefusion\datainterface.py
def __init__(self, path: str) -> None:
    """
    Initializes SARData object.

    Args:
        path (str): The path to the SAR data file.
    """
    super().__init__(path)
    self.bias_reduction = False
    self.get_header_from_file()

coherence_to_error()

Convert coherence column to error column in the data.

Source code in multidefusion\datainterface.py
def coherence_to_error(self) -> None:
    """
    Convert coherence column to error column in the data.
    """
    sentinel_wavelength = self.SENTINEL_WAVELENGTH

    def calculate_error(coherence: float) -> float:
        """
        Calculate error from coherence.

        Args:
            coherence (float): Coherence value.

        Returns:
            float: Calculated error.
        """
        # Sum the dilogarithm series Li2(coherence**2) = sum over k of coherence**(2k) / k**2
        li2 = 0.0
        k = 1
        while True:
            term = (coherence ** (2 * k)) / (k ** 2)
            li2 += term
            if term <= 1e-10:
                break
            k += 1
        phase = pi ** 2 / 3 - pi * asin(coherence) + asin(coherence) ** 2 - 0.5 * li2
        return sqrt(phase) * sentinel_wavelength / (4 * pi)

    self.data["ERROR"] = self.data["COH"].apply(calculate_error)

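For reference, the inner loop sums the dilogarithm series Li_2(γ²) = Σ_{k≥1} γ^{2k}/k², and the surrounding expression matches the standard single-look interferometric phase variance, converted to a displacement error through the radar wavelength (γ is the coherence, λ is SENTINEL_WAVELENGTH):

\sigma_\phi^2 = \frac{\pi^2}{3} - \pi \arcsin(\gamma) + \arcsin^2(\gamma) - \frac{1}{2}\,\mathrm{Li}_2(\gamma^2),
\qquad \sigma_{\mathrm{DSP}} = \frac{\lambda}{4\pi}\,\sigma_\phi
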
create_projection_matrix_and_error(row_of_data)

Creates a projection matrix and error from the provided row of data.

Parameters:

    row_of_data (pd.Series): A row of SAR data. Required.

Returns:

    Tuple[Optional[np.ndarray], Optional[float]]: Projection matrix and error.

Source code in multidefusion\datainterface.py
def create_projection_matrix_and_error(self, row_of_data: pd.Series) -> Tuple[Optional[np.ndarray], Optional[float]]:
    """
    Creates a projection matrix and error from the provided row of data.

    Args:
        row_of_data (pd.Series): A row of SAR data.

    Returns:
        Tuple[Optional[np.ndarray], Optional[float]]: Projection matrix and error.
    """
    if row_of_data is not None:
        inc_ang = row_of_data["INC_ANG"]
        head_ang = row_of_data["HEAD_ANG"]
        error = row_of_data["ERROR"]
        if self.type == "DInSAR":
            projection_matrix = np.array([[0, sin(inc_ang) * sin(head_ang), 0, -sin(inc_ang) * cos(head_ang), 0, cos(inc_ang)]])
        else:
            projection_matrix = np.array([[sin(inc_ang) * sin(head_ang), 0, -sin(inc_ang) * cos(head_ang), 0, cos(inc_ang), 0]])
        error_matrix = error ** 2
        return projection_matrix, error_matrix
    return None, None

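The nonzero entries form the usual North-East-Up to line-of-sight unit vector [sin(inc)·sin(head), -sin(inc)·cos(head), cos(inc)]. Judging from the alternating zeros, the filter state appears to interleave positions and rates as [N, vN, E, vE, U, vU], so DInSAR (stored as a daily rate) observes the rate terms while PSI/SBAS observe the positions; that state layout is an inference, not stated in this module. A stand-alone sketch of the projection itself (angles and displacements are arbitrary):

import numpy as np
from math import sin, cos, radians

inc_ang, head_ang = radians(39.0), radians(-10.0)  # arbitrary imaging geometry
los = np.array([sin(inc_ang) * sin(head_ang),
                -sin(inc_ang) * cos(head_ang),
                cos(inc_ang)])
neu = np.array([0.002, -0.001, -0.010])  # toy N, E, U displacement [m]
print(los @ neu)  # displacement projected onto the line of sight
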
expand_dataframe_by_date_range()

Expands the DataFrame by date range for "DInSAR" label.

For SAR data labeled as "DInSAR", this method adds a temporary column "temp" to the DataFrame containing a date range between "timestamp1" and "timestamp2" based on the specified time interval. The DataFrame is then exploded based on the "temp" column, duplicates are removed, and unnecessary columns are dropped to create a new "timestamp1" index.

Note:
- This method is specifically designed for SAR data labeled as "DInSAR".
- It modifies the existing DataFrame in-place.

Example: If the original DataFrame has a row with "timestamp1" and "timestamp2" as ["2022-01-01", "2022-01-03"], and the time interval is set to "1D" (daily), the resulting DataFrame will have individual rows for "2022-01-01" and "2022-01-02"; the second acquisition date itself is excluded, and "DSP" is converted to a daily rate by dividing by the number of days in the interval.

Source code in multidefusion\datainterface.py
def expand_dataframe_by_date_range(self) -> None:
    """
    Expands the DataFrame by date range for "DInSAR" label.

    For SAR data labeled as "DInSAR", this method adds a temporary column "temp" to the DataFrame
    containing a date range between "timestamp1" and "timestamp2" based on the specified time interval.
    The DataFrame is then exploded based on the "temp" column, duplicates are removed, and unnecessary
    columns are dropped to create a new "timestamp1" index.

    Note:
    - This method is specifically designed for SAR data labeled as "DInSAR".
    - It modifies the existing DataFrame in-place.

    Example:
    If the original DataFrame has a row with "timestamp1" and "timestamp2" as ["2022-01-01", "2022-01-03"],
    and the time interval is set to "1D" (daily), the resulting DataFrame will have individual rows for
    "2022-01-01" and "2022-01-02"; the second acquisition date itself is excluded, and "DSP" is converted
    to a daily rate by dividing by the number of days in the interval.
    """
    if self.type == "DInSAR":
        # Create a new column to store timestamps from the range of the given row
        self.data["temp"] = self.data.apply(
            lambda row: pd.date_range(
                row.name[0],
                row.name[1] - timedelta(days=self.TIME_INTERVAL_NUM),
                freq=self.TIME_INTERVAL
            ),
            axis=1
        )
        # Count the difference in days for each row between the date range
        self.data["day_diff"] = (self.data.index.get_level_values(1) - self.data.index.get_level_values(0)).days
        # Calculate rates of daily changes
        self.data["DSP"] = self.data["DSP"] / self.data["day_diff"]
        # Extend DataFrame to a specific time interval and add NaN where data is missing
        self.data = (
            self.data.explode("temp")
            .reset_index()
            .drop_duplicates(subset="temp", keep="last")
            .drop(columns=["timestamp1", "timestamp2", "day_diff"])
            .rename(columns={"temp": "timestamp1"})
            .set_index("timestamp1")
            .resample(self.TIME_INTERVAL)
            .asfreq()
        )

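The transformation can be reproduced on a toy frame; this mirrors the method body outside the class (drop_duplicates is omitted because the toy frame holds a single interval):

import pandas as pd

idx = pd.MultiIndex.from_tuples(
    [(pd.Timestamp("2022-01-01"), pd.Timestamp("2022-01-03"))],
    names=["timestamp1", "timestamp2"],
)
df = pd.DataFrame({"DSP": [0.006]}, index=idx)
df["temp"] = df.apply(
    lambda row: pd.date_range(row.name[0], row.name[1] - pd.Timedelta(days=1), freq="D"),
    axis=1,
)
df["day_diff"] = (df.index.get_level_values(1) - df.index.get_level_values(0)).days
df["DSP"] = df["DSP"] / df["day_diff"]  # 0.003 per day over the two-day interval
out = (
    df.explode("temp")
    .reset_index()
    .drop(columns=["timestamp1", "timestamp2", "day_diff"])
    .rename(columns={"temp": "timestamp1"})
    .set_index("timestamp1")
    .resample("D")
    .asfreq()
)
print(out)  # rows for 2022-01-01 and 2022-01-02, each with DSP = 0.003
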
get_observation(row_of_data)

Gets SAR observation from the provided row of data.

Parameters:

    row_of_data (pd.Series): A row of SAR data. Required.

Returns:

    Union[float, None]: SAR observation.

Source code in multidefusion\datainterface.py
def get_observation(self, row_of_data: pd.Series) -> Union[float, None]:
    """
    Gets SAR observation from the provided row of data.

    Args:
        row_of_data (pd.Series): A row of SAR data.

    Returns:
        Union[float, None]: SAR observation.
    """
    return row_of_data["DSP"]

load_data()

Loads SAR data from the specified file.

Returns:

    pd.DataFrame: Loaded SAR data.

Source code in multidefusion\datainterface.py
def load_data(self) -> pd.DataFrame:
    """
    Loads SAR data from the specified file.

    Returns:
        pd.DataFrame: Loaded SAR data.
    """
    header = self.get_header_from_file()
    self.data = self.load_csv_data(header)
    self.replace_decimal_sep()
    if "COH" in header:
        self.coherence_to_error()
    if self.type == "DInSAR":
        self.convert_range_into_two_timestamps()
    else:
        self.process_timestamp_columns()
    self.expand_dataframe_by_date_range()
    return self.data

reduce_bias_to_gnss(date)

Reduces bias in SAR data to GNSS data.

This method reduces the bias in the SAR data by removing data points up to and including the specified date and shifting the 'DSP' values so that the first non-null value after that date becomes zero.

Parameters:

    date (pd.Timestamp): The timestamp used as a reference point for bias reduction. Required.

Returns:

    None

Source code in multidefusion\datainterface.py
def reduce_bias_to_gnss(self, date: pd.Timestamp):
    """
    Reduces bias in SAR data to GNSS data.

    This method reduces the bias in the SAR data by removing data points up to and
    including the specified date and shifting the 'DSP' values so that the first
    non-null value after the specified date becomes zero.

    Args:
        date (pd.Timestamp): The timestamp used as a reference point for bias reduction.

    Returns:
        None
    """
    self.data = self.data[self.data.index > date]
    first_non_null_value = self.data["DSP"].first_valid_index()
    if first_non_null_value is not None:
        self.data["DSP"] = self.data["DSP"] - self.data.at[first_non_null_value, "DSP"]