Nexus3.X_私服迁移
需求
# 原私服地址:http://17.31.142.75:8081/
http://17.31.142.75:8081/repository/maven-releases/
http://17.31.142.75:8081/repository/maven-snapshots/
admin
04251qaz
# 迁移企业网地址:http://192.168.1.17:8082/
http://192.168.1.17:8082/repository/maven-releases/
http://192.168.1.17:8082/repository/maven-snapshots/
admin
hbriz2023
迁移方案(使用脚本从阿里云私服下载到本地再上传到企业私服)
# maven-snapshots 为例
## 使用脚本从阿里云私服下载到本地
cd /home/aliyun-maven
# python3 down_nexus_3.X.py
# 实时监控文件数量:watch -n 5 "find /home/aliyun-maven/maven-snapshots -type f | wc -l" #18767
============================================================
下载完成!
============================================================
总计组件: 3042
总计文件: 18767
成功下载: 18767
跳过(已存在): 0
下载失败: 0
保存目录: /home/aliyun-maven/maven-snapshots
## 从本地上传到企业私服
# python3 upload_nexus_3.X.py
扫描文件...
找到 18767 个需要上传的文件
上传进度: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 18767/18767 [31:30<00:00, 9.92it/s, link.sdk.cp-1.0.0-20 ✓]
上传完成!
成功: 18767
失败: 0
# maven-releases 同理
down_nexus_3.X.py
# cat down_nexus_3.X.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用 Nexus 3 REST API 下载所有 artifacts - 兼容Python 3.5及以下版本
优化版本:增加重试机制、进度显示、错误处理
"""
import os
import sys
import time
import json
import requests
from requests.auth import HTTPBasicAuth
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def setup_requests_session(username, password):
"""创建带重试机制的会话"""
session = requests.Session()
session.auth = HTTPBasicAuth(username, password)
# 配置重试机制
retry_strategy = Retry(
total=5, # 总重试次数
backoff_factor=1, # 重试等待时间:1, 2, 4, 8, 16秒
status_forcelist=[429, 500, 502, 503, 504], # 遇到这些状态码重试
allowed_methods=["GET", "HEAD"] # 只对GET和HEAD请求重试
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def download_from_nexus3():
base_url = "http://17.31.142.75:8081"
repository = "maven-snapshots"
local_dir = "/home/aliyun-maven/maven-snapshots"
username = "admin"
password = "04251qaz"
# 创建会话
session = setup_requests_session(username, password)
# 第一步:获取所有组件
print("正在获取组件列表...")
components = []
continuation_token = None
page = 1
while True:
url = "{}/service/rest/v1/components".format(base_url)
params = {"repository": repository}
if continuation_token:
params["continuationToken"] = continuation_token
print("正在获取第 {} 页数据...".format(page))
try:
response = session.get(url, params=params, timeout=30)
if response.status_code != 200:
print("获取组件失败: HTTP {}".format(response.status_code))
print("响应内容:", response.text[:200])
# 尝试获取错误信息
try:
error_data = response.json()
print("错误详情:", json.dumps(error_data, ensure_ascii=False))
except:
pass
# 如果是认证失败,直接退出
if response.status_code in [401, 403]:
print("认证失败,请检查用户名和密码")
sys.exit(1)
break
try:
data = response.json()
except json.JSONDecodeError as e:
print("JSON解析失败:", e)
print("响应内容:", response.text[:200])
break
items = data.get("items", [])
components.extend(items)
print("本页找到 {} 个组件,累计 {} 个".format(len(items), len(components)))
continuation_token = data.get("continuationToken")
if not continuation_token:
print("所有数据获取完成")
break
page += 1
except requests.exceptions.Timeout:
print("连接超时,重试中...")
time.sleep(5)
continue
except requests.exceptions.ConnectionError:
print("连接错误,请检查网络连接")
sys.exit(1)
except Exception as e:
print("获取数据时发生未知错误:", str(e))
break
if not components:
print("未找到任何组件,请检查仓库名称和权限")
return
print("总共找到 {} 个组件".format(len(components)))
# 第二步:下载每个组件的所有 assets
total_assets = 0
downloaded_assets = 0
skipped_assets = 0
failed_assets = 0
# 首先统计所有 assets 数量
for component in components:
assets = component.get("assets", [])
total_assets += len(assets)
print("需要下载 {} 个文件".format(total_assets))
# 记录失败的文件,便于重试
failed_files = []
for i, component in enumerate(components, 1):
group = component.get("group", "")
name = component.get("name", "")
version = component.get("version", "")
print("\n[{}/{}] 处理组件: {}:{}:{}".format(
i, len(components), group, name, version
))
# 获取组件的所有 assets
assets = component.get("assets", [])
for j, asset in enumerate(assets, 1):
download_url = asset.get("downloadUrl")
path = asset.get("path")
checksum = asset.get("checksum", {})
if not download_url or not path:
print(" [{}/{}] 无效的asset: URL或路径为空".format(j, len(assets)))
failed_assets += 1
continue
local_path = os.path.join(local_dir, path)
local_dir_path = os.path.dirname(local_path)
# 创建目录
if not os.path.exists(local_dir_path):
try:
os.makedirs(local_dir_path, exist_ok=True)
except OSError as e:
print(" [{}/{}] 创建目录失败: {}".format(j, len(assets), str(e)))
failed_files.append((download_url, local_path, str(e)))
failed_assets += 1
continue
# 检查文件是否已存在(可选:添加大小校验)
if os.path.exists(local_path):
file_size = os.path.getsize(local_path)
remote_md5 = checksum.get("md5")
# 如果有MD5校验,可以验证文件完整性
if remote_md5:
try:
import hashlib
with open(local_path, 'rb') as f:
local_md5 = hashlib.md5(f.read()).hexdigest()
if local_md5 == remote_md5:
print(" [{}/{}] 文件已存在且校验通过: {} ({} bytes)".format(
j, len(assets), path, file_size))
downloaded_assets += 1
else:
print(" [{}/{}] 文件存在但校验失败,重新下载: {}".format(
j, len(assets), path))
os.remove(local_path) # 删除损坏的文件
# 继续下载
except:
print(" [{}/{}] 文件已存在: {} ({} bytes)".format(
j, len(assets), path, file_size))
downloaded_assets += 1
else:
print(" [{}/{}] 文件已存在: {} ({} bytes)".format(
j, len(assets), path, file_size))
downloaded_assets += 1
continue
# 下载文件
print(" [{}/{}] 下载: {}".format(j, len(assets), path))
try:
response = session.get(download_url, stream=True, timeout=60)
if response.status_code == 200:
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 验证下载大小
actual_size = os.path.getsize(local_path)
if total_size > 0 and actual_size != total_size:
print(" [{}/{}] 警告: 文件大小不匹配 (期望: {}, 实际: {})".format(
j, len(assets), total_size, actual_size))
print(" [{}/{}] 下载成功: {} ({} bytes)".format(
j, len(assets), path, actual_size))
downloaded_assets += 1
elif response.status_code == 404:
print(" [{}/{}] 文件不存在: {}".format(j, len(assets), path))
skipped_assets += 1
else:
print(" [{}/{}] 下载失败: HTTP {}".format(
j, len(assets), response.status_code))
failed_files.append((download_url, local_path,
"HTTP {}".format(response.status_code)))
failed_assets += 1
except requests.exceptions.Timeout:
print(" [{}/{}] 下载超时: {}".format(j, len(assets), path))
failed_files.append((download_url, local_path, "Timeout"))
failed_assets += 1
except Exception as e:
print(" [{}/{}] 下载异常: {} - {}".format(
j, len(assets), path, str(e)))
failed_files.append((download_url, local_path, str(e)))
failed_assets += 1
# 添加小延迟,避免对服务器造成过大压力
time.sleep(0.1)
# 打印总结
print("\n" + "="*60)
print("下载完成!")
print("="*60)
print("总计组件: {}".format(len(components)))
print("总计文件: {}".format(total_assets))
print("成功下载: {}".format(downloaded_assets))
print("跳过(已存在): {}".format(skipped_assets))
print("下载失败: {}".format(failed_assets))
print("保存目录: {}".format(local_dir))
# 如果有失败的文件,保存到日志文件
if failed_files:
log_file = os.path.join(local_dir, "download_failures.log")
with open(log_file, 'w', encoding='utf-8') as f:
f.write("失败文件列表:\n")
for url, path, error in failed_files:
f.write("URL: {}\n".format(url))
f.write("路径: {}\n".format(path))
f.write("错误: {}\n".format(error))
f.write("-" * 50 + "\n")
print("\n失败文件列表已保存到: {}".format(log_file))
print("您可以使用此文件进行重试")
if __name__ == "__main__":
try:
download_from_nexus3()
except KeyboardInterrupt:
print("\n\n用户中断,程序退出")
sys.exit(0)
except Exception as e:
print("程序执行出错:", str(e))
sys.exit(1)
upload_nexus_3.X.py
# cat upload_nexus_3.X.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
增强版:带进度显示和断点续传功能
"""
import os
import sys
import json
import requests
from tqdm import tqdm
def upload_with_progress():
local_dir = "/home/aliyun-maven/maven-snapshots"
nexus_url = "http://192.168.1.17:8082"
repository = "maven-snapshots"
username = "admin"
password = "hbriz2023"
# 状态文件路径(用于断点续传)
status_file = os.path.join(local_dir, ".upload_status.json")
# 加载之前的进度
uploaded_files = set()
if os.path.exists(status_file):
try:
with open(status_file, 'r') as f:
data = json.load(f)
uploaded_files = set(data.get('uploaded', []))
print("检测到上次的进度,已上传 {} 个文件".format(len(uploaded_files)))
except:
pass
auth = (username, password)
# 收集所有文件
print("扫描文件...")
all_files = []
for root, dirs, files in os.walk(local_dir):
for file in files:
# 跳过状态文件
if file == ".upload_status.json":
continue
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, local_dir)
# 检查是否已上传
if rel_path in uploaded_files:
continue
all_files.append({
'path': file_path,
'rel_path': rel_path,
'size': os.path.getsize(file_path)
})
print("找到 {} 个需要上传的文件".format(len(all_files)))
if not all_files:
print("没有需要上传的文件")
return
# 上传文件
uploaded = []
failed = []
with tqdm(total=len(all_files), desc="上传进度") as pbar:
for file_info in all_files:
file_path = file_info['path']
rel_path = file_info['rel_path']
file_size = file_info['size']
upload_url = "{}/repository/{}/{}".format(nexus_url, repository, rel_path)
try:
with open(file_path, 'rb') as f:
response = requests.put(upload_url, auth=auth, data=f, timeout=180)
if response.status_code in [200, 201]:
uploaded.append(rel_path)
pbar.set_postfix_str("{} ✓".format(os.path.basename(file_path)[:20]))
else:
failed.append(rel_path)
pbar.set_postfix_str("{} ✗".format(os.path.basename(file_path)[:20]))
except Exception as e:
failed.append(rel_path)
pbar.set_postfix_str("{} ✗".format(os.path.basename(file_path)[:20]))
pbar.update(1)
# 保存进度
if uploaded:
with open(status_file, 'w') as f:
json.dump({'uploaded': uploaded}, f)
# 清理状态文件
if os.path.exists(status_file):
os.remove(status_file)
print("\n上传完成!")
print("成功: {}".format(len(uploaded)))
print("失败: {}".format(len(failed)))
if failed:
print("\n失败文件列表:")
for f in failed[:10]: # 只显示前10个
print(" - {}".format(f))
if len(failed) > 10:
print(" ... 还有 {} 个失败文件".format(len(failed) - 10))
if __name__ == "__main__":
upload_with_progress()
递归下载xml, .md5, .sha1文件脚本
cat down_nexus_3.X_xml_md5_sha1.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用 Nexus 3 REST API 下载所有 artifacts - 完整版本
下载所有目录中的 maven-metadata.xml, .md5, .sha1 文件
"""
import os
import sys
import time
import json
import requests
from requests.auth import HTTPBasicAuth
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def setup_requests_session(username, password):
"""创建带重试机制的会话"""
session = requests.Session()
session.auth = HTTPBasicAuth(username, password)
# 配置重试机制
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "HEAD"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def download_file(session, url, local_path):
"""下载单个文件"""
try:
response = session.get(url, stream=True, timeout=30)
if response.status_code == 200:
# 创建目录
local_dir = os.path.dirname(local_path)
if not os.path.exists(local_dir):
os.makedirs(local_dir, exist_ok=True)
# 下载文件
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return True, f"成功下载 ({os.path.getsize(local_path)} bytes)"
elif response.status_code == 404:
return False, "文件不存在 (404)"
else:
return False, f"HTTP {response.status_code}"
except requests.exceptions.Timeout:
return False, "超时"
except Exception as e:
return False, str(e)
def find_metadata_urls_from_components(components, base_url, repository):
"""从组件数据中提取所有可能的metadata文件URL"""
metadata_urls = set() # 使用set避免重复
for component in components:
group = component.get("group", "")
name = component.get("name", "")
version = component.get("version", "")
path = component.get("assets", [{}])[0].get("path", "") if component.get("assets") else ""
if not all([group, name]):
continue
# 项目级别的metadata
project_dir = group.replace('.', '/') + '/' + name
project_metadata_url = f"{base_url}/repository/{repository}/{project_dir}/maven-metadata.xml"
metadata_urls.add(project_metadata_url)
# 如果组件有版本信息,尝试版本目录下的metadata
if version:
version_dir = f"{project_dir}/{version}"
version_metadata_url = f"{base_url}/repository/{repository}/{version_dir}/maven-metadata.xml"
metadata_urls.add(version_metadata_url)
# 尝试从path中提取更多可能的metadata路径
if path:
# 尝试找到路径中的所有可能目录
path_parts = path.split('/')
for i in range(len(path_parts) - 1):
# 构建目录路径
dir_path = '/'.join(path_parts[:i+1])
if dir_path.endswith('/'):
dir_path = dir_path[:-1]
# 检查是否可能是包含metadata的目录
# 排除文件路径,只保留目录
if not path_parts[i].endswith(('.jar', '.pom', '.war', '.ear', '.zip', '.tar.gz', '.tgz')):
metadata_url = f"{base_url}/repository/{repository}/{dir_path}/maven-metadata.xml"
metadata_urls.add(metadata_url)
return list(metadata_urls)
def crawl_directory_for_metadata(session, base_url, repository, start_dir=""):
"""递归爬取仓库目录结构,查找所有可能的metadata文件"""
metadata_urls = set()
urls_to_check = [f"{base_url}/repository/{repository}/{start_dir}"]
print(f"开始爬取目录结构...")
while urls_to_check:
current_url = urls_to_check.pop(0)
try:
# 尝试列出目录内容
response = session.get(current_url, timeout=30)
if response.status_code == 200:
# 检查是否是HTML页面(Nexus的目录浏览页面)
if 'text/html' in response.headers.get('content-type', ''):
# 这里可以解析HTML来获取子目录
# 简化处理:基于已知的目录结构猜测
pass
# 检查这个目录下是否有maven-metadata.xml
metadata_url = current_url.rstrip('/') + '/maven-metadata.xml'
metadata_urls.add(metadata_url)
except:
continue
# 避免无限循环,限制深度
if len(current_url.split('/')) > 15:
continue
return list(metadata_urls)
def find_all_metadata_files(session, base_url, repository):
"""查找仓库中所有可能的metadata文件"""
metadata_files = []
# 方法1: 通过搜索API查找
print("方法1: 通过搜索API查找metadata文件...")
search_url = f"{base_url}/service/rest/v1/search"
# 搜索maven-metadata.xml文件
params = {
"repository": repository,
"q": "maven-metadata.xml"
}
try:
continuation_token = None
while True:
if continuation_token:
params["continuationToken"] = continuation_token
response = session.get(search_url, params=params, timeout=30)
if response.status_code == 200:
data = response.json()
for item in data.get("items", []):
download_url = item.get("downloadUrl", "")
path = item.get("path", "")
if download_url and path:
metadata_files.append((download_url, path))
continuation_token = data.get("continuationToken")
if not continuation_token:
break
else:
print(f"搜索失败: HTTP {response.status_code}")
break
except Exception as e:
print(f"搜索API出错: {e}")
# 方法2: 如果搜索API找不到,尝试直接访问已知路径
if not metadata_files:
print("方法2: 尝试直接访问已知路径...")
# 获取所有组件
components = []
continuation_token = None
try:
while True:
url = f"{base_url}/service/rest/v1/components"
params = {"repository": repository}
if continuation_token:
params["continuationToken"] = continuation_token
response = session.get(url, params=params, timeout=30)
if response.status_code == 200:
data = response.json()
components.extend(data.get("items", []))
continuation_token = data.get("continuationToken")
if not continuation_token:
break
else:
break
except:
pass
# 从组件中提取可能的metadata路径
if components:
print(f"从 {len(components)} 个组件中提取metadata路径...")
metadata_urls = find_metadata_urls_from_components(components, base_url, repository)
for url in metadata_urls:
path = url.replace(f"{base_url}/repository/{repository}/", "")
metadata_files.append((url, path))
return metadata_files
def download_metadata_files(session, metadata_files, local_dir):
"""下载所有找到的metadata文件"""
downloaded_files = 0
skipped_files = 0
failed_files = []
print(f"\n开始下载 {len(metadata_files)} 个metadata文件...")
print("="*60)
for i, (url, path) in enumerate(metadata_files, 1):
local_path = os.path.join(local_dir, path)
# 检查文件是否已存在
if os.path.exists(local_path):
file_size = os.path.getsize(local_path)
print(f"[{i}/{len(metadata_files)}] 文件已存在: {path} ({file_size} bytes)")
skipped_files += 1
continue
print(f"[{i}/{len(metadata_files)}] 下载: {path}")
# 下载主文件
success, message = download_file(session, url, local_path)
if success:
downloaded_files += 1
print(f" ✓ {message}")
# 尝试下载对应的md5和sha1文件
for ext in ['.md5', '.sha1']:
ext_url = url + ext
ext_path = local_path + ext
if not os.path.exists(ext_path):
ext_success, ext_message = download_file(session, ext_url, ext_path)
if ext_success:
print(f" ✓ 下载{ext}成功")
else:
print(f" ⚠ 下载{ext}失败: {ext_message}")
else:
failed_files.append((url, local_path, message))
print(f" ✗ 失败: {message}")
# 小延迟,避免对服务器造成压力
time.sleep(0.1)
return downloaded_files, skipped_files, failed_files
def download_all_metadata():
"""主函数:下载所有metadata文件"""
base_url = "http://1.1.142.75:8081"
repository = "maven-snapshots"
local_dir = "/home/aliyun-maven-3/maven-snapshots"
username = "admin"
password = "Hs2wsx"
# 创建会话
session = setup_requests_session(username, password)
print("="*60)
print("Nexus 3 Metadata文件下载工具")
print("="*60)
print(f"服务器: {base_url}")
print(f"仓库: {repository}")
print(f"本地目录: {local_dir}")
print("="*60)
# 查找所有metadata文件
metadata_files = find_all_metadata_files(session, base_url, repository)
if not metadata_files:
print("未找到任何metadata文件")
return
print(f"找到 {len(metadata_files)} 个可能的metadata文件")
# 去重处理
unique_files = {}
for url, path in metadata_files:
# 按路径去重
unique_files[path] = (url, path)
metadata_files = list(unique_files.values())
print(f"去重后剩余 {len(metadata_files)} 个文件")
# 下载文件
downloaded_files, skipped_files, failed_files = download_metadata_files(
session, metadata_files, local_dir
)
# 打印总结
print("\n" + "="*60)
print("下载完成!")
print("="*60)
print(f"找到文件: {len(metadata_files)}")
print(f"成功下载: {downloaded_files}")
print(f"跳过(已存在): {skipped_files}")
print(f"下载失败: {len(failed_files)}")
if failed_files:
print(f"\n失败文件详情 (前10个):")
for url, path, error in failed_files[:10]:
print(f" 文件: {os.path.basename(path)}")
print(f" 错误: {error}")
if len(failed_files) > 10:
print(f" ... 还有 {len(failed_files) - 10} 个失败项")
# 保存失败记录到文件
log_file = os.path.join(local_dir, "metadata_download_failures.log")
with open(log_file, 'w', encoding='utf-8') as f:
f.write("metadata文件下载失败列表:\n")
f.write("="*60 + "\n")
for url, path, error in failed_files:
f.write(f"文件: {path}\n")
f.write(f"URL: {url}\n")
f.write(f"错误: {error}\n")
f.write("-" * 50 + "\n")
print(f"\n详细失败记录已保存到: {log_file}")
if __name__ == "__main__":
try:
download_all_metadata()
except KeyboardInterrupt:
print("\n\n用户中断,程序退出")
sys.exit(0)
except Exception as e:
print("程序执行出错:", str(e))
import traceback
traceback.print_exc()
sys.exit(1)
递归上传xml, .md5, .sha1文件脚本
cat upload_nexux_3.X_xml_md5_sha1.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
上传 maven-metadata.xml, .md5, .sha1 文件到 Nexus 3
"""
import os
import sys
import hashlib
import requests
from requests.auth import HTTPBasicAuth
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def setup_requests_session(username, password):
"""创建带重试机制的会话"""
session = requests.Session()
session.auth = HTTPBasicAuth(username, password)
# 配置重试机制
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["PUT", "POST", "GET", "HEAD"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def calculate_md5(file_path):
"""计算文件的MD5值"""
try:
with open(file_path, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
except Exception as e:
print(f" 计算MD5失败: {str(e)}")
return None
def calculate_sha1(file_path):
"""计算文件的SHA1值"""
try:
with open(file_path, 'rb') as f:
return hashlib.sha1(f.read()).hexdigest()
except Exception as e:
print(f" 计算SHA1失败: {str(e)}")
return None
def upload_file(session, url, file_path):
"""上传单个文件到Nexus"""
try:
# 检查文件是否存在
if not os.path.exists(file_path):
return False, f"文件不存在: {file_path}"
# 读取文件内容
with open(file_path, 'rb') as f:
file_content = f.read()
# 设置请求头
headers = {
'Content-Type': 'application/octet-stream',
}
# 如果是校验文件,添加相应的Content-Type
if file_path.endswith('.md5'):
headers['Content-Type'] = 'text/plain'
elif file_path.endswith('.sha1'):
headers['Content-Type'] = 'text/plain'
# 发送PUT请求上传文件
response = session.put(url, data=file_content, headers=headers, timeout=60)
if response.status_code in [200, 201, 204]:
return True, f"上传成功 (HTTP {response.status_code})"
else:
return False, f"上传失败 (HTTP {response.status_code}): {response.text[:200]}"
except requests.exceptions.Timeout:
return False, "连接超时"
except Exception as e:
return False, f"上传异常: {str(e)}"
def find_metadata_files(base_dir):
"""在指定目录及其子目录中查找所有maven-metadata.xml文件"""
metadata_files = []
print(f"扫描目录: {base_dir}")
for root, dirs, files in os.walk(base_dir):
for file in files:
if file == "maven-metadata.xml":
metadata_path = os.path.join(root, file)
metadata_files.append(metadata_path)
return metadata_files
def verify_checksum_files(metadata_path):
"""验证对应的.md5和.sha1文件是否存在且有效"""
checksums_ok = True
messages = []
# 检查MD5文件
md5_path = metadata_path + '.md5'
if os.path.exists(md5_path):
# 读取存储的MD5值
try:
with open(md5_path, 'r') as f:
stored_md5 = f.read().strip()
# 计算实际的MD5值
actual_md5 = calculate_md5(metadata_path)
if actual_md5 and actual_md5 == stored_md5:
messages.append("MD5校验通过")
else:
messages.append(f"MD5校验失败 (存储: {stored_md5}, 实际: {actual_md5})")
checksums_ok = False
except Exception as e:
messages.append(f"读取MD5文件失败: {str(e)}")
checksums_ok = False
else:
messages.append("MD5文件不存在")
checksums_ok = False
# 检查SHA1文件
sha1_path = metadata_path + '.sha1'
if os.path.exists(sha1_path):
# 读取存储的SHA1值
try:
with open(sha1_path, 'r') as f:
stored_sha1 = f.read().strip()
# 计算实际的SHA1值
actual_sha1 = calculate_sha1(metadata_path)
if actual_sha1 and actual_sha1 == stored_sha1:
messages.append("SHA1校验通过")
else:
messages.append(f"SHA1校验失败 (存储: {stored_sha1}, 实际: {actual_sha1})")
checksums_ok = False
except Exception as e:
messages.append(f"读取SHA1文件失败: {str(e)}")
checksums_ok = False
else:
messages.append("SHA1文件不存在")
checksums_ok = False
return checksums_ok, messages
def generate_checksum_files(metadata_path):
"""为maven-metadata.xml生成.md5和.sha1文件"""
generated = []
# 生成MD5文件
md5 = calculate_md5(metadata_path)
if md5:
md5_path = metadata_path + '.md5'
with open(md5_path, 'w') as f:
f.write(md5)
generated.append('.md5')
# 生成SHA1文件
sha1 = calculate_sha1(metadata_path)
if sha1:
sha1_path = metadata_path + '.sha1'
with open(sha1_path, 'w') as f:
f.write(sha1)
generated.append('.sha1')
return generated
def upload_metadata_files():
"""主函数:上传所有metadata文件"""
base_dir = "/home/aliyun-maven-3/maven-snapshots"
nexus_base_url = "http://192.168.1.17:8082"
repository = "maven-snapshots"
username = "admin"
password = "hs23"
print("="*60)
print("Nexus 3 Metadata 文件上传工具")
print("="*60)
print(f"本地目录: {base_dir}")
print(f"Nexus地址: {nexus_base_url}")
print(f"仓库名称: {repository}")
print("="*60)
# 创建会话
session = setup_requests_session(username, password)
# 查找所有metadata文件
print("\n正在查找maven-metadata.xml文件...")
metadata_files = find_metadata_files(base_dir)
if not metadata_files:
print("未找到任何maven-metadata.xml文件")
return
print(f"找到 {len(metadata_files)} 个maven-metadata.xml文件")
# 询问是否自动生成校验文件
print("\n是否要为缺失的校验文件(.md5/.sha1)自动生成?")
print("1. 是 - 自动生成缺失的校验文件")
print("2. 否 - 只上传已存在的文件")
print("3. 生成并覆盖所有校验文件")
choice = input("请选择 (1/2/3, 默认2): ").strip()
if choice == '1':
auto_generate = 'missing'
print("将自动生成缺失的校验文件")
elif choice == '3':
auto_generate = 'all'
print("将重新生成所有校验文件")
else:
auto_generate = 'none'
print("只上传已存在的文件")
# 统计信息
total_files = 0
uploaded_files = 0
skipped_files = 0
failed_files = []
print("\n开始上传文件...")
print("="*60)
for i, metadata_path in enumerate(metadata_files, 1):
# 计算相对路径
rel_path = os.path.relpath(metadata_path, base_dir)
dir_path = os.path.dirname(rel_path)
print(f"\n[{i}/{len(metadata_files)}] 处理: {rel_path}")
# 验证校验文件
checksums_ok, messages = verify_checksum_files(metadata_path)
if not checksums_ok and auto_generate in ['missing', 'all']:
print(" 校验文件存在问题,重新生成...")
generated = generate_checksum_files(metadata_path)
if generated:
print(f" 已生成: {', '.join(generated)}")
# 重新验证
checksums_ok, messages = verify_checksum_files(metadata_path)
# 显示校验结果
for msg in messages:
print(f" {msg}")
# 准备上传的三个文件
files_to_upload = [
(metadata_path, 'maven-metadata.xml'),
(metadata_path + '.md5', 'maven-metadata.xml.md5'),
(metadata_path + '.sha1', 'maven-metadata.xml.sha1')
]
# 上传每个文件
for local_path, filename in files_to_upload:
total_files += 1
# 检查文件是否存在
if not os.path.exists(local_path):
if filename == 'maven-metadata.xml':
print(f" 错误: 主文件不存在!")
failed_files.append((local_path, "文件不存在"))
continue
else:
# 对于校验文件,如果不存在则跳过(除非用户选择生成)
print(f" ⚠ 跳过: {filename} 不存在")
skipped_files += 1
continue
# 构建上传URL
# Nexus上传URL格式: /repository/{repository}/{path/to/file}
upload_url = f"{nexus_base_url}/repository/{repository}/{dir_path}/{filename}"
print(f" 上传: {filename}")
# 上传文件
success, message = upload_file(session, upload_url, local_path)
if success:
uploaded_files += 1
print(f" ✓ {message}")
else:
failed_files.append((local_path, message))
print(f" ✗ {message}")
# 添加小延迟,避免对服务器造成过大压力
import time
time.sleep(0.2)
# 打印总结
print("\n" + "="*60)
print("上传完成!")
print("="*60)
print(f"处理metadata文件: {len(metadata_files)}")
print(f"总文件数: {total_files}")
print(f"成功上传: {uploaded_files}")
print(f"跳过: {skipped_files}")
print(f"失败: {len(failed_files)}")
if failed_files:
print(f"\n失败文件详情:")
for file_path, error in failed_files[:10]: # 只显示前10个失败项
print(f" {os.path.basename(file_path)}: {error}")
if len(failed_files) > 10:
print(f" ... 还有 {len(failed_files) - 10} 个失败项")
# 保存失败记录到文件
log_file = os.path.join(base_dir, "metadata_upload_failures.log")
with open(log_file, 'w', encoding='utf-8') as f:
f.write("metadata文件上传失败列表:\n")
f.write("="*60 + "\n")
for file_path, error in failed_files:
f.write(f"文件: {os.path.basename(file_path)}\n")
f.write(f"路径: {file_path}\n")
f.write(f"错误: {error}\n")
f.write("-" * 50 + "\n")
print(f"\n详细失败记录已保存到: {log_file}")
print("\n提示:")
print("1. 上传的文件可以在Nexus Web界面中查看")
print("2. 使用浏览器访问: http://192.168.1.17:8082")
print("3. 登录后选择 'Browse' -> 'maven-releases' 仓库查看文件")
def main():
"""主程序入口"""
print("Nexus 3 Metadata文件上传工具")
print("="*40)
print("功能:")
print("1. 扫描本地目录中的maven-metadata.xml文件")
print("2. 验证/生成校验文件(.md5/.sha1)")
print("3. 批量上传到Nexus 3仓库")
print("="*40)
try:
upload_metadata_files()
except KeyboardInterrupt:
print("\n\n用户中断,程序退出")
sys.exit(0)
except Exception as e:
print(f"\n程序执行出错: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 悩姜!


