fileProcessor.py

Original trac page

Introduction

The fileProcessor script has been designed to complement the unpacker and ingester and takes its inspiration from how fromDeliveries was written. Its aim to to provide a generic tool that can be used as part of an ingest stream on the ingest system. It has been designed to:

  1. perform renaming, gzipping and tarring functions according to settings held in a configuration file
  2. migrate data that have passed a quarantine period into an "ingest" area
  3. give options for handling the source files - whether to delete or not
  4. utilises the arrivals library to set up the source area as needed
  5. utilises the processLib for common functions such as renaming and removal of source files
  6. tracks files all the way through the process to ensure completion before removal of the source file is attempted (if a step fails it is removed from the delete list)
  7. Ability to call an external, bespoke library to carry out more complex renaming tasks

Although designed primarily with the /datacentre/processing area in mind, in theory this could be used elsewhere on the system.

Where source is stored

Source code is stored in the CEDA svn repository here : http://proj.badc.rl.ac.uk/badc/browser/ceda_software/fileProcessor/trunk .

Files needed

The fileProcessor is driven by settings within a configuration file. How entries in this config file should be set out is discussed below, followed by examples.

how to use

One off instances can be run as this:

python /usr/local/ingest_software/fileProcessor/fileProcessor.py -c <configfile> -s <stream> [-d|-v]

where :

-c <configfile> is the path to the configuration file that you have set up your option in for the processing  -s <stream> is the part of the config file that you want the options to be read from -d is the dry run - printing what would happen to the screen and not actually doing the action.  -v is the "verbose" flag which returns lots of logging of what is going on

Where does it work?

The fileProcessor can cope with files at any source area available on Ingest1 and can place the output of the operation into any writeable area too. Primarily, though, it has been designed to source files from the arrivals area (/datacente/arrivals/users/<userID>/<streamId>/) and output files into a "quarantine" area typically under /datacentre/processing/<dataset>/quarantineDir/<stream>. As a last step the script will migrate data from the quarantine area to an area ready for ingesting from with the ingester script, typically this should be somewhere like /datacentre/processing/<dataset>/readyToIngest/<stream>/ .

Config files

Within the config file the configuration settings referred to are delineated from other settings for other runs by a section name given in square brackets - these are the "streams" within a

Below are the generic options that are required for ALL instances of the processor, followed by particular sections detailing what is required for renaming, gzipping and tarring options.

For an explanation of how these map to the process to help you work out what you need to put in there see this diagram.

Generic config settings

[stream-name]
owner: <insert your username here - this is important to help those looking after the system work out who is running jobs>
description: <a short description detailing the job and what it does>
# standard bits for most config files:
script: <command line call for the job, including optional entries>
lockfile: <path and name to a lockfile - standard practice is to pop these under /home/badc/lockfiles/>
lock: <flag for the ingest_control system to stop other instances happening or not>
when: <scheduler times in standard crontab format - e.g. minutes hours day-of-month month year to run the script. Used to schedule recurring tasks under ingest_control>
timeout: <number of hours the script is permitted to continue running for before being terminated - the default is 12>
notify_ok: <space separated list of email addresses to email if the jobs runs ok>
notify_warning: <space separated list of email addresses to email if there are warning messages issued> 
notify_fail: <space separated list of email addresses to email if the job fails>
# end of scheduler details
order: <comma separated list of what functions to do and in which order: reName,zip,tar>
arrivals_users: <either give a space separated list of the users who will contribute to this data stream>
arrivals_dirs: <OR a space separated list of absolute paths to the source directories for the incoming data>
arrivals_wait: <how old the files should be in seconds before being considered for ingestion>
fileAge: <how old the files should be days before being considered for ingestion - note, will be retired in due course>
#if a call to an external library is needed to generate the destination path use the next two lines:
headerclass: <this is the absolute path to the external library to help generate the filename to which the source file will be renamed. Usually under /home/badc/software/datasets/<dataset>>
accessmethod: <name of the init class within the external library - usually leave this as: archive_path_class>
quarantineDir: <the area where the files are written to during the processing stages and where they will remain until they have passed the quarantine period>
fileNameTemplate: <if renmaing then this template is used to map contents from regex to new filenames, can be used in tandem with headerclass if more complex work is needed>
regex: <regular expresion to uniquely identify files to be processed and parts to use for constructing the new filename (see fileNameTemplate)>
archiveDir: <a template that is used to indicate where in the archive to check for existing tarballs which can be used to append contents to based on tarOptions>
tarTemplate: <output tar file file name template built up using parts identified from tarRegex>
tarRegex: <regular expression to identify files to be placed in tar file and ascertain tar file name components used in tarTemplate>
tarOutputRegex: <A regex to return items used in the archiveDir and tarTemplate to spot already existing tarballs in the processingDir, ingestDir or archiveDir areas to possibly append to, depending on the tarOptions setting> 
tarOptions: <option on how to process tar file - options are new|append newFile will ignore existing content in archive/ingest stream and create an entirely new file (which will subsquently replace any that exist down stream during ingest process), append will add new files to existing tar file if it exists and new files are newer versions than that already existing in the downstream tar file>
deleterChoice: <one of arrivals|notArrivals to delete data from /datacentre/arrivals/users/ or /datacentre/processing otherwise the files will be kept>
quarantineCheck: <the type of check to carry out for the quarantine period, options are: fileAge, filename - filename will apply a (?P<year>[0-9])(?P<month>?P<day> regex to the filename to try and get the yyyymmdd string to use, so use with care!)>
quarantinePeriod: <time in days that the file should have passed the quarantine check by before being moved to ingest>
ingestRegex: <used to identify files to test quarantine status and move to ingest area - this is a regular expression which will need to contain group labels to get yyyymmdd info from filenames if fileName is quarantine check selection>
ingestDir: <the directory to which files will be moved once they have passed the quarantine check, where they could be picked up by another script for processing - e.g. ingestion, or some other checking/processing>

Example

[ukmo-nimrod-composite]
owner: gparton
description: process incoming nimrod composite data (both UK and Europe)
script: python /usr/local/ingest_software/fileProcessor/fileProcessor.py -c /home/badc/software/datasets/ukmo-nimrod/ukmo-nimrod_fileProcessor.cfg -s ukmo-nimrod-composite
order: reName,zip,tar
mode: operational
when: 6,16,26,36,46,56 * * * *
#notify_ok: graham.parton@stfc.ac.uk
#notify_warning: graham.parton@stfc.ac.uk
notify_fail: graham.parton@stfc.ac.uk
timeout: 36
arrivals_users: dartmetoffice
arrivals_wait: 480
arrivals_maxfiles: 80000
fileAge:0
headerclass: /home/badc/software/datasets/ukmo-nimrod/nimrod_mapper
accessmethod: archive_path_class
archiveDir: /badc/ukmo-nimrod/data/composite/%(area)s-%(resolution)skm/%(year)s/
quarantineDir: /datacentre/processing/ukmo-nimrod/quarantine/ukmo-nimrod-composite/
lockfile: /home/badc/lockfiles/ukmo-nimrod-composite-process.lock
lock:yes
fileNameTemplate: metoffice-c-band-rain-radar_%(area)s_%(year)s%(month)s%(day)s%(hour)s%(minute)s_%(resolution)skm-composite.%(type)s
regex: (?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})(?P<hour>[0-9]{2})(?P<minute>[0-9]{2})_nimrod_(?P<areaDict>ng_radar|ps_area20)_rainrate_composite_(?P<resolution>1|5)km_(?P<
typeDict>UK_cutout_300X306_correct\.gif|EU_cutout_435X345_uk|EU\.gif|EU)(?!\.gz)(?!\.tmp)$
tarTemplate: metoffice-c-band-rain-radar_%(area)s_%(year)s%(month)s%(day)s_%(resolution)skm-composite.%(type)s.gz.tar
tarRegex: metoffice-c-band-rain-radar_(?P<area>uk|europe)_(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})([0-9]{4})_(?P<resolution>1|5)km-composite.(?P<type>dat|gif).gz
tarOutputRegex: metoffice-c-band-rain-radar_(?P<area>uk|europe)_(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})_(?P<resolution>1|5)km-composite.(?P<type>dat|gif)(\.gz\.tar)$
tarOptions: append
deleteOption: arrivals
quaratinePeriod:1
quarantineCheck: fileName
ingestDir: /datacentre/processing/ukmo-nimrod/readyToIngest/ukmo-nimrod-composite/
ingestRegex: metoffice-c-band-rain-radar_(?P<area>uk|europe)_(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})_(?P<resolution>1|5)km-composite.(?P<type>dat|gif)(\.gz\.tar)$

Did this answer your question? Thanks for the feedback There was a problem submitting your feedback. Please try again later.