Difference between revisions of "Processing PubMed Data"

From ECRIN-MDR Wiki
Jump to navigation Jump to search
(=Searching Pubmed for Bank linked records)
(Searching Pubmed for Bank linked records)
Line 13: Line 13:
  
 
====Searching Pubmed for Bank linked records====
 
====Searching Pubmed for Bank linked records====
The PubMed API allows records revised or added in a date range to be identified, and at the same time allows the search results to be filtered to include only those with reference to a ''specific'' databank, but not to ''any'' databank.  
+
The PubMed API allows records revised or added in a date range to be identified, and at the same time allows the search results to be filtered to include only those with reference to a ''specific'' databank, but not to ''any'' databank. It is therefore necessary to loop through each of the possible trial registry 'banks' and obtain the relevant records for each, adding these to a table each time. In the code, shown below, the base URL for the Pubmed site is first established, along with other variables - retmax gives the maximum number of record ids to be returned in any search call, while include_dates uses the type_id parameter to determine if a cu-off date is to be applied. The various database tables required are set up, and then a list of the trial registries is retrieved from the database as the variable banks.<br/>
The records found for each bank are stored in a table in the pp schema - pp.temp_by_bank_records.
+
The API call is then constructed, using the base URL, the trial registry id (as known to the API, in the code called nlm_abbrev), followed by '[SI]', which identifies this parameter as referring to a particular databank, and then if required (they normally are) the cut off and current dates as the minimum and maximum dates of the selected date range, in a yyyy/MM/dd format. The sort of API string is shown below.<br/>
These are transferred in turn to the pp.pmids_with_bank_records table
+
''https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&term=''
 +
 
 +
 
 +
The records found for each bank are stored in a table in the pp schema - pp.temp_by_bank_records. These are transferred in turn to the pp.pmids_with_bank_records table
 +
At the end of the loop through all banks the *distinct* pmid records are transfered to a list in memory. The system loops through these - if the record exists it is replaced and the object source record is updated in the mon database. if the record is new the record is downloaded and a new object source record is created. This strategy is represented by saf_type 114 (with cutoff date, and filter 10003).
  
At the end of the loop through all banks the *distinct* pmid records are transfered to a list in memory.
 
The system loops through these - if the record exists it is replaced and the object source record
 
is updated in the mon database.
 
if the record is new the record is downloaded and a new object source record is created.
 
This strategy is represented by saf_type 114 (with cutoff date, and filter 10003)
 
 
<div style="font-family: monospace; font-size: 13px" >
 
<div style="font-family: monospace; font-size: 13px" >
 
<pre>
 
<pre>

Revision as of 15:28, 21 November 2020

Introduction

The PubMed data has a range of 'special issues' that demand additional processing in several places in the system. This is the only data that is not directly linked to studies within the data source, so the data objects retrieved from it, the PubMed citations and associated PMC articles, have to be linked within the MDR. There are two mechanisms for this. The first is embedded in the Pubmed data itself, in the shape of 'bank ids'. These are the identifiers of associated entries in 'databanks', and that term includes trial registries. This allows a PubMed citation to explicitly reference an associated study, through that study's registry id - most often but not exclusively the ClinicalTrials.gov NCT number. The second is in the 'associated reference' data found in some data sources. This appears under various names and in various forms, but usually includes a PubMed identifier, at least for the great majority of cited references (some are given only as DOIs or as textual citations). Note that although this list of references is obtained during a study based source's extraction, the citations themselves are not downloaded and processed into the system during that extraction. Instead all PubMed data is extracted and processed only when referencing PubMed (id = 100135) as the data source.
The system has to consider both PubMed linkage types, firstly when identifying the relevant PubMed data to download (PubMed contains well over 30 million records, so we only want the data we can link) and secondly when linking PubMed data objects to studies during data aggregation. The harvesting process for PubMed is also relatively complex, requiring additional calls into the contextual data. The PubMed specific aspects of data download, harvest and aggregation are therefore summarised below.

Data Download

Under normal circumstances downloads of PubMed data need to obtain only records that have been added to, or revised within, the PubMed system since the previous download. PubMed downloads therefore come with a cut-off date, that specifies only records added / edited on or after that date need be (re-) downloaded. In addition, they come with a search filter. The first is used to select records in PubMed that include 'bank' references to a trial registry. the second is used to assemble the reference lists already existing in the MDR, and use that list as the basis for checking revision dates in the PubMed database. In each case the 'type' of the saf (search and fetch) operation is 114: All new or revised records with a filter applied (download).
In the first, bank based, download the filter parameter is 10003: PubMed abstracts with references to any trial registry, whilst in the second, references based download it is 10004: Identifies PubMed references in Study sources that have not yet been downloaded.
Variants of these downloads exist using type 121: Filtered records (download). With filter 10003 this will cause the download of all bank-linked records to be downloaded, irrespective of date, whilst with filter 10004 all records referenced in study sources will be downloaded, irrespective of revision date. Use of this download type may be used occasionally, e.g. annually, after the Pubmed database itself is revised, to ensure synchronisation with the source. In all cases the download mechanism is divided into two parts:

  • the creation of a list of the relevant PMID identifiers, from a search of PubMed or from examining and collecting the reference data in the MDR
  • the download of the relevant files from PubMed, based on the list of PMIDs.

In the case of the bank-linked search, the application of the cut-off date forms part of the search parameters. In the case of the reference-linked search, the cut-off date is applied during the download process.

Searching Pubmed for Bank linked records

The PubMed API allows records revised or added in a date range to be identified, and at the same time allows the search results to be filtered to include only those with reference to a specific databank, but not to any databank. It is therefore necessary to loop through each of the possible trial registry 'banks' and obtain the relevant records for each, adding these to a table each time. In the code, shown below, the base URL for the Pubmed site is first established, along with other variables - retmax gives the maximum number of record ids to be returned in any search call, while include_dates uses the type_id parameter to determine if a cu-off date is to be applied. The various database tables required are set up, and then a list of the trial registries is retrieved from the database as the variable banks.
The API call is then constructed, using the base URL, the trial registry id (as known to the API, in the code called nlm_abbrev), followed by '[SI]', which identifies this parameter as referring to a particular databank, and then if required (they normally are) the cut off and current dates as the minimum and maximum dates of the selected date range, in a yyyy/MM/dd format. The sort of API string is shown below.
https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&term=


The records found for each bank are stored in a table in the pp schema - pp.temp_by_bank_records. These are transferred in turn to the pp.pmids_with_bank_records table At the end of the loop through all banks the *distinct* pmid records are transfered to a list in memory. The system loops through these - if the record exists it is replaced and the object source record is updated in the mon database. if the record is new the record is downloaded and a new object source record is created. This strategy is represented by saf_type 114 (with cutoff date, and filter 10003).

        public async Task CreatePMIDsListfromBanksAsync()
        {
            int totalRecords, numCallsNeeded, bank_id;
            int retmax = 1000;
            string search_term;
            CopyHelpers helper = new CopyHelpers();
            string baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&term=";
            bool include_dates = (args.type_id == 114) ? true : false;

            pubmed_repo.SetUpTempPMIDsByBankTable();
            pubmed_repo.SetUpBankPMIDsTable();
            pubmed_repo.SetUpDistinctBankPMIDsTable();

            // Get list of potential linked data banks (includes trial registries)
            IEnumerable<PMSource> banks = pubmed_repo.FetchDatabanks();

            foreach (PMSource s in banks)
            {
                // get databank details
                bank_id = s.id;
                search_term = s.nlm_abbrev + "[SI]";
                if (include_dates)
                {
                    string today = DateTime.Now.ToString("yyyy/MM/dd");
                    string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                    string date_term = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
                    search_term += date_term;
                }
                string url = baseURL + search_term;


                // Get the number of total records that have this databank reference
                // and calculate the loop parameters
                totalRecords = await pubmed_repo.GetBankDataCountAsync(url);
                numCallsNeeded = (int)(totalRecords / retmax) + 1;
                pubmed_repo.TruncateTempPMIDsByBankTable();

                // loop through the records and obtain and store relevant
                // records retmax (= 1000) at a time

                for (int i = 0; i < numCallsNeeded; i++)
                {
                    try
                    {
                        int start = i * 1000;
                        string selectedRecords = "&retstart=" + start.ToString() + "&retmax=" + retmax.ToString();

                        // Put a 2 second pause beefore each call.
                        await Task.Delay(2000);
                        string responseBody = await webClient.GetStringAsync(url + selectedRecords);

                        // The eSearchResult class allows the returned json string to be easily deserialised
                        // and the required values, of each Id in the IdList, can then be read.

                        XmlSerializer xSerializer = new XmlSerializer(typeof(eSearchResult));
                        using (TextReader reader = new StringReader(responseBody))
                        {
                            eSearchResult result = (eSearchResult)xSerializer.Deserialize(reader);
                            if (result != null)
                            {
                                var FoundIds = Array.ConvertAll(result.IdList, ele => new PMIDByBank(ele.ToString()));
                                pubmed_repo.StorePMIDsByBank(helper.bank_ids_helper, FoundIds);

                                string feedback = "Storing " + retmax.ToString() +
                                                  " records from " + start.ToString() + " in bank " + search_term;
                                StringHelpers.SendFeedback(feedback);
                            }
                        }
                    }

                    catch (HttpRequestException e)
                    {
                        StringHelpers.SendError("In PubMed CreatePMIDsListfromBanksAsync(): " + e.Message);
                    }
                }

                // transfer across to total table...
                pubmed_repo.TransferBankPMIDsToTotalTable(s.nlm_abbrev);
            }

            pubmed_repo.DropTempPMIDByBankTable();
            pubmed_repo.FillDistinctBankPMIDsTable();
        }   

Assembling lists of Pubmed references

The study_reference records are collected - one database at a time - and transferred to a single database. At the end of that process a distinct list is created in memory. The system loops through this 10 records at a time. If there is no object source record one is created and the file is downloaded - it is new to the system. If a source record exists the record needs checking to see if it has been revised since the last similar exercise. If it has the record is downloaded and the source file is updated. If not, no download takes place. This strategy is represented by saf_type 114 (with cutoff date, and filter 10004)

        public void CreatePMIDsListfromSources()
        {
            // Establish tables and support objects

            pubmed_repo.SetUpTempPMIDsBySourceTable();
            pubmed_repo.SetUpSourcePMIDsTable();
            pubmed_repo.SetUpDistinctSourcePMIDsTable();
            CopyHelpers helper = new CopyHelpers();
            IEnumerable<PMIDBySource> references;

            // Loop threough the study databases that hold
            // study_reference tables, i.e. with pmid ids
            IEnumerable<Source> sources = pubmed_repo.FetchSourcesWithReferences();
            foreach (Source s in sources)
            {
                pubmed_repo.TruncateTempPMIDsBySourceTable();
                references = pubmed_repo.FetchSourceReferences(s.database_name);
                pubmed_repo.StorePmidsBySource(helper.source_ids_helper, references);
                pubmed_repo.TransferSourcePMIDsToTotalTable(s.id);
            }

            pubmed_repo.FillDistinctSourcePMIDsTable();
            pubmed_repo.DropTempPMIDBySourceTable();
        }

Download operations

        public async Task<DownloadResult> DownloadPubmedEntriesAsync(IEnumerable<string> idstrings)
        {
            string baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?";
            baseURL += "tool=ECRINMDR&email=steve.canham@ecrin.org&db=pubmed&id=";
            DownloadResult res = new DownloadResult();
            bool ignore_revision_date = (args.type_id == 121) ? true : false;

            try
            {
                // loop through the references - already in groups of 10 
                // from processing in the database call

                foreach (string idstring in idstrings)
                {
                    // Construct the fetch URL using the 10 Ids and
                    // retrieve the articles as nodes

                    string url = baseURL + idstring + "&retmode=xml";
                    XmlDocument xdoc = new XmlDocument();
                    string responseBody = await webClient.GetStringAsync(url);
                    xdoc.LoadXml(responseBody);
                    XmlNodeList articles = xdoc.GetElementsByTagName("PubmedArticle");

                    // Consider each article node in turn

                    foreach (XmlNode article in articles)
                    {
                        string pmid = article.SelectSingleNode("MedlineCitation/PMID").InnerText;
                        if (Int32.TryParse(pmid, out int ipmid))
                        {
                            // get current or new file download record, calculate
                            // and store last revised date. Write new or replace
                            // file and update file_record (by ref).
                            res.num_checked++;
                            DateTime? last_revised_datetime = GetDateLastRevised(article);
                            ObjectFileRecord file_record = logging_repo.FetchObjectFileRecord(pmid, source.id);
                            if (file_record == null)
                            {
                                string remote_url = "https://www.ncbi.nlm.nih.gov/pubmed/" + pmid;
                                file_record = new ObjectFileRecord(source.id, pmid, remote_url, saf_id);
                                file_record.last_revised = last_revised_datetime;
                                WriteNewFile(article, ipmid, file_record);
                                
                                logging_repo.InsertObjectFileRec(file_record);
                                res.num_added++;
                                res.num_downloaded++;
                            }
                            else
                            {
                                // normally should be less then but here <= to be sure
                                if (ignore_revision_date || 
                                   (last_revised_datetime != null 
                                           && file_record.last_downloaded <= last_revised_datetime))
                                {
                                    file_record.last_saf_id = saf_id;
                                    file_record.last_revised = last_revised_datetime;
                                    ReplaceFile(article, file_record);

                                    logging_repo.StoreObjectFileRec(file_record);
                                    res.num_downloaded++;
                                }
                            }

                            if (res.num_checked % 100 == 0) StringHelpers.SendFeedback(res.num_checked.ToString());
                        }
                    }

                    System.Threading.Thread.Sleep(800);
                }

                return res;
            }

            catch (HttpRequestException e)
            {
                StringHelpers.SendError("In PubMed DownloadPubmedEntriesUsingSourcesAsync(): " + e.Message);
                return res;
            }
        }
        private DateTime? GetDateLastRevised(XmlNode article)
        {
            DateTime? date_last_revised = null;

            string year = article.SelectSingleNode("MedlineCitation/DateRevised/Year").InnerText ?? "";
            string month = article.SelectSingleNode("MedlineCitation/DateRevised/Month").InnerText ?? "";
            string day = article.SelectSingleNode("MedlineCitation/DateRevised/Day").InnerText ?? "";

            if (year != "" && month != "" && day != "")
            {
                if (Int32.TryParse(year, out int iyear) 
                && Int32.TryParse(month, out int imonth)
                && Int32.TryParse(day, out int iday))
                {
                    date_last_revised = new DateTime(iyear, imonth, iday);
                }
            }
            return date_last_revised;
        }
        private void WriteNewFile(XmlNode article, int ipmid, ObjectFileRecord file_record)
        {
            string folder_name = Path.Combine(source.local_folder, "PM" + (ipmid / 10000).ToString("00000") + "xxxx");
            if (!Directory.Exists(folder_name))
            {
                Directory.CreateDirectory(folder_name);
            }
            string filename = "PM" + ipmid.ToString("000000000") + ".xml";
            string full_path = Path.Combine(folder_name, filename);

            using (XmlWriter writer = XmlWriter.Create(full_path, settings))
            {
                article.WriteTo(writer);
            }

            file_record.local_path = full_path;
            file_record.download_status = 2;
            file_record.last_downloaded = DateTime.Now;
        }
        private void ReplaceFile(XmlNode article, ObjectFileRecord file_record)
        {
            string full_path = file_record.local_path;
            // ensure can over write
            if (File.Exists(full_path))
            {
                File.Delete(full_path);
            }
            using (XmlWriter writer = XmlWriter.Create(full_path, settings))
            {
                article.WriteTo(writer);
            }
            file_record.last_downloaded = DateTime.Now;
        }

Data Harvest

Data Aggregation

public void SetupTempPMIDTable()

       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"DROP TABLE IF EXISTS nk.temp_pmids;
                     CREATE TABLE IF NOT EXISTS nk.temp_pmids(
                       source_id                INT
                     , sd_oid                   VARCHAR
                     , parent_study_source_id   INT 
                     , parent_study_sd_sid      VARCHAR
                     , datetime_of_data_fetch   TIMESTAMPTZ
                     ); ";
               conn.Execute(sql_string);
           }
       }
       public void SetupDistinctPMIDTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"DROP TABLE IF EXISTS nk.distinct_pmids;
                     CREATE TABLE IF NOT EXISTS nk.distinct_pmids(
                       source_id                INT
                     , sd_oid                   VARCHAR
                     , parent_study_source_id   INT 
                     , parent_study_sd_sid      VARCHAR
                     , datetime_of_data_fetch   TIMESTAMPTZ
                     ); ";
               conn.Execute(sql_string);
           }
       }


       public IEnumerable<PMIDLink> FetchBankPMIDs()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"select 
                       100135 as source_id, 
                       d.id as parent_study_source_id, 
                       k.sd_oid, k.id_in_db as parent_study_sd_sid, 
                       a.datetime_of_data_fetch
                       from pubmed_ad.object_db_links k
                       inner join pubmed_ad.data_objects a 
                       on k.sd_oid = a.sd_oid
                       inner join context_ctx.nlm_databanks d
                       on k.db_name = d.nlm_abbrev
                       where bank_type <> 'databank'";
               return conn.Query<PMIDLink>(sql_string);
           }
       }


       public ulong StorePMIDLinks(PostgreSQLCopyHelper<PMIDLink> copyHelper, IEnumerable<PMIDLink> entities)
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               conn.Open();
               return copyHelper.SaveAll(conn, entities);
           }
       }


       public IEnumerable<PMIDLink> FetchSourceReferences(int source_id, string db_name)
       {
           builder.Database = db_name;
           string db_conn_string = builder.ConnectionString;
           using (var conn = new NpgsqlConnection(db_conn_string))
           {
               string sql_string = @"select 
                       100135 as source_id, " +
                       source_id.ToString() + @" as parent_study_source_id, 
                       r.pmid as sd_oid, r.sd_sid as parent_study_sd_sid, 
                       s.datetime_of_data_fetch
                       from ad.study_references r
                       inner join ad.studies s
                       on r.sd_sid = s.sd_sid
                       where r.pmid is not null;";
               return conn.Query<PMIDLink>(sql_string);
           }
       }


       public void FillDistinctPMIDsTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // First ensure that any PMIDs (sd_oids) are in the same format
               // Some have a 'tail' of spaces after them, as the standard 
               // length of a sd_oid is 24 characters.
               
               string sql_string = @"UPDATE nk.temp_pmids
                        SET sd_oid = trim(sd_oid);";
               conn.Execute(sql_string);
               // Then transfer the distinct data
               sql_string = @"INSERT INTO nk.distinct_pmids(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid)
                        SELECT distinct 
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid
                        FROM nk.temp_pmids;";
               conn.Execute(sql_string);
               // Update with latest datetime_of_data_fetch
               sql_string = @"UPDATE nk.distinct_pmids dp
                        set datetime_of_data_fetch = mx.max_fetch_date
                        FROM 
                        ( select sd_oid, parent_study_sd_sid, 
                          max(datetime_of_data_fetch) as max_fetch_date
                          FROM nk.temp_pmids
                          group by sd_oid, parent_study_sd_sid ) mx
                        WHERE dp.parent_study_sd_sid = mx.parent_study_sd_sid
                        and dp.sd_oid = mx.sd_oid;";
               conn.Execute(sql_string);
           }
       }


       public void CleanPMIDsdsidData1()
       {
           string sql_string = "";
           using (var conn = new NpgsqlConnection(connString))
           {
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = 'ACTRN' || parent_study_sd_sid
                       WHERE parent_study_source_id = 100116
                       AND length(parent_study_sd_sid) = 14;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, ' ', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, '#', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, ':', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, '[', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'CHICTR', 'ChiCTR')
                       WHERE parent_study_source_id = 100118;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = 'ChiCTR-' || parent_study_sd_sid
                       WHERE parent_study_source_id = 100118
                       and parent_study_sd_sid not ilike 'ChiCTR-%';";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'ChiCTR-ChiCTR', 'ChiCTR-')
                       WHERE parent_study_source_id = 100118;";
               conn.Execute(sql_string);
           }
       }


       public void CleanPMIDsdsidData2()
       {
           string sql_string = "";
           using (var conn = new NpgsqlConnection(connString))
           {
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, '/', '-')
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = 'CTRI-' || parent_study_sd_sid
                    WHERE parent_study_source_id = 100121
                    and parent_study_sd_sid not ilike 'CTRI-%';";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'REF-', )
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'CTRI-CTRI', 'CTRI-')
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = 'RPCEC' || parent_study_sd_sid
                  WHERE parent_study_source_id = 100122
                  and parent_study_sd_sid not ilike 'RPCEC%';";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = UPPER(parent_study_sd_sid)
                  WHERE parent_study_source_id = 100123;"; 
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, ' ', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, '–', '-')
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRA-CT', 'EUDRACT')
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRACT', )
                   WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EURODRACT', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EU', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'CT', )
                  WHERE parent_study_source_id = 100123;";
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = left(parent_study_sd_sid, 14)
                  WHERE parent_study_source_id = 100123
                  and length(parent_study_sd_sid) > 14;";
               conn.Execute(sql_string);
           }
       }


       public void TransferPMIDLinksToObjectIds()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"INSERT INTO nk.temp_object_ids(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, datetime_of_data_fetch)
                        SELECT  
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, datetime_of_data_fetch
                        FROM nk.distinct_pmids";
               conn.Execute(sql_string);
           }
       }


       public void InputPreferredSDSIDS()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // replace any LHS sd_sids with the 'preferred' RHS
               string sql_string = @"UPDATE nk.temp_object_ids b
                              SET parent_study_sd_sid = preferred_sd_sid,
                              parent_study_source_id = preferred_source_id
                              FROM nk.study_study_links k
                              WHERE b.parent_study_sd_sid = k.sd_sid
                              and b.parent_study_source_id = k.source_id ;";
               conn.Execute(sql_string);
               // That may have produced some duplicates - if so get rid of them
               // needs to be done indirectly because of the need to get the maximum
               // datetime_of_data_fetch for each duplciated object
               sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids2;
               CREATE TABLE IF NOT EXISTS nk.temp_object_ids2(
                 object_id                INT
               , source_id                INT
               , sd_oid                   VARCHAR
               , parent_study_source_id   INT
               , parent_study_sd_sid      VARCHAR
               , parent_study_id          INT
               , is_preferred_study       BOOLEAN  default true
               , datetime_of_data_fetch   TIMESTAMPTZ
               ); ";
               conn.Execute(sql_string);
               sql_string = @"INSERT INTO nk.temp_object_ids2(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, parent_study_id)
                        SELECT distinct 
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, parent_study_id
                        FROM nk.temp_object_ids;";
               conn.Execute(sql_string);
               // update with latest datetime_of_data_fetch
               // for each distinct study - data object link 
               sql_string = @"UPDATE nk.temp_object_ids2 to2
                        set datetime_of_data_fetch = mx.max_fetch_date
                        FROM 
                        ( select sd_oid, parent_study_sd_sid,
                          max(datetime_of_data_fetch) as max_fetch_date
                          FROM nk.temp_object_ids
                          group by sd_oid, parent_study_sd_sid ) mx
                        WHERE to2.sd_oid = mx.sd_oid
                        and to2.parent_study_sd_sid = mx.parent_study_sd_sid;";
               conn.Execute(sql_string);
               sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
               ALTER TABLE nk.temp_object_ids2 RENAME TO temp_object_ids;";
               conn.Execute(sql_string);
               // Maybe a few blank pmids slip through...
               sql_string = @"delete from nk.temp_object_ids
                   where sd_oid is null or sd_oid = ;";
               conn.Execute(sql_string);
           }
       }


       public void ResetIdsOfDuplicatedPMIDs()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // Find the minimum object_id for each PMID in the table
               // source id for PubMed = 100135
               string sql_string = @"DROP TABLE IF EXISTS nk.temp_min_object_ids;
                                    CREATE TABLE nk.temp_min_object_ids as
                                    SELECT sd_oid, Min(id) as min_id
                                    FROM nk.all_ids_data_objects
                                    WHERE source_id = 100135
                                    GROUP BY sd_oid;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.all_ids_data_objects b
                              SET object_id = min_id
                              FROM nk.temp_min_object_ids m
                              WHERE b.sd_oid = m.sd_oid
                              and source_id = 100135;";
               conn.Execute(sql_string);
               sql_string = @"DROP TABLE nk.temp_min_object_ids;";
               conn.Execute(sql_string);
               // ???? May be (yet) more duplicates have appeared, where a 
               // study - pmid link has been generated in more than one way
               // (Links of the same paper to different studies are not uncommon)
           }
       }


       public void DropTempPMIDTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = "DROP TABLE IF EXISTS pp.temp_pmids";
               conn.Execute(sql_string);
           }
       }