Aggregating Data


Introduction

The program takes all the data within the ad tables in the various source databases and loads it into central tables within the mdr database, dealing with multiple entries for studies and creating the link data between studies and data objects. The aggregated data is held within tables in the st (study), ob (object) and nk (links) schemas. A fourth schema, 'core', is then populated as a direct import from the others, to provide a single simplified mdr dataset that can be exported to other systems.
Note that the aggregation process starts from scratch each time - there is no attempt to edit existing data. All of the central tables in the st, ob and nk schemas are dropped and then re-created during the main aggregation processes (triggered by -D). All of the core tables are dropped and re-created when the data is transferred to the core schema (triggered by -C). This makes each aggregation longer (it takes about 1 hour in total) but simplifies the processes involved: the focus can stay on the aggregation itself, with no need to consider updates and edits, and the system is much easier to maintain.
Although an aggregation can be run at any time, it makes most sense to do so after the following sequence of events:

  • Downloads are run for all data sources to get the local source file collections as up to date as possible.
  • Contextual data is updated and / or augmented, so far as resources and time allow. (This is a gradual process).
  • Harvests and imports are run for all study based sources to get the 'baseline' study data as up to date as possible, in each of the study source databases.
  • Harvests and imports are run for all object based sources to get additional data object data as up to date as possible, in each of the object source databases.

In other words, the aggregation module should be scheduled to run after all the other modules have been applied.

One important difference between the data in the central mdr tables and that in the source databases concerns the id fields used for both studies and data objects. In the source databases, these ids are called sd_sid (source data study id) and sd_oid (source data object id) respectively. sd_sid values are usually trial registry ids, or sometimes data repository accession numbers. sd_oids are usually constructed hash values, or, in the case of journal papers, PubMed ids. It is true that the records in the sd and ad schemas in the source databases also have an integer id, but this is a simple identity column and has no significance other than as a counter - a way of breaking up records into groups, by id, if and when necessary. The values are not transferred from the sd to the ad tables during the import process, or from the ad tables to the central tables during aggregation. The 'true' identifiers for the records, which of course are transferred, are in the sd_sid and sd_oid fields.
Because they are always source specific, however, the sd_sid / sd_oid fields cannot be guaranteed to be unique across all studies or objects (even though, in practice, at the moment they are). More fundamentally, because an MDR study entry (and in the future a data object entry) may be derived from more than one study (or object), it does not make sense to use one particular source id to identify that study (or object). The ids used for both studies and objects in the central tables are therefore integers, and the system drops the sd_sids and sd_oids from the table data, although they are retained in the links tables. As a bonus, the use of integer ids normally produces much faster processing than the use of strings - a non-trivial issue given the size of some of the tables in the system. The way in which the ids for studies and data objects are generated is described in the sections below. It is true that the attribute table records in the core database also have identity integer ids, but again these have no significance other than as a record label (e.g. to quickly locate a particular attribute record if the need arose).
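As an illustration (the ids below are invented), once the links data is in place a source identifier can be resolved to its integer mdr study id with a simple lookup against the nk.all_ids_studies table described later:

        SELECT study_id
        FROM nk.all_ids_studies
        WHERE source_id = 100120          -- hypothetical source id
        AND sd_sid = 'NCT00001234';       -- hypothetical registry id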
Because the aggregation process starts from a 'blank canvas' each time, and re-creates all the tables each time, there are two important consequences:

  • The ids given for studies and data objects are not consistent in different versions of the aggregated data. They are consistent internally in any one version - but not across versions.
  • The ids given for studies and data objects cannot therefore be public identifiers, and so should never be exposed to users.

It should be possible to develop systems to generate permanent accession numbers, if that is required in the future, but for the moment it is important to be aware of this property of the ids in the central mdr system.

N.B. The aggregation code can be found at https://github.com/ecrin-github/DataAggregator

Parameters

The program is a console app, to enable it to be more easily scheduled. There are a variety of flag-type parameters that can be used alone or in combination (though only some combinations make sense). These include:
  • -D: indicates that the aggregating data transfer should take place, from the source ad tables to the tables in the st (studies), ob (objects) and nk (links) schemas. This is the necessary first step of the aggregation process.
  • -C: indicates that the core tables should be created and filled from the aggregate tables, i.e. data is combined from the st, ob and nk schemas into a single, simpler core schema.
  • -J: indicates that the core data should be used to create JSON versions of the data within the core database.
  • -F: indicates that the core data should be used to create JSON files of two types, one for each study and another for each data object. It has no effect unless the -J parameter is also supplied.
  • -S: collects statistics about the existing data, from both the ad tables and the central aggregated tables.
  • -Z: zips the JSON files created by the -F parameter into a series of zip files, with up to 100,000 files in each. This is for ease of transfer to other systems.
The -S parameter can be provided at any time or on its own. It makes little sense to trigger the other processes without an initial call using -D. A -C call would then normally follow, then -J (with -F), and finally -Z. The system can cope with multiple parameters in a single call, applying them in the order -D, -C, -J, -F, -S, -Z. It is easier to see what is happening, however, if the program is called several times, working through the parameter list as described, as shown in the example below.
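As an illustration, a full run scheduled as a series of separate calls might look like the sequence below (assuming the console app is invoked as DataAggregator - the executable name here is an assumption):

        DataAggregator -S        (baseline statistics - can be run at any time)
        DataAggregator -D        (aggregate ad data into the st, ob and nk schemas)
        DataAggregator -C        (create and fill the core tables)
        DataAggregator -J -F     (create JSON in the core database, then JSON files)
        DataAggregator -Z        (zip the JSON files for transfer)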

The Aggregation Process

Initial Setup

The main aggregation process, as triggered by -D, begins with the creation of a new set of central tables in the st, ob and nk schemas. Two key tables created in the nk schema are nk.all_ids_studies and nk.all_ids_data_objects. These tables hold the data that links the ids used in the source databases to those used in the mdr databases, for studies and objects respectively.

        public void create_table_all_ids_studies()
        {
            string sql_string = @"CREATE TABLE nk.all_ids_studies(
                id                       INT             NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 3000001 INCREMENT BY 1) PRIMARY KEY
              , study_id                 INT             NULL
              , source_id                INT             NULL
              , sd_sid                   VARCHAR         NULL
              , is_preferred             BOOLEAN         NULL
              , datetime_of_data_fetch   TIMESTAMPTZ     NULL
             );
            CREATE INDEX study_all_ids_studyid ON nk.all_ids_studies(study_id);
            CREATE INDEX study_all_ids_sdsidsource ON nk.all_ids_studies(source_id, sd_sid);";

            using (var conn = new NpgsqlConnection(db_conn))
            {
                conn.Execute(sql_string);
            }
        }

        public void create_table_all_ids_data_objects()
        {
            string sql_string = @"CREATE TABLE nk.all_ids_data_objects(
                id                       INT             NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 10000001 INCREMENT BY 1) PRIMARY KEY
              , object_id                INT             NULL
              , source_id                INT             NOT NULL
              , sd_oid                   VARCHAR         NULL
              , parent_study_source_id   INT             NULL
              , parent_study_sd_sid      VARCHAR         NULL
              , parent_study_id          INT             NULL
              , is_preferred_study       BOOLEAN         NULL
              , datetime_of_data_fetch   TIMESTAMPTZ     NULL
            );
            CREATE INDEX object_all_ids_objectid ON nk.all_ids_data_objects(object_id);
            CREATE INDEX object_all_ids_sdidsource ON nk.all_ids_data_objects(source_id, sd_oid);";

            using (var conn = new NpgsqlConnection(db_conn))
            {
                conn.Execute(sql_string);
            }
        }

After setting up the tables the program interrogates the mon database to retrieve a set of 'Source' objects, each one corresponding to a source database. The list is in order of decreasing 'preference', where preference indicates the usefulness of the source as the primary data source for duplicated studies (see the Preferred Source concept section in Study-study links). It first runs through those sources to obtain a list of the 'other registry ids' in each database - in other words it builds up a list of all studies that are found in more than one data source. About 25,000 studies are registered in 2 registries and about another 1,000 are registered in 3 or more registries. The details of the process are provided in Study-study links, but the outcome is a list of ordered study pairs, with each duplicated study pointing to the 'more preferred' version of its data source.
Some studies (several hundred) have more complex relationships - for example, they are registered in two registries, but in one of those they are entered as a group of related studies rather than having a simple 1-to-1 relationship with the study in the other registry. These are removed from the pairs and added instead to the study-study relationship data. Again the details are in Study-study links.
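As an illustration (all ids below are invented), a study registered in two registries might generate an ordered pair in the links table such as:

        source_id | sd_sid         | preferred_source_id | preferred_sd_sid
        ----------+----------------+---------------------+-----------------
        100126    | ISRCTN88888888 | 100120              | NCT77777777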
Armed with the study linkage data, the system can begin the data transfer to the central mdr database.

Overview of Process

The aggregation process is summarised by the high level loop below. For each source:

  • The ad schema of the source database is linked as a foreign table wrapper (FTW) in the mdr database. The schema name returned from this process is always <database name>_ad, e.g. ctg_ad, isrctn_ad, etc. It is required for future use in SQL commands.
  • A connection string for the source database is created for future use.

if the source has study tables (i.e. is a study based source)

  • The study ids are retrieved from the source database, and processed to see which are new and which correspond to studies already in the central mdr database.
  • As part of that processing, the source data study ids (the string sd_sids) are replaced by integer ids that are unique across the whole MDR system.
  • The data for the studies and their attributes is added to the st schema tables in the mdr db.
  • The object ids are retrieved and linked to the studies using the mdr ids for studies rather than the original source based ones.
  • The string source data object ids (sd_oids) are replaced by integer ids.

else, if an object based data source

  • The object ids are retrieved and linked to studies using source-specific processes.
  • The string source data object ids (sd_oids) are replaced by integer ids.

In either case...

  • The data for data objects and their attributes is transferred to the ob schema tables in the mdr db.
  • The relationship between old (sd_sid and sd_oid) ids and new integer ids in the central system is retained in link tables in the nk schema. This link data is created in the id processing steps above.
  • The foreign table wrapper for the source database is dropped.
      foreach (Source s in sources)
      {
         string schema_name = repo.SetUpTempFTW(s.database_name);
         string conn_string = logging_repo.FetchConnString(s.database_name);
         DataTransferBuilder tb = new DataTransferBuilder(s, schema_name, conn_string, logging_repo);
         if (s.has_study_tables)
         {
             tb.ProcessStudyIds();
             tb.TransferStudyData();
             tb.ProcessStudyObjectIds();
         }
         else
         {
             tb.ProcessStandaloneObjectIds();
         }
         tb.TransferObjectData();
         repo.DropTempFTW(s.database_name);
     }

Processing Study Ids

For each source, the initial step is to set up a temporary table to hold the study ids (temp_study_ids). The next step is to fetch those study ids, along with the datetime_of_data_fetch field, from the source data, including the source id in each of the StudyId objects created. This data is transferred to the temporary table. Note that the temporary table has the same structure as the permanent all_ids_studies table, apart from the identity id column. Later on the temp_study_ids data will be transferred to that permanent table - but not before some records have been modified.

        public void SetUpTempStudyIdsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS nk.temp_study_ids;
                        CREATE TABLE nk.temp_study_ids(
                        study_id                 INT
                      , source_id                INT
                      , sd_sid                   VARCHAR
                      , is_preferred             BOOLEAN
                      , datetime_of_data_fetch   TIMESTAMPTZ
                      ); ";
                conn.Execute(sql_string);
            }
        }

        public IEnumerable<StudyId> FetchStudyIds(int source_id, string source_conn_string)
        {
            using (var conn = new NpgsqlConnection(source_conn_string))
            {
                string sql_string = @"select " + source_id.ToString() + @" as source_id, 
                          sd_sid, datetime_of_data_fetch
                          from ad.studies";
                return conn.Query<StudyId>(sql_string);
            }
        }

        public ulong StoreStudyIds(PostgreSQLCopyHelper<StudyId> copyHelper, IEnumerable<StudyId> entities)
        {
            // stores the study id data in a temporary table
            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();
                return copyHelper.SaveAll(conn, entities);
            }
        }

The data in the temporary table is checked against the study_study links table. If any study id corresponds to a study already in the system, it will appear in the left hand side of the study-study links table (source_id and sd_sid). The corresponding preferred id, and source id, will be in the right hand side of this table (preferred_source_id and preferred_sd_sid). That preferred source_id / sd_sid will also exist in the all_ids_studies table, because the sources are aggregated 'most preferred first', so the more preferred version can be guaranteed to be present. That source_id / sd_sid combination will have an integer study_id, created when the data was first transferred to the all_ids_studies table, and that value is used to update the corresponding field in the temp_study_ids table. At the same time the study is marked as a non-preferred duplicate by setting the is_preferred field to false.

        public void CheckStudyLinks()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_study_ids t
                           SET study_id = s.study_id, is_preferred = false
                           FROM nk.study_study_links k
                                INNER JOIN nk.all_ids_studies s
                                ON k.preferred_sd_sid = s.sd_sid
                                AND k.preferred_source_id = s.source_id
                           WHERE t.sd_sid = k.sd_sid
                           AND t.source_id =  k.source_id;";
                int res = db.ExecuteSQL(sql_string);
                ...
                ...

In the case of the first and largest source (ClinicalTrials.gov) there will obviously be no existing study id data and (because it is the most preferred of all the data sources) no matching records on the left hand side of the study links table. The update above therefore has no effect and no records are updated. For later sources, some of the studies - usually a small majority - will be updated to include a pre-existing study_id.
A small table - nk.existing_studies - is then created that simply lists the sd_sid and study_id values of these 'pre-existing' studies; it will be useful later on.

                ...
                sql_string = @"DROP TABLE IF EXISTS nk.existing_studies;
                               CREATE TABLE nk.existing_studies as 
                                       SELECT sd_sid, study_id
                                       FROM nk.temp_study_ids
                                       WHERE is_preferred = false";
                db.ExecuteSQL(sql_string);

            }
        }

Now the temp_study_ids records are imported into the permanent all_ids_studies table. All records receive an id value, because the id field is an identity column. The newly added records are then updated, so that those that do not already have a study_id are given the identity id as their study_id. At the same time the is_preferred field for these records is set to true, to indicate that this is the first time this study has been added to the system. Because it is usually easier to work with the smaller temp_study_ids table than with all_ids_studies, the temporary table is then 'back-filled' from the permanent data, so that the study_ids that were null there now hold the values in the corresponding permanent table rows. New study records are therefore given an id derived from the identity value in all_ids_studies, while pre-existing studies are given the same id as the pre-existing version.

 
        public void UpdateAllStudyIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Add the new study id records to the all Ids table

                string sql_string = @"INSERT INTO nk.all_ids_studies
                            (study_id, source_id, sd_sid, 
                             datetime_of_data_fetch, is_preferred)
                             select study_id, source_id, sd_sid, 
                             datetime_of_data_fetch, is_preferred
                             from nk.temp_study_ids";

                conn.Execute(sql_string);

                // Where the study_ids are null they can take on the value of the 
                // record id.

                sql_string = @"UPDATE nk.all_ids_studies
                            SET study_id = id, is_preferred = true
                            WHERE study_id is null
                            AND source_id = " + source_id.ToString();

                conn.Execute(sql_string);

                // 'Back-update' the study temp table using the newly created study_ids
                // now all should be done...

                sql_string = @"UPDATE nk.temp_study_ids t
                           SET study_id = a.study_id, is_preferred = true
                           FROM nk.all_ids_studies a
                           WHERE t.source_id = a.source_id
                           AND t.sd_sid = a.sd_sid
                           AND t.study_id is null;";

                conn.Execute(sql_string);
            }
        }

The study ids have now been processed and the various tables generated can be used to support the transfer of the study data.
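Pulling these steps together, the id processing for a single study based source can be pictured as the orchestration below (a simplified sketch - the exact method grouping and the study_repo and copyHelper names are assumptions, not the actual DataTransferBuilder code):

        public void ProcessStudyIds()
        {
            // create nk.temp_study_ids, ready to receive the source ids
            study_repo.SetUpTempStudyIdsTable();

            // fetch source_id, sd_sid and datetime_of_data_fetch from the
            // source's ad.studies table, and bulk copy them into the temp table
            var ids = study_repo.FetchStudyIds(source.id, source_conn_string);
            study_repo.StoreStudyIds(copyHelper, ids);

            // flag studies already present under a more preferred source,
            // creating the nk.existing_studies lookup table as a by-product
            study_repo.CheckStudyLinks();

            // transfer the ids to nk.all_ids_studies, give new studies their
            // identity-derived study_id, and back-fill the temp table
            study_repo.UpdateAllStudyIdsTable(source.id);
        }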

Transferring the Study Data

Transferring the studies themselves is straightforward. A main study record can only have one data source. If a study is new its record is transferred; if it is a duplicate it is ignored. The selection is done by linking the source data to the temp_study_ids table where is_preferred is true. The study_id is derived from the temp_study_ids table, but all other fields come from the source. Note that the ExecuteTransferSQL function is used to 'chunk' the transfer process if too many records are involved. Note also that with this and every other transfer the source records are updated with the datetime of the aggregation process, using the Update_SourceTable_ExportDate procedure. (This update is applied to all studies, whether or not the data is used.)

        public void LoadStudies(string schema_name)
        {
            
            // Insert the study data unless it is already in under another 
            // id (i.e. t.is_preferred = false).

            string sql_string = @"INSERT INTO st.studies(id, 
                    display_title, title_lang_code, brief_description, bd_contains_html,
                    data_sharing_statement, dss_contains_html,
                    study_start_year, study_start_month, study_type_id, study_status_id,
                    study_enrolment, study_gender_elig_id, min_age, min_age_units_id,
                    max_age, max_age_units_id)
                    SELECT t.study_id,
                    s.display_title, s.title_lang_code, s.brief_description, s.bd_contains_html,
                    s.data_sharing_statement, s.dss_contains_html,
                    s.study_start_year, s.study_start_month, s.study_type_id, s.study_status_id,
                    s.study_enrolment, s.study_gender_elig_id, s.min_age, s.min_age_units_id,
                    s.max_age, s.max_age_units_id
                    FROM nk.temp_study_ids t
                    INNER JOIN " + schema_name + @".studies s
                    on t.sd_sid = s.sd_sid
                    WHERE t.is_preferred = true ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "studies", "new studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " studies, new studies");

            // Note that the statement below also updates studies that are not added as new
            // (because they equate to existing studies) but which were new in the 
            // source data.

            db.Update_SourceTable_ExportDate(schema_name, "studies");
        }

For the study attributes things are more complex. If a study is new to the system all its attributes are simply added. If the study is a duplicate, however, its attributes may or may not already be in the system. The new data belonging to such studies must therefore be compared to the existing data for those studies, using a few key fields, and only added if no match is found. In the code below, using study identifiers as an example, some field list strings are first established to simplify the later statements. The source field list needs to include a reference to the source table ('s.'). There is then an initial transfer of all the attributes of the new studies (k.is_preferred = true).
For studies that have been added already, two temporary datasets are established. The first, nk.source_data, has all the attributes for these studies that exist in the source data. The second, nk.existing_data, has all the attributes for these studies that are already in the database. Note the use of the lightweight nk.existing_studies table, created in the id processing phase, to simplify these steps and to provide the mdr study_ids for these records. The insert required is then of those source data records that do not exist in the existing data, using a left join on key fields to identify the new attributes. In this case the identifier_type_id and identifier_value fields are used, but the key fields are obviously table specific. As usual the SQL statement is passed to ExecuteTransferSQL to carry out the actual insertion, to allow chunking of data. The temporary tables are then removed and the export date-time in the source data is updated.

        public void LoadStudyIdentifiers(string schema_name)
        {
            string destination_field_list = @"study_id, 
                    identifier_type_id, identifier_org_id, identifier_org, 
                    identifier_value, identifier_date, identifier_link ";

            string source_field_list = @" 
                    s.identifier_type_id, s.identifier_org_id, s.identifier_org, 
                    s.identifier_value, s.identifier_date, s.identifier_link ";

            // For 'preferred' study Ids add all identifiers.

            string sql_string = @"INSERT INTO st.study_identifiers(" + destination_field_list + @")
                    SELECT k.study_id, " + source_field_list + @"
                    FROM nk.temp_study_ids k
                    INNER JOIN " + schema_name + @".study_identifiers s
                    on k.sd_sid = s.sd_sid
                    WHERE k.is_preferred = true ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "study_identifiers", "new studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " study_identifiers, new studies");

            // For 'existing studies' study Ids add only new identifiers.

            sql_string = @"DROP TABLE IF EXISTS nk.source_data;
                           CREATE TABLE nk.source_data as 
                           SELECT es.study_id, d.* 
                           FROM " + schema_name + @".study_identifiers d
                           INNER JOIN nk.existing_studies es
                           ON d.sd_sid = es.sd_sid";
            db.ExecuteSQL(sql_string);

            sql_string = @"DROP TABLE IF EXISTS nk.existing_data;
                           CREATE TABLE nk.existing_data as 
                           SELECT es.sd_sid, es.study_id, 
                           c.identifier_type_id, c.identifier_value 
                           FROM st.study_identifiers c
                           INNER JOIN nk.existing_studies es
                           ON c.study_id = es.study_id;";
            db.ExecuteSQL(sql_string);

            sql_string = @"INSERT INTO st.study_identifiers(" + destination_field_list + @")
                           SELECT s.study_id, " + source_field_list + @" 
                           FROM nk.source_data s
                           LEFT JOIN nk.existing_data e
                           ON s.sd_sid = e.sd_sid
                           AND s.identifier_type_id = e.identifier_type_id
                           AND s.identifier_value = e.identifier_value
                           WHERE e.study_id is null ";

            res = db.ExecuteTransferSQL(sql_string, schema_name, "study_identifiers", "existing studies");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " study_identifiers, existing studies");

            db.ExecuteSQL("DROP TABLE IF EXISTS nk.source_data;");
            db.ExecuteSQL("DROP TABLE IF EXISTS nk.existing_data;");

            db.Update_SourceTable_ExportDate(schema_name, "study_identifiers");
        }

This process is repeated for all study attribute types. It is true that the steps of creating the existing and source datasets, and then doing the insert, could all be rolled up into one SQL statement, as sketched below. It is a lot clearer, however, to separate out the steps and use temporary tables.
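For comparison, the rolled-up equivalent of the 'existing studies' insert for study identifiers would look something like the statement below (a sketch only, illustrating why the stepwise version is easier to follow):

        string sql_string = @"INSERT INTO st.study_identifiers(study_id, 
                identifier_type_id, identifier_org_id, identifier_org, 
                identifier_value, identifier_date, identifier_link)
                SELECT es.study_id, 
                s.identifier_type_id, s.identifier_org_id, s.identifier_org, 
                s.identifier_value, s.identifier_date, s.identifier_link
                FROM " + schema_name + @".study_identifiers s
                INNER JOIN nk.existing_studies es
                ON s.sd_sid = es.sd_sid
                LEFT JOIN st.study_identifiers e
                ON es.study_id = e.study_id
                AND s.identifier_type_id = e.identifier_type_id
                AND s.identifier_value = e.identifier_value
                WHERE e.study_id is null;";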

Aggregating Object Data

Transfer of object data also requires an initial 'id processing' phase, followed by the transfer itself. The id processing, however, differs according to whether the source is study based, where all data objects are already related to studies, or object based, where the study-object links have to be constructed (the only current object based source is PubMed).
Note that - at least for the moment - there is no requirement to establish an object-object links table, listing those objects that are duplicated in different sources and thus present with different ids. For study based sources the assumption is that almost all objects listed will be unique. This seems to be justified, and there is in any case the ability to check this later in the processing. But there is also a practical element here - without PIDs, and with generated data object ids that are always source specific, it is extremely difficult to unambiguously identify duplicates even if they did occur. Journal articles in the system are duplicated, of course, but they always present with the same genuine identifier (the PubMed Id) and so present a different challenge - see PubMed Data for details. As more data sources are added, in particular more object based data sources, as well as journal articles not listed in PubMed, it will be necessary to revisit this issue, and almost certainly to find ways of identifying duplicate objects across different sources.
In study based sources the source data objects will already carry a reference to the sd_sid study identifier. That identifier will need 'translating' into the mdr integer identifier when the object is transferred to the central database, and the data object itself will also need an integer id assigned. As with the study data, the first step is to establish a temporary table, called temp_object_ids, that will hold the object identifier data - and in this case the associated study identifier. The structure of this table is analogous to that of the permanent all_ids_data_objects table.

        public void SetUpTempObjectIdsTables()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
                      CREATE TABLE IF NOT EXISTS nk.temp_object_ids(
                        object_id                INT
                      , source_id                INT
                      , sd_oid                   VARCHAR
                      , parent_study_source_id   INT 
                      , parent_study_sd_sid      VARCHAR
                      , parent_study_id          INT
                      , is_preferred_study       BOOLEAN
                      , datetime_of_data_fetch   TIMESTAMPTZ
                      ); ";
                conn.Execute(sql_string);

                sql_string = @"DROP TABLE IF EXISTS nk.temp_objects_to_add;
                      CREATE TABLE IF NOT EXISTS nk.temp_objects_to_add(
                        object_id                INT
                      , sd_oid                   VARCHAR
                      ); 
                      CREATE INDEX temp_objects_to_add_sd_oid on nk.temp_objects_to_add(sd_oid);";
                conn.Execute(sql_string);
            }
        }
        public IEnumerable<ObjectId> FetchObjectIds(int source_id)
        {
            string conn_string = repo.GetConnString(source_id);
            using (var conn = new NpgsqlConnection(conn_string))
            {
                string sql_string = @"select " + source_id.ToString() + @" as source_id, " 
                          + source_id.ToString() + @" as parent_study_source_id, 
                          sd_oid, sd_sid as parent_study_sd_sid, datetime_of_data_fetch
                          from ad.data_objects";

                return conn.Query<ObjectId>(sql_string);
            }
        }


        public ulong StoreObjectIds(PostgreSQLCopyHelper<ObjectId> copyHelper, IEnumerable<ObjectId> entities)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();
                return copyHelper.SaveAll(conn, entities);
            }
        }
        public void UpdateObjectsWithStudyIds(int source_id)
        {
            // Update the object parent study_id using the 'correct'
            // value found in the all_ids_studies table

            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_object_ids t
                           SET parent_study_id = s.study_id, 
                           is_preferred_study = s.is_preferred
                           FROM nk.all_ids_studies s
                           WHERE t.parent_study_sd_sid = s.sd_sid
                           and t.parent_study_source_id = s.source_id;";
                conn.Execute(sql_string);

                // Drop those link records that cannot be matched

                sql_string = @"DELETE FROM nk.temp_object_ids
                             WHERE parent_study_id is null;";
                conn.Execute(sql_string);
            }
        }


        public void CheckStudyObjectsForDuplicates(int source_id)
        {
            // TO DO - very rare at the moment
        }
        public void UpdateAllObjectIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Add the new object id records to the all Ids table.
                // For study based data, the assumption here is that within each source 
                // the data object sd_oid is unique (because each object is linked to 
                // a different study), which means that the link is also unique.
                // BUT for PubMed and other object based data this is not true,
                // so ResetIdsOfDuplicatedPMIDs needs to be run later.

                string sql_string = @"INSERT INTO nk.all_ids_data_objects
                             (source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch)
                             select source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch
                             from nk.temp_object_ids";
                conn.Execute(sql_string);

                // Update the table with the object id (this will always be the same as 
                // the identity id at the moment, as there is no object-object checking).
                // If objects are amalgamated from different sources in the future
                // the object-object check will need to be added at this stage.

                sql_string = @"UPDATE nk.all_ids_data_objects
                            SET object_id = id
                            WHERE source_id = " + source_id.ToString() + @"
                            and object_id is null;";
                conn.Execute(sql_string);

            }
        }
        public void FillObjectsToAddTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.temp_objects_to_add
                             (object_id, sd_oid)
                             SELECT distinct object_id, sd_oid 
                             FROM nk.all_ids_data_objects
                             WHERE source_id = " + source_id.ToString();
                conn.Execute(sql_string);
            }
        }
        public void LoadDataObjects(string schema_name)
        {
             string sql_string = @"INSERT INTO ob.data_objects(id,
                    display_title, version, doi, doi_status_id, publication_year,
                    object_class_id, object_type_id, managing_org_id, managing_org,
                    lang_code, access_type_id, access_details, access_details_url,
                    url_last_checked, eosc_category, add_study_contribs, 
                    add_study_topics)
                    SELECT t.object_id,
                    s.display_title, s.version, s.doi, s.doi_status_id, s.publication_year,
                    s.object_class_id, s.object_type_id, s.managing_org_id, s.managing_org,
                    s.lang_code, s.access_type_id, s.access_details, s.access_details_url,
                    s.url_last_checked, s.eosc_category, s.add_study_contribs, 
                    s.add_study_topics
                    FROM " + schema_name + @".data_objects s
                    INNER JOIN nk.temp_objects_to_add t
                    on s.sd_oid = t.sd_oid ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "data_objects", "");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " data_objects");

            db.Update_SourceTable_ExportDate(schema_name, "data_objects");
        }
        public void LoadObjectInstances(string schema_name)
        {
            string sql_string = @"INSERT INTO ob.object_instances(object_id,  
            instance_type_id, repository_org_id, repository_org,
            url, url_accessible, url_last_checked, resource_type_id,
            resource_size, resource_size_units, resource_comments)
            SELECT t.object_id,
            instance_type_id, repository_org_id, repository_org,
            url, url_accessible, url_last_checked, resource_type_id,
            resource_size, resource_size_units, resource_comments
            FROM " + schema_name + @".object_instances s
                    INNER JOIN nk.temp_objects_to_add t
                    on s.sd_oid = t.sd_oid ";

            int res = db.ExecuteTransferSQL(sql_string, schema_name, "object_instances", "");
            StringHelpers.SendFeedback("Loaded records - " + res.ToString() + " object_instances");

            db.Update_SourceTable_ExportDate(schema_name, "object_instances");
        }

The link between data objects and studies - found within the source data object data - is transferred to link tables. The 'parent study' id is translated into its most 'preferred' form if and when necessary, to ensure that the links work properly in the aggregated system. Also transferred to the link tables is the provenance data that indicates when each study and data object was last retrieved from the source.
For sources where there are no studies - just data objects - the process is necessarily different. It must also take place after the aggregation of study data, to ensure that all studies are already in the central system. This currently applies only to PubMed data. For PubMed, the links between the PubMed data and the studies are first identified. Two sources are used - the 'bank id' data within the PubMed data itself, referring to trial registry ids, and the 'source id' data in the study based sources, where references are provided to relevant papers. These two sets of data are combined and de-duplicated, and two final datasets are created: the distinct list of PubMed data objects, and the list of links between those objects and studies. Unlike most data objects in the study based resources, PubMed data objects can be linked to multiple studies, and of course studies may have multiple article references. The linkage is therefore complex and requires considerable additional processing.
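In outline, the combination and de-duplication of the two sets of study-article links could be sketched as below (the two input table names are assumptions used for illustration - see PubMed Data for the real processing):

        string sql_string = @"DROP TABLE IF EXISTS nk.distinct_pubmed_links;
                CREATE TABLE nk.distinct_pubmed_links as
                SELECT parent_study_source_id, parent_study_sd_sid, sd_oid
                FROM nk.bank_id_links                 -- from PubMed 'bank id' data
                UNION                                 -- UNION also removes duplicates
                SELECT parent_study_source_id, parent_study_sd_sid, sd_oid
                FROM nk.source_reference_links;       -- from study source references";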

Creating the Core Tables

Most of the other options provided by the program are relatively simple and self contained. The -C option copies the data from the aggregation schemas (st, ob and nk) to the core schema without any other processing, apart from the creation of provenance strings for both studies and data objects. The latter may be composite if more than one source was involved.
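As an example of that processing, a composite study provenance string could be assembled along the lines below (a sketch - the source lookup table and the output format are assumptions):

        string sql_string = @"SELECT s.study_id, 
                string_agg(d.default_name || ' (data retrieved ' 
                    || to_char(s.datetime_of_data_fetch, 'DD/MM/YYYY') || ')', '; ') 
                    as provenance_string
                FROM nk.all_ids_studies s
                INNER JOIN ctx.data_sources d         -- hypothetical lookup table
                ON s.source_id = d.id
                GROUP BY s.study_id;";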

Creating the JSON Data