There are two ways we can harvest EADs from an ArchivesSpace instance: we can pull them from your server, or you can push them to our server.
Option 1: We pull
The CAO runs a script on our server to connect to your ArchivesSpace instance and download your EADs to our server. We will need 2 things in order to accomplish this:
- We will need to have login credentials to your system
- You will need to let outside traffic into your ArchivesSpace backend (https://yourASpace.org:8089 – for example).
So, to get started:
- Let us know you’re interested.
- Send us your ASpace backend URL – by default, it’s the URL to your staff interface with port 8089.
- You need to create a basic user for us on your ArchivesSpace instance and let us know the credentials
- You need to let our machine past your firewall (we’ll give you our IP). This site may help with Linux machines. https://www.tecmint.com/open-port-for-specific-ip-address-in-firewalld/
What will happen?
Every day we will scan your ASpace instance to see if any resource records have changed. If they have, your system will export an EAD and we will add the file to your data directory on CAO. We use ArchivesSnake to accomplish this. Info on ArchivesSnake is here: https://github.com/archivesspace-labs/ArchivesSnake
NOTE: Only changes to the top level resource will be seen as changes. So if you change a component in a resource, you will need to save the top level resource if you want it to be exported.
Why bother?
It’s a “set it and forget it” option.
Option 2: You Push
If you can’t or don’t want to give us login credentials to your ArchivesSpace instance, you can instead export and upload the EADs from your server to our server using WebDAV.
One way to do this is to use the ArchivesSnake and webdavclient3 Python modules.
Installation instructions for ArchivesSnake can be found here: https://github.com/archivesspace-labs/ArchivesSnake. It’s pretty straightforward.
You then will need to run a Python script on your server to accomplish the “push.” Below is a sample script you can use (based on the University at Albany’s script); just replace the fakeUsername, mainAgencyCode, and fakePassword placeholders with your own values.
Sample Script
# -*- coding: utf-8 -*-
import os
import sys
import dacs
import time
import csv
import shutil
from git import Repo
from datetime import datetime
from subprocess import Popen, PIPE, STDOUT
import asnake.logging as logging
from asnake.client import ASnakeClient
#setting up connection to WCSU webdav
from webdav3.client import Client
# Connect to the WCSU WebDAV share that receives the exported EAD files.
webDavClient = Client({
    'webdav_hostname': "http://archives.library.wcsu.edu/webdav/arclight/data/ead/mainAgencyCode/",
    'webdav_login': "yourMainAgencyCode",
    'webdav_password': "fakePassword",
})
# Skip SSL certificate verification (library default is True).
webDavClient.verify = False
print (str(datetime.now()) + " Exporting Records from ArchivesSpace")
print ("\tConnecting to ArchivesSpace")
# Authenticated ArchivesSnake client against the local ArchivesSpace backend.
client = ASnakeClient(
    baseurl="http://localhost:8089",
    username="fakeUsername",
    password="fakePassword",
)
client.authorize()
# Log ArchivesSnake activity to stdout.
logging.setup_logging(stream=sys.stdout, level='INFO')
__location__ = os.path.dirname(os.path.realpath(__file__))
# Timestamp for this run; written back to lastExport.txt at the end so the
# next run only picks up resources modified after this one started.
lastExportTime = time.time()
timePath = os.path.join(__location__, "lastExport.txt")
try:
    with open(timePath, 'r') as timeFile:
        startTime = int(timeFile.read().replace('\n', ''))
except (OSError, ValueError):
    # First run (no lastExport.txt) or unreadable/non-integer contents:
    # fall back to a fixed baseline (1592193600 = 2020-06-15 04:00:00 UTC).
    # Was a bare `except:`, which silently swallowed every error.
    startTime = 1592193600
# NOTE(review): utcfromtimestamp is deprecated in Python 3.12+; kept for parity.
humanTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%d %H:%M:%S')
print ("\tChecking for collections updated since " + humanTime)
# Local staging area for exported EAD XML files and derived CSV data.
output_path = "/home/haponiks/archivesSnakeExport/eads"
staticData = os.path.join(output_path, "staticData")
#read existing exported collection data
# NOTE(review): the reads below are commented out, so collectionData and
# subjectData are rebuilt from scratch on every run rather than merged with
# previously exported rows — confirm this is intentional.
collectionData = []
#collectionFile = open(os.path.join(staticData, "collections.csv"), "r", encoding='utf-8')
#for line in csv.reader(collectionFile, delimiter="|"):
# collectionData.append(line)
#collectionFile.close()
#read existing exported subject data
subjectData = []
#subjectFile = open(os.path.join(staticData, "subjects.csv"), "r", encoding='utf-8')
#for line in csv.reader(subjectFile, delimiter="|"):
# subjectData.append(line)
#subjectFile.close
# Ask ArchivesSpace for every resource id in repository 3 that has been
# modified since the last export.
queryUrl = "repositories/3/resources?all_ids=true&modified_since=" + str(startTime)
print ("\tQuerying ArchivesSpace...")
modifiedList = client.get(queryUrl).json()
if not modifiedList:
    print ("\tFound no new records.")
else:
    print ("\tFound " + str(len(modifiedList)) + " new records!")
    print ("\tArchivesSpace URIs: " + str(modifiedList))
# For each modified resource: verify the required DACS fields are present,
# record browse-page data, write the EAD XML locally, and push it to WebDAV.
for colID in modifiedList:
    collection = client.get("repositories/3/resources/" + str(colID)).json()
    if collection["publish"] != True:
        print ("\t\tSkipping " + collection["title"] + " because it is unpublished")
    else:
        print ("\t\tExporting " + collection["title"] + " " + "(" + collection["id_0"] + ")")
        try:
            normalName = collection["finding_aid_title"]
        except KeyError:
            # BUGFIX: the fallback used to re-read the same missing key, which
            # re-raised the KeyError; fall back to the display title instead.
            print ("\t\tError: incorrect Finding Aid Title (sort title)")
            normalName = collection["title"]
        #DACS notes/fields to check before exporting
        dacsNotes = ["ead_id", "abstract",
                     #"acqinfo",
                     "bioghist",
                     "scopecontent", "arrangement", "creator"]
        checkDACS = {dacsNote: False for dacsNote in dacsNotes}
        checkAccessRestrict = False
        abstract = ""
        accessRestrict = ""
        if "ead_id" in collection.keys():
            checkDACS["ead_id"] = True
        for note in collection["notes"]:
            if "type" in note.keys():
                if note["type"] == "abstract":
                    checkDACS["abstract"] = True
                    abstract = note["content"][0].replace("\n", " ")
                if note["type"] == "accessrestrict":
                    checkAccessRestrict = True
                    # BUGFIX: accumulate all subnotes — plain '=' kept only the
                    # last one (the leading space + strip() show accumulation
                    # was intended).
                    for subnote in note["subnotes"]:
                        accessRestrict += " " + subnote["content"].replace("\n", " ")
                    accessRestrict = accessRestrict.strip()
                if note["type"] == "acqinfo":
                    checkDACS["acqinfo"] = True
                if note["type"] == "bioghist":
                    checkDACS["bioghist"] = True
                if note["type"] == "scopecontent":
                    checkDACS["scopecontent"] = True
                if note["type"] == "arrangement":
                    checkDACS["arrangement"] = True
        for agent in collection["linked_agents"]:
            if agent["role"] == "creator":
                checkDACS["creator"] = True
        # Export only when every required DACS field was found.
        checkExport = all(checkDACS.values())
        if checkDACS["abstract"] != True:
            print ("\t\tFailed to update browse pages: Collection has no abstract.")
            print ("\t\tFailed to export collection: Collection has no abstract.")
        else:
            # Human-readable date: prefer the expression, else build one from
            # begin/end and normalize via dacs.iso2DACS.
            date = ""
            for dateData in collection["dates"]:
                if "expression" in dateData.keys():
                    date = dateData["expression"]
                else:
                    if "end" in dateData.keys():
                        normalDate = dateData["begin"] + "/" + dateData["end"]
                    else:
                        normalDate = dateData["begin"]
                    date = dacs.iso2DACS(normalDate)
            extent = ""
            for extentData in collection["extents"]:
                extent = extentData["number"] + " " + extentData["extent_type"]
            ID = collection["id_0"].lower().strip()
            # BUGFIX: .get() so a collection without an ead_id no longer
            # crashes the whole run here; checkExport already blocks the
            # export branch in that case.
            eadID = collection.get("ead_id", "").strip()
            # (checkCollection was a constant False; append unconditionally.)
            collectionData.append([ID, checkExport, normalName, date, extent, abstract, collection["restrictions"], accessRestrict])
            # Track topical "meg" subjects for the browse pages, de-duplicated
            # by subject title; each subject row accumulates collection IDs.
            for subjectRef in collection["subjects"]:
                subject = client.get(subjectRef["ref"]).json()
                if subject["source"] == "meg":
                    if subject["terms"][0]["term_type"] == "topical":
                        checkSubject = False
                        for existingSubject in subjectData:
                            if existingSubject[0] == subject["title"]:
                                if not ID in existingSubject:
                                    existingSubject.append(ID)
                                checkSubject = True
                        if checkSubject == False:
                            subjectData.append([subject["title"], subjectRef["ref"], ID])
            if checkExport != True:
                print ("\t\tFailed to export collection: ")
                for checkNote in checkDACS.keys():
                    if checkDACS[checkNote] == False:
                        print ("\t\t\t" + checkNote + " is missing")
            else:
                eadDir = output_path
                if not os.path.isdir(eadDir):
                    os.mkdir(eadDir)
                resourceID = collection["uri"].split("/resources/")[1]
                print ("\t\t\tExporting EAD to " + eadID + ".xml")
                eadResponse = client.get("repositories/3/resource_descriptions/" + resourceID + ".xml?numbered_cs=true&include_daos=true")
                eadFile = os.path.join(eadDir, eadID + ".xml")
                # BUGFIX: context manager guarantees the file is closed even
                # if the write raises.
                with open(eadFile, 'w', encoding='utf-8') as f:
                    f.write(eadResponse.text)
                print ("\t\t\tSuccess!")
                #uploading to WCSU webdav
                remote_path = os.path.join("/" + eadID + ".xml")
                webDavClient.upload_sync(remote_path=remote_path, local_path=eadFile)
print ("\tWriting static data back to files.")
#write new collection data back to file (pipe-delimited CSV)
# BUGFIX: use context managers so the files are closed even if writing fails.
with open(os.path.join(staticData, "collections.csv"), "w", newline='', encoding='utf-8') as collectionFile:
    writer = csv.writer(collectionFile, delimiter='|')
    writer.writerows(collectionData)
#write new subjects data back to file
with open(os.path.join(staticData, "subjects.csv"), "w", newline='', encoding='utf-8') as subjectFile:
    writer = csv.writer(subjectFile, delimiter='|')
    writer.writerows(subjectData)
endTimeHuman = datetime.utcfromtimestamp(lastExportTime).strftime('%Y-%m-%d %H:%M:%S')
print ("\tFinished! Last Export time is " + endTimeHuman)
# Persist this run's start time (integer seconds) for the next incremental run.
timePath = os.path.join(__location__, "lastExport.txt")
with open(timePath, 'w') as timeFile:
    timeFile.write(str(lastExportTime).split(".")[0])