Importing Data
Introduction
The program takes the data in the session data (sd) tables in a source database (the 'session data' having been created by the most recent harvest operation), and compares it with the accumulated data for each source, stored in the accumulated data (ad) tables. New and revised data are then transferred to the ad tables. Unlike the preceding download and harvest steps, the import process is essentially the same for all data sources, because the sd and ad tables have the same structure in each database. The set of tables present in each database varies, but the table structures themselves do not.
N.B. The import code can be found at https://github.com/ecrin-github/DataImporter
Parameters
The system is a console app (to more easily support scheduling) and takes 2 parameters:
-s, followed by a comma-separated list of integer ids, each representing a data source within the system. The program takes each of these ids in turn and carries out the sd to ad import process for each.
-T: a flag. If present, it forces the recreation of the accumulated data tables. This parameter should therefore only be used when creating the ad tables for the first time, or when the entire ad schema is recreated from harvested data that represents the totality of the data available from the source - in other words, when carrying out a full rebuild of the source's data from a full harvest.
Thus, the parameter string
-s "100116, 100124, 100132"
will cause data to be imported from the session data tables for each of the Australian, German and Dutch trial registries, in that order.
The parameter string
-s "101900, 101901" -T
will cause the current ad tables for the BioLINCC and YODA repositories, respectively, to be dropped and rebuilt, before being filled with the most recent session data harvested from those repositories.
Note there is no parameter that allows using only part of the harvested data in the sd tables - all of the sd data is always used. The way to import a different subset of data, e.g. only data revised or added after a certain date, is to control the harvest processing that precedes the import step.
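As an illustration of how these parameters might be consumed, a minimal sketch using the CommandLineParser NuGet package follows. This is an assumption about the mechanism rather than the actual implementation, and the Options class and its property names are hypothetical.

using System.Collections.Generic;
using CommandLine;

public class Options
{
    // Hypothetical options class - the names are illustrative only.
    [Option('s', Required = true, Separator = ',',
        HelpText = "Comma separated list of integer source ids.")]
    public IEnumerable<int> SourceIds { get; set; }

    [Option('T', Required = false,
        HelpText = "If present, forces recreation of the ad tables.")]
    public bool RebuildAdTables { get; set; }
}

class Program
{
    static int Main(string[] args)
    {
        return Parser.Default.ParseArguments<Options>(args)
            .MapResult(opts =>
            {
                foreach (int source_id in opts.SourceIds)
                {
                    // run the sd to ad import process for each source in turn,
                    // recreating the ad tables first if RebuildAdTables is set...
                }
                return 0;
            },
            errs => -1);
    }
}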
Overview
Identifying New or Changed Data
The initial stage of the import process is to recreate and fill a set of 4 'diff' tables that summarise the differences between the sd data and the corresponding ad data. These tables are:
- to_ad_study_recs - lists new, edited, unchanged and (possibly) deleted study records
- to_ad_study_atts - lists the details of edits carried out within study attributes
- to_ad_object_recs - lists new, edited, unchanged and (possibly) deleted data object records
- to_ad_object_atts - lists the details of edits carried out within data object attributes
All 4 tables are created in the sd schema, where they remain (to allow inspection) until the next harvest or import process.
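The DDL for these tables is not reproduced here, but the structure implied by the description above and the code below is roughly as follows (a sketch only - the column types are assumptions):

CREATE TABLE sd.to_ad_study_recs (
    sd_sid            VARCHAR,  -- source derived study id
    status            INT,      -- 1 = new, 2 = edited, 3 = unchanged, 4 = deleted
    study_rec_status  INT       -- 2 if the main study record itself has changed
);

CREATE TABLE sd.to_ad_study_atts (
    sd_sid            VARCHAR,
    hash_type_id      INT,      -- the attribute type concerned, e.g. 11 = study identifiers
    status            INT,      -- 1 = new, 2 = changed, 4 = deleted
    composite_hash    VARCHAR   -- the composite hash for that attribute type
);

-- The object tables are analogous but keyed on sd_oid; to_ad_object_recs has
-- object_rec_status and an additional object_dataset_status field.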
The system fills the _recs tables by comparing the sd_sid (for studies) or sd_oid (for objects) fields. New study sd_sids are inserted into to_ad_study_recs with a status of 1; edited studies, identified by differences in the 'study_full_hash' fields of matching study records, are inserted with a status of 2; and unchanged studies with a status of 3. It is not possible to identify deleted studies, i.e. studies that exist in the ad tables but not in the sd data, unless the sd data represents 100% of the downloaded files, i.e. is a full harvest. The system checks whether this is the case. If it is, it can insert the deleted sd_sids - this time taken from the ad tables - into the to_ad_study_recs table with a status of 4. An exactly analogous process is used for the data object records, filling the to_ad_object_recs table. The code for the study records is shown below.
public void IdentifyNewStudies()
{
    string sql_string = @"INSERT INTO sd.to_ad_study_recs (sd_sid, status)
        SELECT s.sd_sid, 1 from sd.studies s
        LEFT JOIN ad.studies a
        on s.sd_sid = a.sd_sid
        WHERE a.sd_sid is null;";
    ExecuteSQL(sql_string);
}

public void IdentifyEditedStudies()
{
    string sql_string = @"INSERT INTO sd.to_ad_study_recs (sd_sid, status)
        SELECT s.sd_sid, 2 from sd.studies s
        INNER JOIN ad.studies a
        on s.sd_sid = a.sd_sid
        where s.study_full_hash <> a.study_full_hash;";
    ExecuteSQL(sql_string);
}

public void IdentifyIdenticalStudies()
{
    string sql_string = @"INSERT INTO sd.to_ad_study_recs (sd_sid, status)
        SELECT s.sd_sid, 3 from sd.studies s
        INNER JOIN ad.studies a
        on s.sd_sid = a.sd_sid
        where s.study_full_hash = a.study_full_hash;";
    ExecuteSQL(sql_string);
}

public void IdentifyDeletedStudies()
{
    string sql_string = @"INSERT INTO sd.to_ad_study_recs(sd_sid, status)
        SELECT a.sd_sid, 4 from ad.studies a
        LEFT JOIN sd.studies s
        on a.sd_sid = s.sd_sid
        WHERE s.sd_sid is null;";
    ExecuteSQL(sql_string);
}
For studies and objects that have been edited, the nature of the edit is then clarified - in particular whether it involves changes to the main study or object record, and / or an addition, edit or deletion of one or more attribute records. This is done by comparing the record_hash fields of the edited study and object records. If a change is found - indicating that the main study or object record itself has been edited - a value of 2 is inserted into the _rec_status field of the _recs tables. For data objects, an additional check is made in the same way to see whether any dataset data (if it exists) has changed, because there is a one-to-one relationship between a data object and its object dataset data. If it has, a value of 4 is inserted into the object_dataset_status field. The relevant code is shown below.
public void IdentifyChangedStudyRecs()
{
    string sql_string = @"with t as (
        select s.sd_sid from sd.studies s
        INNER JOIN ad.studies a
        on s.sd_sid = a.sd_sid
        where s.record_hash <> a.record_hash)
    UPDATE sd.to_ad_study_recs c
    SET study_rec_status = 2
    from t
    WHERE t.sd_sid = c.sd_sid;";
    ExecuteSQL(sql_string);
}

public void IdentifyChangedObjectRecs()
{
    string sql_string = @"with t as (
        select s.sd_oid from sd.data_objects s
        INNER JOIN ad.data_objects a
        on s.sd_oid = a.sd_oid
        where s.record_hash <> a.record_hash)
    UPDATE sd.to_ad_object_recs c
    SET object_rec_status = 2
    FROM t
    WHERE t.sd_oid = c.sd_oid;";
    ExecuteSQL(sql_string);
}

public void IdentifyChangedDatasetRecs()
{
    string sql_string = @"with t as (
        select s.sd_oid from sd.object_datasets s
        INNER JOIN ad.object_datasets a
        on s.sd_oid = a.sd_oid
        where s.record_hash <> a.record_hash)
    UPDATE sd.to_ad_object_recs c
    SET object_dataset_status = 4
    FROM t
    WHERE t.sd_oid = c.sd_oid;";
    ExecuteSQL(sql_string);
}
Finally the _atts tables are filled. The comparisons are made using temporary tables of the composite hashes drawn from the sd and ad data (ad.temp_sd_study_hashes and ad.temp_ad_study_hashes in the code below). Amongst the edited records, if a study or object attribute record has changed in some way, the value of the 'composite hash' for that attribute type, for that study or object, will differ between the sd and ad data. The study or object ids, attribute types and composite hashes in such cases are all stored in the _atts tables with a status value of 2. If a study or object has attributes of a certain type where previously it had none of that type at all, there will be a new composite hash record, also stored in the _atts table, with a status of 1. And finally, if a study or object used to have attributes of a certain type but now has none at all, the data (taken from the ad tables) is stored in the _atts table with a status of 4. The _atts tables therefore indicate which attribute types have changed for any edited study or object. The code for the study data is shown below.
public void IdentifyChangedStudyAtts()
{
    // Stores the sd_sid and hash type of all changed composite hash values
    // in edited records - indicates that one or more of the attributes has changed.
    string sql_string = @"INSERT INTO sd.to_ad_study_atts
        (sd_sid, hash_type_id, status, composite_hash)
        select s.sd_sid, s.hash_type_id, 2, s.composite_hash
        from ad.temp_sd_study_hashes s
        INNER JOIN ad.temp_ad_study_hashes a
        on s.sd_sid = a.sd_sid
        and s.hash_type_id = a.hash_type_id
        where s.composite_hash <> a.composite_hash;";
    ExecuteSQL(sql_string);
}

public void IdentifyNewStudyAtts()
{
    // Stores the sd_sid and hash type of new sd_sid / hash type combinations -
    // indicates that one or more new types of attribute have been added.
    string sql_string = @"INSERT INTO sd.to_ad_study_atts
        (sd_sid, hash_type_id, status, composite_hash)
        select s.sd_sid, s.hash_type_id, 1, s.composite_hash
        from ad.temp_sd_study_hashes s
        LEFT JOIN ad.temp_ad_study_hashes a
        on s.sd_sid = a.sd_sid
        and s.hash_type_id = a.hash_type_id
        where a.sd_sid is null;";
    ExecuteSQL(sql_string);
}

public void IdentifyDeletedStudyAtts()
{
    // Stores the sd_sid and hash type of deleted sd_sid / hash type combinations -
    // indicates that one or more types of attribute have disappeared.
    string sql_string = @"INSERT INTO sd.to_ad_study_atts
        (sd_sid, hash_type_id, status, composite_hash)
        select a.sd_sid, a.hash_type_id, 4, a.composite_hash
        from ad.temp_sd_study_hashes s
        RIGHT JOIN ad.temp_ad_study_hashes a
        on s.sd_sid = a.sd_sid
        and s.hash_type_id = a.hash_type_id
        where s.sd_sid is null;";
    ExecuteSQL(sql_string);
}
At the end of this process the system has 4 tables that hold a complete record of the differences between the session data and accumulated data sets, which can then be used to direct the necessary changes in the ad tables.
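Because the diff tables persist until the next harvest or import, they can also be queried directly. For example, a quick summary of what an import will do for a source can be obtained with a query along these lines (illustrative only):

SELECT status, count(*)
FROM sd.to_ad_study_recs
GROUP BY status
ORDER BY status;
-- 1 = new, 2 = edited, 3 = unchanged, 4 = deleted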
Actioning the changes
The system then works through the actions required, for each designated source, following a fairly simple set of rules.
1) Any new studies or objects are directly imported into the corresponding ad tables, along with all their attributes. Note that objects (in study based sources) always hold a reference to the sd_sid of their 'parent' study, which is a persistent identifier, so a new data object added to an existing study is still correctly matched. Examples of the sort of code involved - in this case for study identifiers and titles - are shown below.
public void TransferStudyIdentifiers()
{
    string sql_string = @"INSERT INTO ad.study_identifiers(sd_sid,
        identifier_value, identifier_type_id, identifier_org_id, identifier_org,
        identifier_date, identifier_link, record_hash)
        SELECT s.sd_sid,
        identifier_value, identifier_type_id, identifier_org_id, identifier_org,
        identifier_date, identifier_link, record_hash
        FROM sd.study_identifiers s
        INNER JOIN sd.to_ad_study_recs ts
        ON s.sd_sid = ts.sd_sid
        where ts.status = 1";
    dbu.ExecuteTransferSQL(sql_string, "study_identifiers", "Adding");
}

public void TransferStudyTitles()
{
    string sql_string = @"INSERT INTO ad.study_titles(sd_sid,
        title_type_id, title_text, lang_code, lang_usage_id,
        is_default, comments, comparison_text, record_hash)
        SELECT s.sd_sid,
        title_type_id, title_text, lang_code, lang_usage_id,
        is_default, comments, comparison_text, record_hash
        FROM sd.study_titles s
        INNER JOIN sd.to_ad_study_recs ts
        ON s.sd_sid = ts.sd_sid
        where ts.status = 1";
    dbu.ExecuteTransferSQL(sql_string, "study_titles", "Adding");
}
The call to an ExecuteTransferSQL routine in a database utility class (dbu) is required because for very large datasets it is necessary to break up the addition process, transferring records in 'chunks' (of 500,000) determined by their ids - otherwise the system can 'time out' waiting for the database to respond. The chunking is carried out by adding, when necessary, an additional where clause to the SQL that is passed into the function.
public void ExecuteTransferSQL(string sql_string, string table_name, string context)
{
    try
    {
        int rec_count = GetRecordCount(table_name);
        int rec_batch = 500000;
        if (rec_count > rec_batch)
        {
            for (int r = 1; r <= rec_count; r += rec_batch)
            {
                string batch_sql_string = sql_string
                    + " and s.id >= " + r.ToString()
                    + " and s.id < " + (r + rec_batch).ToString();
                ExecuteSQL(batch_sql_string);
                string feedback = context + " " + table_name + " data, " + r.ToString() + " to ";
                feedback += (r + rec_batch < rec_count) ? (r + rec_batch - 1).ToString() : rec_count.ToString();
                StringHelpers.SendFeedback(feedback);
            }
        }
        else
        {
            ExecuteSQL(sql_string);
            StringHelpers.SendFeedback(context + " " + table_name + " data, as a single batch");
        }
    }
    catch (Exception e)
    {
        string res = e.Message;
        StringHelpers.SendError("In data transfer (" + table_name + ") to ad table: " + res);
    }
}
2) For unchanged studies, the 'date of data fetch' - a field in both study and data object records - is updated to match that in the sd data, but no other changes are applied. This field indicates the last date on which the data was examined, even if it was unchanged. Note that the same update of the 'date of data fetch' is also made to edited records, while new records contain the new value automatically. The code is shown below.
public void UpdateDateOfStudyData()
{
    string top_sql = @"with t as (
        select so.sd_sid, so.datetime_of_data_fetch
        from sd.studies so
        inner join sd.to_ad_study_recs ts
        on so.sd_sid = ts.sd_sid
        where ts.status in (2,3)";

    string base_sql = @")
        update ad.studies s
        set datetime_of_data_fetch = t.datetime_of_data_fetch
        from t
        where s.sd_sid = t.sd_sid";

    dbu.UpdateDateOfData("studies", top_sql, base_sql);
}

public void UpdateDateOfDataObjectData()
{
    string top_sql = @"with t as (
        select so.sd_oid, so.datetime_of_data_fetch
        from sd.data_objects so
        inner join sd.to_ad_object_recs td
        on so.sd_oid = td.sd_oid
        where td.status in (2, 3)";

    string base_sql = @")
        update ad.data_objects s
        set datetime_of_data_fetch = t.datetime_of_data_fetch
        from t
        where s.sd_oid = t.sd_oid";

    dbu.UpdateDateOfData("data_objects", top_sql, base_sql);
}
Again a function in the database utility class is called, so that if necessary the update process can be carried out in chunks of records. In this case the where clause referencing the ids is inserted in the middle of the SQL statement, hence the need to pass two SQL fragments, top_sql and base_sql.
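The UpdateDateOfData routine itself is not shown above. A plausible sketch, assuming it follows the same chunking pattern as ExecuteTransferSQL, is given below; the batch size and feedback wording are assumptions.

public void UpdateDateOfData(string table_name, string top_sql, string base_sql)
{
    // A sketch only - the id-based where clause is inserted between the two
    // SQL fragments, inside the CTE, rather than appended at the end.
    try
    {
        int rec_count = GetRecordCount(table_name);
        int rec_batch = 200000;   // assumed batch size
        if (rec_count > rec_batch)
        {
            for (int r = 1; r <= rec_count; r += rec_batch)
            {
                string batch_sql_string = top_sql
                    + " and so.id >= " + r.ToString()
                    + " and so.id < " + (r + rec_batch).ToString()
                    + base_sql;
                ExecuteSQL(batch_sql_string);
                StringHelpers.SendFeedback("Updating date of data for " + table_name
                    + ", " + r.ToString() + " onwards");
            }
        }
        else
        {
            ExecuteSQL(top_sql + base_sql);
            StringHelpers.SendFeedback("Updating date of data for " + table_name + ", as a single batch");
        }
    }
    catch (Exception e)
    {
        StringHelpers.SendError("In updating date of data for " + table_name + ": " + e.Message);
    }
}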
3) For edited studies, the nature of the edit is examined. If a change has occurred in the main (singleton) study record, that record is replaced with the new version from the sd data.
public void EditStudies()
{
    // If the record hash for the study has changed, then the data
    // in the studies record should be changed.
    string sql_string = @"update ad.studies a
        set
        display_title = t.display_title,
        title_lang_code = t.title_lang_code,
        brief_description = t.brief_description,
        bd_contains_html = t.bd_contains_html,
        data_sharing_statement = t.data_sharing_statement,
        dss_contains_html = t.dss_contains_html,
        study_start_year = t.study_start_year,
        study_start_month = t.study_start_month,
        study_type_id = t.study_type_id,
        study_status_id = t.study_status_id,
        study_enrolment = t.study_enrolment,
        study_gender_elig_id = t.study_gender_elig_id,
        min_age = t.min_age,
        min_age_units_id = t.min_age_units_id,
        max_age = t.max_age,
        max_age_units_id = t.max_age_units_id,
        datetime_of_data_fetch = t.datetime_of_data_fetch,
        record_hash = t.record_hash,
        last_edited_on = current_timestamp
        from (select so.* from sd.studies so
        INNER JOIN sd.to_ad_study_recs ts
        ON so.sd_sid = ts.sd_sid ";

    string base_string = @" where ts.study_rec_status = 2) t
        where a.sd_sid = t.sd_sid";

    dbu.EditEntityRecords(sql_string, base_string, "studies");
}
Once again a utility class function is called to chunk the process if necessary.
public void EditEntityRecords(string topstring, string basestring, string table_name)
{
    try
    {
        int rec_count = GetRecordCount(table_name);
        int rec_batch = 100000;
        if (rec_count > rec_batch)
        {
            for (int r = 1; r <= rec_count; r += rec_batch)
            {
                string batch_sql_string = topstring
                    + " and so.id >= " + r.ToString()
                    + " and so.id < " + (r + rec_batch).ToString()
                    + basestring;
                ExecuteSQL(batch_sql_string);
                string feedback = "Updating entity records for " + table_name + ", " + r.ToString() + " to ";
                feedback += (r + rec_batch < rec_count) ? (r + rec_batch - 1).ToString() : rec_count.ToString();
                StringHelpers.SendFeedback(feedback);
            }
        }
        else
        {
            ExecuteSQL(topstring + basestring);
            StringHelpers.SendFeedback("Updating entity records for " + table_name + ", as a single batch");
        }
    }
    catch (Exception e)
    {
        string res = e.Message;
        StringHelpers.SendError("In updating entity records for " + table_name + ", sd to ad table: " + res);
    }
}
If a change has occurred in a study attribute, all the attribute records of that type are replaced by all the attribute records of that type in the sd data. There is no attempt to match individual attribute records to see which specifically have been changed / added / deleted. The system knows, from the changed composite hash value, that an edit has happened somewhere in a set of attributes of a particular type. But because the attribute records lack persistent identifiers, making it difficult to work out exactly which record(s) have been changed, added or deleted, it is easier to replace the whole set of attributes. The system does this by first deleting the attributes and then inserting the new set. This strategy also means that if a completely new type of attribute appears for a study, all the records of that attribute type are inserted, while if an attribute type completely disappears from a study, all the corresponding attribute records are removed.
Example code is shown below - in this case for study identifiers. An initial call to GetStudyTString returns a clause that creates a CTE (Common Table Expression) that is used in both of the following steps. The full SQL statements for both deletion and insertion are then constructed. For deletion this is straightforward enough that a call can be made to a simple routine (GetStudyDeleteString) to return the code. For insertion the full field list (of the destination ad table) needs to be included. The call to the database utility class results in first the deletion and then the insertion SQL being run. The insertion routine again makes use of the ExecuteTransferSQL function (shown in part 1 of this section) to chunk the insertion process when necessary.
public void EditStudyIdentifiers()
{
    // Where the composite hash value indicates that a change has taken place in one
    // or more of the records, replace the whole set for the relevant studies.
    // Composite hash id for study identifiers = 11.
    string sql_string = dbu.GetStudyTString(11);
    string sql_stringD = sql_string + dbu.GetStudyDeleteString("study_identifiers");
    string sql_stringI = sql_string + @"INSERT INTO ad.study_identifiers(sd_sid,
        identifier_value, identifier_type_id, identifier_org_id, identifier_org,
        identifier_date, identifier_link, record_hash)
        SELECT s.sd_sid,
        identifier_value, identifier_type_id, identifier_org_id, identifier_org,
        identifier_date, identifier_link, record_hash
        FROM sd.study_identifiers s
        INNER JOIN t
        on s.sd_sid = t.sd_sid";
    dbu.ExecuteDandI(sql_stringD, sql_stringI, "study_identifiers");
}

public string GetStudyTString(int type_id)
{
    return @"with t as (
        SELECT sd_sid from sd.to_ad_study_atts
        WHERE hash_type_id = " + type_id.ToString() + @") ";
}

public string GetStudyDeleteString(string table_name)
{
    return @" DELETE FROM ad." + table_name + @" a
        USING t
        WHERE a.sd_sid = t.sd_sid; ";
}

public void ExecuteDandI(string sql_string1, string sql_string2, string table_name)
{
    using (var conn = new NpgsqlConnection(connstring))
    {
        conn.Execute(sql_string1);
        ExecuteTransferSQL(sql_string2, table_name, "Editing");
    }
}
4) For deleted studies or objects, where it has been possible to identify these, the entire study or object and all its attributes are removed from the ad tables. In fact, because deletions can only be detected when the preceding harvest has been a full (100%) one, it is often easier and safer to use the -T option for these imports, and simply recreate the ad tables and fill them directly from the sd data. A sketch of the deletion SQL is shown below.
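Where a full harvest does allow deletions to be applied directly, the SQL involved is straightforward. A minimal sketch for studies is given below - the method name is hypothetical, and the same pattern would be repeated for the main study table and each attribute table.

public void DeleteStudyRecords(string table_name)
{
    // A sketch only - removes, from the named ad table, all records belonging
    // to studies flagged as deleted (status = 4) in the diff table.
    string sql_string = @"DELETE FROM ad." + table_name + @" a
        USING sd.to_ad_study_recs ts
        WHERE a.sd_sid = ts.sd_sid
        and ts.status = 4;";
    ExecuteSQL(sql_string);
}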
5) The composite hash records in the ad tables also have to be updated to match those in the sd tables, to ensure future comparisons work as expected. This is relatively straightforward, although again for large datasets some chunking of the data may be required. The code for the object hashes is shown below.
public void UpdateObjectCompositeHashes()
{
    // Need to ensure that the hashes themselves are all up to date (for the next
    // comparison). Change the ones that have been changed in the sd data. For a
    // very large hash table the update may need to be chunked, using a link to
    // the sd.data_objects table.
    string sql_string = @"UPDATE ad.object_hashes ah
        set composite_hash = so.composite_hash
        FROM
            (SELECT st.id, ia.sd_oid, ia.hash_type_id, ia.composite_hash
             FROM sd.to_ad_object_atts ia
             INNER JOIN sd.data_objects st
             on ia.sd_oid = st.sd_oid
             where ia.status = 2) so
        WHERE ah.sd_oid = so.sd_oid
        and ah.hash_type_id = so.hash_type_id ";
    dbu.EditStudyHashes("data_objects", sql_string);
}

public void AddNewlyCreatedObjectHashTypes()
{
    // For new sd_oid / hash type combinations.
    string sql_string = @"INSERT INTO ad.object_hashes(sd_oid, hash_type_id, composite_hash)
        SELECT ia.sd_oid, ia.hash_type_id, ia.composite_hash
        FROM sd.to_ad_object_atts ia
        WHERE ia.status = 1";
    dbu.ExecuteSQL(sql_string);
    StringHelpers.SendFeedback("Inserting new object hashtype combinations in object hash records");
}

public void DropNewlyDeletedObjectHashTypes()
{
    string sql_string = @"DELETE FROM ad.object_hashes sh
        USING sd.to_ad_object_atts ia
        WHERE sh.sd_oid = ia.sd_oid
        and sh.hash_type_id = ia.hash_type_id
        and ia.status = 4";
    dbu.ExecuteSQL(sql_string);
    StringHelpers.SendFeedback("Dropping deleted object hashtype combinations from object hash records");
}
6) The study_full_hash and object_full_hash fields also need to be updated for the edited records. If the records themselves have been edited this will already have happened, but some edits only involve study or object attributes, and in these cases the study or object record itself will have remained the same (apart from the date of data fetch field). It is therefore necessary to update this field explicitly. As usual, some chunking of the process may be required. The code for the study records is shown below.
public void UpdateFullStudyHash()
{
    // Ensure study_full_hash is updated to reflect its new value.
    // The study record itself may not have changed, so the study
    // record update above cannot be used to make the edit.
    string sql_string = @"UPDATE ad.studies a
        set study_full_hash = so.study_full_hash
        FROM sd.studies so
        WHERE so.sd_sid = a.sd_sid ";

    // Chunked by the dbu routine to 100,000 records at a time.
    dbu.UpdateFullHashes("studies", sql_string);
}
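The UpdateFullHashes routine is not shown above. Assuming it mirrors the other chunking utilities, and using the 100,000 record batch size mentioned in the comment, it might look roughly like this:

public void UpdateFullHashes(string table_name, string sql_string)
{
    // A sketch only - chunks the full hash update by appending an
    // id range clause to the SQL passed in.
    try
    {
        int rec_count = GetRecordCount(table_name);
        int rec_batch = 100000;
        if (rec_count > rec_batch)
        {
            for (int r = 1; r <= rec_count; r += rec_batch)
            {
                string batch_sql_string = sql_string
                    + " and so.id >= " + r.ToString()
                    + " and so.id < " + (r + rec_batch).ToString();
                ExecuteSQL(batch_sql_string);
                StringHelpers.SendFeedback("Updating " + table_name + " full hashes, "
                    + r.ToString() + " onwards");
            }
        }
        else
        {
            ExecuteSQL(sql_string);
            StringHelpers.SendFeedback("Updating " + table_name + " full hashes, as a single batch");
        }
    }
    catch (Exception e)
    {
        StringHelpers.SendError("In updating full hashes for " + table_name + ": " + e.Message);
    }
}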
7) The source_data_studies table (for study based sources) or the source_data_objects table (for object based sources) - in the mon database - also needs to be updated with the date of the last import and the id of the import event. Unlike the download and harvest processes, where these tracking tables are updated one record at a time, during import a single SQL statement can update all the relevant records at once. The code for edited studies in study based sources is shown below; a similar call is also made when importing new studies, and for edited and added objects. The 'last imported' date does not change if the data is unchanged.
public void UpdateStudiesLastImportedDate(int import_id, int source_id)
{
    string top_string = @"Update mon_sf.source_data_studies src
        set last_import_id = " + import_id.ToString() + @",
        last_imported = current_timestamp
        from
            (select so.id, so.sd_sid
             FROM sd.studies so
             INNER JOIN sd.to_ad_study_recs ts
             ON so.sd_sid = ts.sd_sid ";

    string base_string = @" where s.sd_sid = src.sd_id
        and src.source_id = " + source_id.ToString();

    dbu.UpdateLastImportedDate("studies", top_string, base_string, "Editing");
}
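The two SQL fragments above are not complete on their own - the subquery opened in top_string is closed, aliased (as s) and given its status filter inside the dbu routine. A speculative sketch of UpdateLastImportedDate, ignoring any chunking, is shown below; the mapping of the context argument to the status filter is an assumption.

public void UpdateLastImportedDate(string table_name, string top_sql,
                                   string base_sql, string context)
{
    // A sketch only - assumes 'Adding' selects new records (status 1) and
    // 'Editing' selects edited records (status 2), and that the subquery
    // is closed and aliased as s before the base SQL is appended.
    string status_clause = (context == "Adding")
        ? " where ts.status = 1 "
        : " where ts.status = 2 ";
    string sql_string = top_sql + status_clause + ") s " + base_sql;
    ExecuteSQL(sql_string);
    StringHelpers.SendFeedback(context + " " + table_name + " records - import date updated");
}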