Added postprocess method to Skeleton#52

Open
sam-grant wants to merge 3 commits into Mu2e:main from sam-grant:main

Collaborator

@sam-grant sam-grant commented Jan 31, 2026

Postprocess method in Skeleton

I've been using this feature for months, but forgot to make a PR for it.

It lets you run postprocessing as part of your processor class when you inherit from Skeleton, which is really useful.

The output from pyprocess parallel processing is a list of results per file, so whatever you return from your process_function gets appended to a results list. This makes sense, but if you want to combine your results over the full dataset, you can now override the postprocess method and do the combination there. It is then executed automatically as part of your processing pipeline when you call execute.
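To make the hook pattern concrete, here is a minimal, self-contained sketch using only the standard library. `MiniSkeleton` and `LengthSummer` are hypothetical stand-ins invented for illustration; this is not the pyutils implementation, just one way an `execute` method could hand the per-file results list to an overridable `postprocess`.

```python
from concurrent.futures import ThreadPoolExecutor


class MiniSkeleton:
    """Hypothetical stand-in for Skeleton (NOT the pyutils code)."""

    def __init__(self, file_list, max_workers=4):
        self.file_list = file_list
        self.max_workers = max_workers

    def process_file(self, file_name):
        # Subclasses provide the per-file work
        raise NotImplementedError

    def postprocess(self, results):
        # Default hook: return the per-file results list unchanged
        return results

    def execute(self):
        # Run process_file once per file; results keep the file order
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            results = list(pool.map(self.process_file, self.file_list))
        # Hand the full list to the (possibly overridden) hook
        return self.postprocess(results)


class LengthSummer(MiniSkeleton):
    def process_file(self, file_name):
        # Toy per-file result: the length of the file name
        return len(file_name)

    def postprocess(self, results):
        # Combine the per-file results into a single value
        return sum(results)


total = LengthSummer(["a.root", "bb.root"]).execute()
print(total)
```

Because the default `postprocess` in this sketch returns the list untouched, a subclass that does not override it behaves as if the hook did not exist.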

Working example:

If the output from each worker is an awkward array, you can merge them in postprocess like this:

import awkward as ak
from pyutils.pyprocess import Skeleton, Processor 

class DemoProcessor(Skeleton): 
    def __init__(self):
        super().__init__()
        self.file_list_path = "file_list.txt"
        self.branches = ["event"]
        self.use_remote = True
        self.location = "disk"
        self.worker_verbosity = 0
        

    def process_file(self, file_name):
        """ 
        Custom worker function 
        Passed to pyprocess by the Skeleton execute method for parallelisation
        The result from each worker is stored in a list 
        """
        
        this_processor = Processor(
            use_remote = self.use_remote,
            location = self.location,
            verbosity = self.worker_verbosity
        )
        
        this_result = this_processor.process_data(
            file_name = file_name,
            branches = self.branches,
            
        )
        
        return this_result

    def postprocess(self, results):
        """ Demo postprocess: merge results list"""
        
        print(f"Raw results type: {type(results)}") # list
        print(f"Raw results length: {len(results)}")  # one per file
        print(f"Raw results first element type: {type(results[0])}") # ak.Array

        # In this case, every result list element is an awkward array 
        # We can merge them into one here:
        merged_results = ak.concatenate(results)

        return merged_results

demo = DemoProcessor()
results = demo.execute()

print(f"\nFinal result type: {type(results)}")

Our results list of many arrays is merged into a single awkward array:

[Skeleton] [INFO] Skeleton init
[Skeleton] [INFO] Starting analysis
[pyprocess] [INFO] Initialised Processor:
	path = 'EventNtuple/ntuple'
	use_remote = True
	location = disk
	schema = root
	verbosity=1
[pyprocess] [INFO] Loading file list from file_list.txt
[pyprocess] [OK] Successfully loaded file list
	Path: None
	Count: 10 files
[pyprocess] [INFO] Starting processing on 10 files with 10 threads

Processing: 100%|██████████████████████████████| 10/10 [00:06<00:00,  1.59file/s, successful=10, failed=0]

[pyprocess] [INFO] Returning 10 results
Raw results type: <class 'list'>
Raw results length: 10
Raw results first element type: <class 'awkward.highlevel.Array'>
[Skeleton] [OK] Analysis complete

Final result type: <class 'awkward.highlevel.Array'>
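The same hook also works when workers return something other than awkward arrays. As a rough sketch, assuming each worker returns a dictionary of counts, the body of a `postprocess` override could sum them as shown below. The helper name `merge_counts` and the sample data are made up for illustration, and treating a failed file as a `None` entry is an assumption, not confirmed pyprocess behaviour.

```python
from collections import Counter


def merge_counts(results):
    """Sum per-file count dictionaries into one dictionary."""
    total = Counter()
    for result in results:
        if result is None:
            # Assumption: a failed file leaves a None entry; skip it
            continue
        total.update(result)  # adds counts key by key
    return dict(total)


# Made-up per-file results, one entry per file
per_file = [
    {"events": 100, "tracks": 250},
    None,
    {"events": 80, "tracks": 190},
]

merged = merge_counts(per_file)
print(merged)
```

Inside a Skeleton subclass, the override would simply be `return merge_counts(results)`.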

@sam-grant sam-grant marked this pull request as ready for review January 31, 2026 20:48
@sam-grant
Collaborator Author

I also fixed the f-string issue. Sorry, I meant that to be a separate PR.

Collaborator

@sophiemiddleton sophiemiddleton left a comment

Looks good!
