Processing PubMed Data
 
Under normal circumstances downloads of PubMed data need to obtain only the records that have been added to, or revised within, the PubMed system since the previous download. PubMed downloads therefore come with a cut-off date, which specifies that only records added or edited on or after that date need be (re-)downloaded. In addition, they come with a search filter. Two forms of download are used: the first selects records in PubMed that include 'bank' references to a trial registry; the second assembles the reference lists that already exist in the MDR, and uses that list as the basis for checking revision dates in the PubMed database. In each case the 'type' of the saf (search and fetch) operation is '''114: All new or revised records with a filter applied (download)'''.<br/>
In the first, bank based, download the filter parameter is '''10003: PubMed abstracts with references to any trial registry''', whilst in the second, reference based, download it is '''10004: Identifies PubMed references in Study sources that have not yet been downloaded'''.<br/>
Variants of these downloads exist using type '''121: Filtered records (download)'''. With filter 10003 this causes ''all'' bank-linked records to be downloaded, irrespective of date, whilst with filter 10004 ''all'' records referenced in study sources are downloaded, irrespective of revision date. This download type may be used occasionally, e.g. annually, after the PubMed database itself is revised, to ensure synchronisation with the source.<br/>
In all cases the download mechanism makes use of the 'Entrez' utilities API made available by the NCBI in the US, which supports PubMed along with many other (largely -omic) databases (see Sayers E. A General Introduction to the E-utilities. In: Entrez Programming Utilities Help [Internet]. Bethesda (MD): National Center for Biotechnology Information (US); 2010-. Available from: https://www.ncbi.nlm.nih.gov/books/NBK25497/).<br/>
 
 
 
 
====Searching Pubmed for Bank linked records====
The simpler task is to identify the records in PubMed that have a reference to a 'databank' - in this context a trial registry - and that have been revised or added since the last download (which provides the 'cut-off date').<br/>
The code is shown below. After some initial variable initialisations, and the conversion of the cut-off date into a suitable search parameter, each of the possible databanks is considered in turn. For each, a search URL is composed that consists of
* the base URL of the Entrez search engine
* the database name ('pubmed')
* an API 'key'
* a query term, which consists of
** the name of the databank, as listed by NCBI, plus '[SI]', which identifies the type of parameter as a databank, plus
** the date range to be searched for a relevant revision date, in the required format
* a flag that indicates that the results should be stored in the Entrez system, rather than being immediately downloaded, on what is referred to as the 'History' server. This allows intermediate results to be stored at the NCBI rather than being downloaded, and is a powerful part of the available Entrez workflow.
The 'API key' is an additional parameter provided by NCBI to individual users and serves to identify the API user - it is available free of charge after registration with the NCBI, and is a hex digit string 36 digits long. It should be included in all regular Entrez API calls, and doing so increases the allowed request frequency - up to 10 calls can be made a second as long as an API key is present. The key is stored, like other sensitive data, in the ''appsettings.json'' file and read from there at programme startup - it can then be accessed through the logging data layer (logging_repo). It is not visible in the code on GitHub.<br/>
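For illustration, a minimal sketch of reading such a key at programme startup is shown below. It assumes the key sits under an ''api_key'' entry in appsettings.json and is read with Microsoft.Extensions.Configuration - the actual plumbing in the MDR code (via the logging data layer) may differ.
<div style="font-family: monospace; font-size: 13px" >
<pre>
// Illustrative sketch only - assumes an "api_key" entry in appsettings.json.
using Microsoft.Extensions.Configuration;

IConfigurationRoot settings = new ConfigurationBuilder()
    .SetBasePath(AppContext.BaseDirectory)
    .AddJsonFile("appsettings.json")
    .Build();

// The parameter name is included so that the string can be
// concatenated directly into Entrez URLs, as in the code below.
string api_key = "&api_key=" + settings["api_key"];
</pre>
</div>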
The relevant lines of code are:
<div style="font-family: monospace; font-size: 13px" >
<pre>
string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
search_term = "&term=" + s.nlm_abbrev + "[SI]" + date_term;
string search_url = search_baseURL + api_key + search_term + "&usehistory=y";
</pre>
</div>
The response to this API call contains, amongst other things,
* the number of records found
* an integer 'query key'
* a string 'web environment'
The latter two are necessary to reference the list of Ids that have been stored on the Entrez History server. <br/>
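As an aside, the eSearchResult class used to deserialise this response can be quite small. A minimal sketch, following the element names in the esearch XML response (the actual MDR class may contain more members, e.g. for the IdList), is:
<div style="font-family: monospace; font-size: 13px" >
<pre>
using System.Xml.Serialization;

// Minimal sketch only - element names follow the esearch XML response.
[XmlRoot("eSearchResult")]
public class eSearchResult
{
    [XmlElement("Count")]
    public int Count { get; set; }

    [XmlElement("QueryKey")]
    public int QueryKey { get; set; }

    [XmlElement("WebEnv")]
    public string WebEnv { get; set; }
}
</pre>
</div>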
This is done in the next stage of the routine. Assuming there are any records at all (numbers vary from 0 to several hundred for each trial registry, if the search period is about a week), a 'fetch' API call is constructed, downloading up to 100 records at a time. The URL needs to include
* the base URL of the Entrez fetch engine
* the database name ('pubmed')
* the API 'key'
* a WebEnv parameter that includes the web environment returned from the search query
* a query_key parameter that includes the query key returned from the search query
* 'retstart' and 'retmax' (=100) parameters that indicate which 100 records should be returned (added within a loop)
* a 'retmode' parameter that indicates that the data should be returned as concatenated XML files.
The relevant code is below:
<div style="font-family: monospace; font-size: 13px" >
<pre>
string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
fetch_baseURL += api_key + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
numCallsNeeded = (int)(totalRecords / retmax) + 1;
for (int i = 0; i < numCallsNeeded; i++)
{
      ...
      string fetch_URL = fetch_baseURL + "&retstart=" + (i * retmax).ToString() + "&retmax=" + retmax.ToString();
      fetch_URL += "&retmode=xml";
      ...
}
</pre>
</div>
Once the URL is constructed it is passed to the FetchPubMedRecordsAsync function, along with a 'download result' object that accumulates the results of each download and which is ultimately returned at the end of the whole process. The FetchPubMedRecordsAsync function calls the API and - assuming data is returned - deserialises it into up to 100 separate PubMed 'Articles' (= citations). Each of these is then compared to the object source data table in the 'mon' database (sf.source_data_objects). If it is a new citation it is written out as a new file and a new record is added to the object source table. If not, it replaces the existing file and the record in the object source table is updated. <br/>
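The function itself is not reproduced here, but a simplified sketch of its general shape, with the file and database bookkeeping reduced to a comment, is shown below.
<div style="font-family: monospace; font-size: 13px" >
<pre>
// Simplified sketch only - the real function includes error handling
// and the file / source-record bookkeeping described above.
public async Task FetchPubMedRecordsAsync(string fetch_URL, DownloadResult res)
{
    string responseBody = await ch.GetStringFromURLAsync(fetch_URL);
    if (responseBody != null)
    {
        XmlDocument xdoc = new XmlDocument();
        xdoc.LoadXml(responseBody);
        XmlNodeList articles = xdoc.GetElementsByTagName("PubmedArticle");
        foreach (XmlNode article in articles)
        {
            res.num_checked++;
            string pmid = article.SelectSingleNode("MedlineCitation/PMID").InnerText;
            // Check sf.source_data_objects for this pmid; write a new
            // XML file and source record, or replace the existing ones.
        }
    }
}
</pre>
</div>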
The code for the whole top level function is shown below.
<div style="font-family: monospace; font-size: 13px" >
<pre>
         public async Task<DownloadResult> ProcessPMIDsListfromBanksAsync()
 
         {
             int totalRecords = 0, numCallsNeeded = 0, bank_id = 0;
             string search_term = "", date_term = "";
             DownloadResult res = new DownloadResult();
             XmlSerializer xSerializer = new XmlSerializer(typeof(eSearchResult));

             // This search can be (and usually is) date sensitive.
             if (args.type_id == 114)
             {
                 string today = DateTime.Now.ToString("yyyy/MM/dd");
                 string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                 date_term = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
             }

             // Get list of potential linked data banks (includes trial registries).
             IEnumerable<PMSource> banks = pubmed_repo.FetchDatabanks();
             foreach (PMSource s in banks)
             {
                 // get databank details
                 bank_id = s.id;
                 string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
                 search_term = "&term=" + s.nlm_abbrev + "[SI]" + date_term;
                 string search_url = search_baseURL + api_key + search_term + "&usehistory=y";

                 // Get the number of total records that have this databank reference
                 // and that (usually) have been revised recently,
                 // and calculate the loop parameters.
                 string responseBody = await ch.GetStringFromURLAsync(search_url);
                 if (responseBody != null)
                 {
                     using (TextReader reader = new StringReader(responseBody))
                     {
                         // The eSearchResult class corresponds to the returned data.
                         eSearchResult result = (eSearchResult)xSerializer.Deserialize(reader);
                         if (result != null)
                         {
                             totalRecords = result.Count;
                             query_key = result.QueryKey;
                             web_env = result.WebEnv;
                         }
                     }

                     // Loop through the records and obtain and store the relevant
                     // records, of PubMed Ids, retmax (= 100) at a time.
                     if (totalRecords > 0)
                     {
                         int retmax = 100;
                         string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
                         fetch_baseURL += api_key + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
                         numCallsNeeded = (int)(totalRecords / retmax) + 1;
                         for (int i = 0; i < numCallsNeeded; i++)
                         {
                             try
                             {
                                 // Retrieve the articles as nodes.
                                 string fetch_URL = fetch_baseURL + "&retstart=" + (i * retmax).ToString() + "&retmax=" + retmax.ToString();
                                 fetch_URL += "&retmode=xml";
                                 await FetchPubMedRecordsAsync(fetch_URL, res);
                                 System.Threading.Thread.Sleep(300);
                             }
                             catch (HttpRequestException e)
                             {
                                 logging_repo.LogError("In PubMed ProcessPMIDsListfromBanksAsync(): " + e.Message);
                                 return null;
                             }
                         }
                     }
                 }
                 logging_repo.LogLine("Processed " + totalRecords.ToString() + " from " + s.nlm_abbrev);
             }
             return res;
         }
</pre>
</div>
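With dates applied, the sort of search URL that results is shown below (with illustrative dates, and the API key value omitted):<br/>
''https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&api_key=...&term=ClinicalTrials.gov[SI]&mindate=2020/10/01&maxdate=2020/11/20&datetype=mdat&usehistory=y''<br/>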
====Downloading updated versions of Pubmed references====
In this second form of PubMed download, the PubMed references that are listed within study based data sources (especially but not exclusively ClinicalTrials.gov) are checked to see if they have been updated or added during the specified period, normally the period since the last download of this type. This is a more complex task that has to be done in 4 stages:<br/>
* Assembly of the PMIDs referenced in source databases =>
* Upload of those Ids to the Entrez History server =>
* Search of the uploaded Ids for those revised or added in the specified period =>
* Download of any 'qualifying' Pubmed records.
The most straightforward part is the assembly of the PMIDs (PubMed Ids) referenced in some study based data sources, by collecting this data from the 'ad.study_references' tables in the relevant databases. '''N.B. Because this data should be as up to date as possible, this download should not occur until after download / harvest / import cycles have been completed for the study based data sources.''' The PMIDs are collected and transferred to a single table, and are then aggregated into comma separated lists of 100 Ids each. Because about 95,000 PMID records exist in the source databases, the resulting table has about 950 rows. The Ids are assembled in this fashion because each Id list will later be inserted into a URL. The code, with in-line commenting removed for brevity, is shown below. The SQL statements that create the '100 Id' strings are also shown, in the CreatePMID_IDStrings function. Here an identity column in the distinct_pmids table is first used to number each record, and this number is then used (by integer division) to split the records into groups of 100, each of which can then be aggregated. A select distinct on the resulting table reduces the row count by a factor of 100.
 
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public void CreatePMIDsListfromSources()
        {
            ...
            pubmed_repo.SetUpTempPMIDsBySourceTables();
            CopyHelpers helper = new CopyHelpers();
            IEnumerable<PMIDBySource> references;
            ...
            IEnumerable<Source> sources = pubmed_repo.FetchSourcesWithReferences();
            foreach (Source s in sources)
            {
                references = pubmed_repo.FetchSourceReferences(s.database_name);
                pubmed_repo.StorePmidsBySource(helper.source_ids_helper, references);
            }
            pubmed_repo.CreatePMID_IDStrings();
        }

        public void CreatePMID_IDStrings()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO pp.distinct_pmids(
                          pmid)
                          SELECT DISTINCT pmid
                          FROM pp.pmids_by_source_total
                          ORDER BY pmid;";
                conn.Execute(sql_string);

                sql_string = @"Update pp.distinct_pmids
                          SET group_id = identity / 100;";
                conn.Execute(sql_string);

                // fill the id list (100 ids in each string)
                sql_string = @"INSERT INTO pp.pmid_id_strings(
                          id_string)
                          SELECT DISTINCT string_agg(pmid, ', ')
                          OVER (PARTITION BY group_id)
                          from pp.distinct_pmids;";
                conn.Execute(sql_string);
            }
        }
</pre>
</div>
 
The routine above is called near the beginning of the main routine, after key variables have been initialised and the URLs used for posting, searching and fetching data have been partially constructed. The routine then proceeds to upload each batch of 100 Ids to the Entrez History server. For each batch uploaded the system receives, in response, a 'web environment' string and an integer query key. These are fed into the next stage. <br/>
If no date parameters have been supplied (i.e. this is a general re-download of all referenced PMIDs) the system simply fetches all 100 listed PubMed articles, by sending a constructed fetch URL to the FetchPubMedRecordsAsync function, which - as described above - retrieves the data, splits it into its constituent PubMed citations, and then either writes each citation out as a new XML file or replaces the relevant existing file.<br/>
If there is a time period specified, as is usually the case, the 100 listed citations are first searched to see which, if any, have been updated or revised in that time. In the great majority of batches there are none, but a few hundred of the batches usually yield a single record each. The search results are retained on the History server, and if the relevant record count is 1 or more the identified PubMed record(s) are downloaded. Each search stage also generates a 'web environment' string and an integer query key, and these must be used in any subsequent fetch operation.<br/>
All the API URLs are constructed in a similar way, beginning with
* the relevant base URL for the service required (post, search or fetch)
* the name of the database (pubmed)
* the API key
The post URL simply has the Id list appended in each case.<br/>
The search URL has to use the query key and web environment from the post to identify the data to search. Note that this has to be done using a very specific syntax: the search 'term' parameter must start with the query key preceded by a '#' (coded as %23 in the URL), followed by '+AND+' and then any other parameters - here the date parameters constructed from the cut-off date. The URL also has to end with '&usehistory=y' to ensure the results stay on the Entrez server. <br/>
The fetch URL also needs the query key and web environment from the search (when a date is applied) or from the post (when it is not) to download the PubMed records. It has a max return value of 100 articles added (the default is 20) and the return mode must be specified as XML. In either case the URL is sent to the FetchPubMedRecordsAsync function, which does the download and subsequent file revision. The whole top level routine is shown below. Note the inclusion of pauses (thread sleeps of 200 milliseconds) at various points, to ensure that accesses stay well within the limit of 10 per second.
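For illustration, with placeholder Ids and History server values, and the API key value omitted, the post and subsequent search URLs for one batch might look like:<br/>
''https://eutils.ncbi.nlm.nih.gov/entrez/eutils/epost.fcgi?db=pubmed&api_key=...&id=10000001, 10000002, 10000003, ...''<br/>
''https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&api_key=...&term=%231+AND+&mindate=2020/10/01&maxdate=2020/11/20&datetype=mdat&WebEnv=MCID_xxxx&usehistory=y''<br/>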
 
<div style="font-family: monospace; font-size: 13px" >
<pre>
         public async Task<DownloadResult> ProcessPMIDsListfromDBSourcesAsync()
         {
             DownloadResult res = new DownloadResult();
             XmlSerializer post_xSerializer = new XmlSerializer(typeof(ePostResult));
             XmlSerializer search_xSerializer = new XmlSerializer(typeof(eSearchResult));
             string date_string = "";
             int string_num = 0;
             try
             {
                 // Set up bases of search strings.
                 string post_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/epost.fcgi?db=pubmed";
                 post_baseURL += api_key;
                 string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
                 search_baseURL += api_key;
                 if (args.type_id == 114)
                 {
                     string today = DateTime.Now.ToString("yyyy/MM/dd");
                     string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                     date_string = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
                 }
                 string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
                 fetch_baseURL += api_key;
                 string post_URL, search_URL, fetch_URL;

                 // Make a list of all PMIDs in the relevant DBs,
                 // as PMID strings 100 ids long. Then take each string
                 // and post it to the Entrez History server,
                 // getting back the web environment and query key parameters.

                 CreatePMIDsListfromSources();
                 IEnumerable<string> idstrings = pubmed_repo.FetchSourcePMIDStrings();
                 foreach (string idstring in idstrings)
                 {
                     // Construct the post URL using the 100 Ids.
                     string_num++;
                     post_URL = post_baseURL + "&id=" + idstring;
                     System.Threading.Thread.Sleep(200);
                     string post_responseBody = await ch.GetStringFromURLAsync(post_URL);
                     if (post_responseBody != null)
                     {
                         using (TextReader post_reader = new StringReader(post_responseBody))
                         {
                             // The ePostResult class corresponds to the returned data.
                             ePostResult post_result = (ePostResult)post_xSerializer.Deserialize(post_reader);
                             if (post_result != null)
                             {
                                 query_key = post_result.QueryKey;
                                 web_env = post_result.WebEnv;
                                 if (date_string == "")
                                 {
                                     // No need to search - fetch all 100 pubmed records immediately.
                                     fetch_URL = fetch_baseURL + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
                                     fetch_URL += "&retmax=100&retmode=xml";
                                     System.Threading.Thread.Sleep(200);
                                     await FetchPubMedRecordsAsync(fetch_URL, res);
                                 }
                                 else
                                 {
                                     // Search the posted ids for those revised
                                     // on or since the cutoff date.
                                     search_URL = search_baseURL + "&term=%23" + query_key.ToString() + "+AND+" + date_string;
                                     search_URL += "&WebEnv=" + web_env + "&usehistory=y";
                                     System.Threading.Thread.Sleep(200);
                                     string search_responseBody = await ch.GetStringFromURLAsync(search_URL);
                                     if (search_responseBody != null)
                                     {
                                         int totalRecords = 0;
                                         using (TextReader search_reader = new StringReader(search_responseBody))
                                         {
                                             // The eSearchResult class corresponds to the returned data.
                                             eSearchResult search_result = (eSearchResult)search_xSerializer.Deserialize(search_reader);
                                             if (search_result != null)
                                             {
                                                 totalRecords = search_result.Count;
                                                 query_key = search_result.QueryKey;
                                                 web_env = search_result.WebEnv;
                                                 if (totalRecords > 0)
                                                 {
                                                     fetch_URL = fetch_baseURL + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
                                                     fetch_URL += "&retmax=100&retmode=xml";
                                                     System.Threading.Thread.Sleep(200);
                                                     await FetchPubMedRecordsAsync(fetch_URL, res);
                                                 }
                                             }
                                         }
                                     }
                                 }
                             }
                         }
                     }
                     if (string_num % 10 == 0) logging_repo.LogLine(string_num.ToString() + " lines checked");
                 }
             }
             catch (HttpRequestException e)
             {
                 logging_repo.LogError("In PubMed ProcessPMIDsListfromDBSourcesAsync(): " + e.Message);
                 return null;
             }
             return res;
         }
</pre>
</div>
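The ePostResult class used above for deserialising the epost response is not shown in the text. A minimal sketch, consistent with the XML that epost returns (the actual class in the MDR code may differ), would be:
<div style="font-family: monospace; font-size: 13px" >
<pre>
using System.Xml.Serialization;

// Minimal sketch only - epost returns just a query key and a
// web environment for the posted set of Ids.
[XmlRoot("ePostResult")]
public class ePostResult
{
    [XmlElement("QueryKey")]
    public int QueryKey { get; set; }

    [XmlElement("WebEnv")]
    public string WebEnv { get; set; }
}
</pre>
</div>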
===Data Harvest===
Because no processing of the PubMed record takes place during download, all of the transformation of the data into the ECRIN metadata format has to take place during the harvesting stage. The PubMed data is relatively rich, but some additional processing is required to find the 'managing organisation' - the publisher - of the related journal article.<br/>
The data available in each file, and its structure, is described in '''[[PubMed Data Structure]]''', which is derived from the data available on the PubMed web site. This describes the structure of the source data, identifies the main elements and their attributes, and indicates which are extracted within the MDR system.<br/>
The code for the Harvest process is available on the ECRIN GitHub site, so no attempt is made here to go through all aspects of the data processing and transformation during harvesting. Instead some of the main points, issues and features are listed as bullet points below:<br/>
* The initial purpose of the harvest process is to produce and store a 'citation object' rather than a 'data object'. Both are stored in the sd tables of the database, in tables of the same name. Data object records are created from the citation object before any import, so the data structures in the ad schema are the same as for all other data sources. This two-step approach is required because it is impossible to derive the managing organisation from the PubMed data alone, so additional processing is needed before the data objects can be created.
* As with other harvest operations, a blank object is created at the beginning of the process, and the values of data points and attributes are gradually added to it, in this case by parsing the XML file, identifying element and attribute nodes, and reading the data values from them.
* The main citation object created - for all PubMed metadata files - is the PubMed citation itself. In about a third of cases, the publicly available PMC article is also added to the system as an additional data object.
* The processing of titles is relatively complex, partly because of the use of brackets and other indicators of different title origins, which need to be properly interpreted and processed, and partly because of so-called 'vernacular titles'.
* It is essential to obtain the electronic and print issn values for the journal - not because they are used in the ECRIN metadata schema, but because they can be used to identify the journal publisher. This is done after the main harvest, by using lookup tables in the context database.
* Dates are expressed in a variety of formats and need processing to convert them to the more standard format used by the ECRIN metadata scheme. Some may represent a range rather than a single date, and some may be partial. A variety of utility functions are required for date processing (a simple example is sketched after this list).
* Topic data comes from a variety of sections within the source material, and therefore requires aggregation into a standard structure within the harvesting process.
* Dates and identifiers can also be found in different parts of the XML source record, and in some cases they can be duplicated. It is therefore necessary to check any addition of these types of data against what is already present.
* The display title of the object is not the article title, but a full citation in the form of authors - article title - pagination and journal details. This citation therefore has to be constructed during extraction, in a consistent fashion. A variety of data points are extracted only for this purpose, and are not used elsewhere in the ECRIN metadata schema.
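As an illustration of the sort of date utility mentioned above, a minimal sketch (not the actual MDR code) for turning the year / month / day parts of a PubMed date into a DateTime might be:
<div style="font-family: monospace; font-size: 13px" >
<pre>
// Minimal sketch only - PubMed months are usually 3-letter English
// abbreviations (e.g. 'Nov'), so both numeric and abbreviated
// forms are handled here.
private static DateTime? GetDateFromParts(string year, string month, string day)
{
    if (!Int32.TryParse(year, out int y) || !Int32.TryParse(day, out int d))
    {
        return null;
    }
    if (!Int32.TryParse(month, out int m))
    {
        m = DateTime.ParseExact(month, "MMM",
                System.Globalization.CultureInfo.InvariantCulture).Month;
    }
    return new DateTime(y, m, d);
}
</pre>
</div>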
===Data Aggregation===
In the same way that two potential sources of study-PubMed links need to be considered when downloading PubMed data, the same two sources of links need to be considered when aggregating that data. During downloading the links are used only to identify PubMed data requiring download since the last download event. During aggregation the PubMed data is assumed to be already downloaded, and it is the links themselves that are important, for all the PubMed data, in order to associate the journal article data objects with the correct studies.<br/>
Data aggregation, which includes PubMed aggregation as the final step, should therefore be done after all study based data sources have been brought up to date, and then after all PubMed data has been downloaded and updated, to ensure that the data in each study based data source is as complete as possible.<br/>
The key function here is ProcessStandaloneObjectIds(), which orchestrates the process by which the study-article links are established. It makes use of a series of functions in a helper class called PubMedTransferHelper, as PubMed is currently the only object based data source, and therefore the only source where object ids are processed in a standalone fashion, i.e. without reference to any study data in the same data source. The key lines of this function are shown below.<br/>
The initial task, once the helper class has been instantiated and some 'workspace' temporary tables have been created, is to get the 'bank-related' data from the PubMed database itself - i.e. all the PubMed identifiers (PMIDs) and trial registry ids listed in the PubMed data, as found in the ad.object_db_links table. The SQL used for this selects only object-db data where the type of 'db' is a trial registry (and not a 'databank'). It returns the source id of the object as 100135, i.e. PubMed, but the source id of the study is also returned, by joining the object_db_links back to the contextual list of PubMed databank sources (ctx.nlm_databanks). The returned set of links is stored in the temporary table already constructed for this purpose.
 
<div style="font-family: monospace; font-size: 13px" >
<pre>
        public IEnumerable<PMIDLink> FetchBankPMIDs()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                ...
            }
        }
</pre>
</div>
Then the other source of study-PMID links is interrogated, looping through all sources that include 'study references'. This is a process very similar to that used during data download - a SQL statement is run against each relevant database using FetchSourceReferences, the study reference links are obtained, and they are added to the same temporary table as the bank based links.
<div style="font-family: monospace; font-size: 13px">
<pre>
        public IEnumerable<PMIDLink> FetchSourceReferences(int source_id, string db_name)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                ...
            }
        }
</pre>
</div>
The ''distinct'' links between study and PubMed citations are then transferred to a table called nk.distinct_pmids. There are a few 'gotchas' to avoid here. Firstly, some PMIDs obtained from the study source databases will have been formatted as strings of 24 characters (the default for all object ids, which are normally hash values). These need to be trimmed so that they can be compared properly with other PMIDs, in order that the following 'select distinct' eliminates all the genuine link duplications (i.e. links that occur in both types of link source). Then, after the distinct data has been transferred, the latest (maximum) datetime_of_data_fetch value has to be applied to each distinct record. Different copies of the same link, obtained at different times, will obviously contain different values for this field - it is the latest that is needed, for future transcription to the object links tables.
<div style="font-family: monospace; font-size: 13px">
<pre>
 
        public void FillDistinctPMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                ...
            }
        }
</pre>
</div>
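The body of the function is elided above, but based on the description the two steps involved have roughly the following shape. The SQL here is an illustrative sketch - the link table columns (source_id, sd_oid, parent_study_source_id, parent_study_sd_sid, datetime_of_data_fetch) follow the structures used elsewhere in the aggregation code, but the exact statements in the MDR code may differ.
<div style="font-family: monospace; font-size: 13px">
<pre>
// Illustrative sketch only - trim the 24-character PMIDs and
// select the distinct links into nk.distinct_pmids.
sql_string = @"INSERT INTO nk.distinct_pmids(
          source_id, sd_oid, parent_study_source_id, parent_study_sd_sid)
          SELECT DISTINCT source_id, trim(sd_oid),
          parent_study_source_id, parent_study_sd_sid
          FROM nk.temp_pmids;";
conn.Execute(sql_string);

// Apply the latest fetch date found for each distinct link.
sql_string = @"UPDATE nk.distinct_pmids dp
          SET datetime_of_data_fetch = mx.max_fetch_date
          FROM (SELECT sd_oid, parent_study_sd_sid,
                max(datetime_of_data_fetch) as max_fetch_date
                FROM nk.temp_pmids
                GROUP BY sd_oid, parent_study_sd_sid) mx
          WHERE dp.sd_oid = mx.sd_oid
          AND dp.parent_study_sd_sid = mx.parent_study_sd_sid;";
conn.Execute(sql_string);
</pre>
</div>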
It is then important to try to tidy some of the worst data anomalies - in particular in the trial registry ids associated with the 'bank' data, which are entered manually in the source data. A large variety of string manipulations are carried out, in the functions CleanPMIDsdsidData1 to CleanPMIDsdsidData4. A few examples are shown below...
<div style="font-family: monospace; font-size: 13px">
<pre>
                sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = Replace(parent_study_sd_sid, '/', '-')
                    WHERE parent_study_source_id = 100121;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = 'CTRI-' || parent_study_sd_sid
                    WHERE parent_study_source_id = 100121
                    and parent_study_sd_sid not ilike 'CTRI-%';";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = replace(parent_study_sd_sid, 'ISRTN', 'ISRCTN')
                    WHERE parent_study_source_id = 100126;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                    SET parent_study_sd_sid = replace(parent_study_sd_sid, 'ISRNT', 'ISRCTN')
                    WHERE parent_study_source_id = 100126;";
                conn.Execute(sql_string);

                ...
                ...
</pre>
</div>
 +
The processing from this point is shared by the general object id helper class and the specific Pubmed class. The latter is used to transfer the data to the same temp_object_ids table as used for objects in study based data sources - the data structures are the same. A general object transfer helper function, '''UpdateObjectsWithStudyIds''', is then used to insert the integer study ids into the data.
 +
<div style="font-family: monospace; font-size: 13px">
 +
<pre>     
 
         public void TransferPMIDLinksToObjectIds()
 
         public void TransferPMIDLinksToObjectIds()
 
         {
 
         {
Line 600: Line 490:
 
         }
 
         }
  
 +
        public void UpdateObjectsWithStudyIds(int source_id)
 +
        {
 +
            // Update the object parent study_id using the 'correct'
 +
            // value found in the all_ids_studies table
  
 +
            using (var conn = new NpgsqlConnection(connString))
 +
            {
 +
                string sql_string = @"UPDATE nk.temp_object_ids t
 +
                          SET parent_study_id = s.study_id,
 +
                          is_preferred_study = s.is_preferred
 +
                          FROM nk.all_ids_studies s
 +
                          WHERE t.parent_study_sd_sid = s.sd_sid
 +
                          and t.parent_study_source_id = s.source_id;";
 +
                conn.Execute(sql_string);
 +
 +
                // Drop those link records that cannot be matched
 +
 +
                sql_string = @"DELETE FROM nk.temp_object_ids
 +
                            WHERE parent_study_id is null;";
 +
                conn.Execute(sql_string);
 +
            }
 +
        }
 +
</pre>
 +
</div>
 +
The difficulty is that some of the studies in this table may be the 'less preferred' forms of tables that are in the system in multiple source databases. It is therefore necessary to interrogate the study_study_links table and replace the less preferred with the most preferred study identifiers. This in turn may result in duplicate records, so the table needs to be de-duplicated by creating a new table and 'selecting distinct' into it (not shown in the code below). This approach is necessary because once again the maximum of the datetime_of_data_fetch field is applied to the distinct record set, after that dataset is created. The datetime_of_data_fetch field therefore needs to be part of the table definition. The distinct version of the data is then renamed back to temp_object_ids.
 +
<div style="font-family: monospace; font-size: 13px">
 +
<pre> 
 
         public void InputPreferredSDSIDS()
 
         public void InputPreferredSDSIDS()
 
         {
 
         {
Line 615: Line 531:
 
                 conn.Execute(sql_string);
 
                 conn.Execute(sql_string);
  
                 // That may have produced some duplicates - if so get rid of them
+
                 ...
                 // needs to be done indirectly because of the need to get the maximum
+
                ...
                 // datetime_of_data_fetch for each duplciated object
+
                 ...
 +
            }
 +
        }
 +
</pre>
 +
</div>
 +
The PubMed study links are now complete, at least from the point of view of the study Ids, and can be added to the main all_ids_data_objects table. This gives each added data object an integer object_id (the identity value of the record).
 +
<div style="font-family: monospace; font-size: 13px">
 +
<pre> 
 +
        public void UpdateAllObjectIdsTable(int source_id)
 +
        {
 +
            using (var conn = new NpgsqlConnection(connString))
 +
            {
 +
                 // ...
  
                 sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids2;
+
                 string sql_string = @"INSERT INTO nk.all_ids_data_objects
                CREATE TABLE IF NOT EXISTS nk.temp_object_ids2(
+
                            (source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                  object_id                INT
+
                            parent_study_id, is_preferred_study, datetime_of_data_fetch)
                , source_id               INT
+
                            select source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                , sd_oid                   VARCHAR
+
                            parent_study_id, is_preferred_study, datetime_of_data_fetch
                , parent_study_source_id   INT
+
                            from nk.temp_object_ids";
                , parent_study_sd_sid     VARCHAR
 
                , parent_study_id         INT
 
                , is_preferred_study       BOOLEAN  default true
 
                , datetime_of_data_fetch   TIMESTAMPTZ
 
                ); ";
 
 
                 conn.Execute(sql_string);
 
                 conn.Execute(sql_string);
  
                 sql_string = @"INSERT INTO nk.temp_object_ids2(
+
                 // ...
                        source_id, sd_oid, parent_study_source_id,
 
                        parent_study_sd_sid, parent_study_id)
 
                        SELECT distinct
 
                        source_id, sd_oid, parent_study_source_id,
 
                        parent_study_sd_sid, parent_study_id
 
                        FROM nk.temp_object_ids;";
 
                conn.Execute(sql_string);
 
 
 
                // update with latest datetime_of_data_fetch
 
                // for each distinct study - data object link
 
  
                 sql_string = @"UPDATE nk.temp_object_ids2 to2
+
                 sql_string = @"UPDATE nk.all_ids_data_objects
                        set datetime_of_data_fetch = mx.max_fetch_date
+
                            SET object_id = id
                        FROM
+
                            WHERE source_id = " + source_id.ToString() + @"
                        ( select sd_oid, parent_study_sd_sid,
+
                            and object_id is null;";
                          max(datetime_of_data_fetch) as max_fetch_date
 
                          FROM nk.temp_object_ids
 
                          group by sd_oid, parent_study_sd_sid ) mx
 
                        WHERE to2.sd_oid = mx.sd_oid
 
                        and to2.parent_study_sd_sid = mx.parent_study_sd_sid;";
 
                conn.Execute(sql_string);
 
 
 
                sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
 
                ALTER TABLE nk.temp_object_ids2 RENAME TO temp_object_ids;";
 
                conn.Execute(sql_string);
 
 
 
                // Maybe a few blank pmids slip through...
 
 
 
                sql_string = @"delete from nk.temp_object_ids
 
                    where sd_oid is null or sd_oid = '';";
 
 
                 conn.Execute(sql_string);
 
                 conn.Execute(sql_string);
 
             }
 
             }
 
         }
 
         }
 
+
</pre>
 
+
</div>
 +
With study based data sources this is the end of the processing - the data objects are assumed at this stage to be unique, and their source data ids, created as hashes will certainly be unique. For PubMed citations. however, this is not the case. The objects will often be duplicated - a journal article can be referenced by many studies. Their source data id (sd_oid) is the PubMed identifier (PMID) and will be constant across sources - it is therefore possible to identify and resolve the duplicates. For any particular PubMed Id therefore, that is in the table more than once, the object_id needs to be replaced by the minimum of the available set. The function '''ResetIdsOfDuplicatedPMIDs''' is used to accomplish this.
 +
<div style="font-family: monospace; font-size: 13px">
 +
<pre> 
 
         public void ResetIdsOfDuplicatedPMIDs()
 
         public void ResetIdsOfDuplicatedPMIDs()
 
         {
 
         {
Line 692: Line 593:
 
                 sql_string = @"DROP TABLE nk.temp_min_object_ids;";
 
                 sql_string = @"DROP TABLE nk.temp_min_object_ids;";
 
                 conn.Execute(sql_string);
 
                 conn.Execute(sql_string);
 
                // ???? May be (yet) more duplicates have appeared, where a
 
                // study - pmid link has been generated in more than one way
 
                // (Links of the same paper to different studies are not uncommon)
 
 
 
             }
 
             }
 
         }
 
         }
 
+
</pre>
 
+
</div>
         public void DropTempPMIDTable()
+
Now, finally, the data object links for the PubMed records are all correct. The final stage is to create the utility table temp_objects_to_add with the correct data, so it can be used during transfer of the data object data itself. After that the system drops the temporary tables used in the links processing, and proceeds to the standard transfer of data objects and their attributes, in this instance from the PubMed database.
 +
<div style="font-family: monospace; font-size: 13px">
 +
<pre>
 +
         public void FillObjectsToAddTable(int source_id)
 
         {
 
         {
 
             using (var conn = new NpgsqlConnection(connString))
 
             using (var conn = new NpgsqlConnection(connString))
 
             {
 
             {
                 string sql_string = "DROP TABLE IF EXISTS pp.temp_pmids";
+
                 string sql_string = @"INSERT INTO nk.temp_objects_to_add
 +
                            (object_id, sd_oid)
 +
                            SELECT distinct object_id, sd_oid
 +
                            FROM nk.all_ids_data_objects
 +
                            WHERE source_id = " + source_id.ToString();
 
                 conn.Execute(sql_string);
 
                 conn.Execute(sql_string);
 
             }
 
             }
 
         }
 
         }
 +
</pre>
 +
</div>

Latest revision as of 12:37, 5 January 2021

Introduction

The PubMed data has a range of 'special issues' that demand additional processing in several places in the system. This is the only data that is not directly linked to studies within the data source, so the data objects retrieved from it, the PubMed citations and associated PMC articles, have to be linked within the MDR. There are two mechanisms for this. The first is embedded in the Pubmed data itself, in the shape of 'bank ids'. These are the identifiers of associated entries in 'databanks', and that term includes trial registries. This allows a PubMed citation to explicitly reference an associated study, through that study's registry id - most often but not exclusively the ClinicalTrials.gov NCT number. The second is in the 'associated reference' data found in some data sources. This appears under various names and in various forms, but usually includes a PubMed identifier, at least for the great majority of cited references (some are given only as DOIs or as textual citations). Note that although this list of references is obtained during a study based source's extraction, the citations themselves are not downloaded and processed into the system during that extraction. Instead all PubMed data is extracted and processed only when referencing PubMed (id = 100135) as the data source.
The system has to consider both PubMed linkage types, firstly when identifying the relevant PubMed data to download (PubMed contains well over 30 million records, so we only want the data we can link) and secondly when linking PubMed data objects to studies during data aggregation. The harvesting process for PubMed is also relatively complex, requiring additional calls into the contextual data. The PubMed specific aspects of data download, harvest and aggregation are therefore summarised below.

Data Download

Under normal circumstances downloads of PubMed data need to obtain only records that have been added to, or revised within, the PubMed system since the previous download. PubMed downloads therefore come with a cut-off date, specifying that only records added or edited on or after that date need be (re-)downloaded. In addition, they come with a search filter. The first filter is used to select records in PubMed that include 'bank' references to a trial registry; the second is used to assemble the reference lists already existing in the MDR, and use that list as the basis for checking revision dates in the PubMed database. In each case the 'type' of the saf (search and fetch) operation is 114: All new or revised records with a filter applied (download).
In the first, bank-based, download the filter parameter is 10003: PubMed abstracts with references to any trial registry, whilst in the second, references-based, download it is 10004: Identifies PubMed references in Study sources that have not yet been downloaded.
Variants of these downloads exist using type 121: Filtered records (download). With filter 10003 this causes all bank-linked records to be downloaded, irrespective of date, whilst with filter 10004 all records referenced in study sources are downloaded, irrespective of revision date. This download type may be used occasionally, e.g. annually, after the PubMed database itself is revised, to ensure synchronisation with the source. In all cases the download mechanism makes use of the 'Entrez' utilities API made available by the NCBI in the US, which supports PubMed along with many other (largely -omic) databases.
(see Sayers E. A General Introduction to the E-utilities. In: Entrez Programming Utilities Help [Internet]. Bethesda (MD): National Center for Biotechnology Information (US); 2010-. Available from: https://www.ncbi.nlm.nih.gov/books/NBK25497/).

Searching PubMed for bank-linked records

The simpler task is to identify the records in PubMed that have a reference to a 'databank' (in this context a trial registry) and that have been revised or added since the last download (which provides the 'cut-off date'). The code is shown below. After some initial variable initialisations, and the conversion of the cut-off date into a suitable search parameter, each of the possible databanks is considered in turn. For each, a search URL is composed that consists of

  • the base URL of the Entrez Search engine
  • the database name ('pubmed')
  • An API 'key'
  • A query term, that consists of
    • the name of the databank, as listed by NCBI, plus '[SI]', which identifies the type of parameter as a databank, plus
    • the date range to be searched for a relevant revision date, in the required format
  • a flag that indicates that the results should be stored on what is referred to as the 'History' server within the Entrez system, rather than being immediately downloaded. This allows intermediate results to be held at the NCBI, and is a powerful part of the available Entrez workflow.

The 'API key' is an additional parameter provided by NCBI to individual users and serves to identify the API user - it is available free of charge after registration with the NCBI and is a hex digit string, 36 digits long. It should be included in all regular Entrez API calls, and doing so increases the allowed response frequency of the system - up to 10 calls can be made a second as long as an API key is present. The key is stored, like other sensitive data, in the appsettings.json file and read from there at programme startup - it can then be accessed through the logging data layer (logging_repo). It is not visible in the code on GitHub.
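As an illustration, reading such a key at startup might look like the following (a minimal sketch using Microsoft.Extensions.Configuration; the setting name used here is an assumption, not the actual MDR configuration). Storing the value with its '&api_key=' prefix matches the way it is appended to URLs in the code below.

        // A minimal sketch only - the setting name "entrez_api_key" is assumed,
        // not taken from the actual MDR appsettings.json file.
        using Microsoft.Extensions.Configuration;

        IConfigurationRoot settings = new ConfigurationBuilder()
            .SetBasePath(AppContext.BaseDirectory)
            .AddJsonFile("appsettings.json")
            .Build();

        // Held as a complete URL parameter, ready for appending to API calls.
        string api_key = "&api_key=" + settings["entrez_api_key"];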
The relevant lines of code for composing the search URL are:

string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
search_term = "&term=" + s.nlm_abbrev + "[SI]" + date_term;
string search_url = search_baseURL + api_key + search_term + "&usehistory=y";

The response to this API call contains, amongst other things,

  • the number of records found
  • an integer 'query key'
  • a string 'web environment'

The latter two are necessary to reference the list of Ids that have been stored on the Entrez History server.
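These values are deserialised, in the code below, into an eSearchResult object. That class is not reproduced on this page, but a minimal sketch of such a deserialisation target, assuming the property names mirror the Count, QueryKey and WebEnv elements of the eSearch XML response, would be:

        // Sketch of a deserialisation target for the eSearch response.
        // Property names are assumed to match the XML element names, which
        // is all that XmlSerializer needs to populate them.
        using System.Xml.Serialization;

        [XmlRoot("eSearchResult")]
        public class eSearchResult
        {
            public int Count { get; set; }        // number of records found
            public int QueryKey { get; set; }     // key to the stored result set
            public string WebEnv { get; set; }    // identifies the History server session
        }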
This is done in the next stage of the routine. Assuming there are any records at all (numbers vary from 0 to several hundred per trial registry, if the search period is about a week), a 'fetch' API call is constructed, downloading up to 100 records at a time. The URL needs to include

  • the base URL of the Entrez Fetch engine
  • the database name ('pubmed')
  • The API 'key'
  • A WebEnv parameter that includes the web environment returned from the search query
  • A query_key parameter that includes the query key returned from the search query
  • 'retstart' and 'retmax' (=100) parameters that indicate which 100 records should be returned (added within a loop)
  • A 'retmode' parameter that indicates that the data should be returned as concatenated XML files.

The relevant code is below:

string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
fetch_baseURL += api_key + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
numCallsNeeded = (int)(totalRecords / retmax) + 1;
for (int i = 0; i < numCallsNeeded; i++)
{ 
       ...
       string fetch_URL = fetch_baseURL + "&retstart=" + (i * retmax).ToString() + "&retmax=" + retmax.ToString();
       fetch_URL += "&retmode=xml";
       ...

Once the URL is constructed it is passed to the FetchPubMedRecordsAsync function, along with a 'download result' object that accumulates the results of each download and which is ultimately returned at the end of the whole process. The FetchPubMedRecordsAsync function calls the API and, assuming data is returned, deserialises it into up to 100 separate PubMed 'Articles' (= citations). Each of these is then compared to the object source data table in the 'mon' database (sf.source_data_objects). If it is a new citation it is written out as a new file and a new record is added to the object source table. If not, it replaces the existing file and the record in the object source table is updated.
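The FetchPubMedRecordsAsync function itself is not reproduced on this page, but its general shape, as just described, might be sketched as follows (illustrative only; the 'file_base' path, the counter fields on DownloadResult and the bookkeeping details are assumptions, and the real code is on GitHub):

        // Illustrative sketch of FetchPubMedRecordsAsync - not the actual code.
        // Assumes, as in the surrounding class, a 'ch' helper for HTTP calls and
        // a 'file_base' folder path. Requires System.Xml and System.IO.
        public async Task FetchPubMedRecordsAsync(string fetch_URL, DownloadResult res)
        {
            string responseBody = await ch.GetStringFromURLAsync(fetch_URL);
            if (responseBody != null)
            {
                // Split the returned XML into its constituent citations.
                XmlDocument xdoc = new XmlDocument();
                xdoc.LoadXml(responseBody);
                foreach (XmlNode article in xdoc.GetElementsByTagName("PubmedArticle"))
                {
                    res.num_checked++;                          // field name assumed
                    string pmid = article.SelectSingleNode(".//PMID")?.InnerText;
                    string file_path = Path.Combine(file_base, "PM" + pmid + ".xml");
                    bool is_new = !File.Exists(file_path);

                    // Write new files, or overwrite the existing version.
                    File.WriteAllText(file_path, article.OuterXml);

                    // ... add or update the corresponding record in the
                    // sf.source_data_objects table ...
                    if (is_new) res.num_added++; else res.num_updated++;    // field names assumed
                }
            }
        }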
The code for the whole top level function is shown below.

        public async Task<DownloadResult> ProcessPMIDsListfromBanksAsync()
        {
            int totalRecords = 0, numCallsNeeded = 0, bank_id = 0;
            string search_term = "", date_term = "";
            DownloadResult res = new DownloadResult();
            XmlSerializer xSerializer = new XmlSerializer(typeof(eSearchResult));

            // This search can be (and usually is) date sensitive.

            if (args.type_id == 114)
            {
                string today = DateTime.Now.ToString("yyyy/MM/dd");
                string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                date_term = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
            }

            // Get list of potential linked data banks (includes trial registries).

            IEnumerable<PMSource> banks = pubmed_repo.FetchDatabanks();
            foreach (PMSource s in banks)
            {
                // get databank details

                bank_id = s.id;
                string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
                search_term = "&term=" + s.nlm_abbrev + "[SI]" + date_term;
                string search_url = search_baseURL + api_key + search_term + "&usehistory=y";

                // Get the number of total records that have this databank reference
                // and that (usually) have been revised recently 
                // and calculate the loop parameters.

                string responseBody = await ch.GetStringFromURLAsync(search_url);
                if (responseBody != null)
                {
                    using (TextReader reader = new StringReader(responseBody))
                    {
                        // The eSearchResult class corresponds to the returned data.

                        eSearchResult result = (eSearchResult)xSerializer.Deserialize(reader);
                        if (result != null)
                        {
                            totalRecords = result.Count;
                            query_key = result.QueryKey;
                            web_env = result.WebEnv;
                        }
                    }

                    // loop through the identified records, obtaining and storing
                    // the relevant data, retmax (= 100) records at a time

                    if (totalRecords > 0)
                    {
                        int retmax = 100;
                        string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
                        fetch_baseURL += api_key + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
                        numCallsNeeded = (int)(totalRecords / retmax) + 1;
                        for (int i = 0; i < numCallsNeeded; i++)
                        {
                            try
                            {
                                // Retrieve the articles as nodes.

                                string fetch_URL = fetch_baseURL + "&retstart=" + (i * retmax).ToString() + "&retmax=" + retmax.ToString();
                                fetch_URL += "&retmode=xml";
                                await FetchPubMedRecordsAsync(fetch_URL, res);
                                System.Threading.Thread.Sleep(300);
                            }

                            catch (HttpRequestException e)
                            {
                                logging_repo.LogError("In PubMed ProcessPMIDsListfromBanksAsync(): " + e.Message);
                                return null;
                            }
                        }
                    }

                    logging_repo.LogLine("Processed " + totalRecords.ToString() + " from " + s.nlm_abbrev);
                }
            }

            return res;
        }

Downloading updated versions of PubMed references

In this second form of PubMed download, the Pubmed references that are listed within study based data sources (especially but not exclusively ClinicalTrials.gov) are checked to see if they have been updated or added during the specified period, normally since the last download of this type. This is a more complex task that has to be done in 4 stages:

  • Assembly of PMIDs referenced in source databases =>
  • Upload of Ids to the Entrez History server =>
  • Search of uploaded Ids for those revised or added in the specified period =>
  • Download of any 'qualifying' Pubmed records.

The most straightforward part is the assembly of the PMIDs (PubMed Ids) referenced in some study based data sources, by collecting this data from the 'ad.study_references' tables in the relevant databases. N.B. Because this data should be as up to date as possible, this download should not occur until after download / harvest / import cycles have been completed for the study based data sources. The PMIDs are collected and transferred to a single table, and are then aggregated into comma separated lists of 100 Ids each. Because about 95,000 PMID records exist in source databases the resulting table has about 950 rows. They are assembled in this fashion because each id list will later be inserted into a URL. The code, with in-line commenting removed for brevity, is shown below. The SQL statements that create the '100 Id' strings are also shown, in the CreatePMID_IDStrings function. Here an identity Id column in the distinct_pmids table is first used to number each record, and this number is then used to split the records into groups of 100, each of which can be aggregated. A select distinct is used on the resulting table to reduce the row count by a factor of 100.

        public void CreatePMIDsListfromSources()
        {
            ...
            pubmed_repo.SetUpTempPMIDsBySourceTables();
            CopyHelpers helper = new CopyHelpers();
            IEnumerable<PMIDBySource> references;
            ...
            IEnumerable<Source> sources = pubmed_repo.FetchSourcesWithReferences();
            foreach (Source s in sources)
            {
                references = pubmed_repo.FetchSourceReferences(s.database_name);
                pubmed_repo.StorePmidsBySource(helper.source_ids_helper, references);
            }
            pubmed_repo.CreatePMID_IDStrings();
        }

        public void CreatePMID_IDStrings()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO pp.distinct_pmids(
                          pmid)
                          SELECT DISTINCT pmid
                          FROM pp.pmids_by_source_total
                          ORDER BY pmid;";
                conn.Execute(sql_string);

                sql_string = @"Update pp.distinct_pmids
                          SET group_id = identity / 100;";
                conn.Execute(sql_string);

                // fill the id list (100 ids in each string)
                sql_string = @"INSERT INTO pp.pmid_id_strings(
                        id_string)
                        SELECT DISTINCT string_agg(pmid, ', ') 
                        OVER (PARTITION BY group_id) 
                        from pp.distinct_pmids;";
                conn.Execute(sql_string);
            }
        }

The routine above is called near the beginning of the main routine, after key variables have been initialised and the URLs used for posting, searching and fetching data have been partially constructed. The routine then proceeds to upload each batch of 100 Ids to the Entrez history server. For each batch uploaded the system receives, in response, a 'web environment' string and an integer query key. These are fed into the next stage.
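The post response is deserialised, in the top level routine below, into an ePostResult object. On the same pattern as the eSearchResult sketch earlier, a minimal sketch of such a class, assuming the property names mirror the QueryKey and WebEnv elements of the ePost XML response, would be:

        // Sketch of a deserialisation target for the ePost response
        // (names assumed to match the XML element names).
        using System.Xml.Serialization;

        [XmlRoot("ePostResult")]
        public class ePostResult
        {
            public int QueryKey { get; set; }
            public string WebEnv { get; set; }
        }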
If no date parameters have been supplied (i.e. this is a general re-download of all referenced PMIDs) the system simply fetches all 100 listed PubMed articles by sending a constructed fetch URL to the FetchPubMedRecordsAsync function, which as described above retrieves the data, splits it into its constituent PubMed citations, and then either writes new or replaces the relevant XML files.
If there is a time period specified, as is usually the case, the 100 listed citations are first searched to see if any have been updated or revised in that time. In the great majority of cases there are none, but in a few hundred cases (usually) a single record is identified. The results are retained on the History Server, and if the relevant record count is 1 or more the identified PubMed record(s) are downloaded. Each search stage also generates a 'web environment' string and an integer query key, and these must be used in any subsequent fetch operation.
All the API URLs are constructed in a similar way - beginning with

  • the relevant base URL for the service required (post, search or fetch)
  • the name of the database (pubmed)
  • The API key

The post URL simply has the Id list appended in each case.
The search URL has to use the query key and web environment from the post to identify the data to search. Note that this has to be done using a very specific syntax: the search 'term' parameter must start with the query key preceded by a '#' (coded as %23 in the URL) and followed by '+AND+' and then any other parameters - here that constructed from the cut-off date. The URL also has to end with '&usehistory=y' to ensure the results stay on the Entrez server.
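As a concrete illustration, with placeholder values for the API key, web environment and dates (the ids below are dummies), a post URL and the search URL built from its response look like:

    https://eutils.ncbi.nlm.nih.gov/entrez/eutils/epost.fcgi?db=pubmed&api_key=<key>&id=31000000,31000001,...

    https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&api_key=<key>&term=%231+AND+&mindate=2020/12/01&maxdate=2021/01/05&datetype=mdat&WebEnv=<web_env>&usehistory=y

Here %231 decodes to '#1', the query key returned by the preceding post.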
The fetch URL also needs the query key and web environment from the search (with a date) or post (without a date) to download the PubMed records. It has a max return value of 100 articles added (the default is 20) and the return mode must be specified as XML. In either case the URL is sent to FetchPubMedRecordsAsync, which does the download and subsequent file revision. The whole top level routine is shown below. Note the inclusion of pauses (thread sleeps of 200 milliseconds) at various points, to ensure that accesses stay well within the limit of 10 per second.

        public async Task<DownloadResult> ProcessPMIDsListfromDBSourcesAsync()
        {
            DownloadResult res = new DownloadResult();
            XmlSerializer post_xSerializer = new XmlSerializer(typeof(ePostResult));
            XmlSerializer search_xSerializer = new XmlSerializer(typeof(eSearchResult));
            string date_string = "";
            int string_num = 0;
            try
            {
                // Set up bases of search strings

                string post_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/epost.fcgi?db=pubmed";
                post_baseURL += api_key;
                string search_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed";
                search_baseURL += api_key;
                if (args.type_id == 114)
                {
                    string today = DateTime.Now.ToString("yyyy/MM/dd");
                    string cutoff = ((DateTime)args.cutoff_date).ToString("yyyy/MM/dd");
                    date_string = "&mindate=" + cutoff + "&maxdate=" + today + "&datetype=mdat";
                }
                string fetch_baseURL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed";
                fetch_baseURL += api_key;
                string post_URL, search_URL, fetch_URL;

                // Make a list of all PMIDs in the relevant DBs, 
                // as PMID strings 100 ids long. Then take each string
                // and post it to the Entrez history server
                // getting back the web environment and query key parameters

                CreatePMIDsListfromSources();

                IEnumerable<string> idstrings = pubmed_repo.FetchSourcePMIDStrings();
                foreach (string idstring in idstrings)
                {
                    // Construct the post URL using the 100 Ids
                    string_num++;
                    post_URL = post_baseURL + "&id=" + idstring;
                    System.Threading.Thread.Sleep(200);
                    string post_responseBody = await ch.GetStringFromURLAsync(post_URL);
                    if (post_responseBody != null)
                    {
                        using (TextReader post_reader = new StringReader(post_responseBody))
                        {
                            // The ePostResult class corresponds to the returned data.
                            ePostResult post_result = (ePostResult)post_xSerializer.Deserialize(post_reader);
                            if (post_result != null)
                            {
                                // search the articles in these ids for recent revisions

                                query_key = post_result.QueryKey;
                                web_env = post_result.WebEnv;
                                if (date_string == "")
                                {
                                    // No need to search - fetch all 100 pubmed records immediately
                                    fetch_URL = fetch_baseURL + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
                                    fetch_URL += "&retmax=100&retmode=xml";
                                    System.Threading.Thread.Sleep(200);
                                    await FetchPubMedRecordsAsync(fetch_URL, res);
                                }
                                else
                                {
                                    // search for those that have been revised on or since the cutoff date

                                    search_URL = search_baseURL + "&term=%23" + query_key.ToString() + "+AND+" + date_string;
                                    search_URL += "&WebEnv=" + web_env + "&usehistory=y";

                                    System.Threading.Thread.Sleep(200);
                                    string search_responseBody = await ch.GetStringFromURLAsync(search_URL);
                                    if (search_responseBody != null)
                                    {
                                        int totalRecords = 0;
                                        using (TextReader search_reader = new StringReader(search_responseBody))
                                        {
                                            // The eSearchResult class corresponds to the returned data.
                                            eSearchResult search_result = (eSearchResult)search_xSerializer.Deserialize(search_reader);
                                            if (search_result != null)
                                            {
                                                totalRecords = search_result.Count;
                                                query_key = search_result.QueryKey;
                                                web_env = search_result.WebEnv;

                                                if (totalRecords > 0)
                                                {
                                                    fetch_URL = fetch_baseURL + "&WebEnv=" + web_env + "&query_key=" + query_key.ToString();
                                                    fetch_URL += "&retmax=100&retmode=xml";
                                                    System.Threading.Thread.Sleep(200);
                                                    await FetchPubMedRecordsAsync(fetch_URL, res);
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }

                    if (string_num % 10 == 0) logging_repo.LogLine(string_num.ToString() + " lines checked");
                }

                return res;
            }

            catch (HttpRequestException e)
            {
                logging_repo.LogError("In PubMed ProcessPMIDsListfromDBSourcesAsync(): " + e.Message);
                return null;
            }

        }

Data Harvest

Because no processing of the PubMed record takes place during download, all of the transformation of the data into the ECRIN metadata format has to take place during the harvesting stage. The PubMed data is relatively rich, but some additional processing is required to find the 'managing organisation' - the publisher - of the related journal article.
The data available in each file, and its structure, is described in PubMed Data Structure, which is derived from the data available on the PubMed web site. This describes the structure of the source data, identifies the main elements and their attributes, and indicates which are extracted within the MDR system.
The code for the Harvest process is available on the ECRIN GitHub site, so there is no attempt to go through all aspects of the data processing and transformation during harvesting. Instead some of the main points, issues and features are listed as bullet points below:

  • The initial purpose of the harvest process is to produce and store a 'citation object' rather than a 'data object'. Both are stored in the sd tables of the database, in tables of the same name. Data object records are created before any import, from the Citation Object, so the data structures in the ad schema are the same as for all other data sources. This is because it is impossible to derive the Managing Organisation from the PubMed data alone, so additional processing is required before the Data Objects can be created.
  • As with other harvest operations, a blank Object is created at the beginning of the process, and the values of data points and attributes are gradually added to it, in this case by parsing the XML file, identifying element and attribute nodes, and reading the data values from them.
  • The main citation object created - for all PubMed metadata files - is the PubMed citation itself. In about a third of cases, the publicly available PMC article is also added to the system as an additional data object.
  • The processing of titles is relatively complex, partly because of the use of brackets and other indicators of different title origin, which need to be properly interpreted and processed, and partly because of so-called 'vernacular titles'.
  • It is essential to obtain electronic and print issn values for the journal, not because they are used in the ECRIN metadata schema but because they can be used to identify the journal publisher. This is done after the main harvest, by using lookup tables in the context database.
  • Dates are expressed in a variety of formats and need processing to convert to the more standard format used by the ECRIN metadata scheme. Some may represent a range rather than a single date, and some may be partial. A variety of utility functions are required for date processing (an illustrative sketch is given after this list).
  • Topic data comes from a variety of sections within the source material, and therefore requires aggregation into a standard structure within the harvesting process.
  • Dates and identifiers can also be found in different parts of the XML source record. In some cases they can be duplicated. It is therefore necessary to check any addition of these types of data against what is already present.
  • The display title of the object is not the article title, but a full citation in the form of authors - article title - pagination and journal details. This citation therefore has to be constructed during extraction, in a consistent fashion. A variety of data points are extracted for only this purpose, and are not used elsewhere in the ECRIN metadata schema.
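As an indication of the kind of date processing involved, the sketch below (illustrative only; the actual utility functions in the harvester differ) turns a PubMed-style publication date, which may have a non-numeric or missing month and a missing day, into year / month / day parts:

        // Illustrative sketch only - not the harvester's actual utility code.
        // PubMed dates can be partial ("2020", "2020 Nov") or use month names.
        public static (int? year, int? month, int? day) ParsePubMedDate(
                      string year_s, string month_s, string day_s)
        {
            string[] months = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
            int? year = int.TryParse(year_s, out int y) ? y : (int?)null;
            int? month = null;
            if (int.TryParse(month_s, out int m))
            {
                month = m;                       // month given as a number
            }
            else if (!string.IsNullOrEmpty(month_s))
            {
                int ix = Array.IndexOf(months, month_s);
                if (ix > -1) month = ix + 1;     // month given as 'Jan', 'Feb', ...
            }
            int? day = int.TryParse(day_s, out int d) ? d : (int?)null;
            return (year, month, day);
        }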

Data Aggregation

In the same way that two potential sources of study-PubMed links need to be considered when downloading PubMed data, the same two sources of links need to be considered when aggregating that data. During downloading the links are used only to identify PubMed data requiring download since the last download event. During aggregation, the PubMed data is considered to be already downloaded, and it is the links that are important, for all the PubMed data, in order to associate the journal article data objects with the correct studies.
Data aggregation, which includes PubMed aggregation as the final step, should therefore be done after all study based data sources have been brought up to date, and then after all PubMed data has been downloaded and updated, to ensure that the data in each study based data source is as complete as possible.
The key function here is ProcessStandaloneObjectIds(), which orchestrates the process by which the study-article links are established. It makes use of a series of functions in a helper class called PubMedTransferHelper, as PubMed is currently the only object based data source, and therefore the only source where object ids are processed in a standalone fashion, i.e. without reference to any study data in the same data source. The key elements of this function are described below.
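As an orientation, the overall shape of the orchestration, reconstructed from the sequence of helper functions described in the remainder of this section (so a sketch, not the verbatim source), is roughly:

        // A reconstructed outline of ProcessStandaloneObjectIds(), based on
        // the helper functions described below. Variable names and the
        // table set-up calls are assumptions - not the verbatim source code.
        public void ProcessStandaloneObjectIds()
        {
            PubMedTransferHelper pm_tr = new PubMedTransferHelper();

            // Set up 'workspace' temp tables, then gather study-PMID links
            // from the PubMed 'bank' data (FetchBankPMIDs) and from the
            // study_references tables of study based sources (FetchSourceReferences).

            // De-duplicate the links and tidy the registry identifiers.
            pm_tr.FillDistinctPMIDsTable();
            pm_tr.CleanPMIDsdsidData1();    // ... through to CleanPMIDsdsidData4()

            // Move to temp_object_ids and attach the integer study ids.
            pm_tr.TransferPMIDLinksToObjectIds();
            UpdateObjectsWithStudyIds(100135);      // 100135 = PubMed

            // Use preferred study ids, add to the permanent table,
            // resolve duplicated PMIDs and fill the utility table.
            pm_tr.InputPreferredSDSIDS();
            UpdateAllObjectIdsTable(100135);
            pm_tr.ResetIdsOfDuplicatedPMIDs();
            FillObjectsToAddTable(100135);

            // ... drop the temporary link-processing tables ...
        }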
The initial task, once the helper class has been instantiated and some 'workspace' temporary tables have been created, is to get the 'bank-related' data from the PubMed database itself - i.e. all the PubMed identifiers (PMIDs) and trial registry ids listed in the PubMed data itself, as found in the ad.object_db_links table. The SQL used for this selects only object-db data where the type of 'db' is a trial registry (and not a 'databank'). It returns the source id of the object as 100135, i.e. PubMed, but the source id of the study is also returned by joining the object_db_links back to the contextual list of PubMed databank sources (ctx.nlm_databanks). The returned set of links are stored in the temporary table already constructed for this purpose.

       public IEnumerable<PMIDLink> FetchBankPMIDs()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"select 
                        100135 as source_id, 
                        d.id as parent_study_source_id, 
                        k.sd_oid, k.id_in_db as parent_study_sd_sid, 
                        a.datetime_of_data_fetch
                        from pubmed_ad.object_db_links k
                        inner join pubmed_ad.data_objects a 
                        on k.sd_oid = a.sd_oid
                        inner join context_ctx.nlm_databanks d
                        on k.db_name = d.nlm_abbrev
                        where bank_type <> 'databank'";
                return conn.Query<PMIDLink>(sql_string);
            }
        }

Then the other source of study-PMID links is interrogated, looping through all sources that include 'study references'. This is a process very similar to that during data download - a SQL statement is run against each relevant database using FetchSourceReferences, the study reference links are obtained, and added to the same temporary table as the bank based links.

        public IEnumerable<PMIDLink> FetchSourceReferences(int source_id, string db_name)
        {
            builder.Database = db_name;
            string db_conn_string = builder.ConnectionString;

            using (var conn = new NpgsqlConnection(db_conn_string))
            {
                string sql_string = @"select 
                        100135 as source_id, " +
                        source_id.ToString() + @" as parent_study_source_id, 
                        r.pmid as sd_oid, r.sd_sid as parent_study_sd_sid, 
                        s.datetime_of_data_fetch
                        from ad.study_references r
                        inner join ad.studies s
                        on r.sd_sid = s.sd_sid
                        where r.pmid is not null;";
                return conn.Query<PMIDLink>(sql_string);
            }
        }

The distinct links between study and PubMed citations are then transferred to a table called nk.distinct_pmids. There are a few 'gotchas' to avoid here. Firstly, some PMIDs obtained from the study source databases will have been formatted as strings of 24 characters (the default for all object ids, which are normally hash values). These need to be trimmed so that they can be compared properly with other PMIDs, in order that the following select distinct eliminates all the genuine link duplications (i.e. links that occur in both types of link source). Then, after the distinct data has been transferred, the latest (maximum) datetime_of_data_fetch value has to be applied to the distinct record. Different copies of the same link, obtained at different times, will obviously contain different values for this; it is the latest that is needed, for future transcription to the object links tables.

        public void FillDistinctPMIDsTable()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // First ensure that any PMIDs (sd_oids) are in the same format
                // Some have a 'tail' of spaces after them, as the standard 
                // length of a sd_oid is 24 characters.
                
                string sql_string = @"UPDATE nk.temp_pmids
                         SET sd_oid = trim(sd_oid);";
                conn.Execute(sql_string);

                // Then transfer the distinct data

                sql_string = @"INSERT INTO nk.distinct_pmids(
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid)
                         SELECT distinct 
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid
                         FROM nk.temp_pmids;";
                conn.Execute(sql_string);

                // Update with latest datetime_of_data_fetch

                sql_string = @"UPDATE nk.distinct_pmids dp
                         set datetime_of_data_fetch = mx.max_fetch_date
                         FROM 
                         ( select sd_oid, parent_study_sd_sid, 
                           max(datetime_of_data_fetch) as max_fetch_date
                           FROM nk.temp_pmids
                           group by sd_oid, parent_study_sd_sid ) mx
                         WHERE dp.parent_study_sd_sid = mx.parent_study_sd_sid
                         and dp.sd_oid = mx.sd_oid;";
                conn.Execute(sql_string);
            }
        }

It is then important to try to tidy some of the worst data anomalies - in particular in the registry identifier values found in the bank data (which are entered manually in the source data). A large variety of string manipulations are carried out in the functions CleanPMIDsdsidData1 to CleanPMIDsdsidData4. A few examples are shown below...

                sql_string = @"UPDATE nk.distinct_pmids
                     SET parent_study_sd_sid = Replace(parent_study_sd_sid, '/', '-')
                     WHERE parent_study_source_id = 100121;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                     SET parent_study_sd_sid = 'CTRI-' || parent_study_sd_sid
                     WHERE parent_study_source_id = 100121
                     and parent_study_sd_sid not ilike 'CTRI-%';";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRA-CT', 'EUDRACT')
                   WHERE parent_study_source_id = 100123;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EUDRACT', '')
                    WHERE parent_study_source_id = 100123;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'EURODRACT', '')
                   WHERE parent_study_source_id = 100123;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'ISRTN', 'ISRCTN')
                   WHERE parent_study_source_id = 100126;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                   SET parent_study_sd_sid = replace(parent_study_sd_sid, 'ISRNT', 'ISRCTN')
                   WHERE parent_study_source_id = 100126;";
                conn.Execute(sql_string);

                ...
                ... 
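One further example of the same pattern: Chinese registry (ChiCTR, source 100118) identifiers are normalised along the following lines, by fixing the casing, ensuring a 'ChiCTR-' prefix, and then removing any doubled prefix the previous step may have produced:

                sql_string = @"UPDATE nk.distinct_pmids
                     SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'CHICTR', 'ChiCTR')
                     WHERE parent_study_source_id = 100118;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                     SET parent_study_sd_sid = 'ChiCTR-' || parent_study_sd_sid
                     WHERE parent_study_source_id = 100118
                     and parent_study_sd_sid not ilike 'ChiCTR-%';";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.distinct_pmids
                     SET parent_study_sd_sid = Replace(parent_study_sd_sid, 'ChiCTR-ChiCTR', 'ChiCTR-')
                     WHERE parent_study_source_id = 100118;";
                conn.Execute(sql_string);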

The processing from this point is shared by the general object id helper class and the specific Pubmed class. The latter is used to transfer the data to the same temp_object_ids table as used for objects in study based data sources - the data structures are the same. A general object transfer helper function, UpdateObjectsWithStudyIds, is then used to insert the integer study ids into the data.

      
        public void TransferPMIDLinksToObjectIds()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.temp_object_ids(
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid, datetime_of_data_fetch)
                         SELECT  
                         source_id, sd_oid, parent_study_source_id, 
                         parent_study_sd_sid, datetime_of_data_fetch
                         FROM nk.distinct_pmids";
                conn.Execute(sql_string);
            }
        }

        public void UpdateObjectsWithStudyIds(int source_id)
        {
            // Update the object parent study_id using the 'correct'
            // value found in the all_ids_studies table

            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"UPDATE nk.temp_object_ids t
                           SET parent_study_id = s.study_id, 
                           is_preferred_study = s.is_preferred
                           FROM nk.all_ids_studies s
                           WHERE t.parent_study_sd_sid = s.sd_sid
                           and t.parent_study_source_id = s.source_id;";
                conn.Execute(sql_string);

                // Drop those link records that cannot be matched

                sql_string = @"DELETE FROM nk.temp_object_ids
                             WHERE parent_study_id is null;";
                conn.Execute(sql_string);
            }
        }

The difficulty is that some of the studies in this table may be the 'less preferred' forms of studies that are in the system in multiple source databases. It is therefore necessary to interrogate the study_study_links table and replace the less preferred study identifiers with the most preferred. This in turn may result in duplicate records, so the table needs to be de-duplicated by creating a new table and 'selecting distinct' into it (not shown in the code below, but sketched after it). This approach is necessary because once again the maximum of the datetime_of_data_fetch field is applied to the distinct record set, after that dataset is created. The datetime_of_data_fetch field therefore needs to be part of the table definition. The distinct version of the data is then renamed back to temp_object_ids.

   
        public void InputPreferredSDSIDS()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // replace any LHS sd_sids with the 'preferred' RHS

                string sql_string = @"UPDATE nk.temp_object_ids b
                               SET parent_study_sd_sid = preferred_sd_sid,
                               parent_study_source_id = preferred_source_id
                               FROM nk.study_study_links k
                               WHERE b.parent_study_sd_sid = k.sd_sid
                               and b.parent_study_source_id = k.source_id ;";
                conn.Execute(sql_string);

                ...
                ...
                ...
            }
        }
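The elided de-duplication, following the description above, can be sketched as below. This fragment would sit inside the same using block, where the '...' lines appear; the intermediate table name temp_object_ids2 is an assumption.

                 // Sketch of the elided de-duplication - select distinct into a
                 // second table, re-apply the maximum fetch date, then rename.
                 sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids2;
                 CREATE TABLE nk.temp_object_ids2(
                   object_id                INT
                 , source_id                INT
                 , sd_oid                   VARCHAR
                 , parent_study_source_id   INT
                 , parent_study_sd_sid      VARCHAR
                 , parent_study_id          INT
                 , is_preferred_study       BOOLEAN default true
                 , datetime_of_data_fetch   TIMESTAMPTZ
                 );";
                 conn.Execute(sql_string);

                 sql_string = @"INSERT INTO nk.temp_object_ids2(
                         source_id, sd_oid, parent_study_source_id,
                         parent_study_sd_sid, parent_study_id)
                         SELECT distinct
                         source_id, sd_oid, parent_study_source_id,
                         parent_study_sd_sid, parent_study_id
                         FROM nk.temp_object_ids;";
                 conn.Execute(sql_string);

                 // update with the latest datetime_of_data_fetch
                 // for each distinct study - data object link
                 sql_string = @"UPDATE nk.temp_object_ids2 to2
                         SET datetime_of_data_fetch = mx.max_fetch_date
                         FROM
                         ( select sd_oid, parent_study_sd_sid,
                           max(datetime_of_data_fetch) as max_fetch_date
                           FROM nk.temp_object_ids
                           group by sd_oid, parent_study_sd_sid ) mx
                         WHERE to2.sd_oid = mx.sd_oid
                         and to2.parent_study_sd_sid = mx.parent_study_sd_sid;";
                 conn.Execute(sql_string);

                 sql_string = @"DROP TABLE IF EXISTS nk.temp_object_ids;
                 ALTER TABLE nk.temp_object_ids2 RENAME TO temp_object_ids;";
                 conn.Execute(sql_string);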

The PubMed study links are now complete, at least from the point of view of the study Ids, and can be added to the main all_ids_data_objects table. This gives each added data object an integer object_id (the identity value of the record).

   
        public void UpdateAllObjectIdsTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // ...

                string sql_string = @"INSERT INTO nk.all_ids_data_objects
                             (source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch)
                             select source_id, sd_oid, parent_study_source_id, parent_study_sd_sid,
                             parent_study_id, is_preferred_study, datetime_of_data_fetch
                             from nk.temp_object_ids";
                conn.Execute(sql_string);

                // ...

                sql_string = @"UPDATE nk.all_ids_data_objects
                            SET object_id = id
                            WHERE source_id = " + source_id.ToString() + @"
                            and object_id is null;";
                conn.Execute(sql_string);
            }
        }

With study based data sources this is the end of the processing - the data objects are assumed at this stage to be unique, and their source data ids, created as hashes, will certainly be unique. For PubMed citations, however, this is not the case. The objects will often be duplicated - a journal article can be referenced by many studies. Their source data id (sd_oid) is the PubMed identifier (PMID) and will be constant across sources - it is therefore possible to identify and resolve the duplicates. For any particular PubMed Id that is in the table more than once, therefore, the object_id needs to be replaced by the minimum of the available set. The function ResetIdsOfDuplicatedPMIDs is used to accomplish this.

  
        public void ResetIdsOfDuplicatedPMIDs()
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                // Find the minimum object_id for each PMID in the table
                // source id for PubMed = 100135

                string sql_string = @"DROP TABLE IF EXISTS nk.temp_min_object_ids;
                                     CREATE TABLE nk.temp_min_object_ids as
                                     SELECT sd_oid, Min(id) as min_id
                                     FROM nk.all_ids_data_objects
                                     WHERE source_id = 100135
                                     GROUP BY sd_oid;";
                conn.Execute(sql_string);

                sql_string = @"UPDATE nk.all_ids_data_objects b
                               SET object_id = min_id
                               FROM nk.temp_min_object_ids m
                               WHERE b.sd_oid = m.sd_oid
                               and source_id = 100135;";
                conn.Execute(sql_string);

                sql_string = @"DROP TABLE nk.temp_min_object_ids;";
                conn.Execute(sql_string);
            }
        }

Now, finally, the data object links for the PubMed records are all correct. The final stage is to create the utility table temp_objects_to_add with the correct data, so it can be used during transfer of the data object data itself. After that the system drops the temporary tables used in the links processing, and proceeds to the standard transfer of data objects and their attributes, in this instance from the PubMed database.

 
        public void FillObjectsToAddTable(int source_id)
        {
            using (var conn = new NpgsqlConnection(connString))
            {
                string sql_string = @"INSERT INTO nk.temp_objects_to_add
                             (object_id, sd_oid)
                             SELECT distinct object_id, sd_oid 
                             FROM nk.all_ids_data_objects
                             WHERE source_id = " + source_id.ToString();
                conn.Execute(sql_string);
            }
        }
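The final tidy-up, dropping the temporary link-processing tables, can be sketched as below; the function name and the exact set of tables dropped are illustrative, not taken from the source.

         // Illustrative sketch of the final tidy-up - the exact set of
         // temporary tables dropped in the source may differ.
         public void DropTempPMIDTables()
         {
             using (var conn = new NpgsqlConnection(connString))
             {
                 string sql_string = @"DROP TABLE IF EXISTS nk.temp_pmids;
                             DROP TABLE IF EXISTS nk.distinct_pmids;";
                 conn.Execute(sql_string);
             }
         }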