创建多进程
import multiprocessing
from applications.create_app import create_app
from applications.models.app.app import AppData
from common.model.base_model import db
from sqlalchemy.orm.attributes import flag_modifiedapp = create_app()def worker(record_id, new_id):with app.app_context():record = db.session.query(AppData).filter(AppData.id == record_id).first()if not record.data.get("updated_ids"):record.data["updated_ids"] = []record.data["updated_ids"].append(new_id)flag_modified(record, "data")db.session.commit()def test_thread():processes = []for i in range(1, 9):p = multiprocessing.Process(target=worker, args=(1, i))processes.append(p)p.start()for p in processes:p.join()with app.app_context():record = db.session.query(AppData).filter(AppData.id == 1).first()print(record.data["updated_ids"])if __name__ == "__main__":test_thread()
创建多线程
def test_():test_thread()def test_thread():thread1 = threading.Thread(target=worker, args=(1, 1))thread2 = threading.Thread(target=worker, args=(1, 2))thread3 = threading.Thread(target=worker, args=(1, 3))thread4 = threading.Thread(target=worker, args=(1, 4))thread5 = threading.Thread(target=worker, args=(1, 5))thread6 = threading.Thread(target=worker, args=(1, 6))thread7 = threading.Thread(target=worker, args=(1, 7))thread8 = threading.Thread(target=worker, args=(1, 8))thread1.start()thread2.start()thread3.start()thread4.start()thread5.start()thread6.start()thread7.start()thread8.start()thread1.join()thread2.join()thread3.join()thread4.join()thread5.join()thread6.join()thread7.join()thread8.join()record = AppData.query.filter(AppData.id == 1).first()print(record.data["updated_ids"]) def worker(record_id, new_id):with db.app.app_context():record = db.session.query(AppData).filter_by(id=record_id).with_for_update().first()if not record.data.get("updated_ids"):record.data["updated_ids"] = []record.data["updated_ids"].append(new_id)flag_modified(record, "data")db.session.commit()