Importing Data

From ECRIN-MDR Wiki
Revision as of 11:08, 1 January 2021 by Admin (talk | contribs) (Actioning the changes)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

Introduction

The program takes the data in the session data (sd) tables in a source database (the 'session data' having been created by the most recent harvest operation), and compares it with the accumulated data for each source, stored in the accumulated data (ad) tables. New and revised data are then transferred to the ad tables. Unlike the preceding download and harvest steps, the import process is essentially the same for all data sources, because the sd and ad tables have the same structure in each database. The listing of tables present in each DB varies but not the table structures themselves.

N.B. The import code can be found at https://github.com/ecrin-github/DataImporter

Parameters

The system is a console app, (to more easily support being scheduled) and can take takes 2 parameters:
-s, followed by a comma separated list of integer ids, each representing a data source within the system: The program takes each of these ids in turn, and carries out the sd to ad import process for each of them.
-T: as a flag. If present, forces the recreation of a new set of accumulated data tables. This parameter should therefore only be used when creating the ad tables for the first time, or when the entire ad schema is recreated from harvested data that represents the totality of the data available from the source. In other words when carrying out a full rebuild of the source's data from a full harvest.
Thus, the parameter string
     -s "100116, 100124, 100132"
will cause data to be imported from the session data tables for each of the Australian, German and Dutch trial registries, in that order.
The parameter string
     -s "101900, 101901" -T
will cause, for the BioLinncc and Yoda repositories respectively, the current ad tables to be dropped and then rebuilt, before being filled with the most recent session data harvested from those repositories.
Note there is no parameter to allow using only part of the harvested data in the sd tables - all of the sd data is always used. The way in which different data is imported, e.g. data that has been revised or added after a certain date, is by controlling the data harvest processing that precedes the import step.

Overview

Identifying New or Changed Data

The initial stage of the import process is to recreate and fill a set of 4 'diff' tables, that summarise the differences between the sd data and the corresponding ad data. These tables are

  • to_ad_study_recs - lists new, edited, unchanged and (possibly) deleted study records
  • to_ad_study_atts - lists the details of edits carried out within study attributes
  • to_ad_object_recs - lists new, edited, unchanged and (possibly) deleted data object records
  • to_ad_object_atts - lists the details of edits carried out within data object attributes

All 4 tables are created in the sd schema, where they remain (to allow inspection) until the next harvest or import process.
The system fills the _recs tables by comparing the sd_sid (for studies) or sd_oid (for objects) fields. New study sd_sids are inserted into to_ad_study_recs with a status of 1, edited studies, identified by differences in the 'study_full_hash' fields of matching study records, are inserted with a status of 2, and unchanged studies with a status of 3. It is not possible to identify deleted studies, i.e. that exist in the ad table but not the sd data, unless the sd data represents 100% of the downloaded files, i.e. is a full harvest. The system checks if this is the case. If it is, it can insert deleted sd_sids - this time from the ad tables - into the to_ad_study_recs table with a status of 4. An exactly analogous process is used for the data object records, filling the to_ad_object_recs table. The code for the study records is shown below.

        public void IdentifyNewStudies()
        {
            string sql_string = @"INSERT INTO sd.to_ad_study_recs (sd_sid, status)
                    SELECT s.sd_sid, 1 from sd.studies s
                    LEFT JOIN ad.studies a
                    on s.sd_sid = a.sd_sid 
                    WHERE a.sd_sid is null;";
            ExecuteSQL(sql_string);
        }

        public void IdentifyEditedStudies()
        {
            string sql_string = @"INSERT INTO sd.to_ad_study_recs (sd_sid, status)
                SELECT s.sd_sid, 2 from sd.studies s
                INNER JOIN ad.studies a
                on s.sd_sid = a.sd_sid
                where s.study_full_hash <> a.study_full_hash;";
            ExecuteSQL(sql_string);
        }

        public void IdentifyIdenticalStudies()
        {
            string sql_string = @"INSERT INTO sd.to_ad_study_recs (sd_sid, status)
                SELECT s.sd_sid, 3 from sd.studies s
                INNER JOIN ad.studies a
                on s.sd_sid = a.sd_sid
                where s.study_full_hash = a.study_full_hash;";
            ExecuteSQL(sql_string);
        }

        public void IdentifyDeletedStudies()
        {
            string sql_string = @"INSERT INTO sd.to_ad_study_recs(sd_sid, status)
                SELECT a.sd_sid, 4 from ad.studies a
                LEFT JOIN sd.studies s
                on a.sd_sid = s.sd_sid
                WHERE s.sd_sid is null;";
            ExecuteSQL(sql_string);
        }

For studies and objects that have been edited the nature of the edit is then clarified - in particular whether or not it involves changes to the main study or object record, and / or if it involves an addition, edit, or deletion of one or more attribute records. This is done by comparing the record_hash fields of the edited study and object fields. If a change is found - indicating the main study or object record itself has been edited - then a value of 2 is inserted into the _rec_status field in the _recs tables. For data objects, an additional check is made in the same way to see if any dataset data (if it exists) has changed, because there is a one-to-one relationship between data object and object dataset data. If it has a value of 4 is inserted into the object_dataset_status field. the relevant code is shown below.

        public void IdentifyChangedStudyRecs()
        {
            string sql_string = @"with t as (
                select s.sd_sid
                from sd.studies s
                INNER JOIN ad.studies a
                on s.sd_sid = a.sd_sid
                where s.record_hash <> a.record_hash)
                UPDATE sd.to_ad_study_recs c
                SET study_rec_status = 2
                from t
                WHERE t.sd_sid = c.sd_sid;";
            ExecuteSQL(sql_string);
        }

        public void IdentifyChangedObjectRecs()
        {
            string sql_string = @"with t as (
                select s.sd_oid
                from sd.data_objects s
                INNER JOIN ad.data_objects a
                on s.sd_oid = a.sd_oid
                where s.record_hash <> a.record_hash)
                UPDATE sd.to_ad_object_recs c
                SET object_rec_status = 2
                FROM t
                WHERE t.sd_oid = c.sd_oid;";
            ExecuteSQL(sql_string);
        }

        public void IdentifyChangedDatasetRecs()
        {
            string sql_string = @"with t as (
                select s.sd_oid
                from sd.object_datasets s
                INNER JOIN ad.object_datasets a
                on s.sd_oid = a.sd_oid
                where s.record_hash <> a.record_hash)
                UPDATE sd.to_ad_object_recs c
                SET object_dataset_status = 4
                FROM t
                WHERE t.sd_oid = c.sd_oid;";
            ExecuteSQL(sql_string);
        }

Finally the _atts tables are filled. Amongst the edited records, if a study or object attribute record has changed in some way, the value of the 'composite hash' for that attribute type, for that study or object, will be different in the sd and ad data. The study or object ids, attribute types and composite hashes in such cases are all stored in the _atts tables with a status value of 2. If a study or object has attributes of a certain type when previously it had none of this type at all, there will be a new composite hash record, also stored in the _atts table with a status of 1. And finally, if a study or object used to have attributes of a certain type but now has none at all, the data (taken from the ad tables) is stored in the _atts table with a status of 4. The _atts tables therefore indicate which attribute types have been changed for any edited study or object. The code for the study data is shown below.

        public void IdentifyChangedStudyAtts()
        {
            // Store as the sd_id and hash type of all changed composite hash values
            // in edited records - indicates that one or more of the attributes has changed.

            string sql_string = @"INSERT INTO sd.to_ad_study_atts
                (sd_sid, hash_type_id, status, composite_hash)
                select s.sd_sid, s.hash_type_id, 2, s.composite_hash
                from 
                ad.temp_sd_study_hashes s
                INNER JOIN 
                ad.temp_ad_study_hashes a
                on s.sd_sid = a.sd_sid
                and s.hash_type_id = a.hash_type_id
                where s.composite_hash <> a.composite_hash;";
            ExecuteSQL(sql_string);
        }

        public void IdentifyNewStudyAtts()
        {
        // Stores the sd_id and hash type of a new ad_sid / hash type combinations,
        // indicates that one or more of new types of attributes have been added.

            string sql_string = @"INSERT INTO sd.to_ad_study_atts
                (sd_sid, hash_type_id, status, composite_hash)
                select s.sd_sid, s.hash_type_id, 1, s.composite_hash
                from 
                ad.temp_sd_study_hashes s
                LEFT JOIN 
                ad.temp_ad_study_hashes a
                on s.sd_sid = a.sd_sid
                and s.hash_type_id = a.hash_type_id
                where a.sd_sid is null;";
            ExecuteSQL(sql_string);
        }

        public void IdentifyDeletedStudyAtts()
        {
        // Stores the sd_id and hash type of deleted ad_sid / hash type combinations,
        // indicates that one or more types of attributes have disappeared.

            string sql_string = @"INSERT INTO sd.to_ad_study_atts
                (sd_sid, hash_type_id, status, composite_hash)
                select a.sd_sid, a.hash_type_id, 4, a.composite_hash
                from 
                ad.temp_sd_study_hashes s
                RIGHT JOIN 
                ad.temp_ad_study_hashes a
                on s.sd_sid = a.sd_sid
                and s.hash_type_id = a.hash_type_id
                where s.sd_sid is null;";
            ExecuteSQL(sql_string);
        }

At the end of this process the system has 4 tables that hold a complete record of the differences between the session data and accumulated data sets, which can then be used to direct the necessary changes inthe ad tables.

Actioning the changes

The system then works through the actions required, for each designated source, following a fairly simple set of rules.

1) Any new studies or objects are directly imported into the corresponding ad tables, along with all their attributes. Note that objects (in study based sources) always hold a reference to their 'parent' study sd_sid, which is a persistent identifier, so a new data object added to an existing study is still correctly matched. Examples of the sort of code involved - in this case for study identifiers and titles - are shown below.

        public void TransferStudyIdentifiers()
        {
            string sql_string = @"INSERT INTO ad.study_identifiers(sd_sid,
            identifier_value, identifier_type_id, identifier_org_id, identifier_org,
            identifier_date, identifier_link, record_hash)
            SELECT s.sd_sid, 
            identifier_value, identifier_type_id, identifier_org_id, identifier_org,
            identifier_date, identifier_link, record_hash
            FROM sd.study_identifiers s
            INNER JOIN sd.to_ad_study_recs ts
            ON s.sd_sid = ts.sd_sid
            where ts.status = 1";

            dbu.ExecuteTransferSQL(sql_string, "study_identifiers", "Adding");
        }

        public void TransferStudyTitles()
        {
            string sql_string = @"INSERT INTO ad.study_titles(sd_sid,
            title_type_id, title_text, lang_code, lang_usage_id,
            is_default, comments, comparison_text, record_hash)
            SELECT s.sd_sid, 
            title_type_id, title_text, lang_code, lang_usage_id,
            is_default, comments, comparison_text, record_hash
            FROM sd.study_titles s
            INNER JOIN sd.to_ad_study_recs ts
            ON s.sd_sid = ts.sd_sid
            where ts.status = 1";

            dbu.ExecuteTransferSQL(sql_string, "study_titles", "Adding");
        }

The call to an ExecuteTransferSQL routine in a database utility class (dbu) is required because for very large datasets it is necessary to break up the addition process, transferring records in 'chunks' (of 500,000) determined by their ids - otherwise the system can 'time out' waiting for the database to respond. The chunking is carried out by adding, when necessary, an additional where clause to the SQL that is passed into the function.

        public void ExecuteTransferSQL(string sql_string, string table_name, string context)
        {
            try
            {
                int rec_count = GetRecordCount(table_name);
                int rec_batch = 500000;
                if (rec_count > rec_batch)
                {
                    for (int r = 1; r <= rec_count; r += rec_batch)
                    {
                        string batch_sql_string = sql_string + " and s.id >= " + r.ToString() + " and s.id < " + (r + rec_batch).ToString();
                        ExecuteSQL(batch_sql_string);

                        string feedback = context + " " + table_name + " data, " + r.ToString() + " to ";
                        feedback += (r + rec_batch < rec_count) ? (r + rec_batch - 1).ToString() : rec_count.ToString();
                        StringHelpers.SendFeedback(feedback);
                    }
                }
                else
                {
                    ExecuteSQL(sql_string);
                    StringHelpers.SendFeedback(context + " " + table_name + " data, as a single batch");
                }
            }
            catch (Exception e)
            {
                string res = e.Message;
                StringHelpers.SendError("In data transfer (" + table_name + ") to ad table: " + res);
            }
        }


2) For unchanged studies the 'date of data fetch', a field in both study and data object records, is updated to match that in the sd data but no other changes are applied. This indicates the last date the data was examined, even if the data was unchanged. Note that the same update, of 'date of data fetch' is also made to edited records, and the data is contained automatically within new records. The code is shown below.

        public void UpdateDateOfStudyData()
        {
            string top_sql = @"with t as 
            (   
                select so.sd_sid, so.datetime_of_data_fetch 
                from sd.studies so
                inner join sd.to_ad_study_recs ts
                on so.sd_sid  = ts.sd_sid
                where ts.status in (2,3)";

            string base_sql = @")
            update ad.studies s
            set datetime_of_data_fetch = t.datetime_of_data_fetch
            from t
            where s.sd_sid = t.sd_sid";

            dbu.UpdateDateOfData("studies", top_sql, base_sql);
        }

        public void UpdateDateOfDataObjectData()
        {
           string top_sql = @"with t as
            (
                select so.sd_oid, so.datetime_of_data_fetch
                from sd.data_objects so
                inner join sd.to_ad_object_recs td
                on so.sd_oid = td.sd_oid
                where td.status in (2, 3)";

            string base_sql = @")
            update ad.data_objects s
            set datetime_of_data_fetch = t.datetime_of_data_fetch
            from t
            where s.sd_oid = t.sd_oid";

            dbu.UpdateDateOfData("data_objects", top_sql, base_sql);
        }

Again a function is called within a database utility class, so that if necessary the update process can be carried out in chunks of records. In this case the where clause referencing the ids is inserted in the middle of the SQL statement, hence the need to send two SQL fragments, top_sql and base_sql.

3) For edited studies, the nature of the edit is examined. If a change has occurred in the main (singleton) study record, that record is replaced with the new version from the sd data.

        public void EditStudies()
        {
            // if the record hash for the study has changed, then the data in the studies records should be changed

            string sql_string = @"update ad.studies a
              set
                  display_title = t.display_title,
                  title_lang_code = t.title_lang_code, 
                  brief_description = t.brief_description, 
                  bd_contains_html = t.bd_contains_html, 
                  data_sharing_statement = t.data_sharing_statement,
                  dss_contains_html = t.dss_contains_html, 
                  study_start_year = t.study_start_year,
                  study_start_month = t.study_start_month,
                  study_type_id = t.study_type_id, 
                  study_status_id = t.study_status_id,
                  study_enrolment = t.study_enrolment, 
                  study_gender_elig_id = t.study_gender_elig_id, 
                  min_age = t.min_age, 
                  min_age_units_id = t.min_age_units_id, 
                  max_age = t.max_age,
                  max_age_units_id = t.max_age_units_id, 
                  datetime_of_data_fetch = t.datetime_of_data_fetch,
                  record_hash = t.record_hash,
                  last_edited_on = current_timestamp
              from (select so.* from sd.studies so
                    INNER JOIN sd.to_ad_study_recs ts
                    ON so.sd_sid = ts.sd_sid ";

            string base_string = @" where ts.study_rec_status = 2) t
                          where a.sd_sid = t.sd_sid";

            dbu.EditEntityRecords(sql_string, base_string, "studies");
        }

Once again a utility class function is called to chunk the process if necessary.

        public void EditEntityRecords(string topstring, string basestring, string table_name)
        {
            try
            {
                int rec_count = GetRecordCount(table_name);
                int rec_batch = 100000;

                if (rec_count > rec_batch)
                {
                    for (int r = 1; r <= rec_count; r += rec_batch)
                    {
                        string batch_sql_string = topstring + " and so.id >= " + r.ToString() + " and so.id < " 
                                                + (r + rec_batch).ToString() + basestring;
                        ExecuteSQL(batch_sql_string);

                        string feedback = "Updating entity records for " + table_name + ", " + r.ToString() + " to ";
                        feedback += (r + rec_batch < rec_count) ? (r + rec_batch - 1).ToString() : rec_count.ToString();
                        StringHelpers.SendFeedback(feedback);
                    }
                }
                else
                {
                    ExecuteSQL(topstring + basestring);
                    StringHelpers.SendFeedback("Updating entity records for " + table_name + ", as a single batch");
                }
            }
            catch (Exception e)
            {
                string res = e.Message;
                StringHelpers.SendError("In updating entity records for " + table_name + ", sd to ad table: " + res);
            }
        }

If a change has occurred in a study attribute, all the attribute records of that type are replaced by all the attribute records of that type in the sd data. There is no attempt to try and match individual attribute records to see which specifically have been changed / added / deleted. The system knows, from the changed composite hash value, that an edit has happened somewhere in a set of attributes of a particular type. But because of the lack of persistent identifiers for the attribute records, and therefore the difficulty in trying to work out exactly what record(s) have changed, and / or have been added, and / or have been deleted, it is easier to replace the whole set of attributes. The system does this by first deleting the attributes and then inserting the new set. This strategy also means that if a completely new type of attribute appears for a study all the records of that attribute type are inserted, while if an attribute type completely disappears from a study all the corresponding attribute records are removed.
Example code is shown below - in this case for study identifiers. An initial call to GetStudyTString returns a clause that creates a CTE (Common Table Expression) that is used in both of the following steps. The full SQL statements for both deletion and insertion are then constructed. For deletion this is straightforward enough that a call can be made to a simple routine (GetStudyDeleteString) to return the code. For insertion the full field list (of the destination ad table) needs to be included. The call to the database utility class results in first the deletion and then the insertion SQL being run. The insertion routine again makes use of the ExecuteTransferSQL function (shown in part 1 of this section) to chunk the insertion process when necessary.

        public void EditStudyIdentifiers()
        {
            // Where the composite hash value indicates that a change has taken place in one or more 
            // of the records replace the whole set for the relevant studies
            // Composite hash id for study identifiers = 11

            string sql_string = dbu.GetStudyTString(11);
            string sql_stringD = sql_string + dbu.GetStudyDeleteString("study_identifiers");

            string sql_stringI = sql_string + @"INSERT INTO ad.study_identifiers(sd_sid,
            identifier_value, identifier_type_id, identifier_org_id, identifier_org,
            identifier_date, identifier_link, record_hash)
            SELECT s.sd_sid, 
            identifier_value, identifier_type_id, identifier_org_id, identifier_org,
            identifier_date, identifier_link, record_hash
            FROM sd.study_identifiers s
            INNER JOIN t
            on s.sd_sid = t.sd_sid";

            dbu.ExecuteDandI(sql_stringD, sql_stringI, "study_identifiers");
        }

        public string GetStudyTString(int type_id)
        {
            return @"with t as (
               SELECT sd_sid from 
               sd.to_ad_study_atts 
               WHERE hash_type_id = " + type_id.ToString() + ") ";
        }

        public string GetStudyDeleteString(string table_name)
        {
            return @" DELETE FROM ad." + table_name + @" a
            USING t
            WHERE a.sd_sid = t.sd_sid; ";
        }

        public void ExecuteDandI(string sql_string1, string sql_string2, string table_name)
        {
            using (var conn = new NpgsqlConnection(connstring))
            {
                conn.Execute(sql_string1);
                ExecuteTransferSQL(sql_string2, table_name, "Editing");
            }
        }


4) For deleted studies or objects, if it has been possible to identify these, the entire study or object, and all their attributes, are removed from the ad tables. The process is simple enough to be able to 'walk through' each of the relevant tables and delete the corresponding records from each, as shown in the study related function below.

 
        public void DeleteStudyRecords(string table_name)
        {
            string sql_string = @"with t as (
                  select sd_sid from sd.to_ad_study_recs
                  where status = 4)
              delete from ad." + table_name + @" a
              using t
              where a.sd_sid = t.sd_sid;";

            dbu.ExecuteSQL(sql_string);
        }

In fact, because deletion operations can only occur if the preceding harvest has been 100%, it is often easier and safer to use the -T operation for these imports, and simply recreate the ad tables and fill them directly from the sd data. This is a simpler and therefore less risky procedure.

5) The composite hash records in the ad tables also have to be updated to match those in the sd tables, to ensure future comparisons work as expected. This is relatively straightforward, although again for large datasets some chunking of the data may be required. The code for the object hashes is shown below.

public void UpdateObjectCompositeHashes()
        {
            // Need to ensure that the hashes themselves are all up to date (for the next comparison)
            // Change the ones that have been changed in sd
            // if a very large studies (and therefore hash) table may need to chunk using a link to the 
            // sd.data_objects table....

            string sql_string = @"UPDATE ad.object_hashes ah
                    set composite_hash = so.composite_hash
                    FROM 
                        (SELECT st.id, ia.sd_oid, ia.hash_type_id, ia.composite_hash
                         FROM sd.to_ad_object_atts ia
                         INNER JOIN sd.data_objects st
                         on ia.sd_oid = st.sd_oid
                         where ia.status = 2) so
                    WHERE ah.sd_oid = so.sd_oid
                    and ah.hash_type_id = so.hash_type_id ";

            dbu.EditStudyHashes("data_objects", sql_string);
        }

        public void AddNewlyCreatedObjectHashTypes()
        {
            // for new sd_sid / hash type combinations

            string sql_string = @"INSERT INTO ad.object_hashes(sd_oid, 
                 hash_type_id, composite_hash)
                 SELECT ia.sd_oid, ia.hash_type_id, ia.composite_hash
                 FROM sd.to_ad_object_atts ia
                 WHERE ia.status = 1";

            dbu.ExecuteSQL(sql_string);
            StringHelpers.SendFeedback("Inserting new object hashtype combinations in object hash records");
        }


        public void DropNewlyDeletedObjectHashTypes()
        {
            string sql_string = @"DELETE FROM ad.object_hashes sh
                 USING sd.to_ad_object_atts ia
                 WHERE sh.sd_oid = ia.sd_oid   
                 and sh.hash_type_id = ia.hash_type_id 
                 and ia.status = 4";

            dbu.ExecuteSQL(sql_string);
            StringHelpers.SendFeedback("Dropping deleted object hashtype combinations from object hash records");
        }


6) The study_full_hash and object_full_hash fields also need to be updated for the edited records. If the records themselves have been edited this will already have happened, but some edits only involve study or object attributes, and in these cases the study or object record itself will have remained the same apart from the date of data fetch field). It is therefore necessary to update this field. As usual, some chunking of the process may be required. The study

        public void UpdateFullStudyHash()
        {
            // Ensure study_full_hash is updated to reflect new value
            // The study record itself may not have changed, so the study
            // record update above cannot be used to make the edit. 

            string sql_string = @"UPDATE ad.studies a
                    set study_full_hash = so.study_full_hash
                    FROM sd.studies so
                    WHERE so.sd_sid = a.sd_sid ";

            // Chunked by the dbu routine to 100,000 records at a time

            dbu.UpdateFullHashes("studies", sql_string);
        }


7) The source_data_studies table (for study based sources) and source_data_objects table (for object based sources) - in the mon database - also need to be updated with the date of last import and the id of the import event. Unlike the download and harvesting processes, where these tracking tables are updated one record at a time, during import a SQL statement can be used to update all the records at once. The code for study based sources is shown below, with the call for object based sources being very similar. The 'last imported' data is updated for all the records that have been examined and updated in the ad tables - i.e. those with status 1 (newly added), 2 (edited in some way) and 3 (unchanged but date of data updated). This means that when an 'update' harvest is carried out, which examines all data files that have been downloaded since the last import, only those files that could have potentially changed since they were last 'examined' by the system, i.e. compared with the ad data, are required for the sd tables.

         public void UpdateStudiesLastImportedDate(int import_id, int source_id)
        {
            string top_string = @"Update mon_sf.source_data_studies src
                          set last_import_id = " + import_id.ToString() + @", 
                          last_imported = current_timestamp
                          from 
                             (select so.id, so.sd_sid 
                             FROM sd.studies so
                             INNER JOIN sd.to_ad_study_recs ts
                             ON so.sd_sid = ts.sd_sid
                             where ts.status in (1, 2, 3) 
                             ";
            string base_string = @" ) s
                              where s.sd_sid = src.sd_id and
                              src.source_id = " + source_id.ToString();

            dbu.UpdateLastImportedDate("studies", top_string, base_string);
        }

As with many of the calls above, the routine uses a routine in the database utility class that constructs the final SQL statement, 'chunking' where necessary - for very large tables - the update into separate calls by adding additional where constraints to the subquery, thereby stepping through groups of records using their Ids - in this case 100,000 at a time.