<p style="color:blue; text-align:right"><small>'''''Last updated: 05/03/2022 - edit still in progress'''''</small></p>
===Overview===
The aggregation program takes all the data within the '''ad''' tables in the various source databases and loads it to central tables within the mdr database, ultimately transferring it to a set of ‘'''core'''’ tables. The core tables hold all of the MDR data in a structure that exactly reflects the ECRIN metadata schema, and it is the core tables, or the JSON derived from them, that are interrogated by the web portal when users search and filter the MDR data. The aggregation process runs weekly and takes about an hour.<br/>
Some studies are found in multiple source systems, and some objects (mainly published papers) may be linked to more than one study. The aggregation process needs to de-duplicate these multiple entries, so that the core system contains only one record for each study or object entity. At the same time it needs, as far as possible, to aggregate the different data points and links for each entity, even when they originate in different source systems.<br/>
Within the mdr tables all studies and data objects are assigned an integer accession number, their ‘'''ECRIN ID'''’, an internal, persistent ID that is quite distinct from any of the original IDs assigned by, for example, a trial registry, data repository, funder, sponsor, or bibliographic system. Studies have a 7-digit ID, beginning at 2000001; objects have an 8-digit ID, beginning at 10000001. These IDs are used internally by the MDR system to link studies with their attributes, objects with their attributes, and studies and objects to each other. They are not normally exposed to the user.<br/>
While the internal ECRIN IDs are persistent, the aggregated data is not. On each aggregation almost all the tables in the mdr database are dropped and then recreated, empty (the exceptions are the ‘master lists’ of study and object IDs). During the aggregation process the different sources are considered in a fixed sequence, and data is added from each in turn. Duplicate studies and objects are identified within this process and managed appropriately. Studies and objects that already have an ECRIN ID, from previous aggregations, are matched once more to that ID. New studies and objects, being aggregated for the first time, are assigned new IDs, in numerical sequence. There is no attempt to edit the aggregated data ‘in place’, with new versions of data replacing old. Re-aggregating all the available data from the beginning takes longer, but it is a much simpler process and therefore easier to maintain.<br/>
There are 4 sets of tables in the mdr database, each corresponding to a database schema:
* The '''st''' schema tables hold all the study related data imported from the source databases, de-duplicated so that each study appears only once in the system.
* The '''ob''' schema tables hold all object related data, similarly imported from the sources and de-duplicated, so that each object is unique.
* The '''nk''' schema tables hold all data related to links between studies and objects, and between studies and studies, as well as the master lists of study and object IDs. Those master lists include the data that links the IDs of studies and objects in the source databases with the ECRIN IDs used within the mdr database.
* The '''core''' schema tables hold a full set of all study and object data, plus a single table with the study-object links. Data in the core is derived from that in the other three schemas, with some slight modifications so that it matches the ECRIN metadata schema more exactly (e.g. provenance data is added), together with some additional data added to better support search functionality.
Although an aggregation can be run at any time, it makes most sense to do so after the following sequence of events:<br/>
&nbsp;&nbsp;a)&nbsp;&nbsp; Downloads are run for all study based data sources to get the local source file collections as up to date as possible.<br/>
&nbsp;&nbsp;b)&nbsp;&nbsp; Harvests and imports are run for all study based sources to get the study data as up to date as possible, in each of the study source databases.<br/>
&nbsp;&nbsp;c)&nbsp;&nbsp; Downloads are run for all object based data sources to get the local source file collections as up to date as possible. In practice ‘object based data sources’ currently means PubMed. This process has to occur after b) as the imported study data is used to identify some of the linked PubMed records.<br/>
&nbsp;&nbsp;d)&nbsp;&nbsp; Harvests and imports are run for all object based sources to get data object data as up to date as possible, in each of the object source databases (currently PubMed).<br/>
In other words the aggregation module should be scheduled to run after the application of all other modules – it is the final stage in the weekly update cycle.<br/>
N.B. The aggregation code can be found at https://github.com/ecrin-github/DataAggregator<br/>
  
 
===Parameters===
The program is a console application, to enable it to be more easily scheduled. There are 4 flag type parameters:<br/>
&nbsp;&nbsp;'''-D''': indicates that the aggregating data transfer should take place, from the source ad tables to the tables in the st (studies), ob (objects) and nk (links) schemas. This is the necessary first step of the aggregation process.<br/>
&nbsp;&nbsp;'''-C''': indicates that the core tables should be created and filled from the aggregate tables, i.e. data is combined from the st, ob and nk schemas into a single, simpler core schema.<br/>
&nbsp;&nbsp;'''-S''': collects statistics about the existing data, from both the source ad tables and the central aggregated tables.<br/>
&nbsp;&nbsp;'''-J''': indicates that the core data should be used to create JSON versions of the data within the core database.<br/>
It makes little sense to trigger the other processes without an initial call using '''-D'''. A '''-C''' call would then normally follow, then '''-S''' and finally '''-J'''. The system can cope with multiple parameters, and applies them in the order given, e.g. -D -C -S -J. The program is currently scheduled, however, to run 4 times, each run using a single parameter, in the order described.<br/>

===The Aggregation Process===
====The Overall Sequence====
The aggregation process begins with a call to the program with the -D flag.<br/>
The system first drops and recreates almost all tables in the st, ob and nk schemas. The main exceptions are the study ‘master list’: nk.study_ids, and the data object ‘master list’: nk.data_object_ids.<br/>
The program then interrogates the '''mon''' database to retrieve the set of 'Source' objects, each one corresponding to a source database, and runs through those sources to obtain a list of the 'other registry ids' in each database. In other words it builds up a list of all studies that are found in more than one data source. About 25,000 studies are registered in 2 registries and about another 1,000 are registered in 3 or more registries. The details of the process are provided in '''[[Identifying Links between Studies|Study-study links]]''', but the outcome is a list of ordered study pairs, with each duplicated study indicating the 'more preferred' version of its data source (see below).<br/>
Some studies (several hundred) have more complex relationships - for example they are registered in two registries, but in one of those they are entered as a group of related studies rather than having a simple 1-to-1 relationship with the other registry study. These are removed from the linked study table but added later as study-study relationship data. Again the details are in '''[[Identifying Links between Studies|Study-study links]]'''.
Armed with the study linkage data, and if not in 'test mode' (when only the test data is accessed), the system can begin the data transfer to the central mdr database, by instantiating a DataTransferBuilder object for each source and issuing the top level data transfer commands on that object.<br/>
Each database source is considered in a fixed 'order of preference'. The order of preference is provided by an integer number associated with each source – the lower the number the ‘more preferred’ that source. ‘More preferred’ sources are considered to be richer or more reliable sources of data, in general, than ‘less preferred’ sources. They are therefore considered first, with additional data from the ‘less preferred’ sources only added if applicable and identified as new. Currently the ‘most preferred’ source is ClinicalTrials.gov.<br/>
For each source:
* The ad schema of the source database is linked as a foreign table wrapper (FTW) in the mdr database. The schema name returned from this process is always <database name>_ad, e.g. ctg_ad, isrctn_ad, etc. This allows data from the source database to be easily accessed by, and transferred to, the central mdr database.
If the source has study tables (i.e. is a study based source):
* The study ids are retrieved from the source database, and processed to see which are new and which correspond to studies that already exist in the ‘study id master list’. Matched studies are assigned the ECRIN ID that they had previously.
* New studies are checked to ensure that they are not existing studies coming from a new ‘source’, e.g. studies that have been registered in an additional registry. If so they receive the ECRIN ID of the existing study. If a study is completely new to the system it is assigned a new ECRIN ID.
* The data for the studies and their attributes is added to the st schema tables, with the ECRIN study ID applied to all data.
* The object data is retrieved and again checked against the master list of object ECRIN IDs to see if it matches an object already known to the system. If not, it is then checked to see if it is (probably) the same as an existing object, with the exact comparison method depending on the object type. If identified as a duplicate it is given the same ECRIN object ID as the original object, otherwise it is added to the system as a new object with a new ECRIN object ID.
If the source is an object based data source (currently only PubMed):
* The object ids are retrieved and linked to studies using source-specific processes (described elsewhere).
In both cases:
* The data for the data objects and their attributes is then transferred to the ob schema tables. The object id master list includes the ECRIN ID of the linked study, so this table also holds the study-object link data.
* The foreign table wrapper for the source database is dropped.

<div style="font-family: monospace; font-size: 13px" >
<pre>
      foreach (Source source in sources)
      {
          string source_conn_string = _credentials.GetConnectionString(source.database_name, opts.testing);
          source.db_conn = source_conn_string;
          string schema_name = "";
          if (opts.testing)
          {
              schema_name = "ad";    // In testing environment source schema name will simply be 'ad'
              _test_repo.TransferADTableData(source);
          }
          else
          {
              // in normal non-testing environment schema name is the ad tables in a FTW - i.e. <db name>_ad
              schema_name = _mon_repo.SetUpTempFTW(_credentials, source.database_name, dest_conn_string);
          }

          DataTransferBuilder tb = new DataTransferBuilder(source, schema_name, dest_conn_string, _logger);
          _logger_helper.LogStudyHeader(false, "Transferring data for " + source.database_name);
          if (source.has_study_tables)
          {
              _logger_helper.LogHeader("Process study Ids");
              tb.ProcessStudyIds();
              _logger_helper.LogHeader("Transfer study data");
              num_studies_imported += tb.TransferStudyData();
              _logger_helper.LogHeader("Process object Ids");
              tb.ProcessStudyObjectIds();
          }
          else
          {
              tb.ProcessStandaloneObjectIds(sources, _credentials, opts.testing);  // for now, just PubMed
          }
          _logger_helper.LogHeader("Transfer object data");
          num_objects_imported += tb.TransferObjectData();

          _mon_repo.DropTempFTW(source.database_name, dest_conn_string);
      }
</pre>
</div>
Once the '''st''' and '''ob''' tables have all been re-populated, and the study and object ID tables have been updated, the Aggregation system can be called again with the '''-C''' parameter.<br/>
The '''-C''' option copies the data from the aggregating schemas (st, ob and nk) to the core schema without any processing, other than creating the provenance strings for both studies and data objects. The latter may be composite if more than one source was involved.<br/>
The system also regenerates a study_search table within the core schema, which brings together into a single table the main attributes of studies that are used for searching and filtering, making those processes as simple as possible.<br/>
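The statement below is a minimal sketch of that regeneration step, assuming a hypothetical method name and showing only a few of the columns involved - the real study_search table carries a wider set of the searchable and filterable study attributes.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        // Sketch only - illustrative column list; the actual study_search table
        // brings together more of the attributes used for searching and filtering.
        public void RegenerateStudySearchTable()
        {
            string sql_string = @"DROP TABLE IF EXISTS core.study_search;
                    CREATE TABLE core.study_search as
                    SELECT s.id, s.display_title, s.study_start_year,
                    s.study_type_id, s.study_status_id, s.min_age, s.max_age
                    FROM core.studies s";
            db.ExecuteSQL(sql_string);
        }
</pre>
</div>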
The program can then be called with the '''-S''' flag to construct a set of ‘statistical’ records, which are added to the corresponding tables in the '''mon''' (monitoring) database. These include a summary of the numbers of studies and objects transferred and then aggregated, and a breakdown of the numbers of each type of data object in the system.<br/>
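As an indication of the kind of summary query involved, the sketch below shows how the breakdown of data objects by type could be obtained; the FetchObjectTypeCounts method and the ObjectTypeCount class are illustrative assumptions rather than code taken from the repository.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        // Sketch only - ObjectTypeCount is assumed to have object_type_id and
        // number_of_objects properties, mirroring the query below.
        public IEnumerable<ObjectTypeCount> FetchObjectTypeCounts()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"SELECT object_type_id, count(*) as number_of_objects
                          FROM ob.data_objects
                          GROUP BY object_type_id
                          ORDER BY count(*) DESC";
                return conn.Query<ObjectTypeCount>(sql_string);
            }
        }
</pre>
</div>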
Finally the Aggregation program is called with the '''-J''' flag. This recreates JSON versions of the study and object data, which are stored in tables within the core schema.<br/>

====Processing Study Ids====
For each source, the initial step is to set up a temporary table to hold the study ids (temp_study_ids). The next step is then to fetch those study ids, as well as the datetime_of_data_fetch field, from the source data, also including the source id in the collection of StudyId objects created. This data is transferred to the temporary table. Note that the temporary table has the same structure as the permanent all_ids_studies table. Later on the temp_study_ids data will be transferred to that permanent table - but not before some records have been modified.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void SetUpTempStudyIdsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS nk.temp_study_ids;
                        CREATE TABLE nk.temp_study_ids(
                        study_id                INT
                      , source_id               INT
                      , sd_sid                  VARCHAR
                      , is_preferred            BOOLEAN
                      , datetime_of_data_fetch  TIMESTAMPTZ
                      ); ";
                conn.Execute(sql_string);
            }
        }

        public IEnumerable<StudyId> FetchStudyIds(int source_id, string source_conn_string)
        {
            using (var conn = new NpgsqlConnection(source_conn_string))
            {
                string sql_string = @"select " + source_id.ToString() + @" as source_id,
                          sd_sid, datetime_of_data_fetch
                          from ad.studies";
                return conn.Query<StudyId>(sql_string);
            }
        }

        public ulong StoreStudyIds(PostgreSQLCopyHelper<StudyId> copyHelper, IEnumerable<StudyId> entities)
        {
            // stores the study id data in a temporary table
            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();
                return copyHelper.SaveAll(conn, entities);
            }
        }
</pre>
</div>
The data in the temporary table is checked against the study_study_links table. If any study id corresponds to a study already in the system, it will appear in the left hand side of the study-study links table (source_id and sd_sid). The corresponding preferred id, and source id, will be in the right hand side of this table (preferred_source_id and preferred_sd_sid). That preferred source_id / sd_sid will also exist in the all_ids_studies table. This is because the sources are aggregated 'most preferred first', so the more preferred version can be guaranteed to be present. That source_id / sd_sid combination will have an integer study_id, created when the data was first transferred to the all_ids_studies table, and that value is used to update the corresponding field in the temp_study_ids table. At the same time the study is marked as a 'non-preferred duplicate' by setting the is_preferred field to false.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void CheckStudyLinks()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_study_ids t
                          SET study_id = s.study_id, is_preferred = false
                          FROM nk.study_study_links k
                                INNER JOIN nk.all_ids_studies s
                                ON k.preferred_sd_sid = s.sd_sid
                                AND k.preferred_source_id = s.source_id
                          WHERE t.sd_sid = k.sd_sid
                          AND t.source_id = k.source_id;";
                int res = db.ExecuteSQL(sql_string);
                ...
                ...
</pre>
</div>
In the case of the first and largest source (ClinicalTrials.gov) there will obviously be no existing study ID data and (because it is the most preferred of all the data sources) no matching record on the left hand side of the study links table. The update above therefore has no effect and no records are updated. For later sources some of the studies - usually a small majority - will be updated to include a pre-existing study_id.<br/>
A small table - nk.existing_studies - is then created that just lists the sd_sid and study_id values of these 'pre-existing' studies - it will be useful later on.
<div style="font-family: monospace; font-size: 13px" >
<pre>
                ...
                sql_string = @"DROP TABLE IF EXISTS nk.existing_studies;
                              CREATE TABLE nk.existing_studies as
                                      SELECT sd_sid, study_id
                                      FROM nk.temp_study_ids
                                      WHERE is_preferred = false";
                db.ExecuteSQL(sql_string);

            }
        }
</pre>
</div>
Now the temp_study_ids are imported into the permanent all_ids_studies table. All records are provided with an id value because the id field is an identity column. The newly added records are then updated, so that those that do not already have a study_id are given the identity id as their study_id. At the same time the is_preferred field for these records is set to true, to indicate that this is the first time this study has been added to the system. Because it is usually easier to work with the smaller temp_study_ids table than the all_ids_studies table, the temporary table is 'back-filled' from the permanent data, so that the study_ids that were null there now hold the values in the corresponding permanent table rows. New study records are therefore given an id derived from the identity value in all_ids_studies, while pre-existing studies are given the same id as the pre-existing version.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void UpdateAllStudyIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Add the new study id records to the all Ids table

                string sql_string = @"INSERT INTO nk.all_ids_studies
                            (study_id, source_id, sd_sid,
                            datetime_of_data_fetch, is_preferred)
                            select study_id, source_id, sd_sid,
                            datetime_of_data_fetch, is_preferred
                            from nk.temp_study_ids";

                conn.Execute(sql_string);

                // Where the study_ids are null they can take on the value of the
                // record id.

                sql_string = @"UPDATE nk.all_ids_studies
                            SET study_id = id, is_preferred = true
                            WHERE study_id is null
                            AND source_id = " + source_id.ToString();

                conn.Execute(sql_string);

                // 'Back-update' the study temp table using the newly created study_ids
                // now all should be done...

                sql_string = @"UPDATE nk.temp_study_ids t
                          SET study_id = a.study_id, is_preferred = true
                          FROM nk.all_ids_studies a
                          WHERE t.source_id = a.source_id
                          AND t.sd_sid = a.sd_sid
                          AND t.study_id is null;";

                conn.Execute(sql_string);
            }
        }
</pre>
</div>
The study ids have now been processed and the various tables generated can be used to support the transfer of the study data.

====Transferring the Study Data====
Transferring the studies themselves is straightforward. A main study record can only have one data source. If a study is new the study record is transferred; if it is a duplicate it is ignored. The selection can be done by linking the source data to the temp_study_ids table where is_preferred is true. The study_id is derived from the temp_study_ids table, but all other fields are from the source. Note that the ExecuteTransferSQL function is used again to 'chunk' the transfer process if too many records are involved. Note also that with this and every other transfer the source records are updated with the datetime of the aggregation process, using the Update_SourceTable_ExportDate procedure. (This update is applied to all studies, whether or not the data is used.)
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void LoadStudies(string schema_name)
        {
            // Insert the study data unless it is already in under another
            // id (i.e. t.is_preferred = false).

            string sql_string = @"INSERT INTO st.studies(id,
                    display_title, title_lang_code, brief_description, bd_contains_html,
                    data_sharing_statement, dss_contains_html,
                    study_start_year, study_start_month, study_type_id, study_status_id,
                    study_enrolment, study_gender_elig_id, min_age, min_age_units_id,
                    max_age, max_age_units_id)
                    SELECT t.study_id,
                    s.display_title, s.title_lang_code, s.brief_description, s.bd_contains_html,
                    s.data_sharing_statement, s.dss_contains_html,
                    s.study_start_year, s.study_start_month, s.study_type_id, s.study_status_id,
                    s.study_enrolment, s.study_gender_elig_id, s.min_age, s.min_age_units_id,
                    s.max_age, s.max_age_units_id
                    FROM nk.temp_study_ids t
                    INNER JOIN " + schema_name + @".studies s
                    on t.sd_sid = s.sd_sid
                    WHERE t.is_preferred = true ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "studies", "new studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " studies, new studies");

            // Note that the statement below also updates studies that are not added as new
            // (because they equate to existing studies) but which were new in the
            // source data.

            db.Update_SourceTable_ExportDate(schema_name, "studies");
        }
</pre>
</div>
For the study attributes things are more complex. If a study is new to the system all its attributes are simply added to the system. If the study is a duplicate, however, its attributes may or may not already be in the system. The new data belonging to these studies must therefore be compared to the existing data for these studies, using a few key fields, and only added if no match is found. In the code below, using study identifiers as an example, some field list strings are first established to simplify the later statements. The source field list needs to include a reference to the source table ('s.'). Then there is an initial transfer of all the attributes of the new studies (k.is_preferred = true).<br/>
For studies that have been added already, two temporary datasets are established. The first, nk.source_data, has all the attributes for these studies that exist in the source data. The second, nk.existing_data, has all the attributes for these studies that are already in the database. Note the use of the lightweight nk.existing_studies table, created in the id processing phase, to simplify these steps, and to provide the mdr study_ids for these records. The records that need to be inserted are those in the source data that do not exist in the existing data, identified using a left join on key fields. In this case the identifier_type_id and identifier_value fields are used, but this is obviously table specific. As usual the SQL statement is passed to ExecuteTransferSQL to carry out the actual insertion, to allow chunking of data. The temporary tables are then removed and the export date-time in the source data is updated.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void LoadStudyIdentifiers(string schema_name)
        {
            string destination_field_list = @"study_id,
                    identifier_type_id, identifier_org_id, identifier_org,
                    identifier_value, identifier_date, identifier_link ";

            string source_field_list = @"
                    s.identifier_type_id, s.identifier_org_id, s.identifier_org,
                    s.identifier_value, s.identifier_date, s.identifier_link ";

            // For 'preferred' study Ids add all identifiers.

            string sql_string = @"INSERT INTO st.study_identifiers(" + destination_field_list + @")
                    SELECT k.study_id, " + source_field_list + @"
                    FROM nk.temp_study_ids k
                    INNER JOIN " + schema_name + @".study_identifiers s
                    on k.sd_sid = s.sd_sid
                    WHERE k.is_preferred = true ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "study_identifiers", "new studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " study_identifiers, new studies");

            // For 'existing studies' study Ids add only new identifiers.

            sql_string = @"DROP TABLE IF EXISTS nk.source_data;
                          CREATE TABLE nk.source_data as
                          SELECT es.study_id, d.*
                          FROM " + schema_name + @".study_identifiers d
                          INNER JOIN nk.existing_studies es
                          ON d.sd_sid = es.sd_sid";
            db.ExecuteSQL(sql_string);

            sql_string = @"DROP TABLE IF EXISTS nk.existing_data;
                          CREATE TABLE nk.existing_data as
                          SELECT es.sd_sid, es.study_id,
                          c.identifier_type_id, c.identifier_value
                          FROM st.study_identifiers c
                          INNER JOIN nk.existing_studies es
                          ON c.study_id = es.study_id;";
            db.ExecuteSQL(sql_string);

            sql_string = @"INSERT INTO st.study_identifiers(" + destination_field_list + @")
                          SELECT s.study_id, " + source_field_list + @"
                          FROM nk.source_data s
                          LEFT JOIN nk.existing_data e
                          ON s.sd_sid = e.sd_sid
                          AND s.identifier_type_id = e.identifier_type_id
                          AND s.identifier_value = e.identifier_value
                          WHERE e.study_id is null ";

            res = db.ExecuteTransferSQL(sql_string, schema_name, "study_identifiers", "existing studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " study_identifiers, existing studies");

            db.ExecuteSQL("DROP TABLE IF EXISTS nk.source_data;");
            db.ExecuteSQL("DROP TABLE IF EXISTS nk.existing_data;");

            db.Update_SourceTable_ExportDate(schema_name, "study_identifiers");
        }
</pre>
</div>
This process is repeated for all study attribute types. It is true that the steps of creating existing and source data sets, and then doing the insert, could all be rolled up into one SQL statement. It is a lot clearer, however, to separate out the steps and use temporary tables.

====Aggregating Object Data====
Transfer of object data also requires an initial 'id processing' phase, followed by the transfer itself. The id processing, however, differs according to whether the source is study based, where all data objects are already related to studies, or object based, where the study-object links have to be constructed (the only current object based source is PubMed).<br/>
Note that - at least for the moment - there is '''not''' a requirement to establish an object-object links table, listing those objects that are duplicated in different sources, and thus which present with different ids. For study based sources the assumption is that almost all objects listed will be unique. This seems to be justified, and there is in any case the ability to check this later in the processing. But there is also a practical element here - without PIDs, and with generated data object ids that are always source specific, it is extremely difficult to unambiguously identify duplicates even if they did occur. Journal articles in the system are duplicated, of course, but they always present with the ''same'' genuine identifier (the PubMed ID) and so present a different challenge - see '''[[Processing PubMed Data|PubMed Data]]''' for details. As more data sources are added, in particular more object based data sources, as well as journal articles not listed in PubMed, it will be necessary to revisit this issue, and almost certainly try to find ways of identifying duplicate objects across different sources.<br/>
In study based sources the source data objects will already carry a reference to the sd_sid study identifier. But that identifier will need 'translating' into the mdr integer identifier when the object is transferred to the central database, and the data object itself will also need an integer id assigned. As with the study data, the first step is to establish a temporary table that will hold the object identifier data - and in this case the associated study identifier - which is called temp_object_ids. The structure of this table is analogous to the permanent nk.all_ids_data_objects table, to which the data will later be transferred, after processing. A small nk.temp_objects_to_add table is also created at the same time.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void SetUpTempObjectIdsTables()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
                      CREATE TABLE IF NOT EXISTS nk.temp_object_ids(
                        object_id               INT
                      , source_id               INT
                      , sd_oid                  VARCHAR
                      , parent_study_source_id  INT
                      , parent_study_sd_sid     VARCHAR
                      , parent_study_id         INT
                      , is_preferred_study      BOOLEAN
                      , datetime_of_data_fetch  TIMESTAMPTZ
                      ); ";
                conn.Execute(sql_string);

                sql_string = @"DROP TABLE IF EXISTS nk.temp_objects_to_add;
                      CREATE TABLE IF NOT EXISTS nk.temp_objects_to_add(
                        object_id               INT
                      , sd_oid                  VARCHAR
                      );
                      CREATE INDEX temp_objects_to_add_sd_oid on nk.temp_objects_to_add(sd_oid);";
                conn.Execute(sql_string);
            }
        }
</pre>
</div>
The data object and study source ids are then retrieved from the source database, along with the datetime_of_data_fetch value. The source id is incorporated twice, as both the object and study source id. The collection of ObjectId objects that results is stored in the temporary temp_object_ids table.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public IEnumerable<ObjectId> FetchObjectIds(int source_id)
        {
            string conn_string = repo.GetConnString(source_id);
            using (var conn = new NpgsqlConnection(conn_string))
            {
                string sql_string = @"select " + source_id.ToString() + @" as source_id, "
                          + source_id.ToString() + @" as parent_study_source_id,
                          sd_oid, sd_sid as parent_study_sd_sid, datetime_of_data_fetch
                          from ad.data_objects";

                return conn.Query<ObjectId>(sql_string);
            }
        }

        public ulong StoreObjectIds(PostgreSQLCopyHelper<ObjectId> copyHelper, IEnumerable<ObjectId> entities)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();
                return copyHelper.SaveAll(conn, entities);
            }
        }
</pre>
</div>
The data in the temporary table is then updated with the integer mdr version of the study's id, and its is_preferred status. Any records that cannot be matched to an existing study (though this should not happen - it is included only to deal with possible errors) are deleted. At this point it would be possible to check the data object and its attributes for possible duplication against a pre-existing object for this study. This would obviously not apply to objects that are directly embedded in the data source - e.g. a trial registry entry, or a repository study 'landing page' - but it could happen that a document or dataset could be duplicated. Unfortunately the names assigned to different data objects are generally not consistent. Comparison could be made on type and version, though the version is very rarely given, and / or on the URL of the associated instance(s). At the moment, however, the incidence of duplicate data objects appears to be zero, or at least extremely rare, so these checks have not yet been implemented. If they were added, any duplicate data object would be simply removed from the temp_object_ids dataset, so that neither it nor any of its attributes would be added to the aggregate data.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void UpdateObjectsWithStudyIds(int source_id)
        {
            // Update the object parent study_id using the 'correct'
            // value found in the all_ids_studies table

            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_object_ids t
                          SET parent_study_id = s.study_id,
                          is_preferred_study = s.is_preferred
                          FROM nk.all_ids_studies s
                          WHERE t.parent_study_sd_sid = s.sd_sid
                          and t.parent_study_source_id = s.source_id;";
                conn.Execute(sql_string);

                // Drop those link records that cannot be matched

                sql_string = @"DELETE FROM nk.temp_object_ids
                            WHERE parent_study_id is null;";
                conn.Execute(sql_string);
            }
        }
</pre>
</div>
The temporary object id records are then added to the permanent table, with each record gaining an id from the identity column. The object_id is then derived as that id. Because there is no object-object linkage to consider there is no need to go through the more complex process used for studies - but the same strategy and steps are used to allow that process to be inserted later, if and when necessary.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void UpdateAllObjectIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.all_ids_data_objects
                            (source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                            parent_study_id, is_preferred_study, datetime_of_data_fetch)
                            select source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                            parent_study_id, is_preferred_study, datetime_of_data_fetch
                            from nk.temp_object_ids";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.all_ids_data_objects
                            SET object_id = id
                            WHERE source_id = " + source_id.ToString() + @"
                            and object_id is null;";
                conn.Execute(sql_string);
            }
        }
</pre>
</div>
As an aid to the data transfer process the temp_objects_to_add table is then filled with the data linking the source sd_oid with the new mdr integer object id.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void FillObjectsToAddTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.temp_objects_to_add
                            (object_id, sd_oid)
                            SELECT distinct object_id, sd_oid
                            FROM nk.all_ids_data_objects
                            WHERE source_id = " + source_id.ToString();
                conn.Execute(sql_string);
            }
        }
</pre>
</div>
The data transfer for objects is much more straightforward than for studies, as there are no 'preferred' versions of objects to consider. The code for transferring the data objects themselves is given below, with the object_id being drawn from the temp_objects_to_add table and the rest of the data from the source table. The ExecuteTransferSQL function is used to carry out the transfer, allowing chunking of records if it is needed, and again the source records are updated with the date-time of the transfer. Note that the link to the 'parent' study has disappeared in the mdr database. That link is retained, instead, in the all_ids_data_objects table.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void LoadDataObjects(string schema_name)
        {
            string sql_string = @"INSERT INTO ob.data_objects(id,
                    display_title, version, doi, doi_status_id, publication_year,
                    object_class_id, object_type_id, managing_org_id, managing_org,
                    lang_code, access_type_id, access_details, access_details_url,
                    url_last_checked, eosc_category, add_study_contribs,
                    add_study_topics)
                    SELECT t.object_id,
                    s.display_title, s.version, s.doi, s.doi_status_id, s.publication_year,
                    s.object_class_id, s.object_type_id, s.managing_org_id, s.managing_org,
                    s.lang_code, s.access_type_id, s.access_details, s.access_details_url,
                    s.url_last_checked, s.eosc_category, s.add_study_contribs,
                    s.add_study_topics
                    FROM " + schema_name + @".data_objects s
                    INNER JOIN nk.temp_objects_to_add t
                    on s.sd_oid = t.sd_oid ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "data_objects", "");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " data_objects");

            db.Update_SourceTable_ExportDate(schema_name, "data_objects");
        }
</pre>
</div>
For object attributes the process is again much more straightforward than for studies. Because all objects that are transferred are assumed to be unique, a single data transfer call is all that is required for their attributes. As an example the code for object instances is given below.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void LoadObjectInstances(string schema_name)
        {
            string sql_string = @"INSERT INTO ob.object_instances(object_id, 
            instance_type_id, repository_org_id, repository_org,
            url, url_accessible, url_last_checked, resource_type_id,
            resource_size, resource_size_units, resource_comments)
            SELECT t.object_id,
            instance_type_id, repository_org_id, repository_org,
            url, url_accessible, url_last_checked, resource_type_id,
            resource_size, resource_size_units, resource_comments
            FROM " + schema_name + @".object_instances s
                    INNER JOIN nk.temp_objects_to_add t
                    on s.sd_oid = t.sd_oid ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "object_instances", "");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " object_instances");

            db.Update_SourceTable_ExportDate(schema_name, "object_instances");
        }
</pre>
</div>
The description above only applies to study based sources. For PubMed, the links between the PubMed data and the studies are first identified. Two sources are used - the 'bank id' data within the PubMed data itself, referring to trial registry ids, and the 'source id' data in the study based sources, where references are provided to relevant papers. These two sets of data are combined and de-duplicated, and two final sets of data are created: the distinct list of PubMed data objects, and the list of links between those objects and studies. These steps require considerable additional processing, which is described on the '''[[Processing PubMed Data|PubMed Data]]''' page.
  
====Creating the Core Tables====
The -C option copies the data from the aggregating schemas (st, ob and nk) to the core schema without any processing, other than creating the provenance strings for both studies and data objects. The latter may be composite if more than one source was involved.<br/>
The process involved is therefore very simple. The core tables are dropped and then re-created, and the study and object data are simply transferred across from the st and ob schemas, respectively. The IDs created within the aggregation process are also copied - there is no identifier generation in this step. The various link tables are not copied, but the basic link between studies and objects is extracted, to become the study_object_links table. The core database therefore holds no record of the source, or source identifiers, of studies and objects, '''''except''''' within the provenance strings.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void LoadStudyObjectLinks()
        {
            string sql_string = @"INSERT INTO core.study_object_links(id,
            study_id, object_id)
            SELECT s.id, s.parent_study_id, s.object_id
            FROM nk.all_ids_data_objects s ";

            int res = db.ExecuteCoreTransferSQL(sql_string, "nk.all_ids_data_objects");
        }
</pre>
</div>
The provenance strings indicate the source(s) of the data and the date(s) on which it was last read. The source for these strings lies in the link schema id tables - nk.all_ids_studies and nk.all_ids_data_objects - in the datetime_of_data_fetch field. The data in that field ultimately derives from the most recent download process for that data. This field is stored in the all_ids_ tables because it is a function of a specific source. The provenance string combines the data from each source, to give the date-time the data was obtained from every relevant source. For studies these may be multiple sources; for objects (at the moment) there is always one. The code is shown below. The Postgres string_agg function is used on the data source name(s) and time(s) associated with the studies, with the only variation being that if the source data was accessed via a WHO ICTRP file this is acknowledged in the provenance string.
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void GenerateStudyProvenanceData()
        {
            string sql_string = "";
            sql_string = @"CREATE table nk.temp_study_provenance
                    as
                    select s.study_id,
                    'Data retrieved from ' || string_agg(d.repo_name || ' at ' || to_char(s.datetime_of_data_fetch, 'HH24:MI, dd Mon yyyy'), ', ' ORDER BY s.datetime_of_data_fetch) as provenance
                    from nk.all_ids_studies s
                    inner join
                        (select t.id,
                          case
                            when p.uses_who_harvest = true then t.default_name || ' (via WHO ICTRP)'
                            else t.default_name
                          end as repo_name
                        from context_ctx.data_sources t
                        inner join mon_sf.source_parameters p
                        on t.id = p.id) d
                    on s.source_id = d.id
                    group by study_id ";
            db.ExecuteSQL(sql_string);

            sql_string = @"update core.studies s
                    set provenance_string = tt.provenance
                    from nk.temp_study_provenance tt
                    where s.id = tt.study_id ";
            db.ExecuteProvenanceSQL(sql_string, "core.studies");

            sql_string = @"drop table nk.temp_study_provenance;";
            db.ExecuteSQL(sql_string);
        }
</pre>
</div>
Some examples of the strings that result from this are shown below.
* ''Data retrieved from Iranian Registry of Clinical Trials (via WHO ICTRP) at 21:19, 22 Jul 2020''<br/>
* ''Data retrieved from Registro Brasileiro de Ensaios Clínicos (via WHO ICTRP) at 13:43, 04 Oct 2020, Deutschen Register Klinischer Studien (via WHO ICTRP) at 13:45, 04 Oct 2020, EU Clinical Trials Register at 11:51, 06 Oct 2020, ClinicalTrials.gov at 18:28, 09 Oct 2020''<br/>
* ''Data retrieved from ISRCTN at 22:41, 18 Apr 2020''<br/>
* ''Data retrieved from Australian New Zealand Clinical Trials Registry (via WHO ICTRP) at 13:51, 04 Oct 2020, EU Clinical Trials Register at 12:43, 06 Oct 2020''<br/>
The generation and format of provenance strings for data objects is very similar, but there is only ever one source. The creation of provenance strings is the only processing that occurs during the transfer of the data to the core tables.
 
 
====Creating the JSON Data====
 
====Creating the JSON Data====
 +
The final stages of the aggregation process involve the creation of JSON equivalents of the core data, for both studies and objects, and the (optional) generation of that json into files, followed by the (optional) compression of the JSON files into zipped archives for ease of distribution.<br/>
 +
The JSON that is created, using the -J flag, is stored two small tables , one for studies the other for data objects, that hold only the id and the JSON string (as Postgres data type JSON). The process in each case consists of creating a C# object that matches the ECRIN metadata schema exactly and filling it that with the values from the database, one study or one data object at a time. The objects and their properties are shown below. Constituent objects (lookup, age_param, study_topic, dataset_consent, object_title etc., are all defined to match the metadata schema.
 +
<div style="font-family: monospace; font-size: 13px" >
 +
<pre>
 +
    public class JSONStudy
 +
    {
 +
        public string file_type { get; set; }
 +
        public int id { get; set; }
 +
        public string display_title { get; set; }
 +
        public text_block brief_description { get; set; }
 +
        public text_block data_sharing_statement { get; set; }
 +
        public lookup study_type { get; set; }
 +
        public lookup study_status { get; set; }
 +
        public int? study_enrolment { get; set; }
 +
        public lookup study_gender_elig { get; set; }
 +
        public age_param min_age { get; set; }
 +
        public age_param max_age { get; set; }
 +
        public string provenance_string { get; set; }
 +
 +
        public List<study_identifier> study_identifiers { get; set; }
 +
        public List<study_title> study_titles { get; set; }
 +
        public List<study_topic> study_topics { get; set; }
 +
        public List<study_feature> study_features { get; set; }
 +
        public List<study_relationship> study_relationships { get; set; }
 +
        public List<int> linked_data_objects { get; set; }
 +
    }
 +
 +
    public class JSONDataObject
 +
    {
 +
        public string file_type { get; set; }
 +
        public int id { get; set; }
 +
        public string doi { get; set; }
 +
        public string display_title { get; set; }
 +
        public string version { get; set; }
 +
        public lookup object_class { get; set; }
 +
        public lookup object_type { get; set; }
 +
        public int? publication_year { get; set; }
 +
        public lookup managing_organisation { get; set; }
 +
        public string lang_code { get; set; }
 +
        public lookup access_type { get; set; }
 +
        public object_access access_details { get; set; }
 +
        public int? eosc_category { get; set; }
 +
        public string provenance_string { get; set; }
 +
 +
        public record_keys dataset_record_keys { get; set; }
 +
        public deidentification dataset_deident_level { get; set; }
 +
        public consent dataset_consent { get; set; }
 +
 +
        public List<object_instance> object_instances { get; set; }
 +
        public List<object_title> object_titles { get; set; }
 +
        public List<object_contributor> object_contributors { get; set; }
 +
        public List<object_date> object_dates { get; set; }
 +
        public List<object_topic> object_topics { get; set; }
 +
        public List<object_description> object_descriptions { get; set; }
 +
        public List<object_identifier> object_identifiers { get; set; }
 +
        public List<object_right> object_rights { get; set; }
 +
        public List<object_relationship> object_relationships { get; set; }
 +
        public List<int> linked_studies { get; set; }
 +
    }
 +
</pre>
 +
</div>
 +
Once the object is created it is serialised to a json string and written to the table. The only difficulty is that to store it as a JSON datatype it is necessary to explicitly define the parameter as being of type json, making the call to the Dapper micro-ORM slightly more complex than normal.
 +
<div style="font-family: monospace; font-size: 13px" >
 +
<pre>
 +
        public void StoreJSONStudyInDB(int id, string study_json)
 +
        {
 +
            using (NpgsqlConnection Conn = new NpgsqlConnection(connString))
 +
            {
 +
                Conn.Open();
 +
 +
                // To insert the string into a json field the parameters for the
 +
                // command have to be explicitly declared and typed
 +
 +
                using (var cmd = new NpgsqlCommand())
 +
                {
 +
                  cmd.CommandText = "INSERT INTO core.studies_json (id, json) VALUES (@id, @p)";
 +
                  cmd.Parameters.Add(new NpgsqlParameter("@id", NpgsqlDbType.Integer) {Value = id });
 +
                  cmd.Parameters.Add(new NpgsqlParameter("@p", NpgsqlDbType.Json) {Value = study_json });
 +
                  cmd.Connection = Conn;
 +
                  cmd.ExecuteNonQuery();
 +
                }
 +
                Conn.Close();
 +
          }
 +
      }
 +
</pre>
 +
</div>

Latest revision as of 18:08, 5 March 2022

Last updated: 05/03/2022 - edit still in progress

Overview

The aggregation program takes all the data within the ad tables in the various source databases and loads it to central tables within the mdr database, ultimately transferring it to a set of ‘core’ tables. The core tables hold all of the MDR data in a structure that exactly reflects the ECRIN metadata schema, and it is the core tables, or the JSON derived from them, that are interrogated by the web portal when users search and filter the MDR data. The aggregation process takes place weekly and takes about an hour to run.
Some studies are to be found in multiple source systems, and some objects (mainly published papers) may be linked to more than one study. The aggregation process needs to de-duplicate these multiple entries, so that the core system contains only one record for each study or object entity. At the same time it needs, as far as possible, to aggregate the different data points and links for each entity, even when they originate in different source systems.
Within the mdr tables all studies and data objects are assigned an integer accession number, their ‘ECRIN ID’, an internal, persistent ID that is quite distinct from any of the original IDs assigned by, for example, a trial registry, data repository, funder, sponsor, or bibliographic system. Studies have a 7 figure ID, beginning with 2000001, objects an 8 digit number beginning with 10000001. These IDs are used internally by the MDR system to link studies with their attributes, objects with their attributes, and studies and objects to each other. They are not normally exposed to the user.
While the internal ECRIN IDs are persistent, the aggregated data is not. On each aggregation almost all the tables in the mdr database are dropped and then recreated, empty (the exceptions are the ‘master lists’ of study and object IDs). During the aggregation process the different sources are considered in a fixed sequence, and data is added from each in turn. Duplicate studies and objects are identified within this process and managed appropriately. Studies and objects that already have an ECRIN ID, from previous aggregations, are matched once more to that ID. New studies and objects, being aggregated for the first time, are assigned new IDs, in numerical sequence. There is no attempt to edit the aggregated data ‘in place’ with new versions of data replacing old. Re-aggregating all the available data from the beginning takes longer, but it is a much simpler process and therefore easier to maintain.
There are 4 sets of tables in the mdr database, each corresponding to a database schema:

  • The st schema tables are used to hold all the study related data imported from the source databases, but de-duplicated so that each study only appears once in the system,
  • The ob schema tables are used to hold all object related data, similarly imported from the sources and de-duplicated, so that each object is unique,
  • The nk schema tables are used to hold all data related to links between studies and objects, and between studies and studies, and the master lists of study and object ids. Those master lists include the data that links the IDs of studies and objects in the source databases with the ECRIN IDs used within the mdr database.
  • The core schema tables hold a full set of all study and object data, and a single table with the study-object links. Data in the core is derived from that in the other three schemas, with some slight modifications so that it matches the ECRIN metadata schema more exactly (e.g. provenance data is added), together with some additional data added to better support search functionality.

Although an aggregation can be run at any time it makes most sense to do so after the following sequence of events:
  a)   Downloads are run for all study based data sources to get the local source file collections as up to date as possible.
  b)   Harvests and imports are run for all study based sources to get the study data as up to date as possible, in each of the study source databases.
  c)   Downloads are run for all object based data sources to get the local source file collections as up to date as possible. In practice ‘object based data sources’ currently means PubMed. This process has to occur after b) as the imported study data is used to identify some of the linked PubMed records.
  d)   Harvests and imports are run for all object based sources to get data object data as up to date as possible, in each of the object source databases (currently PubMed).
In other words the aggregation module should be scheduled to run after the application of all other modules – it is the final stage in the weekly update cycle.
N.B. The aggregation code can be found at https://github.com/ecrin-github/DataAggregator

Parameters

The program is a console application, to enable it to be more easily scheduled. There are 4 flag type parameters:
  -D: which indicates that the aggregating data transfer should take place, from the source ad tables to the tables in the st (studies), ob (objects) and nk (links) schemas. This is the necessary first step of the aggregation process.
  -C: indicates that the core tables should be created and filled from the aggregate tables, i.e. data is combined from the st, ob and nk schemas in to a single, simpler core schema.
  -S: collects statistics about the existing data, from both the source ad tables and the central aggregated tables.
  -J: indicates that the core data be used to create JSON versions of the data within the core database.
It makes little sense to trigger the other processes without an initial call using -D.
A -C call would then normally follow, then -S and finally -J. The system can cope with multiple parameters, and applies them in the order given, e.g. -D -C -S -J. The program is currently scheduled to run 4 times, however, each time with a single parameter, in the order described.
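Assuming the console executable keeps the name of its repository (DataAggregator, linked above) - the name and scheduling mechanism are assumptions here, only the flags are defined by the program - a typical weekly sequence of calls would therefore look like this:

      DataAggregator -D     (aggregate ad data into the st, ob and nk schemas)
      DataAggregator -C     (build and fill the core tables)
      DataAggregator -S     (record summary statistics in the mon database)
      DataAggregator -J     (create JSON versions of the core data)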

The Aggregation Process

The Overall Sequence

The aggregation process begins with a call to the program with the -D flag.
The system first drops and recreates almost all tables in the st, ob and nk schemas. The main exceptions are the study ‘master list’: nk.study_ids, and the data object ‘master list’: nk.data_object_ids.
The program then interrogates the mon database to retrieve the set of 'Source' objects, each one corresponding to a source database, and runs through those sources to obtain a list of the 'other registry ids' in each database. In other words it builds up a list of all studies that are found in more than one data source. About 25,000 studies are registered in 2 registries and about another 1,000 are registered in 3 or more registries. The details of the process are provided in Study-study links, but the outcome is a list of ordered study pairs, each linking a duplicated study to the same study in its 'more preferred' data source (see below).
Some studies (several hundred) have more complex relationships - for example they are registered in two registries, but in one of those are entered as a group of related studies rather than having a simple 1-to-1 relationship with the other registry study. These are removed from the linked study table but added later as study-study relationship data. Again the details are in Study-study links. Armed with the study linkage data, and if not in 'test mode' (when only the test data is accessed), the system can begin the data transfer to the central mdr database, by instantiating a DataTransferBuilder object for each source and issuing the top level data transfer commands on that object.
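The resulting links table has a simple four column structure. The sketch below is inferred from the way the table is queried later on this page; the actual definition may include additional housekeeping columns.

        CREATE TABLE nk.study_study_links(
            source_id              INT         -- source of the 'less preferred' study record
          , sd_sid                 VARCHAR     -- the study's id within that source
          , preferred_source_id    INT         -- the 'more preferred' source
          , preferred_sd_sid       VARCHAR     -- the same study's id within the preferred source
        );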
Each database source is considered in a fixed 'order of preference'. The order of preference is provided by an integer number associated with each source – the lower the number the ‘more preferred’ that source. ‘More preferred’ sources are considered to be richer or more reliable sources of data, in general, than ‘less preferred’ sources. They are therefore considered first, with additional data from the ‘less preferred’ sources only added if applicable and identified as new. Currently the ‘most preferred’ source is ClinicalTrials.gov.
For each source:

  • The ad schema of the source database is linked as a foreign table wrapper (FTW) in the mdr database. The schema name returned from this process is always <database name>_ad, e.g. ctg_ad, isrctn_ad, etc. This allows data from the source database to be easily accessed by, and transferred to, the central mdr database.

If the source has study tables (i.e. is a study based source):

  • The study ids are retrieved from the source database, and processed to see which are new and which correspond to studies that already exist in the ‘study id master list’. Matched studies are assigned the ECRIN ID that they had previously.
  • New studies are checked to ensure that they are not existing studies coming from a new ‘source’, e.g. studies that have been registered in an additional registry. If so they receive the ECRIN ID of the existing study. If a study is completely new to the system it is assigned a new ECRIN ID.
  • The data for the studies and their attributes is added to the st schema tables, with the ECRIN study ID applied to all data.
  • The object data is retrieved and again checked against the master list of object ECRIN Ids to see if it matches an object already known to the system. If not, it is then checked to see if it is (probably) the same as an existing object, with the exact comparison method depending on the object type. If identified as a duplicate it is given the same ECRIN object Id as the original object, otherwise it is added to the system as a new object with a new ECRIN object ID.

If it is an object based data source (currently only PubMed),

  • the object ids are retrieved and linked to studies using source-specific processes (described elsewhere).

In both cases

  • The data for the data objects and their attributes is then transferred to the ob schema tables. The object id master list includes the ECRIN ID of the linked study, so this table also holds the study-object link data.
  • The foreign table wrapper for the source database is dropped.
      foreach (Source source in sources)
      {
           string source_conn_string = _credentials.GetConnectionString(source.database_name, opts.testing);
           source.db_conn = source_conn_string;
           string schema_name = "";
           if (opts.testing)
           {
               schema_name = "ad";     // In testing environment source schema name will simply be 'ad'
               _test_repo.TransferADTableData(source);
           }
           else
           {
                // in normal non-testing environment schema name is the ad tables in a FTW - i.e. <db name>_ad
                schema_name = _mon_repo.SetUpTempFTW(_credentials, source.database_name, dest_conn_string);
           }

           DataTransferBuilder tb = new DataTransferBuilder(source, schema_name, dest_conn_string, _logger);
           _logger_helper.LogStudyHeader(false, "Transferring data for " + source.database_name);
           if (source.has_study_tables)
           {
                  _logger_helper.LogHeader("Process study Ids");
                  tb.ProcessStudyIds();
                  _logger_helper.LogHeader("Transfer study data");
                  num_studies_imported += tb.TransferStudyData();
                  _logger_helper.LogHeader("Process object Ids");
                  tb.ProcessStudyObjectIds();
           }
           else
           {
                  tb.ProcessStandaloneObjectIds(sources, _credentials, opts.testing);  // for now, just PubMed
           }
           _logger_helper.LogHeader("Transfer object data");
           num_objects_imported += tb.TransferObjectData();

           _mon_repo.DropTempFTW(source.database_name, dest_conn_string);
       }

Once the st and ob tables have all been re-populated, and the study and object Id tables have been updated, the Aggregation system can be called again with the -C parameter.
The -C option copies the data from the aggregating schema (st, ob, and nk) to the core schema without any processing, other than creating the provenance strings for both studies and data objects. The latter may be composite if more than one source was involved.
The system also regenerates a study_search table within the core schema, which brings together into a single table the main attributes of studies that are used for searching and filtering, making those processes as simple as possible.
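The exact column list of the study_search table is not reproduced here; purely as an illustration, its regeneration is essentially of the following form, with the real statement bringing in further attributes used by the portal's filters (the columns shown are assumptions based on the study fields described elsewhere on this page).

        -- illustrative sketch only: the real statement includes more search related columns
        DROP TABLE IF EXISTS core.study_search;
        CREATE TABLE core.study_search AS
        SELECT id, display_title, study_start_year, study_type_id,
               study_status_id, min_age, max_age
        FROM core.studies;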
The program can then be called with the -S flag to construct a set of ‘statistical’ records, which are added to the corresponding tables in the mon (monitoring) database. These include a summary of the numbers of studies and objects transferred and then aggregated, and a breakdown of the numbers of each type of data object in the system.
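For example, the breakdown by object type can be obtained with a simple aggregate query of the kind below (a sketch rather than the exact statement used):

        SELECT object_type_id, count(*) AS number_of_objects
        FROM ob.data_objects
        GROUP BY object_type_id
        ORDER BY count(*) DESC;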
Finally the Aggregation program is called with the -J flag. This recreates JSON versions of the study and object data, which are stored in tables within the core schema.

Processing Study Ids

For each source, the initial step is to set up a temporary table to hold the study ids (temp_study_ids). The next step is then to fetch those study ids, as well as the datetime_of_data_fetch field, from the source data, also including the source id in the collection of StudyId objects created. This data is transferred to the temporary table. Note that the temporary table has the same structure as the permanent all_ids_studies table. Later on the temp_study_ids data will be transferred to that permanent table - but not before some records have been modified.

        public void SetUpTempStudyIdsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS nk.temp_study_ids;
                        CREATE TABLE nk.temp_study_ids(
                        study_id                 INT
                      , source_id                INT
                      , sd_sid                   VARCHAR
                      , is_preferred             BOOLEAN
                      , datetime_of_data_fetch   TIMESTAMPTZ
                      ); ";
                conn.Execute(sql_string);
            }
        }

        public IEnumerable<StudyId> FetchStudyIds(int source_id, string source_conn_string)
        {
            using (var conn = new NpgsqlConnection(source_conn_string))
            {
                string sql_string = @"select " + source_id.ToString() + @" as source_id, 
                          sd_sid, datetime_of_data_fetch
                          from ad.studies";
                return conn.Query<StudyId>(sql_string);
            }
        }

        public ulong StoreStudyIds(PostgreSQLCopyHelper<StudyId> copyHelper, IEnumerable<StudyId> entities)
        {
            // stores the study id data in a temporary table
            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();
                return copyHelper.SaveAll(conn, entities);
            }
        }
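The StudyId class and the PostgreSQLCopyHelper mapping referred to above are defined elsewhere in the code. A sketch of what the class might look like is given below - the property names are assumed to simply mirror the columns of nk.temp_study_ids that they are copied into, with the copy helper mapping each property to the corresponding column for the bulk insert.

        // Sketch of the StudyId data transfer class - property names assumed
        // to mirror the nk.temp_study_ids columns they are copied into.
        // study_id and is_preferred are filled in later, within the database.
        public class StudyId
        {
            public int source_id { get; set; }
            public string sd_sid { get; set; }
            public DateTime? datetime_of_data_fetch { get; set; }
        }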

The data in the temporary table is checked against the study_study_links table. If any study id corresponds to a study already in the system, it will appear in the left hand side of the study_study_links table (source_id and sd_sid). The corresponding preferred id, and source id, will be in the right hand side of this table (preferred_source_id and preferred_sd_sid). That preferred source_id / sd_sid will also exist in the all_ids_studies table. This is because the sources are aggregated 'most preferred first', so the more preferred version can be guaranteed to be present. That source_id / sd_sid combination will have an integer study_id, created when the data was first transferred to the all_ids_studies table, and that value is used to update the corresponding field in the temp_study_ids table. At the same time the study is marked as 'a non-preferred duplicate' by setting the is_preferred field to false.

        public void CheckStudyLinks()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_study_ids t
                           SET study_id = s.study_id, is_preferred = false
                           FROM nk.study_study_links k
                                INNER JOIN nk.all_ids_studies s
                                ON k.preferred_sd_sid = s.sd_sid
                                AND k.preferred_source_id = s.source_id
                           WHERE t.sd_sid = k.sd_sid
                           AND t.source_id =  k.source_id;";
                int res = db.ExecuteSQL(sql_string);
                ...
                ...

In the case of the first and largest source (ClinicalTrials.gov) there will obviously be no existing study Id data and (because it is the most preferred of all the data sources) no matching record on the left hand side of the study links table. The update above therefore has no effect and no records are updated. For later sources some - usually a small majority - of the studies will be updated to include a pre-existing study_id.
A small table - nk.existing_studies - is then created that just lists the sd_sid and study_id values of these 'pre-existing' studies - it will be useful later on.

                ...
                sql_string = @"DROP TABLE IF EXISTS nk.existing_studies;
                               CREATE TABLE nk.existing_studies as 
                                       SELECT sd_sid, study_id
                                       FROM nk.temp_study_ids
                                       WHERE is_preferred = false";
                db.ExecuteSQL(sql_string);

            }
        }

Now the temp_study_ids are imported into the permanent all_ids_studies table. All records are provided with an id value because the id field is an identity column. The newly added records are then updated, so that those that do not already have a study_id are given the identity id as their study_id. At the same time the is_preferred field for these records is set to true, to indicate that this is the first time this study has been added to the system. Because it is usually easier to work with the smaller temp_study_ids table than the all_ids_studies table, the temporary table is 'back-filled' from the permanent data, so that the study_ids that were null there now hold the values in the corresponding permanent table rows. New study records are therefore given an id derived from the identity value in all_ids_studies, while pre-existing studies are given the same id as the pre-existing version.

 
        public void UpdateAllStudyIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Add the new study id records to the all Ids table

                string sql_string = @"INSERT INTO nk.all_ids_studies
                            (study_id, source_id, sd_sid, 
                             datetime_of_data_fetch, is_preferred)
                             select study_id, source_id, sd_sid, 
                             datetime_of_data_fetch, is_preferred
                             from nk.temp_study_ids";

                conn.Execute(sql_string);

                // Where the study_ids are null they can take on the value of the 
                // record id.

                sql_string = @"UPDATE nk.all_ids_studies
                            SET study_id = id, is_preferred = true
                            WHERE study_id is null
                            AND source_id = " + source_id.ToString();

                conn.Execute(sql_string);

                // 'Back-update' the study temp table using the newly created study_ids
                // now all should be done...

                sql_string = @"UPDATE nk.temp_study_ids t
                           SET study_id = a.study_id, is_preferred = true
                           FROM nk.all_ids_studies a
                           WHERE t.source_id = a.source_id
                           AND t.sd_sid = a.sd_sid
                           AND t.study_id is null;";

                conn.Execute(sql_string);
            }
        }

The study ids have now been processed and the various tables generated can be used to support the transfer of the study data.

Transferring the Study Data

Transferring the studies themselves is straightforward. A main study record can only have one data source. If a study is new the study record is transferred. If it is a duplicate it is ignored. The selection can be done by linking the source data to the temp_study_ids table where is_preferred is true. The study_id is derived from the temp_study_ids table, but all other fields are from the source. Note that the ExecuteTransferSQL function is used again to 'chunk' the transfer process if too many records are involved. Note also that with this and every other transfer the source records are updated with the datetime of the aggregation process, using the Update_SourceTable_ExportDate procedure. (Note this update is applied to all studies - whether or not the data is used).

        public void LoadStudies(string schema_name)
        {
            
            // Insert the study data unless it is already in under another 
            // id (i.e. t.is_preferred = false).

            string sql_string = @"INSERT INTO st.studies(id, 
                    display_title, title_lang_code, brief_description, bd_contains_html,
                    data_sharing_statement, dss_contains_html,
                    study_start_year, study_start_month, study_type_id, study_status_id,
                    study_enrolment, study_gender_elig_id, min_age, min_age_units_id,
                    max_age, max_age_units_id)
                    SELECT t.study_id,
                    s.display_title, s.title_lang_code, s.brief_description, s.bd_contains_html,
                    s.data_sharing_statement, s.dss_contains_html,
                    s.study_start_year, s.study_start_month, s.study_type_id, s.study_status_id,
                    s.study_enrolment, s.study_gender_elig_id, s.min_age, s.min_age_units_id,
                    s.max_age, s.max_age_units_id
                    FROM nk.temp_study_ids t
                    INNER JOIN " + schema_name + @".studies s
                    on t.sd_sid = s.sd_sid
                    WHERE t.is_preferred = true ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "studies", "new studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " studies, new studies");

            // Note that the statement below also updates studies that are not added as new
            // (because they equate to existing studies) but which were new in the 
            // source data.

            db.Update_SourceTable_ExportDate(schema_name, "studies");
        }

For the study attributes things are more complex. If a study is new to the system all its attributes are simply added to the system. If the study is a duplicate however, its attributes may or may not already be in the system. The new data belonging to these studies must therefore be compared to the existing data for these studies, using a few key fields, and only added if no match is found. In the code below, using study identifiers as an example, some field list strings are first established to simplify the later statements. The source field list needs to include a reference to the source table ('s.'). Then there is an initial transfer of all the attributes of the new studies (k.is_preferred = true).
For studies that have been added already, two temporary datasets are established. The first, nk.source_data, has all the attributes for these studies that exist in the source data. The second, nk.existing_data, has all the attributes for these studies that are already in the database. Note the use of the lightweight nk.existing_studies table, created in the id processing phase, to simplify these steps, and to provide the mdr study_ids for these records. The records that need to be inserted are those in the source data that do not already appear in the existing data; a left join on key fields is used to identify them. In this case the identifier_type_id and identifier_value fields are used, but this is obviously table specific. As usual the SQL statement is passed to ExecuteTransferSQL to carry out the actual insertion, to allow chunking of data. The temporary tables are then removed and the export date-time in the source data is updated.

        public void LoadStudyIdentifiers(string schema_name)
        {
            string destination_field_list = @"study_id, 
                    identifier_type_id, identifier_org_id, identifier_org, 
                    identifier_value, identifier_date, identifier_link ";

            string source_field_list = @" 
                    s.identifier_type_id, s.identifier_org_id, s.identifier_org, 
                    s.identifier_value, s.identifier_date, s.identifier_link ";

            // For 'preferred' study Ids add all identifiers.

            string sql_string = @"INSERT INTO st.study_identifiers(" + destination_field_list + @")
                    SELECT k.study_id, " + source_field_list + @"
                    FROM nk.temp_study_ids k
                    INNER JOIN " + schema_name + @".study_identifiers s
                    on k.sd_sid = s.sd_sid
                    WHERE k.is_preferred = true ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "study_identifiers", "new studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " study_identifiers, new studies");

            // For 'existing studies' study Ids add only new identifiers.

            sql_string = @"DROP TABLE IF EXISTS nk.source_data;
                           CREATE TABLE nk.source_data as 
                           SELECT es.study_id, d.* 
                           FROM " + schema_name + @".study_identifiers d
                           INNER JOIN nk.existing_studies es
                           ON d.sd_sid = es.sd_sid";
            db.ExecuteSQL(sql_string);

            sql_string = @"DROP TABLE IF EXISTS nk.existing_data;
                           CREATE TABLE nk.existing_data as 
                           SELECT es.sd_sid, es.study_id, 
                           c.identifier_type_id, c.identifier_value 
                           FROM st.study_identifiers c
                           INNER JOIN nk.existing_studies es
                           ON c.study_id = es.study_id;";
            db.ExecuteSQL(sql_string);

            sql_string = @"INSERT INTO st.study_identifiers(" + destination_field_list + @")
                           SELECT s.study_id, " + source_field_list + @" 
                           FROM nk.source_data s
                           LEFT JOIN nk.existing_data e
                           ON s.sd_sid = e.sd_sid
                           AND s.identifier_type_id = e.identifier_type_id
                           AND s.identifier_value = e.identifier_value
                           WHERE e.study_id is null ";

            res = db.ExecuteTransferSQL(sql_string, schema_name, "study_identifiers", "existing studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " study_identifiers, existing studies");

            db.ExecuteSQL("DROP TABLE IF EXISTS nk.source_data;");
            db.ExecuteSQL("DROP TABLE IF EXISTS nk.existing_data;");

            db.Update_SourceTable_ExportDate(schema_name, "study_identifiers");
        }

This process is repeated for all study attribute types. It is true that the steps of creating existing and source data sets, and then doing the insert, could all be rolled up into one SQL statement. It is a lot clearer, however, to separate out the steps and use temporary tables.
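For comparison, a hypothetical single-statement version of the 'existing studies' insert for identifiers is sketched below; here <schema_name> stands for the FTW schema of the source being processed, and this is not the code actually used.

        INSERT INTO st.study_identifiers(study_id, identifier_type_id, identifier_org_id,
               identifier_org, identifier_value, identifier_date, identifier_link)
        SELECT es.study_id, s.identifier_type_id, s.identifier_org_id,
               s.identifier_org, s.identifier_value, s.identifier_date, s.identifier_link
        FROM <schema_name>.study_identifiers s
        INNER JOIN nk.existing_studies es
        ON s.sd_sid = es.sd_sid
        WHERE NOT EXISTS
            (SELECT 1 FROM st.study_identifiers e
             WHERE e.study_id = es.study_id
             AND e.identifier_type_id = s.identifier_type_id
             AND e.identifier_value = s.identifier_value);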

Aggregating Object Data

Transfer of object data also requires an initial 'id processing' phase, followed by the transfer itself. The id processing, however, differs according to whether the source is study based, where all data objects are already related to studies, or object based, where the study-object links have to be constructed (the only current object based source is PubMed).
Note that - at least for the moment - there is not a requirement to establish an object-object links table, listing those objects that are duplicated in different sources, and thus which present with different ids. For study based sources the assumption is that almost all objects listed will be unique. This seems to be justified, and there is in any case the ability to check this later in the processing. But there is also a practical element here - without PIDs, and with generated data object ids that are always source specific, it is extremely difficult to unambiguously identify duplicates even if they did occur. Journal articles in the system are duplicated, of course, but they always present with the same genuine identifier (the PubMed Id) and so present a different challenge - see PubMed Data for details. As more data sources are added, in particular more object based data sources, as well as journal articles not listed in PubMed, it will be necessary to revisit this issue, and almost certainly try to find ways of identifying duplicate objects across different sources.
In study based sources the source data objects will already carry a reference to the sd_sid study identifier. But that identifier will need 'translating' into the mdr integer identifier when the object is transferred to the central database, and the data object itself will also need an integer id assigned. As with the study data, the first step is to establish a temporary table that will hold the object identifier data - and in this case the associated study identifier - which is called temp_object_ids. The structure of this table is analogous to that of the permanent nk.all_ids_data_objects table, to which the data will later be transferred, after processing. A small nk.temp_objects_to_add table is also created at the same time.

        public void SetUpTempObjectIdsTables()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
                      CREATE TABLE IF NOT EXISTS nk.temp_object_ids(
                        object_id                INT
                      , source_id                INT
                      , sd_oid                   VARCHAR
                      , parent_study_source_id   INT 
                      , parent_study_sd_sid      VARCHAR
                      , parent_study_id          INT
                      , is_preferred_study       BOOLEAN
                      , datetime_of_data_fetch   TIMESTAMPTZ
                      ); ";
                conn.Execute(sql_string);

                sql_string = @"DROP TABLE IF EXISTS nk.temp_objects_to_add;
                      CREATE TABLE IF NOT EXISTS nk.temp_objects_to_add(
                        object_id                INT
                      , sd_oid                   VARCHAR
                      ); 
                      CREATE INDEX temp_objects_to_add_sd_oid on nk.temp_objects_to_add(sd_oid);";
                conn.Execute(sql_string);
            }
        }

The data object and study source ids are then retrieved from the source database, along with the datetime_of_data_fetch value. The source id is incorporated twice, as both the object and study source id. The collection of ObjectId objects that result are stored in the temporary temp_object_ids table.

        public IEnumerable<ObjectId> FetchObjectIds(int source_id)
        {
            string conn_string = repo.GetConnString(source_id);
            using (var conn = new NpgsqlConnection(conn_string))
            {
                string sql_string = @"select " + source_id.ToString() + @" as source_id, " 
                          + source_id.ToString() + @" as parent_study_source_id, 
                          sd_oid, sd_sid as parent_study_sd_sid, datetime_of_data_fetch
                          from ad.data_objects";

                return conn.Query<ObjectId>(sql_string);
            }
        }

        public ulong StoreObjectIds(PostgreSQLCopyHelper<ObjectId> copyHelper, IEnumerable<ObjectId> entities)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();
                return copyHelper.SaveAll(conn, entities);
            }
        }

The data in the temporary table is then updated with the integer mdr version of the study's id, and its is_preferred status. Any records that cannot be matched to an existing study (though this should not happen - it is included only to deal with possible errors) are deleted. At this point it would be possible to check the data object and its attributes for possible duplication against a pre-existing object for this study. This would obviously not apply to objects that are directly embedded in the data source - e.g. a trial registry entry, or a repository study 'landing page' - but a document or dataset could conceivably be duplicated. Unfortunately the names assigned to different data objects are generally not consistent. Comparison could be made on type and version, though the version is very rarely given, and / or on the URL of the associated instance(s). At the moment, however, the incidence of duplicate data objects appears to be zero, or at least extremely rare, so these checks have not yet been implemented. If they were added, any duplicate data object would be simply removed from the temp_object_ids dataset, so that neither it nor any of its attributes would be added to the aggregate data.

        public void UpdateObjectsWithStudyIds(int source_id)
        {
            // Update the object parent study_id using the 'correct'
            // value found in the all_ids_studies table

            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_object_ids t
                           SET parent_study_id = s.study_id, 
                           is_preferred_study = s.is_preferred
                           FROM nk.all_ids_studies s
                           WHERE t.parent_study_sd_sid = s.sd_sid
                           and t.parent_study_source_id = s.source_id;";
                conn.Execute(sql_string);

                // Drop those link records that cannot be matched

                sql_string = @"DELETE FROM nk.temp_object_ids
                             WHERE parent_study_id is null;";
                conn.Execute(sql_string);
            }
        }
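If the duplicate checks discussed above were implemented, the removal of a duplicate from the temporary table might look something like the sketch below, comparing objects for the same study on type and instance URL. This is entirely hypothetical - no such statement exists in the current code - and <schema_name> again stands for the source FTW schema.

        -- Hypothetical only: remove from the temp table any object that matches an
        -- object already present for the same study, on type and on instance URL.
        DELETE FROM nk.temp_object_ids t
        USING <schema_name>.data_objects s,
              <schema_name>.object_instances si,
              nk.all_ids_data_objects a,
              ob.data_objects d,
              ob.object_instances di
        WHERE t.sd_oid = s.sd_oid
        AND si.sd_oid = s.sd_oid
        AND a.parent_study_id = t.parent_study_id
        AND d.id = a.object_id
        AND d.object_type_id = s.object_type_id
        AND di.object_id = d.id
        AND di.url = si.url;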

The temporary object id records are then added to the permanent table, with each record gaining an id from the identity column. The object_id is then derived as that id. Because there is no object-object linkage to consider there is no need to go through the more complex process used for studies - but the same strategy and steps are used to allow that process to be inserted later, if and when necessary.

        public void UpdateAllObjectIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.all_ids_data_objects
                             (source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch)
                             select source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch
                             from nk.temp_object_ids";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.all_ids_data_objects
                            SET object_id = id
                            WHERE source_id = " + source_id.ToString() + @"
                            and object_id is null;";
                conn.Execute(sql_string);

            }
        }

As an aid to the data transfer process the temp_objects_to_add table is then filled with the data linking the source sd_oid with the new mdr integer object id.

        public void FillObjectsToAddTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.temp_objects_to_add
                             (object_id, sd_oid)
                             SELECT distinct object_id, sd_oid 
                             FROM nk.all_ids_data_objects
                             WHERE source_id = " + source_id.ToString();
                conn.Execute(sql_string);
            }
        }

The data transfer for objects is much more straightforward than for studies, as there are no 'preferred' versions of objects to consider. The code for transferring the data objects themselves is given below, with the object_id being drawn from the temp_objects_to_add table and the rest of the data from the source table. The ExecuteTransferSQL function is used to carry out the transfer, allowing chunking of records if it is needed, and again the source records are updated with the date-time of the transfer. Note that the link to the 'parent' study does not appear in the ob.data_objects table; it is retained, instead, in the all_ids_data_objects table.

        public void LoadDataObjects(string schema_name)
        {
             string sql_string = @"INSERT INTO ob.data_objects(id,
                    display_title, version, doi, doi_status_id, publication_year,
                    object_class_id, object_type_id, managing_org_id, managing_org,
                    lang_code, access_type_id, access_details, access_details_url,
                    url_last_checked, eosc_category, add_study_contribs, 
                    add_study_topics)
                    SELECT t.object_id,
                    s.display_title, s.version, s.doi, s.doi_status_id, s.publication_year,
                    s.object_class_id, s.object_type_id, s.managing_org_id, s.managing_org,
                    s.lang_code, s.access_type_id, s.access_details, s.access_details_url,
                    s.url_last_checked, s.eosc_category, s.add_study_contribs, 
                    s.add_study_topics
                    FROM " + schema_name + @".data_objects s
                    INNER JOIN nk.temp_objects_to_add t
                    on s.sd_oid = t.sd_oid ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "data_objects", "");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " data_objects");

            db.Update_SourceTable_ExportDate(schema_name, "data_objects");
        }

For object attributes the process is again much more straightforward than for studies. Because all objects that are transferred are assumed to be unique, a single data transfer call is all that is required for their attributes. As an example the code for object instances is given below.

        public void LoadObjectInstances(string schema_name)
        {
            string sql_string = @"INSERT INTO ob.object_instances(object_id,  
            instance_type_id, repository_org_id, repository_org,
            url, url_accessible, url_last_checked, resource_type_id,
            resource_size, resource_size_units, resource_comments)
            SELECT t.object_id,
            instance_type_id, repository_org_id, repository_org,
            url, url_accessible, url_last_checked, resource_type_id,
            resource_size, resource_size_units, resource_comments
            FROM " + schema_name + @".object_instances s
                    INNER JOIN nk.temp_objects_to_add t
                    on s.sd_oid = t.sd_oid ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "object_instances", "");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " object_instances");

            db.Update_SourceTable_ExportDate(schema_name, "object_instances");
        }

The description above only applies to study based sources. For PubMed, the links between the PubMed data and the studies are first identified. Two sources are used - the 'bank id' data within the PubMed data itself, referring to trial registry ids, and the 'source id' data in the study based sources, where references are provided to relevant papers. These two sets of data are combined and de-duplicated, and two final sets of data are created: the distinct list of PubMed data objects, and the list of links between those objects and studies. These steps require considerable additional processing, which is described on the PubMed Data page.

Creating the Core Tables

The -C option copies the data from the aggregating schema (st, ob, and nk) to the core schema without any processing, other than creating the provenance strings for both studies and data objects. The latter may be composite if more than one source was involved.
The process involved is therefore very simple. The core tables are dropped and then re-created, and the study and object data are simply transferred across from the st and ob schemas, respectively. The ids created within the aggregation process are also copied - there is no identifier generation in this step. The various link tables are not copied, but the basic link between studies and objects is extracted, to become the study_object_links table. The core database therefore holds no record of the source, or source identifiers, of studies and objects, except within the provenance strings.
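The transfer of the study and object records themselves is a straightforward column-for-column insert. A simplified sketch for studies is shown below, using only a subset of the columns and assuming, as described above, that core.studies mirrors the st.studies structure (plus the provenance_string added later). The extraction of the study-object links, taken from the actual code, then follows.

        -- illustrative sketch only: the real statement lists every column explicitly
        INSERT INTO core.studies(id, display_title, study_type_id, study_status_id,
               study_enrolment, min_age, min_age_units_id, max_age, max_age_units_id)
        SELECT id, display_title, study_type_id, study_status_id,
               study_enrolment, min_age, min_age_units_id, max_age, max_age_units_id
        FROM st.studies;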

        public void LoadStudyObjectLinks()
        {
            string sql_string = @"INSERT INTO core.study_object_links(id, 
            study_id, object_id)
            SELECT s.id, s.parent_study_id, s.object_id
            FROM nk.all_ids_data_objects s ";

            int res = db.ExecuteCoreTransferSQL(sql_string, "nk.all_ids_data_objects");
        }

The provenance strings indicate the source(s) of the data and the date(s) on which it was last read. The source for these strings lies in the link schema id tables - nk.all_ids_studies and nk.all_ids_data_objects, in the datetime_of_data_fetch field. The data in that field ultimately derives from the most recent download process for that data. This field is stored in the all_ids_ tables because it is a function of a specific source. The provenance string combines the data from each source, to give the date-time the data was obtained from every relevant source. For studies there may be multiple sources; for objects (at the moment) there is always one. The code is shown below. The Postgres string_agg function is used on the data source name(s) and time(s) associated with the studies, with the only variation being that if the source data was accessed via a WHO ICTRP file this is acknowledged in the provenance string.

        public void GenerateStudyProvenanceData()
        {
            string sql_string = "";
            sql_string = @"CREATE table nk.temp_study_provenance
                     as
                     select s.study_id, 
                     'Data retrieved from ' || string_agg(d.repo_name || ' at ' || to_char(s.datetime_of_data_fetch, 'HH24:MI, dd Mon yyyy'), ', ' ORDER BY s.datetime_of_data_fetch) as provenance
                     from nk.all_ids_studies s
                     inner join
                        (select t.id,
                          case 
                            when p.uses_who_harvest = true then t.default_name || ' (via WHO ICTRP)'
                            else t.default_name
                          end as repo_name 
                         from context_ctx.data_sources t
                         inner join mon_sf.source_parameters p
                         on t.id = p.id) d
                     on s.source_id = d.id
                     group by study_id ";
            db.ExecuteSQL(sql_string);

            sql_string = @"update core.studies s
                    set provenance_string = tt.provenance
                    from nk.temp_study_provenance tt
                    where s.id = tt.study_id ";
            db.ExecuteProvenanceSQL(sql_string, "core.studies");

            sql_string = @"drop table nk.temp_study_provenance;";
            db.ExecuteSQL(sql_string);
        }

Some examples of the strings that result from this are shown below.

  • Data retrieved from Iranian Registry of Clinical Trials (via WHO ICTRP) at 21:19, 22 Jul 2020
  • Data retrieved from Registro Brasileiro de Ensaios Clínicos (via WHO ICTRP) at 13:43, 04 Oct 2020, Deutschen Register Klinischer Studien (via WHO ICTRP) at 13:45, 04 Oct 2020, EU Clinical Trials Register at 11:51, 06 Oct 2020, ClinicalTrials.gov at 18:28, 09 Oct 2020
  • Data retrieved from ISRCTN at 22:41, 18 Apr 2020
  • Data retrieved from Australian New Zealand Clinical Trials Registry (via WHO ICTRP) at 13:51, 04 Oct 2020, EU Clinical Trials Register at 12:43, 06 Oct 2020

The generation and format of provenance strings for data objects is very similar, but there is only ever one source. The creation of provenance strings is the only processing that occurs during the transfer of the data to the core tables.
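As an illustration, and assuming the object tables follow the same naming pattern as the study tables above (core.data_objects with a provenance_string field), the object version reduces to something like the sketch below. Because each object has a single source no string_agg call is needed, and the WHO ICTRP qualification would be applied in the same way as for studies.

        -- sketch only: single-source provenance string for data objects
        UPDATE core.data_objects o
        SET provenance_string = 'Data retrieved from ' || t.default_name || ' at '
                                || to_char(a.datetime_of_data_fetch, 'HH24:MI, dd Mon yyyy')
        FROM nk.all_ids_data_objects a
        INNER JOIN context_ctx.data_sources t
        ON a.source_id = t.id
        WHERE o.id = a.object_id;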

Creating the JSON Data

The final stages of the aggregation process involve the creation of JSON equivalents of the core data, for both studies and objects, the (optional) writing of that JSON to files, and the (optional) compression of the JSON files into zipped archives for ease of distribution.
The JSON that is created, using the -J flag, is stored in two small tables, one for studies and the other for data objects, that hold only the id and the JSON string (as Postgres data type JSON). The process in each case consists of creating a C# object that matches the ECRIN metadata schema exactly and filling it with the values from the database, one study or one data object at a time. The objects and their properties are shown below. Constituent objects (lookup, age_param, study_topic, dataset_consent, object_title, etc.) are all defined to match the metadata schema.

    public class JSONStudy
    {
        public string file_type { get; set; }
        public int id { get; set; }
        public string display_title { get; set; }
        public text_block brief_description { get; set; }
        public text_block data_sharing_statement { get; set; }
        public lookup study_type { get; set; }
        public lookup study_status { get; set; }
        public int? study_enrolment { get; set; }
        public lookup study_gender_elig { get; set; }
        public age_param min_age { get; set; }
        public age_param max_age { get; set; }
        public string provenance_string { get; set; }

        public List<study_identifier> study_identifiers { get; set; }
        public List<study_title> study_titles { get; set; }
        public List<study_topic> study_topics { get; set; }
        public List<study_feature> study_features { get; set; }
        public List<study_relationship> study_relationships { get; set; }
        public List<int> linked_data_objects { get; set; }
    }

    public class JSONDataObject
    {
        public string file_type { get; set; }
        public int id { get; set; }
        public string doi { get; set; }
        public string display_title { get; set; }
        public string version { get; set; }
        public lookup object_class { get; set; }
        public lookup object_type { get; set; }
        public int? publication_year { get; set; }
        public lookup managing_organisation { get; set; }
        public string lang_code { get; set; }
        public lookup access_type { get; set; }
        public object_access access_details { get; set; }
        public int? eosc_category { get; set; }
        public string provenance_string { get; set; }

        public record_keys dataset_record_keys { get; set; }
        public deidentification dataset_deident_level { get; set; }
        public consent dataset_consent { get; set; }

        public List<object_instance> object_instances { get; set; }
        public List<object_title> object_titles { get; set; }
        public List<object_contributor> object_contributors { get; set; }
        public List<object_date> object_dates { get; set; }
        public List<object_topic> object_topics { get; set; }
        public List<object_description> object_descriptions { get; set; }
        public List<object_identifier> object_identifiers { get; set; }
        public List<object_right> object_rights { get; set; }
        public List<object_relationship> object_relationships { get; set; }
        public List<int> linked_studies { get; set; }
    }
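
The constituent classes themselves are not listed on this page. As a purely illustrative sketch, two of the simpler ones might be shaped as below; the field names are assumptions based on the ECRIN metadata schema rather than copied from the source code:

    // Sketch only - field names are assumptions based on the metadata schema
    public class lookup
    {
        public int? id { get; set; }          // coded value, e.g. the study type id
        public string name { get; set; }      // decoded term, e.g. 'Interventional'
    }

    public class age_param
    {
        public int? value { get; set; }       // the age itself
        public int? unit_id { get; set; }     // coded time unit
        public string unit_name { get; set; } // e.g. 'Years'
    }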

Once the object has been created it is serialised to a JSON string and written to the table. The only difficulty is that, to store the string in a field of JSON datatype, the parameter has to be explicitly declared and typed as json. This makes the database call slightly more involved than the normal Dapper micro-ORM pattern: an explicitly parameterised Npgsql command is used instead, as shown below.

        public void StoreJSONStudyInDB(int id, string study_json)
        { 
            using (NpgsqlConnection Conn = new NpgsqlConnection(connString))
            { 
                Conn.Open();

                // To insert the string into a json field the parameters for the 
                // command have to be explicitly declared and typed

                using (var cmd = new NpgsqlCommand())
                {
                   cmd.CommandText = "INSERT INTO core.studies_json (id, json) VALUES (@id, @p)";
                   cmd.Parameters.Add(new NpgsqlParameter("@id", NpgsqlDbType.Integer) {Value = id });
                   cmd.Parameters.Add(new NpgsqlParameter("@p", NpgsqlDbType.Json) {Value = study_json });
                   cmd.Connection = Conn;
                   cmd.ExecuteNonQuery();
                }
                Conn.Close();
            }
        }
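
To show how the classes above, the serialisation step, and the StoreJSONStudyInDB method fit together, a minimal sketch of the study loop follows. It assumes Json.NET (Newtonsoft.Json) is used for serialisation, and the two helper methods it calls (GetStudyIds and FetchAndBuildStudy) are hypothetical, introduced purely for illustration:

        // Sketch only - the helper methods and the choice of serialiser are assumptions
        public void LoopThroughStudies()
        {
            foreach (int id in GetStudyIds())            // hypothetical: all ECRIN study ids
            {
                // hypothetical: builds and fills a JSONStudy object from the core tables
                JSONStudy st = FetchAndBuildStudy(id);

                // serialise to a JSON string and write it to core.studies_json
                string study_json = JsonConvert.SerializeObject(st);
                StoreJSONStudyInDB(id, study_json);
            }
        }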

If the -F flag is specified at the same time as the -J flag, the system will, as well as writing the JSON to the database, write each record to local storage as a formatted JSON file. Once the files are created, running the FileZipper utility with the -J parameter zips them using the compression classes built into .NET, in the System.IO.Compression namespace. The routine for zipping study files is shown below (that for data objects is exactly analogous). It starts by reading the source folder (whose path is stored in a JSON configuration file) and obtaining the list of folders within it. It then zips those folders in batches, currently of 10. As each folder holds about 10,000 records, each batch therefore includes around 100,000 files. The number of zip files needed is calculated and then, for each one, a series of string manipulations based on the date and on the first and last folders of the batch generates a name for the zip file. The total number of files in the batch is also calculated and added to the zip file name. The program then uses the ZipArchive class to take each file in each folder of the batch, compress it, and add it to the designated zip file.

        public void ZipStudyFiles()
        {
            JSONStudyDataLayer repo = new JSONStudyDataLayer();
            folder_base = repo.StudyJsonFolder;
            folder_list = Directory.GetDirectories(folder_base);

            // Batch size of 10 folders, each of about 10,000 records - each zip therefore 100,000 files
            // should produce about 6 zips
            int folder_batch = 10;

            number_zip_files_needed = (folder_list.Length % folder_batch == 0) ? (folder_list.Length / folder_batch) : (folder_list.Length / folder_batch) + 1;
            for (int n = 0; n < number_zip_files_needed; n++)
            {
                int start_number = n * folder_batch;
                int end_limit = ((n + 1) * folder_batch);
                if (end_limit > folder_list.Length) end_limit = folder_list.Length;

                // Folder name lengths could change (not common)

                string first_folder = folder_list[start_number];
                string last_folder = folder_list[end_limit - 1];
                int ff_studies = first_folder.LastIndexOf("\\studies ") + 9;
                int lf_studies = last_folder.LastIndexOf("\\studies ") + 9;
                string first_id = first_folder.Substring(ff_studies).Substring(0, 7);
                int last_id_pos = last_folder.Substring(lf_studies).LastIndexOf(" ") + 1;
                string last_id = last_folder.Substring(lf_studies).Substring(last_id_pos, 7);
                string zip_file_suffix = today + " " + first_id + " to " + last_id;

                // get total number of files in this batch
                int file_num = 0;
                for (int i = start_number; i < end_limit; i++)
                {
                    folder_path = folder_list[i];
                    string[] file_list = Directory.GetFiles(folder_path);
                    file_num += file_list.Length;
                }
                string file_num_suffix = " [" + file_num.ToString() + " files]";

                string zip_file_path = Path.Combine(folder_base, "study ids " + 
                                            zip_file_suffix + file_num_suffix + ".zip");
                using (ZipArchive zip = ZipFile.Open(zip_file_path, ZipArchiveMode.Create))
                {
                    for (int i = start_number; i < end_limit; i++)
                    {
                        folder_path = folder_list[i];
                        string[] file_list = Directory.GetFiles(folder_path);
                        int last_backslash = 0;
                        string entry_name = "";
                        for (int j = 0; j < file_list.Length; j++)
                        {
                            file_path = file_list[j];
                            last_backslash = file_path.LastIndexOf("\\") + 1;
                            entry_name = file_path.Substring(last_backslash);
                            zip.CreateEntryFromFile(file_path, entry_name);
                        }
                    }
                }
            }
        }