Split up worker loop and per-item worker logic

Break the worker function into two: one that ranges over the work channel,
and one that actually does the work of associating a file with a document
when the two are determined to match.
This commit is contained in:
Ian Molee 2024-04-05 02:51:11 -07:00
parent b6de64cde6
commit 03c0840041
1 changed file with 41 additions and 45 deletions

50
main.go
View File

@ -79,7 +79,7 @@ func run(args []string) ([]*Document, error) {
// documents, so we can identify unassociated files later and then // documents, so we can identify unassociated files later and then
// create new documents for them. This needs to be distinct for each // create new documents for them. This needs to be distinct for each
// timestamp, so it's created inside the timestamp loop's scope. // timestamp, so it's created inside the timestamp loop's scope.
associatedFiles sync.Map claimedFiles sync.Map
// We might need to create new documents for files that weren't // We might need to create new documents for files that weren't
// associated with any document at this timestamp, so we need to make // associated with any document at this timestamp, so we need to make
@ -96,7 +96,7 @@ func run(args []string) ([]*Document, error) {
doc: doc, doc: doc,
fileNumbers: fileTimes[timestamp], fileNumbers: fileTimes[timestamp],
timestamp: timestamp, timestamp: timestamp,
associatedFiles: &associatedFiles, claimedFiles: &claimedFiles,
wg: &wg, wg: &wg,
} }
log( log(
@ -115,7 +115,7 @@ func run(args []string) ([]*Document, error) {
// documents for them. // documents for them.
var docsAdded int var docsAdded int
for _, fileNumber := range fileTimes[timestamp] { for _, fileNumber := range fileTimes[timestamp] {
if _, ok := associatedFiles.Load(fileNumber); !ok { if _, ok := claimedFiles.Load(fileNumber); !ok {
dm.AddNewDocument(fileNumber, timestamp) dm.AddNewDocument(fileNumber, timestamp)
docsAdded++ docsAdded++
} }
@ -137,7 +137,7 @@ type WorkItem struct {
doc *Document doc *Document
fileNumbers []int fileNumbers []int
timestamp int timestamp int
associatedFiles *sync.Map claimedFiles *sync.Map
wg *sync.WaitGroup wg *sync.WaitGroup
} }
@ -229,50 +229,46 @@ func (dm *DocumentManager) SortedDocuments() []*Document {
// document against each file and if a match is found, associate the file with the // document against each file and if a match is found, associate the file with the
// document sent in the work item, and record the file as having been matched. // document sent in the work item, and record the file as having been matched.
func (dm *DocumentManager) ComparisonWorker(workerID int) { func (dm *DocumentManager) ComparisonWorker(workerID int) {
defer dm.wg.Done()
for workItem := range dm.WorkCh { for workItem := range dm.WorkCh {
for _, fileNumber := range workItem.fileNumbers { dm.maybeAssociateFileWithDocument(workItem, workerID)
if _, ok := workItem.associatedFiles.Load(fileNumber); ok { }
// This file has already been matched; skip it. }
func (dm *DocumentManager) maybeAssociateFileWithDocument(workItem WorkItem, workerID int) {
defer workItem.wg.Done()
for _, candidateFileNumber := range workItem.fileNumbers {
if _, ok := workItem.claimedFiles.Load(candidateFileNumber); ok {
// This file has already been matched with another document, so skip it.
continue continue
} }
latestFileNumber := workItem.doc.LatestAssociatedFile() latestFileNumber := workItem.doc.LatestAssociatedFile()
similarity, err := dm.compareFiles(latestFileNumber, fileNumber) similarity, err := dm.compareFiles(latestFileNumber, candidateFileNumber)
if err != nil { if err != nil {
// Simplistic error handling: log the error and continue. // Simplistic error handling: log the error and continue.
slog.Error( slog.Error(
"error comparing files", "error comparing files",
"file1", latestFileNumber, "latestAssociatedFile", latestFileNumber,
"file2", fileNumber, "candidateFile", candidateFileNumber,
"document", workItem.doc.ID, "document", workItem.doc.ID,
"worker", workerID, "worker", workerID,
) )
} }
// If current file doesn't match current document, skip to the next file. // If current file matches current document, record it and exit.
if similarity < dm.similarityThreshold { if similarity >= dm.similarityThreshold {
continue workItem.doc.AssociateFile(candidateFileNumber, workItem.timestamp)
} workItem.claimedFiles.Store(candidateFileNumber, struct{}{})
// Current file matches current document, so record this.
workItem.doc.AssociateFile(fileNumber, workItem.timestamp)
workItem.associatedFiles.Store(fileNumber, struct{}{})
log( log(
"match found", "match found",
"document", workItem.doc.ID, "document", workItem.doc.ID,
"file", fileNumber, "file", candidateFileNumber,
"time", workItem.timestamp, "time", workItem.timestamp,
"worker", workerID, "worker", workerID,
) )
return
// We don't need to consider this document anymore since we've found
// a match. End processing and wait for more work.
break
} }
workItem.wg.Done()
} }
// Report that this worker is shutting down.
dm.wg.Done()
} }
// compareFiles computes how much two files overlap, on a scale // compareFiles computes how much two files overlap, on a scale