Difference between revisions of "Processing PubMed Data"

From ECRIN-MDR Wiki
Jump to navigation Jump to search
(Data Harvest)
(Data Aggregation)
Line 289: Line 289:
  
 
===Data Aggregation===
 
===Data Aggregation===
public void SetupTempPMIDTable()
+
 
 +
        public void SetupTempPMIDTable()
 
         {
 
         {
 
             using (var conn = new NpgsqlConnection(connString))
 
             using (var conn = new NpgsqlConnection(connString))

Revision as of 17:00, 23 November 2020

Introduction

The PubMed data has a range of 'special issues' that demand additional processing in several places in the system. This is the only data that is not directly linked to studies within the data source, so the data objects retrieved from it, the PubMed citations and associated PMC articles, have to be linked within the MDR. There are two mechanisms for this. The first is embedded in the Pubmed data itself, in the shape of 'bank ids'. These are the identifiers of associated entries in 'databanks', and that term includes trial registries. This allows a PubMed citation to explicitly reference an associated study, through that study's registry id - most often but not exclusively the ClinicalTrials.gov NCT number. The second is in the 'associated reference' data found in some data sources. This appears under various names and in various forms, but usually includes a PubMed identifier, at least for the great majority of cited references (some are given only as DOIs or as textual citations). Note that although this list of references is obtained during a study based source's extraction, the citations themselves are not downloaded and processed into the system during that extraction. Instead all PubMed data is extracted and processed only when referencing PubMed (id = 100135) as the data source.
The system has to consider both PubMed linkage types, firstly when identifying the relevant PubMed data to download (PubMed contains well over 30 million records, so we only want the data we can link) and secondly when linking PubMed data objects to studies during data aggregation. The harvesting process for PubMed is also relatively complex, requiring additional calls into the contextual data. The PubMed specific aspects of data download, harvest and aggregation are therefore summarised below.

Data Download

Under normal circumstances downloads of PubMed data need to obtain only records that have been added to, or revised within, the PubMed system since the previous download. PubMed downloads therefore come with a cut-off date, that specifies only records added / edited on or after that date need be (re-) downloaded. In addition, they come with a search filter. The first is used to select records in PubMed that include 'bank' references to a trial registry. the second is used to assemble the reference lists already existing in the MDR, and use that list as the basis for checking revision dates in the PubMed database. In each case the 'type' of the saf (search and fetch) operation is 114: All new or revised records with a filter applied (download).
In the first, bank based, download the filter parameter is 10003: PubMed abstracts with references to any trial registry, whilst in the second, references based download it is 10004: Identifies PubMed references in Study sources that have not yet been downloaded.
Variants of these downloads exist using type 121: Filtered records (download). With filter 10003 this will cause the download of all bank-linked records to be downloaded, irrespective of date, whilst with filter 10004 all records referenced in study sources will be downloaded, irrespective of revision date. Use of this download type may be used occasionally, e.g. annually, after the Pubmed database itself is revised, to ensure synchronisation with the source. In all cases the download mechanism is divided into two parts:

  • the creation of a list of the relevant PubMed identifiers (PMIDs), from a search of PubMed or from examining and collecting the reference data in the MDR
  • the download of the relevant files from PubMed, based on the list of PMIDs.

In the case of the bank-linked search, the application of the cut-off date forms part of the search parameters. In the case of the reference-linked search, the cut-off date is applied during the download process.

Searching Pubmed for Bank linked records

The PubMed API allows records revised or added in a date range to be identified, and at the same time allows the search results to be filtered to include only those with reference to a specific databank, but not to any databank. It is therefore necessary to loop through each of the possible trial registry 'banks' and obtain the relevant records for each, adding these to a table each time. In the code, shown below, the base URL for the Pubmed site is first established, along with other variables - retmax gives the maximum number of record ids to be returned in any search call, while include_dates uses the type_id parameter to determine if a cu-off date is to be applied. The various database tables required are set up, and then a list of the trial registries is retrieved from the database as the variable banks.
The API call is then constructed, using the base URL, the trial registry id (as known to the API, in the code called nlm_abbrev), followed by '[SI]', which identifies this parameter as referring to a particular databank, and then if required (they normally are) the cut off and current dates as the minimum and maximum dates of the selected date range against the 'mdat' parameterthe process for each databask, in the , in a yyyy/MM/dd format. The total number of PubMed Ids to be retrieved for this databank is retrieved by an initial API call, which examines the Count variable in the returned XML, and the number of calls required is then calculated using retmax.
The system then loops through each of the required calls, waiting 2 seconds between each, adding the start position and maximum size parameters for the returned id list. The sort of API string that results is shown below.
https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&term=ClinicalTrials.gov[SI]&mindate=2020/10/01&maxdate=2020/11/20&datetype=mdat&retstart=1&retmax=1000
The record Ids found are stored in the temp_pmids_by_bank table, and the accumulated records are transferred, at the end of each process, in the pmids_by_bank_total table.

        public async Task CreatePMIDsListfromBanksAsync()
        {
            int totalRecords, numCallsNeeded, bank_id;
            int retmax = 1000;
            string search_term;
            CopyHelpers helper = new CopyHelpers();
            string baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&term=";
            bool include_dates = (args.type_id == 114) ? true : false;

            pubmed_repo.SetUpTempPMIDsByBankTable();
            pubmed_repo.SetUpBankPMIDsTable();
            pubmed_repo.SetUpDistinctBankPMIDsTable();

            // Get list of potential linked data banks (includes trial registries)
            IEnumerable<PMSource> banks = pubmed_repo.FetchDatabanks();

            foreach (PMSource s in banks)
            {
                // get databank details
                bank_id = s.id;
                search_term = s.nlm_abbrev + "[SI]";
                if (include_dates)
                {
                    string today = DateTime.Now.ToString("yyyy/MM/dd");
                    string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                    string date_term = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
                    search_term += date_term;
                }
                string url = baseURL + search_term;


                // Get the number of total records that have this databank reference
                // and calculate the loop parameters
                totalRecords = await pubmed_repo.GetBankDataCountAsync(url);
                numCallsNeeded = (int)(totalRecords / retmax) + 1;
                pubmed_repo.TruncateTempPMIDsByBankTable();

                // loop through the records and obtain and store relevant
                // records retmax (= 1000) at a time

                for (int i = 0; i < numCallsNeeded; i++)
                {
                    try
                    {
                        int start = i * 1000;
                        string selectedRecords = "&retstart=" + start.ToString() + "&retmax=" + retmax.ToString();

                        // Put a 2 second pause before each call.
                        await Task.Delay(2000);
                        string responseBody = await webClient.GetStringAsync(url + selectedRecords);

                        // The eSearchResult class allows the returned json string to be easily deserialised
                        // and the required values, of each Id in the IdList, can then be read.

                        XmlSerializer xSerializer = new XmlSerializer(typeof(eSearchResult));
                        using (TextReader reader = new StringReader(responseBody))
                        {
                            eSearchResult result = (eSearchResult)xSerializer.Deserialize(reader);
                            if (result != null)
                            {
                                var FoundIds = Array.ConvertAll(result.IdList, ele => new PMIDByBank(ele.ToString()));
                                pubmed_repo.StorePMIDsByBank(helper.bank_ids_helper, FoundIds);

                                string feedback = "Storing " + retmax.ToString() +
                                                  " records from " + start.ToString() + " in bank " + search_term;
                                StringHelpers.SendFeedback(feedback);
                            }
                        }
                    }

                    catch (HttpRequestException e)
                    {
                        StringHelpers.SendError("In PubMed CreatePMIDsListfromBanksAsync(): " + e.Message);
                    }
                }

                // transfer across to total table...
                pubmed_repo.TransferBankPMIDsToTotalTable(s.nlm_abbrev);
            }

            pubmed_repo.DropTempPMIDByBankTable();
            pubmed_repo.FillDistinctBankPMIDsTable();
        }   

At the end of the process the pmids_by_bank_total table has a record of each PMID that meets the original search criteria. A call is then made to the FillDistinctBankPMIDsTable procedure. This stores the distinct PMIDs found into the previously created distinct_pmids_by_bank table, grouping those PMIDs into 10s (using the table's identity column identity column and integer division). The PMIDs are then prepared for the next, file download, stage.

       public void SetUpDistinctBankPMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"DROP TABLE IF EXISTS pp.distinct_pmids_by_bank;
                       CREATE TABLE IF NOT EXISTS pp.distinct_pmids_by_bank(
                         identity int GENERATED ALWAYS AS IDENTITY
                       , group_id int
                       , pmid varchar)";
                conn.Execute(sql_string);
            }
        }

        public void FillDistinctBankPMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO pp.distinct_pmids_by_bank(
                          pmid)
                          SELECT DISTINCT pmid
                          FROM pp.pmids_by_bank_total
                          ORDER BY pmid;";
                conn.Execute(sql_string);

                sql_string = @"Update pp.distinct_pmids_by_bank
                          SET group_id = identity / 10;";
                conn.Execute(sql_string);
            }
        }

Assembling lists of Pubmed references

The study_reference records are collected - one database at a time - and transferred to a single table. After the various tables required are established the system retrieves a list of the study based data sources with reference records. It loops through each of those references, getting the PMIDs as a collection of PMIDBySource objects in each case and transferring them to the pmids_by_source_total table. A similar call to that for the bank PMIDs is made, to the FillDistinctSourcePMIDsTable procedure, to store the distinct PMIDs into another table (distinct_pmids_by_source), and again group these into 10s.

        public void CreatePMIDsListfromSources()
        {
            // Establish tables and support objects

            pubmed_repo.SetUpTempPMIDsBySourceTable();
            pubmed_repo.SetUpSourcePMIDsTable();
            pubmed_repo.SetUpDistinctSourcePMIDsTable();
            CopyHelpers helper = new CopyHelpers();
            IEnumerable<PMIDBySource> references;

            // Loop through the study databases that hold
            // study_reference tables, i.e. with pmid ids
            IEnumerable<Source> sources = pubmed_repo.FetchSourcesWithReferences();
            foreach (Source s in sources)
            {
                pubmed_repo.TruncateTempPMIDsBySourceTable();
                references = pubmed_repo.FetchSourceReferences(s.database_name);
                pubmed_repo.StorePmidsBySource(helper.source_ids_helper, references);
                pubmed_repo.TransferSourcePMIDsToTotalTable(s.id);
            }

            pubmed_repo.FillDistinctSourcePMIDsTable();
            pubmed_repo.DropTempPMIDBySourceTable();
        }

The process is therefore much simpler and quicker than the API calls required for identifying the bank-linked PMIDs, but the end result is very similar - a list of PMIDs, each belonging to a numbered group of 10. Note that list only consists of PMIDs - the purpose is only to identify the PubMed data of interest and download it - linking that data to studies comes later.

Download operations

However the list of PMIDs is obtained, the download functionality is very similar.
The PMID list is retrieved as strings, each one holding 10 Ids, called idstrings, using either the FetchDistinctBankPMIDStrings() or the FetchDistinctSourcePMIDStrings() functions. These both use the same 'partitiion over' mechanism to concatenate the PMIDs from 10 records (as grouped during the earlier search phase) into a single string, for example:

        public IEnumerable<string> FetchDistinctSourcePMIDStrings()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"select distinct string_agg(pmid, ', ') 
                        OVER ( PARTITION BY group_id) 
                        from pp.distinct_pmids_by_source;";
                return conn.Query<string>(sql_string);
            }
        }

Each idstring, when inserted into an API call, will therefore download 10 records at a time, but the other components of the API string first need to be constructed. In this case, because of the volume of download required, the base URL requires not only the address of the NLM fetch API, but also an indicator of the tool being used and a contact email (as per the NLM's data access rules). The API call is then made is then by adding the idstring and '&retmode=xml' to the base URL, and each individual article node is extracted from the returned XML. At this stage there is very little additional processing of the downloaded data, but a call is made to GetDateLastRevised function, which returns that date by parsing a few nodes in the XML. The last revised data is then checked against the cut-off date, unless this download is of type 121, which does not require a date parameter. All bank-linked PMIDs should pass the test (because they have been already been filtered), but the source reference PMIDs will be a mix and only require downloading if they are new or have been revised since the last download date. The source data objects table needs to be interrogated to discover whether the file is completely new to the system - in which case a new object source record is added and the XML is written out as a file to the appropriate folder - or an update of an existing file - in which case the object source record is updated and the XML is used to replace the file that is already there.

        public async Task<DownloadResult> DownloadPubmedEntriesAsync(IEnumerable<string> idstrings)
        {
            string baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?";
            baseURL += "tool=ECRINMDR&email=steve.canham@ecrin.org&db=pubmed&id=";
            DownloadResult res = new DownloadResult();
            bool ignore_revision_date = (args.type_id == 121) ? true : false;

            try
            {
                // loop through the references - already in groups of 10 
                // from processing in the database call

                foreach (string idstring in idstrings)
                {
                    // Construct the fetch URL using the 10 Ids and
                    // retrieve the articles as nodes

                    string url = baseURL + idstring + "&retmode=xml";
                    XmlDocument xdoc = new XmlDocument();
                    string responseBody = await webClient.GetStringAsync(url);
                    xdoc.LoadXml(responseBody);
                    XmlNodeList articles = xdoc.GetElementsByTagName("PubmedArticle");

                    // Consider each article node in turn

                    foreach (XmlNode article in articles)
                    {
                        string pmid = article.SelectSingleNode("MedlineCitation/PMID").InnerText;
                        if (Int32.TryParse(pmid, out int ipmid))
                        {
                            // get current or new file download record, calculate
                            // and store last revised date. Write new or replace
                            // file and update file_record (by ref).
                            res.num_checked++;
                            DateTime? last_revised_datetime = GetDateLastRevised(article);
                            ObjectFileRecord file_record = logging_repo.FetchObjectFileRecord(pmid, source.id);
                            if (file_record == null)
                            {
                                string remote_url = "https://www.ncbi.nlm.nih.gov/pubmed/" + pmid;
                                file_record = new ObjectFileRecord(source.id, pmid, remote_url, saf_id);
                                file_record.last_revised = last_revised_datetime;
                                WriteNewFile(article, ipmid, file_record);
                                
                                logging_repo.InsertObjectFileRec(file_record);
                                res.num_added++;
                                res.num_downloaded++;
                            }
                            else
                            {
                                // normally should be less then but here <= to be sure
                                if (ignore_revision_date || 
                                   (last_revised_datetime != null 
                                           && file_record.last_downloaded <= last_revised_datetime))
                                {
                                    file_record.last_saf_id = saf_id;
                                    file_record.last_revised = last_revised_datetime;
                                    ReplaceFile(article, file_record);

                                    logging_repo.StoreObjectFileRec(file_record);
                                    res.num_downloaded++;
                                }
                            }

                            if (res.num_checked % 100 == 0) StringHelpers.SendFeedback(res.num_checked.ToString());
                        }
                    }

                    System.Threading.Thread.Sleep(800);
                }

                return res;
            }

            catch (HttpRequestException e)
            {
                StringHelpers.SendError("In PubMed DownloadPubmedEntriesUsingSourcesAsync(): " + e.Message);
                return res;
            }
        }

Data Harvest

Because no processing of the PubMed record takes place during download, all of the transformation of the data into the ECRIN metadata format has to take place during the harvesting stage. The PubMed data is relatively rich, but some additional processing is required to find the 'managing organisation' - the publisher - of the related journal article.
The data available in each file, and its structure, is described in PubMed Data Structure, which is derived from the data available on the PubMed web site. This describes the structure of the source data, identifies the main elements and their attributes, and indicates which are extracted within the MDR system.
The code for the Harvest process is available on the ECRIN GitHub site, so there is no attempt to go through all aspects of the data processing and transformation during harvesting. Instead some of the main points, issues and features are listed as bullet points below:

  • The initial purpose of the harvest process is to produce and store a 'citation object' rather than a 'data object'. Both are stored in the sd tables of the database, in tables of the same name. Data objects records are created before any import, from the Citation Object, so the data structures in the ad schema are the same as for all other data sources. This is because it is impossible to derive the Managing Organisation from the PubMed data alone, so additional processing is required before the Data Objects can be created.
  • As with other harvest operations, a blank Object is created at the beginning of the process, and the values of data points and attributes are gradually added to it, in this case by parsing the XML file, identifying element and attribute nodes, and reading the date values from them.
  • The main citation object created - for all PubMed metadata files - is the PubMed citation itself. In about a third of cases, the publicly available PMC article is also added to the system as an additional data object.
  • The processing of titles is relatively complex, partly because of the use of brackets and other indicators of different title origin, which need to be properly interpreted and processed, and partly because of so-called 'vernacular titles'.
  • It is essential to obtain electronic and print issn values for the journal, not because they are used in the ECRIN metadata schema but because they can be used to identify the journal publisher. This is done after the main harvest, by using lookup tables in the context database.
  • Dates are expressed in a variety of formats and need processing to convert to the more standard format used by the ECRIN metadata scheme. Some may represent a range rather than a single data, and some may be partial. A variety of utility functions are required for data processing.
  • Topic data comes from a variety of sections within the source material, and therefore requires aggregation into a standard structure within the harvesting process.
  • Date and identifiers can also be found in different parts of the XML source record. In some cases they can be duplicated. It is therefore necessary to check any addition of these types of data against what is already present.
  • The display title of the object is not the article title, but a full citation in the form of authors - article title - pagination and journal details. This citation therefore has to be constructed during extraction, in a consistent fashion. A variety of data points are extracted for only this purpose, and are not used elsewhere in the ECRIN metadata schema.

Data Aggregation

       public void SetupTempPMIDTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"DROP TABLE IF EXISTS nk.temp_pmids;
                     CREATE TABLE IF NOT EXISTS nk.temp_pmids(
                       source_id                INT
                     , sd_oid                   VARCHAR
                     , parent_study_source_id   INT 
                     , parent_study_sd_sid      VARCHAR
                     , datetime_of_data_fetch   TIMESTAMPTZ
                     ); ";
               conn.Execute(sql_string);
           }
       }
       public void SetupDistinctPMIDTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"DROP TABLE IF EXISTS nk.distinct_pmids;
                     CREATE TABLE IF NOT EXISTS nk.distinct_pmids(
                       source_id                INT
                     , sd_oid                   VARCHAR
                     , parent_study_source_id   INT 
                     , parent_study_sd_sid      VARCHAR
                     , datetime_of_data_fetch   TIMESTAMPTZ
                     ); ";
               conn.Execute(sql_string);
           }
       }


       public IEnumerable<PMIDLink> FetchBankPMIDs()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"select 
                       100135 as source_id, 
                       d.id as parent_study_source_id, 
                       k.sd_oid, k.id_in_db as parent_study_sd_sid, 
                       a.datetime_of_data_fetch
                       from pubmed_ad.object_db_links k
                       inner join pubmed_ad.data_objects a 
                       on k.sd_oid = a.sd_oid
                       inner join context_ctx.nlm_databanks d
                       on k.db_name = d.nlm_abbrev
                       where bank_type <> 'databank'";
               return conn.Query<PMIDLink>(sql_string);
           }
       }


       public ulong StorePMIDLinks(PostgreSQLCopyHelper<PMIDLink> copyHelper, IEnumerable<PMIDLink> entities)
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               conn.Open();
               return copyHelper.SaveAll(conn, entities);
           }
       }


       public IEnumerable<PMIDLink> FetchSourceReferences(int source_id, string db_name)
       {
           builder.Database = db_name;
           string db_conn_string = builder.ConnectionString;
           using (var conn = new NpgsqlConnection(db_conn_string))
           {
               string sql_string = @"select 
                       100135 as source_id, " +
                       source_id.ToString() + @" as parent_study_source_id, 
                       r.pmid as sd_oid, r.sd_sid as parent_study_sd_sid, 
                       s.datetime_of_data_fetch
                       from ad.study_references r
                       inner join ad.studies s
                       on r.sd_sid = s.sd_sid
                       where r.pmid is not null;";
               return conn.Query<PMIDLink>(sql_string);
           }
       }


       public void FillDistinctPMIDsTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // First ensure that any PMIDs (sd_oids) are in the same format
               // Some have a 'tail' of spaces after them, as the standard 
               // length of a sd_oid is 24 characters.
               
               string sql_string = @"UPDATE nk.temp_pmids
                        SET sd_oid = trim(sd_oid);";
               conn.Execute(sql_string);
               // Then transfer the distinct data
               sql_string = @"INSERT INTO nk.distinct_pmids(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid)
                        SELECT distinct 
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid
                        FROM nk.temp_pmids;";
               conn.Execute(sql_string);
               // Update with latest datetime_of_data_fetch
               sql_string = @"UPDATE nk.distinct_pmids dp
                        set datetime_of_data_fetch = mx.max_fetch_date
                        FROM 
                        ( select sd_oid, parent_study_sd_sid, 
                          max(datetime_of_data_fetch) as max_fetch_date
                          FROM nk.temp_pmids
                          group by sd_oid, parent_study_sd_sid ) mx
                        WHERE dp.parent_study_sd_sid = mx.parent_study_sd_sid
                        and dp.sd_oid = mx.sd_oid;";
               conn.Execute(sql_string);
           }
       }


       public void CleanPMIDsdsidData1()
       {
           string sql_string = "";
           using (var conn = new NpgsqlConnection(connString))
           {
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = 'ACTRN' || parent_study_sd_sid
                       WHERE parent_study_source_id = 100116
                       AND length(parent_study_sd_sid) = 14;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, ' ', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, '#', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, ':', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, '[', )
                       WHERE parent_study_source_id = 100116;";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'CHICTR', 'ChiCTR')
                       WHERE parent_study_source_id = 100118;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = 'ChiCTR-' || parent_study_sd_sid
                       WHERE parent_study_source_id = 100118
                       and parent_study_sd_sid not ilike 'ChiCTR-%';";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                       SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'ChiCTR-ChiCTR', 'ChiCTR-')
                       WHERE parent_study_source_id = 100118;";
               conn.Execute(sql_string);
           }
       }


       public void CleanPMIDsdsidData2()
       {
           string sql_string = "";
           using (var conn = new NpgsqlConnection(connString))
           {
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, '/', '-')
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = 'CTRI-' || parent_study_sd_sid
                    WHERE parent_study_source_id = 100121
                    and parent_study_sd_sid not ilike 'CTRI-%';";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'REF-', )
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'CTRI-CTRI', 'CTRI-')
                    WHERE parent_study_source_id = 100121;";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = 'RPCEC' || parent_study_sd_sid
                  WHERE parent_study_source_id = 100122
                  and parent_study_sd_sid not ilike 'RPCEC%';";
               conn.Execute(sql_string);


               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = UPPER(parent_study_sd_sid)
                  WHERE parent_study_source_id = 100123;"; 
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, ' ', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, '–', '-')
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRA-CT', 'EUDRACT')
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRACT', )
                   WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EURODRACT', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EU', )
                  WHERE parent_study_source_id = 100123;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = replace(parent_study_sd_sid, 'CT', )
                  WHERE parent_study_source_id = 100123;";
               sql_string = @"UPDATE nk.distinct_pmids
                  SET parent_study_sd_sid = left(parent_study_sd_sid, 14)
                  WHERE parent_study_source_id = 100123
                  and length(parent_study_sd_sid) > 14;";
               conn.Execute(sql_string);
           }
       }


       public void TransferPMIDLinksToObjectIds()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = @"INSERT INTO nk.temp_object_ids(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, datetime_of_data_fetch)
                        SELECT  
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, datetime_of_data_fetch
                        FROM nk.distinct_pmids";
               conn.Execute(sql_string);
           }
       }


       public void InputPreferredSDSIDS()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // replace any LHS sd_sids with the 'preferred' RHS
               string sql_string = @"UPDATE nk.temp_object_ids b
                              SET parent_study_sd_sid = preferred_sd_sid,
                              parent_study_source_id = preferred_source_id
                              FROM nk.study_study_links k
                              WHERE b.parent_study_sd_sid = k.sd_sid
                              and b.parent_study_source_id = k.source_id ;";
               conn.Execute(sql_string);
               // That may have produced some duplicates - if so get rid of them
               // needs to be done indirectly because of the need to get the maximum
               // datetime_of_data_fetch for each duplciated object
               sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids2;
               CREATE TABLE IF NOT EXISTS nk.temp_object_ids2(
                 object_id                INT
               , source_id                INT
               , sd_oid                   VARCHAR
               , parent_study_source_id   INT
               , parent_study_sd_sid      VARCHAR
               , parent_study_id          INT
               , is_preferred_study       BOOLEAN  default true
               , datetime_of_data_fetch   TIMESTAMPTZ
               ); ";
               conn.Execute(sql_string);
               sql_string = @"INSERT INTO nk.temp_object_ids2(
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, parent_study_id)
                        SELECT distinct 
                        source_id, sd_oid, parent_study_source_id, 
                        parent_study_sd_sid, parent_study_id
                        FROM nk.temp_object_ids;";
               conn.Execute(sql_string);
               // update with latest datetime_of_data_fetch
               // for each distinct study - data object link 
               sql_string = @"UPDATE nk.temp_object_ids2 to2
                        set datetime_of_data_fetch = mx.max_fetch_date
                        FROM 
                        ( select sd_oid, parent_study_sd_sid,
                          max(datetime_of_data_fetch) as max_fetch_date
                          FROM nk.temp_object_ids
                          group by sd_oid, parent_study_sd_sid ) mx
                        WHERE to2.sd_oid = mx.sd_oid
                        and to2.parent_study_sd_sid = mx.parent_study_sd_sid;";
               conn.Execute(sql_string);
               sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
               ALTER TABLE nk.temp_object_ids2 RENAME TO temp_object_ids;";
               conn.Execute(sql_string);
               // Maybe a few blank pmids slip through...
               sql_string = @"delete from nk.temp_object_ids
                   where sd_oid is null or sd_oid = ;";
               conn.Execute(sql_string);
           }
       }


       public void ResetIdsOfDuplicatedPMIDs()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               // Find the minimum object_id for each PMID in the table
               // source id for PubMed = 100135
               string sql_string = @"DROP TABLE IF EXISTS nk.temp_min_object_ids;
                                    CREATE TABLE nk.temp_min_object_ids as
                                    SELECT sd_oid, Min(id) as min_id
                                    FROM nk.all_ids_data_objects
                                    WHERE source_id = 100135
                                    GROUP BY sd_oid;";
               conn.Execute(sql_string);
               sql_string = @"UPDATE nk.all_ids_data_objects b
                              SET object_id = min_id
                              FROM nk.temp_min_object_ids m
                              WHERE b.sd_oid = m.sd_oid
                              and source_id = 100135;";
               conn.Execute(sql_string);
               sql_string = @"DROP TABLE nk.temp_min_object_ids;";
               conn.Execute(sql_string);
               // ???? May be (yet) more duplicates have appeared, where a 
               // study - pmid link has been generated in more than one way
               // (Links of the same paper to different studies are not uncommon)
           }
       }


       public void DropTempPMIDTable()
       {
           using (var conn = new NpgsqlConnection(connString))
           {
               string sql_string = "DROP TABLE IF EXISTS pp.temp_pmids";
               conn.Execute(sql_string);
           }
       }