Difference between revisions of "Processing PubMed Data"

From ECRIN-MDR Wiki
Jump to navigation Jump to search
(Download operations)
(Download operations)
Line 189: Line 189:
 
</pre>
 
</pre>
 
</div>
 
</div>
Each idstring, when inserted into an API call, will therefore download 10 records at a time, but the other components of the API string first need to be constructed. In this case, because of the volume of download required, the base URL requires not only the address of the NLM fetch API, but also an indicator of the tool being used and a contact email (as per the NLM's data access rules). The API call is then made is then by adding the idstring and '&retmode=xml' to the base URL, and each individual article node is extracted from the returned XML. At this stage there is very little additional processing of the downloaded data, but a call is made to GetDateLastRevised(article), which returns that date by parsing a few nodes in the XML. The last revised data is then checked against the cut-off date, unless this download is of type 121, which does not include a cut-off date. All bank-linked PMIDs should pass the test (because they have been already been filtered), but the source reference PMIDs will be a mix and only require downloading if they are new or have been revised since the last download date. The source data objects table needs to be interrogated to discover whether the file is completely new to the system - in which case a new object source record is added and the XML is written out as a file to the appropriate folder - or an update of an existing file - in which case the object source record is updated and the XML is written out as a file that replaces the one already there.
+
Each idstring, when inserted into an API call, will therefore download 10 records at a time, but the other components of the API string first need to be constructed. In this case, because of the volume of download required, the base URL requires not only the address of the NLM fetch API, but also an indicator of the tool being used and a contact email (as per the NLM's data access rules). The API call is then made is then by adding the idstring and '&retmode=xml' to the base URL, and each individual article node is extracted from the returned XML. At this stage there is very little additional processing of the downloaded data, but a call is made to GetDateLastRevised function, which returns that date by parsing a few nodes in the XML. The last revised data is then checked against the cut-off date, unless this download is of type 121, which does not require a date parameter. All bank-linked PMIDs should pass the test (because they have been already been filtered), but the source reference PMIDs will be a mix and only require downloading if they are new or have been revised since the last download date. The source data objects table needs to be interrogated to discover whether the file is completely new to the system - in which case a new object source record is added and the XML is written out as a file to the appropriate folder - or an update of an existing file - in which case the object source record is updated and the XML is used to replace the file that is already there.
 
<div style="font-family: monospace; font-size: 13px" >
 
<div style="font-family: monospace; font-size: 13px" >
 
<pre>
 
<pre>

Revision as of 16:48, 21 November 2020

Introduction

The PubMed data has a range of 'special issues' that demand additional processing in several places in the system. This is the only data that is not directly linked to studies within the data source, so the data objects retrieved from it, the PubMed citations and associated PMC articles, have to be linked within the MDR. There are two mechanisms for this. The first is embedded in the Pubmed data itself, in the shape of 'bank ids'. These are the identifiers of associated entries in 'databanks', and that term includes trial registries. This allows a PubMed citation to explicitly reference an associated study, through that study's registry id - most often but not exclusively the ClinicalTrials.gov NCT number. The second is in the 'associated reference' data found in some data sources. This appears under various names and in various forms, but usually includes a PubMed identifier, at least for the great majority of cited references (some are given only as DOIs or as textual citations). Note that although this list of references is obtained during a study based source's extraction, the citations themselves are not downloaded and processed into the system during that extraction. Instead all PubMed data is extracted and processed only when referencing PubMed (id = 100135) as the data source.
The system has to consider both PubMed linkage types, firstly when identifying the relevant PubMed data to download (PubMed contains well over 30 million records, so we only want the data we can link) and secondly when linking PubMed data objects to studies during data aggregation. The harvesting process for PubMed is also relatively complex, requiring additional calls into the contextual data. The PubMed specific aspects of data download, harvest and aggregation are therefore summarised below.

Data Download

Under normal circumstances downloads of PubMed data need to obtain only records that have been added to, or revised within, the PubMed system since the previous download. PubMed downloads therefore come with a cut-off date, that specifies only records added / edited on or after that date need be (re-) downloaded. In addition, they come with a search filter. The first is used to select records in PubMed that include 'bank' references to a trial registry. the second is used to assemble the reference lists already existing in the MDR, and use that list as the basis for checking revision dates in the PubMed database. In each case the 'type' of the saf (search and fetch) operation is 114: All new or revised records with a filter applied (download).
In the first, bank based, download the filter parameter is 10003: PubMed abstracts with references to any trial registry, whilst in the second, references based download it is 10004: Identifies PubMed references in Study sources that have not yet been downloaded.
Variants of these downloads exist using type 121: Filtered records (download). With filter 10003 this will cause the download of all bank-linked records to be downloaded, irrespective of date, whilst with filter 10004 all records referenced in study sources will be downloaded, irrespective of revision date. Use of this download type may be used occasionally, e.g. annually, after the Pubmed database itself is revised, to ensure synchronisation with the source. In all cases the download mechanism is divided into two parts:

  • the creation of a list of the relevant PubMed identifiers (PMIDs), from a search of PubMed or from examining and collecting the reference data in the MDR
  • the download of the relevant files from PubMed, based on the list of PMIDs.

In the case of the bank-linked search, the application of the cut-off date forms part of the search parameters. In the case of the reference-linked search, the cut-off date is applied during the download process.

Searching Pubmed for Bank linked records

The PubMed API allows records revised or added in a date range to be identified, and at the same time allows the search results to be filtered to include only those with reference to a specific databank, but not to any databank. It is therefore necessary to loop through each of the possible trial registry 'banks' and obtain the relevant records for each, adding these to a table each time. In the code, shown below, the base URL for the Pubmed site is first established, along with other variables - retmax gives the maximum number of record ids to be returned in any search call, while include_dates uses the type_id parameter to determine if a cu-off date is to be applied. The various database tables required are set up, and then a list of the trial registries is retrieved from the database as the variable banks.
The API call is then constructed, using the base URL, the trial registry id (as known to the API, in the code called nlm_abbrev), followed by '[SI]', which identifies this parameter as referring to a particular databank, and then if required (they normally are) the cut off and current dates as the minimum and maximum dates of the selected date range against the 'mdat' parameterthe process for each databask, in the , in a yyyy/MM/dd format. The total number of PubMed Ids to be retrieved for this databank is retrieved by an initial API call, which examines the Count variable in the returned XML, and the number of calls required is then calculated using retmax.
The system then loops through each of the required calls, waiting 2 seconds between each, adding the start position and maximum size parameters for the returned id list. The sort of API string that results is shown below.
https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&term=ClinicalTrials.gov[SI]&mindate=2020/10/01&maxdate=2020/11/20&datetype=mdat&retstart=1&retmax=1000
The record Ids found are stored in the temp_pmids_by_bank table, and the accumulated records are transferred, at the end of each process, in the pmids_by_bank_total table.

        public async Task CreatePMIDsListfromBanksAsync()
        {
            int totalRecords, numCallsNeeded, bank_id;
            int retmax = 1000;
            string search_term;
            CopyHelpers helper = new CopyHelpers();
            string baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&term=";
            bool include_dates = (args.type_id == 114) ? true : false;

            pubmed_repo.SetUpTempPMIDsByBankTable();
            pubmed_repo.SetUpBankPMIDsTable();
            pubmed_repo.SetUpDistinctBankPMIDsTable();

            // Get list of potential linked data banks (includes trial registries)
            IEnumerable<PMSource> banks = pubmed_repo.FetchDatabanks();

            foreach (PMSource s in banks)
            {
                // get databank details
                bank_id = s.id;
                search_term = s.nlm_abbrev + "[SI]";
                if (include_dates)
                {
                    string today = DateTime.Now.ToString("yyyy/MM/dd");
                    string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                    string date_term = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
                    search_term += date_term;
                }
                string url = baseURL + search_term;


                // Get the number of total records that have this databank reference
                // and calculate the loop parameters
                totalRecords = await pubmed_repo.GetBankDataCountAsync(url);
                numCallsNeeded = (int)(totalRecords / retmax) + 1;
                pubmed_repo.TruncateTempPMIDsByBankTable();

                // loop through the records and obtain and store relevant
                // records retmax (= 1000) at a time

                for (int i = 0; i < numCallsNeeded; i++)
                {
                    try
                    {
                        int start = i * 1000;
                        string selectedRecords = "&retstart=" + start.ToString() + "&retmax=" + retmax.ToString();

                        // Put a 2 second pause before each call.
                        await Task.Delay(2000);
                        string responseBody = await webClient.GetStringAsync(url + selectedRecords);

                        // The eSearchResult class allows the returned json string to be easily deserialised
                        // and the required values, of each Id in the IdList, can then be read.

                        XmlSerializer xSerializer = new XmlSerializer(typeof(eSearchResult));
                        using (TextReader reader = new StringReader(responseBody))
                        {
                            eSearchResult result = (eSearchResult)xSerializer.Deserialize(reader);
                            if (result != null)
                            {
                                var FoundIds = Array.ConvertAll(result.IdList, ele => new PMIDByBank(ele.ToString()));
                                pubmed_repo.StorePMIDsByBank(helper.bank_ids_helper, FoundIds);

                                string feedback = "Storing " + retmax.ToString() +
                                                  " records from " + start.ToString() + " in bank " + search_term;
                                StringHelpers.SendFeedback(feedback);
                            }
                        }
                    }

                    catch (HttpRequestException e)
                    {
                        StringHelpers.SendError("In PubMed CreatePMIDsListfromBanksAsync(): " + e.Message);
                    }
                }

                // transfer across to total table...
                pubmed_repo.TransferBankPMIDsToTotalTable(s.nlm_abbrev);
            }

            pubmed_repo.DropTempPMIDByBankTable();
            pubmed_repo.FillDistinctBankPMIDsTable();
        }   

At the end of the process the pmids_by_bank_total table has a record of each PMID that meets the original search criteria. A call is then made to the FillDistinctBankPMIDsTable procedure. This stores the distinct PMIDs found into the previously created distinct_pmids_by_bank table, grouping those PMIDs into 10s (using the table's identity column identity column and integer division). The PMIDs are then prepared for the next, file download, stage.

       public void SetUpDistinctBankPMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS pp.distinct_pmids_by_bank;
                       CREATE TABLE IF NOT EXISTS pp.distinct_pmids_by_bank(
                         identity int GENERATED ALWAYS AS IDENTITY
                       , group_id int
                       , pmid varchar)";
                conn.Execute(sql_string);
            }
        }

        public void FillDistinctBankPMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO pp.distinct_pmids_by_bank(
                          pmid)
                          SELECT DISTINCT pmid
                          FROM pp.pmids_by_bank_total
                          ORDER BY pmid;";
                conn.Execute(sql_string);

                sql_string = @"Update pp.distinct_pmids_by_bank
                          SET group_id = identity / 10;";
                conn.Execute(sql_string);
            }
        }

Assembling lists of Pubmed references

The study_reference records are collected - one database at a time - and transferred to a single table. After the various tables required are established the system retrieves a list of the study based data sources with reference records. It loops through each of those references, getting the PMIDs as a collection of PMIDBySource objects in each case and transferring them to the pmids_by_source_total table. A similar call to that for the bank PMIDs is made, to the FillDistinctSourcePMIDsTable procedure, to store the distinct PMIDs into another table (distinct_pmids_by_source), and again group these into 10s.

        public void CreatePMIDsListfromSources()
        {
            // Establish tables and support objects

            pubmed_repo.SetUpTempPMIDsBySourceTable();
            pubmed_repo.SetUpSourcePMIDsTable();
            pubmed_repo.SetUpDistinctSourcePMIDsTable();
            CopyHelpers helper = new CopyHelpers();
            IEnumerable<PMIDBySource> references;

            // Loop through the study databases that hold
            // study_reference tables, i.e. with pmid ids
            IEnumerable<Source> sources = pubmed_repo.FetchSourcesWithReferences();
            foreach (Source s in sources)
            {
                pubmed_repo.TruncateTempPMIDsBySourceTable();
                references = pubmed_repo.FetchSourceReferences(s.database_name);
                pubmed_repo.StorePmidsBySource(helper.source_ids_helper, references);
                pubmed_repo.TransferSourcePMIDsToTotalTable(s.id);
            }

            pubmed_repo.FillDistinctSourcePMIDsTable();
            pubmed_repo.DropTempPMIDBySourceTable();
        }

The process is therefore much simpler and quicker than the API calls required for identifying the bank-linked PMIDs, but the end result is very similar - a list of PMIDs, each belonging to a numbered group of 10. Note that list only consists of PMIDs - the purpose is only to identify the PubMed data of interest and download it - linking that data to studies comes later.

Download operations

However the list of PMIDs is obtained, the download functionality is very similar.
The PMID list is retrieved as strings, each one holding 10 Ids, called idstrings, using either the FetchDistinctBankPMIDStrings() or the FetchDistinctSourcePMIDStrings() functions. These both use the same 'partitiion over' mechanism to concatenate the PMIDs from 10 records (as grouped during the earlier search phase) into a single string, for example:

        public IEnumerable<string> FetchDistinctSourcePMIDStrings()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"select distinct string_agg(pmid, ', ') 
                        OVER ( PARTITION BY group_id) 
                        from pp.distinct_pmids_by_source;";
                return conn.Query<string>(sql_string);
            }
        }

Each idstring, when inserted into an API call, will therefore download 10 records at a time, but the other components of the API string first need to be constructed. In this case, because of the volume of download required, the base URL requires not only the address of the NLM fetch API, but also an indicator of the tool being used and a contact email (as per the NLM's data access rules). The API call is then made is then by adding the idstring and '&retmode=xml' to the base URL, and each individual article node is extracted from the returned XML. At this stage there is very little additional processing of the downloaded data, but a call is made to GetDateLastRevised function, which returns that date by parsing a few nodes in the XML. The last revised data is then checked against the cut-off date, unless this download is of type 121, which does not require a date parameter. All bank-linked PMIDs should pass the test (because they have been already been filtered), but the source reference PMIDs will be a mix and only require downloading if they are new or have been revised since the last download date. The source data objects table needs to be interrogated to discover whether the file is completely new to the system - in which case a new object source record is added and the XML is written out as a file to the appropriate folder - or an update of an existing file - in which case the object source record is updated and the XML is used to replace the file that is already there.

        public async Task<DownloadResult> DownloadPubmedEntriesAsync(IEnumerable<string> idstrings)
        {
            string baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?";
            baseURL += "tool=ECRINMDR&email=steve.canham@ecrin.org&db=pubmed&id=";
            DownloadResult res = new DownloadResult();
            bool ignore_revision_date = (args.type_id == 121) ? true : false;

            try
            {
                // loop through the references - already in groups of 10 
                // from processing in the database call

                foreach (string idstring in idstrings)
                {
                    // Construct the fetch URL using the 10 Ids and
                    // retrieve the articles as nodes

                    string url = baseURL + idstring + "&retmode=xml";
                    XmlDocument xdoc = new XmlDocument();
                    string responseBody = await webClient.GetStringAsync(url);
                    xdoc.LoadXml(responseBody);
                    XmlNodeList articles = xdoc.GetElementsByTagName("PubmedArticle");

                    // Consider each article node in turn

                    foreach (XmlNode article in articles)
                    {
                        string pmid = article.SelectSingleNode("MedlineCitation/PMID").InnerText;
                        if (Int32.TryParse(pmid, out int ipmid))
                        {
                            // get current or new file download record, calculate
                            // and store last revised date. Write new or replace
                            // file and update file_record (by ref).
                            res.num_checked++;
                            DateTime? last_revised_datetime = GetDateLastRevised(article);
                            ObjectFileRecord file_record = logging_repo.FetchObjectFileRecord(pmid, source.id);
                            if (file_record == null)
                            {
                                string remote_url = "https://www.ncbi.nlm.nih.gov/pubmed/" + pmid;
                                file_record = new ObjectFileRecord(source.id, pmid, remote_url, saf_id);
                                file_record.last_revised = last_revised_datetime;
                                WriteNewFile(article, ipmid, file_record);
                                
                                logging_repo.InsertObjectFileRec(file_record);
                                res.num_added++;
                                res.num_downloaded++;
                            }
                            else
                            {
                                // normally should be less then but here <= to be sure
                                if (ignore_revision_date || 
                                   (last_revised_datetime != null 
                                           && file_record.last_downloaded <= last_revised_datetime))
                                {
                                    file_record.last_saf_id = saf_id;
                                    file_record.last_revised = last_revised_datetime;
                                    ReplaceFile(article, file_record);

                                    logging_repo.StoreObjectFileRec(file_record);
                                    res.num_downloaded++;
                                }
                            }

                            if (res.num_checked % 100 == 0) StringHelpers.SendFeedback(res.num_checked.ToString());
                        }
                    }

                    System.Threading.Thread.Sleep(800);
                }

                return res;
            }

            catch (HttpRequestException e)
            {
                StringHelpers.SendError("In PubMed DownloadPubmedEntriesUsingSourcesAsync(): " + e.Message);
                return res;
            }
        }

Data Harvest

Data Aggregation

public void SetupTempPMIDTable()

       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"DROP TABLE IF EXISTS nk.temp_pmids;
                     CREATE TABLE IF NOT EXISTS nk.temp_pmids(
                       source_id                INT
                     , sd_oid                   VARCHAR
                     , parent_study_source_id   INT 
                     , parent_study_sd_sid      VARCHAR
                     , datetime_of_data_fetch   TIMESTAMPTZ
                     ); ";
               conn.Execute(sql_string);
           }
       }
       public void SetupDistinctPMIDTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"DROP TABLE IF EXISTS nk.distinct_pmids;
                     CREATE TABLE IF NOT EXISTS nk.distinct_pmids(
                       source_id                INT
                     , sd_oid                   VARCHAR
                     , parent_study_source_id   INT 
                     , parent_study_sd_sid      VARCHAR
                     , datetime_of_data_fetch   TIMESTAMPTZ
                     ); ";
               conn.Execute(sql_string);
           }
       }


       public IEnumerable<PMIDLink> FetchBankPMIDs()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"select 
                       100135 as source_id, 
                       d.id as parent_study_source_id, 
                       k.sd_oid, k.id_in_db as parent_study_sd_sid, 
                       a.datetime_of_data_fetch
                       from pubmed_ad.object_db_links k
                       inner join pubmed_ad.data_objects a 
                       on k.sd_oid = a.sd_oid
                       inner join context_ctx.nlm_databanks d
                       on k.db_name = d.nlm_abbrev
                       where bank_type <> 'databank'";
               return conn.Query<PMIDLink>(sql_string);
           }
       }


       public ulong StorePMIDLinks(PostgreSQLCopyHelper<PMIDLink> copyHelper, IEnumerable<PMIDLink> entities)
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               conn.Open();
               return copyHelper.SaveAll(conn, entities);
           }
       }


       public IEnumerable<PMIDLink> FetchSourceReferences(int source_id, string db_name)
       {
           builder.Database = db_name;
           string db_conn_string = builder.ConnectionString;
           using (var conn = new NpgsqlConnection(db_conn_string))
           {
               string sql_string = @"select 
                       100135 as source_id, " +
                       source_id.ToString() + @" as parent_study_source_id, 
                       r.pmid as sd_oid, r.sd_sid as parent_study_sd_sid, 
                       s.datetime_of_data_fetch
                       from ad.study_references r
                       inner join ad.studies s
                       on r.sd_sid = s.sd_sid
                       where r.pmid is not null;";
               return conn.Query<PMIDLink>(sql_string);
           }
       }


       public void FillDistinctPMIDsTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // First ensure that any PMIDs (sd_oids) are in the same format
               // Some have a 'tail' of spaces after them, as the standard 
               // length of a sd_oid is 24 characters.
               
               string sql_string = @"UPDATE nk.temp_pmids
                        SET sd_oid = trim(sd_oid);";
               conn.Execute(sql_string);
               // Then transfer the distinct data
               sql_string = @"INSERT INTO nk.distinct_pmids(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid)
                        SELECT distinct 
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid
                        FROM nk.temp_pmids;";
               conn.Execute(sql_string);
               // Update with latest datetime_of_data_fetch
               sql_string = @"UPDATE nk.distinct_pmids dp
                        set datetime_of_data_fetch = mx.max_fetch_date
                        FROM 
                        ( select sd_oid, parent_study_sd_sid, 
                          max(datetime_of_data_fetch) as max_fetch_date
                          FROM nk.temp_pmids
                          group by sd_oid, parent_study_sd_sid ) mx
                        WHERE dp.parent_study_sd_sid = mx.parent_study_sd_sid
                        and dp.sd_oid = mx.sd_oid;";
               conn.Execute(sql_string);
           }
       }


       public void CleanPMIDsdsidData1()
       {
           string sql_string = "";
           using (var conn = new NpgsqlConnection(connString))
           {
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = 'ACTRN' || parent_study_sd_sid
                       WHERE parent_study_source_id = 100116
                       AND length(parent_study_sd_sid) = 14;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, ' ', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, '#', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, ':', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, '[', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'CHICTR', 'ChiCTR')
                       WHERE parent_study_source_id = 100118;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = 'ChiCTR-' || parent_study_sd_sid
                       WHERE parent_study_source_id = 100118
                       and parent_study_sd_sid not ilike 'ChiCTR-%';";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'ChiCTR-ChiCTR', 'ChiCTR-')
                       WHERE parent_study_source_id = 100118;";
               conn.Execute(sql_string);
           }
       }


       public void CleanPMIDsdsidData2()
       {
           string sql_string = "";
           using (var conn = new NpgsqlConnection(connString))
           {
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, '/', '-')
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = 'CTRI-' || parent_study_sd_sid
                    WHERE parent_study_source_id = 100121
                    and parent_study_sd_sid not ilike 'CTRI-%';";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'REF-', )
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'CTRI-CTRI', 'CTRI-')
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = 'RPCEC' || parent_study_sd_sid
                  WHERE parent_study_source_id = 100122
                  and parent_study_sd_sid not ilike 'RPCEC%';";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = UPPER(parent_study_sd_sid)
                  WHERE parent_study_source_id = 100123;"; 
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, ' ', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, '–', '-')
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRA-CT', 'EUDRACT')
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRACT', )
                   WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EURODRACT', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EU', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'CT', )
                  WHERE parent_study_source_id = 100123;";
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = left(parent_study_sd_sid, 14)
                  WHERE parent_study_source_id = 100123
                  and length(parent_study_sd_sid) > 14;";
               conn.Execute(sql_string);
           }
       }


       public void TransferPMIDLinksToObjectIds()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"INSERT INTO nk.temp_object_ids(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, datetime_of_data_fetch)
                        SELECT  
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, datetime_of_data_fetch
                        FROM nk.distinct_pmids";
               conn.Execute(sql_string);
           }
       }


       public void InputPreferredSDSIDS()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // replace any LHS sd_sids with the 'preferred' RHS
               string sql_string = @"UPDATE nk.temp_object_ids b
                              SET parent_study_sd_sid = preferred_sd_sid,
                              parent_study_source_id = preferred_source_id
                              FROM nk.study_study_links k
                              WHERE b.parent_study_sd_sid = k.sd_sid
                              and b.parent_study_source_id = k.source_id ;";
               conn.Execute(sql_string);
               // That may have produced some duplicates - if so get rid of them
               // needs to be done indirectly because of the need to get the maximum
               // datetime_of_data_fetch for each duplciated object
               sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids2;
               CREATE TABLE IF NOT EXISTS nk.temp_object_ids2(
                 object_id                INT
               , source_id                INT
               , sd_oid                   VARCHAR
               , parent_study_source_id   INT
               , parent_study_sd_sid      VARCHAR
               , parent_study_id          INT
               , is_preferred_study       BOOLEAN  default true
               , datetime_of_data_fetch   TIMESTAMPTZ
               ); ";
               conn.Execute(sql_string);
               sql_string = @"INSERT INTO nk.temp_object_ids2(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, parent_study_id)
                        SELECT distinct 
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, parent_study_id
                        FROM nk.temp_object_ids;";
               conn.Execute(sql_string);
               // update with latest datetime_of_data_fetch
               // for each distinct study - data object link 
               sql_string = @"UPDATE nk.temp_object_ids2 to2
                        set datetime_of_data_fetch = mx.max_fetch_date
                        FROM 
                        ( select sd_oid, parent_study_sd_sid,
                          max(datetime_of_data_fetch) as max_fetch_date
                          FROM nk.temp_object_ids
                          group by sd_oid, parent_study_sd_sid ) mx
                        WHERE to2.sd_oid = mx.sd_oid
                        and to2.parent_study_sd_sid = mx.parent_study_sd_sid;";
               conn.Execute(sql_string);
               sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
               ALTER TABLE nk.temp_object_ids2 RENAME TO temp_object_ids;";
               conn.Execute(sql_string);
               // Maybe a few blank pmids slip through...
               sql_string = @"delete from nk.temp_object_ids
                   where sd_oid is null or sd_oid = ;";
               conn.Execute(sql_string);
           }
       }


       public void ResetIdsOfDuplicatedPMIDs()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // Find the minimum object_id for each PMID in the table
               // source id for PubMed = 100135
               string sql_string = @"DROP TABLE IF EXISTS nk.temp_min_object_ids;
                                    CREATE TABLE nk.temp_min_object_ids as
                                    SELECT sd_oid, Min(id) as min_id
                                    FROM nk.all_ids_data_objects
                                    WHERE source_id = 100135
                                    GROUP BY sd_oid;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.all_ids_data_objects b
                              SET object_id = min_id
                              FROM nk.temp_min_object_ids m
                              WHERE b.sd_oid = m.sd_oid
                              and source_id = 100135;";
               conn.Execute(sql_string);
               sql_string = @"DROP TABLE nk.temp_min_object_ids;";
               conn.Execute(sql_string);
               // ???? May be (yet) more duplicates have appeared, where a 
               // study - pmid link has been generated in more than one way
               // (Links of the same paper to different studies are not uncommon)
           }
       }


       public void DropTempPMIDTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = "DROP TABLE IF EXISTS pp.temp_pmids";
               conn.Execute(sql_string);
           }
       }