Ingester
Introduction
The ingester script has been designed to provide a simple-to-use, generic tool for regular ingestions into the CEDA archive. The workflow of the ingester is:
- List files, either from a particular set of directories or from a listing file.
- For each file in the list:
  - Work out the destination in the archive using a regular expression.
  - Deposit the file in the archive.
  - Optionally, remove the source file.
(Source code is stored here: https://breezy.badc.rl.ac.uk/cedadev/ingest_lib)
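In outline, the loop looks something like the following Python sketch. This is illustrative only, not the real ingest_lib implementation; the deposit stand-in and the function names are hypothetical:

import os
import re
import shutil

def deposit(src, dest_dir):
    # Hypothetical stand-in for the real deposit step, which goes via
    # the deposit servers; here we simply copy the file.
    os.makedirs(dest_dir, exist_ok=True)
    shutil.copy(src, dest_dir)

def ingest(sources, regex, dirtemplate, delete_source=False):
    # Sketch of the workflow above: match each source path, build the
    # archive directory from the regex named groups, deposit, and
    # optionally remove the source file.
    for path in sources:
        match = re.search(regex, path)
        if match is None:
            continue  # file does not match the stream's pattern
        dest_dir = dirtemplate % match.groupdict()
        deposit(path, dest_dir)
        if delete_source:
            os.remove(path)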
Config files
The ingester uses a config file of the same kind used by ingest_control. A minimal example is shown below.
[stream-name]
script: ingester
arrivals_users: spepler parton
dirtemplate: /badc/datasetx/data/%(year)s
regex: xxx(?P<year>\d{4})\d{4}.dat
deleterChoice: arrivals
This looks in /datacentre/arrivals/users/spepler/stream-name and /datacentre/arrivals/users/parton/stream-name to create the list of source files. Files that match the pattern xxx(?P<year>\d{4})\d{4}.dat are deposited in the archive, using the directory template to construct the archive directory. After ingestion the source files are deleted.
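For example, a (hypothetical) source file named xxx20180704.dat matches the regex with year = 2018, so it is filed as:

xxx20180704.dat -> /badc/datasetx/data/2018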
Typically, these stream configurations also contain other options for scheduling by ingest_control.
[cfarr-lidar]
owner: gparton
description: ingestion of lidar data directly from arrivals area (doesn't ingest those unpacked first to processing area)
script: ingester -f
# scheduler details follow:
when: 5,15,25,35,45,55 * * * *
timeout: 36
notify_warning: graham.parton@stfc.ac.uk
notify_fail: graham.parton@stfc.ac.uk
# end of scheduler details
arrivals_users: jagnew
dirtemplate: /badc/chilbolton/data/%(instDict)s%(locDict)s/%(year)s/%(month)s/
regex: ^(cfarr\-)(?P<instrument>[0-9a-z\-]*)_(?P<location>chilbolton|sparsholt)_(?P<year>\d{4})(?P<month>\d{2})(\d{2})(_[a-zA-Z\-]*)?\.(?P<ext>nc|png|zip)$
deleterChoice: arrivals
force: on
skip_and_clean_up: True
skip: on
Config file options
relaxed_names | Use relaxed names when depositing; allows @ and : characters. |
force | Force overwriting of files in the archive with matching paths and filenames. Defaults to off. The -f command line option overrides this option. |
warn_on_zero_length | Just warn if zero-length files are found in the source list, rather than stopping. |
dirtemplate | Directory template for the destination archive path. This is in Python format-string replacement format. |
path_template | An alternative to dirtemplate. This template is used to create the full path of the destination in the archive, including the filename. This allows you to rename files as they are deposited (see the example after this table). |
regex | The regular expression to match filename components that are used in the dirtemplate or path_template. Named groups must be used; for example, (?P<year>\d{4}) in the regex will be inserted in the dirtemplate where %(year)s is found. Note that the whole source path is used as input to the regex, so elements in the arrivals directory structure can be matched. |
arrivals_ignore_regex | A regex pattern for files in the source area to ignore. |
arrivals_dirs | A space-separated list of directories to search for files to ingest. |
arrivals_users | A space-separated list of usernames. This list is mapped to a list of directories that are appended to the arrivals_dirs list. The directory mapped to for a user is of the form /datacentre/arrivals/users/<username>/<streamname>. For example, if arrivals_users = spepler gparton and the stream name is xxx, then the directories searched are /datacentre/arrivals/users/spepler/xxx and /datacentre/arrivals/users/gparton/xxx. |
nthreads | Run a multi-threaded ingester. Defaults to 1. This increases the speed of the ingester in proportion to the number of threads. Note that the number of deposit servers is finite, so setting nthreads to the number of deposit servers will monopolise all of them. One-off jobs should keep nthreads < 15; regular jobs nthreads < 10. |
skip | Ignores files that match and are of the same size, but leaves them in the source area. Defaults to off. |
skip_and_clean_up | Ignores files that match and are of the same size AND removes them from the source area. Defaults to off. |
In addition, all the options used by the arrivals module are also recognised.
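For example, path_template can be used to rename files as they are deposited. The stream below is an illustrative sketch only; the stream name, archive paths and pattern are hypothetical:

[rename-example]
script: ingester
path_template: /badc/datasety/data/%(year)s/datasety_%(year)s%(rest)s
regex: raw_(?P<year>\d{4})(?P<rest>\d{4}\.dat)$
arrivals_users: spepler
deleterChoice: arrivals

With this stream a source file raw_20180704.dat would be deposited as /badc/datasety/data/2018/datasety_20180704.dat.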
Use at the command line
The ingester script can be run at the command line; there are two ways to associate it with the config file.
Either give the config file and stream name with the -c and -s options:
$ ingester -c software/datasets/testdata/test.conf -s test-stream3
Or set the INGEST_CONF environment variable:
$ export INGEST_CONF=software/datasets/testdata/test.conf:test-stream3
$ ingester
Command line Options
-v | Verbose output |
-t | Trial run. Don't deposit or delete source files. |
-n N | Stop after processing N files for ingest. |
--max_fails N | Stop after N unsuccessful ingests. |
-f | Force overwrite of existing files in archive. |
-l FILE | Use FILE as a listing of source files. |
--skip | Skip files that already exist in the archive at the same size. |
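For example, a verbose trial run of the test stream above, stopping after 10 files (using the config file and stream name from the earlier example):

$ ingester -c software/datasets/testdata/test.conf -s test-stream3 -t -v -n 10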
Usage examples
Ingester as recursive deposit
Use this pattern if you are going to keep the directory structure as it appears in the processing area.
[recursive_ingest]
script: ingester -v --skip
path_template: /badc/dataset1/data/%(path)s
regex: /datacentre/processing3/dataset1/(?P<path>.*)$
arrivals_dirs: /datacentre/processing3/dataset1
deleterChoice: notArrivals
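The whole source path is matched, so the directory structure below the source area is preserved. For example (hypothetical filename):

/datacentre/processing3/dataset1/2018/06/obs.nc -> /badc/dataset1/data/2018/06/obs.nc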
Multi-threaded deposit
Use multiple threads if you have a big data set.
[fstone_big_ingest]
script: ingester -v --skip
dirtemplate: /badc/flintstone/data/%(load_num)s
regex: flintstone_data/yabber_(?P<load_num>\d+)_\d+\.dat$
arrivals_users: fred barny
deleterChoice: arrivals
nthreads: 5
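For example (hypothetical filename), the load number captured by the regex selects the archive directory:

flintstone_data/yabber_12_0001.dat -> /badc/flintstone/data/12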
Deposit from many sources
The directories derived from the arrivals_users option are appended to the arrivals_dirs list, so you can use both together to pick up files from many places.
[avenger_team]
script: ingester
path_template: /badc/Avengers/data/%(film)s/%(scene)s
regex: commentlog\.(?P<film>[^.]+)\.(?P<scene>[^.]+)\.txt$
arrivals_users: hulk ironman cap_marvel thor antman wasp black_panther strange vision banner
arrivals_dirs: /datacentre/processing3/avengers/hulk_smashed /datacentre/processing3/avengers/thanos
deleterChoice: keep
nthreads: 5
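For example (hypothetical filename), the file is renamed on deposit because path_template gives the full destination path, including the filename:

commentlog.endgame.scene42.txt -> /badc/Avengers/data/endgame/scene42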
Using the ingester as the basis for your own ingest script
If you need more information than is contained in the source filename and path, but your workflow follows the pattern of the ingester, it is easy to write your own script using the ingester libraries. The example below demonstrates how.
The first thing to do is to import the Ingester class from the ingest_lib library and the standard re regular expression library. We are going to add some extra functionality to the base Ingester class and run it in the same way as the ingester script.
from ingest_lib import Ingester
import re

class DataXIngester(Ingester):
    pass

i = DataXIngester()
i.ingest()
This would work exactly like the ingester. Now let's add some extra functionality. We override the archive_dir method so that it adds product_type to the other information found by the regex matching. The archive_dir method should return the archive directory name, or None if the filename does not match.
from ingest_lib import Ingester
import re


class DataXIngester(Ingester):

    def archive_dir(self, filename):
        found = re.search(self.regex, filename)
        if not found:
            return None
        found_dict = found.groupdict()
        # set plots or data
        product_type = {'jpg': 'plots', 'png': 'plots',
                        'nc': 'data', 'na': 'data', 'tar': 'data'}
        found_dict['product_type'] = product_type[found_dict['ext']]
        completed_template = self.dirtemplate % found_dict
        return completed_template


def main():
    i = DataXIngester()
    i.ingest()


if __name__ == "__main__":
    main()
If the config file has these options:
dirtemplate: /badc/datasetX/%(product_type)s/%(year)s
regex: datax_(?P<year>\d{4})\d{4}\.(?P<ext>nc|na|png|jpg|tar)
Data files will now be filed like so:
datax_20180103.nc -> /badc/datasetX/data/2018
datax_20170103.nc -> /badc/datasetX/data/2017
datax_20180103.png -> /badc/datasetX/plots/2018