Processing PubMed Data

Introduction

The PubMed data has a range of 'special issues' that demand additional processing in several places in the system. This is the only data that is not directly linked to studies within its data source, so the data objects retrieved from it - the PubMed citations and associated PMC articles - have to be linked to studies within the MDR. There are two mechanisms for this. The first is embedded in the PubMed data itself, in the shape of 'bank ids'. These are the identifiers of associated entries in 'databanks', a term that includes trial registries. This allows a PubMed citation to explicitly reference an associated study, through that study's registry id - most often, but not exclusively, the ClinicalTrials.gov NCT number. The second is the 'associated reference' data found in some data sources. This appears under various names and in various forms, but usually includes a PubMed identifier, at least for the great majority of cited references (some are given only as DOIs or as textual citations). Note that although this list of references is obtained during a study based source's extraction, the citations themselves are not downloaded and processed into the system during that extraction. Instead, all PubMed data is extracted and processed only when referencing PubMed (id = 100135) as the data source.
The system has to consider both PubMed linkage types, firstly when identifying the relevant PubMed data to download (PubMed contains well over 30 million records, so we only want the data we can link) and secondly when linking PubMed data objects to studies during data aggregation. The harvesting process for PubMed is also relatively complex, requiring additional calls into the contextual data. The PubMed specific aspects of data download, harvest and aggregation are therefore summarised below.

Data Download

Under normal circumstances downloads of PubMed data need to obtain only records that have been added to, or revised within, the PubMed system since the previous download. PubMed downloads therefore come with a cut-off date, specifying that only records added or edited on or after that date need be (re-)downloaded. In addition, they come with a search filter, and there are two types of download. The first is used to select records in PubMed that include 'bank' references to a trial registry. The second is used to assemble the lists of references already existing in the MDR, and use those lists as the basis for checking revision dates in the PubMed database. In each case the 'type' of the saf (search and fetch) operation is 114: All new or revised records with a filter applied (download).
In the first, bank based, download the filter parameter is 10003: PubMed abstracts with references to any trial registry; in the second, references based, download it is 10004: Identifies PubMed references in Study sources that have not yet been downloaded.
Variants of these downloads exist using type 121: Filtered records (download). With filter 10003 this causes all bank-linked records to be downloaded, irrespective of date, whilst with filter 10004 all records referenced in study sources are downloaded, irrespective of revision date. This download type may be used occasionally, e.g. annually, after the PubMed database itself is revised, to ensure synchronisation with the source. In all cases the download mechanism makes use of the 'Entrez' utilities API made available by the NCBI in the US, which supports PubMed along with many other (largely -omic) databases.
(see Sayers E. A General Introduction to the E-utilities. In: Entrez Programming Utilities Help [Internet]. Bethesda (MD): National Center for Biotechnology Information (US); 2010-. Available from: https://www.ncbi.nlm.nih.gov/books/NBK25497/).

Searching PubMed for bank-linked records

The simpler task is to identify the records in PubMed that have a reference to a 'databank' - in this context a trial registry - and that have been revised or added since the last download (which provides the 'cut-off date'). The code is shown below. After some initial variable initialisations, and the conversion of the cut-off date into a suitable search parameter, each of the possible databanks is considered in turn. For each, a search URL is composed that consists of

  • the base URL of the Entrez Search engine
  • the database name ('pubmed')
  • An API 'key'
  • A query term, that consists of
    • the name of the databank, as listed by NCBI, plus '[SI]', which identifies the type of parameter as a databank, plus
    • the date range to be searched for a relevant revision date, in the required format
  • a flag indicating that the results should be stored on what is referred to as the 'History' server, within the Entrez system, rather than being immediately downloaded. This allows intermediate results to be held at the NCBI, and is a powerful part of the available Entrez workflow.

The 'API key' is an additional parameter provided by NCBI to individual users and serves to identify the API user - it is available free of charge after registration with the NCBI and is a hex digit string, 36 digits long. It should be included in all regular Entrez API calls, and doing so increases the allowed response frequency of the system - up to 5 calls can be made a second as long as an API key is present. The key is stored, like other sensitive data, in the appsettings.json file and read from there at programme startup - it can then be accessed through the logging data layer (logging_repo). It is not visible in the code on GitHub.
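Reading the key at startup might look something like the following sketch. This is illustrative only - it assumes a top level "api_key" entry in appsettings.json and the Microsoft.Extensions.Configuration packages; the actual wiring in the MDR code may differ.

// Illustrative sketch only - assumes an "api_key" entry in appsettings.json
// and the Microsoft.Extensions.Configuration(.Json) packages.
IConfigurationRoot settings = new ConfigurationBuilder()
    .SetBasePath(AppContext.BaseDirectory)
    .AddJsonFile("appsettings.json")
    .Build();

// Stored with its parameter name, ready to be appended to each Entrez URL.
string api_key = "&api_key=" + settings["api_key"];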
The relevant lines of code are:

string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
search_term = "&term=" + s.nlm_abbrev + "[SI]" + date_term;
string search_url = search_baseURL + api_key + search_term + "&usehistory=y";
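For example, assuming the databank abbreviation 'ClinicalTrials.gov' and a search period of the week ending 1 January 2021 (both illustrative, with the key elided), the composed URL would look like:

https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&api_key=...&term=ClinicalTrials.gov[SI]&mindate=2020/12/25&maxdate=2021/01/01&datetype=mdat&usehistory=y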

The response to this API call contains, amongst other things,

  • The number of records found
  • an integer 'query key'
  • a string 'web environment'

The latter two are necessary to reference the list of Ids that have been stored on the Entrez History server.
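The eSearchResult class used for the deserialisation is not shown on this page. Given the standard esearch XML response, a minimal version would be along the following lines (a sketch - the real class may carry additional fields):

        // Minimal sketch - matches the Count, QueryKey and WebEnv elements
        // of the esearch XML response (requires System.Xml.Serialization).
        [XmlRoot("eSearchResult")]
        public class eSearchResult
        {
            public int Count { get; set; }
            public int QueryKey { get; set; }
            public string WebEnv { get; set; }
        }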
These stored Ids are referenced in the next stage of the routine. Assuming there are any records at all (numbers vary from 0 to several hundred per trial registry, when the search period is about a week), a 'fetch' API call is constructed, downloading up to 100 records at a time. The URL needs to include

  • the base URL of the Entrez Fetch engine
  • the database name ('pubmed')
  • The API 'key'
  • A WebEnv parameter that includes the web environment returned from the search query
  • A query_key parameter that includes the query key returned from the search query
  • 'retstart' and 'retmax' (=100) parameters that indicate which 100 records should be returned (added within a loop)
  • A 'retmode' parameter that indicates that the data should be returned as concatenated XML files.

The relevant code is below:

string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
fetch_baseURL += api_key + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
numCallsNeeded = (int)(totalRecords / retmax) + 1;
for (int i = 0; i < numCallsNeeded; i++)
{ 
       ...
       string fetch_URL = fetch_baseURL + "&retstart=" + (i * retmax).ToString() + "&retmax=" + retmax.ToString();
       fetch_URL += "&retmode=xml";
       ...

Once the URL is constructed it is passed to the FetchPubMedRecordsAsync function, along with a 'download result' object that accumulates the results of each download and is ultimately returned at the end of the whole process. The FetchPubMedRecordsAsync function calls the API and - assuming data is returned - deserialises it to up to 100 separate PubMed 'Articles'. Each of these is then compared to the object source data table in the 'mon' database. If it is a new citation, it is written out as a new file and a new record is added to the object source table. If not, it replaces the existing file and the record in the object source table is updated.
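FetchPubMedRecordsAsync itself is not reproduced on this page. In outline - a sketch reconstructed from the description above, reusing names that appear elsewhere in the code, rather than the actual implementation - it does something like this:

        public async Task FetchPubMedRecordsAsync(string fetch_URL, DownloadResult res)
        {
            string responseBody = await ch.GetStringFromURLAsync(fetch_URL);
            if (responseBody != null)
            {
                XmlDocument xdoc = new XmlDocument();
                xdoc.LoadXml(responseBody);
                XmlNodeList articles = xdoc.GetElementsByTagName("PubmedArticle");
                foreach (XmlNode article in articles)
                {
                    // Check the article against the object source table in the
                    // 'mon' database; write out a new file and insert a new
                    // record, or replace the file and update the record.
                    ...
                }
            }
        }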
The code for the whole top level function is shown below.

        public async Task<DownloadResult> ProcessPMIDsListfromBanksAsync()
        {
            int totalRecords = 0, numCallsNeeded = 0, bank_id = 0;
            string search_term = "", date_term = "";
            DownloadResult res = new DownloadResult();
            XmlSerializer xSerializer = new XmlSerializer(typeof(eSearchResult));

            // This search can be (and usually is) date sensitive.

            if (args.type_id == 114)
            {
                string today = DateTime.Now.ToString("yyyy/MM/dd");
                string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                date_term = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
            }

            // Get list of potential linked data banks (includes trial registries).

            IEnumerable<PMSource> banks = pubmed_repo.FetchDatabanks();
            foreach (PMSource s in banks)
            {
                // get databank details

                bank_id = s.id;
                string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
                search_term = "&term=" + s.nlm_abbrev + "[SI]" + date_term;
                string search_url = search_baseURL + api_key + search_term + "&usehistory=y";

                // Get the number of total records that have this databank reference
                // and that (usually) have been revised recently 
                // and calculate the loop parameters.

                string responseBody = await ch.GetStringFromURLAsync(search_url);
                if (responseBody != null)
                {
                    using (TextReader reader = new StringReader(responseBody))
                    {
                        // The eSearchResult class corresponds to the returned data.

                        eSearchResult result = (eSearchResult)xSerializer.Deserialize(reader);
                        if (result != null)
                        {
                            totalRecords = result.Count;
                            query_key = result.QueryKey;
                            web_env = result.WebEnv;
                        }
                    }

                    // loop through the records and obtain and store relevant
                    // records, of PubMed Ids, retmax (= 100) at a time     

                    if (totalRecords > 0)
                    {
                        int retmax = 100;
                        string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
                        fetch_baseURL += api_key + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
                        numCallsNeeded = (int)(totalRecords / retmax) + 1;
                        for (int i = 0; i < numCallsNeeded; i++)
                        {
                            try
                            {
                                // Retrieve the articles as nodes.

                                string fetch_URL = fetch_baseURL + "&retstart=" + (i * retmax).ToString() + "&retmax=" + retmax.ToString();
                                fetch_URL += "&retmode=xml";
                                await FetchPubMedRecordsAsync(fetch_URL, res);
                                System.Threading.Thread.Sleep(300);
                            }

                            catch (HttpRequestException e)
                            {
                                logging_repo.LogError("In PubMed CreatePMIDsListfromBanksAsync(): " + e.Message);
                                return null;
                            }
                        }
                    }

                    logging_repo.LogLine("Processed " + totalRecords.ToString() + " from " + s.nlm_abbrev);
                }
            }

            return res;
        }

Downloading updated versions of PubMed references

In this second form of PubMed download, the PubMed references listed within study based data sources (especially, but not exclusively, ClinicalTrials.gov) are checked to see if they have been updated or added during the specified period, normally the time since the last download of this type. This is a more complex task that has to be done in 4 stages:

    Assembly of PMIDs referenced in source databases
      => Upload of the Ids to the Entrez History server
      => Search of the uploaded Ids for those revised or added in the specified period
      => Download of any 'qualifying' PubMed records

The study_reference records are collected - one database at a time - and transferred to a single table. After the various tables required have been established, the system retrieves a list of the study based data sources with reference records. It loops through each of those sources, getting the PMIDs as a collection of PMIDBySource objects in each case and transferring them to the pmids_by_source_total table. A call similar to that made for the bank PMIDs is then made to the FillDistinctSourcePMIDsTable procedure, to store the distinct PMIDs in another table (distinct_pmids_by_source), again grouped into 10s.

        public void CreatePMIDsListfromSources()
        {
            // Establish tables and support objects

            pubmed_repo.SetUpTempPMIDsBySourceTable();
            pubmed_repo.SetUpSourcePMIDsTable();
            pubmed_repo.SetUpDistinctSourcePMIDsTable();
            CopyHelpers helper = new CopyHelpers();
            IEnumerable<PMIDBySource> references;

            // Loop through the study databases that hold
            // study_reference tables, i.e. with pmid ids
            IEnumerable<Source> sources = pubmed_repo.FetchSourcesWithReferences();
            foreach (Source s in sources)
            {
                pubmed_repo.TruncateTempPMIDsBySourceTable();
                references = pubmed_repo.FetchSourceReferences(s.database_name);
                pubmed_repo.StorePmidsBySource(helper.source_ids_helper, references);
                pubmed_repo.TransferSourcePMIDsToTotalTable(s.id);
            }

            pubmed_repo.FillDistinctSourcePMIDsTable();
            pubmed_repo.DropTempPMIDBySourceTable();
        }

The process is therefore much simpler and quicker than the API calls required for identifying the bank-linked PMIDs, but the end result is very similar - a list of PMIDs, each belonging to a numbered group of 10. Note that the list consists only of PMIDs - the purpose at this stage is simply to identify the PubMed data of interest and download it; linking that data to studies comes later.
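The FillDistinctSourcePMIDsTable procedure itself is not shown above. The grouping into 10s might be done with SQL along the following lines - a sketch only, using the table names mentioned in the text; the real procedure may differ:

        public void FillDistinctSourcePMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Assign each distinct PMID to a numbered group of 10,
                // ready for batched fetch calls (sketch only).
                string sql_string = @"INSERT INTO pp.distinct_pmids_by_source(group_id, pmid)
                         SELECT (row_number() over (order by pmid) - 1) / 10 as group_id, pmid
                         FROM (select distinct pmid 
                               from pp.pmids_by_source_total) p;";
                conn.Execute(sql_string);
            }
        }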

Download operations

However the list of PMIDs is obtained, the download functionality is very similar.
The PMID list is retrieved as strings, each one holding 10 Ids, called idstrings, using either the FetchDistinctBankPMIDStrings() or the FetchDistinctSourcePMIDStrings() function. These both use the same 'partition over' mechanism to concatenate the PMIDs from 10 records (as grouped during the earlier search phase) into a single string, for example:

        public IEnumerable<string> FetchDistinctSourcePMIDStrings()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"select distinct string_agg(pmid, ', ') 
                        OVER ( PARTITION BY group_id) 
                        from pp.distinct_pmids_by_source;";
                return conn.Query<string>(sql_string);
            }
        }

Each idstring, when inserted into an API call, will therefore download 10 records at a time, but the other components of the API string first need to be constructed. In this case, because of the volume of download required, the base URL requires not only the address of the NLM fetch API, but also an indicator of the tool being used and a contact email (as per the NLM's data access rules). The API call is then made by adding the idstring and '&retmode=xml' to the base URL, and each individual article node is extracted from the returned XML. At this stage there is very little additional processing of the downloaded data, but a call is made to the GetDateLastRevised function, which returns that date by parsing a few nodes in the XML. The last revised date is then checked against the cut-off date, unless this download is of type 121, which does not require a date parameter. All bank-linked PMIDs should pass the test (because they have already been filtered), but the source reference PMIDs will be a mix, and only require downloading if they are new or have been revised since the last download date. The source data objects table needs to be interrogated to discover whether the file is completely new to the system - in which case a new object source record is added and the XML is written out as a file to the appropriate folder - or an update of an existing file - in which case the object source record is updated and the XML is used to replace the file that is already there.

        public async Task<DownloadResult> DownloadPubmedEntriesAsync(IEnumerable<string> idstrings)
        {
            string baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?";
            baseURL += "tool=ECRINMDR&email=steve.canham@ecrin.org&db=pubmed&id=";
            DownloadResult res = new DownloadResult();
            bool ignore_revision_date = (args.type_id == 121);

            try
            {
                // loop through the references - already in groups of 10 
                // from processing in the database call

                foreach (string idstring in idstrings)
                {
                    // Construct the fetch URL using the 10 Ids and
                    // retrieve the articles as nodes

                    string url = baseURL + idstring + "&retmode=xml";
                    XmlDocument xdoc = new XmlDocument();
                    string responseBody = await webClient.GetStringAsync(url);
                    xdoc.LoadXml(responseBody);
                    XmlNodeList articles = xdoc.GetElementsByTagName("PubmedArticle");

                    // Consider each article node in turn

                    foreach (XmlNode article in articles)
                    {
                        string pmid = article.SelectSingleNode("MedlineCitation/PMID").InnerText;
                        if (Int32.TryParse(pmid, out int ipmid))
                        {
                            // get current or new file download record, calculate
                            // and store last revised date. Write new or replace
                            // file and update file_record (by ref).
                            res.num_checked++;
                            DateTime? last_revised_datetime = GetDateLastRevised(article);
                            ObjectFileRecord file_record = logging_repo.FetchObjectFileRecord(pmid, source.id);
                            if (file_record == null)
                            {
                                string remote_url = "https://www.ncbi.nlm.nih.gov/pubmed/" + pmid;
                                file_record = new ObjectFileRecord(source.id, pmid, remote_url, saf_id);
                                file_record.last_revised = last_revised_datetime;
                                WriteNewFile(article, ipmid, file_record);
                                
                                logging_repo.InsertObjectFileRec(file_record);
                                res.num_added++;
                                res.num_downloaded++;
                            }
                            else
                            {
                                // normally should be less than, but here <= to be sure
                                if (ignore_revision_date || 
                                   (last_revised_datetime != null 
                                           && file_record.last_downloaded <= last_revised_datetime))
                                {
                                    file_record.last_saf_id = saf_id;
                                    file_record.last_revised = last_revised_datetime;
                                    ReplaceFile(article, file_record);

                                    logging_repo.StoreObjectFileRec(file_record);
                                    res.num_downloaded++;
                                }
                            }

                            if (res.num_checked % 100 == 0) StringHelpers.SendFeedback(res.num_checked.ToString());
                        }
                    }

                    System.Threading.Thread.Sleep(800);
                }

                return res;
            }

            catch (HttpRequestException e)
            {
                StringHelpers.SendError("In PubMed DownloadPubmedEntriesUsingSourcesAsync(): " + e.Message);
                return res;
            }
        }
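The GetDateLastRevised function called above is not reproduced on this page. Given the standard MedlineCitation/DateRevised element in the PubMed XML (with Year, Month and Day children), a minimal version would look something like this sketch (the actual implementation may differ):

        private DateTime? GetDateLastRevised(XmlNode article)
        {
            // Parse MedlineCitation/DateRevised/(Year|Month|Day) - sketch only.
            XmlNode date_node = article.SelectSingleNode("MedlineCitation/DateRevised");
            if (date_node != null
                && Int32.TryParse(date_node.SelectSingleNode("Year")?.InnerText, out int year)
                && Int32.TryParse(date_node.SelectSingleNode("Month")?.InnerText, out int month)
                && Int32.TryParse(date_node.SelectSingleNode("Day")?.InnerText, out int day))
            {
                return new DateTime(year, month, day);
            }
            return null;
        }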

Data Harvest

Because no processing of the PubMed record takes place during download, all of the transformation of the data into the ECRIN metadata format has to take place during the harvesting stage. The PubMed data is relatively rich, but some additional processing is required to find the 'managing organisation' - the publisher - of the related journal article.
The data available in each file, and its structure, is described in PubMed Data Structure, which is derived from the data available on the PubMed web site. This describes the structure of the source data, identifies the main elements and their attributes, and indicates which are extracted within the MDR system.
The code for the Harvest process is available on the ECRIN GitHub site, so there is no attempt to go through all aspects of the data processing and transformation during harvesting. Instead some of the main points, issues and features are listed as bullet points below:

  • The initial purpose of the harvest process is to produce and store a 'citation object' rather than a 'data object'. Both are stored in the sd tables of the database, in tables of the same name. Data object records are created from the citation object before any import, so the data structures in the ad schema are the same as for all other data sources. This is necessary because it is impossible to derive the managing organisation from the PubMed data alone, so additional processing is required before the data objects can be created.
  • As with other harvest operations, a blank object is created at the beginning of the process, and the values of data points and attributes are gradually added to it, in this case by parsing the XML file, identifying element and attribute nodes, and reading the data values from them.
  • The main citation object created - for all PubMed metadata files - is the PubMed citation itself. In about a third of cases, the publicly available PMC article is also added to the system as an additional data object.
  • The processing of titles is relatively complex, partly because of the use of brackets and other indicators of different title origin, which need to be properly interpreted and processed, and partly because of so-called 'vernacular titles'.
  • It is essential to obtain electronic and print issn values for the journal, not because they are used in the ECRIN metadata schema but because they can be used to identify the journal's publisher. This is done after the main harvest, by using lookup tables in the context database (a sketch of this lookup is given after this list).
  • Dates are expressed in a variety of formats and need processing to convert them to the more standard format used by the ECRIN metadata scheme. Some may represent a range rather than a single date, and some may be partial. A variety of utility functions are required for this processing.
  • Topic data comes from a variety of sections within the source material, and therefore requires aggregation into a standard structure within the harvesting process.
  • Dates and identifiers can also be found in different parts of the XML source record, and in some cases they can be duplicated. It is therefore necessary to check any addition of these types of data against what is already present.
  • The display title of the object is not the article title, but a full citation in the form of authors - article title - pagination and journal details. This citation therefore has to be constructed during extraction, in a consistent fashion. A variety of data points are extracted for only this purpose, and are not used elsewhere in the ECRIN metadata schema.
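The publisher lookup mentioned above might, in outline, work as follows. This is a hypothetical sketch: the table name (context_ctx.journal_publishers here) and its columns are assumptions, as the actual context database tables are not shown on this page.

        public string FetchPublisherId(string eissn, string pissn)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Hypothetical lookup - table and column names are assumed.
                string sql_string = @"select publisher_org_id 
                         from context_ctx.journal_publishers
                         where eissn = '" + eissn + @"' 
                         or pissn = '" + pissn + @"'
                         limit 1;";
                return conn.QueryFirstOrDefault<string>(sql_string);
            }
        }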

Data Aggregation

In the same way that two potential sources of study-PubMed links need to be considered when downloading PubMed data, the same two sources of links need to be considered when aggregating that data. During downloading, the links are used only to identify the PubMed data requiring download since the last download event. During aggregation, the PubMed data is assumed to be already downloaded, and it is the links that are important, for all the PubMed data, in order to associate the journal article data objects with the correct studies.
Data aggregation, which includes PubMed aggregation as the final step, should therefore be done after all study based data sources have been brought up to date, and then after all PubMed data has been downloaded and updated, to ensure that the data in each study based data source is as complete as possible.
The key function here is ProcessStandaloneObjectIds(), which orchestrates the process by which the study-article links are established. It makes use of a series of functions in a helper class called PubMedTransferHelper, as PubMed is currently the only object based data source, and therefore the only source where object ids are processed in a standalone fashion, i.e. without reference to any study data in the same data source. The key elements of this process are shown below.
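First, in outline, the orchestration itself. This is a reconstruction from the description on this page, not the actual code - helper method names that do not appear elsewhere on this page (SetupTempPMIDTables, StorePMIDLinks) are hypothetical:

        public void ProcessStandaloneObjectIds()
        {
            PubMedTransferHelper pm_tr = new PubMedTransferHelper();
            pm_tr.SetupTempPMIDTables();                 // hypothetical name

            // Bank-based links, from the PubMed database itself.
            IEnumerable<PMIDLink> bank_links = pm_tr.FetchBankPMIDs();
            pm_tr.StorePMIDLinks(bank_links);            // hypothetical name

            // Reference-based links, from each study based source
            // that holds study_references data.
            foreach (Source s in sources_with_references)
            {
                IEnumerable<PMIDLink> refs = pm_tr.FetchSourceReferences(s.id, s.database_name);
                pm_tr.StorePMIDLinks(refs);
            }

            // De-duplicate and clean the links, then transfer them to the
            // object id tables used for all sources.
            pm_tr.FillDistinctPMIDsTable();
            pm_tr.CleanPMIDsdsidData1();                 // ... through ...Data4
            pm_tr.TransferPMIDLinksToObjectIds();
            UpdateObjectsWithStudyIds(100135);
            ...
        }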
The initial task, once the helper class has been instantiated and some 'workspace' temporary tables have been created, is to get the 'bank-related' data from the PubMed database itself - i.e. all the PubMed identifiers (PMIDs) and trial registry ids listed in the PubMed data itself, as found in the ad.object_db_links table. The SQL used for this selects only object-db data where the type of 'db' is a trial registry (and not a 'databank'). It returns the source id of the object as 100135, i.e. PubMed, but the source id of the study is also returned by joining the object_db_links back to the contextual list of PubMed databank sources (ctx.nlm_databanks). The returned set of links are stored in the temporary table already constructed for this purpose.

       public IEnumerable<PMIDLink> FetchBankPMIDs()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"select 
                        100135 as source_id, 
                        d.id as parent_study_source_id, 
                        k.sd_oid, k.id_in_db as parent_study_sd_sid, 
                        a.datetime_of_data_fetch
                        from pubmed_ad.object_db_links k
                        inner join pubmed_ad.data_objects a 
                        on k.sd_oid = a.sd_oid
                        inner join context_ctx.nlm_databanks d
                        on k.db_name = d.nlm_abbrev
                        where bank_type <> 'databank'";
                return conn.Query<PMIDLink>(sql_string);
            }
        }

Then the other source of study-PMID links is interrogated, looping through all sources that include 'study references'. This is a process very similar to that during data download - a SQL statement is run against each relevant database using FetchSourceReferences, the study reference links are obtained, and added to the same temporary table as the bank based links.

        public IEnumerable<PMIDLink> FetchSourceReferences(int source_id, string db_name)
        {
            builder.Database = db_name;
            string db_conn_string = builder.ConnectionString;

            using (var conn = new NpgsqlConnection(db_conn_string))
            {
                string sql_string = @"select 
                        100135 as source_id, " +
                        source_id.ToString() + @" as parent_study_source_id, 
                        r.pmid as sd_oid, r.sd_sid as parent_study_sd_sid, 
                        s.datetime_of_data_fetch
                        from ad.study_references r
                        inner join ad.studies s
                        on r.sd_sid = s.sd_sid
                        where r.pmid is not null;";
                return conn.Query<PMIDLink>(sql_string);
            }
        }
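Both of these functions map the returned rows to PMIDLink objects. The class itself is not shown on this page, but the column aliases in the SQL imply a shape essentially like this (the property types are assumptions):

        public class PMIDLink
        {
            public int source_id { get; set; }
            public int parent_study_source_id { get; set; }
            public string sd_oid { get; set; }
            public string parent_study_sd_sid { get; set; }
            public DateTime? datetime_of_data_fetch { get; set; }
        }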

The distinct links between studies and PubMed citations are then transferred to a table called nk.distinct_pmids. There are a few 'gotchas' to avoid here. Firstly, some PMIDs obtained from the study source databases will have been formatted as a string of 24 characters (the default for all object ids, which are normally hash values). These need to be trimmed so that they can be compared properly with other PMIDs, in order that the following distinct select eliminates all the genuine link duplications (i.e. those that occur in both types of link source). Then, after the distinct data has been transferred, the latest (maximum) datetime_of_data_fetch value has to be applied to the distinct record. Different copies of the same link, obtained at different times, will obviously contain different values for this - it is the latest that is needed, for future transcribing to the object links tables.

        public void FillDistinctPMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // First ensure that any PMIDs (sd_oids) are in the same format
                // Some have a 'tail' of spaces after them, as the standard 
                // length of a sd_oid is 24 characters.
                
                string sql_string = @"UPDATE nk.temp_pmids
                         SET sd_oid = trim(sd_oid);";
                conn.Execute(sql_string);

                // Then transfer the distinct data

                sql_string = @"INSERT INTO nk.distinct_pmids(
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid)
                         SELECT distinct 
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid
                         FROM nk.temp_pmids;";
                conn.Execute(sql_string);

                // Update with latest datetime_of_data_fetch

                sql_string = @"UPDATE nk.distinct_pmids dp
                         set datetime_of_data_fetch = mx.max_fetch_date
                         FROM 
                         ( select sd_oid, parent_study_sd_sid, 
                           max(datetime_of_data_fetch) as max_fetch_date
                           FROM nk.temp_pmids
                           group by sd_oid, parent_study_sd_sid ) mx
                         WHERE dp.parent_study_sd_sid = mx.parent_study_sd_sid
                         and dp.sd_oid = mx.sd_oid;";
                conn.Execute(sql_string);
            }
        }

It is then important to try to tidy some of the worst data anomalies - in particular in the study registry ids found in the bank data (which are entered manually in the source data). A large variety of string manipulations are carried out in the functions CleanPMIDsdsidData1 to CleanPMIDsdsidData4. A few examples are shown below...

                sql_string = @"UPDATE nk.distinct_pmids
                     SET parent_study_sd_sid = Replace(parent_study_sd_sid, '/', '-')
                     WHERE parent_study_source_id = 100121;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                     SET parent_study_sd_sid = 'CTRI-' || parent_study_sd_sid
                     WHERE parent_study_source_id = 100121
                     and parent_study_sd_sid not ilike 'CTRI-%';";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRA-CT', 'EUDRACT')
                   WHERE parent_study_source_id = 100123;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRACT', '')
                    WHERE parent_study_source_id = 100123;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EURODRACT', '')
                   WHERE parent_study_source_id = 100123;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'ISRTN', 'ISRCTN')
                   WHERE parent_study_source_id = 100126;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'ISRNT', 'ISRCTN')
                   WHERE parent_study_source_id = 100126;";
                conn.Execute(sql_string);

                ...
                ... 

The processing from this point on is shared between the general object id helper class and the PubMed specific class. The latter is used to transfer the data to the same temp_object_ids table as is used for objects in study based data sources - the data structures are the same. A general object transfer helper function, UpdateObjectsWithStudyIds, is then used to insert the integer study ids into the data.

      
        public void TransferPMIDLinksToObjectIds()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.temp_object_ids(
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid, datetime_of_data_fetch)
                         SELECT  
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid, datetime_of_data_fetch
                         FROM nk.distinct_pmids";
                conn.Execute(sql_string);
            }
        }

        public void UpdateObjectsWithStudyIds(int source_id)
        {
            // Update the object parent study_id using the 'correct'
            // value found in the all_ids_studies table

            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_object_ids t
                           SET parent_study_id = s.study_id, 
                           is_preferred_study = s.is_preferred
                           FROM nk.all_ids_studies s
                           WHERE t.parent_study_sd_sid = s.sd_sid
                           and t.parent_study_source_id = s.source_id;";
                conn.Execute(sql_string);

                // Drop those link records that cannot be matched

                sql_string = @"DELETE FROM nk.temp_object_ids
                             WHERE parent_study_id is null;";
                conn.Execute(sql_string);
            }
        }

The difficulty is that some of the studies in this table may be the 'less preferred' versions of studies that exist in the system in multiple source databases. It is therefore necessary to interrogate the study_study_links table and replace the less preferred study identifiers with the most preferred. This in turn may result in duplicate records, so the table needs to be de-duplicated by creating a new table and 'selecting distinct' into it (not shown in the code below, but sketched after it). This approach is needed because, once again, the maximum of the datetime_of_data_fetch field has to be applied to the distinct record set after that set is created - the datetime_of_data_fetch field therefore needs to be part of the table definition. The distinct version of the data is then renamed back to temp_object_ids.

   
        public void InputPreferredSDSIDS()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // replace any LHS sd_sids with the 'preferred' RHS

                string sql_string = @"UPDATE nk.temp_object_ids b
                               SET parent_study_sd_sid = preferred_sd_sid,
                               parent_study_source_id = preferred_source_id
                               FROM nk.study_study_links k
                               WHERE b.parent_study_sd_sid = k.sd_sid
                               and b.parent_study_source_id = k.source_id ;";
                conn.Execute(sql_string);

                ...
                ...
                ...
            }
        }
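The elided de-duplication might look something like the following sketch, modelled on the FillDistinctPMIDsTable function shown earlier (the exact statements are assumptions, not the code actually used):

                // Sketch only - select the distinct links into a new table ...

                sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids2;
                         CREATE TABLE nk.temp_object_ids2 as
                         SELECT distinct source_id, sd_oid, parent_study_source_id,
                         parent_study_sd_sid, parent_study_id, is_preferred_study
                         FROM nk.temp_object_ids;";
                conn.Execute(sql_string);

                // ... re-apply the latest datetime_of_data_fetch ...

                sql_string = @"UPDATE nk.temp_object_ids2 t2
                         set datetime_of_data_fetch = mx.max_fetch_date
                         FROM 
                         ( select sd_oid, parent_study_id, 
                           max(datetime_of_data_fetch) as max_fetch_date
                           FROM nk.temp_object_ids
                           group by sd_oid, parent_study_id ) mx
                         WHERE t2.parent_study_id = mx.parent_study_id
                         and t2.sd_oid = mx.sd_oid;";
                conn.Execute(sql_string);

                // ... and rename the distinct version back to temp_object_ids.

                sql_string = @"DROP TABLE nk.temp_object_ids;
                         ALTER TABLE nk.temp_object_ids2 RENAME TO temp_object_ids;";
                conn.Execute(sql_string);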

The PubMed study links are now complete, at least from the point of view of the study Ids, and can be added to the main all_ids_data_objects table. This gives each added data object an integer object_id (the identity value of the record).

   
        public void UpdateAllObjectIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // ...

                string sql_string = @"INSERT INTO nk.all_ids_data_objects
                             (source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch)
                             select source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch
                             from nk.temp_object_ids";
                conn.Execute(sql_string);

                // ...

                sql_string = @"UPDATE nk.all_ids_data_objects
                            SET object_id = id
                            WHERE source_id = " + source_id.ToString() + @"
                            and object_id is null;";
                conn.Execute(sql_string);
            }
        }

With study based data sources this is the end of the processing - the data objects are assumed at this stage to be unique, and their source data ids, created as hashes, will certainly be unique. For PubMed citations, however, this is not the case. The objects will often be duplicated - a journal article can be referenced by many studies. Their source data id (sd_oid) is the PubMed identifier (PMID) and will be constant across sources - it is therefore possible to identify and resolve the duplicates. For any particular PubMed Id that is in the table more than once, the object_id therefore needs to be replaced by the minimum of the available set. The function ResetIdsOfDuplicatedPMIDs is used to accomplish this.

  
        public void ResetIdsOfDuplicatedPMIDs()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Find the minimum object_id for each PMID in the table
                // source id for PubMed = 100135

                string sql_string = @"DROP TABLE IF EXISTS nk.temp_min_object_ids;
                                     CREATE TABLE nk.temp_min_object_ids as
                                     SELECT sd_oid, Min(id) as min_id
                                     FROM nk.all_ids_data_objects
                                     WHERE source_id = 100135
                                     GROUP BY sd_oid;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.all_ids_data_objects b
                               SET object_id = min_id
                               FROM nk.temp_min_object_ids m
                               WHERE b.sd_oid = m.sd_oid
                               and source_id = 100135;";
                conn.Execute(sql_string);

                sql_string = @"DROP TABLE nk.temp_min_object_ids;";
                conn.Execute(sql_string);
            }
        }

Now, finally, the data object links for the PubMed records are all correct. The final stage is to create the utility table temp_objects_to_add with the correct data, so it can be used during transfer of the data object data itself. After that the system drops the temporary tables used in the links processing, and proceeds to the standard transfer of data objects and their attributes, in this instance from the PubMed database.

 
        public void FillObjectsToAddTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.temp_objects_to_add
                             (object_id, sd_oid)
                             SELECT distinct object_id, sd_oid 
                             FROM nk.all_ids_data_objects
                             WHERE source_id = " + source_id.ToString();
                conn.Execute(sql_string);
            }
        }