1. In a multi-threaded batch job, the last task depends on the execution of all the other tasks, so in X++ for D365 (F&O) we need to create a dependency that makes the last task wait on the statuses of the other tasks.
2. The dependent task is created in the createCleanupTask method.
Contract class:
/// <summary>
/// A data contract class for CE sync batch <c>CCBOrderSyncCEService</c>.
/// </summary>
/// <remarks>
/// Developed to sync F&O orders with CE, dated 09 Dec, 22 by Himanshu.
/// </remarks>
[DataContractAttribute]
class CCBOrderSyncCEContract
{
    // Thread count entered on the batch dialog.
    int batchThread;
    // Base64-encoded, packed List of sales order ids; written by setSOList and read by getSOList.
    str soList;

    /// <summary>
    /// Parameter method for <c>ThreadCount</c>.
    /// </summary>
    /// <param name = "_threadCount">Thread count for batch</param>
    /// <returns>The thread count currently stored in the contract.</returns>
    [DataMemberAttribute('Thread count'), SysOperationLabelAttribute("Thread count"), SysOperationHelpText('Max thread count in the current batch.')]
    public int parmBatchThread(int _threadCount = batchThread)
    {
        batchThread = _threadCount;
        return batchThread;
    }

    /// <summary>
    /// Parameter method for <c>SOList</c>; the value is the encoded form of the list and is hidden on the dialog.
    /// </summary>
    /// <param name = "_solist">Encoded list of SO to be processed</param>
    /// <returns>Encoded SO list</returns>
    [DataMemberAttribute, SysOperationControlVisibilityAttribute(false)]
    public str parmSOList(str _solist = soList)
    {
        soList = _solist;
        return soList;
    }

    /// <summary>
    /// Packs and base64-encodes the given sales order list and stores it in the contract.
    /// </summary>
    /// <param name = "_solist">List of SO to be processed; a null list clears the stored value.</param>
    public void setSOList(List _solist)
    {
        // Guard against a null list so the setter never dereferences a missing object.
        soList = _solist ? SysOperationHelper::base64Encode(_solist.pack()) : '';
    }

    /// <summary>
    /// Decodes the stored value back into the list of SO to be processed.
    /// </summary>
    /// <returns>The stored list, or an empty string list when nothing has been stored.</returns>
    public List getSOList()
    {
        // Nothing stored yet: return an empty list instead of unpacking an empty value.
        if (!soList)
        {
            return new List(Types::String);
        }

        return List::create(SysOperationHelper::base64Decode(soList));
    }
}
Controller class:
/// <summary>
/// Controller for the batch job that syncs F&O orders with CE.
/// </summary>
/// <remarks>
/// Developed to sync F&O orders with CE, dated 04 Oct, 22 by Himanshu.
/// </remarks>
class CCBOrderSyncCEController extends SysOperationServiceController
{
    /// <summary>
    /// Supplies the description shown for the batch job.
    /// </summary>
    /// <returns>The batch job caption label.</returns>
    public ClassDescription caption()
    {
        return "@CCB:CCBSyncOrderWithCE";
    }

    /// <summary>
    /// Supplies the caption of the dialog; the passed value is ignored and the fixed label is always returned.
    /// </summary>
    /// <param name = "_dialogCaption">Caption offered by the caller (not used).</param>
    /// <returns>The dialog caption label.</returns>
    public LabelType parmDialogCaption(LabelType _dialogCaption = '')
    {
        return "@CCB:CCBSyncOrderWithCE";
    }

    /// <summary>
    /// Creates a controller instance using the default constructor arguments.
    /// </summary>
    /// <returns>A new <c>CCBOrderSyncCEController</c> object.</returns>
    public static CCBOrderSyncCEController construct()
    {
        CCBOrderSyncCEController instance = new CCBOrderSyncCEController();

        return instance;
    }

    /// <summary>
    /// Entry point: builds the controller from the service info carried by the arguments,
    /// shows the dialog and runs the operation when the dialog is confirmed.
    /// </summary>
    /// <param name = "args">Arguments holding the service class, method and execution mode.</param>
    public static void main(Args args)
    {
        IdentifierName serviceClass;
        IdentifierName serviceMethod;
        SysOperationExecutionMode mode;
        CCBOrderSyncCEController syncController;

        [serviceClass, serviceMethod, mode] = SysOperationServiceController::parseServiceInfo(args);
        syncController = new CCBOrderSyncCEController(serviceClass, serviceMethod, mode);

        // Nothing to do when the user cancels the dialog.
        if (!syncController.prompt())
        {
            return;
        }

        syncController.run();
    }
}
Service class:
/// <summary>
/// Class to sync F&O orders with CE.
/// </summary>
/// <remarks>
/// Developed to sync F&O orders with CE, dated 04 Oct, 22 by Himanshu.
/// </remarks>
class CCBOrderSyncCEService extends SysOperationServiceBase
{
// Controllers of the sync tasks created in startProcess; the set is handed to
// createCleanupTask, which makes the cleanup task depend on each of them.
Set setClass = new Set(Types::Class);
/// <summary>
/// Supplies the name of this service class, used when building the controllers of the runtime tasks.
/// </summary>
/// <returns>The class name of <c>CCBOrderSyncCEService</c>.</returns>
ClassName getClassName()
{
    ClassName serviceClassName = classStr(CCBOrderSyncCEService);

    return serviceClassName;
}
/// <summary>
/// Collects the orders pending CE sync, splits them into bundles and adds one runtime batch
/// task per bundle, followed by a cleanup task that depends on all of them.
/// </summary>
/// <param name = "_contract"> Contract for CE sync batch job; supplies the thread count.</param>
public void startProcess(CCBOrderSyncCEContract _contract)
{
    #OCCRetryCount
    SysOperationServiceController controller;
    CCBOrderSyncCEContract dataContract;
    SalesTable salesTable, salesTableExists;
    CCBOrderSyncCEErrorLogTable syncErrorLog;
    BatchHeader batchHeader;
    List soList;
    int batchThread, soCount, soErrRectCnt, totRecord, recordCount, recordPerThread;

    // Creates one runtime task that syncs the orders currently held in soList.
    void createTask()
    {
        try
        {
            controller = new SysOperationServiceController(this.getClassName(), methodStr(CCBOrderSyncCEService,syncOrdersWithCE), SysOperationExecutionMode::Asynchronous);
            dataContract = controller.getDataContractObject('_contract');
            dataContract.setSOList(soList);
            // NOTE(review): the second argument is the batch header id; confirm this is the
            // value addRuntimeTask expects here.
            batchHeader.addRuntimeTask(controller, batchHeader.parmBatchHeaderId());
            // Remember the controller only after it was added to the header, so the cleanup
            // task never gets a dependency on a task that failed to be added.
            setClass.add(controller);
        }
        catch (Exception::UpdateConflict)
        {
            if (appl.ttsLevel() == 0)
            {
                if (xSession::currentRetryCount() >= #RetryNum)
                {
                    throw Exception::UpdateConflictNotRecovered;
                }
                else
                {
                    retry;
                }
            }
            else
            {
                // Inside an outer transaction the conflict must be handled by its owner.
                throw Exception::UpdateConflict;
            }
        }
        catch
        {
            // Pass the text directly; it must not be interpreted as a format string.
            info(this.getErrorStr());
        }
    }

    //Check if dual write map for sales table/sales line is running or not
    if (!this.checkDualWriteMap())
    {
        throw error("@CCB:CCBCESyncDWMapStatus");
    }

    //Get batch thread count from batch parameter
    batchThread = _contract.parmBatchThread();
    // A missing or non-positive thread count would cause a division by zero below;
    // fall back to a single thread.
    if (batchThread < 1)
    {
        batchThread = 1;
    }

    //Get total record count to sync with CE
    select count(RecId) from salesTable
        where salesTable.CCBToSync == NoYes::Yes
           && (salesTable.SalesStatus == SalesStatus::Invoiced || salesTable.SalesStatus == SalesStatus::Canceled)
        notexists join syncErrorLog
            where syncErrorLog.OrderNumber == salesTable.SalesId;
    soCount = salesTable.RecId;

    //Get total record count to sync with CE from existing error log, where error is rectified
    select count(RecId) from salesTableExists
        join syncErrorLog
            where syncErrorLog.OrderNumber == salesTableExists.SalesId
               && syncErrorLog.ErrorRectified == NoYes::Yes
               && salesTableExists.CCBToSync == NoYes::Yes
               && (salesTableExists.SalesStatus == SalesStatus::Invoiced || salesTableExists.SalesStatus == SalesStatus::Canceled);
    soErrRectCnt = salesTableExists.RecId;

    //Get total records to be processed
    totRecord = soCount + soErrRectCnt;
    if (totRecord)
    {
        batchHeader = BatchHeader::getCurrentBatchHeader();

        //Calculate no of records that needs to be bundled in a single thread.
        recordPerThread = real2int(roundUp(totRecord / batchThread, 1));

        //Fetch all the orders which do not exists in the CE sync error log and sync is pending.
        soList = new List(Types::String);
        while select SalesId from salesTable
            where salesTable.CCBToSync == NoYes::Yes
               && (salesTable.SalesStatus == SalesStatus::Invoiced || salesTable.SalesStatus == SalesStatus::Canceled)
            notexists join syncErrorLog
                where syncErrorLog.OrderNumber == salesTable.SalesId
        {
            recordCount++;
            soList.addEnd(salesTable.SalesId);
            if (recordCount == recordPerThread)
            {
                createTask();
                recordCount = 0;
                soList = new List(Types::String);
            }
        }

        //Fetch all the orders which exists in the CE sync error log where error is rectified.
        while select SalesId from salesTableExists
            join syncErrorLog
                where syncErrorLog.OrderNumber == salesTableExists.SalesId
                   && syncErrorLog.ErrorRectified == NoYes::Yes
                   && salesTableExists.CCBToSync == NoYes::Yes
                   && (salesTableExists.SalesStatus == SalesStatus::Invoiced || salesTableExists.SalesStatus == SalesStatus::Canceled)
        {
            recordCount++;
            soList.addEnd(salesTableExists.SalesId);
            if (recordCount == recordPerThread)
            {
                createTask();
                recordCount = 0;
                soList = new List(Types::String);
            }
        }

        //Create last thread with all remaining orders
        if (recordCount > 0)
        {
            createTask();
            recordCount = 0;
        }

        // for clean up data we added new task
        this.createCleanupTask(setClass,batchHeader);
    }
    else
    {
        info("@CCB:CCBCESyncOrderStatus");
    }
}
/// <summary>
/// Reset dual write check for all orders in the contract's list to let them sync with CE.
/// </summary>
/// <remarks>
/// Each order is processed in its own transaction: the pending sales lines and the sales
/// header are selected for update and reset inside the same transaction, so an order is
/// either reset completely or not at all. Orders that fail are written to the error log.
/// Order ids that no longer match a sales order are skipped.
/// </remarks>
/// <param name = "_contract">Contract for CE sync batch job; carries the list of sales order ids.</param>
public void syncOrdersWithCE(CCBOrderSyncCEContract _contract)
{
    #OCCRetryCount
    SalesTable salesTableUpd;
    SalesLine salesLineUpd;
    SalesId salesId;
    List salesOrderList;
    ListEnumerator soEnumList;

    salesOrderList = _contract.getSOList();
    soEnumList = salesOrderList.getEnumerator();
    while (soEnumList.moveNext())
    {
        salesId = soEnumList.current();
        if (salesId)
        {
            try
            {
                ttsbegin;

                // Select inside the transaction: a record may only be updated in the
                // transaction scope in which it was selected for update. A retry re-runs
                // this block, so the buffers are always re-read from the database.
                select firstonly forupdate salesTableUpd
                    where salesTableUpd.SalesId == salesId;

                if (salesTableUpd.RecId)
                {
                    while select forupdate salesLineUpd
                        where salesLineUpd.SalesId == salesTableUpd.SalesId
                           && salesLineUpd.CCBToSync == NoYes::Yes
                    {
                        salesLineUpd.CCBToSync = NoYes::No;
                        salesLineUpd.update();
                    }

                    salesTableUpd.CCBToSync = NoYes::No;
                    salesTableUpd.update();
                }

                ttscommit;
            }
            catch (Exception::Deadlock)
            {
                if (xSession::currentRetryCount() >= #RetryNum)
                {
                    // Log against the id from the list; the buffer may be empty after a failure.
                    this.addErrorLog(salesId, salesTableUpd.InventLocationId);
                }
                else
                {
                    retry;
                }
            }
            catch (Exception::UpdateConflict)
            {
                if (xSession::currentRetryCount() >= #RetryNum)
                {
                    this.addErrorLog(salesId, salesTableUpd.InventLocationId);
                }
                else
                {
                    retry;
                }
            }
            catch
            {
                this.addErrorLog(salesId, salesTableUpd.InventLocationId);
            }
        }
    }
}
/// <summary>
/// Writes one CE sync error log record for the given order, including the load id of a
/// matching load line (if any) and the error text collected by <c>getErrorStr</c>.
/// </summary>
/// <param name = "_salesId">Sales order</param>
/// <param name = "_warehouse">Warehouse</param>
public void addErrorLog(SalesId _salesId, SalesInventLocationId _warehouse)
{
    WHSLoadLine whsLoadLine;
    SalesLine orderLine;
    CCBOrderSyncCEErrorLogTable logEntry;

    // Look up a load line that belongs to a line of this sales order.
    select firstonly LoadId from whsLoadLine
        exists join orderLine
            where orderLine.InventTransId == whsLoadLine.InventTransId
               && orderLine.SalesId == _salesId
               && whsLoadLine.InventTransType == InventTransType::Sales;

    logEntry.LoadId = whsLoadLine.LoadId;
    logEntry.InventLocationId = _warehouse;
    logEntry.OrderNumber = _salesId;
    logEntry.ErrorInfo = this.getErrorStr();
    logEntry.insert();
}
/// <summary>
/// Collects the messages currently in the infolog into a single string.
/// </summary>
/// <remarks>
/// The messages are obtained through <c>infolog.cut()</c>, so calling this method consumes
/// them (presumably they are removed from the infolog - verify against the platform docs).
/// </remarks>
/// <returns>The accumulated message text; empty when the infolog held no messages.</returns>
public str getErrorStr()
{
    SysInfologEnumerator enumerator;
    SysInfologMessageStruct msgStruct;
    str error;

    enumerator = SysInfologEnumerator::newData(infolog.cut());
    while (enumerator.moveNext())
    {
        msgStruct = new SysInfologMessageStruct(enumerator.currentMessage());
        // Append the current message to the text gathered so far, using the label-based format.
        error = strfmt("@SYS82282"+ ' '+"@MCR26302", error, msgStruct.message());
    }

    return error;
}
/// <summary>
/// Checks the dual write project configurations related to <c>SalesTable</c> / <c>SalesLine</c>.
/// </summary>
/// <returns>
/// true when no such configuration with status <c>Queueing</c> is found, i.e. the process can
/// move further; otherwise false.
/// </returns>
public boolean checkDualWriteMap()
{
    DualWriteProjectConfiguration projectConfig;
    DMFEntity entity;
    BusinessEventsDefinition eventDefinition;

    select firstonly RecId from projectConfig
        exists join entity
        exists join eventDefinition
            where eventDefinition.RefEntityName == entity.TargetEntity
               && entity.EntityName == projectConfig.InternalEntityName
               && projectConfig.Status == DualWriteProjectStatus::Queueing
               && (eventDefinition.RefTableName == tableStr(SalesTable) || eventDefinition.RefTableName == tableStr(SalesLine));

    // A hit means a queueing configuration exists, which blocks the process.
    return projectConfig.RecId == 0;
}
/// <summary>
/// Adds the cleanup task to the batch header and makes it depend on every task in the given
/// set, so that it is released only after each of them reached status <c>Finished</c>.
/// </summary>
/// <param name = "_set">Set of task controllers the cleanup task must wait for.</param>
/// <param name = "_batchHeader">Batch header the tasks belong to; it is saved at the end.</param>
public void createCleanupTask(Set _set, BatchHeader _batchHeader)
{
    #OCCRetryCount
    SysOperationServiceController controller,batchTask;
    SetEnumerator se;

    try
    {
        controller = new SysOperationServiceController(this.getClassName(), methodStr(CCBOrderSyncCEService,cleanUpData), SysOperationExecutionMode::Asynchronous);
        // NOTE(review): the second argument is the batch header id; confirm this is the
        // value addRuntimeTask expects here.
        _batchHeader.addRuntimeTask(controller, _batchHeader.parmBatchHeaderId());

        // Create dependencies
        se = _set.getEnumerator();
        while (se.moveNext())
        {
            batchTask = se.current(); // Task to make dependent on
            _batchHeader.addDependency(controller,batchTask,BatchDependencyStatus::Finished);
        }

        _batchHeader.save();
    }
    catch (Exception::UpdateConflict)
    {
        if (appl.ttsLevel() == 0)
        {
            if (xSession::currentRetryCount() >= #RetryNum)
            {
                throw Exception::UpdateConflictNotRecovered;
            }
            else
            {
                retry;
            }
        }
        else
        {
            // Inside an outer transaction the conflict must be handled by its owner.
            throw Exception::UpdateConflict;
        }
    }
    catch
    {
        // Pass the text directly; it must not be interpreted as a format string.
        info(this.getErrorStr());
    }
}
/// <summary>
/// Cleans up the CE sync error log: removes entries of orders that are no longer flagged for
/// sync, removes rectified entries, and keeps only the most recently created entry per order.
/// </summary>
public void cleanUpData()
{
    SalesTable syncedOrder;
    CCBOrderSyncCEErrorLogTable logEntry, newestEntry, staleEntry;

    // Step 1: logs of orders whose sync flag has been reset.
    delete_from logEntry
        exists join syncedOrder
            where syncedOrder.SalesId == logEntry.OrderNumber
               && syncedOrder.CCBToSync == NoYes::No;
    logEntry.clear();

    // Step 2: logs marked as rectified.
    delete_from logEntry
        where logEntry.ErrorRectified == NoYes::Yes;
    logEntry.clear();

    // Step 3: for every order still present, drop everything except its latest log.
    while select OrderNumber from logEntry
        group by OrderNumber
    {
        newestEntry.clear();
        staleEntry.clear();

        select firstonly RecId from newestEntry
            order by CreatedDateTime desc
            where newestEntry.OrderNumber == logEntry.OrderNumber;

        ttsbegin;
        delete_from staleEntry
            where staleEntry.OrderNumber == logEntry.OrderNumber
               && staleEntry.RecId != newestEntry.RecId;
        ttscommit;
    }

    info("@CCB:LogCleanUp");
}
}
No comments:
Post a Comment