Direct harvest from an ArchivesSpace instance

There are two ways we can harvest EADs from an ArchivesSpace instance: we can pull them from your server, or you can push them to our server.

Option 1: We pull

The CAO runs a script on our server that connects to your ArchivesSpace instance and downloads your EADs to our server. We will need two things to accomplish this:

  1. Login credentials to your system
  2. Outside traffic allowed into your ArchivesSpace backend (for example, https://yourASpace.org:8089)

So, to get started:

  1. Let us know you’re interested.
  2. Send us your ASpace backend URL – by default, it’s the same host as your staff interface, on port 8089.
  3. Create a basic user for us on your ArchivesSpace instance and send us the credentials.
  4. Let our machine past your firewall (we’ll give you our IP address). This site may help with Linux machines: https://www.tecmint.com/open-port-for-specific-ip-address-in-firewalld/

What will happen?

Every day, we scan your ASpace instance to see whether any resource records have changed. If they have, your system exports an EAD for each changed record and we add the file to your data directory on the CAO. We use ArchivesSnake to accomplish this; info on ArchivesSnake is here: https://github.com/archivesspace-labs/ArchivesSnake

NOTE: Only changes to the top-level resource record are detected as changes. If you edit a component within a resource, you will also need to save the top-level resource for it to be exported.
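For the curious, here is a minimal sketch of the kind of check a pull script like ours can make with ArchivesSnake. The URL, credentials, repository ID, and timestamp below are placeholders, not our actual configuration:

import time
from asnake.client import ASnakeClient

# placeholder connection details: your ASpace backend and the harvest user you create for us
client = ASnakeClient(baseurl="https://yourASpace.org:8089",
                      username="harvestUser",
                      password="harvestPassword")
client.authorize()

# ask ArchivesSpace for the IDs of resources modified since the previous harvest
lastHarvest = int(time.time()) - 86400  # placeholder: 24 hours ago
modified = client.get("repositories/2/resources?all_ids=true&modified_since=" + str(lastHarvest)).json()

# export each changed resource as EAD
for resourceID in modified:
    ead = client.get("repositories/2/resource_descriptions/" + str(resourceID) + ".xml?numbered_cs=true&include_daos=true")
    with open(str(resourceID) + ".xml", "w", encoding="utf-8") as f:
        f.write(ead.text)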

Why bother?

It’s a “set it and forget it” option.

Option 2: You Push

If you can’t or don’t want to give us login credentials to your ArchivesSpace instance, you can instead export the EADs on your server and upload them to our server over WebDAV.

One way to do this is with the ArchivesSnake and webdavclient3 Python modules.

Installation instructions for ArchivesSnake are here: https://github.com/archivesspace-labs/ArchivesSnake. It’s pretty straightforward.
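Before wiring up the full export, it can help to confirm that webdavclient3 can reach your data directory on our server. Here is a minimal sketch, assuming the same hostname pattern and placeholder credentials used in the sample script below:

from webdav3.client import Client

# placeholder values – replace mainAgencyCode and the credentials with the ones we give you
options = {
    'webdav_hostname': "http://archives.library.wcsu.edu/webdav/arclight/data/ead/mainAgencyCode/",
    'webdav_login': "yourMainAgencyCode",
    'webdav_password': "fakePassword",
}
webDavClient = Client(options)
print(webDavClient.list())  # lists the EAD files currently in your data directory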

You will then need to run a Python script on your server to accomplish the “push.” Below is a sample script you can use (based on the University of Albany’s script); just replace the fakeUsername, mainAgencyCode, and fakePassword values with your own.

Sample Script

# -*- coding: utf-8 -*-
import os
import sys
import time
import csv
from datetime import datetime
import dacs  # helper module providing iso2DACS() for formatting dates
import asnake.logging as logging
from asnake.client import ASnakeClient

# setting up connection to the WCSU (CAO) WebDAV share
from webdav3.client import Client
options = {
    'webdav_hostname': "http://archives.library.wcsu.edu/webdav/arclight/data/ead/mainAgencyCode/",
    'webdav_login':    "yourMainAgencyCode",
    'webdav_password': "fakePassword"
}
webDavClient = Client(options)
webDavClient.verify = False  # do not check SSL certificates (default = True)


print (str(datetime.now()) + " Exporting Records from ArchivesSpace")

print ("\tConnecting to ArchivesSpace")

client = ASnakeClient(baseurl="http://localhost:8089",
                      username="fakeUsername",
                      password="fakePassword")
client.authorize()
logging.setup_logging(stream=sys.stdout, level='INFO')

#repo = ASpace().repositories(3)

__location__ = os.path.dirname(os.path.realpath(__file__))

lastExportTime = time.time()
# read the timestamp of the previous export; fall back to a fixed start time if the file is missing
try:
    timePath = os.path.join(__location__, "lastExport.txt")
    with open(timePath, 'r') as timeFile:
        startTime = int(timeFile.read().replace('\n', ''))
except (IOError, ValueError):
    startTime = 1592193600

humanTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%d %H:%M:%S')
print ("\tChecking for collections updated since " + humanTime)
    
# local working directory for exported EADs – adjust to a writable path on your server
output_path = "/home/haponiks/archivesSnakeExport/eads"
staticData = os.path.join(output_path, "staticData")

#read existing exported collection data
collectionData = []
#collectionFile = open(os.path.join(staticData, "collections.csv"), "r", encoding='utf-8')
#for line in csv.reader(collectionFile, delimiter="|"):
#    collectionData.append(line)
#collectionFile.close()

#read existing exported subject data
subjectData = []
#subjectFile = open(os.path.join(staticData, "subjects.csv"), "r", encoding='utf-8')
#for line in csv.reader(subjectFile, delimiter="|"):
#    subjectData.append(line)
#subjectFile.close

print ("\tQuerying ArchivesSpace...")
modifiedList = client.get("repositories/3/resources?all_ids=true&modified_since=" + str(startTime)).json()
if len(modifiedList) > 0:
    print ("\tFound " + str(len(modifiedList)) + " new records!")
    print ("\tArchivesSpace URIs: " + str(modifiedList))
else:
    print ("\tFound no new records.")
for colID in modifiedList:
    collection = client.get("repositories/3/resources/" + str(colID)).json()
    if collection["publish"] != True: 
        print ("\t\tSkipping " + collection["title"] + " because it is unpublished")
    else:
        print ("\t\tExporting " + collection["title"] + " " + "(" + collection["id_0"] + ")")
    
        try:
            normalName = collection["finding_aid_title"]
        except KeyError:
            print ("\t\tError: incorrect Finding Aid Title (sort title)")
            # fall back to the resource title so the export can continue
            normalName = collection["title"]
        
        #DACS notes/fields to check before exporting
        dacsNotes = ["ead_id", "abstract", 
#"acqinfo", 
"bioghist", 
"scopecontent", "arrangement", "creator"]
        checkDACS = {}
        for dacsNote in dacsNotes:
            checkDACS[dacsNote] = False
        checkAccessRestrict = False
        abstract = ""
        accessRestrict = ""
        
        if "ead_id" in collection.keys():
            checkDACS["ead_id"] = True
            
        # scan the resource's notes for the required DACS elements, capturing the abstract
        # and access restriction text along the way; the "&#10;" entity stands in for newlines
        # so multi-line notes stay on a single line of the pipe-delimited CSV
        for note in collection["notes"]:
            if "type" in note.keys():
                if note["type"] == "abstract":
                    checkDACS["abstract"] = True
                    abstract = note["content"][0].replace("\n", "&#10;")
                if note["type"] == "accessrestrict":
                    checkAccessRestrict = True
                    for subnote in note["subnotes"]:
                        accessRestrict = "&#10;" + subnote["content"].replace("\n", "&#10;")
                    accessRestrict = accessRestrict.strip()
                if note["type"] == "acqinfo":
                    checkDACS["acqinfo"] = True
                if note["type"] == "bioghist":
                    checkDACS["bioghist"] = True
                if note["type"] == "scopecontent":
                    checkDACS["scopecontent"] = True
                if note["type"] == "arrangement":
                    checkDACS["arrangement"] = True
                    
                    
        # the collection must have at least one linked agent with the "creator" role
        for agent in collection["linked_agents"]:
            if agent["role"] == "creator":
                checkDACS["creator"] = True
        
        # export only if every required DACS element was found
        checkExport = all(checkDACS.values())
        if checkDACS["abstract"] != True:
            print ("\t\tFailed to update browse pages: Collection has no abstract.")
            print ("\t\tFailed to export collection: Collection has no abstract.")
        else:
            date = ""
            for dateData in collection["dates"]:
                if "expression" in dateData.keys():
                    date = dateData["expression"]
                else:
                    if "end" in dateData.keys():
                        normalDate = dateData["begin"] + "/" + dateData["end"]
                    else:
                        normalDate = dateData["begin"]
                    date = dacs.iso2DACS(normalDate)
            extent = ""
            for extentData in collection["extents"]:
                extent = extentData["number"] + " " + extentData["extent_type"]

            ID = collection["id_0"].lower().strip()
            eadID = collection["ead_id"].strip()
            checkCollection = False
            if checkCollection == False:
                collectionData.append([ID, checkExport, normalName, date, extent, abstract, collection["restrictions"], accessRestrict])

            # collect topical subject terms (from the "meg" source) for the subjects CSV,
            # linking each collection ID to its subjects
            for subjectRef in collection["subjects"]:
                subject = client.get(subjectRef["ref"]).json()
                if subject["source"] == "meg":
                    if subject["terms"][0]["term_type"] == "topical":
                        checkSubject = False
                        for existingSubject in subjectData:
                            if existingSubject[0] == subject["title"]:
                                if not ID in existingSubject:
                                    existingSubject.append(ID)
                                checkSubject = True
                        if checkSubject == False:
                            subjectData.append([subject["title"], subjectRef["ref"], ID])    
            if checkExport != True:
                print ("\t\tFailed to export collection: ")
                for checkNote in checkDACS.keys():
                    if checkDACS[checkNote] == False:
                        print ("\t\t\t" + checkNote + " is missing")
            else:

                #sorting collection
                eadDir = output_path
                if not os.path.isdir(eadDir):
                    os.mkdir(eadDir)            
            
                resourceID = collection["uri"].split("/resources/")[1]
                print ("\t\t\tExporting EAD to " + eadID+".xml")
                eadResponse = client.get("repositories/3/resource_descriptions/" + resourceID + ".xml?numbered_cs=true&include_daos=true")
                eadFile = os.path.join(eadDir, eadID + ".xml")
                f = open(eadFile, 'w', encoding='utf-8')
                f.write(eadResponse.text)
                f.close()
                print ("\t\t\tSuccess!")

                # uploading to the WCSU (CAO) WebDAV share
                remote_path = "/" + eadID + ".xml"
                webDavClient.upload_sync(remote_path=remote_path, local_path=eadFile)

print ("\tWriting static data back to files.")
#write new collection data back to file
collectionFile = open(os.path.join(staticData, "collections.csv"), "w", newline='', encoding='utf-8')
writer = csv.writer(collectionFile, delimiter='|')
writer.writerows(collectionData)
collectionFile.close()

#write new subjects data back to file
subjectFile = open(os.path.join(staticData, "subjects.csv"), "w", newline='', encoding='utf-8')
writer = csv.writer(subjectFile, delimiter='|')
writer.writerows(subjectData)
subjectFile.close()

endTimeHuman = datetime.utcfromtimestamp(lastExportTime).strftime('%Y-%m-%d %H:%M:%S')
print ("\tFinished! Last Export time is " + endTimeHuman)
timePath = os.path.join(__location__, "lastExport.txt")
with open(timePath, 'w') as timeFile:
    timeFile.write(str(lastExportTime).split(".")[0])
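
Once the script finishes, lastExport.txt (written next to the script) records the run time, so the next run only exports resources modified since then. Delete that file to fall back to the script’s default start time and re-export anything modified since that date.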