diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/config/12id/S12-PILATUS1/dqconfig.ini b/config/12id/S12-PILATUS1/dqconfig.ini index 2a9759a..90d8469 100644 --- a/config/12id/S12-PILATUS1/dqconfig.ini +++ b/config/12id/S12-PILATUS1/dqconfig.ini @@ -3,10 +3,8 @@ 'time_zone' = 'America/Chicago' #real-time verifier -'feedback_type' = console, pv -'detector' = pilatus300 -'detector_basic' = cam1 -'detector_image' = image1 -'cnt_rate' = True +'feedback_type' = pv_12, a +'detector' = S12-PILATUS1 +'decor' = acq_time > S12-PILATUS1:cam1:AcquireTime, file_name > S12-PILATUS1:cam1:FullFileName_RBV +'callback_pv' = S12-PILATUS1:cam1:FullFileName_RBV 'no_frames' = -1 - diff --git a/config/12id/S12-PILATUS1/schemas/limits.json b/config/12id/S12-PILATUS1/schemas/limits.json index 1166452..0d96519 100644 --- a/config/12id/S12-PILATUS1/schemas/limits.json +++ b/config/12id/S12-PILATUS1/schemas/limits.json @@ -1,20 +1,20 @@ { - "data" : { - "sum" : { - "low_limit" : 100, - "high_limit" : 2300 - }, - "sat" : { - "low_limit" : 0, - "high_limit" : 2300 - }, - "frame_sat" : { - "low_limit" : 0, - "high_limit" : 4000 - }, - "rate_sat" : { - "low_limit" : 100, - "high_limit" : 2300 - } - } -} + "data": { + "frame_sat_cnt_rate": { + "high_limit": 2300 + }, + "sum": { + "high_limit": 100000, + "low_limit": -10000000 + }, + "point_sat_rate": { + "high_limit": 230000 + }, + "point_sat": { + "high_limit": 100000 + }, + "frame_sat_pts": { + "high_limit": 10000 + } + } +} \ No newline at end of file diff --git a/config/12id/S12-PILATUS1/schemas/quality_checks.json b/config/12id/S12-PILATUS1/schemas/quality_checks.json index c971465..b069777 100644 --- a/config/12id/S12-PILATUS1/schemas/quality_checks.json +++ b/config/12id/S12-PILATUS1/schemas/quality_checks.json @@ -1 +1 @@ -{ "data" : ["QUALITYCHECK_SUM", "QUALITYCHECK_FRAME_SAT", "QUALITYCHECK_RATE_SAT"]} +{ "data" : ["sum", "frame_sat_pts", "frame_sat_cnt_rate"]} diff --git a/config/12id/pilatus300/dqconfig.ini b/config/12id/pilatus300/dqconfig.ini index 49eef54..248ef43 100644 --- a/config/12id/pilatus300/dqconfig.ini +++ b/config/12id/pilatus300/dqconfig.ini @@ -3,10 +3,8 @@ 'time_zone' = 'America/Chicago' #real-time verifier -'feedback_type' = console, pv +'feedback_type' = console, pv_12 'detector' = pilatus300 -'detector_basic' = cam1 -'detector_image' = image1 -'cnt_rate' = True +'decor' = acq_time > pilatus300:cam1:AcquireTime, file_name > pilatus300:cam1:FullFileName_RBV +'callback_pv' = pilatus300:cam1:FullFileName_RBV 'no_frames' = -1 - diff --git a/config/12id/pilatus300/schemas/limits.json b/config/12id/pilatus300/schemas/limits.json index 1166452..0d96519 100644 --- a/config/12id/pilatus300/schemas/limits.json +++ b/config/12id/pilatus300/schemas/limits.json @@ -1,20 +1,20 @@ { - "data" : { - "sum" : { - "low_limit" : 100, - "high_limit" : 2300 - }, - "sat" : { - "low_limit" : 0, - "high_limit" : 2300 - }, - "frame_sat" : { - "low_limit" : 0, - "high_limit" : 4000 - }, - "rate_sat" : { - "low_limit" : 100, - "high_limit" : 2300 - } - } -} + "data": { + "frame_sat_cnt_rate": { + "high_limit": 2300 + }, + "sum": { + "high_limit": 100000, + "low_limit": -10000000 + }, + "point_sat_rate": { + "high_limit": 230000 + }, + "point_sat": { + "high_limit": 100000 + }, + "frame_sat_pts": { + "high_limit": 10000 + } + } +} \ No newline at end of file diff --git a/config/12id/pilatus300/schemas/quality_checks.json b/config/12id/pilatus300/schemas/quality_checks.json index c971465..b069777 100644 --- a/config/12id/pilatus300/schemas/quality_checks.json +++ b/config/12id/pilatus300/schemas/quality_checks.json @@ -1 +1 @@ -{ "data" : ["QUALITYCHECK_SUM", "QUALITYCHECK_FRAME_SAT", "QUALITYCHECK_RATE_SAT"]} +{ "data" : ["sum", "frame_sat_pts", "frame_sat_cnt_rate"]} diff --git a/config/12id/start_verifier.py b/config/12id/start_verifier.py index a6c672b..dbe574c 100644 --- a/config/12id/start_verifier.py +++ b/config/12id/start_verifier.py @@ -57,9 +57,8 @@ from multiprocessing.managers import SyncManager import os from os.path import expanduser -import dquality.realtime.real_time as real +import dquality.real_time_pv as real import argparse -import json import sys import time @@ -168,7 +167,7 @@ def start_server(arg): #key = 'test' #os.system("medm -x /local/bfrosik/data-quality/config/12id/test/qualityFeedbackTest.adl &") else: - print 'not supported instrument' + print ('not supported instrument') home = expanduser("~") conf = os.path.join(home, ".dquality", instrument) diff --git a/config/12id/verui.ui b/config/12id/verui.ui new file mode 100644 index 0000000..0a80db5 --- /dev/null +++ b/config/12id/verui.ui @@ -0,0 +1,162 @@ + + + MainWindow + + + + 0 + 0 + 689 + 564 + + + + MainWindow + + + + + + 40 + 20 + 200 + 34 + + + + + + + <html><head/><body><p><span style=" font-weight:600;">Detector</span></p></body></html> + + + + + + + + + + + + 40 + 80 + 591 + 256 + + + + + + + + + Minimum Integrated Counts of a Frame + + + + + + + Maximum Integrated Counts of a Frame + + + + + + + Saturation Count of a Pixel + + + + + + + Maximum Number of Saturated Pixels of a Frame + + + + + + + Saturation Count Rate of a Pixel + + + + + + + Maximum Number of Count Rate Saturation Pixels of a Frame + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 40 + 350 + 591 + 121 + + + + + + + + 0 + 0 + 689 + 28 + + + + + Verifier + + + + + + + + + + start verifier + + + + + stop verifier + + + + + + diff --git a/config/12id/window_ver.py b/config/12id/window_ver.py new file mode 100644 index 0000000..bf76870 --- /dev/null +++ b/config/12id/window_ver.py @@ -0,0 +1,232 @@ +import sys +import os +from os.path import expanduser +from configobj import ConfigObj +import json +import epics +from PyQt4 import QtGui, uic +from PyQt4.QtCore import pyqtSignal, pyqtSlot +import zmq + + +default_det = "S12-PILATUS1" +default_cntl_port = 5511 +default_cntl_host = 'localhost' +ui = "verui.ui" + + +class zmq_consumer(): + def __init__(self, host = default_cntl_host, port=default_cntl_port): + context = zmq.Context() + self.socket = context.socket(zmq.PAIR) + self.socket.connect("tcp://" + host + ":%s" % port) + + +class Window(QtGui.QMainWindow): + statusBarSignal = pyqtSignal(str, str) + + def __init__(self): + super(Window, self).__init__() + # set parameters from config file + self.detector = default_det + self.conf_map, self.quality_checks = self.get_ver_params() + + self.ui = uic.loadUi(ui) + self.ui.show() + + self.show_limits() + self.ui.det_name.setText(self.detector) + + self.ui.det_name.returnPressed.connect(lambda: self.set_detector()) + + self.ui.frame_sum_ll.returnPressed.connect(lambda: self.set_limit(self.ui.frame_sum_ll, 'sum','low_limit')) + self.ui.frame_sum_hl.returnPressed.connect(lambda: self.set_limit(self.ui.frame_sum_hl, 'sum','high_limit')) + self.ui.point_sat_hl.returnPressed.connect(lambda: self.set_limit(self.ui.point_sat_hl, 'point_sat','high_limit')) + self.ui.frame_sat_pts_hl.returnPressed.connect(lambda: self.set_limit(self.ui.frame_sat_pts_hl, 'frame_sat_pts','high_limit')) + self.ui.point_sat_rate_hl.returnPressed.connect(lambda: self.set_limit(self.ui.point_sat_rate_hl, 'point_sat_rate','high_limit')) + self.ui.frame_sat_pts_rate_hl.returnPressed.connect(lambda: self.set_limit(self.ui.frame_sat_pts_rate_hl, 'frame_sat_cnt_rate','high_limit')) + + self.setEpicsQualityFeedbackUpdate() + + self.verifier_on = 0 + self.ui.actionStart_verifier.triggered.connect(self.start_verifier) + self.ui.actionStop_verifier.triggered.connect(self.stop_verifier) + + self.statusBarSignal.connect(self.onVerifierPVchange) + self.zmq_menu = zmq_consumer() + self.ui.statusBar.showMessage("verifier off") + + self.list_cnt = 0 + + + def start_verifier(self): + socket = self.zmq_menu.socket + socket.send_json( + dict( + key="start_ver", + detector=self.detector + ) + ) + self.verifier_on = 1 + self.set_status_color('yellow') + msg = 'not acquireing' + + self.ui.statusBar.showMessage(msg) + + + def stop_verifier(self): + socket = self.zmq_menu.socket + socket.send_json( + dict( + key="stop_ver" + ) + ) + self.verifier_on = 0 + self.set_status_color('none') + msg = 'off' + + self.ui.statusBar.showMessage(msg) + + + def set_detector(self): + restart = False + if self.verifier_on == 1: + self.stop_verifier() + restart = True + + self.detector = str(self.ui.det_name.text()) + self.conf_map, self.quality_checks = self.get_ver_params() + self.show_limits() + + if restart: + self.start_verifier() + + + def show_limits(self): + try: + self.limits_file = self.conf_map['limits'] + except KeyError: + self.limits_file = None + + with open(self.limits_file) as limitsfile: + self.limits = json.loads(limitsfile.read())['data'] + self.ui.frame_sum_ll.setText(str(self.limits['sum']['low_limit'])) + self.ui.frame_sum_hl.setText(str(self.limits['sum']['high_limit'])) + self.ui.point_sat_hl.setText(str(self.limits['point_sat']['high_limit'])) + self.ui.frame_sat_pts_hl.setText(str(self.limits['frame_sat_pts']['high_limit'])) + self.ui.point_sat_rate_hl.setText(str(self.limits['point_sat_rate']['high_limit'])) + self.ui.frame_sat_pts_rate_hl.setText(str(self.limits['frame_sat_cnt_rate']['high_limit'])) + limitsfile.close() + + + def set_limit(self, le_limit, key1, key2): + restart = False + if self.verifier_on == 1: + self.stop_verifier() + restart = True + + limit_val = int(le_limit.text()) + self.limits[key1][key2] = limit_val + data_limits = {} + data_limits['data'] = self.limits + with open(self.limits_file, 'w') as limitsfile: + json.dump(data_limits, limitsfile) + limitsfile.close() + + if restart: + self.start_verifier() + + + def setEpicsQualityFeedbackUpdate(self): + try: + self.acquire = epics.PV(self.detector + ':cam1:Acquire', callback=self.epicsCallbackFunc) + self.status = epics.PV(self.detector + ':STAT', callback=self.epicsCallbackFunc) + except: + self.ui.statusBar.showMessage('verifier off') + + + def epicsCallbackFunc(self, pvname, char_value, **kws): + self.statusBarSignal.emit(pvname, char_value) + + + @pyqtSlot(str, str) + def onVerifierPVchange(self, pvname, char_value): + if not pvname is None: + if "STAT" in pvname: + msg = epics.caget(self.detector+':STAT', as_string=True) + if msg is None: + return + failed = False + if msg.endswith('status'): + pass + elif msg.endswith('pass'): + self.set_status_color('green') + else: + failed = True + self.set_status_color('red') + self.ui.statusBar.showMessage(msg) + if failed: + list_item = msg + if self.list_cnt <= 4: + self.ui.list_failed.insertItem(0, list_item) + self.list_cnt = self.list_cnt + 1 + else: + self.ui.list_failed.takeItem(4) + self.ui.list_failed.insertItem(0, list_item) + elif "Acquire" in pvname: + if self.verifier_on is 1: + if int(float(char_value)) is 0: + self.set_status_color('yellow') + msg = 'not acquireing' + self.ui.statusBar.showMessage(msg) + else: + self.ui.statusBar.showMessage("ver pv name not defined") + + + def set_status_color(self, color): + if color is 'red': + self.ui.statusBar.setStyleSheet( + "QStatusBar{padding-left:8px;background:rgba(255,0,0,120);color:black;font-weight:bold;}") + elif color is 'green': + self.ui.statusBar.setStyleSheet( + "QStatusBar{padding-left:8px;background:rgba(0,255,0,120);color:black;font-weight:bold;}") + elif color is 'yellow': + self.ui.statusBar.setStyleSheet( + "QStatusBar{padding-left:8px;background:rgba(255,255,0,120);color:black;font-weight:bold;}") + elif color is 'none': + self.ui.statusBar.setStyleSheet( + "QStatusBar{padding-left:8px;background:rgba(0,0,0,0);color:black;font-weight:bold;}") + + + def get_ver_params(self): + home = expanduser("~") + conf = os.path.join(home, '.dquality', self.detector) + if os.path.isdir(conf): + config = os.path.join(conf, 'dqconfig.ini') + if not os.path.isfile(config): + return None + conf_map = ConfigObj(config) + try: + qcfile = conf_map['quality_checks'] + except KeyError: + qcfile = None + with open(qcfile) as qc_file: + quality_checks = json.loads(qc_file.read()) + qc_file.close() + return conf_map, quality_checks + + +if __name__ == "__main__": + app = QtGui.QApplication(sys.argv) + a = Window() + socket = a.zmq_menu.socket + #sys.exit(app.exec_()) + # stop verifier on exit + res = app.exec_() + socket.send_json( + dict( + key="stop_ver" + ) + ) + socket.close() + sys.exit(res) diff --git a/config/1id/dqconfig.ini b/config/1id/dqconfig.ini index 07c3417..b661b32 100644 --- a/config/1id/dqconfig.ini +++ b/config/1id/dqconfig.ini @@ -7,8 +7,6 @@ #'consumers' = test/schemas/consumers.json #real-time verifier -'feedback_type' = console +'feedback_type' = a, console 'detector' = BBF1 -'detector_basic' = cam1 -'detector_image' = image1 'no_frames' = 20 \ No newline at end of file diff --git a/config/1id/schemas/quality_checks.json b/config/1id/schemas/quality_checks.json index d38aded..5ddf937 100644 --- a/config/1id/schemas/quality_checks.json +++ b/config/1id/schemas/quality_checks.json @@ -1 +1 @@ -{ "data" : ["QUALITYCHECK_SAT", "ACC_SAT"]} \ No newline at end of file +{ "data" : ["saturation", "acc_sat"]} \ No newline at end of file diff --git a/config/32id_micro/dqconfig.ini b/config/32id_micro/dqconfig.ini index 6e3c2f0..7686afe 100644 --- a/config/32id_micro/dqconfig.ini +++ b/config/32id_micro/dqconfig.ini @@ -11,8 +11,6 @@ #'consumers' = test/schemas/consumers.json #real-time verifier -'feedback_type' = console +'feedback_type' = a, console 'detector' = 32idcPG3 -'detector_basic' = cam1 -'detector_image' = image1 'no_frames' = 20 \ No newline at end of file diff --git a/config/32id_micro/schemas/quality_checks.json b/config/32id_micro/schemas/quality_checks.json index a98d2d0..e8eb4c1 100644 --- a/config/32id_micro/schemas/quality_checks.json +++ b/config/32id_micro/schemas/quality_checks.json @@ -1,3 +1,3 @@ -{"data" : [ "QUALITYCHECK_MEAN", "STAT_MEAN", "QUALITYCHECK_STD" ], - "data_dark": [ "QUALITYCHECK_MEAN" ], - "data_white": [ "QUALITYCHECK_MEAN", "QUALITYCHECK_STD" ]} +{"data" : [ "mean", "st_dev", "stat_mean"], + "data_dark": [ "mean"], + "data_white": [ "mean", "stat_mean"]} diff --git a/config/32id_nano/dqconfig.ini b/config/32id_nano/dqconfig.ini index 9886b72..21a799e 100644 --- a/config/32id_nano/dqconfig.ini +++ b/config/32id_nano/dqconfig.ini @@ -11,8 +11,6 @@ #'consumers' = test/schemas/consumers.json #real-time verifier -'feedback_type' = console +'feedback_type' = a, console 'detector' = 32idcPG3 -'detector_basic' = cam1 -'detector_image' = image1 'no_frames' = 20 \ No newline at end of file diff --git a/config/32id_nano/schemas/quality_checks.json b/config/32id_nano/schemas/quality_checks.json index a98d2d0..e8eb4c1 100644 --- a/config/32id_nano/schemas/quality_checks.json +++ b/config/32id_nano/schemas/quality_checks.json @@ -1,3 +1,3 @@ -{"data" : [ "QUALITYCHECK_MEAN", "STAT_MEAN", "QUALITYCHECK_STD" ], - "data_dark": [ "QUALITYCHECK_MEAN" ], - "data_white": [ "QUALITYCHECK_MEAN", "QUALITYCHECK_STD" ]} +{"data" : [ "mean", "st_dev", "stat_mean"], + "data_dark": [ "mean"], + "data_white": [ "mean", "stat_mean"]} diff --git a/config/default/schemas/quality_checks.json b/config/default/schemas/quality_checks.json index 407b680..e8eb4c1 100644 --- a/config/default/schemas/quality_checks.json +++ b/config/default/schemas/quality_checks.json @@ -1,3 +1,3 @@ -{"data" : [ "QUALITYCHECK_MEAN", "QUALITYCHECK_STD", "STAT_MEAN"], - "data_dark": [ "QUALITYCHECK_MEAN"], - "data_white": [ "QUALITYCHECK_MEAN", "QUALITYCHECK_STD"]} +{"data" : [ "mean", "st_dev", "stat_mean"], + "data_dark": [ "mean"], + "data_white": [ "mean", "stat_mean"]} diff --git a/config/mona/DAQStream.py b/config/mona/DAQStream.py new file mode 100644 index 0000000..e19c2cf --- /dev/null +++ b/config/mona/DAQStream.py @@ -0,0 +1,330 @@ +#!/home/beams/USER2BMB/Apps/BlueSky/bin/python + +import argparse +import sys +import numpy as np +import zmq +import TraceSerializer +import time +import sys +import os +import pvaccess + +sys.path.append(os.path.join(os.path.dirname(__file__), './local')) +import flatbuffers + + +def parse_arguments(): + parser = argparse.ArgumentParser( + description='Data Acquisition Process') + parser.add_argument("--image_pv", default="2bmbPG3:Pva1:Image", + help="EPICS PVA image PV name. Default to lyra point grey.") + #arguments for zmq feed + parser.add_argument("--feed_socket", default=5577, + help="A socket the data is received. Default to 5577.") + parser.add_argument("--feed_host", default="localhost", + help="A host sending data. Default to localhost.") + + parser.add_argument('--bind_address_publisher', default="tcp://*:5560", + help='Address to bind publisher.') + parser.add_argument('--publisher_hwm', type=int, default=10 * 1024, + help='Sets high water mark value for publisher.') + parser.add_argument('--synchronize_subscribers', action='store_true', + help='Waits for all subscribers to join.') + parser.add_argument('--subscriber_count', type=int, + help='Number of expected subscribers.') + parser.add_argument('--bind_address_rep', default=None, + help='Address to bind REP socket (for synchronization)') + + parser.add_argument('--daq_mod', type=int, default=2, + help='Data acqusition mod (0=detector; 1=simulate; 2=test)') + parser.add_argument('--simulation_file', default="../data/hornby_4_x1_conv.h5", + help='File name for mock data acquisition. ' + 'Default to shale data.') + parser.add_argument('--beg_sinogram', type=int, default=0, + help='Starting sinogram for reconstruction.') + parser.add_argument('--num_sinograms', type=int, default=0, + help='Number of sinograms to reconstruct.') + parser.add_argument('--num_sinogram_columns', type=int, default=2048, + help='Number of columns per sinogram.') + parser.add_argument('--num_sinogram_projections', type=int, default=1440, + help='Number of projections per sinogram.') + return parser.parse_args() + + +def synchronize_subs(context, subscriber_count, bind_address_rep): + print("Synching") + # Socket to receive signals + sync_socket = context.socket(zmq.REP) + sync_socket.bind(bind_address_rep) + + # Get synchronization from subscribers + counter = 0 + while counter < subscriber_count: + # wait for synchronization request + msg = sync_socket.recv() + # send synchronization reply + sync_socket.send(b'') + counter += 1 + print("Joined subscriber: {}/{}".format(counter, subscriber_count)) + + +def setup_simulation_data(input_f, beg_sinogram=0, num_sinograms=0): + import h5py as h5 + ifptr = h5.File(input_f, 'r') + # idata = np.array(ifptr['exchange/data'], dtype=np.float32) + # itheta = np.array(ifptr['exchange/theta'], dtype=np.float32) + # if num_sinograms>0: + # if (beg_sinogram<0) or (beg_sinogram+num_sinograms>idata.shape[1]): + # raise Exception("Exceeds the sinogram boundary: {} vs. {}".format( + # beg_sinogram+num_sinograms, idata.shape[1])) + # idata = idata[:, beg_sinogram:beg_sinogram+num_sinograms, :] + if num_sinograms > 0: + idata = np.array(ifptr['exchange/data'][:, beg_sinogram:beg_sinogram + num_sinograms, :]) + else: + idata = idata[:, beg_sinogram:beg_sinogram + num_sinograms, :] + itheta = np.array(ifptr['exchange/theta']) + ifptr.close() + return idata, itheta + + +def test_daq(publisher_socket, builder, + rotation_step=0.25, num_sinograms=0, + num_sinogram_columns=2048, seq=0, + num_sinogram_projections=1440): + print("Sending projections") + if num_sinograms < 1: num_sinograms = 2048 + # Randomly generate image data + dims = (num_sinograms, num_sinogram_columns) + image = np.random.randint(2, size=dims, dtype=np.uint16) + + for uniqueId in range(num_sinogram_projections): + builder.Reset() + serializer = TraceSerializer.ImageSerializer(builder) + serialized_data = serializer.serialize(image=image, uniqueId=uniqueId + 7, + rotation_step=rotation_step, seq=seq) + seq += 1 + publisher_socket.send(serialized_data) + + return seq + + +def simulate_daq(publisher_socket, builder, input_f, + beg_sinogram=0, num_sinograms=0, seq=0): + # Read image data and theta values + idata, itheta = setup_simulation_data(input_f, beg_sinogram, num_sinograms) + + print(idata.shape, len(itheta), idata.size) + for uniqueId, projId, rotation in zip(range(idata.shape[0]), range(idata.shape[0]), itheta): + builder.Reset() + proj = idata[projId] + serializer = TraceSerializer.ImageSerializer(builder) + serialized_data = serializer.serialize(image=proj, uniqueId=uniqueId, + rotation=rotation, seq=seq) + seq += 1 + publisher_socket.send(serialized_data) + + return seq + + +class TImageTransfer: + def __init__(self, publisher_socket, pv_image, builder, + beg_sinogram=0, num_sinograms=0, seq=0): + self.publisher_socket = publisher_socket + self.pv_image = pv_image + self.builder = builder + self.beg_sinogram = beg_sinogram + self.num_sinograms = num_sinograms + self.seq = seq + self.pv_channel = None + + def __enter__(self): + self.pv_channel = pvaccess.Channel(self.pv_image) + x, y = self.pv_channel.get('field()')['dimension'] + self.dims = (y['size'], x['size']) + labels = [item["name"] for item in self.pv_channel.get('field()')["attribute"]] + self.theta_key = labels.index("SampleRotary") + self.scan_delta_key = labels.index("ScanDelta") + self.start_position_key = labels.index("StartPos") + print(self.dims) + if self.num_sinograms > 0: + if (self.beg_sinogram < 0) or (self.beg_sinogram + self.num_sinograms > self.dims[0]): + raise Exception("Exceeds the sinogram boundary: {} vs. {}".format( + self.beg_sinogram + self.num_sinograms, self.dims[0])) + self.beg_index = self.beg_sinogram * self.dims[1] + self.end_index = self.beg_sinogram * self.dims[1] + self.num_sinograms * self.dims[1] + self.pv_channel.subscribe('push_image_data', self.push_image_data) + + return self + + def start_monitor(self, smon="value,attribute,uniqueId"): + self.pv_channel.startMonitor(smon) + while True: time.sleep(60) # Forever monitor + + def push_image_data(self, data): + img = np.frombuffer(data['value'][0]['ushortValue'], dtype=np.uint16) + uniqueId = data['uniqueId'] + # scanDelta = data['ScanDelta'] + # scanDelta = data['StartPos'] + # scanDelta = data['SaveDest'] + # theta = (uniqueID%360)*scanDelta + # theta = (uniqueId%(360/0.24))*0.24 + # theta = data["attribute"][theta_key]["value"][0]["value"] + scan_delta = data["attribute"][self.scan_delta_key]["value"][0]["value"] + start_position = data["attribute"][self.start_position_key]["value"][0]["value"] + theta = (start_position + uniqueId * scan_delta) % 360.0 + print(uniqueId, theta) + if self.num_sinograms != 0: + img = img[self.beg_index: self.end_index] + img = img.reshape((self.num_sinograms, self.dims[1])) + else: + img = img.reshape(self.dims) + + self.builder.Reset() + serializer = TraceSerializer.ImageSerializer(self.builder) + serialized_data = serializer.serialize(image=img, uniqueId=uniqueId, + rotation=theta, seq=self.seq) + self.publisher_socket.send(serialized_data) + self.seq += 1 + + def __exit__(self, exc_type, exc_value, traceback): + print("\nTrying to gracefully terminate...") + self.pv_channel.stopMonitor() + self.pv_channel.unsubscribe('push_image_data') + + print("Send terminate signal...") + self.publisher_socket.send("end_data".encode()) + print("Done sending...") + if exc_type is not None: + print("{} {} {}".format(exc_type, exc_value, traceback)) + return False + return self + + +class zmq_feed(): + """ + This class represents ZeroMQ connection. + """ + def __init__(self, publisher_socket, feed_socket, feed_host, builder, + beg_sinogram=0, num_sinograms=0, seq=0): + self.publisher_socket = publisher_socket + self.builder = builder + self.beg_sinogram = beg_sinogram + self.num_sinograms = num_sinograms + self.seq = seq + + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PAIR) + self.socket.connect("tcp://" + feed_host +":%s" % feed_socket) + + def destroy(self): + """ + Destroys Context. This also closes socket associated with the context. + """ + self.context.destroy() + + + def receive_zmq_send(self, zmq_host, zmq_rcv_port): + """ + This function receives data from socket and sends it to publisher. + """ + + interrupted = False + while not interrupted: + print ('waiting') + msg = self.socket.recv_json() + key = msg.get("key") + if key == "end": + print ('end of data, closing connection') + interrupted = True + self.destroy() + elif key == "dim": + print('initializing dims') + self.dims = (msg["dim_x"], msg["dim_y"]) + if self.num_sinograms > 0: + if (self.beg_sinogram < 0) or (self.beg_sinogram + self.num_sinograms > self.dims[0]): + raise Exception("Exceeds the sinogram boundary: {} vs. {}".format( + self.beg_sinogram + self.num_sinograms, self.dims[0])) + self.beg_index = self.beg_sinogram * self.dims[1] + self.end_index = self.beg_sinogram * self.dims[1] + self.num_sinograms * self.dims[1] + elif key == "image": + print('got msg') + msg["receiving_timestamp"] = time.time() + dtype = msg["dtype"] + uniqueId = msg['image_number'] + theta = msg['theta'] + ver_result = msg['ver'] + + + img = np.frombuffer(self.socket.recv(), dtype=dtype) + + if self.num_sinograms != 0: + img = img[self.beg_index: self.end_index] + img = img.reshape((self.num_sinograms, self.dims[1])) + else: + img = img.reshape(self.dims) + + self.builder.Reset() + serializer = TraceSerializer.ImageSerializer(self.builder) + serialized_data = serializer.serialize(image=img, uniqueId=uniqueId, + rotation=theta, seq=self.seq) + self.publisher_socket.send(serialized_data) + self.seq += 1 + + else: + pass + + print("Connection ended") + +def main(): + args = parse_arguments() + + # Setup serializer + builder = flatbuffers.Builder(0) + + # Setup zmq context + context = zmq.Context() + + # Publisher setup + publisher_socket = context.socket(zmq.PUB) + publisher_socket.set_hwm(args.publisher_hwm) + publisher_socket.bind(args.bind_address_publisher) + + # 1. Synchronize/handshake with remote + if args.synchronize_subscribers: + synchronize_subs(context, args.subscriber_count, args.bind_address_rep) + + # 2. Transfer data + time0 = time.time() + if args.daq_mod == 0: # Read data from PV + with TImageTransfer(publisher_socket=publisher_socket, + pv_image=args.image_pv, builder=builder, + beg_sinogram=args.beg_sinogram, + num_sinograms=args.num_sinograms, seq=0) as tdet: + tdet.start_monitor() # Infinite loop + + elif args.daq_mod == 1: # Simulate data acquisition with a file + simulate_daq(publisher_socket=publisher_socket, + input_f=args.simulation_file, builder=builder, + beg_sinogram=args.beg_sinogram, num_sinograms=args.num_sinograms) + elif args.daq_mod == 2: # Test data acquisition + test_daq(publisher_socket=publisher_socket, builder=builder, + num_sinograms=args.num_sinograms, # Y + num_sinogram_columns=args.num_sinogram_columns, # X + num_sinogram_projections=args.num_sinogram_projections) # Z + elif args.daq_mod == 3: # receive data over zmq connection + with zmq_feed(publisher_socket=publisher_socket, + feed_socket=args.feed_socket, feed_host=args.feed_host, builder=builder, + beg_sinogram=args.beg_sinogram, + num_sinograms=args.num_sinograms, seq=0) as zfeed: + zfeed.receive_zmq_send() + else: + print("Unknown mode: {}".format(args.daq_mod)); + + publisher_socket.send("end_data".encode()) + time1 = time.time() + print("Done sending; Total time={}".format(time1 - time0)) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/doc/demo/zmq_rec_check.py b/doc/demo/zmq_rec_check.py new file mode 100644 index 0000000..035092a --- /dev/null +++ b/doc/demo/zmq_rec_check.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# ######################################################################### +# Copyright (c) 2015, UChicago Argonne, LLC. All rights reserved. # +# # +# Copyright 2015. UChicago Argonne, LLC. This software was produced # +# under U.S. Government contract DE-AC02-06CH11357 for Argonne National # +# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the # +# U.S. Department of Energy. The U.S. Government has rights to use, # +# reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR # +# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR # +# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is # +# modified to produce derivative works, such modified software should # +# be clearly marked, so as not to confuse it with the version available # +# from ANL. # +# # +# Additionally, redistribution and use in source and binary forms, with # +# or without modification, are permitted provided that the following # +# conditions are met: # +# # +# * Redistributions of source code must retain the above copyright # +# notice, this list of conditions and the following disclaimer. # +# # +# * Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in # +# the documentation and/or other materials provided with the # +# distribution. # +# # +# * Neither the name of UChicago Argonne, LLC, Argonne National # +# Laboratory, ANL, the U.S. Government, nor the names of its # +# contributors may be used to endorse or promote products derived # +# from this software without specific prior written permission. # +# # +# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS # +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago # +# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # +# POSSIBILITY OF SUCH DAMAGE. # +# ######################################################################### +""" +Please make sure the installation :ref:`pre-requisite-reference-label` are met. + +This example takes one mandatory parameter: +instrument: a string defining the detector that will be used. It is assumed that a subdirectory with the name of +'instrument' exists in the ~/.dquality directory, and that the subdirectory contains configuration file named +dqconfig.init. + +This script calls zmq_receiver verifier. + +""" +import dquality.feeds.zmq_receiver as rec +import sys +import os +import argparse +from os.path import expanduser + +def main(arg): + + parser = argparse.ArgumentParser() + parser.add_argument("instrument", help="instrument name, name should have a matching directory in the .dquality folder") + + args = parser.parse_args() + instrument = args.instrument + + home = expanduser("~") + conf = os.path.join(home, ".dquality", instrument) + + rec.verify(conf) + + +if __name__ == "__main__": + main(sys.argv[1:]) + diff --git a/dquality/accumulator.py b/dquality/accumulator.py index 22f652c..eeef6d0 100644 --- a/dquality/accumulator.py +++ b/dquality/accumulator.py @@ -141,7 +141,8 @@ def init(config): with open(qcfile) as qc_file: dict = json.loads(qc_file.read()) - quality_checks = utils.get_quality_checks(dict) + #quality_checks = utils.get_quality_checks(dict) + quality_checks = dict try: report_type = conf['report_type'] diff --git a/dquality/check_rt.py b/dquality/check_rt.py index dc2d7e5..46c2ba0 100644 --- a/dquality/check_rt.py +++ b/dquality/check_rt.py @@ -52,7 +52,9 @@ """ import json -import dquality.realtime.real_time as real +import dquality.real_time_pv as real +import dquality.feeds.zmq_receiver as rec + __author__ = "Barbara Frosik" __copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." @@ -83,7 +85,14 @@ def realtime(conf, report_file = None, sequence = None): """ - bad_indexes = real.verify(conf, report_file, sequence) + rt = real.RT() + bad_indexes = rt.verify(conf, report_file, sequence) print (json.dumps(bad_indexes)) return bad_indexes + +def run_recev(config): + rec.verify(config) + +# if __name__ == "__main__": +# run_recev('test/dqconfig.ini') diff --git a/dquality/clients/__init__.py b/dquality/clients/__init__.py new file mode 100644 index 0000000..8c5eba3 --- /dev/null +++ b/dquality/clients/__init__.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# ######################################################################### +# Copyright (c) 2016, UChicago Argonne, LLC. All rights reserved. # +# # +# Copyright 2016. UChicago Argonne, LLC. This software was produced # +# under U.S. Government contract DE-AC02-06CH11357 for Argonne National # +# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the # +# U.S. Department of Energy. The U.S. Government has rights to use, # +# reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR # +# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR # +# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is # +# modified to produce derivative works, such modified software should # +# be clearly marked, so as not to confuse it with the version available # +# from ANL. # +# # +# Additionally, redistribution and use in source and binary forms, with # +# or without modification, are permitted provided that the following # +# conditions are met: # +# # +# * Redistributions of source code must retain the above copyright # +# notice, this list of conditions and the following disclaimer. # +# # +# * Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in # +# the documentation and/or other materials provided with the # +# distribution. # +# # +# * Neither the name of UChicago Argonne, LLC, Argonne National # +# Laboratory, ANL, the U.S. Government, nor the names of its # +# contributors may be used to endorse or promote products derived # +# from this software without specific prior written permission. # +# # +# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS # +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago # +# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # +# POSSIBILITY OF SUCH DAMAGE. # +# ######################################################################### + +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from dquality import * diff --git a/dquality/clients/fb_client/__init__.py b/dquality/clients/fb_client/__init__.py new file mode 100644 index 0000000..8c5eba3 --- /dev/null +++ b/dquality/clients/fb_client/__init__.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# ######################################################################### +# Copyright (c) 2016, UChicago Argonne, LLC. All rights reserved. # +# # +# Copyright 2016. UChicago Argonne, LLC. This software was produced # +# under U.S. Government contract DE-AC02-06CH11357 for Argonne National # +# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the # +# U.S. Department of Energy. The U.S. Government has rights to use, # +# reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR # +# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR # +# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is # +# modified to produce derivative works, such modified software should # +# be clearly marked, so as not to confuse it with the version available # +# from ANL. # +# # +# Additionally, redistribution and use in source and binary forms, with # +# or without modification, are permitted provided that the following # +# conditions are met: # +# # +# * Redistributions of source code must retain the above copyright # +# notice, this list of conditions and the following disclaimer. # +# # +# * Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in # +# the documentation and/or other materials provided with the # +# distribution. # +# # +# * Neither the name of UChicago Argonne, LLC, Argonne National # +# Laboratory, ANL, the U.S. Government, nor the names of its # +# contributors may be used to endorse or promote products derived # +# from this software without specific prior written permission. # +# # +# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS # +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago # +# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # +# POSSIBILITY OF SUCH DAMAGE. # +# ######################################################################### + +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from dquality import * diff --git a/dquality/clients/fb_client/feedback.py b/dquality/clients/fb_client/feedback.py new file mode 100644 index 0000000..e5e01df --- /dev/null +++ b/dquality/clients/fb_client/feedback.py @@ -0,0 +1,80 @@ +import dquality.common.constants as const +import dquality.clients.fb_client.pv_feedback as pv_fb +import dquality.clients.fb_client.pv_feedback_12 as pv_fb_12 + + +class Feedback(object): + """ + This class is a container of real-time feedback related information. + """ + def __init__(self, q, feedback_type, **kwargs): + """ + Constructor + + Parameters + ---------- + feedback_type : list + a list of configured feedbac types. Possible options: console, log, and pv + """ + self.q = q + self.feedback_type = feedback_type + # create pv driver if pv feedback + if const.FEEDBACK_PV in self.feedback_type: + # base support + self.pv = pv_fb.PV_FB(**kwargs) + elif const.FEEDBACK_PV_12 in self.feedback_type: + # customized support for beamline12 + self.pv = pv_fb_12.PV_FB_12(**kwargs) + + + def set_logger(self, logger): + """ + This function sets logger. + + Parameters + ---------- + logger : Logger + an instance of Logger + """ + self.logger = logger + + + def deliver(self): + """ + This function provides feedback as defined by the feedback_type in a real time. + + If the feedback type contains pv type, this function creates server and initiates driver handling the feedback + pvs.It dequeues results from the 'q' queue and processes all feedback types that have been configured. + It will stop processing the queue when it dequeues data indicating end status. + + Parameters + ---------- + none + + Returns + ------- + none + """ + self.pv.start_driver() #in the same process + evaluating = True + while evaluating: + results = self.q.get() + + if results == const.DATA_STATUS_END: + evaluating = False + elif results == const.DATA_STATUS_MISSING: + pass + else: + if results.failed: + for result in results.results: + if result.error != 0: + # for console and log feedback deliver only the errors + if const.FEEDBACK_CONSOLE in self.feedback_type: + print ('failed frame ' + str(results.index) + ' result of ' + + result.quality_id + ' is ' + str(result.res)) + if const.FEEDBACK_LOG in self.feedback_type: + self.logger.info('failed frame ' + str(results.index) + ' result of ' + + result.quality_id + ' is ' + str(result.res)) + if not self.pv is None: + self.pv.write_to_pv(results) + diff --git a/dquality/realtime/pv_feedback_driver.py b/dquality/clients/fb_client/pv_feedback.py similarity index 81% rename from dquality/realtime/pv_feedback_driver.py rename to dquality/clients/fb_client/pv_feedback.py index cd7b16b..dbe0159 100644 --- a/dquality/realtime/pv_feedback_driver.py +++ b/dquality/clients/fb_client/pv_feedback.py @@ -54,14 +54,41 @@ """ from pcaspy import SimpleServer, Driver +import sys +if sys.version[0] == '2': + import thread as thread +else: + import _thread as thread + + +class PV_FB(object): + def __init__(self, **kwargs): + for key in kwargs: + setattr(self, key, kwargs[key]) + + + def start_driver(self): + server = FbServer() + driver = server.init_driver(self.detector, self.feedback_pvs) + thread.start_new_thread(server.activate_pv, ()) + self.driver = driver + + + def write_to_pv(self, results): + text = results.file_name + if results.failed: + for result in results.results: + if result.error != 0: + pv = results.type + '_' + result.quality_id + self.driver.write(pv, result.res) class FbDriver(Driver): """ - This class is a driver that overrites write method and has a field counters. + This class is a driver that overrites write method. """ - def __init__(self, counters): + def __init__(self, **kwargs): """ Constructor @@ -73,11 +100,22 @@ def __init__(self, counters): """ super(FbDriver, self).__init__() - self.counters = counters + try: + self.counters = kwargs['counters'] + except: + pass - def write(self, pv, index): + + def reset_counters(self): + for pv in self.counters: + self.counters[pv] = 0 + self.setParam(pv+'_ctr', self.counters[pv]) + self.updatePVs() + + + def write(self, pv, result): """ - This function override write method fro Driver. + This function override write method from Driver. It sets the 'index' pv to the index value, increments count of failing frames for the data type and quality check indicated by pv, and sets the 'counter' pv to the new counter value. @@ -96,14 +134,15 @@ def write(self, pv, index): """ status = True - self.setParam(pv+'_ind', index) + self.setParam(pv+'_res', result) self.counters[pv] += 1 # this method is called on failed quality check, increase counter for this pv self.setParam(pv+'_ctr', self.counters[pv]) self.updatePVs() return status -class FbServer: + +class FbServer(object): """ This class is a server that controls the FbDriver. @@ -140,16 +179,15 @@ def init_driver(self, detector, feedback_pvs): prefix = detector + ':' pvdb = {} counters = {} + #add PV that follow index of failed farames and count of failed frames for each quality check for pv in feedback_pvs: - pvdb[pv+'_ind'] = { 'prec' : 0,} - pvdb[pv+'_ctr'] = { 'prec' : 0, - 'hihi' : 1, } + pvdb[pv+'_res'] = { 'prec' : 0,} + pvdb[pv+'_ctr'] = { 'prec' : 0,} counters[pv] = 0 - self.server = SimpleServer() self.server.createPV(prefix, pvdb) - driver = FbDriver(counters) + driver = FbDriver(counters=counters) return driver def activate_pv(self): diff --git a/dquality/clients/fb_client/pv_feedback_12.py b/dquality/clients/fb_client/pv_feedback_12.py new file mode 100644 index 0000000..a050d8a --- /dev/null +++ b/dquality/clients/fb_client/pv_feedback_12.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# ######################################################################### +# Copyright (c) 2016, UChicago Argonne, LLC. All rights reserved. # +# # +# Copyright 2016. UChicago Argonne, LLC. This software was produced # +# under U.S. Government contract DE-AC02-06CH11357 for Argonne National # +# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the # +# U.S. Department of Energy. The U.S. Government has rights to use, # +# reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR # +# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR # +# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is # +# modified to produce derivative works, such modified software should # +# be clearly marked, so as not to confuse it with the version available # +# from ANL. # +# # +# Additionally, redistribution and use in source and binary forms, with # +# or without modification, are permitted provided that the following # +# conditions are met: # +# # +# * Redistributions of source code must retain the above copyright # +# notice, this list of conditions and the following disclaimer. # +# # +# * Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in # +# the documentation and/or other materials provided with the # +# distribution. # +# # +# * Neither the name of UChicago Argonne, LLC, Argonne National # +# Laboratory, ANL, the U.S. Government, nor the names of its # +# contributors may be used to endorse or promote products derived # +# from this software without specific prior written permission. # +# # +# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS # +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago # +# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # +# POSSIBILITY OF SUCH DAMAGE. # +# ######################################################################### + +""" +Please make sure the installation :ref:`pre-requisite-reference-label` are met. + +This module contains classes handling real time feedback of the quality results via process variables. + +""" +import time +import dquality.clients.fb_client.pv_feedback as pvfb +import sys +if sys.version[0] == '2': + import thread as thread +else: + import _thread as thread + + +class PV_FB_12(pvfb.PV_FB): + def __init__(self, **kwargs): + super(PV_FB_12, self).__init__(**kwargs) + + + def start_driver(self): + server = FbServer_12() + driver = server.init_driver(self.detector, self.feedback_pvs) + thread.start_new_thread(server.activate_pv, ()) + self.driver = driver + + + def write_to_pv(self, results): + text = results.file_name + if results.failed: + msg = text + ' failed' + for result in results.results: + if result.error != 0: + qc = result.quality_id + msg = msg + ' ' + qc + ' with result ' + str(result.res) + else: + msg = text + ' verification pass' + self.driver.write(msg) + + +class FbDriver_12(pvfb.FbDriver): + """ + This class is a driver that overrites write method. + + """ + def __init__(self, **kwargs): + """ + Constructor + + Parameters + ---------- + counters : dict + a dictionary where a key is pv (one for data type and quality method) and value is the number of + failed frames + + """ + super(FbDriver_12, self).__init__() + + + def write(self, msg): + """ + This function override write method from Driver. + + It sets the 'index' pv to the index value, increments count of failing frames for the data type and quality + check indicated by pv, and sets the 'counter' pv to the new counter value. + + Parameters + ---------- + pv : str + a name of the pv, contains information about the data type and quality check (i.e. data_white_mean) + index : int + index of failed frame + + Returns + ------- + status : boolean + Driver status + + """ + status = True + self.setParam('STAT', msg) + self.updatePVs() + return status + + +class FbServer_12(pvfb.FbServer): + """ + This class is a server that controls the FbDriver. + + """ + + def __init__(self): + super(FbServer_12, self).__init__() + + + def init_driver(self, detector, feedback_pvs): + """ + This function initiates the driver. + + It creates process variables for the requested lidt of pv names. For each data type combination with the + applicable quality check two pvs are created: one holding frame index, and one holding count of failed frames. + It creates FbDriver instance and returns it to the calling function. + + Parameters + ---------- + detector : str + a pv name of the detector + feedback_pvs : list + a list of feedback process variables names, for each data type combination with the + applicable quality check + + Returns + ------- + driver : FbDriver + FbDriver instance + + """ + prefix = detector + ':' + pvdb = {} + + pvdb['STAT'] = { + 'type': 'char', + 'count' : 300, + 'value' : 'status' + } + + self.server.createPV(prefix, pvdb) + + driver = FbDriver_12() + return driver diff --git a/dquality/clients/fb_client/simple_feedback.py b/dquality/clients/fb_client/simple_feedback.py new file mode 100644 index 0000000..0a34774 --- /dev/null +++ b/dquality/clients/fb_client/simple_feedback.py @@ -0,0 +1,64 @@ +import dquality.common.constants as const +import dquality.common.utilities as utils +from epics import caput, caget + + +class Feedback(object): + """ + This class is a container of real-time feedback related information. + """ + + def __init__(self,feedback, detector, quality_checks, logger): + """ + Constructor + + Parameters + ---------- + feedback_type : list + a list of configured feedbac types. Possible options: console, log, and pv + """ + + self.feedback_type = feedback + if const.FEEDBACK_PV in self.feedback_type: + self.detector = detector + #zero out the ctr pvs + feedback_pvs = utils.get_feedback_pvs(quality_checks) + for fb_pv in feedback_pvs: + caput(self.detector + ':data_' + fb_pv + '_ctr', 0) + + if const.FEEDBACK_LOG in self.feedback_type: + self.logger = logger + + + def deliver(self, results): + """ + This function provides feedback as defined by the feedback_type in a real time. + + If the feedback type contains pv type, this function creates server and initiates driver handling the feedback + pvs.It dequeues results from the 'q' queue and processes all feedback types that have been configured. + It will stop processing the queue when it dequeues data indicating end status. + + Parameters + ---------- + none + + Returns + ------- + none + """ + if results.failed: + for result in results.results: + if result.error != 0: + # for console and log feedback deliver only the errors + if const.FEEDBACK_CONSOLE in self.feedback_type: + print('failed frame ' + str(results.index) + ' result of ' + + result.quality_id + ' is ' + str(result.res)) + if const.FEEDBACK_LOG in self.feedback_type: + self.logger.info('failed frame ' + str(results.index) + ' result of ' + + result.quality_id + ' is ' + str(result.res)) + if const.FEEDBACK_PV in self.feedback_type: + pv = self.detector + ':' + results.type + '_' + result.quality_id + caput(pv + '_res', result.res) + counter = caget(pv + '_ctr') + 1 + caput(pv + '_ctr', counter) + diff --git a/dquality/realtime/adapter_base.py b/dquality/clients/zmq_client.py similarity index 54% rename from dquality/realtime/adapter_base.py rename to dquality/clients/zmq_client.py index 897b35c..f0443bd 100644 --- a/dquality/realtime/adapter_base.py +++ b/dquality/clients/zmq_client.py @@ -49,118 +49,93 @@ """ Please make sure the installation :ref:`pre-requisite-reference-label` are met. -This module is an adapter connecting feed with a consuming process. -The functions in this module are customized for the consuming process. All functions must be implemented, as they -are called by the feed module. +This module feeds data to ZeroMQ connection. +The data, which represents a frame captured by detector, is sent in two parts. The first part is a json +stream and contains data attributes, such shape, type, theta and counter associated with this frame. +The second part is frame in bytes. + +This module requires configuration file with the following parameters: +'zmq_snd_port' - the ZeroMQ port the messages will be sent """ -from multiprocessing import Process -#from test_process import print_slices -import dquality.commom.containers as containers -from configobj import ConfigObj +import zmq +import dquality.common.constants as const + __author__ = "Barbara Frosik" __copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." __docformat__ = 'restructuredtext en' -__all__ = ['start_process', - 'parse_config'] - - - -def start_process(dataq, *args): - """ - This function parses the positional parameters. Then it starts a client process, passing in a queue as first - parameter, followed by the parsed parameters. The function of the process must be included in imports. +__all__ = ['zmq_sen.zmq_sen', + 'zmq_sen.send_to_zmq'] - Parameters - ---------- - dataq : multiprocessing.Queue - a queue used to transfer data from feed to client process - *args : list - a list of posisional parameters required by the client process - - Returns - ------- - None +class zmq_sen(): """ - - # This is an exmple code - # a = args[0] - # b = args[1] - # p = Process(target=print_slices, args=(dataq, a, b,)) - # p.start() - - -def parse_config(config): + This class represents ZeroMQ server. """ - This function parses the configuration file. It must return the specified variables. - - Parameters - ---------- - config : str - a configuration file - - Returns - ------- - no_frames : int - number of frames that will be processed - - detector : str - a string defining the first prefix in area detector, it has to match the area detector configuration - - detector_basic : str - a string defining the second prefix in area detector, defining the basic parameters, it has to - match the area detector configuration - - detector_image : str - a string defining the second prefix in area detector, defining the image parameters, it has to - match the area detector configuration - - """ - - # This is an exmple code - # if os.path.isfile(config): - # conf = ConfigObj(config) - # else: - # print ('configuration file ' + config + ' not found') - # return None - # - # try: - # no_frames = conf['no_frames'] - # except KeyError: - # print ('no_frames parameter not configured.') - # return None - # try: - # detector = conf['detector'] - # except KeyError: - # print ('configuration error: detector parameter not configured.') - # return None - # try: - # detector_basic = conf['detector_basic'] - # except KeyError: - # print ('configuration error: detector_basic parameter not configured.') - # return None - # try: - # detector_image = conf['detector_image'] - # except KeyError: - # print ('configuration error: detector_image parameter not configured.') - # return None - # - # return int(no_frames), detector, detector_basic, detector_image - - -def pack_data(slice): - """ - This function packs a single image data into a specific container. - - Parameters - ---------- - slice : nparray - image data - - """ - # This in an example code - # return containers.Data(slice) - + def __init__(self, port=None): + """ + Constructor + + This constructor creates zmq Context and socket for the zmq.PAIR. + It binds with the port and will accept one connection. + + Parameters + ---------- + port : str + serving port + + """ + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PAIR) + self.socket.bind("tcp://*:%s" % port) + + + def send_to_zmq(self, data): + """ + This sends out received data to an established connection. + + Parameters + ---------- + data : Data object + a Data instance containing frame attributes and frame data + + Returns + ------- + none + """ + if data.status == const.DATA_STATUS_END: + self.socket.send_json( + dict( + key="end", + document="... end of transmission ...", + )) + self.context.destroy() + if data.status == const.DATA_STATUS_DIM: + self.socket.send_json( + dict( + key="dim", + dim_x=data.dim_x, + dim_y=data.dim_y, + )) + else: + slice = data.slice + self.socket.send_json( + dict( + key="image", + dtype=str(slice.dtype), + shape=slice.shape, + ver=data.ver, + image_number=data.image_number, + theta= data.theta, + # image_timestamp=image_time, + # sending_timestamp=time.time(), + # rotation=_cache_["rotation"], + # rotation_timestamp=rotation_time, + document="... see next message ...", + + ), zmq.SNDMORE + ) + # binary image is not serializable in JSON, send separately + self.socket.send(slice.flatten()) diff --git a/dquality/common/constants.py b/dquality/common/constants.py index d1e6f02..f6c2181 100644 --- a/dquality/common/constants.py +++ b/dquality/common/constants.py @@ -1,12 +1,3 @@ -QUALITYCHECK_MEAN = 1 -QUALITYCHECK_STD = 2 -QUALITYCHECK_SAT = 3 -QUALITYCHECK_SUM = 4 -QUALITYCHECK_FRAME_SAT = 5 -QUALITYCHECK_RATE_SAT = 6 -STAT_START = 100 -STAT_MEAN = 100 -ACC_SAT = 101 QUALITYERROR_LOW = -1 QUALITYERROR_HIGH = -2 @@ -22,37 +13,12 @@ FEEDBACK_CONSOLE = 'console' FEEDBACK_LOG = 'log' FEEDBACK_PV = 'pv' +FEEDBACK_PV_12 = 'pv_12' +FEEDBACK_FULL = 'full' DATA_STATUS_DATA = 0 DATA_STATUS_MISSING = 1 DATA_STATUS_END = 2 +DATA_STATUS_DIM = 3 -mapper = { - 'QUALITYCHECK_MEAN' : 1, - 'QUALITYCHECK_STD' : 2, - 'QUALITYCHECK_SAT' : 3, - 'QUALITYCHECK_SUM' : 4, - 'QUALITYCHECK_FRAME_SAT' : 5, - 'QUALITYCHECK_RATE_SAT' : 6, - 'STAT_START' : 100, - 'STAT_MEAN' : 100, - 'ACC_SAT' : 101, - - 'QUALITYERROR_LOW' : -1, - 'QUALITYERROR_HIGH' : -2, - 'NO_ERROR' : 0, -} - -def get_id(name): - return mapper[name] - -def to_string(qualitycheck): - qc_map = {1:'mean', - 2:'st_dev', - 3:'saturation', - 4:'sum', - 5:'frame_sat', - 6:'rate_sat', - 100:'stat_mean', - 101:'acc_sat'} - return qc_map[qualitycheck] +ZMQ_CONTROLLER_PORT = 5511 \ No newline at end of file diff --git a/dquality/common/containers.py b/dquality/common/containers.py index 5280459..7dd117a 100644 --- a/dquality/common/containers.py +++ b/dquality/common/containers.py @@ -1,14 +1,8 @@ -from multiprocessing import Lock import dquality.common.constants as const from multiprocessing import Process import importlib from os import path import sys -import dquality.realtime.pv_feedback_driver as drv -if sys.version[0] == '2': - import thread as thread -else: - import _thread as thread class Result: @@ -27,7 +21,8 @@ class Results: This class is a container of results of all quality checks for a single frame, and attributes such as flag indicating if all quality checks passed, dat type, and index. """ - def __init__(self, type, index, failed, results): + def __init__(self, type, index, failed, results, text=None): + self.text = text self.type = type self.index = index self.failed = failed @@ -40,120 +35,13 @@ class Data: """ This class is a container of data. """ - def __init__(self, status, slice=None, type=None, acq_time = None): + def __init__(self, status, slice=None, type=None, **kwargs): self.status = status if status == const.DATA_STATUS_DATA: self.slice = slice self.type = type - if acq_time is not None: - self.acq_time = acq_time - - -class Feedback: - """ - This class is a container of real-time feedback related information. - """ - def __init__(self, feedback_type, ): - """ - Constructor - - Parameters - ---------- - feedback_type : list - a list of configured feedbac types. Possible options: console, log, and pv - """ - self.feedback_type = feedback_type - - def set_feedback_pv(self, feedback_pvs, detector): - """ - This function sets feedback_pvs, and detector fields. - - Parameters - ---------- - feedback_pvs : list - a list of feedback process variables names, for each data type combination with the - applicable quality check - detector : str - a pv name of the detector - """ - self.feedback_pvs = feedback_pvs - self.detector = detector - - def set_logger(self, logger): - """ - This function sets logger. - - Parameters - ---------- - logger : Logger - an instance of Logger - """ - self.logger = logger - - def set_driver(self, driver): - """ - This function sets driver. - - Parameters - ---------- - driver : FbDriver - an instance of FbDriver - """ - self.driver = driver - - def write_to_pv(self, pv, index): - """ - This function calls write method on driver field to update pv. - - Parameters - ---------- - pv : str - a name of the pv, contains information about the data type and quality check (i.e. data_white_mean) - index : int - index of failed frame - """ - self.driver.write(pv, index) - - def quality_feedback(self, feedbackq): - """ - This function provides feedback as defined by the feedback_type in a real time. - - If the feedback type contains pv type, this function creates server and initiates driver handling the feedback - pvs.It dequeues results from the 'feedbackq' queue and processes all feedback types that have been configured. - It will stop processing the queue when it dequeues data indicating end status. - - Parameters - ---------- - feedbackq : Queue - a queue that will deliver Result objects of failed quality check - - Returns - ------- - none - """ - if const.FEEDBACK_PV in self.feedback_type: - server = drv.FbServer() - driver = server.init_driver(self.detector, self.feedback_pvs) - thread.start_new_thread(server.activate_pv, ()) - self.set_driver(driver) - - evaluating = True - while evaluating: - while not feedbackq.empty(): - try: - result = feedbackq.get_nowait() - if result == const.DATA_STATUS_END: - evaluating = False - else: - if const.FEEDBACK_CONSOLE in self.feedback_type: - print ('failed frame '+str(result.index)+ ' result of '+const.to_string(result.quality_id)+ ' is '+ str(result.res)) - if const.FEEDBACK_LOG in self.feedback_type: - self.logger.info('failed frame '+str(result.index)+ ' result of '+const.to_string(result.quality_id)+ ' is '+ str(result.res)) - if const.FEEDBACK_PV in self.feedback_type: - quality_check = const.to_string(result.quality_id) - self.write_to_pv(result.type + '_' + quality_check, result.index) - except: - pass + for key in kwargs: # styles is a regular dictionary + setattr(self, key, kwargs[key]) class Aggregate: @@ -171,7 +59,7 @@ class Aggregate: """ - def __init__(self, data_type, quality_checks, aggregate_limit, feedbackq = None): + def __init__(self, data_type, quality_checks, **kwargs): #data_type, quality_checks, aggregate_limit=0, feedbackq = None): """ Constructor @@ -186,14 +74,20 @@ def __init__(self, data_type, quality_checks, aggregate_limit, feedbackq = None) responsible for delivering the feedback in areal time """ self.data_type = data_type - self.feedbackq = feedbackq - self.aggregate_limit = aggregate_limit + try: + self.feedbackq = kwargs['feedbackq'] + except KeyError: + self.feedbackq = None + + try: + self.aggregate_limit = kwargs['aggregate_limit'] + except KeyError: + self.aggregate_limit = -1 self.bad_indexes = {} self.good_indexes = {} self.results = {} - self.lock = Lock() for qc in quality_checks: self.results[qc] = [] @@ -202,8 +96,6 @@ def get_results(self, check): """ This returns the results of a given quality check. - This operation uses lock, as other process writes to results. - Parameters ---------- check : int @@ -214,35 +106,10 @@ def get_results(self, check): res : list a list containing results that passed the given quality check """ - self.lock.acquire() res = self.results[check] - self.lock.release() return res - def add_result(self, result, check): - """ - This add a new result for a given quality check to results. - - This operation uses lock, as other process reads the results. - - Parameters - ---------- - result : Result - a result instance - - check : int - a value indication quality check id - - Returns - ------- - none - """ - self.lock.acquire() - self.results[check].append(result) - self.lock.release() - - def handle_results(self, results): """ This handles all results for one frame. @@ -263,25 +130,18 @@ def handle_results(self, results): ------- none """ - def send_feedback(): - if self.feedbackq is not None: - for result in results.results: - if result.error != 0: - result.index = results.index - result.type = results.type - self.feedbackq.put(result) - if self.aggregate_limit == -1: - if results.failed: - send_feedback() + if self.feedbackq is not None: + self.feedbackq.put(results) else: if results.failed: self.bad_indexes[results.index] = results.results - send_feedback() + if self.feedbackq is not None: + self.feedbackq.put(results) else: self.good_indexes[results.index] = results.results for result in results.results: - self.add_result(result.res, result.quality_id) + self.results[result.quality_id].append(result) def is_empty(self): diff --git a/dquality/common/qualitychecks.py b/dquality/common/qualitychecks.py index 3b4b935..2520eba 100644 --- a/dquality/common/qualitychecks.py +++ b/dquality/common/qualitychecks.py @@ -59,9 +59,14 @@ __copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." __docformat__ = 'restructuredtext en' __all__ = ['find_result', - 'validate_mean_signal_intensity', - 'validate_signal_intensity_standard_deviation', - 'validate_stat_mean'] + 'mean', + 'st_dev', + 'sum', + 'frame_sat_cnt_rate', + 'frame_sat_pts', + 'stat_mean', + 'acc_sat', + 'run_quality_checks'] def find_result(res, quality_id, limits): @@ -87,16 +92,24 @@ def find_result(res, quality_id, limits): a Result object """ - if res < limits['low_limit']: - result = Result(res, quality_id, const.QUALITYERROR_LOW) - elif res > limits['high_limit']: - result = Result(res, quality_id, const.QUALITYERROR_HIGH) - else: - result = Result(res, quality_id, const.NO_ERROR) - return result + try: + ll = limits['low_limit'] + if res < ll: + return Result(res, quality_id, const.QUALITYERROR_LOW) + except KeyError: + pass + + try: + hl = limits['high_limit'] + if res > hl: + return Result(res, quality_id, const.QUALITYERROR_HIGH) + except KeyError: + pass + + return Result(res, quality_id, const.NO_ERROR) -def validate_mean_signal_intensity(data, limits): +def mean(**kws): """ This method validates mean value of the frame. @@ -117,14 +130,16 @@ def validate_mean_signal_intensity(data, limits): result : Result a Result object """ + limits = kws['limits'] + data = kws['data'] this_limits = limits['mean'] res = np.mean(data.slice) - result = find_result(res, const.QUALITYCHECK_MEAN, this_limits) + result = find_result(res, 'mean', this_limits) return result -def validate_signal_intensity_standard_deviation(data, limits): +def st_dev(**kws): """ This method validates standard deviation value of the frame. @@ -145,14 +160,16 @@ def validate_signal_intensity_standard_deviation(data, limits): result : Result a Result object """ + limits = kws['limits'] + data = kws['data'] this_limits = limits['std'] res = np.std(data.slice) - result = find_result(res, const.QUALITYCHECK_STD, this_limits) + result = find_result(res, 'st_dev', this_limits) return result -def validate_intensity_sum(data, limits): +def sum(**kws): """ This method validates a sum of all intensities value of the frame. @@ -173,13 +190,17 @@ def validate_intensity_sum(data, limits): result : Result a Result object """ + limits = kws['limits'] + data = kws['data'] + this_limits = limits['sum'] res = data.slice.sum() - result = find_result(res, const.QUALITYCHECK_SUM, this_limits) + result = find_result(res, 'sum', this_limits) + return result -def validate_cnt_rate_sat(data, limits): +def frame_sat_cnt_rate(**kws): """ This method validates a sum of all intensities value of the frame. @@ -200,19 +221,36 @@ def validate_cnt_rate_sat(data, limits): result : Result a Result object """ - this_limits = limits['rate_sat'] + # limits = kws['limits'] + # data = kws['data'] + # + # this_limits = limits['rate_sat'] + # acq_time = data.rate_sat + # res = data.slice.sum()/acq_time + # result = find_result(res, 'rate_sat', this_limits) + # return result + # + limits = kws['limits'] + data = kws['data'] + + # find how many pixels have saturation rate (intensity divided by acquire time) exceeding the + # point saturation rate limit acq_time = data.acq_time - res = data.slice.sum()/acq_time - result = find_result(res, const.QUALITYCHECK_RATE_SAT, this_limits) + sat_high = (limits['point_sat_rate'])['high_limit'] + points = (data.slice/acq_time > sat_high).sum() + + # evaluate if the number of saturated points are within limit + this_limits = limits['frame_sat_cnt_rate'] + result = find_result(points, 'frame_sat_cnt_rate', this_limits) return result -def validate_frame_saturation(data, limits): +def frame_sat_pts(**kws): """ This method validates saturation value of the frame. - This function calculates calculates the number of saturated pixels in the given frame. The result is compared with - threshhold values to determine the quality of the data. The result, comparison result, index, and quality_id values + This function calculates the number of saturated pixels in the given frame. The result is compared with + threshold value to determine the quality of the data. The result, comparison result, index, and quality_id values are saved in a new Result object. Parameters @@ -228,41 +266,20 @@ def validate_frame_saturation(data, limits): result : Result a Result object """ - this_limits = limits['frame_sat'] - sat_high = (limits['sat'])['high_limit'] - res = (data.slice > sat_high).sum() - result = Result(res, const.QUALITYCHECK_FRAME_SAT, this_limits) - return result - - -def validate_saturation(data, limits): - """ - This method validates saturation value of the frame. - - This function calculates calculates the number of saturated pixels in the given frame. The result is compared with - threshhold values to determine the quality of the data. The result, comparison result, index, and quality_id values - are saved in a new Result object. + limits = kws['limits'] + data = kws['data'] - Parameters - ---------- - data : Data - data instance that includes slice 2D data + # find how many pixels have intensity exceeding the point saturation limit + sat_high = (limits['point_sat'])['high_limit'] + points = (data.slice > sat_high).sum() - limits : dictionary - a dictionary containing threshold values for the evaluated data type - - Returns - ------- - result : Result - a Result object - """ - sat_high = (limits['sat'])['high_limit'] - res = (data.slice > sat_high).sum() - result = Result(res, const.QUALITYCHECK_SAT, const.NO_ERROR) + # evaluate if the number of saturated points are within limit + this_limits = limits['frame_sat_pts'] + result = find_result(points, 'frame_sat_pts', this_limits) return result -def validate_stat_mean(limits, aggregate, results): +def stat_mean(**kws): """ This is one of the statistical validation methods. @@ -287,26 +304,30 @@ def validate_stat_mean(limits, aggregate, results): result : Result a Result object """ + limits = kws['limits'] + aggregate = kws['aggregate'] + results = kws['results'] + this_limits = limits['stat_mean'] - stat_data = aggregate.get_results(const.QUALITYCHECK_MEAN) + stat_data = aggregate.get_results('mean') length = len(stat_data) # calculate std od mean values in aggregate if length == 0: - return find_result(0, const.STAT_MEAN, this_limits) + return find_result(0, 'stat_mean', this_limits) elif length == 1: mean_mean = np.mean(stat_data) else: mean_mean = np.mean(stat_data[0:(length -1)]) - result = results[const.QUALITYCHECK_MEAN] + result = results['mean'] delta = result.res - mean_mean - result = find_result(delta, const.STAT_MEAN, this_limits) + result = find_result(delta, 'stat_mean', this_limits) return result -def validate_accumulated_saturation(limits, aggregate, results): +def acc_sat(**kws): """ This is one of the statistical validation methods. @@ -331,27 +352,30 @@ def validate_accumulated_saturation(limits, aggregate, results): result : Result a Result object """ + limits = kws['limits'] + aggregate = kws['aggregate'] + results = kws['results'] + this_limits = limits['sat_points'] - stat_data = aggregate.get_results(const.QUALITYCHECK_SAT) + stat_data = aggregate.get_results('frame_sat') # calculate total saturated points - result = results[const.QUALITYCHECK_SAT] + result = results['saturation'] total = np.sum(stat_data) + result.res - result = find_result(total, const.ACC_SAT, this_limits) + result = find_result(total, 'acc_sat', this_limits) return result # maps the quality check ID to the function object -function_mapper = {const.QUALITYCHECK_MEAN : validate_mean_signal_intensity, - const.QUALITYCHECK_STD : validate_signal_intensity_standard_deviation, - const.QUALITYCHECK_SAT : validate_saturation, - const.QUALITYCHECK_FRAME_SAT : validate_frame_saturation, - const.QUALITYCHECK_RATE_SAT: validate_cnt_rate_sat, - const.QUALITYCHECK_SUM: validate_intensity_sum, - const.STAT_MEAN : validate_stat_mean, - const.ACC_SAT : validate_accumulated_saturation} - -def run_quality_checks(data, index, resultsq, aggregate, limits, quality_checks): +function_mapper = {'mean' : mean, + 'st_dev' : st_dev, + 'frame_sat_pts' : frame_sat_pts, + 'frame_sat_cnt_rate': frame_sat_cnt_rate, + 'sum': sum, + 'stat_mean' : stat_mean, + 'acc_sat' : acc_sat} + +def run_quality_checks(data, index, aggregate, limits, quality_checks): """ This function runs validation methods applicable to the frame data type and enqueues results. @@ -382,22 +406,16 @@ def run_quality_checks(data, index, resultsq, aggregate, limits, quality_checks) ------- none """ - quality_checks.sort() - results_dir = {} + #quality_checks.sort() + results_dict = {} failed = False - for function_id in quality_checks: - function = function_mapper[function_id] - if function_id < const.STAT_START: - result = function(data, limits) - results_dir[function_id] = result - if result.error != 0: - failed = True - else: - if not failed: - result = function(limits, aggregate, results_dir) - results_dir[function_id] = result - if result.error != 0: - failed = True - - results = Results(data.type, index, failed, results_dir) - resultsq.put(results) + for qc in quality_checks: + function = function_mapper[qc] + result = function(limits=limits, data=data, aggregate=aggregate, results=results_dict) + + results_dict[qc] = result + if result.error != 0: + failed = True + + results = Results(data.type, index, failed, results_dict) + return results diff --git a/dquality/common/utilities.py b/dquality/common/utilities.py index b64da55..cd7be78 100644 --- a/dquality/common/utilities.py +++ b/dquality/common/utilities.py @@ -451,6 +451,7 @@ def get_feedback_pvs(quality_checks): for type in quality_checks: qcs = quality_checks[type] for qc in qcs: - qc_str = type + '_' + const.to_string(qc) + #qc_str = type + '_' + const.to_string(qc) + qc_str = type + '_' + qc feedback_pvs.append(qc_str) return feedback_pvs diff --git a/dquality/data.py b/dquality/data.py index fb2e0cc..c9de562 100644 --- a/dquality/data.py +++ b/dquality/data.py @@ -151,7 +151,7 @@ def init(config): with open(qcfile) as qc_file: dict = json.loads(qc_file.read()) - quality_checks = utils.get_quality_checks(dict) + quality_checks = dict try: report_type = conf['report_type'] @@ -222,15 +222,18 @@ def process_data(data_type): for i in range(0,dt.shape[0]): data = Data(const.DATA_STATUS_DATA, dt[i], data_type) dataq.put(data) - # add delay to slow down flow up, so the flow down (results) - # are handled in synch - time.sleep(.1) + + def get_no_frames(): + return fp[data_tags['data']].shape[0] + fp[data_tags['data_white']].shape[0] +fp[data_tags['data_dark']].shape[0] fp, tags = utils.get_data_hdf(file) dataq = Queue() aggregateq = Queue() - p = Process(target=handler.handle_data, args=(dataq, limits, aggregateq, quality_checks, consumers)) + args = [limits, quality_checks, get_no_frames()] + kwargs = {} + kwargs['consumers'] = consumers + p = Process(target=handler.handle_data, args=(dataq, aggregateq, args, kwargs)) p.start() # assume a fixed order of data types; this will determine indexes on the data diff --git a/dquality/realtime/__init__.py b/dquality/feeds/__init__.py similarity index 100% rename from dquality/realtime/__init__.py rename to dquality/feeds/__init__.py diff --git a/dquality/realtime/adapter.py b/dquality/feeds/adapter.py similarity index 76% rename from dquality/realtime/adapter.py rename to dquality/feeds/adapter.py index 673393f..96e106c 100644 --- a/dquality/realtime/adapter.py +++ b/dquality/feeds/adapter.py @@ -55,7 +55,7 @@ """ -from multiprocessing import Process +from multiprocessing import Process, Queue from dquality.handler import handle_data import dquality.common.containers as containers import dquality.common.constants as const @@ -67,7 +67,8 @@ __docformat__ = 'restructuredtext en' __all__ = ['start_process', 'parse_config', - 'pack_data'] + 'pack_data', + 'pack_data_with_decor'] @@ -98,18 +99,10 @@ def start_process(dataq, logger, *args): quality_checks = args[2] aggregate_limit = args[3] consumers = args[4] - feedback = args[5] + feedbackq = args[5] - feedback_obj = containers.Feedback(feedback) - if const.FEEDBACK_LOG in feedback: - feedback_obj.set_logger(logger) - if const.FEEDBACK_PV in feedback: - feedback_pvs = utils.get_feedback_pvs(quality_checks) - detector = args[6] - feedback_obj.set_feedback_pv(feedback_pvs, detector) - - p = Process(target=handle_data, args=(dataq, limits, reportq, quality_checks, aggregate_limit, consumers, feedback_obj)) + p = Process(target=handle_data, args=(dataq, limits, reportq, quality_checks, aggregate_limit, consumers, feedbackq)) p.start() @@ -132,14 +125,6 @@ def parse_config(config): detector : str a string defining the first prefix in area detector, it has to match the area detector configuration - detector_basic : str - a string defining the second prefix in area detector, defining the basic parameters, it has to - match the area detector configuration - - detector_image : str - a string defining the second prefix in area detector, defining the image parameters, it has to - match the area detector configuration - """ conf = utils.get_config(config) @@ -160,23 +145,13 @@ def parse_config(config): try: detector = conf['detector'] except KeyError: - print ('configuration error: detector parameter not configured.') - return None - try: - detector_basic = conf['detector_basic'] - except KeyError: - print ('configuration error: detector_basic parameter not configured.') - return None - try: - detector_image = conf['detector_image'] - except KeyError: - print ('configuration error: detector_image parameter not configured.') - return None + print ('detector parameter not configured.') + detector = None - return int(no_frames), aggregate_limit, detector, detector_basic, detector_image + return int(no_frames), aggregate_limit, detector -def pack_data(slice, type): +def pack_data(slice, type, **kwargs): """ This function packs a single image data into a specific container. @@ -190,29 +165,10 @@ def pack_data(slice, type): """ if slice is not None: - return containers.Data(const.DATA_STATUS_DATA, slice, type) + return containers.Data(const.DATA_STATUS_DATA, slice, type, **kwargs) elif type == 'missing': return containers.Data(const.DATA_STATUS_MISSING) else: return containers.Data(const.DATA_STATUS_END) -def pack_data_with_decor(slice, type, acq_time): - """ - This function packs a single image data into a specific container. - - Parameters - ---------- - slice : nparray - image data - - type : str - data type, as 'data', 'data_white', or 'data_dark' - - """ - if slice is not None: - return containers.Data(const.DATA_STATUS_DATA, slice, type, acq_time) - elif type == 'missing': - return containers.Data(const.DATA_STATUS_MISSING) - else: - return containers.Data(const.DATA_STATUS_END) diff --git a/dquality/realtime/feed.py b/dquality/feeds/pv_feed.py similarity index 81% rename from dquality/realtime/feed.py rename to dquality/feeds/pv_feed.py index 2ff995f..1930e0d 100644 --- a/dquality/realtime/feed.py +++ b/dquality/feeds/pv_feed.py @@ -54,11 +54,7 @@ The change is detected with a callback. The data type is determined from PV. The data and the type are passed (as object) to the consuming process. This module requires configuration file with the following parameters: -'detector', a string defining the first prefix in area detector, it has to match the area detector configuration -'detector_basic', a string defining the second prefix in area detector, defining the basic parameters, it has to -match the area detector configuration -'detector_image', a string defining the second prefix in area detector, defining the image parameters, it has to -match the area detector configuration +'detector', a string defining the first prefix in area detector 'no_frames', number of frames that will be fed. If not given, the optional parameter 'sequence' to the feed_data method must not be None. It can be either int defining number of frames, or a list of touples, defining data types sequence. (i.e. [('data_dark', 4), ('data_white', 14), ('data', 614), ('data_dark', 619)) @@ -67,10 +63,13 @@ from epics import caget, PV from epics.ca import CAThread -from multiprocessing import Queue +from multiprocessing import Queue, Process import numpy as np -import dquality.realtime.adapter as adapter +import dquality.feeds.adapter as adapter +import dquality.handler as handler +import dquality.common.constants as const import sys +import time if sys.version[0] == '2': import Queue as tqueue else: @@ -110,7 +109,7 @@ def __init__(self): self.ctr = 0 self.sequence = None self.sequence_index = 0 - self.cntr_pv = None + self.callback_pv = None def deliver_data(self, data_pv, frame_type_pv, logger): @@ -163,10 +162,13 @@ def verify_sequence(logger, data_type): # self.exitq.put('exit') types = build_type_map() - done = False + self.done = False frame_index = 0 - while not done: - current_counter = self.thread_dataq.get() + while not self.done: + try: + current_counter = self.thread_dataq.get(timeout = 1) + except tqueue.Empty: + continue if self.no_frames < 0 or current_counter < self.no_frames: try: data = np.array(caget(data_pv)) @@ -189,16 +191,16 @@ def verify_sequence(logger, data_type): verify_sequence(logger, data_type) except: self.finish() - done = True + self.done = True logger.error('reading image raises exception, possibly the detector exposure time is too small') else: - done = True - + self.done = True self.finish() def get_packed_data(self, data, data_type): return adapter.pack_data(data, data_type) - + + def on_change(self, pvname=None, **kws): """ A callback method that activates when a frame counter of area detector changes. @@ -216,18 +218,22 @@ def on_change(self, pvname=None, **kws): ------- None """ - - current_ctr = kws['value'] - #on first callback adjust the values - if self.ctr == 0: - self.ctr = current_ctr - if self.no_frames >= 0: - self.no_frames += current_ctr + if pvname == self.counter_pv: + current_ctr = kws['value'] + #on first callback adjust the values + if self.ctr == 0: + self.ctr = current_ctr + if self.no_frames >= 0: + self.no_frames += current_ctr + else: + # if the callback is not on the counter pv, keep track of counter + self.ctr += 1 + current_ctr = self.ctr self.thread_dataq.put(current_ctr) - - - def start_processes(self, counter_pv, data_pv, frame_type_pv, logger, *args): + + + def start_processes(self, counter_pv, data_pv, frame_type_pv, logger, reportq, *args, **kwargs): """ This function starts processes and callbacks. @@ -256,19 +262,22 @@ def start_processes(self, counter_pv, data_pv, frame_type_pv, logger, *args): ------- None """ + self.counter_pv = counter_pv data_thread = CAThread(target = self.deliver_data, args=(data_pv, frame_type_pv, logger,)) data_thread.start() + p = Process(target=handler.handle_data, + args=(self.process_dataq, reportq, args, kwargs,)) + p.start() - adapter.start_process(self.process_dataq, logger, *args) - self.cntr_pv = PV(counter_pv) - self.cntr_pv.add_callback(self.on_change, index = 1) - - # self.exitq.get() - # print 'got Exit in exitq stopping callback' - # self.cntr.clear_callbacks() - - - def get_pvs(self, detector, detector_basic, detector_image): + try: + callback_pv_name = kwargs['callback_pv'] + except KeyError: + callback_pv_name = counter_pv + self.callback_pv = PV(callback_pv_name) + self.callback_pv.add_callback(self.on_change, index = 1) + + + def get_pvs(self, detector): """ This function takes defined strings from configuration file and constructs PV variables that are accessed during processing. @@ -278,14 +287,6 @@ def get_pvs(self, detector, detector_basic, detector_image): detector : str a string defining the first prefix in area detector, it has to match the area detector configuration - detector_basic : str - a string defining the second prefix in area detector, defining the basic parameters, it has to - match the area detector configuration - - detector_image : str - a string defining the second prefix in area detector, defining the image parameters, it has to - match the area detector configuration - Returns ------- acquire_pv : str @@ -308,27 +309,29 @@ def get_pvs(self, detector, detector_basic, detector_image): """ - acquire_pv = detector + ':' + detector_basic + ':' + 'Acquire' - counter_pv = detector + ':' + detector_basic + ':' + 'NumImagesCounter_RBV' - data_pv = detector + ':' + detector_image + ':' + 'ArrayData' - sizex_pv = detector + ':' + detector_image + ':' + 'ArraySize0_RBV' - sizey_pv = detector + ':' + detector_image + ':' + 'ArraySize1_RBV' - frame_type_pv = detector + ':' + detector_basic + ':' + 'FrameType' + acquire_pv = detector + ':cam1:Acquire' + counter_pv = detector + ':cam1:NumImagesCounter_RBV' + data_pv = detector + ':image1:ArrayData' + sizex_pv = detector + ':image1:ArraySize0_RBV' + sizey_pv = detector + ':image1:ArraySize1_RBV' + frame_type_pv = detector + ':cam1:FrameType' return acquire_pv, counter_pv, data_pv, sizex_pv, sizey_pv, frame_type_pv - def feed_data(self, no_frames, detector, detector_basic, detector_image, logger, sequence=None, *args): + def feed_data(self, logger, reportq, *args, **kwargs): #no_frames, detector, logger, sequence=None, *args): """ - This function is called by an client to start the process. + This function is called by a client to start the process. - It parses configuration and gets needed process variables. It stores necessary values in the self object. After all initial settings are completed, the method awaits for the area detector to start acquireing by polling the PV. When the area detective is active it starts processing. Parameters ---------- - config : str - a configuration file + no_frames : int + number of frames to feed, indefinately if -1 + + detector : str + detector name logger : Logger a Logger instance, recommended to use the same logger for feed and consuming process @@ -345,15 +348,8 @@ def feed_data(self, no_frames, detector, detector_basic, detector_image, logger, ------- None """ - acquire_pv, counter_pv, data_pv, sizex_pv, sizey_pv, frame_type_pv = self.get_pvs(detector, detector_basic, detector_image) - self.no_frames = no_frames - # if sequence is None: - # self.no_frames = no_frames - # elif type(sequence) is int: - # self.no_frames = sequence - # else: - # self.sequence = sequence - # self.no_frames = sequence[len(sequence)-1][1] + 1 + acquire_pv, counter_pv, data_pv, sizex_pv, sizey_pv, frame_type_pv = self.get_pvs(kwargs['detector']) + self.no_frames = args[2] test = True @@ -363,12 +359,19 @@ def feed_data(self, no_frames, detector, detector_basic, detector_image, logger, ack = caget(acquire_pv) if ack == 1: test = False - self.start_processes(counter_pv, data_pv, frame_type_pv, logger, *args) + self.start_processes(counter_pv, data_pv, frame_type_pv, logger, reportq, *args, **kwargs) + else: + time.sleep(.005) return caget(acquire_pv) def finish(self): - self.process_dataq.put(adapter.pack_data(None, "end")) - self.cntr_pv.clear_callbacks() + self.process_dataq.put(adapter.pack_data(None, const.DATA_STATUS_END)) + self.done = True + try: + self.callback_pv.clear_callbacks() + except: + pass + diff --git a/dquality/feeds/pv_feed_decorator.py b/dquality/feeds/pv_feed_decorator.py new file mode 100644 index 0000000..bc86f8f --- /dev/null +++ b/dquality/feeds/pv_feed_decorator.py @@ -0,0 +1,30 @@ +from dquality.feeds.pv_feed import Feed +import dquality.feeds.adapter as adapter +from epics import caget + +class FeedDecorator(Feed): + def __init__(self, decor): + Feed.__init__(self) + self.decor_map = {} + for entry in decor: + self.decor_map[entry] = decor[entry] + + + def get_packed_data(self, data, data_type): + args = {} + for entry in self.decor_map: + if entry == 'file_name': + full_name = caget(self.decor_map[entry], as_string=True) + rev_full_name = full_name[::-1] + ind = rev_full_name.find('/') + rev_name = rev_full_name[0:ind] + file_name = rev_name[::-1] + # for test + #file_name = 'file'+str(caget('BBF1:cam1:ArrayCounter_RBV')) + args[entry] = file_name + else: + args[entry] = caget(self.decor_map[entry]) + + return adapter.pack_data(data, data_type, **args) + + diff --git a/dquality/feeds/pva_feed.py b/dquality/feeds/pva_feed.py new file mode 100644 index 0000000..7fb47fd --- /dev/null +++ b/dquality/feeds/pva_feed.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# ######################################################################### +# Copyright (c) 2016, UChicago Argonne, LLC. All rights reserved. # +# # +# Copyright 2016. UChicago Argonne, LLC. This software was produced # +# under U.S. Government contract DE-AC02-06CH11357 for Argonne National # +# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the # +# U.S. Department of Energy. The U.S. Government has rights to use, # +# reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR # +# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR # +# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is # +# modified to produce derivative works, such modified software should # +# be clearly marked, so as not to confuse it with the version available # +# from ANL. # +# # +# Additionally, redistribution and use in source and binary forms, with # +# or without modification, are permitted provided that the following # +# conditions are met: # +# # +# * Redistributions of source code must retain the above copyright # +# notice, this list of conditions and the following disclaimer. # +# # +# * Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in # +# the documentation and/or other materials provided with the # +# distribution. # +# # +# * Neither the name of UChicago Argonne, LLC, Argonne National # +# Laboratory, ANL, the U.S. Government, nor the names of its # +# contributors may be used to endorse or promote products derived # +# from this software without specific prior written permission. # +# # +# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS # +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago # +# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # +# POSSIBILITY OF SUCH DAMAGE. # +# ######################################################################### + +""" +Please make sure the installation :ref:`pre-requisite-reference-label` are met. + +This module feeds the data coming from detector to a process using queue. +""" + +import dquality.common.qualitychecks as ver +import dquality.clients.fb_client.simple_feedback as fb +import dquality.clients.zmq_client as zmq_client +import dquality.common.containers as containers +import dquality.common.constants as const +import pvaccess + + +__author__ = "Barbara Frosik" +__copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." +__docformat__ = 'restructuredtext en' +__all__ = ['start_feed', + 'stop_feed' + 'on_change'] + + +class Feed: + """ + This class reads frames in a real time, and delivers to consumers. + """ + + def __init__(self, logger, limits, quality_checks, feedback, zmq_snd_port, pva_name, detector): + """ + Constructor + """ + # for communication with pvaccess - receiving data + self.data_type = 'data' + self.pva_name = pva_name + self.logger = logger + self.limits = limits + self.qc = quality_checks + self.feedback = feedback + self.zmq_snd_port = zmq_snd_port + self.detector = detector + if not self.feedback is None: + self.feedback_obj = fb.Feedback(self.feedback, self.detector, self.quality_checks, self.logger) + if not self.zmq_snd_port is None: + self.cons = zmq_client.zmq_sen(self.zmq_snd_port) + self.aggregate = containers.Aggregate(self.data_type, self.quality_checks) + + self.chan = None + + def feed_data(self): + self.chan = pvaccess.Channel(self.pva_name) + + x, y = self.chan.get('field()')['dimension'] + self.dims = (y['size'], x['size']) + print(self.dims) + #send the dimensions to client + data = containers.Data(const.DATA_STATUS_DIM) + data.dim_x = x + data.dim_y = y + self.cons.send_to_zmq(data) + + labels = [item['name'] for item in self.chan.get('field()')['attribute']] + self.theta_key = labels.index("SampleRotary") + self.scan_delta_key = labels.index("ScanDelta") + self.start_position_key = labels.index("StartPos") + + self.chan.subscribe('update', self.on_change) + self.chan.startMonitor("value,attribute,uniqueId") + + + def on_change(self, v): + uniqueId = v['uniqueId'] + print('uniqueId: ', uniqueId) + + img = v['value'][0]['ushortValue'] + scan_delta = v["attribute"][self.scan_delta_key]["value"][0]["value"] + start_position = v["attribute"][self.start_position_key]["value"][0]["value"] + theta = (start_position + uniqueId * scan_delta) % 360.0 + + img = img.reshape(self.dims) + + data = containers.Data(const.DATA_STATUS_DATA, img, self.data_type) + frame_results = ver.run_quality_checks(data, uniqueId, self.aggregate, self.limits, self.quality_checks) + + if not self.feedback is None: + self.feedback_obj.deliver(frame_results) + + if not self.zmq_snd_port is None: + data.theta = theta + data.image_number = uniqueId + data.ver = not frame_results.failed + self.cons.send_to_zmq(data) + + + def stop_feed(self): + # stop getting data + self.chan.stopMonitor() + self.chan.unsubscribe('update') + + # nothing to do to terminate updating of feedback pvs (maybe zero them?) + + # terminate zmq connection + data = containers.Data(const.DATA_STATUS_END) + self.cons.send_to_zmq(data) + diff --git a/dquality/feeds/zmq_feed.py b/dquality/feeds/zmq_feed.py new file mode 100644 index 0000000..9d5d8b6 --- /dev/null +++ b/dquality/feeds/zmq_feed.py @@ -0,0 +1,298 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# ######################################################################### +# Copyright (c) 2016, UChicago Argonne, LLC. All rights reserved. # +# # +# Copyright 2016. UChicago Argonne, LLC. This software was produced # +# under U.S. Government contract DE-AC02-06CH11357 for Argonne National # +# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the # +# U.S. Department of Energy. The U.S. Government has rights to use, # +# reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR # +# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR # +# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is # +# modified to produce derivative works, such modified software should # +# be clearly marked, so as not to confuse it with the version available # +# from ANL. # +# # +# Additionally, redistribution and use in source and binary forms, with # +# or without modification, are permitted provided that the following # +# conditions are met: # +# # +# * Redistributions of source code must retain the above copyright # +# notice, this list of conditions and the following disclaimer. # +# # +# * Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in # +# the documentation and/or other materials provided with the # +# distribution. # +# # +# * Neither the name of UChicago Argonne, LLC, Argonne National # +# Laboratory, ANL, the U.S. Government, nor the names of its # +# contributors may be used to endorse or promote products derived # +# from this software without specific prior written permission. # +# # +# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS # +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago # +# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # +# POSSIBILITY OF SUCH DAMAGE. # +# ######################################################################### + +""" +Please make sure the installation :ref:`pre-requisite-reference-label` are met. + +This module feeds data coming from ZeroMQ server to a process using queue. The parsing of the message is customized +for the BlueSky use case. Other cases may be added later. +The data, which represents a frame captured by detector, is received in two parts. The first part is received as json +stream and contains data attributes, such shape, type, theta and counter associated with this frame. +The second part is frame in bytes. +The frame array along with the received attributes is packed into Python object and sent to process defined in adapter +over queue. + +This module requires configuration file with the following parameters: +'zmq_rcv_port' - the ZeroMQ port +""" + +from multiprocessing import Queue, Process +import numpy as np +import zmq +import time +import sys +import json +import dquality.common.utilities as utils +import dquality.common.constants as const +import dquality.clients.feedback as fb +import dquality.common.containers as containers +import dquality.handler as handler + + +__author__ = "Barbara Frosik" +__copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." +__docformat__ = 'restructuredtext en' +__all__ = ['zmq_rec.zmq_rec', + 'zmq_rec.destroy', + 'init', + 'receive_zmq_send'] + + +class zmq_rec(): + """ + This class represents ZeroMQ connection. + """ + def __init__(self, host=None, port=None): + """ + Constructor + + This constructor creates zmq Context and socket for the zmq.PAIR. + It initiate connect to the server given by host and port. + + Parameters + ---------- + host : str + server host name + + port : str + serving port + + """ + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PAIR) + self.socket.connect("tcp://" + host +":%s" % port) + + + def destroy(self): + """ + Destroys Context. This also closes socket associated with the context. + """ + self.context.destroy() + + +def init(config): + """ + This function initializes variables according to configuration. + + It gets values from the configuration file, evaluates and processes the values. If mandatory parameter is missing, + the script logs an error and exits. + + Parameters + ---------- + config : str + configuration file name, including path + + Returns + ------- + logger : Logger + logger instance + + limits : dictionary + a dictionary containing limit values read from the configured 'limit' file + + quality_checks : dict + a dictionary containing quality check functions ids + + feedback : list + a list of strings defining real time feedback of quality checks errors. Currently supporting 'PV', 'log', and + 'console' + + report_type : int + report type; currently supporting 'none', 'error', and 'full' + + consumers : dict + a dictionary parsed from json file representing consumers + + zmq_host : str + ZeroMQ server host name + + zmq_rcv_port : str + ZeroMQ port + + detector : str + detector name, only needed if feedback contains pv + + """ + + conf = utils.get_config(config) + if conf is None: + print ('configuration file is missing') + exit(-1) + + logger = utils.get_logger(__name__, conf) + + limitsfile = utils.get_file(conf, 'limits', logger) + if limitsfile is None: + sys.exit(-1) + + with open(limitsfile) as limits_file: + limits = json.loads(limits_file.read()) + + qcfile = utils.get_file(conf, 'quality_checks', logger) + if qcfile is None: + sys.exit(-1) + + with open(qcfile) as qc_file: + dict = json.loads(qc_file.read()) + #quality_checks = utils.get_quality_checks(dict) + quality_checks = dict + + try: + feedback = conf['feedback_type'] + except KeyError: + feedback = None + + try: + report_type = conf['report_type'] + except KeyError: + report_type = const.REPORT_FULL + + try: + zmq_host = conf['zmq_host'] + except: + zmq_host = 'localhost' + + try: + zmq_rcv_port = conf['zmq_rcv_port'] + except: + zmq_rcv_port = None + print ('configuration error: zmq_rcv_port not configured') + + try: + detector = conf['detector'] + except KeyError: + print ('configuration error: detector parameter not configured.') + return None + + try: + consumers = conf['zmq_snd_port'] + except KeyError: + consumers = None + + return logger, limits, quality_checks, feedback, report_type, consumers, zmq_host, zmq_rcv_port, detector + + +def receive_zmq_send(dataq, zmq_host, zmq_rcv_port): + """ + This function receives data from socket and enqueues it into a queue until the end is detected. + + Parameters + ---------- + dataq : Queue + a queue passing data received from ZeroMQ server to another process + + zmq_host : str + ZeroMQ server host name + + zmq_rcv_port : str + ZeroMQ port + + Returns + ------- + none + """ + + conn = zmq_rec(zmq_host, zmq_rcv_port) + socket = conn.socket + interrupted = False + while not interrupted: + msg = socket.recv_json() + key = msg.get("key") + if key == "end": + data = containers.Data(const.DATA_STATUS_END) + dataq.put(data) + interrupted = True + conn.destroy() + elif key == "image": + msg["receiving_timestamp"] = time.time() + dtype = msg["dtype"] + shape = msg["shape"] + image_number = msg['image_number'] + #image_timestamp = msg['image_timestamp'] + theta = msg['rotation'] + + image = np.frombuffer(socket.recv(), dtype=dtype).reshape(shape) + + data = containers.Data(const.DATA_STATUS_DATA, image, 'data') + data.theta = theta + data.image_number = image_number + dataq.put(data) + + +def verify(config): + """ + This function starts real time verification process according to the given configuration. + + This function reads configuration and initiates variables accordingly. + It starts the handler process that verifies data and starts a process receiving the data from ZeroMQ server. + + Parameters + ---------- + conf : str + configuration file name, including path + + Returns + ------- + none + + """ + logger, limits, quality_checks, feedback, report_type, consumers, zmq_host, zmq_rcv_port, detector = init(config) + + feedback_obj = fb.Feedback(feedback) + if const.FEEDBACK_LOG in feedback: + feedback_obj.set_logger(logger) + + if const.FEEDBACK_PV in feedback: + feedback_pvs = utils.get_feedback_pvs(quality_checks) + feedback_obj.set_feedback_pv(feedback_pvs, detector) + + dataq = Queue() + p = Process(target=handler.handle_data, args=(dataq, limits, None, quality_checks, None, consumers, feedback_obj)) + p.start() + + receive_zmq_send(dataq, zmq_host, zmq_rcv_port) diff --git a/dquality/handler.py b/dquality/handler.py index 7ba957c..739a456 100644 --- a/dquality/handler.py +++ b/dquality/handler.py @@ -54,10 +54,10 @@ """ -from multiprocessing import Queue, Process import dquality.common.constants as const import dquality.common.qualitychecks as calc -from dquality.common.containers import Aggregate, Consumer_adapter +from dquality.common.containers import Aggregate +import dquality.clients.zmq_client as cons import sys from collections import deque if sys.version[0] == '2': @@ -91,18 +91,15 @@ def init_consumers(consumers): a list of Queues that are used to deliver frames to consumers """ - consumers_q = [] - for consumer in consumers: - path = consumers[consumer][0] - args = consumers[consumer][1] - q = Queue() - consumers_q.append(q) - adapter = Consumer_adapter(path) - adapter.start_process(q, consumer, args) - return consumers_q + consumer_zmq = [] + # connect to one consumer, may be extended to many later + zmq_sender = cons.zmq_sen(str(consumers)) + consumer_zmq.append(zmq_sender) + return consumer_zmq -def send_to_consumers(waiting_q, consumers_q, results): + +def send_to_consumers(consumer_zmq, data, results): """ This function receives frames in a real time and delivers them to the consumer processes. @@ -124,48 +121,17 @@ def send_to_consumers(waiting_q, consumers_q, results): ------- none """ - index = results.index - tempq = deque() - - def send_data(data): + if not consumer_zmq is None: + # for consumer in consumer_zmq: + # consumer.send_to_zmq(data, results) if data.status == const.DATA_STATUS_DATA: - data.failed = results.failed - for consumerq in consumers_q: - consumerq.put(data) - - search = True - while search: - data = waiting_q.pop() - if data.status == const.DATA_STATUS_END: - if len(tempq) == 0: - if len(waiting_q) == 0: - send_data(data) - else: - waiting_q.append_left(data) - else: - waiting_q.append_left(data) - while len(tempq) > 0: - waiting_q.append(tempq.popleft()) - search = False - elif data.status == const.DATA_STATUS_MISSING: - # send missing frames - if data.index < index: - send_data(data) - else: - tempq.appendleft(data) - else: - if data.index == results.index: - send_data(data) - search = False - else: - #put on temp queue - tempq.appendleft(data) - while len(tempq) > 0: - waiting_q.append(tempq.popleft()) - - - -def handle_data(dataq, limits, reportq, quality_checks, aggregate_limit, consumers=None, feedback_obj=None): + data.ver = not results.failed + data.image_number = results.index + for consumer in consumer_zmq: + consumer.send_to_zmq(data) + + +def handle_data(dataq, reportq, args, kwargs): """ This function creates and initializes all variables and handles data received on a 'dataq' queue. @@ -217,70 +183,51 @@ def handle_data(dataq, limits, reportq, quality_checks, aggregate_limit, consume ------- None """ - consumers_q = None - waiting_q = None - if consumers is not None: - consumers_q = init_consumers(consumers) - waiting_q = deque() - - feedbackq = None - if feedback_obj is not None: - feedbackq = Queue() - p = Process(target=feedback_obj.quality_feedback, args=(feedbackq,)) - p.start() - + try: + consumers = kwargs['consumers'] + consumer_zmq = init_consumers(consumers) + except KeyError: + consumer_zmq = None + + limits = args[0] + quality_checks = args[1] aggregates = {} types = quality_checks.keys() for type in types: - aggregates[type] = Aggregate(type, quality_checks[type], aggregate_limit, feedbackq) + aggregates[type] = Aggregate(type, quality_checks[type], **kwargs) - resultsq = Queue() interrupted = False index = 0 - num_processes = 0 while not interrupted: try: data = dataq.get(timeout=0.005) if data.status == const.DATA_STATUS_END: interrupted = True - while num_processes > 0: - results = resultsq.get() - aggregates[results.type].handle_results(results) - num_processes -= 1 - if feedbackq is not None: + send_to_consumers(consumer_zmq, data, const.DATA_STATUS_END) + try: + feedbackq = kwargs['feedbackq'] for _ in range(len(aggregates)): feedbackq.put(const.DATA_STATUS_END) - if waiting_q is not None: - waiting_q.appendleft(data) - send_to_consumers(waiting_q, consumers_q, results) + except KeyError: + pass elif data.status == const.DATA_STATUS_MISSING: - if waiting_q is not None: - data.index = index - waiting_q.appendleft(data) index += 1 - else: - if waiting_q is not None: - data.index = index - waiting_q.appendleft(data) + elif data.status == const.DATA_STATUS_DATA: type = data.type - p = Process(target=calc.run_quality_checks, - args=(data, index, resultsq, aggregates[type], limits[type], quality_checks[type])) - p.start() - num_processes += 1 + results = calc.run_quality_checks(data, index, aggregates[type], limits[type], quality_checks[type]) + send_to_consumers(consumer_zmq, data, results) + try: + results.file_name = data.file_name + except: + pass + aggregates[results.type].handle_results(results) index += 1 except queue.Empty: pass - while not resultsq.empty(): - results = resultsq.get_nowait() - aggregates[results.type].handle_results(results) - num_processes -= 1 - if consumers is not None: - send_to_consumers(waiting_q, consumers_q, results) - if reportq is not None: results = {} for type in aggregates: diff --git a/dquality/monitor.py b/dquality/monitor.py index 660fc56..ade48a8 100644 --- a/dquality/monitor.py +++ b/dquality/monitor.py @@ -163,7 +163,8 @@ def init(config): with open(qcfile) as qc_file: dict = json.loads(qc_file.read()) - quality_checks = utils.get_quality_checks(dict) + #quality_checks = utils.get_quality_checks(dict) + quality_checks = dict try: report_type = conf['report_type'] diff --git a/dquality/monitor_polling.py b/dquality/monitor_polling.py index 565e61c..c23b159 100644 --- a/dquality/monitor_polling.py +++ b/dquality/monitor_polling.py @@ -409,7 +409,8 @@ def init(config): with open(qcfile) as qc_file: dict = json.loads(qc_file.read()) - quality_checks = utils.get_quality_checks(dict) + #quality_checks = utils.get_quality_checks(dict) + quality_checks = dict try: report_type = conf['report_type'] diff --git a/dquality/realtime/real_time.py b/dquality/real_time_pv.py similarity index 71% rename from dquality/realtime/real_time.py rename to dquality/real_time_pv.py index 6483ee6..e7680f3 100644 --- a/dquality/realtime/real_time.py +++ b/dquality/real_time_pv.py @@ -53,24 +53,30 @@ plug in of area detector. The read of frame data from channel access happens on event of frame counter change. The change is detected with a callback. The data is passed to the consuming process. This module requires configuration file with the following parameters: -'detector', a string defining the first prefix in area detector, it has to match the area detector configuration -'detector_basic', a string defining the second prefix in area detector, defining the basic parameters, it has to -match the area detector configuration -'detector_image', a string defining the second prefix in area detector, defining the image parameters, it has to -match the area detector configuration +'detector', a string defining the first prefix in area detector. 'no_frames', number of frames that will be fed 'args', optional, list of process specific parameters, they need to be parsed to the desired format in the wrapper """ -from multiprocessing import Queue +from multiprocessing import Process, Queue import json import sys +import time import dquality.common.utilities as utils import dquality.common.report as report -from dquality.realtime.feed import Feed +from dquality.feeds.pv_feed import Feed import dquality.common.constants as const -import dquality.realtime.adapter as adapter -from dquality.realtime.feed_decorator import FeedDecorator +import dquality.clients.fb_client.feedback as fb +import dquality.feeds.adapter as adapter +from dquality.feeds.pv_feed_decorator import FeedDecorator + + +__author__ = "Barbara Frosik" +__copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." +__docformat__ = 'restructuredtext en' +__all__ = ['init', + 'RT.verify', + 'RT.finish'] def init(config): @@ -107,7 +113,6 @@ def init(config): a dictionary parsed from json file representing consumers """ - conf = utils.get_config(config) if conf is None: print ('configuration file is missing') @@ -115,12 +120,16 @@ def init(config): logger = utils.get_logger(__name__, conf) + feed_args = [] + feed_kwargs = {} + limitsfile = utils.get_file(conf, 'limits', logger) if limitsfile is None: sys.exit(-1) with open(limitsfile) as limits_file: limits = json.loads(limits_file.read()) + feed_args.append(limits) qcfile = utils.get_file(conf, 'quality_checks', logger) if qcfile is None: @@ -128,26 +137,64 @@ def init(config): with open(qcfile) as qc_file: dict = json.loads(qc_file.read()) - quality_checks = utils.get_quality_checks(dict) + feed_args.append(dict) + + try: + no_frames = int(conf['no_frames']) + except KeyError: + print ('no_frames parameter not configured. Continuous mode.') + no_frames = -1 + feed_args.append(no_frames) + + try: + callback_pv = conf['callback_pv'] + feed_kwargs['callback_pv'] = callback_pv + except KeyError: + pass + + try: + detector = conf['detector'] + feed_kwargs['detector'] = detector + except KeyError: + print ('detector parameter not configured.') + sys.exit(-1) + + try: + consumers = conf['zmq_snd_port'] + feed_kwargs['consumers'] = consumers + except KeyError: + pass + + try: + aggregate_limit = int(conf['aggregate_limit']) + except KeyError: + aggregate_limit = no_frames + feed_kwargs['aggregate_limit'] = aggregate_limit try: feedback = conf['feedback_type'] + if len(feedback) == 0: + feedback = None except KeyError: feedback = None + try: + decor_conf = conf['decor'] + decor_map = {} + for entry in decor_conf: + entry = entry.split('>') + decor_map[entry[0].strip()] = entry[1].strip() + if len(decor_map) == 0: + decor_map = None + except KeyError: + decor_map = None + try: report_type = conf['report_type'] except KeyError: report_type = const.REPORT_FULL - consumersfile = utils.get_file(conf, 'consumers', logger, False) - if consumersfile is None: - consumers = None - else: - with open(consumersfile) as consumers_file: - consumers = json.loads(consumers_file.read()) - - return logger, limits, quality_checks, feedback, report_type, consumers + return feed_args, feed_kwargs, feedback, decor_map, logger, report_type class RT: @@ -176,32 +223,35 @@ def verify(self, config, report_file=None, sequence = None): boolean """ - def get_decor(qc): - decor = {} - qc = [6] - if const.QUALITYCHECK_RATE_SAT in qc: - decor[const.QUALITYCHECK_RATE_SAT] = detector + ":" + detector_basic +":AcquireTime" - return decor + feed_args, feed_kwargs, feedback, decor_map, logger, report_type = init(config) + + # init the pv feedback + if not feedback is None: + feedbackq = Queue() + feedback_pvs = utils.get_feedback_pvs(feed_args[1]) + fb_args = {'feedback_pvs':feedback_pvs, 'detector':feed_kwargs['detector']} + feedback_obj = fb.Feedback(feedbackq, feedback, **fb_args) + # put the logger to args + if const.FEEDBACK_LOG in feedback: + feedback_obj.set_logger(logger) + feed_kwargs['feedbackq'] = feedbackq - logger, limits, quality_checks, feedback, report_type, consumers = init(config) - no_frames, aggregate_limit, detector, detector_basic, detector_image = adapter.parse_config(config) + self.p = Process(target=feedback_obj.deliver, args=()) + self.p.start() - aggregateq = Queue() + reportq = Queue() # address the special cases of quality checks when additional arguments are required - decor = get_decor(quality_checks) - if len(decor) is 0: + if decor_map is None: self.feed = Feed() else: - self.feed = FeedDecorator(decor) + self.feed = FeedDecorator(decor_map) - aggregate_limit = no_frames - args = limits, aggregateq, quality_checks, aggregate_limit, consumers, feedback, detector - ack = self.feed.feed_data(no_frames, detector, detector_basic, detector_image, logger, sequence, *args) + ack = self.feed.feed_data(logger, reportq, *feed_args, **feed_kwargs) if ack == 1: bad_indexes = {} - aggregate = aggregateq.get() + aggregate = reportq.get() if report_file is not None: report.report_results(logger, aggregate, None, report_file, report_type) @@ -211,5 +261,14 @@ def get_decor(qc): def finish(self): - self.feed.finish() + try: + self.feed.finish() + time.sleep(1) + except: + pass + + try: + self.p.terminate() + except: + pass diff --git a/dquality/real_time_pva.py b/dquality/real_time_pva.py new file mode 100644 index 0000000..278f142 --- /dev/null +++ b/dquality/real_time_pva.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# ######################################################################### +# Copyright (c) 2016, UChicago Argonne, LLC. All rights reserved. # +# # +# Copyright 2016. UChicago Argonne, LLC. This software was produced # +# under U.S. Government contract DE-AC02-06CH11357 for Argonne National # +# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the # +# U.S. Department of Energy. The U.S. Government has rights to use, # +# reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR # +# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR # +# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is # +# modified to produce derivative works, such modified software should # +# be clearly marked, so as not to confuse it with the version available # +# from ANL. # +# # +# Additionally, redistribution and use in source and binary forms, with # +# or without modification, are permitted provided that the following # +# conditions are met: # +# # +# * Redistributions of source code must retain the above copyright # +# notice, this list of conditions and the following disclaimer. # +# # +# * Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in # +# the documentation and/or other materials provided with the # +# distribution. # +# # +# * Neither the name of UChicago Argonne, LLC, Argonne National # +# Laboratory, ANL, the U.S. Government, nor the names of its # +# contributors may be used to endorse or promote products derived # +# from this software without specific prior written permission. # +# # +# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS # +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago # +# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # +# POSSIBILITY OF SUCH DAMAGE. # +# ######################################################################### + +""" +Please make sure the installation :ref:`pre-requisite-reference-label` are met. + +This module feeds the data coming from detector to a process using queue. It interracts with a channel access +plug in of area detector. The read of frame data from channel access happens on event of frame counter change. +The change is detected with a callback. The data is passed to the consuming process. +This module requires configuration file with the following parameters: +'detector', a string defining the first prefix in area detector. +'no_frames', number of frames that will be fed +'args', optional, list of process specific parameters, they need to be parsed to the desired format in the wrapper +""" + +import json +import signal +import sys +import dquality.common.utilities as utils +from dquality.feeds.pva_feed import Feed + + +__author__ = "Barbara Frosik" +__copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." +__docformat__ = 'restructuredtext en' +__all__ = ['init', + 'RT.verify', + 'RT.finish'] + + +def init(config): + """ + This function initializes variables according to configuration. + + It gets values from the configuration file, evaluates and processes the values. If mandatory parameter is missing, + the script logs an error and exits. + + Parameters + ---------- + config : str + configuration file name, including path + + Returns + ------- + logger : Logger + logger instance + + limits : dictionary + a dictionary containing limit values read from the configured 'limit' file + + quality_checks : dict + a dictionary containing quality check functions ids + + feedback : list + a list of strings defining real time feedback of quality checks errors. Currently supporting 'PV', 'log', and + 'console' + + zmq_snd_port : int + a port used to send the verified data out + + pva_name : str + pvaccess name + + detector : str + detector name + + """ + conf = utils.get_config(config) + if conf is None: + print ('configuration file is missing') + exit(-1) + + try: + pva_name = conf['pva_name'] + except KeyError: + print ('pva_name not configured') + exit(-1) + + logger = utils.get_logger(__name__, conf) + + limitsfile = utils.get_file(conf, 'limits', logger) + if limitsfile is None: + sys.exit(-1) + + with open(limitsfile) as limits_file: + limits = json.loads(limits_file.read()) + + qcfile = utils.get_file(conf, 'quality_checks', logger) + if qcfile is None: + sys.exit(-1) + + with open(qcfile) as qc_file: + dict = json.loads(qc_file.read()) + quality_checks = dict + + try: + feedback = conf['feedback_type'] + if len(feedback) == 0: + feedback = None + except KeyError: + feedback = None + + try: + zmq_snd_port = conf['zmq_snd_port'] + except KeyError: + zmq_snd_port = None + + try: + detector = conf['detector'] + except KeyError: + print ('detector parameter not configured.') + detector = None + + return logger, limits, quality_checks, feedback, zmq_snd_port, pva_name, detector + + +class RT: + + def verify(self, config): + """ + This function starts real time verification process according to the given configuration. + + Parameters + ---------- + conf : str + configuration file name, including path + + Returns + ------- + none + + """ + logger, limits, quality_checks, feedback, zmq_snd_port, pva_name, detector = init(config) + + self.feed = Feed(logger, limits, quality_checks, feedback, zmq_snd_port, pva_name, detector) + self.feed.feed_data() + + + def finish(self): + """ + This function gracefully terminates the pvaccess feed and it's clients + + """ + print ('finish') + self.feed.stop_feed() + + +def signal_handler(signal, frame): + rt.finish() + +rt = RT() +signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGTERM, signal_handler) + +rt.verify('/home/phoebus/BFROSIK/.dquality/BBF1/dqconfig.ini') diff --git a/dquality/realtime/feed_decorator.py b/dquality/realtime/feed_decorator.py deleted file mode 100644 index c64bf40..0000000 --- a/dquality/realtime/feed_decorator.py +++ /dev/null @@ -1,22 +0,0 @@ -from dquality.realtime.feed import Feed -import dquality.realtime.adapter as adapter -import dquality.common.constants as const -from epics import caget - -class FeedDecorator(Feed): - def __init__(self, decor): - Feed.__init__(self) - try: - self.acq_time_pv = decor[const.QUALITYCHECK_RATE_SAT] - except: - self.acq_time_pv = None - - - def get_packed_data(self, data, data_type): - if self.acq_time_pv is None: - return adapter.pack_data(data, data_type) - else: - acq_time = caget(self.acq_time_pv) - return adapter.pack_data_with_decor(data, data_type, acq_time) - - diff --git a/qualityFeedback.adl b/qualityFeedback.adl index bac4f1e..05fc170 100644 --- a/qualityFeedback.adl +++ b/qualityFeedback.adl @@ -1,12 +1,12 @@ file { - name="/local/data-quality/qualityFeedback.adl" - version=030111 + name="/local/bfrosik/data-quality/qualityFeedback.adl" + version=030114 } display { object { - x=896 - y=597 + x=886 + y=589 width=743 height=480 } @@ -106,7 +106,7 @@ text { height=20 } monitor { - chan="pilatus300:data_mean_ind" + chan="BBF1:data_mean_ind" clr=24 bclr=4 } @@ -184,7 +184,7 @@ text { height=20 } monitor { - chan="pilatus300:data_st_dev_ind" + chan="BBF1:data_st_dev_ind" clr=24 bclr=4 } @@ -200,7 +200,7 @@ text { height=20 } monitor { - chan="pilatus300:data_stat_mean_ind" + chan="BBF1:data_stat_mean_ind" clr=24 bclr=4 } @@ -229,7 +229,7 @@ text { height=20 } monitor { - chan="pilatus300:data_mean_ctr" + chan="BBF1:data_mean_ctr" clr=24 bclr=4 } @@ -247,7 +247,7 @@ text { height=20 } monitor { - chan="pilatus300:data_st_dev_ctr" + chan="BBF1:data_st_dev_ctr" clr=24 bclr=4 } @@ -265,7 +265,7 @@ text { height=20 } monitor { - chan="pilatus300:data_stat_mean_ctr" + chan="BBF1:data_stat_mean_ctr" clr=24 bclr=4 } @@ -294,7 +294,7 @@ text { height=20 } monitor { - chan="pilatus300:data_white_mean_ind" + chan="BBF1:data_white_mean_ind" clr=24 bclr=4 } @@ -339,7 +339,7 @@ text { height=20 } monitor { - chan="pilatus300:data_white_st_dev_ind" + chan="BBF1:data_white_st_dev_ind" clr=24 bclr=4 } @@ -355,7 +355,7 @@ text { height=20 } monitor { - chan="pilatus300:data_white_stat_mean_ind" + chan="BBF1:data_white_stat_mean_ind" clr=24 bclr=4 } @@ -383,7 +383,7 @@ text { height=20 } monitor { - chan="pilatus300:data_white_mean_ctr" + chan="BBF1:data_white_mean_ctr" clr=24 bclr=4 } @@ -401,7 +401,7 @@ text { height=20 } monitor { - chan="pilatus300:data_white_st_dev_ctr" + chan="BBF1:data_white_st_dev_ctr" clr=24 bclr=4 } @@ -419,7 +419,7 @@ text { height=20 } monitor { - chan="pilatus300:data_white_stat_mean_ctr" + chan="BBF1:data_white_stat_mean_ctr" clr=24 bclr=4 } @@ -448,7 +448,7 @@ text { height=20 } monitor { - chan="pilatus300:data_dark_mean_ind" + chan="BBF1:data_dark_mean_ind" clr=24 bclr=4 } @@ -493,7 +493,7 @@ text { height=20 } monitor { - chan="pilatus300:data_dark_st_dev_ind" + chan="BBF1:data_dark_st_dev_ind" clr=24 bclr=4 } @@ -521,7 +521,7 @@ text { height=20 } monitor { - chan="pilatus300:data_dark_mean_ctr" + chan="BBF1:data_dark_mean_ctr" clr=24 bclr=4 } @@ -539,7 +539,7 @@ text { height=20 } monitor { - chan="pilatus300:data_dark_st_dev_ctr" + chan="BBF1:data_dark_st_dev_ctr" clr=24 bclr=4 } diff --git a/server_verifier.py b/server_verifier.py index 3e9490a..91a8758 100644 --- a/server_verifier.py +++ b/server_verifier.py @@ -55,8 +55,7 @@ from multiprocessing import Process from multiprocessing.managers import SyncManager -import multiprocessing as mp -import dquality.realtime.real_time as real +import dquality.real_time_pv as real import argparse import json import sys diff --git a/test/dqconfig.ini b/test/dqconfig.ini index aaa4350..b3cb2ab 100644 --- a/test/dqconfig.ini +++ b/test/dqconfig.ini @@ -9,10 +9,14 @@ 'time_zone' = America/Chicago 'file_type' = 'FILE_TYPE_HDF' 'extensions' = .txt, .hd5, .HD5, .hdf5, .HDF5, .h5, .H5, .hdf -'consumers' = test/schemas/consumers.json -'feedback_type' = log +'feedback_type' = pv, console 'detector' = BBF1 'detector_basic' = cam1 'detector_image' = image1 -'no_frames' = 100 \ No newline at end of file +'no_frames' = -1 + +#'zmq_rcv_port' = 5556 +#'zmq_host' = localhost + +#'zmq_snd_port' = 5577 \ No newline at end of file diff --git a/test/schemas_test/quality_checks_GE.json b/test/schemas_test/quality_checks_GE.json index d38aded..11f9546 100644 --- a/test/schemas_test/quality_checks_GE.json +++ b/test/schemas_test/quality_checks_GE.json @@ -1 +1 @@ -{ "data" : ["QUALITYCHECK_SAT", "ACC_SAT"]} \ No newline at end of file +{ "data" : ["sat", "acc_sat"]} \ No newline at end of file diff --git a/test/schemas_test/quality_checks_HDF.json b/test/schemas_test/quality_checks_HDF.json index e14aeb2..0b43def 100644 --- a/test/schemas_test/quality_checks_HDF.json +++ b/test/schemas_test/quality_checks_HDF.json @@ -1,3 +1,3 @@ -{"data" : [ "QUALITYCHECK_MEAN", "QUALITYCHECK_STD", "STAT_MEAN"], - "data_dark": [ "QUALITYCHECK_MEAN", "QUALITYCHECK_STD", "STAT_MEAN"], - "data_white": [ "QUALITYCHECK_MEAN", "QUALITYCHECK_STD", "STAT_MEAN"]} \ No newline at end of file +{"data" : [ "mean", "st_dev", "stat_mean"], + "data_dark": [ "mean", "st_dev", "stat_mean"], + "data_white": [ "mean", "st_dev", "stat_mean"]} \ No newline at end of file diff --git a/test/zmq_test/__init__.py b/test/zmq_test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/zmq_test/adapter1.py b/test/zmq_test/adapter1.py new file mode 100644 index 0000000..74e7707 --- /dev/null +++ b/test/zmq_test/adapter1.py @@ -0,0 +1,64 @@ +from multiprocessing import Process +from dquality.feeds.zmq_sender import receive_send_zmq +import dquality.common.containers as containers +import dquality.common.constants as const +import dquality.common.utilities as utils + + +__author__ = "Barbara Frosik" +__copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC." +__docformat__ = 'restructuredtext en' +__all__ = ['start_process', + 'parse_config', + 'pack_data'] + + + +def start_process(dataq, logger, *args): + """ + This function parses parameters and starts process consuming frames from feed. + + This function parses the positional parameters. Then it starts a client process, passing in a queue as first + parameter, followed by the parsed parameters. The function of the client process must be included in imports. + + Parameters + ---------- + dataq : multiprocessing.Queue + a queue used to transfer data from feed to client process + + logger : Logger + an instance of Logger, used by the application + + *args : list + a list of posisional parameters required by the client process + + Returns + ------- + none + """ + zmq_port = args[0] + + p = Process(target=receive_send_zmq, args=(dataq, zmq_port)) + p.start() + + +def pack_data(slice, type): + """ + This function packs a single image data into a specific container. + + Parameters + ---------- + slice : nparray + image data + + type : str + data type, as 'data', 'data_white', or 'data_dark' + + """ + if slice is not None: + return containers.Data(const.DATA_STATUS_DATA, slice, type) + elif type == 'missing': + return containers.Data(const.DATA_STATUS_MISSING) + else: + return containers.Data(const.DATA_STATUS_END) + diff --git a/test/zmq_test/rec_zmq.py b/test/zmq_test/rec_zmq.py new file mode 100644 index 0000000..17fc442 --- /dev/null +++ b/test/zmq_test/rec_zmq.py @@ -0,0 +1,7 @@ +import dquality.feeds.zmq_receiver as rec + +def run_recev(config): + rec.verify(config) + +if __name__ == "__main__": + run_recev('test/dqconfig.ini') \ No newline at end of file diff --git a/test/zmq_test/send_zmq.py b/test/zmq_test/send_zmq.py new file mode 100644 index 0000000..e0c236e --- /dev/null +++ b/test/zmq_test/send_zmq.py @@ -0,0 +1,51 @@ +import dquality.common.utilities as utils +from dquality.feeds.feed import Feed + +def init(config): + conf = utils.get_config(config) + if conf is None: + print ('configuration file is missing') + exit(-1) + + logger = utils.get_logger(__name__, conf) + try: + zmq_port = conf['zmq_rcv_port'] + except: + zmq_port = None + print ('configuration error: zmq_port not configured') + + try: + detector = conf['detector'] + except KeyError: + print ('configuration error: detector parameter not configured.') + return None + try: + detector_basic = conf['detector_basic'] + except KeyError: + print ('configuration error: detector_basic parameter not configured.') + return None + try: + detector_image = conf['detector_image'] + except KeyError: + print ('configuration error: detector_image parameter not configured.') + return None + + try: + no_frames = conf['no_frames'] + except KeyError: + print ('no_frames parameter not configured.') + return None + + return detector, detector_basic, detector_image, logger, zmq_port, no_frames + +def run_sender(config): + detector, detector_basic, detector_image, logger, zmq_port, no_frames = init(config) + + args = zmq_port, zmq_port + feed = Feed() + ack = feed.feed_data(int(no_frames), detector, detector_basic, detector_image, logger, None, *args) + + print ('done') + +if __name__ == "__main__": + run_sender('test/dqconfig.ini') \ No newline at end of file diff --git a/test/zmq_test/zmq_consumer.py b/test/zmq_test/zmq_consumer.py new file mode 100644 index 0000000..2613c30 --- /dev/null +++ b/test/zmq_test/zmq_consumer.py @@ -0,0 +1,92 @@ +from multiprocessing import Queue, Process +import numpy as np +import zmq +import time +import sys +import json + +#This module is for testing onle, acts as a zmq consumer + +class zmq_rec(): + """ + This class represents ZeroMQ connection. + """ + def __init__(self, host=None, port=None): + """ + Constructor + + This constructor creates zmq Context and socket for the zmq.PAIR. + It initiate connect to the server given by host and port. + + Parameters + ---------- + host : str + server host name + + port : str + serving port + + """ + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PAIR) + self.socket.connect("tcp://" + host +":%s" % port) + + def destroy(self): + """ + Destroys Context. This also closes socket associated with the context. + """ + self.context.destroy() + + +def receive_zmq_send(zmq_host, zmq_rcv_port): + """ + This function receives data from socket and enqueues it into a queue until the end is detected. + + Parameters + ---------- + dataq : Queue + a queue passing data received from ZeroMQ server to another process + + zmq_host : str + ZeroMQ server host name + + zmq_rcv_port : str + ZeroMQ port + + Returns + ------- + none + """ + + conn = zmq_rec(zmq_host, zmq_rcv_port) + socket = conn.socket + interrupted = False + while not interrupted: + print ('waiting') + msg = socket.recv_json() + key = msg.get("key") + if key == "end": + print ('end of data, closing connection') + interrupted = True + conn.destroy() + elif key == "image": + print('got msg') + msg["receiving_timestamp"] = time.time() + dtype = msg["dtype"] + shape = msg["shape"] + image_number = msg['image_number'] + #image_timestamp = msg['image_timestamp'] + theta = msg['theta'] + ver_result = msg['ver'] + + image = np.frombuffer(socket.recv(), dtype=dtype).reshape(shape) + print ('theta, index', theta, image_number, ver_result) + # print ('received data, index, theta, ver_result', data.shape, image_number, theta, ver_result) + else: + pass + + print("Connection ended") + +if __name__ == "__main__": + receive_zmq_send('localhost', 5577) + diff --git a/test/zmq_test/zmq_sender.py b/test/zmq_test/zmq_sender.py new file mode 100644 index 0000000..06fd10b --- /dev/null +++ b/test/zmq_test/zmq_sender.py @@ -0,0 +1,44 @@ +import zmq +import dquality.common.constants as const + +#this file is for testing, imitates BlueSky + +class zmq_sen(): + def __init__(self, port=None): + context = zmq.Context() + self.socket = context.socket(zmq.PAIR) + self.socket.bind("tcp://*:%s" % port) + +def receive_send_zmq(dataq, zmq_rcv_port): + conn = zmq_sen(zmq_rcv_port) + socket = conn.socket + interrupted = False + while not interrupted: + data = dataq.get() + if data.status == const.DATA_STATUS_END: + socket.send_json( + dict( + key="end", + document="... see next message ...", + )) + interrupted = True + else: + slice = data.slice.flatten() + print ('sending zmqdqu ') + socket.send_json( + dict( + key="image", + dtype=str(slice.dtype), + shape=slice.shape, + image_number=0, + # image_timestamp=image_time, + # sending_timestamp=time.time(), + rotation="5", + # rotation_timestamp=rotation_time, + document="... see next message ...", + + ), zmq.SNDMORE + ) + # binary image is not serializable in JSON, send separately + socket.send(slice) + diff --git a/zmq_controller.py b/zmq_controller.py new file mode 100644 index 0000000..3ac45eb --- /dev/null +++ b/zmq_controller.py @@ -0,0 +1,99 @@ +import zmq +import os +import signal +import sys +from os.path import expanduser +import dquality.real_time_pv as real +import dquality.common.constants as const +import threading +import time + + +class zmq_server(): + """ + This class represents ZeroMQ connection. + """ + + def __init__(self, port=const.ZMQ_CONTROLLER_PORT): + """ + Constructor + + This constructor creates zmq Context and socket for the server in zmq.PAIR. + It initiate binds and listens for a connection. + + Parameters + ---------- + port : str + serving port + + """ + + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PAIR) + self.socket.bind("tcp://*:%s" % port) + self.ver = None + self.interrupted = False + + + def destroy(self): + """ + Destroys Context. This also closes socket associated with the context. + """ + self.interrupted = True + self.socket.close() + self.context.destroy() + + +def receive(conn): + """ + This function receives data from socket and enqueues it into a queue until the end is detected. + + Parameters + ---------- + conn : zmq_server + a zmq_server instance + + Returns + ------- + none + """ + while not conn.interrupted: + msg = conn.socket.recv_json() + key = msg.get("key") + if key == "start_ver": + detector = msg["detector"] + home = expanduser("~") + conf = os.path.join(home, '.dquality', detector) + if os.path.isdir(conf): + config = os.path.join(conf, 'dqconfig.ini') + if not os.path.isfile(config): + print ('missing configuration file') + else: + conn.ver = real.RT() + th = threading.Thread(target=conn.ver.verify, args=(config,)) + th.start() + + elif key == "stop_ver": + # print('stopping ver') + if not conn.ver is None: + conn.ver.finish() + conn.ver = None + + else: + pass + + print("Connection ended") + +if __name__ == "__main__": + def signal_handler(signal, frame): + if not conn.ver is None: + conn.ver.finish() + time.sleep(1) + conn.destroy() + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + + conn = zmq_server(const.ZMQ_CONTROLLER_PORT) + receive(conn) +