Difference between revisions of "Harvesting Data"

From ECRIN-MDR Wiki
Jump to navigation Jump to search
(Overview of harvesting process)
(Updating contextual data)
 
(9 intermediate revisions by the same user not shown)
Line 158: Line 158:
  
 
===Updating contextual data===
 
===Updating contextual data===
* After the harvest has created the session data in the sd tables, it is necessary to update organisation data in various tables. This uses an internal database of organisations to try and identify as many as possible of the cited organisations in a standardised form - i.e. with a default name and a system id. Without this step the organisation data becomes difficult to interrogate, as the same organisation can be present in the system in so many different forms.
+
After the harvesting process has created the session data in the sd tables, it is necessary to update the organisation data in various tables. The problem is that organisations are often labelled with a wide variety of names (see '''[[Contextual Data]]'''). The system tries to standardise these wherever possible - replacing all variants of an organisation's name with the standard or default version, and in the process also adding the organisation's id, as it exists in the system's context tables. Developing a list of organisations and their various aliases is a time consuming task, so not all organisation names are able to be standardised and coded, but the proportion is slowly increasing. <br/>
* Topic data is also standardised as much as possible, with the application of MESH codes (to complement the MESH coding that exists inthe source data) to as many topic terms as possible.
+
The tables and fields that are processed include
* Once all data has been harvested and updated, the final steps involve the creation of md5 hashes for the key fields in the sd tables. For attribute records, for both studies and data objects, these hashes are then 'rolled up' to form a single hash for each set of attribute data, and then finally all the hashes for a single study or data object are rolled up into a single 'full hash' for that entity. These hash values play a critical role in identifying edited data during the data import process (see the MDR wiki for more details).
+
* The identifier_org_id and identifier_org in study identifier records - i.e. the organisation that assigned the identifier
 +
* The organisation_id and organisation_name in study contributor records - i.e. usually the listed sponsors and funders
 +
* The managing_org_id and managing_org in data object records - i.e. the organisation responsible for the access to that data object.
 +
The process is similar in all cases. The harvested names are checked first against the default names, and then the recorded 'other names', to find the relevant organisation id. The default name, including where appropriate a 'display suffix', is then applied to all fields where the id has been found. The code for the data object records is shown below.
 +
<div style="font-family: monospace; font-size: 13px" >
 +
<pre>
 +
        public void update_data_objects_using_default_name()
 +
        {
 +
            string sql_string = @"update sd.data_objects d
 +
            set managing_org_id = g.id
 +
            from context_ctx.organisations g
 +
            where lower(d.managing_org) = lower(g.default_name)
 +
            and d.managing_org_id is null;";
 +
 
 +
            using (var conn = new NpgsqlConnection(db_conn))
 +
            {
 +
                conn.Execute(sql_string);
 +
            }
 +
        }
 +
 
 +
        public void update_data_objects_using_other_name()
 +
        {
 +
            string sql_string = @"update sd.data_objects d
 +
            set managing_org_id = a.org_id
 +
            from context_ctx.org_other_names a
 +
            where lower(d.managing_org) = lower(a.other_name)
 +
            and d.managing_org_id is null;";
 +
 
 +
            using (var conn = new NpgsqlConnection(db_conn))
 +
            {
 +
                conn.Execute(sql_string);
 +
            }
 +
        }
 +
 
 +
        public void update_data_objects_insert_default_names()
 +
        {
 +
            string sql_string = @"update sd.data_objects d
 +
            set managing_org = g.default_name ||
 +
            case when g.display_suffix is not null and trim(g.display_suffix) <> '' then ' (' || g.display_suffix || ')'
 +
            else '' end
 +
            from context_ctx.organisations g
 +
            where d.managing_org_id = g.id;";
 +
 
 +
            using (var conn = new NpgsqlConnection(db_conn))
 +
            {
 +
                conn.Execute(sql_string);
 +
            }
 +
        }
 +
</pre>
 +
</div>
 +
In each case the system also stores a list of the organisation names that were not matched, in the context database, so that they can be added to the context tables at a later date.<br/><br/>
 +
Topic data are also standardised as much as possible, with the application of MESH codes to as many topic terms as possible. Because the Pubmed objects and most of the ClinicalTrials.gov entries already have associated MESH codes, it makes sense to try and extend the MESH code coverage where possible. The code is shown below - very large topic collections are broken up into batches of 500,000 to prevent problems with the database connection timing out on a lengthy query (some sources can have up to 6 million topic records). Again unmatched topic terms are stored to see if they can be manually mapped to a MESH term at a later date.
 +
<div style="font-family: monospace; font-size: 13px" >
 +
<pre>
 +
        public void update_topics(string source_type)
 +
        {
 +
            // Can be difficult to do ths with large datasets.
 +
 
 +
            int rec_count = 0;
 +
            int rec_batch = 500000;
 +
            string sql_string = @"select count(*) from sd.";
 +
            sql_string += source_type.ToLower() == "study"
 +
                                ? "study_topics;"
 +
                                : "object_topics;";
 +
 
 +
            using (var conn = new NpgsqlConnection(db_conn))
 +
            {
 +
                rec_count = conn.ExecuteScalar<int>(sql_string);
 +
            }
 +
 
 +
            // In some cases mesh codes may be overwritten if
 +
            // they do not conform entirely (in format) with the mesh list.
 +
 
 +
            sql_string = @"Update ";
 +
            sql_string += source_type.ToLower() == "study"
 +
                                ? "sd.study_topics t "
 +
                                : "sd.object_topics t ";
 +
            sql_string += @" set topic_code = m.code,
 +
                            topic_value = m.term,
 +
                            mesh_coded = true
 +
                            from context_ctx.mesh_lookup m
 +
                            where lower(t.topic_value) = m.entry";
 +
            try
 +
            {
 +
                if (rec_count > rec_batch)
 +
                {
 +
                    for (int r = 1; r <= rec_count; r += rec_batch)
 +
                    {
 +
                        string batch_sql_string = sql_string + " and id >= " + r.ToString() + " and id < " + (r + rec_batch).ToString();
 +
                        ExecuteSQL(batch_sql_string);
 +
                        string feedback = "Updating topic codes, " + r.ToString() + " to ";
 +
                        feedback += (r + rec_batch < rec_count) ? (r + rec_batch).ToString() : rec_count.ToString();
 +
                        StringHelpers.SendFeedback(feedback);
 +
                    }
 +
                }
 +
                else
 +
                {
 +
                    ExecuteSQL(sql_string);
 +
                    StringHelpers.SendFeedback("Updating topic codes - as a single batch");
 +
                }
 +
            }
 +
            catch (Exception e)
 +
            {
 +
                string res = e.Message;
 +
                StringHelpers.SendError("In update_topics: " + res);
 +
            }
 +
        }
 +
</pre>
 +
</div>
 +
Once all data has been harvested and updated, the final steps involve the creation of md5 hashes for the key fields in the sd tables. The data carrying fields, common to both sd and ad table versions but excluding record identifiers, are combined into an array which is then hashed, to provide a 'record_hash' for each row in each table. For attribute records, for both studies and data objects, the hashes of each attribute type, for each study or object, are then 'rolled up' to form a single hash for each set of attribute data, and then finally all the hashes for a single study or data object are rolled up into a single 'full hash' for that entity. The process is analogous to forming a Merkel tree during block chain construction.  These hash values play a critical role in identifying edited data during the data import process (see '''[[Missing PIDs and Hashing]]''').

Latest revision as of 15:30, 17 November 2020

Introduction

In the harvest phase the XML files for a data source, or more normally a subset (which one is controlled by the associated parameters) are converted into data in the 'sd' schema (= session data) tables within each source database. On each harvest 'run' the sd tables are dropped and created anew, and thus only ever contain the data from the most recent harvest. The tables present will vary in different databases, though if a table is present it will have a consistent structure in every database. The conversion to sd data therefore represents the second and final stage of the conversion of the source data into the consistent ECRIN schema. For that reason the detailed code for different sources can vary widely.

N.B. The harvest code can be found at https://github.com/ecrin-github/DataHarvester

Parameters

The system is a console app, and thus easily schedulable, and takes the following parameters:
-s, followed by a comma delimited string of integer ids, each representing a source: The source systems to be harvested.
-t, followed by 1 or 2: indicates the type of harvesting to be carried out:
If 1, the harvesting will be of all files in the source folder, representing 100% of the data that has been downloaded.
If 2, the harvest is only of files that have been (re-)downloaded, because they represent new or changed source data, after the date time of the last import process. Note that it is the last import event (for that source) which is important here, not the last harvest event. Multiple harvests between imports do not affect, therefore, the files that are harvested.
-G: is a flag that can be applied that prevents a normal harvest occurring, so that the sd tables are not recreated and reloaded. Instead they are updated using revised contextual data, so that - for example - organisation Ids and topic data codes can be re-applied. The option provides a relatively efficient way of updating data, though obviously works better if preceded with a type 1 full harvest of all data. Because the data is revised the various composite hash values that summarise data content also have to be re-created (see Missing PIDs and Hashing).

Thus, the parameter string
     -s "100120" -t2
will harvest data from source 100120 (ClinicalTrials.gov)that has been (re-)downloaded since the last import process, but the parameter string
     -s "100126" -t1
will harvest all the data from source 100126 (ISRCTN) The parameter string
      -s "101900, 101901" -G
will update the organisation, topic and other context related data for BioLincc and Yoda sd data, but will not re-harvest that data first.

Overview of harvesting process

Unless the -G flag has been applied the initial stage is to recreate the sd tables, to receive the harvested data. The system fires off a series of SQL statements that first drops all sd tables and then drops them. After that...
The program selects the relevant source data records from the monitor database - depending on the t parameter. For type 1 all records are selected, though for the larger sources this is done in batches, for type 2 the date time of the last import process - also available from the monitor database - is used to compare against the datetime of the XML file's download / production. Only files downloaded or created since the last import are harvested. An excerpt from the relevant code is provided in Logging and Tracking
The program then loops through each of the source XML files, using the local path value in the source data records to locate the file.
The xml file is deserialised into a C# object for further processing. The processing that follows is very different for different sources, although all the sources that are derived from WHO ICTRP files are processed in a very similar way. Most current sources are study based, so the usual pattern of processing is to identify the main study data points from the source data first, and then the study attributes (identifiers, titles, topics, features, contributors etc.) and then the associated data objects (registry entries, results entries, linked documents, etc.) and their attributes (instance details, titles, dates, etc.).
Cleaning of common data / formatting errors occurs as part of the harvesting process, for example the removal of values that inappropriately say 'none' or 'nil' for common study variables, or the extraction of other registry ids from the strings in which they can be buried (using regular expressions).
In each case the requirement is to end up with the data that is compatible with the ECRIN metadata schema. For some sources the harvest step is the second part of this conversion process, the first taking place during file generation. For others, where data are downloaded as pre-formed XML files from the source - as for ClinicaTrials.gov and PubMed - all the conversion process takes place in the harvesting step. The end product (for study based data sources) is a study object (shown below) that can then be transferred to the database tables. Note that for a study based source the data objects are repeating attributes of the study. All attributes are present as List<> collections. In each case the object definition within the collection (StudyContributor, ObjectTitle, etc.) matches the corresponding database table.

    public class Study
    {
        public string sd_sid { get; set; }
        public string display_title { get; set; }
        public string title_lang_code { get; set; }

        public string brief_description { get; set; }
        public bool bd_contains_html { get; set; }
        public string data_sharing_statement { get; set; }
        public bool dss_contains_html { get; set; }
        public int? study_start_year { get; set; }
        public int? study_start_month { get; set; }

        public int? study_type_id { get; set; }
        public string study_type { get; set; }
        public int? study_status_id { get; set; }
        public string study_status { get; set; }
        public int? study_enrolment { get; set; }
        public int? study_gender_elig_id { get; set; }
        public string study_gender_elig { get; set; }

        public int? min_age { get; set; }
        public int? min_age_units_id { get; set; }
        public string min_age_units { get; set; }
        public int? max_age { get; set; }
        public int? max_age_units_id { get; set; }
        public string max_age_units { get; set; }

        public DateTime? datetime_of_data_fetch { get; set; }

        public List<StudyIdentifier> identifiers { get; set; }
        public List<StudyTitle> titles { get; set; }
        public List<StudyContributor> contributors { get; set; }
        public List<StudyReference> references { get; set; }
        public List<StudyTopic> topics { get; set; }
        public List<StudyFeature> features { get; set; }
        public List<StudyRelationship> relationships { get; set; }
        public List<StudyLink> studylinks { get; set; }
        public List<AvailableIPD> ipd_info { get; set; }

        public List<DataObject> data_objects { get; set; }
        public List<ObjectDataset> object_datasets { get; set; }
        public List<ObjectTitle> object_titles { get; set; }
        public List<ObjectDate> object_dates { get; set; }
        public List<ObjectInstance> object_instances { get; set; }
    }

The data is then transferred to the database. The study data points themselves are stored, as a single study object, in the studies table, but all study attributes, data objects and object attributes are stored as a group of records in the matchingtable, using a set of pre-defined 'copy-helpers' and the PostgreSQLCopyHelper nuget package (see https://github.com/PostgreSQLCopyHelper/PostgreSQLCopyHelper). This allows multiple data objects, of the same type, to be serialised into a database quickly and simply.
As an example, the copy helper function for study identifiers is shown below - essentially it maps object properties to database fields. Below that is the call (towards the end of the processing of each set of study data) that requests the data repository layer to store the collected study identifiers, and finally there is the repository function itself, which inserts all records using a single 'SaveAll' call rather than looping through a collection.

   .. set up copy helper function
    public static class StudyCopyHelpers
    {
        public static PostgreSQLCopyHelper<StudyIdentifier> study_ids_helper =
            new PostgreSQLCopyHelper<StudyIdentifier>("sd", "study_identifiers")
                .MapVarchar("sd_sid", x => x.sd_sid)
                .MapVarchar("identifier_value", x => x.identifier_value)
                .MapInteger("identifier_type_id", x => x.identifier_type_id)
                .MapVarchar("identifier_type", x => x.identifier_type)
                .MapInteger("identifier_org_id", x => x.identifier_org_id)
                .MapVarchar("identifier_org", x => x.identifier_org)
                .MapVarchar("identifier_date", x => x.identifier_date)
                .MapVarchar("identifier_link", x => x.identifier_link);

        ...
        ...
     }

      ... call storage function referencing copyhelper  
      if (s.identifiers.Count > 0)
      {
          repo.StoreStudyIdentifiers(StudyCopyHelpers.study_ids_helper,
                                          s.identifiers);
      }

      ... carry out storage operation
      public ulong StoreStudyIdentifiers(PostgreSQLCopyHelper<StudyIdentifier> copyHelper, IEnumerable<StudyIdentifier> entities)
      {
          using (var conn = new NpgsqlConnection(connString))
          {
              conn.Open();
              return copyHelper.SaveAll(conn, entities);
          }
      }

For PubMed data in particular substantial additional processing is necessary. For instance the source data contains information about the journal but not the publisher of the article (the 'managing organisation') - that has to be obtained from separate lookup processes using contextual databases, that take place after the main harvesting process, using pissn and eissn numbers in the PubMed files.
Because PubMed is an object based data source in this case it is necessary for harvesting to generate a data object (in this context known as a citation object). The object's structure is shown below. The data object is then transferred to the database using a similar mechanism as for the study object above.

    public class CitationObject
    {
        public string sd_oid { get; set; }
        public string display_title { get; set; }
        public string version { get; set; }
        public string doi { get; set; }
        public int doi_status_id { get; set; }
        public int? publication_year { get; set; }
        public int? managing_org_id { get; set; }
        public string managing_org { get; set; }
        public string lang_code { get; set; }
        public int? access_type_id { get; set; }
        public string access_type { get; set; }
        public string access_details { get; set; }
        public string access_details_url { get; set; }
        public DateTime? url_last_checked { get; set; }
        public DateTime? datetime_of_data_fetch { get; set; }
        public string abstract_status { get; set; }
        public string pub_model { get; set; }
        public string publication_status { get; set; }
        public string journal_title { get; set; }
        public string pissn { get; set; }
        public string eissn { get; set; }

        public List<string> language_list { get; set; }
        public List<ObjectDate> article_dates { get; set; }
        public List<ObjectTitle> article_titles { get; set; }
        public List<ObjectIdentifier> article_identifiers { get; set; }
        public List<ObjectTopic> article_topics { get; set; }
        public List<ObjectPublicationType> article_pubtypes { get; set; }
        public List<ObjectDescription> article_descriptions { get; set; }
        public List<ObjectInstance> article_instances { get; set; }
        public List<ObjectContributor> article_contributors { get; set; }
        public List<ObjectComment> article_comments { get; set; }
        public List<ObjectDBLink> article_db_ids { get; set; }
    }

Updating contextual data

After the harvesting process has created the session data in the sd tables, it is necessary to update the organisation data in various tables. The problem is that organisations are often labelled with a wide variety of names (see Contextual Data). The system tries to standardise these wherever possible - replacing all variants of an organisation's name with the standard or default version, and in the process also adding the organisation's id, as it exists in the system's context tables. Developing a list of organisations and their various aliases is a time consuming task, so not all organisation names are able to be standardised and coded, but the proportion is slowly increasing.
The tables and fields that are processed include

  • The identifier_org_id and identifier_org in study identifier records - i.e. the organisation that assigned the identifier
  • The organisation_id and organisation_name in study contributor records - i.e. usually the listed sponsors and funders
  • The managing_org_id and managing_org in data object records - i.e. the organisation responsible for the access to that data object.

The process is similar in all cases. The harvested names are checked first against the default names, and then the recorded 'other names', to find the relevant organisation id. The default name, including where appropriate a 'display suffix', is then applied to all fields where the id has been found. The code for the data object records is shown below.

        public void update_data_objects_using_default_name()
        {
            string sql_string = @"update sd.data_objects d
            set managing_org_id = g.id
            from context_ctx.organisations g
            where lower(d.managing_org) = lower(g.default_name)
            and d.managing_org_id is null;";

            using (var conn = new NpgsqlConnection(db_conn))
            {
                conn.Execute(sql_string);
            }
        }

        public void update_data_objects_using_other_name()
        {
            string sql_string = @"update sd.data_objects d
            set managing_org_id = a.org_id
            from context_ctx.org_other_names a
            where lower(d.managing_org) = lower(a.other_name)
            and d.managing_org_id is null;";

            using (var conn = new NpgsqlConnection(db_conn))
            {
                conn.Execute(sql_string);
            }
        }

        public void update_data_objects_insert_default_names()
        {
            string sql_string = @"update sd.data_objects d
            set managing_org = g.default_name ||
            case when g.display_suffix is not null and trim(g.display_suffix) <> '' then ' (' || g.display_suffix || ')'
            else '' end
            from context_ctx.organisations g
            where d.managing_org_id = g.id;";

            using (var conn = new NpgsqlConnection(db_conn))
            {
                conn.Execute(sql_string);
            }
        }

In each case the system also stores a list of the organisation names that were not matched, in the context database, so that they can be added to the context tables at a later date.

Topic data are also standardised as much as possible, with the application of MESH codes to as many topic terms as possible. Because the Pubmed objects and most of the ClinicalTrials.gov entries already have associated MESH codes, it makes sense to try and extend the MESH code coverage where possible. The code is shown below - very large topic collections are broken up into batches of 500,000 to prevent problems with the database connection timing out on a lengthy query (some sources can have up to 6 million topic records). Again unmatched topic terms are stored to see if they can be manually mapped to a MESH term at a later date.

        public void update_topics(string source_type)
        {
            // Can be difficult to do ths with large datasets.

            int rec_count = 0;
            int rec_batch = 500000;
            string sql_string = @"select count(*) from sd.";
            sql_string += source_type.ToLower() == "study"
                                ? "study_topics;"
                                : "object_topics;";

            using (var conn = new NpgsqlConnection(db_conn))
            {
                rec_count = conn.ExecuteScalar<int>(sql_string);
            }

            // In some cases mesh codes may be overwritten if 
            // they do not conform entirely (in format) with the mesh list.

            sql_string = @"Update ";
            sql_string += source_type.ToLower() == "study"
                                ? "sd.study_topics t "
                                : "sd.object_topics t ";
            sql_string += @" set topic_code = m.code,
                             topic_value = m.term,
                             mesh_coded = true
                             from context_ctx.mesh_lookup m
                             where lower(t.topic_value) = m.entry";
            try
            {
                if (rec_count > rec_batch)
                {
                    for (int r = 1; r <= rec_count; r += rec_batch)
                    {
                        string batch_sql_string = sql_string + " and id >= " + r.ToString() + " and id < " + (r + rec_batch).ToString();
                        ExecuteSQL(batch_sql_string);
                        string feedback = "Updating topic codes, " + r.ToString() + " to ";
                        feedback += (r + rec_batch < rec_count) ? (r + rec_batch).ToString() : rec_count.ToString();
                        StringHelpers.SendFeedback(feedback);
                    }
                }
                else
                {
                    ExecuteSQL(sql_string);
                    StringHelpers.SendFeedback("Updating topic codes - as a single batch");
                }
            }
            catch (Exception e)
            {
                string res = e.Message;
                StringHelpers.SendError("In update_topics: " + res);
            }
        }

Once all data has been harvested and updated, the final steps involve the creation of md5 hashes for the key fields in the sd tables. The data carrying fields, common to both sd and ad table versions but excluding record identifiers, are combined into an array which is then hashed, to provide a 'record_hash' for each row in each table. For attribute records, for both studies and data objects, the hashes of each attribute type, for each study or object, are then 'rolled up' to form a single hash for each set of attribute data, and then finally all the hashes for a single study or data object are rolled up into a single 'full hash' for that entity. The process is analogous to forming a Merkel tree during block chain construction. These hash values play a critical role in identifying edited data during the data import process (see Missing PIDs and Hashing).