6 Список использованных источников
Промежуточное программное обеспечение gLite: http://glite.web.cern.ch/glite
Проект EGEE: http://www.eu-egee.org
Проект RDIG: http://egee-rdig.ru
Foster и др. "Open Grid Services Architecture (OGSA) v1.0", http://www.gridforum.org/documents/GFD.30.pdf
Менеджер ресурсов хранения данных (SRM): http://sdm.lbl.gov/srm-wg
Схема GLUE: http://glueschema.forge.cnaf.infn.it, http://forge.ogf.org/sf/projects/glue-wg
Язык описания заданий (JDL): Job Description Language HowTo, DataGrid-01-TEN-0102-0_2, http://server11.infn.it/workload-grid/docs/DataGrid-01-TEN-0102-0_2-Document.doc
JDL Attributes Specification, EGEE-JRA1-TEC-555796-JDL-Attributes-v0-6, https://edms.cern.ch/file/555796/1
GT 4.0 Pre WS GRAM: Developer Guide, http://www.globus.org/toolkit/docs/4.0/execution/prewsgram/developer-index.html
Проект Globus: http://www.globus.org
Программа sudo: http://www.sudo.ws/sudo
Язык UML: http://www.uml.org
Протокол FTP: http://rfc.net/rfc959.html; http://rfc.net/rfc1579.html
FTP-сервер Pure-FTPd: http://www.pureftpd.org
7 Приложение 1. GLUE-схема для описания вычислительных грид-ресурсов
Рис. 12 Схема GLUE версии 1.3 [6] (на языке UML [11])
8 Приложение 2. Обработчик заданий Вычислительного элемента LCG-CE ППО gLite
В этом приложении приведен программный код обычного обработчика заданий (для СУПЗ OpenPBS) — без возможности предоставления сред исполнения по запросам пользователей. В рамках системы запуска заданий, подготовленных для исполнения в различных средах исполнения (СЗЗ-РСИ), этот обработчик будет заменен на службу предоставления сред исполнения (СПСИ). Поскольку СПСИ должна быть совместима с ППО gLite, программный код обработчика может служить исходным примером для написания новой грид-службы (СПСИ).
Программный код обработчика заданий (для СУПЗ OpenPBS).
use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::Core::Paths;
use IO::File;
use Config;
use POSIX;
# PBS back end of the GRAM job manager. Inherits from
# Globus::GRAM::JobManager through @ISA.
package Globus::GRAM::JobManager::pbs;
@ISA = qw(Globus::GRAM::JobManager);
# File-scoped settings: $qsub is used by submit, $qstat by poll, $qdel by
# cancel; $mpirun, $cluster, $cpu_per_node and $remote_shell are used by
# submit when it writes the batch script.
my ($mpirun, $qsub, $qstat, $qdel, $cluster, $cpu_per_node, $remote_shell);
# The settings are assigned in a BEGIN block, i.e. at compile time.
# NOTE(review): the @NAME@ tokens look like placeholders that a build or
# configure step replaces with real values -- confirm against the build
# system. @CLUSTER@ and @CPU_PER_NODE@ are not quoted, so whatever is
# substituted there has to be a valid Perl expression on its own.
BEGIN
{
$mpirun = '@MPIRUN@';
$qsub = '@QSUB@';
$qstat = '@QSTAT@';
$qdel = '@QDEL@';
$cluster = @CLUSTER@;
$cpu_per_node = @CPU_PER_NODE@;
$remote_shell = '@REMOTE_SHELL@';
}
# Build a PBS batch script from the job description and submit it with qsub.
#
# The job description is read from $self->{JobDescription}. The method
#   1. validates job type, working directory, executable and stdin;
#   2. derives the CPU-time and wall-time limits;
#   3. writes the batch script into a file obtained from globus-gass-cache;
#   4. feeds that script to qsub.
#
# Returns a hash reference { JOB_ID => ..., JOB_STATE => PENDING } when qsub
# exits with status 0. Otherwise returns a Globus::GRAM::Error value: the one
# matching the first failed check, or INVALID_SCRIPT_REPLY when qsub fails.
sub submit
{
    my $self        = shift;
    my $description = $self->{JobDescription};

    # '||' is required here: with the low-precedence 'or' the assignment is
    # completed first and the environment fallback is never stored in $tag.
    my $tag = $description->cache_tag() || $ENV{GLOBUS_GRAM_JOB_CONTACT};

    my $cache_pgm  = "$Globus::Core::Paths::bindir/globus-gass-cache";
    my $errfile    = "";
    my $email_when = "";
    my $rsh_env    = "";

    # Lexical on purpose: as package globals these would keep growing if
    # submit were called more than once in the same process.
    my $args = '';
    my @new_env;
    my %library_vars;

    my ($cpu_time, $wall_time);

    $self->log("Entering pbs submit");

    # ---- validation -------------------------------------------------------

    # check jobtype
    if(defined($description->jobtype()))
    {
        if($description->jobtype !~ /^(mpi|single|multiple)$/)
        {
            return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
        }
    }

    if($description->directory eq "")
    {
        return Globus::GRAM::Error::RSL_DIRECTORY();
    }
    chdir $description->directory() or
        return Globus::GRAM::Error::BAD_DIRECTORY();

    # The file tests below run after the chdir above.
    if($description->executable eq "")
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    elsif(! -f $description->executable())
    {
        return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
    }
    elsif(! -x $description->executable())
    {
        return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
    }
    elsif($description->stdin() eq "")
    {
        return Globus::GRAM::Error::RSL_STDIN();
    }
    elsif(! -r $description->stdin())
    {
        return Globus::GRAM::Error::STDIN_NOT_FOUND();
    }

    # ---- time limits ------------------------------------------------------

    # max_time is used as the CPU limit when $cluster is false and as the
    # wall-clock limit when $cluster is true; 0 means "emit no limit".
    $self->log("Determining job max time cpu from job description");
    if(defined($description->max_cpu_time()))
    {
        $cpu_time = $description->max_cpu_time();
        $self->log(" using maxcputime of $cpu_time");
    }
    elsif(! $cluster && defined($description->max_time()))
    {
        $cpu_time = $description->max_time();
        $self->log(" using maxtime of $cpu_time");
    }
    else
    {
        $cpu_time = 0;
        $self->log(' using queue default');
    }

    $self->log("Determining job max wall time limit from job description");
    if(defined($description->max_wall_time()))
    {
        $wall_time = $description->max_wall_time();
        $self->log(" using maxwalltime of $wall_time");
    }
    elsif($cluster && defined($description->max_time()))
    {
        $wall_time = $description->max_time();
        $self->log(" using maxtime of $wall_time");
    }
    else
    {
        $wall_time = 0;
        $self->log(' using queue default');
    }

    # ---- batch script: header and #PBS directives --------------------------

    $self->log('Building job script');

    # Register an empty entry in the GASS cache, then ask the cache for the
    # local file name behind it; the batch script is written to that file.
    my $script_url = "$tag/pbs_job_script.$$";
    system("$cache_pgm -add -t $tag -n $script_url file:/dev/null");
    my $pbs_job_script_name = `$cache_pgm -query -t $tag $script_url`;
    chomp($pbs_job_script_name);
    if($pbs_job_script_name eq "")
    {
        return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
    }

    # A failed open would otherwise surface as a method call on undef.
    my $pbs_job_script = IO::File->new($pbs_job_script_name, '>')
        or return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();

    $pbs_job_script->print(<<EOF);
#! /bin/sh
# PBS batch job script built by Globus job manager
#
#PBS -S /bin/sh
EOF

    if($description->email_address() ne '')
    {
        $pbs_job_script->print("#PBS -M " . $description->email_address() . "\n");
    }

    # Assemble the argument of "#PBS -m" from the three mail switches;
    # 'n' is written when none of them is set.
    if($description->emailonabort() eq 'yes')
    {
        $email_when .= 'a';
    }
    if($description->emailonexecution() eq 'yes')
    {
        $email_when .= 'b';
    }
    if($description->emailontermination() eq 'yes')
    {
        $email_when .= 'e';
    }
    if($email_when eq '')
    {
        $email_when = 'n';
    }
    $pbs_job_script->print("#PBS -m $email_when\n");

    if($description->queue() ne '')
    {
        $pbs_job_script->print("#PBS -q ". $description->queue() . "\n");
    }
    if($description->project() ne '')
    {
        $pbs_job_script->print("#PBS -A " . $description->project() . "\n");
    }

    if($cpu_time != 0)
    {
        # For 'multiple' jobs the total CPU budget is scaled by the count.
        my $total_cpu_time = ($description->jobtype() eq 'multiple')
            ? $cpu_time * $description->count()
            : $cpu_time;

        # Limits are written as "<value>:00".
        $pbs_job_script->print("#PBS -l pcput=${cpu_time}:00\n");
        $pbs_job_script->print("#PBS -l cput=${total_cpu_time}:00\n");
    }
    if($wall_time != 0)
    {
        $pbs_job_script->print("#PBS -l walltime=${wall_time}:00\n");
    }

    if($description->max_memory() != 0)
    {
        my $max_memory = ($description->jobtype() eq 'multiple')
            ? $description->max_memory() * $description->count
            : $description->max_memory();

        $pbs_job_script->print("#PBS -l mem=${max_memory}mb\n");
    }

    $pbs_job_script->print("#PBS -o " . $description->stdout() . "\n");
    $pbs_job_script->print("#PBS -e " . $description->stderr() . "\n");

    if($description->host_count() != 0)
    {
        $pbs_job_script->print("#PBS -l nodes=" .
                               $description->host_count() .
                               "\n");
    }
    elsif($cluster && $cpu_per_node != 0)
    {
        # Round up so that a partially used node is still requested.
        $pbs_job_script->print("#PBS -l nodes=" .
                               POSIX::ceil($description->count /
                                           $cpu_per_node) .
                               "\n");
    }

    # ---- environment ------------------------------------------------------

    # %library_vars maps each library-path variable to 0 (not supplied by the
    # job) or 1 (supplied by the job and extended below).
    $library_vars{LD_LIBRARY_PATH} = 0;
    # NOTE(review): the file's "use Config;" appears before the package
    # statement, so the unqualified %Config here may not be the one exported
    # by Config.pm and this test may never be true -- confirm.
    if($Config{osname} eq 'irix')
    {
        $library_vars{LD_LIBRARYN32_PATH} = 0;
        $library_vars{LD_LIBRARY64_PATH} = 0;
    }

    # NOTE(review): $library_string and $library_path are not assigned
    # anywhere in the visible part of this file; they are deliberately left
    # as package variables so that any external assignment keeps working.
    foreach my $tuple ($description->environment())
    {
        if(!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }
        if(exists($library_vars{$tuple->[0]}))
        {
            $tuple->[1] .= ":$library_string";
            $library_vars{$tuple->[0]} = 1;
        }

        # The "#PBS -v" list receives the value before shell escaping.
        push(@new_env, $tuple->[0] . "=" . '"' . $tuple->[1] . '"');

        # Escape \, $, " and ` so that name and value can be placed inside
        # double quotes in the generated sh code. The loop variable aliases
        # the tuple elements, so they are modified in place.
        foreach my $field ($tuple->[0], $tuple->[1])
        {
            $field =~ s/\\/\\\\/g;
            $field =~ s/\$/\\\$/g;
            $field =~ s/"/\\\"/g;
            $field =~ s/`/\\\`/g;
        }

        $rsh_env .= $tuple->[0] . "=" . '"' . $tuple->[1] . '"' . ";\n"
                  . "export " . $tuple->[0] . ";\n";
    }

    # Add every library-path variable the job did not set itself.
    foreach (keys %library_vars)
    {
        if($library_vars{$_} == 0)
        {
            push(@new_env, $_ . "=" . $library_path);
            $rsh_env .= "$_=$library_path;\n"
                      . "export $_;\n";
        }
    }

    $pbs_job_script->print("#PBS -v " . join(',', @new_env));

    $pbs_job_script->print("\n#Change to directory requested by user\n");
    $pbs_job_script->print('cd ' . $description->directory() . "\n");

    # ---- arguments --------------------------------------------------------

    my @arguments = $description->arguments();
    foreach my $arg (@arguments)
    {
        if(ref($arg))
        {
            return Globus::GRAM::Error::RSL_ARGUMENTS();
        }
    }

    # Every argument is escaped and double-quoted. The loop runs over the
    # whole list, so a first argument of "0" or "" no longer suppresses the
    # remaining ones.
    foreach my $arg (@arguments)
    {
        $self->log("Transforming argument \"$arg\"\n");
        $arg =~ s/\\/\\\\/g;
        $arg =~ s/\$/\\\$/g;
        $arg =~ s/"/\\\"/g;
        $arg =~ s/`/\\\`/g;
        $self->log("Transformed to \"$arg\"\n");

        $args .= '"' . $arg . '" ';
    }

    # ---- command line(s) --------------------------------------------------

    if($description->jobtype() eq "mpi")
    {
        $pbs_job_script->print("$mpirun -np " . $description->count() . " ");
        if($cluster)
        {
            $pbs_job_script->print(" -machinefile \$PBS_NODEFILE ");
        }
        $pbs_job_script->print($description->executable()
                               . " $args < "
                               . $description->stdin() . "\n");
    }
    elsif($description->jobtype() eq 'multiple' && !$cluster)
    {
        # No cluster: start all copies in the background, then wait.
        for(my $i = 0; $i < $description->count(); $i++)
        {
            $pbs_job_script->print($description->executable() . " $args <" .
                                   $description->stdin() . "&\n");
        }
        $pbs_job_script->print("wait\n");
    }
    elsif($description->jobtype() eq 'multiple')
    {
        # Cluster: write a small command script and start it through the
        # remote shell on the hosts listed in $PBS_NODEFILE, cycling over
        # the host list until $count copies have been started.
        my $count = $description->count;
        my $stdin = $description->stdin();

        my $cmd_script_url = "$tag/pbs_cmd_script.$$";
        system("$cache_pgm -add -t $tag -n $cmd_script_url file:/dev/null");
        my $cmd_script_name = `$cache_pgm -q -t $tag $cmd_script_url`;
        chomp($cmd_script_name);
        if($cmd_script_name eq "")
        {
            return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
        }

        my $cmd_script = IO::File->new($cmd_script_name, '>')
            or return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
        $cmd_script->print("#!/bin/sh\n");
        $cmd_script->print('cd ' . $description->directory() . "\n");
        $cmd_script->print("$rsh_env\n");
        $cmd_script->print($description->executable() . " $args\n");
        $cmd_script->close();

        # Interpolating heredoc: \$ and \` become literal $ and ` in the
        # generated shell code, while $count, $remote_shell,
        # $cmd_script_name and $stdin are expanded here.
        $pbs_job_script->print(<<EOF);
hosts=\`cat \$PBS_NODEFILE\`;
counter=0
while test \$counter -lt $count; do
for host in \$hosts; do
if test \$counter -lt $count; then
$remote_shell \$host "/bin/sh $cmd_script_name" < $stdin &
counter=\`expr \$counter + 1\`
else
break
fi
done
done
wait
EOF
    }
    else
    {
        $pbs_job_script->print($description->executable() . " $args <" .
                               $description->stdin() . "\n");
    }
    $pbs_job_script->close();

    # ---- submission -------------------------------------------------------

    # Append qsub's stderr to the job manager log file when one is set.
    if($description->logfile() ne "")
    {
        $errfile = "2>>" . $description->logfile();
    }

    # qsub's standard output is taken as the job id.
    my $job_id = `$qsub < $pbs_job_script_name $errfile`;
    my $qsub_status = $?;
    chomp($job_id);

    if($qsub_status == 0)
    {
        #system("$cache_pgm -cleanup-url $script_url");
        return {JOB_ID => $job_id,
                JOB_STATE => Globus::GRAM::JobState::PENDING };
    }

    #system("$cache_pgm -cleanup-url $tag/pbs_job_script.$$");
    return Globus::GRAM::Error::INVALID_SCRIPT_REPLY();
}
# Query PBS for the state of the job named in the job description.
#
# Runs "qstat -f <job id>" and maps the value on the job_state line to a
# GRAM job state:
#   Q, W, T -> PENDING
#   S, H    -> SUSPENDED
#   R, E    -> ACTIVE
# A qstat exit code of 153 (unknown job id) is reported as DONE.
#
# Returns { JOB_STATE => <state> }, or an empty hash reference when the
# qstat output cannot be interpreted, which tells the job manager to ignore
# this poll and keep the previous state.
sub poll
{
    my $self        = shift;
    my $description = $self->{JobDescription};
    my $job_id      = $description->jobid();

    $self->log("polling job $job_id");

    my @qstat_output = `$qstat -f $job_id 2>/dev/null`;

    # Exit code of the qstat command, captured before anything else can
    # overwrite $? (see $CHILD_ERROR in the perlvar documentation).
    my $exit_code = $? >> 8;

    # return code 153 = "Unknown Job Id".
    # verifying that the job is no longer there.
    if($exit_code == 153)
    {
        $self->log("qstat rc is 153 == Unknown Job ID == DONE");
        return {JOB_STATE => Globus::GRAM::JobState::DONE};
    }

    # Work on lexicals instead of assigning to the global $_, which would
    # clobber the caller's $_.
    my ($state_line) = grep { /job_state/ } @qstat_output;

    # Take the field at index 3 of the whitespace split; the indented
    # "job_state = X" line yields a leading empty field, so this is the
    # value after "=".
    my $state_code;
    if(defined($state_line))
    {
        $state_code = (split(/\s+/, $state_line))[3];
    }
    $state_code = '' if !defined($state_code);

    my $state;
    if($state_code =~ /Q|W|T/)
    {
        $state = Globus::GRAM::JobState::PENDING;
    }
    elsif($state_code =~ /S|H/)
    {
        $state = Globus::GRAM::JobState::SUSPENDED;
    }
    elsif($state_code =~ /R|E/)
    {
        $state = Globus::GRAM::JobState::ACTIVE;
    }
    else
    {
        # Reached on an unknown response from PBS. PBS may have been
        # temporarily unavailable while the submitted job itself is fine,
        # so the JM should ignore this poll and keep the previous state.
        # Returning an empty hash tells the JM to ignore the response.
        $self->log("qstat returned an unknown response. Telling JM to ignore this poll");
        return {};
    }

    return {JOB_STATE => $state};
}
# Cancel the job named in the job description by running qdel on it.
#
# Returns { JOB_STATE => FAILED } when qdel exits with status 0 and the
# JOB_CANCEL_FAILED error otherwise. All qdel output is discarded.
sub cancel
{
    my ($self) = @_;
    my $job_id = $self->{JobDescription}->jobid();

    $self->log("cancel job $job_id");

    system("$qdel $job_id >/dev/null 2>/dev/null");

    # Guard clause: any non-zero status from qdel means the cancel failed.
    return Globus::GRAM::Error::JOB_CANCEL_FAILED() if $? != 0;

    return { JOB_STATE => Globus::GRAM::JobState::FAILED };
}
1;
|