388 lines
14 KiB
Python
388 lines
14 KiB
Python
"""Compatible content models for MariaDB."""
|
|
|
|
import json
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any

from sqlalchemy import Column, String, Boolean, Text, Integer, DateTime, BigInteger, Index, ForeignKey
from sqlalchemy.orm import relationship

from app.core.models.base_compatible import BaseModel
|
|
|
|
|
|
class Content(BaseModel):
    """Content model compatible with existing MariaDB schema.

    Maps the ``content`` table: one row per stored file, with its location,
    type information, visibility flags, processing state, integrity hashes
    and TON-related fields.

    ``id`` and ``created_at`` are read by the methods below but are not
    declared in this class; presumably they are provided by ``BaseModel``
    (verify against ``app.core.models.base_compatible``).
    """

    __tablename__ = "content"

    # Basic content information
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False, index=True)
    filename = Column(String(255), nullable=False)
    original_filename = Column(String(255), nullable=False)
    file_path = Column(String(500), nullable=False)

    # File metadata
    file_size = Column(BigInteger, nullable=False)  # bytes
    file_type = Column(String(100), nullable=False)
    mime_type = Column(String(100), nullable=False)
    file_extension = Column(String(10), nullable=False)

    # Content metadata
    title = Column(String(255), nullable=True)
    description = Column(Text, nullable=True)
    tags = Column(Text, nullable=True)  # JSON or comma-separated

    # Status and visibility
    is_public = Column(Boolean, default=False, nullable=False)
    is_active = Column(Boolean, default=True, nullable=False)
    is_indexed = Column(Boolean, default=False, nullable=False)
    is_converted = Column(Boolean, default=False, nullable=False)

    # Access and security
    access_password = Column(String(255), nullable=True)
    download_count = Column(Integer, default=0, nullable=False)
    view_count = Column(Integer, default=0, nullable=False)

    # Processing status
    processing_status = Column(String(50), default="pending", nullable=False)
    processing_error = Column(Text, nullable=True)
    processing_started = Column(DateTime, nullable=True)
    processing_completed = Column(DateTime, nullable=True)

    # File hashes for integrity
    md5_hash = Column(String(32), nullable=True, index=True)
    sha256_hash = Column(String(64), nullable=True, index=True)

    # Thumbnails and previews
    thumbnail_path = Column(String(500), nullable=True)
    preview_path = Column(String(500), nullable=True)

    # TON Blockchain integration
    ton_transaction_hash = Column(String(100), nullable=True, index=True)
    ton_storage_proof = Column(Text, nullable=True)
    ton_storage_fee = Column(BigInteger, default=0, nullable=False)  # nanotons

    # Expiration and cleanup
    expires_at = Column(DateTime, nullable=True)
    auto_delete = Column(Boolean, default=False, nullable=False)

    # Relationships
    # back_populates="content" requires a matching ``content`` relationship
    # on the User model.
    user = relationship("User", back_populates="content")

    # Table indexes for performance
    __table_args__ = (
        Index('idx_content_user_active', 'user_id', 'is_active'),
        Index('idx_content_public_indexed', 'is_public', 'is_indexed'),
        Index('idx_content_file_type', 'file_type', 'mime_type'),
        Index('idx_content_created', 'created_at'),
        Index('idx_content_size', 'file_size'),
        Index('idx_content_processing', 'processing_status'),
        # NOTE(review): ton_transaction_hash also sets index=True on the
        # column, so this explicit index duplicates it. Confirm which one the
        # existing schema actually has before removing either.
        Index('idx_content_ton_tx', 'ton_transaction_hash'),
        Index('idx_content_expires', 'expires_at', 'auto_delete'),
    )

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Drop-in replacement for ``datetime.utcnow()``, which is deprecated
        since Python 3.12. The tzinfo is stripped so the value stays
        comparable with the ``DateTime`` columns above, none of which is
        declared with ``timezone=True``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def is_expired(self) -> bool:
        """Check if content is expired.

        Returns:
            False when ``expires_at`` is unset; otherwise whether the current
            UTC time is later than ``expires_at``.
        """
        if not self.expires_at:
            return False
        return self._utcnow() > self.expires_at

    def is_image(self) -> bool:
        """Check if content is an image (by ``file_type`` or ``mime_type``)."""
        return self.file_type.lower() in ['image', 'img'] or \
            self.mime_type.startswith('image/')

    def is_video(self) -> bool:
        """Check if content is a video (by ``file_type`` or ``mime_type``)."""
        return self.file_type.lower() == 'video' or \
            self.mime_type.startswith('video/')

    def is_document(self) -> bool:
        """Check if content is a document.

        Only the three MIME types listed below are recognised; any other
        document format must be identified through ``file_type``.
        """
        return self.file_type.lower() in ['document', 'doc', 'pdf'] or \
            self.mime_type in ['application/pdf', 'application/msword', 'text/plain']

    def get_file_size_human(self) -> str:
        """Get human-readable file size.

        Returns:
            The size with one decimal place and a unit from B up to PB,
            dividing by 1024 per step (e.g. ``"1.5 KB"``). An unset
            ``file_size`` is reported as ``"0.0 B"``.
        """
        # file_size is None on an instance that has not been populated yet;
        # treat that as zero instead of failing on the comparison below.
        size = float(self.file_size or 0)
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size < 1024.0:
                return f"{size:.1f} {unit}"
            size /= 1024.0
        # Five divisions by 1024 have happened, so the remainder is in PB.
        return f"{size:.1f} PB"

    def increment_download_count(self) -> None:
        """Increment download counter (in memory only; nothing is committed here)."""
        # The column default of 0 is only applied on INSERT, so the attribute
        # can still be None on an unflushed instance.
        self.download_count = (self.download_count or 0) + 1

    def increment_view_count(self) -> None:
        """Increment view counter (in memory only; nothing is committed here)."""
        # Same None-before-flush consideration as download_count.
        self.view_count = (self.view_count or 0) + 1

    def mark_as_indexed(self) -> None:
        """Mark content as indexed."""
        self.is_indexed = True

    def mark_as_converted(self) -> None:
        """Mark content as converted.

        Also sets ``processing_status`` to ``"completed"`` and stamps
        ``processing_completed`` with the current UTC time.
        """
        self.is_converted = True
        self.processing_status = "completed"
        self.processing_completed = self._utcnow()

    def set_processing_error(self, error: str) -> None:
        """Set processing error.

        Args:
            error: Message stored in ``processing_error``.

        Sets ``processing_status`` to ``"error"`` and stamps
        ``processing_completed`` with the current UTC time.
        """
        self.processing_status = "error"
        self.processing_error = error
        self.processing_completed = self._utcnow()

    def start_processing(self) -> None:
        """Mark processing as started.

        Sets ``processing_status`` to ``"processing"``, stamps
        ``processing_started`` and clears any previous ``processing_error``.
        """
        self.processing_status = "processing"
        self.processing_started = self._utcnow()
        self.processing_error = None

    def get_tags_list(self) -> List[str]:
        """Get tags as list.

        ``tags`` may hold either a JSON array or a comma-separated string.

        Returns:
            An empty list when ``tags`` is empty. If ``tags`` decodes to a
            JSON array, that array as decoded. Otherwise the raw text split
            on commas, with whitespace stripped and empty items dropped.
        """
        if not self.tags:
            return []

        # Try to parse as JSON first, fallback to comma-separated
        try:
            parsed = json.loads(self.tags)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            parsed = None

        # Text such as "2024" or "true" is valid JSON but decodes to a
        # scalar; only a JSON array counts as the JSON representation.
        if isinstance(parsed, list):
            return parsed
        return [tag.strip() for tag in self.tags.split(',') if tag.strip()]

    def set_tags_list(self, tags: List[str]) -> None:
        """Set tags from list.

        Args:
            tags: Tags to store as a JSON array; an empty or falsy value
                stores ``None`` instead.
        """
        self.tags = json.dumps(tags) if tags else None

    def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]:
        """Convert to dictionary with option to exclude sensitive data.

        Args:
            include_sensitive: When False (default), ``access_password``,
                ``file_path`` and ``processing_error`` are passed to the base
                ``to_dict`` as fields to exclude.

        Returns:
            The base-class dictionary extended with the computed fields
            ``file_size_human``, ``is_image``, ``is_video``, ``is_document``,
            ``is_expired`` and ``tags_list``.
        """
        exclude = set()
        if not include_sensitive:
            exclude.update({"access_password", "file_path", "processing_error"})

        data = super().to_dict(exclude=exclude)

        # Add computed fields
        data.update({
            "file_size_human": self.get_file_size_human(),
            "is_image": self.is_image(),
            "is_video": self.is_video(),
            "is_document": self.is_document(),
            "is_expired": self.is_expired(),
            "tags_list": self.get_tags_list(),
        })

        return data

    def to_public_dict(self) -> Dict[str, Any]:
        """Convert to public dictionary (minimal content info).

        Built field by field rather than through ``to_dict``, so only the
        keys listed here are exposed. ``created_at`` is rendered with
        ``isoformat()`` or as ``None`` when unset.
        """
        return {
            "id": self.id,
            "filename": self.filename,
            "title": self.title,
            "description": self.description,
            "file_type": self.file_type,
            "file_size": self.file_size,
            "file_size_human": self.get_file_size_human(),
            "is_image": self.is_image(),
            "is_video": self.is_video(),
            "is_document": self.is_document(),
            "download_count": self.download_count,
            "view_count": self.view_count,
            "tags_list": self.get_tags_list(),
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
|
|
|
|
|
|
class ContentShare(BaseModel):
    """Content sharing model for tracking shared content.

    Maps the ``content_shares`` table. Each row is one share link for a
    ``Content`` row, identified by a unique ``share_token``, with optional
    password protection, a download cap and an expiry time.
    """

    __tablename__ = "content_shares"

    content_id = Column(Integer, ForeignKey('content.id'), nullable=False, index=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=True, index=True)  # Can be null for anonymous shares

    # Share metadata
    share_token = Column(String(100), unique=True, nullable=False, index=True)
    share_url = Column(String(500), nullable=False)

    # Share settings
    is_active = Column(Boolean, default=True, nullable=False)
    is_password_protected = Column(Boolean, default=False, nullable=False)
    share_password = Column(String(255), nullable=True)

    # Access control
    max_downloads = Column(Integer, nullable=True)  # Null = unlimited
    download_count = Column(Integer, default=0, nullable=False)
    view_count = Column(Integer, default=0, nullable=False)

    # Time limits
    expires_at = Column(DateTime, nullable=True)

    # Tracking
    ip_address = Column(String(45), nullable=True)
    user_agent = Column(Text, nullable=True)

    # Relationships
    content = relationship("Content")
    user = relationship("User")

    __table_args__ = (
        Index('idx_shares_content_active', 'content_id', 'is_active'),
        # NOTE(review): share_token is already declared unique=True and
        # index=True on the column, so this explicit index duplicates it.
        # Confirm against the existing schema before removing either.
        Index('idx_shares_token', 'share_token'),
        Index('idx_shares_expires', 'expires_at'),
    )

    def is_expired(self) -> bool:
        """Check if share is expired.

        Returns:
            False when ``expires_at`` is unset; otherwise whether the current
            UTC time is later than ``expires_at``.
        """
        if not self.expires_at:
            return False
        # Naive UTC "now": same value as the deprecated datetime.utcnow(),
        # kept naive because expires_at is a DateTime without timezone=True.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return now > self.expires_at

    def is_download_limit_reached(self) -> bool:
        """Check if download limit is reached.

        Returns:
            False when ``max_downloads`` is NULL (unlimited); otherwise
            whether ``download_count`` has reached ``max_downloads``.
        """
        # Compare against None explicitly: a limit of 0 means "no downloads
        # allowed", not "unlimited" as a plain truthiness test would read it.
        if self.max_downloads is None:
            return False
        # download_count may still be None before the INSERT default applies.
        return (self.download_count or 0) >= self.max_downloads

    def is_valid(self) -> bool:
        """Check if share is valid.

        A share is valid when it is active, not expired and has not reached
        its download limit.
        """
        return (bool(self.is_active) and
                not self.is_expired() and
                not self.is_download_limit_reached())

    def increment_download(self) -> bool:
        """Increment download count and return if still valid.

        Returns:
            False without touching the counter when the share is already
            invalid. Otherwise the counter is incremented and the result of
            ``is_valid()`` *after* the increment is returned, so the download
            that uses up the last allowed slot also returns False. Callers
            that need "was this download permitted" should call
            ``is_valid()`` before this method.
        """
        if not self.is_valid():
            return False

        self.download_count = (self.download_count or 0) + 1
        return self.is_valid()

    def increment_view(self) -> None:
        """Increment view count (in memory only; nothing is committed here)."""
        # view_count may still be None before the INSERT default applies.
        self.view_count = (self.view_count or 0) + 1
|
|
|
|
|
|
class ContentMetadata(BaseModel):
    """Extended metadata for content files.

    Maps the ``content_metadata`` table. ``content_id`` is unique, so there
    is at most one metadata row per ``Content`` row. All metadata columns are
    nullable; which ones are filled presumably depends on the media type
    (verify against the code that populates this table).
    """

    __tablename__ = "content_metadata"

    content_id = Column(Integer, ForeignKey('content.id'), unique=True, nullable=False, index=True)

    # Image metadata
    image_width = Column(Integer, nullable=True)
    image_height = Column(Integer, nullable=True)
    image_dpi = Column(Integer, nullable=True)
    image_color_space = Column(String(50), nullable=True)

    # Video metadata
    video_duration = Column(Integer, nullable=True)  # seconds
    video_bitrate = Column(Integer, nullable=True)
    video_fps = Column(Integer, nullable=True)
    video_resolution = Column(String(20), nullable=True)  # e.g., "1920x1080"
    video_codec = Column(String(50), nullable=True)

    # Audio metadata
    audio_duration = Column(Integer, nullable=True)  # seconds
    audio_bitrate = Column(Integer, nullable=True)
    audio_sample_rate = Column(Integer, nullable=True)
    audio_channels = Column(Integer, nullable=True)
    audio_codec = Column(String(50), nullable=True)

    # Document metadata
    document_pages = Column(Integer, nullable=True)
    document_words = Column(Integer, nullable=True)
    document_language = Column(String(10), nullable=True)
    document_author = Column(String(255), nullable=True)

    # EXIF data (JSON)
    exif_data = Column(Text, nullable=True)

    # GPS coordinates (stored as strings, not numeric columns)
    gps_latitude = Column(String(50), nullable=True)
    gps_longitude = Column(String(50), nullable=True)
    gps_altitude = Column(String(50), nullable=True)

    # Technical metadata
    compression_ratio = Column(String(20), nullable=True)
    quality_score = Column(Integer, nullable=True)  # 0-100

    # Relationships
    content = relationship("Content")

    def to_dict(self) -> Dict[str, Any]:
        """Convert metadata to dictionary.

        Returns:
            The base-class dictionary with ``content_id`` excluded. When
            ``exif_data`` is non-empty, its entry is replaced by the decoded
            JSON value, or by ``None`` if the stored text is not valid JSON.
        """
        data = super().to_dict(exclude={"content_id"})

        # Parse EXIF data if present; get_exif_data() already maps
        # undecodable text to None, so the parsing lives in one place.
        if self.exif_data:
            data["exif_data"] = self.get_exif_data()

        return data

    def set_exif_data(self, exif_dict: Dict[str, Any]) -> None:
        """Set EXIF data from dictionary.

        Args:
            exif_dict: Mapping serialized with ``json.dumps`` into
                ``exif_data``; an empty or falsy value stores ``None``.

        Raises:
            TypeError: Propagated from ``json.dumps`` when a value is not
                JSON-serializable.
        """
        self.exif_data = json.dumps(exif_dict) if exif_dict else None

    def get_exif_data(self) -> Optional[Dict[str, Any]]:
        """Get EXIF data as dictionary.

        Returns:
            ``None`` when ``exif_data`` is empty or is not valid JSON;
            otherwise the decoded JSON value.
        """
        if not self.exif_data:
            return None

        try:
            return json.loads(self.exif_data)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass. Catching only
            # decode failures keeps the best-effort behaviour without
            # swallowing unrelated errors such as KeyboardInterrupt.
            return None
|
|
|
|
|
|
class ContentVersion(BaseModel):
    """Content version history for tracking changes.

    Maps the ``content_versions`` table. Each row records one stored version
    of a ``Content`` row: its number, an optional name and change note, where
    the file lives, its size and hash, and whether it is the current version.
    """

    __tablename__ = "content_versions"

    content_id = Column(Integer, ForeignKey('content.id'), nullable=False, index=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False, index=True)

    # Version information
    version_number = Column(Integer, nullable=False)
    version_name = Column(String(100), nullable=True)
    change_description = Column(Text, nullable=True)

    # File information
    file_path = Column(String(500), nullable=False)
    file_size = Column(BigInteger, nullable=False)
    file_hash = Column(String(64), nullable=False)

    # Status
    is_current = Column(Boolean, default=False, nullable=False)

    # Relationships
    content = relationship("Content")
    user = relationship("User")

    __table_args__ = (
        Index('idx_versions_content_number', 'content_id', 'version_number'),
        Index('idx_versions_current', 'content_id', 'is_current'),
    )

    def mark_as_current(self) -> None:
        """Mark this version as current.

        Only this row's ``is_current`` flag is set; other versions of the
        same content are not modified by this method.
        """
        self.is_current = True
|
|
|
|
|
|
# NOTE: Content.user is declared with back_populates="content", so the User
# model must define the matching relationship:
#     content = relationship("Content", back_populates="user")