diff --git a/README.rst b/README.rst index e10072e2..e881504e 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,13 @@ -PyCA – Opencast Capture Agent -============================= +fork with following changes: + +- capture.py will not start capturing if connection to opencast endpoint is not possible. The original function service() will endless stay in a while-loop with 5sec sleep until endpoint is connected. Events in the database will not start recording. To change this, the already installed flag 'force_update' is used. The while-loop will only wait and loop if force_update=True and return immediately if force_update=False. force_update is passed through the calling functions register_ca(), recording_state(), set_service_status_immediate(), update_agent_state() +- Inputs are now possible. The Definition in pyca.conf is extended with an item inputs +- register_ca() is extended by the registration of the input configuration +- Ingest only uploads the selected tracks from schedule events + + + + .. image:: https://github.com/opencast/pyCA/workflows/Test%20pyCA/badge.svg?branch=master :target: https://github.com/opencast/pyCA/actions?query=workflow%3A%22Test+pyCA%22+branch%3Amaster @@ -11,6 +19,8 @@ PyCA – Opencast Capture Agent :target: https://github.com/opencast/pyCA/blob/master/license.lgpl :alt: LGPL-3 license +PyCA – Opencast Capture Agent +============================= **PyCA** is a fully functional Opencast_ capture agent written in Python. It is free software licensed under the terms of the `GNU Lesser General Public License`_. diff --git a/etc/pyca.conf b/etc/pyca.conf index 409f4dd1..eb5068f7 100644 --- a/etc/pyca.conf +++ b/etc/pyca.conf @@ -40,6 +40,12 @@ # Default: sqlite:///pyca.db #database = sqlite:///pyca.db +# Name of Inputs +# If inputs='' or selected inputs in event attachment='' all tracks are uploaded +# Type: list of strings (write as '...', '...') +# default: inputs = '' +# inputs = 'HDMI', 'presenter', 'black board' + [capture] @@ -78,8 +84,9 @@ directory = './recordings' # Default: ffmpeg -nostats -re -f lavfi -r 25 -i testsrc -t {{time}} {{dir}}/{{name}}.webm' command = 'ffmpeg -nostats -re -f lavfi -r 25 -i testsrc -f lavfi -i sine -t {{time}} {{dir}}/{{name}}.webm' -# Flavors of output files produced by the capture command. One flavors should -# be specified for every output file. +# Flavors of output files produced by the capture command. One flavors must +# be specified for every output file an Input. Flavor-names must be unique, number +# of flavors = number of inputs = number of output files # Type: list of strings (write as '...', '...') # Default: 'presenter/source' #flavors = 'presenter/source' diff --git a/pyca/config.py b/pyca/config.py index a6668850..6862dc7a 100644 --- a/pyca/config.py +++ b/pyca/config.py @@ -20,6 +20,7 @@ cal_lookahead = integer(min=0, default=14) backup_mode = boolean(default=false) database = string(default='sqlite:///pyca.db') +inputs = force_list(default=list('')) [capture] directory = string(default='./recordings') diff --git a/pyca/ingest.py b/pyca/ingest.py index 5e984062..5f8b6714 100644 --- a/pyca/ingest.py +++ b/pyca/ingest.py @@ -23,6 +23,61 @@ notify = sdnotify.SystemdNotifier() +def get_input_params(event): + '''Extract the input configuration parameters from the properties attached + to the schedule-entry + ''' + + inputs = [] + for attachment in event.get_data().get('attach'): + data = attachment.get('data') + if (attachment.get('x-apple-filename') == + 'org.opencastproject.capture.agent.properties'): + for prop in data.split('\n'): + if prop.startswith('capture.device.names'): + param = prop.split('=', 1) + inputs = param[1].split(',') + break + return inputs + + +def trackinput_selected(event, flavor, track): + ''' check if input corresponding to flavor is selected in + schedule-attachment parameter 'capture.device.names' + returns True if input is selected or if capture.device.names='' + ''' + + # inputs from pyca.conf + inputs_conf = config('agent', 'inputs') + + # if no inputs defined, return True -> add all tracks to mediapackage + if (inputs_conf == ['']): + logger.info('No inputs in config defined') + return True + + # flavors from pyca.conf + flavors_conf = config('capture', 'flavors') + + # inputs in event attachment + inputs_event = get_input_params(event) + + # if no inputs in attachment, return True -> add all tracks to mediapackage + if (inputs_event == ['']): + logger.info('No inputs in schedule') + # print('No inputs in event attachment') + return True + + # Input corresponding to track-flavor from pyca.conf + input_track = inputs_conf[flavors_conf.index(flavor)] + + if input_track in inputs_event: + # Input corresponding to flavor is selected in attachment + return True + + # Input corresponding to flavor is not selected in attachment + return False + + def get_config_params(properties): '''Extract the set of configuration parameters from the properties attached to the schedule @@ -45,7 +100,7 @@ def ingest(event): # Update status set_service_status(Service.INGEST, ServiceStatus.BUSY) notify.notify('STATUS=Uploading') - recording_state(event.uid, 'uploading') + recording_state(event.uid, 'uploading', force_update=True) update_event_status(event, Status.UPLOADING) # Select ingest service @@ -86,11 +141,14 @@ def ingest(event): # add track for (flavor, track) in event.get_tracks(): - logger.info('Adding track (%s -> %s)', flavor, track) - track = track.encode('ascii', 'ignore') - fields = [('mediaPackage', mediapackage), ('flavor', flavor), - ('BODY1', (pycurl.FORM_FILE, track))] - mediapackage = http_request(service_url + '/addTrack', fields) + if trackinput_selected(event, flavor, track): + logger.info('Adding track (%s -> %s)', flavor, track) + track = track.encode('ascii', 'ignore') + fields = [('mediaPackage', mediapackage), ('flavor', flavor), + ('BODY1', (pycurl.FORM_FILE, track))] + mediapackage = http_request(service_url + '/addTrack', fields) + else: + logger.info('Ignoring track (%s -> %s)', flavor, track) # ingest logger.info('Ingest recording') @@ -125,7 +183,7 @@ def safe_start_ingest(event): except Exception: logger.exception('Something went wrong during the upload') # Update state if something went wrong - recording_state(event.uid, 'upload_error') + recording_state(event.uid, 'upload_error', force_update=True) update_event_status(event, Status.FAILED_UPLOADING) set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE) @@ -134,7 +192,8 @@ def control_loop(): '''Main loop of the capture agent, retrieving and checking the schedule as well as starting the capture process if necessry. ''' - set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE) + set_service_status_immediate(Service.INGEST, ServiceStatus.IDLE, + force_update=True) notify.notify('READY=1') notify.notify('STATUS=Running') while not terminate(): diff --git a/pyca/schedule.py b/pyca/schedule.py index d85dca3a..d5bbcdce 100644 --- a/pyca/schedule.py +++ b/pyca/schedule.py @@ -66,7 +66,12 @@ def get_schedule(db): lookahead = config('agent', 'cal_lookahead') * 24 * 60 * 60 if lookahead: params['cutoff'] = str((timestamp() + lookahead) * 1000) - uri = '%s/calendars?%s' % (service('scheduler')[0], + + service_endpoint = service('scheduler', force_update=True) + if not service_endpoint: + logger.warning('Missing endpoint for updating schedule.') + return + uri = '%s/calendars?%s' % (service_endpoint[0], urlencode(params)) try: vcal = http_request(uri) diff --git a/pyca/utils.py b/pyca/utils.py index 195fed1f..1add795d 100644 --- a/pyca/utils.py +++ b/pyca/utils.py @@ -129,9 +129,14 @@ def service(service_name, force_update=False): service_name, config('services', service_id)) except pycurl.error: - logger.exception('Could not get %s endpoint. Retry in 5s', - service_name) - time.sleep(5.0) + if force_update: + logger.exception('Could not get %s endpoint. Retry in 5s', + service_name) + time.sleep(5.0) + else: + logger.warning('Could not get %s endpoint. Ignoring', + service_name) + break return config('services', service_id) @@ -140,7 +145,7 @@ def ensurelist(x): return x if type(x) == list else [x] -def register_ca(status='idle'): +def register_ca(status='idle', force_update=False): '''Register this capture agent at the Matterhorn admin server so that it shows up in the admin interface. @@ -151,7 +156,7 @@ def register_ca(status='idle'): # here. We will just run silently in the background: if config('agent', 'backup_mode'): return - service_endpoint = service('capture.admin') + service_endpoint = service('capture.admin', force_update) if not service_endpoint: logger.warning('Missing endpoint for updating agent status.') return @@ -165,8 +170,20 @@ def register_ca(status='idle'): except pycurl.error as e: logger.warning('Could not set agent state to %s: %s', status, e) + # register_configuration + url += '/configuration' + inputstring = ",".join(config('agent', 'inputs')) + params = [('configuration', + '{\'capture.device.names\': \'' + inputstring + '\' }')] + try: + response = http_request(url, params).decode('utf-8') + if response: + logger.info(response) + except pycurl.error as e: + logger.warning('Could not set configuration: %s', e) -def recording_state(recording_id, status): + +def recording_state(recording_id, status, force_update=False): '''Send the state of the current recording to the Matterhorn core. :param recording_id: ID of the current recording @@ -177,9 +194,14 @@ def recording_state(recording_id, status): # in the background: if config('agent', 'backup_mode'): return + service_endpoint = service('capture.admin', force_update) + # check if service_endpoint is availible, otherwise service()[0] + # is not defined + if not service_endpoint: + logger.warning('Missing endpoint for updating agent status.') + return params = [('state', status)] - url = service('capture.admin')[0] - url += f'/recordings/{recording_id}' + url = f'{service_endpoint[0]}/recordings/{recording_id}' try: result = http_request(url, params).decode('utf-8') logger.info(result) @@ -209,12 +231,12 @@ def set_service_status(dbs, service, status): dbs.commit() -def set_service_status_immediate(service, status): +def set_service_status_immediate(service, status, force_update=False): '''Update the status of a particular service in the database and send an immediate signal to Opencast. ''' set_service_status(service, status) - update_agent_state() + update_agent_state(force_update) @db.with_session @@ -229,7 +251,7 @@ def get_service_status(dbs, service): return db.ServiceStatus.STOPPED -def update_agent_state(): +def update_agent_state(force_update=False): '''Update the current agent state in opencast. ''' status = 'idle' @@ -242,7 +264,7 @@ def update_agent_state(): elif get_service_status(db.Service.INGEST) == db.ServiceStatus.BUSY: status = 'uploading' - register_ca(status=status) + register_ca(status=status, force_update=force_update) def terminate(shutdown=None):