Wednesday, 14 June 2023

In multi threading batch last task is dependent on all the tasks execution . Need to create dependency for execute last task on other tasks statuses in x++ D365 (F&O)

 1. In multi threading batch last task is dependent on all the tasks execution . Need to create dependency for execute last task on other tasks statuses in x++ D365 (F&O)

2. Dependency task is created in createCleanupTask method.

Contract class:

/// <summary>

/// A data contract class for CE sync batch <c>CCBOrderSyncCEService</c>.

/// </summary>

/// <remarks>

/// Developed to sync F&O orders with CE, dated 09 Dec, 22 by Himanshu.

/// </remarks>

[DataContractAttribute]

class CCBOrderSyncCEContract

{

    int     batchThread;

    str     soList;


    /// <summary>

    /// Parameter method for <c>ThreadCount</c>.

    /// </summary>

    /// <param name = "_threadCount">Thread count for batch</param>

    /// <returns>NoYesId</returns>

    [DataMemberAttribute('Thread count'), SysOperationLabelAttribute("Thread count"), SysOperationHelpText('Max thread count in the current batch.')]

    public int parmBatchThread(int _threadCount = batchThread)

    {

        batchThread = _threadCount;

        return batchThread;

    }


    /// <summary>

    /// Parameter method for <c>SOList</c>.

    /// </summary>

    /// <param name = "_solist">List of SO to be processed</param>

    /// <returns>SO list</returns>

    [DataMemberAttribute, SysOperationControlVisibilityAttribute(false)]

    public str parmSOList(str _solist = soList)

    {

        soList = _solist;

        return soList;

    }


    /// <summary>

    /// This method sets the sales order in a list.

    /// </summary>

    /// <param name = "_solist">List of SO to be processed</param>

    public void setSOList(List _solist)

    {

        soList = SysOperationHelper::base64Encode(_solist.pack());

    }


    /// <summary>

    /// This method gets the list of SO to be processed.

    /// </summary>

    public List getSOList()

    {

        return List::create(SysOperationHelper::base64Decode(soList));

    }


}

Controller class:

/// <summary>

/// Controller class for F&O orders to sync with CE.

/// </summary>

/// <remarks>

/// N Developed to sync F&O orders with CE, dated 04 Oct, 22 by Himanshu.

/// </remarks>

class CCBOrderSyncCEController extends SysOperationServiceController

{

    /// <summary>

    /// Method for batch job description.

    /// </summary>

    /// <returns>Caption</returns>

    public ClassDescription caption()

    {

        ClassDescription    ret;


        ret = "@CCB:CCBSyncOrderWithCE";


        return ret;

    }


    /// <summary>

    /// Method for dialog caption.

    /// N Developed to sync F&O orders with CE, dated 04 Oct, 22 by Himanshu.

    /// </summary>

    /// <param name = "_dialogCaption">_dialogCaption</param>

    /// <returns>returns caption for dialog</returns>

    public LabelType parmDialogCaption(LabelType _dialogCaption = '')

    {

        LabelType   caption;


        caption = "@CCB:CCBSyncOrderWithCE";


        return caption;

    }


    /// <summary>

    /// Construct Method for controller class.

    /// N Developed to sync F&O orders with CE, dated 04 Oct, 22 by Himanshu.

    /// </summary>

    /// <returns>returns CCBOrderSyncCEController object</returns>

    public static CCBOrderSyncCEController construct()

    {

        return new CCBOrderSyncCEController();

    }


    /// <summary>

    /// Main Method for controller class.

    /// N Developed to sync F&O orders with CE, dated 04 Oct, 22 by Himanshu.

    /// </summary>

    /// <param name = "args">args</param>

    public static void main(Args args)

    {

        CCBOrderSyncCEController        controller;

        IdentifierName                  className;

        IdentifierName                  methodName;

        SysOperationExecutionMode       executionMode;


        [className, methodName, executionMode] = SysOperationServiceController::parseServiceInfo(args);


        controller = new CCBOrderSyncCEController(className, methodName, executionMode);

        

        if (controller.prompt())

        {

            controller.run();

        }

    }


}

Service class:

/// <summary>

/// Class to sync F&O orders with CE.

/// </summary>

/// <remarks>

/// N Developed to sync F&O orders with CE, dated 04 Oct, 22 by Himanshu.

/// </remarks>

class CCBOrderSyncCEService extends SysOperationServiceBase

{

    Set     setClass = new Set(Types::Class);


    /// <summary>

    /// This method gets the class name to attach a method for multi thread.

    /// </summary>

    /// <returns>Returns <c>CCBOrderSyncCEService</c> class object.</returns>

    ClassName getClassName()

    {

        return classStr(CCBOrderSyncCEService);

    }


    /// <summary>

    /// This method performs CE sync of pending orders from F&O.

    /// </summary>

    /// <param name = "_contract"> Contract for CE sync batch job.</param>

    public void startProcess(CCBOrderSyncCEContract _contract)

    {

        #OCCRetryCount

        SysOperationServiceController   controller;

        CCBOrderSyncCEContract          dataContract;

        SalesTable                      salesTable, salesTableExists;

        CCBOrderSyncCEErrorLogTable     syncErrorLog;

        BatchHeader                     batchHeader;

        List                            soList;

        int                             batchThread, soCount, soErrRectCnt, totRecord, recordCount, recordPerThread;


        //Method to create separate threads for the available orders to sync with CE

        void createTask()

        {

            try

            {

                controller = new SysOperationServiceController(this.getClassName(), methodStr(CCBOrderSyncCEService,syncOrdersWithCE), SysOperationExecutionMode::Asynchronous);

                dataContract = controller.getDataContractObject('_contract');

                dataContract.setSOList(soList);

                setClass.add(controller);

                batchHeader.addRuntimeTask(controller, batchHeader.parmBatchHeaderId());

                //batchHeader.save();

            }

            catch(Exception::UpdateConflict)

            {

                if (appl.ttsLevel() == 0)

                {

                    if (xSession::currentRetryCount() >= #RetryNum)

                    {

                        throw Exception::UpdateConflictNotRecovered;

                    }

                    else

                    {

                        retry;

                    }

                }

            }

            catch

            {

                info(strFmt(this.getErrorStr()));

            }

        }

        //Check if dual wite map for sales table/sales line is running or not

        if (!this.checkDualWriteMap())

        {

            throw error("@CCB:CCBCESyncDWMapStatus");

            return;

        }


        //Get batch thread count from batch parameter

        batchThread = _contract.parmBatchThread();


        //Get total record count to sync with CE

        select count(RecId) from salesTable

            where salesTable.CCBToSync       == NoYes::Yes

               && (salesTable.SalesStatus    == SalesStatus::Invoiced || salesTable.SalesStatus == SalesStatus::Canceled)

            notexists join syncErrorLog

                where syncErrorLog.OrderNumber  == salesTable.SalesId;


        soCount = salesTable.RecId;


        //Get total record count to sync with CE from exixting error log, where error is rectified

        select count(RecId) from salesTableExists

            join syncErrorLog

                where syncErrorLog.OrderNumber          == salesTableExists.SalesId

                   && syncErrorLog.ErrorRectified       == NoYes::Yes

                   && salesTableExists.CCBToSync        == NoYes::Yes

                   && (salesTableExists.SalesStatus     == SalesStatus::Invoiced || salesTableExists.SalesStatus == SalesStatus::Canceled);


        soErrRectCnt = salesTableExists.RecId;


        //Get total records to be processed

        totRecord = soCount + soErrRectCnt;


        if (totRecord)

        {

            batchHeader = BatchHeader::getCurrentBatchHeader();


            //Calculate no of records that needs to be bundled in a single thread.

            recordPerThread = totRecord != 0 ? real2int(roundUp((totRecord / batchThread), 1)) : 0;


            //Fetch all the orders which do not exists in the CE sync error log and sync is pending.

            soList = new List(Types::String);


            while select SalesId from salesTable

                where salesTable.CCBToSync       == NoYes::Yes

                   && (salesTable.SalesStatus    == SalesStatus::Invoiced || salesTable.SalesStatus == SalesStatus::Canceled)

                notexists join syncErrorLog

                    where syncErrorLog.OrderNumber  == salesTable.SalesId

            {

                recordCount++ ;

                soList.addEnd(salesTable.SalesId);


                if (recordCount == recordPerThread)

                {

                    createTask();

                    recordCount = 0;

                    soList = new List(Types::String);

                }

            }


            //Fetch all the orders which exists in the CE sync error log where error is rectified.

            while select SalesId from salesTableExists

                join syncErrorLog

                    where syncErrorLog.OrderNumber          == salesTableExists.SalesId

                       && syncErrorLog.ErrorRectified       == NoYes::Yes

                       && salesTableExists.CCBToSync        == NoYes::Yes

                       && (salesTableExists.SalesStatus     == SalesStatus::Invoiced || salesTableExists.SalesStatus == SalesStatus::Canceled)

            {

                recordCount++ ;

                soList.addEnd(salesTableExists.SalesId);


                if (recordCount == recordPerThread)

                {

                    createTask();

                    recordCount = 0;

                    soList = new List(Types::String);

                }

            }


            //Create last thread with all remaining orders

            if (recordCount > 0)

            {

                createTask();

                recordCount = 0;

            }


            // for clean up data we added new task

            this.createCleanupTask(setClass,batchHeader);

        }

        else

        {

            Info("@CCB:CCBCESyncOrderStatus");

        }

    }


    /// <summary>

    /// Reset dual write check for all orders in F&O to let them sync with CE.

    /// </summary>

    /// <param name = "_contract">Contract for CE sync batch job.</param>

    public void syncOrdersWithCE(CCBOrderSyncCEContract _contract)

    {

        #OCCRetryCount


        SalesTable                      salesTableUpd;

        SalesLine                       salesLineUpd;

        SalesId                         salesId;

        List                            salesOrderList;

        ListEnumerator                  soEnumList;

        //int                             cnt;

        

        salesOrderList  = new List(Types::String);

        salesOrderList  = _contract.getSOList();

        soEnumList      = salesOrderList.getEnumerator();

        

        while (soEnumList.moveNext())

        {

            salesId = soEnumList.current();


            if (salesId)

            {

                //cnt++;

                //Fetch sales table to update CE sync flag

                select firstonly forupdate salesTableUpd

                    where salesTableUpd.SalesId == salesId;


                try

                {

                    while select forupdate salesLineUpd

                        where salesLineUpd.SalesId      == salesTableUpd.SalesId

                           && salesLineUpd.CCBToSync    == NoYes::Yes

                    {

                        salesLineUpd.CCBToSync  = NoYes::No;

                        ttsbegin;

                        salesLineUpd.update();

                        ttscommit;

                    }


                    ttsbegin;

                    salesTableUpd.CCBToSync = NoYes::No;

                    salesTableUpd.update();

                    ttscommit;

                }

                catch(Exception::Deadlock)

                {

                    if (xSession::currentRetryCount() >= #RetryNum)

                    {

                        this.addErrorLog(salesTableUpd.SalesId, salesTableUpd.InventLocationId);

                    }

                    else

                    {

                        salesTableUpd.reread();

                        retry;

                    }

                }

                catch(Exception::UpdateConflict)

                {

                    if (xSession::currentRetryCount() >= #RetryNum)

                    {

                        salesTableUpd.reread();

                        this.addErrorLog(salesTableUpd.SalesId, salesTableUpd.InventLocationId);

                    }

                    else

                    {

                        salesTableUpd.reread();

                        retry;

                    }

                }

                catch

                {

                    this.addErrorLog(salesTableUpd.SalesId, salesTableUpd.InventLocationId);

                }

            }

        }

        //Info(strFmt("%1", cnt));

    }


    /// <summary>

    /// Add error log for CE sync.

    /// </summary>

    /// <param name = "_salesId">Sales order</param>

    /// <param name = "_warehouse">Warehouse</param>

    public void addErrorLog(SalesId _salesId, SalesInventLocationId _warehouse)

    {

        CCBOrderSyncCEErrorLogTable     ceSyncErrorLog;

        SalesLine                       salesLine;

        WHSLoadLine                     loadLine;


        //Fetch load id

        select firstonly LoadId from loadLine

            exists join salesLine

            where salesLine.InventTransId   == loadLine.InventTransId

               && salesLine.SalesId         == _salesId

               && loadLine.InventTransType  == InventTransType::Sales;


        ceSyncErrorLog.LoadId           = loadLine.LoadId;

        ceSyncErrorLog.InventLocationId = _warehouse;

        ceSyncErrorLog.OrderNumber      = _salesId;

        ceSyncErrorLog.ErrorInfo        = this.getErrorStr();

        

        ceSyncErrorLog.insert();

    }


    /// <summary>

    /// Method to capture the standard error caught during the current process.

    /// </summary>

    /// <returns>error</returns>

    public str getErrorStr()

    {

        SysInfologEnumerator        enumerator;

        SysInfologMessageStruct     msgStruct;

        Exception                   exception;

        str                         error;


        enumerator = SysInfologEnumerator::newData(infolog.cut());


        while (enumerator.moveNext())

        {

            msgStruct = new SysInfologMessageStruct(enumerator.currentMessage());

            exception = enumerator.currentException();

            error = strfmt("@SYS82282"+ ' '+"@MCR26302", error, msgStruct.message());

        }

        return error;

    }


    /// <summary>

    /// This method is used to check if Dual write map is running or not.

    /// </summary>

    /// <returns>Boolean true/false, if the process can move further.</returns>

    public boolean checkDualWriteMap()

    {

        DualWriteProjectConfiguration   dualWriteProjConfig;

        DMFEntity                       dmfEntity;

        BusinessEventsDefinition        eventDef;

        boolean                         ret = false;


        Select firstonly RecId from dualWriteProjConfig

            exists join dmfEntity

            exists join eventDef

            where eventDef.RefEntityName        == dmfEntity.TargetEntity

               && dmfEntity.EntityName          == dualWriteProjConfig.InternalEntityName

               && dualWriteProjConfig.Status    == DualWriteProjectStatus::Queueing

               && (eventDef.RefTableName == tableStr(SalesTable) || eventDef.RefTableName == tableStr(SalesLine));


        ret = (dualWriteProjConfig.RecId) ? false : true;


        return ret;

    }


    /// <summary>

    /// Create task for clean up data

    /// </summary>

    /// <param name = "_set"> set</param>

    /// <param name = "_batchHeader">BatchHeader</param>

    public void createCleanupTask(Set _set, BatchHeader _batchHeader)

    {

        #OCCRetryCount

        SysOperationServiceController   controller,batchTask;

        SetEnumerator                   se;


        try

        {

            controller = new SysOperationServiceController(this.getClassName(), methodStr(CCBOrderSyncCEService,cleanUpData), SysOperationExecutionMode::Asynchronous);

            

            _batchHeader.addRuntimeTask(controller, _batchHeader.parmBatchHeaderId());

            

            // Create dependencies

            se = _set.getEnumerator();

            while (se.moveNext())

            {

                batchTask = se.current(); // Task to make dependent on

                _batchHeader.addDependency(controller,batchTask,BatchDependencyStatus::Finished);

            }

            _batchHeader.save();

        }

        catch(Exception::UpdateConflict)

        {

            if (appl.ttsLevel() == 0)

            {

                if (xSession::currentRetryCount() >= #RetryNum)

                {

                    throw Exception::UpdateConflictNotRecovered;

                }

                else

                {

                    retry;

                }

            }

        }

        catch

        {

            info(strFmt(this.getErrorStr()));

        }

    }


    /// <summary>

    ///  Clean up error log data

    /// </summary>

    public void cleanUpData()

    {

        CCBOrderSyncCEErrorLogTable errorLog,errorLogLast,errorLogDelete;

        SalesTable                  salesTable;


        //Delete logs where order is synced.

        delete_from errorLog

            exists join salesTable

            where salesTable.SalesId == errorLog.OrderNumber

            && salesTable.CCBToSync  == NoYes::No;


        errorLog.clear();

        //delete rectified errors

        //delete_from errorLog where errorLog.ErrorRectified == NoYes::Yes;

        delete_from errorLog

            where errorLog.ErrorRectified == NoYes::Yes;

        

        errorLog.clear();

        //keep only the last error of each order:

        while select OrderNumber from errorLog

            group by OrderNumber

        {

            errorLogLast.clear();

            errorLogDelete.clear();


            select firstonly RecId from errorLogLast

                order by CreatedDateTime desc

                where errorLogLast.OrderNumber == errorLog.OrderNumber;


            ttsbegin;

            //Delete records

            delete_from errorLogDelete

                where errorLogDelete.OrderNumber == errorLog.OrderNumber

                &&    errorLogDelete.RecId != errorLogLast.RecId;

            ttscommit;

        }

        Info("@CCB:LogCleanUp");

    }


}





No comments:

Post a Comment