#include "dataProcessing.h"

#include <algorithm>
#include <cstdio>

namespace
{
    // Marker written at the start of every binary record (0xDEADBEEF == 3735928559).
    const uint32_t RECORD_START_MARKER = 0xDEADBEEF;

    // Number of processed traces after which the output files are rolled over.
    const int TRACES_PER_FILE = 2000;

    /*
     * Opens "<dir>/<stamp>_<kind>_chNN_NNN.bin" for binary writing.
     * Only the fixed suffix goes through a printf-style formatter, so neither the
     * length nor the content of the directory name can corrupt the result.
     * Returns NULL when the file cannot be opened.
     */
    FILE* openChannelFile( const QString& dir, const QString& stamp, const char* kind, int channel, int fileNumber )
    {
        char suffix[64];
        snprintf( suffix, sizeof( suffix ), "_%s_ch%02d_%03d.bin", kind, channel, fileNumber );

        QString fileName;
        fileName.append( dir );
        fileName.append( "/" );
        fileName.append( stamp );
        fileName.append( suffix );

        return fopen( fileName.toStdString().c_str(), "wb" );
    }

    // Closes the stream if it is open and resets the handle to NULL.
    void closeChannelFile( FILE*& stream )
    {
        if( stream )
        {
            fclose( stream );
            stream = NULL;
        }
    }
}

/* Function: DataProcessing
 * Params In
 *     QString rDir: Directory the binary output files are written to.
 *     LoadRefPulse* rPulse: Source of the per-channel reference waveforms.
 *     ChannelMasks* cMasks: Tells which channels are masked.
 *     ProcParams* pParams: Radar/processing parameters (sFrequency, f0, f1, tau, rCount).
 *     MatrixData* uavData: Data container that receives raw, frequency and pulse data.
 *     QWidget* parent: Object whose slots are connected to this object's signals.
 *     bool rec: Currently ignored; recording always starts disabled (see below).
 * Return
 *     N/A
 * Purpose
 *     Stores the collaborators, allocates the per-channel arrays, creates one
 *     Client per channel, connects the signals to the parent and sets up the
 *     constants, FFT plans, reference waveforms and axis scales.
 */
DataProcessing::DataProcessing( QString rDir, LoadRefPulse* rPulse, ChannelMasks* cMasks, ProcParams* pParams, MatrixData* uavData, QWidget* parent, bool rec )
{
    // Copy parameters to member variables.
    data = uavData;         // Data structure pointer.
    refPulse = rPulse;      // Reference pulse values.
    chanMasks = cMasks;     // Channel masks.
    rPath = rDir;           // The directory to record values to.
    procParams = pParams;   // Parameters for processing.

    /*
     * Recording always starts disabled: no output files are open yet, so the
     * write paths must not run. The original code assigned 'rec' and then
     * overwrote it with false; the effective value is kept.
     * NOTE(review): confirm whether 'rec' was meant to request recording at start-up.
     */
    (void)rec;
    recordingOn = false;    // Is data recording on?
    recStart = false;       // Is the data recording being started?
    setRefs = false;        // Are the reference waveforms being set?
    numOutput = 0;          // No data traces have been written.
    fileNum = 1;            // Currently writing to the first set of binary files.
    hz = new double[2];     // Values used for the frequency axis.
    cNum = data->getChanNum();

    // Connect the log signal to the function in the parent.
    connect( this, SIGNAL( writeMsg( QString, int ) ), parent, SLOT( writeToLog( QString, int ) ) );

    // Allocate the per-channel arrays.
    serverAddr = new QString[ cNum ];
    received = new bool[ cNum ];
    utcTime = new uint32_t[ cNum ];
    ppsTime = new uint32_t[ cNum ];
    epriNum = new uint32_t[ cNum ];
    outRaw = new FILE*[ cNum ];
    outFinal = new FILE*[ cNum ];
    clientSockets = new Client*[ cNum ];

    /*
     * Build the host name "10.14.<radar>.<channel>" for each channel, create
     * its Client object and mark its output streams as closed.
     */
    for( int i = 0; i < cNum; ++i )
    {
        QString host = "10.14.";
        QString tempRad;
        QString tempChan;
        tempRad.setNum( uavData->getRadarNum() );
        tempChan.setNum( i+1 );
        host.append( tempRad );
        host.append( "." );
        host.append( tempChan );

        serverAddr[i] = host;
        clientSockets[i] = new Client( serverAddr[i], this );
        outRaw[i] = NULL;
        outFinal[i] = NULL;
    }

    // No channel has delivered data yet.
    resetReceived();

    // Forward this object's signals to the matching slots of the parent.
    connect( this, SIGNAL( refresh() ), parent, SLOT( refreshWindows() ) );
    connect( this, SIGNAL( notNull() ), parent, SLOT( enableButtons() ) );
    connect( this, SIGNAL( disableC( int ) ), parent, SLOT( disableChannel( int ) ) );
    connect( this, SIGNAL( status( QString, uint32_t, uint32_t ) ), parent, SLOT( setStatus( QString, uint32_t, uint32_t ) ) );
    connect( this, SIGNAL( errorClose() ), parent, SLOT( closeQL() ) );

    // Set the constants and generate the reference pulse.
    setConstants();
    setFFTs();
    setRefWaves();

    // Set the variables to use for the axis.
    setDepths();
}

/* Function: ~DataProcessing
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Closes every output file that is still open (regardless of the current
 *     recording state), deletes the Client objects, destroys the FFT plans and
 *     releases the buffers allocated by this object.
 */
DataProcessing::~DataProcessing()
{
    for( int i = 0; i < cNum; ++i )
    {
        // Files stay open after stopRec(), so close on the handle, not on recordingOn.
        closeChannelFile( outRaw[i] );
        closeChannelFile( outFinal[i] );

        delete clientSockets[i];
        clientSockets[i] = NULL;

        fftwf_destroy_plan( pFor[i] );
        fftwf_destroy_plan( pForRef[i] );

        delete [] data_in[i];
        data_in[i] = NULL;
    }
    fftwf_destroy_plan( pInv );

    // Buffers obtained from fftwf_malloc() in setFFTs().
    fftwf_free( data_pc_final );
    fftwf_free( data_raw );
    fftwf_free( data_freq );
    fftwf_free( data_pc );
    fftwf_free( data_pc_ref );
    fftwf_free( data_pc_final_trunc );
    fftwf_free( data_pc_final_complex );

    // Arrays obtained from new[] in the constructor, setConstants() and setFFTs().
    delete [] pFor;
    delete [] pForRef;
    delete [] data_in;
    delete [] tTime;
    delete [] f_axis;
    delete [] data_final;
    delete [] clientSockets;
    delete [] outRaw;
    delete [] outFinal;
    delete [] received;
    delete [] utcTime;
    delete [] ppsTime;
    delete [] epriNum;
    delete [] serverAddr;

    /*
     * NOTE(review): 'hz' and 'data_in_ref' (with its per-channel rows) are
     * handed to MatrixData via setDepthScaleVal()/setRefPulse(); they are left
     * allocated here because MatrixData may still reference them. Confirm
     * ownership before freeing them.
     */
}

/* Function: setConstants
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Derives the processing constants from procParams: reference length,
 *     bandwidth, chirp rate, Nyquist zone, the frequency axis, the truncation
 *     window [start_trunc, stop_trunc] and the final data size. Also sets the
 *     column lengths in the data container.
 */
void DataProcessing::setConstants()
{
    // Variables needed for setting up the FFTs, taken from the radar parameters.
    f_samp = procParams->sFrequency;
    f_start = procParams->f0;
    f_stop = procParams->f1;
    tau = procParams->tau;
    dataSize = ( procParams->rCount ) * 2;

    // Variables for use in the processing.
    refLen = ( tau * f_samp ) + 1;
    tTime = new float[ refLen ];
    bw = f_stop - f_start;
    chirp_rate = ( bw / tau );
    size_pc = COLUMNLENGTH;
    nyquist_zone = ( int )( f_start / ( f_samp / 2.0 ) );

    // Frequency axis used to locate the truncation window.
    f_axis = new float[ size_pc ];
    for( int i = 0; i < size_pc; ++i )
    {
        f_axis[i] = ( i * ( f_samp / ( size_pc - 1 ) ) ) + ( nyquist_zone * ( f_samp / 2.0 ) );
    }

    /*
     * start_trunc ends up as the last index whose frequency is <= f_start.
     * stop_trunc ends up one past the first index whose frequency is >= f_stop.
     * NOTE(review): stop_trunc can therefore equal size_pc when only the last
     * axis point reaches f_stop; executeFFTs() reads data_pc[stop_trunc].
     */
    start_trunc = 0;
    stop_trunc = 0;
    for( int j = 0; j < size_pc; ++j )
    {
        if( f_axis[j] <= f_start )
        {
            start_trunc = j;
        }
        if( f_axis[size_pc - 1 - j] >= f_stop )
        {
            stop_trunc = size_pc - j;
        }
    }

    truncLen = stop_trunc - start_trunc + 1;
    dataSizeFinal = ( refLen + dataSize-1 ) * ( (float)truncLen / size_pc );
    data_final = new float[dataSizeFinal];

    data->setColumnLen( MAXPOINTS, 0 );
    data->setColumnLen( MAXPOINTS, 1 );
    data->setColumnLen( dataSizeFinal, 2 );
    data->setColumnLen( refLen, 3 );
}

/* Function: setFFTs
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Allocates the FFT work buffers and creates the FFTW plans: one forward
 *     real-to-complex plan per channel for the data and for the reference
 *     waveform, plus a single inverse complex plan of length truncLen.
 */
void DataProcessing::setFFTs()
{
    // Per-channel plan and input arrays.
    pFor = new fftwf_plan[ cNum ];
    pForRef = new fftwf_plan[ cNum ];
    data_in = new float*[ cNum ];
    data_in_ref = new float*[ cNum ];

    // Complex work buffers used in the FFT process.
    const size_t fullBytes = sizeof( fftwf_complex ) * size_pc;
    data_pc_final = (fftwf_complex*) fftwf_malloc( fullBytes );
    data_raw = (fftwf_complex*) fftwf_malloc( fullBytes );
    data_freq = (fftwf_complex*) fftwf_malloc( fullBytes );
    data_pc = (fftwf_complex*) fftwf_malloc( fullBytes );
    data_pc_ref = (fftwf_complex*) fftwf_malloc( fullBytes );

    // Input arrays and forward plans for data and reference, per channel.
    // Every data plan writes to data_pc and every reference plan to data_pc_ref.
    for( int i = 0; i < cNum; ++i )
    {
        data_in[i] = new float[ size_pc ];
        data_in_ref[i] = new float[ size_pc ];
        pFor[i] = fftwf_plan_dft_r2c_1d( size_pc, data_in[i], data_pc, FFTW_MEASURE );
        pForRef[i] = fftwf_plan_dft_r2c_1d( size_pc, data_in_ref[i], data_pc_ref, FFTW_MEASURE );
    }

    // ATTENTION: WILL NEED TO CHANGE WHEN TAU/HIGH INTEGRATION STUFF CHANGES
    const size_t truncBytes = sizeof( fftwf_complex ) * truncLen;
    data_pc_final_trunc = (fftwf_complex*) fftwf_malloc( truncBytes );
    data_pc_final_complex = (fftwf_complex*) fftwf_malloc( truncBytes );
    pInv = fftwf_plan_dft_1d( truncLen, data_pc_final_trunc, data_pc_final_complex, FFTW_BACKWARD, FFTW_MEASURE );
}

/* Function: setRefWaves
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Copies the first refLen reference values of every channel into
 *     data_in_ref, zero-pads the remainder up to size_pc and hands the array
 *     to the data container so it can be displayed.
 */
void DataProcessing::setRefWaves()
{
    // Values to use for the reference waveforms of each channel.
    float** refs = refPulse->getValues();

    for( int i = 0; i < cNum; ++i )
    {
        for( int z = 0; z < refLen; ++z )
        {
            data_in_ref[i][z] = refs[i][z];
        }
        for( int j = refLen; j < size_pc; ++j )
        {
            data_in_ref[i][j] = 0.0;
        }
    }

    // Set the values so they can be displayed in a plot window.
    data->setRefPulse( data_in_ref );
}

/* Function: getData
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Asks the Client of every unmasked channel to fetch new data.
 */
void DataProcessing::getData()
{
    for( int i = 0; i < cNum; ++i )
    {
        if( chanMasks->chanMasked( i ) )
        {
            continue;
        }
        clientSockets[i]->getData( i );
    }
}

/* Function: resetReceived
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Clears the per-channel 'received' flags. The flags record which
 *     channels have delivered data for the current update and must be
 *     cleared after each update.
 */
void DataProcessing::resetReceived()
{
    for( int i = 0; i < cNum; ++i )
    {
        received[i] = false;
    }
}

/* Function: refreshData
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Starts a new update by requesting data from the channels (see getData()).
 */
void DataProcessing::refreshData()
{
    getData();
}

/* Function: processData
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     For every channel: stores the raw data, runs the pulse compression
 *     (executeFFTs()), writes the compressed record to the channel's final
 *     file when recording, and stores the compressed data in the container.
 *     Afterwards numOutput is incremented, fileOutput() is called to roll the
 *     files over if needed, and the notNull()/refresh() signals are emitted.
 *     Nothing is done while the reference waveforms are being replaced.
 */
void DataProcessing::processData()
{
    if( setRefs )
    {
        return;
    }

    const uint32_t recordStart = RECORD_START_MARKER;
    const uint32_t recordSize = data->getColumnLen( 2 );

    for( int i = 0; i < cNum; ++i )
    {
        // Store the raw data for the channel, then pulse-compress it.
        storeRawData( i+1 );
        executeFFTs( pFor[i], pForRef[i], i+1 );

        /*
         * Record layout: start marker, UTC time, PPS time, EPRI number,
         * sample count, then the compressed samples.
         * The stream is NULL when its file could not be opened; skip it then.
         */
        if( recordingOn && outFinal[i] )
        {
            fwrite( &recordStart, sizeof( uint32_t ), 1, outFinal[i] );
            fwrite( &utcTime[i], sizeof( uint32_t ), 1, outFinal[i] );
            fwrite( &ppsTime[i], sizeof( uint32_t ), 1, outFinal[i] );
            fwrite( &epriNum[i], sizeof( uint32_t ), 1, outFinal[i] );
            fwrite( &recordSize, sizeof( uint32_t ), 1, outFinal[i] );
            fwrite( data_pc_final_complex, sizeof( fftwf_complex ), recordSize, outFinal[i] );
            fflush( outFinal[i] );
        }

        // Store the compressed data for the channel.
        storePulseData( i+1 );
    }

    numOutput++;    // Increase the output counter.
    fileOutput();   // Check to see if files should be closed and new ones opened.

    // Kept for the disabled timing log line below.
    QString num;
    num.setNum( watch.elapsed() );
    //logViewer->appendPlainText( "Time to receive and process all data: " + num );

    if( data->getColumnNum() == 1 )
    {
        emit notNull();
    }

    // Tell the parent to refresh its open windows.
    emit refresh();
}

/* Function: fileOutput
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Opens or rolls over the binary output files.
 *     - If files are already open and either 2000 traces have been processed
 *       or a recording start was requested, the counter is reset, fileNum is
 *       incremented and, when recording, every channel's files are closed
 *       and reopened under a new name.
 *     - If no files are open and a recording start was requested, the files
 *       are opened for the current fileNum and recording is switched on.
 *     The purpose is to limit the size of the individual output files.
 */
void DataProcessing::fileOutput()
{
    if( outRaw[0] )
    {
        if( numOutput < TRACES_PER_FILE && !recStart )
        {
            return;
        }

        if( recStart )
        {
            recordingOn = true;
            recStart = false;
        }

        // Reset numOutput and increment fileNum.
        numOutput = 0;
        fileNum++;

        if( !recordingOn )
        {
            return;
        }

        // One timestamp for the whole rollover so all channels share it.
        const QString stamp = timeStamp();
        for( int i = 0; i < cNum; ++i )
        {
            closeChannelFile( outRaw[i] );
            closeChannelFile( outFinal[i] );
            outRaw[i] = openChannelFile( rPath, stamp, "raw", i+1, fileNum );
            outFinal[i] = openChannelFile( rPath, stamp, "final", i+1, fileNum );
        }
        return;
    }

    if( recStart )
    {
        numOutput = 0;

        // First files of a recording: nothing to close, fileNum is unchanged.
        const QString stamp = timeStamp();
        for( int i = 0; i < cNum; ++i )
        {
            outRaw[i] = openChannelFile( rPath, stamp, "raw", i+1, fileNum );
            outFinal[i] = openChannelFile( rPath, stamp, "final", i+1, fileNum );
        }

        recordingOn = true;
        recStart = false;
    }
}

/* Function: setDepths
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Computes the time step before and after decimation and the two values
 *     used for the frequency axis (hz[0]: start of the Nyquist zone,
 *     hz[1]: frequency step), then passes them to
 *     MatrixData::setDepthScaleVal().
 */
void DataProcessing::setDepths()
{
    const double microseconds = 1000000.0;

    // Samples per microsecond.
    const double samplesPerUs = (double)( f_samp / microseconds );

    // Time step of the raw (pre-decimation) data.
    const double timePre = ( 1.0 / samplesPerUs );

    // Ratio of the decimated length to the padded raw length (raw length + refLen).
    const double decimation = ( (double)data->getColumnLen( 2 ) / ( (double)( (double)data->getColumnLen( 0 ) + refLen ) ) );

    // Time step of the decimated data.
    const double timePost = ( 1.0 / ( samplesPerUs * decimation ) );

    hz[0] = ( ( f_samp / 2.0 ) * ( nyquist_zone ) );
    hz[1] = ( f_samp / MAXPOINTS );

    //std::cout << "Depths: " << timePre << " " << timePost << std::endl;
    data->setDepthScaleVal( timePre, timePost, hz );
}

/* Function: disableChan
 * Params In
 *     int chan: The channel to disable.
 * Return
 *     N/A
 * Purpose
 *     Emits disableC( chan ).
 */
void DataProcessing::disableChan( int chan )
{
    emit disableC( chan );
}

/* Function: executeFFTs
 * Params In
 *     fftwf_plan p: Forward plan for the channel data (output in data_pc).
 *     fftwf_plan r: Forward plan for the channel reference (output in data_pc_ref).
 *     int channel: 1-based channel the pulse compression is computed for.
 * Return
 *     N/A
 * Purpose
 *     Performs the pulse compression for one channel:
 *     1. Forward FFT of the data; the spectrum is stored for plotting.
 *     2. Forward FFT of the reference.
 *     3. Both spectra are completed with their Hermitian mirror image, as the
 *        real-to-complex transform only produces the non-redundant half and
 *        the band of interest can lie in the upper half.
 *     4. Over [start_trunc, stop_trunc] the data spectrum is multiplied by
 *        the conjugate of the reference spectrum (decimation).
 *     5. Inverse FFT of the truncated product, followed by scaling.
 */
void DataProcessing::executeFFTs( fftwf_plan p, fftwf_plan r, int channel )
{
    // Forward FFT of the data.
    fftwf_execute( p );

    // Store the data generated from the FFT.
    // NOTE(review): this runs before the mirror fill below, so the upper half
    // stored here is whatever the previous call left in data_pc.
    storeFreqData( channel );

    /*
     * Hermitian symmetry of a real-input DFT: X[N-k] = conj( X[k] ).
     * k starts at 1 because X[0] has no mirror element inside the array
     * (index N would be one past the end).
     */
    const int mirrorEnd = ( size_pc + 1 ) / 2;
    for( int i = 1; i < mirrorEnd; ++i )
    {
        data_pc[size_pc-i] = conj( data_pc[i] );
    }

    // Forward FFT of the reference waveform, mirrored the same way.
    fftwf_execute( r );
    for( int i = 1; i < mirrorEnd; ++i )
    {
        data_pc_ref[size_pc-i] = conj( data_pc_ref[i] );
    }

    // Decimation: matched-filter product over the truncation window.
    for( int z = start_trunc; z <= stop_trunc; ++z )
    {
        data_pc_final_trunc[z-start_trunc] = data_pc[z] * conj( data_pc_ref[z] );
    }

    /*std::ofstream outDD;
    outDD.open("output/dataDecimatedOutputDataComplexCh01_001.bin", std::ios::binary | std::ios::out );
    outDD.write( ( char* ) data_pc_final_trunc, sizeof( fftwf_complex ) * truncLen );
    outDD.flush();
    outDD.close();*/

    // Inverse FFT
    fftwf_execute( pInv );

    /*std::ofstream outFD;
    outFD.open("output/dataTruncatedOutputDataComplexCh01_001.bin", std::ios::binary | std::ios::out );
    outFD.write( ( char* ) data_pc_final_complex, sizeof( fftwf_complex ) * truncLen );
    outFD.flush();
    outFD.close();*/

    // Truncation and scaling.
    // NOTE(review): assumes dataSizeFinal <= truncLen (the size of data_pc_final_complex).
    for( int j = 0; j < dataSizeFinal; ++j )
    {
        data_pc_final_complex[j] = ( data_pc_final_complex[j] / size_pc ) * 2 * ((float)truncLen / size_pc);
    }
}

/* Function: storePulseData
 * Params In
 *     int chan: The 1-based channel of the data to be stored.
 * Return
 *     N/A
 * Purpose
 *     Copies the first dataSizeFinal pulse-compressed samples into a vector
 *     and stores it in the data container.
 */
void DataProcessing::storePulseData( int chan )
{
    std::vector< fftwf_complex > finalVec;
    if( dataSizeFinal > 0 )
    {
        finalVec.reserve( dataSizeFinal );
    }

    for( int i = 0; i < dataSizeFinal; ++i )
    {
        finalVec.push_back( data_pc_final_complex[i] );
    }

    data->setPulseMatrixData( chan, finalVec );
}

/* Function: storeFreqData
 * Params In
 *     int chan: The 1-based channel of the data to be stored.
 * Return
 *     N/A
 * Purpose
 *     Copies the frequency-domain data (before decimation/truncation) into a
 *     vector and stores it in the data container. For an odd Nyquist zone the
 *     data is stored in reverse order (mirror image); for an even zone it is
 *     copied as is.
 */
void DataProcessing::storeFreqData( int chan )
{
    std::vector< fftwf_complex > finalVec;
    if( size_pc > 0 )
    {
        finalVec.reserve( size_pc );
    }

    const int zoneParity = ( (int)nyquist_zone % 2 );
    if( zoneParity == 1 )
    {
        // Odd zone: store the mirror image, starting from size_pc-1.
        for( int i = size_pc-1; i >= 0; --i )
        {
            finalVec.push_back( data_pc[i] );
        }
    }
    else if( zoneParity == 0 )
    {
        // Even zone: just copy the data.
        for( int i = 0; i < size_pc; ++i )
        {
            finalVec.push_back( data_pc[i] );
        }
    }

    data->setFreqMatrixData( chan, finalVec );
}

/* Function: storeRawData
 * Params In
 *     int chan: The 1-based channel of the data to be stored.
 * Return
 *     N/A
 * Purpose
 *     Copies dataSize raw samples of the channel into a vector and stores it
 *     in the data container. The copy starts at index refLen because the
 *     first refLen points of data_in are zero padding (see copyData()).
 */
void DataProcessing::storeRawData( int chan )
{
    std::vector< fftwf_complex > finalVec;
    if( dataSize > 0 )
    {
        finalVec.reserve( dataSize );
    }

    const float* channelData = data_in[chan-1];
    for( int i = refLen; i < dataSize+refLen; ++i )
    {
        finalVec.push_back( channelData[i] );
    }

    data->setRawMatrixData( chan, finalVec );
}

/* Function: copyData
 * Params In
 *     float* newD: Array with the new samples received from the server.
 *                  Deleted (delete []) by this function.
 *     uint32_t* newH: Array with the header values, or NULL.
 *                     Deleted (delete []) by this function.
 *     int newDSize: Number of samples in newD.
 *     int c: The 0-based channel the data arrived from.
 * Return
 *     N/A
 * Purpose
 *     Subtracts the mean from the new samples and copies them into
 *     data_in[c] behind refLen leading zeros; the rest of the array up to
 *     size_pc is zero-filled. Header values, if present, are processed. The
 *     channel is marked as received, the raw column length is updated when
 *     the sample count changed, and the record is written to the raw file
 *     when recording. Once every unmasked channel has been received,
 *     processData() and resetReceived() are called.
 *     A channel index outside [0, cNum) is rejected, and samples that would
 *     not fit into data_in[c] are dropped.
 */
void DataProcessing::copyData( float* newD, uint32_t* newH, int newDSize, int c )
{
    // The buffers are owned by this function, so release them on rejection too.
    if( c < 0 || c >= cNum )
    {
        delete [] newD;
        delete [] newH;
        return;
    }

    // Without a sample buffer (or with a negative count) there is nothing to copy.
    if( !newD || newDSize < 0 )
    {
        newDSize = 0;
    }

    // Find the mean of the new data; it is subtracted to centre the data on 0.
    long double mean = 0.0;
    if( newDSize > 0 )
    {
        for( int i = 0; i < newDSize; ++i )
        {
            mean += newD[i];
        }
        mean = mean / newDSize;
    }

    // Process the header information.
    if( newH )
    {
        processHeaderData( newH, c );
    }

    /*
     * Layout of data_in[c]: refLen zeros (required for the circular
     * convolution), the mean-free samples, then zeros up to size_pc.
     * Both lengths are clamped so that no write can leave the array.
     */
    const int lead = std::max<int>( 0, std::min<int>( refLen, size_pc ) );
    const int copyCount = std::min<int>( newDSize, size_pc - lead );
    float* channelData = data_in[c];

    for( int n = 0; n < lead; ++n )
    {
        channelData[n] = 0.0;
    }
    for( int i = 0; i < copyCount; ++i )
    {
        channelData[lead + i] = newD[i]-mean;
    }
    for( int n = lead + copyCount; n < size_pc; ++n )
    {
        channelData[n] = 0.0;
    }

    delete [] newD;
    newD = NULL;
    delete [] newH;
    newH = NULL;

    received[c] = true;

    // Keep the raw column length in step with the received sample count.
    const int len = data->getColumnLen( 0 );
    if( ( len != newDSize ) && ( newDSize != 0 ) )
    {
        data->setColumnLen( newDSize, 0 );
        setDepths();
    }

    /*
     * Raw record layout: start marker, UTC time, PPS time, EPRI number,
     * sample count, then the samples.
     * The stream is NULL when its file could not be opened; skip it then.
     * NOTE(review): the samples are written starting at data_in[c][0], i.e.
     * including the refLen leading zeros - confirm that readers expect this.
     */
    if( recordingOn && outRaw[c] )
    {
        const uint32_t recordStart = RECORD_START_MARKER;
        const uint32_t recordSize = data->getColumnLen( 0 );
        fwrite( &recordStart, sizeof( uint32_t ), 1, outRaw[c] );
        fwrite( &utcTime[c], sizeof( uint32_t ), 1, outRaw[c] );
        fwrite( &ppsTime[c], sizeof( uint32_t ), 1, outRaw[c] );
        fwrite( &epriNum[c], sizeof( uint32_t ), 1, outRaw[c] );
        fwrite( &recordSize, sizeof( uint32_t ), 1, outRaw[c] );
        fwrite( data_in[c], sizeof( float ), recordSize, outRaw[c] );
        fflush( outRaw[c] );
    }

    // Process once every unmasked channel has delivered its data.
    bool allReceived = true;
    for( int i = 0; i < cNum; ++i )
    {
        if( !received[i] && !chanMasks->chanMasked(i) )
        {
            allReceived = false;
        }
    }

    if( allReceived )
    {
        processData();
        resetReceived();
    }
}

/* Function: timeStamp
 * Params In
 *     N/A
 * Return
 *     QString: The current local time formatted as "YYYYMMDD_HHMMSS",
 *              or an empty string if the local time cannot be determined.
 * Purpose
 *     Produces the timestamp used in the names of the output files.
 */
QString DataProcessing::timeStamp()
{
    time_t rawtime;
    time( &rawtime );

    struct tm* timeInfo = localtime( &rawtime );
    if( !timeInfo )
    {
        return QString();
    }

    char buffer[80];
    strftime( buffer, 80, "%Y%m%d_%H%M%S", timeInfo );

    QString output = buffer;
    return output;
}

/* Function: processHeaderData
 * Params In
 *     uint32_t* headers: Header values; elements 1, 2 and 3 are read.
 *     int chan: The 0-based channel the header belongs to.
 * Return
 *     N/A
 * Purpose
 *     Stores the UTC time, PPS time and EPRI number of the channel. For
 *     channel 0 the UTC time (seconds) is converted to "h:m:s" and the
 *     status() signal is emitted with it.
 */
void DataProcessing::processHeaderData( uint32_t* headers, int chan )
{
    utcTime[chan] = headers[1];
    ppsTime[chan] = headers[2];
    epriNum[chan] = headers[3];

    // Split the UTC seconds into hours, minutes and seconds.
    const uint32_t utc_hr = utcTime[chan] / 3600;
    const uint32_t utc_min = ( utcTime[chan] % 3600 ) / 60;
    const uint32_t utc_sec = utcTime[chan] - ( utc_hr*3600 ) - ( utc_min*60 );

    QString utcString;
    QString num;
    num.setNum( utc_hr );
    utcString.append( num );
    utcString.append( ":" );
    num.clear();
    num.setNum( utc_min );
    utcString.append( num );
    utcString.append( ":" );
    num.clear();
    num.setNum( utc_sec );
    utcString.append( num );

    // Only the first channel drives the status display.
    if( chan == 0 )
    {
        emit status( utcString, ppsTime[chan], epriNum[chan] );
    }
}

/* Function: setTimeOffset
 * Params In
 *     double offset: The new time offset.
 * Return
 *     N/A
 * Purpose
 *     Stores the time offset.
 */
void DataProcessing::setTimeOffset( double offset )
{
    timeOffset = offset;
}

/* Function: setRecordingPath
 * Params In
 *     QString path: Directory the output files are written to.
 * Return
 *     N/A
 * Purpose
 *     Stores the recording directory; it is used the next time files are opened.
 */
void DataProcessing::setRecordingPath( QString path )
{
    rPath = path;
}

/* Function: startRec
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Requests that recording starts the next time fileOutput() runs.
 */
void DataProcessing::startRec()
{
    recordingOn = false;
    recStart = true;
}

/* Function: stopRec
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Disables the data recording. Open files are left open; they are
 *     closed by the next rollover or by the destructor.
 */
void DataProcessing::stopRec()
{
    recordingOn = false;
    recStart = false;
}

/* Function: writeToLog
 * Params In
 *     QString msg: The message.
 *     int type: The message type, passed through unchanged.
 * Return
 *     N/A
 * Purpose
 *     Emits writeMsg( msg, type ).
 */
void DataProcessing::writeToLog( QString msg, int type )
{
    emit writeMsg( msg, type );
}

/* Function: closeQL
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Emits errorClose().
 */
void DataProcessing::closeQL()
{
    emit errorClose();
}

/* Function: changeRefWaves
 * Params In
 *     N/A
 * Return
 *     N/A
 * Purpose
 *     Reloads the reference waveforms. setRefs is raised for the duration so
 *     that processData() does nothing while the waveforms are replaced.
 */
void DataProcessing::changeRefWaves()
{
    setRefs = true;
    setRefWaves();
    setRefs = false;
}