lib/core/components/impl/socketappserver.py (2 changes: 1 addition & 1 deletion)

@@ -250,7 +250,7 @@ def _process_application(self, conn, addr):
 
         dn = ''
         for rdn in user_cert_data['subject']:
-            dn += '/' + '+'.join('%s=%s' % (DN_TRANSLATION[key], value) for key, value in rdn if key in DN_TRANSLATION)
+            dn += '/' + '+'.join('%s=%s' % (DN_TRANSLATION[key], value) for key, value in rdn)
 
         user_info = master.identify_user(dn = dn, check_trunc = True)
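For context, the DN here is assembled from an ssl-style peer-certificate subject (a tuple of RDN tuples of key-value pairs). A minimal self-contained sketch of the new behavior; the DN_TRANSLATION table and the sample subject below are hypothetical stand-ins, not the server's actual values:

# Hypothetical stand-ins; the real table lives in the server module.
DN_TRANSLATION = {'countryName': 'C', 'organizationName': 'O', 'commonName': 'CN'}

# ssl.getpeercert() returns the subject as a tuple of RDN tuples of (key, value) pairs.
user_cert_data = {'subject': ((('countryName', 'CH'),),
                              (('organizationName', 'CERN'),),
                              (('commonName', 'Jane Doe'),))}

dn = ''
for rdn in user_cert_data['subject']:
    # With the 'if key in DN_TRANSLATION' guard removed by this commit, an RDN key
    # missing from the table raises KeyError instead of being skipped silently.
    dn += '/' + '+'.join('%s=%s' % (DN_TRANSLATION[key], value) for key, value in rdn)

print(dn)  # /C=CH/O=CERN/CN=Jane Doe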
lib/dataformat/site.py (19 changes: 12 additions & 7 deletions)

@@ -8,7 +8,7 @@ class Site(object):
     """Represents a site. Owns lists of dataset and block replicas, which are organized into partitions."""
 
     __slots__ = ['_name', 'id', 'host', 'storage_type', 'backend', 'status', 'filename_mapping',
-                 '_dataset_replicas', 'partitions']
+                 '_dataset_replicas', 'partitions', 'x509proxy']
 
     _storage_types = ['disk', 'mss', 'buffer', 'unknown']
     TYPE_DISK, TYPE_MSS, TYPE_BUFFER, TYPE_UNKNOWN = range(1, len(_storage_types) + 1)

@@ -90,7 +90,7 @@ def map(self, lfn):
             return None
 
 
-    def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', status = STAT_UNKNOWN, filename_mapping = {}, sid = 0):
+    def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', status = STAT_UNKNOWN, filename_mapping = {}, x509proxy = None, sid = 0):
         self._name = name
         self.host = host
         self.storage_type = Site.storage_type_val(storage_type)

@@ -107,18 +107,21 @@ def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', stat
 
         self.partitions = {} # {Partition: SitePartition}
 
+        self.x509proxy = x509proxy
+
     def __str__(self):
-        return 'Site %s (host=%s, storage_type=%s, backend=%s, status=%s, id=%d)' % \
-            (self._name, self.host, Site.storage_type_name(self.storage_type), self.backend, Site.status_name(self.status), self.id)
+        return 'Site %s (host=%s, storage_type=%s, backend=%s, status=%s, x509=%s, id=%d)' % \
+            (self._name, self.host, Site.storage_type_name(self.storage_type), self.backend, Site.status_name(self.status), self.x509proxy, self.id)
 
     def __repr__(self):
-        return 'Site(%s,%s,\'%s\',%s,\'%s\',%s,%d)' % \
-            (repr(self._name), repr(self.host), Site.storage_type_name(self.storage_type), repr(self.backend), Site.status_name(self.status), repr(self.filename_mapping), self.id)
+        return 'Site(%s,%s,\'%s\',%s,\'%s\',%s,%s,%d)' % \
+            (repr(self._name), repr(self.host), Site.storage_type_name(self.storage_type), repr(self.backend), Site.status_name(self.status), repr(self.filename_mapping), repr(self.x509proxy), self.id)
 
     def __eq__(self, other):
         return self is other or \
             (self._name == other._name and self.host == other.host and self.storage_type == other.storage_type and \
-            self.backend == other.backend and self.status == other.status and self.filename_mapping == other.filename_mapping)
+            self.backend == other.backend and self.status == other.status and \
+            self.filename_mapping == other.filename_mapping and self.x509proxy == other.x509proxy)
 
     def __ne__(self, other):
         return not self.__eq__(other)

@@ -134,6 +137,8 @@ def copy(self, other):
         for protocol, mapping in other.filename_mapping.iteritems():
            self.filename_mapping[protocol] = Site.FileNameMapping(mapping._chains)
 
+        self.x509proxy = other.x509proxy
+
     def embed_into(self, inventory, check = False):
         updated = False
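A short usage sketch of the new attribute, assuming Site is importable from dynamo.dataformat (as Dataset is in the popularity.py change below); the site names and proxy path are hypothetical:

from dynamo.dataformat import Site

tape = Site('T1_EXAMPLE_MSS', host = 'srm.example.org', storage_type = 'mss',
            x509proxy = '/tmp/x509up_dynamo_tape')
disk = Site('T1_EXAMPLE_Disk', host = 'srm.example.org', storage_type = 'disk')

print(tape.x509proxy)  # '/tmp/x509up_dynamo_tape', now also reported by __str__ as x509=...
print(disk.x509proxy)  # None, the default when no proxy is configured
assert tape != disk    # __eq__ and __ne__ now take x509proxy into account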
lib/dealer/plugins/popularity.py (10 changes: 10 additions & 0 deletions)

@@ -2,6 +2,8 @@
 import math
 
 from dynamo.dataformat import Dataset
+from dynamo.policy.variables import replica_variables
+from dynamo.policy.condition import Condition
 from base import BaseHandler, DealerRequest
 
 LOG = logging.getLogger(__name__)

@@ -19,6 +21,10 @@ def __init__(self, config):
         self.max_dataset_size = config.max_dataset_size * 1.e+12
         self.max_replication = config.max_replication
         self.request_to_replica_threshold = config.request_to_replica_threshold
+        try:
+            self.condition = Condition(config.condition, replica_variables)
+        except Exception:
+            self.condition = None

         self._datasets = []

@@ -36,6 +42,7 @@ def get_requests(self, inventory, policy): # override
             LOG.debug('Dataset %s request weight %f', dataset.name, request_weight)
 
             dataset_in_source_groups = False
+
             for dr in dataset.replicas:
                 for br in dr.block_replicas:
                     if br.group.name in self.source_groups:

@@ -46,6 +53,9 @@
             if not dataset_in_source_groups:
                 continue
 
+            if self.condition is not None and not self.condition.match(dataset):
+                continue

             if request_weight <= 0.:
                 continue

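The intent of the new condition hook, restated as a self-contained sketch; the class name, config object, and helper method are schematic, with only the Condition(text, variables) signature taken from the change itself:

from dynamo.policy.variables import replica_variables
from dynamo.policy.condition import Condition

class PopularityHandlerSketch(object):
    def __init__(self, config):
        try:
            # a missing or unparsable config.condition simply disables filtering
            self.condition = Condition(config.condition, replica_variables)
        except Exception:
            self.condition = None

    def accepts(self, dataset):
        # with no configured condition, every dataset passes
        return self.condition is None or self.condition.match(dataset)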
lib/detox/conditions.py (4 changes: 3 additions & 1 deletion)

@@ -4,7 +4,7 @@
 class ReplicaCondition(Condition):
     def __init__(self, text):
         Condition.__init__(self, text, replica_variables)
-
+
     def get_matching_blocks(self, replica):
         """If this is a block-level condition, return the list of matching block replicas."""

@@ -18,3 +18,5 @@ def get_matching_blocks(self, replica):
 class SiteCondition(Condition):
     def __init__(self, text):
         Condition.__init__(self, text, site_variables)
+
+
lib/detox/detoxpolicy.py (3 changes: 3 additions & 0 deletions)

@@ -83,9 +83,11 @@ def evaluate(self, replica):
         if self.condition.match(replica):
             self.has_match = True
 
+
             if issubclass(self.decision.action_cls, BlockAction):
                 # block-level
                 matching_block_replicas = self.condition.get_matching_blocks(replica)
+
                 if len(matching_block_replicas) == len(replica.block_replicas):
                     # but all blocks matched - return dataset level
                     action = self.decision.action_cls.dataset_level(self)

@@ -246,3 +248,4 @@ def evaluate(self, replica):
         replica.block_replicas.update(block_replicas_tmp)
 
         return actions
+
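For context, the surrounding evaluate() implements a promotion rule: a block-level decision whose condition matches every block replica of a dataset replica collapses into a single dataset-level action. A schematic sketch of that rule; the block_level constructor is hypothetical, since only dataset_level is visible in this hunk:

def resolve_action(decision, condition, replica, line):
    if issubclass(decision.action_cls, BlockAction):
        matching = condition.get_matching_blocks(replica)
        if len(matching) == len(replica.block_replicas):
            # all blocks matched - act on the whole dataset replica instead
            return decision.action_cls.dataset_level(line)
        return decision.action_cls.block_level(line, matching)  # hypothetical constructor
    return decision.action_cls.dataset_level(line)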
lib/fileop/history.py (35 changes: 0 additions & 35 deletions)

@@ -30,41 +30,6 @@ def histogram_binning(tmin,tmax):
 
     return (nbins,dt)
 
-#class OperationFilter:
-#    """
-#    Allows to generate a sql filter string to be applied to the query of historic file
-#    operations tables.
-#    """
-#    def __init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'):
-#        self.source_filter = source_filter
-#        self.mss_filter = mss_filter
-#        self.period = period
-#        self.upto = upto
-#        self.exit_code = exit_code
-#
-#    def generate_filter_string(self):
-#
-#        where_sql = ""
-#
-#        return where_sql
-#
-#class DeletionFilter(OperationFilter):
-#    """
-#    Allows to generate a sql filter string to be applied to the query of historic file
-#    deletions tables.
-#    """
-#    def __init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'):
-#        OperationFilter.__init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0')
-#
-#class TransferFilter(OperationFilter):
-#    """
-#    Allows to generate a sql filter string to be applied to the query of historic file
-#    transfer tables.
-#    """
-#    def __init__(self,source_filter="",destination_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'):
-#        OperationFilter.__init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0')
-#        self.destintation_filter = destination_filter
-
 class Sites:
     """
     Defines the sites.
lib/fileop/impl/fts.py (40 changes: 37 additions & 3 deletions)

@@ -49,6 +49,7 @@ def __init__(self, config):
 
         # Proxy to be forwarded to FTS
         self.x509proxy = config.get('x509proxy', None)
+        self.x509proxy_orig = config.get('x509proxy', None)
 
         # Bookkeeping device
        self.db = MySQL(config.db_params)

@@ -67,7 +68,15 @@ def num_pending_transfers(self): #override
         file_states = ['SUBMITTED', 'READY', 'ACTIVE', 'STAGING', 'STARTED']
 
         jobs = self._ftscall('list_jobs', state_in = ['SUBMITTED', 'ACTIVE', 'STAGING'])
+        from random import shuffle
+        shuffle(jobs)
+
+        total_count = 0
         for job in jobs:
+            total_count = total_count + 1
+            # if total_count > 50:
+            #     LOG.info('-- in fts: total_count > ' + str(total_count))
+            #     break
             job_info = self._ftscall('get_job_status', job['job_id'], list_files = True)
             for file_info in job_info['files']:
                 if file_info['file_state'] in file_states:

@@ -87,7 +96,15 @@ def num_pending_deletions(self): #override
         file_states = ['SUBMITTED', 'READY', 'ACTIVE']
 
         jobs = self._ftscall('list_jobs', state_in = ['SUBMITTED', 'ACTIVE'])
+        from random import shuffle
+        shuffle(jobs)
+
+        total_count = 0
         for job in jobs:
+            total_count = total_count + 1
+            # if total_count > 1:
+            #     LOG.info('-- in fts: total_count > ' + str(total_count))
+            #     break
             job_info = self._ftscall('get_job_status', job['job_id'], list_files = True)
             for file_info in job_info['dm']:
                 if file_info['file_state'] in file_states:
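Both counters now follow the same pattern; a condensed sketch, assuming the python-fts3 style list_jobs/get_job_status calls used above (transfer entries sit under 'files', deletion entries under 'dm'):

from random import shuffle

def count_pending(ftscall, job_states, file_states, entry_key):
    jobs = ftscall('list_jobs', state_in = job_states)
    shuffle(jobs)  # randomize the walk so repeated polls do not always start with the same jobs
    pending = 0
    for job in jobs:
        job_info = ftscall('get_job_status', job['job_id'], list_files = True)
        pending += sum(1 for f in job_info[entry_key] if f['file_state'] in file_states)
    return pending

# e.g. for deletions:
# count_pending(self._ftscall, ['SUBMITTED', 'ACTIVE'], ['SUBMITTED', 'READY', 'ACTIVE'], 'dm')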
@@ -127,6 +144,20 @@ def start_transfers(self, batch_id, batch_tasks): #override
                 dest_pfn = sub.destination.to_pfn(lfn, 'gfal2')
                 source_pfn = task.source.to_pfn(lfn, 'gfal2')
 
+                self.x509proxy = self.x509proxy_orig
+                #if sub.destination.x509proxy is not None:
+                self.x509proxy = sub.destination.x509proxy
+
+                if task.source.storage_type == Site.TYPE_MSS:
+                    self.x509proxy = task.source.x509proxy
+
+                #LOG.info("CCCCCCCCCCC")
+                #LOG.info("File is: %s" % sub.file.lfn)
+                #LOG.info("Destination is: %s" % sub.destination.name)
+                #LOG.info("Source is: %s" % task.source.name)
+                #LOG.info("x509 is: %s" % self.x509proxy)
+                #LOG.info("CCCCCCCCCCC")
+
                 if dest_pfn is None or source_pfn is None:
                     # either gfal2 is not supported or lfn could not be mapped
                     LOG.warning('Could not obtain PFN for %s at %s or %s', lfn, sub.destination.name, task.source.name)
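The proxy choice this hunk introduces, restated as a standalone helper (schematic; note that as committed the destination proxy is taken unconditionally because the None check is commented out, so the reset to x509proxy_orig is immediately overwritten):

def select_proxy(destination, source):
    # the destination site's proxy is the default choice
    proxy = destination.x509proxy
    # a tape (MSS) source overrides it, e.g. for staging reads
    if source.storage_type == Site.TYPE_MSS:
        proxy = source.x509proxy
    return proxy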
@@ -173,7 +204,8 @@
 
         if len(transfers) != 0:
             LOG.debug('Submit new transfer job for %d files', len(transfers))
-            job = fts3.new_job(transfers, retry = self.fts_retry, overwrite = True, verify_checksum = verify_checksum, metadata = self.metadata_string)
+            job = fts3.new_job(transfers, retry = self.fts_retry, overwrite = True,
+                               verify_checksum = verify_checksum, metadata = self.metadata_string)
             success = self._submit_job(job, 'transfer', batch_id, dict((pfn, task.id) for pfn, task in t_pfn_to_task.iteritems()))
 
         for transfer in transfers:

@@ -317,6 +349,7 @@ def _ftscallurl(self, url):
         return self._do_ftscall(url = url)
 
     def _do_ftscall(self, binding = None, url = None):
+
         if self._context is None:
             # request_class = Request -> use "requests"-based https call (instead of default PyCURL,
             # which may not be able to handle proxy certificates depending on the cURL installation)

@@ -337,10 +370,12 @@ def _do_ftscall(self, binding = None, url = None):
         LOG.debug('FTS: %s', reqstring)
 
         wait_time = 1.
+
         for attempt in xrange(10):
             try:
                 if binding is not None:
                     method, args, kwd = binding
+
                     return getattr(fts3, method)(context, *args, **kwd)
                 else:
                     return json.loads(context.get(url))
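The wait_time initialization and attempt loop above belong to a retry wrapper around the FTS call; a minimal sketch under that reading (the sleep and back-off step fall outside the visible hunk and are assumptions here):

import time

def call_with_retry(do_call, attempts = 10):
    wait_time = 1.
    for attempt in xrange(attempts):
        try:
            return do_call()
        except Exception:
            time.sleep(wait_time)
            wait_time *= 2.  # assumed growth factor; the real step is not shown
    raise RuntimeError('FTS call failed after %d attempts' % attempts)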
@@ -368,8 +403,6 @@ def _submit_job(self, job, optype, batch_id, pfn_to_tid):
             LOG.error('Failed to submit %s to FTS: Exception %s (%s)', optype, exc_type.__name__, str(exc))
             return False
 
-        LOG.debug('FTS job id: %s', job_id)
-
         # list of file-level operations (one-to-one with pfn)
         try:
             if optype == 'transfer' or optype == 'staging':

@@ -456,6 +489,7 @@ def _get_status(self, batch_id, optype):
             result = self._ftscall('get_job_status', job_id = job_id, list_files = True)
         except:
             LOG.error('Failed to get job status for FTS job %s', job_id)
+            LOG.error(optype)
             continue
 
         if optype == 'transfer' or optype == 'staging':