Wednesday, 14 June 2023

Multi threading batch in x++ (D365 F&O)

 1. User has option to select number of threads from contract class parameters.

2.  We have calculation to select number of records per thread

3. startProcess() method is the main method for deviding records per thread and adding tasks

4. defaultly batch processing check box enabled when we open menuitem.


Contract class:

[DataContractAttribute]

class CCBSOPostalAddressUpdateBatchContract implements SysOperationValidatable

{

    int     batchThread;

    str     soList,inventTransIdList;


    

    /// <summary>

    /// Parameter method for <c>ThreadCount</c>.

    /// </summary>

    /// <param name = "_threadCount">Thread count for batch</param>

    /// <returns>NoYesId</returns>

    [DataMemberAttribute('Thread count'), SysOperationLabelAttribute("Thread count"), SysOperationHelpText('Max thread count in the current batch.')]

    public int parmBatchThread(int _threadCount = batchThread)

    {

        batchThread = _threadCount;

        return batchThread;

    }


    /// <summary>

    /// Parameter method for <c>SOList</c>.

    /// </summary>

    /// <param name = "_solist">List of SO to be processed</param>

    /// <returns>SO list</returns>

    [DataMemberAttribute, SysOperationControlVisibilityAttribute(false)]

    public str parmSOList(str _solist = soList)

    {

        soList = _solist;

        return soList;

    }


    /// <summary>

    /// This method sets the sales order in a list.

    /// </summary>

    /// <param name = "_solist">List of SO to be processed</param>

    public void setSOList(List _solist)

    {

        soList = SysOperationHelper::base64Encode(_solist.pack());

    }


    /// <summary>

    /// This method gets the list of SO to be processed.

    /// </summary>

    /// <returns>decode so list </returns>

    public List getSOList()

    {

        return List::create(SysOperationHelper::base64Decode(soList));

    }


    /// <summary>

    /// Parameter method for <c>inventTransIdList</c>.

    /// </summary>

    /// <param name = "_inventTransIdList">List of location to be processed</param>

    /// <returns>location list</returns>

    [DataMemberAttribute, SysOperationControlVisibilityAttribute(false)]

    public str parmInventTransIdList(str _inventTransIdList = inventTransIdList)

    {

        inventTransIdList = _inventTransIdList;

        return inventTransIdList;

    }


    /// <summary>

    /// This method sets the invent transactions in a list.

    /// </summary>

    /// <param name = "_inventTransIdList">List of SO to be processed</param>

    public void setInventTransIdList(List _inventTransIdList)

    {

        inventTransIdList = SysOperationHelper::base64Encode(_inventTransIdList.pack());

    }


    /// <summary>

    /// This method gets the list of invent transactions to be processed.

    /// </summary>

    /// <returns>decode invent transaction ids</returns>

    public List getInventTransIdList()

    {

        return List::create(SysOperationHelper::base64Decode(inventTransIdList));

    }


    /// <summary>

    /// validation for parameters

    /// </summary>

    /// <returns>true/false</returns>

    public boolean validate()

    {

        boolean ret = true;


        if (batchThread < 2 )

        {

            ret = checkFailed("@CCB:SOThreadCount");

        }

        return ret;

    }


}


Controller class:

/// <summary>

/// to update SO postal addressess

/// </summary>

class CCBSOPostalAddressUpdateBatchController extends SysOperationServiceController

{


    /// <summary>

    /// Retrieves the caption of the dialog box.

    /// </summary>

    /// <returns>The caption of the dialog box.</returns>

    public ClassDescription caption()

    {

        return "@CCB:SODeliveryAddressUpdate";

    }


    /// <summary>

    /// Construct Method for controller class.

    /// N Developed to sync update SO postal addresses, dated 27 May, 23 by Irfan.

    /// </summary>

    /// <returns>returns CCBSOPostalAddressUpdateController object</returns>

    public static CCBSOPostalAddressUpdateBatchController construct()

    {

        return new CCBSOPostalAddressUpdateBatchController();

    }


    /// <summary>

    /// Initializes the update update so postal addressessr controller class instance.

    /// </summary>

    /// <param name = "_args">args</param>

    public static void main(Args _args)

    {

        CCBSOPostalAddressUpdateBatchController     controller;

        IdentifierName                              className;

        IdentifierName                              methodName;

        SysOperationExecutionMode                   executionMode;


        [className, methodName, executionMode]      = SysOperationServiceController::parseServiceInfo(_args);

        

        controller = new CCBSOPostalAddressUpdateBatchController(className, methodName, executionMode);

        controller.isInBatchFlagSet = true;   /// for default batch processing as enabled

        //controller.isInBatch();

        controller.parmInBatch(true);

        controller.batchInfo().parmBatchExecute(true);

        //controller.batchInfo().fieldBatchExecuteValue(true);

        if (controller.prompt())

        {

            controller.run();

        }

    }


}

Service Class:

/// <summary>

/// for update cost estimate process number for blank process number records in cost estimate header and lines tables

/// </summary>

class CCBSOPostalAddressUpdateBatchService extends SysOperationServiceBase

{

    #OCCRetryCount

    utcdatetime                      validDateTime       = DateTimeUtil::utcNow();

    utcdatetime                      validFrom           = DateTimeUtil::minValue();

    utcdatetime                      validTo             = DateTimeUtil::maxValue();



    /// <summary>

    /// This method gets the class name to attach a method for multi thread.

    /// </summary>

    /// <returns>Returns <c>CCBSOPostalAddressUpdateBatchService</c> class object.</returns>

    ClassName getClassName()

    {

        return classStr(CCBSOPostalAddressUpdateBatchService);

    }


    /// <summary>

    /// This method performs update SO postal addresses.

    /// </summary>

    /// <param name = "_contract"> Contract for update SO postal addresses.</param>

    public void startProcess(CCBSOPostalAddressUpdateBatchContract _contract)

    {

        

        SysOperationServiceController           controller;

        CCBSOPostalAddressUpdateBatchContract   dataContract;

        BatchHeader                             batchHeader;

        List                                    soList,inventTransIdList;

        int                                     batchThread,batchThreadHeader,batchThreadLines;

        int                                     totRecord,headerCount,LinesCount,recordCount, recordPerThread;

        SalesTable                              salesTable;

        SalesLine                               salesLine;

        LogisticsPostalAddress                  logisticsPostalAddress,logisticsPostalAddressNewAddress;

        CCBDeliveryHeaderImport                 deliveryHeaderImport;

        boolean                                 btachProcessFlag = false;

        

        //Method to create separate threads for the available orders to sync with CE

        void createTask(boolean lastTask = false)

        {

            try

            {

                if (lastTask)

                {

                    controller = new SysOperationServiceController(this.getClassName(), methodStr(CCBSOPostalAddressUpdateBatchService,processOrderLinesUpdate), SysOperationExecutionMode::Asynchronous);

                }

                else

                {

                    controller = new SysOperationServiceController(this.getClassName(), methodStr(CCBSOPostalAddressUpdateBatchService,processOrderHeaderUpdate), SysOperationExecutionMode::Asynchronous);

                }

                dataContract = controller.getDataContractObject('_contract');


                if (lastTask)

                {

                    dataContract.setInventTransIdList(inventTransIdList);

                }

                else

                {

                    dataContract.setSOList(soList);

                }

                batchHeader.addRuntimeTask(controller, batchHeader.parmBatchHeaderId());

                batchHeader.save();

            }

            catch(Exception::UpdateConflict)

            {

                if (appl.ttsLevel() == 0)

                {

                    if (xSession::currentRetryCount() >= #RetryNum)

                    {

                        throw Exception::UpdateConflictNotRecovered;

                    }

                    else

                    {

                        retry;

                    }

                }

            }

        }

        try

        {

            if (!this.isExecutingInBatch())

            {

                btachProcessFlag = true;

                throw error("@CCB:SOMultiThreadBatchError");

            }


            //Get batch thread count from batch parameter

            batchThread     = _contract.parmBatchThread(); 

            if (batchThread < 2)

            {

                batchThread = 2;

            }

        

            batchThreadHeader = roundDown(batchThread/2,1);

            batchThreadLines  = roundUp(batchThread/2,1);


            select forupdate validtimestate(validFrom, validTo) count(RecId) from salesTable

                where salesTable.SalesStatus          != SalesStatus::Delivered

                    && salesTable.SalesStatus         != SalesStatus::Invoiced

                    && salesTable.SalesStatus         != SalesStatus::Canceled

            join  logisticsPostalAddress

                where logisticsPostalAddress.RecId    == salesTable.DeliveryPostalAddress

                    && logisticsPostalAddress.ValidTo  < validDateTime

            exists join logisticsPostalAddressNewAddress

                where logisticsPostalAddressNewAddress.Location == logisticsPostalAddress.Location

                && logisticsPostalAddressNewAddress.ValidTo     >= validDateTime

            notexists join deliveryHeaderImport

                where deliveryHeaderImport.OrderNum   == salesTable.SalesId;


            headerCount = salesTable.RecId;


            logisticsPostalAddress.clear();

            deliveryHeaderImport.clear();

            logisticsPostalAddressNewAddress.clear();

            select validtimestate(validFrom, validTo) count(RecId) from salesLine

                where salesLine.SalesStatus         != SalesStatus::Delivered

                && salesLine.SalesStatus            != SalesStatus::Invoiced

                && salesLine.SalesStatus            != SalesStatus::Canceled

            join  logisticsPostalAddress

                where logisticsPostalAddress.RecId  == salesLine.DeliveryPostalAddress

                && logisticsPostalAddress.ValidTo    < validDateTime

            exists join logisticsPostalAddressNewAddress

                where logisticsPostalAddressNewAddress.Location == logisticsPostalAddress.Location

                && logisticsPostalAddressNewAddress.ValidTo     >= validDateTime

            notexists join deliveryHeaderImport

                where deliveryHeaderImport.OrderNum == salesLine.SalesId;


            LinesCount = salesLine.RecId;


            totRecord  = headerCount+LinesCount;


            if (totRecord)

            {

                batchHeader = BatchHeader::getCurrentBatchHeader();


                // for header orders loop

                if (headerCount)

                {

                    recordPerThread   = headerCount != 0 ? real2int(roundUp((headerCount / batchThreadHeader), 1)) : 0;

                    soList            = new List(Types::String);


                    salesTable.clear();

                    logisticsPostalAddress.clear();

                    deliveryHeaderImport.clear();

                    logisticsPostalAddressNewAddress.clear();


                    while select validtimestate(validFrom, validTo) DeliveryPostalAddress,SalesId,SalesStatus from  salesTable

                        order by salesTable.DeliveryPostalAddress asc

                        where salesTable.SalesStatus          != SalesStatus::Delivered

                        && salesTable.SalesStatus             != SalesStatus::Invoiced

                        && salesTable.SalesStatus             != SalesStatus::Canceled

                    join  RecId,ValidFrom,ValidTo,Location from logisticsPostalAddress

                        where logisticsPostalAddress.RecId    == salesTable.DeliveryPostalAddress

                        && logisticsPostalAddress.ValidTo      < validDateTime

                    exists join logisticsPostalAddressNewAddress

                        where logisticsPostalAddressNewAddress.Location == logisticsPostalAddress.Location

                        && logisticsPostalAddressNewAddress.ValidTo     >= validDateTime

                    notexists join deliveryHeaderImport

                        where deliveryHeaderImport.OrderNum   == salesTable.SalesId

                    {

                        recordCount++ ;

                        soList.addEnd(salesTable.SalesId);


                        if (recordCount == recordPerThread)

                        {

                            createTask();

                            recordCount         = 0;

                            soList              = new List(Types::String);

                        }

                    }


                    //Create last thread with all remaining orders

                    if (recordCount > 0)

                    {

                        createTask();

                        recordCount         = 0;

                    }

            

                }


                // for order lines loop

                recordPerThread = 0;

                recordCount     = 0;

                if (LinesCount)

                {

                    //Calculate no of records that needs to be bundled in a single thread.

                    recordPerThread   = LinesCount != 0 ? real2int(roundUp((LinesCount / batchThreadLines), 1)) : 0;

                    inventTransIdList = new List(Types::String);


                    salesTable.clear();

                    salesLine.clear();

                    logisticsPostalAddress.clear();

                    deliveryHeaderImport.clear();

                    logisticsPostalAddressNewAddress.clear();


                    // fetch line loop records

                    while select validtimestate(validFrom, validTo) InventTransId,DeliveryPostalAddress,SalesStatus,SalesId,LineNum from salesLine

                        order by salesLine.DeliveryPostalAddress asc

                        where salesLine.SalesStatus           != SalesStatus::Delivered

                        && salesLine.SalesStatus              != SalesStatus::Invoiced

                        && salesLine.SalesStatus              != SalesStatus::Canceled

                    join RecId,ValidFrom,ValidTo,Location from logisticsPostalAddress

                        where logisticsPostalAddress.RecId    == salesLine.DeliveryPostalAddress

                        && logisticsPostalAddress.ValidTo      < validDateTime

                    exists join logisticsPostalAddressNewAddress

                        where logisticsPostalAddressNewAddress.Location == logisticsPostalAddress.Location

                        && logisticsPostalAddressNewAddress.ValidTo     >= validDateTime

                    notexists join deliveryHeaderImport

                        where deliveryHeaderImport.OrderNum   == salesLine.SalesId

                    {

                        recordCount++ ;

                        inventTransIdList.addEnd(salesLine.InventTransId);


                        if (recordCount == recordPerThread)

                        {

                            createTask(true);

                            recordCount         = 0;

                            inventTransIdList   = new List(Types::String);

                        }

                    }


                    //Create last thread with all remaining orders

                    if (recordCount > 0 )

                    {

                        createTask(true);

                        recordCount         = 0;

                    }

                }

            

                Info("@CCB:SODeliveryAddressUpdateCompleted");

            }

            else

            {

                Info("@CCB:SOExpiredAddress");

            }

        }

        catch(Exception::Error)

        {

            if (btachProcessFlag)

            {

                Info("@CCB:SOMultiThreadBatchError");

            }

        }

    }


    /// <summary>

    /// update all sales order headers

    /// </summary>

    /// <param name = "_contract">Contract for CE sync batch job.</param>

    public void processOrderHeaderUpdate(CCBSOPostalAddressUpdateBatchContract _contract)

    {

        SalesTable                      salesTable;

        WHSLoadLine                     whsLoadLine;

        SalesId                         salesId;

        LogisticsPostalAddress          logisticsPostalAddress,logisticsPostalAddressNew;

        List                            salesOrderList = new List(Types::String);

        ListEnumerator                  soEnumList;

        LogisticsPostalAddressRecId     oldDeliveryAddress;

        

        salesOrderList                  = _contract.getSOList();

        soEnumList                      = salesOrderList.getEnumerator();

        

        while (soEnumList.moveNext())

        {

            salesId = soEnumList.current();


            if (salesId)

            {

                try

                {

                    select firstonly forupdate validtimestate(validFrom, validTo)  salesTable

                        join  logisticsPostalAddress

                        where logisticsPostalAddress.RecId    == salesTable.DeliveryPostalAddress

                            && logisticsPostalAddress.ValidTo < validDateTime

                            && salesTable.SalesId             == salesId;

                    if (salesTable.RecId)

                    {

                        if (oldDeliveryAddress != salesTable.DeliveryPostalAddress)

                        {

                            logisticsPostalAddressNew             = LogisticsPostalAddress::findByLocation(logisticsPostalAddress.Location);

                        }

                        oldDeliveryAddress      = salesTable.DeliveryPostalAddress;


                        if ( logisticsPostalAddressNew.RecId != logisticsPostalAddress.RecId)

                        {

                            select firstonly whsLoadLine

                                    where whsLoadLine.OrderNum == salesTable.SalesId;


                            if ( !whsLoadLine

                                || (whsLoadLine && whsLoadTable::find(whsLoadLine.LoadId).LoadStatus != WHSLoadStatus::Shipped)

                               )

                            {

                                ttsbegin;

                                salesTable.DeliveryPostalAddress = logisticsPostalAddressNew.RecId;

                                salesTable.doUpdate();

                                ttscommit;

                            }

                        }

                    }

                }

                catch(Exception::UpdateConflict)

                {

                    if (appl.ttsLevel() == 0)

                    {

                        if (xSession::currentRetryCount() >= #RetryNum)

                        {

                            salesTable.reread();

                            throw Exception::UpdateConflictNotRecovered;

                        }

                        else

                        {

                            salesTable.reread();

                            retry;

                        }

                    }

                }

                catch

                {

                    info(strFmt("@CCB:SOHeaderAddressIssue", salesTable.SalesId));

                }

            }

        }

    }


    /// <summary>

    /// Update sales order lines address

    /// </summary>

    /// <param name = "_contract">CCBSOPostalAddressUpdateBatchContract</param>

    public void processOrderLinesUpdate(CCBSOPostalAddressUpdateBatchContract _contract)

    {

        SalesLine                       salesLine;

        WHSLoadLine                     whsLoadLine;

        TradeInventTransId              inventTransId;

        LogisticsPostalAddress          logisticsPostalAddress,logisticsPostalAddressNew;

        List                            inventTransIdList = new List(Types::String);

        ListEnumerator                  inventEnumTransIdList;

        LogisticsPostalAddressRecId     oldDeliveryAddress;

        

        inventTransIdList               = _contracT.getInventTransIdList();

        inventEnumTransIdList           = inventTransIdList.getEnumerator();

        

        while (inventEnumTransIdList.moveNext())

        {

            inventTransId = inventEnumTransIdList.current();

            if (inventTransId)

            {

                try

                {

                    select firstonly forupdate validtimestate(validFrom, validTo)  salesLine

                        join  logisticsPostalAddress

                        where logisticsPostalAddress.RecId    == salesLine.DeliveryPostalAddress

                            && logisticsPostalAddress.ValidTo < validDateTime

                            && salesLine.InventTransId        == inventTransId;

                    {

                        if (oldDeliveryAddress != salesLine.DeliveryPostalAddress)

                        {

                            logisticsPostalAddressNew             = LogisticsPostalAddress::findByLocation(logisticsPostalAddress.Location);

                        }

                        oldDeliveryAddress      = salesLine.DeliveryPostalAddress;


                        if ( logisticsPostalAddressNew.RecId != logisticsPostalAddress.RecId)

                        {

                            select firstonly whsLoadLine

                                    where whsLoadLine.OrderNum == salesLine.SalesId;

                    

                            if ( !whsLoadLine

                                 || (whsLoadLine && whsLoadTable::find(whsLoadLine.LoadId).LoadStatus != WHSLoadStatus::Shipped)

                               )

                            {

                                salesLine.DeliveryPostalAddress = logisticsPostalAddressNew.RecId;

                                ttsbegin;

                                salesLine.doUpdate();

                                ttscommit;

                            }

                        }

                    }

                }

                catch(Exception::Deadlock)

                {

                    if (xSession::currentRetryCount() >= #RetryNum)

                    {

                        //this.addErrorLog(salesTableUpd.SalesId, salesTableUpd.InventLocationId);

                    }

                    else

                    {

                        //salesTableUpd.reread();

                        retry;

                    }

                }

                catch(Exception::UpdateConflict)

                {

                    if (appl.ttsLevel() == 0)

                    {

                        if (xSession::currentRetryCount() >= #RetryNum)

                        {

                            salesLine.reread();

                            throw Exception::UpdateConflictNotRecovered;

                        }

                        else

                        {

                            salesLine.reread();

                            retry;

                        }

                    }

                }

                catch

                {

                    info(strFmt("@CCB:SOLineAddressIssue", salesLine.SalesId,salesLine.LineNum));

                }

            }

        }

    }


}



No comments:

Post a Comment